blob: 063c7189b10f1dee6b19c53f8df1ec3b19fa665a [file] [log] [blame]
/*****************************************************************************\
* src/slurmd/slurmstepd/mgr.c - step manager functions for slurmstepd
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2009 Lawrence Livermore National Security.
* Copyright (C) SchedMD LLC.
* Copyright (C) 2013 Intel, Inc.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark Grondona <mgrondona@llnl.gov>.
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#define _GNU_SOURCE
#include "config.h"
#if HAVE_SYS_PRCTL_H
# include <sys/prctl.h>
#endif
#include <grp.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <pwd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/utsname.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#ifdef HAVE_PTY_H
# include <pty.h>
# ifdef HAVE_UTMP_H
# include <utmp.h>
# endif
#endif
#ifdef WITH_SELINUX
#include <selinux/selinux.h>
#endif
#include "slurm/slurm_errno.h"
#include "src/common/cbuf.h"
#include "src/common/cpu_frequency.h"
#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/hostlist.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/reverse_tree.h"
#include "src/common/spank.h"
#include "src/common/strlcpy.h"
#include "src/common/tres_frequency.h"
#include "src/common/util-net.h"
#include "src/common/x11_util.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/interfaces/acct_gather_profile.h"
#include "src/interfaces/cred.h"
#include "src/interfaces/gpu.h"
#include "src/interfaces/gres.h"
#include "src/interfaces/job_container.h"
#include "src/interfaces/jobacct_gather.h"
#include "src/interfaces/mpi.h"
#include "src/interfaces/prep.h"
#include "src/interfaces/proctrack.h"
#include "src/interfaces/switch.h"
#include "src/interfaces/task.h"
#include "src/conmgr/conmgr.h"
#include "src/slurmd/common/fname.h"
#include "src/slurmd/common/privileges.h"
#include "src/slurmd/common/set_oomadj.h"
#include "src/slurmd/common/slurmd_cgroup.h"
#include "src/slurmd/common/slurmd_common.h"
#include "src/slurmd/common/xcpuinfo.h"
#include "src/slurmd/slurmd/slurmd.h"
#include "src/slurmd/slurmstepd/io.h"
#include "src/slurmd/slurmstepd/pam_ses.h"
#include "src/slurmd/slurmstepd/pdebug.h"
#include "src/slurmd/slurmstepd/req.h"
#include "src/slurmd/slurmstepd/slurmstepd.h"
#include "src/slurmd/slurmstepd/step_terminate_monitor.h"
#include "src/slurmd/slurmstepd/task.h"
#include "src/slurmd/slurmstepd/ulimits.h"
#include "src/slurmd/slurmstepd/x11_forwarding.h"
#define RETRY_DELAY 15 /* retry every 15 seconds */
/*
 * Global completion-tracking state for this slurmstepd; the functions below
 * access it while holding step_complete.lock.
 *
 * This is a positional initializer, so the values must stay in the same
 * order as the members of step_complete_t (declared outside this section).
 * NOTE(review): the -1 entries presumably mean "not yet set" for the integer
 * rank/tree members -- confirm against the struct declaration.
 */
step_complete_t step_complete = {
PTHREAD_COND_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER,
-1,
-1,
-1,
(char *)NULL,
-1,
-1,
true,
(bitstr_t *)NULL,
0,
NULL
};
/*
 * Pairs a thread id with a number of seconds.
 * NOTE(review): no user of this type is visible in this part of the file;
 * presumably it describes a delayed-kill thread -- confirm before relying
 * on that reading.
 */
typedef struct kill_thread {
pthread_t thread_id;
int secs;
} kill_thread_t;
#if defined(__linux__)
/*
 * Argument bundle handed through clone() to _spank_task_post_fork_child()
 * and _spank_task_exit_child().
 */
typedef struct {
stepd_step_rec_t *step; /* step record forwarded to the SPANK call */
int id; /* forwarded as the second argument of the SPANK task call */
} spank_task_args_t;
#endif
/*
 * Prototypes for file-local (static) helpers.
 */
/*
 * Step manager related prototypes
 */
static void _send_launch_failure(launch_tasks_request_msg_t *,
slurm_addr_t *, int, uint16_t);
static int _fork_all_tasks(stepd_step_rec_t *step, bool *io_initialized);
static int _become_user(stepd_step_rec_t *step, struct priv_state *ps);
static void _set_prio_process (stepd_step_rec_t *step);
static int _setup_normal_io(stepd_step_rec_t *step);
static void _send_launch_resp(stepd_step_rec_t *step, int rc);
static int _slurmd_job_log_init(stepd_step_rec_t *step);
static void _wait_for_io(stepd_step_rec_t *step);
/* tid is the list of global task ids that exited; n is its length */
static int _send_exit_msg(stepd_step_rec_t *step, uint32_t *tid, int n,
int status);
static void _wait_for_all_tasks(stepd_step_rec_t *step);
static int _wait_for_any_task(stepd_step_rec_t *step, bool waitflag);
static void _random_sleep(stepd_step_rec_t *step);
static int _run_script_as_user(const char *name, const char *path,
stepd_step_rec_t *step,
int max_wait, char **env);
/*
 * Batch step management prototypes:
 */
static char * _make_batch_dir(stepd_step_rec_t *step);
static int _make_batch_script(batch_job_launch_msg_t *msg,
stepd_step_rec_t *step);
static int _send_complete_batch_script_msg(stepd_step_rec_t *step,
int err, int status);
/*
 * Create the step record for a job step being launched on the current node.
 *
 * msg              - launch request the step record is built from
 * cli              - client address, stored in the step's envtp
 * protocol_version - protocol version passed to stepd_step_rec_create()
 *
 * Returns the new step record, or NULL on failure. On failure a launch
 * failure message is sent and errno still holds the creation error.
 */
extern stepd_step_rec_t *
mgr_launch_tasks_setup(launch_tasks_request_msg_t *msg, slurm_addr_t *cli,
		       uint16_t protocol_version)
{
	stepd_step_rec_t *step = stepd_step_rec_create(msg, protocol_version);

	if (step == NULL) {
		/*
		 * Remember why creation failed: _send_launch_failure() may
		 * overwrite errno, and the caller reports it to slurmd.
		 */
		int saved_errno = errno;

		_send_launch_failure(msg, cli, saved_errno, protocol_version);
		errno = saved_errno;
		return NULL;
	}

	/* Copy the request attributes the step record keeps for itself. */
	step->envtp->cli = cli;
	step->accel_bind_type = msg->accel_bind_type;
	step->tres_bind = xstrdup(msg->tres_bind);
	step->tres_freq = xstrdup(msg->tres_freq);
	step->stepmgr = xstrdup(msg->stepmgr);

	return step;
}
/*
 * Send resp_msg to srun and wait for its return code, retrying with an
 * exponentially growing pause (100ms, doubled until it reaches 800ms).
 *
 * The number of attempts is (nnodes / 1024) + 5.
 *
 * Returns the return code received from the peer (0 on success), or
 * SLURM_ERROR if the message's protocol version is too old or no attempt
 * ever got an answer.
 */
inline static int
_send_srun_resp_msg(slurm_msg_t *resp_msg, uint32_t nnodes)
{
	int rc = SLURM_ERROR;
	int attempt;
	int max_attempts = (nnodes / 1024) + 5;
	unsigned long pause_usec = 100000;

	/*
	 * NOTE: Wait until a suspended job step is resumed, or the RPC
	 * authentication credential from Munge may expire by the time
	 * it is resumed.
	 */
	wait_for_resumed(resp_msg->msg_type);

	for (attempt = 1; ; attempt++) {
		if (resp_msg->protocol_version < SLURM_MIN_PROTOCOL_VERSION) {
			rc = SLURM_ERROR;
			break;
		}

		/* Both the transport result and rc must be zero. */
		if (!slurm_send_recv_rc_msg_only_one(resp_msg, &rc, 0) && !rc)
			break;

		debug("%s: %d/%d failed to send msg type %s: %m",
		      __func__, attempt, max_attempts,
		      rpc_num2string(resp_msg->msg_type));

		if (attempt >= max_attempts)
			break;

		usleep(pause_usec);
		if (pause_usec < 800000)
			pause_usec *= 2;
	}

	return rc;
}
/*
 * Adjust the accounting record "from" and then aggregate it into "dest".
 *
 * NOTE: "from" is modified in place before jobacctinfo_aggregate() is
 * called.
 */
static void _local_jobacctinfo_aggregate(
	jobacctinfo_t *dest, jobacctinfo_t *from)
{
	if (from->pid) {
		int gpu_mem_idx = -1, gpu_util_idx = -1;

		gpu_get_tres_pos(&gpu_mem_idx, &gpu_util_idx);

		/*
		 * For Mem, VMem, gpumem and gpuutil the running total may not
		 * be meaningful, so replace it with the Max. The total/ave
		 * computed later is then a total/ave of the Max values.
		 */
		if (gpu_util_idx != -1)
			from->tres_usage_in_tot[gpu_util_idx] =
				from->tres_usage_in_max[gpu_util_idx];
		if (gpu_mem_idx != -1)
			from->tres_usage_in_tot[gpu_mem_idx] =
				from->tres_usage_in_max[gpu_mem_idx];
		from->tres_usage_in_tot[TRES_ARRAY_VMEM] =
			from->tres_usage_in_max[TRES_ARRAY_VMEM];
		from->tres_usage_in_tot[TRES_ARRAY_MEM] =
			from->tres_usage_in_max[TRES_ARRAY_MEM];
	}

	/*
	 * ave_watts holds the average of the watts collected. Store it as the
	 * energy total so the aggregate is a total of averages rather than
	 * just the last watts reading.
	 */
	from->tres_usage_out_tot[TRES_ARRAY_ENERGY] = from->energy.ave_watts;

	jobacctinfo_aggregate(dest, from);
}
/*
 * Derive the exit code reported for the step from this node's tasks.
 *
 * Returns 0 for the extern step. Otherwise walks
 * step->task[0 .. node_tasks-1] and picks a status as described inline.
 * If no task contributed (step_rc is still NO_VAL) and step->task[0] is
 * non-NULL, the status of task 0 is returned.
 */
static uint32_t _get_exit_code(stepd_step_rec_t *step)
{
uint32_t i;
uint32_t step_rc = NO_VAL;
/*
 * We are always killing/cancelling the extern_step so don't
 * report that.
 */
if (step->step_id.step_id == SLURM_EXTERN_CONT)
return 0;
for (i = 0; i < step->node_tasks; i++) {
/*
 * If this task was killed by cmd, ignore its return status as it
 * only reflects the fact that we killed it.
 */
if (step->task[i]->killed_by_cmd) {
debug("get_exit_code task %u killed by cmd", i);
continue;
}
/*
 * If this task called PMI_Abort or PMI2_Abort, let it define the
 * exit status and stop looking.
 */
if (step->task[i]->aborted) {
step_rc = step->task[i]->estatus;
debug("get_exit_code task %u called abort", i);
break;
}
/*
 * A task that died by signal defines the exit status.
 * NOTE(review): the original comment here said all tasks must be
 * cycled through in case one of them called abort, but the loop
 * stops at the first signaled task, so an aborted task later in the
 * array is never examined -- confirm which behavior is intended.
 */
if (WIFSIGNALED(step->task[i]->estatus)) {
info("get_exit_code task %u died by signal: %d",
i, WTERMSIG(step->task[i]->estatus));
step_rc = step->task[i]->estatus;
break;
}
/*
 * A status whose low byte is SIG_OOM is kept once seen. Otherwise
 * take the larger of the global step_complete.step_rc and this
 * task's status.
 * NOTE(review): MAX() compares against step_complete.step_rc, not
 * the local step_rc, so among non-OOM tasks the last one examined
 * decides the result -- confirm this matches the "maximum task
 * return code" intent.
 */
if ((step->task[i]->estatus & 0xff) == SIG_OOM) {
step_rc = step->task[i]->estatus;
} else if ((step_rc & 0xff) != SIG_OOM) {
step_rc = MAX(step_complete.step_rc,
step->task[i]->estatus);
}
}
/*
 * If we killed all the tasks by cmd, give at least one return code.
 */
if (step_rc == NO_VAL && step->task[0])
step_rc = step->task[0]->estatus;
return step_rc;
}
/*
 * Build the path of the batch script inside the step's batch directory.
 * The string comes from xstrdup_printf(); the caller releases it with
 * xfree().
 */
static char *_batch_script_path(stepd_step_rec_t *step)
{
	char *script_path;

	script_path = xstrdup_printf("%s/%s", step->batchdir, "slurm_script");

	return script_path;
}
/*
 * Finish a step: record its exit code, remove the batch script, notify
 * upstream of the completion and remove the batch directory.
 *
 * For the batch script step the exit code is sent to slurmctld; a non-zero
 * rc will DRAIN the node.
 *
 * step - step being finished
 * rc   - slurm-side return code of the launch
 */
extern void
batch_finish(stepd_step_rec_t *step, int rc)
{
	const bool is_batch_script =
		(step->step_id.step_id == SLURM_BATCH_SCRIPT);
	char *script_path = _batch_script_path(step);

	step_complete.step_rc = _get_exit_code(step);

	if (unlink(script_path) < 0)
		error("unlink(%s): %m", script_path);
	xfree(script_path);

	if (step->aborted) {
		if (is_batch_script)
			info("job %u abort completed", step->step_id.job_id);
		else
			info("%ps abort completed", &step->step_id);
	} else if (!is_batch_script) {
		stepd_wait_for_children_slurmstepd(step);
		verbose("%ps completed with slurm_rc = %d, job_rc = %d",
			&step->step_id, rc, step_complete.step_rc);
		stepd_send_step_complete_msgs(step);
	} else {
		verbose("job %u completed with slurm_rc = %d, job_rc = %d",
			step->step_id.job_id, rc, step_complete.step_rc);

		/* If the launch failed, tag the step as failed too. */
		if (rc && !step_complete.step_rc)
			step_complete.step_rc = rc;

		_send_complete_batch_script_msg(step, rc,
						step_complete.step_rc);
	}

	/*
	 * Do not purge the directory until slurmctld has been notified of
	 * batch job completion, to avoid a race with slurmd registering a
	 * missing batch job.
	 */
	if (step->batchdir && (rmdir(step->batchdir) < 0))
		error("rmdir(%s): %m", step->batchdir);
	xfree(step->batchdir);
}
/*
 * Prepare a batch job script launch on the current node: create the step
 * record, the batch directory and the script, then build the batch
 * environment.
 *
 * Returns the step record, or NULL on failure. If the batch directory or
 * script could not be set up, errno is ESLURMD_CREATE_BATCH_DIR_ERROR.
 */
stepd_step_rec_t *
mgr_launch_batch_job_setup(batch_job_launch_msg_t *msg, slurm_addr_t *cli)
{
	stepd_step_rec_t *step = batch_stepd_step_rec_create(msg);

	if (step == NULL) {
		error("batch_stepd_step_rec_create() failed for job %u on %s: %s",
		      msg->job_id, conf->hostname, slurm_strerror(errno));
		return NULL;
	}

	step->batchdir = _make_batch_dir(step);
	if (!step->batchdir)
		goto cleanup;

	xfree(step->argv[0]);

	if (_make_batch_script(msg, step) != 0)
		goto cleanup;

	env_array_for_batch_job(&step->env, msg, conf->node_name);

	return step;

cleanup:
	error("batch script setup failed for job %u on %s: %s",
	      msg->job_id, conf->hostname, slurm_strerror(errno));

	if (step->aborted)
		verbose("job %u abort complete", step->step_id.job_id);

	/*
	 * Do not purge the directory until slurmctld has been notified of
	 * batch job completion, to avoid a race with slurmd registering a
	 * missing batch job.
	 */
	if (step->batchdir && (rmdir(step->batchdir) < 0))
		error("rmdir(%s): %m", step->batchdir);
	xfree(step->batchdir);

	errno = ESLURMD_CREATE_BATCH_DIR_ERROR;

	return NULL;
}
/*
 * Initialize stdio for the step's tasks and, for non-batch steps, create
 * the initial client objects and start the IO thread.
 *
 * The stdio and client setup runs between drop_privileges() and
 * reclaim_privileges(). The IO thread is started only after privileges
 * have been reclaimed, and only if everything before it succeeded.
 *
 * Returns 0 on success, ESLURMD_SET_UID_OR_GID_ERROR if privileges could
 * not be dropped, or ESLURMD_IO_ERROR if any stdio/client setup call fails.
 */
static int
_setup_normal_io(stepd_step_rec_t *step)
{
int rc = 0, ii = 0;
struct priv_state sprivs;
debug2("Entering _setup_normal_io");
/*
 * Temporarily drop permissions, initialize task stdio file
 * descriptors (which may be connected to files), then
 * reclaim privileges.
 */
if (drop_privileges(step, true, &sprivs, true) < 0)
return ESLURMD_SET_UID_OR_GID_ERROR;
if (io_init_tasks_stdio(step) != SLURM_SUCCESS) {
rc = ESLURMD_IO_ERROR;
goto claim;
}
/*
 * MUST create the initial client object before starting
 * the IO thread, or we risk losing stdout/err traffic.
 */
if (!step->batch) {
srun_info_t *srun = list_peek(step->sruns);
/*
 * Local id of the task that sends to srun: -1 for all tasks,
 * any other value for no tasks.
 */
int srun_stdout_tasks = -1;
int srun_stderr_tasks = -1;
xassert(srun != NULL);
/*
 * If I/O is labelled with the task number, and a separate file is
 * written per node or per task, the I/O needs to be sent back to
 * the stepd, get a label appended, and be written from the stepd
 * rather than sent back to srun or written directly from the node.
 * When a task has ofname or efname == NULL, it means data gets
 * sent back to the client.
 */
if (step->flags & LAUNCH_LABEL_IO) {
slurmd_filename_pattern_t outpattern, errpattern;
bool same = false;
int file_flags;
io_find_filename_pattern(step, &outpattern, &errpattern,
&same);
file_flags = io_get_file_flags(step);
/* Make eio objects to write from the slurmstepd */
if (outpattern == SLURMD_ALL_UNIQUE) {
/* Open a separate file per task */
for (ii = 0; ii < step->node_tasks; ii++) {
rc = io_create_local_client(
step->task[ii]->ofname,
file_flags, step, 1,
step->task[ii]->id,
same ? step->task[ii]->id : -2);
if (rc != SLURM_SUCCESS) {
error("Could not open output file %s: %m",
step->task[ii]->ofname);
rc = ESLURMD_IO_ERROR;
goto claim;
}
}
/* stdout is written locally, so srun gets none of it */
srun_stdout_tasks = -2;
if (same)
srun_stderr_tasks = -2;
} else if (outpattern == SLURMD_ALL_SAME) {
/* Open a file for all tasks */
rc = io_create_local_client(
step->task[0]->ofname, file_flags,
step, 1, -1, same ? -1 : -2);
if (rc != SLURM_SUCCESS) {
error("Could not open output file %s: %m",
step->task[0]->ofname);
rc = ESLURMD_IO_ERROR;
goto claim;
}
srun_stdout_tasks = -2;
if (same)
srun_stderr_tasks = -2;
}
/*
 * When "same" is set, stderr was already attached to the stdout
 * client(s) above; otherwise set up stderr separately.
 */
if (!same) {
if (errpattern == SLURMD_ALL_UNIQUE) {
/* Open a separate file per task */
for (ii = 0;
ii < step->node_tasks; ii++) {
rc = io_create_local_client(
step->task[ii]->efname,
file_flags, step, 1,
-2, step->task[ii]->id);
if (rc != SLURM_SUCCESS) {
error("Could not open error file %s: %m",
step->task[ii]->
efname);
rc = ESLURMD_IO_ERROR;
goto claim;
}
}
srun_stderr_tasks = -2;
} else if (errpattern == SLURMD_ALL_SAME) {
/* Open a file for all tasks */
rc = io_create_local_client(
step->task[0]->efname,
file_flags, step, 1, -2, -1);
if (rc != SLURM_SUCCESS) {
error("Could not open error file %s: %m",
step->task[0]->efname);
rc = ESLURMD_IO_ERROR;
goto claim;
}
srun_stderr_tasks = -2;
}
}
}
if (io_initial_client_connect(srun, step, srun_stdout_tasks,
srun_stderr_tasks) < 0) {
rc = ESLURMD_IO_ERROR;
goto claim;
}
}
claim:
/* Every path past drop_privileges() lands here to reclaim privileges. */
if (reclaim_privileges(&sprivs) < 0) {
error("sete{u/g}id(%lu/%lu): %m",
(u_long) sprivs.saved_uid, (u_long) sprivs.saved_gid);
}
if (!rc && !step->batch)
io_thread_start(step);
debug2("Leaving _setup_normal_io");
return rc;
}
/*
 * Sleep for a pseudo-random number of milliseconds between 0 and
 * MIN(tcp_timeout * nnodes, 5000). The generator is seeded from the job id
 * and this node's id.
 */
static void
_random_sleep(stepd_step_rec_t *step)
{
	long int ceiling_ms = (slurm_conf.tcp_timeout * step->nnodes);
	long int wait_ms = 0;

	ceiling_ms = MIN(ceiling_ms, 5000);

	srand48((long int) (step->step_id.job_id + step->nodeid));
	wait_ms = lrand48() % (ceiling_ms + 1);

	debug3("delaying %ldms", wait_ms);

	/* poll() with no descriptors is used purely as a millisecond sleep. */
	if (poll(NULL, 0, wait_ms) == -1)
		error("%s: poll(): %m", __func__);
}
/*
 * Send a task exit message for n tasks to every srun/sattach attached to
 * the step.
 *
 * step   - step the tasks belong to
 * tid    - list of _global_ task ids that have exited
 * n      - number of entries in tid
 * status - exit status of the tasks
 *
 * The return code sent is SIG_OOM if the step hit an OOM error,
 * SLURM_SUCCESS if the tasks were signaled and LAUNCH_NO_SIG_FAIL is set,
 * and status otherwise.
 *
 * Always returns SLURM_SUCCESS; per-client send failures are only logged.
 */
static int
_send_exit_msg(stepd_step_rec_t *step, uint32_t *tid, int n, int status)
{
	slurm_msg_t resp;
	task_exit_msg_t msg;
	list_itr_t *i = NULL;
	srun_info_t *srun = NULL;

	debug3("%s: sending task exit msg for %d tasks (oom:%s exit_status:%s)",
	       __func__, n, (step->oom_error ? "true" : "false"),
	       slurm_strerror(status));

	memset(&msg, 0, sizeof(msg));
	msg.task_id_list = tid;
	msg.num_tasks = n;
	if (step->oom_error)
		msg.return_code = SIG_OOM;
	else if (WIFSIGNALED(status) && (step->flags & LAUNCH_NO_SIG_FAIL))
		msg.return_code = SLURM_SUCCESS;
	else
		msg.return_code = status;

	memcpy(&msg.step_id, &step->step_id, sizeof(msg.step_id));

	slurm_msg_t_init(&resp);
	resp.data = &msg;
	resp.msg_type = MESSAGE_TASK_EXIT;

	/*
	 * Hack for TCP timeouts on exit of large, synchronized step
	 * termination. Delay a random amount if step->nnodes > 500
	 */
	if (step->nnodes > 500)
		_random_sleep(step);

	/*
	 * Notify each srun and sattach.
	 * No message for poe or batch steps
	 */
	i = list_iterator_create(step->sruns);
	while ((srun = list_next(i))) {
		resp.address = srun->resp_addr;
		if (slurm_addr_is_unspec(&resp.address))
			continue;	/* no srun or sattach here */

		/* This should always be set; if it is not, we have a bug. */
		xassert(srun->protocol_version);
		resp.protocol_version = srun->protocol_version;
		resp.tls_cert = xstrdup(srun->tls_cert);

		slurm_msg_set_r_uid(&resp, srun->uid);

		if (_send_srun_resp_msg(&resp, step->nnodes) != SLURM_SUCCESS)
			error("Failed to send MESSAGE_TASK_EXIT: %m");

		/*
		 * resp is a stack message reused for every client: release
		 * this client's certificate copy so it is not leaked when the
		 * next iteration overwrites the pointer or the function
		 * returns.
		 */
		xfree(resp.tls_cert);
	}
	list_iterator_destroy(i);

	return SLURM_SUCCESS;
}
/*
 * Wait, with a deadline, until all children slurmstepd in the completion
 * tree have reported, then record this node's exit code and clear
 * step_complete.wait_children.
 *
 * The deadline is now + REVERSE_TREE_CHILDREN_TIMEOUT plus 3 extra seconds
 * for every tree level below this one.
 */
extern void stepd_wait_for_children_slurmstepd(stepd_step_rec_t *step)
{
	slurm_mutex_lock(&step_complete.lock);

	if (!step_complete.bits || (step_complete.children <= 0)) {
		debug2("Rank %d has no children slurmstepd",
		       step_complete.rank);
	} else {
		struct timespec deadline = {0, 0};
		int pending = 0;

		/* 3 extra seconds for every level of tree below this one */
		deadline.tv_sec +=
			3 * (step_complete.max_depth-step_complete.depth);
		deadline.tv_sec += time(NULL) + REVERSE_TREE_CHILDREN_TIMEOUT;

		for (;;) {
			pending = bit_clear_count(step_complete.bits);
			if (pending <= 0)
				break;

			debug3("Rank %d waiting for %d (of %d) children",
			       step_complete.rank, pending,
			       step_complete.children);

			if (pthread_cond_timedwait(&step_complete.cond,
						   &step_complete.lock,
						   &deadline) == ETIMEDOUT) {
				debug2("Rank %d timed out waiting for %d"
				       " (of %d) children", step_complete.rank,
				       pending, step_complete.children);
				break;
			}
		}

		if (pending == 0) {
			debug2("Rank %d got all children completions",
			       step_complete.rank);
		}
	}

	step_complete.step_rc = _get_exit_code(step);
	step_complete.wait_children = false;

	slurm_mutex_unlock(&step_complete.lock);
}
/*
 * Send a single step completion message, which represents a single range
 * of complete job step nodes.
 *
 * step  - step being completed
 * first - first rank of the completed range
 * last  - last rank of the completed range
 *
 * Delivery order:
 *  1. the parent slurmstepd, if this node has one (with retries);
 *  2. the step manager host, if step->stepmgr is set;
 *  3. slurmctld, retried indefinitely.
 *
 * The accounting data is attached only to the first message sent by this
 * process; later messages carry an empty jobacct record.
 *
 * The caller is holding step_complete.lock.
 */
static void
_one_step_complete_msg(stepd_step_rec_t *step, int first, int last)
{
	slurm_msg_t req;
	step_complete_msg_t msg;
	int rc = -1;
	int retcode;
	int i;
	static bool acct_sent = false;

	if (step->batch) {	/* Nested batch step anomalies */
		if (first == -1)
			first = 0;
		if (last == -1)
			last = 0;
	}
	memset(&msg, 0, sizeof(msg));

	memcpy(&msg.step_id, &step->step_id, sizeof(msg.step_id));

	msg.range_first = first;
	msg.range_last = last;
	if (step->oom_error)
		msg.step_rc = SIG_OOM;
	else
		msg.step_rc = step_complete.step_rc;
	/* Released at "finished"; every exit path must go through it. */
	msg.jobacct = jobacctinfo_create(NULL);
	/************* acct stuff ********************/
	if (!acct_sent) {
		/*
		 * No need to call _local_jobacctinfo_aggregate, step->jobacct
		 * already has the modified total for this node in the step.
		 */
		jobacctinfo_aggregate(step_complete.jobacct, step->jobacct);
		jobacctinfo_getinfo(step_complete.jobacct,
				    JOBACCT_DATA_TOTAL, msg.jobacct,
				    SLURM_PROTOCOL_VERSION);
		acct_sent = true;
	}
	/*********************************************/

	debug2("%s: ranks=%d-%d parent_rank=%d step_rc[0x%x]=%s",
	       __func__, first, last, step_complete.parent_rank, msg.step_rc,
	       slurm_strerror(msg.step_rc));

	slurm_msg_t_init(&req);
	slurm_msg_set_r_uid(&req, slurm_conf.slurmd_user_id);
	req.msg_type = REQUEST_STEP_COMPLETE;
	req.data = &msg;

	/* Do NOT change this check to "step_complete.rank != 0", because
	 * there are odd situations where SlurmUser or root could
	 * craft a launch without a valid credential, and no tree information
	 * can be built without the hostlist from the credential.
	 */
	if (step_complete.parent_rank != -1) {
		debug3("Rank %d sending complete to rank %d(%s), range %d to %d",
		       step_complete.rank, step_complete.parent_rank,
		       step_complete.parent_name, first, last);
		/* On error, pause then try sending to parent again.
		 * The parent slurmstepd may just not have started yet, because
		 * of the way that the launch message forwarding works.
		 */
		if (slurm_conf_get_addr(step_complete.parent_name, &req.address,
					0)) {
			/* No address: skip the retry loop entirely. */
			i = REVERSE_TREE_PARENT_RETRY;
			error("%s: failed getting address for parent NodeName %s (parent rank %d)",
			      __func__, step_complete.parent_name,
			      step_complete.parent_rank);
		} else
			i = 0;

		for (; i < REVERSE_TREE_PARENT_RETRY; i++) {
			if (i)
				sleep(1);
			retcode = slurm_send_recv_rc_msg_only_one(&req, &rc, 0);
			if ((retcode == 0) && (rc == 0))
				goto finished;
		}
		/*
		 * On error AGAIN, send to the slurmctld instead.
		 * This is useful if parent_rank gave up waiting for us
		 * on stepd_wait_for_children_slurmstepd.
		 * If it's just busy handling our prev messages we'll need
		 * to handle duplicated messages in both the parent and
		 * slurmctld.
		 */
		debug3("Rank %d sending complete to slurmctld instead, range "
		       "%d to %d", step_complete.rank, first, last);
	} else {
		/* this is the base of the tree, its parent is slurmctld */
		debug3("Rank %d sending complete to slurmctld, range %d to %d",
		       step_complete.rank, first, last);
	}

	if (step->stepmgr) {
		slurm_msg_t resp_msg;

		slurm_msg_t_init(&resp_msg);

		slurm_conf_get_addr(step->stepmgr, &req.address,
				    req.flags);
		slurm_msg_set_r_uid(&req, slurm_conf.slurmd_user_id);
		msg.send_to_stepmgr = true;
		debug3("sending complete to step_ctld host:%s",
		       step->stepmgr);
		/*
		 * Whether or not the send succeeds, leave through "finished"
		 * so msg.jobacct is released. Returning directly from here
		 * would leak it.
		 */
		if (slurm_send_recv_node_msg(&req, &resp_msg, 0))
			error("%s: failed sending step completion to stepmgr host %s: %m",
			      __func__, step->stepmgr);
		goto finished;
	}

	/* Retry step complete RPC send to slurmctld indefinitely.
	 * Prevent orphan job step if slurmctld is down */
	i = 1;
	while (slurm_send_recv_controller_rc_msg(&req, &rc,
						 working_cluster_rec) < 0) {
		if (i++ == 1) {
			error("Rank %d failed sending step completion message directly to slurmctld, retrying",
			      step_complete.rank);
		}
		sleep(60);
	}
	if (i > 1) {
		info("Rank %d sent step completion message directly to slurmctld",
		     step_complete.rank);
	}

finished:
	jobacctinfo_destroy(msg.jobacct);
}
/*
 * Starting at bit "start" of step_complete.bits, find the next contiguous
 * run of set bits below index "size" and store its first and last indices
 * in *first and *last.
 *
 * Returns 1 if a run was found, 0 otherwise (including when
 * step_complete.bits is NULL). *first and *last are untouched when 0 is
 * returned.
 *
 * The caller is holding step_complete.lock.
 */
static int
_bit_getrange(int start, int size, int *first, int *last)
{
	bool in_run = false;
	int idx;

	if (!step_complete.bits)
		return 0;

	for (idx = start; idx < size; idx++) {
		if (bit_test(step_complete.bits, idx)) {
			/* Open a run on its first set bit, then extend it. */
			if (!in_run) {
				in_run = true;
				*first = idx;
			}
			*last = idx;
		} else if (in_run) {
			/* First clear bit after the run ends it. */
			*last = idx - 1;
			break;
		}
	}

	return in_run ? 1 : 0;
}
/*
 * Send as many step completion messages as it takes to cover every
 * completed node known to this slurmstepd. Nodes that have not yet
 * signaled completion leave gaps in the completed-node bitmap, so more
 * than one range message may be needed.
 *
 * This node's own rank is always reported: either merged into the first
 * range (when bit 0 is set) or as a separate single-rank message.
 */
extern void stepd_send_step_complete_msgs(stepd_step_rec_t *step)
{
	int range_lo = -1, range_hi = -1;
	int nbits = 0;
	bool own_rank_sent = false;

	slurm_mutex_lock(&step_complete.lock);

	if (step_complete.bits)
		nbits = bit_size(step_complete.bits);

	/* If no children, send our own message and return early */
	if (nbits == 0) {
		_one_step_complete_msg(step, step_complete.rank,
				       step_complete.rank);
		slurm_mutex_unlock(&step_complete.lock);
		return;
	}

	for (int next = 0;
	     _bit_getrange(next, nbits, &range_lo, &range_hi);
	     next = range_hi + 1) {
		/*
		 * THIS node is not in the bit string, so prepend the local
		 * rank when the range starts at the first child.
		 */
		if ((next == 0) && (range_lo == 0)) {
			own_rank_sent = true;
			range_lo = -1;
		}
		_one_step_complete_msg(step,
				       (range_lo + step_complete.rank + 1),
				       (range_hi + step_complete.rank + 1));
	}

	if (!own_rank_sent) {
		_one_step_complete_msg(step, step_complete.rank,
				       step_complete.rank);
	}

	slurm_mutex_unlock(&step_complete.lock);
}
/*
 * Set the step's state to new_state and signal step->state_cond.
 * Both the update and the signal happen while holding step->state_mutex.
 */
extern void set_job_state(stepd_step_rec_t *step, slurmstepd_state_t new_state)
{
slurm_mutex_lock(&step->state_mutex);
step->state = new_state;
slurm_cond_signal(&step->state_cond);
slurm_mutex_unlock(&step->state_mutex);
}
/*
* Run SPANK functions within the job container.
* WARNING: This is running as a separate process, but sharing the parent's
* memory space. Be careful not to leak memory, or free resources that the
* parent needs to continue processing. Only use _exit() here, otherwise
* any plugins that registered their own fini() hooks will wreck the parent.
*/
#if defined(__linux__)
/*
 * clone() entry point: join the job container, drop privileges and run
 * spank_user().
 *
 * arg is the stepd_step_rec_t of the step. Never returns; it leaves
 * through _exit() with 0 on success, 1 if spank_user() failed, or -1 if the
 * container could not be joined or privileges could not be dropped.
 */
static int _spank_user_child(void *arg)
{
	struct priv_state sprivs;
	stepd_step_rec_t *step = arg;
	int exit_code;

	if (container_g_join(&step->step_id, step->uid, false)) {
		error("container_g_join(%u): %m", step->step_id.job_id);
		_exit(-1);
	}

	if (drop_privileges(step, true, &sprivs, true) < 0) {
		error("drop_privileges: %m");
		_exit(-1);
	}

	exit_code = spank_user(step) ? 1 : 0;

	/*
	 * This is taken from the end of reclaim_privileges(). The child
	 * simply exits, so it does not need to reclaim. Instead it cleans up
	 * the structure and the lock so that the parent can continue to
	 * operate.
	 */
	xfree(sprivs.gid_list);
	auth_setuid_unlock();

	_exit(exit_code);
}
/*
 * clone() entry point: join the job container and run
 * spank_task_post_fork() for one task.
 *
 * arg is a spank_task_args_t. Never returns; it leaves through _exit()
 * with 0 on success, 1 if the SPANK call failed, or -1 if the container
 * could not be joined.
 */
static int _spank_task_post_fork_child(void *arg)
{
	spank_task_args_t *task_args = arg;
	stepd_step_rec_t *step = task_args->step;

	if (container_g_join(&step->step_id, step->uid, false)) {
		error("container_g_join(%u): %m", step->step_id.job_id);
		_exit(-1);
	}

	_exit(spank_task_post_fork(step, task_args->id) ? 1 : 0);
}
/*
 * clone() entry point: join the job container and run spank_task_exit()
 * for one task.
 *
 * arg is a spank_task_args_t. Never returns; it leaves through _exit()
 * with 0 on success, 1 if the SPANK call failed, or -1 if the container
 * could not be joined.
 */
static int _spank_task_exit_child(void *arg)
{
	spank_task_args_t *task_args = arg;
	stepd_step_rec_t *step = task_args->step;

	if (container_g_join(&step->step_id, step->uid, false)) {
		error("container_g_join(%u): %m", step->step_id.job_id);
		_exit(-1);
	}

	_exit(spank_task_exit(step, task_args->id) ? 1 : 0);
}
#endif
/*
 * Run one SPANK step hook, inside the job container when
 * CONF_FLAG_CONTAIN_SPANK is set (Linux only), directly otherwise.
 *
 * spank_func - SPANK_STEP_TASK_EXIT, SPANK_STEP_TASK_POST_FORK or
 *              SPANK_STEP_USER_INIT
 * step       - step the hook runs for
 * id         - task id passed to the task hooks (unused for user init)
 * sprivs     - saved privilege state; used only for SPANK_STEP_USER_INIT
 *              in the contained case, where privileges are reclaimed
 *              around the clone() and dropped again afterwards
 *
 * Returns SLURM_SUCCESS on success. On failure returns SLURM_ERROR, or in
 * the contained user-init case 1 if privileges could not be reclaimed and
 * 2 if they could not be dropped again.
 */
static int _run_spank_func(step_fn_t spank_func, stepd_step_rec_t *step, int id,
			   struct priv_state *sprivs)
{
	int rc = SLURM_SUCCESS;

#if defined(__linux__)
	if (slurm_conf.conf_flags & CONF_FLAG_CONTAIN_SPANK) {
		pid_t pid = -1;
		int flags = CLONE_VM | SIGCHLD;
		char *stack = NULL;
		int status = 0;
		spank_task_args_t *args = NULL;

		/*
		 * To enter the container, the process cannot share CLONE_FS
		 * with another process. However, the spank plugins require
		 * access to global memory that cannot easily be shipped to
		 * another process.
		 *
		 * clone() + CLONE_VM allows the new process to have distinct
		 * filesystem attributes, but share memory space. By allowing
		 * the current process to continue executing, shared locks can
		 * be released allowing the new process to operate normally.
		 */
		if ((spank_func == SPANK_STEP_TASK_EXIT) &&
		    spank_has_task_exit()) {
			args = xmalloc(sizeof(*args));
			args->step = step;
			args->id = id;
			stack = xmalloc(STACK_SIZE);
			pid = clone(_spank_task_exit_child,
				    stack + STACK_SIZE, flags, args);
		} else if ((spank_func == SPANK_STEP_TASK_POST_FORK) &&
			   spank_has_task_post_fork()) {
			args = xmalloc(sizeof(*args));
			args->step = step;
			args->id = id;
			stack = xmalloc(STACK_SIZE);
			pid = clone(_spank_task_post_fork_child,
				    stack + STACK_SIZE, flags, args);
		} else if ((spank_func == SPANK_STEP_USER_INIT) &&
			   spank_has_user_init()) {
			/*
			 * spank_user_init() runs as the user, but setns()
			 * requires CAP_SYS_ADMIN. Reclaim privileges here so
			 * setns() will function.
			 */
			if (reclaim_privileges(sprivs) < 0) {
				error("Unable to reclaim privileges");
				rc = 1;
				goto fail;
			}
			stack = xmalloc(STACK_SIZE);
			pid = clone(_spank_user_child,
				    stack + STACK_SIZE, flags, step);
		} else {
			/* no action required */
			return rc;
		}

		if (pid == -1) {
			error("clone failed before spank call: %m");
			rc = SLURM_ERROR;
		} else {
			pid_t waited;

			/*
			 * The child shares our address space and runs on
			 * "stack", which is freed below. Keep waiting if a
			 * signal interrupts the wait, so the child is really
			 * gone before its stack and args are released.
			 */
			do {
				waited = waitpid(pid, &status, 0);
			} while ((waited < 0) && (errno == EINTR));

			if (waited < 0) {
				error("waitpid failed after spank call: %m");
				rc = SLURM_ERROR;
			} else if (!WIFEXITED(status) || WEXITSTATUS(status)) {
				/*
				 * WEXITSTATUS() is only meaningful when the
				 * child exited normally. A child killed by a
				 * signal must also be reported as a failure.
				 */
				rc = SLURM_ERROR;
			}
		}

		if (spank_func == SPANK_STEP_USER_INIT) {
			if (drop_privileges(step, true, sprivs, true) < 0) {
				error("drop_privileges: %m");
				rc = 2;
			}
		}
fail:
		xfree(args);
		xfree(stack);
		return rc;
	}
#endif

	/*
	 * Default case is to run these spank functions normally. To allow a
	 * different exit path, set rc = SLURM_ERROR if the plugstack call
	 * fails.
	 */
	if ((spank_func == SPANK_STEP_TASK_EXIT) &&
	    (spank_task_exit(step, id))) {
		rc = SLURM_ERROR;
	} else if ((spank_func == SPANK_STEP_TASK_POST_FORK) &&
		   (spank_task_post_fork(step, id))) {
		rc = SLURM_ERROR;
	} else if ((spank_func == SPANK_STEP_USER_INIT) &&
		   (spank_user(step))) {
		rc = SLURM_ERROR;
	}

	return rc;
}
/*
 * Decide whether X11 xauthority handling has to run inside the job
 * container.
 *
 * Returns true only when the job_container plugin name contains "tmpfs"
 * and X11Parameters does not contain "home_xauthority". With
 * home_xauthority set, the container is not joined, to avoid potential
 * problems between job_container/tmpfs and home_xauthority.
 */
static bool _need_join_container(void)
{
	if ((xstrcasestr(slurm_conf.job_container_plugin, "tmpfs")) &&
	    (!xstrcasestr(slurm_conf.x11_params, "home_xauthority"))) {
		return true;
	}

	return false;
}
/*
 * Shut down the step's X11 forwarding with privileges dropped, then
 * reclaim them. Failures are logged; nothing is returned to the caller.
 */
static void _shutdown_x11_forward(stepd_step_rec_t *step)
{
	struct priv_state saved_privs = { 0 };
	int shutdown_rc;

	if (drop_privileges(step, true, &saved_privs, false) < 0) {
		error("%s: Unable to drop privileges", __func__);
		return;
	}

	shutdown_rc = shutdown_x11_forward(step);
	if (shutdown_rc != SLURM_SUCCESS)
		error("%s: x11 forward shutdown failed", __func__);

	if (reclaim_privileges(&saved_privs) < 0)
		error("%s: Unable to reclaim privileges", __func__);
}
/*
 * conmgr work callback that tears down X11 forwarding for the step passed
 * in arg. Only the first non-cancelled invocation does any work.
 *
 * When the job container must be joined (see _need_join_container()), the
 * shutdown runs in a forked child that joins the container first; the
 * parent waits for it and logs an abnormal or non-zero exit.
 */
static void _x11_signal_handler(conmgr_callback_args_t conmgr_args, void *arg)
{
	static pthread_mutex_t once_lock = PTHREAD_MUTEX_INITIALIZER;
	static bool already_ran = false;
	stepd_step_rec_t *step = (stepd_step_rec_t *) arg;
	bool repeat_call;
	pid_t child;
	int status;

	if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) {
		debug4("%s: cancelled", __func__);
		return;
	}

	/*
	 * Protect against a race between SIGTERM and step shutdown causing
	 * this function to run more than once.
	 */
	slurm_mutex_lock(&once_lock);
	repeat_call = already_ran;
	already_ran = true;
	slurm_mutex_unlock(&once_lock);

	if (repeat_call) {
		debug4("%s: Already run. bailing.", __func__);
		return;
	}

	debug("Terminate signal (SIGTERM) received");

	if (!_need_join_container()) {
		_shutdown_x11_forward(step);
		return;
	}

	child = fork();
	if (child < 0) {
		error("%s: fork: %m", __func__);
		return;
	}

	if (child == 0) {
		/* Child: join the container, shut down, and exit. */
		if (container_g_join(&step->step_id, step->uid, false) !=
		    SLURM_SUCCESS) {
			error("%s: cannot join container",
			      __func__);
			_exit(1);
		}
		_shutdown_x11_forward(step);
		_exit(0);
	}

	/* Parent: reap the child and report how it ended. */
	if (waitpid(child, &status, 0) < 0)
		error("%s: waitpid failed: %m",
		      __func__);
	else if (!WIFEXITED(status))
		error("%s: child terminated abnormally",
		      __func__);
	else if (WEXITSTATUS(status))
		error("%s: child returned non-zero",
		      __func__);
}
/*
 * Install the step's X11 magic cookie in its XAUTHORITY file, with
 * privileges dropped for the duration.
 *
 * Unless X11Parameters contains "home_xauthority", step->x11_xauthority is
 * treated as a mkstemp() template and the temporary file is created first.
 *
 * Returns SLURM_SUCCESS, or SLURM_ERROR if privileges could not be dropped
 * or reclaimed, the temporary file could not be created, or xauth failed.
 */
static int _set_xauthority(stepd_step_rec_t *step)
{
	struct priv_state saved_privs = { 0 };
	int rc = SLURM_SUCCESS;
	bool use_temp_file;

	if (drop_privileges(step, true, &saved_privs, false) < 0) {
		error("%s: Unable to drop privileges before xauth", __func__);
		return SLURM_ERROR;
	}

	use_temp_file = !xstrcasestr(slurm_conf.x11_params, "home_xauthority");
	if (use_temp_file) {
		int tmp_fd;

		/* protect against weak file permissions in old glibc */
		umask(0077);
		tmp_fd = mkstemp(step->x11_xauthority);
		if (tmp_fd == -1) {
			error("%s: failed to create temporary XAUTHORITY file: %m",
			      __func__);
			rc = SLURM_ERROR;
		} else {
			close(tmp_fd);
		}
	}

	/* Only run xauth if the file setup above did not fail. */
	if ((rc == SLURM_SUCCESS) &&
	    x11_set_xauth(step->x11_xauthority, step->x11_magic_cookie,
			  step->x11_display)) {
		error("%s: failed to run xauth", __func__);
		rc = SLURM_ERROR;
	}

	if (reclaim_privileges(&saved_privs) < 0) {
		error("%s: Unable to reclaim privileges after xauth", __func__);
		return SLURM_ERROR;
	}

	return rc;
}
/*
 * Run the job prolog or epilog on behalf of the extern step.
 *
 * Builds a job_env_t from the step record (including the GRES
 * environment), invokes run_prolog() or run_epilog(), and releases the
 * GRES environment again.
 *
 * IN step - step record supplying job identity, credential and environment
 * IN is_epilog - true to run the epilog, false to run the prolog
 * RET the zero status on success, otherwise ESLURMD_EPILOG_FAILED or
 *     ESLURMD_PROLOG_FAILED
 */
static int _run_prolog_epilog(stepd_step_rec_t *step, bool is_epilog)
{
	job_env_t job_env;
	list_t *gres_env_list;
	int status;

	memset(&job_env, 0, sizeof(job_env));

	gres_env_list = gres_g_prep_build_env(step->job_gres_list,
					      step->node_list);
	gres_g_prep_set_env(&job_env.gres_job_env, gres_env_list, step->nodeid);
	FREE_NULL_LIST(gres_env_list);

	/* Job identity */
	job_env.jobid = step->step_id.job_id;
	job_env.step_id = SLURM_EXTERN_CONT;
	job_env.het_job_id = step->het_job_id;
	job_env.uid = step->uid;
	job_env.gid = step->gid;

	/* Job placement and environment */
	job_env.node_list = step->node_list;
	job_env.partition = step->msg->cred->arg->job_partition;
	job_env.work_dir = step->cwd;
	job_env.spank_job_env = step->msg->spank_job_env;
	job_env.spank_job_env_size = step->msg->spank_job_env_size;

	if (is_epilog)
		status = run_epilog(&job_env, step->msg->cred);
	else
		status = run_prolog(&job_env, step->msg->cred);

	/* Release the NULL-terminated GRES environment array */
	if (job_env.gres_job_env) {
		int idx = 0;

		while (job_env.gres_job_env[idx]) {
			xfree(job_env.gres_job_env[idx]);
			idx++;
		}
		xfree(job_env.gres_job_env);
	}

	if (!status)
		return status;

	/* Decode the wait(2)-style status for the log message */
	{
		int exit_status = 0, term_sig = 0;

		if (WIFSIGNALED(status))
			term_sig = WTERMSIG(status);
		else if (WIFEXITED(status))
			exit_status = WEXITSTATUS(status);

		error("[job %u] %s failed status=%d:%d", step->step_id.job_id,
		      is_epilog ? "epilog" : "prolog", exit_status, term_sig);
	}

	return is_epilog ? ESLURMD_EPILOG_FAILED : ESLURMD_PROLOG_FAILED;
}
/*
 * Child side of the X11 xauthority setup; never returns.
 *
 * Joins the job container, creates the xauthority file and reports the
 * outcome to the parent over the pipe: a uint32_t length followed by that
 * many bytes of step->x11_xauthority (the terminating NUL is not sent).
 * A length of zero tells the parent that setup failed.
 *
 * IN to_parent - pipe to the parent; only the write end [1] is used
 * IN step - step record
 *
 * Exits 0 on success, 1 on any failure.
 */
static void _setup_x11_child(int to_parent[2], stepd_step_rec_t *step)
{
	uint32_t len = 0;
	bool ready;

	/* Short-circuit: xauthority is only attempted once inside the container */
	ready = (container_g_join(&step->step_id, step->uid, false) ==
		 SLURM_SUCCESS) &&
		(_set_xauthority(step) == SLURM_SUCCESS);

	if (!ready) {
		/* len is still 0 here, which signals failure to the parent */
		safe_write(to_parent[1], &len, sizeof(len));
		_exit(1);
	}

	len = strlen(step->x11_xauthority);
	safe_write(to_parent[1], &len, sizeof(len));
	safe_write(to_parent[1], step->x11_xauthority, len);

	_exit(0);

rwfail:
	error("%s: failed to write to parent: %m", __func__);
	_exit(1);
}
/*
 * Parent side of the X11 xauthority setup handshake.
 *
 * Reads a uint32_t length followed by that many bytes of the xauthority
 * path from the pipe, then reaps the child and checks how it terminated.
 *
 * IN/OUT to_parent - pipe shared with the child. The write end is closed
 *	here and to_parent[1] is set to -1, so that a child dying before it
 *	writes anything yields EOF instead of blocking the read forever.
 * IN pid - pid of the forked child to reap
 * OUT tmp - when a non-zero length was received and the child exited
 *	cleanly, an xmalloc'ed NUL-terminated copy of the path which the
 *	caller must xfree(); released again on every failure path
 * RET SLURM_SUCCESS, or SLURM_ERROR if the read failed, waitpid() failed,
 *     or the child did not exit normally with status 0
 */
static int _setup_x11_parent(int to_parent[2], pid_t pid, char **tmp)
{
	uint32_t len = 0;
	int status = 0;

	/*
	 * Drop our copy of the write end first: while it stays open the
	 * reads below can never see EOF if the child is killed early.
	 */
	if (to_parent[1] >= 0) {
		close(to_parent[1]);
		to_parent[1] = -1;
	}

	safe_read(to_parent[0], &len, sizeof(len));

	if (len) {
		/*
		 * The child sends strlen() bytes without a terminator, so
		 * reserve one extra byte and terminate explicitly. The cast
		 * keeps len + 1 from wrapping in 32-bit arithmetic.
		 */
		*tmp = xcalloc((size_t) len + 1, sizeof(char));
		safe_read(to_parent[0], *tmp, len);
		(*tmp)[len] = '\0';
	}

	/*
	 * WEXITSTATUS() is only meaningful when WIFEXITED() is true; a
	 * child killed by a signal must not be mistaken for exit code 0.
	 */
	if ((waitpid(pid, &status, 0) != pid) || !WIFEXITED(status) ||
	    WEXITSTATUS(status)) {
		error("%s: Xauthority setup failed", __func__);
		xfree(*tmp);
		return SLURM_ERROR;
	}

	return SLURM_SUCCESS;

rwfail:
	error("%s: failed to read from child: %m", __func__);
	xfree(*tmp);
	/* Still reap the child so it does not linger as a zombie */
	waitpid(pid, &status, 0);
	debug2("%s: status from child %d", __func__, status);

	return SLURM_ERROR;
}
/*
 * Run the extern step of a job; job_manager() calls this when
 * step->step_id.step_id is SLURM_EXTERN_CONT.
 *
 * Sequence, as implemented below:
 *  1. create the job container, initialize spank, run task_g_pre_setuid()
 *  2. if step->x11 is set, set up X11 forwarding and the xauthority file
 *  3. register with jobacct_gather, mark the step RUNNING, run the spank
 *     post-fork hook and (with PROLOG_FLAG_RUN_IN_JOB) the prolog
 *  4. report the setup status to slurmd with close_slurmd_conn()
 *  5. block until step->state reaches at least SLURMSTEPD_STEP_CANCELLED
 *  6. tear down: wait for the other steps, collect accounting, SIGKILL
 *     the proctrack container, run plugin cleanup, send the step
 *     complete messages and (with PROLOG_FLAG_RUN_IN_JOB) run the epilog
 *
 * IN step - extern step record
 * RET SLURM_SUCCESS, or the error code of the setup stage that failed
 */
static int _spawn_job_container(stepd_step_rec_t *step)
{
jobacctinfo_t *jobacct = NULL;
struct rusage rusage;
jobacct_id_t jobacct_id;
int rc = SLURM_SUCCESS;
uint32_t jobid = step->step_id.job_id;
if (container_g_stepd_create(jobid, step)) {
error("%s: container_g_stepd_create(%u): %m", __func__, jobid);
return SLURM_ERROR;
}
debug2("%s: Before call to spank_init()", __func__);
if ((rc = spank_init(step))) {
error("%s: Plugin stack initialization failed.", __func__);
/* let the slurmd know we actually are done with the setup */
close_slurmd_conn(rc);
return rc;
}
debug2("%s: After call to spank_init()", __func__);
if (task_g_pre_setuid(step)) {
error("%s: Failed to invoke task plugins: one of "
"task_p_pre_setuid functions returned error", __func__);
return SLURM_ERROR;
}
acct_gather_profile_g_task_start(0);
if (step->x11) {
struct priv_state sprivs = { 0 };
/* The forwarding setup runs between drop and reclaim of privileges */
if (drop_privileges(step, true, &sprivs, false) < 0) {
error ("Unable to drop privileges");
return SLURM_ERROR;
}
if (setup_x11_forward(step) != SLURM_SUCCESS) {
/* ssh forwarding setup failed */
error("x11 port forwarding setup failed");
_exit(127);
}
if (reclaim_privileges(&sprivs) < 0) {
error ("Unable to reclaim privileges");
return SLURM_ERROR;
}
/* Have SIGTERM trigger the X11 forwarding shutdown handler */
conmgr_add_work_signal(SIGTERM, _x11_signal_handler, step);
/*
* When using job_container/tmpfs we need to get into
* the correct namespace or .Xauthority won't be visible
* in /tmp from inside the job.
*/
if (_need_join_container()) {
pid_t pid;
int to_parent[2] = {-1, -1};
if (pipe(to_parent) < 0) {
error("%s: pipe failed: %m", __func__);
rc = SLURM_ERROR;
goto x11_fail;
}
/*
* The fork is necessary because we cannot join a
* namespace if we are multithreaded. Also we need to
* wait for the child to end before proceeding or there
* can be a timing race with srun starting X11 apps very
* fast.
*/
pid = fork();
if (pid == 0) {
/* Child: _setup_x11_child() always ends in _exit() */
_setup_x11_child(to_parent, step);
} else if (pid > 0) {
char *tmp = NULL;
rc = _setup_x11_parent(to_parent, pid, &tmp);
/* Adopt the path reported by the child, if any */
xfree(step->x11_xauthority);
if (tmp)
step->x11_xauthority = tmp;
} else {
error("fork: %m");
rc = SLURM_ERROR;
}
close(to_parent[0]);
close(to_parent[1]);
} else {
rc = _set_xauthority(step);
}
x11_fail:
if (rc != SLURM_SUCCESS) {
set_job_state(step, SLURMSTEPD_STEP_ENDING);
close_slurmd_conn(rc);
goto fail1;
}
debug("x11 forwarding local display is %d", step->x11_display);
debug("x11 forwarding local xauthority is %s",
step->x11_xauthority);
}
jobacct_id.nodeid = step->nodeid;
jobacct_id.taskid = step->nodeid; /* Treat node ID as global task ID */
jobacct_id.step = step;
jobacct_gather_set_proctrack_container_id(step->cont_id);
jobacct_gather_add_task(0, &jobacct_id, 1);
set_job_state(step, SLURMSTEPD_STEP_RUNNING);
if (!slurm_conf.job_acct_gather_freq)
jobacct_gather_stat_task(0, true);
if (_run_spank_func(SPANK_STEP_TASK_POST_FORK, step, -1, NULL) < 0) {
error("spank extern task post-fork failed");
rc = SLURM_ERROR;
} else if (slurm_conf.prolog_flags & PROLOG_FLAG_RUN_IN_JOB) {
rc = _run_prolog_epilog(step, false);
}
if (rc != SLURM_SUCCESS) {
/*
* Failure before the tasks have even started, so we will need
* to mark all of them as failed unless there is already an
* error present to avoid slurmctld from thinking this was a
* slurmd issue and the step just landed on an unhealthy node.
*/
slurm_mutex_lock(&step_complete.lock);
if (!step_complete.step_rc)
step_complete.step_rc = rc;
slurm_mutex_unlock(&step_complete.lock);
for (uint32_t i = 0; i < step->node_tasks; i++)
if (step->task[i]->estatus <= 0)
step->task[i]->estatus = W_EXITCODE(1, 0);
}
/*
* Tell slurmd the setup status; slurmd will handle a failure and
* cleanup the sleep task launched above, so we do not need to do
* anything special here to handle a setup failure.
*/
close_slurmd_conn(rc);
/* Sleep here until the step state is at least SLURMSTEPD_STEP_CANCELLED */
slurm_mutex_lock(&step->state_mutex);
while ((step->state < SLURMSTEPD_STEP_CANCELLED)) {
slurm_cond_wait(&step->state_cond, &step->state_mutex);
}
join_extern_threads();
slurm_mutex_unlock(&step->state_mutex);
/* Wait for all steps other than extern (this one) to complete */
if (!pause_for_job_completion(jobid, MAX(slurm_conf.kill_wait, 5),
true)) {
warning("steps did not complete quickly");
}
/* remove all tracked tasks */
while ((jobacct = jobacct_gather_remove_task(0))) {
/*
 * NOTE(review): rusage is declared above but never filled in within
 * this function before being passed here - confirm this is intended.
 */
if (jobacct->pid)
jobacctinfo_setinfo(jobacct, JOBACCT_DATA_RUSAGE,
&rusage, SLURM_PROTOCOL_VERSION);
step->jobacct->energy.consumed_energy = 0;
_local_jobacctinfo_aggregate(step->jobacct, jobacct);
jobacctinfo_destroy(jobacct);
}
step_complete.rank = step->nodeid;
acct_gather_profile_endpoll();
acct_gather_profile_g_node_step_end();
/* Call the other plugins to clean up
* the cgroup hierarchy.
*/
set_job_state(step, SLURMSTEPD_STEP_ENDING);
step_terminate_monitor_start(step);
proctrack_g_signal(step->cont_id, SIGKILL);
proctrack_g_wait(step->cont_id);
step_terminate_monitor_stop();
/*
* When an event is registered using the cgroups notification API and
* memory is constrained using task/cgroup, the following check needs to
* happen before any memory cgroup hierarchy removal.
*
* The eventfd will be woken up by control file implementation *or*
* when the cgroup is removed. Thus, for the second case (cgroup
* removal) we could be notified with false positive oom events.
*
* acct_gather_profile_fini() and task_g_post_step() can remove the
* cgroup hierarchy if the cgroup implementation of these plugins are
* configured.
*/
for (uint32_t i = 0; i < step->node_tasks; i++)
if (task_g_post_term(step, step->task[i]) == ENOMEM)
step->oom_error = true;
/* Lock to not collide with the _x11_signal_handler thread. */
auth_setuid_lock();
/*
* This function below calls jobacct_gather_fini(). For the case of
* jobacct_gather/cgroup, it ends up doing the cgroup hierarchy cleanup
* in here, and it should happen after the SIGKILL above so that all
* children processes from the step are gone.
*/
acct_gather_profile_fini();
task_g_post_step(step);
auth_setuid_unlock();
/*
 * fail1: common exit path, reached both after a normal teardown and
 * directly from an X11 setup failure (x11_fail above).
 */
fail1:
/* Queue the X11 shutdown handler as regular work, without a SIGTERM */
conmgr_add_work_fifo(_x11_signal_handler, step);
debug2("%s: Before call to spank_fini()", __func__);
if (spank_fini(step))
error("spank_fini failed");
debug2("%s: After call to spank_fini()", __func__);
set_job_state(step, SLURMSTEPD_STEP_ENDING);
if (step_complete.rank > -1)
stepd_wait_for_children_slurmstepd(step);
/*
* Step failed outside of the exec()ed tasks, make sure to tell
* slurmctld about it to avoid the user not knowing about a
* failure.
*/
if (rc && !step_complete.step_rc)
step_complete.step_rc = rc;
stepd_send_step_complete_msgs(step);
switch_g_extern_step_fini(jobid);
if (slurm_conf.prolog_flags & PROLOG_FLAG_RUN_IN_JOB) {
/* Force all other steps to end before epilog starts */
pause_for_job_completion(jobid, 0, true);
int epilog_rc = _run_prolog_epilog(step, true);
epilog_complete(step->step_id.job_id, step->node_list,
epilog_rc);
}
return rc;
}
/*
* Executes the functions of the slurmd job manager process,
* which runs as root and performs shared memory and interconnect
* initialization, etc.
*
* Extern steps (SLURM_EXTERN_CONT) are handed off to
* _spawn_job_container() once the proctrack container exists; every
* other step is forked, waited for and torn down here.
*
* Cleanup labels:
*   fail2 - spank_init() succeeded: kill and reap the container, run the
*           plugin and IO cleanup, then fall through to fail1
*   fail1 - report an abnormal launch to the client and send the step
*           complete messages
*
* IN step - step record
*
* Returns 0 if job ran and completed successfully.
* Returns errno if job startup failed. NOTE: This will DRAIN the node.
*/
int
job_manager(stepd_step_rec_t *step)
{
int rc = SLURM_SUCCESS;
bool io_initialized = false;
char *oom_val_str;
debug3("Entered job_manager for %ps pid=%d",
&step->step_id, step->jmgr_pid);
#ifdef PR_SET_DUMPABLE
if (prctl(PR_SET_DUMPABLE, 1) < 0)
debug ("Unable to set dumpable to 1");
#endif /* PR_SET_DUMPABLE */
/*
* Set oom_score_adj of this slurmstepd to the minimum to avoid OOM
* killing us before the user processes. If we were killed at this point
* due to other steps OOMing, no cleanup would happen, leaving for
* example cgroup stray directories if cgroup plugins were initialized.
*/
set_oom_adj(STEPD_OOM_ADJ);
debug("Setting slurmstepd(%d) oom_score_adj to %d", getpid(),
STEPD_OOM_ADJ);
/*
* Readjust this slurmstepd oom_score_adj now that we've loaded the
* task plugin. If the environment variable SLURMSTEPD_OOM_ADJ is set
* and is a valid number (from -1000 to 1000) set the score to that
* value.
*/
if ((oom_val_str = getenv("SLURMSTEPD_OOM_ADJ"))) {
/*
 * NOTE(review): atoi() yields 0 for non-numeric input, so such a
 * value passes the range check below and is applied as 0.
 */
int oom_val = atoi(oom_val_str);
if ((oom_val >= -1000) && (oom_val <= 1000)) {
debug("Setting slurmstepd oom_score_adj from env to %d",
oom_val);
set_oom_adj(oom_val);
}
}
/* MPI and switch pre-init are skipped for batch, extern and interactive steps */
if (!step->batch && (step->step_id.step_id != SLURM_EXTERN_CONT) &&
(step->step_id.step_id != SLURM_INTERACTIVE_STEP) &&
(mpi_process_env(&step->env) != SLURM_SUCCESS)) {
rc = SLURM_MPI_PLUGIN_NAME_INVALID;
goto fail1;
}
if (!step->batch && (step->step_id.step_id != SLURM_EXTERN_CONT) &&
(step->step_id.step_id != SLURM_INTERACTIVE_STEP) &&
(switch_g_job_preinit(step) < 0)) {
rc = ESLURM_INTERCONNECT_FAILURE;
goto fail1;
}
/* Create the proctrack container unless one is already recorded */
if ((step->cont_id == 0) &&
(proctrack_g_create(step) != SLURM_SUCCESS)) {
error("proctrack_g_create: %m");
rc = ESLURMD_SETUP_ENVIRONMENT_ERROR;
goto fail1;
}
/* The extern step has its own lifecycle and never reaches the code below */
if (step->step_id.step_id == SLURM_EXTERN_CONT)
return _spawn_job_container(step);
debug2("Before call to spank_init()");
if ((rc = spank_init(step))) {
error ("Plugin stack initialization failed.");
goto fail1;
}
debug2("After call to spank_init()");
/* Call switch_g_job_init() before becoming user */
if (!step->batch && (step->step_id.step_id != SLURM_INTERACTIVE_STEP) &&
step->argv && (switch_g_job_init(step) < 0)) {
/* error("switch_g_job_init: %m"); already logged */
rc = ESLURM_INTERCONNECT_FAILURE;
goto fail2;
}
/* fork necessary threads for MPI */
if (!step->batch && (step->step_id.step_id != SLURM_INTERACTIVE_STEP) &&
(mpi_g_slurmstepd_prefork(step, &step->env) != SLURM_SUCCESS)) {
error("Failed mpi_g_slurmstepd_prefork");
rc = SLURM_ERROR;
goto fail2;
}
/*
 * Clear the "closest GPU/NIC" binding flags when the step has at most
 * one such device (or the count is NO_VAL64); if only the verbose flag
 * remains, clear the binding type entirely.
 */
if (!step->batch && (step->step_id.step_id != SLURM_INTERACTIVE_STEP) &&
(step->accel_bind_type || step->tres_bind)) {
uint64_t gpu_cnt, nic_cnt;
gpu_cnt = gres_step_count(step->step_gres_list, "gpu");
nic_cnt = gres_step_count(step->step_gres_list, "nic");
if ((gpu_cnt <= 1) || (gpu_cnt == NO_VAL64))
step->accel_bind_type &= (~ACCEL_BIND_CLOSEST_GPU);
if ((nic_cnt <= 1) || (nic_cnt == NO_VAL64))
step->accel_bind_type &= (~ACCEL_BIND_CLOSEST_NIC);
if (step->accel_bind_type == ACCEL_BIND_VERBOSE)
step->accel_bind_type = 0;
}
/*
* Calls pam_setup() and requires pam_finish() if
* successful. Only check for < 0 here since other slurm
* error codes could come that are more descriptive.
*/
if ((rc = _fork_all_tasks(step, &io_initialized)) < 0) {
debug("_fork_all_tasks failed");
rc = ESLURMD_EXECVE_FAILED;
goto fail2;
}
/*
* If IO initialization failed, return SLURM_SUCCESS (on a
* batch step) or the node will be drained otherwise. Regular
* srun needs the error sent or it will hang waiting for the
* launch to happen.
*/
if ((rc != SLURM_SUCCESS) || !io_initialized)
goto fail2;
io_close_task_fds(step);
/* Attach slurmstepd to system cgroups, if configured */
attach_system_cgroup_pid(getpid());
/* if we are not polling then we need to make sure we get some
* information here
*/
if (!slurm_conf.job_acct_gather_freq)
jobacct_gather_stat_task(0, true);
/* Send step launch response with list of pids */
_send_launch_resp(step, 0);
set_job_state(step, SLURMSTEPD_STEP_RUNNING);
#ifdef PR_SET_DUMPABLE
/* RHEL6 requires setting "dumpable" flag AGAIN; after euid changes */
if (prctl(PR_SET_DUMPABLE, 1) < 0)
debug ("Unable to set dumpable to 1");
#endif /* PR_SET_DUMPABLE */
/*
* task_g_post_term() needs to be called before
* acct_gather_profile_fini() and task_g_post_step().
*/
_wait_for_all_tasks(step);
acct_gather_profile_endpoll();
acct_gather_profile_g_node_step_end();
set_job_state(step, SLURMSTEPD_STEP_ENDING);
fail2:
/*
* First call switch_g_job_postfini() - In at least one case,
* this will clean up any straggling processes. If this call
* is moved behind wait_for_io(), we may block waiting for IO
* on a hung process.
*
* Make sure all processes in session are dead. On systems
* with an IBM Federation switch, all processes must be
* terminated before the switch window can be released by
* switch_g_job_postfini().
*/
set_job_state(step, SLURMSTEPD_STEP_ENDING);
step_terminate_monitor_start(step);
if (step->cont_id != 0) {
proctrack_g_signal(step->cont_id, SIGKILL);
proctrack_g_wait(step->cont_id);
}
step_terminate_monitor_stop();
if (!step->batch && (step->step_id.step_id != SLURM_INTERACTIVE_STEP)) {
/* This sends a SIGKILL to the pgid */
if (switch_g_job_postfini(step) < 0) {
error("switch_g_job_postfini: %m");
/*
* Drain the node since resources might still be kept.
* (E.g, cxi_service for switch/hpe_slingshot.)
*/
stepd_drain_node("switch_g_job_postfini failed");
}
}
/*
* This function below calls jobacct_gather_fini(). For the case of
* jobacct_gather/cgroup, it ends up doing the cgroup hierarchy cleanup
* in here, and it should happen after the SIGKILL above so that all
* children processes from the step are gone.
*/
acct_gather_profile_fini();
/*
* Wait for io thread to complete (if there is one)
*/
if (!step->batch && io_initialized)
_wait_for_io(step);
/*
* Warn task plugin that the user's step have terminated
*/
task_g_post_step(step);
/*
* Reset CPU frequencies if changed
*/
if ((step->cpu_freq_min != NO_VAL) || (step->cpu_freq_max != NO_VAL) ||
(step->cpu_freq_gov != NO_VAL))
cpu_freq_reset(step);
/*
* Reset GRES hardware, if needed. This is where GPU frequency is reset.
* Make sure stepd is root. If not, emit error.
*/
if (!step->batch && (step->step_id.step_id != SLURM_INTERACTIVE_STEP) &&
step->tres_freq) {
if (getuid() == (uid_t) 0)
gres_g_step_hardware_fini();
else
error("%s: invalid permissions: cannot uninitialize GRES hardware unless Slurmd was started as root",
__func__);
}
/*
* Notify srun of completion AFTER frequency reset to avoid race
* condition starting another job on these CPUs.
*/
/* Each call sends one batch of same-status exits; loop until none remain */
while (stepd_send_pending_exit_msgs(step)) {;}
debug2("Before call to spank_fini()");
if (spank_fini(step))
error("spank_fini failed");
debug2("After call to spank_fini()");
/*
* This just cleans up all of the PAM state in case rc == 0
* which means _fork_all_tasks performs well.
* Must be done after IO termination in case of IO operations
* require something provided by the PAM (i.e. security token)
*/
if (!rc)
pam_finish();
fail1:
/* If interactive job startup was abnormal,
* be sure to notify client.
*/
set_job_state(step, SLURMSTEPD_STEP_ENDING);
if (rc != 0) {
error("%s: exiting abnormally: %s",
__func__, slurm_strerror(rc));
_send_launch_resp(step, rc);
}
if (!step->batch && (step_complete.rank > -1)) {
if (step->aborted)
info("job_manager exiting with aborted job");
else
stepd_wait_for_children_slurmstepd(step);
/*
* Step failed outside of the exec()ed tasks, make sure to tell
* slurmctld about it to avoid the user not knowing about a
* failure.
*/
if (rc && !step_complete.step_rc)
step_complete.step_rc = rc;
stepd_send_step_complete_msgs(step);
}
return(rc);
}
/*
 * Privileged setup performed in a freshly forked task child.
 *
 * Reclaims privileges, resets the OOM score adjustment, joins the job
 * container (unless the step was launched with LAUNCH_NO_ALLOC), runs the
 * privileged spank task hook, drops privileges again and finally moves
 * into the step's working directory when required.
 *
 * IN step - step record
 * IN taskid - local task index passed to spank_task_privileged()
 * IN/OUT sp - saved privilege state; gid_list should already be initialized
 * RET 0 on success, otherwise SLURM_ERROR, the drop_privileges() result,
 *     or errno from a failed chdir() of a container job
 *
 * NOTE: calls exit(1) if the job container cannot be joined.
 */
static int _pre_task_child_privileged(stepd_step_rec_t *step, int taskid,
				      struct priv_state *sp)
{
	bool joined_container;
	int rc;

	if (reclaim_privileges(sp) < 0)
		return SLURM_ERROR;

	/* the tasks may be killed by OOM */
	set_oom_adj(0);

	/* Add job's pid to job container, if a normal job */
	joined_container = !(step->flags & LAUNCH_NO_ALLOC);
	if (joined_container &&
	    container_g_join(&step->step_id, step->uid, false)) {
		error("container_g_join failed: %u",
		      step->step_id.job_id);
		exit(1);
	}

	if (spank_task_privileged(step, taskid))
		return error("spank_task_init_privileged failed");

	/* sp->gid_list should already be initialized */
	if ((rc = drop_privileges(step, true, sp, false))) {
		error ("drop_privileges: %m");
		return rc;
	}

	/*
	 * The tmpfs job container plugin changes the working directory back
	 * to the root working directory, so after joining we change it back
	 * to the user's, but only after dropping privileges. Container jobs
	 * must always start in the correct directory.
	 */
	if (!step->container && !joined_container)
		return rc;

	if (chdir(step->cwd) == 0) {
		if (step->container)
			debug2("%s: chdir(%s) success", __func__, step->cwd);
		return rc;
	}

	/* No /tmp fallback for container jobs */
	if (step->container) {
		error("couldn't chdir to `%s': %m", step->cwd);
		return errno;
	}

	error("couldn't chdir to `%s': %m: going to /tmp instead",
	      step->cwd);
	if (chdir("/tmp") < 0) {
		error("couldn't chdir to /tmp either. dying.");
		return SLURM_ERROR;
	}

	return rc;
}
/*
 * Bookkeeping for one forked task child that blocks on a pipe until the
 * parent lets it proceed (see _exec_wait_child_wait_for_parent() and
 * exec_wait_signal_child()).
 */
struct exec_wait_info {
/* Index given to _exec_wait_info_create(); used to index step->task[] */
int id;
/* Result of fork(): 0 in the child, child's pid in the parent, -1 if unset */
pid_t pid;
/* Write end of the pipe, used by the parent; -1 once closed */
int parentfd;
/* Read end of the pipe, used by the child; -1 once closed */
int childfd;
};
/*
 * Allocate an exec_wait_info backed by a new close-on-exec pipe.
 *
 * IN i - identifier stored in the record's id field
 * RET new record with pid set to -1, or NULL if the pipe could not be
 *     created; release with _exec_wait_info_destroy()
 */
static struct exec_wait_info *_exec_wait_info_create(int i)
{
	struct exec_wait_info *info;
	int pipe_fds[2];

	if (pipe2(pipe_fds, O_CLOEXEC) < 0) {
		error ("_exec_wait_info_create: pipe: %m");
		return NULL;
	}

	info = xmalloc(sizeof(*info));
	info->id = i;
	info->pid = -1;
	/* pipe_fds[0] is the read end (child), pipe_fds[1] the write end */
	info->parentfd = pipe_fds[1];
	info->childfd = pipe_fds[0];

	return info;
}
/*
 * Close any pipe ends still open in 'e' and free the record.
 * A NULL 'e' is ignored.
 */
static void _exec_wait_info_destroy(struct exec_wait_info *e)
{
	int *pipe_ends[2];

	if (!e)
		return;

	/* Same order as always: parent end first, then child end */
	pipe_ends[0] = &e->parentfd;
	pipe_ends[1] = &e->childfd;
	for (int n = 0; n < 2; n++) {
		if (*pipe_ends[n] < 0)
			continue;
		close(*pipe_ends[n]);
		*pipe_ends[n] = -1;
	}

	e->pid = -1;
	e->id = -1;
	xfree(e);
}
/*
 * Return the pid recorded in 'e', or -1 when 'e' is NULL.
 */
static pid_t _exec_wait_get_pid(struct exec_wait_info *e)
{
	return e ? e->pid : -1;
}
/*
 * Fork a child that shares an exec_wait_info pipe with its parent.
 *
 * Both processes return from this function: in the child the record's
 * pid is 0 and only childfd stays open; in the parent pid is the child's
 * pid and only parentfd stays open.
 *
 * IN id - identifier stored in the record
 * RET the record, or NULL if the pipe or the fork failed. On fork
 *     failure errno is left as set by fork() so that the caller can log
 *     it with %m.
 */
static struct exec_wait_info * _fork_child_with_wait_info (int id)
{
	struct exec_wait_info *e;

	if (!(e = _exec_wait_info_create (id)))
		return (NULL);

	if ((e->pid = fork ()) < 0) {
		/*
		 * The cleanup below calls close() and xfree(), which may
		 * overwrite errno; keep fork()'s value for the caller.
		 */
		int fork_errno = errno;

		_exec_wait_info_destroy (e);
		errno = fork_errno;
		return (NULL);
	}

	/*
	 * Close parentfd in child, and childfd in parent:
	 */
	if (e->pid == 0) {
		close (e->parentfd);
		e->parentfd = -1;
	} else {
		close (e->childfd);
		e->childfd = -1;
	}

	return (e);
}
/*
 * Child side: block until the parent writes one byte to the pipe.
 *
 * RET 0 once the byte was read, otherwise the value returned by error()
 *     (anything other than a one-byte read counts as failure)
 */
static int _exec_wait_child_wait_for_parent(struct exec_wait_info *e)
{
	char token;
	ssize_t got = read(e->childfd, &token, sizeof(token));

	if (got == 1)
		return 0;

	return error ("_exec_wait_child_wait_for_parent: failed: %m");
}
/*
 * Parent side: write the single byte that releases the child blocked in
 * _exec_wait_child_wait_for_parent().
 *
 * RET SLURM_SUCCESS, or SLURM_ERROR if the write failed
 */
static int exec_wait_signal_child(struct exec_wait_info *e)
{
	/* Only the arrival of the byte matters, not its value */
	char release_byte = '\0';

	safe_write(e->parentfd, &release_byte, sizeof(release_byte));

	return SLURM_SUCCESS;

rwfail:
	error("%s: write(fd:%d) to unblock task %d failed",
	      __func__, e->parentfd, e->id);

	return SLURM_ERROR;
}
/*
 * list_for_each() callback: unblock the task child described by 'e'.
 *
 * If the child cannot be unblocked the task is marked as exited, and it
 * is given exit code 1 unless it already carries a non-zero status.
 *
 * RET always 0
 */
static int exec_wait_signal(struct exec_wait_info *e, stepd_step_rec_t *step)
{
	stepd_step_task_info_t *task;

	debug3 ("Unblocking %ps task %d, writefd = %d",
		&step->step_id, e->id, e->parentfd);

	if (exec_wait_signal_child(e) == SLURM_SUCCESS)
		return 0;

	/*
	 * can't unblock the task so it must have errored out already
	 */
	task = step->task[e->id];
	if (!task->estatus)
		task->estatus = W_EXITCODE(1, 0);
	task->exited = true;

	return 0;
}
/*
 * Send SIGKILL to the child recorded in exec_wait_info 'e' and, once the
 * signal was delivered, forget its pid.
 *
 * RET 0 for success, -1 if no pid is recorded or kill() failed
 */
static int exec_wait_kill_child(struct exec_wait_info *e)
{
	if ((e->pid < 0) || (kill(e->pid, SIGKILL) < 0))
		return -1;

	e->pid = -1;
	return 0;
}
/*
 * Send SIGKILL to every child in exec_wait_list.
 *
 * RET 0 if the list is empty or every child was signalled; otherwise a
 *     negative value (one is subtracted per child that could not be
 *     killed, or the error() result if no iterator could be created)
 */
static int exec_wait_kill_children(list_t *exec_wait_list)
{
	const int remaining = list_count(exec_wait_list);
	struct exec_wait_info *info;
	list_itr_t *itr;
	int failures = 0;

	if (remaining == 0)
		return 0;

	verbose ("Killing %d remaining child%s",
		 remaining, (remaining > 1 ? "ren" : ""));

	if (!(itr = list_iterator_create(exec_wait_list)))
		return error ("exec_wait_kill_children: iterator_create: %m");

	while ((info = list_next(itr)))
		failures += exec_wait_kill_child(info);
	list_iterator_destroy(itr);

	return failures;
}
/*
 * Set up stdin/stdout/stderr of a task child.
 *
 * When built with HAVE_PTY_H and the step was launched with LAUNCH_PTY,
 * the task whose gtid is 0 gets its stdin_fd installed via login_tty();
 * every other task has its stdio set up by io_dup_stdio().
 */
static void prepare_stdio(stepd_step_rec_t *step, stepd_step_task_info_t *task)
{
#ifdef HAVE_PTY_H
	if ((task->gtid == 0) && (step->flags & LAUNCH_PTY)) {
		if (login_tty(task->stdin_fd) == 0)
			debug3("login_tty good");
		else
			error("login_tty: %m");
		return;
	}
#endif
	io_dup_stdio(task);
}
/*
* fork and exec N tasks
*
* Runs pam_setup(), sets up the step's IO and logging, forks one child
* per local task (each child blocks on an exec_wait_info pipe until the
* parent releases it), registers every child with proctrack,
* jobacct_gather, the task plugin and spank, and finally unblocks the
* children so they can call exec.
*
* IN step - step record
* OUT io_initialized - set to true once IO and log setup have succeeded
* RET SLURM_SUCCESS on success (also returned for a batch step whose IO
*     setup failed, with *io_initialized left untouched), otherwise
*     SLURM_ERROR, ESLURMD_SET_UID_OR_GID_ERROR or the IO setup error
*
* Cleanup labels, each falling through to the next:
*   fail4 - return to the saved working directory
*   fail3 - reclaim privileges
*   fail2 - destroy exec_wait_list and close the task fds
*   fail1 - pam_finish()
*/
static int
_fork_all_tasks(stepd_step_rec_t *step, bool *io_initialized)
{
int rc = SLURM_SUCCESS;
int i;
struct priv_state sprivs;
jobacct_id_t jobacct_id;
list_t *exec_wait_list = NULL;
uint32_t node_offset = 0, task_offset = 0;
char saved_cwd[PATH_MAX];
if (step->het_job_node_offset != NO_VAL)
node_offset = step->het_job_node_offset;
if (step->het_job_task_offset != NO_VAL)
task_offset = step->het_job_task_offset;
DEF_TIMERS;
START_TIMER;
/* NOTE(review): step has already been dereferenced above this assert */
xassert(step != NULL);
if (!getcwd(saved_cwd, sizeof(saved_cwd))) {
error ("Unable to get current working directory: %m");
strlcpy(saved_cwd, "/tmp", sizeof(saved_cwd));
}
if (task_g_pre_setuid(step)) {
error("Failed to invoke task plugins: one of task_p_pre_setuid functions returned error");
return SLURM_ERROR;
}
/*
* Temporarily drop effective privileges, except for the euid.
* We need to wait until after pam_setup() to drop euid.
*/
if (drop_privileges (step, false, &sprivs, true) < 0)
return ESLURMD_SET_UID_OR_GID_ERROR;
if (pam_setup(step->user_name, conf->hostname)
!= SLURM_SUCCESS){
error ("error in pam_setup");
rc = SLURM_ERROR;
}
/*
* Reclaim privileges to do the io setup
*/
if (reclaim_privileges(&sprivs) < 0) {
error("Unable to reclaim privileges");
/* Don't bother erroring out here */
}
if (rc)
goto fail1; /* pam_setup error */
set_umask(step); /* set umask for stdout/err files */
rc = _setup_normal_io(step);
/*
* Initialize log facility to copy errors back to srun
*/
if (!rc)
rc = _slurmd_job_log_init(step);
if (rc) {
error("%s: IO setup failed: %s", __func__, slurm_strerror(rc));
step->task[0]->estatus = W_EXITCODE(1, 0);
slurm_mutex_lock(&step_complete.lock);
step_complete.step_rc = rc;
slurm_mutex_unlock(&step_complete.lock);
if (step->batch)
rc = SLURM_SUCCESS; /* drains node otherwise */
goto fail1;
} else {
*io_initialized = true;
}
/*
* Now that errors will be copied back to srun, set the frequencies of
* the GPUs allocated to the step (and eventually other GRES hardware
* config options). Make sure stepd is root. If not, emit error.
* TODO: generic "settings" parameter rather than tres_freq
*/
if (!step->batch && (step->step_id.step_id != SLURM_INTERACTIVE_STEP)) {
/* Handle GpuFreqDef option */
if (!step->tres_freq && slurm_conf.gpu_freq_def) {
debug("Setting GPU to GpuFreqDef=%s",
slurm_conf.gpu_freq_def);
xstrfmtcat(step->tres_freq, "gpu:%s",
slurm_conf.gpu_freq_def);
}
if (step->tres_freq && (getuid() == (uid_t) 0)) {
gres_g_step_hardware_init(step->step_gres_list,
step->nodeid,
step->tres_freq);
} else if (step->tres_freq) {
error("%s: invalid permissions: cannot initialize GRES hardware unless Slurmd was started as root",
__func__);
}
}
/*
* Temporarily drop effective privileges
*/
if (drop_privileges (step, true, &sprivs, true) < 0) {
error ("drop_privileges: %m");
rc = SLURM_ERROR;
goto fail2;
}
if (chdir(step->cwd) < 0) {
error("couldn't chdir to `%s': %m: going to /tmp instead",
step->cwd);
if (chdir("/tmp") < 0) {
error("couldn't chdir to /tmp either. dying.");
rc = SLURM_ERROR;
goto fail3;
}
}
if ((rc = _run_spank_func(SPANK_STEP_USER_INIT, step, -1, &sprivs))) {
if (rc < 0) {
error("spank_user failed.");
rc = SLURM_ERROR;
step->task[0]->estatus = W_EXITCODE(1, 0);
step->task[0]->exited = true;
slurm_mutex_lock(&step_complete.lock);
if (!step_complete.step_rc)
step_complete.step_rc = rc;
slurm_mutex_unlock(&step_complete.lock);
goto fail4;
} else {
/*
* A drop_privileges() or reclaim_privileges() failed,
* In this case, bail out skipping the redundant
* reclaim.
*/
goto fail2;
}
}
/* Entries are released with _exec_wait_info_destroy() when the list is freed */
exec_wait_list = list_create ((ListDelF) _exec_wait_info_destroy);
/*
* Fork all of the task processes.
*/
verbose("starting %u tasks", step->node_tasks);
for (i = 0; i < step->node_tasks; i++) {
char time_stamp[256];
pid_t pid;
struct exec_wait_info *ei;
acct_gather_profile_g_task_start(i);
if ((ei = _fork_child_with_wait_info(i)) == NULL) {
error("child fork: %m");
exec_wait_kill_children(exec_wait_list);
rc = SLURM_ERROR;
goto fail4;
} else if ((pid = _exec_wait_get_pid(ei)) == 0) { /* child */
/* Shadows the function-level rc; only used inside this child branch */
int rc;
/*
* Destroy exec_wait_list in the child.
* Only exec_wait_info for previous tasks have been
* added to the list so far, so everything else
* can be discarded.
*/
FREE_NULL_LIST(exec_wait_list);
/* jobacctinfo_endpoll();
* closing jobacct files here causes deadlock */
if (slurm_conf.propagate_prio_process)
_set_prio_process(step);
/*
* Reclaim privileges for the child and call any plugin
* hooks that may require elevated privs
* sprivs.gid_list is already set from the
* drop_privileges call above, do not reinitialize.
* NOTE: Only put things in here that are self contained
* and belong in the child.
*/
if ((rc = _pre_task_child_privileged(step, i, &sprivs)))
fatal("%s: _pre_task_child_privileged() failed: %s",
__func__, slurm_strerror(rc));
if (_become_user(step, &sprivs) < 0) {
error("_become_user failed: %m");
/* child process, should not return */
_exit(1);
}
/* log_fini(); */ /* note: moved into exec_task() */
/*
* Need to setup stdio before setpgid() is called
* in case we are setting up a tty. (login_tty()
* must be called before setpgid() or it is
* effectively disabled).
*/
prepare_stdio(step, step->task[i]);
/* Close profiling file descriptors */
acct_gather_profile_g_child_forked();
/*
* Block until parent notifies us that it is ok to
* proceed. This allows the parent to place all
* children in any process groups or containers
* before they make a call to exec(2).
*/
if (_exec_wait_child_wait_for_parent(ei) < 0)
_exit(1);
exec_task(step, i);
}
/*
* Parent continues:
*/
list_append(exec_wait_list, ei);
log_timestamp(time_stamp, sizeof(time_stamp));
verbose("task %lu (%lu) started %s",
(unsigned long) step->task[i]->gtid + task_offset,
(unsigned long) pid, time_stamp);
step->task[i]->pid = pid;
/* The first task's pid becomes the step's process group id */
if (i == 0)
step->pgid = pid;
}
/*
* All tasks are now forked and running as the user, but
* will wait for our signal before calling exec.
*/
/*
* Reclaim privileges
*/
if (reclaim_privileges(&sprivs) < 0) {
error ("Unable to reclaim privileges");
/* Don't bother erroring out here */
}
if (chdir(saved_cwd) < 0) {
error ("Unable to return to working directory");
}
for (i = 0; i < step->node_tasks; i++) {
/*
* Put this task in the step process group
* login_tty() must put task zero in its own
* session, causing setpgid() to fail, setsid()
* has already set its process group as desired
*/
if (((step->flags & LAUNCH_PTY) == 0) &&
(setpgid (step->task[i]->pid, step->pgid) < 0)) {
error("Unable to put task %d (pid %d) into pgrp %d: %m",
i, step->task[i]->pid, step->pgid);
}
if (proctrack_g_add(step, step->task[i]->pid)
== SLURM_ERROR) {
error("proctrack_g_add: %m");
rc = SLURM_ERROR;
goto fail2;
}
jobacct_id.nodeid = step->nodeid + node_offset;
jobacct_id.taskid = step->task[i]->gtid + task_offset;
jobacct_id.step = step;
if (i == (step->node_tasks - 1)) {
/* start polling on the last task */
jobacct_gather_set_proctrack_container_id(
step->cont_id);
jobacct_gather_add_task(step->task[i]->pid, &jobacct_id,
1);
} else {
/* don't poll yet */
jobacct_gather_add_task(step->task[i]->pid, &jobacct_id,
0);
}
/*
* Affinity must be set after cgroup is set, or moving pids from
* one cgroup to another will reset affinity.
*/
if (task_g_pre_launch_priv(step, i, jobacct_id.taskid) < 0) {
error("task_g_set_affinity: %m");
rc = SLURM_ERROR;
goto fail2;
}
if (_run_spank_func(SPANK_STEP_TASK_POST_FORK, step, i, NULL) < 0) {
error ("spank task %d post-fork failed", i);
rc = SLURM_ERROR;
/*
* Failure before the tasks have even started, so we
* will need to mark all of them as failed unless there
* is already an error present to avoid slurmctld from
* thinking this was a slurmd issue and the step just
* landed on an unhealthy node.
*/
slurm_mutex_lock(&step_complete.lock);
if (!step_complete.step_rc)
step_complete.step_rc = rc;
slurm_mutex_unlock(&step_complete.lock);
if (step->task[i]->estatus <= 0)
step->task[i]->estatus = W_EXITCODE(1, 0);
step->task[i]->exited = true;
goto fail2;
}
}
// jobacct_gather_set_proctrack_container_id(step->cont_id);
/*
* Now it's ok to unblock the tasks, so they may call exec.
*/
list_for_each (exec_wait_list, (ListForF) exec_wait_signal, step);
FREE_NULL_LIST (exec_wait_list);
for (i = 0; i < step->node_tasks; i++) {
/*
* Prepare process for attach by parallel debugger
* (if specified and able)
*/
if (pdebug_trace_process(step, step->task[i]->pid)
== SLURM_ERROR) {
rc = SLURM_ERROR;
goto fail2;
}
}
END_TIMER2(__func__);
return rc;
/* Error exits: each label falls through to the ones below it */
fail4:
if (chdir(saved_cwd) < 0) {
error ("Unable to return to working directory");
}
fail3:
if (reclaim_privileges(&sprivs) < 0) {
error("Unable to reclaim privileges");
/* Don't bother erroring out here */
}
fail2:
FREE_NULL_LIST(exec_wait_list);
io_close_task_fds(step);
fail1:
pam_finish();
END_TIMER2(__func__);
return rc;
}
/*
 * Make one pass over the step's tasks and report, in a single task exit
 * message, every task that has exited, has not been reported yet, and
 * shares the exit status of the first such task found.
 *
 * Tasks with a different status are left for a later call, so callers
 * loop until this returns 0.
 *
 * IN step - step record; esent is set on every task that was reported
 * RET number of tasks included in the message (0 if nothing was sent)
 */
extern int stepd_send_pending_exit_msgs(stepd_step_rec_t *step)
{
	uint32_t *gtids = xmalloc(sizeof(uint32_t) * step->node_tasks);
	int batch_status = 0;
	int nsent = 0;

	for (int idx = 0; idx < step->node_tasks; idx++) {
		stepd_step_task_info_t *task = step->task[idx];

		if (task->esent || !task->exited)
			continue;

		/*
		 * The first pending task fixes the status for this
		 * message; later ones must match it to be included.
		 */
		if (nsent == 0)
			batch_status = task->estatus;
		else if (task->estatus != batch_status)
			continue;

		gtids[nsent++] = task->gtid;
		task->esent = true;
	}

	if (nsent > 0) {
		debug2("%s: aggregated %d task exit messages (rc=[0x%x]:%s)",
		       __func__, nsent, batch_status,
		       slurm_strerror(batch_status));
		_send_exit_msg(step, gtids, nsent, batch_status);
	}

	xfree(gtids);
	return nsent;
}
/*
 * Log a human-readable description of a task's exit status.
 *
 * IN taskid - task id to print
 * IN pid - pid to print
 * IN status - wait(2)-style status of the task
 *
 * A status whose low byte equals SIG_OOM is reported as out of memory
 * before the normal exited/signaled decoding is attempted. The last
 * case is there just in case a status is ever neither WIFEXITED nor
 * WIFSIGNALED: printing it raw beats dropping a potentially useful
 * exit status.
 */
static inline void
_log_task_exit(unsigned long taskid, unsigned long pid, int status)
{
	if ((status & 0xff) == SIG_OOM) {
		verbose("task %lu (%lu) Out Of Memory (OOM)",
			taskid, pid);
		return;
	}

	if (WIFEXITED(status)) {
		verbose("task %lu (%lu) exited with exit code %d.",
			taskid, pid, WEXITSTATUS(status));
		return;
	}

	if (WIFSIGNALED(status)) {
		const char *core_note = "";

		/* WCOREDUMP isn't available on AIX */
#ifdef WCOREDUMP
		if (WCOREDUMP(status))
			core_note = " (core dumped)";
#endif
		verbose("task %lu (%lu) exited. Killed by signal %d%s.",
			taskid, pid, WTERMSIG(status), core_note);
		return;
	}

	verbose("task %lu (%lu) exited with status 0x%04x.",
		taskid, pid, status);
}
/*
 * Reap exited tasks of this step and run the per-task exit handling
 * (accounting, task epilogs, SPANK task-exit hook, task plugin post_term).
 *
 * If waitflag is true, perform a blocking wait for a single process
 * and then return.
 *
 * If waitflag is false, do repeated non-blocking waits until
 * there are no more processes to reap (the wait returns 0).
 *
 * step     IN: step whose tasks are being waited for
 * waitflag IN: true for one blocking wait, false to poll until drained
 *
 * RET the number of reaped processes that were matched to a task record,
 *     or -1 if there are no child processes and nothing was reaped by
 *     this call.
 */
static int
_wait_for_any_task(stepd_step_rec_t *step, bool waitflag)
{
	pid_t pid;
	int completed = 0;
	uint32_t task_offset = 0;

	if (step->het_job_task_offset != NO_VAL)
		task_offset = step->het_job_task_offset;

	do {
		stepd_step_task_info_t *t = NULL;
		int rc = 0;
		jobacctinfo_t *jobacct = NULL;
		char **tmp_env;

		pid = proctrack_g_wait_for_any_task(step, &t, waitflag);
		if (pid == -1) {
			/*
			 * Note: "continue" in a do/while jumps to the loop
			 * condition. Since pid is -1 here the condition is
			 * false, so every branch below leaves the loop and
			 * the caller is expected to call again.
			 */
			if (errno == ECHILD) {
				debug("No child processes");
				if (completed == 0)
					completed = -1;
				break;
			} else if (errno == EINTR) {
				debug("wait3 was interrupted");
				continue;
			} else {
				debug("Unknown errno %d", errno);
				continue;
			}
		} else if (pid == 0) { /* WNOHANG and no pids available */
			break;
		}

		/************* acct stuff ********************/
		jobacct = jobacct_gather_remove_task(pid);
		if (jobacct) {
			/*
			 * The task record is optional (see the "if (t)" test
			 * below), so only copy the rusage when one was
			 * returned instead of dereferencing a NULL record.
			 */
			if (t)
				jobacctinfo_setinfo(jobacct,
						    JOBACCT_DATA_RUSAGE,
						    &t->rusage,
						    SLURM_PROTOCOL_VERSION);
			/*
			 * Since we currently don't track energy usage per
			 * task (only per step), we take into account only
			 * the last poll of the last task. Odds are it is the
			 * only one with information anyway, but just to be
			 * safe we zero out the previous value since this one
			 * will override it.
			 * If this ever changes in the future this logic
			 * will need to change.
			 */
			if (jobacct->energy.consumed_energy)
				step->jobacct->energy.consumed_energy = 0;
			_local_jobacctinfo_aggregate(step->jobacct, jobacct);
			jobacctinfo_destroy(jobacct);
		}
		acct_gather_profile_g_task_end(pid);
		/*********************************************/

		if (t) {
			debug2("%s: found ended task with primary pid %d ",
			       __func__, t->pid);

			completed++;
			_log_task_exit(t->gtid + task_offset, pid, t->estatus);
			t->exited = true;

			/* Describe the exited task for the epilog environment */
			step->envtp->procid = t->gtid + task_offset;
			step->envtp->localid = t->id;
			step->envtp->distribution = -1;
			step->envtp->batch_flag = step->batch;
			step->envtp->uid = step->uid;
			step->envtp->user_name = xstrdup(step->user_name);
			step->envtp->nodeid = step->nodeid;
			step->envtp->oom_kill_step =
				step->oom_kill_step ? 1 : 0;

			/*
			 * Modify copy of job's environment. Do not alter in
			 * place or concurrent searches of the environment can
			 * generate invalid memory references.
			 */
			step->envtp->env =
				env_array_copy((const char **) step->env);
			setup_env(step->envtp, false);
			tmp_env = step->env;
			step->env = step->envtp->env;
			env_array_free(tmp_env);

			setenvf(&step->env, "SLURM_SCRIPT_CONTEXT",
				"epilog_task");
			setenvf(&step->env, "SLURMD_NODENAME", "%s",
				conf->node_name);

			/*
			 * The step-supplied epilog corresponds to the user's
			 * --task-epilog request, the slurm_conf one to the
			 * configured TaskEpilog; report failures under the
			 * matching name.
			 */
			if (step->task_epilog) {
				rc = _run_script_as_user("user task_epilog",
							 step->task_epilog,
							 step, 5, step->env);
				if (rc)
					error("--task-epilog failed status=%d",
					      rc);
			}
			if (slurm_conf.task_epilog) {
				rc = _run_script_as_user("slurm task_epilog",
							 slurm_conf.task_epilog,
							 step, -1, step->env);
				if (rc)
					error("TaskEpilog failed status=%d",
					      rc);
			}

			if (_run_spank_func(SPANK_STEP_TASK_EXIT, step, t->id,
					    NULL) < 0)
				error ("Unable to spank task %d at exit",
				       t->id);

			/*
			 * ENOMEM from the task plugin flags the step as OOM;
			 * any other failure becomes the task's exit status
			 * only if the task itself exited with status 0.
			 */
			rc = task_g_post_term(step, t);
			if (rc == ENOMEM)
				step->oom_error = true;
			else if (rc && !t->estatus)
				t->estatus = rc;

			/* The first non-zero task status becomes the step rc */
			if (t->estatus) {
				slurm_mutex_lock(&step_complete.lock);
				if (!step_complete.step_rc)
					step_complete.step_rc = t->estatus;
				slurm_mutex_unlock(&step_complete.lock);
			}
		}
	} while ((pid > 0) && !waitflag);

	return completed;
}
/*
 * Block until every task of the step that has not yet reached
 * STEPD_STEP_TASK_COMPLETE has been reaped, sending partial task exit
 * messages along the way.
 *
 * step IN: step whose tasks are waited for
 */
static void
_wait_for_all_tasks(stepd_step_rec_t *step)
{
	int expected = 0;
	int reaped = 0;
	int idx;

	/* Count the tasks that still have to be waited for. */
	for (idx = 0; idx < step->node_tasks; idx++)
		if (step->task[idx]->state < STEPD_STEP_TASK_COMPLETE)
			expected++;

	if (expected < step->node_tasks)
		verbose("Only %d of %d requested tasks successfully launched",
			expected, step->node_tasks);

	while (reaped < expected) {
		int n = _wait_for_any_task(step, true);

		if (n == -1) {
			error("%s: No child processes. node_tasks:%u, expected:%d, reaped:%d",
			      __func__, step->node_tasks, expected, reaped);
			break;
		}
		reaped += n;
		if (reaped >= expected)
			break;

		/*
		 * To limit the amount of traffic back, pause briefly so
		 * that most if not all remaining tasks have completed, then
		 * collect whatever has finished without blocking.
		 */
		usleep(100000); /* 100 msec */
		n = _wait_for_any_task(step, false);
		if (n != -1)
			reaped += n;
		if (reaped >= expected)
			break;

		/*
		 * Send partial completion messages only.
		 * The full completion message can only be sent
		 * after resetting CPU frequencies.
		 */
		while (stepd_send_pending_exit_msgs(step) != 0)
			continue;
	}
}
/*
 * Close the step's I/O and give the I/O thread a bounded amount of time to
 * finish before the stepd-opened stdout/stderr files are closed.
 *
 * step IN: step whose I/O is being shut down
 */
static void
_wait_for_io(stepd_step_rec_t *step)
{
	debug("Waiting for IO");
	io_close_all(step);

	slurm_mutex_lock(&step->io_mutex);
	if (step->io_running) {
		/*
		 * Allow the I/O thread up to 300 seconds to clean up before
		 * shutdown continues. It is *not* safe to kill that thread
		 * if it is still running: it could be holding internal
		 * locks, and that could deadlock step teardown, which is far
		 * worse than letting the thread carry on while the process
		 * heads towards exit anyway.
		 */
		struct timespec deadline = {
			.tv_sec = time(NULL) + 300,
			.tv_nsec = 0,
		};

		slurm_cond_timedwait(&step->io_cond, &step->io_mutex,
				     &deadline);
	}
	slurm_mutex_unlock(&step->io_mutex);

	/* Close any files for stdout/stderr opened by the stepd */
	io_close_local_fds(step);
}
/*
 * Create the per-job (batch script) or per-step directory under the slurmd
 * spool directory, give it the step's group and mode 0750.
 *
 * step IN: step record supplying job id, step id and gid
 *
 * RET xstrdup()'d path of the directory (caller must xfree it), or NULL on
 *     failure. An already existing directory is not an error. If mkdir()
 *     fails with ENOSPC the node is additionally drained.
 */
static char *
_make_batch_dir(stepd_step_rec_t *step)
{
	char path[PATH_MAX];

	if (step->step_id.step_id == SLURM_BATCH_SCRIPT)
		snprintf(path, sizeof(path), "%s/job%05u",
			 conf->spooldir, step->step_id.job_id);
	else
		snprintf(path, sizeof(path), "%s/job%05u.%05u",
			 conf->spooldir, step->step_id.job_id,
			 step->step_id.step_id);

	if ((mkdir(path, 0750) < 0) && (errno != EEXIST)) {
		/*
		 * Remember why mkdir() failed before logging: the logging
		 * call may itself change errno, which would make the ENOSPC
		 * test below unreliable.
		 */
		int mkdir_errno = errno;

		error("mkdir(%s): %m", path);
		if (mkdir_errno == ENOSPC)
			stepd_drain_node("SlurmdSpoolDir is full");
		return NULL;
	}

	/* Keep the owner, only hand the directory to the step's group */
	if (chown(path, (uid_t) -1, (gid_t) step->gid) < 0) {
		error("chown(%s): %m", path);
		return NULL;
	}

	/* Enforce the mode even if the directory already existed */
	if (chmod(path, 0750) < 0) {
		error("chmod(%s, 750): %m", path);
		return NULL;
	}

	return xstrdup(path);
}
/*
 * Write the batch script carried in the launch message to the step's batch
 * script path, hand the file to the step's user and install the path as
 * step->argv[0].
 *
 * msg  IN: batch launch message holding the script text
 * step IN/OUT: step record; argv[0] is set to the script path on success
 *
 * RET SLURM_SUCCESS, or SLURM_ERROR if the script is missing/empty or any
 *     file operation fails (the script file is then unlinked again).
 */
static int _make_batch_script(batch_job_launch_msg_t *msg,
			      stepd_step_rec_t *step)
{
	int open_flags = O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC;
	int script_fd, script_len;
	char *script_path = NULL;
	char *mapped;

	if (!msg->script) {
		error("%s: called with NULL script", __func__);
		return SLURM_ERROR;
	}

	/* note: should replace this with a length as part of msg */
	script_len = strlen(msg->script);
	if (script_len < 1) {
		error("%s: called with empty script", __func__);
		return SLURM_ERROR;
	}

	script_path = _batch_script_path(step);

	script_fd = open(script_path, open_flags, S_IRWXU);
	if (script_fd < 0) {
		error("couldn't open `%s': %m", script_path);
		goto fail;
	}

	/* Size the file first so that the whole script can be mapped */
	if (ftruncate(script_fd, script_len) == -1) {
		error("%s: ftruncate to %d failed on `%s`: %m",
		      __func__, script_len, script_path);
		close(script_fd);
		goto fail;
	}

	mapped = mmap(NULL, script_len, PROT_WRITE | PROT_READ, MAP_SHARED,
		      script_fd, 0);
	if (mapped == MAP_FAILED) {
		error("%s: mmap failed", __func__);
		close(script_fd);
		goto fail;
	}

	/* The descriptor is closed before the copy; the mapping is used */
	(void) close(script_fd);

	memcpy(mapped, msg->script, script_len);
	munmap(mapped, script_len);

	if (chown(script_path, step->uid, (gid_t) -1) < 0) {
		error("chown(%s): %m", script_path);
		goto fail;
	}

	step->argv[0] = script_path;
	return SLURM_SUCCESS;

fail:
	(void) unlink(script_path);
	xfree(script_path);
	return SLURM_ERROR;
}
/*
 * Request that this node (conf->node_name) be set to NODE_STATE_DRAIN.
 *
 * reason IN: text stored as the drain reason
 *
 * The result of the update request is deliberately ignored.
 */
extern void stepd_drain_node(char *reason)
{
	update_node_msg_t drain_msg;

	slurm_init_update_node_msg(&drain_msg);
	drain_msg.reason = reason;
	drain_msg.node_state = NODE_STATE_DRAIN;
	drain_msg.node_names = conf->node_name;

	(void) slurm_update_node(&drain_msg);
}
/*
 * Tell the launching srun that the task launch failed on this node.
 *
 * msg              IN: launch request that could not be honored
 * cli              IN: address of the client; only the port is replaced
 * rc               IN: failure code; 0 is reported as -1 so that the
 *                      response never looks like a success
 * protocol_version IN: protocol version to use for the response
 *
 * Nothing is sent for the extern step or when the request carries no
 * response ports.
 */
static void
_send_launch_failure(launch_tasks_request_msg_t *msg, slurm_addr_t *cli, int rc,
		     uint16_t protocol_version)
{
	slurm_msg_t resp_msg;
	launch_tasks_response_msg_t resp;
	int nodeid;
	char *name = NULL;
	slurm_cred_arg_t *cred;
	uid_t launch_uid = SLURM_AUTH_NOBODY;

	/*
	 * The extern step can get here if something goes wrong starting the
	 * step. If this does happen we don't have to contact the srun since
	 * there isn't one, just return.
	 */
	if ((msg->step_id.step_id == SLURM_EXTERN_CONT) ||
	    !msg->resp_port || !msg->num_resp_port) {
		debug2("%s: The extern step has nothing to send a launch failure to",
		       __func__);
		return;
	}

	nodeid = nodelist_find(msg->complete_nodelist, conf->node_name);
	if (nodeid < 0) {
		/*
		 * nodeid selects the response port below. A negative value
		 * would make "nodeid % num_resp_port" negative and index
		 * before the start of resp_port[], so fall back to the
		 * first port.
		 * NOTE(review): assumes nodelist_find() reports "not found"
		 * with a negative value - confirm.
		 */
		debug2("%s: %s not found in step node list, using first response port",
		       __func__, conf->node_name);
		nodeid = 0;
	}
	name = xstrdup(conf->node_name);

	/* Need to fetch the step uid to restrict the response appropriately */
	cred = slurm_cred_get_args(msg->cred);
	launch_uid = cred->uid;
	slurm_cred_unlock_args(msg->cred);

	debug ("sending launch failure message: %s", slurm_strerror (rc));

	slurm_msg_t_init(&resp_msg);
	memcpy(&resp_msg.address, cli, sizeof(slurm_addr_t));
	slurm_set_port(&resp_msg.address,
		       msg->resp_port[nodeid % msg->num_resp_port]);
	resp_msg.data = &resp;
	resp_msg.msg_type = RESPONSE_LAUNCH_TASKS;
	resp_msg.protocol_version = protocol_version;
	/*
	 * NOTE(review): this copy is not released in this function; confirm
	 * whether the send path takes ownership of tls_cert.
	 */
	resp_msg.tls_cert = xstrdup(msg->alloc_tls_cert);
	slurm_msg_set_r_uid(&resp_msg, launch_uid);

	/* Start from a fully zeroed response: no pids or task ids are sent */
	memset(&resp, 0, sizeof(resp));
	memcpy(&resp.step_id, &msg->step_id, sizeof(resp.step_id));
	resp.node_name = name;
	resp.return_code = rc ? rc : -1;
	resp.count_of_pids = 0;

	if (_send_srun_resp_msg(&resp_msg, msg->nnodes) != SLURM_SUCCESS)
		error("%s: Failed to send RESPONSE_LAUNCH_TASKS: %m", __func__);

	xfree(name);
}
/*
 * Report the outcome of the task launch on this node to the first srun of
 * the step, including the local pid and global task id of every task.
 *
 * step IN: step that was launched
 * rc   IN: launch return code to report
 *
 * Batch steps send no launch response.
 */
static void
_send_launch_resp(stepd_step_rec_t *step, int rc)
{
	srun_info_t *srun = list_peek(step->sruns);
	launch_tasks_response_msg_t resp;
	slurm_msg_t resp_msg;
	int idx;

	if (step->batch)
		return;

	debug("Sending launch resp rc=%d", rc);

	/* Message envelope, addressed to the srun's response address */
	slurm_msg_t_init(&resp_msg);
	resp_msg.address = srun->resp_addr;
	slurm_msg_set_r_uid(&resp_msg, srun->uid);
	resp_msg.protocol_version = srun->protocol_version;
	resp_msg.data = &resp;
	resp_msg.msg_type = RESPONSE_LAUNCH_TASKS;
	resp_msg.tls_cert = xstrdup(srun->tls_cert);

	/* Message payload */
	memcpy(&resp.step_id, &step->step_id, sizeof(resp.step_id));
	resp.node_name = xstrdup(step->node_name);
	resp.return_code = rc;
	resp.count_of_pids = step->node_tasks;
	resp.local_pids = xmalloc(step->node_tasks * sizeof(*resp.local_pids));
	resp.task_ids = xmalloc(step->node_tasks * sizeof(*resp.task_ids));

	for (idx = 0; idx < step->node_tasks; idx++) {
		/*
		 * Don't add the het job task offset to the task id here,
		 * this represents a bit on the other side.
		 */
		resp.task_ids[idx] = step->task[idx]->gtid;
		resp.local_pids[idx] = step->task[idx]->pid;
	}

	if (_send_srun_resp_msg(&resp_msg, step->nnodes) != SLURM_SUCCESS)
		error("%s: Failed to send RESPONSE_LAUNCH_TASKS: %m", __func__);

	xfree(resp.node_name);
	xfree(resp.task_ids);
	xfree(resp.local_pids);
}
/*
 * Send REQUEST_COMPLETE_BATCH_SCRIPT for the step's job to the controller,
 * retrying every RETRY_DELAY seconds until the RPC goes through.
 *
 * step   IN: batch step that finished
 * err    IN: slurm return code to report (slurm_rc)
 * status IN: job return code to report, replaced by SIG_OOM when the step
 *            recorded an out-of-memory error
 *
 * RET SLURM_SUCCESS if the controller accepted the message or answered
 *     ESLURM_ALREADY_DONE / ESLURM_INVALID_JOB_ID; otherwise whatever
 *     slurm_seterrno_ret() returns for the controller's return code.
 */
static int
_send_complete_batch_script_msg(stepd_step_rec_t *step, int err, int status)
{
	int ctld_rc;
	slurm_msg_t req_msg;
	complete_batch_script_msg_t req;

	memset(&req, 0, sizeof(req));
	req.job_id = step->step_id.job_id;
	req.job_rc = step->oom_error ? SIG_OOM : status;
	req.jobacct = step->jobacct;
	req.node_name = step->node_name;
	req.slurm_rc = err;
	req.user_id = (uint32_t) step->uid;

	slurm_msg_t_init(&req_msg);
	req_msg.data = &req;
	req_msg.msg_type = REQUEST_COMPLETE_BATCH_SCRIPT;

	log_flag(PROTOCOL, "sending REQUEST_COMPLETE_BATCH_SCRIPT slurm_rc:%s job_rc:%d",
		 slurm_strerror(err), status);

	/* Note: these log messages don't go to slurmd.log from here */

	/* Keep retrying the batch complete RPC to slurmctld indefinitely. */
	while (slurm_send_recv_controller_rc_msg(&req_msg, &ctld_rc,
						 working_cluster_rec)) {
		info("Retrying job complete RPC for %ps [sleeping %us]",
		     &step->step_id, RETRY_DELAY);
		sleep(RETRY_DELAY);
	}

	/* "Already done" and "invalid job id" count as success. */
	if ((ctld_rc != ESLURM_ALREADY_DONE) &&
	    (ctld_rc != ESLURM_INVALID_JOB_ID) && ctld_rc) {
		slurm_seterrno_ret(ctld_rc);
	}

	return SLURM_SUCCESS;
}
/*
 * Reconfigure logging for the step: pick the stderr log level, rename the
 * logger to "slurmstepd" and connect the stepd's stderr to the stderr of
 * the step's first task.
 *
 * step IN: step record supplying debug level, uid, flags and task fds
 *
 * RET SLURM_SUCCESS, or ESLURMD_IO_ERROR if stderr could not be redirected
 */
static int
_slurmd_job_log_init(stepd_step_rec_t *step)
{
	char argv0[64];
	bool privileged = (step->uid == 0) ||
			  (step->uid == slurm_conf.slurm_user_id);

	conf->log_opts.buffered = 1;

	/*
	 * Reset stderr logging to the user requested level
	 * (logfile and syslog levels remain the same).
	 * Only root and SlurmUser may raise it above LOG_LEVEL_ERROR.
	 */
	conf->log_opts.stderr_level = LOG_LEVEL_ERROR;
	if (step->debug > LOG_LEVEL_ERROR) {
		if (privileged)
			conf->log_opts.stderr_level = step->debug;
		else
			error("Use of --slurmd-debug is allowed only for root and SlurmUser(%s), ignoring it",
			      slurm_conf.slurm_user_name);
	}

	/*
	 * The maximum stderr log level is LOG_LEVEL_DEBUG2 because
	 * some higher level debug messages are generated in the
	 * stdio code, which would otherwise create more stderr traffic
	 * to srun and therefore more debug messages in an endless loop.
	 */
	if (conf->log_opts.stderr_level > LOG_LEVEL_DEBUG2)
		conf->log_opts.stderr_level = LOG_LEVEL_DEBUG2;

#ifdef MULTIPLE_SLURMD
	snprintf(argv0, sizeof(argv0), "slurmstepd-%s", conf->node_name);
#else
	snprintf(argv0, sizeof(argv0), "slurmstepd");
#endif

	/* Reinitialize the log with the new options and name */
	log_alter(conf->log_opts, 0, NULL);
	log_set_argv0(argv0);

	/* Connect slurmd stderr to stderr of job */
	if (step->flags & LAUNCH_PTY)
		fd_set_nonblocking(STDERR_FILENO);

	if (step->task && (dup2(step->task[0]->stderr_fd, STDERR_FILENO) < 0)) {
		error("job_log_init: dup2(stderr): %m");
		return ESLURMD_IO_ERROR;
	}

	verbose("debug levels are stderr='%s', logfile='%s', syslog='%s'",
		log_num2string(conf->log_opts.stderr_level),
		log_num2string(conf->log_opts.logfile_level),
		log_num2string(conf->log_opts.syslog_level));

	return SLURM_SUCCESS;
}
/*
 * Set the priority of the job to be the same as the priority of
 * the process that launched the job on the submit node.
 * In support of the "PropagatePrioProcess" config keyword.
 *
 * step IN/OUT: step record; SLURM_PRIO_PROCESS is read from and then
 *              removed from step->env
 *
 * If the variable is missing, priority 0 is used. With PROP_PRIO_NICER the
 * resulting priority value is kept at least one above the daemon's own.
 */
static void _set_prio_process (stepd_step_rec_t *step)
{
	char *env_name = "SLURM_PRIO_PROCESS";
	char *env_val;
	int prio_daemon, prio_process;

	if (!(env_val = getenvp( step->env, env_name ))) {
		error( "Couldn't find %s in environment", env_name );
		prio_process = 0;
	} else {
		/*
		 * Parse the value before removing the variable: env_val
		 * refers to the entry that unsetenvp() removes, so it must
		 * not be read afterwards.
		 * NOTE(review): presumably getenvp() returns a pointer into
		 * the environment array and unsetenvp() releases that
		 * entry - confirm.
		 */
		prio_process = atoi( env_val );

		/* Users shouldn't get this in their environment */
		unsetenvp( step->env, env_name );
	}

	if (slurm_conf.propagate_prio_process == PROP_PRIO_NICER) {
		prio_daemon = getpriority( PRIO_PROCESS, 0 );
		prio_process = MAX( prio_process, (prio_daemon + 1) );
	}

	if (setpriority( PRIO_PROCESS, 0, prio_process ))
		error( "setpriority(PRIO_PROCESS, %d): %m", prio_process );
	else {
		debug2( "_set_prio_process: setpriority %d succeeded",
			prio_process);
	}
}
/*
 * Switch the calling process to the step's user and group.
 *
 * step IN: step record supplying the target uid and gid
 * ps   IN: privilege state holding the saved uid/gid to reclaim first
 *
 * RET SLURM_SUCCESS, or SLURM_ERROR as soon as any of the id changes fails.
 *
 * Note: if the effective uid already equals ps->saved_uid the function
 * returns SLURM_SUCCESS immediately without changing any ids.
 */
static int
_become_user(stepd_step_rec_t *step, struct priv_state *ps)
{
/*
* First reclaim the effective uid and gid
*/
if (geteuid() == ps->saved_uid)
return SLURM_SUCCESS;
if (seteuid(ps->saved_uid) < 0) {
error("_become_user seteuid: %m");
return SLURM_ERROR;
}
if (setegid(ps->saved_gid) < 0) {
error("_become_user setegid: %m");
return SLURM_ERROR;
}
/*
* Now drop real, effective, and saved uid/gid.
* The group ids are changed before the user ids.
*/
if (setregid(step->gid, step->gid) < 0) {
error("setregid: %m");
return SLURM_ERROR;
}
if (setreuid(step->uid, step->uid) < 0) {
error("setreuid: %m");
return SLURM_ERROR;
}
return SLURM_SUCCESS;
}
/*
* Run a script as the step's user, with the step's uid and gid, in the
* step's working directory.
*
* name IN: class of program (task prolog, task epilog, etc.), used in
* log messages only
* path IN: pathname of program to run; NULL or "" means nothing to run
* step IN: slurmd step structure, used to get uid, gid, cwd, flags and
* the container to join
* max_wait IN: maximum time to wait in seconds, negative for no limit.
* Once the time is used up the script's process group is sent
* SIGKILL and the script is then waited for without limit.
* env IN: environment passed to execve(); must not be NULL (checked with
* xassert())
*
* RET 0 if there is no script to run, -1 if the child could not be forked.
* Otherwise the script's exit code if it exited normally (the child exits
* with 127 when its setup or the execve() fails), else the raw wait
* status (0 if waitpid() itself failed).
*/
int
_run_script_as_user(const char *name, const char *path, stepd_step_rec_t *step,
int max_wait, char **env)
{
int status, rc, opt;
pid_t cpid;
struct exec_wait_info *ei;
xassert(env);
if (path == NULL || path[0] == '\0')
return 0;
debug("[job %u] attempting to run %s [%s]",
step->step_id.job_id, name, path);
if ((ei = _fork_child_with_wait_info(0)) == NULL) {
error ("executing %s: fork: %m", name);
return -1;
}
/* A pid of 0 means we are running in the forked child */
if ((cpid = _exec_wait_get_pid (ei)) == 0) {
struct priv_state sprivs;
char *argv[2];
/* container_g_join needs to be called in the
forked process part of the fork to avoid a race
condition where if this process makes a file or
detaches itself from a child before we add the pid
to the container in the parent of the fork.
*/
/* Ignore system processes */
if ((step->step_id.job_id != 0) &&
!(step->flags & LAUNCH_NO_ALLOC) &&
(container_g_join(&step->step_id, step->uid, false)))
error("container_g_join(%u): %m", step->step_id.job_id);
/* The script is run without arguments: argv is just its path */
argv[0] = (char *)xstrdup(path);
argv[1] = NULL;
#ifdef WITH_SELINUX
if (setexeccon(step->selinux_context)) {
error("Failed to set SELinux context to '%s': %m",
step->selinux_context);
_exit(127);
}
#else
if (step->selinux_context) {
error("Built without SELinux support but context was specified");
_exit(127);
}
#endif
sprivs.gid_list = NULL; /* initialize to prevent xfree */
if (drop_privileges(step, true, &sprivs, false) < 0) {
error("run_script_as_user drop_privileges: %m");
/* child process, should not return */
_exit(127);
}
if (_become_user(step, &sprivs) < 0) {
error("run_script_as_user _become_user failed: %m");
/* child process, should not return */
_exit(127);
}
/* Failing to change directory is logged but not fatal */
if (chdir(step->cwd) == -1)
error("run_script_as_user: couldn't "
"change working dir to %s: %m", step->cwd);
/*
* Put the child in its own process group (pgid == its pid) so that
* the parent's killpg(cpid, ...) reaches the script and its children
*/
setpgid(0, 0);
/*
* Wait for signal from parent
*/
_exec_wait_child_wait_for_parent (ei);
/* execve() only returns on failure; retry on transient errors */
while (1) {
execve(path, argv, env);
error("execve(%s): %m", path);
if ((errno == ENFILE) || (errno == ENOMEM)) {
/* System limit on open files or memory reached,
* retry after short delay */
sleep(1);
} else if (errno == EACCES) {
error("Could not run %s [%s]: access denied",
name, path);
break;
} else {
break;
}
}
_exit(127);
}
/* Parent: release the child, which is blocked waiting for us */
if (exec_wait_signal_child (ei) != SLURM_SUCCESS)
error ("run_script_as_user: Failed to wakeup %s", name);
_exec_wait_info_destroy (ei);
/* No time limit: block in waitpid(). Otherwise poll once a second. */
if (max_wait < 0)
opt = 0;
else
opt = WNOHANG;
while (1) {
rc = waitpid(cpid, &status, opt);
if (rc < 0) {
if (errno == EINTR)
continue;
error("waitpid: %m");
status = 0;
break;
} else if (rc == 0) {
/* Still running: one more second of the allowance is used up */
sleep(1);
if ((--max_wait) <= 0) {
/* Out of time: kill the group, then block until it is reaped */
killpg(cpid, SIGKILL);
opt = 0;
}
} else {
/* spawned process exited */
break;
}
}
/* Ensure that all child processes get killed, one last time */
killpg(cpid, SIGKILL);
if (WIFEXITED(status))
return WEXITSTATUS(status);
return status;
}