blob: cdfe7e653a043dd64baf94ab0dbf9ebc418203aa [file] [log] [blame] [edit]
/*****************************************************************************\
* src/common/stepd_api.c - slurmstepd message API
* $Id$
*****************************************************************************
* Copyright (C) 2005-2007 The Regents of the University of California.
* Copyright (C) 2008-2010 Lawrence Livermore National Security.
* Portions Copyright (C) 2008 Vijay Ramasubramanian
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Christopher Morrone <morrone2@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.schedmd.com/slurmdocs/>.
* Please also read the included file: DISCLAIMER.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <dirent.h>
#include <inttypes.h>
#include <regex.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include "src/common/list.h"
#include "src/common/macros.h"
#include "src/common/pack.h"
#include "src/common/read_config.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_cred.h"
#include "src/common/slurm_jobacct_gather.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/stepd_api.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
/*
 * Report whether the calling process runs as root (uid 0) or as the
 * SlurmUser named in the slurm configuration (conf->slurm_user_id).
 */
static bool
_slurm_authorized_user()
{
	slurm_ctl_conf_t *conf = slurm_conf_lock();
	uid_t slurm_uid = (uid_t) conf->slurm_user_id;
	uid_t my_uid;

	slurm_conf_unlock();

	my_uid = getuid();
	if (my_uid == (uid_t) 0)
		return true;
	return (my_uid == slurm_uid);
}
/*
 * Called when connect() to a step socket fails with ECONNREFUSED, which
 * presumably means nothing is listening on that unix domain socket.
 *
 * The socket file is unlinked only when all of these hold:
 *   - this process runs as root or SlurmUser,
 *   - the file is owned by this process's uid,
 *   - its mtime is more than 300 seconds (five minutes) in the past.
 */
static void
_handle_stray_socket(const char *socket_name)
{
	struct stat sb;
	uid_t my_uid;

	/* Only root or SlurmUser may remove a stale socket. */
	if (!_slurm_authorized_user())
		return;

	if (stat(socket_name, &sb) == -1) {
		debug3("_handle_stray_socket: unable to stat %s: %m",
		       socket_name);
		return;
	}

	my_uid = getuid();
	if (sb.st_uid != my_uid) {
		debug3("_handle_stray_socket: socket %s is not owned by uid %d",
		       socket_name, (int)my_uid);
		return;
	}

	/* Leave sockets touched within the last five minutes alone. */
	if ((time(NULL) - sb.st_mtime) <= 300)
		return;

	if (unlink(socket_name) == 0) {
		debug("Cleaned up stray socket %s", socket_name);
	} else if (errno != ENOENT) {
		/* Someone else removing it first is not an error. */
		error("_handle_stray_socket: unable to clean up"
		      " stray socket %s: %m", socket_name);
	}
}
/*
 * Open a SOCK_STREAM unix domain socket and connect it to the step socket
 * "<directory>/<nodename>_<jobid>.<stepid>".
 *
 * Returns the connected descriptor, or -1 on failure. A path that does not
 * fit in sockaddr_un.sun_path is rejected with errno set to ENAMETOOLONG
 * (copying it would overflow the fixed-size array). When connect() fails
 * with ECONNREFUSED, _handle_stray_socket() may remove a stale socket file.
 */
static int
_step_connect(const char *directory, const char *nodename,
	      uint32_t jobid, uint32_t stepid)
{
	int fd;
	int len;
	size_t path_len;
	struct sockaddr_un addr;
	char *name = NULL;

	xstrfmtcat(name, "%s/%s_%u.%u", directory, nodename, jobid, stepid);
	path_len = strlen(name);
	if (path_len >= sizeof(addr.sun_path)) {
		error("_step_connect: socket path %s is too long", name);
		xfree(name);
		errno = ENAMETOOLONG;
		return -1;
	}

	if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
		debug("_step_connect: socket: %m");
		xfree(name);
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	/* Bounded above, so the copy (with its NUL) always fits. */
	memcpy(addr.sun_path, name, path_len + 1);
	len = (int) (path_len + 1 + sizeof(addr.sun_family));

	if (connect(fd, (struct sockaddr *) &addr, len) < 0) {
		if (errno == ECONNREFUSED)
			_handle_stray_socket(name);
		else
			debug("_step_connect: connect: %m");
		xfree(name);
		close(fd);
		return -1;
	}

	xfree(name);
	return fd;
}
/*
 * Best-effort lookup of this host's NodeName in slurm.conf.
 *
 * Tries, in order:
 *   1. the short hostname,
 *   2. slurm_conf_get_aliased_nodename(),
 *   3. "localhost".
 *
 * Returns NULL if gethostname_short() fails or nothing matches.
 */
static char *
_guess_nodename()
{
	char hostname[256];
	char *found;

	if (gethostname_short(hostname, sizeof(hostname)) != 0)
		return NULL;

	found = slurm_conf_get_nodename(hostname);
	if (!found)
		found = slurm_conf_get_aliased_nodename();
	if (!found)	/* no match, fall back to localhost */
		found = slurm_conf_get_nodename("localhost");

	return found;
}
/*
 * Connect to a slurmstepd process by way of its unix domain socket and
 * authenticate with a freshly created auth credential.
 *
 * Both "directory" and "nodename" may be NULL, in which case stepd_connect
 * will attempt to determine them on its own. If you are using multiple
 * slurmd on one node (unusual outside of development environments), you
 * will get one of the local NodeNames more-or-less at random.
 *
 * Returns a socket descriptor for the opened socket on success,
 * and -1 on error.
 */
int
stepd_connect(const char *directory, const char *nodename,
	      uint32_t jobid, uint32_t stepid)
{
	int req = REQUEST_CONNECT;
	int fd = -1;
	int rc;
	int auth_errno;
	void *auth_cred;
	Buf buffer;
	int len;
	/*
	 * Strings looked up on the caller's behalf. Presumably these are
	 * xmalloc'd by the slurm_conf helpers (TODO confirm); they are
	 * released on every exit path below.
	 */
	char *local_nodename = NULL;
	char *local_dir = NULL;

	if (nodename == NULL) {
		if (!(local_nodename = _guess_nodename()))
			return -1;
		nodename = local_nodename;
	}
	if (directory == NULL) {
		slurm_ctl_conf_t *cf;

		cf = slurm_conf_lock();
		local_dir = slurm_conf_expand_slurmd_path(
			cf->slurmd_spooldir, nodename);
		slurm_conf_unlock();
		directory = local_dir;
	}

	buffer = init_buf(0);

	/* Create an auth credential */
	auth_cred = g_slurm_auth_create(NULL, 2, NULL);
	if (auth_cred == NULL) {
		error("Creating authentication credential: %s",
		      g_slurm_auth_errstr(g_slurm_auth_errno(NULL)));
		slurm_seterrno(SLURM_PROTOCOL_AUTHENTICATION_ERROR);
		goto fail1;
	}

	/*
	 * Pack the auth credential. Capture its error code before destroying
	 * it: the credential must not be touched after g_slurm_auth_destroy().
	 */
	rc = g_slurm_auth_pack(auth_cred, buffer);
	auth_errno = rc ? g_slurm_auth_errno(auth_cred) : 0;
	(void) g_slurm_auth_destroy(auth_cred);
	if (rc) {
		error("Packing authentication credential: %s",
		      g_slurm_auth_errstr(auth_errno));
		slurm_seterrno(SLURM_PROTOCOL_AUTHENTICATION_ERROR);
		goto fail1;
	}

	/* Connect to the step */
	fd = _step_connect(directory, nodename, jobid, stepid);
	if (fd == -1)
		goto fail1;

	/* Send the request type, then the length-prefixed credential. */
	safe_write(fd, &req, sizeof(int));
	len = size_buf(buffer);
	safe_write(fd, &len, sizeof(int));
	safe_write(fd, get_buf_data(buffer), len);

	safe_read(fd, &rc, sizeof(int));
	if (rc < 0) {
		error("slurmstepd refused authentication: %m");
		slurm_seterrno(SLURM_PROTOCOL_AUTHENTICATION_ERROR);
		goto rwfail;
	}

	free_buf(buffer);
	xfree(local_dir);
	xfree(local_nodename);
	return fd;

rwfail:
	close(fd);
fail1:
	free_buf(buffer);
	xfree(local_dir);
	xfree(local_nodename);
	return -1;
}
/*
 * Ask the slurmstepd on "fd" for the job step's current state.
 *
 * The result starts out as SLURMSTEPD_NOT_RUNNING. That initial value is
 * what comes back if the request cannot be written.
 */
slurmstepd_state_t
stepd_state(int fd)
{
	slurmstepd_state_t state = SLURMSTEPD_NOT_RUNNING;
	int request = REQUEST_STATE;

	safe_write(fd, &request, sizeof(int));
	safe_read(fd, &state, sizeof(slurmstepd_state_t));

rwfail:
	return state;
}
/*
 * Retrieve a slurmstepd_info_t structure for a job step.
 *
 * After uid, jobid and stepid the reply carries a uint16_t. If it is at
 * least SLURM_2_2_PROTOCOL_VERSION, it is a protocol version followed by
 * nodeid, job_mem_limit and step_mem_limit.
 *
 * Otherwise the value is treated as the first half of an older stepd's
 * 32-bit nodeid. In that case only job_mem_limit follows, and it is also
 * used as step_mem_limit.
 *
 * Returns an xmalloc'd structure the caller must xfree, or NULL on error.
 */
slurmstepd_info_t *
stepd_get_info(int fd)
{
	int req = REQUEST_INFO;
	slurmstepd_info_t *step_info;
	uint16_t protocol_version;

	step_info = xmalloc(sizeof(slurmstepd_info_t));
	safe_write(fd, &req, sizeof(int));
	safe_read(fd, &step_info->uid, sizeof(uid_t));
	safe_read(fd, &step_info->jobid, sizeof(uint32_t));
	safe_read(fd, &step_info->stepid, sizeof(uint32_t));
	safe_read(fd, &protocol_version, sizeof(uint16_t));
	if (protocol_version >= SLURM_2_2_PROTOCOL_VERSION) {
		safe_read(fd, &step_info->nodeid, sizeof(uint32_t));
		safe_read(fd, &step_info->job_mem_limit, sizeof(uint32_t));
		safe_read(fd, &step_info->step_mem_limit, sizeof(uint32_t));
	} else {
		uint16_t halves[2];
		uint32_t nodeid;

		/*
		 * The uint32_t nodeid was written in host memory order and
		 * has been read back as two uint16_t pieces. Copying the
		 * pieces back in arrival order restores it on any host.
		 * "(first << 16) | second" is only right on big-endian hosts.
		 */
		halves[0] = protocol_version;
		safe_read(fd, &halves[1], sizeof(uint16_t));
		memcpy(&nodeid, halves, sizeof(nodeid));
		step_info->nodeid = nodeid;
		safe_read(fd, &step_info->job_mem_limit, sizeof(uint32_t));
		step_info->step_mem_limit = step_info->job_mem_limit;
		verbose("Old version slurmstepd for step %u.%u",
			step_info->jobid, step_info->stepid);
	}
	return step_info;

rwfail:
	xfree(step_info);
	return NULL;
}
/*
 * Ask the slurmstepd to deliver "signal" to the job step's process group.
 *
 * Returns the stepd's reply code, or -1 if the exchange fails.
 */
int
stepd_signal(int fd, int signal)
{
	int request = REQUEST_SIGNAL_PROCESS_GROUP;
	int reply;

	safe_write(fd, &request, sizeof(int));
	safe_write(fd, &signal, sizeof(int));

	safe_read(fd, &reply, sizeof(int));	/* stepd return code */
	return reply;

rwfail:
	return -1;
}
/*
 * Send a job notification message to a batch job's slurmstepd.
 *
 * Wire format: request id, int length, then the message bytes (including
 * the terminating NUL). A NULL message is sent as length 0 with no bytes.
 *
 * Returns the stepd's reply code, or -1 if the exchange fails.
 */
int
stepd_notify_job(int fd, char *message)
{
	int request = REQUEST_JOB_NOTIFY;
	int msg_len = 0;
	int reply;

	if (message != NULL)
		msg_len = strlen(message) + 1;

	safe_write(fd, &request, sizeof(int));
	safe_write(fd, &msg_len, sizeof(int));
	if (msg_len)
		safe_write(fd, message, msg_len);

	safe_read(fd, &reply, sizeof(int));
	return reply;

rwfail:
	return -1;
}
/*
 * Ask the slurmstepd to checkpoint all tasks of its job step.
 *
 * Sends the timestamp, then "image_dir" as a length-prefixed string
 * (length includes the NUL). A NULL "image_dir" is sent as length 0.
 *
 * Returns the stepd's reply code, or -1 if the exchange fails.
 */
int
stepd_checkpoint(int fd, time_t timestamp, char *image_dir)
{
	int request = REQUEST_CHECKPOINT_TASKS;
	int dir_len = 0;
	int reply;

	if (image_dir != NULL)
		dir_len = strlen(image_dir) + 1;

	safe_write(fd, &request, sizeof(int));
	safe_write(fd, &timestamp, sizeof(time_t));
	safe_write(fd, &dir_len, sizeof(int));
	if (dir_len)
		safe_write(fd, image_dir, dir_len);

	safe_read(fd, &reply, sizeof(int));
	return reply;

rwfail:
	return -1;
}
/*
 * Ask the slurmstepd to deliver "signal" to the single task whose local
 * task id is "ltaskid".
 *
 * Returns the stepd's reply code, or -1 if the exchange fails.
 */
int
stepd_signal_task_local(int fd, int signal, int ltaskid)
{
	int request = REQUEST_SIGNAL_TASK_LOCAL;
	int reply;

	safe_write(fd, &request, sizeof(int));
	safe_write(fd, &signal, sizeof(int));
	safe_write(fd, &ltaskid, sizeof(int));

	safe_read(fd, &reply, sizeof(int));	/* stepd return code */
	return reply;

rwfail:
	return -1;
}
/*
 * Ask the slurmstepd to deliver "signal" to its proctrack container.
 *
 * The stepd answers with its return code followed by its errno. The errno
 * is copied into the caller's errno.
 *
 * Returns the stepd's return code, or -1 if the exchange fails.
 */
int
stepd_signal_container(int fd, int signal)
{
	int request = REQUEST_SIGNAL_CONTAINER;
	int remote_errno = 0;
	int reply;

	safe_write(fd, &request, sizeof(int));
	safe_write(fd, &signal, sizeof(int));

	safe_read(fd, &reply, sizeof(int));
	safe_read(fd, &remote_errno, sizeof(int));
	errno = remote_errno;
	return reply;

rwfail:
	return -1;
}
/*
 * Attach a client to a running job step.
 *
 * Sends the client's I/O address, its response address and the
 * SLURM_IO_KEY_SIZE-byte credential signature, then reads the stepd's
 * return code.
 *
 * On SLURM_SUCCESS this fills in resp->ntasks, resp->local_pids,
 * resp->gtids and resp->executable_names, each xmalloc'd here.
 *
 * Returns the stepd's return code, or SLURM_ERROR if the exchange fails or
 * the stepd announces a negative executable-name length. After SLURM_ERROR
 * some resp arrays may already be allocated. Presumably the caller
 * releases resp with its usual free routine (TODO confirm).
 */
int
stepd_attach(int fd, slurm_addr_t *ioaddr, slurm_addr_t *respaddr,
	     void *job_cred_sig, reattach_tasks_response_msg_t *resp)
{
	int req = REQUEST_ATTACH;
	int rc = SLURM_SUCCESS;
	uint32_t ntasks, i;
	int len;

	safe_write(fd, &req, sizeof(int));
	safe_write(fd, ioaddr, sizeof(slurm_addr_t));
	safe_write(fd, respaddr, sizeof(slurm_addr_t));
	safe_write(fd, job_cred_sig, SLURM_IO_KEY_SIZE);

	/* Receive the return code */
	safe_read(fd, &rc, sizeof(int));
	if (rc != SLURM_SUCCESS)
		return rc;

	/* Receive response info */
	safe_read(fd, &ntasks, sizeof(uint32_t));
	resp->ntasks = ntasks;
	len = ntasks * sizeof(uint32_t);
	resp->local_pids = xmalloc(len);
	safe_read(fd, resp->local_pids, len);
	resp->gtids = xmalloc(len);
	safe_read(fd, resp->gtids, len);

	resp->executable_names = xmalloc(sizeof(char *) * ntasks);
	for (i = 0; i < ntasks; i++) {
		safe_read(fd, &len, sizeof(int));
		/* len comes off the socket; never pass a negative size on */
		if (len < 0)
			goto rwfail;
		resp->executable_names[i] = xmalloc(len);
		safe_read(fd, resp->executable_names[i], len);
	}
	return rc;

rwfail:
	return SLURM_ERROR;
}
/*
 * List destructor for the step_loc_t entries built by stepd_available().
 * Releases both strings and then the structure itself. xfree() is also
 * called on possibly-NULL pointers elsewhere in this file (stepd_list_pids),
 * so no explicit NULL checks are needed.
 */
static void
_free_step_loc_t(step_loc_t *loc)
{
	xfree(loc->directory);
	xfree(loc->nodename);
	xfree(loc);
}
/*
 * Compile into "re" an extended regex matching socket file names of the
 * form "<nodename>_<jobid>.<stepid>". Group 1 captures jobid and group 2
 * captures stepid; each must be at least one digit.
 *
 * POSIX ERE metacharacters in "nodename" are backslash-escaped so they
 * match literally. For example, a '.' in a host name must not match any
 * character.
 *
 * Returns 0 on success. Returns -1 if regcomp() fails; "re" must then not
 * be passed to regexec() or regfree().
 */
static int
_sockname_regex_init(regex_t *re, const char *nodename)
{
	char *pattern = NULL;
	const char *p;
	int rc;

	xstrcat(pattern, "^");
	if (nodename) {
		for (p = nodename; *p; p++) {
			/* ERE specials outside a bracket expression */
			if (strchr(".[\\()*+?{|^$", *p))
				xstrcat(pattern, "\\");
			xstrfmtcat(pattern, "%c", *p);
		}
	}
	xstrcat(pattern, "_([[:digit:]]+)\\.([[:digit:]]+)$");

	rc = regcomp(re, pattern, REG_EXTENDED);
	xfree(pattern);	/* no longer needed, success or failure */
	if (rc != 0) {
		error("sockname regex compilation failed");
		return -1;
	}
	return 0;
}
/*
 * Match "filename" against a regex built by _sockname_regex_init(). On a
 * match, store capture group 1 in *jobid and group 2 in *stepid.
 *
 * Each group is all digits and is followed by '.' or the end of the string,
 * so strtoul() can parse it in place without copying. This avoids the
 * unchecked strndup() allocations the old version needed.
 *
 * Returns 0 on a match. Returns -1 if there is no match or regexec()
 * reports an error; *jobid and *stepid are then left untouched.
 */
static int
_sockname_regex(regex_t *re, const char *filename,
		uint32_t *jobid, uint32_t *stepid)
{
	regmatch_t pmatch[3];

	memset(pmatch, 0, sizeof(pmatch));
	/* Any non-zero result (REG_NOMATCH or an error) means no usable match. */
	if (regexec(re, filename, 3, pmatch, 0) != 0)
		return -1;
	if (pmatch[1].rm_so < 0 || pmatch[2].rm_so < 0)
		return -1;

	*jobid = (uint32_t) strtoul(filename + pmatch[1].rm_so, NULL, 10);
	*stepid = (uint32_t) strtoul(filename + pmatch[2].rm_so, NULL, 10);
	return 0;
}
/*
 * Scan for available running slurm step daemons by checking "directory"
 * for unix domain sockets named "<nodename>_<jobid>.<stepid>".
 *
 * Both "directory" and "nodename" may be NULL, in which case
 * stepd_available will attempt to determine them on its own. If you are
 * using multiple slurmd on one node (unusual outside of development
 * environments), you will get one of the local NodeNames more-or-less at
 * random.
 *
 * Returns a List of pointers to step_loc_t structures. Each entry holds its
 * own xstrdup'd copies of directory and nodename. The List is empty if the
 * directory cannot be scanned. NULL is returned when no nodename is given
 * and none can be guessed.
 */
List
stepd_available(const char *directory, const char *nodename)
{
	List l;
	DIR *dp;
	struct dirent *ent;
	regex_t re;
	struct stat stat_buf;
	/* Strings looked up here (not by the caller) are released below. */
	char *local_nodename = NULL;
	char *local_dir = NULL;

	if (nodename == NULL) {
		if (!(local_nodename = _guess_nodename()))
			return NULL;
		nodename = local_nodename;
	}
	if (directory == NULL) {
		slurm_ctl_conf_t *cf;

		cf = slurm_conf_lock();
		local_dir = slurm_conf_expand_slurmd_path(
			cf->slurmd_spooldir, nodename);
		slurm_conf_unlock();
		directory = local_dir;
	}

	l = list_create((ListDelF) _free_step_loc_t);
	/* On failure "re" was never compiled, so it must not be regfree()d. */
	if (_sockname_regex_init(&re, nodename) == -1)
		goto done;

	/*
	 * Make sure that "directory" exists and is a directory.
	 */
	if (stat(directory, &stat_buf) < 0) {
		error("Domain socket directory %s: %m", directory);
		goto free_re;
	} else if (!S_ISDIR(stat_buf.st_mode)) {
		error("%s is not a directory", directory);
		goto free_re;
	}

	if ((dp = opendir(directory)) == NULL) {
		error("Unable to open directory: %m");
		goto free_re;
	}

	while ((ent = readdir(dp)) != NULL) {
		step_loc_t *loc;
		uint32_t jobid, stepid;

		if (_sockname_regex(&re, ent->d_name, &jobid, &stepid) == 0) {
			debug4("found jobid = %u, stepid = %u", jobid, stepid);
			loc = xmalloc(sizeof(step_loc_t));
			loc->directory = xstrdup(directory);
			loc->nodename = xstrdup(nodename);
			loc->jobid = jobid;
			loc->stepid = stepid;
			list_append(l, (void *)loc);
		}
	}
	closedir(dp);

free_re:
	regfree(&re);
done:
	xfree(local_dir);
	xfree(local_nodename);
	return l;
}
/*
 * Kill stray job steps on this node and remove their socket files.
 *
 * For every socket file in "directory" named
 * "<nodename>_<jobid>.<stepid>": try to connect to that slurmstepd and
 * send SIGKILL to its proctrack container, then unlink the file.
 *
 * Returns SLURM_ERROR if the socket-name regex cannot be compiled or any
 * matching socket could not be unlinked, and SLURM_SUCCESS otherwise. A
 * missing or unreadable directory is only logged and still returns
 * SLURM_SUCCESS.
 */
int
stepd_cleanup_sockets(const char *directory, const char *nodename)
{
	DIR *dp;
	struct dirent *ent;
	regex_t re;
	struct stat stat_buf;
	int rc = SLURM_SUCCESS;

	/* Without a compiled regex there is nothing safe to match against. */
	if (_sockname_regex_init(&re, nodename) == -1)
		return SLURM_ERROR;

	/*
	 * Make sure that "directory" exists and is a directory.
	 */
	if (stat(directory, &stat_buf) < 0) {
		error("Domain socket directory %s: %m", directory);
		goto done;
	} else if (!S_ISDIR(stat_buf.st_mode)) {
		error("%s is not a directory", directory);
		goto done;
	}

	if ((dp = opendir(directory)) == NULL) {
		error("Unable to open directory: %m");
		goto done;
	}

	while ((ent = readdir(dp)) != NULL) {
		uint32_t jobid, stepid;
		char *path = NULL;
		int fd;

		if (_sockname_regex(&re, ent->d_name, &jobid, &stepid) != 0)
			continue;

		xstrfmtcat(path, "%s/%s", directory, ent->d_name);
		verbose("Cleaning up stray job step %u.%u", jobid, stepid);

		/* signal the slurmstepd to terminate its step */
		fd = stepd_connect(directory, nodename, jobid, stepid);
		if (fd == -1) {
			debug("Unable to connect to socket %s", path);
		} else {
			stepd_signal_container(fd, SIGKILL);
			close(fd);
		}

		/* make sure that the socket has been removed */
		if (unlink(path) == -1 && errno != ENOENT) {
			error("Unable to clean up stray socket %s: %m", path);
			rc = SLURM_ERROR;
		}
		xfree(path);
	}
	closedir(dp);

done:
	regfree(&re);
	return rc;
}
/*
 * Ask the slurmstepd whether process "pid" belongs to its proctrack
 * container.
 *
 * Returns the stepd's boolean answer, or false if the exchange fails.
 */
bool
stepd_pid_in_container(int fd, pid_t pid)
{
	int request = REQUEST_PID_IN_CONTAINER;
	bool found;

	safe_write(fd, &request, sizeof(int));
	safe_write(fd, &pid, sizeof(pid_t));

	safe_read(fd, &found, sizeof(bool));	/* stepd's answer */
	debug("Leaving stepd_pid_in_container");
	return found;

rwfail:
	return false;
}
/*
 * Ask the slurmstepd for its own process ID.
 *
 * Returns that pid, or (pid_t)-1 if the exchange fails.
 */
pid_t
stepd_daemon_pid(int fd)
{
	int request = REQUEST_DAEMON_PID;
	pid_t daemon_pid;

	safe_write(fd, &request, sizeof(int));
	safe_read(fd, &daemon_pid, sizeof(pid_t));
	return daemon_pid;

rwfail:
	return (pid_t)-1;
}
/*
 * First half of a suspend request: send REQUEST_STEP_SUSPEND without
 * waiting for the reply. _step_suspend_read() collects the reply.
 *
 * Returns 0 if the request was written, -1 otherwise.
 */
int
_step_suspend_write(int fd)
{
	int request = REQUEST_STEP_SUSPEND;

	safe_write(fd, &request, sizeof(int));
	return 0;

rwfail:
	return -1;
}
/*
 * Second half of a suspend request: read the stepd's return code and then
 * its errno, and copy the latter into the caller's errno.
 *
 * Returns the stepd's return code, or -1 if either read fails.
 */
int
_step_suspend_read(int fd)
{
	int remote_errno = 0;
	int reply;

	safe_read(fd, &reply, sizeof(int));
	safe_read(fd, &remote_errno, sizeof(int));
	errno = remote_errno;
	return reply;

rwfail:
	return -1;
}
/*
 * Suspend the job steps whose slurmstepd connections are fd[0..size-1].
 * Only root or SlurmUser is authorized to use this call.
 *
 * Since this activity includes a 'sleep 1' in the slurmstepd, every request
 * is written first and the replies are collected afterwards. This lets the
 * steps suspend in parallel.
 *
 * A descriptor whose request cannot be written is closed and replaced by
 * -1 in fd[]. errno is set from the stepd replies; see
 * _step_suspend_read().
 *
 * Returns 0 if every step suspended, -1 otherwise.
 */
int
stepd_suspend(int *fd, int size, uint32_t jobid)
{
	int i;
	int rc = 0;

	/* Phase 1: send every suspend request without waiting. */
	for (i = 0; i < size; i++) {
		debug2("Suspending job %u cached step count %d", jobid, i);
		if (_step_suspend_write(fd[i]) < 0) {
			debug(" suspend send failed: job %u (%d): %m",
			      jobid, i);
			close(fd[i]);
			fd[i] = -1;
			rc = -1;
		}
	}

	/* Phase 2: collect replies for the requests that were sent. */
	for (i = 0; i < size; i++) {
		if (fd[i] == -1)
			continue;
		if (_step_suspend_read(fd[i]) < 0) {
			debug(" suspend failed: job %u (%d): %m", jobid, i);
			rc = -1;
		}
	}
	return rc;
}
/*
 * Resume a job step previously suspended with stepd_suspend(). Only root
 * or SlurmUser is authorized to use this call.
 *
 * The stepd answers with its return code followed by its errno. The errno
 * is copied into the caller's errno.
 *
 * Returns the stepd's return code, or -1 if the exchange fails.
 */
int
stepd_resume(int fd)
{
	int request = REQUEST_STEP_RESUME;
	int remote_errno = 0;
	int reply;

	safe_write(fd, &request, sizeof(int));

	safe_read(fd, &reply, sizeof(int));
	safe_read(fd, &remote_errno, sizeof(int));
	errno = remote_errno;
	return reply;

rwfail:
	return -1;
}
/*
 * Ask the slurmstepd to reconfigure itself, primarily so it can refresh
 * its log file pointer.
 *
 * The stepd answers with its return code followed by its errno. The errno
 * is copied into the caller's errno.
 *
 * Returns the stepd's return code, or -1 if the exchange fails.
 */
int
stepd_reconfig(int fd)
{
	int request = REQUEST_STEP_RECONFIGURE;
	int remote_errno = 0;
	int reply;

	safe_write(fd, &request, sizeof(int));

	safe_read(fd, &reply, sizeof(int));
	safe_read(fd, &remote_errno, sizeof(int));
	errno = remote_errno;
	return reply;

rwfail:
	return -1;
}
/*
 * Ask the slurmstepd to terminate its job step.
 *
 * The stepd answers with its return code followed by its errno. The errno
 * is copied into the caller's errno.
 *
 * Returns the stepd's return code, or -1 if the exchange fails.
 */
int
stepd_terminate(int fd)
{
	int request = REQUEST_STEP_TERMINATE;
	int remote_errno = 0;
	int reply;

	safe_write(fd, &request, sizeof(int));

	safe_read(fd, &reply, sizeof(int));
	safe_read(fd, &remote_errno, sizeof(int));
	errno = remote_errno;
	return reply;

rwfail:
	return -1;
}
/*
 * Report completion of the task range sent->range_first..range_last,
 * including sent->step_rc and the accounting data in sent->jobacct, to the
 * slurmstepd on "fd".
 *
 * With SLURM_PROTOCOL_VERSION <= SLURM_2_4_PROTOCOL_VERSION, the accounting
 * data goes over the pipe via jobacct_gather_g_setinfo(). Otherwise it is
 * packed into a buffer and sent length-prefixed (REQUEST_STEP_COMPLETION_V2).
 *
 * The stepd answers with its return code followed by its errno. The errno
 * is copied into the caller's errno.
 *
 * Returns the stepd's return code, or -1 if the exchange fails.
 */
int
stepd_completion(int fd, step_complete_msg_t *sent)
{
#if (SLURM_PROTOCOL_VERSION <= SLURM_2_4_PROTOCOL_VERSION)
/* FIXME: Remove this code plus the read code from src/slurmd/slurmstepd/req.c
 *        in SLURM version 2.5 */
	int req = REQUEST_STEP_COMPLETION;
	int rc;
	int errnum = 0;

	debug("Entering stepd_completion, range_first = %d, range_last = %d",
	      sent->range_first, sent->range_last);
	safe_write(fd, &req, sizeof(int));
	safe_write(fd, &sent->range_first, sizeof(int));
	safe_write(fd, &sent->range_last, sizeof(int));
	safe_write(fd, &sent->step_rc, sizeof(int));
	jobacct_gather_g_setinfo(sent->jobacct, JOBACCT_DATA_PIPE, &fd);

	/* Receive the return code and errno */
	safe_read(fd, &rc, sizeof(int));
	safe_read(fd, &errnum, sizeof(int));
	errno = errnum;
	return rc;

rwfail:
	return -1;
#else
	int req = REQUEST_STEP_COMPLETION_V2;
	int rc;
	int errnum = 0;
	Buf buffer;
	int len = 0;
	int version = SLURM_PROTOCOL_VERSION;

	buffer = init_buf(0);

	debug("Entering stepd_completion, range_first = %d, range_last = %d",
	      sent->range_first, sent->range_last);
	safe_write(fd, &req, sizeof(int));
	safe_write(fd, &version, sizeof(int));
	safe_write(fd, &sent->range_first, sizeof(int));
	safe_write(fd, &sent->range_last, sizeof(int));
	safe_write(fd, &sent->step_rc, sizeof(int));
	/*
	 * We must not use setinfo over a pipe with slurmstepd here.
	 * slurmd makes heavy use of getinfo over a pipe with slurmstepd,
	 * and doing the reverse can deadlock:
	 *   slurmd(lockforread,write) / slurmstepd(write,lockforread)
	 * Pack/unpack instead, so slurmd and slurmstepd stay independent.
	 */
	jobacct_gather_g_pack(sent->jobacct, SLURM_PROTOCOL_VERSION, buffer);
	len = get_buf_offset(buffer);
	safe_write(fd, &len, sizeof(int));
	safe_write(fd, get_buf_data(buffer), len);
	free_buf(buffer);
	buffer = NULL;	/* already released; rwfail must not free it again */

	/* Receive the return code and errno */
	safe_read(fd, &rc, sizeof(int));
	safe_read(fd, &errnum, sizeof(int));
	errno = errnum;
	return rc;

rwfail:
	/* A failed write leaves the packed buffer unreleased. */
	if (buffer)
		free_buf(buffer);
	return -1;
#endif
}
/* Wait for a file descriptor to be readable (up to 300 seconds).
* Return 0 when readable or -1 on error */
static int _wait_fd_readable(int fd)
{
fd_set except_fds, read_fds;
struct timeval timeout;
int rc;
FD_ZERO(&except_fds);
FD_SET(fd, &except_fds);
FD_ZERO(&read_fds);
FD_SET(fd, &read_fds);
timeout.tv_sec = 300;
timeout.tv_usec = 0;
while (1) {
rc = select(fd+1, &read_fds, NULL, &except_fds, &timeout);
if (rc > 0) { /* activity on this fd */
if (FD_ISSET(fd, &read_fds))
return 0;
else /* Exception */
return -1;
} else if (rc == 0) {
error("Timeout waiting for slurmstepd");
return -1;
} else if (errno == EINTR) {
error("select(): %m");
return -1;
}
}
}
/*
 * Request accounting statistics for the job step sent->job_id.step_id from
 * its slurmstepd.
 *
 * On return from a completed exchange:
 *   - resp->jobacct holds the container created by jobacct_gather_g_create(),
 *     which the caller must free,
 *   - resp->num_tasks is set,
 *   - the result of jobacct_gather_g_getinfo() is returned.
 *
 * If the request cannot be sent, the reply never becomes readable, or the
 * task count cannot be read, resp->jobacct is destroyed and set to NULL.
 * In that case a non-success code is returned: the getinfo error if there
 * was one, otherwise SLURM_ERROR.
 */
int
stepd_stat_jobacct(int fd, job_step_id_msg_t *sent, job_step_stat_t *resp)
{
	int req = REQUEST_STEP_STAT;
	int rc = SLURM_SUCCESS;
	int tasks = 0;

	debug("Entering stepd_stat_jobacct for job %u.%u",
	      sent->job_id, sent->step_id);

	/* Create this first so the failure path always destroys a valid object. */
	resp->jobacct = jobacct_gather_g_create(NULL);
	safe_write(fd, &req, sizeof(int));

	/* Receive the jobacct struct and return */
	/* Do not attempt reading data until there is something to read.
	 * Avoid locking the jobacct_gather plugin early and creating
	 * possible deadlock. */
	if (_wait_fd_readable(fd))
		goto rwfail;
	rc = jobacct_gather_g_getinfo(resp->jobacct, JOBACCT_DATA_PIPE, &fd);

	safe_read(fd, &tasks, sizeof(int));
	resp->num_tasks = tasks;

	return rc;

rwfail:
	/* Never report success once jobacct has been torn down. */
	if (rc == SLURM_SUCCESS)
		rc = SLURM_ERROR;
	error("gathering job accounting: %d", rc);
	jobacct_gather_g_destroy(resp->jobacct);
	resp->jobacct = NULL;
	return rc;
}
/*
 * List the job step's tasks: local id, global id, pid, exited flag and exit
 * status.
 *
 * On SLURM_SUCCESS:
 *   - *task_info points to an xmalloc'd array of *task_info_count entries,
 *     which the caller must xfree,
 *   - both outputs are NULL/0 when the step reports no tasks.
 *
 * On SLURM_ERROR (the request or reply fails) any partial array is freed
 * and both outputs are NULL/0.
 */
int
stepd_task_info(int fd, slurmstepd_task_info_t **task_info,
		uint32_t *task_info_count)
{
	int req = REQUEST_STEP_TASK_INFO;
	slurmstepd_task_info_t *task = NULL;
	uint32_t ntasks;
	uint32_t i;

	safe_write(fd, &req, sizeof(int));
	safe_read(fd, &ntasks, sizeof(uint32_t));

	if (ntasks == 0) {
		*task_info_count = 0;
		*task_info = NULL;
		return SLURM_SUCCESS;
	}

	task = xmalloc(ntasks * sizeof(slurmstepd_task_info_t));
	for (i = 0; i < ntasks; i++) {
		safe_read(fd, &(task[i].id), sizeof(int));
		safe_read(fd, &(task[i].gtid), sizeof(uint32_t));
		safe_read(fd, &(task[i].pid), sizeof(pid_t));
		safe_read(fd, &(task[i].exited), sizeof(bool));
		safe_read(fd, &(task[i].estatus), sizeof(int));
	}

	*task_info_count = ntasks;
	*task_info = task;
	return SLURM_SUCCESS;

rwfail:
	xfree(task);	/* partial array is not handed to the caller */
	*task_info_count = 0;
	*task_info = NULL;
	return SLURM_ERROR;
}
/*
 * Fetch the IDs of all processes in the step's proctrack container.
 *
 * On SLURM_SUCCESS:
 *   - *pids_array is an xmalloc'd array of *pids_count entries, which the
 *     caller must xfree,
 *   - both outputs are NULL/0 when the container is empty.
 *
 * On SLURM_ERROR any partial array is freed and both outputs are NULL/0.
 */
int
stepd_list_pids(int fd, uint32_t **pids_array, uint32_t *pids_count)
{
	int request = REQUEST_STEP_LIST_PIDS;
	uint32_t *list = NULL;
	uint32_t count;
	uint32_t idx;

	safe_write(fd, &request, sizeof(int));

	/* count first, then one uint32_t per pid */
	safe_read(fd, &count, sizeof(uint32_t));
	list = xmalloc(count * sizeof(uint32_t));
	for (idx = 0; idx < count; idx++) {
		safe_read(fd, &list[idx], sizeof(uint32_t));
	}
	if (count == 0)
		xfree(list);

	*pids_count = count;
	*pids_array = list;
	return SLURM_SUCCESS;

rwfail:
	xfree(list);
	*pids_count = 0;
	*pids_array = NULL;
	return SLURM_ERROR;
}