| /*****************************************************************************\ |
| * srun.c - user interface to allocate resources, submit jobs, and execute |
| * parallel jobs. |
| ***************************************************************************** |
| * Copyright (C) 2002-2006 The Regents of the University of California. |
| * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). |
| * Written by Mark Grondona <grondona@llnl.gov>, et. al. |
| * UCRL-CODE-226842. |
| * |
| * This file is part of SLURM, a resource management program. |
| * For details, see <http://www.llnl.gov/linux/slurm/>. |
| * |
| * SLURM is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with SLURM; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #ifdef HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #ifdef WITH_PTHREADS |
| # include <pthread.h> |
| #endif |
| |
| #ifdef HAVE_AIX |
| # undef HAVE_UNSETENV |
| # include <sys/checkpnt.h> |
| #endif |
| #ifndef HAVE_UNSETENV |
| # include "src/common/unsetenv.h" |
| #endif |
| |
| #include <sys/resource.h> |
| #include <sys/stat.h> |
| #include <sys/time.h> |
| #include <sys/types.h> |
| #include <sys/utsname.h> |
| #include <sys/wait.h> |
| #include <ctype.h> |
| #include <fcntl.h> |
| #include <pwd.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <signal.h> |
| #include <unistd.h> |
| #include <fcntl.h> |
| #include <grp.h> |
| |
| #include "src/common/fd.h" |
| #include "src/common/log.h" |
| #include "src/common/slurm_protocol_api.h" |
| #include "src/common/switch.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xsignal.h" |
| #include "src/common/xstring.h" |
| #include "src/common/net.h" |
| #include "src/common/mpi.h" |
| #include "src/common/slurm_rlimits_info.h" |
| #include "src/common/plugstack.h" |
| |
| #include "src/srun/allocate.h" |
| #include "src/srun/srun_job.h" |
| #include "src/srun/launch.h" |
| #include "src/srun/msg.h" |
| #include "src/srun/opt.h" |
| #include "src/srun/sigstr.h" |
| #include "src/srun/reattach.h" |
| #include "src/srun/attach.h" |
| #include "src/srun/srun.h" |
| #include "src/srun/signals.h" |
| |
/* Upper bound on resubmission attempts in _run_batch_job() while
 * slurm_submit_batch_job() fails with ESLURM_ERROR_ON_DESC_TO_RECORD_COPY */
#define MAX_RETRIES 20
/* Size of the argv array that _get_options() fills from "#SLURM" lines */
#define MAX_ENTRIES 50

/* File classifications reported by _is_file_text() and consumed by
 * _build_script() */
#define TYPE_NOT_TEXT 0
#define TYPE_TEXT 1
#define TYPE_SCRIPT 2

/* Job/step description passed to mpi_hook_client_prelaunch() by srun() */
mpi_plugin_client_info_t mpi_job_info[1];
/* Parent pid of this srun, recorded by srun() after argument processing */
pid_t srun_ppid = 0;

/*
 * forward declaration of static funcs
 */
static void _print_job_information(resource_allocation_response_msg_t *resp);
static char *_build_script (const char *argv0, char *pathname, int file_type);
static char *_get_shell (void);
static void _send_options(const int argc, char **argv);
static void _get_options (const char *buffer);
static char *_get_token(char *buf_ptr);
static int _is_file_text (char *, char**);
static int _run_batch_job (const char *argv0);
static int _run_job_script(srun_job_t *job, env_t *env);
static void _set_prio_process_env(void);
static int _set_rlimit_env(void);
static int _set_umask_env(void);
static char *_uint16_array_to_str(int count, const uint16_t *array);
static void _switch_standalone(srun_job_t *job);
static int _become_user (void);
static int _print_script_exit_status(const char *argv0, int status);
static void _run_srun_prolog (srun_job_t *job);
static void _run_srun_epilog (srun_job_t *job);
static int _run_srun_script (srun_job_t *job, char *script);
static int _change_rlimit_rss(void);
static int _slurm_debug_env_val (void);
static int _call_spank_local_user (srun_job_t *job);
static void _define_symbols(void);
| |
/*
 * srun - entry point of the srun command; ac/av are main()'s arguments.
 *
 * Sets up logging and the SPANK plugin stack, parses the command line,
 * then takes exactly one path depending on the parsed options:
 *   - opt.test_only: ask whether an allocation could be made, then exit;
 *   - opt.batch:     submit a batch job (_run_batch_job), then exit;
 *   - opt.no_alloc:  build a job without requesting an allocation;
 *   - opt.allocate:  obtain an allocation, run a shell or script inside it
 *                    (_run_job_script), then exit;
 *   - existing_allocation() succeeds: create a job step in that allocation;
 *   - mode == MODE_ATTACH: reattach to a running job, then exit;
 *   - otherwise: obtain a new allocation and create a job step in it.
 * The paths that fall through export the job environment, start the
 * message, I/O, signal and launch threads, wait until job->state reaches
 * SRUN_JOB_TERMINATED (or a later state), tear down and exit with the
 * job's exit code.  Every visible path ends in exit().
 */
int srun(int ac, char **av)
{
	resource_allocation_response_msg_t *resp;
	srun_job_t *job = NULL;
	int exitcode = 0, debug_level;
	env_t *env = xmalloc(sizeof(env_t));
	uint32_t job_id = 0;
	log_options_t logopt = LOG_OPTS_STDERR_ONLY;
	slurm_step_io_fds_t fds = SLURM_STEP_IO_FDS_INITIALIZER;
	char **mpi_env = NULL;
	mpi_plugin_client_state_t *mpi_state;

	/* Start with "unset" values; the job-specific fields are filled
	 * in further below once a job exists. */
	env->stepid = -1;
	env->procid = -1;
	env->localid = -1;
	env->nodeid = -1;
	env->cli = NULL;
	env->env = NULL;

	/* SLURM_DEBUG (parsed by _slurm_debug_env_val) shifts the initial
	 * stderr log level before the command line has been read. */
	debug_level = _slurm_debug_env_val();
	logopt.stderr_level += debug_level;
	log_init(xbasename(av[0]), logopt, 0, NULL);

	/* Initialize plugin stack, read options from plugins, etc.
	 */
	if (spank_init(NULL) < 0) {
		fatal("Plug-in initialization failed");
		/* NOTE(review): never reached if fatal() does not return;
		 * presumably present only so the linker keeps whatever
		 * _define_symbols() references -- confirm before removing. */
		_define_symbols();
	}

	/* Be sure to call spank_fini when srun exits.
	 */
	if (atexit((void (*) (void)) spank_fini) < 0)
		error("Failed to register atexit handler for plugins: %m");

	/* set default options, process commandline arguments, and
	 * verify some basic values
	 */
	if (initialize_and_process_args(ac, av) < 0) {
		error ("srun initialization failed");
		exit (1);
	}
	srun_ppid = getppid();

	/* reinit log with new verbosity (if changed by command line)
	 */
	if (_verbose || opt.quiet) {
		/* If log level is already increased, only increment the
		 * level by the difference of _verbose and LOG_LEVEL_INFO
		 */
		if ((_verbose -= (logopt.stderr_level - LOG_LEVEL_INFO)) > 0)
			logopt.stderr_level += _verbose;
		logopt.stderr_level -= opt.quiet;
		logopt.prefix_level = 1;
		log_alter(logopt, 0, NULL);
	} else
		_verbose = debug_level;

	/* Record rlimits, nice value and umask in SLURM_* environment
	 * variables; skipped when spawning an allocation shell. */
	if (!opt.allocate) {
		(void) _set_rlimit_env();
		_set_prio_process_env();
		(void) _set_umask_env();
	}
	/* Set up slurmctld message handler */
	slurmctld_msg_init();

	/* now global "opt" should be filled in and available,
	 * create a job from opt
	 */
	if (opt.test_only) {
		int rc = allocate_test();
		if (rc) {
			slurm_perror("allocation failure");
			exit (1);
		}
		info("allocation success");
		exit (0);

	} else if (opt.batch) {
		/* allow binding with batch submissions */
		env->distribution = opt.distribution;
		env->cpu_bind_type = opt.cpu_bind_type;
		env->cpu_bind = opt.cpu_bind;
		env->mem_bind_type = opt.mem_bind_type;
		env->mem_bind = opt.mem_bind;
		setup_env(env);

		if (_run_batch_job(av[0]) < 0)
			exit (1);
		exit (0);

	} else if (opt.no_alloc) {
		info("do not allocate resources");
		sig_setup_sigmask();
		job = job_create_noalloc();
		_switch_standalone(job);

	} else if (opt.allocate) {
		/* Allocate, then run the user's shell/script inside the
		 * allocation and exit with its status. */
		sig_setup_sigmask();
		if ( !(resp = allocate_nodes()) )
			exit(1);
		if (opt.noshell) {
			fprintf (stdout, "SLURM_JOBID=%u\n", resp->job_id);
			exit (0);
		}
		if (_become_user () < 0)
			info ("Warning: unable to assume uid=%lu\n", opt.uid);
		_print_job_information(resp);

		job = job_create_allocation(resp);
		if(!job)
			exit(1);

		job->step_layout =
			fake_slurm_step_layout_create(resp->node_list,
						      resp->cpus_per_node,
						      resp->cpu_count_reps,
						      resp->node_cnt, 0);
		if(!job->step_layout)
			exit(1);
		if (msg_thr_create(job) < 0)
			job_fatal(job, "Unable to create msg thread");
		exitcode = _run_job_script(job, env);

		/* close up the msg thread cleanly */
		close(job->forked_msg->msg_par->msg_pipe[1]);
		debug2("Waiting for message thread");
		if (pthread_join(job->jtid, NULL) < 0)
			error ("Waiting on message thread: %m");
		debug2("done");

		srun_job_destroy(job,exitcode);

		debug ("Spawned srun shell terminated");
		xfree(env->task_count);
		xfree(env);
		exit (exitcode);

	} else if ((resp = existing_allocation())) {
		/* Run a new step inside an allocation we already hold. */
		job_id = resp->job_id;
		if (opt.alloc_nodelist == NULL)
			opt.alloc_nodelist = xstrdup(resp->node_list);

		if (opt.allocate) {
			error("job %u already has an allocation",
			      job_id);
			slurm_free_resource_allocation_response_msg(resp);
			exit(1);
		}

		job = job_step_create_allocation(resp);
		slurm_free_resource_allocation_response_msg(resp);

		if(!job)
			exit(1);

		job->old_job = true;
		sig_setup_sigmask();

		if (create_job_step(job) < 0)
			exit(1);
	} else if (mode == MODE_ATTACH) {
		reattach();
		exit (0);
	} else {
		/* Combined job allocation and job step launch */
#ifdef HAVE_FRONT_END
		uid_t my_uid = getuid();
		if ((my_uid != 0)
		    && (my_uid != slurm_get_slurm_user_id())) {
			error("srun task launch not supported on this system");
			exit(1);
		}
#endif
		if (opt.job_max_memory > 0) {
			(void) _change_rlimit_rss();
		}
		sig_setup_sigmask();
		if ( !(resp = allocate_nodes()) )
			exit(1);
		_print_job_information(resp);
		job = job_create_allocation(resp);
		if(!job)
			exit(1);
		if (create_job_step(job) < 0) {
			srun_job_destroy(job, 0);
			exit(1);
		}

		slurm_free_resource_allocation_response_msg(resp);
	}

	/*
	 * Become --uid user
	 */
	if (_become_user () < 0)
		info ("Warning: Unable to assume uid=%lu\n", opt.uid);

	/* job structure should now be filled in */

	if (_call_spank_local_user (job) < 0)
		job_fatal(job, "Failure in local plugin stack");

	/*
	 * Enhance environment for job
	 */
	env->nprocs = opt.nprocs;
	env->cpus_per_task = opt.cpus_per_task;
	if (opt.ntasks_per_node != NO_VAL)
		env->ntasks_per_node = opt.ntasks_per_node;
	if (opt.ntasks_per_socket != NO_VAL)
		env->ntasks_per_socket = opt.ntasks_per_socket;
	if (opt.ntasks_per_core != NO_VAL)
		env->ntasks_per_core = opt.ntasks_per_core;
	env->distribution = opt.distribution;
	if (opt.plane_size != NO_VAL)
		env->plane_size = opt.plane_size;
	env->cpu_bind_type = opt.cpu_bind_type;
	env->cpu_bind = opt.cpu_bind;
	env->mem_bind_type = opt.mem_bind_type;
	env->mem_bind = opt.mem_bind;
	env->overcommit = opt.overcommit;
	env->slurmd_debug = opt.slurmd_debug;
	env->labelio = opt.labelio;
	env->comm_port = slurmctld_comm_addr.port;
	env->comm_hostname = slurmctld_comm_addr.hostname;
	if(job) {
		env->select_jobinfo = job->select_jobinfo;
		env->nhosts = job->nhosts;
		env->nodelist = job->nodelist;
		env->task_count = _uint16_array_to_str(
			job->nhosts, job->step_layout->tasks);
		env->jobid = job->jobid;
		env->stepid = job->stepid;
	}
	setup_env(env);
	xfree(env->task_count);
	xfree(env);

	_run_srun_prolog(job);

	if (msg_thr_create(job) < 0)
		job_fatal(job, "Unable to create msg thread");

	/* Let the MPI plugin contribute environment variables before
	 * the tasks are launched. */
	mpi_job_info->jobid = job->jobid;
	mpi_job_info->stepid = job->stepid;
	mpi_job_info->step_layout = job->step_layout;
	if (!(mpi_state = mpi_hook_client_prelaunch(mpi_job_info, &mpi_env)))
		job_fatal (job, "Failed to initialize MPI");
	env_array_set_environment(mpi_env);
	env_array_free(mpi_env);

	srun_set_stdio_fds(job, &fds);
	job->client_io = client_io_handler_create(fds,
						  job->step_layout->task_cnt,
						  job->step_layout->node_cnt,
						  job->cred,
						  opt.labelio);
	if (!job->client_io
	    || (client_io_handler_start(job->client_io) != SLURM_SUCCESS))
		job_fatal(job, "failed to start IO handler");

	if (sig_thr_create(job) < 0)
		job_fatal(job, "Unable to create signals thread: %m");

	if (launch_thr_create(job) < 0)
		job_fatal(job, "Unable to create launch thread: %m");

	/* wait for job to terminate
	 */
	slurm_mutex_lock(&job->state_mutex);
	while (job->state < SRUN_JOB_TERMINATED) {
		pthread_cond_wait(&job->state_cond, &job->state_mutex);
	}
	slurm_mutex_unlock(&job->state_mutex);

	/* job is now overdone, clean up
	 *
	 * If job is "forcefully terminated" exit immediately.
	 *
	 */
	if (job->state == SRUN_JOB_FORCETERM) {
		info("Force Terminated job %u.%u", job->jobid, job->stepid);
		srun_job_destroy(job, 0);
		exit(1);
	} else if (job->state == SRUN_JOB_CANCELLED) {
		info("Cancelling job %u.%u", job->jobid, job->stepid);
		srun_job_destroy(job, NO_VAL);
		exit(1);
	} else if (job->state == SRUN_JOB_FAILED) {
		/* This check catches jobs that failed because srun,
		 * slurmd or slurmstepd was unable to fork, create a
		 * thread, or similar.  The job-failed check further
		 * below is still needed in case the job failed on its
		 * own.
		 */
		info("Job Failed %u.%u", job->jobid, job->stepid);
		srun_job_destroy(job, NO_VAL);
		exit(1);
	}

	/*
	 * We want to make sure we get the correct state of the job
	 * and not finish before all the messages have been sent.
	 */
	/* NOTE(review): SRUN_JOB_FAILED already exits above, so this
	 * close() can never run here; confirm the intended condition. */
	if (job->state == SRUN_JOB_FAILED)
		close(job->forked_msg->msg_par->msg_pipe[1]);
	debug("Waiting for message thread");
	if (pthread_join(job->jtid, NULL) < 0)
		error ("Waiting on message thread: %m");
	debug("done");

	/* have to check if job was cancelled here just to make sure
	   state didn't change when we were waiting for the message thread */
	exitcode = set_job_rc(job);
	if (job->state == SRUN_JOB_CANCELLED) {
		info("Cancelling job %u.%u", job->jobid, job->stepid);
		srun_job_destroy(job, NO_VAL);
	} else if (job->state == SRUN_JOB_FAILED) {
		info("Terminating job %u.%u", job->jobid, job->stepid);
		srun_job_destroy(job, job->rc);
	} else
		srun_job_destroy(job, job->rc);

	/* wait for launch thread */
	if (pthread_join(job->lid, NULL) < 0)
		error ("Waiting on launch thread: %m");

	/*
	 * Signal the IO thread to shutdown, which will stop
	 * the listening socket and file read (stdin) event
	 * IO objects, but allow file write (stdout) objects to
	 * complete any writing that remains.
	 */
	debug("Waiting for IO thread");
	if (client_io_handler_finish(job->client_io) != SLURM_SUCCESS)
		error ("IO handler did not finish correctly: %m");
	client_io_handler_destroy(job->client_io);
	debug("done");


	if (mpi_hook_client_fini (mpi_state) < 0)
		; /* eh, ignore errors here */

	_run_srun_epilog(job);

	/*
	 * Let exit() clean up remaining threads.
	 */
	log_fini();
	exit(exitcode);
}
| |
| static int _call_spank_local_user (srun_job_t *job) |
| { |
| struct spank_launcher_job_info info[1]; |
| |
| info->uid = opt.uid; |
| info->gid = opt.gid; |
| info->jobid = job->jobid; |
| info->stepid = job->stepid; |
| info->step_layout = job->step_layout; |
| info->argc = remote_argc; |
| info->argv = remote_argv; |
| |
| return spank_local_user(info); |
| } |
| |
| |
| static int _slurm_debug_env_val (void) |
| { |
| long int level = 0; |
| const char *val; |
| |
| if ((val = getenv ("SLURM_DEBUG"))) { |
| char *p; |
| if ((level = strtol (val, &p, 10)) < -LOG_LEVEL_INFO) |
| level = -LOG_LEVEL_INFO; |
| if (p && *p != '\0') |
| level = 0; |
| } |
| return ((int) level); |
| } |
| |
| /* |
| * Return a string representation of an array of uint32_t elements. |
| * Each value in the array is printed in decimal notation and elements |
| * are seperated by a comma. If sequential elements in the array |
| * contain the same value, the value is written out just once followed |
| * by "(xN)", where "N" is the number of times the value is repeated. |
| * |
| * Example: |
| * The array "1, 2, 1, 1, 1, 3, 2" becomes the string "1,2,1(x3),3,2" |
| * |
| * Returns an xmalloc'ed string. Free with xfree(). |
| */ |
| static char *_uint16_array_to_str(int array_len, const uint16_t *array) |
| { |
| int i; |
| int previous = 0; |
| char *sep = ","; /* seperator */ |
| char *str = xstrdup(""); |
| |
| if(array == NULL) |
| return str; |
| |
| for (i = 0; i < array_len; i++) { |
| if ((i+1 < array_len) |
| && (array[i] == array[i+1])) { |
| previous++; |
| continue; |
| } |
| |
| if (i == array_len-1) /* last time through loop */ |
| sep = ""; |
| if (previous > 0) { |
| xstrfmtcat(str, "%u(x%u)%s", |
| array[i], previous+1, sep); |
| } else { |
| xstrfmtcat(str, "%u%s", array[i], sep); |
| } |
| previous = 0; |
| } |
| |
| return str; |
| } |
| |
| static void |
| _switch_standalone(srun_job_t *job) |
| { |
| int cyclic = (opt.distribution == SLURM_DIST_CYCLIC); |
| |
| if (switch_alloc_jobinfo(&job->switch_job) < 0) |
| fatal("switch_alloc_jobinfo: %m"); |
| if (switch_build_jobinfo(job->switch_job, |
| job->nodelist, |
| job->step_layout->tasks, |
| cyclic, opt.network) < 0) |
| fatal("switch_build_jobinfo: %m"); |
| } |
| |
| |
| static void |
| _print_job_information(resource_allocation_response_msg_t *resp) |
| { |
| int i; |
| char *str = NULL; |
| char *sep = ""; |
| |
| if (!_verbose) |
| return; |
| |
| xstrfmtcat(str, "jobid %u: nodes(%u):`%s', cpu counts: ", |
| resp->job_id, resp->node_cnt, resp->node_list); |
| |
| for (i = 0; i < resp->num_cpu_groups; i++) { |
| xstrfmtcat(str, "%s%u(x%u)", |
| sep, resp->cpus_per_node[i], |
| resp->cpu_count_reps[i]); |
| sep = ","; |
| } |
| verbose("%s", str); |
| xfree(str); |
| } |
| |
| |
| /* submit a batch job and return error code */ |
| static int |
| _run_batch_job(const char *argv0) |
| { |
| int file_type, retries = 0; |
| int rc = SLURM_SUCCESS; |
| job_desc_msg_t *req; |
| submit_response_msg_t *resp; |
| char *script; |
| static char *msg = "Slurm job queue full, sleeping and retrying."; |
| |
| if ((remote_argc == 0) || (remote_argv[0] == NULL)) |
| return SLURM_ERROR; |
| |
| file_type = _is_file_text (remote_argv[0], NULL); |
| |
| /* if (file_type == TYPE_NOT_TEXT) { |
| * error ("file %s is not script", remote_argv[0]); |
| * return SLURM_ERROR; |
| * } |
| */ |
| |
| if ((script = _build_script (argv0, remote_argv[0], file_type)) |
| == NULL) { |
| error ("unable to build script from file %s", remote_argv[0]); |
| return SLURM_ERROR; |
| } |
| |
| if (!(req = job_desc_msg_create_from_opts (script))) |
| fatal ("Unable to create job request"); |
| |
| /* Do not re-use existing job id from environment variable |
| * when submitting new job from within a running job */ |
| if (!opt.jobid_set) |
| req->job_id = NO_VAL; |
| |
| while ((rc = slurm_submit_batch_job(req, &resp)) < 0) { |
| if ((errno != ESLURM_ERROR_ON_DESC_TO_RECORD_COPY) || |
| (retries >= MAX_RETRIES)) |
| return (error("Unable to submit batch job: %m")); |
| |
| if (retries == 0) |
| error(msg); |
| else |
| debug(msg); |
| sleep (++retries); |
| } |
| |
| |
| if (rc == SLURM_SUCCESS) { |
| if (resp->step_id == NO_VAL) |
| info ("jobid %u submitted",resp->job_id); |
| else |
| info ("jobid %u.%u submitted",resp->job_id, |
| resp->step_id); |
| if (resp->error_code) { |
| if (opt.immediate) { |
| error("Job failed: %s", |
| slurm_strerror(resp->error_code)); |
| rc = resp->error_code; |
| } else { |
| info("Warning: %s", |
| slurm_strerror(resp->error_code)); |
| } |
| } |
| slurm_free_submit_response_response_msg (resp); |
| } |
| |
| job_desc_msg_destroy (req); |
| xfree (script); |
| |
| return (rc); |
| } |
| |
/*
 * _send_options - apply options gathered from a batch script.
 *
 * Hands argc/argv to set_options(), then logs and xfree()s argv[1] through
 * argv[argc - 1].  argv[0] is neither logged nor freed.
 */
static void _send_options(const int argc, char **argv)
{
	int idx = 1;

	set_options(argc, argv, 0);

	while (idx < argc) {
		debug3("argv[%d] = %s.", idx, argv[idx]);
		xfree(argv[idx]);
		idx++;
	}
}
| |
| /* _get_shell - return a string containing the default shell for this user |
| * NOTE: This function is NOT reentrant (see getpwuid_r if needed) */ |
| static char * |
| _get_shell (void) |
| { |
| struct passwd *pw_ent_ptr; |
| |
| pw_ent_ptr = getpwuid (opt.uid); |
| if ( ! pw_ent_ptr ) { |
| pw_ent_ptr = getpwnam( "nobody" ); |
| info( "warning - no user information for user %d", opt.uid ); |
| } |
| return pw_ent_ptr->pw_shell; |
| } |
| |
/*
 * _get_token - return an xmalloc'ed copy of the token starting at buf_ptr.
 *
 * The token ends at the first whitespace, newline or NUL found after
 * buf_ptr[0].  buf_ptr[0] itself is always copied.  The only caller,
 * _get_options(), invokes this on a non-blank character, so the result
 * is never empty, and that caller's pointer arithmetic relies on this.
 *
 * Free the result with xfree().
 */
static char *_get_token(char *buf_ptr)
{
	int token_size = 1;
	char *token;

	/* cast to unsigned char: isspace() on a negative char value (e.g.
	 * a UTF-8 byte in the script) is undefined behavior */
	while ((buf_ptr[token_size] != '\n')
	       && (buf_ptr[token_size] != '\0')
	       && !isspace((unsigned char) buf_ptr[token_size]))
		token_size++;

	token = xmalloc(token_size + 1);
	memcpy(token, buf_ptr, token_size);
	token[token_size] = '\0';	/* don't rely on zeroed allocation */
	return token;
}
| |
| /* _get_opts - gather options put in user script. Used for batch scripts. */ |
| static void |
| _get_options (const char *buffer) |
| { |
| int argc = 1; |
| char *argv[MAX_ENTRIES]; |
| char *buf_loc = (char *) buffer; |
| |
| while ((buf_loc = strstr(buf_loc, "#SLURM"))) { |
| buf_loc += 6; |
| /* find the tokens and move them to argv */ |
| for ( ; ((buf_loc[0] != '\n') && (buf_loc[0] != '\0')); |
| buf_loc++) { |
| if (isspace(buf_loc[0])) |
| continue; |
| argv[argc] = _get_token(buf_loc); |
| buf_loc += (strlen(argv[argc]) - 1); |
| argc++; |
| } |
| } |
| if(argc > 1) |
| _send_options(argc, argv); |
| return; |
| } |
| |
/*
 * Byte classification table used by _is_file_text(): text_chars[b] tells
 * whether byte value b may occur in a text file.  _is_file_text() accepts
 * a file only if every byte examined is classified T or I.
 */
#define F 0   /* char never appears in text */
#define T 1   /* character appears in plain ASCII text */
#define I 2   /* character appears in ISO-8859 text */
#define X 3   /* character appears in non-ISO extended ASCII */
static const char text_chars[256] = {
	/*                  BEL BS HT LF    FF CR    */
	F, F, F, F, F, F, F, T, T, T, T, F, T, T, F, F,  /* 0x0X */
	/*                              ESC          */
	F, F, F, F, F, F, F, F, F, F, F, T, F, F, F, F,  /* 0x1X */
	T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T,  /* 0x2X */
	T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T,  /* 0x3X */
	T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T,  /* 0x4X */
	T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T,  /* 0x5X */
	T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T,  /* 0x6X */
	T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, F,  /* 0x7X */
	/*            NEL                            */
	X, X, X, X, X, T, X, X, X, X, X, X, X, X, X, X,  /* 0x8X */
	X, X, X, X, X, X, X, X, X, X, X, X, X, X, X, X,  /* 0x9X */
	I, I, I, I, I, I, I, I, I, I, I, I, I, I, I, I,  /* 0xaX */
	I, I, I, I, I, I, I, I, I, I, I, I, I, I, I, I,  /* 0xbX */
	I, I, I, I, I, I, I, I, I, I, I, I, I, I, I, I,  /* 0xcX */
	I, I, I, I, I, I, I, I, I, I, I, I, I, I, I, I,  /* 0xdX */
	I, I, I, I, I, I, I, I, I, I, I, I, I, I, I, I,  /* 0xeX */
	I, I, I, I, I, I, I, I, I, I, I, I, I, I, I, I   /* 0xfX */
};
| |
| |
| /* _is_file_text - determine if specified file is a script |
| * shell_ptr - if not NULL, set to pointer to pathname of specified shell |
| * (if any, ie. return code of 2) |
| * return 0 if the specified file can not be read or does not contain text |
| * returns 2 if file contains text starting with "#!", otherwise |
| * returns 1 if file contains text, but lacks "#!" header |
| */ |
| static int |
| _is_file_text (char *fname, char **shell_ptr) |
| { |
| int buf_size, fd, i; |
| int rc = 1; /* initially assume the file contains text */ |
| unsigned char buffer[8192]; |
| |
| if (fname[0] != '/') { |
| info("warning: %s not found in local path", fname); |
| return 0; |
| } |
| |
| fd = open(fname, O_RDONLY); |
| if (fd < 0) { |
| error ("Unable to open file %s: %m", fname); |
| return 0; |
| } |
| |
| buf_size = read (fd, buffer, sizeof (buffer)); |
| if (buf_size < 0) { |
| error ("Unable to read file %s: %m", fname); |
| rc = 0; |
| } |
| (void) close (fd); |
| |
| for (i=0; i<buf_size; i++) { |
| if (((int) text_chars[buffer[i]] != T) |
| && ((int) text_chars[buffer[i]] != I)) { |
| rc = 0; |
| break; |
| } |
| } |
| |
| if ((rc == 1) && (buf_size > 2)) { |
| if ((buffer[0] == '#') && (buffer[1] == '!')) |
| rc = 2; |
| } |
| |
| if ((rc == 2) && shell_ptr) { |
| shell_ptr[0] = xmalloc (sizeof (buffer)); |
| for (i=2; i<sizeof(buffer); i++) { |
| if (iscntrl (buffer[i])) { |
| shell_ptr[0][i-2] = '\0'; |
| break; |
| } else |
| shell_ptr[0][i-2] = buffer[i]; |
| } |
| if (i == sizeof(buffer)) { |
| error ("shell specified in script too long, not used"); |
| xfree (shell_ptr[0]); |
| shell_ptr[0] = NULL; |
| } |
| } |
| |
| return rc; |
| } |
| |
| /* allocate and build a string containing a script for a batch job */ |
| static char * |
| _build_script (const char *argv0, char *fname, int file_type) |
| { |
| cbuf_t cb = cbuf_create(512, 1048576); |
| int fd = -1; |
| int i = 0; |
| char *buffer = NULL; |
| |
| if (file_type != 0) { |
| if ((fd = open(fname, O_RDONLY)) < 0) { |
| error ("Unable to open file %s: %m", fname); |
| return NULL; |
| } |
| } |
| |
| if (file_type != TYPE_SCRIPT) { |
| xstrfmtcat(buffer, "#!%s\n", _get_shell()); |
| if (file_type == 0) { |
| xstrfmtcat(buffer, "%s ", argv0); /* path to srun */ |
| for (i = 0; i < remote_argc; i++) |
| xstrfmtcat(buffer, "%s ", remote_argv[i]); |
| xstrcatchar(buffer, '\n'); |
| } |
| } |
| |
| if (file_type != 0) { |
| int len = buffer ? strlen(buffer) : 0; |
| int size; |
| |
| while ((size = cbuf_write_from_fd(cb, fd, -1, NULL)) > 0) |
| ; |
| |
| if (size < 0) { |
| error ("unable to read %s: %m", fname); |
| cbuf_destroy(cb); |
| return NULL; |
| } |
| |
| cbuf_write(cb, "\0", 1, NULL); |
| |
| xrealloc(buffer, cbuf_used(cb) + len +1); |
| |
| cbuf_read(cb, buffer+len, cbuf_used(cb)); |
| |
| if (close(fd) < 0) |
| error("close: %m"); |
| } |
| |
| cbuf_destroy(cb); |
| |
| _get_options(buffer); |
| |
| if (strlen(buffer) >= 0xffff) { |
| error("Job script exceeds size supported by slurm"); |
| xfree(buffer); |
| } |
| |
| return buffer; |
| } |
| |
| /* Set SLURM_UMASK environment variable with current state */ |
| static int _set_umask_env(void) |
| { |
| char mask_char[5]; |
| mode_t mask; |
| |
| if (getenv("SLURM_UMASK")) /* use this value */ |
| return SLURM_SUCCESS; |
| |
| mask = (int)umask(0); |
| umask(mask); |
| |
| sprintf(mask_char, "0%d%d%d", |
| ((mask>>6)&07), ((mask>>3)&07), mask&07); |
| if (setenvf(NULL, "SLURM_UMASK", "%s", mask_char) < 0) { |
| error ("unable to set SLURM_UMASK in environment"); |
| return SLURM_FAILURE; |
| } |
| debug ("propagating UMASK=%s", mask_char); |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * _set_prio_process_env |
| * |
| * Set the internal SLURM_PRIO_PROCESS environment variable to support |
| * the propagation of the users nice value and the "PropagatePrioProcess" |
| * config keyword. |
| */ |
| static void _set_prio_process_env(void) |
| { |
| int retval; |
| |
| errno = 0; /* needed to detect a real failure since prio can be -1 */ |
| |
| if ((retval = getpriority (PRIO_PROCESS, 0)) == -1) { |
| if (errno) { |
| error ("getpriority(PRIO_PROCESS): %m"); |
| return; |
| } |
| } |
| |
| if (setenvf (NULL, "SLURM_PRIO_PROCESS", "%d", retval) < 0) { |
| error ("unable to set SLURM_PRIO_PROCESS in environment"); |
| return; |
| } |
| |
| debug ("propagating SLURM_PRIO_PROCESS=%d", retval); |
| } |
| |
| /* |
| * Change SLURM_RLIMIT_RSS to the user specified value --job-mem |
| * or opt.job_max_memory |
| */ |
| static int _change_rlimit_rss(void) |
| { |
| struct rlimit rlim[1]; |
| long new_cur; |
| int rc = SLURM_SUCCESS; |
| |
| if (getrlimit (RLIMIT_RSS, rlim) < 0) |
| return (error ("getrlimit (RLIMIT_RSS): %m")); |
| |
| new_cur = opt.job_max_memory*1024; |
| if((new_cur > rlim->rlim_max) || (new_cur < 0)) |
| rlim->rlim_cur = rlim->rlim_max; |
| else |
| rlim->rlim_cur = new_cur; |
| |
| if (setenvf (NULL, "SLURM_RLIMIT_RSS", "%lu", rlim->rlim_cur) < 0) |
| error ("unable to set %s in environment", "RSS"); |
| |
| if (setrlimit (RLIMIT_RSS, rlim) < 0) |
| return (error ("Unable to change memoryuse: %m")); |
| |
| return rc; |
| } |
| |
| /* Set SLURM_RLIMIT_* environment variables with current resource |
| * limit values, reset RLIMIT_NOFILE to maximum possible value */ |
| static int _set_rlimit_env(void) |
| { |
| int rc = SLURM_SUCCESS; |
| struct rlimit rlim[1]; |
| unsigned long cur; |
| char name[64], *format; |
| slurm_rlimits_info_t *rli; |
| |
| for (rli = get_slurm_rlimits_info(); rli->name != NULL; rli++ ) { |
| |
| if (getrlimit (rli->resource, rlim) < 0) { |
| error ("getrlimit (RLIMIT_%s): %m", rli->name); |
| rc = SLURM_FAILURE; |
| continue; |
| } |
| |
| cur = (unsigned long) rlim->rlim_cur; |
| snprintf(name, sizeof(name), "SLURM_RLIMIT_%s", rli->name); |
| if (opt.propagate && rli->propagate_flag == PROPAGATE_RLIMITS) |
| /* |
| * Prepend 'U' to indicate user requested propagate |
| */ |
| format = "U%lu"; |
| else |
| format = "%lu"; |
| |
| if (setenvf (NULL, name, format, cur) < 0) { |
| error ("unable to set %s in environment", name); |
| rc = SLURM_FAILURE; |
| continue; |
| } |
| |
| debug ("propagating RLIMIT_%s=%lu", rli->name, cur); |
| } |
| |
| /* |
| * Now increase NOFILE to the max available for this srun |
| */ |
| if (getrlimit (RLIMIT_NOFILE, rlim) < 0) |
| return (error ("getrlimit (RLIMIT_NOFILE): %m")); |
| |
| if (rlim->rlim_cur < rlim->rlim_max) { |
| rlim->rlim_cur = rlim->rlim_max; |
| if (setrlimit (RLIMIT_NOFILE, rlim) < 0) |
| return (error ("Unable to increase max no. files: %m")); |
| } |
| |
| return rc; |
| } |
| |
/*
 * _print_script_exit_status - log how a spawned script ended and convert
 * its wait status into a shell-style exit code.
 *
 * argv0  - name to prefix the messages with
 * status - wait status from waitpid()
 *
 * Returns 0 for a clean exit, 128 + signal number if the script was
 * killed by a signal, and otherwise its exit status.
 */
static int
_print_script_exit_status(const char *argv0, int status)
{
	char *core_note = "";
	int code;

	if (status == 0) {
		verbose("%s: Done", argv0);
		return 0;
	}

	if (!WIFSIGNALED(status)) {
		code = WEXITSTATUS(status);
		if (code != 0)
			error("%s: Exit %d", argv0, code);
		return code;
	}

#ifdef WCOREDUMP
	if (WCOREDUMP(status))
		core_note = " (core dumped)";
#endif
	error("%s: %s%s", argv0, sigstr(status), core_note);
	return WTERMSIG(status) + 128;
}
| |
| /* allocation option specified, spawn a script and wait for it to exit */ |
| static int _run_job_script (srun_job_t *job, env_t *env) |
| { |
| int status, exitcode; |
| pid_t cpid; |
| char **argv = (remote_argv[0] ? remote_argv : NULL); |
| |
| if (opt.nprocs_set) |
| env->nprocs = opt.nprocs; |
| if (opt.cpus_set) |
| env->cpus_per_task = opt.cpus_per_task; |
| if (opt.ntasks_per_node != NO_VAL) |
| env->ntasks_per_node = opt.ntasks_per_node; |
| if (opt.ntasks_per_socket != NO_VAL) |
| env->ntasks_per_socket = opt.ntasks_per_socket; |
| if (opt.ntasks_per_core != NO_VAL) |
| env->ntasks_per_core = opt.ntasks_per_core; |
| env->distribution = opt.distribution; |
| env->overcommit = opt.overcommit; |
| env->slurmd_debug = opt.slurmd_debug; |
| env->labelio = opt.labelio; |
| env->comm_port = slurmctld_comm_addr.port; |
| env->comm_hostname = slurmctld_comm_addr.hostname; |
| if(job) { |
| env->select_jobinfo = job->select_jobinfo; |
| env->jobid = job->jobid; |
| env->nhosts = job->nhosts; |
| env->nodelist = job->nodelist; |
| env->task_count = _uint16_array_to_str( |
| job->nhosts, job->step_layout->tasks); |
| } |
| |
| if (setup_env(env) != SLURM_SUCCESS) |
| return SLURM_ERROR; |
| |
| if (!argv) { |
| /* |
| * If no arguments were supplied, spawn a shell |
| * for the user. |
| */ |
| argv = xmalloc(2 * sizeof(char *)); |
| argv[0] = _get_shell(); |
| argv[1] = NULL; |
| } |
| |
| if ((cpid = fork()) < 0) { |
| error("fork: %m"); |
| exit(1); |
| } |
| |
| if (cpid == 0) { |
| /* |
| * Child. |
| */ |
| #ifdef HAVE_AIX |
| (void) mkcrid(0); |
| #endif |
| log_fini(); |
| sig_unblock_signals(); |
| execvp(argv[0], argv); |
| exit(1); |
| } |
| |
| /* |
| * Parent continues. |
| */ |
| |
| again: |
| if (waitpid(cpid, &status, 0) < (pid_t) 0) { |
| if (errno == EINTR) |
| goto again; |
| error("waitpid: %m"); |
| } |
| |
| exitcode = _print_script_exit_status(xbasename(argv[0]), status); |
| |
| (void) unsetenv("SLURM_JOBID"); /* no return code on some systems */ |
| return exitcode; |
| } |
| |
| static int _become_user (void) |
| { |
| struct passwd *pwd = getpwuid (opt.uid); |
| |
| if (opt.uid == getuid ()) |
| return (0); |
| |
| if ((opt.egid != (gid_t) -1) && (setgid (opt.egid) < 0)) |
| return (error ("setgid: %m")); |
| |
| initgroups (pwd->pw_name, pwd->pw_gid); /* Ignore errors */ |
| |
| if (setuid (opt.uid) < 0) |
| return (error ("setuid: %m")); |
| |
| return (0); |
| } |
| |
| static void _run_srun_prolog (srun_job_t *job) |
| { |
| int rc; |
| |
| if (opt.prolog && strcasecmp(opt.prolog, "none") != 0) { |
| rc = _run_srun_script(job, opt.prolog); |
| debug("srun prolog rc = %d", rc); |
| } |
| } |
| |
| static void _run_srun_epilog (srun_job_t *job) |
| { |
| int rc; |
| |
| if (opt.epilog && strcasecmp(opt.epilog, "none") != 0) { |
| rc = _run_srun_script(job, opt.epilog); |
| debug("srun epilog rc = %d", rc); |
| } |
| } |
| |
| static int _run_srun_script (srun_job_t *job, char *script) |
| { |
| int status; |
| pid_t cpid; |
| int i; |
| char **args = NULL; |
| |
| if (script == NULL || script[0] == '\0') |
| return 0; |
| |
| if (access(script, R_OK | X_OK) < 0) { |
| info("Access denied for %s: %m", script); |
| return 0; |
| } |
| |
| if ((cpid = fork()) < 0) { |
| error ("run_srun_script: fork: %m"); |
| return -1; |
| } |
| if (cpid == 0) { |
| |
| /* set the scripts command line arguments to the arguments |
| * for the application, but shifted one higher |
| */ |
| args = xmalloc(sizeof(char *) * 1024); |
| args[0] = script; |
| for (i = 0; i < remote_argc; i++) { |
| args[i+1] = remote_argv[i]; |
| } |
| args[i+1] = NULL; |
| execv(script, args); |
| error("help! %m"); |
| exit(127); |
| } |
| |
| do { |
| if (waitpid(cpid, &status, 0) < 0) { |
| if (errno == EINTR) |
| continue; |
| error("waidpid: %m"); |
| return 0; |
| } else |
| return status; |
| } while(1); |
| |
| /* NOTREACHED */ |
| } |
| |
| static int |
| _is_local_file (io_filename_t *fname) |
| { |
| if (fname->name == NULL) |
| return 1; |
| |
| if (fname->taskid != -1) |
| return 1; |
| |
| return ((fname->type != IO_PER_TASK) && (fname->type != IO_ONE)); |
| } |
| |
| void |
| srun_set_stdio_fds(srun_job_t *job, slurm_step_io_fds_t *cio_fds) |
| { |
| bool err_shares_out = false; |
| |
| /* |
| * create stdin file descriptor |
| */ |
| if (_is_local_file(job->ifname)) { |
| if ((job->ifname->name == NULL) || (job->ifname->taskid != -1)) { |
| cio_fds->in.fd = STDIN_FILENO; |
| } else { |
| cio_fds->in.fd = open(job->ifname->name, O_RDONLY); |
| if (cio_fds->in.fd == -1) |
| fatal("Could not open stdin file: %m"); |
| } |
| if (job->ifname->type == IO_ONE) { |
| cio_fds->in.taskid = job->ifname->taskid; |
| cio_fds->in.nodeid = slurm_step_layout_host_id( |
| job->step_layout, job->ifname->taskid); |
| } |
| } |
| |
| /* |
| * create stdout file descriptor |
| */ |
| if (_is_local_file(job->ofname)) { |
| if ((job->ofname->name == NULL) || (job->ofname->taskid != -1)) { |
| cio_fds->out.fd = STDOUT_FILENO; |
| } else { |
| cio_fds->out.fd = open(job->ofname->name, |
| O_CREAT|O_WRONLY|O_TRUNC, 0644); |
| if (cio_fds->out.fd == -1) |
| fatal("Could not open stdout file: %m"); |
| } |
| if (job->ofname->name != NULL |
| && job->efname->name != NULL |
| && !strcmp(job->ofname->name, job->efname->name)) { |
| err_shares_out = true; |
| } |
| } |
| |
| /* |
| * create seperate stderr file descriptor only if stderr is not sharing |
| * the stdout file descriptor |
| */ |
| if (err_shares_out) { |
| debug3("stdout and stderr sharing a file"); |
| cio_fds->err.fd = cio_fds->out.fd; |
| } else if (_is_local_file(job->efname)) { |
| if ((job->efname->name == NULL) || (job->efname->taskid != -1)) { |
| cio_fds->err.fd = STDERR_FILENO; |
| } else { |
| cio_fds->err.fd = open(job->efname->name, |
| O_CREAT|O_WRONLY|O_TRUNC, 0644); |
| if (cio_fds->err.fd == -1) |
| fatal("Could not open stderr file: %m"); |
| } |
| } |
| } |
| |
/*
 * _define_symbols - force symbols needed by plugins into the srun binary.
 *
 * Plugins must be able to resolve symbols.  Because srun links statically
 * with src/api/libslurmhelper instead of dynamically linking with
 * libslurm, every symbol a plugin needs has to be referenced from srun
 * itself.  The calls below exist only to create those references; the
 * functions are not actually used for their effect.
 */
static void _define_symbols(void)
{
	/* needed by mvapich and mpichgm */
	(void) slurm_signal_job_step(0, 0, 0);
}