|  | /***************************************************************************** | 
|  | *  alloc.c - Slurm scrun job alloc handlers | 
|  | ***************************************************************************** | 
|  | *  Copyright (C) SchedMD LLC. | 
|  | * | 
|  | *  This file is part of Slurm, a resource management program. | 
|  | *  For details, see <https://slurm.schedmd.com/>. | 
|  | *  Please also read the included file: DISCLAIMER. | 
|  | * | 
|  | *  Slurm is free software; you can redistribute it and/or modify it under | 
|  | *  the terms of the GNU General Public License as published by the Free | 
|  | *  Software Foundation; either version 2 of the License, or (at your option) | 
|  | *  any later version. | 
|  | * | 
|  | *  In addition, as a special exception, the copyright holders give permission | 
|  | *  to link the code of portions of this program with the OpenSSL library under | 
|  | *  certain conditions as described in each individual source file, and | 
|  | *  distribute linked combinations including the two. You must obey the GNU | 
|  | *  General Public License in all respects for all of the code used other than | 
|  | *  OpenSSL. If you modify file(s) with this exception, you may extend this | 
|  | *  exception to your version of the file(s), but you are not obligated to do | 
|  | *  so. If you do not wish to do so, delete this exception statement from your | 
|  | *  version.  If you delete this exception statement from all source files in | 
|  | *  the program, then also delete it here. | 
|  | * | 
|  | *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY | 
|  | *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 
|  | *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more | 
|  | *  details. | 
|  | * | 
|  | *  You should have received a copy of the GNU General Public License along | 
|  | *  with Slurm; if not, write to the Free Software Foundation, Inc., | 
|  | *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA. | 
|  | \*****************************************************************************/ | 
|  |  | 
|  | #include <limits.h> | 
|  |  | 
|  | #include "slurm/slurm.h" | 
|  |  | 
|  | #include "src/common/cpu_frequency.h" | 
|  | #include "src/common/env.h" | 
|  | #include "src/common/net.h" | 
|  | #include "src/common/read_config.h" | 
|  | #include "src/common/slurm_opt.h" | 
|  | #include "src/common/slurm_protocol_defs.h" | 
|  | #include "src/common/spank.h" | 
|  | #include "src/common/uid.h" | 
|  | #include "src/common/xassert.h" | 
|  | #include "src/common/xmalloc.h" | 
|  | #include "src/common/xstring.h" | 
|  |  | 
|  | #include "src/conmgr/conmgr.h" | 
|  |  | 
|  | #include "src/scrun/scrun.h" | 
|  |  | 
|  | /* max number of seconds to delay while waiting for job */ | 
|  | #define MAX_DELAY 60 | 
|  |  | 
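/*
 * Map of SCRUN_* environment variables to the equivalent CLI option, given
 * as either a short option character or a LONG_OPT_* value. For example,
 * setting SCRUN_PARTITION=debug is processed like "--partition=debug".
 */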
|  | typedef struct { | 
|  | const char *var; | 
|  | int type; | 
|  | } env_vars_t; | 
|  |  | 
|  | static const env_vars_t env_vars[] = { | 
|  | { "SCRUN_ACCOUNT", 'A' }, | 
|  | { "SCRUN_ACCTG_FREQ", LONG_OPT_ACCTG_FREQ }, | 
|  | { "SCRUN_BURST_BUFFER", LONG_OPT_BURST_BUFFER_SPEC }, | 
|  | { "SCRUN_CLUSTER_CONSTRAINT", LONG_OPT_CLUSTER_CONSTRAINT }, | 
|  | { "SCRUN_CLUSTERS", 'M' }, | 
|  | { "SCRUN_CONSTRAINT", 'C' }, | 
|  | { "SCRUN_CORE_SPEC", 'S' }, | 
|  | { "SCRUN_CPU_BIND", LONG_OPT_CPU_BIND }, | 
|  | { "SCRUN_CPU_FREQ_REQ", LONG_OPT_CPU_FREQ }, | 
|  | { "SCRUN_CPUS_PER_GPU", LONG_OPT_CPUS_PER_GPU }, | 
|  | { "SCRUN_CPUS_PER_TASK", 'c' }, | 
|  | { "SCRUN_DELAY_BOOT", LONG_OPT_DELAY_BOOT }, | 
|  | { "SCRUN_DEPENDENCY", 'd' }, | 
|  | { "SCRUN_DISTRIBUTION", 'm' }, | 
|  | { "SCRUN_EPILOG", LONG_OPT_EPILOG }, | 
|  | { "SCRUN_EXACT", LONG_OPT_EXACT }, | 
|  | { "SCRUN_EXCLUSIVE", LONG_OPT_EXCLUSIVE }, | 
|  | { "SCRUN_GPU_BIND", LONG_OPT_GPU_BIND }, | 
|  | { "SCRUN_GPU_FREQ", LONG_OPT_GPU_FREQ }, | 
|  | { "SCRUN_GPUS", 'G' }, | 
|  | { "SCRUN_GPUS_PER_NODE", LONG_OPT_GPUS_PER_NODE }, | 
|  | { "SCRUN_GPUS_PER_SOCKET", LONG_OPT_GPUS_PER_SOCKET }, | 
|  | { "SCRUN_GPUS_PER_TASK", LONG_OPT_GPUS_PER_TASK }, | 
|  | { "SCRUN_GRES_FLAGS", LONG_OPT_GRES_FLAGS }, | 
|  | { "SCRUN_GRES", LONG_OPT_GRES }, | 
|  | { "SCRUN_HINT", LONG_OPT_HINT }, | 
|  | { "SCRUN_JOB_NAME", 'J' }, | 
|  | { "SCRUN_JOB_NODELIST", LONG_OPT_ALLOC_NODELIST }, | 
|  | { "SCRUN_JOB_NUM_NODES", 'N' }, | 
|  | { "SCRUN_LABELIO", 'l' }, | 
|  | { "SCRUN_MEM_BIND", LONG_OPT_MEM_BIND }, | 
|  | { "SCRUN_MEM_PER_CPU", LONG_OPT_MEM_PER_CPU }, | 
|  | { "SCRUN_MEM_PER_GPU", LONG_OPT_MEM_PER_GPU }, | 
|  | { "SCRUN_MEM_PER_NODE", LONG_OPT_MEM }, | 
|  | { "SCRUN_MPI_TYPE", LONG_OPT_MPI }, | 
|  | { "SCRUN_NCORES_PER_SOCKET", LONG_OPT_CORESPERSOCKET }, | 
|  | { "SCRUN_NETWORK", LONG_OPT_NETWORK }, | 
|  | { "SCRUN_NSOCKETS_PER_NODE", LONG_OPT_SOCKETSPERNODE }, | 
|  | { "SCRUN_NTASKS", 'n' }, | 
|  | { "SCRUN_NTASKS_PER_CORE", LONG_OPT_NTASKSPERCORE }, | 
|  | { "SCRUN_NTASKS_PER_GPU", LONG_OPT_NTASKSPERGPU }, | 
|  | { "SCRUN_NTASKS_PER_NODE", LONG_OPT_NTASKSPERNODE }, | 
|  | { "SCRUN_NTASKS_PER_TRES", LONG_OPT_NTASKSPERTRES }, | 
|  | { "SCRUN_OPEN_MODE", LONG_OPT_OPEN_MODE }, | 
|  | { "SCRUN_OVERCOMMIT", 'O' }, | 
|  | { "SCRUN_OVERLAP", LONG_OPT_OVERLAP }, | 
|  | { "SCRUN_PARTITION", 'p' }, | 
|  | { "SCRUN_POWER", LONG_OPT_POWER }, | 
|  | { "SCRUN_PROFILE", LONG_OPT_PROFILE }, | 
|  | { "SCRUN_PROLOG", LONG_OPT_PROLOG }, | 
|  | { "SCRUN_QOS", 'q' }, | 
|  | { "SCRUN_REMOTE_CWD", 'D' }, | 
|  | { "SCRUN_REQ_SWITCH", LONG_OPT_SWITCH_REQ }, | 
|  | { "SCRUN_RESERVATION", LONG_OPT_RESERVATION }, | 
|  | { "SCRUN_SIGNAL", LONG_OPT_SIGNAL }, | 
|  | { "SCRUN_SLURMD_DEBUG", LONG_OPT_SLURMD_DEBUG }, | 
|  | { "SCRUN_SPREAD_JOB", LONG_OPT_SPREAD_JOB }, | 
|  | { "SCRUN_TASK_EPILOG", LONG_OPT_TASK_EPILOG }, | 
|  | { "SCRUN_TASK_PROLOG", LONG_OPT_TASK_PROLOG }, | 
|  | { "SCRUN_THREAD_SPEC", LONG_OPT_THREAD_SPEC }, | 
|  | { "SCRUN_THREADS_PER_CORE", LONG_OPT_THREADSPERCORE }, | 
|  | { "SCRUN_THREADS", 'T' }, | 
|  | { "SCRUN_TIMELIMIT", 't' }, | 
|  | { "SCRUN_TRES_BIND", LONG_OPT_TRES_BIND }, | 
|  | { "SCRUN_TRES_PER_TASK", LONG_OPT_TRES_PER_TASK }, | 
|  | { "SCRUN_UNBUFFEREDIO", 'u' }, | 
|  | { "SCRUN_USE_MIN_NODES", LONG_OPT_USE_MIN_NODES }, | 
|  | { "SCRUN_WAIT4SWITCH", LONG_OPT_SWITCH_WAIT }, | 
|  | { "SCRUN_WCKEY", LONG_OPT_WCKEY }, | 
|  | { "SCRUN_WORKING_DIR", 'D' }, | 
|  | }; | 
|  |  | 
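/*
 * Overwrite one variable in state.job_env, either with printf()-style
 * formatting (_set_env_args) or a plain string (_set_env). Both require
 * the state lock to be held by the caller.
 */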
|  | #define _set_env_args(state, field, pattern, ...)                              \ | 
|  | do {                                                                   \ | 
|  | xassert(state.locked);                                         \ | 
|  | (void) env_array_overwrite_fmt(&state.job_env, field, pattern, \ | 
|  | __VA_ARGS__);                   \ | 
|  | } while (0) | 
|  |  | 
|  | #define _set_env(state, field, value)                                       \ | 
|  | do {                                                                \ | 
|  | const char *v = value; /* avoid field=(null) */             \ | 
|  | xassert(state.locked);                                      \ | 
|  | (void) env_array_overwrite_fmt(&state.job_env, field, "%s", \ | 
|  | ((v && v[0]) ? v : ""));     \ | 
|  | } while (0) | 
|  |  | 
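/*
 * list_for_each_ro() callback: export one container annotation as
 * SCRUN_ANNOTATION_<name> in the job environment.
 */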
|  | static int _foreach_env_annotation(void *x, void *arg) | 
|  | { | 
|  | config_key_pair_t *key_pair_ptr = x; | 
|  | char *key = xstrdup_printf("SCRUN_ANNOTATION_%s", key_pair_ptr->name); | 
|  |  | 
|  | xassert(!arg); | 
|  | xassert(state.locked); | 
|  |  | 
|  | _set_env(state, key, key_pair_ptr->value); | 
|  |  | 
|  | xfree(key); | 
|  | return SLURM_SUCCESS; | 
|  | } | 
|  |  | 
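/*
 * Export the OCI container state (id, bundle, pid, user/group, paths) as
 * SCRUN_* variables in state.job_env, presumably for consumption by staging
 * scripts and the job itself. Caller must hold the state lock.
 */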
|  | static void _script_env(void) | 
|  | { | 
|  | xassert(state.locked); | 
|  |  | 
|  | /* variables required for OCI state */ | 
|  | _set_env(state, "SCRUN_OCI_VERSION", state.oci_version); | 
|  | _set_env(state, "SCRUN_CONTAINER_ID", state.id); | 
|  | if (state.pid && (state.pid != INFINITE64)) | 
|  | _set_env_args(state, "SCRUN_PID", "%"PRIu64, | 
|  | (uint64_t) state.pid); | 
|  | _set_env(state, "SCRUN_BUNDLE", state.bundle); | 
|  | _set_env(state, "SCRUN_SUBMISSION_BUNDLE", state.orig_bundle); | 
|  | list_for_each_ro(state.annotations, _foreach_env_annotation, NULL); | 
|  | _set_env(state, "SCRUN_PID_FILE", state.pid_file); | 
|  | _set_env(state, "SCRUN_SOCKET", state.anchor_socket); | 
|  | _set_env(state, "SCRUN_SPOOL_DIR", state.spool_dir); | 
|  | _set_env(state, "SCRUN_SUBMISSION_CONFIG_FILE", state.config_file); | 
|  | if ((state.user_id != NO_VAL) && (state.user_id != SLURM_AUTH_NOBODY)) { | 
/* set user if known; we may not know it in a user namespace */
|  | char *u = uid_to_string_or_null(state.user_id); | 
|  | _set_env(state, "SCRUN_USER", u); | 
|  | xfree(u); | 
|  | _set_env_args(state, "SCRUN_USER_ID", "%u", state.user_id); | 
|  | } | 
|  | if ((state.group_id != NO_VAL) && | 
|  | (state.group_id != SLURM_AUTH_NOBODY)) { | 
/* set group if known; we may not know it in a user namespace */
char *g = gid_to_string(state.group_id);
_set_env(state, "SCRUN_GROUP", g);
xfree(g);
|  | _set_env_args(state, "SCRUN_GROUP_ID", "%u", state.group_id); | 
|  | } | 
|  | _set_env(state, "SCRUN_ROOT", state.root_dir); | 
|  | _set_env(state, "SCRUN_ROOTFS_PATH", state.root_path); | 
|  | _set_env(state, "SCRUN_SUBMISSION_ROOTFS_PATH", state.root_path); | 
|  |  | 
|  | if (log_file) | 
|  | _set_env(state, "SCRUN_LOG_FILE", log_file); | 
|  | if (log_format) | 
|  | _set_env(state, "SCRUN_LOG_FORMAT", log_format); | 
|  |  | 
|  | if (state.tty_size.ws_col) | 
|  | _set_env_args(state, "SLURM_PTY_WIN_COL", "%hu", | 
|  | state.tty_size.ws_col); | 
|  | if (state.tty_size.ws_row) | 
|  | _set_env_args(state, "SLURM_PTY_WIN_ROW", "%hu", | 
|  | state.tty_size.ws_row); | 
|  | } | 
|  |  | 
|  | #undef _set_env | 
|  | #undef _set_env_args | 
|  |  | 
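/*
 * Run stage_in() with begin/end debug logging.
 * RET SLURM_SUCCESS or error
 */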
static int _stage_in(void)
|  | { | 
|  | int rc; | 
|  |  | 
|  | if (get_log_level() >= LOG_LEVEL_DEBUG) { | 
|  | read_lock_state(); | 
|  | debug("%s: BEGIN container %s staging in", __func__, state.id); | 
|  | unlock_state(); | 
|  | } | 
|  |  | 
|  | rc = stage_in(); | 
|  |  | 
|  | if (get_log_level() >= LOG_LEVEL_DEBUG) { | 
|  | read_lock_state(); | 
|  | debug("%s: END container %s staging in: %s", | 
|  | __func__, state.id, slurm_strerror(rc)); | 
|  | unlock_state(); | 
|  | } | 
|  |  | 
|  | if (rc) { | 
|  | read_lock_state(); | 
|  | error("%s: stage_in() for %s failed: %s", | 
|  | __func__, state.id, slurm_strerror(rc)); | 
|  | unlock_state(); | 
|  | } | 
|  |  | 
|  | return rc; | 
|  | } | 
|  |  | 
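/* conmgr on_connection callback for a new srun connection */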
|  | static void *_on_connection(conmgr_fd_t *con, void *arg) | 
|  | { | 
|  | debug("%s:[%s] new srun connection", | 
|  | __func__, conmgr_fd_get_name(con)); | 
|  |  | 
|  | /* must return !NULL or connection will be closed */ | 
|  | return con; | 
|  | } | 
|  |  | 
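/*
 * Handle a single srun RPC on the listener: answer pings, log user
 * messages, stop the anchor on job completion/timeout/node failure, and
 * reject suspend, net-forward, and unknown message types.
 */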
|  | static int _on_msg(conmgr_fd_t *con, slurm_msg_t *msg, int unpack_rc, void *arg) | 
|  | { | 
|  | int rc = SLURM_SUCCESS; | 
|  | xassert(arg == con); | 
|  |  | 
|  | if (unpack_rc || !msg->auth_ids_set) { | 
|  | error("%s: [%s] rejecting malformed RPC and closing connection: %s", | 
|  | __func__, conmgr_fd_get_name(con), | 
|  | slurm_strerror(unpack_rc)); | 
slurm_free_msg(msg);
return unpack_rc ? unpack_rc : SLURM_ERROR;
|  | } | 
|  |  | 
|  | switch (msg->msg_type) | 
|  | { | 
|  | case SRUN_PING: | 
|  | { | 
|  | /* if conmgr is alive then always respond success */ | 
|  | rc = slurm_send_rc_msg(msg, SLURM_SUCCESS); | 
|  | debug("%s:[%s] srun RPC PING has been PONGED", | 
|  | __func__, conmgr_fd_get_name(con)); | 
|  | break; | 
|  | } | 
|  | case SRUN_JOB_COMPLETE: | 
|  | { | 
|  | xassert(sizeof(srun_job_complete_msg_t) == | 
|  | sizeof(slurm_step_id_t)); | 
|  | slurm_step_id_t *step = msg->data; | 
|  | debug("%s:[%s] %pS complete srun RPC", | 
|  | __func__, conmgr_fd_get_name(con), step); | 
|  | stop_anchor(SLURM_SUCCESS); | 
|  | break; | 
|  | } | 
|  | case SRUN_TIMEOUT: | 
|  | { | 
|  | srun_timeout_msg_t *to = msg->data; | 
debug("%s:[%s] srun RPC %pS timeout at %ld",
|  | __func__, conmgr_fd_get_name(con), &to->step_id, | 
|  | to->timeout); | 
|  | stop_anchor(ESLURM_JOB_TIMEOUT_KILLED); | 
|  | break; | 
|  | } | 
|  | case SRUN_USER_MSG: | 
|  | { | 
|  | srun_user_msg_t *um = msg->data; | 
|  |  | 
|  | debug("%s:[%s] JobId=%u srun user message RPC", | 
|  | __func__, conmgr_fd_get_name(con), um->job_id); | 
|  |  | 
|  | print_multi_line_string(um->msg, -1, LOG_LEVEL_INFO); | 
|  | break; | 
|  | } | 
|  | case SRUN_NODE_FAIL: | 
|  | { | 
|  | srun_node_fail_msg_t *nf = msg->data; | 
|  | debug("%s:[%s] srun RPC %pS nodes failed: %s", | 
|  | __func__, conmgr_fd_get_name(con), &nf->step_id, | 
|  | nf->nodelist); | 
|  | stop_anchor(ESLURM_JOB_NODE_FAIL_KILLED); | 
|  | break; | 
|  | } | 
|  | case SRUN_REQUEST_SUSPEND: | 
|  | { | 
|  | suspend_msg_t *sus_msg = msg->data; | 
|  | rc = SLURM_UNEXPECTED_MSG_ERROR; | 
|  | error("%s:[%s] rejecting srun suspend RPC for %s", | 
|  | __func__, conmgr_fd_get_name(con), sus_msg->job_id_str); | 
|  | break; | 
|  | } | 
|  | case SRUN_NET_FORWARD: | 
|  | { | 
|  | net_forward_msg_t *net = msg->data; | 
|  | rc = SLURM_UNEXPECTED_MSG_ERROR; | 
|  | error("%s:[%s] rejecting srun net forward RPC for %s", | 
|  | __func__, conmgr_fd_get_name(con), net->target); | 
|  | break; | 
|  | } | 
|  | default: | 
|  | rc = SLURM_UNEXPECTED_MSG_ERROR; | 
|  | error("%s:[%s] received spurious srun message type: %s", | 
|  | __func__, conmgr_fd_get_name(con), | 
|  | rpc_num2string(msg->msg_type)); | 
|  | } | 
|  |  | 
|  | slurm_free_msg(msg); | 
|  | return rc; | 
|  | } | 
|  |  | 
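/* conmgr on_finish callback: log that the srun connection closed */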
|  | static void _on_finish(conmgr_fd_t *con, void *arg) | 
|  | { | 
|  | xassert(con == arg); | 
|  |  | 
if (get_log_level() >= LOG_LEVEL_DEBUG) {
|  | read_lock_state(); | 
|  | debug("%s: [%s] closed srun connection state=%s", | 
|  | __func__, conmgr_fd_get_name(con), | 
|  | slurm_container_status_to_str(state.status)); | 
|  | unlock_state(); | 
|  | } | 
|  | } | 
|  |  | 
|  | /* | 
|  | * Listen on srun port to make sure that slurmctld doesn't mark job as dead | 
|  | * RET port listening on | 
|  | */ | 
|  | static uint32_t _setup_listener(void) | 
|  | { | 
|  | static const conmgr_events_t events = { | 
|  | .on_connection = _on_connection, | 
|  | .on_msg = _on_msg, | 
|  | .on_finish = _on_finish, | 
|  | }; | 
|  | uint16_t *ports; | 
|  | uint16_t port = 0; | 
|  | int fd = -1; | 
|  | int rc; | 
|  |  | 
|  | if ((ports = slurm_get_srun_port_range())) { | 
if (net_stream_listen_ports(&fd, &port, ports, false) < 0)
|  | fatal("%s: unable to open local listening port. Try increasing range of SrunPortRange in slurm.conf.", | 
|  | __func__); | 
|  | } else { | 
|  | if (net_stream_listen(&fd, &port) < 0) | 
|  | fatal("%s: unable to open local listening port", | 
|  | __func__); | 
|  | } | 
|  |  | 
|  | xassert(port > 0); | 
|  | debug("%s: listening for srun RPCs on port=%hu", __func__, port); | 
|  |  | 
|  | if ((rc = conmgr_process_fd(CON_TYPE_RPC, fd, fd, &events, CON_FLAG_NONE, | 
|  | NULL, 0, NULL, NULL))) | 
|  | fatal("%s: conmgr refused fd=%d: %s", | 
|  | __func__, fd, slurm_strerror(rc)); | 
|  |  | 
|  | return port; | 
|  | } | 
|  |  | 
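/* invoked by slurm_allocate_resources_blocking() while the job is pending */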
|  | static void _pending_callback(uint32_t job_id) | 
|  | { | 
|  | info("waiting on pending job allocation %u", job_id); | 
|  | } | 
|  |  | 
/* check that the allocation has all of its nodes ready */
|  | extern void check_allocation(conmgr_callback_args_t conmgr_args, void *arg) | 
|  | { | 
/* only one thread may call this at any one time */
|  | static long delay = 1; | 
|  | bool bail = false; | 
|  | int rc, job_id; | 
|  |  | 
|  | read_lock_state(); | 
|  | bail = (state.status != CONTAINER_ST_CREATING); | 
|  | job_id = state.jobid; | 
|  | unlock_state(); | 
|  |  | 
|  | if (bail) { | 
|  | /* | 
|  | * Only check allocations while creating. Something else must | 
|  | * have broke before now so bail out. | 
|  | */ | 
|  | debug("%s: bailing due to status %s != %s", | 
|  | __func__, | 
|  | slurm_container_status_to_str(state.status), | 
|  | slurm_container_status_to_str(CONTAINER_ST_CREATING)); | 
|  | stop_anchor(ESLURM_ALREADY_DONE); | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (conmgr_args.status != CONMGR_WORK_STATUS_RUN) { | 
|  | debug("%s: bailing due to callback status %s", | 
|  | __func__, conmgr_work_status_string(conmgr_args.status)); | 
|  | stop_anchor(ESLURM_ALREADY_DONE); | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (get_log_level() >= LOG_LEVEL_DEBUG) { | 
|  | read_lock_state(); | 
|  | debug("%s: checking JobId=%d for nodes ready", | 
|  | __func__, state.jobid); | 
|  | unlock_state(); | 
|  | } | 
|  | rc = slurm_job_node_ready(job_id); | 
|  | if ((rc == READY_JOB_ERROR) || (rc == EAGAIN)) { | 
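/*
 * Not ready yet: reschedule this check with exponential backoff
 * (1s, 2s, 4s, ... capped at MAX_DELAY seconds).
 */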
|  | delay *= 2; | 
|  | if ((delay < 0) || (delay > MAX_DELAY)) | 
|  | delay = MAX_DELAY; | 
|  |  | 
|  | if (get_log_level() >= LOG_LEVEL_DEBUG) { | 
|  | read_lock_state(); | 
debug("%s: rechecking JobId=%d for nodes ready in %ld seconds",
|  | __func__, state.jobid, delay); | 
|  | unlock_state(); | 
|  | } | 
|  | conmgr_add_work_delayed_fifo(check_allocation, NULL, delay, 0); | 
|  | } else if ((rc == READY_JOB_FATAL) || !(rc & READY_JOB_STATE)) { | 
|  | /* job failed! */ | 
|  | if (get_log_level() >= LOG_LEVEL_DEBUG) { | 
|  | read_lock_state(); | 
|  | debug("%s: JobId=%d failed. Bailing on checking for nodes: %s", | 
|  | __func__, state.jobid, slurm_strerror(rc)); | 
|  | unlock_state(); | 
|  | } | 
|  | stop_anchor(ESLURM_ALREADY_DONE); | 
|  | return; | 
|  | } else { | 
|  | /* job is ready! */ | 
|  | if (get_log_level() >= LOG_LEVEL_DEBUG) { | 
|  | read_lock_state(); | 
|  | debug("%s: JobId=%d is ready", __func__, state.jobid); | 
|  | unlock_state(); | 
|  | } | 
|  |  | 
|  | if ((rc = _stage_in())) { | 
|  | stop_anchor(rc); | 
|  | } else { | 
|  | /* we have a job now. see if creating is done */ | 
|  | conmgr_add_work_fifo(on_allocation, NULL); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
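/*
 * Request a new job allocation: build a job description from the SCRUN_*
 * environment and SPANK options, advertise the srun listener port, block
 * until the allocation is granted, then record the job id, environment,
 * and owner ids in state.
 */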
|  | static void _alloc_job(void) | 
|  | { | 
|  | int rc; | 
|  | resource_allocation_response_msg_t *alloc = NULL; | 
|  | salloc_opt_t aopt = { 0 }; | 
|  | slurm_opt_t opt = { | 
|  | .salloc_opt = &aopt, | 
|  | }; | 
|  | char *opt_string = NULL; | 
|  | struct option *spanked = slurm_option_table_create(&opt, &opt_string); | 
|  | job_desc_msg_t *desc; | 
|  |  | 
|  | slurm_reset_all_options(&opt, true); | 
|  |  | 
|  | for (int i = 0; i < ARRAY_SIZE(env_vars); i++) { | 
|  | const char *val; | 
|  | const env_vars_t *e = &env_vars[i]; | 
|  |  | 
|  | if ((val = getenv(e->var)) && | 
|  | slurm_process_option(&opt, e->type, val, true, false)) | 
|  | fatal("%s: Unable to process environment variable %s=%s", | 
|  | __func__, e->var, val); | 
|  | } | 
|  |  | 
|  | /* Process spank env options */ | 
|  | if ((rc = spank_process_env_options())) | 
|  | fatal("%s: spank_process_env_options() failed: %s", | 
|  | __func__, slurm_strerror(rc)); | 
|  |  | 
|  | slurm_option_table_destroy(spanked); | 
|  | spanked = NULL; | 
|  | xfree(opt_string); | 
|  |  | 
|  | desc = slurm_opt_create_job_desc(&opt, true); | 
|  | xfree(desc->name); | 
|  | read_lock_state(); | 
|  | desc->name = xstrdup(state.id); | 
|  | desc->container_id = xstrdup(state.id); | 
|  | if (state.spank_job_env) { | 
|  | desc->spank_job_env = | 
|  | env_array_copy((const char **) state.spank_job_env); | 
|  | desc->spank_job_env_size = envcount(state.spank_job_env); | 
|  | } | 
|  | unlock_state(); | 
|  | if (!desc->min_nodes || (desc->min_nodes == NO_VAL)) | 
|  | desc->min_nodes = 1; | 
|  |  | 
|  | /* | 
|  | * Avoid giving the user/group as this may be run | 
|  | * in user namespace as uid 0. | 
|  | */ | 
|  | desc->user_id = SLURM_AUTH_NOBODY; | 
|  | desc->group_id = SLURM_AUTH_NOBODY; | 
|  | desc->name = xstrdup("scrun"); | 
|  | desc->other_port = _setup_listener(); | 
|  |  | 
|  | debug("%s: requesting allocation with %u tasks and %u hosts", | 
|  | __func__, (desc->num_tasks == NO_VAL ? 1 : desc->num_tasks), | 
|  | (desc->min_nodes == NO_VAL ? 1 : desc->min_nodes)); | 
alloc = slurm_allocate_resources_blocking(desc, 0,
_pending_callback);
|  | if (!alloc) | 
|  | fatal("Unable to request job allocation: %m"); | 
|  | if (alloc->error_code) { | 
|  | error("%s: unable to request job allocation: %s", | 
|  | __func__, slurm_strerror(alloc->error_code)); | 
|  |  | 
|  | stop_anchor(alloc->error_code); | 
|  | } | 
|  |  | 
|  | if (get_log_level() >= LOG_LEVEL_DEBUG) { | 
|  | char *user = uid_to_string(alloc->uid); | 
|  | char *group = gid_to_string(alloc->gid); | 
|  |  | 
|  | debug("allocated jobId=%u user[%u]=%s group[%u]=%s", | 
|  | alloc->job_id, alloc->uid, user, alloc->uid, group); | 
|  |  | 
|  | xfree(user); | 
|  | xfree(group); | 
|  | } | 
|  |  | 
|  | write_lock_state(); | 
|  | state.jobid = alloc->job_id; | 
|  |  | 
|  | /* take job env (if any) for srun calls later */ | 
|  | SWAP(state.job_env, alloc->environment); | 
|  |  | 
|  | /* apply SPANK env (if any) */ | 
|  | for (int i = 0; i < opt.spank_job_env_size; i++) { | 
|  | char *value, *name = NULL; | 
|  |  | 
|  | if ((value = strchr(opt.spank_job_env[i], '='))) { | 
|  | value[0] = '\0'; | 
|  | value++; | 
|  | } | 
|  |  | 
|  | xstrfmtcat(name, "SLURM_SPANK_%s", opt.spank_job_env[i]); | 
|  | env_array_overwrite(&state.job_env, name, value); | 
|  | xfree(name); | 
|  | } | 
|  |  | 
|  | xassert(state.user_id == getuid()); | 
|  | state.user_id = alloc->uid; | 
|  | xassert(state.user_id != SLURM_AUTH_NOBODY); | 
|  |  | 
|  | xassert(state.group_id == getgid()); | 
|  | state.group_id = alloc->gid; | 
|  | xassert(state.group_id != SLURM_AUTH_NOBODY); | 
|  |  | 
|  | env_array_for_job(&state.job_env, alloc, desc, -1); | 
|  | unlock_state(); | 
|  |  | 
|  | slurm_free_job_desc_msg(desc); | 
|  | slurm_free_resource_allocation_response_msg(alloc); | 
|  | } | 
|  |  | 
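/*
 * conmgr work callback to obtain the job allocation: adopt an existing job
 * when SLURM_JOB_ID is set, otherwise allocate a new one, then load the full
 * job record and either stage in directly or schedule check_allocation().
 */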
|  | extern void get_allocation(conmgr_callback_args_t conmgr_args, void *arg) | 
|  | { | 
|  | int rc; | 
|  | job_info_msg_t *jobs = NULL; | 
|  | int job_id; | 
|  | char *job_id_str = getenv("SLURM_JOB_ID"); | 
|  | bool existing_allocation = false; | 
|  |  | 
|  | if (job_id_str && job_id_str[0]) { | 
|  | extern char **environ; | 
|  | slurm_selected_step_t id = {0}; | 
|  |  | 
|  | if ((rc = unfmt_job_id_string(job_id_str, &id, NO_VAL))) { | 
|  | fatal("%s: invalid SLURM_JOB_ID=%s: %s", | 
|  | __func__, job_id_str, slurm_strerror(rc)); | 
|  | return; | 
|  | } | 
|  |  | 
|  | write_lock_state(); | 
|  | state.jobid = job_id = id.step_id.job_id; | 
|  | state.existing_allocation = existing_allocation = true; | 
|  |  | 
|  | /* scrape SLURM_* from calling env */ | 
|  | state.job_env = env_array_create(); | 
|  | env_array_merge_slurm_spank(&state.job_env, | 
|  | (const char **) environ); | 
|  | unlock_state(); | 
|  |  | 
|  | debug("Running under existing JobId=%u", job_id); | 
|  | } else { | 
|  | _alloc_job(); | 
|  |  | 
|  | read_lock_state(); | 
|  | job_id = state.jobid; | 
|  | unlock_state(); | 
|  | } | 
|  |  | 
/* the alloc response is too sparse; get the full job info */
|  | rc = slurm_load_job(&jobs, job_id, 0); | 
|  | if (rc || !jobs || (jobs->record_count <= 0)) { | 
/* job not found or already dead? */
|  | if ((rc == SLURM_ERROR) && errno) | 
|  | rc = errno; | 
|  |  | 
|  | error("%s: unable to find JobId=%u: %s", | 
|  | __func__, job_id, slurm_strerror(rc)); | 
|  |  | 
|  | stop_anchor(rc); | 
|  | return; | 
|  | } | 
|  |  | 
|  | /* grab the first job */ | 
|  | xassert(jobs->job_array->job_id == job_id); | 
|  |  | 
|  | write_lock_state(); | 
|  | if (existing_allocation) { | 
|  | xassert(state.user_id == getuid()); | 
|  | state.user_id = jobs->job_array->user_id; | 
|  | xassert(state.user_id != SLURM_AUTH_NOBODY); | 
|  |  | 
|  | xassert(state.group_id == getgid()); | 
|  | state.group_id = jobs->job_array->group_id; | 
|  | xassert(state.group_id != SLURM_AUTH_NOBODY); | 
|  | } | 
|  | _script_env(); | 
|  | unlock_state(); | 
|  |  | 
|  | if (get_log_level() >= LOG_LEVEL_DEBUG) { | 
|  | read_lock_state(); | 
|  | if (state.job_env) | 
|  | for (int i = 0; state.job_env[i]; i++) | 
|  | debug("Job env[%d]=%s", i, state.job_env[i]); | 
|  | else | 
|  | debug("JobId=%u did not provide an environment", | 
|  | job_id); | 
|  | unlock_state(); | 
|  | } | 
|  |  | 
|  | slurm_free_job_info_msg(jobs); | 
|  |  | 
|  | if (existing_allocation) { | 
|  | if ((rc = _stage_in())) | 
|  | stop_anchor(rc); | 
|  | else | 
|  | conmgr_add_work_fifo(on_allocation, NULL); | 
|  | } else { | 
|  | conmgr_add_work_delayed_fifo(check_allocation, NULL, 0, 1); | 
|  | } | 
|  | } |