/*****************************************************************************\
* step_launch.c - launch a parallel job step
*****************************************************************************
* Copyright (C) 2006-2007 The Regents of the University of California.
* Copyright (C) 2008-2009 Lawrence Livermore National Security.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Christopher J. Morrone <morrone2@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#include <errno.h>
#include <fcntl.h>
#include <grp.h>
#include <limits.h>
#include <netdb.h> /* for gethostbyname */
#include <netinet/in.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include "slurm/slurm.h"
#include "src/common/cpu_frequency.h"
#include "src/common/eio.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/hostlist.h"
#include "src/common/macros.h"
#include "src/common/net.h"
#include "src/common/plugstack.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_cred.h"
#include "src/common/slurm_mpi.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/slurm_time.h"
#include "src/common/strlcpy.h"
#include "src/common/uid.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/api/step_launch.h"
#include "src/api/step_ctx.h"
#include "src/api/pmi_server.h"
#define STEP_ABORT_TIME 2
extern char **environ;
/**********************************************************************
* General declarations for step launch code
**********************************************************************/
static int _launch_tasks(slurm_step_ctx_t *ctx,
launch_tasks_request_msg_t *launch_msg,
uint32_t timeout, char *nodelist, int start_nodeid);
static char *_lookup_cwd(void);
static void _print_launch_msg(launch_tasks_request_msg_t *msg,
char *hostname, int nodeid);
/**********************************************************************
* Message handler declarations
**********************************************************************/
static pid_t srun_ppid = (pid_t) 0;
static uid_t slurm_uid;
static bool force_terminated_job = false;
static int task_exit_signal = 0;
static void _exec_prog(slurm_msg_t *msg);
static int _msg_thr_create(struct step_launch_state *sls, int num_nodes);
static void _handle_msg(void *arg, slurm_msg_t *msg);
static int _cr_notify_step_launch(slurm_step_ctx_t *ctx);
static void *_check_io_timeout(void *_sls);
static struct io_operations message_socket_ops = {
.readable = &eio_message_socket_readable,
.handle_read = &eio_message_socket_accept,
.handle_msg = &_handle_msg
};
/**********************************************************************
* API functions
**********************************************************************/
/*
* slurm_step_launch_params_t_init - initialize a user-allocated
* slurm_step_launch_params_t structure with default values.
* This function will NOT allocate any new memory.
* IN ptr - pointer to a structure allocated by the user.
* The structure will be initialized.
*/
extern void slurm_step_launch_params_t_init(slurm_step_launch_params_t *ptr)
{
static slurm_step_io_fds_t fds = SLURM_STEP_IO_FDS_INITIALIZER;
/* Initialize all values to zero ("NULL" for pointers) */
memset(ptr, 0, sizeof(slurm_step_launch_params_t));
ptr->buffered_stdio = true;
memcpy(&ptr->local_fds, &fds, sizeof(fds));
ptr->gid = getgid();
ptr->cpu_freq_min = NO_VAL;
ptr->cpu_freq_max = NO_VAL;
ptr->cpu_freq_gov = NO_VAL;
ptr->node_offset = NO_VAL;
ptr->pack_jobid = NO_VAL;
ptr->pack_nnodes = NO_VAL;
ptr->pack_ntasks = NO_VAL;
ptr->pack_offset = NO_VAL;
ptr->pack_task_offset = NO_VAL;
}
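/*
* Minimal usage sketch (illustrative only): it assumes the caller already
* holds a job allocation and has built a step context with
* slurm_step_ctx_create(); "ctx", "my_argc" and "my_argv" below are
* hypothetical, and error handling is omitted.
*
*	slurm_step_launch_params_t params;
*	slurm_step_launch_params_t_init(&params);
*	params.argc = my_argc;
*	params.argv = my_argv;
*	if (slurm_step_launch(ctx, &params, NULL, -1) == SLURM_SUCCESS) {
*		slurm_step_launch_wait_start(ctx);
*		slurm_step_launch_wait_finish(ctx);
*	}
*/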
/*
* Specify the plugin name to be used. This may be needed to specify the
* non-default MPI plugin when using Slurm API to launch tasks.
* IN plugin_name - "none", "pmi2", etc.
* RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
*/
extern int slurm_mpi_plugin_init(char *plugin_name)
{
return mpi_hook_client_init(plugin_name);
}
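/*
* Example (illustrative): select the PMI2 plugin before launching tasks
* through the API.
*
*	if (slurm_mpi_plugin_init("pmi2") != SLURM_SUCCESS)
*		error("unable to initialize MPI plugin");
*/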
/*
* For a pack job step, rebuild the MPI data structure to show what is running
* in a single MPI_COMM_WORLD
*/
static void _rebuild_mpi_layout(slurm_step_ctx_t *ctx,
const slurm_step_launch_params_t *params)
{
slurm_step_layout_t *new_step_layout, *orig_step_layout;
ctx->launch_state->mpi_info->pack_jobid = params->pack_jobid;
new_step_layout = xmalloc(sizeof(slurm_step_layout_t));
orig_step_layout = ctx->launch_state->mpi_info->step_layout;
ctx->launch_state->mpi_info->step_layout = new_step_layout;
if (orig_step_layout->front_end) {
new_step_layout->front_end =
xstrdup(orig_step_layout->front_end);
}
new_step_layout->node_cnt = params->pack_nnodes;
new_step_layout->node_list = xstrdup(params->pack_node_list);
new_step_layout->plane_size = orig_step_layout->plane_size;
new_step_layout->start_protocol_ver =
orig_step_layout->start_protocol_ver;
new_step_layout->tasks = params->pack_task_cnts;
new_step_layout->task_cnt = params->pack_ntasks;
new_step_layout->task_dist = orig_step_layout->task_dist;
new_step_layout->tids = params->pack_tids;
}
/*
* slurm_step_launch - launch a parallel job step
* IN ctx - job step context generated by slurm_step_ctx_create
* IN params - job step parameters
* IN callbacks - Identify functions to be called when various events occur
* IN pack_job_cnt - Total count of pack job steps to be launched, -1 otherwise
* RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
*/
extern int slurm_step_launch(slurm_step_ctx_t *ctx,
const slurm_step_launch_params_t *params,
const slurm_step_launch_callbacks_t *callbacks,
int pack_job_cnt)
{
launch_tasks_request_msg_t launch;
char **env = NULL;
char **mpi_env = NULL;
int rc = SLURM_SUCCESS;
bool preserve_env = params->preserve_env;
debug("Entering %s", __func__);
memset(&launch, 0, sizeof(launch));
if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) {
error("%s: Not a valid slurm_step_ctx_t", __func__);
slurm_seterrno(EINVAL);
return SLURM_ERROR;
}
/* Initialize the callback pointers */
if (callbacks != NULL) {
/* copy the user specified callback pointers */
memcpy(&(ctx->launch_state->callback), callbacks,
sizeof(slurm_step_launch_callbacks_t));
} else {
/* set all callbacks to NULL */
memset(&(ctx->launch_state->callback), 0,
sizeof(slurm_step_launch_callbacks_t));
}
if (mpi_hook_client_init(params->mpi_plugin_name) == SLURM_ERROR) {
slurm_seterrno(SLURM_MPI_PLUGIN_NAME_INVALID);
return SLURM_ERROR;
}
if (params->pack_jobid && (params->pack_jobid != NO_VAL))
_rebuild_mpi_layout(ctx, params);
mpi_env = xmalloc(sizeof(char *)); /* Needed for setenvf used by MPI */
if ((ctx->launch_state->mpi_state =
mpi_hook_client_prelaunch(ctx->launch_state->mpi_info, &mpi_env))
== NULL) {
slurm_seterrno(SLURM_MPI_PLUGIN_PRELAUNCH_SETUP_FAILED);
return SLURM_ERROR;
}
/* Create message receiving sockets and handler thread */
rc = _msg_thr_create(ctx->launch_state,
ctx->step_resp->step_layout->node_cnt);
if (rc != SLURM_SUCCESS)
return rc;
/* Start tasks on compute nodes */
launch.job_id = ctx->step_req->job_id;
launch.uid = ctx->step_req->user_id;
launch.gid = params->gid;
launch.argc = params->argc;
launch.argv = params->argv;
launch.spank_job_env = params->spank_job_env;
launch.spank_job_env_size = params->spank_job_env_size;
launch.cred = ctx->step_resp->cred;
launch.job_step_id = ctx->step_resp->job_step_id;
launch.node_offset = params->node_offset;
launch.pack_jobid = params->pack_jobid;
launch.pack_nnodes = params->pack_nnodes;
launch.pack_ntasks = params->pack_ntasks;
launch.pack_offset = params->pack_offset;
launch.pack_task_offset = params->pack_task_offset;
launch.pack_task_cnts = params->pack_task_cnts;
launch.pack_node_list = params->pack_node_list;
if (params->env == NULL) {
/*
* If the user didn't specify an environment, then use the
* environment of the running process
*/
env_array_merge(&env, (const char **)environ);
} else {
env_array_merge(&env, (const char **)params->env);
}
if (params->pack_ntasks != NO_VAL)
preserve_env = true;
env_array_for_step(&env, ctx->step_resp, &launch,
ctx->launch_state->resp_port[0], preserve_env);
env_array_merge(&env, (const char **)mpi_env);
env_array_free(mpi_env);
launch.envc = envcount(env);
launch.env = env;
if (params->cwd)
launch.cwd = xstrdup(params->cwd);
else
launch.cwd = _lookup_cwd();
launch.alias_list = params->alias_list;
launch.nnodes = ctx->step_resp->step_layout->node_cnt;
launch.ntasks = ctx->step_resp->step_layout->task_cnt;
launch.slurmd_debug = params->slurmd_debug;
launch.switch_job = ctx->step_resp->switch_job;
launch.profile = params->profile;
launch.task_prolog = params->task_prolog;
launch.task_epilog = params->task_epilog;
launch.cpu_bind_type = params->cpu_bind_type;
launch.cpu_bind = params->cpu_bind;
launch.cpu_freq_min = params->cpu_freq_min;
launch.cpu_freq_max = params->cpu_freq_max;
launch.cpu_freq_gov = params->cpu_freq_gov;
launch.tres_bind = params->tres_bind;
launch.tres_freq = params->tres_freq;
launch.mem_bind_type = params->mem_bind_type;
launch.mem_bind = params->mem_bind;
launch.accel_bind_type = params->accel_bind_type;
launch.flags = 0;
if (params->multi_prog)
launch.flags |= LAUNCH_MULTI_PROG;
launch.cpus_per_task = params->cpus_per_task;
launch.ntasks_per_board = params->ntasks_per_board;
launch.ntasks_per_core = params->ntasks_per_core;
launch.ntasks_per_socket= params->ntasks_per_socket;
if (params->no_alloc)
launch.flags |= LAUNCH_NO_ALLOC;
launch.task_dist = params->task_dist;
launch.partition = params->partition;
if (params->pty)
launch.flags |= LAUNCH_PTY;
launch.ckpt_dir = params->ckpt_dir;
launch.restart_dir = params->restart_dir;
launch.acctg_freq = params->acctg_freq;
launch.open_mode = params->open_mode;
launch.options = job_options_create();
launch.complete_nodelist =
xstrdup(ctx->step_resp->step_layout->node_list);
spank_set_remote_options (launch.options);
if (params->parallel_debug)
launch.flags |= LAUNCH_PARALLEL_DEBUG;
launch.tasks_to_launch = ctx->step_resp->step_layout->tasks;
launch.global_task_ids = ctx->step_resp->step_layout->tids;
launch.select_jobinfo = ctx->step_resp->select_jobinfo;
if (params->user_managed_io)
launch.flags |= LAUNCH_USER_MANAGED_IO;
ctx->launch_state->user_managed_io = params->user_managed_io;
if (!ctx->launch_state->user_managed_io) {
launch.ofname = params->remote_output_filename;
launch.efname = params->remote_error_filename;
launch.ifname = params->remote_input_filename;
if (params->buffered_stdio)
launch.flags |= LAUNCH_BUFFERED_IO;
if (params->labelio)
launch.flags |= LAUNCH_LABEL_IO;
ctx->launch_state->io.normal =
client_io_handler_create(params->local_fds,
ctx->step_req->num_tasks,
launch.nnodes,
ctx->step_resp->cred,
params->labelio,
params->pack_offset,
params->pack_task_offset);
if (ctx->launch_state->io.normal == NULL) {
rc = SLURM_ERROR;
goto fail1;
}
/*
* The client_io_t gets a pointer back to the slurm_launch_state
* to notify it of I/O errors.
*/
ctx->launch_state->io.normal->sls = ctx->launch_state;
if (client_io_handler_start(ctx->launch_state->io.normal)
!= SLURM_SUCCESS) {
rc = SLURM_ERROR;
goto fail1;
}
launch.num_io_port = ctx->launch_state->io.normal->num_listen;
launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port);
memcpy(launch.io_port, ctx->launch_state->io.normal->listenport,
(sizeof(uint16_t) * launch.num_io_port));
/*
* If the io timeout is > 0, create a flag to ping the stepds
* if io_timeout seconds pass without stdio traffic to/from
* the node.
*/
ctx->launch_state->io_timeout = slurm_get_msg_timeout();
} else { /* user_managed_io is true */
/* initialize user_managed_io_t */
ctx->launch_state->io.user =
(user_managed_io_t *)xmalloc(sizeof(user_managed_io_t));
ctx->launch_state->io.user->connected = 0;
ctx->launch_state->io.user->sockets =
(int *)xmalloc(sizeof(int) * ctx->step_req->num_tasks);
}
launch.num_resp_port = ctx->launch_state->num_resp_port;
launch.resp_port = xmalloc(sizeof(uint16_t) * launch.num_resp_port);
memcpy(launch.resp_port, ctx->launch_state->resp_port,
(sizeof(uint16_t) * launch.num_resp_port));
rc = _launch_tasks(ctx, &launch, params->msg_timeout,
launch.complete_nodelist, 0);
/* clean up */
xfree(launch.resp_port);
if (!ctx->launch_state->user_managed_io)
xfree(launch.io_port);
fail1:
xfree(launch.user_name);
xfree(launch.complete_nodelist);
xfree(launch.cwd);
env_array_free(env);
job_options_destroy(launch.options);
return rc;
}
/*
* slurm_step_launch_add - Add tasks to a step that was already started
* IN ctx - job step context generated by slurm_step_ctx_create
* IN first_ctx - job step context generated by slurm_step_ctx_create for
* first component of the job step
* IN params - job step parameters
* IN node_list - list of extra nodes to add
* IN start_nodeid - global node ID of the first node in node_list
* RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
*/
extern int slurm_step_launch_add(slurm_step_ctx_t *ctx,
slurm_step_ctx_t *first_ctx,
const slurm_step_launch_params_t *params,
char *node_list, int start_nodeid)
{
launch_tasks_request_msg_t launch;
char **env = NULL;
char **mpi_env = NULL;
int rc = SLURM_SUCCESS;
uint16_t resp_port = 0;
bool preserve_env = params->preserve_env;
debug("Entering %s", __func__);
if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) {
error("%s: Not a valid slurm_step_ctx_t", __func__);
slurm_seterrno(EINVAL);
return SLURM_ERROR;
}
memset(&launch, 0, sizeof(launch));
/* Start tasks on compute nodes */
launch.job_id = ctx->step_req->job_id;
launch.uid = ctx->step_req->user_id;
launch.gid = params->gid;
launch.argc = params->argc;
launch.argv = params->argv;
launch.spank_job_env = params->spank_job_env;
launch.spank_job_env_size = params->spank_job_env_size;
launch.cred = ctx->step_resp->cred;
launch.job_step_id = ctx->step_resp->job_step_id;
launch.pack_jobid = params->pack_jobid;
launch.pack_nnodes = params->pack_nnodes;
launch.pack_ntasks = params->pack_ntasks;
launch.pack_offset = params->pack_offset;
launch.pack_task_offset = params->pack_task_offset;
launch.pack_task_cnts = params->pack_task_cnts;
launch.pack_node_list = params->pack_node_list;
if (params->env == NULL) {
/*
* if the user didn't specify an environment, grab the
* environment of the running process
*/
env_array_merge(&env, (const char **)environ);
} else {
env_array_merge(&env, (const char **)params->env);
}
if (first_ctx->launch_state->resp_port)
resp_port = first_ctx->launch_state->resp_port[0];
if (params->pack_ntasks != NO_VAL)
preserve_env = true;
env_array_for_step(&env, ctx->step_resp, &launch, resp_port,
preserve_env);
env_array_merge(&env, (const char **)mpi_env);
env_array_free(mpi_env);
launch.envc = envcount(env);
launch.env = env;
if (params->cwd)
launch.cwd = xstrdup(params->cwd);
else
launch.cwd = _lookup_cwd();
launch.alias_list = params->alias_list;
launch.nnodes = ctx->step_resp->step_layout->node_cnt;
launch.ntasks = ctx->step_resp->step_layout->task_cnt;
launch.slurmd_debug = params->slurmd_debug;
launch.switch_job = ctx->step_resp->switch_job;
launch.profile = params->profile;
launch.task_prolog = params->task_prolog;
launch.task_epilog = params->task_epilog;
launch.cpu_bind_type = params->cpu_bind_type;
launch.cpu_bind = params->cpu_bind;
launch.cpu_freq_min = params->cpu_freq_min;
launch.cpu_freq_max = params->cpu_freq_max;
launch.cpu_freq_gov = params->cpu_freq_gov;
launch.tres_bind = params->tres_bind;
launch.tres_freq = params->tres_freq;
launch.mem_bind_type = params->mem_bind_type;
launch.mem_bind = params->mem_bind;
launch.accel_bind_type = params->accel_bind_type;
launch.flags = 0;
if (params->multi_prog)
launch.flags |= LAUNCH_MULTI_PROG;
launch.cpus_per_task = params->cpus_per_task;
launch.task_dist = params->task_dist;
launch.partition = params->partition;
if (params->pty)
launch.flags |= LAUNCH_PTY;
launch.ckpt_dir = params->ckpt_dir;
launch.restart_dir = params->restart_dir;
launch.acctg_freq = params->acctg_freq;
launch.open_mode = params->open_mode;
launch.options = job_options_create();
launch.complete_nodelist =
xstrdup(ctx->step_resp->step_layout->node_list);
spank_set_remote_options (launch.options);
if (params->parallel_debug)
launch.flags |= LAUNCH_PARALLEL_DEBUG;
launch.tasks_to_launch = ctx->step_resp->step_layout->tasks;
launch.global_task_ids = ctx->step_resp->step_layout->tids;
launch.select_jobinfo = ctx->step_resp->select_jobinfo;
if (params->user_managed_io)
launch.flags |= LAUNCH_USER_MANAGED_IO;
/* Set up normal I/O handling (user_managed_io is false) */
if (!ctx->launch_state->io.user) {
launch.ofname = params->remote_output_filename;
launch.efname = params->remote_error_filename;
launch.ifname = params->remote_input_filename;
if (params->buffered_stdio)
launch.flags |= LAUNCH_BUFFERED_IO;
if (params->labelio)
launch.flags |= LAUNCH_LABEL_IO;
ctx->launch_state->io.normal =
client_io_handler_create(params->local_fds,
ctx->step_req->num_tasks,
launch.nnodes,
ctx->step_resp->cred,
params->labelio,
params->pack_offset,
params->pack_task_offset);
if (ctx->launch_state->io.normal == NULL) {
rc = SLURM_ERROR;
goto fail1;
}
/*
* The client_io_t gets a pointer back to the slurm_launch_state
* to notify it of I/O errors.
*/
ctx->launch_state->io.normal->sls = ctx->launch_state;
if (client_io_handler_start(ctx->launch_state->io.normal)
!= SLURM_SUCCESS) {
rc = SLURM_ERROR;
goto fail1;
}
launch.num_io_port = ctx->launch_state->io.normal->num_listen;
launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port);
memcpy(launch.io_port, ctx->launch_state->io.normal->listenport,
(sizeof(uint16_t) * launch.num_io_port));
/*
* If the io timeout is > 0, create a flag to ping the stepds
* if io_timeout seconds pass without stdio traffic to/from
* the node.
*/
ctx->launch_state->io_timeout = slurm_get_msg_timeout();
} else { /* user_managed_io is true */
xrealloc(ctx->launch_state->io.user->sockets,
sizeof(int) * ctx->step_req->num_tasks);
}
if (first_ctx->launch_state->num_resp_port &&
first_ctx->launch_state->resp_port) {
launch.num_resp_port = first_ctx->launch_state->num_resp_port;
launch.resp_port = xmalloc(sizeof(uint16_t) *
launch.num_resp_port);
memcpy(launch.resp_port, first_ctx->launch_state->resp_port,
(sizeof(uint16_t) * launch.num_resp_port));
}
rc = _launch_tasks(ctx, &launch, params->msg_timeout,
node_list, start_nodeid);
fail1:
/* clean up */
xfree(launch.user_name);
xfree(launch.resp_port);
if (!ctx->launch_state->user_managed_io)
xfree(launch.io_port);
xfree(launch.cwd);
env_array_free(env);
job_options_destroy(launch.options);
return rc;
}
static void _step_abort(slurm_step_ctx_t *ctx)
{
struct step_launch_state *sls = ctx->launch_state;
if (!sls->abort_action_taken) {
slurm_kill_job_step(ctx->job_id, ctx->step_resp->job_step_id,
SIGKILL);
sls->abort_action_taken = true;
}
}
/*
* Block until all tasks have started.
*/
int slurm_step_launch_wait_start(slurm_step_ctx_t *ctx)
{
struct step_launch_state *sls = ctx->launch_state;
struct timespec ts;
ts.tv_sec = time(NULL);
ts.tv_nsec = 0;
ts.tv_sec += 600; /* 10 min allowed for launch */
/* Wait for all tasks to start */
slurm_mutex_lock(&sls->lock);
while (bit_set_count(sls->tasks_started) < sls->tasks_requested) {
if (sls->abort) {
_step_abort(ctx);
slurm_mutex_unlock(&sls->lock);
return SLURM_ERROR;
}
if (pthread_cond_timedwait(&sls->cond, &sls->lock, &ts) ==
ETIMEDOUT) {
error("timeout waiting for task launch, "
"started %d of %d tasks",
bit_set_count(sls->tasks_started),
sls->tasks_requested);
sls->abort = true;
_step_abort(ctx);
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
return SLURM_ERROR;
}
}
if (sls->user_managed_io) {
while (sls->io.user->connected < sls->tasks_requested) {
if (sls->abort) {
_step_abort(ctx);
slurm_mutex_unlock(&sls->lock);
return SLURM_ERROR;
}
if (pthread_cond_timedwait(&sls->cond, &sls->lock,
&ts) == ETIMEDOUT) {
error("timeout waiting for I/O connect");
sls->abort = true;
_step_abort(ctx);
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
return SLURM_ERROR;
}
}
}
_cr_notify_step_launch(ctx);
slurm_mutex_unlock(&sls->lock);
return SLURM_SUCCESS;
}
/*
* Block until all tasks have finished (or failed to start altogether).
*/
void slurm_step_launch_wait_finish(slurm_step_ctx_t *ctx)
{
struct step_launch_state *sls;
struct timespec ts = {0, 0};
bool time_set = false;
int errnum;
if (!ctx || (ctx->magic != STEP_CTX_MAGIC))
return;
sls = ctx->launch_state;
/* Wait for all tasks to complete */
slurm_mutex_lock(&sls->lock);
while (bit_set_count(sls->tasks_exited) < sls->tasks_requested) {
if (!sls->abort) {
slurm_cond_wait(&sls->cond, &sls->lock);
} else {
if (!sls->abort_action_taken) {
slurm_kill_job_step(ctx->job_id,
ctx->step_resp->
job_step_id,
SIGKILL);
sls->abort_action_taken = true;
}
if (!time_set) {
uint16_t kill_wait;
/* Only set the time once, because we only want
* to wait STEP_ABORT_TIME, no matter how many
* times the condition variable is signaled.
*/
kill_wait = slurm_get_kill_wait();
ts.tv_sec = time(NULL) + STEP_ABORT_TIME
+ kill_wait;
time_set = true;
/* FIXME - should this be a callback? */
info("Job step aborted: Waiting up to "
"%d seconds for job step to finish.",
kill_wait + STEP_ABORT_TIME);
}
errnum = pthread_cond_timedwait(&sls->cond,
&sls->lock, &ts);
if (errnum == ETIMEDOUT) {
error("Timed out waiting for job step to "
"complete");
/*
* Send kill again, in case steps were still
* launching the first time.
* FIXME - eventually the slurmd should
* be made smart enough to really ensure
* that a killed step never starts.
*/
slurm_kill_job_step(ctx->job_id,
ctx->step_resp->job_step_id,
SIGKILL);
if (!sls->user_managed_io) {
client_io_handler_abort(sls->
io.normal);
}
break;
} else if (errnum != 0) {
error("Error waiting on condition in"
" slurm_step_launch_wait_finish: %m");
if (!sls->user_managed_io) {
client_io_handler_abort(sls->
io.normal);
}
break;
}
}
}
if (sls->abort && !time_set)
info("Job step aborted"); /* no need to wait */
if (!force_terminated_job && task_exit_signal)
info("Force Terminated job step %u.%u",
ctx->job_id, ctx->step_resp->job_step_id);
/*
* task_exit_signal != 0 when srun receives a message that a task
* exited with a SIGTERM or SIGKILL. Without this test, a hang in srun
* might occur when a node gets a hard power failure, and TCP does not
* indicate that the I/O connection closed. The I/O thread could
* block waiting for an EOF message, even though the remote process
* has died. In this case, use client_io_handler_abort to force the
* I/O thread to stop listening for stdout or stderr and shutdown.
*/
if (task_exit_signal && !sls->user_managed_io) {
client_io_handler_abort(sls->io.normal);
}
/* Then shutdown the message handler thread */
if (sls->msg_handle)
eio_signal_shutdown(sls->msg_handle);
slurm_mutex_unlock(&sls->lock);
if (sls->msg_thread)
pthread_join(sls->msg_thread, NULL);
slurm_mutex_lock(&sls->lock);
pmi_kvs_free();
if (sls->msg_handle) {
eio_handle_destroy(sls->msg_handle);
sls->msg_handle = NULL;
}
/* Shutdown the IO timeout thread, if one exists */
if (sls->io_timeout_thread_created) {
sls->halt_io_test = true;
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
pthread_join(sls->io_timeout_thread, NULL);
slurm_mutex_lock(&sls->lock);
}
/* Then wait for the IO thread to finish */
if (!sls->user_managed_io) {
slurm_mutex_unlock(&sls->lock);
client_io_handler_finish(sls->io.normal);
slurm_mutex_lock(&sls->lock);
client_io_handler_destroy(sls->io.normal);
sls->io.normal = NULL;
}
mpi_hook_client_fini(sls->mpi_state);
slurm_mutex_unlock(&sls->lock);
}
/*
* Abort an in-progress launch, or terminate the fully launched job step.
*
* Can be called from a signal handler.
*/
void slurm_step_launch_abort(slurm_step_ctx_t *ctx)
{
struct step_launch_state *sls;
if (!ctx || ctx->magic != STEP_CTX_MAGIC)
return;
sls = ctx->launch_state;
slurm_mutex_lock(&sls->lock);
sls->abort = true;
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
}
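/*
* Sketch of typical use from a signal handler (illustrative; "launch_ctx"
* is a hypothetical global holding the active step context):
*
*	static void _handle_sigint(int signo)
*	{
*		slurm_step_launch_abort(launch_ctx);
*	}
*
* Setting the abort flag wakes slurm_step_launch_wait_start() and
* slurm_step_launch_wait_finish(), which then kill and reap the step.
*/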
/*
* Forward a signal to all those nodes with running tasks
*/
extern void slurm_step_launch_fwd_signal(slurm_step_ctx_t *ctx, int signo)
{
int node_id, j, num_tasks;
slurm_msg_t req;
signal_tasks_msg_t msg;
hostlist_t hl;
char *name = NULL;
List ret_list = NULL;
ListIterator itr;
ret_data_info_t *ret_data_info = NULL;
int rc = SLURM_SUCCESS;
struct step_launch_state *sls = ctx->launch_state;
bool retry = false;
int retry_cnt = 0;
/* common to all tasks */
memset(&msg, 0, sizeof(msg));
msg.job_id = ctx->job_id;
msg.job_step_id = ctx->step_resp->job_step_id;
msg.signal = (uint16_t) signo;
slurm_mutex_lock(&sls->lock);
hl = hostlist_create(NULL);
for (node_id = 0;
node_id < ctx->step_resp->step_layout->node_cnt;
node_id++) {
bool active = false;
num_tasks = sls->layout->tasks[node_id];
for (j = 0; j < num_tasks; j++) {
if (!bit_test(sls->tasks_exited,
sls->layout->tids[node_id][j])) {
/* this one has active tasks */
active = true;
break;
}
}
if (!active)
continue;
if (ctx->step_resp->step_layout->front_end) {
hostlist_push_host(hl,
ctx->step_resp->step_layout->front_end);
break;
} else {
name = nodelist_nth_host(sls->layout->node_list,
node_id);
hostlist_push_host(hl, name);
free(name);
}
}
slurm_mutex_unlock(&sls->lock);
if (!hostlist_count(hl)) {
verbose("no active tasks in step %u.%u to send signal %d",
ctx->job_id, ctx->step_resp->job_step_id, signo);
hostlist_destroy(hl);
return;
}
name = hostlist_ranged_string_xmalloc(hl);
hostlist_destroy(hl);
RESEND: slurm_msg_t_init(&req);
req.msg_type = REQUEST_SIGNAL_TASKS;
req.data = &msg;
if (ctx->step_resp->use_protocol_ver)
req.protocol_version = ctx->step_resp->use_protocol_ver;
debug2("sending signal %d to step %u.%u on hosts %s",
signo, ctx->job_id, ctx->step_resp->job_step_id, name);
if (!(ret_list = slurm_send_recv_msgs(name, &req, 0, false))) {
error("fwd_signal: slurm_send_recv_msgs really failed badly");
xfree(name);
return;
}
itr = list_iterator_create(ret_list);
while ((ret_data_info = list_next(itr))) {
rc = slurm_get_return_code(ret_data_info->type,
ret_data_info->data);
/*
* Report error unless it is "Invalid job id" which
* probably just means the tasks exited in the meanwhile.
*/
if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) &&
(rc != ESLURMD_JOB_NOTRUNNING) && (rc != ESRCH) &&
(rc != EAGAIN) &&
(rc != ESLURM_TRANSITION_STATE_NO_UPDATE)) {
error("Failure sending signal %d to step %u.%u on node %s: %s",
signo, ctx->job_id, ctx->step_resp->job_step_id,
ret_data_info->node_name, slurm_strerror(rc));
}
if ((rc == EAGAIN) || (rc == ESLURM_TRANSITION_STATE_NO_UPDATE))
retry = true;
}
list_iterator_destroy(itr);
FREE_NULL_LIST(ret_list);
if (retry) {
retry = false;
if (retry_cnt++ < 4) {
sleep(retry_cnt);
goto RESEND;
}
}
xfree(name);
}
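/*
* Example (illustrative): forward an interrupt to every node that still
* has unfinished tasks in this step.
*
*	slurm_step_launch_fwd_signal(ctx, SIGINT);
*/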
/**********************************************************************
* Functions used by step_ctx code, but not exported through the API
**********************************************************************/
/*
* Create a launch state structure for a specified step context, "ctx".
*/
struct step_launch_state *step_launch_state_create(slurm_step_ctx_t *ctx)
{
struct step_launch_state *sls;
slurm_step_layout_t *layout = ctx->step_resp->step_layout;
int ii;
sls = xmalloc(sizeof(struct step_launch_state));
sls->slurmctld_socket_fd = -1;
sls->tasks_requested = layout->task_cnt;
sls->tasks_started = bit_alloc(layout->task_cnt);
sls->tasks_exited = bit_alloc(layout->task_cnt);
sls->node_io_error = bit_alloc(layout->node_cnt);
sls->io_deadline = (time_t *)xmalloc(sizeof(time_t) * layout->node_cnt);
sls->io_timeout_thread_created = false;
sls->io_timeout = 0;
sls->halt_io_test = false;
sls->layout = layout;
sls->resp_port = NULL;
sls->abort = false;
sls->abort_action_taken = false;
/* NOTE: No malloc() of sls->mpi_info required */
sls->mpi_info->jobid = ctx->step_req->job_id;
sls->mpi_info->pack_jobid = NO_VAL;
sls->mpi_info->stepid = ctx->step_resp->job_step_id;
sls->mpi_info->step_layout = layout;
sls->mpi_state = NULL;
slurm_mutex_init(&sls->lock);
slurm_cond_init(&sls->cond, NULL);
for (ii = 0; ii < layout->node_cnt; ii++) {
sls->io_deadline[ii] = (time_t)NO_VAL;
}
return sls;
}
/*
* If a step's size has changed, update the launch_state structure for
* the specified step context, "ctx".
*/
void step_launch_state_alter(slurm_step_ctx_t *ctx)
{
struct step_launch_state *sls = ctx->launch_state;
slurm_step_layout_t *layout = ctx->step_resp->step_layout;
int ii;
xassert(sls);
sls->tasks_requested = layout->task_cnt;
sls->tasks_started = bit_realloc(sls->tasks_started, layout->task_cnt);
sls->tasks_exited = bit_realloc(sls->tasks_exited, layout->task_cnt);
sls->node_io_error = bit_realloc(sls->node_io_error, layout->node_cnt);
xrealloc(sls->io_deadline, sizeof(time_t) * layout->node_cnt);
sls->layout = sls->mpi_info->step_layout = layout;
for (ii = 0; ii < layout->node_cnt; ii++) {
sls->io_deadline[ii] = (time_t)NO_VAL;
}
}
/*
* Free the memory associated with a launch state structure.
*/
void step_launch_state_destroy(struct step_launch_state *sls)
{
/* First undo anything created in step_launch_state_create() */
slurm_mutex_destroy(&sls->lock);
slurm_cond_destroy(&sls->cond);
FREE_NULL_BITMAP(sls->tasks_started);
FREE_NULL_BITMAP(sls->tasks_exited);
FREE_NULL_BITMAP(sls->node_io_error);
xfree(sls->io_deadline);
/* Now clean up anything created by slurm_step_launch() */
if (sls->resp_port != NULL) {
xfree(sls->resp_port);
}
}
/**********************************************************************
* CR functions
**********************************************************************/
/* connect to srun_cr */
static int _connect_srun_cr(char *addr)
{
struct sockaddr_un sa;
unsigned int sa_len;
int fd, rc;
if (!addr) {
error("%s: socket path name is NULL", __func__);
return -1;
}
if (strlen(addr) >= sizeof(sa.sun_path)) {
error("%s: socket path name too long (%s)", __func__, addr);
return -1;
}
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
error("failed creating cr socket: %m");
return -1;
}
memset(&sa, 0, sizeof(sa));
sa.sun_family = AF_UNIX;
strlcpy(sa.sun_path, addr, sizeof(sa.sun_path));
sa_len = strlen(sa.sun_path) + sizeof(sa.sun_family);
while (((rc = connect(fd, (struct sockaddr *)&sa, sa_len)) < 0) &&
(errno == EINTR));
if (rc < 0) {
debug2("failed connecting cr socket: %m");
close(fd);
return -1;
}
return fd;
}
/* send job_id, step_id, node_list to srun_cr */
static int _cr_notify_step_launch(slurm_step_ctx_t *ctx)
{
int fd, len, rc = 0;
char *cr_sock_addr = NULL;
cr_sock_addr = getenv("SLURM_SRUN_CR_SOCKET");
if (cr_sock_addr == NULL) { /* not run under srun_cr */
return 0;
}
if ((fd = _connect_srun_cr(cr_sock_addr)) < 0) {
debug2("failed connecting srun_cr. take it not running under "
"srun_cr.");
return 0;
}
if (write(fd, &ctx->job_id, sizeof(uint32_t)) != sizeof(uint32_t)) {
error("failed writing job_id to srun_cr: %m");
rc = -1;
goto out;
}
if (write(fd, &ctx->step_resp->job_step_id, sizeof(uint32_t)) !=
sizeof(uint32_t)) {
error("failed writing job_step_id to srun_cr: %m");
rc = -1;
goto out;
}
len = strlen(ctx->step_resp->step_layout->node_list);
if (write(fd, &len, sizeof(int)) != sizeof(int)) {
error("failed writing nodelist length to srun_cr: %m");
rc = -1;
goto out;
}
if (write(fd, ctx->step_resp->step_layout->node_list, len + 1) !=
(len + 1)) {
error("failed writing nodelist to srun_cr: %m");
rc = -1;
}
out:
close (fd);
return rc;
}
/**********************************************************************
* Message handler functions
**********************************************************************/
static void *_msg_thr_internal(void *arg)
{
struct step_launch_state *sls = (struct step_launch_state *)arg;
eio_handle_mainloop(sls->msg_handle);
return NULL;
}
static inline int
_estimate_nports(int nclients, int cli_per_port)
{
div_t d;
d = div(nclients, cli_per_port);
return d.rem > 0 ? d.quot + 1 : d.quot;
}
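/*
* For example, with the 48 clients-per-port figure used in _msg_thr_create()
* below, 100 nodes yield _estimate_nports(100, 48) == 3, the ceiling of
* 100/48.
*/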
static int _msg_thr_create(struct step_launch_state *sls, int num_nodes)
{
int sock = -1;
uint16_t port;
eio_obj_t *obj;
int i, rc = SLURM_SUCCESS;
uint16_t *ports;
uint16_t eio_timeout;
debug("Entering _msg_thr_create()");
slurm_uid = (uid_t) slurm_get_slurm_user_id();
eio_timeout = slurm_get_srun_eio_timeout();
sls->msg_handle = eio_handle_create(eio_timeout);
sls->num_resp_port = _estimate_nports(num_nodes, 48);
sls->resp_port = xmalloc(sizeof(uint16_t) * sls->num_resp_port);
/* multiple jobs (easily induced via no_alloc) and highly
* parallel jobs using PMI sometimes result in slow message
* responses and timeouts. Raise the default timeout for srun. */
if (!message_socket_ops.timeout)
message_socket_ops.timeout = slurm_get_msg_timeout() * 8000;
ports = slurm_get_srun_port_range();
for (i = 0; i < sls->num_resp_port; i++) {
int cc;
if (ports)
cc = net_stream_listen_ports(&sock, &port, ports, false);
else
cc = net_stream_listen(&sock, &port);
if (cc < 0) {
error("unable to initialize step launch listening "
"socket: %m");
return SLURM_ERROR;
}
sls->resp_port[i] = port;
obj = eio_obj_create(sock, &message_socket_ops, (void *)sls);
eio_new_initial_obj(sls->msg_handle, obj);
}
/* finally, add the listening port that we told the slurmctld about
* earlier in the step context creation phase */
if (sls->slurmctld_socket_fd > -1) {
obj = eio_obj_create(sls->slurmctld_socket_fd,
&message_socket_ops, (void *)sls);
eio_new_initial_obj(sls->msg_handle, obj);
}
slurm_thread_create(&sls->msg_thread, _msg_thr_internal, sls);
return rc;
}
static void
_launch_handler(struct step_launch_state *sls, slurm_msg_t *resp)
{
launch_tasks_response_msg_t *msg = resp->data;
int i;
slurm_mutex_lock(&sls->lock);
if ((msg->count_of_pids > 0) &&
bit_test(sls->tasks_started, msg->task_ids[0])) {
debug("%s: duplicate launch response received from node %s",
__func__, msg->node_name);
slurm_mutex_unlock(&sls->lock);
return;
}
if (msg->return_code) {
for (i = 0; i < msg->count_of_pids; i++) {
error("task %u launch failed: %s",
msg->task_ids[i],
slurm_strerror(msg->return_code));
bit_set(sls->tasks_started, msg->task_ids[i]);
bit_set(sls->tasks_exited, msg->task_ids[i]);
}
} else {
for (i = 0; i < msg->count_of_pids; i++)
bit_set(sls->tasks_started, msg->task_ids[i]);
}
if (sls->callback.task_start != NULL)
(sls->callback.task_start)(msg);
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
}
static void
_exit_handler(struct step_launch_state *sls, slurm_msg_t *exit_msg)
{
task_exit_msg_t *msg = (task_exit_msg_t *) exit_msg->data;
void (*task_finish)(task_exit_msg_t *);
int i;
if ((msg->job_id != sls->mpi_info->jobid) ||
(msg->step_id != sls->mpi_info->stepid)) {
debug("Received MESSAGE_TASK_EXIT from wrong job: %u.%u",
msg->job_id, msg->step_id);
return;
}
/* Record SIGTERM and SIGKILL termination codes to
* recognize abnormal termination */
if (WIFSIGNALED(msg->return_code)) {
i = WTERMSIG(msg->return_code);
if ((i == SIGKILL) || (i == SIGTERM))
task_exit_signal = i;
}
slurm_mutex_lock(&sls->lock);
task_finish = sls->callback.task_finish;
slurm_mutex_unlock(&sls->lock);
if (task_finish != NULL)
(task_finish)(msg); /* Outside of lock for performance */
slurm_mutex_lock(&sls->lock);
for (i = 0; i < msg->num_tasks; i++) {
debug("task %u done", msg->task_id_list[i]);
bit_set(sls->tasks_exited, msg->task_id_list[i]);
}
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
}
static void
_job_complete_handler(struct step_launch_state *sls, slurm_msg_t *complete_msg)
{
srun_job_complete_msg_t *step_msg =
(srun_job_complete_msg_t *) complete_msg->data;
if (step_msg->step_id == NO_VAL) {
verbose("Complete job %u received",
step_msg->job_id);
} else {
verbose("Complete job step %u.%u received",
step_msg->job_id, step_msg->step_id);
}
if (sls->callback.step_complete)
(sls->callback.step_complete)(step_msg);
force_terminated_job = true;
slurm_mutex_lock(&sls->lock);
sls->abort = true;
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
}
static void
_timeout_handler(struct step_launch_state *sls, slurm_msg_t *timeout_msg)
{
srun_timeout_msg_t *step_msg =
(srun_timeout_msg_t *) timeout_msg->data;
if (sls->callback.step_timeout)
(sls->callback.step_timeout)(step_msg);
slurm_mutex_lock(&sls->lock);
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
}
/*
* Take the list of node names of down nodes and convert into an
* array of nodeids for the step. The nodeid array is passed to
* client_io_handler_downnodes to notify the IO handler to expect no
* further IO from that node.
*/
static void
_node_fail_handler(struct step_launch_state *sls, slurm_msg_t *fail_msg)
{
srun_node_fail_msg_t *nf = fail_msg->data;
hostset_t fail_nodes, all_nodes;
hostlist_iterator_t fail_itr;
int num_node_ids;
int *node_ids;
int i, j;
int node_id, num_tasks;
error("Node failure on %s", nf->nodelist);
fail_nodes = hostset_create(nf->nodelist);
fail_itr = hostset_iterator_create(fail_nodes);
num_node_ids = hostset_count(fail_nodes);
node_ids = xmalloc(sizeof(int) * num_node_ids);
slurm_mutex_lock(&sls->lock);
all_nodes = hostset_create(sls->layout->node_list);
/* find the index number of each down node */
for (i = 0; i < num_node_ids; i++) {
#ifdef HAVE_FRONT_END
node_id = 0;
#else
char *node = hostlist_next(fail_itr);
node_id = node_ids[i] = hostset_find(all_nodes, node);
if (node_id < 0) {
error( "Internal error: bad SRUN_NODE_FAIL message. "
"Node %s not part of this job step", node);
free(node);
continue;
}
free(node);
#endif
/* find all of the tasks that should run on this node and
* mark them as having started and exited. If they haven't
* started yet, they never will, and likewise for exiting.
*/
num_tasks = sls->layout->tasks[node_id];
for (j = 0; j < num_tasks; j++) {
debug2("marking task %d done on failed node %d",
sls->layout->tids[node_id][j], node_id);
bit_set(sls->tasks_started,
sls->layout->tids[node_id][j]);
bit_set(sls->tasks_exited,
sls->layout->tids[node_id][j]);
}
}
if (!sls->user_managed_io) {
client_io_handler_downnodes(sls->io.normal, node_ids,
num_node_ids);
}
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
xfree(node_ids);
hostlist_iterator_destroy(fail_itr);
hostset_destroy(fail_nodes);
hostset_destroy(all_nodes);
}
/*
* Receive a message, sent when a slurmd cold starts, indicating that the step
* on that node may have died. Verify that tasks on these node(s) are still
* alive,
* and abort the job step if they are not.
* This message could be the result of the slurmd daemon cold-starting
* or a race condition when tasks are starting or terminating.
*/
static void
_step_missing_handler(struct step_launch_state *sls, slurm_msg_t *missing_msg)
{
srun_step_missing_msg_t *step_missing = missing_msg->data;
hostset_t fail_nodes, all_nodes;
hostlist_iterator_t fail_itr;
char *node;
int num_node_ids;
int i, j;
int node_id;
client_io_t *cio = sls->io.normal;
bool test_message_sent;
int num_tasks;
bool active;
debug("Step %u.%u missing from node(s) %s",
step_missing->job_id, step_missing->step_id,
step_missing->nodelist);
/* Ignore this message in the unusual "user_managed_io" case. No way
to confirm a bad connection, since a test message goes straight to
the task. Aborting without checking may be too dangerous. This
choice may cause srun to not exit even though the job step has
ended. */
if (sls->user_managed_io)
return;
slurm_mutex_lock(&sls->lock);
if (!sls->io_timeout_thread_created) {
sls->io_timeout_thread_created = true;
slurm_thread_create(&sls->io_timeout_thread,
_check_io_timeout, sls);
}
fail_nodes = hostset_create(step_missing->nodelist);
fail_itr = hostset_iterator_create(fail_nodes);
num_node_ids = hostset_count(fail_nodes);
all_nodes = hostset_create(sls->layout->node_list);
for (i = 0; i < num_node_ids; i++) {
node = hostlist_next(fail_itr);
node_id = hostset_find(all_nodes, node);
if (node_id < 0) {
error("Internal error: bad SRUN_STEP_MISSING message. "
"Node %s not part of this job step", node);
free(node);
continue;
}
free(node);
/*
* If all tasks for this node have either not started or already
* exited, ignore the missing step message for this node.
*/
num_tasks = sls->layout->tasks[node_id];
active = false;
for (j = 0; j < num_tasks; j++) {
if (bit_test(sls->tasks_started,
sls->layout->tids[node_id][j]) &&
!bit_test(sls->tasks_exited,
sls->layout->tids[node_id][j])) {
active = true;
break;
}
}
if (!active)
continue;
/* If this is true, an I/O error has already occurred on the
* stepd for the current node, and the job should abort */
if (bit_test(sls->node_io_error, node_id)) {
error("Aborting, step missing and io error on node %d",
node_id);
sls->abort = true;
slurm_cond_broadcast(&sls->cond);
break;
}
/*
* A test is already in progress. Ignore the message for this node.
*/
if (sls->io_deadline[node_id] != NO_VAL) {
debug("Test in progress for node %d, ignoring message",
node_id);
continue;
}
sls->io_deadline[node_id] = time(NULL) + sls->io_timeout;
debug("Testing connection to node %d", node_id);
if (client_io_handler_send_test_message(cio, node_id,
&test_message_sent)) {
/*
* If unable to test a connection, assume the step
* is having problems and abort. If unable to test,
* the system is probably having serious problems, so
* aborting the step seems reasonable.
*/
error("Aborting, can not test connection to node %d.",
node_id);
sls->abort = true;
slurm_cond_broadcast(&sls->cond);
break;
}
/*
* test_message_sent should be true unless this node either
* hasn't started or already finished. Poke the io_timeout
* thread to make sure it will abort the job if the deadline
* for receiving a response passes.
*/
if (test_message_sent) {
slurm_cond_broadcast(&sls->cond);
} else {
sls->io_deadline[node_id] = (time_t)NO_VAL;
}
}
slurm_mutex_unlock(&sls->lock);
hostlist_iterator_destroy(fail_itr);
hostset_destroy(fail_nodes);
hostset_destroy(all_nodes);
}
/* This RPC is typically used to send a signal to an external program that
* is usually wrapped by srun.
*/
static void
_step_step_signal(struct step_launch_state *sls, slurm_msg_t *signal_msg)
{
job_step_kill_msg_t *step_signal = signal_msg->data;
debug2("Signal %u requested for step %u.%u", step_signal->signal,
step_signal->job_id, step_signal->job_step_id);
if (sls->callback.step_signal)
(sls->callback.step_signal)(step_signal->signal);
}
/*
* The TCP connection that was used to send the task_spawn_io_msg_t message
* will be used as the user managed IO stream. The remote end of the TCP stream
* will be connected to the stdin, stdout, and stderr of the task. The
* local end of the stream is stored in the user_managed_io_t structure, and
* is left to the user to manage (the user can retrieve the array of
* socket descriptors using slurm_step_ctx_get()).
*
* To allow the message TCP stream to be reused for spawn IO traffic we
* set the slurm_msg_t's conn_fd to -1 to avoid having the caller close the
* TCP stream.
*/
static void
_task_user_managed_io_handler(struct step_launch_state *sls,
slurm_msg_t *user_io_msg)
{
task_user_managed_io_msg_t *msg =
(task_user_managed_io_msg_t *) user_io_msg->data;
slurm_mutex_lock(&sls->lock);
debug("task %d user managed io stream established", msg->task_id);
/* sanity check */
if (msg->task_id >= sls->tasks_requested) {
error("_task_user_managed_io_handler:"
" bad task ID %u (of %d tasks)",
msg->task_id, sls->tasks_requested);
}
sls->io.user->connected++;
fd_set_blocking(user_io_msg->conn_fd);
sls->io.user->sockets[msg->task_id] = user_io_msg->conn_fd;
/* prevent the caller from closing the user managed IO stream */
user_io_msg->conn_fd = -1;
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
}
/*
* Identify the incoming message and call the appropriate handler function.
*/
static void
_handle_msg(void *arg, slurm_msg_t *msg)
{
char *auth_info = slurm_get_auth_info();
struct step_launch_state *sls = (struct step_launch_state *)arg;
uid_t req_uid;
uid_t uid = getuid();
srun_user_msg_t *um;
int rc;
req_uid = g_slurm_auth_get_uid(msg->auth_cred, auth_info);
xfree(auth_info);
if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) {
error ("Security violation, slurm message from uid %u",
(unsigned int) req_uid);
return;
}
switch (msg->msg_type) {
case RESPONSE_LAUNCH_TASKS:
debug2("received task launch");
_launch_handler(sls, msg);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
break;
case MESSAGE_TASK_EXIT:
debug2("received task exit");
_exit_handler(sls, msg);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
break;
case SRUN_PING:
debug3("slurmctld ping received");
slurm_send_rc_msg(msg, SLURM_SUCCESS);
break;
case SRUN_EXEC:
_exec_prog(msg);
break;
case SRUN_JOB_COMPLETE:
debug2("received job step complete message");
_job_complete_handler(sls, msg);
break;
case SRUN_TIMEOUT:
debug2("received job step timeout message");
_timeout_handler(sls, msg);
break;
case SRUN_USER_MSG:
um = msg->data;
info("%s", um->msg);
break;
case SRUN_NODE_FAIL:
debug2("received srun node fail");
_node_fail_handler(sls, msg);
break;
case SRUN_STEP_MISSING:
debug2("received notice of missing job step");
_step_missing_handler(sls, msg);
break;
case SRUN_STEP_SIGNAL:
debug2("received step signal RPC");
_step_step_signal(sls, msg);
break;
case PMI_KVS_PUT_REQ:
debug2("PMI_KVS_PUT_REQ received");
rc = pmi_kvs_put((kvs_comm_set_t *) msg->data);
slurm_send_rc_msg(msg, rc);
break;
case PMI_KVS_GET_REQ:
debug2("PMI_KVS_GET_REQ received");
rc = pmi_kvs_get((kvs_get_msg_t *) msg->data);
slurm_send_rc_msg(msg, rc);
break;
case TASK_USER_MANAGED_IO_STREAM:
debug2("TASK_USER_MANAGED_IO_STREAM");
_task_user_managed_io_handler(sls, msg);
break;
default:
error("%s: received spurious message type: %u",
__func__, msg->msg_type);
break;
}
return;
}
/**********************************************************************
* Task launch functions
**********************************************************************/
/* The slurmd normally reports task completion to the controller. If a
* task launch request to the slurmd fails, that reporting must happen
* here instead, since no cleanup will occur anywhere else.
*/
static int _fail_step_tasks(slurm_step_ctx_t *ctx, char *node, int ret_code)
{
slurm_msg_t req;
step_complete_msg_t msg;
int rc = -1;
int nodeid = 0;
struct step_launch_state *sls = ctx->launch_state;
#ifndef HAVE_FRONT_END
/* It is always 0 for front end systems */
nodeid = nodelist_find(ctx->step_resp->step_layout->node_list, node);
#endif
slurm_mutex_lock(&sls->lock);
sls->abort = true;
slurm_cond_broadcast(&sls->cond);
slurm_mutex_unlock(&sls->lock);
memset(&msg, 0, sizeof(msg));
msg.job_id = ctx->job_id;
msg.job_step_id = ctx->step_resp->job_step_id;
msg.range_first = msg.range_last = nodeid;
msg.step_rc = ret_code;
slurm_msg_t_init(&req);
req.msg_type = REQUEST_STEP_COMPLETE;
req.data = &msg;
if (ctx->step_resp->use_protocol_ver)
req.protocol_version = ctx->step_resp->use_protocol_ver;
if (slurm_send_recv_controller_rc_msg(&req, &rc,
working_cluster_rec) < 0)
return SLURM_ERROR;
return SLURM_SUCCESS;
}
static int _launch_tasks(slurm_step_ctx_t *ctx,
launch_tasks_request_msg_t *launch_msg,
uint32_t timeout, char *nodelist, int start_nodeid)
{
#ifdef HAVE_FRONT_END
slurm_cred_arg_t cred_args;
#endif
slurm_msg_t msg;
List ret_list = NULL;
ListIterator ret_itr;
ret_data_info_t *ret_data = NULL;
int rc = SLURM_SUCCESS;
int tot_rc = SLURM_SUCCESS;
debug("Entering _launch_tasks");
if (ctx->verbose_level) {
char *name = NULL;
hostlist_t hl = hostlist_create(nodelist);
int i = start_nodeid;
while ((name = hostlist_shift(hl))) {
_print_launch_msg(launch_msg, name, i++);
free(name);
}
hostlist_destroy(hl);
}
/*
* Extend timeout based upon BatchStartTime to permit for a long
* running Prolog
*/
if (timeout <= 0) {
timeout = (slurm_get_msg_timeout() +
slurm_get_batch_start_timeout()) * 1000;
}
slurm_msg_t_init(&msg);
msg.msg_type = REQUEST_LAUNCH_TASKS;
msg.data = launch_msg;
if (ctx->step_resp->use_protocol_ver)
msg.protocol_version = ctx->step_resp->use_protocol_ver;
#ifdef HAVE_FRONT_END
slurm_cred_get_args(ctx->step_resp->cred, &cred_args);
//info("hostlist=%s", cred_args.step_hostlist);
ret_list = slurm_send_recv_msgs(cred_args.step_hostlist, &msg, timeout,
false);
slurm_cred_free_args(&cred_args);
#else
ret_list = slurm_send_recv_msgs(nodelist,
&msg, timeout, false);
#endif
if (ret_list == NULL) {
error("slurm_send_recv_msgs failed miserably: %m");
return SLURM_ERROR;
}
ret_itr = list_iterator_create(ret_list);
while ((ret_data = list_next(ret_itr))) {
rc = slurm_get_return_code(ret_data->type,
ret_data->data);
debug("launch returned msg_rc=%d err=%d type=%d",
rc, ret_data->err, ret_data->type);
if (rc != SLURM_SUCCESS) {
if (ret_data->err)
tot_rc = ret_data->err;
else
tot_rc = rc;
_fail_step_tasks(ctx, ret_data->node_name, tot_rc);
errno = tot_rc;
tot_rc = SLURM_ERROR;
error("Task launch for %u.%u failed on "
"node %s: %m",
ctx->job_id, ctx->step_resp->job_step_id,
ret_data->node_name);
} else {
#if 0 /* only for debugging, might want to make this a callback */
errno = ret_data->err;
info("Launch success on node %s",
ret_data->node_name);
#endif
}
}
list_iterator_destroy(ret_itr);
FREE_NULL_LIST(ret_list);
if (tot_rc != SLURM_SUCCESS)
return tot_rc;
return rc;
}
/* Returns an xmalloc()'d cwd string, or NULL if the lookup failed. */
static char *_lookup_cwd(void)
{
char buf[PATH_MAX];
if (getcwd(buf, PATH_MAX) != NULL) {
return xstrdup(buf);
} else {
return NULL;
}
}
static void _print_launch_msg(launch_tasks_request_msg_t *msg,
char *hostname, int nodeid)
{
int i;
char tmp_str[10], *task_list = NULL;
hostlist_t hl = hostlist_create(NULL);
for (i=0; i<msg->tasks_to_launch[nodeid]; i++) {
sprintf(tmp_str, "%u", msg->global_task_ids[nodeid][i]);
hostlist_push_host(hl, tmp_str);
}
task_list = hostlist_ranged_string_xmalloc(hl);
hostlist_destroy(hl);
info("launching %u.%u on host %s, %u tasks: %s",
msg->job_id, msg->job_step_id, hostname,
msg->tasks_to_launch[nodeid], task_list);
xfree(task_list);
debug3("uid:%ld gid:%ld cwd:%s %d", (long) msg->uid,
(long) msg->gid, msg->cwd, nodeid);
}
void record_ppid(void)
{
srun_ppid = getppid();
}
/* This is used to initiate an OpenMPI checkpoint program,
* but is written to be general purpose */
static void
_exec_prog(slurm_msg_t *msg)
{
pid_t child;
int pfd[2], status, exit_code = 0, i;
ssize_t len;
char *argv[4], buf[256] = "";
time_t now = time(NULL);
bool checkpoint = false;
srun_exec_msg_t *exec_msg = msg->data;
if ((exec_msg->argc < 1) || (exec_msg->argv == NULL) ||
(exec_msg->argv[0] == NULL)) {
error("%s: called with no command to execute", __func__);
return;
} else if (exec_msg->argc > 2) {
verbose("Exec '%s %s' for %u.%u",
exec_msg->argv[0], exec_msg->argv[1],
exec_msg->job_id, exec_msg->step_id);
} else {
verbose("Exec '%s' for %u.%u",
exec_msg->argv[0],
exec_msg->job_id, exec_msg->step_id);
}
if (xstrcmp(exec_msg->argv[0], "ompi-checkpoint") == 0) {
if (srun_ppid)
checkpoint = true;
else {
error("Can not create checkpoint, no srun_ppid set");
exit_code = EINVAL;
goto fini;
}
}
if (checkpoint) {
/* OpenMPI specific checkpoint support */
info("Checkpoint started at %s", slurm_ctime2(&now));
for (i=0; (exec_msg->argv[i] && (i<2)); i++) {
argv[i] = exec_msg->argv[i];
}
snprintf(buf, sizeof(buf), "%ld", (long) srun_ppid);
argv[i] = buf;
argv[i+1] = NULL;
}
if (pipe(pfd) == -1) {
snprintf(buf, sizeof(buf), "pipe: %s", strerror(errno));
error("%s", buf);
exit_code = errno;
goto fini;
}
child = fork();
if (child == 0) {
int fd = open("/dev/null", O_RDONLY);
if (fd < 0) {
error("%s: can not open /dev/null", __func__);
exit(1);
}
dup2(fd, 0); /* stdin from /dev/null */
dup2(pfd[1], 1); /* stdout to pipe */
dup2(pfd[1], 2); /* stderr to pipe */
close(pfd[0]);
close(pfd[1]);
if (checkpoint)
execvp(exec_msg->argv[0], argv);
else
execvp(exec_msg->argv[0], exec_msg->argv);
error("execvp(%s): %m", exec_msg->argv[0]);
} else if (child < 0) {
snprintf(buf, sizeof(buf), "fork: %s", strerror(errno));
error("%s", buf);
exit_code = errno;
goto fini;
} else {
close(pfd[1]);
len = read(pfd[0], buf, sizeof(buf));
if (len >= 1)
close(pfd[0]);
waitpid(child, &status, 0);
exit_code = WEXITSTATUS(status);
}
fini: if (checkpoint) {
now = time(NULL);
if (exit_code) {
info("Checkpoint completion code %d at %s",
exit_code, slurm_ctime2(&now));
} else {
info("Checkpoint completed successfully at %s",
slurm_ctime2(&now));
}
if (buf[0])
info("Checkpoint location: %s", buf);
slurm_checkpoint_complete(exec_msg->job_id, exec_msg->step_id,
time(NULL), (uint32_t) exit_code, buf);
}
}
/*
* Notify the step_launch_state that an I/O connection went bad.
* If the node is suspected to be down, abort the job.
*/
int
step_launch_notify_io_failure(step_launch_state_t *sls, int node_id)
{
slurm_mutex_lock(&sls->lock);
bit_set(sls->node_io_error, node_id);
debug("IO error on node %d", node_id);
/*
* sls->io_deadline[node_id] != (time_t)NO_VAL means that
* the _step_missing_handler was called on this node.
*/
if (sls->io_deadline[node_id] != (time_t)NO_VAL) {
error("Aborting, io error and missing step on node %d",
node_id);
sls->abort = true;
slurm_cond_broadcast(&sls->cond);
} else {
/* FIXME
* If the stepd dies or we see an I/O error with the stepd,
* do not abort the whole job, but collect all tasks
* on the node just as if they had exited.
*
* Keep supporting 'srun -N x --pty bash'
*/
if (getenv("SLURM_PTY_PORT") == NULL) {
error("%s: aborting, io error with slurmstepd on node %d",
__func__, node_id);
sls->abort = true;
slurm_cond_broadcast(&sls->cond);
}
}
slurm_mutex_unlock(&sls->lock);
return SLURM_SUCCESS;
}
/*
* This is called 1) after a node connects for the first time and 2) when
* a message comes in confirming that a connection is okay.
*
* Just in case the node was marked questionable very early in the
* job step setup, clear this flag if/when the node makes its initial
* connection.
*/
int
step_launch_clear_questionable_state(step_launch_state_t *sls, int node_id)
{
slurm_mutex_lock(&sls->lock);
sls->io_deadline[node_id] = (time_t)NO_VAL;
slurm_mutex_unlock(&sls->lock);
return SLURM_SUCCESS;
}
static void *
_check_io_timeout(void *_sls)
{
int ii;
time_t now, next_deadline;
struct timespec ts = {0, 0};
step_launch_state_t *sls = (step_launch_state_t *)_sls;
slurm_mutex_lock(&sls->lock);
while (1) {
if (sls->halt_io_test || sls->abort)
break;
now = time(NULL);
next_deadline = (time_t)NO_VAL;
for (ii = 0; ii < sls->layout->node_cnt; ii++) {
if (sls->io_deadline[ii] == (time_t)NO_VAL)
continue;
if (sls->io_deadline[ii] <= now) {
sls->abort = true;
slurm_cond_broadcast(&sls->cond);
error( "Cannot communicate with node %d. "
"Aborting job.", ii);
break;
} else if (next_deadline == (time_t)NO_VAL ||
sls->io_deadline[ii] < next_deadline) {
next_deadline = sls->io_deadline[ii];
}
}
if (sls->abort)
break;
if (next_deadline == (time_t)NO_VAL) {
debug("io timeout thread: no pending deadlines, "
"sleeping indefinitely");
slurm_cond_wait(&sls->cond, &sls->lock);
} else {
debug("io timeout thread: sleeping %lds until deadline",
(long)(next_deadline - time(NULL)));
ts.tv_sec = next_deadline;
slurm_cond_timedwait(&sls->cond, &sls->lock, &ts);
}
}
slurm_mutex_unlock(&sls->lock);
return NULL;
}