/*****************************************************************************\
* src/slurmd/slurmstepd/req.c - slurmstepd domain socket request handling
*****************************************************************************
* Copyright (C) 2005-2007 The Regents of the University of California.
* Copyright (C) 2008-2010 Lawrence Livermore National Security.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Christopher Morrone <morrone2@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#define _GNU_SOURCE /* needed for struct ucred definition */
#include <arpa/inet.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>
#include "src/common/cpu_frequency.h"
#include "src/common/eio.h"
#include "src/common/fd.h"
#include "src/common/macros.h"
#include "src/common/parse_time.h"
#include "src/common/proc_args.h"
#include "src/common/slurm_protocol_pack.h"
#include "src/common/stepd_api.h"
#include "src/common/stepd_proxy.h"
#include "src/common/strlcpy.h"
#include "src/common/timers.h"
#include "src/common/tres_frequency.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/interfaces/acct_gather.h"
#include "src/interfaces/auth.h"
#include "src/interfaces/job_container.h"
#include "src/interfaces/jobacct_gather.h"
#include "src/interfaces/proctrack.h"
#include "src/interfaces/switch.h"
#include "src/interfaces/task.h"
#include "src/slurmd/common/slurmstepd_init.h"
#include "src/slurmd/slurmd/slurmd.h"
#include "src/slurmd/slurmstepd/io.h"
#include "src/slurmd/slurmstepd/mgr.h"
#include "src/slurmd/slurmstepd/pdebug.h"
#include "src/slurmd/slurmstepd/req.h"
#include "src/slurmd/slurmstepd/slurmstepd.h"
#include "src/slurmd/slurmstepd/slurmstepd_job.h"
#include "src/slurmd/slurmstepd/step_terminate_monitor.h"
#include "src/slurmd/slurmstepd/ulimits.h"
#include "src/stepmgr/srun_comm.h"
#include "src/stepmgr/stepmgr.h"
static void *_handle_accept(void *arg);
static int _handle_request(int fd, stepd_step_rec_t *step,
uid_t uid, pid_t remote_pid);
static int _handle_state(int fd, stepd_step_rec_t *step);
static int _handle_mem_limits(int fd, stepd_step_rec_t *step);
static int _handle_uid(int fd, stepd_step_rec_t *step);
static int _handle_nodeid(int fd, stepd_step_rec_t *step);
static int _handle_signal_container(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_attach(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_pid_in_container(int fd, stepd_step_rec_t *step);
static void *_wait_extern_pid(void *args);
static int _handle_add_extern_pid_internal(stepd_step_rec_t *step, pid_t pid);
static int _handle_add_extern_pid(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_x11_display(int fd, stepd_step_rec_t *step);
static int _handle_getpw(int fd, stepd_step_rec_t *step, pid_t remote_pid);
static int _handle_getgr(int fd, stepd_step_rec_t *step, pid_t remote_pid);
static int _handle_gethost(int fd, stepd_step_rec_t *step, pid_t remote_pid);
static int _handle_daemon_pid(int fd, stepd_step_rec_t *step);
static int _handle_notify_job(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_suspend(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_resume(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_terminate(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_completion(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_stat_jobacct(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_task_info(int fd, stepd_step_rec_t *step);
static int _handle_list_pids(int fd, stepd_step_rec_t *step);
static int _handle_reconfig(int fd, stepd_step_rec_t *step, uid_t uid);
static int _handle_get_ns_fd(int fd, stepd_step_rec_t *step);
static bool _msg_socket_readable(eio_obj_t *obj);
static int _msg_socket_accept(eio_obj_t *obj, list_t *objs);
struct io_operations msg_socket_ops = {
.readable = &_msg_socket_readable,
.handle_read = &_msg_socket_accept
};
static char *socket_name;
static pthread_mutex_t suspend_mutex = PTHREAD_MUTEX_INITIALIZER;
static bool suspended = false;
static int extern_thread_cnt = 0;
static pthread_t *extern_threads = NULL;
static pthread_mutex_t extern_thread_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t extern_thread_cond = PTHREAD_COND_INITIALIZER;
struct request_params {
int fd;
stepd_step_rec_t *step;
};
typedef struct {
stepd_step_rec_t *step;
pid_t pid;
} extern_pid_t;
pthread_mutex_t stepmgr_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t message_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t message_cond = PTHREAD_COND_INITIALIZER;
static int message_connections = 0;
static int msg_target_node_id = 0;
/*
* Returns true if "uid" is a "slurm authorized user" - i.e. uid == 0
* or uid == slurm user id at this time.
*/
static bool
_slurm_authorized_user(uid_t uid)
{
return ((uid == (uid_t) 0) || (uid == slurm_conf.slurm_user_id));
}
/*
* Create a named unix domain listening socket.
* (cf, Stevens APUE 1st ed., section 15.5.2)
*/
static int
_create_socket(const char *name)
{
int fd;
int len;
struct sockaddr_un addr;
/*
* If socket name would be truncated, emit error and exit
*/
if (strlen(name) > sizeof(addr.sun_path) - 1) {
error("%s: Unix socket path '%s' is too long. (%ld > %ld)",
__func__, name, (long int)(strlen(name) + 1),
(long int)sizeof(addr.sun_path));
errno = ESLURMD_INVALID_SOCKET_NAME_LEN;
return -1;
}
/* create a unix domain stream socket */
if ((fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)) < 0)
return -1;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strlcpy(addr.sun_path, name, sizeof(addr.sun_path));
len = strlen(addr.sun_path)+1 + sizeof(addr.sun_family);
/* bind the name to the descriptor */
if (bind(fd, (struct sockaddr *) &addr, len) < 0) {
(void) close(fd);
return -2;
}
if (listen(fd, 32) < 0) {
(void) close(fd);
return -3;
}
return fd;
}
static int
_domain_socket_create(const char *dir, const char *nodename,
slurm_step_id_t *step_id)
{
int fd;
char *name = NULL;
struct stat stat_buf;
/*
* Make sure that "dir" exists and is a directory.
*/
if (stat(dir, &stat_buf) < 0) {
error("Domain socket directory %s: %m", dir);
return -1;
} else if (!S_ISDIR(stat_buf.st_mode)) {
error("%s is not a directory", dir);
return -1;
}
	/*
	 * Now build the name of the socket and create the socket itself.
	 */
xstrfmtcat(name, "%s/%s_%u.%u", dir, nodename, step_id->job_id,
step_id->step_id);
if (step_id->step_het_comp != NO_VAL)
xstrfmtcat(name, ".%u", step_id->step_het_comp);
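	/*
	 * Example (spool dir value assumed for illustration): with
	 * SlurmdSpoolDir=/var/spool/slurmd, step 5 of job 1234 on node
	 * "node1" yields /var/spool/slurmd/node1_1234.5, with a
	 * ".<het_comp>" suffix appended for heterogeneous components.
	 */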
/*
* First check to see if the named socket already exists.
*/
if (stat(name, &stat_buf) == 0) {
		/*
		 * Leftover from a slurmd crash or a job requeue that did
		 * not complete properly (very rare). Unlink the file and
		 * recreate it.
		 */
if (unlink(name) != 0) {
error("%s: failed unlink(%s): %m",
__func__, name);
xfree(name);
errno = ESLURMD_STEP_EXISTS;
return -1;
}
}
fd = _create_socket(name);
if (fd < 0)
fatal("Could not create domain socket: %m");
if (chmod(name, 0777) == -1)
error("%s: chmod(%s): %m", __func__, name);
socket_name = name;
return fd;
}
static void
_domain_socket_destroy(int fd)
{
if (close(fd) < 0)
error("Unable to close domain socket: %m");
if (unlink(socket_name) == -1)
error("Unable to unlink domain socket `%s`: %m", socket_name);
}
/* Wait for the job to be running (pids added) before continuing. */
static int _wait_for_job_running(stepd_step_rec_t *step)
{
struct timespec ts = {0, 0};
int count = 0;
int rc = SLURM_SUCCESS;
slurm_mutex_lock(&step->state_mutex);
	/*
	 * SLURMSTEPD_STEP_RUNNING is 2, so we need to loop at least that
	 * many times, but we don't want to loop any more than that.
	 */
while ((step->state < SLURMSTEPD_STEP_RUNNING) && (count < 2)) {
ts.tv_sec = time(NULL) + 60;
slurm_cond_timedwait(&step->state_cond,
&step->state_mutex, &ts);
count++;
}
if (step->state < SLURMSTEPD_STEP_RUNNING) {
debug("%ps not running yet %d [cont_id:%"PRIu64"]",
&step->step_id, step->state, step->cont_id);
rc = ESLURMD_STEP_NOTRUNNING;
}
slurm_mutex_unlock(&step->state_mutex);
return rc;
}
static void *
_msg_thr_internal(void *step_arg)
{
stepd_step_rec_t *step = step_arg;
debug("Message thread started pid = %lu", (unsigned long) getpid());
eio_handle_mainloop(step->msg_handle);
debug("Message thread exited");
return NULL;
}
int
msg_thr_create(stepd_step_rec_t *step)
{
int fd;
eio_obj_t *eio_obj;
errno = 0;
fd = _domain_socket_create(conf->spooldir, conf->node_name,
&step->step_id);
if (fd == -1)
return SLURM_ERROR;
fd_set_nonblocking(fd);
eio_obj = eio_obj_create(fd, &msg_socket_ops, (void *)step);
step->msg_handle = eio_handle_create(0);
eio_new_initial_obj(step->msg_handle, eio_obj);
slurm_thread_create(&step->msgid, _msg_thr_internal, step);
return SLURM_SUCCESS;
}
/*
* Bounded wait for the connection count to drop to zero.
* This gives connection threads a chance to complete any pending
* RPCs before the slurmstepd exits.
*/
static void _wait_for_connections(void)
{
struct timespec ts = {0, 0};
int rc = 0;
slurm_mutex_lock(&message_lock);
ts.tv_sec = time(NULL) + STEPD_MESSAGE_COMP_WAIT;
while (message_connections > 0 && rc == 0)
rc = pthread_cond_timedwait(&message_cond, &message_lock, &ts);
slurm_mutex_unlock(&message_lock);
}
static void _decrement_message_connections(void)
{
slurm_mutex_lock(&message_lock);
message_connections--;
slurm_cond_signal(&message_cond);
slurm_mutex_unlock(&message_lock);
}
static bool
_msg_socket_readable(eio_obj_t *obj)
{
debug3("Called _msg_socket_readable");
if (obj->shutdown == true) {
/* All spawned tasks have been completed by this point */
if (obj->fd != -1) {
debug2(" false, shutdown");
_domain_socket_destroy(obj->fd);
			/* slurmd considers the job step done now that
			 * the domain socket is destroyed */
obj->fd = -1;
_wait_for_connections();
} else {
debug2(" false");
}
return false;
}
return true;
}
static int _msg_socket_accept(eio_obj_t *obj, list_t *objs)
{
stepd_step_rec_t *step = obj->arg;
int fd;
	struct sockaddr_un addr;
	socklen_t len = sizeof(addr);
struct request_params *param = NULL;
debug3("Called _msg_socket_accept");
	while ((fd = accept4(obj->fd, (struct sockaddr *) &addr,
			     &len, SOCK_CLOEXEC)) < 0) {
if (errno == EINTR)
continue;
if ((errno == EAGAIN) ||
(errno == ECONNABORTED) ||
(errno == EWOULDBLOCK)) {
return SLURM_SUCCESS;
}
error("Error on msg accept socket: %m");
if ((errno == EMFILE) ||
(errno == ENFILE) ||
(errno == ENOBUFS) ||
(errno == ENOMEM)) {
return SLURM_SUCCESS;
}
obj->shutdown = true;
return SLURM_SUCCESS;
}
slurm_mutex_lock(&message_lock);
message_connections++;
slurm_mutex_unlock(&message_lock);
fd_set_blocking(fd);
param = xmalloc(sizeof(struct request_params));
param->fd = fd;
param->step = step;
slurm_thread_create_detached(_handle_accept, param);
debug3("Leaving _msg_socket_accept");
return SLURM_SUCCESS;
}
static void *_handle_accept(void *arg)
{
struct request_params *param = arg;
int fd = param->fd;
stepd_step_rec_t *step = param->step;
int req;
int client_protocol_ver;
buf_t *buffer = NULL;
int rc;
uid_t uid;
pid_t remote_pid = NO_VAL;
debug3("%s: entering (new thread)", __func__);
xfree(arg);
safe_read(fd, &req, sizeof(int));
if (req >= SLURM_MIN_PROTOCOL_VERSION) {
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__)
gid_t tmp_gid;
rc = getpeereid(fd, &uid, &tmp_gid);
#else
struct ucred ucred;
socklen_t len = sizeof(ucred);
rc = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &ucred, &len);
uid = ucred.uid;
remote_pid = ucred.pid;
#endif
if (rc)
goto fail;
client_protocol_ver = req;
} else {
error("%s: Invalid Protocol Version %d", __func__, req);
goto fail;
}
debug3("%s: Protocol Version %d from uid=%u",
__func__, client_protocol_ver, uid);
rc = SLURM_PROTOCOL_VERSION;
safe_write(fd, &rc, sizeof(int));
while (1) {
rc = _handle_request(fd, step, uid, remote_pid);
if (rc != SLURM_SUCCESS)
break;
}
if (close(fd) == -1)
error("Closing accepted fd: %m");
debug3("Leaving %s", __func__);
_decrement_message_connections();
return NULL;
fail:
rc = SLURM_ERROR;
safe_write(fd, &rc, sizeof(int));
rwfail:
if (close(fd) == -1)
error("Closing accepted fd after error: %m");
debug("Leaving %s on an error", __func__);
FREE_NULL_BUFFER(buffer);
_decrement_message_connections();
return NULL;
}
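/*
 * Minimal sketch of the client side of the handshake handled above.
 * The real client is stepd_connect() in src/common/stepd_api.c; the
 * code below is illustrative only:
 *
 *	int fd = socket(AF_UNIX, SOCK_STREAM, 0);
 *	connect(fd, ...);                  (path from _domain_socket_create)
 *	int ver = SLURM_PROTOCOL_VERSION;
 *	safe_write(fd, &ver, sizeof(int)); (announce the client's version)
 *	safe_read(fd, &ver, sizeof(int));  (server's version, or SLURM_ERROR)
 *
 * After the handshake, each RPC is one int request type followed by a
 * request-specific payload, handled by _handle_request() in a loop.
 */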
/*
* NOTE: reply must be in sync with corresponding rpc handling in slurmd.
*/
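/*
 * Relay frame read from slurmd, in order (a summary of the safe_read()
 * calls below):
 *	uint16_t protocol_version  (version the payload was packed with)
 *	uint32_t data_size         (length of the packed message body)
 *	char     data[data_size]   (packed slurm_msg_t payload)
 */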
static int _handle_stepmgr_relay_msg(int fd,
uid_t uid,
slurm_msg_t *msg,
uint16_t msg_type,
bool reply)
{
int rc;
buf_t *buffer;
char *data = NULL;
uint16_t protocol_version;
uint32_t data_size;
return_code_msg_t rc_msg = { 0 };
safe_read(fd, &protocol_version, sizeof(uint16_t));
safe_read(fd, &data_size, sizeof(uint32_t));
data = xmalloc(data_size);
safe_read(fd, data, data_size);
slurm_msg_t_init(msg);
msg->msg_type = msg_type;
msg->protocol_version = protocol_version;
buffer = create_buf(data, data_size);
rc = unpack_msg(msg, buffer);
FREE_NULL_BUFFER(buffer);
if (rc) {
goto done;
}
if (!_slurm_authorized_user(uid)) {
error("Security violation, %s RPC from uid=%u",
rpc_num2string(msg->msg_type), uid);
rc = ESLURM_USER_ID_MISSING;
goto done;
}
	if (!job_step_ptr) {
		error("%s sent to a stepd that is not running the stepmgr",
		      rpc_num2string(msg->msg_type));
		rc = ESLURM_USER_ID_MISSING; /* closest available error code */
		goto done;
	}
done:
if (rc) {
if (reply) {
rc_msg.return_code = rc;
stepd_proxy_send_resp_to_slurmd(fd, msg,
RESPONSE_SLURM_RC,
&rc_msg);
}
slurm_free_msg_members(msg);
}
return rc;
rwfail:
xfree(data);
return SLURM_ERROR;
}
static int _handle_step_create(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
job_step_create_request_msg_t *req_step_msg;
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg,
REQUEST_JOB_STEP_CREATE, true)))
goto done;
req_step_msg = msg.data;
slurm_mutex_lock(&stepmgr_mutex);
msg.auth_uid = req_step_msg->user_id = job_step_ptr->user_id;
msg.auth_ids_set = true;
/* step_create_from_msg responds to the client */
step_create_from_msg(&msg, fd, NULL, NULL);
slurm_mutex_unlock(&stepmgr_mutex);
slurm_free_msg_members(&msg);
return SLURM_SUCCESS;
done:
return SLURM_ERROR;
}
static int _handle_job_step_get_info(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
buf_t *buffer;
slurm_msg_t msg;
job_step_info_request_msg_t *request;
pack_step_args_t args = {0};
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg,
REQUEST_JOB_STEP_INFO, true)))
goto done;
request = msg.data;
buffer = init_buf(BUF_SIZE);
slurm_mutex_lock(&stepmgr_mutex);
	args.step_id = &request->step_id;
	args.steps_packed = 0;
	args.buffer = buffer;
	args.proto_version = msg.protocol_version;
	args.job_step_list = job_step_ptr->step_list;
	args.pack_job_step_list_func = pack_ctld_job_step_info;
pack_job_step_info_response_msg(&args);
slurm_mutex_unlock(&stepmgr_mutex);
(void) stepd_proxy_send_resp_to_slurmd(fd, &msg, RESPONSE_JOB_STEP_INFO,
buffer);
FREE_NULL_BUFFER(buffer);
slurm_free_msg_members(&msg);
done:
return rc;
}
static int _handle_cancel_job_step(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
job_step_kill_msg_t *request;
return_code_msg_t rc_msg = { 0 };
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg,
REQUEST_CANCEL_JOB_STEP, true)))
goto done;
request = msg.data;
slurm_mutex_lock(&stepmgr_mutex);
rc = job_step_signal(&request->step_id, request->signal,
request->flags, uid);
slurm_mutex_unlock(&stepmgr_mutex);
rc_msg.return_code = rc;
stepd_proxy_send_resp_to_slurmd(fd, &msg, RESPONSE_SLURM_RC, &rc_msg);
slurm_free_msg_members(&msg);
done:
return rc;
}
static int _handle_srun_job_complete(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
/*
* We currently don't need anything in the message
* srun_job_complete_msg_t *request;
*/
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg, SRUN_JOB_COMPLETE,
false)))
goto done;
slurm_mutex_lock(&stepmgr_mutex);
srun_job_complete(job_step_ptr);
slurm_mutex_unlock(&stepmgr_mutex);
slurm_free_msg_members(&msg);
done:
return rc;
}
static int _handle_srun_node_fail(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
srun_node_fail_msg_t *request;
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg, SRUN_NODE_FAIL,
false)))
goto done;
request = msg.data;
slurm_mutex_lock(&stepmgr_mutex);
srun_node_fail(job_step_ptr, request->nodelist);
slurm_mutex_unlock(&stepmgr_mutex);
slurm_free_msg_members(&msg);
done:
return rc;
}
static int _handle_srun_timeout(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
/*
* We currently don't need anything in the message
* srun_timeout_msg_t *request;
*/
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg, SRUN_TIMEOUT,
false)))
goto done;
slurm_mutex_lock(&stepmgr_mutex);
srun_timeout(job_step_ptr);
slurm_mutex_unlock(&stepmgr_mutex);
slurm_free_msg_members(&msg);
done:
return rc;
}
static int _handle_update_step(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
step_update_request_msg_t *request;
return_code_msg_t rc_msg = { 0 };
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg,
REQUEST_UPDATE_JOB_STEP, true)))
goto done;
request = msg.data;
slurm_mutex_lock(&stepmgr_mutex);
rc = update_step(request, uid);
slurm_mutex_unlock(&stepmgr_mutex);
rc_msg.return_code = rc;
stepd_proxy_send_resp_to_slurmd(fd, &msg, RESPONSE_SLURM_RC, &rc_msg);
slurm_free_msg_members(&msg);
done:
return rc;
}
static int _handle_step_layout(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
slurm_step_id_t *request;
slurm_step_layout_t *step_layout = NULL;
return_code_msg_t rc_msg = { 0 };
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg, REQUEST_STEP_LAYOUT,
true)))
goto done;
request = msg.data;
slurm_mutex_lock(&stepmgr_mutex);
rc = stepmgr_get_step_layouts(job_step_ptr, request, &step_layout);
slurm_mutex_unlock(&stepmgr_mutex);
if (!rc) {
(void) stepd_proxy_send_resp_to_slurmd(fd, &msg,
RESPONSE_STEP_LAYOUT,
step_layout);
slurm_step_layout_destroy(step_layout);
} else {
rc_msg.return_code = rc;
stepd_proxy_send_resp_to_slurmd(fd, &msg, RESPONSE_SLURM_RC,
&rc_msg);
}
slurm_free_msg_members(&msg);
done:
return rc;
}
static int _handle_job_sbcast_cred(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
step_alloc_info_msg_t *request;
job_sbcast_cred_msg_t *job_info_resp_msg = NULL;
return_code_msg_t rc_msg = { 0 };
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg,
REQUEST_JOB_SBCAST_CRED, true)))
goto done;
request = msg.data;
slurm_mutex_lock(&stepmgr_mutex);
rc = stepmgr_get_job_sbcast_cred_msg(job_step_ptr, &request->step_id,
msg.protocol_version,
&job_info_resp_msg);
slurm_mutex_unlock(&stepmgr_mutex);
if (rc)
goto resp;
(void) stepd_proxy_send_resp_to_slurmd(fd, &msg,
RESPONSE_JOB_SBCAST_CRED,
job_info_resp_msg);
slurm_free_sbcast_cred_msg(job_info_resp_msg);
slurm_free_msg_members(&msg);
return rc;
resp:
rc_msg.return_code = rc;
stepd_proxy_send_resp_to_slurmd(fd, &msg, RESPONSE_SLURM_RC, &rc_msg);
slurm_free_msg_members(&msg);
done:
return rc;
}
static void _het_job_alloc_list_del(void *x)
{
resource_allocation_response_msg_t *job_info_resp_msg = x;
slurm_free_resource_allocation_response_msg(job_info_resp_msg);
}
static int _handle_het_job_alloc_info(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc;
slurm_msg_t msg;
job_alloc_info_msg_t *request;
resource_allocation_response_msg_t *job_info_resp_msg = NULL;
list_t *resp_list = NULL;
return_code_msg_t rc_msg = { 0 };
if ((rc = _handle_stepmgr_relay_msg(fd, uid, &msg,
REQUEST_HET_JOB_ALLOC_INFO, true)))
goto done;
request = msg.data;
if (request->job_id != job_step_ptr->job_id) {
error("attempting to get job information for jobid %u from a different stepmgr jobid %u: %s RPC from uid=%u",
request->job_id, job_step_ptr->job_id,
rpc_num2string(msg.msg_type), uid);
rc = ESLURM_INVALID_JOB_ID;
goto resp;
}
slurm_mutex_lock(&stepmgr_mutex);
resp_list = list_create(_het_job_alloc_list_del);
job_info_resp_msg = build_job_info_resp(job_step_ptr);
list_append(resp_list, job_info_resp_msg);
slurm_mutex_unlock(&stepmgr_mutex);
(void) stepd_proxy_send_resp_to_slurmd(fd, &msg,
RESPONSE_HET_JOB_ALLOCATION,
resp_list);
FREE_NULL_LIST(resp_list);
slurm_free_msg_members(&msg);
return rc;
resp:
rc_msg.return_code = rc;
stepd_proxy_send_resp_to_slurmd(fd, &msg, RESPONSE_SLURM_RC, &rc_msg);
slurm_free_msg_members(&msg);
done:
return rc;
}
int _handle_request(int fd, stepd_step_rec_t *step, uid_t uid, pid_t remote_pid)
{
int rc = SLURM_SUCCESS;
int req;
debug3("%s: entering", __func__);
if ((rc = read(fd, &req, sizeof(int))) != sizeof(int)) {
if (rc == 0) { /* EOF, normal */
return -1;
} else {
debug3("%s: leaving on read error: %m", __func__);
return SLURM_ERROR;
}
}
switch (req) {
case REQUEST_SIGNAL_CONTAINER:
debug("Handling REQUEST_SIGNAL_CONTAINER");
rc = _handle_signal_container(fd, step, uid);
break;
case REQUEST_STATE:
debug("Handling REQUEST_STATE");
rc = _handle_state(fd, step);
break;
case REQUEST_STEP_MEM_LIMITS:
debug("Handling REQUEST_STEP_MEM_LIMITS");
rc = _handle_mem_limits(fd, step);
break;
case REQUEST_STEP_UID:
debug("Handling REQUEST_STEP_UID");
rc = _handle_uid(fd, step);
break;
case REQUEST_STEP_NODEID:
debug("Handling REQUEST_STEP_NODEID");
rc = _handle_nodeid(fd, step);
break;
case REQUEST_ATTACH:
debug("Handling REQUEST_ATTACH");
rc = _handle_attach(fd, step, uid);
break;
case REQUEST_PID_IN_CONTAINER:
debug("Handling REQUEST_PID_IN_CONTAINER");
rc = _handle_pid_in_container(fd, step);
break;
case REQUEST_DAEMON_PID:
debug("Handling REQUEST_DAEMON_PID");
rc = _handle_daemon_pid(fd, step);
break;
case REQUEST_STEP_SUSPEND:
debug("Handling REQUEST_STEP_SUSPEND");
rc = _handle_suspend(fd, step, uid);
break;
case REQUEST_STEP_RESUME:
debug("Handling REQUEST_STEP_RESUME");
rc = _handle_resume(fd, step, uid);
break;
case REQUEST_STEP_TERMINATE:
debug("Handling REQUEST_STEP_TERMINATE");
rc = _handle_terminate(fd, step, uid);
break;
case REQUEST_STEP_COMPLETION:
debug("Handling REQUEST_STEP_COMPLETION");
rc = _handle_completion(fd, step, uid);
break;
case REQUEST_STEP_TASK_INFO:
debug("Handling REQUEST_STEP_TASK_INFO");
rc = _handle_task_info(fd, step);
break;
case REQUEST_STEP_STAT:
debug("Handling REQUEST_STEP_STAT");
rc = _handle_stat_jobacct(fd, step, uid);
break;
case REQUEST_STEP_LIST_PIDS:
debug("Handling REQUEST_STEP_LIST_PIDS");
rc = _handle_list_pids(fd, step);
break;
case REQUEST_STEP_RECONFIGURE:
debug("Handling REQUEST_STEP_RECONFIGURE");
rc = _handle_reconfig(fd, step, uid);
break;
case REQUEST_JOB_STEP_CREATE:
debug("Handling REQUEST_STEP_CREATE");
rc = _handle_step_create(fd, step, uid);
break;
	case REQUEST_JOB_STEP_INFO:
		debug("Handling REQUEST_JOB_STEP_INFO");
		rc = _handle_job_step_get_info(fd, step, uid);
		break;
case REQUEST_JOB_NOTIFY:
debug("Handling REQUEST_JOB_NOTIFY");
rc = _handle_notify_job(fd, step, uid);
break;
case REQUEST_ADD_EXTERN_PID:
debug("Handling REQUEST_ADD_EXTERN_PID");
rc = _handle_add_extern_pid(fd, step, uid);
break;
case REQUEST_X11_DISPLAY:
debug("Handling REQUEST_X11_DISPLAY");
rc = _handle_x11_display(fd, step);
break;
case REQUEST_GETPW:
debug("Handling REQUEST_GETPW");
rc = _handle_getpw(fd, step, remote_pid);
break;
case REQUEST_GETGR:
debug("Handling REQUEST_GETGR");
rc = _handle_getgr(fd, step, remote_pid);
break;
case REQUEST_GET_NS_FD:
debug("Handling REQUEST_GET_NS_FD");
rc = _handle_get_ns_fd(fd, step);
break;
case REQUEST_GETHOST:
debug("Handling REQUEST_GETHOST");
rc = _handle_gethost(fd, step, remote_pid);
break;
case REQUEST_CANCEL_JOB_STEP:
debug("Handling REQUEST_CANCEL_JOB_STEP");
rc = _handle_cancel_job_step(fd, step, uid);
break;
case SRUN_JOB_COMPLETE:
_handle_srun_job_complete(fd, step, uid);
break;
case SRUN_NODE_FAIL:
_handle_srun_node_fail(fd, step, uid);
break;
case SRUN_TIMEOUT:
_handle_srun_timeout(fd, step, uid);
break;
case REQUEST_UPDATE_JOB_STEP:
debug("Handling REQUEST_UPDATE_JOB_STEP");
rc = _handle_update_step(fd, step, uid);
break;
case REQUEST_STEP_LAYOUT:
_handle_step_layout(fd, step, uid);
break;
case REQUEST_JOB_SBCAST_CRED:
_handle_job_sbcast_cred(fd, step, uid);
break;
case REQUEST_HET_JOB_ALLOC_INFO:
_handle_het_job_alloc_info(fd, step, uid);
break;
default:
error("Unrecognized request: %d", req);
rc = SLURM_ERROR;
break;
}
debug3("%s: leaving with rc: %d", __func__, rc);
return rc;
}
static int
_handle_state(int fd, stepd_step_rec_t *step)
{
safe_write(fd, &step->state, sizeof(slurmstepd_state_t));
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int
_handle_mem_limits(int fd, stepd_step_rec_t *step)
{
safe_write(fd, &step->job_mem, sizeof(uint64_t));
safe_write(fd, &step->step_mem, sizeof(uint64_t));
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int
_handle_uid(int fd, stepd_step_rec_t *step)
{
safe_write(fd, &step->uid, sizeof(uid_t));
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int
_handle_nodeid(int fd, stepd_step_rec_t *step)
{
	safe_write(fd, &step->nodeid, sizeof(uint32_t));
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
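/*
 * Request payload read by _handle_signal_container(), in order (a
 * summary of the safe_read() calls below):
 *	int    sig         (signal number or SIG_* pseudo-signal)
 *	int    flag        (KILL_* flags)
 *	int    details_len (length of the optional details string)
 *	char   details[]   (optional reason text, details_len bytes)
 *	uid_t  req_uid     (uid claimed by the requester; untrusted)
 */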
static int
_handle_signal_container(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc = SLURM_SUCCESS;
int errnum = 0;
int sig, flag, details_len;
char *details = NULL;
uid_t req_uid;
static int msg_sent = 0;
stepd_step_task_info_t *task;
uint32_t i;
safe_read(fd, &sig, sizeof(int));
safe_read(fd, &flag, sizeof(int));
safe_read(fd, &details_len, sizeof(int));
if (details_len)
details = xmalloc(details_len + 1);
safe_read(fd, details, details_len);
safe_read(fd, &req_uid, sizeof(uid_t));
debug("_handle_signal_container for %ps uid=%u signal=%d flag=0x%x",
&step->step_id, req_uid, sig, flag);
	/*
	 * Verify against uid (from the socket peer credentials) rather
	 * than req_uid, since only uid can be trusted.
	 */
	if ((uid != step->uid) && !_slurm_authorized_user(uid)) {
		error("signal container req from uid %u for %ps owned by uid %u",
		      uid, &step->step_id, step->uid);
rc = -1;
errnum = EPERM;
goto done;
}
if (flag & KILL_NO_SIG_FAIL)
step->flags |= LAUNCH_NO_SIG_FAIL;
/*
* Sanity checks
*/
if ((errnum = _wait_for_job_running(step)) != SLURM_SUCCESS) {
rc = -1;
goto done;
}
if ((sig == SIGTERM) || (sig == SIGKILL)) {
/* cycle thru the tasks and mark those that have not
* called abort and/or terminated as killed_by_cmd
*/
for (i = 0; i < step->node_tasks; i++) {
if (NULL == (task = step->task[i])) {
continue;
}
if (task->aborted || task->exited) {
continue;
}
/* mark that this task is going to be killed by
* cmd so we ignore its exit status - otherwise,
* we will probably report the final exit status
* as SIGKILL
*/
task->killed_by_cmd = true;
}
}
if ((step->step_id.step_id != SLURM_EXTERN_CONT) &&
(step->nodeid == msg_target_node_id) && (msg_sent == 0) &&
(step->state < SLURMSTEPD_STEP_ENDING)) {
time_t now = time(NULL);
char entity[45], time_str[256];
if (step->step_id.step_id == SLURM_BATCH_SCRIPT) {
snprintf(entity, sizeof(entity), "JOB %u",
step->step_id.job_id);
} else {
char tmp_char[33];
log_build_step_id_str(&step->step_id, tmp_char,
sizeof(tmp_char),
STEP_ID_FLAG_NO_PREFIX);
snprintf(entity, sizeof(entity), "STEP %s", tmp_char);
}
slurm_make_time_str(&now, time_str, sizeof(time_str));
/*
* Not really errors,
* but we want messages displayed by default
*/
if (sig == SIG_TIME_LIMIT) {
error("*** %s ON %s CANCELLED AT %s DUE TO TIME LIMIT ***",
entity, step->node_name, time_str);
msg_sent = 1;
} else if (sig == SIG_PREEMPTED) {
error("*** %s ON %s CANCELLED AT %s DUE TO PREEMPTION ***",
entity, step->node_name, time_str);
msg_sent = 1;
} else if (sig == SIG_NODE_FAIL) {
error("*** %s ON %s CANCELLED AT %s DUE TO NODE "
"FAILURE, SEE SLURMCTLD LOG FOR DETAILS ***",
entity, step->node_name, time_str);
msg_sent = 1;
} else if (sig == SIG_REQUEUED) {
error("*** %s ON %s CANCELLED AT %s DUE TO JOB REQUEUE ***",
entity, step->node_name, time_str);
msg_sent = 1;
} else if (sig == SIG_FAILURE) {
error("*** %s ON %s FAILED (non-zero exit code or other "
"failure mode) ***",
entity, step->node_name);
msg_sent = 1;
} else if (sig == SIG_UME) {
error("*** %s ON %s UNCORRECTABLE MEMORY ERROR AT %s ***",
entity, step->node_name, time_str);
} else if ((sig == SIGTERM) || (sig == SIGKILL)) {
error("*** %s ON %s CANCELLED AT %s DUE to SIGNAL %s ***",
entity, step->node_name, time_str,
strsignal(sig));
msg_sent = 1;
} else if (sig == SIG_TERM_KILL) {
error("*** %s ON %s CANCELLED AT %s DUE TO TASK FAILURE ***",
entity, step->node_name, time_str);
msg_sent = 1;
}
if (details)
error("*** REASON: %s ***", details);
}
if ((sig == SIG_TIME_LIMIT) || (sig == SIG_NODE_FAIL) ||
(sig == SIG_PREEMPTED) || (sig == SIG_FAILURE) ||
(sig == SIG_REQUEUED) || (sig == SIG_UME))
goto done;
if (sig == SIG_ABORT) {
sig = SIGKILL;
step->aborted = true;
}
slurm_mutex_lock(&suspend_mutex);
if (suspended && (sig != SIGKILL)) {
rc = -1;
errnum = ESLURMD_STEP_SUSPENDED;
slurm_mutex_unlock(&suspend_mutex);
goto done;
}
if (sig == SIG_DEBUG_WAKE) {
for (int i = 0; i < step->node_tasks; i++)
pdebug_wake_process(step, step->task[i]->pid);
slurm_mutex_unlock(&suspend_mutex);
goto done;
}
if (sig == SIG_TERM_KILL) {
(void) proctrack_g_signal(step->cont_id, SIGCONT);
(void) proctrack_g_signal(step->cont_id, SIGTERM);
sleep(slurm_conf.kill_wait);
sig = SIGKILL;
}
/*
* Specific handle for the batch container and some related flags.
*/
if (step->step_id.step_id == SLURM_BATCH_SCRIPT &&
((flag & KILL_JOB_BATCH) || (flag & KILL_FULL_JOB))) {
if (flag & KILL_FULL_JOB)
rc = killpg(step->pgid, sig);
else
rc = kill(step->pgid, sig);
if (rc < 0) {
error("%s: failed signal %d pid %u %ps %m",
__func__, sig, step->pgid, &step->step_id);
rc = SLURM_ERROR;
errnum = errno;
slurm_mutex_unlock(&suspend_mutex);
goto done;
}
verbose("%s: sent signal %d to pid %u %ps",
__func__, sig, step->pgid, &step->step_id);
rc = SLURM_SUCCESS;
errnum = 0;
slurm_mutex_unlock(&suspend_mutex);
goto done;
}
/*
* Signal the container
*/
if (proctrack_g_signal(step->cont_id, sig) < 0) {
rc = -1;
errnum = errno;
verbose("Error sending signal %d to %ps: %m",
sig, &step->step_id);
} else {
verbose("Sent signal %d to %ps", sig, &step->step_id);
}
slurm_mutex_unlock(&suspend_mutex);
if ((sig == SIGTERM) || (sig == SIGKILL))
set_job_state(step, SLURMSTEPD_STEP_CANCELLED);
done:
xfree(details);
/* Send the return code and errnum */
safe_write(fd, &rc, sizeof(int));
safe_write(fd, &errnum, sizeof(int));
return SLURM_SUCCESS;
rwfail:
xfree(details);
return SLURM_ERROR;
}
static int
_handle_notify_job(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc = SLURM_SUCCESS;
int len;
char *message = NULL;
debug3("_handle_notify_job for %ps", &step->step_id);
safe_read(fd, &len, sizeof(int));
if (len) {
message = xmalloc(len + 1);
safe_read(fd, message, len);
}
debug3(" uid = %u", uid);
if ((uid != step->uid) && !_slurm_authorized_user(uid)) {
debug("notify req from uid %u for %ps owned by uid %u",
uid, &step->step_id, step->uid);
rc = EPERM;
goto done;
}
error("%s", message);
xfree(message);
done:
/* Send the return code */
safe_write(fd, &rc, sizeof(int));
xfree(message);
return SLURM_SUCCESS;
rwfail:
xfree(message);
return SLURM_ERROR;
}
static int
_handle_terminate(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc = SLURM_SUCCESS;
int errnum = 0;
stepd_step_task_info_t *task;
uint32_t i;
if (uid != step->uid && !_slurm_authorized_user(uid)) {
debug("terminate req from uid %u for %ps owned by uid %u",
uid, &step->step_id, step->uid);
rc = -1;
errnum = EPERM;
goto done;
}
debug("_handle_terminate for %ps uid=%u", &step->step_id, uid);
step_terminate_monitor_start(step);
/*
* Sanity checks
*/
if ((errnum = _wait_for_job_running(step)) != SLURM_SUCCESS) {
rc = -1;
goto done;
}
/* cycle thru the tasks and mark those that have not
* called abort and/or terminated as killed_by_cmd
*/
for (i = 0; i < step->node_tasks; i++) {
if (NULL == (task = step->task[i])) {
continue;
}
if (task->aborted || task->exited) {
continue;
}
/* mark that this task is going to be killed by
* cmd so we ignore its exit status - otherwise,
* we will probably report the final exit status
* as SIGKILL
*/
task->killed_by_cmd = true;
}
/*
* Signal the container with SIGKILL
*/
slurm_mutex_lock(&suspend_mutex);
if (suspended) {
debug("Terminating suspended %ps", &step->step_id);
suspended = false;
}
if (proctrack_g_signal(step->cont_id, SIGKILL) < 0) {
if (errno != ESRCH) { /* No error if process already gone */
rc = -1;
errnum = errno;
}
verbose("Error sending SIGKILL signal to %ps: %m",
&step->step_id);
} else {
verbose("Sent SIGKILL signal to %ps", &step->step_id);
}
slurm_mutex_unlock(&suspend_mutex);
set_job_state(step, SLURMSTEPD_STEP_CANCELLED);
done:
/* Send the return code and errnum */
safe_write(fd, &rc, sizeof(int));
safe_write(fd, &errnum, sizeof(int));
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int
_handle_attach(int fd, stepd_step_rec_t *step, uid_t uid)
{
srun_info_t *srun;
int rc = SLURM_SUCCESS;
uint32_t *gtids = NULL, *pids = NULL;
uint32_t key_len, cert_len;
int len, i;
debug("_handle_attach for %ps", &step->step_id);
srun = xmalloc(sizeof(srun_info_t));
debug("sizeof(srun_info_t) = %d, sizeof(slurm_addr_t) = %d",
(int) sizeof(srun_info_t), (int) sizeof(slurm_addr_t));
safe_read(fd, &cert_len, sizeof(uint32_t));
if (cert_len) {
srun->tls_cert = xmalloc(cert_len);
safe_read(fd, srun->tls_cert, cert_len);
}
safe_read(fd, &srun->ioaddr, sizeof(slurm_addr_t));
safe_read(fd, &srun->resp_addr, sizeof(slurm_addr_t));
safe_read(fd, &key_len, sizeof(uint32_t));
srun->key = xmalloc(key_len);
safe_read(fd, srun->key, key_len);
safe_read(fd, &srun->uid, sizeof(uid_t));
safe_read(fd, &srun->protocol_version, sizeof(uint16_t));
if (!srun->protocol_version)
srun->protocol_version = NO_VAL16;
/*
* Check if jobstep is actually running.
*/
if (step->state != SLURMSTEPD_STEP_RUNNING) {
rc = ESLURMD_STEP_NOTRUNNING;
goto done;
}
/*
* At the moment, it only makes sense for the slurmd to make this
* call, so only _slurm_authorized_user is allowed.
*/
if (!_slurm_authorized_user(uid)) {
error("uid %u attempt to attach to %ps owned by %u",
uid, &step->step_id, step->uid);
rc = EPERM;
goto done;
}
list_prepend(step->sruns, srun);
rc = io_client_connect(srun, step);
srun = NULL;
debug(" back from io_client_connect, rc = %d", rc);
done:
/* Send the return code */
safe_write(fd, &rc, sizeof(int));
debug(" in _handle_attach rc = %d", rc);
if (rc == SLURM_SUCCESS) {
/* Send response info */
debug(" in _handle_attach sending response info");
len = step->node_tasks * sizeof(uint32_t);
pids = xmalloc(len);
gtids = xmalloc(len);
if (step->task != NULL) {
for (i = 0; i < step->node_tasks; i++) {
if (step->task[i] == NULL)
continue;
pids[i] = (uint32_t)step->task[i]->pid;
gtids[i] = step->task[i]->gtid;
}
}
safe_write(fd, &step->node_tasks, sizeof(uint32_t));
safe_write(fd, pids, len);
safe_write(fd, gtids, len);
xfree(pids);
xfree(gtids);
for (i = 0; i < step->node_tasks; i++) {
if (step->task && step->task[i] &&
step->task[i]->argv) {
len = strlen(step->task[i]->argv[0]) + 1;
safe_write(fd, &len, sizeof(int));
safe_write(fd, step->task[i]->argv[0], len);
} else {
len = 0;
safe_write(fd, &len, sizeof(int));
}
}
}
	if (srun) {
		xfree(srun->tls_cert);
		xfree(srun->key);
		xfree(srun);
	}
return SLURM_SUCCESS;
rwfail:
	if (srun) {
		xfree(srun->tls_cert);
		xfree(srun->key);
		xfree(srun);
	}
xfree(pids);
xfree(gtids);
return SLURM_ERROR;
}
static int
_handle_pid_in_container(int fd, stepd_step_rec_t *step)
{
bool rc = false;
pid_t pid;
debug("_handle_pid_in_container for %ps", &step->step_id);
safe_read(fd, &pid, sizeof(pid_t));
rc = proctrack_g_has_pid(step->cont_id, pid);
/* Send the return code */
safe_write(fd, &rc, sizeof(bool));
debug("Leaving _handle_pid_in_container");
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int _handle_get_ns_fd(int fd, stepd_step_rec_t *step)
{
int ns_fd = -1;
debug("%s: for job %u:%u",
__func__, step->step_id.job_id, step->step_id.step_id);
ns_fd = container_g_join_external(step->step_id.job_id);
	/*
	 * Send ns_fd as an int first so the receiver knows whether we
	 * have a valid fd, since receive_fd_over_socket() will always
	 * try to set up the fd whether it is valid or not.
	 */
safe_write(fd, &ns_fd, sizeof(ns_fd));
if (ns_fd > 0)
send_fd_over_socket(fd, ns_fd);
debug("sent fd: %d", ns_fd);
debug("leaving %s", __func__);
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static void _block_on_pid(pid_t pid, stepd_step_rec_t *step)
{
struct timespec ts = { 0, 0 };
slurm_mutex_lock(&extern_thread_lock);
while (kill(pid, 0) != -1) {
if (step->state >= SLURMSTEPD_STEP_CANCELLED)
break;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
slurm_cond_timedwait(&extern_thread_cond, &extern_thread_lock,
&ts);
}
slurm_mutex_unlock(&extern_thread_lock);
}
/*
* Wait for the given pid and when it ends, get any children that the pid might
* have left behind. Then wait on these if so.
*/
static void *_wait_extern_pid(void *args)
{
extern_pid_t *extern_pid = args;
stepd_step_rec_t *step = extern_pid->step;
pid_t pid = extern_pid->pid;
jobacctinfo_t *jobacct = NULL;
pid_t *pids = NULL;
int npids = 0, i;
char proc_stat_file[256]; /* Allow ~20x extra length */
FILE *stat_fp = NULL;
int fd;
char sbuf[256], *tmp, state[1];
int num_read, ppid;
xfree(extern_pid);
//info("waiting on pid %d", pid);
_block_on_pid(pid, step);
//info("done with pid %d %d: %m", pid, rc);
jobacct = jobacct_gather_remove_task(pid);
if (jobacct) {
step->jobacct->energy.consumed_energy = 0;
jobacctinfo_aggregate(step->jobacct, jobacct);
jobacctinfo_destroy(jobacct);
}
acct_gather_profile_g_task_end(pid);
if (step->state >= SLURMSTEPD_STEP_CANCELLED)
goto end;
	/*
	 * See if the given pid left any children behind and, if any are
	 * found, add them to tracking.
	 */
proctrack_g_get_pids(step->cont_id, &pids, &npids);
for (i = 0; i < npids; i++) {
snprintf(proc_stat_file, 256, "/proc/%d/stat", pids[i]);
if (!(stat_fp = fopen(proc_stat_file, "r")))
continue; /* Assume the process went away */
/*
* If this pid is slurmstepd's pid (ourselves) or it is already
* tracked in the accounting, this is not an orphaned pid,
* so just ignore it.
*/
if ((getpid() == pids[i]) ||
jobacct_gather_stat_task(pids[i], false))
goto next_pid;
fd = fileno(stat_fp);
if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
error("%s: fcntl(%s): %m", __func__, proc_stat_file);
num_read = read(fd, sbuf, (sizeof(sbuf) - 1));
if (num_read <= 0)
goto next_pid;
sbuf[num_read] = '\0';
/* get to the end of cmd name */
tmp = strrchr(sbuf, ')');
if (tmp) {
*tmp = '\0'; /* replace trailing ')' with NULL */
/* skip space after ')' too */
			if ((sscanf(tmp + 2, "%c %d ", state, &ppid) == 2) &&
			    (ppid == 1)) {
debug2("adding tracking of orphaned process %d",
pids[i]);
_handle_add_extern_pid_internal(step, pids[i]);
}
}
next_pid:
fclose(stat_fp);
}
end:
xfree(pids);
return NULL;
}
static void _wait_extern_thr_create(extern_pid_t *extern_pid)
{
/* Lock as several RPC can write to the same variable. */
slurm_mutex_lock(&extern_thread_lock);
extern_thread_cnt++;
xrecalloc(extern_threads, extern_thread_cnt, sizeof(pthread_t));
slurm_thread_create(&extern_threads[extern_thread_cnt - 1],
_wait_extern_pid, extern_pid);
slurm_mutex_unlock(&extern_thread_lock);
}
static int _handle_add_extern_pid_internal(stepd_step_rec_t *step, pid_t pid)
{
extern_pid_t *extern_pid;
jobacct_id_t jobacct_id;
if (step->step_id.step_id != SLURM_EXTERN_CONT) {
error("%s: non-extern step (%u) given for job %u.",
__func__, step->step_id.step_id, step->step_id.job_id);
return SLURM_ERROR;
}
debug("%s: for %ps, pid %d", __func__, &step->step_id, pid);
extern_pid = xmalloc(sizeof(extern_pid_t));
extern_pid->step = step;
extern_pid->pid = pid;
	/* Track the pid: add it outside of the thread below so that the
	 * pam module blocks until the parent pid has been added, before
	 * letting the parent spawn any children. */
jobacct_id.taskid = step->nodeid; /* Treat node ID as global task ID */
jobacct_id.nodeid = step->nodeid;
jobacct_id.step = step;
if (proctrack_g_add(step, pid) != SLURM_SUCCESS) {
error("%s: Job %u can't add pid %d to proctrack plugin in the extern_step.",
__func__, step->step_id.job_id, pid);
return SLURM_ERROR;
}
if (task_g_add_pid(pid) != SLURM_SUCCESS) {
error("%s: Job %u can't add pid %d to task plugin in the extern_step.",
__func__, step->step_id.job_id, pid);
return SLURM_ERROR;
}
if (jobacct_gather_add_task(pid, &jobacct_id, 1) != SLURM_SUCCESS) {
error("%s: Job %u can't add pid %d to jobacct_gather plugin in the extern_step.",
__func__, step->step_id.job_id, pid);
return SLURM_ERROR;
}
if (xstrcasestr(slurm_conf.launch_params, "ulimit_pam_adopt"))
set_user_limits(step, pid);
/* spawn a thread that will wait on the pid given */
_wait_extern_thr_create(extern_pid);
return SLURM_SUCCESS;
}
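/*
 * REQUEST_ADD_EXTERN_PID is how outside processes (e.g. ssh sessions
 * adopted by pam_slurm_adopt) are pulled into the extern step's
 * process tracking and accounting.
 */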
static int _handle_add_extern_pid(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc = SLURM_SUCCESS;
pid_t pid;
slurm_mutex_lock(&step->state_mutex);
if (step->state >= SLURMSTEPD_STEP_CANCELLED) {
error("Rejecting request to add extern pid from uid %u because step is ending",
uid);
goto rwfail;
}
safe_read(fd, &pid, sizeof(pid_t));
if (!_slurm_authorized_user(uid)) {
error("uid %u attempt to add pid %u to %ps",
uid, pid, &step->step_id);
rc = SLURM_ERROR;
} else
rc = _handle_add_extern_pid_internal(step, pid);
/* Send the return code */
safe_write(fd, &rc, sizeof(int));
debug("Leaving _handle_add_extern_pid");
slurm_mutex_unlock(&step->state_mutex);
return SLURM_SUCCESS;
rwfail:
slurm_mutex_unlock(&step->state_mutex);
return SLURM_ERROR;
}
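/*
 * Reply layout for REQUEST_X11_DISPLAY: an int display number (zero
 * means no X11 forwarding was set up), then an int length and the
 * xauthority path string (length 0 if unset).
 */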
static int _handle_x11_display(int fd, stepd_step_rec_t *step)
{
int len = 0;
/* Send the display number. zero indicates no display setup */
safe_write(fd, &step->x11_display, sizeof(int));
if (step->x11_xauthority) {
/* include NUL termination in length */
len = strlen(step->x11_xauthority) + 1;
safe_write(fd, &len, sizeof(int));
safe_write(fd, step->x11_xauthority, len);
} else {
safe_write(fd, &len, sizeof(int));
}
debug("Leaving _handle_get_x11_display");
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
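/*
 * Serve a getpwnam()/getpwuid()-style lookup; the nss_slurm plugin is
 * the expected client. On a match the reply mirrors struct passwd,
 * each string prefixed by its length: name, passwd (always "*"), uid,
 * gid, gecos, dir, shell.
 */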
static int _handle_getpw(int fd, stepd_step_rec_t *step, pid_t remote_pid)
{
uid_t uid;
int mode = 0;
int len = 0;
char *name = NULL;
bool pid_match, user_match = false;
int found = 0;
safe_read(fd, &mode, sizeof(int));
safe_read(fd, &uid, sizeof(uid_t));
safe_read(fd, &len, sizeof(int));
if (len) {
name = xmalloc(len + 1); /* add room for NUL */
safe_read(fd, name, len);
}
pid_match = proctrack_g_has_pid(step->cont_id, remote_pid);
if (uid == step->uid)
user_match = true;
else if (!xstrcmp(name, step->user_name))
user_match = true;
if (mode == GETPW_MATCH_USER_AND_PID)
found = (user_match && pid_match);
else if (mode == GETPW_MATCH_PID)
found = pid_match;
else if (mode == GETPW_MATCH_ALWAYS)
found = 1;
if (!step->user_name || !step->pw_gecos ||
!step->pw_dir || !step->pw_shell) {
error("%s: incomplete data, ignoring request", __func__);
found = 0;
}
safe_write(fd, &found, sizeof(int));
if (!found)
return SLURM_SUCCESS;
len = strlen(step->user_name);
safe_write(fd, &len, sizeof(int));
safe_write(fd, step->user_name, len);
len = 1;
safe_write(fd, &len, sizeof(int));
safe_write(fd, "*", len);
safe_write(fd, &step->uid, sizeof(uid_t));
safe_write(fd, &step->gid, sizeof(gid_t));
len = strlen(step->pw_gecos);
safe_write(fd, &len, sizeof(int));
safe_write(fd, step->pw_gecos, len);
len = strlen(step->pw_dir);
safe_write(fd, &len, sizeof(int));
safe_write(fd, step->pw_dir, len);
len = strlen(step->pw_shell);
safe_write(fd, &len, sizeof(int));
safe_write(fd, step->pw_shell, len);
debug2("Leaving %s", __func__);
return SLURM_SUCCESS;
rwfail:
xfree(name);
return SLURM_ERROR;
}
static int _send_one_struct_group(int fd, stepd_step_rec_t *step, int offset)
{
int len;
if (!step->gr_names[offset])
goto rwfail;
len = strlen(step->gr_names[offset]);
safe_write(fd, &len, sizeof(int));
safe_write(fd, step->gr_names[offset], len);
len = 1;
safe_write(fd, &len, sizeof(int));
safe_write(fd, "*", len);
safe_write(fd, &step->gids[offset], sizeof(gid_t));
len = strlen(step->user_name);
safe_write(fd, &len, sizeof(int));
safe_write(fd, step->user_name, len);
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int _handle_getgr(int fd, stepd_step_rec_t *step, pid_t remote_pid)
{
gid_t gid;
int mode = 0;
int len = 0;
char *name = NULL;
int offset = 0;
bool pid_match;
int found = 0;
safe_read(fd, &mode, sizeof(int));
safe_read(fd, &gid, sizeof(gid_t));
safe_read(fd, &len, sizeof(int));
if (len) {
name = xmalloc(len + 1); /* add room for NUL */
safe_read(fd, name, len);
}
pid_match = proctrack_g_has_pid(step->cont_id, remote_pid);
if (!step->ngids || !step->gids || !step->gr_names) {
error("%s: incomplete data, ignoring request", __func__);
} else if ((mode == GETGR_MATCH_GROUP_AND_PID) && pid_match) {
while (offset < step->ngids) {
if (gid == step->gids[offset])
break;
if (!xstrcmp(name, step->gr_names[offset]))
break;
offset++;
}
if (offset < step->ngids)
found = 1;
} else if (mode == GETGR_MATCH_PID) {
found = pid_match ? step->ngids : 0;
} else if (mode == GETGR_MATCH_ALWAYS) {
found = step->ngids;
}
safe_write(fd, &found, sizeof(int));
if (!found)
return SLURM_SUCCESS;
if (mode == GETGR_MATCH_GROUP_AND_PID) {
if (_send_one_struct_group(fd, step, offset))
goto rwfail;
} else {
for (int i = 0; i < step->ngids; i++) {
if (_send_one_struct_group(fd, step, i))
goto rwfail;
}
}
debug2("Leaving %s", __func__);
return SLURM_SUCCESS;
rwfail:
xfree(name);
return SLURM_ERROR;
}
static int _handle_gethost(int fd, stepd_step_rec_t *step, pid_t remote_pid)
{
int mode = 0;
int len = 0;
char *nodename = NULL;
char *nodename_r = NULL;
char *hostname = NULL;
bool pid_match;
int found = 0;
unsigned char address[sizeof(struct in6_addr)];
char *address_str = NULL;
int af = AF_UNSPEC;
slurm_addr_t addr;
safe_read(fd, &mode, sizeof(int));
safe_read(fd, &len, sizeof(int));
if (len) {
		nodename = xmalloc(len + 1); /* add room for NUL */
safe_read(fd, nodename, len);
}
pid_match = proctrack_g_has_pid(step->cont_id, remote_pid);
if (!(mode & GETHOST_NOT_MATCH_PID) && !pid_match)
debug("%s: no pid_match", __func__);
	else if (nodename && (!slurm_conf_get_addr(nodename, &addr, 0))) {
		char addr_str[INET6_ADDRSTRLEN];
		found = 1;
		if (addr.ss_family == AF_INET)
			af = AF_INET;
		else if (addr.ss_family == AF_INET6)
			af = AF_INET6;
		nodename_r = xstrdup(nodename);
		hostname = xstrdup(nodename);
		/*
		 * Render the address into a properly sized string buffer
		 * (writing the string straight into the 16-byte binary
		 * "address" buffer could overflow it), then convert it to
		 * the binary form written back to the client.
		 */
		slurm_get_ip_str(&addr, addr_str, sizeof(addr_str));
		inet_pton(af, addr_str, &address);
	}
} else if (nodename &&
(address_str = slurm_conf_get_address(nodename))) {
if ((mode & GETHOST_IPV6) &&
(inet_pton(AF_INET6, address_str, &address) == 1)) {
found = 1;
af = AF_INET6;
} else if ((mode & GETHOST_IPV4) &&
(inet_pton(AF_INET, address_str, &address) == 1)) {
found = 1;
af = AF_INET;
}
if (found) {
if (!(nodename_r = slurm_conf_get_nodename(nodename)) ||
!(hostname = slurm_conf_get_hostname(nodename_r))) {
xfree(nodename_r);
xfree(hostname);
found = 0;
}
}
}
xfree(nodename);
safe_write(fd, &found, sizeof(int));
if (!found)
return SLURM_SUCCESS;
len = strlen(hostname);
safe_write(fd, &len, sizeof(int));
safe_write(fd, hostname, len);
len = 1;
safe_write(fd, &len, sizeof(int));
len = strlen(nodename_r);
safe_write(fd, &len, sizeof(int));
safe_write(fd, nodename_r, len);
safe_write(fd, &af, sizeof(int));
if (af == AF_INET6) {
len = 16;
safe_write(fd, &len, sizeof(int));
safe_write(fd, &address, len);
} else if (af == AF_INET) {
len = 4;
safe_write(fd, &len, sizeof(int));
safe_write(fd, &address, len);
} else {
error("Not supported address type: %u", af);
goto rwfail;
}
xfree(hostname);
xfree(nodename_r);
debug2("Leaving %s", __func__);
return SLURM_SUCCESS;
rwfail:
xfree(hostname);
xfree(nodename_r);
return SLURM_ERROR;
}
static int
_handle_daemon_pid(int fd, stepd_step_rec_t *step)
{
safe_write(fd, &step->jmgr_pid, sizeof(pid_t));
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int
_handle_suspend(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc = SLURM_SUCCESS;
int errnum = 0;
char *tmp;
static uint32_t suspend_grace_time = NO_VAL;
debug("%s for %ps uid:%u", __func__, &step->step_id, uid);
if (!_slurm_authorized_user(uid)) {
debug("job step suspend request from uid %u for %ps",
uid, &step->step_id);
rc = -1;
errnum = EPERM;
goto done;
}
if ((errnum = _wait_for_job_running(step)) != SLURM_SUCCESS) {
rc = -1;
goto done;
}
acct_gather_suspend_poll();
/*
* Signal the container
*/
slurm_mutex_lock(&suspend_mutex);
if (suspended) {
rc = -1;
errnum = ESLURMD_STEP_SUSPENDED;
slurm_mutex_unlock(&suspend_mutex);
goto done;
} else {
if (suspend_grace_time == NO_VAL) {
char *suspend_grace_str = "suspend_grace_time=";
/* Set default suspend_grace_time */
suspend_grace_time = 2;
/*
* Overwrite default suspend grace time if set in
* slurm_conf
*/
if ((tmp = xstrcasestr(slurm_conf.preempt_params,
suspend_grace_str))) {
if (parse_uint32((tmp +
strlen(suspend_grace_str)),
&suspend_grace_time)) {
error("Could not parse '%s' Using default instead.",
tmp);
}
}
}
		/* SIGTSTP is sent first to let MPI daemons stop their tasks,
		 * then we wait suspend_grace_time seconds (2 by default),
		 * then send SIGSTOP to the spawned process's container to
		 * stop everything else.
		 *
		 * In some cases, 1 second has proven insufficient. Longer
		 * delays may help ensure that all MPI tasks have been stopped
		 * (that depends upon the MPI implementation used), but will
		 * also permit longer time periods when more than one job can
		 * be running on each resource (not good). */
if (proctrack_g_signal(step->cont_id, SIGTSTP) < 0) {
verbose("Error suspending %ps (SIGTSTP): %m",
&step->step_id);
} else
sleep(suspend_grace_time);
if (proctrack_g_signal(step->cont_id, SIGSTOP) < 0) {
verbose("Error suspending %ps (SIGSTOP): %m",
&step->step_id);
} else {
verbose("Suspended %ps", &step->step_id);
}
suspended = true;
}
slurm_mutex_unlock(&suspend_mutex);
done:
/* Send the return code and errno */
safe_write(fd, &rc, sizeof(int));
safe_write(fd, &errnum, sizeof(int));
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int
_handle_resume(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc = SLURM_SUCCESS;
int errnum = 0;
debug("%s for %ps uid:%u", __func__, &step->step_id, uid);
if (!_slurm_authorized_user(uid)) {
debug("job step resume request from uid %u for %ps",
uid, &step->step_id);
rc = -1;
errnum = EPERM;
goto done;
}
if ((errnum = _wait_for_job_running(step)) != SLURM_SUCCESS) {
rc = -1;
goto done;
}
acct_gather_resume_poll();
/*
* Signal the container
*/
slurm_mutex_lock(&suspend_mutex);
if (!suspended) {
rc = -1;
errnum = ESLURMD_STEP_NOTSUSPENDED;
slurm_mutex_unlock(&suspend_mutex);
goto done;
} else {
if (proctrack_g_signal(step->cont_id, SIGCONT) < 0) {
verbose("Error resuming %ps: %m", &step->step_id);
} else {
verbose("Resumed %ps", &step->step_id);
}
suspended = false;
}
/*
* Reset CPU frequencies if changed
*/
if ((step->cpu_freq_min != NO_VAL) || (step->cpu_freq_max != NO_VAL) ||
(step->cpu_freq_gov != NO_VAL))
cpu_freq_set(step);
// TODO: Reset TRES frequencies?
slurm_mutex_unlock(&suspend_mutex);
done:
/* Send the return code and errno */
safe_write(fd, &rc, sizeof(int));
safe_write(fd, &errnum, sizeof(int));
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
static int
_handle_completion(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc = SLURM_SUCCESS;
int errnum = 0;
int first;
int last;
jobacctinfo_t *jobacct = NULL;
int step_rc;
char *buf = NULL;
int len;
buf_t *buffer = NULL;
bool lock_set = false, do_stepmgr = false;
uint32_t step_id;
debug("_handle_completion for %ps", &step->step_id);
debug3(" uid = %u", uid);
if (!_slurm_authorized_user(uid)) {
debug("step completion message from uid %u for %ps ",
uid, &step->step_id);
rc = -1;
errnum = EPERM;
/* Send the return code and errno */
safe_write(fd, &rc, sizeof(int));
safe_write(fd, &errnum, sizeof(int));
return SLURM_SUCCESS;
}
safe_read(fd, &first, sizeof(int));
safe_read(fd, &last, sizeof(int));
safe_read(fd, &step_rc, sizeof(int));
safe_read(fd, &step_id, sizeof(uint32_t));
safe_read(fd, &do_stepmgr, sizeof(bool));
	/*
	 * We must not use jobacctinfo getinfo over a pipe with slurmd
	 * here: slurmstepd makes heavy use of setinfo over a pipe with
	 * slurmd, and doing the reverse can deadlock, i.e.
	 * slurmd(lock-for-read,write) / slurmstepd(write,lock-for-read).
	 * Pack/unpack the data instead so that slurmd and slurmstepd
	 * stay independent.
	 */
safe_read(fd, &len, sizeof(int));
buf = xmalloc(len);
safe_read(fd, buf, len);
buffer = create_buf(buf, len);
buf = NULL; /* Moved to data portion of "buffer", freed with that */
if (jobacctinfo_unpack(&jobacct, SLURM_PROTOCOL_VERSION,
PROTOCOL_TYPE_SLURM, buffer, 1) != SLURM_SUCCESS)
goto rwfail;
FREE_NULL_BUFFER(buffer);
	if (do_stepmgr) {
		slurm_mutex_lock(&stepmgr_mutex);
		if (job_step_ptr) {
			int rem = 0;
			uint32_t max_rc;
			slurm_step_id_t temp_id = {
				.job_id = job_step_ptr->job_id,
				.step_het_comp = NO_VAL,
				.step_id = step_id
			};
			step_complete_msg_t req = {
				.range_first = first,
				.range_last = last,
				.step_id = temp_id,
				.step_rc = step_rc,
				.jobacct = jobacct
			};
			step_partial_comp(&req, uid, true, &rem, &max_rc);
			rc = SLURM_SUCCESS;
		} else {
			error("Asked to complete a stepmgr step but we don't have a job_step_ptr. This should never happen.");
			rc = SLURM_ERROR;
		}
		/*
		 * Unlock and free before replying so that a failed write
		 * cannot leak jobacct or leave stepmgr_mutex held via the
		 * rwfail path.
		 */
		slurm_mutex_unlock(&stepmgr_mutex);
		jobacctinfo_destroy(jobacct);
		if (rc == SLURM_SUCCESS) {
			safe_write(fd, &rc, sizeof(int));
			safe_write(fd, &errnum, sizeof(int));
		}
		return rc;
	}
/*
* Record the completed nodes
*/
slurm_mutex_lock(&step_complete.lock);
lock_set = true;
if (!step_complete.wait_children) {
rc = -1;
errnum = ETIMEDOUT; /* not used anyway */
goto timeout;
}
/*
* SlurmUser or root can craft a launch without a valid credential
* ("srun --no-alloc ...") and no tree information can be built
* without the hostlist from the credential.
*/
if (step_complete.bits && (step_complete.rank >= 0)) {
int32_t set_bits;
int32_t first_bit = first - (step_complete.rank + 1);
int32_t last_bit = last - (step_complete.rank + 1);
/* bit_set_count_range is [first, end) so +1 last_bit */
int32_t last_bit_range = last_bit + 1;
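		/*
		 * Worked example (values assumed for illustration): on
		 * tree rank 0, a child reporting first=1, last=3 maps to
		 * first_bit=0 and last_bit=2, i.e. bits 0-2 of
		 * step_complete.bits, which only tracks the nodes below
		 * this one in the fan-in tree.
		 */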
#if 0
char bits_string[128];
debug2("Setting range %d (bit %d) through %d(bit %d)",
first, first_bit,
last, last_bit);
bit_fmt(bits_string, sizeof(bits_string), step_complete.bits);
debug2(" before bits: %s", bits_string);
#endif
if (!(set_bits = bit_set_count_range(step_complete.bits,
first_bit,
last_bit_range))) {
bit_nset(step_complete.bits, first_bit, last_bit);
} else if (set_bits == (last_bit_range - first_bit)) {
debug("Step complete from %d to %d was already processed on rank %d. Probably a RPC was resent from a child.",
first, last, step_complete.rank);
goto timeout;
} else {
error("Step complete from %d to %d was half-way processed on rank %d. This should never happen.",
first, last, step_complete.rank);
goto timeout;
}
#if 0
bit_fmt(bits_string, sizeof(bits_string), step_complete.bits);
debug2(" after bits: %s", bits_string);
#endif
}
step_complete.step_rc = MAX(step_complete.step_rc, step_rc);
/************* acct stuff ********************/
jobacctinfo_aggregate(step_complete.jobacct, jobacct);
timeout:
jobacctinfo_destroy(jobacct);
/*********************************************/
/*
* Send the return code and errno, we do this within the locked
* region to ensure that the stepd doesn't exit before we can
* perform this send.
*/
safe_write(fd, &rc, sizeof(int));
safe_write(fd, &errnum, sizeof(int));
slurm_cond_signal(&step_complete.cond);
slurm_mutex_unlock(&step_complete.lock);
return SLURM_SUCCESS;
rwfail: if (lock_set) {
slurm_cond_signal(&step_complete.cond);
slurm_mutex_unlock(&step_complete.lock);
}
xfree(buf); /* In case of failure before moving to "buffer" */
FREE_NULL_BUFFER(buffer);
return SLURM_ERROR;
}
static int
_handle_stat_jobacct(int fd, stepd_step_rec_t *step, uid_t uid)
{
bool update_data = true;
jobacctinfo_t *jobacct = NULL;
jobacctinfo_t *temp_jobacct = NULL;
int num_tasks = 0;
uint64_t msg_timeout_us;
DEF_TIMERS;
START_TIMER;
debug("_handle_stat_jobacct for %ps", &step->step_id);
debug3(" uid = %u", uid);
if (uid != step->uid && !_slurm_authorized_user(uid)) {
debug("stat jobacct from uid %u for %ps owned by uid %u",
uid, &step->step_id, step->uid);
/* Send NULL */
jobacctinfo_setinfo(jobacct, JOBACCT_DATA_PIPE, &fd,
SLURM_PROTOCOL_VERSION);
return SLURM_ERROR;
}
jobacct = jobacctinfo_create(NULL);
debug3("num tasks = %d", step->node_tasks);
/*
* Extern step has pid = -1 so it would be skipped, deal with it
* differently
*/
if (step->step_id.step_id == SLURM_EXTERN_CONT) {
		/*
		 * We only have one task in the extern step on each node,
		 * even though many pids may have been adopted.
		 */
jobacct_gather_stat_all_task(jobacct);
jobacctinfo_aggregate(jobacct, step->jobacct);
num_tasks = 1;
} else {
for (int i = 0; i < step->node_tasks; i++) {
temp_jobacct =
jobacct_gather_stat_task(step->task[i]->pid,
update_data);
update_data = false;
if (temp_jobacct) {
jobacctinfo_aggregate(jobacct, temp_jobacct);
jobacctinfo_destroy(temp_jobacct);
num_tasks++;
}
}
}
jobacctinfo_setinfo(jobacct, JOBACCT_DATA_PIPE, &fd,
SLURM_PROTOCOL_VERSION);
safe_write(fd, &num_tasks, sizeof(int));
jobacctinfo_destroy(jobacct);
END_TIMER;
msg_timeout_us = ((uint64_t) slurm_conf.msg_timeout) * USEC_IN_SEC;
if (DELTA_TIMER > msg_timeout_us)
error("%s: Took %s, which is more than MessageTimeout (%us). The result won't be delivered",
__func__, TIME_STR, slurm_conf.msg_timeout);
else
debug("%s: Completed in %s", __func__, TIME_STR);
return SLURM_SUCCESS;
rwfail:
jobacctinfo_destroy(jobacct);
END_TIMER;
msg_timeout_us = ((uint64_t) slurm_conf.msg_timeout) * USEC_IN_SEC;
if (DELTA_TIMER > msg_timeout_us)
error("%s: Failed in %lus", __func__, DELTA_TIMER);
return SLURM_ERROR;
}
/* We don't check the uid in this function, anyone may list the task info. */
static int
_handle_task_info(int fd, stepd_step_rec_t *step)
{
stepd_step_task_info_t *task;
debug("_handle_task_info for %ps", &step->step_id);
safe_write(fd, &step->node_tasks, sizeof(uint32_t));
for (int i = 0; i < step->node_tasks; i++) {
task = step->task[i];
safe_write(fd, &task->id, sizeof(int));
safe_write(fd, &task->gtid, sizeof(uint32_t));
safe_write(fd, &task->pid, sizeof(pid_t));
safe_write(fd, &task->exited, sizeof(bool));
safe_write(fd, &task->estatus, sizeof(int));
}
return SLURM_SUCCESS;
rwfail:
return SLURM_ERROR;
}
/* We don't check the uid in this function, anyone may list the task info. */
static int
_handle_list_pids(int fd, stepd_step_rec_t *step)
{
pid_t *pids = NULL;
int npids = 0;
uint32_t pid;
debug("_handle_list_pids for %ps", &step->step_id);
proctrack_g_get_pids(step->cont_id, &pids, &npids);
safe_write(fd, &npids, sizeof(uint32_t));
for (int i = 0; i < npids; i++) {
pid = (uint32_t)pids[i];
safe_write(fd, &pid, sizeof(uint32_t));
}
if (npids > 0)
xfree(pids);
return SLURM_SUCCESS;
rwfail:
if (npids > 0)
xfree(pids);
return SLURM_ERROR;
}
static int
_handle_reconfig(int fd, stepd_step_rec_t *step, uid_t uid)
{
int rc = SLURM_SUCCESS;
int len;
buf_t *buffer = NULL;
int errnum = 0;
if (!_slurm_authorized_user(uid)) {
debug("job step reconfigure request from uid %u for %ps",
uid, &step->step_id);
rc = -1;
errnum = EPERM;
goto done;
}
/*
* Pull in any needed configuration changes.
* len = 0 indicates we're just going for a log rotate.
*/
safe_read(fd, &len, sizeof(int));
if (len) {
buffer = init_buf(len);
safe_read(fd, buffer->head, len);
unpack_stepd_reconf(buffer);
FREE_NULL_BUFFER(buffer);
}
	/*
	 * We just want to make sure the file handle is correct on a
	 * reconfigure, since the log file could have been rotated,
	 * leaving the current fd stale.
	 */
log_alter(conf->log_opts, SYSLOG_FACILITY_DAEMON, conf->logfile);
debug("_handle_reconfigure for %ps successful", &step->step_id);
done:
/* Send the return code and errno */
safe_write(fd, &rc, sizeof(int));
safe_write(fd, &errnum, sizeof(int));
return SLURM_SUCCESS;
rwfail:
FREE_NULL_BUFFER(buffer);
return SLURM_ERROR;
}
extern void wait_for_resumed(uint16_t msg_type)
{
for (int i = 0; ; i++) {
if (i)
sleep(1);
if (!suspended)
return;
if (i == 0) {
info("defer sending msg_type %u to suspended job",
msg_type);
}
}
}
extern void set_msg_node_id(stepd_step_rec_t *step)
{
char *ptr = getenvp(step->env, "SLURM_STEP_KILLED_MSG_NODE_ID");
if (ptr)
msg_target_node_id = atoi(ptr);
}
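/*
 * SLURM_STEP_KILLED_MSG_NODE_ID (read from the step's environment)
 * selects which node emits the "*** ... CANCELLED ..." messages in
 * _handle_signal_container(); msg_target_node_id defaults to node 0
 * when the variable is unset.
 */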
extern void join_extern_threads(void)
{
int thread_cnt;
slurm_mutex_lock(&extern_thread_lock);
slurm_cond_broadcast(&extern_thread_cond);
thread_cnt = extern_thread_cnt;
slurm_mutex_unlock(&extern_thread_lock);
for (int i = 0; i < thread_cnt; i++) {
debug2("Joining extern pid thread %d", i);
slurm_thread_join(extern_threads[i]);
}
slurm_mutex_lock(&extern_thread_lock);
xfree(extern_threads);
extern_thread_cnt = 0;
slurm_mutex_unlock(&extern_thread_lock);
debug2("Done joining extern pid threads");
}