/*****************************************************************************\
* src/slurmd/slurmstepd/slurmstepd.c - SLURM job-step manager.
* $Id$
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2009 Lawrence Livermore National Security.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Danny Auble <da@llnl.gov>
* and Christopher Morrone <morrone2@llnl.gov>.
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/

#if HAVE_CONFIG_H
# include "config.h"
#endif
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include "src/common/cpu_frequency.h"
#include "src/common/gres.h"
#include "src/common/slurm_jobacct_gather.h"
#include "src/common/slurm_acct_gather_profile.h"
#include "src/common/slurm_rlimits_info.h"
#include "src/common/stepd_api.h"
#include "src/common/switch.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/plugstack.h"
#include "src/common/node_select.h"
#include "src/slurmd/common/core_spec_plugin.h"
#include "src/slurmd/common/slurmstepd_init.h"
#include "src/slurmd/common/setproctitle.h"
#include "src/slurmd/common/proctrack.h"
#include "src/slurmd/slurmd/slurmd.h"
#include "src/slurmd/slurmstepd/mgr.h"
#include "src/slurmd/slurmstepd/req.h"
#include "src/slurmd/slurmstepd/slurmstepd.h"
#include "src/slurmd/slurmstepd/slurmstepd_job.h"

static int _init_from_slurmd(int sock, char **argv, slurm_addr_t **_cli,
slurm_addr_t **_self, slurm_msg_t **_msg,
int *_ngids, gid_t **_gids);
static void _dump_user_env(void);
static void _send_ok_to_slurmd(int sock);
static void _send_fail_to_slurmd(int sock);
static stepd_step_rec_t *_step_setup(slurm_addr_t *cli, slurm_addr_t *self,
slurm_msg_t *msg);
#ifdef MEMORY_LEAK_DEBUG
static void _step_cleanup(stepd_step_rec_t *job, slurm_msg_t *msg, int rc);
#endif
static int process_cmdline (int argc, char *argv[]);
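
/* Signals blocked for the lifetime of slurmstepd (see xsignal_block()
 * in main()). Blocking SIGPIPE lets writes on a closed pipe back to
 * slurmd fail with EPIPE instead of killing the step manager. */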
int slurmstepd_blocked_signals[] = {
SIGPIPE, 0
};

/* global variable */
slurmd_conf_t * conf;
extern char ** environ;

int
main (int argc, char *argv[])
{
slurm_addr_t *cli;
slurm_addr_t *self;
slurm_msg_t *msg;
stepd_step_rec_t *job;
int ngids;
gid_t *gids;
int rc = 0;
if (process_cmdline (argc, argv) < 0)
fatal ("Error in slurmstepd command line");
xsignal_block(slurmstepd_blocked_signals);
conf = xmalloc(sizeof(*conf));
conf->argv = &argv;
conf->argc = &argc;
init_setproctitle(argc, argv);
if (slurm_select_init(1) != SLURM_SUCCESS )
fatal( "failed to initialize node selection plugin" );
/* Receive job parameters from the slurmd */
_init_from_slurmd(STDIN_FILENO, argv, &cli, &self, &msg,
&ngids, &gids);
/* Fancy way of closing stdin that keeps STDIN_FILENO from being
* allocated to any random file. The slurmd already opened /dev/null
* on STDERR_FILENO for us. */
dup2(STDERR_FILENO, STDIN_FILENO);
/* Create the stepd_step_rec_t, mostly from info in a
* launch_tasks_request_msg_t or a batch_job_launch_msg_t */
if (!(job = _step_setup(cli, self, msg))) {
_send_fail_to_slurmd(STDOUT_FILENO);
rc = SLURM_FAILURE;
goto ending;
}
job->ngids = ngids;
job->gids = gids;
/* fork handlers cause mutexes on some global data structures
* to be re-initialized after the fork. */
list_install_fork_handlers();
slurm_conf_install_fork_handlers();
/* sets job->msg_handle and job->msgid */
if (msg_thr_create(job) == SLURM_ERROR) {
_send_fail_to_slurmd(STDOUT_FILENO);
rc = SLURM_FAILURE;
goto ending;
}
_send_ok_to_slurmd(STDOUT_FILENO);
/* Fancy way of closing stdout that keeps STDOUT_FILENO from being
* allocated to any random file. The slurmd already opened /dev/null
* on STDERR_FILENO for us. */
dup2(STDERR_FILENO, STDOUT_FILENO);
/* This does most of the stdio setup, then launches all the tasks,
* and blocks until the step is complete */
rc = job_manager(job);
if (job->batch)
batch_finish(job, rc); /* sends batch complete message */
/* signal the message thread to shutdown, and wait for it */
eio_signal_shutdown(job->msg_handle);
pthread_join(job->msgid, NULL);
ending:
#ifdef MEMORY_LEAK_DEBUG
acct_gather_conf_destroy();
(void) core_spec_g_fini();
_step_cleanup(job, msg, rc);
fini_setproctitle();
xfree(cli);
xfree(self);
xfree(conf->hostname);
xfree(conf->block_map);
xfree(conf->block_map_inv);
xfree(conf->spooldir);
xfree(conf->node_name);
xfree(conf->node_topo_addr);
xfree(conf->node_topo_pattern);
xfree(conf->logfile);
xfree(conf);
#endif
info("done with job");
return rc;
}
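
/*
 * Read the abbreviated slurmd configuration that the parent slurmd
 * packs onto the given file descriptor, reusing the global 'conf'
 * if it has already been allocated, and derive the logging options
 * from it. Returns NULL if the read fails (rwfail label).
 */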
static slurmd_conf_t * read_slurmd_conf_lite (int fd)
{
int rc;
int len;
Buf buffer;
slurmd_conf_t *confl;
int tmp_int = 0;
/* First check to see if we've already initialized the
* global slurmd_conf_t in 'conf'. Allocate memory if not.
*/
confl = conf ? conf : xmalloc (sizeof (*confl));
safe_read(fd, &len, sizeof(int));
buffer = init_buf(len);
safe_read(fd, buffer->head, len);
rc = unpack_slurmd_conf_lite_no_alloc(confl, buffer);
if (rc == SLURM_ERROR)
fatal("slurmstepd: problem with unpack of slurmd_conf");
free_buf(buffer);
confl->log_opts.stderr_level = confl->debug_level;
confl->log_opts.logfile_level = confl->debug_level;
confl->log_opts.syslog_level = confl->debug_level;
/*
* If daemonizing, turn off stderr logging -- also, if
* logging to a file, turn off syslog.
*
* Otherwise, if remaining in foreground, turn off logging
* to syslog (but keep logfile level)
*/
if (confl->daemonize) {
confl->log_opts.stderr_level = LOG_LEVEL_QUIET;
if (confl->logfile)
confl->log_opts.syslog_level = LOG_LEVEL_QUIET;
} else
confl->log_opts.syslog_level = LOG_LEVEL_QUIET;
confl->acct_freq_task = (uint16_t)NO_VAL;
tmp_int = acct_gather_parse_freq(PROFILE_TASK,
confl->job_acct_gather_freq);
if (tmp_int != -1)
confl->acct_freq_task = tmp_int;
return (confl);
rwfail:
return (NULL);
}
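
/*
 * Parse SLURM_JOBID and SLURM_UID from the environment, as set for
 * the spank prolog/epilog mode. Returns 0 on success or a negative
 * value (via error()) on failure.
 */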
static int get_jobid_uid_from_env (uint32_t *jobidp, uid_t *uidp)
{
const char *val;
char *p;
if (!(val = getenv ("SLURM_JOBID")))
return error ("Unable to get SLURM_JOBID in env!");
*jobidp = (uint32_t) strtoul (val, &p, 10);
if (*p != '\0')
return error ("Invalid SLURM_JOBID=%s", val);
if (!(val = getenv ("SLURM_UID")))
return error ("Unable to get SLURM_UID in env!");
*uidp = (uid_t) strtoul (val, &p, 10);
if (*p != '\0')
return error ("Invalid SLURM_UID=%s", val);
return (0);
}
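
/*
 * Run the spank job prolog or epilog stack when slurmstepd is invoked
 * as "slurmstepd spank <prolog|epilog>" (see process_cmdline() below),
 * normally by the parent slurmd.
 */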
static int _handle_spank_mode (int argc, char *argv[])
{
char prefix[64] = "spank-";
const char *mode = argv[2];
uid_t uid = (uid_t) -1;
uint32_t jobid = (uint32_t) -1;
log_options_t lopts = LOG_OPTS_INITIALIZER;
/*
* Not necessary to log to syslog
*/
lopts.syslog_level = LOG_LEVEL_QUIET;
/*
* Make our log prefix into spank-prolog: or spank-epilog:
*/
strcat (prefix, mode);
log_init(prefix, lopts, LOG_DAEMON, NULL);
/*
* When we are started from slurmd, a lightweight config is
* sent over the stdin fd. If we are able to read this conf
* use it to reinitialize the log.
* It is not a fatal error if we fail to read the conf file.
* This could happen if slurmstepd is run standalone for
* testing.
*/
if ((conf = read_slurmd_conf_lite (STDIN_FILENO)))
log_alter (conf->log_opts, 0, conf->logfile);
close (STDIN_FILENO);
slurm_conf_init(NULL);
if (get_jobid_uid_from_env (&jobid, &uid) < 0)
return error ("spank environment invalid");
debug("Running spank/%s for jobid [%u] uid [%u]", mode, jobid, uid);
if (strcmp (mode, "prolog") == 0) {
if (spank_job_prolog (jobid, uid) < 0)
return (-1);
}
else if (strcmp (mode, "epilog") == 0) {
if (spank_job_epilog (jobid, uid) < 0)
return (-1);
}
else {
error ("Invalid mode %s specified!", mode);
return (-1);
}
return (0);
}

/*
* Process special "modes" of slurmstepd passed as cmdline arguments.
*/
static int process_cmdline (int argc, char *argv[])
{
if ((argc == 2) && (strcmp(argv[1], "getenv") == 0)) {
print_rlimits();
_dump_user_env();
exit(0);
}
if ((argc == 3) && (strcmp(argv[1], "spank") == 0)) {
if (_handle_spank_mode(argc, argv) < 0)
exit (1);
exit (0);
}
return (0);
}
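
/* Tell the parent slurmd, over the initialization pipe, that the step
 * manager started successfully. */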
static void
_send_ok_to_slurmd(int sock)
{
/* If running under valgrind/memcheck, this pipe doesn't work correctly
* so just skip it. */
#ifndef SLURMSTEPD_MEMCHECK
int ok = SLURM_SUCCESS;
safe_write(sock, &ok, sizeof(int));
return;
rwfail:
error("Unable to send \"ok\" to slurmd");
#endif
}
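
/* Report a startup failure back to the parent slurmd; forwards errno
 * when it is set so slurmd can log the specific cause, otherwise sends
 * SLURM_FAILURE. */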
static void
_send_fail_to_slurmd(int sock)
{
/* If running under valgrind/memcheck, this pipe doesn't work correctly
* so just skip it. */
#ifndef SLURMSTEPD_MEMCHECK
int fail = SLURM_FAILURE;
if (errno)
fail = errno;
safe_write(sock, &fail, sizeof(int));
return;
rwfail:
error("Unable to send \"fail\" to slurmd");
#endif
}

/*
* This function handles the initialization information from slurmd
* sent by _send_slurmstepd_init() in src/slurmd/slurmd/req.c.
*/
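/*
 * The data arrives on the socket in a fixed order: the step type,
 * reverse-tree bookkeeping for step_complete, an abbreviated
 * slurmd_conf_t, the client (and optional self) address, GRES and
 * cpu_frequency state, the packed launch request and, finally, the
 * cached group list for the job's user.
 */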
static int
_init_from_slurmd(int sock, char **argv,
slurm_addr_t **_cli, slurm_addr_t **_self, slurm_msg_t **_msg,
int *_ngids, gid_t **_gids)
{
char *incoming_buffer = NULL;
Buf buffer;
int step_type;
int len;
slurm_addr_t *cli = NULL;
slurm_addr_t *self = NULL;
slurm_msg_t *msg = NULL;
int ngids = 0;
gid_t *gids = NULL;
uint16_t port;
char buf[16];
log_options_t lopts = LOG_OPTS_INITIALIZER;
log_init(argv[0], lopts, LOG_DAEMON, NULL);
/* receive job type from slurmd */
safe_read(sock, &step_type, sizeof(int));
debug3("step_type = %d", step_type);
/* receive reverse-tree info from slurmd */
pthread_mutex_lock(&step_complete.lock);
safe_read(sock, &step_complete.rank, sizeof(int));
safe_read(sock, &step_complete.parent_rank, sizeof(int));
safe_read(sock, &step_complete.children, sizeof(int));
safe_read(sock, &step_complete.depth, sizeof(int));
safe_read(sock, &step_complete.max_depth, sizeof(int));
safe_read(sock, &step_complete.parent_addr, sizeof(slurm_addr_t));
step_complete.bits = bit_alloc(step_complete.children);
step_complete.jobacct = jobacctinfo_create(NULL);
pthread_mutex_unlock(&step_complete.lock);
/* receive conf from slurmd */
if ((conf = read_slurmd_conf_lite (sock)) == NULL)
fatal("Failed to read conf from slurmd");
log_alter(conf->log_opts, 0, conf->logfile);
log_set_timefmt(conf->log_fmt);
debug2("debug level is %d.", conf->debug_level);
switch_g_slurmd_step_init();
slurm_get_ip_str(&step_complete.parent_addr, &port, buf, 16);
debug3("slurmstepd rank %d, parent address = %s, port = %u",
step_complete.rank, buf, port);
/* receive cli from slurmd */
safe_read(sock, &len, sizeof(int));
incoming_buffer = xmalloc(sizeof(char) * len);
safe_read(sock, incoming_buffer, len);
buffer = create_buf(incoming_buffer,len);
cli = xmalloc(sizeof(slurm_addr_t));
if (slurm_unpack_slurm_addr_no_alloc(cli, buffer) == SLURM_ERROR)
fatal("slurmstepd: problem with unpack of 'cli' address");
free_buf(buffer);
/* receive self from slurmd */
safe_read(sock, &len, sizeof(int));
if (len > 0) {
/* receive packed self from main slurmd */
incoming_buffer = xmalloc(sizeof(char) * len);
safe_read(sock, incoming_buffer, len);
buffer = create_buf(incoming_buffer,len);
self = xmalloc(sizeof(slurm_addr_t));
if (slurm_unpack_slurm_addr_no_alloc(self, buffer)
== SLURM_ERROR) {
fatal("slurmstepd: problem with unpack of "
"slurmd_conf");
}
free_buf(buffer);
}
/* Receive GRES information from slurmd */
gres_plugin_recv_stepd(sock);
/* Receive cpu_frequency info from slurmd */
cpu_freq_recv_info(sock);
/* receive req from slurmd */
safe_read(sock, &len, sizeof(int));
incoming_buffer = xmalloc(sizeof(char) * len);
safe_read(sock, incoming_buffer, len);
buffer = create_buf(incoming_buffer,len);
msg = xmalloc(sizeof(slurm_msg_t));
slurm_msg_t_init(msg);
msg->protocol_version = SLURM_PROTOCOL_VERSION;
switch(step_type) {
case LAUNCH_BATCH_JOB:
msg->msg_type = REQUEST_BATCH_JOB_LAUNCH;
break;
case LAUNCH_TASKS:
msg->msg_type = REQUEST_LAUNCH_TASKS;
break;
default:
fatal("Unrecognized launch RPC");
break;
}
if (unpack_msg(msg, buffer) == SLURM_ERROR)
fatal("slurmstepd: we didn't unpack the request correctly");
free_buf(buffer);
/* receive cached group ids array for the relevant uid */
safe_read(sock, &ngids, sizeof(int));
if (ngids > 0) {
int i;
uint32_t tmp32;
gids = (gid_t *)xmalloc(sizeof(gid_t) * ngids);
for (i = 0; i < ngids; i++) {
safe_read(sock, &tmp32, sizeof(uint32_t));
gids[i] = (gid_t)tmp32;
debug2("got gid %d", gids[i]);
}
}
*_cli = cli;
*_self = self;
*_msg = msg;
*_ngids = ngids;
*_gids = gids;
return 1;
rwfail:
fatal("Error reading initialization data from slurmd");
exit(1);
}
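
/*
 * Build the stepd_step_rec_t for this step from the unpacked launch
 * request and seed its environment with GRES and node-topology
 * variables. Returns NULL if setup fails.
 */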
static stepd_step_rec_t *
_step_setup(slurm_addr_t *cli, slurm_addr_t *self, slurm_msg_t *msg)
{
stepd_step_rec_t *job = NULL;
switch(msg->msg_type) {
case REQUEST_BATCH_JOB_LAUNCH:
debug2("setup for a batch_job");
job = mgr_launch_batch_job_setup(msg->data, cli);
break;
case REQUEST_LAUNCH_TASKS:
debug2("setup for a launch_task");
job = mgr_launch_tasks_setup(msg->data, cli, self);
break;
default:
fatal("handle_launch_message: Unrecognized launch RPC");
break;
}
if (!job) {
error("_step_setup: no job returned");
return NULL;
}
job->jmgr_pid = getpid();
job->jobacct = jobacctinfo_create(NULL);
/* Establish GRES environment variables */
if (conf->debug_flags & DEBUG_FLAG_GRES) {
gres_plugin_job_state_log(job->job_gres_list, job->jobid);
gres_plugin_step_state_log(job->step_gres_list, job->jobid,
job->stepid);
}
if (msg->msg_type == REQUEST_BATCH_JOB_LAUNCH)
gres_plugin_job_set_env(&job->env, job->job_gres_list);
else if (msg->msg_type == REQUEST_LAUNCH_TASKS)
gres_plugin_step_set_env(&job->env, job->step_gres_list);
/*
* Add slurmd node topology information to job env array
*/
env_array_overwrite(&job->env,"SLURM_TOPOLOGY_ADDR",
conf->node_topo_addr);
env_array_overwrite(&job->env,"SLURM_TOPOLOGY_ADDR_PATTERN",
conf->node_topo_pattern);
return job;
}

#ifdef MEMORY_LEAK_DEBUG
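/*
 * Free the step record, its accounting data and the launch message.
 * Compiled only for memory-leak-debugging builds, since the process
 * exits immediately afterwards anyway.
 */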
static void
_step_cleanup(stepd_step_rec_t *job, slurm_msg_t *msg, int rc)
{
if (job) {
jobacctinfo_destroy(job->jobacct);
if (!job->batch)
stepd_step_rec_destroy(job);
}
/*
* The message cannot be freed until the jobstep is complete
* because the job struct has pointers into the msg, such
* as the switch jobinfo pointer.
*/
switch(msg->msg_type) {
case REQUEST_BATCH_JOB_LAUNCH:
slurm_free_job_launch_msg(msg->data);
break;
case REQUEST_LAUNCH_TASKS:
slurm_free_launch_tasks_request_msg(msg->data);
break;
default:
fatal("handle_launch_message: Unrecognized launch RPC");
break;
}
jobacctinfo_destroy(step_complete.jobacct);
xfree(msg);
}
#endif
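
/* Print the inherited environment, one variable per line; used by the
 * "getenv" command-line mode (see process_cmdline() above). */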
static void _dump_user_env(void)
{
int i;
for (i=0; environ[i]; i++)
printf("%s\n",environ[i]);
}