| /*****************************************************************************\ |
| * src/slurmd/slurmstepd/req.c - slurmstepd domain socket request handling |
| * $Id$ |
| ***************************************************************************** |
| * Copyright (C) 2005 The Regents of the University of California. |
| * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). |
| * Written by Christopher Morrone <morrone2@llnl.gov> |
| * UCRL-CODE-226842. |
| * |
| * This file is part of SLURM, a resource management program. |
| * For details, see <http://www.llnl.gov/linux/slurm/>. |
| * |
| * SLURM is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with SLURM; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #if HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <sys/un.h> |
| #include <sys/stat.h> |
| #include <unistd.h> |
| #include <signal.h> |
| #include <time.h> |
| |
| #include "src/common/xstring.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/fd.h" |
| #include "src/common/eio.h" |
| #include "src/common/slurm_auth.h" |
| #include "src/common/slurm_jobacct.h" |
| #include "src/common/stepd_api.h" |
| |
| #include "src/slurmd/slurmd/slurmd.h" |
| #include "src/slurmd/common/proctrack.h" |
| #include "src/slurmd/slurmstepd/slurmstepd.h" |
| #include "src/slurmd/slurmstepd/slurmstepd_job.h" |
| #include "src/slurmd/slurmstepd/req.h" |
| #include "src/slurmd/slurmstepd/io.h" |
| #include "src/slurmd/slurmstepd/mgr.h" |
| #include "src/slurmd/slurmstepd/step_terminate_monitor.h" |
| |
| static void *_handle_accept(void *arg); |
| static int _handle_request(int fd, slurmd_job_t *job, uid_t uid, gid_t gid); |
| static int _handle_state(int fd, slurmd_job_t *job); |
| static int _handle_info(int fd, slurmd_job_t *job); |
| static int _handle_signal_process_group(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_signal_task_local(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_signal_container(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_attach(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_pid_in_container(int fd, slurmd_job_t *job); |
| static int _handle_daemon_pid(int fd, slurmd_job_t *job); |
| static int _handle_suspend(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_resume(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_terminate(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_completion(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_stat_jobacct(int fd, slurmd_job_t *job, uid_t uid); |
| static int _handle_task_info(int fd, slurmd_job_t *job); |
| static int _handle_list_pids(int fd, slurmd_job_t *job); |
| static bool _msg_socket_readable(eio_obj_t *obj); |
| static int _msg_socket_accept(eio_obj_t *obj, List objs); |
| |
| struct io_operations msg_socket_ops = { |
| readable: &_msg_socket_readable, |
| handle_read: &_msg_socket_accept |
| }; |
| |
| static char *socket_name; |
| static pthread_mutex_t suspend_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static bool suspended = false; |
| |
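| /* Argument handed to each per-connection _handle_accept() thread */ |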
| struct request_params { |
| int fd; |
| slurmd_job_t *job; |
| }; |
| |
| static pthread_mutex_t message_lock = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_cond_t message_cond = PTHREAD_COND_INITIALIZER; |
| static int message_connections; |
| |
| /* |
| * Returns true if "uid" is a "slurm authorized user" - i.e. the uid is |
| * root (0) or matches the configured slurm user id. |
| */ |
| static bool |
| _slurm_authorized_user(uid_t uid) |
| { |
| return ((uid == (uid_t) 0) || (uid == conf->slurm_user_id)); |
| } |
| |
| /* |
| * Create a named unix domain listening socket. |
| * (cf, Stevens APUE 1st ed., section 15.5.2) |
| */ |
| static int |
| _create_socket(const char *name) |
| { |
| int fd; |
| int len; |
| struct sockaddr_un addr; |
| |
| /* make sure the name fits in sun_path (strcpy below would overflow) */ |
| if (strlen(name) >= sizeof(addr.sun_path)) { |
| errno = ENAMETOOLONG; |
| return -1; |
| } |
| |
| /* create a unix domain stream socket */ |
| if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) |
| return -1; |
| fd_set_close_on_exec(fd); |
| |
| memset(&addr, 0, sizeof(addr)); |
| addr.sun_family = AF_UNIX; |
| strcpy(addr.sun_path, name); |
| len = strlen(addr.sun_path)+1 + sizeof(addr.sun_family); |
| |
| /* bind the name to the descriptor (close fd on error to avoid a leak) */ |
| if (bind(fd, (struct sockaddr *) &addr, len) < 0) { |
| int rc = errno; |
| (void) close(fd); |
| errno = rc; |
| return -2; |
| } |
| |
| if (listen(fd, 5) < 0) { |
| int rc = errno; |
| (void) close(fd); |
| errno = rc; |
| return -3; |
| } |
| |
| return fd; |
| } |
| |
| static int |
| _domain_socket_create(const char *dir, const char *nodename, |
| uint32_t jobid, uint32_t stepid) |
| { |
| int fd; |
| char *name = NULL; |
| struct stat stat_buf; |
| |
| /* |
| * Make sure that "dir" exists and is a directory. |
| */ |
| if (stat(dir, &stat_buf) < 0) { |
| error("Domain socket directory %s: %m", dir); |
| return -1; |
| } else if (!S_ISDIR(stat_buf.st_mode)) { |
| error("%s is not a directory", dir); |
| return -1; |
| } |
| |
| /* |
| * Now build the name of the socket and create the socket. |
| */ |
| xstrfmtcat(name, "%s/%s_%u.%u", dir, nodename, jobid, stepid); |
| |
| /* |
| * First check to see if the named socket already exists. |
| */ |
| if (stat(name, &stat_buf) == 0) { |
| error("Socket %s already exists", name); |
| xfree(name); |
| errno = ESLURMD_STEP_EXISTS; |
| return -1; |
| } |
| |
| fd = _create_socket(name); |
| if (fd < 0) |
| fatal("Could not create domain socket: %m"); |
| |
| chmod(name, 0777); |
| socket_name = name; |
| return fd; |
| } |
| |
| static void |
| _domain_socket_destroy(int fd) |
| { |
| if (close(fd) < 0) |
| error("Unable to close domain socket: %m"); |
| |
| if (unlink(socket_name) == -1) |
| error("Unable to unlink domain socket: %m"); |
| } |
| |
| |
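| /* |
| * Message thread entry point: run the eio mainloop on the step's |
| * message handle until the handle is shut down. |
| */ |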
| static void * |
| _msg_thr_internal(void *job_arg) |
| { |
| slurmd_job_t *job = (slurmd_job_t *) job_arg; |
| |
| debug("Message thread started pid = %lu", (unsigned long) getpid()); |
| eio_handle_mainloop(job->msg_handle); |
| debug("Message thread exited"); |
| |
| return NULL; |
| } |
| |
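| /* |
| * Create the step's named domain socket in the slurmd spool directory |
| * and start the message thread that services it. |
| */ |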
| int |
| msg_thr_create(slurmd_job_t *job) |
| { |
| int fd; |
| eio_obj_t *eio_obj; |
| pthread_attr_t attr; |
| int rc = SLURM_SUCCESS, retries = 0; |
| errno = 0; |
| fd = _domain_socket_create(conf->spooldir, conf->node_name, |
| job->jobid, job->stepid); |
| if (fd == -1) |
| return SLURM_ERROR; |
| |
| fd_set_nonblocking(fd); |
| |
| eio_obj = eio_obj_create(fd, &msg_socket_ops, (void *)job); |
| job->msg_handle = eio_handle_create(); |
| eio_new_initial_obj(job->msg_handle, eio_obj); |
| |
| slurm_attr_init(&attr); |
| |
| while (pthread_create(&job->msgid, &attr, |
| &_msg_thr_internal, (void *)job)) { |
| error("msg_thr_create: pthread_create error %m"); |
| if (++retries > MAX_RETRIES) { |
| error("msg_thr_create: Can't create pthread"); |
| rc = SLURM_ERROR; |
| break; |
| } |
| usleep(10); /* sleep briefly and retry */ |
| } |
| |
| slurm_attr_destroy(&attr); |
| |
| return rc; |
| } |
| |
| /* |
| * Bounded wait for the connection count to drop to zero. |
| * This gives connection threads a chance to complete any pending |
| * RPCs before the slurmstepd exits. |
| */ |
| static void _wait_for_connections(void) |
| { |
| struct timespec ts = {0, 0}; |
| int rc = 0; |
| |
| pthread_mutex_lock(&message_lock); |
| ts.tv_sec = time(NULL) + STEPD_MESSAGE_COMP_WAIT; |
| while (message_connections > 0 && rc == 0) |
| rc = pthread_cond_timedwait(&message_cond, &message_lock, &ts); |
| |
| pthread_mutex_unlock(&message_lock); |
| } |
| |
| static bool |
| _msg_socket_readable(eio_obj_t *obj) |
| { |
| debug3("Called _msg_socket_readable"); |
| if (obj->shutdown == true) { |
| if (obj->fd != -1) { |
| debug2(" false, shutdown"); |
| _domain_socket_destroy(obj->fd); |
| obj->fd = -1; |
| _wait_for_connections(); |
| } else { |
| debug2(" false"); |
| } |
| return false; |
| } |
| return true; |
| } |
| |
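| /* |
| * Accept a new connection on the message socket and spawn a detached |
| * thread to service its requests.  If thread creation repeatedly |
| * fails, the connection is handled inline. |
| */ |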
| static int |
| _msg_socket_accept(eio_obj_t *obj, List objs) |
| { |
| slurmd_job_t *job = (slurmd_job_t *)obj->arg; |
| int fd; |
| struct sockaddr_un addr; |
| socklen_t len = sizeof(addr); |
| struct request_params *param = NULL; |
| pthread_attr_t attr; |
| pthread_t id; |
| int retries = 0; |
| |
| debug3("Called _msg_socket_accept"); |
| |
| while ((fd = accept(obj->fd, (struct sockaddr *)&addr, |
| &len)) < 0) { |
| if (errno == EINTR) |
| continue; |
| if (errno == EAGAIN |
| || errno == ECONNABORTED |
| || errno == EWOULDBLOCK) { |
| return SLURM_SUCCESS; |
| } |
| error("Error on msg accept socket: %m"); |
| obj->shutdown = true; |
| return SLURM_SUCCESS; |
| } |
| |
| pthread_mutex_lock(&message_lock); |
| message_connections++; |
| pthread_mutex_unlock(&message_lock); |
| |
| fd_set_close_on_exec(fd); |
| fd_set_blocking(fd); |
| |
| slurm_attr_init(&attr); |
| if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) { |
| error("Unable to set detachstate on attr: %m"); |
| slurm_attr_destroy(&attr); |
| close(fd); |
| return SLURM_ERROR; |
| } |
| |
| param = xmalloc(sizeof(struct request_params)); |
| param->fd = fd; |
| param->job = job; |
| while (pthread_create(&id, &attr, &_handle_accept, (void *)param)) { |
| error("stepd_api message engine pthread_create: %m"); |
| if (++retries > MAX_RETRIES) { |
| error("pthread_create failed; running " |
| "_handle_accept inline, stepd will be " |
| "unresponsive until the request completes"); |
| _handle_accept((void *)param); |
| info("stepd should now be responsive again"); |
| break; |
| } |
| usleep(10); /* sleep briefly and retry */ |
| } |
| |
| slurm_attr_destroy(&attr); |
| param = NULL; |
| |
| debug3("Leaving _msg_socket_accept"); |
| return SLURM_SUCCESS; |
| } |
| |
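| /* |
| * Per-connection service thread.  The client must first send |
| * REQUEST_CONNECT followed by a length-prefixed auth credential; once |
| * the credential verifies, requests are handled in a loop until the |
| * client closes the connection or an error occurs. |
| */ |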
| static void * |
| _handle_accept(void *arg) |
| { |
| /*struct request_params *param = (struct request_params *)arg;*/ |
| int fd = ((struct request_params *)arg)->fd; |
| slurmd_job_t *job = ((struct request_params *)arg)->job; |
| int req; |
| int len; |
| Buf buffer; |
| void *auth_cred; |
| int rc; |
| uid_t uid; |
| gid_t gid; |
| |
| debug3("Entering _handle_accept (new thread)"); |
| xfree(arg); |
| |
| safe_read(fd, &req, sizeof(int)); |
| if (req != REQUEST_CONNECT) { |
| error("First message must be REQUEST_CONNECT"); |
| goto fail; |
| } |
| |
| safe_read(fd, &len, sizeof(int)); |
| buffer = init_buf(len); |
| safe_read(fd, get_buf_data(buffer), len); |
| |
| /* Unpack and verify the auth credential */ |
| auth_cred = g_slurm_auth_unpack(buffer); |
| if (auth_cred == NULL) { |
| error("Unpacking authentication credential: %s", |
| g_slurm_auth_errstr(g_slurm_auth_errno(NULL))); |
| free_buf(buffer); |
| goto fail; |
| } |
| rc = g_slurm_auth_verify(auth_cred, NULL, 2); |
| if (rc != SLURM_SUCCESS) { |
| error("Verifying authentication credential: %s", |
| g_slurm_auth_errstr(g_slurm_auth_errno(auth_cred))); |
| (void) g_slurm_auth_destroy(auth_cred); |
| free_buf(buffer); |
| goto fail; |
| } |
| |
| /* Get the uid & gid from the credential, then destroy it. */ |
| uid = g_slurm_auth_get_uid(auth_cred); |
| gid = g_slurm_auth_get_gid(auth_cred); |
| debug3(" Identity: uid=%d, gid=%d", uid, gid); |
| g_slurm_auth_destroy(auth_cred); |
| free_buf(buffer); |
| |
| rc = SLURM_SUCCESS; |
| safe_write(fd, &rc, sizeof(int)); |
| |
| while (1) { |
| rc = _handle_request(fd, job, uid, gid); |
| if (rc != SLURM_SUCCESS) |
| break; |
| } |
| |
| if (close(fd) == -1) |
| error("Closing accepted fd: %m"); |
| |
| pthread_mutex_lock(&message_lock); |
| message_connections--; |
| pthread_cond_signal(&message_cond); |
| pthread_mutex_unlock(&message_lock); |
| |
| debug3("Leaving _handle_accept"); |
| return NULL; |
| |
| fail: |
| rc = SLURM_FAILURE; |
| safe_write(fd, &rc, sizeof(int)); |
| rwfail: |
| if (close(fd) == -1) |
| error("Closing accepted fd after error: %m"); |
| debug("Leaving _handle_accept on an error"); |
| return NULL; |
| } |
| |
| |
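| /* |
| * Read a single request code from the connection and dispatch it to |
| * the matching handler.  A zero-byte read (EOF) ends the request loop. |
| */ |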
| static int |
| _handle_request(int fd, slurmd_job_t *job, uid_t uid, gid_t gid) |
| { |
| int rc = 0; |
| int req; |
| |
| debug3("Entering _handle_request"); |
| if ((rc = read(fd, &req, sizeof(int))) != sizeof(int)) { |
| if (rc == 0) { /* EOF, normal */ |
| return -1; |
| } else { |
| debug3("Leaving _handle_request on read error"); |
| return SLURM_FAILURE; |
| } |
| } |
| debug3("Got request"); |
| rc = SLURM_SUCCESS; |
| switch (req) { |
| case REQUEST_SIGNAL_PROCESS_GROUP: |
| debug("Handling REQUEST_SIGNAL_PROCESS_GROUP"); |
| rc = _handle_signal_process_group(fd, job, uid); |
| break; |
| case REQUEST_SIGNAL_TASK_LOCAL: |
| debug("Handling REQUEST_SIGNAL_TASK_LOCAL"); |
| rc = _handle_signal_task_local(fd, job, uid); |
| break; |
| case REQUEST_SIGNAL_TASK_GLOBAL: |
| debug("Handling REQUEST_SIGNAL_TASK_GLOBAL (not implemented)"); |
| break; |
| case REQUEST_SIGNAL_CONTAINER: |
| debug("Handling REQUEST_SIGNAL_CONTAINER"); |
| rc = _handle_signal_container(fd, job, uid); |
| break; |
| case REQUEST_STATE: |
| debug("Handling REQUEST_STATE"); |
| rc = _handle_state(fd, job); |
| break; |
| case REQUEST_INFO: |
| debug("Handling REQUEST_INFO"); |
| rc = _handle_info(fd, job); |
| break; |
| case REQUEST_ATTACH: |
| debug("Handling REQUEST_ATTACH"); |
| rc = _handle_attach(fd, job, uid); |
| break; |
| case REQUEST_PID_IN_CONTAINER: |
| debug("Handling REQUEST_PID_IN_CONTAINER"); |
| rc = _handle_pid_in_container(fd, job); |
| break; |
| case REQUEST_DAEMON_PID: |
| debug("Handling REQUEST_DAEMON_PID"); |
| rc = _handle_daemon_pid(fd, job); |
| break; |
| case REQUEST_STEP_SUSPEND: |
| debug("Handling REQUEST_STEP_SUSPEND"); |
| rc = _handle_suspend(fd, job, uid); |
| break; |
| case REQUEST_STEP_RESUME: |
| debug("Handling REQUEST_STEP_RESUME"); |
| rc = _handle_resume(fd, job, uid); |
| break; |
| case REQUEST_STEP_TERMINATE: |
| debug("Handling REQUEST_STEP_TERMINATE"); |
| rc = _handle_terminate(fd, job, uid); |
| break; |
| case REQUEST_STEP_COMPLETION: |
| debug("Handling REQUEST_STEP_COMPLETION"); |
| rc = _handle_completion(fd, job, uid); |
| break; |
| case MESSAGE_STAT_JOBACCT: |
| debug("Handling MESSAGE_STAT_JOBACCT"); |
| rc = _handle_stat_jobacct(fd, job, uid); |
| break; |
| case REQUEST_STEP_TASK_INFO: |
| debug("Handling REQUEST_STEP_TASK_INFO"); |
| rc = _handle_task_info(fd, job); |
| break; |
| case REQUEST_STEP_LIST_PIDS: |
| debug("Handling REQUEST_STEP_LIST_PIDS"); |
| rc = _handle_list_pids(fd, job); |
| break; |
| default: |
| error("Unrecognized request: %d", req); |
| rc = SLURM_FAILURE; |
| break; |
| } |
| |
| debug3("Leaving _handle_request: %s", |
| rc ? "SLURM_FAILURE" : "SLURM_SUCCESS"); |
| return rc; |
| } |
| |
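| /* Send the step's current slurmstepd_state_t back to the client. */ |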
| static int |
| _handle_state(int fd, slurmd_job_t *job) |
| { |
| safe_write(fd, &job->state, sizeof(slurmstepd_state_t)); |
| |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
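| /* Send the step's uid, jobid, stepid and nodeid back to the client. */ |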
| static int |
| _handle_info(int fd, slurmd_job_t *job) |
| { |
| safe_write(fd, &job->uid, sizeof(uid_t)); |
| safe_write(fd, &job->jobid, sizeof(uint32_t)); |
| safe_write(fd, &job->stepid, sizeof(uint32_t)); |
| safe_write(fd, &job->nodeid, sizeof(uint32_t)); |
| |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
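| /* |
| * Deliver a signal to the step's process group.  Only the job owner or |
| * a slurm authorized user may signal the step. |
| */ |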
| static int |
| _handle_signal_process_group(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| int signal; |
| |
| debug3("_handle_signal_process_group for job %u.%u", |
| job->jobid, job->stepid); |
| |
| safe_read(fd, &signal, sizeof(int)); |
| |
| debug3(" uid = %d", uid); |
| if (uid != job->uid && !_slurm_authorized_user(uid)) { |
| debug("kill req from uid %ld for job %u.%u owned by uid %ld", |
| (long)uid, job->jobid, job->stepid, (long)job->uid); |
| rc = EPERM; |
| goto done; |
| } |
| |
| /* |
| * Sanity checks |
| */ |
| if (job->pgid <= (pid_t)1) { |
| debug ("step %u.%u invalid [jmgr_pid:%d pgid:%u]", |
| job->jobid, job->stepid, job->jmgr_pid, job->pgid); |
| rc = ESLURMD_JOB_NOTRUNNING; |
| goto done; |
| } |
| |
| /* |
| * Signal the process group |
| */ |
| pthread_mutex_lock(&suspend_mutex); |
| if (suspended) { |
| rc = ESLURMD_STEP_SUSPENDED; |
| pthread_mutex_unlock(&suspend_mutex); |
| goto done; |
| } |
| |
| if (killpg(job->pgid, signal) == -1) { |
| rc = -1; |
| verbose("Error sending signal %d to %u.%u, pgid %d: %m", |
| signal, job->jobid, job->stepid, job->pgid); |
| } else { |
| verbose("Sent signal %d to %u.%u, pgid %d", |
| signal, job->jobid, job->stepid, job->pgid); |
| } |
| pthread_mutex_unlock(&suspend_mutex); |
| |
| done: |
| /* Send the return code */ |
| safe_write(fd, &rc, sizeof(int)); |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
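| /* |
| * Deliver a signal to a single task, identified by its local task id |
| * on this node. |
| */ |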
| static int |
| _handle_signal_task_local(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| int signal; |
| int ltaskid; /* local task index */ |
| |
| debug("_handle_signal_task_local for job %u.%u", |
| job->jobid, job->stepid); |
| |
| safe_read(fd, &signal, sizeof(int)); |
| safe_read(fd, &ltaskid, sizeof(int)); |
| |
| debug3(" uid = %d", uid); |
| if (uid != job->uid && !_slurm_authorized_user(uid)) { |
| debug("kill req from uid %ld for job %u.%u owned by uid %ld", |
| (long)uid, job->jobid, job->stepid, (long)job->uid); |
| rc = EPERM; |
| goto done; |
| } |
| |
| /* |
| * Sanity checks |
| */ |
| if (ltaskid < 0 || ltaskid >= job->ntasks) { |
| debug("step %u.%u invalid local task id %d", |
| job->jobid, job->stepid, ltaskid); |
| rc = SLURM_ERROR; |
| goto done; |
| } |
| if (!job->task |
| || !job->task[ltaskid]) { |
| debug("step %u.%u no task info for task id %d", |
| job->jobid, job->stepid, ltaskid); |
| rc = SLURM_ERROR; |
| goto done; |
| } |
| if (job->task[ltaskid]->pid <= 1) { |
| debug("step %u.%u invalid pid %d for task %d", |
| job->jobid, job->stepid, |
| job->task[ltaskid]->pid, ltaskid); |
| rc = SLURM_ERROR; |
| goto done; |
| } |
| |
| /* |
| * Signal the task |
| */ |
| pthread_mutex_lock(&suspend_mutex); |
| if (suspended) { |
| rc = ESLURMD_STEP_SUSPENDED; |
| pthread_mutex_unlock(&suspend_mutex); |
| goto done; |
| } |
| |
| if (kill(job->task[ltaskid]->pid, signal) == -1) { |
| rc = -1; |
| verbose("Error sending signal %d to %u.%u, pid %d: %m", |
| signal, job->jobid, job->stepid, |
| job->task[ltaskid]->pid); |
| } else { |
| verbose("Sent signal %d to %u.%u, pid %d", |
| signal, job->jobid, job->stepid, |
| job->task[ltaskid]->pid); |
| } |
| pthread_mutex_unlock(&suspend_mutex); |
| |
| done: |
| /* Send the return code */ |
| safe_write(fd, &rc, sizeof(int)); |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
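| /* |
| * Deliver a signal to every process in the step's job container.  On |
| * node zero, SIGXCPU/SIGTERM/SIGKILL also log a cancellation message |
| * for the job or step (SIGXCPU is logged but not delivered). |
| */ |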
| static int |
| _handle_signal_container(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| int errnum = 0; |
| int sig; |
| static int msg_sent = 0; |
| |
| debug("_handle_signal_container for job %u.%u", |
| job->jobid, job->stepid); |
| |
| safe_read(fd, &sig, sizeof(int)); |
| |
| debug3(" uid = %d", uid); |
| if (uid != job->uid && !_slurm_authorized_user(uid)) { |
| debug("kill container req from uid %ld for job %u.%u " |
| "owned by uid %ld", |
| (long)uid, job->jobid, job->stepid, (long)job->uid); |
| rc = -1; |
| errnum = EPERM; |
| goto done; |
| } |
| |
| /* |
| * Sanity checks |
| */ |
| if (job->cont_id == 0) { |
| debug ("step %u.%u invalid container [cont_id:%u]", |
| job->jobid, job->stepid, job->cont_id); |
| rc = -1; |
| errnum = ESLURMD_JOB_NOTRUNNING; |
| goto done; |
| } |
| |
| if ((job->nodeid == 0) && (msg_sent == 0) && |
| (job->state < SLURMSTEPD_STEP_ENDING)) { |
| char *entity; |
| if (job->stepid == SLURM_BATCH_SCRIPT) |
| entity = "JOB"; |
| else |
| entity = "STEP"; |
| /* Not really errors, |
| * but we want messages displayed by default */ |
| if (sig == SIGXCPU) { |
| error("*** %s CANCELLED DUE TO TIME LIMIT ***", entity); |
| msg_sent = 1; /* we just want to log the event */ |
| goto done; /* don't actually send the signal */ |
| } else if ((sig == SIGTERM) || (sig == SIGKILL)) { |
| error("*** %s CANCELLED ***", entity); |
| msg_sent = 1; |
| } |
| } |
| |
| /* |
| * Signal the container |
| */ |
| pthread_mutex_lock(&suspend_mutex); |
| if (suspended) { |
| rc = -1; |
| errnum = ESLURMD_STEP_SUSPENDED; |
| pthread_mutex_unlock(&suspend_mutex); |
| goto done; |
| } |
| |
| if (slurm_container_signal(job->cont_id, sig) < 0) { |
| rc = -1; |
| errnum = errno; |
| verbose("Error sending signal %d to %u.%u: %m", |
| sig, job->jobid, job->stepid); |
| } else { |
| verbose("Sent signal %d to %u.%u", |
| sig, job->jobid, job->stepid); |
| } |
| pthread_mutex_unlock(&suspend_mutex); |
| |
| done: |
| /* Send the return code and errnum */ |
| safe_write(fd, &rc, sizeof(int)); |
| safe_write(fd, &errnum, sizeof(int)); |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
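| /* |
| * Terminate the step: start the terminate monitor, then SIGKILL the |
| * step's job container (even if the step is currently suspended). |
| */ |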
| static int |
| _handle_terminate(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| int errnum = 0; |
| |
| debug("_handle_terminate for job %u.%u", |
| job->jobid, job->stepid); |
| step_terminate_monitor_start(job->jobid, job->stepid); |
| |
| debug3(" uid = %d", uid); |
| if (uid != job->uid && !_slurm_authorized_user(uid)) { |
| debug("terminate req from uid %ld for job %u.%u " |
| "owned by uid %ld", |
| (long)uid, job->jobid, job->stepid, (long)job->uid); |
| rc = -1; |
| errnum = EPERM; |
| goto done; |
| } |
| |
| /* |
| * Sanity checks |
| */ |
| if (job->cont_id == 0) { |
| debug ("step %u.%u invalid container [cont_id:%u]", |
| job->jobid, job->stepid, job->cont_id); |
| rc = -1; |
| errnum = ESLURMD_JOB_NOTRUNNING; |
| goto done; |
| } |
| |
| /* |
| * Signal the container with SIGKILL |
| */ |
| pthread_mutex_lock(&suspend_mutex); |
| if (suspended) { |
| debug("Terminating suspended job step %u.%u", |
| job->jobid, job->stepid); |
| } |
| |
| if (slurm_container_signal(job->cont_id, SIGKILL) < 0) { |
| rc = -1; |
| errnum = errno; |
| verbose("Error sending SIGKILL signal to %u.%u: %m", |
| job->jobid, job->stepid); |
| } else { |
| verbose("Sent SIGKILL signal to %u.%u", |
| job->jobid, job->stepid); |
| } |
| pthread_mutex_unlock(&suspend_mutex); |
| |
| done: |
| /* Send the return code and errnum */ |
| safe_write(fd, &rc, sizeof(int)); |
| safe_write(fd, &errnum, sizeof(int)); |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
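| /* |
| * Attach a new srun to the running step's I/O.  Only a slurm |
| * authorized user (i.e. the slurmd) may make this request; on success |
| * the task pids, global task ids and argv[0] of each task are returned. |
| */ |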
| static int |
| _handle_attach(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| srun_info_t *srun; |
| int rc = SLURM_SUCCESS; |
| |
| debug("_handle_attach for job %u.%u", job->jobid, job->stepid); |
| |
| srun = xmalloc(sizeof(srun_info_t)); |
| srun->key = (srun_key_t *)xmalloc(SLURM_IO_KEY_SIZE); |
| |
| debug("sizeof(srun_info_t) = %d, sizeof(slurm_addr) = %d", |
| sizeof(srun_info_t), sizeof(slurm_addr)); |
| safe_read(fd, &srun->ioaddr, sizeof(slurm_addr)); |
| safe_read(fd, &srun->resp_addr, sizeof(slurm_addr)); |
| safe_read(fd, srun->key, SLURM_IO_KEY_SIZE); |
| |
| /* |
| * Check if jobstep is actually running. |
| */ |
| if (job->state != SLURMSTEPD_STEP_RUNNING) { |
| rc = ESLURMD_JOB_NOTRUNNING; |
| xfree(srun->key); |
| xfree(srun); |
| goto done; |
| } |
| |
| /* |
| * At the moment, it only makes sense for the slurmd to make this |
| * call, so only _slurm_authorized_user is allowed. |
| */ |
| if (!_slurm_authorized_user(uid)) { |
| error("uid %ld attempt to attach to job %u.%u owned by %ld", |
| (long) uid, job->jobid, job->stepid, (long)job->uid); |
| rc = EPERM; |
| xfree(srun->key); |
| xfree(srun); |
| goto done; |
| } |
| |
| list_prepend(job->sruns, (void *) srun); |
| rc = io_client_connect(srun, job); |
| debug(" back from io_client_connect, rc = %d", rc); |
| done: |
| /* Send the return code */ |
| safe_write(fd, &rc, sizeof(int)); |
| |
| debug(" in _handle_attach rc = %d", rc); |
| if (rc == SLURM_SUCCESS) { |
| /* Send response info */ |
| uint32_t *pids, *gtids; |
| int len, i; |
| |
| debug(" in _handle_attach sending response info"); |
| len = job->ntasks * sizeof(uint32_t); |
| pids = xmalloc(len); |
| gtids = xmalloc(len); |
| |
| if (job->task != NULL) { |
| for (i = 0; i < job->ntasks; i++) { |
| if (job->task[i] == NULL) |
| continue; |
| pids[i] = (uint32_t)job->task[i]->pid; |
| gtids[i] = job->task[i]->gtid; |
| } |
| } |
| |
| safe_write(fd, &job->ntasks, sizeof(uint32_t)); |
| safe_write(fd, pids, len); |
| safe_write(fd, gtids, len); |
| xfree(pids); |
| xfree(gtids); |
| |
| for (i = 0; i < job->ntasks; i++) { |
| len = strlen(job->task[i]->argv[0]) + 1; |
| safe_write(fd, &len, sizeof(int)); |
| safe_write(fd, job->task[i]->argv[0], len); |
| } |
| } |
| |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
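| /* Report whether the given pid belongs to the step's job container. */ |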
| static int |
| _handle_pid_in_container(int fd, slurmd_job_t *job) |
| { |
| bool rc = false; |
| pid_t pid; |
| |
| debug("_handle_pid_in_container for job %u.%u", |
| job->jobid, job->stepid); |
| |
| safe_read(fd, &pid, sizeof(pid_t)); |
| |
| rc = slurm_container_has_pid(job->cont_id, pid); |
| |
| /* Send the return code */ |
| safe_write(fd, &rc, sizeof(bool)); |
| |
| debug("Leaving _handle_pid_in_container"); |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
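| /* Send the job manager (slurmstepd) pid back to the client. */ |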
| static int |
| _handle_daemon_pid(int fd, slurmd_job_t *job) |
| { |
| safe_write(fd, &job->jmgr_pid, sizeof(pid_t)); |
| |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
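| /* |
| * Suspend the step: SIGTSTP is sent first so MPI daemons can stop |
| * their tasks, then SIGSTOP stops everything else in the container. |
| */ |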
| static int |
| _handle_suspend(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| int errnum = 0; |
| |
| debug("_handle_suspend for job %u.%u", |
| job->jobid, job->stepid); |
| |
| debug3(" uid = %d", uid); |
| if (!_slurm_authorized_user(uid)) { |
| debug("job step suspend request from uid %ld for job %u.%u ", |
| (long)uid, job->jobid, job->stepid); |
| rc = -1; |
| errnum = EPERM; |
| goto done; |
| } |
| |
| if (job->cont_id == 0) { |
| debug ("step %u.%u invalid container [cont_id:%u]", |
| job->jobid, job->stepid, job->cont_id); |
| rc = -1; |
| errnum = ESLURMD_JOB_NOTRUNNING; |
| goto done; |
| } |
| |
| jobacct_g_suspend_poll(); |
| |
| /* |
| * Signal the container |
| */ |
| pthread_mutex_lock(&suspend_mutex); |
| if (suspended) { |
| rc = -1; |
| errnum = ESLURMD_STEP_SUSPENDED; |
| pthread_mutex_unlock(&suspend_mutex); |
| goto done; |
| } else { |
| /* SIGTSTP is sent first to let MPI daemons stop their |
| * tasks, then we send SIGSTOP to stop everything else */ |
| if (slurm_container_signal(job->cont_id, SIGTSTP) < 0) { |
| verbose("Error suspending %u.%u (SIGTSTP): %m", |
| job->jobid, job->stepid); |
| } else |
| sleep(1); |
| |
| if (slurm_container_signal(job->cont_id, SIGSTOP) < 0) { |
| verbose("Error suspending %u.%u (SIGSTOP): %m", |
| job->jobid, job->stepid); |
| } else { |
| verbose("Suspended %u.%u", job->jobid, job->stepid); |
| } |
| suspended = true; |
| } |
| pthread_mutex_unlock(&suspend_mutex); |
| |
| done: |
| /* Send the return code and errno */ |
| safe_write(fd, &rc, sizeof(int)); |
| safe_write(fd, &errnum, sizeof(int)); |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
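| /* Resume a suspended step by sending SIGCONT to its container. */ |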
| static int |
| _handle_resume(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| int errnum = 0; |
| |
| debug("_handle_resume for job %u.%u", |
| job->jobid, job->stepid); |
| |
| debug3(" uid = %d", uid); |
| if (!_slurm_authorized_user(uid)) { |
| debug("job step resume request from uid %ld for job %u.%u ", |
| (long)uid, job->jobid, job->stepid); |
| rc = -1; |
| errnum = EPERM; |
| goto done; |
| } |
| |
| if (job->cont_id == 0) { |
| debug ("step %u.%u invalid container [cont_id:%u]", |
| job->jobid, job->stepid, job->cont_id); |
| rc = -1; |
| errnum = ESLURMD_JOB_NOTRUNNING; |
| goto done; |
| } |
| |
| jobacct_g_resume_poll(); |
| /* |
| * Signal the container |
| */ |
| pthread_mutex_lock(&suspend_mutex); |
| if (!suspended) { |
| rc = -1; |
| errnum = ESLURMD_STEP_NOTSUSPENDED; |
| pthread_mutex_unlock(&suspend_mutex); |
| goto done; |
| } else { |
| if (slurm_container_signal(job->cont_id, SIGCONT) < 0) { |
| verbose("Error resuming %u.%u: %m", |
| job->jobid, job->stepid); |
| } else { |
| verbose("Resumed %u.%u", job->jobid, job->stepid); |
| } |
| suspended = false; |
| } |
| pthread_mutex_unlock(&suspend_mutex); |
| |
| done: |
| /* Send the return code and errno */ |
| safe_write(fd, &rc, sizeof(int)); |
| safe_write(fd, &errnum, sizeof(int)); |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
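| /* |
| * Record a child node's step completion: mark its node range complete, |
| * fold its return code and accounting data into this node's aggregate, |
| * and wake any thread waiting on step_complete.cond. |
| */ |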
| static int |
| _handle_completion(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| int errnum = 0; |
| int first; |
| int last; |
| jobacctinfo_t *jobacct = NULL; |
| int step_rc; |
| /* char bits_string[128]; */ |
| |
| debug("_handle_completion for job %u.%u", |
| job->jobid, job->stepid); |
| |
| debug3(" uid = %d", uid); |
| if (!_slurm_authorized_user(uid)) { |
| debug("step completion message from uid %ld for job %u.%u ", |
| (long)uid, job->jobid, job->stepid); |
| rc = -1; |
| errnum = EPERM; |
| /* Send the return code and errno */ |
| safe_write(fd, &rc, sizeof(int)); |
| safe_write(fd, &errnum, sizeof(int)); |
| return SLURM_SUCCESS; |
| } |
| |
| safe_read(fd, &first, sizeof(int)); |
| safe_read(fd, &last, sizeof(int)); |
| safe_read(fd, &step_rc, sizeof(int)); |
| jobacct = jobacct_g_alloc(NULL); |
| jobacct_g_getinfo(jobacct, JOBACCT_DATA_PIPE, &fd); |
| |
| /* |
| * Record the completed nodes |
| */ |
| pthread_mutex_lock(&step_complete.lock); |
| if (! step_complete.wait_children) { |
| rc = -1; |
| errnum = ETIMEDOUT; /* not used anyway */ |
| goto timeout; |
| } |
| |
| /* debug2("Setting range %d(bit %d) through %d(bit %d)", */ |
| /* first, first-(step_complete.rank+1), */ |
| /* last, last-(step_complete.rank+1)); */ |
| /* bit_fmt(bits_string, 128, step_complete.bits); */ |
| /* debug2(" before bits: %s", bits_string); */ |
| bit_nset(step_complete.bits, |
| first - (step_complete.rank+1), |
| last - (step_complete.rank+1)); |
| /* bit_fmt(bits_string, 128, step_complete.bits); */ |
| /* debug2(" after bits: %s", bits_string); */ |
| step_complete.step_rc = MAX(step_complete.step_rc, step_rc); |
| |
| /************* acct stuff ********************/ |
| jobacct_g_aggregate(step_complete.jobacct, jobacct); |
| timeout: |
| jobacct_g_free(jobacct); |
| /*********************************************/ |
| |
| /* Send the return code and errno, we do this within the locked |
| * region to ensure that the stepd doesn't exit before we can |
| * perform this send. */ |
| safe_write(fd, &rc, sizeof(int)); |
| safe_write(fd, &errnum, sizeof(int)); |
| pthread_cond_signal(&step_complete.cond); |
| pthread_mutex_unlock(&step_complete.lock); |
| |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
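| /* |
| * Poll current accounting data for every task in the step and send the |
| * aggregate, followed by the number of tasks polled, to the client. |
| */ |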
| static int |
| _handle_stat_jobacct(int fd, slurmd_job_t *job, uid_t uid) |
| { |
| jobacctinfo_t *jobacct = NULL; |
| jobacctinfo_t *temp_jobacct = NULL; |
| int i = 0; |
| int num_tasks = 0; |
| debug("_handle_stat_jobacct for job %u.%u", |
| job->jobid, job->stepid); |
| |
| debug3(" uid = %d", uid); |
| if (uid != job->uid && !_slurm_authorized_user(uid)) { |
| debug("stat jobacct from uid %ld for job %u.%u " |
| "owned by uid %ld", |
| (long)uid, job->jobid, job->stepid, (long)job->uid); |
| /* Send NULL */ |
| jobacct_g_setinfo(jobacct, JOBACCT_DATA_PIPE, &fd); |
| return SLURM_ERROR; |
| } |
| |
| jobacct = jobacct_g_alloc(NULL); |
| debug3("num tasks = %d", job->ntasks); |
| |
| for (i = 0; i < job->ntasks; i++) { |
| temp_jobacct = jobacct_g_stat_task(job->task[i]->pid); |
| if (temp_jobacct) { |
| jobacct_g_aggregate(jobacct, temp_jobacct); |
| jobacct_g_free(temp_jobacct); |
| num_tasks++; |
| } |
| } |
| jobacct_g_setinfo(jobacct, JOBACCT_DATA_PIPE, &fd); |
| safe_write(fd, &num_tasks, sizeof(int)); |
| jobacct_g_free(jobacct); |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_ERROR; |
| } |
| |
| /* We don't check the uid in this function, anyone may list the task info. */ |
| static int |
| _handle_task_info(int fd, slurmd_job_t *job) |
| { |
| int i; |
| slurmd_task_info_t *task; |
| |
| debug("_handle_task_info for job %u.%u", job->jobid, job->stepid); |
| |
| safe_write(fd, &job->ntasks, sizeof(uint32_t)); |
| for (i = 0; i < job->ntasks; i++) { |
| task = job->task[i]; |
| safe_write(fd, &task->id, sizeof(int)); |
| safe_write(fd, &task->gtid, sizeof(uint32_t)); |
| safe_write(fd, &task->pid, sizeof(pid_t)); |
| safe_write(fd, &task->exited, sizeof(bool)); |
| safe_write(fd, &task->estatus, sizeof(int)); |
| } |
| |
| return SLURM_SUCCESS; |
| rwfail: |
| return SLURM_FAILURE; |
| } |
| |
| /* We don't check the uid in this function, anyone may list the task info. */ |
| static int |
| _handle_list_pids(int fd, slurmd_job_t *job) |
| { |
| int i; |
| pid_t *pids = NULL; |
| int npids = 0; |
| |
| debug("_handle_list_pids for job %u.%u", job->jobid, job->stepid); |
| slurm_container_get_pids(job->cont_id, &pids, &npids); |
| safe_write(fd, &npids, sizeof(int)); |
| for (i = 0; i < npids; i++) { |
| safe_write(fd, &pids[i], sizeof(pid_t)); |
| } |
| if (npids > 0) |
| xfree(pids); |
| |
| return SLURM_SUCCESS; |
| rwfail: |
| if (npids > 0) |
| xfree(pids); |
| return SLURM_FAILURE; |
| } |