blob: f2a31d12f69c0506b69ac17985d0f7ee8a1a8e52 [file] [log] [blame]
/*****************************************************************************\
* src/slurmd/slurmstepd/mgr.c - job manager functions for slurmstepd
* $Id$
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2009 Lawrence Livermore National Security.
* Copyright (C) 2013 Intel, Inc.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark Grondona <mgrondona@llnl.gov>.
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#if HAVE_CONFIG_H
# include "config.h"
#endif
#if HAVE_SYS_TYPES_H
# include <sys/types.h>
#endif
#if HAVE_SYS_PRCTL_H
# include <sys/prctl.h>
#endif
#ifdef HAVE_AIX
# undef HAVE_UNSETENV
# include <sys/checkpnt.h>
#endif
#ifndef HAVE_UNSETENV
# include "src/common/unsetenv.h"
#endif
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <sys/poll.h>
#include <unistd.h>
#include <pwd.h>
#include <grp.h>
#include <stdio.h>
#include <string.h>
#include <sys/utsname.h>
#include <sys/types.h>
#include <time.h>
#if HAVE_STDLIB_H
# include <stdlib.h>
#endif
#ifdef HAVE_PTY_H
# include <pty.h>
# ifdef HAVE_UTMP_H
# include <utmp.h>
# endif
#endif
#include "slurm/slurm_errno.h"
#include "src/common/cbuf.h"
#include "src/common/cpu_frequency.h"
#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/hostlist.h"
#include "src/common/log.h"
#include "src/common/mpi.h"
#include "src/common/node_select.h"
#include "src/common/plugstack.h"
#include "src/common/safeopen.h"
#include "src/common/slurm_jobacct_gather.h"
#include "src/common/slurm_acct_gather_profile.h"
#include "src/common/switch.h"
#include "src/common/util-net.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"
#include "src/slurmd/slurmd/slurmd.h"
#include "src/slurmd/common/core_spec_plugin.h"
#include "src/slurmd/common/job_container_plugin.h"
#include "src/slurmd/common/setproctitle.h"
#include "src/slurmd/common/proctrack.h"
#include "src/slurmd/common/task_plugin.h"
#include "src/slurmd/common/run_script.h"
#include "src/slurmd/common/reverse_tree.h"
#include "src/slurmd/common/set_oomadj.h"
#include "src/slurmd/slurmstepd/slurmstepd.h"
#include "src/slurmd/slurmstepd/mgr.h"
#include "src/slurmd/slurmstepd/task.h"
#include "src/slurmd/slurmstepd/io.h"
#include "src/slurmd/slurmstepd/pdebug.h"
#include "src/slurmd/slurmstepd/req.h"
#include "src/slurmd/slurmstepd/pam_ses.h"
#include "src/slurmd/slurmstepd/ulimits.h"
#include "src/slurmd/slurmstepd/step_terminate_monitor.h"
#include "src/slurmd/slurmstepd/fname.h"
/*
 * Retry tuning constants: 240 retries spaced 15 seconds apart add up to
 * 3600 seconds, i.e. the "one hour max" mentioned below.
 * NOTE(review): neither macro is referenced in the part of the file reviewed
 * here; presumably a retry loop further down uses them -- confirm.
 */
#define RETRY_DELAY 15 /* retry every 15 seconds */
#define MAX_RETRY 240 /* retry 240 times (one hour max) */
/*
 * List of signals to block in this process.
 *
 * The list is terminated by a 0 entry. job_manager() passes it to
 * xsignal_block() after the tasks have been forked.
 */
static int mgr_sigarray[] = {
SIGINT, SIGTERM, SIGTSTP,
SIGQUIT, SIGPIPE, SIGUSR1,
SIGUSR2, SIGALRM, SIGHUP, 0
};
/*
 * Privilege bookkeeping handed to _drop_privileges() and later to
 * _reclaim_privileges() (see _setup_normal_io() and _pre_task_privileged()).
 * NOTE(review): the per-field notes below are inferred from the field names
 * and from the uses visible in this part of the file -- confirm against the
 * bodies of _drop_privileges()/_reclaim_privileges().
 */
struct priv_state {
uid_t saved_uid; /* printed as the uid when reclaiming privileges fails */
gid_t saved_gid; /* printed as the gid when reclaiming privileges fails */
gid_t * gid_list; /* presumably the saved supplementary group list */
int ngids; /* presumably the number of entries in gid_list */
char saved_cwd [4096]; /* presumably the working directory to return to */
};
/*
 * Global step-completion state shared by the functions in this file; access
 * is serialized with step_complete.lock where the code takes it.
 *
 * NOTE(review): this is a positional initializer, so each value is bound to
 * a field purely by the member order of step_complete_t, which is declared
 * in a header not shown here. The members referenced in this file are cond,
 * lock, rank, depth, parent_rank, parent_addr, children, max_depth,
 * wait_children, bits, step_rc and jobacct -- confirm the order below
 * against the typedef before touching it.
 * NOTE(review): the empty "{}" initializer is a compiler extension before
 * C23; "{0}" would be the portable spelling.
 */
step_complete_t step_complete = {
PTHREAD_COND_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER,
-1,
-1,
-1,
{},
-1,
-1,
true,
(bitstr_t *)NULL,
0,
NULL
};
/*
 * Pairs a thread id with a number of seconds.
 * NOTE(review): not used in the part of the file reviewed here; presumably
 * consumed by a delayed-kill helper further down -- confirm.
 */
typedef struct kill_thread {
pthread_t thread_id; /* thread the record refers to */
int secs; /* delay/timeout in seconds (assumed from the name) */
} kill_thread_t;
/*
 * Prototypes
 *
 * Forward declarations for the file-local helpers. Only some of them are
 * defined in the part of the file that precedes their first use.
 */
/*
 * Job manager related prototypes
 */
static int _access(const char *path, int modes, uid_t uid, gid_t gid);
static void _send_launch_failure(launch_tasks_request_msg_t *,
slurm_addr_t *, int);
static int _drain_node(char *reason);
static int _fork_all_tasks(stepd_step_rec_t *job, bool *io_initialized);
static int _become_user(stepd_step_rec_t *job, struct priv_state *ps);
static void _set_prio_process (stepd_step_rec_t *job);
static void _set_job_log_prefix(stepd_step_rec_t *job);
static int _setup_normal_io(stepd_step_rec_t *job);
static int _drop_privileges(stepd_step_rec_t *job, bool do_setuid,
struct priv_state *state, bool get_list);
static int _reclaim_privileges(struct priv_state *state);
static void _send_launch_resp(stepd_step_rec_t *job, int rc);
static int _slurmd_job_log_init(stepd_step_rec_t *job);
static void _wait_for_io(stepd_step_rec_t *job);
static int _send_exit_msg(stepd_step_rec_t *job, uint32_t *tid, int n,
int status);
static void _wait_for_children_slurmstepd(stepd_step_rec_t *job);
static int _send_pending_exit_msgs(stepd_step_rec_t *job);
static void _send_step_complete_msgs(stepd_step_rec_t *job);
static void _wait_for_all_tasks(stepd_step_rec_t *job);
static int _wait_for_any_task(stepd_step_rec_t *job, bool waitflag);
static void _setargs(stepd_step_rec_t *job);
static void _random_sleep(stepd_step_rec_t *job);
static int _run_script_as_user(const char *name, const char *path,
stepd_step_rec_t *job,
int max_wait, char **env);
static void _unblock_signals(void);
/*
 * Batch job management prototypes:
 */
static char * _make_batch_dir(stepd_step_rec_t *job);
static char * _make_batch_script(batch_job_launch_msg_t *msg, char *path);
static int _send_complete_batch_script_msg(stepd_step_rec_t *job,
int err, int status);
/*
 * Initialize the group list using the list of gids from the slurmd if
 * available. Otherwise initialize the groups with initgroups().
 */
static int _initgroups(stepd_step_rec_t *job);
/*
 * Step record published by job_manager() right after the tasks have been
 * forked and the manager signals have been blocked.
 * NOTE(review): the readers of this pointer are outside the part of the
 * file reviewed here.
 */
static stepd_step_rec_t *reattach_job;
/*
 * Prepare the step record for a job step that is being launched on the
 * current node.
 *
 * msg  - launch request; also supplies the select_jobinfo stored in the
 *        step environment
 * cli  - address stored as job->envtp->cli and used to report a failure
 * self - address stored as job->envtp->self
 *
 * Returns the new step record. If stepd_step_rec_create() fails, the
 * failure is reported through _send_launch_failure(), errno is restored to
 * the value left by the failed create call, and NULL is returned.
 */
extern stepd_step_rec_t *
mgr_launch_tasks_setup(launch_tasks_request_msg_t *msg, slurm_addr_t *cli,
		       slurm_addr_t *self)
{
	stepd_step_rec_t *step = stepd_step_rec_create(msg);

	if (step == NULL) {
		/*
		 * The caller wants the reason for the failure, and
		 * _send_launch_failure() may overwrite errno, so keep a
		 * private copy and put it back afterwards.
		 */
		int create_errno = errno;

		_send_launch_failure(msg, cli, create_errno);
		errno = create_errno;
		return NULL;
	}

	_set_job_log_prefix(step);
	_setargs(step);

	step->envtp->cli            = cli;
	step->envtp->self           = self;
	step->envtp->select_jobinfo = msg->select_jobinfo;

	return step;
}
/*
 * Send resp_msg with slurm_send_only_node_msg(), retrying only when the
 * send fails with errno == ETIMEDOUT.
 *
 * The retry ceiling is (nnodes / 1024) + 1 and is computed on the first
 * timeout. The pause between attempts starts at 100000 microseconds and is
 * doubled after each attempt for as long as it is still below 800000.
 *
 * Returns the result of the last send attempt.
 */
inline static int
_send_srun_resp_msg(slurm_msg_t *resp_msg, uint32_t nnodes)
{
	int rc;
	int attempts = 0;
	int attempt_limit = 0;
	unsigned long pause_usec = 100000;

	for (;;) {
		rc = slurm_send_only_node_msg(resp_msg);
		if (rc == SLURM_SUCCESS)
			break;
		if (errno != ETIMEDOUT)
			break;

		if (attempt_limit == 0)
			attempt_limit = (nnodes / 1024) + 1;
		if (attempts > attempt_limit)
			break;

		debug3("_send_srun_resp_msg: failed to send msg type %u: %m",
		       resp_msg->msg_type);

		usleep(pause_usec);
		if (pause_usec < 800000)
			pause_usec *= 2;
		attempts++;
	}

	return rc;
}
/*
 * Derive the exit code of the step from the exit status of its local tasks.
 *
 * Tasks are scanned in order:
 *  - a task flagged killed_by_cmd is skipped, since its status only says
 *    that we killed it;
 *  - a task flagged aborted (PMI_Abort/PMI2_Abort) defines the result and
 *    ends the scan;
 *  - a task whose status satisfies WIFSIGNALED() defines the result and
 *    ends the scan;
 *  - otherwise the result becomes the larger of step_complete.step_rc and
 *    that task's status (note: compared against the global value, not
 *    against the running local result).
 *
 * If no task produced a value (result still NO_VAL) and job->task[0] is
 * set, the status of task 0 is used so that some code is returned.
 */
static uint32_t _get_exit_code(stepd_step_rec_t *job)
{
	uint32_t code = NO_VAL;
	uint32_t t;

	for (t = 0; t < job->node_tasks; t++) {
		if (job->task[t]->killed_by_cmd) {
			debug("get_exit_code task %u killed by cmd", t);
			continue;
		}

		if (job->task[t]->aborted) {
			code = job->task[t]->estatus;
			debug("get_exit_code task %u called abort", t);
			break;
		} else if (WIFSIGNALED(job->task[t]->estatus)) {
			error("get_exit_code task %u died by signal", t);
			code = job->task[t]->estatus;
			break;
		}

		code = MAX(step_complete.step_rc, job->task[t]->estatus);
	}

	/* Every task was killed by command: still hand back one status. */
	if ((code == NO_VAL) && job->task[0])
		code = job->task[0]->estatus;

	return code;
}
#ifdef HAVE_ALPS_CRAY
/*
 * Kludge to better inter-operate with the ALPS layer:
 * - the CONFIRM method requires the SID of the shell executing the job
 *   script,
 * - the RELEASE method is more robustly called from stepdmgr.
 *
 * To avoid calling the same select/cray plugin function also in slurmctld,
 * the following convention is used for the temporary job record:
 * - only job_id, job_state, alloc_sid and select_jobinfo are set,
 * - batch_flag is 0 (the corresponding call in slurmctld uses 1),
 * - job_state is set to the unlikely value of 'NO_VAL'.
 *
 * job       - step whose jobid and resv_id are copied into the record
 * pagg_id   - container id; only stored in the jobinfo when non-zero
 * select_fn - select plugin entry point invoked on the temporary record
 *
 * Returns whatever select_fn returns. The temporary jobinfo is freed
 * before returning.
 */
static int _call_select_plugin_from_stepd(stepd_step_rec_t *job,
					  uint64_t pagg_id,
					  int (*select_fn)(struct job_record *))
{
	struct job_record stub = {0};
	int result;

	stub.job_id         = job->jobid;
	stub.job_state      = (uint16_t)NO_VAL;
	stub.select_jobinfo = select_g_select_jobinfo_alloc();

	select_g_select_jobinfo_set(stub.select_jobinfo,
				    SELECT_JOBDATA_RESV_ID, &job->resv_id);
	if (pagg_id != 0) {
		select_g_select_jobinfo_set(stub.select_jobinfo,
					    SELECT_JOBDATA_PAGG_ID, &pagg_id);
	}

	result = (*select_fn)(&stub);

	select_g_select_jobinfo_free(stub.select_jobinfo);
	return result;
}
/*
 * Look up the container (PAGG) id of the job manager process and pass it
 * to select_g_job_ready() via _call_select_plugin_from_stepd().
 *
 * Returns READY_JOB_FATAL when proctrack_g_find() yields 0, otherwise the
 * result of the select plugin call.
 */
static int _select_cray_plugin_job_ready(stepd_step_rec_t *job)
{
	uint64_t container = proctrack_g_find(job->jmgr_pid);

	if (container != 0) {
		return _call_select_plugin_from_stepd(job, container,
						      select_g_job_ready);
	}

	error("no PAGG ID: job service disabled on this host?");
	/*
	 * If this process is not attached to a container, there is no
	 * sense in trying to use the SID as fallback, since the call to
	 * proctrack_g_add() in _fork_all_tasks() will fail later.
	 * Hence drain the node until sgi_job returns proper PAGG IDs.
	 */
	return READY_JOB_FATAL;
}
#endif
/*
 * Finish a batch step: record its exit code, remove the batch script,
 * report completion and finally remove the batch directory.
 *
 * job - step record; job->argv[0] is the script that is unlinked and
 *       job->batchdir the directory that is removed and freed
 * rc  - slurm return code forwarded to slurmctld for a batch script.
 *       Non-zero rc will DRAIN the node.
 *
 * A step whose stepid is NO_VAL or SLURM_BATCH_SCRIPT is treated as the
 * batch script itself and reported with
 * _send_complete_batch_script_msg(); any other step waits for its
 * children slurmstepds and sends step completion messages. Nothing is sent
 * for an aborted job, only a log line is written.
 *
 * Fix: the two "abort completed" log lines used to be attached to the
 * wrong branches (the batch script case printed "step <job>.<step>" and
 * the regular step case printed "job <job>"), the opposite of the
 * non-aborted messages below. They now match the step kind.
 */
extern void
batch_finish(stepd_step_rec_t *job, int rc)
{
	bool is_batch_script = (job->stepid == NO_VAL) ||
			       (job->stepid == SLURM_BATCH_SCRIPT);

	step_complete.step_rc = _get_exit_code(job);

	if (job->argv[0] && (unlink(job->argv[0]) < 0))
		error("unlink(%s): %m", job->argv[0]);

#ifdef HAVE_ALPS_CRAY
	_call_select_plugin_from_stepd(job, 0, select_g_job_fini);
#endif

	if (job->aborted) {
		if (is_batch_script) {
			info("job %u abort completed", job->jobid);
		} else {
			info("step %u.%u abort completed",
			     job->jobid, job->stepid);
		}
	} else if (is_batch_script) {
		verbose("job %u completed with slurm_rc = %d, job_rc = %d",
			job->jobid, rc, step_complete.step_rc);
		_send_complete_batch_script_msg(job, rc, step_complete.step_rc);
	} else {
		_wait_for_children_slurmstepd(job);
		verbose("job %u.%u completed with slurm_rc = %d, job_rc = %d",
			job->jobid, job->stepid, rc, step_complete.step_rc);
		_send_step_complete_msgs(job);
	}

	/* Do not purge directory until slurmctld is notified of batch job
	 * completion to avoid race condition with slurmd registering missing
	 * batch job. */
	if (job->batchdir && (rmdir(job->batchdir) < 0))
		error("rmdir(%s): %m", job->batchdir);
	xfree(job->batchdir);
}
/*
 * Prepare the step record for a batch job script that is being launched on
 * the current node.
 *
 * Creates the step record, the batch directory and the batch script
 * (which replaces job->argv[0]), then fills in the batch environment.
 *
 * msg - batch launch request
 * cli - not used by this function
 *
 * Returns the step record on success. Returns NULL if the record cannot be
 * created, or, with errno set to ESLURMD_CREATE_BATCH_DIR_ERROR, if either
 * the batch directory or the batch script cannot be created.
 */
stepd_step_rec_t *
mgr_launch_batch_job_setup(batch_job_launch_msg_t *msg, slurm_addr_t *cli)
{
	stepd_step_rec_t *step;

	step = batch_stepd_step_rec_create(msg);
	if (step == NULL) {
		error("batch_stepd_step_rec_create() failed: %m");
		return NULL;
	}

	_set_job_log_prefix(step);
	_setargs(step);

	step->batchdir = _make_batch_dir(step);
	if (step->batchdir == NULL)
		goto cleanup;

	/* argv[0] is replaced by the path of the script written below */
	xfree(step->argv[0]);
	step->argv[0] = _make_batch_script(msg, step->batchdir);
	if (step->argv[0] == NULL)
		goto cleanup;

	/* this is the new way of setting environment variables */
	env_array_for_batch_job(&step->env, msg, conf->node_name);

	/* this is the old way of setting environment variables (but
	 * needed) */
	step->envtp->overcommit     = msg->overcommit;
	step->envtp->select_jobinfo = msg->select_jobinfo;

	return step;

cleanup:
	error("batch script setup failed for job %u.%u",
	      msg->job_id, msg->step_id);

	if (step->aborted)
		verbose("job %u abort complete", step->jobid);

	/* Remove the batch directory again (if it was created) and drop
	 * the path. */
	if (step->batchdir && (rmdir(step->batchdir) < 0))
		error("rmdir(%s): %m", step->batchdir);
	xfree(step->batchdir);

	errno = ESLURMD_CREATE_BATCH_DIR_ERROR;
	return NULL;
}
/*
 * Set the log prefix for this step via log_set_fpfx().
 *
 * Nothing is done when jobid is above MAX_NOALLOC_JOBID. The prefix is
 * "[jobid.stepid]" when jobid is below MIN_NOALLOC_JOBID and stepid is not
 * NO_VAL; in every other case it is "[jobid]".
 */
static void
_set_job_log_prefix(stepd_step_rec_t *job)
{
	char prefix[256];

	if (job->jobid > MAX_NOALLOC_JOBID)
		return;

	if ((job->jobid < MIN_NOALLOC_JOBID) && (job->stepid != NO_VAL)) {
		snprintf(prefix, sizeof(prefix), "[%u.%u]",
			 job->jobid, job->stepid);
	} else {
		snprintf(prefix, sizeof(prefix), "[%u]", job->jobid);
	}

	log_set_fpfx(prefix);
}
/*
 * Set up standard I/O for the tasks of a step.
 *
 * Privileges are dropped for the duration of the setup and reclaimed at the
 * "claim" label on every path after the drop succeeded. For non-batch steps
 * the initial srun client is connected and, when job->labelio is set, local
 * file clients are created so that labelled output is written by the
 * slurmstepd itself. The I/O thread is started last, only for non-batch
 * steps and only if no error occurred.
 *
 * Returns 0 on success, ESLURMD_SET_UID_OR_GID_ERROR if privileges cannot
 * be dropped, or ESLURMD_IO_ERROR if any I/O setup step fails.
 */
static int
_setup_normal_io(stepd_step_rec_t *job)
{
int rc = 0, ii = 0;
struct priv_state sprivs;
debug2("Entering _setup_normal_io");
/*
* Temporarily drop permissions, initialize task stdio file
* descriptors (which may be connected to files), then
* reclaim privileges.
*/
if (_drop_privileges(job, true, &sprivs, true) < 0)
return ESLURMD_SET_UID_OR_GID_ERROR;
if (io_init_tasks_stdio(job) != SLURM_SUCCESS) {
rc = ESLURMD_IO_ERROR;
goto claim;
}
/*
* MUST create the initial client object before starting
* the IO thread, or we risk losing stdout/err traffic.
*/
if (!job->batch) {
srun_info_t *srun = list_peek(job->sruns);
/* local id of task that sends to srun, -1 for all tasks,
any other value for no tasks */
int srun_stdout_tasks = -1;
int srun_stderr_tasks = -1;
xassert(srun != NULL);
/* If I/O is labelled with task num, and if a separate file is
written per node or per task, the I/O needs to be sent
back to the stepd, get a label appended, and written from
the stepd rather than sent back to srun or written directly
from the node. When a task has ofname or efname == NULL, it
means data gets sent back to the client. */
if (job->labelio) {
slurmd_filename_pattern_t outpattern, errpattern;
/* "same" is filled in by io_find_filename_pattern(); when true the
 * stderr handling below is skipped and the stdout clients are also
 * given the stderr task id */
bool same = false;
int file_flags;
io_find_filename_pattern(job, &outpattern, &errpattern,
&same);
file_flags = io_get_file_flags(job);
/* Make eio objects to write from the slurmstepd */
if (outpattern == SLURMD_ALL_UNIQUE) {
/* Open a separate file per task */
for (ii = 0; ii < job->node_tasks; ii++) {
rc = io_create_local_client(
job->task[ii]->ofname,
file_flags, job, job->labelio,
job->task[ii]->id,
same ? job->task[ii]->id : -2);
if (rc != SLURM_SUCCESS) {
error("Could not open output "
"file %s: %m",
job->task[ii]->ofname);
rc = ESLURMD_IO_ERROR;
goto claim;
}
}
/* stdout is now written locally: no task sends stdout to srun */
srun_stdout_tasks = -2;
if (same)
srun_stderr_tasks = -2;
} else if (outpattern == SLURMD_ALL_SAME) {
/* Open a file for all tasks */
rc = io_create_local_client(
job->task[0]->ofname,
file_flags, job, job->labelio,
-1, same ? -1 : -2);
if (rc != SLURM_SUCCESS) {
error("Could not open output "
"file %s: %m",
job->task[0]->ofname);
rc = ESLURMD_IO_ERROR;
goto claim;
}
srun_stdout_tasks = -2;
if (same)
srun_stderr_tasks = -2;
}
/* stderr goes to its own file(s) only when it is not shared with
 * stdout */
if (!same) {
if (errpattern == SLURMD_ALL_UNIQUE) {
/* Open a separate file per task */
for (ii = 0;
ii < job->node_tasks; ii++) {
rc = io_create_local_client(
job->task[ii]->efname,
file_flags, job,
job->labelio,
-2, job->task[ii]->id);
if (rc != SLURM_SUCCESS) {
error("Could not "
"open error "
"file %s: %m",
job->task[ii]->
efname);
rc = ESLURMD_IO_ERROR;
goto claim;
}
}
srun_stderr_tasks = -2;
} else if (errpattern == SLURMD_ALL_SAME) {
/* Open a file for all tasks */
rc = io_create_local_client(
job->task[0]->efname,
file_flags, job, job->labelio,
-2, -1);
if (rc != SLURM_SUCCESS) {
error("Could not open error "
"file %s: %m",
job->task[0]->efname);
rc = ESLURMD_IO_ERROR;
goto claim;
}
srun_stderr_tasks = -2;
}
}
}
if (io_initial_client_connect(srun, job, srun_stdout_tasks,
srun_stderr_tasks) < 0) {
rc = ESLURMD_IO_ERROR;
goto claim;
}
}
claim:
/* A failure to reclaim privileges is only logged; rc is left as is */
if (_reclaim_privileges(&sprivs) < 0) {
error("sete{u/g}id(%lu/%lu): %m",
(u_long) sprivs.saved_uid, (u_long) sprivs.saved_gid);
}
/* Start the I/O thread only after privileges have been reclaimed */
if (!rc && !job->batch) {
if (io_thread_start(job) < 0)
rc = ESLURMD_IO_ERROR;
}
debug2("Leaving _setup_normal_io");
return rc;
}
/*
 * Connect the tasks of the step to the first srun client for user managed
 * I/O.
 *
 * Returns SLURM_ERROR when the step has no client, otherwise the result of
 * user_managed_io_client_connect().
 */
static int
_setup_user_managed_io(stepd_step_rec_t *job)
{
	srun_info_t *client = list_peek(job->sruns);

	if (client == NULL) {
		error("_setup_user_managed_io: no clients!");
		return SLURM_ERROR;
	}

	return user_managed_io_client_connect(job->node_tasks, client,
					      job->task);
}
/*
 * Sleep for a pseudo-random number of milliseconds between 0 and
 * 3 * job->nnodes (inclusive). The generator is seeded with
 * jobid + nodeid. Compiled to a no-op when HAVE_FRONT_END is defined.
 */
static void
_random_sleep(stepd_step_rec_t *job)
{
#if !defined HAVE_FRONT_END
	long int ceiling_ms = (3 * job->nnodes);
	long int sleep_ms;

	srand48((long int) (job->jobid + job->nodeid));
	sleep_ms = lrand48() % (ceiling_ms + 1);

	debug3("delaying %ldms", sleep_ms);
	/* poll() on no descriptors is used purely as a millisecond sleep */
	poll(NULL, 0, sleep_ms);
#endif
}
/*
 * Send one MESSAGE_TASK_EXIT message covering n exited tasks to every
 * srun/sattach client of the step.
 *
 * job    - step the tasks belong to
 * tid    - list of the _global_ task ids that have exited
 * n      - number of entries in tid
 * status - exit status reported for those tasks
 *
 * Clients whose response address is entirely zero (family, port and
 * address) are skipped. A failed send is logged but does not change the
 * result: the function always returns SLURM_SUCCESS.
 */
static int
_send_exit_msg(stepd_step_rec_t *job, uint32_t *tid, int n, int status)
{
	slurm_msg_t out;
	task_exit_msg_t exit_msg;
	ListIterator iter = NULL;
	srun_info_t *client = NULL;

	debug3("sending task exit msg for %d tasks status %d", n, status);

	exit_msg.job_id       = job->jobid;
	exit_msg.step_id      = job->stepid;
	exit_msg.task_id_list = tid;
	exit_msg.num_tasks    = n;
	exit_msg.return_code  = status;

	slurm_msg_t_init(&out);
	out.msg_type = MESSAGE_TASK_EXIT;
	out.data     = &exit_msg;

	/*
	 * Hack for TCP timeouts on exit of large, synchronized job
	 * termination. Delay a random amount if job->nnodes > 100
	 */
	if (job->nnodes > 100)
		_random_sleep(job);

	/*
	 * Notify each srun and sattach.
	 * No message for poe or batch jobs
	 */
	iter = list_iterator_create(job->sruns);
	while ((client = list_next(iter)) != NULL) {
		bool has_addr;

		out.address = client->resp_addr;
		has_addr = (out.address.sin_family != 0) ||
			   (out.address.sin_port != 0) ||
			   (out.address.sin_addr.s_addr != 0);
		if (!has_addr)
			continue;	/* no srun or sattach here */

		if (_send_srun_resp_msg(&out, job->nnodes) != SLURM_SUCCESS)
			error("Failed to send MESSAGE_TASK_EXIT: %m");
	}
	list_iterator_destroy(iter);

	return SLURM_SUCCESS;
}
/*
 * Wait, under step_complete.lock, until every child slurmstepd has reported
 * completion or a deadline passes, then record the step exit code and clear
 * step_complete.wait_children.
 *
 * The deadline is now + REVERSE_TREE_CHILDREN_TIMEOUT plus 3 extra seconds
 * for every tree level below this one. With no children the wait is
 * skipped altogether.
 */
static void
_wait_for_children_slurmstepd(stepd_step_rec_t *job)
{
	int pending = 0;
	int wait_rc;
	struct timespec deadline = {0, 0};

	pthread_mutex_lock(&step_complete.lock);

	if (step_complete.children <= 0) {
		debug2("Rank %d has no children slurmstepd",
		       step_complete.rank);
	} else {
		/* wait an extra 3 seconds for every level of tree below
		 * this level */
		deadline.tv_sec += 3 * (step_complete.max_depth -
					step_complete.depth);
		deadline.tv_sec += time(NULL) + REVERSE_TREE_CHILDREN_TIMEOUT;

		for (;;) {
			pending = bit_clear_count(step_complete.bits);
			if (pending <= 0)
				break;

			debug3("Rank %d waiting for %d (of %d) children",
			       step_complete.rank, pending,
			       step_complete.children);

			wait_rc = pthread_cond_timedwait(&step_complete.cond,
							 &step_complete.lock,
							 &deadline);
			if (wait_rc == ETIMEDOUT) {
				debug2("Rank %d timed out waiting for %d"
				       " (of %d) children", step_complete.rank,
				       pending, step_complete.children);
				break;
			}
		}

		if (pending == 0) {
			debug2("Rank %d got all children completions",
			       step_complete.rank);
		}
	}

	step_complete.step_rc = _get_exit_code(job);
	step_complete.wait_children = false;

	pthread_mutex_unlock(&step_complete.lock);
}
/*
 * Send a single step completion message, which represents a single range
 * of complete job step nodes.
 *
 * job   - step being completed
 * first - first rank of the completed range
 * last  - last rank of the completed range
 *
 * The message goes to the parent slurmstepd when step_complete.parent_rank
 * is not -1; if REVERSE_TREE_PARENT_RETRY attempts at that fail, or if there
 * is no parent, it goes to slurmctld and is retried there until the send
 * succeeds. Accounting data is aggregated into the message only on the first
 * call (tracked by the function-local static acct_sent).
 */
/* caller is holding step_complete.lock */
static void
_one_step_complete_msg(stepd_step_rec_t *job, int first, int last)
{
slurm_msg_t req;
step_complete_msg_t msg;
int rc = -1;
int retcode;
int i;
uint16_t port = 0;
char ip_buf[16];
/* persists across calls: accounting is attached to one message only */
static bool acct_sent = false;
debug2("_one_step_complete_msg: first=%d, last=%d", first, last);
memset(&msg, 0, sizeof(step_complete_msg_t));
msg.job_id = job->jobid;
msg.job_step_id = job->stepid;
msg.range_first = first;
msg.range_last = last;
msg.step_rc = step_complete.step_rc;
/* released at "finished" on every path */
msg.jobacct = jobacctinfo_create(NULL);
/************* acct stuff ********************/
if (!acct_sent) {
jobacctinfo_aggregate(step_complete.jobacct, job->jobacct);
jobacctinfo_getinfo(step_complete.jobacct,
JOBACCT_DATA_TOTAL, msg.jobacct,
SLURM_PROTOCOL_VERSION);
acct_sent = true;
}
/*********************************************/
slurm_msg_t_init(&req);
req.msg_type = REQUEST_STEP_COMPLETE;
req.data = &msg;
req.address = step_complete.parent_addr;
/* Do NOT change this check to "step_complete.rank != 0", because
* there are odd situations where SlurmUser or root could
* craft a launch without a valid credential, and no tree information
* can be built with out the hostlist from the credential.
*/
if (step_complete.parent_rank != -1) {
debug3("Rank %d sending complete to rank %d, range %d to %d",
step_complete.rank, step_complete.parent_rank,
first, last);
/* On error, pause then try sending to parent again.
* The parent slurmstepd may just not have started yet, because
* of the way that the launch message forwarding works.
*/
for (i = 0; i < REVERSE_TREE_PARENT_RETRY; i++) {
/* one second pause before every attempt except the first */
if (i)
sleep(1);
retcode = slurm_send_recv_rc_msg_only_one(&req, &rc, 0);
/* both the transport and the returned code must be 0 */
if ((retcode == 0) && (rc == 0))
goto finished;
}
/* on error AGAIN, send to the slurmctld instead */
debug3("Rank %d sending complete to slurmctld instead, range "
"%d to %d", step_complete.rank, first, last);
} else {
/* this is the base of the tree, its parent is slurmctld */
debug3("Rank %d sending complete to slurmctld, range %d to %d",
step_complete.rank, first, last);
}
/* Retry step complete RPC send to slurmctld indefinitely.
* Prevent orphan job step if slurmctld is down */
i = 1;
while (slurm_send_recv_controller_rc_msg(&req, &rc) < 0) {
/* log the failure only once, on the first failed attempt */
if (i++ == 1) {
/* NOTE(review): the address printed here is
 * step_complete.parent_addr although the message text says
 * slurmctld -- confirm this is intended */
slurm_get_ip_str(&step_complete.parent_addr, &port,
ip_buf, sizeof(ip_buf));
error("Rank %d failed sending step completion message "
"directly to slurmctld (%s:%u), retrying",
step_complete.rank, ip_buf, port);
}
sleep(60);
}
/* i > 1 means at least one attempt failed, so ip_buf/port were filled */
if (i > 1) {
info("Rank %d sent step completion message directly to "
"slurmctld (%s:%u)", step_complete.rank, ip_buf, port);
}
finished:
jobacctinfo_destroy(msg.jobacct);
}
/*
 * Starting at bit index "start" of step_complete.bits, locate the next
 * contiguous run of set bits below index "size".
 *
 * On success *first and *last receive the first and last index of the run
 * and 1 is returned. If no set bit is found in [start, size), 0 is
 * returned and *first / *last are left untouched.
 *
 * caller is holding step_complete.lock
 */
static int
_bit_getrange(int start, int size, int *first, int *last)
{
	bool in_run = false;
	int pos;

	for (pos = start; pos < size; pos++) {
		bool is_set = bit_test(step_complete.bits, pos);

		if (is_set && !in_run) {
			/* first set bit: the run starts (and so far ends)
			 * here */
			in_run = true;
			*first = pos;
			*last = pos;
		} else if (is_set) {
			/* run continues */
			*last = pos;
		} else if (in_run) {
			/* first clear bit after the run: it ended just
			 * before */
			*last = pos - 1;
			break;
		}
	}

	return in_run ? 1 : 0;
}
/*
 * Send as many step completion messages as necessary to represent
 * all completed nodes in the job step. There may be nodes that have
 * not yet signalled their completion, so there will be gaps in the
 * completed node bitmap, requiring that more than one message be sent.
 *
 * Bit i of step_complete.bits is reported as rank
 * (step_complete.rank + 1 + i). This node's own rank is not part of the
 * bitmap: it is folded into the first range when that range starts at
 * bit 0, and otherwise sent as a separate single-rank message at the end.
 * The whole function runs with step_complete.lock held.
 */
static void
_send_step_complete_msgs(stepd_step_rec_t *job)
{
	int cursor = 0;
	int nbits;
	int lo = -1, hi = -1;
	bool own_rank_sent = false;

	pthread_mutex_lock(&step_complete.lock);

	nbits = bit_size(step_complete.bits);

	/*
	 * With an empty bitmap (no children) _bit_getrange() finds nothing,
	 * so only the message for this rank below is sent.
	 */
	while (_bit_getrange(cursor, nbits, &lo, &hi)) {
		/* THIS node is not in the bit string, so we need to
		 * prepend the local rank */
		if ((cursor == 0) && (lo == 0)) {
			own_rank_sent = true;
			lo = -1;
		}
		_one_step_complete_msg(job, (lo + step_complete.rank + 1),
				       (hi + step_complete.rank + 1));
		cursor = hi + 1;
	}

	if (!own_rank_sent) {
		_one_step_complete_msg(job, step_complete.rank,
				       step_complete.rank);
	}

	pthread_mutex_unlock(&step_complete.lock);
}
/*
 * Stub that exists only so that the checkpoint functions can resolve this
 * symbol name (as needed for some of the checkpoint functions used by
 * slurmctld). It must never be reached inside slurmstepd: any call is
 * reported through fatal().
 */
extern void
agent_queue_request(void *dummy)
{
	(void) dummy;	/* intentionally unused */

	fatal("Invalid agent_queue_request function call, likely from "
	      "checkpoint plugin");
}
/*
 * Executes the functions of the slurmd job manager process,
 * which runs as root and performs shared memory and interconnect
 * initialization, etc.
 *
 * Overall flow: initialize the plugins, create the process container, run
 * the spank and switch initialization, fork the tasks, wait for them, then
 * tear everything down. Errors jump to one of two labels:
 *  - fail2: full teardown (kill the container, switch postfini, wait for
 *    I/O, reset cpu frequency, task plugin post-step, PAM, spank_fini),
 *    used once spank_init() has succeeded;
 *  - fail1: final reporting only (launch response on error, step
 *    completion messages, core_spec_g_clear(), free ckpt_type).
 * The success path falls through both labels as well.
 *
 * Returns 0 if job ran and completed successfully.
 * Returns errno if job startup failed. NOTE: This will DRAIN the node.
 */
int
job_manager(stepd_step_rec_t *job)
{
int rc = SLURM_SUCCESS;
bool io_initialized = false;
/* freed with xfree() just before returning */
char *ckpt_type = slurm_get_checkpoint_type();
debug3("Entered job_manager for %u.%u pid=%d",
job->jobid, job->stepid, job->jmgr_pid);
#ifdef PR_SET_DUMPABLE
if (prctl(PR_SET_DUMPABLE, 1) < 0)
debug ("Unable to set dumpable to 1");
#endif /* PR_SET_DUMPABLE */
/* run now so we don't drop permissions on any of the gather plugins */
acct_gather_conf_init();
/*
* Preload plugins.
*/
if ((core_spec_g_init()!= SLURM_SUCCESS) ||
(switch_init() != SLURM_SUCCESS) ||
(slurmd_task_init() != SLURM_SUCCESS) ||
(slurm_proctrack_init() != SLURM_SUCCESS) ||
(checkpoint_init(ckpt_type) != SLURM_SUCCESS) ||
(jobacct_gather_init() != SLURM_SUCCESS)) {
rc = SLURM_PLUGIN_NAME_INVALID;
goto fail1;
}
if (mpi_hook_slurmstepd_init(&job->env) != SLURM_SUCCESS) {
rc = SLURM_MPI_PLUGIN_NAME_INVALID;
goto fail1;
}
if (!job->batch &&
(switch_g_job_preinit(job->switch_job) < 0)) {
rc = ESLURM_INTERCONNECT_FAILURE;
goto fail1;
}
/* create the process container only if the step does not have one yet */
if ((job->cont_id == 0) &&
(proctrack_g_create(job) != SLURM_SUCCESS)) {
error("proctrack_g_create: %m");
rc = ESLURMD_SETUP_ENVIRONMENT_ERROR;
goto fail1;
}
#ifdef HAVE_ALPS_CRAY
/*
* Note that the previously called proctrack_g_create function is
* mandatory since the select/cray plugin needs the job container
* ID in order to CONFIRM the ALPS reservation.
* It is not a good idea to perform this setup in _fork_all_tasks(),
* since any transient failure of ALPS (which can happen in practice)
* will then set the frontend node to DRAIN.
*
* Also note that we do not check the reservation for batch jobs with
* a reservation ID of zero and no CPUs. These are SLURM job
* allocations containing no compute nodes and thus have no ALPS
* reservation.
*/
if (!job->batch || job->resv_id || job->cpus) {
rc = _select_cray_plugin_job_ready(job);
if (rc != SLURM_SUCCESS) {
/*
* Transient error: slurmctld knows this condition to
* mean that the ALPS (not the SLURM) reservation
* failed and tries again.
*/
if (rc == READY_JOB_ERROR)
rc = ESLURM_RESERVATION_NOT_USABLE;
else
rc = ESLURMD_SETUP_ENVIRONMENT_ERROR;
error("could not confirm ALPS reservation #%u",
job->resv_id);
goto fail1;
}
}
#endif
debug2("Before call to spank_init()");
if (spank_init (job) < 0) {
error ("Plugin stack initialization failed.");
rc = SLURM_PLUGIN_NAME_INVALID;
goto fail1;
}
debug2("After call to spank_init()");
/* From here on failures go to fail2 so that spank_fini() and the rest
 * of the teardown run. */
/* Call switch_g_job_init() before becoming user */
if (!job->batch && job->argv && (switch_g_job_init(job) < 0)) {
/* error("switch_g_job_init: %m"); already logged */
rc = ESLURM_INTERCONNECT_FAILURE;
goto fail2;
}
/* fork necessary threads for checkpoint */
if (checkpoint_stepd_prefork(job) != SLURM_SUCCESS) {
error("Failed checkpoint_stepd_prefork");
rc = SLURM_FAILURE;
io_close_task_fds(job);
goto fail2;
}
/* fork necessary threads for MPI */
if (mpi_hook_slurmstepd_prefork(job, &job->env) != SLURM_SUCCESS) {
error("Failed mpi_hook_slurmstepd_prefork");
rc = SLURM_FAILURE;
goto fail2;
}
/* Calls pam_setup() and requires pam_finish() if
* successful. Only check for < 0 here since other slurm
* error codes could come that are more descriptive. */
if ((rc = _fork_all_tasks(job, &io_initialized)) < 0) {
debug("_fork_all_tasks failed");
rc = ESLURMD_EXECVE_FAILED;
goto fail2;
}
/*
* If IO initialization failed, return SLURM_SUCCESS (on a
* batch step) or the node will be drain otherwise. Regular
* srun needs the error sent or it will hang waiting for the
* launch to happen.
*/
if ((rc != SLURM_SUCCESS) || !io_initialized)
goto fail2;
io_close_task_fds(job);
xsignal_block (mgr_sigarray);
reattach_job = job;
job->state = SLURMSTEPD_STEP_RUNNING;
/* if we are not polling then we need to make sure we get some
* information here
*/
if (!conf->job_acct_gather_freq)
jobacct_gather_stat_task(0);
/* Send job launch response with list of pids */
_send_launch_resp(job, 0);
_wait_for_all_tasks(job);
acct_gather_profile_endpoll();
acct_gather_profile_g_node_step_end();
acct_gather_profile_fini();
job->state = SLURMSTEPD_STEP_ENDING;
if (!job->batch &&
(switch_g_job_fini(job->switch_job) < 0)) {
error("switch_g_job_fini: %m");
}
fail2:
/*
* First call switch_g_job_postfini() - In at least one case,
* this will clean up any straggling processes. If this call
* is moved behind wait_for_io(), we may block waiting for IO
* on a hung process.
*
* Make sure all processes in session are dead. On systems
* with an IBM Federation switch, all processes must be
* terminated before the switch window can be released by
* switch_g_job_postfini().
*/
/* the terminate monitor brackets the container kill and wait below */
step_terminate_monitor_start(job->jobid, job->stepid);
if (job->cont_id != 0) {
proctrack_g_signal(job->cont_id, SIGKILL);
proctrack_g_wait(job->cont_id);
}
step_terminate_monitor_stop();
if (!job->batch) {
if (switch_g_job_postfini(job) < 0)
error("switch_g_job_postfini: %m");
}
/*
* Wait for io thread to complete (if there is one)
*/
if (!job->batch && !job->user_managed_io && io_initialized)
_wait_for_io(job);
/*
* Reset cpu frequency if it was changed
*/
if (job->cpu_freq != NO_VAL)
cpu_freq_reset(job);
/*
* Warn task plugin that the user's step have terminated
*/
task_g_post_step(job);
/*
* This just cleans up all of the PAM state in case rc == 0
* which means _fork_all_tasks performs well.
* Must be done after IO termination in case of IO operations
* require something provided by the PAM (i.e. security token)
*/
if (!rc)
pam_finish();
debug2("Before call to spank_fini()");
if (spank_fini (job) < 0) {
error ("spank_fini failed");
}
debug2("After call to spank_fini()");
fail1:
/* If interactive job startup was abnormal,
* be sure to notify client.
*/
if (rc != 0) {
error("job_manager exiting abnormally, rc = %d", rc);
_send_launch_resp(job, rc);
}
/* Non-batch steps with a valid rank report completion up the tree; an
 * aborted job skips the wait for children but still sends the
 * messages. */
if (!job->batch && (step_complete.rank > -1)) {
if (job->aborted)
info("job_manager exiting with aborted job");
else
_wait_for_children_slurmstepd(job);
_send_step_complete_msgs(job);
}
if (core_spec_g_clear(job->cont_id))
error("core_spec_g_clear: %m");
xfree(ckpt_type);
return(rc);
}
/*
 * Run the privileged per-task hooks: reclaim privileges, call the spank
 * and task plugin privileged hooks, then drop privileges again.
 *
 * Returns SLURM_ERROR if privileges cannot be reclaimed, the value of
 * error() if one of the hooks fails (privileges are then NOT dropped
 * again by this function), otherwise the result of _drop_privileges().
 */
static int
_pre_task_privileged(stepd_step_rec_t *job, int taskid, struct priv_state *sp)
{
	int rc;

	if (_reclaim_privileges(sp) < 0)
		return SLURM_ERROR;

	if (spank_task_privileged(job, taskid) < 0) {
		rc = error("spank_task_init_privileged failed");
		return rc;
	}

	if (task_g_pre_launch_priv(job) < 0) {
		rc = error("pre_launch_priv failed");
		return rc;
	}

	/* sp->gid_list should already be initialized */
	rc = _drop_privileges(job, true, sp, false);
	return rc;
}
/*
 * Bookkeeping for one forked child that blocks on a pipe until its parent
 * releases it (see fork_child_with_wait_info(),
 * exec_wait_child_wait_for_parent() and exec_wait_signal_child()).
 */
struct exec_wait_info {
int id; /* identifier passed to exec_wait_info_create() */
pid_t pid; /* result of fork(); -1 until forked or after a kill */
int parentfd; /* write end of the pipe; set to -1 in the child */
int childfd; /* read end of the pipe; set to -1 in the parent */
};
/*
 * Allocate an exec_wait_info for identifier i together with its pipe.
 * Both pipe ends are marked close-on-exec; the read end becomes childfd,
 * the write end parentfd, and pid starts out as -1.
 *
 * Returns the new object, or NULL (after logging) if pipe() fails.
 */
static struct exec_wait_info * exec_wait_info_create (int i)
{
	struct exec_wait_info *info;
	int pipefd[2];

	if (pipe(pipefd) < 0) {
		error ("exec_wait_info_create: pipe: %m");
		return NULL;
	}

	fd_set_close_on_exec(pipefd[0]);
	fd_set_close_on_exec(pipefd[1]);

	info = xmalloc(sizeof(*info));
	info->id       = i;
	info->pid      = -1;
	info->childfd  = pipefd[0];
	info->parentfd = pipefd[1];

	return info;
}
/*
 * Release an exec_wait_info: close whichever pipe ends are still open,
 * invalidate the id/pid fields and free the record. NULL is accepted.
 */
static void exec_wait_info_destroy (struct exec_wait_info *e)
{
	if (e != NULL) {
		if (e->parentfd >= 0)
			close(e->parentfd);
		if (e->childfd >= 0)
			close(e->childfd);

		e->id = -1;
		e->pid = -1;
		xfree(e);
	}
}
/*
 * Return the pid recorded in 'e', or -1 when 'e' is NULL.
 */
static pid_t exec_wait_get_pid (struct exec_wait_info *e)
{
	return (e != NULL) ? e->pid : (pid_t) -1;
}
/*
 * Create an exec_wait_info and fork.
 *
 * Returns in both processes. Each side keeps only its own end of the pipe:
 * the child (pid == 0) closes parentfd, the parent closes childfd.
 *
 * id IN: identifier stored in the record
 * RET the record, or NULL if the record could not be created or fork failed
 */
static struct exec_wait_info * fork_child_with_wait_info (int id)
{
	struct exec_wait_info *info = exec_wait_info_create(id);

	if (info == NULL)
		return NULL;

	info->pid = fork();
	if (info->pid < 0) {
		exec_wait_info_destroy(info);
		return NULL;
	}

	if (info->pid != 0) {
		/* parent: the read end belongs to the child */
		close(info->childfd);
		info->childfd = -1;
	} else {
		/* child: the write end belongs to the parent */
		close(info->parentfd);
		info->parentfd = -1;
	}

	return info;
}
/*
 * Child side of the handshake: block until one byte arrives on childfd.
 *
 * RET 0 once the byte was read, otherwise the value returned by error()
 *     (any read result other than exactly one byte counts as failure).
 */
static int exec_wait_child_wait_for_parent (struct exec_wait_info *e)
{
	char token;
	ssize_t nread = read(e->childfd, &token, sizeof(token));

	if (nread != 1)
		return error ("wait_for_parent: failed: %m");

	return 0;
}
/*
 * Parent side of the handshake: write a single byte to parentfd so the
 * child blocked in exec_wait_child_wait_for_parent() can continue.
 *
 * RET 0 on success, otherwise the value returned by error()
 */
static int exec_wait_signal_child (struct exec_wait_info *e)
{
	const char token = '\0';
	ssize_t nwritten = write(e->parentfd, &token, sizeof(token));

	if (nwritten != 1)
		return error ("write to unblock task %d failed: %m", e->id);

	return 0;
}
/*
 * List callback: log and release one waiting child.
 *
 * The result of exec_wait_signal_child() is deliberately discarded and 0
 * is always returned.
 */
static int exec_wait_signal (struct exec_wait_info *e, stepd_step_rec_t *job)
{
	debug3 ("Unblocking %u.%u task %d, writefd = %d",
		job->jobid, job->stepid, e->id, e->parentfd);

	(void) exec_wait_signal_child(e);

	return 0;
}
/*
 * Send SIGKILL to the child recorded in exec_wait_info 'e'.
 *
 * On success the recorded pid is reset to -1 so the child is not
 * signalled again.
 *
 * Returns 0 for success, -1 if no pid is recorded or kill(2) failed.
 */
static int exec_wait_kill_child (struct exec_wait_info *e)
{
	if ((e->pid < 0) || (kill(e->pid, SIGKILL) < 0))
		return -1;

	e->pid = -1;
	return 0;
}
/*
 * Send SIGKILL to every child recorded in exec_wait_list.
 *
 * Returns 0 if the list is empty or every kill succeeded; otherwise a
 * negative value (the sum of the per-child -1 results, or the value
 * returned by error() if no iterator could be created).
 */
static int exec_wait_kill_children (List exec_wait_list)
{
	int failures = 0;
	int nchildren = list_count(exec_wait_list);
	struct exec_wait_info *child;
	ListIterator iter;

	if (nchildren == 0)
		return 0;

	verbose ("Killing %d remaining child%s",
		 nchildren, (nchildren > 1 ? "ren" : ""));

	iter = list_iterator_create(exec_wait_list);
	if (iter == NULL)
		return error ("exec_wait_kill_children: iterator_create: %m");

	for (child = list_next(iter); child; child = list_next(iter))
		failures += exec_wait_kill_child(child);

	list_iterator_destroy(iter);
	return failures;
}
/*
 * Set up standard I/O for a task in the forked child.
 *
 * When built with pty support and the step requested a pty, the task
 * with global id 0 gets its stdin descriptor installed through
 * login_tty(); every other case goes through io_dup_stdio().
 */
static void prepare_stdio (stepd_step_rec_t *job, stepd_step_task_info_t *task)
{
#ifdef HAVE_PTY_H
	if (job->pty && (task->gtid == 0)) {
		if (login_tty(task->stdin_fd) == 0)
			debug3("login_tty good");
		else
			error("login_tty: %m");
		return;
	}
#endif
	io_dup_stdio(task);
}
/*
 * Restore default signal handling: for every signal listed in the
 * zero-terminated mgr_sigarray, first ignore it (discarding anything
 * pending) and then reset it to SIG_DFL; finally install an empty
 * signal mask.
 */
static void _unblock_signals (void)
{
	sigset_t empty_mask;
	int idx = 0;

	while (mgr_sigarray[idx]) {
		/* eliminate pending signals, then set to default */
		xsignal(mgr_sigarray[idx], SIG_IGN);
		xsignal(mgr_sigarray[idx], SIG_DFL);
		idx++;
	}

	sigemptyset(&empty_mask);
	xsignal_set_mask(&empty_mask);
}
/*
 * Fork and exec all local tasks of the step.
 *
 * Sequence: PAM setup under partially dropped privileges, I/O and log
 * setup as the original user, then one fork per task under the job
 * user's effective ids. Every child blocks on a pipe until the parent
 * has placed it in the step process group, the proctrack container and
 * job accounting; only then are the children released to exec.
 *
 * job            IN: step record
 * io_initialized OUT: set to true once I/O setup has succeeded
 *
 * RET SLURM_SUCCESS, or an error code. On "IO setup failed" for a batch
 *     job the return is forced to SLURM_SUCCESS (see inline comment).
 *
 * Error unwinding: fail4 restores the working directory, fail3 reclaims
 * privileges, fail2 destroys the exec_wait_list (if any) and closes the
 * task fds, fail1 finishes PAM. The list is destroyed under fail2
 * because two error exits taken after all children have been forked
 * jump straight to fail2; destroying the list there closes the parent
 * ends of the pipes, so the blocked children see EOF and exit instead
 * of waiting forever, and the list itself is not leaked.
 */
static int
_fork_all_tasks(stepd_step_rec_t *job, bool *io_initialized)
{
	int rc = SLURM_SUCCESS;
	int i;
	struct priv_state sprivs;
	jobacct_id_t jobacct_id;
	char *oom_value;
	List exec_wait_list = NULL;
	char *esc;

	xassert(job != NULL);

	set_oom_adj(0);	/* the tasks may be killed by OOM */
	if (task_g_pre_setuid(job)) {
		error("Failed task affinity setup");
		return SLURM_ERROR;
	}

	/* Temporarily drop effective privileges, except for the euid.
	 * We need to wait until after pam_setup() to drop euid.
	 */
	if (_drop_privileges (job, false, &sprivs, true) < 0)
		return ESLURMD_SET_UID_OR_GID_ERROR;

	if (pam_setup(job->user_name, conf->hostname)
	    != SLURM_SUCCESS){
		error ("error in pam_setup");
		rc = SLURM_ERROR;
	}

	/*
	 * Reclaim privileges to do the io setup
	 */
	_reclaim_privileges (&sprivs);
	if (rc)
		goto fail1; /* pam_setup error */

	set_umask(job);		/* set umask for stdout/err files */
	if (job->user_managed_io)
		rc = _setup_user_managed_io(job);
	else
		rc = _setup_normal_io(job);
	/*
	 * Initialize log facility to copy errors back to srun
	 */
	if (!rc)
		rc = _slurmd_job_log_init(job);

	if (rc) {
		error("IO setup failed: %m");
		job->task[0]->estatus = 0x0100;
		step_complete.step_rc = 0x0100;
		if (job->batch)
			rc = SLURM_SUCCESS;	/* drains node otherwise */
		goto fail1;
	} else {
		*io_initialized = true;
	}

	/*
	 * Temporarily drop effective privileges
	 */
	if (_drop_privileges (job, true, &sprivs, true) < 0) {
		error ("_drop_privileges: %m");
		rc = SLURM_ERROR;
		goto fail2;
	}

	/* If there is an \ in the path
	 * remove it.
	 */
	esc = is_path_escaped(job->cwd);
	if (esc) {
		xfree(job->cwd);
		job->cwd = esc;
	}

	if (chdir(job->cwd) < 0) {
		error("couldn't chdir to `%s': %m: going to /tmp instead",
		      job->cwd);
		if (chdir("/tmp") < 0) {
			error("couldn't chdir to /tmp either. dying.");
			rc = SLURM_ERROR;
			goto fail3;
		}
	}

	if (spank_user (job) < 0) {
		error("spank_user failed.");
		rc = SLURM_ERROR;
		goto fail4;
	}

	exec_wait_list = list_create ((ListDelF) exec_wait_info_destroy);
	if (!exec_wait_list) {
		error ("Unable to create exec_wait_list");
		rc = SLURM_ERROR;
		goto fail4;
	}

	/*
	 * Fork all of the task processes.
	 */
	for (i = 0; i < job->node_tasks; i++) {
		char time_stamp[256];
		pid_t pid;
		struct exec_wait_info *ei;

		acct_gather_profile_g_task_start(i);
		if ((ei = fork_child_with_wait_info (i)) == NULL) {
			error("child fork: %m");
			exec_wait_kill_children (exec_wait_list);
			rc = SLURM_ERROR;
			goto fail4;
		} else if ((pid = exec_wait_get_pid (ei)) == 0) { /* child */
			/*
			 *  Destroy exec_wait_list in the child.
			 *   Only exec_wait_info for previous tasks have been
			 *   added to the list so far, so everything else
			 *   can be discarded.
			 */
			list_destroy (exec_wait_list);

#ifdef HAVE_AIX
			(void) mkcrid(0);
#endif
			/* jobacctinfo_endpoll();
			 * closing jobacct files here causes deadlock */

			if (conf->propagate_prio)
				_set_prio_process(job);

			/*
			 * Reclaim privileges and call any plugin hooks
			 * that may require elevated privs
			 * sprivs.gid_list is already set from the
			 * _drop_privileges call above, do not reinitialize.
			 */
			if (_pre_task_privileged(job, i, &sprivs) < 0)
				exit(1);

			if (_become_user(job, &sprivs) < 0) {
				error("_become_user failed: %m");
				/* child process, should not return */
				exit(1);
			}

			/* log_fini(); */ /* note: moved into exec_task() */

			_unblock_signals();

			/*
			 *  Need to setup stdio before setpgid() is called
			 *   in case we are setting up a tty. (login_tty()
			 *   must be called before setpgid() or it is
			 *   effectively disabled).
			 */
			prepare_stdio (job, job->task[i]);

			/*
			 *  Block until parent notifies us that it is ok to
			 *   proceed. This allows the parent to place all
			 *   children in any process groups or containers
			 *   before they make a call to exec(2).
			 */
			if (exec_wait_child_wait_for_parent (ei) < 0)
				exit (1);

			exec_task(job, i);
		}

		/*
		 * Parent continues:
		 */

		list_append (exec_wait_list, ei);

		log_timestamp(time_stamp, sizeof(time_stamp));
		verbose ("task %lu (%lu) started %s",
			 (unsigned long) job->task[i]->gtid,
			 (unsigned long) pid, time_stamp);

		job->task[i]->pid = pid;
		if (i == 0)
			job->pgid = pid;
	}

	/*
	 * All tasks are now forked and running as the user, but
	 * will wait for our signal before calling exec.
	 */

	/*
	 * Reclaim privileges
	 */
	if (_reclaim_privileges (&sprivs) < 0) {
		error ("Unable to reclaim privileges");
		/* Don't bother erroring out here */
	}

	if ((oom_value = getenv("SLURMSTEPD_OOM_ADJ"))) {
		/* named so it no longer shadows the loop index 'i' */
		int oom_adj = atoi(oom_value);
		debug("Setting slurmstepd oom_adj to %d", oom_adj);
		set_oom_adj(oom_adj);
	}

	if (chdir (sprivs.saved_cwd) < 0) {
		error ("Unable to return to working directory");
	}

	for (i = 0; i < job->node_tasks; i++) {
		/*
		 * Put this task in the step process group
		 * login_tty() must put task zero in its own
		 * session, causing setpgid() to fail, setsid()
		 * has already set its process group as desired
		 */
		if ((job->pty == 0)
		    &&  (setpgid (job->task[i]->pid, job->pgid) < 0)) {
			error("Unable to put task %d (pid %d) into "
			      "pgrp %d: %m",
			      i,
			      job->task[i]->pid,
			      job->pgid);
		}

		if (proctrack_g_add(job, job->task[i]->pid)
		    == SLURM_ERROR) {
			error("proctrack_g_add: %m");
			rc = SLURM_ERROR;
			goto fail2;
		}

		jobacct_id.nodeid = job->nodeid;
		jobacct_id.taskid = job->task[i]->gtid;
		jobacct_id.job    = job;
		if (i == (job->node_tasks - 1)) {
			/* start polling on the last task */
			jobacct_gather_set_proctrack_container_id(job->cont_id);
			jobacct_gather_add_task(job->task[i]->pid, &jobacct_id,
						1);
		} else {
			/* don't poll yet */
			jobacct_gather_add_task(job->task[i]->pid, &jobacct_id,
						0);
		}

		if (spank_task_post_fork (job, i) < 0) {
			error ("spank task %d post-fork failed", i);
			rc = SLURM_ERROR;
			goto fail2;
		}
	}
//	jobacct_gather_set_proctrack_container_id(job->cont_id);
	if (container_g_add_cont(job->jobid, job->cont_id) != SLURM_SUCCESS)
		error("container_g_add_cont(%u): %m", job->jobid);
	if (core_spec_g_set(job->cont_id, job->job_core_spec))
		error("core_spec_g_set: %m");

	/*
	 * Now it's ok to unblock the tasks, so they may call exec.
	 */
	list_for_each (exec_wait_list, (ListForF) exec_wait_signal, job);
	list_destroy (exec_wait_list);

	for (i = 0; i < job->node_tasks; i++) {
		/*
		 * Prepare process for attach by parallel debugger
		 * (if specified and able)
		 */
		if (pdebug_trace_process(job, job->task[i]->pid)
		    == SLURM_ERROR)
			rc = SLURM_ERROR;
	}

	return rc;

fail4:
	if (chdir (sprivs.saved_cwd) < 0) {
		error ("Unable to return to working directory");
	}
fail3:
	_reclaim_privileges (&sprivs);
fail2:
	/*
	 * Reached directly by the post-fork error exits above, so the list
	 * must be released here rather than under fail3. It is still NULL
	 * when fail2 is reached from the _drop_privileges failure.
	 */
	if (exec_wait_list)
		list_destroy (exec_wait_list);
	io_close_task_fds(job);
fail1:
	pam_finish();
	return rc;
}
/*
 * Make one pass over the local tasks and report, in a single aggregated
 * exit message, every task that has exited, has not been reported yet,
 * and shares the exit status of the first such task found.
 *
 * Reported tasks are flagged (esent) so later passes skip them; tasks
 * with a different status are left for a subsequent call.
 *
 * RET number of tasks included in the message (0 if nothing was sent)
 */
static int
_send_pending_exit_msgs(stepd_step_rec_t *job)
{
	int idx;
	int count = 0;
	int common_status = 0;
	bool have_status = false;
	uint32_t tid[job->node_tasks];

	for (idx = 0; idx < job->node_tasks; idx++) {
		stepd_step_task_info_t *task = job->task[idx];

		/* only tasks that exited and were not yet reported */
		if (!task->exited || task->esent)
			continue;

		if (have_status) {
			if (task->estatus != common_status)
				continue;
		} else {
			/* first candidate decides the status of this batch */
			common_status = task->estatus;
			have_status = true;
		}

		tid[count] = task->gtid;
		count++;
		task->esent = true;
	}

	if (count) {
		debug2("Aggregated %d task exit messages", count);
		_send_exit_msg(job, tid, count, common_status);
	}

	return count;
}
/*
 * Log (at verbose level) how a task ended.
 *
 * Three cases are distinguished: normal exit (exit code is printed),
 * death by signal (signal number, plus a core-dump note where WCOREDUMP
 * is available - it is not on AIX), and any other status, which is
 * printed raw in hex so that no exit status is ever silently dropped.
 */
static inline void
_log_task_exit(unsigned long taskid, unsigned long pid, int status)
{
	const char *core_note = "";

	if (WIFEXITED(status)) {
		verbose("task %lu (%lu) exited with exit code %d.",
			taskid, pid, WEXITSTATUS(status));
		return;
	}

	if (!WIFSIGNALED(status)) {
		verbose("task %lu (%lu) exited with status 0x%04x.",
			taskid, pid, status);
		return;
	}

#ifdef WCOREDUMP
	if (WCOREDUMP(status))
		core_note = " (core dumped)";
#endif
	verbose("task %lu (%lu) exited. Killed by signal %d%s.",
		taskid, pid, WTERMSIG(status), core_note);
}
/*
 * Reap exited children and run the per-task exit processing for each
 * one that belongs to this step.
 *
 * If waitflag is true, perform a blocking wait for a single process
 * and then return.
 *
 * If waitflag is false, do repeated non-blocking waits until
 * there are no more processes to reap (wait3 returns 0).
 *
 * For every reaped pid the accounting record (if any) is folded into the
 * step totals. If the pid maps to a step task, the task is marked exited,
 * the job environment is rebuilt for that task, the user and system task
 * epilogs are run, and the spank and task plugins are notified.
 *
 * Returns the number of reaped processes that were tasks of this step,
 * or -1 if wait3() reported ECHILD before any task was counted.
 */
static int
_wait_for_any_task(stepd_step_rec_t *job, bool waitflag)
{
stepd_step_task_info_t *t = NULL;
int status = 0;
pid_t pid;
int completed = 0;
jobacctinfo_t *jobacct = NULL;
struct rusage rusage;
char **tmp_env;
do {
pid = wait3(&status, waitflag ? 0 : WNOHANG, &rusage);
if (pid == -1) {
if (errno == ECHILD) {
debug("No child processes");
if (completed == 0)
completed = -1;
goto done;
} else if (errno == EINTR) {
debug("wait3 was interrupted");
/* 'continue' jumps to the loop condition; pid is -1 there,
 * so the loop ends and the caller is expected to retry */
continue;
} else {
debug("Unknown errno %d", errno);
/* same as above: this ends the loop, it does not retry */
continue;
}
} else if (pid == 0) { /* WNOHANG and no pids available */
goto done;
}
/************* acct stuff ********************/
jobacct = jobacct_gather_remove_task(pid);
if (jobacct) {
jobacctinfo_setinfo(jobacct,
JOBACCT_DATA_RUSAGE, &rusage,
SLURM_PROTOCOL_VERSION);
/* Since we currently don't track energy
usage per task (only per step). We take
into account only the last poll of the last task.
Odds are it is the only one with
information anyway, but just to be safe we
will zero out the previous value since this
one will over ride it.
If this ever changes in the future this logic
will need to change.
*/
if (jobacct->energy.consumed_energy)
job->jobacct->energy.consumed_energy = 0;
jobacctinfo_aggregate(job->jobacct, jobacct);
jobacctinfo_destroy(jobacct);
}
acct_gather_profile_g_task_end(pid);
/*********************************************/
/* Only pids that map to a task of this step are counted and
 * get exit processing; other reaped children are ignored */
if ((t = job_task_info_by_pid(job, pid))) {
completed++;
_log_task_exit(t->gtid, pid, status);
t->exited = true;
t->estatus = status;
job->envtp->procid = t->gtid;
job->envtp->localid = t->id;
job->envtp->distribution = -1;
job->envtp->batch_flag = job->batch;
/* Modify copy of job's environment. Do not alter in
* place or concurrent searches of the environment can
* generate invalid memory references. */
job->envtp->env = env_array_copy((const char **) job->env);
setup_env(job->envtp, false);
/* swap in the rebuilt environment, then free the old one */
tmp_env = job->env;
job->env = job->envtp->env;
env_array_free(tmp_env);
/* user epilog is given 5 as its max_wait argument */
if (job->task_epilog) {
_run_script_as_user("user task_epilog",
job->task_epilog,
job, 5, job->env);
}
if (conf->task_epilog) {
char *my_epilog;
/* copy the configured path under the config mutex so the
 * script runs without holding the lock */
slurm_mutex_lock(&conf->config_mutex);
my_epilog = xstrdup(conf->task_epilog);
slurm_mutex_unlock(&conf->config_mutex);
/* system epilog is given -1 as its max_wait argument */
_run_script_as_user("slurm task_epilog",
my_epilog,
job, -1, job->env);
xfree(my_epilog);
}
/* procid is switched from the global to the local task id
 * before the spank exit hook runs */
job->envtp->procid = t->id;
if (spank_task_exit (job, t->id) < 0) {
error ("Unable to spank task %d at exit",
t->id);
}
task_g_post_term(job, t);
}
/* keep reaping only in non-blocking mode and only after a successful wait */
} while ((pid > 0) && !waitflag);
done:
return completed;
}
/*
 * Wait until every task that has not yet reached
 * STEPD_STEP_TASK_COMPLETE has been reaped, sending aggregated exit
 * messages as tasks finish.
 *
 * After each blocking wait that reaped something, and while tasks are
 * still outstanding, the loop pauses 100 msec and sweeps up whatever
 * else has exited, so that more exits can be reported together.
 */
static void
_wait_for_all_tasks(stepd_step_rec_t *job)
{
	int pending = 0;
	int reaped = 0;
	int idx;

	for (idx = 0; idx < job->node_tasks; idx++) {
		if (job->task[idx]->state < STEPD_STEP_TASK_COMPLETE)
			pending++;
	}

	if (pending < job->node_tasks)
		verbose("Only %d of %d requested tasks successfully launched",
			pending, job->node_tasks);

	while (reaped < pending) {
		int n = _wait_for_any_task(job, true);

		if (n != -1) {
			reaped += n;
			if (reaped < pending) {
				/* To limit the amount of traffic back
				 * we will sleep a bit to make sure we
				 * have most if not all the tasks
				 * completed before we return */
				usleep(100000);	/* 100 msec */
				n = _wait_for_any_task(job, false);
				if (n != -1)
					reaped += n;
			}
		}

		/* flush every group of identical exit statuses */
		while (_send_pending_exit_msgs(job))
			;
	}
}
/*
 * Thread body: sleep for kt->secs seconds (resuming after interrupted
 * sleeps), then cancel the thread named in the request and free the
 * request. 'args' is a kill_thread_t owned by this thread.
 */
static void *_kill_thr(void *args)
{
	kill_thread_t *kt = (kill_thread_t *) args;
	unsigned int remaining = kt->secs;

	for (;;) {
		/* sleep() returns the unslept time when interrupted */
		remaining = sleep(remaining);
		if (remaining == 0)
			break;
	}

	pthread_cancel(kt->thread_id);
	xfree(kt);
	return NULL;
}
/*
 * Start a detached helper thread that cancels 'thread_id' after 'secs'
 * seconds.
 *
 * The request record is handed to the helper, which frees it when done.
 * Thread creation is retried up to MAX_RETRIES times; if it never
 * succeeds no helper exists to take ownership, so the record is freed
 * here instead of being leaked.
 */
static void _delay_kill_thread(pthread_t thread_id, int secs)
{
	pthread_t kill_id;
	pthread_attr_t attr;
	kill_thread_t *kt = xmalloc(sizeof(kill_thread_t));
	int retries = 0;

	kt->thread_id = thread_id;
	kt->secs = secs;
	slurm_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
	while (pthread_create(&kill_id, &attr, &_kill_thr, (void *) kt)) {
		error("_delay_kill_thread: pthread_create: %m");
		if (++retries > MAX_RETRIES) {
			error("_delay_kill_thread: Can't create pthread");
			/* no thread took ownership of the request */
			xfree(kt);
			break;
		}
		usleep(10);	/* sleep and again */
	}
	slurm_attr_destroy(&attr);
}
/*
 * Shut down step I/O and wait for the I/O thread to finish.
 *
 * If an I/O thread id is recorded, a watchdog is armed to cancel it
 * after 300 seconds and the thread is joined; otherwise only a log
 * message is emitted. Files opened by the stepd for stdout/stderr are
 * closed in either case.
 */
static void
_wait_for_io(stepd_step_rec_t *job)
{
	debug("Waiting for IO");
	io_close_all(job);

	if (!job->ioid) {
		info("_wait_for_io: ioid==0");
	} else {
		/* Wait until IO thread exits or kill it after 300 seconds */
		_delay_kill_thread(job->ioid, 300);
		pthread_join(job->ioid, NULL);
	}

	/* Close any files for stdout/stderr opened by the stepd */
	io_close_local_fds(job);
}
/*
 * Create the spool directory for a batch job (or job step).
 *
 * The directory is <spooldir>/job<jobid> when the step id is NO_VAL and
 * <spooldir>/job<jobid>.<stepid> otherwise. An already existing
 * directory is accepted. The group is set to the job's gid and the mode
 * to 0750. If mkdir fails with ENOSPC the node is drained.
 *
 * RET xstrdup'ed path (caller frees), or NULL on failure
 */
static char *
_make_batch_dir(stepd_step_rec_t *job)
{
	char path[MAXPATHLEN];

	if (job->stepid != NO_VAL) {
		snprintf(path, sizeof(path), "%s/job%05u.%05u",
			 conf->spooldir, job->jobid, job->stepid);
	} else {
		snprintf(path, sizeof(path), "%s/job%05u",
			 conf->spooldir, job->jobid);
	}

	if ((mkdir(path, 0750) < 0) && (errno != EEXIST)) {
		error("mkdir(%s): %m", path);
		if (errno == ENOSPC)
			_drain_node("SlurmdSpoolDir is full");
		return NULL;
	}

	if (chown(path, (uid_t) -1, (gid_t) job->gid) < 0) {
		error("chown(%s): %m", path);
		return NULL;
	}

	if (chmod(path, 0750) < 0) {
		error("chmod(%s, 750): %m", path);
		return NULL;
	}

	return xstrdup(path);
}
/*
 * Write the batch script of 'msg' to <path>/slurm_script.
 *
 * The file is created exclusively; if one already exists it is unlinked
 * and creation is retried. The script is then chown'ed to the job's uid
 * and made read/execute for the owner only (0500).
 *
 * Failure handling:
 *  - errno is captured right after a failing fputs()/fclose() so that
 *    the logged message and the ENOSPC test are not affected by later
 *    library calls;
 *  - a failed fclose() is treated as a write failure, because buffered
 *    script data may only reach the file system when the stream is
 *    flushed at close;
 *  - ENOSPC drains the node;
 *  - on any failure after the name is built the partial file is removed.
 *
 * msg  IN: batch launch request holding the script text
 * path IN: directory in which to create the script
 * RET xstrdup'ed script path (caller frees), or NULL on failure
 */
static char *
_make_batch_script(batch_job_launch_msg_t *msg, char *path)
{
	FILE *fp = NULL;
	char script[MAXPATHLEN];
	int saved_errno;

	if (msg->script == NULL) {
		error("_make_batch_script: called with NULL script");
		return NULL;
	}

	snprintf(script, sizeof(script), "%s/%s", path, "slurm_script");

again:
	if ((fp = safeopen(script, "w", SAFEOPEN_CREATE_ONLY)) == NULL) {
		if ((errno != EEXIST) || (unlink(script) < 0)) {
			error("couldn't open `%s': %m", script);
			goto error;
		}
		goto again;
	}

	if (fputs(msg->script, fp) < 0) {
		saved_errno = errno;
		(void) fclose(fp);
		errno = saved_errno;	/* report the fputs error, not fclose's */
		error("fputs: %m");
		if (saved_errno == ENOSPC)
			_drain_node("SlurmdSpoolDir is full");
		goto error;
	}

	if (fclose(fp) < 0) {
		saved_errno = errno;
		error("fclose: %m");
		if (saved_errno == ENOSPC)
			_drain_node("SlurmdSpoolDir is full");
		goto error;
	}

	if (chown(script, (uid_t) msg->uid, (gid_t) -1) < 0) {
		error("chown(%s): %m", script);
		goto error;
	}

	if (chmod(script, 0500) < 0) {
		error("chmod: %m");
	}

	return xstrdup(script);

error:
	(void) unlink(script);
	return NULL;
}
static int _drain_node(char *reason)
{
slurm_msg_t req_msg;
update_node_msg_t update_node_msg;
memset(&update_node_msg, 0, sizeof(update_node_msg_t));
update_node_msg.node_names = conf->node_name;
update_node_msg.node_state = NODE_STATE_DRAIN;
update_node_msg.reason = reason;
update_node_msg.reason_uid = getuid();
update_node_msg.weight = NO_VAL;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_UPDATE_NODE;
req_msg.data = &update_node_msg;
if (slurm_send_only_controller_msg(&req_msg) < 0)
return SLURM_ERROR;
return SLURM_SUCCESS;
}
/*
 * Tell the launching client that the task launch failed.
 *
 * The response goes to the client's address with the port chosen from
 * msg->resp_port by this node's position in the step node list (always
 * position 0 on front-end builds).
 *
 * Guards added around the port selection:
 *  - with no response ports in the request there is nowhere to send the
 *    message (and the index computation would divide by zero), so the
 *    failure is logged and nothing is sent;
 *  - a negative node position (presumably "node not found" - confirm
 *    against nodelist_find()) would produce a negative array index, so
 *    the first response port is used instead.
 *
 * msg IN: the launch request being answered
 * cli IN: address of the client
 * rc  IN: error code to report; 0 is reported as -1
 */
static void
_send_launch_failure (launch_tasks_request_msg_t *msg, slurm_addr_t *cli, int rc)
{
	slurm_msg_t resp_msg;
	launch_tasks_response_msg_t resp;
	int nodeid;
	char *name = NULL;

#ifndef HAVE_FRONT_END
	nodeid = nodelist_find(msg->complete_nodelist, conf->node_name);
	name = xstrdup(conf->node_name);
#else
	nodeid = 0;
	name = xstrdup(msg->complete_nodelist);
#endif
	debug ("sending launch failure message: %s", slurm_strerror (rc));

	if (msg->num_resp_port == 0) {
		error("_send_launch_failure: request has no response ports");
		xfree(name);
		return;
	}
	if (nodeid < 0) {
		error("_send_launch_failure: node %s not in step node list, "
		      "using first response port", name);
		nodeid = 0;
	}

	slurm_msg_t_init(&resp_msg);
	memcpy(&resp_msg.address, cli, sizeof(slurm_addr_t));
	slurm_set_addr(&resp_msg.address,
		       msg->resp_port[nodeid % msg->num_resp_port],
		       NULL);
	resp_msg.data = &resp;
	resp_msg.msg_type = RESPONSE_LAUNCH_TASKS;

	/* do not hand uninitialised fields to the message layer */
	memset(&resp, 0, sizeof(resp));
	resp.node_name = name;
	resp.return_code = rc ? rc : -1;
	resp.count_of_pids = 0;

	if (_send_srun_resp_msg(&resp_msg, msg->nnodes) != SLURM_SUCCESS)
		error("Failed to send RESPONSE_LAUNCH_TASKS: %m");
	xfree(name);
	return;
}
/*
 * Send the launch response for an interactive step to the first srun
 * on the job's srun list. Nothing is sent for batch jobs.
 *
 * The response carries the node name, the given return code and, for
 * every local task, its pid and global task id.
 *
 * job IN: step record
 * rc  IN: return code to report
 */
static void
_send_launch_resp(stepd_step_rec_t *job, int rc)
{
	int idx;
	slurm_msg_t resp_msg;
	launch_tasks_response_msg_t resp;
	srun_info_t *srun = list_peek(job->sruns);

	if (job->batch)
		return;

	debug("Sending launch resp rc=%d", rc);

	slurm_msg_t_init(&resp_msg);
	resp_msg.address = srun->resp_addr;
	resp_msg.msg_type = RESPONSE_LAUNCH_TASKS;
	resp_msg.data = &resp;

	resp.node_name = xstrdup(job->node_name);
	resp.return_code = rc;
	resp.count_of_pids = job->node_tasks;
	resp.local_pids = xmalloc(job->node_tasks * sizeof(*resp.local_pids));
	resp.task_ids = xmalloc(job->node_tasks * sizeof(*resp.task_ids));

	for (idx = 0; idx < job->node_tasks; idx++) {
		stepd_step_task_info_t *task = job->task[idx];

		resp.local_pids[idx] = task->pid;
		resp.task_ids[idx] = task->gtid;
	}

	if (_send_srun_resp_msg(&resp_msg, job->nnodes) != SLURM_SUCCESS)
		error("failed to send RESPONSE_LAUNCH_TASKS: %m");

	xfree(resp.local_pids);
	xfree(resp.task_ids);
	xfree(resp.node_name);
}
/*
 * Report completion of the batch script.
 *
 * The message normally goes straight to the controller. When the
 * configured select plugin is "select/serial" it is sent to the local
 * slurmd instead, which forwards it. Sending is retried up to MAX_RETRY
 * additional times with RETRY_DELAY seconds between attempts.
 *
 * job    IN: step record
 * err    IN: slurm error code to report
 * status IN: exit status of the script
 *
 * RET SLURM_ERROR if the message could not be delivered; otherwise
 *     SLURM_SUCCESS, except that a non-zero reply code other than
 *     ESLURM_ALREADY_DONE / ESLURM_INVALID_JOB_ID is handed to
 *     slurm_seterrno_ret().
 */
static int
_send_complete_batch_script_msg(stepd_step_rec_t *job, int err, int status)
{
	int rc, attempt, msg_rc;
	slurm_msg_t req_msg;
	complete_batch_script_msg_t req;
	char *select_type;
	bool msg_to_ctld;

	select_type = slurm_get_select_type();
	msg_to_ctld = strcmp(select_type, "select/serial");
	xfree(select_type);

	req.job_id = job->jobid;
	req.user_id = (uint32_t) job->uid;
	req.node_name = job->node_name;
	req.jobacct = job->jobacct;
	req.job_rc = status;
	req.slurm_rc = err;

	slurm_msg_t_init(&req_msg);
	req_msg.msg_type = REQUEST_COMPLETE_BATCH_SCRIPT;
	req_msg.data = &req;

	info("sending REQUEST_COMPLETE_BATCH_SCRIPT, error:%u status %d",
	     err, status);

	/* Note: these log messages don't go to slurmd.log from here */
	for (attempt = 0; attempt <= MAX_RETRY; attempt++) {
		if (!msg_to_ctld) {
			/* Send msg to slurmd, which forwards to slurmctld and
			 * may get a new job to launch */
			if (attempt == 0) {
				slurm_set_addr_char(&req_msg.address,
						    conf->port, "localhost");
			}
			msg_rc = slurm_send_recv_rc_msg_only_one(&req_msg,
								 &rc, 0);
		} else {
			msg_rc = slurm_send_recv_controller_rc_msg(&req_msg,
								   &rc);
		}

		if (msg_rc == SLURM_SUCCESS)
			break;

		info("Retrying job complete RPC for %u.%u",
		     job->jobid, job->stepid);
		sleep(RETRY_DELAY);
	}

	if (attempt > MAX_RETRY) {
		error("Unable to send job complete message: %m");
		return SLURM_ERROR;
	}

	/* the job being already finished or unknown is not a failure */
	if ((rc == ESLURM_ALREADY_DONE) || (rc == ESLURM_INVALID_JOB_ID))
		rc = SLURM_SUCCESS;
	if (rc)
		slurm_seterrno_ret(rc);

	return SLURM_SUCCESS;
}
/*
 * Record the current identity in 'ps' and switch the effective gid,
 * supplementary groups and (optionally) effective uid to the job user's.
 *
 * Saved in 'ps': real uid and gid, current working directory ("/tmp" if
 * it cannot be determined), the number of supplementary groups and, when
 * get_list is true, a freshly allocated copy of the group list.
 *
 * If get_list is false make sure ps->gid_list is initialized before
 * hand to prevent xfree.
 *
 * Nothing is switched when the process is not running as root. A failure
 * of _initgroups() is logged but not treated as fatal.
 *
 * job       IN: step record supplying uid/gid
 * do_setuid IN: also switch the effective uid
 * ps        OUT: saved privilege state
 * get_list  IN: allocate and fill ps->gid_list
 *
 * RET SLURM_SUCCESS, or -1 on failure. When the group count itself
 *     cannot be obtained, -1 is returned before ps->gid_list is touched.
 */
static int
_drop_privileges(stepd_step_rec_t *job, bool do_setuid,
		 struct priv_state *ps, bool get_list)
{
	int ngids;

	ps->saved_uid = getuid();
	ps->saved_gid = getgid();

	if (!getcwd (ps->saved_cwd, sizeof (ps->saved_cwd))) {
		error ("Unable to get current working directory: %m");
		strncpy (ps->saved_cwd, "/tmp", sizeof (ps->saved_cwd));
	}

	/* a failed count must not be used to size the allocation below */
	ngids = getgroups(0, NULL);
	if (ngids < 0) {
		error("_drop_privileges: getgroups: %m");
		return -1;
	}
	ps->ngids = ngids;

	if (get_list) {
		ps->gid_list = (gid_t *) xmalloc(ps->ngids * sizeof(gid_t));

		if (getgroups(ps->ngids, ps->gid_list) == -1) {
			error("_drop_privileges: couldn't get %d groups: %m",
			      ps->ngids);
			xfree(ps->gid_list);
			return -1;
		}
	}

	/*
	 * No need to drop privileges if we're not running as root
	 */
	if (getuid() != (uid_t) 0)
		return SLURM_SUCCESS;

	if (setegid(job->gid) < 0) {
		error("setegid: %m");
		return -1;
	}

	if (_initgroups(job) < 0) {
		error("_initgroups: %m");
	}

	if (do_setuid && seteuid(job->uid) < 0) {
		error("seteuid: %m");
		return -1;
	}

	return SLURM_SUCCESS;
}
/*
 * Restore the identity saved in 'ps' by _drop_privileges(): effective
 * uid, effective gid and supplementary group list, in that order. The
 * sequence stops at the first failing step.
 *
 * Nothing is restored when the effective uid already equals the saved
 * uid. NOTE(review): that shortcut also skips the gid and group
 * restoration after a drop made with do_setuid == false - confirm this
 * is intended.
 *
 * ps->gid_list is always freed, whatever the outcome.
 *
 * RET SLURM_SUCCESS, or -1 if seteuid, setegid or setgroups failed
 *     (a setgroups failure is now reported instead of being ignored).
 */
static int
_reclaim_privileges(struct priv_state *ps)
{
	int rc = SLURM_SUCCESS;

	/*
	 * No need to reclaim privileges if our uid == job->uid
	 */
	if (geteuid() == ps->saved_uid)
		goto done;
	else if (seteuid(ps->saved_uid) < 0) {
		error("seteuid: %m");
		rc = -1;
	} else if (setegid(ps->saved_gid) < 0) {
		error("setegid: %m");
		rc = -1;
	} else if (setgroups(ps->ngids, ps->gid_list) < 0) {
		error("setgroups: %m");
		rc = -1;
	}

done:
	xfree(ps->gid_list);

	return rc;
}
/*
 * Reconfigure logging for the step.
 *
 * Logging becomes buffered, and the stderr level is set to
 * LOG_LEVEL_ERROR plus the step's debug value, capped at
 * LOG_LEVEL_DEBUG3. The log name is "slurmstepd", with the node name
 * appended on MULTIPLE_SLURMD builds. Unless user-managed I/O or a pty
 * is in use, the stepd's own stderr is then redirected to the stderr
 * descriptor of the first task.
 *
 * RET SLURM_SUCCESS, or ESLURMD_IO_ERROR if the redirection failed
 */
static int
_slurmd_job_log_init(stepd_step_rec_t *job)
{
	char argv0[64];
	bool connect_stderr;

	conf->log_opts.buffered = 1;

	/*
	 * Reset stderr logging to user requested level
	 * (Logfile and syslog levels remain the same)
	 *
	 * The maximum stderr log level is LOG_LEVEL_DEBUG3 because
	 * some higher level debug messages are generated in the
	 * stdio code, which would otherwise create more stderr traffic
	 * to srun and therefore more debug messages in an endless loop.
	 */
	conf->log_opts.stderr_level = LOG_LEVEL_ERROR + job->debug;
	if (conf->log_opts.stderr_level > LOG_LEVEL_DEBUG3)
		conf->log_opts.stderr_level = LOG_LEVEL_DEBUG3;

#if defined(MULTIPLE_SLURMD)
	snprintf(argv0, sizeof(argv0), "slurmstepd-%s", conf->node_name);
#else
	snprintf(argv0, sizeof(argv0), "slurmstepd");
#endif

	/* reinitialize log */
	log_alter(conf->log_opts, 0, NULL);
	log_set_argv0(argv0);

	/*
	 * Connect slurmd stderr to stderr of job, unless we are using
	 * user_managed_io or a pty.
	 *
	 * user_managed_io directly connects the client (e.g. poe) to the
	 * tasks over a TCP connection, and we fully leave it up to the
	 * client to manage the stream with no buffering on slurm's part.
	 * We also promise that we will not insert any foreign data into
	 * the stream, so here we need to avoid connecting slurmstepd's
	 * STDERR_FILENO to the task's stderr.
	 *
	 * When pty terminal emulation is used, the pts can potentially
	 * cause IO to block, so we need to avoid connecting slurmstepd's
	 * STDERR_FILENO to the task's pts on stderr to avoid hangs in
	 * the slurmstepd.
	 */
	connect_stderr = !job->user_managed_io && !job->pty
			 && (job->task != NULL);
	if (connect_stderr
	    && (dup2(job->task[0]->stderr_fd, STDERR_FILENO) < 0)) {
		error("job_log_init: dup2(stderr): %m");
		return ESLURMD_IO_ERROR;
	}

	verbose("debug level = %d", conf->log_opts.stderr_level);
	return SLURM_SUCCESS;
}
/*
 * Set the process title to "[jobid]" or "[jobid.stepid]".
 *
 * Nothing is done for job ids above MAX_NOALLOC_JOBID. The step id is
 * omitted when it is NO_VAL or when the job id is at least
 * MIN_NOALLOC_JOBID.
 */
static void
_setargs(stepd_step_rec_t *job)
{
	bool jobid_only;

	if (job->jobid <= MAX_NOALLOC_JOBID) {
		jobid_only = (job->jobid >= MIN_NOALLOC_JOBID)
			     || (job->stepid == NO_VAL);

		if (jobid_only)
			setproctitle("[%u]", job->jobid);
		else
			setproctitle("[%u.%u]", job->jobid, job->stepid);
	}
}
/*
 * Set the priority of the job to be the same as the priority of
 * the process that launched the job on the submit node.
 * In support of the "PropagatePrioProcess" config keyword.
 *
 * The priority is taken from SLURM_PRIO_PROCESS in the job environment
 * (0 if it is missing) and the variable is then removed so that users
 * do not see it. The value is parsed BEFORE the variable is removed:
 * env_val is obtained from the job environment and must not be read
 * once that entry has been unset (presumably it points into the entry's
 * own storage - confirm against getenvp()/unsetenvp()).
 *
 * With PROP_PRIO_NICER the resulting priority is never better than one
 * step nicer than the daemon's own.
 */
static void _set_prio_process (stepd_step_rec_t *job)
{
	char *env_name = "SLURM_PRIO_PROCESS";
	char *env_val;
	int prio_daemon, prio_process;

	if (!(env_val = getenvp( job->env, env_name ))) {
		error( "Couldn't find %s in environment", env_name );
		prio_process = 0;
	} else {
		prio_process = atoi( env_val );
		/* Users shouldn't get this in their environment */
		unsetenvp( job->env, env_name );
	}

	if (conf->propagate_prio == PROP_PRIO_NICER) {
		prio_daemon = getpriority( PRIO_PROCESS, 0 );
		prio_process = MAX( prio_process, (prio_daemon + 1) );
	}

	if (setpriority( PRIO_PROCESS, 0, prio_process ))
		error( "setpriority(PRIO_PROCESS, %d): %m", prio_process );
	else {
		debug2( "_set_prio_process: setpriority %d succeeded",
			prio_process);
	}
}
/*
 * Permanently switch the process to the job user.
 *
 * The saved effective uid and gid are reclaimed first, then the real
 * and effective gid and uid are set to the job's. If the effective uid
 * already equals the saved uid nothing is changed and success is
 * returned.
 *
 * RET SLURM_SUCCESS, or SLURM_ERROR at the first failing call
 */
static int
_become_user(stepd_step_rec_t *job, struct priv_state *ps)
{
	int rc = SLURM_ERROR;

	if (geteuid() == ps->saved_uid)
		return SLURM_SUCCESS;

	/* First reclaim the effective uid and gid ... */
	if (seteuid(ps->saved_uid) < 0)
		error("_become_user seteuid: %m");
	else if (setegid(ps->saved_gid) < 0)
		error("_become_user setegid: %m");
	/* ... then drop real, effective, and saved uid/gid */
	else if (setregid(job->gid, job->gid) < 0)
		error("setregid: %m");
	else if (setreuid(job->uid, job->uid) < 0)
		error("setreuid: %m");
	else
		rc = SLURM_SUCCESS;

	return rc;
}
/*
 * Install the job user's supplementary groups.
 *
 * A group list already present in the step record is applied with
 * setgroups(); otherwise initgroups() is called with the user name and
 * primary gid. An EPERM failure while not running as root is logged at
 * debug level only, any other failure as an error.
 *
 * RET the setgroups() result when a list is present; otherwise 0 on
 *     success and -1 if initgroups() failed
 */
static int
_initgroups(stepd_step_rec_t *job)
{
	if (job->ngids > 0) {
		xassert(job->gids);
		debug2("Using gid list sent by slurmd");
		return setgroups(job->ngids, job->gids);
	}

	debug2("Uncached user/gid: %s/%ld", job->user_name, (long)job->gid);

	if (initgroups(job->user_name, job->gid) == 0)
		return 0;

	if ((errno == EPERM) && (getuid() != (uid_t) 0)) {
		debug("Error in initgroups(%s, %ld): %m",
		      job->user_name, (long)job->gid);
	} else {
		error("Error in initgroups(%s, %ld): %m",
		      job->user_name, (long)job->gid);
	}

	return -1;
}
/*
 * Check a user's access rights to a file using only the file's mode
 * bits: the owner bits apply when uid owns the file, else the group
 * bits when gid is the file's group, else the "other" bits.
 *
 * path  IN: pathname of file to test
 * modes IN: desired access, as a 3-bit rwx mask
 * uid   IN: user ID to access the file
 * gid   IN: group ID to access the file
 * RET 0 if every requested bit is granted, -1 otherwise or if the file
 *     cannot be stat'ed
 */
static int _access(const char *path, int modes, uid_t uid, gid_t gid)
{
	struct stat st;
	int shift;
	int granted;

	if (stat(path, &st) != 0)
		return -1;

	if (st.st_uid == uid)
		shift = 6;	/* owner bits */
	else if (st.st_gid == gid)
		shift = 3;	/* group bits */
	else
		shift = 0;	/* other bits */

	granted = (st.st_mode >> shift) & 07;

	return ((granted & modes) == modes) ? 0 : -1;
}
/*
 * Run a script as a specific user, with the uid and gid taken from the
 * job record.
 *
 * name IN: class of program (task prolog, task epilog, etc.), used in
 * log messages only
 * path IN: pathname of program to run; NULL or "" means nothing to run
 * job IN: slurmd job structure, used to get uid, gid, cwd and job id
 * max_wait IN: maximum time to wait in seconds, negative for no limit
 * env IN: environment passed to execve()
 * NOTE(review): earlier documentation said a minimal environment is
 * set when env is NULL, but the code asserts env is non-NULL - confirm.
 *
 * RET 0 if there was no script to run; -1 if the access check or the
 * fork failed; otherwise the wait status reported by waitpid() for
 * the script, or 0 if waitpid() itself failed.
 */
int
_run_script_as_user(const char *name, const char *path, stepd_step_rec_t *job,
int max_wait, char **env)
{
int status, rc, opt;
pid_t cpid;
struct exec_wait_info *ei;
xassert(env);
if (path == NULL || path[0] == '\0')
return 0;
debug("[job %u] attempting to run %s [%s]", job->jobid, name, path);
/* 5 == read + execute permission for the job user */
if (_access(path, 5, job->uid, job->gid) < 0) {
error("Could not run %s [%s]: access denied", name, path);
return -1;
}
if ((ei = fork_child_with_wait_info(0)) == NULL) {
error ("executing %s: fork: %m", name);
return -1;
}
/* child: never returns from this branch (exec or exit) */
if ((cpid = exec_wait_get_pid (ei)) == 0) {
struct priv_state sprivs;
char *argv[2];
/* container_g_add_pid needs to be called in the
forked process part of the fork to avoid a race
condition where this process makes a file or
detaches itself from a child before we add the pid
to the container in the parent of the fork.
*/
if ((job->jobid != 0) && /* Ignore system processes */
(container_g_add_pid(job->jobid, getpid(), job->uid)
!= SLURM_SUCCESS))
error("container_g_add_pid(%u): %m", job->jobid);
argv[0] = (char *)xstrdup(path);
argv[1] = NULL;
sprivs.gid_list = NULL; /* initialize to prevent xfree */
if (_drop_privileges(job, true, &sprivs, false) < 0) {
error("run_script_as_user _drop_privileges: %m");
/* child process, should not return */
exit(127);
}
if (_become_user(job, &sprivs) < 0) {
error("run_script_as_user _become_user failed: %m");
/* child process, should not return */
exit(127);
}
/* a failed chdir is logged but does not stop the script */
if (chdir(job->cwd) == -1)
error("run_script_as_user: couldn't "
"change working dir to %s: %m", job->cwd);
/* put the script in its own process group so the parent can
 * signal it and its descendants with killpg() */
#ifdef SETPGRP_TWO_ARGS
setpgrp(0, 0);
#else
setpgrp();
#endif
/*
* Wait for signal from parent (the result is not checked, so the
* script is exec'ed even if the wait fails)
*/
exec_wait_child_wait_for_parent (ei);
execve(path, argv, env);
error("execve(%s): %m", path);
exit(127);
}
/* parent: release the child, then drop our end of the handshake */
if (exec_wait_signal_child (ei) < 0)
error ("run_script_as_user: Failed to wakeup %s", name);
exec_wait_info_destroy (ei);
/* negative max_wait: block in waitpid; otherwise poll once per second */
if (max_wait < 0)
opt = 0;
else
opt = WNOHANG;
while (1) {
rc = waitpid(cpid, &status, opt);
if (rc < 0) {
if (errno == EINTR)
continue;
error("waidpid: %m");
status = 0;
break;
} else if (rc == 0) {
/* still running: count down, and once the time is used up
 * kill the script's process group and wait for it to exit */
sleep(1);
if ((--max_wait) <= 0) {
killpg(cpid, SIGKILL);
opt = 0;
}
} else {
/* spawned process exited */
break;
}
}
/* Ensure that all child processes get killed, one last time */
killpg(cpid, SIGKILL);
return status;
}