/*****************************************************************************
* anchor.c - Slurm scrun anchor handlers
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#define _GNU_SOURCE /* posix_openpt() */
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/param.h>
#include <sys/prctl.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <termios.h>
#include <time.h>
#include <unistd.h>
#include "slurm/slurm.h"
#include "src/common/daemonize.h"
#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/http.h"
#include "src/common/log.h"
#include "src/common/net.h"
#include "src/common/pack.h"
#include "src/common/read_config.h"
#include "src/common/setproctitle.h"
#include "src/common/spank.h"
#include "src/common/uid.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/conmgr/conmgr.h"
#include "src/scrun/scrun.h"
static void _open_pty(void);
static int _kill_job(conmgr_fd_t *con, int signal);
static void _notify_started(void);
static void _try_start(void);
static pid_t _daemonize(bool new_session);
#define BLOCKING_REQ_MAGIC 0xa13ab9fa
typedef struct {
int magic;
conmgr_fd_t *con;
slurm_msg_t *req_msg;
} blocking_req_t;
static void _free_block_req_t(void *x)
{
blocking_req_t *args = x;
xassert(args->magic == BLOCKING_REQ_MAGIC);
args->magic = ~BLOCKING_REQ_MAGIC;
/* con is owned by conmgr */
/* state is owned by caller */
slurm_free_msg(args->req_msg);
}
static int _queue_start_request(conmgr_fd_t *con, slurm_msg_t *req_msg)
{
blocking_req_t *args = xmalloc(sizeof(*args));
args->magic = BLOCKING_REQ_MAGIC;
args->con = con;
args->req_msg = req_msg;
debug("%s: [%s] queued start request",
__func__, conmgr_fd_get_name(con));
write_lock_state();
if (!state.start_requests)
state.start_requests = list_create(_free_block_req_t);
list_append(state.start_requests, args);
unlock_state();
_try_start();
return SLURM_SUCCESS;
}
static int _queue_delete_request(conmgr_fd_t *con, slurm_msg_t *req_msg)
{
blocking_req_t *args = xmalloc(sizeof(*args));
args->magic = BLOCKING_REQ_MAGIC;
args->con = con;
args->req_msg = req_msg;
debug("%s: [%s] queued delete request",
__func__, conmgr_fd_get_name(con));
write_lock_state();
if (!state.delete_requests)
state.delete_requests = list_create(_free_block_req_t);
list_append(state.delete_requests, args);
unlock_state();
return SLURM_SUCCESS;
}
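/*
 * send_fd_over_socket() (src/common/fd.h) hands an open file descriptor
 * to the peer process over a unix socket. A minimal sketch of the
 * SCM_RIGHTS ancillary-data technique it presumably wraps (illustrative
 * only, not the actual implementation):
 *
 *	char buf[CMSG_SPACE(sizeof(int))] = {0};
 *	struct iovec iov = { .iov_base = "", .iov_len = 1 };
 *	struct msghdr mh = {
 *		.msg_iov = &iov, .msg_iovlen = 1,
 *		.msg_control = buf, .msg_controllen = sizeof(buf),
 *	};
 *	struct cmsghdr *cmsg = CMSG_FIRSTHDR(&mh);
 *	cmsg->cmsg_level = SOL_SOCKET;
 *	cmsg->cmsg_type = SCM_RIGHTS;
 *	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
 *	memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));
 *	sendmsg(socket_fd, &mh, 0);
 */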
static void _on_pty_reply_sent(conmgr_callback_args_t conmgr_args, void *arg)
{
conmgr_fd_t *con = conmgr_args.con;
int fd;
read_lock_state();
fd = state.pts;
unlock_state();
debug("%s: [%s] sending fd:%u", __func__, conmgr_fd_get_name(con), fd);
/* this is a blocking operation */
send_fd_over_socket(conmgr_fd_get_output_fd(con), fd);
}
static int _send_pty(conmgr_fd_t *con, slurm_msg_t *req_msg)
{
slurm_msg_t *msg;
return_code_msg_t *rc_msg;
int rc = SLURM_SUCCESS;
msg = xmalloc(sizeof(*msg));
rc_msg = xmalloc(sizeof(*rc_msg));
slurm_msg_t_init(msg);
slurm_msg_set_r_uid(msg, SLURM_AUTH_UID_ANY);
msg->msg_type = RESPONSE_CONTAINER_PTY;
msg->protocol_version = req_msg->protocol_version;
msg->data = rc_msg;
rc_msg->return_code = rc;
rc = conmgr_queue_write_msg(con, msg);
slurm_free_msg(msg);
debug("%s: [%s] requested pty", __func__, conmgr_fd_get_name(con));
conmgr_add_work_con_write_complete_fifo(con, _on_pty_reply_sent, NULL);
return rc;
}
static void _daemonize_logs(void)
{
/*
* Default to syslog since scrun anchor should only ever be run in
* foreground while debugging issues.
*/
xassert(!state.needs_lock);
log_fac = SYSLOG_FACILITY_DAEMON;
if (oci_conf->debug_flags) {
debug("%s: overriding debugflags=0x%"PRIx64,
__func__, oci_conf->debug_flags);
slurm_conf.debug_flags = oci_conf->debug_flags;
}
if (oci_conf->syslog_log_level) {
log_opt.syslog_level = oci_conf->syslog_log_level;
debug("%s: overriding syslog debug level=%s",
__func__, log_num2string(log_opt.syslog_level));
}
if (oci_conf->stdio_log_level) {
log_opt.stderr_level = oci_conf->stdio_log_level;
debug("%s: overriding stdio debug level=%s",
__func__, log_num2string(log_opt.stderr_level));
}
if (oci_conf->file_log_level) {
log_opt.logfile_level = oci_conf->file_log_level;
debug("%s: overriding logfile debug level=%s",
__func__, log_num2string(log_opt.logfile_level));
}
update_logging();
}
static void _tear_down(conmgr_callback_args_t conmgr_args, void *arg)
{
bool need_kill = false, need_stop = false;
int rc = SLURM_SUCCESS;
xassert(!arg);
read_lock_state();
if (state.status >= CONTAINER_ST_STOPPED) {
debug("%s: ignoring request", __func__);
unlock_state();
return;
}
need_kill = (state.status == CONTAINER_ST_RUNNING);
need_stop = state.status < CONTAINER_ST_STOPPING;
unlock_state();
/* user has requested a tear down so assume success here */
if (need_stop)
stop_anchor(SLURM_SUCCESS);
if (need_kill)
rc = _kill_job(NULL, SIGKILL);
if (!rc)
stop_anchor(rc);
}
static int _send_delete_confirmation(void *x, void *arg)
{
blocking_req_t *req = x;
slurm_msg_t *msg;
return_code_msg_t *rc_msg;
xassert(req->magic == BLOCKING_REQ_MAGIC);
debug("%s: [%s] sending delete confirmation",
__func__, conmgr_fd_get_name(req->con));
/*
* either the container is already dead or kill will handle it.
* the process is async so there isn't much to do here.
*/
msg = xmalloc(sizeof(*msg));
rc_msg = xmalloc(sizeof(*rc_msg));
slurm_msg_t_init(msg);
slurm_msg_set_r_uid(msg, SLURM_AUTH_UID_ANY);
msg->msg_type = RESPONSE_CONTAINER_DELETE;
msg->protocol_version = req->req_msg->protocol_version;
msg->data = rc_msg;
rc_msg->return_code = SLURM_SUCCESS;
conmgr_queue_write_msg(req->con, msg);
slurm_free_msg(msg);
conmgr_queue_close_fd(req->con);
return SLURM_SUCCESS;
}
/* stopping job is async: this is the final say if the job has stopped */
static void _check_if_stopped(conmgr_callback_args_t conmgr_args, void *arg)
{
int ptm = -1;
bool stopped = false;
char *pid_file = NULL, *spool_dir = NULL, *anchor_socket = NULL;
int pid_file_fd = -1;
list_t *delete_requests;
xassert(!arg);
read_lock_state();
debug2("%s: status=%s job_completed=%c staged_out=%c",
__func__, slurm_container_status_to_str(state.status),
(state.job_completed ? 'T' : 'F'),
(state.staged_out ? 'T' : 'F'));
if (state.status >= CONTAINER_ST_STOPPED) {
/* do nothing */
} else if (state.job_completed && state.staged_out &&
(state.status >= CONTAINER_ST_STARTING)) {
/* something else may have got past stopped */
if (state.status == CONTAINER_ST_STOPPING)
stopped = true;
}
unlock_state();
if (!stopped)
return;
debug3("%s: I wish they'd just wipe out the container and get it over with. It's the waiting I can't stand.",
__func__);
delete_requests = list_create(_free_block_req_t);
write_lock_state();
ptm = state.ptm;
change_status_locked(CONTAINER_ST_STOPPED);
if (state.delete_requests)
list_transfer(delete_requests, state.delete_requests);
unlock_state();
list_for_each(delete_requests, _send_delete_confirmation, NULL);
FREE_NULL_LIST(delete_requests);
/* final cleanup from stopping */
if (ptm != -1) {
int tty;
if ((ptm > STDERR_FILENO) && close(ptm))
error("%s: PTM close(%d) failed: %m", __func__, ptm);
if ((tty = open("/dev/tty", O_RDWR)) >= 0) {
/* notify client's tty we are done */
debug3("%s: calling TIOCNOTTY on /dev/tty", __func__);
if (ioctl(tty, TIOCNOTTY, 0) == -1)
debug("%s: TIOCNOTTY(%d) failed: %m",
__func__, tty);
close(tty);
}
}
debug2("%s: cleaning up temporary files", __func__);
write_lock_state();
/* pid file should already be cleared but make sure here */
SWAP(pid_file, state.pid_file);
SWAP(pid_file_fd, state.pid_file_fd);
SWAP(spool_dir, state.spool_dir);
SWAP(anchor_socket, state.anchor_socket);
unlock_state();
/* conmgr will unlink anchor_socket at shutdown */
if (pid_file && unlink(pid_file))
debug("%s: unable to unlink(%s): %m", __func__, pid_file);
if ((pid_file_fd != -1) && ftruncate(pid_file_fd, 0))
error("%s: unable to ftruncate(%d): %m", __func__, pid_file_fd);
if ((pid_file_fd != -1) && close(pid_file_fd))
debug("%s: unable to close(%d): %m", __func__, pid_file_fd);
if (spool_dir && rmdir(spool_dir))
debug("%s: unable to rmdir(%s): %m", __func__, spool_dir);
#ifdef MEMORY_LEAK_DEBUG
xfree(anchor_socket);
xfree(pid_file);
xfree(spool_dir);
#endif /* MEMORY_LEAK_DEBUG */
debug2("%s: Goodbye, cruel velvet drapes!", __func__);
conmgr_request_shutdown();
}
static void _finish_job(conmgr_callback_args_t conmgr_args, void *arg)
{
int jobid, rc;
bool existing_allocation;
xassert(!arg);
read_lock_state();
xassert(state.status >= CONTAINER_ST_STOPPING);
xassert(!state.job_completed);
jobid = state.jobid;
rc = state.srun_rc;
existing_allocation = state.existing_allocation;
unlock_state();
if (existing_allocation) {
debug("%s: skipping slurm_complete_job(jobId=%u)",
__func__, jobid);
goto done;
} else if (!jobid) {
debug("%s: no Job to complete", __func__);
return;
}
rc = slurm_complete_job(jobid, rc);
if ((rc == SLURM_ERROR) && errno)
rc = errno;
if (rc == ESLURM_ALREADY_DONE) {
debug("%s: jobId=%u already complete", __func__, jobid);
} else if (rc) {
error("%s: slurm_complete_job(jobId=%u) failed: %s",
__func__, jobid, slurm_strerror(rc));
} else {
debug("%s: jobId=%u released successfully", __func__, jobid);
}
done:
write_lock_state();
xassert(!state.job_completed);
state.job_completed = true;
unlock_state();
conmgr_add_work_fifo(_check_if_stopped, NULL);
}
static void _stage_out(conmgr_callback_args_t conmgr_args, void *arg)
{
int rc;
bool staged_in;
xassert(!arg);
read_lock_state();
xassert(state.status >= CONTAINER_ST_STOPPING);
xassert(!state.staged_out);
debug("%s: BEGIN container %s staging out", __func__, state.id);
staged_in = state.staged_in;
unlock_state();
if (staged_in) {
rc = stage_out();
} else {
rc = SLURM_SUCCESS;
debug("%s: skipping stage_out() due to stage_in() never running",
__func__);
}
if (get_log_level() >= LOG_LEVEL_DEBUG) {
read_lock_state();
debug("%s: END container %s staging out: %s",
__func__, state.id, slurm_strerror(rc));
unlock_state();
}
write_lock_state();
xassert(!state.staged_out);
state.staged_out = true;
unlock_state();
conmgr_add_work_fifo(_finish_job, NULL);
}
/* cleanup anchor and shutdown */
extern void stop_anchor(int status)
{
debug2("%s: begin", __func__);
write_lock_state();
if (state.status > CONTAINER_ST_STOPPING) {
unlock_state();
debug2("%s: already stopped", __func__);
return;
}
if (state.status == CONTAINER_ST_STOPPING) {
unlock_state();
debug2("%s: waiting for already running stop request",
__func__);
return;
}
change_status_locked(CONTAINER_ST_STOPPING);
xassert(!state.srun_exited);
xassert(!state.srun_rc);
state.srun_exited = true;
state.srun_rc = status;
xassert(!state.job_completed);
xassert(!state.staged_out);
if (state.startup_con) {
int rc;
debug4("%s: sending pid %"PRIu64" to parent due to container stopped in state %s",
__func__, (uint64_t) state.pid,
slurm_container_status_to_str(state.status));
/* send the pid now due to the startup failure */
if ((rc = conmgr_queue_write_data(state.startup_con, &state.pid,
sizeof(state.pid))))
fatal("%s: unable to send pid: %s",
__func__, slurm_strerror(rc));
conmgr_queue_close_fd(state.startup_con);
}
unlock_state();
conmgr_add_work_fifo(_stage_out, NULL);
debug2("%s: end", __func__);
}
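/*
 * The SIGCHLD handler below reaps every exited child in one pass.
 * Assumed waitpid(-1, &status, WNOHANG) semantics: >0 is a reaped pid
 * (compared against srun_pid to detect the srun exit), 0 means children
 * remain but none have exited yet, and -1 with errno == ECHILD means
 * there are no children left to reap.
 */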
static void _catch_sigchld(conmgr_callback_args_t conmgr_args, void *arg)
{
pid_t pid;
pid_t srun_pid;
/* we are acting like this is atomic - it is only for logging */
static uint32_t reaped = 0;
xassert(arg == &state);
debug("%s: caught SIGCHLD", __func__);
write_lock_state();
srun_pid = state.srun_pid;
unlock_state();
if (!srun_pid) {
debug("%s: ignoring SIGCHLD before srun started",
__func__);
return;
}
debug("%s: processing SIGCHLD: finding all anchor children (pid=%"PRIu64")",
__func__, (uint64_t) getpid());
do {
int status = 0;
pid = waitpid(-1, &status, WNOHANG);
if (pid < 0) {
if (errno == ECHILD)
debug("%s: got SIGCHLD with no child processes", __func__);
else
error("%s: waitpid(-1) failed: %m", __func__);
break;
}
reaped++;
if (pid == srun_pid) {
if (WIFEXITED(status)) {
debug("%s: srun[%d] exited with rc=0x%x",
__func__, pid, WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
debug("%s: srun[%d] killed by signal %s[%d]",
__func__, pid,
strsignal(WTERMSIG(status)),
WTERMSIG(status));
} else {
debug("%s: srun[%d] exited rc=0x%x",
__func__, pid, status);
}
stop_anchor(status);
} else if (get_log_level() >= LOG_LEVEL_DEBUG) {
if (!pid) {
debug("%s: done reaping %u child processes",
__func__, reaped);
} else if (WIFEXITED(status)) {
debug("%s; child[%d] exited with rc=0x%x",
__func__, pid, WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
debug("%s: child[%d] killed by signal %s[%d]",
__func__, pid,
strsignal(WTERMSIG(status)),
WTERMSIG(status));
}
}
} while (pid > 0);
}
static void *_on_cs_connection(conmgr_fd_t *con, void *arg)
{
int tty;
xassert(!arg);
read_lock_state();
/* containerd expects the PTM */
tty = state.ptm;
if (state.status >= CONTAINER_ST_STOPPED) {
error("%s: skipping sending console_socket due container %s status %s",
__func__, state.id,
slurm_container_status_to_str(state.status));
unlock_state();
/* NULL return will close the connection */
return NULL;
}
unlock_state();
debug2("%s: [%s] sending fd:%d",
__func__, conmgr_fd_get_name(con), tty);
xassert(tty != -1);
xassert(isatty(tty));
/* hand over pty to console_socket */
/* WARNING: blocking call */
errno = 0;
send_fd_over_socket(conmgr_fd_get_output_fd(con), tty);
debug2("%s: [%s] sent fd:%d rc:%m",
__func__, conmgr_fd_get_name(con), tty);
/* only sending the fd */
conmgr_queue_close_fd(con);
return &state;
}
static int _on_cs_data(conmgr_fd_t *con, void *arg)
{
xassert(!arg);
debug3("%s", __func__);
read_lock_state();
error("%s: unexpectedly sent data via console_socket %s for container %s status=%s",
__func__, state.console_socket, state.id,
slurm_container_status_to_str(state.status));
unlock_state();
return EINVAL;
}
static void _on_cs_finish(conmgr_fd_t *con, void *arg)
{
xassert(arg == &state);
check_state();
debug3("%s", __func__);
}
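/*
 * Console socket handoff follows the OCI runtime convention used by
 * runc/containerd: the caller listens on an AF_UNIX socket and the
 * runtime connects to it, sends the pseudoterminal master fd via
 * SCM_RIGHTS, and closes the connection. _queue_send_console_socket()
 * performs the connect() and _on_cs_connection() performs the handoff.
 */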
static void _queue_send_console_socket(void)
{
int rc;
static const conmgr_events_t events = {
.on_connection = _on_cs_connection,
.on_data = _on_cs_data,
.on_finish = _on_cs_finish,
};
struct sockaddr_un addr = {
.sun_family = AF_UNIX,
};
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
fatal("%s: socket(AF_UNIX) failed: %m", __func__);
fd_set_nonblocking(fd);
fd_set_close_on_exec(fd);
if (strlcpy(addr.sun_path, state.console_socket,
sizeof(addr.sun_path)) != strlen(state.console_socket))
fatal("console socket address too long: %s",
state.console_socket);
if ((connect(fd, (struct sockaddr *) &addr, sizeof(addr))) < 0)
fatal("%s: [%s] Unable to connect() to console socket: %m",
__func__, addr.sun_path);
if ((rc = conmgr_process_fd(CON_TYPE_RAW, fd, fd, &events, CON_FLAG_NONE,
(slurm_addr_t *) &addr, sizeof(addr),
NULL, NULL)))
fatal("%s: [%s] unable to initialize console socket: %s",
__func__, addr.sun_path, slurm_strerror(rc));
debug("%s: queued up console socket %s to send pty",
__func__, addr.sun_path);
}
static int _send_start_response(conmgr_fd_t *con, slurm_msg_t *req_msg, int rc)
{
slurm_msg_t *msg;
container_started_msg_t *st_msg;
/* respond with rc */
msg = xmalloc(sizeof(*msg));
st_msg = xmalloc(sizeof(*st_msg));
slurm_msg_t_init(msg);
slurm_msg_set_r_uid(msg, SLURM_AUTH_UID_ANY);
msg->msg_type = RESPONSE_CONTAINER_START;
msg->protocol_version = req_msg->protocol_version;
msg->data = st_msg;
st_msg->rc = rc;
read_lock_state();
st_msg->step.job_id = state.jobid;
unlock_state();
st_msg->step.step_id = 0;
st_msg->step.step_het_comp = NO_VAL;
rc = conmgr_queue_write_msg(con, msg);
slurm_free_msg(msg);
conmgr_queue_close_fd(con);
return rc;
}
static int _finish_start_request(void *x, void *arg)
{
int rc;
blocking_req_t *req = x;
check_state();
xassert(!arg);
xassert(req->magic == BLOCKING_REQ_MAGIC);
debug("%s: [%s] sending start response",
__func__, conmgr_fd_get_name(req->con));
rc = _send_start_response(req->con, req->req_msg, SLURM_SUCCESS);
return (rc ? SLURM_ERROR : SLURM_SUCCESS);
}
static void _notify_started(void)
{
list_t *l = NULL;
write_lock_state();
SWAP(l, state.start_requests);
unlock_state();
if (!l)
return;
list_for_each_ro(l, _finish_start_request, NULL);
FREE_NULL_LIST(l);
}
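/*
 * _try_start() drives the CREATED -> STARTING -> RUNNING transition:
 * start requests queued before the container reaches CREATED are
 * deferred, then the anchor forks via _daemonize(false), records the
 * child as srun_pid, marks the container RUNNING, and answers all
 * queued start requests through _notify_started(). The child never
 * returns from exec_srun_container().
 */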
static void _try_start(void)
{
pid_t child = 0;
write_lock_state();
if (state.status >= CONTAINER_ST_RUNNING) {
/* already started: report success */
unlock_state();
_notify_started();
return;
}
if (state.status < CONTAINER_ST_CREATED) {
if (!state.start_requests || !list_count(state.start_requests))
fatal("%s: start request queue empty", __func__);
debug("%s: deferring %d start requests while in status:%s",
__func__, list_count(state.start_requests),
slurm_container_status_to_str(state.status));
unlock_state();
return;
}
change_status_locked(CONTAINER_ST_STARTING);
unlock_state();
if ((child = _daemonize(false))) {
write_lock_state();
state.srun_pid = child;
debug("%s: forked for srun of %s to pid:%"PRIu64,
__func__, state.id, (uint64_t) state.srun_pid);
change_status_locked(CONTAINER_ST_RUNNING);
unlock_state();
_notify_started();
} else {
exec_srun_container();
fatal("should never execute this line");
}
}
static int _start(conmgr_fd_t *con, slurm_msg_t *req_msg)
{
container_state_msg_status_t status;
read_lock_state();
status = state.status;
unlock_state();
/* NOTE: explicitly listing all possible states here */
switch (status) {
case CONTAINER_ST_INVALID:
case CONTAINER_ST_UNKNOWN:
case CONTAINER_ST_MAX:
fatal("%s: [%s] start request while in status:%s should never happen",
__func__, conmgr_fd_get_name(con),
slurm_container_status_to_str(status));
case CONTAINER_ST_CREATING:
debug("%s: [%s] start request while in status:%s. Deferring start request until CREATED state.",
__func__, conmgr_fd_get_name(con),
slurm_container_status_to_str(status));
return _queue_start_request(con, req_msg);
case CONTAINER_ST_CREATED:
{
debug("%s: [%s] queuing up start request in status:%s",
__func__, conmgr_fd_get_name(con),
slurm_container_status_to_str(status));
/* container is CREATED; _try_start() will move it to STARTING */
return _queue_start_request(con, req_msg);
}
case CONTAINER_ST_STARTING:
case CONTAINER_ST_RUNNING:
debug("%s: [%s] ignoring duplicate start request while %s",
__func__, conmgr_fd_get_name(con),
slurm_container_status_to_str(status));
return _send_start_response(con, req_msg, SLURM_SUCCESS);
case CONTAINER_ST_STOPPING:
case CONTAINER_ST_STOPPED:
/* already ran? */
debug("%s: [%s] start request while in status:%s rejected",
__func__, conmgr_fd_get_name(con),
slurm_container_status_to_str(status));
/* TODO: maybe this should always be SUCCESS too? */
return _send_start_response(con, req_msg, ESLURM_ALREADY_DONE);
}
fatal("%s: should never get past switch()", __func__);
}
static int _kill_job(conmgr_fd_t *con, int signal)
{
int rc = SLURM_SUCCESS, jobid;
container_state_msg_status_t status;
read_lock_state();
jobid = state.jobid;
status = state.status;
unlock_state();
if (jobid && (status <= CONTAINER_ST_STOPPING)) {
rc = slurm_kill_job(jobid, signal, KILL_FULL_JOB);
debug("%s: [%s] slurm_kill_job(JobID=%d, Signal[%d]=%s, 0) = %s",
__func__, (con ? conmgr_fd_get_name(con) : "self"),
jobid, signal, strsignal(signal), slurm_strerror(rc));
} else {
debug("%s: [%s] job already dead",
__func__, conmgr_fd_get_name(con));
}
return rc;
}
static int _kill(conmgr_fd_t *con, slurm_msg_t *req_msg)
{
slurm_msg_t *msg;
return_code_msg_t *rc_msg;
int rc;
container_signal_msg_t *sig_msg = req_msg->data;
xassert(req_msg->msg_type == REQUEST_CONTAINER_KILL);
debug("%s: [%s] requested signal %s",
__func__, conmgr_fd_get_name(con), strsignal(sig_msg->signal));
rc = _kill_job(con, sig_msg->signal);
/* respond with rc */
msg = xmalloc(sizeof(*msg));
rc_msg = xmalloc(sizeof(*rc_msg));
slurm_msg_t_init(msg);
slurm_msg_set_r_uid(msg, SLURM_AUTH_UID_ANY);
msg->msg_type = RESPONSE_CONTAINER_KILL;
msg->protocol_version = req_msg->protocol_version;
msg->data = rc_msg;
rc_msg->return_code = rc;
rc = conmgr_queue_write_msg(con, msg);
slurm_free_msg(msg);
return rc;
}
static int _delete(conmgr_fd_t *con, slurm_msg_t *req_msg)
{
int rc = SLURM_SUCCESS;
container_delete_msg_t *delete_msg = req_msg->data;
debug("%s: [%s]%s delete requested: %s",
__func__, conmgr_fd_get_name(con),
(delete_msg->force ? " force" : ""), slurm_strerror(rc));
rc = _queue_delete_request(con, req_msg);
conmgr_add_work_fifo(_tear_down, NULL);
return rc;
}
static void _set_proctitle(void)
{
char *thread_name = NULL;
xassert(!state.needs_lock);
setproctitle("%s", state.id);
xstrfmtcat(thread_name, "scrun:%s", state.id);
if (prctl(PR_SET_NAME, thread_name, NULL, NULL, NULL) < 0) {
fatal("Unable to set process name");
}
xfree(thread_name);
}
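/*
 * _daemonize() pairs fork() with a pipe so the parent can block until
 * the final anchor pid is known. When new_session is true the classic
 * double-fork/setsid sequence is used: the first fork ensures the
 * caller is not a process-group leader so setsid() can succeed, and the
 * second fork ensures the daemon is not a session leader and therefore
 * can never reacquire a controlling terminal.
 */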
static pid_t _daemonize(bool new_session)
{
int pipe_fd[2];
pid_t pid;
if (pipe(pipe_fd))
fatal("pipe() failed: %m");
xassert(pipe_fd[0] > STDERR_FILENO);
xassert(pipe_fd[1] > STDERR_FILENO);
if ((pid = fork()) == -1)
fatal("cannot fork: %m");
log_reinit();
if (pid) {
/* nothing else in parent */
debug("%s: forked off child %"PRIu64,
__func__, (uint64_t) pid);
if (close(pipe_fd[1]))
fatal("close(%u) failed: %m", pipe_fd[1]);
safe_read(pipe_fd[0], &pid, sizeof(pid));
if (close(pipe_fd[0]))
fatal("close(%u) failed: %m", pipe_fd[0]);
return pid;
}
if (new_session) {
/* explicitly not calling xdaemon() as it breaks stdio */
switch (fork()) {
case 0:
break; /* child */
case -1:
return -1;
default:
_exit(0); /* exit parent */
}
if (setsid() < 0)
fatal("setsid() failed: %m");
switch (fork()) {
case 0:
break; /* child */
case -1:
return -1;
default:
_exit(0); /* exit parent */
}
}
/* avoid deadlocks with logging threads */
log_reinit();
pid = getpid();
if (close(pipe_fd[0]))
fatal("close(%u) failed: %m", pipe_fd[0]);
safe_write(pipe_fd[1], &pid, sizeof(pid));
if (close(pipe_fd[1]))
fatal("close(%u) failed: %m", pipe_fd[1]);
return 0;
rwfail:
fatal("Unable to send PID to parent: %m");
}
static void _cleanup_pidfile(void)
{
xassert(!state.needs_lock);
if (unlink(state.pid_file) == -1) {
debug("%s: unable to remove pidfile `%s': %m",
__func__, state.pid_file);
}
if ((state.pid_file_fd != -1) && (close(state.pid_file_fd) == -1)) {
debug("%s: unable to close pidfile `%s': %m", __func__,
state.pid_file);
}
state.pid_file_fd = -1;
}
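/*
 * fd_get_write_lock() (src/common/fd.h) is expected to take an
 * exclusive advisory lock on the pidfile so a second scrun of the same
 * container fails fast. A minimal sketch of the usual fcntl() technique
 * (illustrative only, not the actual implementation):
 *
 *	struct flock lock = {
 *		.l_type = F_WRLCK,
 *		.l_whence = SEEK_SET,
 *		.l_start = 0,
 *		.l_len = 0,	// 0 locks the whole file
 *	};
 *	if (fcntl(fd, F_SETLK, &lock) == -1)
 *		error("pidfile locked by another process: %m");
 */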
/*
* Based on create_pidfile() but does not place newline at end of file
* to be compatible with docker pidfile parsing. Defers actually writing the
* pidfile too.
*/
static void _open_pidfile(void)
{
int rc = SLURM_SUCCESS;
xassert(!state.needs_lock);
if (!state.pid_file)
return;
xassert(state.pid_file_fd == -1);
xassert(state.pid_file != NULL);
xassert(state.pid_file[0] == '/');
xassert(state.pid_file[1]);
state.pid_file_fd = open(state.pid_file,
O_CREAT|O_WRONLY|O_TRUNC|O_CLOEXEC,
S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
if (state.pid_file_fd == -1) {
rc = errno;
error("%s: unable to open pidfile `%s': %s",
__func__, state.pid_file, slurm_strerror(rc));
/* avoid unlinking pidfile from another scrun */
return;
}
fd_set_close_on_exec(state.pid_file_fd);
if (fd_get_write_lock(state.pid_file_fd) == -1) {
rc = errno;
error("%s: unable to lock pidfile `%s': %s",
__func__, state.pid_file, slurm_strerror(rc));
goto cleanup;
}
if (fchown(state.pid_file_fd, getuid(), -1) == -1) {
rc = errno;
error("%s: Unable to reset owner of pidfile `%s' to %u: %s",
__func__, state.pid_file, getuid(), slurm_strerror(rc));
goto cleanup;
}
debug("%s: opened pid file: %s", __func__, state.pid_file);
return;
cleanup:
_cleanup_pidfile();
fatal("%s: create pidfile %s failed: %s",
__func__, state.pid_file, slurm_strerror(rc));
}
static void _populate_pidfile(void)
{
int rc = SLURM_SUCCESS;
char *pid_str = NULL;
xassert(!state.needs_lock);
if (!state.pid_file)
return;
pid_str = xstrdup_printf("%lu", (unsigned long) state.pid);
safe_write(state.pid_file_fd, pid_str, strlen(pid_str));
xfree(pid_str);
debug("%s: populated pid file: %s", __func__, state.pid_file);
return;
rwfail:
rc = errno;
_cleanup_pidfile();
xfree(pid_str);
fatal("%s: populate pidfile %s failed: %s",
__func__, state.pid_file, slurm_strerror(rc));
}
extern void on_allocation(conmgr_callback_args_t conmgr_args, void *arg)
{
bool queue_try_start = false;
int rc;
pid_t pid;
xassert(!arg);
write_lock_state();
if (state.jobid <= 0) {
unlock_state();
debug("%s: waiting for job allocation", __func__);
return;
}
if (!state.startup_con) {
unlock_state();
debug("%s: waiting for create command connection", __func__);
return;
}
if (state.status != CONTAINER_ST_CREATING) {
error("%s: can only switch to CREATED from CREATING but current status=%s",
__func__, slurm_container_status_to_str(state.status));
unlock_state();
return;
}
/* created reqs completed */
change_status_locked(CONTAINER_ST_CREATED);
if (state.start_requests) {
debug("%s: %s requesting start as user already requested start",
__func__, state.id);
queue_try_start = true;
}
debug("%s: %s created successfully", __func__, state.id);
pid = getpid();
/* notify command_create() that container is now CREATED */
xassert(state.startup_con);
if ((rc = conmgr_queue_write_data(state.startup_con, &pid,
sizeof(pid))))
fatal("%s: unable to send pid: %s",
__func__, slurm_strerror(rc));
conmgr_queue_close_fd(state.startup_con);
unlock_state();
if (queue_try_start)
_try_start();
}
static void *_on_connection(conmgr_fd_t *con, void *arg)
{
/* may or may not need to be locked for this one */
check_state();
xassert(!arg);
debug4("%s: [%s] new connection", __func__, conmgr_fd_get_name(con));
return &state;
}
static void _on_connection_finish(conmgr_fd_t *con, void *arg)
{
xassert(arg == &state);
check_state();
}
static int _send_state(conmgr_fd_t *con, slurm_msg_t *req_msg)
{
int rc;
container_state_msg_t *state_msg;
slurm_msg_t *msg;
check_state();
msg = xmalloc(sizeof(*msg));
slurm_msg_t_init(msg);
slurm_msg_set_r_uid(msg, SLURM_AUTH_UID_ANY);
msg->msg_type = RESPONSE_CONTAINER_STATE;
msg->protocol_version = req_msg->protocol_version;
state_msg = slurm_create_container_state_msg();
msg->data = state_msg;
read_lock_state();
state_msg->oci_version = xstrdup(state.oci_version);
state_msg->id = xstrdup(state.id);
state_msg->status = state.status;
state_msg->pid = state.pid;
state_msg->bundle = xstrdup(state.bundle);
state_msg->annotations = list_shallow_copy(state.annotations);
debug("%s: [%s] sent state with status=%s",
__func__, conmgr_fd_get_name(con),
slurm_container_status_to_str(state.status));
rc = conmgr_queue_write_msg(con, msg);
/* must hold read lock until annotations have been packed */
unlock_state();
slurm_free_msg(msg);
return rc;
}
static int _on_connection_msg(conmgr_fd_t *con, slurm_msg_t *msg, int unpack_rc,
void *arg)
{
int rc;
uid_t user_id;
xassert(arg == &state);
if (unpack_rc || !msg->auth_ids_set) {
error("%s: [%s] rejecting malformed RPC and closing connection: %s",
__func__, conmgr_fd_get_name(con),
slurm_strerror(unpack_rc));
slurm_free_msg(msg);
return unpack_rc;
}
read_lock_state();
user_id = state.user_id;
unlock_state();
/*
* TODO: this only allows same user currently but maybe this should
* allow root/slurmuser?
*
* Note: containerd will start runc in a new username space running as
* the root user. We must check against the job user instead of
* getuid().
*/
/* auth_ids_set was already verified when the RPC was accepted above */
if (msg->auth_uid != user_id) {
error("%s: [%s] rejecting %s RPC with user:%u != owner:%u",
__func__, conmgr_fd_get_name(con),
rpc_num2string(msg->msg_type), msg->auth_uid, user_id);
slurm_free_msg(msg);
return SLURM_PROTOCOL_AUTHENTICATION_ERROR;
}
switch (msg->msg_type) {
case REQUEST_CONTAINER_STATE:
rc = _send_state(con, msg);
slurm_free_msg(msg);
break;
case REQUEST_CONTAINER_START:
rc = _start(con, msg);
/* msg is free'ed later */
break;
case REQUEST_CONTAINER_PTY:
rc = _send_pty(con, msg);
slurm_free_msg(msg);
break;
case REQUEST_CONTAINER_KILL:
rc = _kill(con, msg);
slurm_free_msg(msg);
break;
case REQUEST_CONTAINER_DELETE:
rc = _delete(con, msg);
/* msg is free'ed later */
break;
default:
rc = SLURM_UNEXPECTED_MSG_ERROR;
error("%s: [%s] unexpected message %u",
__func__, conmgr_fd_get_name(con), msg->msg_type);
slurm_free_msg(msg);
}
return rc;
}
static void _adopt_tty(void)
{
debug("STDIN_FILENO is a tty! requested_terminal=%c",
(state.requested_terminal ? 't' : 'f'));
/* grab size of terminal screen */
if (ioctl(STDIN_FILENO, TIOCGWINSZ, &state.tty_size))
fatal("ioctl(TIOCGWINSZ): %m");
state.pts = STDIN_FILENO;
}
static void _open_pty(void)
{
int ptm; /* pseudoterminal master (PTM) */
int pts; /* pseudoterminal slave (PTS) */
xassert(!state.needs_lock);
if ((ptm = posix_openpt(O_RDWR|O_NOCTTY)) < 0)
fatal("posix_openpt() failed: %m");
/*
* Per man pts:
* Before opening the pseudoterminal slave, you must pass the
* master's file descriptor to grantpt(3) and unlockpt(3).
*/
if (grantpt(ptm))
fatal("%s: Unable to grantpt() pty: %m", __func__);
if (unlockpt(ptm))
fatal("%s: Unable to unlockpt() pty: %m", __func__);
if ((pts = open(ptsname(ptm), O_RDWR)) < 0)
fatal("%s: Unable to open %s: %m",
__func__, ptsname(ptm));
debug("%s: created pty %s ptm:%d pts:%d",
__func__, ptsname(ptm), ptm, pts);
xassert(state.ptm == -1);
xassert(state.pts == -1);
state.ptm = ptm;
state.pts = pts;
}
static int _on_startup_con_data(conmgr_fd_t *con, void *arg)
{
xassert(arg == &state);
check_state();
fatal("%s: unexpected data", __func__);
}
static void *_on_startup_con(conmgr_fd_t *con, void *arg)
{
bool queue = false;
xassert(!arg);
debug4("%s: [%s] new startup connection",
__func__, conmgr_fd_get_name(con));
write_lock_state();
xassert(!state.startup_con);
state.startup_con = con;
/*
* job may already be allocated at this point so see if we need to mark
* as created
*/
if ((state.status == CONTAINER_ST_CREATING) && (state.jobid > 0) &&
!state.existing_allocation)
queue = true;
unlock_state();
if (queue) {
conmgr_add_work_fifo(on_allocation, NULL);
}
return &state;
}
static void _on_startup_con_fin(conmgr_fd_t *con, void *arg)
{
xassert(arg == &state);
write_lock_state();
xassert(state.startup_con == con);
debug4("%s: [%s] create command parent notified of start",
__func__, conmgr_fd_get_name(state.startup_con));
xassert(state.startup_con);
state.startup_con = NULL;
unlock_state();
_try_start();
}
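/*
 * Startup handshake: the anchor writes its pid over the startup
 * connection exactly once, either from on_allocation() when the
 * container reaches CREATED or from stop_anchor() when startup fails,
 * and _wait_create_pid() in the parent blocks on the matching read. A
 * failed read means the anchor died before reporting, in which case the
 * parent falls back to waitpid() to recover the exit status.
 */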
static int _wait_create_pid(int fd, pid_t child)
{
int rc, status;
pid_t pid;
debug("%s: waiting for anchor pid on fd %d from %"PRIu64,
__func__, fd, (uint64_t) child);
safe_read(fd, &pid, sizeof(pid));
if (close(fd))
fatal("close(%u) failed: %m", fd);
debug4("%s: goodbye cruel lamp", __func__);
if (pid > 0) {
debug("%s: anchor pid %"PRIu64" ready",
__func__, (uint64_t) pid);
return SLURM_SUCCESS;
} else {
debug("%s: received failure signal pid %"PRIi64,
__func__, (int64_t) pid);
goto check_pid;
}
rwfail:
rc = errno;
debug("%s: pipe read(%d) error while waiting for pid from child process %"PRIu64" failed: %s",
__func__, fd, (uint64_t) child, slurm_strerror(rc));
check_pid:
/* check what happened to the child process */
debug("%s: waiting for anchor process %u to terminate",
__func__, child);
while (((rc = waitpid(child, &status, 0)) < 0) && (errno == EINTR))
debug("%s: waitpid(%" PRIu64 ") interrupted",
__func__, (uint64_t) child);
if (rc == -1) {
rc = errno;
debug("%s: waitpid(%"PRIu64") failed[%d]: %s",
__func__, (uint64_t) child, rc, slurm_strerror(rc));
return rc;
}
xassert(child == rc);
rc = SLURM_SUCCESS;
debug("anchor %d successfully left session", child);
if (WIFEXITED(status)) {
rc = WEXITSTATUS(status);
debug("%s: anchor %"PRIu64" exited[%d]=%s",
__func__, (uint64_t) child, rc, slurm_strerror(rc));
} else if (WIFSIGNALED(status)) {
fatal("%s: anchor %"PRIu64" killed by signal %d",
__func__, (uint64_t) child, WTERMSIG(status));
}
return rc;
}
static int _anchor_child(int pipe_fd[2])
{
static const conmgr_events_t conmgr_events = {
.on_msg = _on_connection_msg,
.on_connection = _on_connection,
.on_finish = _on_connection_finish,
};
static const conmgr_events_t conmgr_startup_events = {
.on_data = _on_startup_con_data,
.on_connection = _on_startup_con,
.on_finish = _on_startup_con_fin,
};
list_t *socket_listen = list_create(xfree_ptr);
int rc, spank_rc;
state.pid = getpid();
_populate_pidfile();
/* must init conmgr after calling fork() in _daemonize() */
conmgr_init(0, 0);
change_status_force(CONTAINER_ST_CREATING);
if (mkdirpath(state.spool_dir, S_IRWXU, true)) {
fatal("%s: unable to create spool directory %s: %m",
__func__, state.spool_dir);
} else {
debug("created: %s", state.spool_dir);
}
_daemonize_logs();
_set_proctitle();
/* setup new TTY */
if (isatty(STDIN_FILENO))
_adopt_tty();
else if (state.requested_terminal)
_open_pty();
if (state.console_socket && state.console_socket[0])
_queue_send_console_socket();
/* scrun anchor process */
/* TODO: only 1 unix socket for now */
list_append(socket_listen,
xstrdup_printf("unix:%s", state.anchor_socket));
if ((rc = conmgr_create_listen_sockets(CON_TYPE_RPC, CON_FLAG_NONE,
socket_listen, &conmgr_events,
NULL)))
fatal("%s: unable to initialize listeners: %s",
__func__, slurm_strerror(rc));
debug("%s: listening on unix:%s", __func__, state.anchor_socket);
conmgr_add_work_signal(SIGCHLD, _catch_sigchld, &state);
if ((rc = conmgr_process_fd(CON_TYPE_RAW, pipe_fd[1], pipe_fd[1],
&conmgr_startup_events, CON_FLAG_NONE, NULL,
0, NULL, NULL)))
fatal("%s: unable to initialize RPC listener: %s",
__func__, slurm_strerror(rc));
conmgr_add_work_fifo(get_allocation, NULL);
if ((spank_rc = spank_init_post_opt())) {
fatal("%s: plugin stack post-option processing failed: %s",
__func__, slurm_strerror(spank_rc));
}
/* state must be rwlocked during conmgr_run() */
#ifndef NDEBUG
slurm_mutex_lock(&state.debug_lock);
debug4("%s: BEGIN conmgr_run()", __func__);
xassert(!state.needs_lock);
xassert(!state.locked);
state.needs_lock = true;
slurm_mutex_unlock(&state.debug_lock);
#endif
rc = conmgr_run(true);
#ifndef NDEBUG
slurm_mutex_lock(&state.debug_lock);
xassert(!state.locked);
xassert(state.needs_lock);
state.needs_lock = false;
debug4("%s: END conmgr_run()", __func__);
slurm_mutex_unlock(&state.debug_lock);
#endif
FREE_NULL_LIST(socket_listen);
conmgr_fini();
return rc;
}
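/*
 * spawn_anchor() flow: open and lock the pidfile, fork the anchor via
 * _daemonize() (requesting a new session when a terminal was
 * requested), then the parent waits in _wait_create_pid() for the
 * anchor to report readiness while the child runs the conmgr event
 * loop in _anchor_child() until the container is torn down.
 */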
extern int spawn_anchor(void)
{
int pipe_fd[2] = { -1, -1 };
pid_t child;
int rc, spank_rc;
check_state();
init_lua();
if ((rc = spank_init_allocator()))
fatal("%s: failed to initialize plugin stack: %s",
__func__, slurm_strerror(rc));
if (pipe(pipe_fd))
fatal("pipe() failed: %m");
xassert(pipe_fd[0] > STDERR_FILENO);
xassert(pipe_fd[1] > STDERR_FILENO);
_open_pidfile();
if ((child = _daemonize(state.requested_terminal))) {
if (close(pipe_fd[1]))
fatal("%s: close pipe failed: %m", __func__);
rc = _wait_create_pid(pipe_fd[0], child);
goto done;
} else
rc = _anchor_child(pipe_fd);
done:
spank_rc = spank_fini(NULL);
destroy_lua();
debug2("%s: rc[%d]=%s spank_rc[%d]=%s srun_rc[%d]=%s",
__func__,
rc, slurm_strerror(rc),
spank_rc, slurm_strerror(spank_rc),
state.srun_rc, slurm_strerror(state.srun_rc));
if (rc)
return rc;
else if (spank_rc)
return spank_rc;
else if (state.srun_rc)
return state.srun_rc;
return SLURM_SUCCESS;
}