/*****************************************************************************\
* launch.c - Define job launch plugin functions.
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#include <stdlib.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/stat.h>
#include "src/srun/allocate.h"
#include "src/srun/fname.h"
#include "src/srun/launch.h"
#include "src/srun/multi_prog.h"
#include "src/srun/task_state.h"
#include "src/api/pmi_server.h"
#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/net.h"
#include "src/common/xstring.h"
#include "src/common/plugin.h"
#include "src/common/plugrack.h"
#include "src/common/proc_args.h"
#include "src/common/slurm_resource_info.h"
#include "src/common/tres_bind.h"
#include "src/common/tres_frequency.h"
#include "src/common/xsignal.h"
#include "src/interfaces/gres.h"
#ifndef OPEN_MPI_PORT_ERROR
/* This exit code indicates the launched Open MPI tasks could
 * not open the reserved port: it was already in use by some
 * other process. */
#define OPEN_MPI_PORT_ERROR 108
#endif
#define MAX_STEP_RETRIES 4
static list_t *local_job_list = NULL;
static uint32_t *local_global_rc = NULL;
static pthread_mutex_t launch_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t het_job_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t start_mutex = PTHREAD_MUTEX_INITIALIZER;
static slurm_opt_t *opt_save = NULL;
static list_t *task_state_list = NULL;
static time_t launch_start_time;
static bool retry_step_begin = false;
static int retry_step_cnt = 0;
extern char **environ;
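
/*
 * Send the given signal to every job step on the local job list.
 * RET: SLURM_SUCCESS or the last error from slurm_kill_job_step()
 */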
static int _step_signal(int signal)
{
srun_job_t *my_srun_job;
list_itr_t *iter;
int rc = SLURM_SUCCESS, rc2;
if (!local_job_list) {
debug("%s: local_job_list does not exist yet", __func__);
return SLURM_ERROR;
}
iter = list_iterator_create(local_job_list);
while ((my_srun_job = (srun_job_t *) list_next(iter))) {
info("Terminating %ps", &my_srun_job->step_id);
rc2 = slurm_kill_job_step(my_srun_job->step_id.job_id,
my_srun_job->step_id.step_id, signal,
0);
if (rc2)
rc = rc2;
}
list_iterator_destroy(iter);
return rc;
}
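
/*
 * Convert a hostset into a ranged host list string, growing the buffer as
 * needed up to a 64KB limit ('+' is appended if the list is truncated).
 * RET: the string, caller must xfree() this value
 */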
static char *_hostset_to_string(hostset_t *hs)
{
	size_t n = 1024;
	size_t maxsize = 1024 * 64;
	int rc;
	char *str = NULL;

	/* Grow the buffer until the ranged string fits or maxsize is hit */
	while (1) {
		str = xrealloc(str, n);
		rc = hostset_ranged_string(hs, n, str);
		if ((rc >= 0) || (n >= maxsize))
			break;
		n *= 2;
	}
	/*
	 * If the string was still truncated at maxsize, indicate this with a
	 * '+' suffix.
	 */
	if (rc < 0)
		strcpy(str + (n - 2), "+");
return str;
}
/*
* Convert an array of task IDs into a list of host names
* RET: the string, caller must xfree() this value
*/
static char *_task_ids_to_host_list(int ntasks, uint32_t *taskids,
srun_job_t *my_srun_job)
{
int i, task_cnt = 0;
hostset_t *hs;
char *hosts;
slurm_step_layout_t *sl;
if ((sl = launch_common_get_slurm_step_layout(my_srun_job)) == NULL)
return (xstrdup("Unknown"));
/*
* If overhead of determining the hostlist is too high then srun
* communications will timeout and fail, so return "Unknown" instead.
*
* See slurm_step_layout_host_id() in src/common/slurm_step_layout.c
* for details.
*/
for (i = 0; i < sl->node_cnt; i++) {
task_cnt += sl->tasks[i];
}
if (task_cnt > 100000)
return (xstrdup("Unknown"));
hs = hostset_create(NULL);
for (i = 0; i < ntasks; i++) {
char *host = slurm_step_layout_host_name(sl, taskids[i]);
if (host) {
hostset_insert(hs, host);
free(host);
} else {
error("Could not identify host name for task %u",
taskids[i]);
}
}
hosts = _hostset_to_string(hs);
hostset_destroy(hs);
return (hosts);
}
/*
* Convert an array of task IDs into a string.
* RET: the string, caller must xfree() this value
* NOTE: the taskids array is not necessarily in numeric order,
* so we use existing bitmap functions to format
*/
static char *_task_array_to_string(int ntasks, uint32_t *taskids,
srun_job_t *my_srun_job)
{
bitstr_t *tasks_bitmap = NULL;
char *str;
int i;
tasks_bitmap = bit_alloc(my_srun_job->ntasks);
if (!tasks_bitmap) {
error("bit_alloc: memory allocation failure");
exit(error_exit);
}
for (i = 0; i < ntasks; i++)
bit_set(tasks_bitmap, taskids[i]);
str = xmalloc(2048);
bit_fmt(str, 2048, tasks_bitmap);
FREE_NULL_BITMAP(tasks_bitmap);
return str;
}
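
/*
 * Record a normal or abnormal exit for each listed task ID in the given
 * task_state structure.
 */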
static void _update_task_exit_state(task_state_t *task_state, uint32_t ntasks,
uint32_t *taskids, int abnormal)
{
int i;
task_state_type_t t = abnormal ? TS_ABNORMAL_EXIT : TS_NORMAL_EXIT;
for (i = 0; i < ntasks; i++)
task_state_update(task_state, taskids[i], t);
}
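
/*
 * Return the effective --kill-on-bad-exit setting: the srun option when
 * given, otherwise the cluster's KillOnBadExit configuration.
 */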
static int _kill_on_bad_exit(void)
{
	if (!opt_save)
		return slurm_conf.kill_on_bad_exit;
	xassert(opt_save->srun_opt);
	if (opt_save->srun_opt->kill_bad_exit == NO_VAL)
		return slurm_conf.kill_on_bad_exit;
	return opt_save->srun_opt->kill_bad_exit;
}
static void _setup_max_wait_timer(void)
{
xassert(opt_save->srun_opt);
/*
* If these are the first tasks to finish we need to
* start a timer to kill off the job step if the other
* tasks don't finish within opt_save->srun_opt->max_wait seconds.
*/
verbose("First task exited. Terminating job in %ds",
opt_save->srun_opt->max_wait);
srun_max_timer = true;
alarm(opt_save->srun_opt->max_wait);
}
static const char *_taskstr(int n)
{
if (n == 1)
return "task";
else
return "tasks";
}
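
/*
 * Return 1 if this exit code indicates an Open MPI reserved-port conflict
 * that occurred while ports were reserved and within the message timeout
 * of the launch, otherwise 0.
 */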
static int _is_openmpi_port_error(int errcode)
{
if (errcode != OPEN_MPI_PORT_ERROR)
return 0;
if (opt_save && (opt_save->resv_port_cnt == NO_VAL))
return 0;
if (difftime(time(NULL), launch_start_time) > slurm_conf.msg_timeout)
return 0;
return 1;
}
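
/*
 * One or more tasks exited because they could not claim their reserved
 * Open MPI port: log it and kill the step so it can be retried (up to
 * MAX_STEP_RETRIES times).
 */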
static void
_handle_openmpi_port_error(const char *tasks, const char *hosts,
slurm_step_ctx_t *step_ctx)
{
slurm_step_id_t step_id = step_ctx->step_req->step_id;
char *msg = "retrying";
if (!retry_step_begin) {
retry_step_begin = true;
retry_step_cnt++;
}
if (retry_step_cnt >= MAX_STEP_RETRIES)
msg = "aborting";
error("%s: tasks %s unable to claim reserved port, %s.",
hosts, tasks, msg);
info("Terminating job step %ps", &step_id);
slurm_kill_job_step(step_id.job_id, step_id.step_id, SIGKILL, 0);
}
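
/*
 * Return the host name to record in the MPIR proctable, or the node's
 * NodeAddr if LaunchParameters=mpir_use_nodeaddr is configured.
 * RET: the string, caller must xfree() this value
 */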
static char *_mpir_get_host_name(char *node_name)
{
if ((xstrcasestr(slurm_conf.launch_params, "mpir_use_nodeaddr")))
return slurm_conf_get_nodeaddr(node_name);
return xstrdup(node_name);
}
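
/*
 * Step launch callback: record the host name and pid of each started task
 * in the MPIR proctable and update its task state.
 */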
static void _task_start(launch_tasks_response_msg_t *msg)
{
MPIR_PROCDESC *table;
uint32_t local_task_id, global_task_id;
int i;
task_state_t *task_state;
if (msg->count_of_pids) {
verbose("Node %s, %d tasks started",
msg->node_name, msg->count_of_pids);
} else {
/*
* This message should be displayed through the API,
* hence it is a debug2() instead of error().
*/
debug2("No tasks started on node %s: %s",
msg->node_name, slurm_strerror(msg->return_code));
}
task_state = task_state_find(&msg->step_id, task_state_list);
if (!task_state) {
error("%s: Could not locate task state for %ps",
__func__, &msg->step_id);
}
for (i = 0; i < msg->count_of_pids; i++) {
local_task_id = msg->task_ids[i];
global_task_id = task_state_global_id(task_state,local_task_id);
if (global_task_id >= MPIR_proctable_size) {
error("%s: task_id too large (%u >= %d)", __func__,
global_task_id, MPIR_proctable_size);
continue;
}
table = &MPIR_proctable[global_task_id];
table->host_name = _mpir_get_host_name(msg->node_name);
/* table->executable_name set in mpir_set_executable_names() */
table->pid = msg->local_pids[i];
if (!task_state) {
error("%s: Could not update task state for task ID %u",
__func__, global_task_id);
} else if (msg->return_code == 0) {
task_state_update(task_state, local_task_id,
TS_START_SUCCESS);
} else {
task_state_update(task_state, local_task_id,
TS_START_FAILURE);
}
}
}
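
/* list_find_first() helper to match an srun_job_t against a step id */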
static int _find_step(void *object, void *key)
{
srun_job_t *srun_job = (srun_job_t *)object;
slurm_step_id_t *step_id = (slurm_step_id_t *)key;
return verify_step_id(&srun_job->step_id, step_id);
}
/*
 * Find the srun_job_t structure for a given step_id on the local job list.
 * Returns NULL if not found
 */
static srun_job_t *_find_srun_job(slurm_step_id_t *step_id)
{
if (!local_job_list)
return NULL;
return list_find_first(local_job_list, _find_step, step_id);
}
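
/*
 * Step launch callback: report task exit status (normal exit, signal or
 * out-of-memory), update the task state and global return code, and
 * terminate the step if --kill-on-bad-exit applies.
 */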
static void _task_finish(task_exit_msg_t *msg)
{
char *tasks = NULL, *hosts = NULL;
bool build_task_string = false;
uint32_t rc = 0;
int normal_exit = 0;
static int reduce_task_exit_msg = -1;
static int msg_printed = 0, oom_printed = 0, last_task_exit_rc;
task_state_t *task_state;
const char *task_str = _taskstr(msg->num_tasks);
srun_job_t *my_srun_job = _find_srun_job(&msg->step_id);
if (!my_srun_job) {
error("Ignoring exit message from unrecognized %ps",
&msg->step_id);
return;
}
if (reduce_task_exit_msg == -1) {
char *ptr = getenv("SLURM_SRUN_REDUCE_TASK_EXIT_MSG");
if (ptr && atoi(ptr) != 0)
reduce_task_exit_msg = 1;
else
reduce_task_exit_msg = 0;
}
verbose("Received task exit notification for %d %s of %ps (status=0x%04x).",
msg->num_tasks, task_str, &msg->step_id, msg->return_code);
/*
* Only build the "tasks" and "hosts" strings as needed.
* Building them can take multiple milliseconds
*/
if (((msg->return_code & 0xff) == SIG_OOM) && !oom_printed) {
build_task_string = true;
} else if (WIFEXITED(msg->return_code)) {
if ((rc = WEXITSTATUS(msg->return_code)) == 0) {
if (get_log_level() >= LOG_LEVEL_VERBOSE)
build_task_string = true;
} else {
build_task_string = true;
}
} else if (WIFSIGNALED(msg->return_code)) {
if (my_srun_job->state >= SRUN_JOB_CANCELLED) {
if (get_log_level() >= LOG_LEVEL_VERBOSE)
build_task_string = true;
} else {
build_task_string = true;
}
}
if (build_task_string) {
tasks = _task_array_to_string(msg->num_tasks,
msg->task_id_list, my_srun_job);
hosts = _task_ids_to_host_list(msg->num_tasks,
msg->task_id_list, my_srun_job);
}
slurm_mutex_lock(&launch_lock);
if ((msg->return_code & 0xff) == SIG_OOM) {
if (!oom_printed)
error("%s: %s %s: Out Of Memory", hosts, task_str,
tasks);
oom_printed = 1;
*local_global_rc = msg->return_code;
} else if (WIFEXITED(msg->return_code)) {
if ((rc = WEXITSTATUS(msg->return_code)) == 0) {
verbose("%s: %s %s: Completed", hosts, task_str, tasks);
normal_exit = 1;
} else if (_is_openmpi_port_error(rc)) {
_handle_openmpi_port_error(tasks, hosts,
my_srun_job->step_ctx);
} else if ((reduce_task_exit_msg == 0) ||
(msg_printed == 0) ||
(msg->return_code != last_task_exit_rc)) {
error("%s: %s %s: Exited with exit code %d",
hosts, task_str, tasks, rc);
msg_printed = 1;
}
if (((*local_global_rc & 0xff) != SIG_OOM) &&
(!WIFSIGNALED(*local_global_rc)) &&
(!WIFEXITED(*local_global_rc) ||
(rc > WEXITSTATUS(*local_global_rc))))
*local_global_rc = msg->return_code;
} else if (WIFSIGNALED(msg->return_code)) {
const char *signal_str = strsignal(WTERMSIG(msg->return_code));
char *core_str = "";
#ifdef WCOREDUMP
if (WCOREDUMP(msg->return_code))
core_str = " (core dumped)";
#endif
if (my_srun_job->state >= SRUN_JOB_CANCELLED) {
verbose("%s: %s %s: %s%s",
hosts, task_str, tasks, signal_str, core_str);
} else if ((reduce_task_exit_msg == 0) ||
(msg_printed == 0) ||
(msg->return_code != last_task_exit_rc)) {
error("%s: %s %s: %s%s",
hosts, task_str, tasks, signal_str, core_str);
msg_printed = 1;
}
/*
* Even though lower numbered signals can be stronger than
* higher numbered signals, keep the highest signal so that it's
	 * predictable to the user.
*/
rc = WTERMSIG(msg->return_code);
if (((*local_global_rc & 0xff) != SIG_OOM) &&
(!WIFSIGNALED(*local_global_rc) ||
(rc > WTERMSIG(*local_global_rc))))
*local_global_rc = msg->return_code;
}
xfree(tasks);
xfree(hosts);
task_state = task_state_find(&msg->step_id, task_state_list);
if (task_state) {
_update_task_exit_state(task_state, msg->num_tasks,
msg->task_id_list, !normal_exit);
} else {
error("%s: Could not find task state for %ps", __func__,
&msg->step_id);
}
if (task_state_first_abnormal_exit(task_state_list) &&
_kill_on_bad_exit())
(void) _step_signal(SIG_TERM_KILL);
if (task_state_first_exit(task_state_list) && opt_save &&
(opt_save->srun_opt->max_wait > 0))
_setup_max_wait_timer();
last_task_exit_rc = msg->return_code;
slurm_mutex_unlock(&launch_lock);
}
/*
 * Load the multi_prog config file into argv. The entire file contents are
 * passed along to avoid having to read the file on every node. We could
 * also parse the information here to load the MPIR records for TotalView.
 */
static void _load_multi(int *argc, char **argv)
{
int config_fd, data_read = 0, i;
struct stat stat_buf;
char *data_buf;
if ((config_fd = open(argv[0], O_RDONLY)) == -1) {
error("Could not open multi_prog config file %s",
argv[0]);
exit(error_exit);
}
if (fstat(config_fd, &stat_buf) == -1) {
error("Could not stat multi_prog config file %s",
argv[0]);
exit(error_exit);
}
if (stat_buf.st_size > MAX_BATCH_SCRIPT_SIZE) {
error("Multi_prog config file %s is too large",
argv[0]);
exit(error_exit);
}
data_buf = xmalloc(stat_buf.st_size + 1);
while ((i = read(config_fd, &data_buf[data_read], stat_buf.st_size
- data_read)) != 0) {
if (i < 0) {
error("Error reading multi_prog config file %s",
argv[0]);
exit(error_exit);
} else
data_read += i;
}
close(config_fd);
for (i = *argc+1; i > 1; i--)
argv[i] = argv[i-1];
argv[1] = data_buf;
*argc += 1;
}
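
/*
 * Return 1 if this I/O file should be opened locally by srun rather than
 * remotely by the tasks on the compute nodes.
 */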
static int
_is_local_file (fname_t *fname)
{
if (fname->name == NULL)
return 1;
if (fname->taskid != -1)
return 1;
return ((fname->type != IO_PER_TASK) && (fname->type != IO_ONE));
}
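
/*
 * Build the environment array for the launched tasks, honoring --export:
 * export the full job environment, or only the variables and NAME=value
 * pairs listed, always merging in the Slurm- and SPANK-provided variables.
 * RET: a newly built environment array
 */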
static char **_build_user_env(srun_job_t *job, slurm_opt_t *opt_local)
{
char **dest_array = NULL;
char *tmp_env, *tok, *save_ptr = NULL, *eq_ptr, *value;
bool all;
if (!opt_local->export_env) {
all = true;
} else {
all = false;
tmp_env = xstrdup(opt_local->export_env);
tok = find_quote_token(tmp_env, ",", &save_ptr);
while (tok) {
if (xstrcasecmp(tok, "ALL") == 0)
all = true;
if (!xstrcasecmp(tok, "NONE"))
break;
eq_ptr = strchr(tok, '=');
if (eq_ptr) {
eq_ptr[0] = '\0';
value = eq_ptr + 1;
env_array_overwrite(&dest_array, tok, value);
} else {
value = getenv(tok);
if (value) {
env_array_overwrite(&dest_array, tok,
value);
}
}
tok = find_quote_token(NULL, ",", &save_ptr);
}
xfree(tmp_env);
}
if (!job->env)
fatal("%s: job env is NULL", __func__);
else if (all)
env_array_merge(&dest_array, (const char **) job->env);
else
env_array_merge_slurm_spank(&dest_array,
(const char **) job->env);
return dest_array;
}
static void _task_state_del(void *x)
{
task_state_t *task_state = (task_state_t *)x;
task_state_destroy(task_state);
}
/*
* Return only after all hetjob components reach this point (or timeout)
*/
static void _wait_all_het_job_comps_started(slurm_opt_t *opt_local)
{
srun_opt_t *srun_opt = opt_local->srun_opt;
static int start_cnt = 0;
static int total_cnt = -1;
struct timeval now;
struct timespec timeout;
int rc;
xassert(srun_opt);
slurm_mutex_lock(&start_mutex);
if (total_cnt == -1)
total_cnt = srun_opt->het_step_cnt;
start_cnt++;
while (start_cnt < total_cnt) {
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec + 10; /* 10 sec delay max */
timeout.tv_nsec = now.tv_usec * 1000;
rc = pthread_cond_timedwait(&start_cond, &start_mutex,
&timeout);
if (rc == ETIMEDOUT)
break;
}
slurm_cond_broadcast(&start_cond);
slurm_mutex_unlock(&start_mutex);
}
extern int location_fini(void)
{
FREE_NULL_LIST(task_state_list);
return SLURM_SUCCESS;
}
extern slurm_step_layout_t *launch_common_get_slurm_step_layout(srun_job_t *job)
{
return (!job || !job->step_ctx) ?
NULL : job->step_ctx->step_resp->step_layout;
}
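
/*
 * Sum the GPU counts in a comma-separated request string. Each token is
 * either a bare count ("2") or "type:count" ("tesla:1"), so "tesla:1,volta:2"
 * and "3" both yield 3.
 */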
static int _parse_gpu_request(char *in_str)
{
char *save_ptr = NULL, *tmp_str, *tok, *sep;
int gpus_val = 0;
tmp_str = xstrdup(in_str);
tok = strtok_r(tmp_str, ",", &save_ptr);
while (tok) {
int tmp = 0;
sep = xstrchr(tok, ':');
if (sep)
tmp += atoi(sep + 1);
else
tmp += atoi(tok);
if (tmp > 0)
gpus_val += tmp;
tok = strtok_r(NULL, ",", &save_ptr);
}
xfree(tmp_str);
return gpus_val;
}
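
/*
 * Build the job step creation request from the job allocation and the
 * local (per het job component) options, including CPU/GPU/TRES counts,
 * I/O file names and the task distribution.
 * RET: the request message, or NULL on error
 */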
static job_step_create_request_msg_t *_create_job_step_create_request(
slurm_opt_t *opt_local, bool use_all_cpus, srun_job_t *job)
{
char *add_tres = NULL, *pos;
srun_opt_t *srun_opt = opt_local->srun_opt;
job_step_create_request_msg_t *step_req = xmalloc(sizeof(*step_req));
list_t *tmp_gres_list = NULL;
int rc;
xassert(srun_opt);
step_req->host = xshort_hostname();
step_req->cpu_freq_min = opt_local->cpu_freq_min;
step_req->cpu_freq_max = opt_local->cpu_freq_max;
step_req->cpu_freq_gov = opt_local->cpu_freq_gov;
if (opt_local->cpus_per_gpu) {
xstrfmtcat(step_req->cpus_per_tres, "gres/gpu:%d",
opt_local->cpus_per_gpu);
/* Like cpus_per_task, imply --exact */
if (srun_opt->whole)
info("Ignoring --whole since --cpus-per-gpu used");
else if (!srun_opt->exact)
verbose("Implicitly setting --exact, because --cpus-per-gpu given.");
srun_opt->exact = true;
}
step_req->exc_nodes = xstrdup(opt_local->exclude);
step_req->features = xstrdup(opt_local->constraint);
if (srun_opt->exclusive)
step_req->flags |= SSF_EXCLUSIVE;
if (srun_opt->overlap_force)
step_req->flags |= SSF_OVERLAP_FORCE;
if (opt_local->overcommit)
step_req->flags |= SSF_OVERCOMMIT;
if (opt_local->no_kill)
step_req->flags |= SSF_NO_KILL;
if (srun_opt->interactive) {
debug("interactive step launch request");
step_req->flags |= SSF_INTERACTIVE;
}
if (srun_opt->external_launcher) {
debug("external launcher step request");
step_req->flags |= SSF_EXT_LAUNCHER;
}
if (opt_local->job_flags & GRES_ALLOW_TASK_SHARING)
step_req->flags |= SSF_GRES_ALLOW_TASK_SHARING;
if (srun_opt->wait_for_children)
step_req->flags |= SSF_WAIT_FOR_CHILDREN;
if (((srun_opt->kill_bad_exit != NO_VAL) && srun_opt->kill_bad_exit) ||
((srun_opt->kill_bad_exit == NO_VAL) &&
slurm_conf.kill_on_bad_exit))
step_req->flags |= SSF_KILL_ON_BAD_EXIT;
if (opt_local->immediate == 1)
step_req->immediate = opt_local->immediate;
step_req->max_nodes = job->nhosts;
if (opt_local->max_nodes &&
(opt_local->max_nodes < step_req->max_nodes))
step_req->max_nodes = opt_local->max_nodes;
if (opt_local->mem_per_gpu != NO_VAL64)
xstrfmtcat(step_req->mem_per_tres, "gres/gpu:%"PRIu64,
			   opt_local->mem_per_gpu);
step_req->min_nodes = job->nhosts;
if (opt_local->min_nodes &&
(opt_local->min_nodes < step_req->min_nodes))
step_req->min_nodes = opt_local->min_nodes;
if (opt_local->gres)
add_tres = opt_local->gres;
else
add_tres = getenv("SLURM_STEP_GRES");
/*
* If the number of CPUs was specified (cpus_set==true), then we need to
* set exact = true. Otherwise the step will be allocated the wrong
	 * number of CPUs (and therefore the wrong amount of memory if using
* mem_per_cpu).
*/
if (opt_local->overcommit) {
if (use_all_cpus) /* job allocation created by srun */
step_req->cpu_count = job->cpu_count;
else
step_req->cpu_count = step_req->min_nodes;
} else if (opt_local->cpus_set) {
if (opt_local->ntasks == NO_VAL)
step_req->cpu_count = NO_VAL;
else
step_req->cpu_count = opt_local->ntasks *
opt_local->cpus_per_task;
if (srun_opt->whole)
info("Ignoring --whole since -c/--cpus-per-task used");
else if (!srun_opt->exact)
verbose("Implicitly setting --exact, because -c/--cpus-per-task given.");
srun_opt->exact = true;
} else if (opt_local->cpus_per_gpu) {
if (opt_local->gpus) {
int gpus_per_step;
gpus_per_step = _parse_gpu_request(opt_local->gpus);
step_req->cpu_count = gpus_per_step *
opt_local->cpus_per_gpu;
} else if (opt_local->gpus_per_node) {
int gpus_per_node;
gpus_per_node =
_parse_gpu_request(opt_local->gpus_per_node);
/* Use a minimum value for requested cpus */
step_req->cpu_count = opt_local->min_nodes *
gpus_per_node * opt_local->cpus_per_gpu;
} else if (opt_local->tres_per_task &&
(pos = xstrstr(opt_local->tres_per_task,
"gres/gpu:"))) {
pos += 9; /* Don't include "gres/gpu:" */
step_req->cpu_count = opt_local->ntasks *
_parse_gpu_request(pos) *
opt_local->cpus_per_gpu;
} else if (add_tres) {
int rc = SLURM_SUCCESS;
uint64_t gpus_per_node = 0;
char *save_ptr = NULL;
while (slurm_option_get_tres_per_tres(
opt_local->gres, "gpu",
&gpus_per_node, &save_ptr,
&rc));
if (rc != SLURM_SUCCESS) {
/* Failed to parse, error already logged */
slurm_free_job_step_create_request_msg(
step_req);
return NULL;
}
/*
* Same math as gpus_per_node
*
* If gpus_per_node == 0, then the step did not request
* gpus, but the step may still inherit gpus from the
* job. This math will set requested cpus to zero in
* case gpus_per_node == 0.
*/
step_req->cpu_count = opt_local->min_nodes *
gpus_per_node * opt_local->cpus_per_gpu;
} else if (opt_local->gpus_per_socket) {
/*
* gpus_per_socket is not fully supported for steps and
* does not affect the gpus allocated to the step:
* slurmctld may inherit gres from the job.
*/
}
/*
* If none of these are requested, the step may inherit gres
* from the job.
*/
} else if (opt_local->ntasks_set ||
(opt_local->ntasks_per_tres != NO_VAL) ||
(opt_local->ntasks_per_gpu != NO_VAL)) {
step_req->cpu_count = opt_local->ntasks;
} else if (use_all_cpus) { /* job allocation created by srun */
step_req->cpu_count = job->cpu_count;
} else {
step_req->cpu_count = opt_local->ntasks;
}
if (slurm_option_set_by_cli(opt_local, 'J'))
step_req->name = opt_local->job_name;
else if (srun_opt->cmd_name)
step_req->name = srun_opt->cmd_name;
else
step_req->name = sropt.cmd_name;
step_req->network = xstrdup(opt_local->network);
step_req->node_list = xstrdup(opt_local->nodelist);
if (opt_local->ntasks_per_tres != NO_VAL)
step_req->ntasks_per_tres = opt_local->ntasks_per_tres;
else if (opt_local->ntasks_per_gpu != NO_VAL)
step_req->ntasks_per_tres = opt_local->ntasks_per_gpu;
else
step_req->ntasks_per_tres = NO_VAL16;
step_req->num_tasks = opt_local->ntasks;
if (opt_local->ntasks_per_core != NO_VAL)
step_req->ntasks_per_core = opt_local->ntasks_per_core;
else
step_req->ntasks_per_core = INFINITE16;
if (opt_local->mem_per_cpu != NO_VAL64)
step_req->pn_min_memory = opt_local->mem_per_cpu | MEM_PER_CPU;
else if (opt_local->pn_min_memory != NO_VAL64)
step_req->pn_min_memory = opt_local->pn_min_memory;
step_req->relative = srun_opt->relative;
if (opt_local->resv_port_cnt != NO_VAL) {
step_req->resv_port_cnt = opt_local->resv_port_cnt;
} else {
step_req->resv_port_cnt = NO_VAL16;
}
step_req->srun_pid = (uint32_t) getpid();
step_req->step_het_comp_cnt = opt_local->step_het_comp_cnt;
step_req->step_het_grps = xstrdup(opt_local->step_het_grps);
memcpy(&step_req->step_id, &job->step_id, sizeof(step_req->step_id));
step_req->array_task_id = srun_opt->array_task_id;
step_req->cwd = xstrdup(opt_local->chdir);
step_req->std_err = xstrdup(opt_local->efname);
step_req->std_in = xstrdup(opt_local->ifname);
step_req->std_out = xstrdup(opt_local->ofname);
step_req->submit_line = xstrdup(opt_local->submit_line);
if (opt_local->threads_per_core != NO_VAL) {
		step_req->threads_per_core = opt_local->threads_per_core;
} else
step_req->threads_per_core = NO_VAL16;
if (!opt_local->tres_bind &&
((opt_local->ntasks_per_tres != NO_VAL) ||
(opt_local->ntasks_per_gpu != NO_VAL))) {
/* Implicit single GPU binding with ntasks-per-tres/gpu */
if (opt_local->ntasks_per_tres != NO_VAL)
xstrfmtcat(opt_local->tres_bind, "gres/gpu:single:%d",
opt_local->ntasks_per_tres);
else
xstrfmtcat(opt_local->tres_bind, "gres/gpu:single:%d",
opt_local->ntasks_per_gpu);
}
step_req->tres_per_task = xstrdup(opt_local->tres_per_task);
step_req->tres_bind = xstrdup(opt_local->tres_bind);
step_req->tres_freq = xstrdup(opt_local->tres_freq);
xstrfmtcat(step_req->tres_per_step, "%scpu:%u",
step_req->tres_per_step ? "," : "",
step_req->cpu_count);
xfmt_tres(&step_req->tres_per_step, "gres/gpu", opt_local->gpus);
	/* add_tres was already set from opt_local->gres or SLURM_STEP_GRES above */
/*
* If --gres=none was requested, we need to send exactly
* tres_per_node="none" to slurmctld to not inherit gres from the job.
* Do not also send --gpus-per-node which could have been set from the
* environment.
*/
if (!add_tres || xstrcasecmp(add_tres, "NONE")) {
xfmt_tres(&step_req->tres_per_node, "gres/gpu",
opt_local->gpus_per_node);
}
if (add_tres) {
if (step_req->tres_per_node) {
xstrfmtcat(step_req->tres_per_node, ",%s", add_tres);
} else
step_req->tres_per_node = xstrdup(add_tres);
}
xfmt_tres(&step_req->tres_per_socket, "gres/gpu",
opt_local->gpus_per_socket);
if (opt_local->cpus_set)
xstrfmtcat(step_req->tres_per_task, "%scpu:%u",
step_req->tres_per_task ? "," : "",
opt_local->cpus_per_task);
if (opt_local->time_limit != NO_VAL)
step_req->time_limit = opt_local->time_limit;
step_req->user_id = opt_local->uid;
step_req->container = xstrdup(opt_local->container);
xfree(step_req->container_id);
step_req->container_id = xstrdup(opt_local->container_id);
rc = gres_step_state_validate(step_req->cpus_per_tres,
step_req->tres_per_step,
step_req->tres_per_node,
step_req->tres_per_socket,
step_req->tres_per_task,
step_req->mem_per_tres,
step_req->ntasks_per_tres,
step_req->min_nodes,
&tmp_gres_list,
job->step_id.job_id,
NO_VAL, &step_req->num_tasks,
&step_req->cpu_count, NULL);
FREE_NULL_LIST(tmp_gres_list);
	if (rc) {
		error("%s", slurm_strerror(rc));
		slurm_free_job_step_create_request_msg(step_req);
		return NULL;
	}
step_req->plane_size = NO_VAL16;
switch (opt_local->distribution & SLURM_DIST_NODESOCKMASK) {
case SLURM_DIST_BLOCK:
case SLURM_DIST_ARBITRARY:
case SLURM_DIST_CYCLIC:
case SLURM_DIST_CYCLIC_CYCLIC:
case SLURM_DIST_CYCLIC_BLOCK:
case SLURM_DIST_BLOCK_CYCLIC:
case SLURM_DIST_BLOCK_BLOCK:
case SLURM_DIST_CYCLIC_CFULL:
case SLURM_DIST_BLOCK_CFULL:
step_req->task_dist = opt_local->distribution;
if (opt_local->ntasks_per_node != NO_VAL)
step_req->plane_size = opt_local->ntasks_per_node;
break;
case SLURM_DIST_PLANE:
step_req->task_dist = SLURM_DIST_PLANE;
step_req->plane_size = opt_local->plane_size;
break;
default:
{
uint16_t base_dist;
/* Leave distribution set to unknown if taskcount <= nodes and
* memory is set to 0. stepmgr will handle the mem=0 case. */
if (!opt_local->mem_per_cpu || !opt_local->pn_min_memory ||
srun_opt->interactive)
base_dist = SLURM_DIST_UNKNOWN;
else
base_dist = (step_req->num_tasks <=
step_req->min_nodes) ?
SLURM_DIST_CYCLIC : SLURM_DIST_BLOCK;
opt_local->distribution &= SLURM_DIST_STATE_FLAGS;
opt_local->distribution |= base_dist;
step_req->task_dist = opt_local->distribution;
if (opt_local->ntasks_per_node != NO_VAL)
step_req->plane_size = opt_local->ntasks_per_node;
break;
}
}
/*
* This must be handled *after* we potentially set srun_opt->exact
* above.
*/
if (!srun_opt->exact)
step_req->flags |= SSF_WHOLE;
return step_req;
}
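
/*
 * Set up the local file descriptors for the step's stdin, stdout and
 * stderr, opening files as needed (honoring --open-mode) and sharing one
 * descriptor when stdout and stderr refer to the same file.
 */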
extern void launch_common_set_stdio_fds(srun_job_t *job,
slurm_step_io_fds_t *cio_fds,
slurm_opt_t *opt_local)
{
bool err_shares_out = false;
int file_flags;
if (opt_local->open_mode == OPEN_MODE_APPEND)
file_flags = O_CREAT|O_WRONLY|O_APPEND;
else if (opt_local->open_mode == OPEN_MODE_TRUNCATE)
file_flags = O_CREAT|O_WRONLY|O_APPEND|O_TRUNC;
else {
slurm_conf_t *conf = slurm_conf_lock();
if (conf->job_file_append)
file_flags = O_CREAT|O_WRONLY|O_APPEND;
else
file_flags = O_CREAT|O_WRONLY|O_APPEND|O_TRUNC;
slurm_conf_unlock();
}
/*
* create stdin file descriptor
*/
if (_is_local_file(job->ifname)) {
if ((job->ifname->name == NULL) ||
(job->ifname->taskid != -1)) {
cio_fds->input.fd = STDIN_FILENO;
} else {
cio_fds->input.fd = open(job->ifname->name, O_RDONLY);
if (cio_fds->input.fd == -1) {
error("Could not open stdin file: %m");
exit(error_exit);
}
}
if (job->ifname->type == IO_ONE) {
cio_fds->input.taskid = job->ifname->taskid;
cio_fds->input.nodeid = slurm_step_layout_host_id(
launch_common_get_slurm_step_layout(job),
job->ifname->taskid);
}
}
/*
* create stdout file descriptor
*/
if (_is_local_file(job->ofname)) {
if ((job->ofname->name == NULL) ||
(job->ofname->taskid != -1)) {
cio_fds->out.fd = STDOUT_FILENO;
} else {
cio_fds->out.fd = open(job->ofname->name,
file_flags, 0644);
			if ((cio_fds->out.fd == -1) && (errno == ENOENT)) {
mkdirpath(job->ofname->name, 0755, false);
cio_fds->out.fd = open(job->ofname->name,
file_flags, 0644);
}
if (cio_fds->out.fd == -1) {
error("Could not open stdout file: %m");
exit(error_exit);
}
}
if (job->ofname->name != NULL
&& job->efname->name != NULL
&& !xstrcmp(job->ofname->name, job->efname->name)) {
err_shares_out = true;
}
}
/*
* create separate stderr file descriptor only if stderr is not sharing
* the stdout file descriptor
*/
if (err_shares_out) {
debug3("stdout and stderr sharing a file");
cio_fds->err.fd = cio_fds->out.fd;
cio_fds->err.taskid = cio_fds->out.taskid;
} else if (_is_local_file(job->efname)) {
if ((job->efname->name == NULL) ||
(job->efname->taskid != -1)) {
cio_fds->err.fd = STDERR_FILENO;
} else {
cio_fds->err.fd = open(job->efname->name,
file_flags, 0644);
			if ((cio_fds->err.fd == -1) && (errno == ENOENT)) {
mkdirpath(job->efname->name, 0755, false);
cio_fds->err.fd = open(job->efname->name,
file_flags, 0644);
}
if (cio_fds->err.fd == -1) {
error("Could not open stderr file: %m");
exit(error_exit);
}
}
}
}
/*
* Return TRUE if the job step create request should be retried later
* (i.e. the errno set by step_ctx_create_timeout() is recoverable).
*/
extern bool launch_common_step_retry_errno(int rc)
{
if ((rc == EAGAIN) ||
(rc == ESLURM_DISABLED) ||
(rc == ESLURM_INTERCONNECT_BUSY) ||
(rc == ESLURM_NODES_BUSY) ||
(rc == ESLURM_PORTS_BUSY) ||
(rc == SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT))
return true;
return false;
}
extern int launch_g_setup_srun_opt(char **rest, slurm_opt_t *opt_local)
{
srun_opt_t *srun_opt = opt_local->srun_opt;
xassert(srun_opt);
if (srun_opt->debugger_test)
MPIR_being_debugged = 1;
/*
* We need to do +2 here just in case multi-prog is needed
* (we add an extra argv on so just make space for it).
*/
opt_local->argv = xcalloc((opt_local->argc + 2), sizeof(char *));
return SLURM_SUCCESS;
}
extern int launch_g_handle_multi_prog_verify(int command_pos,
slurm_opt_t *opt_local)
{
srun_opt_t *srun_opt = opt_local->srun_opt;
xassert(srun_opt);
if (srun_opt->multi_prog) {
if (opt_local->argc < 1) {
error("configuration file not specified");
exit(error_exit);
}
_load_multi(&opt_local->argc, opt_local->argv);
if (verify_multi_name(opt_local->argv[command_pos], opt_local))
exit(error_exit);
return 1;
} else
return 0;
}
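
/*
 * Create the job step, retrying while the prolog is still running or the
 * error is recoverable, then update the job structure (step id, task and
 * node counts, I/O file names) from the resulting step layout.
 */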
extern int launch_g_create_job_step(srun_job_t *job, bool use_all_cpus,
void (*signal_function)(int),
sig_atomic_t *destroy_job,
slurm_opt_t *opt_local)
{
srun_opt_t *srun_opt = opt_local->srun_opt;
int i, j, rc;
unsigned long step_wait = 0;
uint16_t slurmctld_timeout;
slurm_step_layout_t *step_layout;
job_step_create_request_msg_t *step_req;
xassert(srun_opt);
if (!job) {
error("launch_common_create_job_step: no job given");
return SLURM_ERROR;
}
/* Validate minimum and maximum node counts */
if (opt_local->min_nodes && opt_local->max_nodes &&
(opt_local->min_nodes > opt_local->max_nodes)) {
error ("Minimum node count > maximum node count (%d > %d)",
opt_local->min_nodes, opt_local->max_nodes);
return SLURM_ERROR;
}
if (opt_local->min_nodes && (opt_local->min_nodes > job->nhosts)) {
error ("Minimum node count > allocated node count (%d > %d)",
opt_local->min_nodes, job->nhosts);
return SLURM_ERROR;
}
step_req = _create_job_step_create_request(
opt_local, use_all_cpus, job);
if (!step_req)
return SLURM_ERROR;
if (step_req->array_task_id != NO_VAL)
debug("requesting job %u_%u, user %u, nodes %u including (%s)",
step_req->step_id.job_id, step_req->array_task_id,
step_req->user_id, step_req->min_nodes,
step_req->node_list);
else
debug("requesting job %u, user %u, nodes %u including (%s)",
step_req->step_id.job_id, step_req->user_id,
step_req->min_nodes, step_req->node_list);
debug("cpus %u, tasks %u, name %s, relative %u",
step_req->cpu_count, step_req->num_tasks,
step_req->name, step_req->relative);
for (i = 0; (!(*destroy_job)); i++) {
bool timed_out = false;
if (srun_opt->no_alloc) {
if (step_req->num_tasks == NO_VAL) {
step_req->num_tasks = job->ntasks;
step_req->cpu_count = job->cpu_count;
}
job->step_ctx = step_ctx_create_no_alloc(
step_req, job->step_id.step_id);
} else {
if (opt_local->immediate) {
step_wait = MAX(1, opt_local->immediate -
difftime(time(NULL),
srun_begin_time)) *
1000;
} else {
slurmctld_timeout = MIN(300, MAX(60,
slurm_conf.slurmctld_timeout));
step_wait = ((getpid() % 10) +
slurmctld_timeout) * 1000;
}
job->step_ctx = step_ctx_create_timeout(step_req,
step_wait,
&timed_out);
}
if (job->step_ctx != NULL) {
job->step_ctx->verbose_level = opt_local->verbose;
if (i > 0) {
info("Step created for %ps",
&step_req->step_id);
}
break;
}
rc = errno;
if (((opt_local->immediate != 0) &&
((opt_local->immediate == 1) ||
(difftime(time(NULL), srun_begin_time) >=
opt_local->immediate))) ||
((rc != ESLURM_PROLOG_RUNNING) &&
!launch_common_step_retry_errno(rc))) {
error("Unable to create step for job %u: %m",
step_req->step_id.job_id);
slurm_free_job_step_create_request_msg(step_req);
return SLURM_ERROR;
}
if (i == 0) {
if (rc == ESLURM_PROLOG_RUNNING) {
verbose("Resources allocated for job %u and "
"being configured, please wait",
step_req->step_id.job_id);
} else {
if (timed_out)
info("Job %u step creation temporarily disabled, retrying (%s)",
step_req->step_id.job_id,
slurm_strerror(rc));
else
verbose("Step completed in JobId=%u, retrying",
step_req->step_id.job_id);
}
xsignal_unblock(sig_array);
for (j = 0; sig_array[j]; j++)
xsignal(sig_array[j], signal_function);
} else {
if (rc == ESLURM_PROLOG_RUNNING)
verbose("Job %u step creation still disabled, retrying (%s)",
step_req->step_id.job_id,
slurm_strerror(rc));
else {
if (timed_out)
info("Job %u step creation still disabled, retrying (%s)",
step_req->step_id.job_id,
slurm_strerror(rc));
else
verbose("Step completed in JobId=%u, retrying",
step_req->step_id.job_id);
}
}
if (*destroy_job) {
/* cancelled by signal */
break;
}
}
if (i > 0) {
xsignal_block(sig_array);
if (*destroy_job) {
info("Cancelled pending step for job %u",
step_req->step_id.job_id);
slurm_free_job_step_create_request_msg(step_req);
return SLURM_ERROR;
}
}
job->step_id.job_id = step_req->step_id.job_id;
job->step_id.step_id = step_req->step_id.step_id;
step_layout = launch_common_get_slurm_step_layout(job);
if (!step_layout) {
info("No step_layout given for pending step for job %u",
step_req->step_id.job_id);
slurm_free_job_step_create_request_msg(step_req);
return SLURM_ERROR;
}
fwd_set_alias_addrs(step_layout->alias_addrs);
if (opt_local->cpus_set && (job->cpu_count == NO_VAL))
job->cpu_count = step_layout->task_cnt *
opt_local->cpus_per_task;
if (job->ntasks != step_layout->task_cnt)
job->ntasks = step_layout->task_cnt;
/*
* Number of hosts in job may not have been initialized yet if
* --jobid was used or only SLURM_JOB_ID was set in user env.
* Reset the value here just in case.
*/
job->nhosts = step_layout->node_cnt;
/*
* Recreate filenames which may depend upon step id
*/
job_update_io_fnames(job, opt_local);
/* set the jobid for totalview */
if (!totalview_jobid) {
xstrfmtcat(totalview_jobid, "%u", job->step_id.job_id);
xstrfmtcat(totalview_stepid, "%u", job->step_id.step_id);
}
return SLURM_SUCCESS;
}
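
/*
 * Build the step launch parameters from the options, register the task
 * start/finish callbacks, launch the step (or add to an already launched
 * one), and set up the MPIR debugging structures once the tasks start.
 */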
extern int launch_g_step_launch(srun_job_t *job, slurm_step_io_fds_t *cio_fds,
uint32_t *global_rc,
slurm_step_launch_callbacks_t *step_callbacks,
slurm_opt_t *opt_local)
{
srun_job_t *local_srun_job;
srun_opt_t *srun_opt = opt_local->srun_opt;
slurm_step_launch_params_t launch_params;
slurm_step_launch_callbacks_t callbacks;
slurm_step_layout_t *layout;
int rc = SLURM_SUCCESS;
task_state_t *task_state;
bool first_launch = false;
char tmp_str[128];
xassert(srun_opt);
slurm_step_launch_params_t_init(&launch_params);
memcpy(&callbacks, step_callbacks, sizeof(callbacks));
task_state = task_state_find(&job->step_id, task_state_list);
if (!task_state) {
task_state = task_state_create(&job->step_id, job->ntasks,
job->het_job_task_offset);
slurm_mutex_lock(&het_job_lock);
if (!local_job_list)
local_job_list = list_create(NULL);
if (!task_state_list)
task_state_list = list_create(_task_state_del);
slurm_mutex_unlock(&het_job_lock);
local_srun_job = job;
local_global_rc = global_rc;
list_append(local_job_list, local_srun_job);
list_append(task_state_list, task_state);
first_launch = true;
} else {
/* Launching extra POE tasks */
task_state_alter(task_state, job->ntasks);
}
launch_params.argc = opt_local->argc;
launch_params.argv = opt_local->argv;
launch_params.multi_prog = srun_opt->multi_prog ? true : false;
launch_params.container = xstrdup(opt_local->container);
launch_params.cwd = opt_local->chdir;
launch_params.slurmd_debug = srun_opt->slurmd_debug;
launch_params.buffered_stdio = !srun_opt->unbuffered;
launch_params.labelio = srun_opt->labelio ? true : false;
launch_params.remote_output_filename = fname_remote_string(job->ofname);
launch_params.remote_input_filename = fname_remote_string(job->ifname);
launch_params.remote_error_filename = fname_remote_string(job->efname);
launch_params.het_job_node_offset = job->het_job_node_offset;
launch_params.het_job_id = job->het_job_id;
launch_params.het_job_nnodes = job->het_job_nnodes;
launch_params.het_job_ntasks = job->het_job_ntasks;
launch_params.het_job_offset = job->het_job_offset;
launch_params.het_job_step_cnt = srun_opt->het_step_cnt;
launch_params.het_job_step_task_cnts = job->het_job_step_task_cnts;
launch_params.het_job_task_offset = job->het_job_task_offset;
launch_params.het_job_task_cnts = job->het_job_task_cnts;
launch_params.het_job_tids = job->het_job_tids;
launch_params.het_job_tid_offsets = job->het_job_tid_offsets;
launch_params.het_job_node_list = job->het_job_node_list;
launch_params.profile = opt_local->profile;
launch_params.task_prolog = srun_opt->task_prolog;
launch_params.task_epilog = srun_opt->task_epilog;
if (!(srun_opt->cpu_bind_type & (~CPU_BIND_VERBOSE)) &&
job->step_ctx->step_resp->def_cpu_bind_type)
srun_opt->cpu_bind_type =
job->step_ctx->step_resp->def_cpu_bind_type |
srun_opt->cpu_bind_type;
if (get_log_level() >= LOG_LEVEL_VERBOSE) {
slurm_sprint_cpu_bind_type(tmp_str, srun_opt->cpu_bind_type);
verbose("CpuBindType=%s", tmp_str);
}
launch_params.cpu_bind = srun_opt->cpu_bind;
launch_params.cpu_bind_type = srun_opt->cpu_bind_type;
launch_params.mem_bind = opt_local->mem_bind;
launch_params.mem_bind_type = opt_local->mem_bind_type;
launch_params.accel_bind_type = srun_opt->accel_bind_type;
launch_params.open_mode = opt_local->open_mode;
if (opt_local->acctg_freq)
launch_params.acctg_freq = opt_local->acctg_freq;
launch_params.pty = srun_opt->pty;
if (opt_local->cpus_set)
launch_params.cpus_per_task = opt_local->cpus_per_task;
else
launch_params.cpus_per_task = 1;
layout = job->step_ctx->step_resp->step_layout;
launch_params.cpt_compact_array = layout->cpt_compact_array;
launch_params.cpt_compact_cnt = layout->cpt_compact_cnt;
launch_params.cpt_compact_reps = layout->cpt_compact_reps;
launch_params.threads_per_core = opt_local->threads_per_core;
launch_params.cpu_freq_min = opt_local->cpu_freq_min;
launch_params.cpu_freq_max = opt_local->cpu_freq_max;
launch_params.cpu_freq_gov = opt_local->cpu_freq_gov;
launch_params.tres_bind = opt_local->tres_bind;
launch_params.tres_freq = opt_local->tres_freq;
launch_params.task_dist = opt_local->distribution;
launch_params.preserve_env = srun_opt->preserve_env;
launch_params.spank_job_env = opt_local->spank_job_env;
launch_params.spank_job_env_size = opt_local->spank_job_env_size;
launch_params.ntasks_per_board = job->ntasks_per_board;
launch_params.ntasks_per_core = job->ntasks_per_core;
launch_params.ntasks_per_tres = job->ntasks_per_tres;
launch_params.ntasks_per_socket = job->ntasks_per_socket;
launch_params.no_alloc = srun_opt->no_alloc;
launch_params.mpi_plugin_name = srun_opt->mpi_type;
launch_params.env = _build_user_env(job, opt_local);
launch_params.tree_width = srun_opt->tree_width;
launch_params.oom_kill_step = opt_local->oom_kill_step;
memcpy(&launch_params.local_fds, cio_fds, sizeof(slurm_step_io_fds_t));
if (MPIR_being_debugged) {
launch_params.parallel_debug = true;
pmi_server_max_threads(1);
} else {
launch_params.parallel_debug = false;
}
	/*
	 * Normally this isn't used, but if an outside process other than
	 * srun (e.g. poe) is using this logic to launch tasks, then it can
	 * use this to signal the step.
	 */
callbacks.task_start = _task_start;
	/*
	 * If poe is using this code with multi-prog, it always returns 1
	 * for each task, which could be confusing since no real error
	 * happened.
	 */
if (!launch_params.multi_prog
|| (!callbacks.step_signal
|| (callbacks.step_signal == launch_g_fwd_signal))) {
callbacks.task_finish = _task_finish;
slurm_mutex_lock(&launch_lock);
if (!opt_save) {
/*
* Save opt_local parameters since _task_finish()
* will lack the values
*/
opt_save = xmalloc(sizeof(slurm_opt_t));
memcpy(opt_save, opt_local, sizeof(slurm_opt_t));
opt_save->srun_opt = xmalloc(sizeof(srun_opt_t));
memcpy(opt_save->srun_opt, srun_opt,
sizeof(srun_opt_t));
}
slurm_mutex_unlock(&launch_lock);
}
update_job_state(job, SRUN_JOB_LAUNCHING);
launch_start_time = time(NULL);
if (first_launch) {
if (slurm_step_launch(job->step_ctx, &launch_params,
&callbacks) != SLURM_SUCCESS) {
rc = errno;
*local_global_rc = errno;
error("Application launch failed: %m");
slurm_step_launch_abort(job->step_ctx);
slurm_step_launch_wait_finish(job->step_ctx);
goto cleanup;
}
} else {
if (slurm_step_launch_add(job->step_ctx, job->step_ctx,
&launch_params, job->nodelist)
!= SLURM_SUCCESS) {
rc = errno;
*local_global_rc = errno;
error("Application launch add failed: %m");
slurm_step_launch_abort(job->step_ctx);
slurm_step_launch_wait_finish(job->step_ctx);
goto cleanup;
}
}
update_job_state(job, SRUN_JOB_STARTING);
if (slurm_step_launch_wait_start(job->step_ctx) == SLURM_SUCCESS) {
update_job_state(job, SRUN_JOB_RUNNING);
/*
* Only set up MPIR structures if the step launched correctly
*/
if (srun_opt->multi_prog) {
mpir_set_multi_name(job->ntasks,
launch_params.argv[0]);
} else {
mpir_set_executable_names(launch_params.argv[0],
job->het_job_task_offset,
job->ntasks);
}
_wait_all_het_job_comps_started(opt_local);
MPIR_debug_state = MPIR_DEBUG_SPAWNED;
if (srun_opt->debugger_test)
mpir_dump_proctable();
else if (srun_opt->parallel_debug)
MPIR_Breakpoint(job);
} else {
info("%ps aborted before step completely launched.",
&job->step_id);
}
cleanup:
return rc;
}
extern int launch_g_step_wait(srun_job_t *job, bool got_alloc,
slurm_opt_t *opt_local)
{
int rc = 0;
slurm_step_launch_wait_finish(job->step_ctx);
if ((MPIR_being_debugged == 0) && retry_step_begin &&
(retry_step_cnt < MAX_STEP_RETRIES) &&
(job->het_job_id == NO_VAL)) { /* Not hetjob step */
retry_step_begin = false;
step_ctx_destroy(job->step_ctx);
if (got_alloc)
rc = create_job_step(job, true, opt_local);
else
rc = create_job_step(job, false, opt_local);
if (rc < 0)
exit(error_exit);
rc = -1;
}
return rc;
}
extern int launch_g_step_terminate(void)
{
return _step_signal(SIGKILL);
}
extern void launch_g_print_status(void)
{
task_state_print(task_state_list, (log_f)slurm_info);
}
extern void launch_g_fwd_signal(int signal)
{
srun_job_t *my_srun_job;
list_itr_t *iter;
if (!local_job_list) {
debug("%s: local_job_list does not exist yet", __func__);
return;
}
iter = list_iterator_create(local_job_list);
while ((my_srun_job = (srun_job_t *) list_next(iter))) {
switch (signal) {
case SIGKILL:
slurm_step_launch_abort(my_srun_job->step_ctx);
break;
default:
slurm_step_launch_fwd_signal(my_srun_job->step_ctx,
signal);
break;
}
}
list_iterator_destroy(iter);
}