| /*****************************************************************************\ |
| * slurmscriptd.c - Slurm script functions. |
| ***************************************************************************** |
| * Copyright (C) SchedMD LLC. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include "config.h" |
| |
| #define _GNU_SOURCE /* For POLLRDHUP */ |
| |
| #if HAVE_SYS_PRCTL_H |
| # include <sys/prctl.h> |
| #endif |
| |
| #include <fcntl.h> |
| #include <poll.h> |
| #include <signal.h> |
| #include <sys/wait.h> |
| #include <unistd.h> |
| |
| #include "slurm/slurm_errno.h" |
| |
| #include "src/common/eio.h" |
| #include "src/common/env.h" |
| #include "src/common/fd.h" |
| #include "src/common/fetch_config.h" |
| #include "src/common/log.h" |
| #include "src/common/msg_type.h" |
| #include "src/common/run_command.h" |
| #include "src/common/setproctitle.h" |
| #include "src/common/slurm_protocol_pack.h" |
| #include "src/common/track_script.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| |
| #include "src/conmgr/conmgr.h" |
| |
| #include "src/interfaces/burst_buffer.h" |
| #include "src/interfaces/hash.h" |
| |
| #include "src/slurmctld/locks.h" |
| #include "src/slurmctld/slurmctld.h" |
| #include "src/slurmctld/slurmscriptd.h" |
| #include "src/slurmctld/slurmscriptd_protocol_defs.h" |
| #include "src/slurmctld/slurmscriptd_protocol_pack.h" |
| |
| #define MAX_POLL_WAIT 500 /* in milliseconds */ |
| #define MAX_SHUTDOWN_DELAY 10 |
| |
| #ifndef POLLRDHUP |
| #define POLLRDHUP POLLHUP |
| #endif |
| |
| /* |
| ***************************************************************************** |
| * The following are meant to be used by both slurmscriptd and slurmctld |
| ***************************************************************************** |
| */ |
| |
| static bool _msg_readable(eio_obj_t *obj); |
| static int _msg_accept(eio_obj_t *obj, list_t *objs); |
| static int _handle_close(eio_obj_t *obj, list_t *objs); |
| |
| struct io_operations msg_ops = { |
| .readable = _msg_readable, |
| .handle_read = _msg_accept, |
| .handle_close = _handle_close, |
| }; |
| |
| typedef struct { |
| buf_t *buffer; |
| int req; |
| } req_args_t; |
| |
| static eio_handle_t *msg_handle = NULL; |
| static pthread_mutex_t write_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| /* |
| ***************************************************************************** |
| * The following are meant to be used only by slurmctld |
| ***************************************************************************** |
| */ |
| typedef struct { |
| pthread_cond_t cond; |
| char *key; |
| pthread_mutex_t mutex; |
| int rc; |
| bool received_response; |
| char *resp_msg; |
| bool track_script_signalled; |
| } script_response_t; |
| |
| static void _incr_script_cnt(void); |
| |
| static bool shutting_down = false; |
| static int slurmctld_readfd = -1; |
| static int slurmctld_writefd = -1; |
| static pid_t slurmscriptd_pid; |
| static pthread_t slurmctld_listener_tid; |
| static int script_count = 0; |
| static pthread_mutex_t script_count_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_cond_t script_count_cond = PTHREAD_COND_INITIALIZER; |
| |
| static pthread_mutex_t script_resp_map_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static xhash_t *script_resp_map = NULL; |
| |
| /* |
| ***************************************************************************** |
| * The following are meant to be used only by slurmscriptd |
| ***************************************************************************** |
| */ |
| static int slurmscriptd_readfd = -1; |
| static int slurmscriptd_writefd = -1; |
| static pthread_mutex_t powersave_script_count_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_cond_t powersave_script_cond = PTHREAD_COND_INITIALIZER; |
| static int powersave_script_count = 0; |
| static bool powersave_wait_called = false; |
| |
| |
| /* Function definitions: */ |
| |
| /* Fetch key from xhash_t item. Called from function ptr */ |
| static void _resp_map_key_id(void *item, const char **key, uint32_t *key_len) |
| { |
| script_response_t *script_resp = (script_response_t *)item; |
| |
| xassert(script_resp); |
| |
| *key = script_resp->key; |
| *key_len = strlen(script_resp->key); |
| } |
| |
| /* Free item from xhash_t. Called from function ptr */ |
| static void _resp_map_free(void *item) |
| { |
| script_response_t *script_resp = (script_response_t *)item; |
| |
| if (!script_resp) |
| return; |
| |
| slurm_cond_destroy(&script_resp->cond); |
| xfree(script_resp->key); |
| slurm_mutex_destroy(&script_resp->mutex); |
| xfree(script_resp->resp_msg); |
| xfree(script_resp); |
| } |
| |
| /* Add an entry to script_resp_map */ |
| static script_response_t *_script_resp_map_add(void) |
| { |
| script_response_t *script_resp; |
| |
| script_resp = xmalloc(sizeof *script_resp); |
| slurm_cond_init(&script_resp->cond, NULL); |
| /* |
| * Use pthread_self() to create a unique identifier for the key. |
| * The caller must ensure that this thread does not end for the |
| * lifetime of the entry in the hashmap. The caller can do this by |
| * calling _wait_for_script_resp() which will block until the response |
| * RPC is received. |
| */ |
| script_resp->key = xstrdup_printf("%"PRIu64, (uint64_t) pthread_self()); |
| slurm_mutex_init(&script_resp->mutex); |
| script_resp->resp_msg = NULL; |
| |
| slurm_mutex_lock(&script_resp_map_mutex); |
| xhash_add(script_resp_map, script_resp); |
| slurm_mutex_unlock(&script_resp_map_mutex); |
| |
| return script_resp; |
| } |
| |
| static void _script_resp_map_remove(char *key) |
| { |
| slurm_mutex_lock(&script_resp_map_mutex); |
| xhash_delete(script_resp_map, key, strlen(key)); |
| slurm_mutex_unlock(&script_resp_map_mutex); |
| } |
| |
| static void _wait_for_script_resp(script_response_t *script_resp, |
| int *status, char **resp_msg, |
| bool *track_script_signalled) |
| { |
| /* script_resp->mutex should already be locked */ |
| /* Loop to handle spurious wakeups */ |
| while (!script_resp->received_response) { |
| slurm_cond_wait(&script_resp->cond, &script_resp->mutex); |
| } |
| /* The script is done now, and we should have the response */ |
| *status = script_resp->rc; |
| if (resp_msg) |
| *resp_msg = xstrdup(script_resp->resp_msg); |
| if (track_script_signalled) |
| *track_script_signalled = script_resp->track_script_signalled; |
| } |
| |
| static void _wait_for_powersave_scripts(void) |
| { |
| int cnt = 0; |
| struct timespec ts = {0, 0}; |
| time_t start; |
| time_t now; |
| bool first = true; |
| |
| /* |
| * Only do this wait once. Under normal operation, this is called twice: |
| * (1) _handle_shutdown() |
| * (2) _handle_close() |
| * We could just call this from _handle_shutdown(). However, if |
| * slurmctld fatal()'s or dies in some other way without sending |
| * SLURMSCRIPTD_SHUTDOWN, then only _handle_close() is called. So, we |
| * need this to be called from both places but only happen once. |
| */ |
| if (powersave_wait_called) |
| return; |
| powersave_wait_called = true; |
| |
| /* |
| * ResumeProgram has a temporary file open held in memory. |
| * Wait up to MAX_SHUTDOWN_DELAY seconds for powersave scripts to |
| * finish before shutting down (which will close the temporary file). |
| */ |
| slurm_mutex_lock(&powersave_script_count_mutex); |
| start = now = time(NULL); |
| while (now < (start + MAX_SHUTDOWN_DELAY)) { |
| cnt = powersave_script_count; |
| if (!cnt) |
| break; |
| if (first) { |
| log_flag(SCRIPT, "Waiting up to %d seconds for %d powersave scripts to complete", |
| MAX_SHUTDOWN_DELAY, cnt); |
| first = false; |
| } |
| |
| ts.tv_sec = now + 2; |
| slurm_cond_timedwait(&powersave_script_cond, |
| &powersave_script_count_mutex, &ts); |
| now = time(NULL); |
| } |
| slurm_mutex_unlock(&powersave_script_count_mutex); |
| |
| /* Kill or orphan running scripts. */ |
| run_command_shutdown(); |
| if (cnt) { |
| error("power_save: orphaning %d processes which are not terminating so slurmctld can exit", |
| cnt); |
| |
| /* |
| * Wait for the script completion messages to be processed and |
| * sent to slurmctld, otherwise slurmctld may wait forever for |
| * a message that won't come. |
| */ |
| slurm_mutex_lock(&powersave_script_count_mutex); |
| while (cnt) { |
| ts.tv_sec = time(NULL) + 2; |
| slurm_cond_timedwait(&powersave_script_cond, |
| &powersave_script_count_mutex, |
| &ts); |
| cnt = powersave_script_count; |
| } |
| slurm_mutex_unlock(&powersave_script_count_mutex); |
| } |
| } |
| |
| static int _handle_close(eio_obj_t *obj, list_t *objs) |
| { |
| debug3("Called %s", __func__); |
| |
| /* |
| * This happens on normal shutdown, but it also happens when either |
| * slurmctld or slurmscriptd is killed (e.g., by fatal(), SIGKILL) |
| * and then the pipe is closed because the process exited. |
| * If that happens, we want to shut down instead of running forever. |
| * Also, if this is slurmscriptd, then we want to kill any running |
| * scripts. |
| */ |
| log_flag(SCRIPT, "close() on pipe"); |
| |
| obj->shutdown = true; |
| |
| if (!running_in_slurmctld()) { /* Only do this for slurmscriptd */ |
| _wait_for_powersave_scripts(); |
| track_script_flush(); |
| } else { |
| /* fd has been closed */ |
| slurmctld_readfd = -1; |
| } |
| |
| return SLURM_SUCCESS; /* Note: Return value is ignored by eio. */ |
| } |
| |
| static bool _msg_readable(eio_obj_t *obj) |
| { |
| debug3("Called %s", __func__); |
| if (obj->shutdown) { |
| log_flag(SCRIPT, "%s: false, shutdown", __func__); |
| return false; |
| } |
| return true; |
| } |
| |
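| /* |
| * Write one message to the pipe using the simple framing shared by |
| * slurmctld and slurmscriptd: an int message type, then an int payload |
| * length, then the packed payload (a length of 0 means no payload follows). |
| * If lock is true, write_mutex serializes writers on the shared pipe fd. |
| */ |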
| static int _write_msg(int fd, int req, buf_t *buffer, bool lock) |
| { |
| int len = 0; |
| |
| if (lock) |
| slurm_mutex_lock(&write_mutex); |
| safe_write(fd, &req, sizeof(int)); |
| if (buffer) { |
| len = get_buf_offset(buffer); |
| safe_write(fd, &len, sizeof(int)); |
| safe_write(fd, get_buf_data(buffer), len); |
| } else /* Write 0 length so the receiver knows not to read anymore */ |
| safe_write(fd, &len, sizeof(int)); |
| if (lock) |
| slurm_mutex_unlock(&write_mutex); |
| |
| return SLURM_SUCCESS; |
| |
| rwfail: |
| if (running_in_slurmctld()) |
| error("%s: read/write op failed, restart slurmctld now: %m", |
| __func__); |
| if (lock) |
| slurm_mutex_unlock(&write_mutex); |
| return SLURM_ERROR; |
| } |
| |
| /* |
| * Send an RPC from slurmctld to slurmscriptd. |
| * |
| * IN msg_type - type of message to send |
| * IN msg_data - pointer to the message to send |
| * IN wait - whether or not to wait for a response |
| * OUT resp_msg - If not null, then this is set to the response string from |
| * the script. Caller is responsible to free. |
| * OUT signalled - If not null, then this is set to true if the script was |
| * signalled by track_script, false if not. |
| * |
| * RET SLURM_SUCCESS or SLURM_ERROR |
| */ |
| static int _send_to_slurmscriptd(uint32_t msg_type, void *msg_data, bool wait, |
| char **resp_msg, bool *signalled) |
| { |
| slurmscriptd_msg_t msg; |
| int rc = SLURM_SUCCESS; |
| script_response_t *script_resp = NULL; |
| buf_t *buffer = init_buf(0); |
| |
| xassert(running_in_slurmctld()); |
| memset(&msg, 0, sizeof(msg)); |
| |
| if (wait) { |
| script_resp = _script_resp_map_add(); |
| msg.key = script_resp->key; |
| } |
| msg.msg_data = msg_data; |
| msg.msg_type = msg_type; |
| |
| if (slurmscriptd_pack_msg(&msg, buffer) != SLURM_SUCCESS) { |
| rc = SLURM_ERROR; |
| goto cleanup; |
| } |
| if (msg_type == SLURMSCRIPTD_REQUEST_RUN_SCRIPT) |
| _incr_script_cnt(); |
| |
| if (wait) |
| slurm_mutex_lock(&script_resp->mutex); |
| rc = _write_msg(slurmctld_writefd, msg.msg_type, buffer, true); |
| |
| if ((rc == SLURM_SUCCESS) && wait) { |
| _wait_for_script_resp(script_resp, &rc, resp_msg, signalled); |
| } |
| if (wait) { |
| slurm_mutex_unlock(&script_resp->mutex); |
| _script_resp_map_remove(script_resp->key); |
| } |
| |
| cleanup: |
| FREE_NULL_BUFFER(buffer); |
| |
| return rc; |
| } |
| |
| static void *_async_send_to_slurmscriptd(void *x) |
| { |
| slurmscriptd_msg_t *send_args = x; |
| |
| _send_to_slurmscriptd(send_args->msg_type, send_args->msg_data, false, |
| NULL, NULL); |
| slurmscriptd_free_msg(send_args); |
| xfree(send_args); |
| |
| return NULL; |
| } |
| |
| /* |
| * This should only be called by slurmscriptd. |
| */ |
| static int _respond_to_slurmctld(char *key, uint32_t job_id, char *resp_msg, |
| char *script_name, script_type_t script_type, |
| bool signalled, int status, bool timed_out) |
| { |
| int rc = SLURM_SUCCESS; |
| slurmscriptd_msg_t msg; |
| script_complete_t script_complete; |
| buf_t *buffer = init_buf(0); |
| |
| /* Check that we're running in slurmscriptd. */ |
| xassert(!running_in_slurmctld()); |
| |
| memset(&script_complete, 0, sizeof(script_complete)); |
| script_complete.job_id = job_id; |
| /* Just point to strings, don't xstrdup, so don't free. */ |
| script_complete.resp_msg = resp_msg; |
| script_complete.script_name = script_name; |
| script_complete.script_type = script_type; |
| script_complete.signalled = signalled; |
| script_complete.status = status; |
| script_complete.timed_out = timed_out; |
| |
| memset(&msg, 0, sizeof(msg)); |
| msg.key = key; |
| msg.msg_data = &script_complete; |
| msg.msg_type = SLURMSCRIPTD_REQUEST_SCRIPT_COMPLETE; |
| |
| if (slurmscriptd_pack_msg(&msg, buffer) != SLURM_SUCCESS) { |
| rc = SLURM_ERROR; |
| goto cleanup; |
| } |
| _write_msg(slurmscriptd_writefd, msg.msg_type, buffer, true); |
| |
| cleanup: |
| FREE_NULL_BUFFER(buffer); |
| return rc; |
| } |
| |
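| /* |
| * script_count tracks how many SLURMSCRIPTD_REQUEST_RUN_SCRIPT requests are |
| * still outstanding. _wait_for_all_scripts() sleeps on script_count_cond |
| * until the count drops to zero (or the pipe closes) during shutdown. |
| */ |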
| static void _decr_script_cnt(void) |
| { |
| slurm_mutex_lock(&script_count_mutex); |
| script_count--; |
| if (!script_count && shutting_down) |
| slurm_cond_signal(&script_count_cond); |
| slurm_mutex_unlock(&script_count_mutex); |
| } |
| |
| static void _incr_script_cnt(void) |
| { |
| slurm_mutex_lock(&script_count_mutex); |
| script_count++; |
| slurm_mutex_unlock(&script_count_mutex); |
| } |
| |
| static void _change_proc_name(int argc, char **argv, char *proc_name) |
| { |
| char *log_prefix; |
| |
| /* Update slurm_daemon to ensure run_in_daemon() works properly. */ |
| slurm_daemon = IS_SLURMSCRIPTD; |
| |
| /* |
| * Change the process name to slurmscriptd. |
| * Since slurmscriptd logs to the slurmctld log file, add a |
| * prefix to make it clear which daemon a log comes from. |
| */ |
| init_setproctitle(argc, argv); |
| setproctitle("%s", proc_name); |
| #if HAVE_SYS_PRCTL_H |
| if (prctl(PR_SET_NAME, proc_name, NULL, NULL, NULL) < 0) { |
| error("%s: cannot set my name to %s %m", |
| __func__, proc_name); |
| } |
| #endif |
| /* log_set_prefix takes control of an xmalloc()'d string */ |
| log_prefix = xstrdup_printf("%s: ", proc_name); |
| log_set_prefix(&log_prefix); |
| } |
| |
| static void _send_bb_script_msg(int write_fd, void *cb_arg) |
| { |
| run_script_msg_t *script_msg = cb_arg; |
| buf_t *buffer = init_buf(0); |
| bb_script_info_msg_t bb_msg = { |
| .cluster_name = slurm_conf.cluster_name, |
| .extra_buf = script_msg->extra_buf, |
| .extra_buf_size = script_msg->extra_buf_size, |
| .function = script_msg->script_name, |
| .job_id = script_msg->job_id, |
| .slurmctld_debug = slurm_conf.slurmctld_debug, |
| .slurmctld_logfile = slurm_conf.slurmctld_logfile, |
| .log_fmt = slurm_conf.log_fmt, |
| .plugindir = slurm_conf.plugindir, |
| .slurm_user_name = slurm_conf.slurm_user_name, |
| .slurm_user_id = slurm_conf.slurm_user_id, |
| }; |
| slurmscriptd_msg_t msg = { |
| .msg_data = &bb_msg, |
| .msg_type = SLURMSCRIPTD_REQUEST_BB_SCRIPT_INFO, |
| }; |
| |
| /* |
| * Send bb_script_info_msg_t. The write_mutex controls writing on |
| * slurmctld_writefd or slurmscriptd_writefd. We are writing to a pipe |
| * to a running script, so we do not need to lock write_mutex. |
| */ |
| slurmscriptd_pack_msg(&msg, buffer); |
| if (_write_msg(write_fd, SLURMSCRIPTD_REQUEST_BB_SCRIPT_INFO, buffer, |
| false) != SLURM_SUCCESS) { |
| error("%s: Failed writing data to script: burst_buffer.lua:%s, JobId=%u", |
| __func__, script_msg->script_name, script_msg->job_id); |
| goto fini; |
| } |
| |
| fini: |
| FREE_NULL_BUFFER(buffer); |
| } |
| |
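| /* |
| * Counterpart of _send_bb_script_msg(): read the bb_script_info_msg_t from |
| * stdin and unpack it into *msg_pptr. Only used when running in |
| * burst_buffer.lua mode. |
| */ |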
| static int _recv_bb_script_msg(bb_script_info_msg_t **msg_pptr) |
| { |
| int rc = SLURM_SUCCESS, req = 0, len = 0; |
| int fd = STDIN_FILENO; |
| char *data = NULL; |
| buf_t *buffer = NULL; |
| slurmscriptd_msg_t scriptd_msg = { 0 }; |
| |
| safe_read(fd, &req, sizeof(req)); |
| scriptd_msg.msg_type = req; |
| if (scriptd_msg.msg_type != SLURMSCRIPTD_REQUEST_BB_SCRIPT_INFO) { |
| fatal("%s: Invalid msg_type=%u", |
| __func__, scriptd_msg.msg_type); |
| } |
| |
| safe_read(fd, &len, sizeof(len)); |
| if (!len) |
| fatal("%s: Invalid message length == 0", __func__); |
| |
| data = xmalloc(len); |
| safe_read(fd, data, len); |
| |
| /* Unpack bb_script_info_msg_t */ |
| buffer = create_buf(data, len); |
| rc = slurmscriptd_unpack_msg(&scriptd_msg, buffer); |
| *msg_pptr = scriptd_msg.msg_data; |
| FREE_NULL_BUFFER(buffer); |
| |
| return rc; |
| |
| rwfail: |
| error("%s Failed", __func__); |
| return SLURM_ERROR; |
| } |
| |
| /* |
| * Run a script with a given timeout (in seconds). |
| * Return the status or SLURM_ERROR if fork() fails. |
| */ |
| static int _run_script(run_command_args_t *run_command_args, |
| run_script_msg_t *script_msg, |
| char **resp_msg, bool *signalled) |
| { |
| int status = SLURM_ERROR; |
| int ms_timeout; |
| char *resp = NULL; |
| bool killed = false; |
| int tmp_fd = 0; |
| uint32_t job_id = script_msg->job_id; |
| int timeout = script_msg->timeout; |
| char *tmp_file_env_name = script_msg->tmp_file_env_name; |
| char *tmp_file_str = script_msg->tmp_file_str; |
| |
| if ((timeout <= 0) || (timeout == NO_VAL16)) |
| ms_timeout = -1; /* wait indefinitely in run_command() */ |
| else |
| ms_timeout = timeout * 1000; |
| |
| run_command_args->max_wait = ms_timeout; |
| run_command_args->status = &status; |
| |
| if (tmp_file_str) { |
| char *tmp_file = NULL; |
| /* |
| * Open a file into which we dump tmp_file_str. |
| * Set an environment variable so the script will know how to |
| * read this file. We need to keep this file open for as long |
| * as the script is running. |
| */ |
| xassert(tmp_file_env_name); |
| tmp_fd = dump_to_memfd((char*) run_command_args->script_type, |
| tmp_file_str, &tmp_file); |
| if (tmp_fd == SLURM_ERROR) { |
| error("Failed to create tmp file for %s", |
| run_command_args->script_type); |
| tmp_fd = 0; |
| } else { |
| env_array_append(&run_command_args->env, |
| tmp_file_env_name, tmp_file); |
| } |
| xfree(tmp_file); |
| } |
| |
| if (run_command_args->tid) |
| track_script_rec_add(job_id, 0, pthread_self()); |
| resp = run_command(run_command_args); |
| if (run_command_args->tid) |
| killed = track_script_killed(pthread_self(), status, true); |
| else if (WIFSIGNALED(status) && (WTERMSIG(status) == SIGKILL)) |
| killed = true; |
| if (killed) { |
| info("%s: JobId=%u %s killed by signal %u", |
| __func__, job_id, run_command_args->script_type, |
| WTERMSIG(status)); |
| } else if (status != 0) { |
| error("%s: JobId=%u %s exit status %u:%u", |
| __func__, job_id, run_command_args->script_type, |
| WEXITSTATUS(status), |
| WTERMSIG(status)); |
| } else { |
| if (job_id) |
| log_flag(SCRIPT, "%s JobId=%u %s completed", |
| __func__, job_id, |
| run_command_args->script_type); |
| else |
| log_flag(SCRIPT, "%s %s completed", |
| __func__, run_command_args->script_type); |
| } |
| |
| /* |
| * Use pthread_self here instead of track_script_rec->tid to avoid any |
| * potential for race. |
| */ |
| if (run_command_args->tid) |
| track_script_remove(pthread_self()); |
| |
| if (tmp_fd) |
| close(tmp_fd); |
| |
| if (resp_msg) |
| *resp_msg = resp; |
| else |
| xfree(resp); |
| if (signalled) |
| *signalled = killed; |
| |
| return status; |
| } |
| |
| static int _handle_flush(slurmscriptd_msg_t *recv_msg) |
| { |
| log_flag(SCRIPT, "Handling %s", rpc_num2string(recv_msg->msg_type)); |
| /* Kill all running scripts */ |
| track_script_flush(); |
| /* |
| * DO NOT CALL _wait_for_powersave_scripts HERE. That would result in |
| * reconfigure waiting for up to MAX_SHUTDOWN_DELAY seconds, which is |
| * an unacceptably long time for reconfigure. |
| */ |
| |
| /* We need to respond to slurmctld that we are done */ |
| _respond_to_slurmctld(recv_msg->key, 0, NULL, |
| "SLURMSCRIPTD_REQUEST_FLUSH", SLURMSCRIPTD_NONE, |
| false, SLURM_SUCCESS, false); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static int _handle_flush_job(slurmscriptd_msg_t *recv_msg) |
| { |
| flush_job_msg_t *flush_msg = recv_msg->msg_data; |
| |
| log_flag(SCRIPT, "Handling %s for JobId=%u", |
| rpc_num2string(recv_msg->msg_type), flush_msg->job_id); |
| |
| track_script_flush_job(flush_msg->job_id); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static int _handle_shutdown(slurmscriptd_msg_t *recv_msg) |
| { |
| log_flag(SCRIPT, "Handling %s", rpc_num2string(recv_msg->msg_type)); |
| /* Kill or orphan all running scripts. */ |
| _wait_for_powersave_scripts(); |
| track_script_flush(); |
| |
| conmgr_request_shutdown(); |
| eio_signal_shutdown(msg_handle); |
| |
| return SLURM_ERROR; /* Don't handle any more requests. */ |
| } |
| |
| static int _handle_run_script(slurmscriptd_msg_t *recv_msg) |
| { |
| extern char **environ; |
| run_script_msg_t *script_msg = recv_msg->msg_data; |
| int rc, status = 0; |
| char *resp_msg = NULL; |
| bool signalled = false; |
| bool timed_out = false; |
| pthread_t tid = pthread_self(); |
| run_command_args_t run_command_args = { |
| .env = env_array_copy((const char **) script_msg->env), |
| .script_argv = script_msg->argv, |
| .script_path = script_msg->script_path, |
| .script_type = script_msg->script_name, |
| .tid = tid, |
| .timed_out = &timed_out, |
| }; |
| |
| log_flag(SCRIPT, "Handling %s (name=%s%s, JobId=%u, timeout=%u seconds, argc=%u, key=%s)", |
| rpc_num2string(recv_msg->msg_type), |
| script_msg->script_type == SLURMSCRIPTD_BB_LUA ? |
| "burst_buffer.lua:" : "", |
| script_msg->script_name, |
| script_msg->job_id, |
| script_msg->timeout, |
| script_msg->argc, |
| recv_msg->key); |
| |
| switch (script_msg->script_type) { |
| case SLURMSCRIPTD_BB_LUA: |
| /* |
| * Set SLURM_SCRIPT_CONTEXT in env for slurmctld, but we also |
| * need to preserve the parent's environment. No env was passed |
| * to us in script_msg. |
| */ |
| xassert(!run_command_args.env); |
| run_command_args.env = env_array_copy((const char **) environ); |
| env_array_append(&run_command_args.env, "SLURM_SCRIPT_CONTEXT", |
| "burst_buffer.lua"); |
| |
| /* burst_buffer.lua is not exec'd, it is run directly from C */ |
| run_command_args.ignore_path_exec_check = true; |
| |
| /* Send needed script info and configs to slurmctld */ |
| run_command_args.write_to_child = true; |
| run_command_args.cb = _send_bb_script_msg; |
| run_command_args.cb_arg = script_msg; |
| |
| status = _run_script(&run_command_args, script_msg, |
| &resp_msg, &signalled); |
| break; |
| case SLURMSCRIPTD_EPILOG: /* fall-through */ |
| case SLURMSCRIPTD_MAIL: |
| case SLURMSCRIPTD_PROLOG: |
| case SLURMSCRIPTD_REBOOT: |
| case SLURMSCRIPTD_RESV: |
| /* |
| * script_msg->timeout is in seconds but |
| * run_command_args.max_wait expects milliseconds. |
| * script_msg->timeout may also not be set (NO_VAL16). |
| * Let _run_script handle the conversion. |
| */ |
| status = _run_script(&run_command_args, script_msg, |
| &resp_msg, &signalled); |
| break; |
| case SLURMSCRIPTD_POWER: |
| slurm_mutex_lock(&powersave_script_count_mutex); |
| powersave_script_count++; |
| slurm_mutex_unlock(&powersave_script_count_mutex); |
| |
| /* |
| * We want these scripts to keep running even if slurmctld |
| * shuts down, so do not track them with track_script, which |
| * would kill them at shutdown. |
| */ |
| run_command_args.tid = 0; |
| run_command_args.orphan_on_shutdown = true; |
| status = _run_script(&run_command_args, script_msg, |
| &resp_msg, &signalled); |
| |
| break; |
| default: |
| error("%s: Invalid script type=%d", |
| __func__, script_msg->script_type); |
| status = SLURM_ERROR; |
| break; |
| } |
| |
| /* Send response */ |
| rc = _respond_to_slurmctld(recv_msg->key, script_msg->job_id, |
| resp_msg, script_msg->script_name, |
| script_msg->script_type, signalled, status, |
| timed_out); |
| |
| if (script_msg->script_type == SLURMSCRIPTD_POWER) { |
| slurm_mutex_lock(&powersave_script_count_mutex); |
| powersave_script_count--; |
| if (!powersave_script_count && powersave_wait_called) |
| slurm_cond_signal(&powersave_script_cond); |
| slurm_mutex_unlock(&powersave_script_count_mutex); |
| } |
| xfree(resp_msg); |
| env_array_free(run_command_args.env); |
| |
| return rc; |
| } |
| |
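| /* |
| * Look up the script_response_t registered under "key" by |
| * _send_to_slurmscriptd() and wake the thread waiting in |
| * _wait_for_script_resp() with the script's status, response message, and |
| * signalled flag. |
| */ |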
| static int _notify_script_done(char *key, script_complete_t *script_complete) |
| { |
| int rc = SLURM_SUCCESS; |
| script_response_t *script_resp; |
| |
| slurm_mutex_lock(&script_resp_map_mutex); |
| script_resp = xhash_get(script_resp_map, key, strlen(key)); |
| if (!script_resp) { |
| /* |
| * This should never happen. We don't know how to notify |
| * whoever started this script that it is done. |
| */ |
| error("%s: We don't know who started this script (JobId=%u, func=%s, key=%s) so we can't notify them.", |
| __func__, script_complete->job_id, |
| script_complete->script_name, key); |
| rc = SLURM_ERROR; |
| } else { |
| slurm_mutex_lock(&script_resp->mutex); |
| script_resp->received_response = true; |
| script_resp->resp_msg = xstrdup(script_complete->resp_msg); |
| script_resp->rc = script_complete->status; |
| script_resp->track_script_signalled = |
| script_complete->signalled; |
| slurm_cond_signal(&script_resp->cond); |
| slurm_mutex_unlock(&script_resp->mutex); |
| } |
| slurm_mutex_unlock(&script_resp_map_mutex); |
| |
| return rc; |
| } |
| |
| static int _handle_script_complete(slurmscriptd_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| script_complete_t *script_complete = msg->msg_data; |
| |
| /* Notify the waiting thread that the script is done */ |
| if (msg->key) |
| rc = _notify_script_done(msg->key, script_complete); |
| |
| log_flag(SCRIPT, "Handling %s (name=%s, JobId=%u, resp_msg=%s)", |
| rpc_num2string(msg->msg_type), |
| script_complete->script_name, |
| script_complete->job_id, |
| script_complete->resp_msg); |
| |
| switch (script_complete->script_type) { |
| case SLURMSCRIPTD_BB_LUA: |
| case SLURMSCRIPTD_MAIL: |
| case SLURMSCRIPTD_REBOOT: |
| case SLURMSCRIPTD_RESV: |
| break; /* Nothing more to do */ |
| case SLURMSCRIPTD_EPILOG: |
| prep_epilog_slurmctld_callback(script_complete->status, |
| script_complete->job_id, |
| script_complete->timed_out); |
| break; |
| case SLURMSCRIPTD_POWER: |
| ping_nodes_now = true; |
| break; |
| case SLURMSCRIPTD_PROLOG: |
| prep_prolog_slurmctld_callback(script_complete->status, |
| script_complete->job_id, |
| script_complete->timed_out); |
| break; |
| case SLURMSCRIPTD_NONE: |
| /* |
| * Some other RPC (for example, SLURMSCRIPTD_REQUEST_FLUSH) |
| * completed and sent this back to notify a waiting thread of |
| * its completion. We do not want to call _decr_script_cnt() |
| * since it wasn't a script that ran, so we just return right |
| * now. |
| */ |
| return SLURM_SUCCESS; |
| default: |
| error("%s: unknown script type for script=%s, JobId=%u", |
| rpc_num2string(msg->msg_type), |
| script_complete->script_name, script_complete->job_id); |
| break; |
| } |
| |
| _decr_script_cnt(); |
| |
| return rc; |
| } |
| |
| static int _handle_update_debug_flags(slurmscriptd_msg_t *msg) |
| { |
| slurmctld_lock_t config_write_lock = |
| { .conf = WRITE_LOCK }; |
| debug_flags_msg_t *debug_msg = msg->msg_data; |
| char *flag_string; |
| |
| flag_string = debug_flags2str(debug_msg->debug_flags); |
| log_flag(SCRIPT, "Handling %s; set DebugFlags to '%s'", |
| rpc_num2string(msg->msg_type), |
| flag_string ? flag_string : "none"); |
| xfree(flag_string); |
| |
| lock_slurmctld(config_write_lock); |
| slurm_conf.debug_flags = debug_msg->debug_flags; |
| slurm_conf.last_update = time(NULL); |
| unlock_slurmctld(config_write_lock); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static int _handle_update_log(slurmscriptd_msg_t *msg) |
| { |
| slurmctld_lock_t config_write_lock = |
| { .conf = WRITE_LOCK }; |
| log_msg_t *log_msg = msg->msg_data; |
| int debug_level = (int) log_msg->debug_level; |
| bool log_rotate = log_msg->log_rotate; |
| |
| log_flag(SCRIPT, "Handling %s; set debug level to '%s'%s", |
| rpc_num2string(msg->msg_type), |
| log_num2string(debug_level), |
| log_rotate ? ", logrotate" : ""); |
| |
| lock_slurmctld(config_write_lock); |
| if (log_rotate) { |
| update_logging(); |
| } else { |
| update_log_levels(debug_level, debug_level); |
| slurm_conf.slurmctld_debug = debug_level; |
| slurm_conf.last_update = time(NULL); |
| } |
| unlock_slurmctld(config_write_lock); |
| |
| return SLURM_SUCCESS; |
| } |
| |
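| /* |
| * Unpack a single request received on the pipe and dispatch it to the |
| * matching handler. Runs in a detached thread created by _msg_accept(). |
| */ |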
| static int _handle_request(int req, buf_t *buffer) |
| { |
| int rc; |
| slurmscriptd_msg_t recv_msg; |
| |
| memset(&recv_msg, 0, sizeof(recv_msg)); |
| recv_msg.msg_type = (uint32_t)req; |
| if (slurmscriptd_unpack_msg(&recv_msg, buffer) != SLURM_SUCCESS) { |
| error("%s: Unable to handle message %d", __func__, req); |
| rc = SLURM_ERROR; |
| goto cleanup; |
| } |
| |
| switch (req) { |
| case SLURMSCRIPTD_REQUEST_FLUSH: |
| rc = _handle_flush(&recv_msg); |
| break; |
| case SLURMSCRIPTD_REQUEST_FLUSH_JOB: |
| rc = _handle_flush_job(&recv_msg); |
| break; |
| case SLURMSCRIPTD_REQUEST_RUN_SCRIPT: |
| rc = _handle_run_script(&recv_msg); |
| break; |
| case SLURMSCRIPTD_REQUEST_SCRIPT_COMPLETE: |
| rc = _handle_script_complete(&recv_msg); |
| break; |
| case SLURMSCRIPTD_REQUEST_UPDATE_DEBUG_FLAGS: |
| rc = _handle_update_debug_flags(&recv_msg); |
| break; |
| case SLURMSCRIPTD_REQUEST_UPDATE_LOG: |
| rc = _handle_update_log(&recv_msg); |
| break; |
| case SLURMSCRIPTD_SHUTDOWN: |
| rc = _handle_shutdown(&recv_msg); |
| break; |
| default: |
| error("%s: slurmscriptd: Unrecognied request: %d", |
| __func__, req); |
| rc = SLURM_ERROR; |
| break; |
| } |
| |
| cleanup: |
| slurmscriptd_free_msg(&recv_msg); |
| return rc; |
| } |
| |
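| /* Detached thread entry point: handle one request, then free its args. */ |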
| static void *_handle_accept(void *args) |
| { |
| req_args_t *req_args = (req_args_t *)args; |
| |
| _handle_request(req_args->req, req_args->buffer); |
| FREE_NULL_BUFFER(req_args->buffer); |
| xfree(req_args); |
| |
| return NULL; |
| } |
| |
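| /* |
| * eio read handler: drain the pipe, reading one (type, length, payload) |
| * frame per request and handing each frame to a detached _handle_accept() |
| * thread so a slow handler does not block the listener. |
| */ |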
| static int _msg_accept(eio_obj_t *obj, list_t *objs) |
| { |
| int rc = SLURM_SUCCESS, req, buf_len = 0; |
| char *incoming_buffer = NULL; |
| buf_t *buffer = NULL; |
| req_args_t *req_args; |
| |
| while (true) { |
| if ((rc = read(obj->fd, &req, sizeof(int))) != sizeof(int)) { |
| if (rc == 0) { /* EOF, normal */ |
| break; |
| } else { |
| debug3("%s: leaving on read error: %m", __func__); |
| rc = SLURM_ERROR; |
| break; |
| } |
| } |
| /* |
| * We always write the length of the buffer so we can read |
| * the whole thing right here. We write a 0 for the length if |
| * no additional data was sent. |
| */ |
| safe_read(obj->fd, &buf_len, sizeof(int)); |
| if (buf_len) { |
| incoming_buffer = xmalloc(buf_len); |
| safe_read(obj->fd, incoming_buffer, buf_len); |
| buffer = create_buf(incoming_buffer, buf_len); |
| } |
| |
| req_args = xmalloc(sizeof *req_args); |
| req_args->req = req; |
| req_args->buffer = buffer; |
| slurm_thread_create_detached(_handle_accept, req_args); |
| |
| /* |
| * xmalloc()'d data will be xfree()'d by _handle_accept() |
| */ |
| incoming_buffer = NULL; |
| buffer = NULL; |
| req_args = NULL; |
| } |
| |
| return rc; |
| rwfail: |
| error("%s: read/write op failed", __func__); |
| return SLURM_ERROR; |
| } |
| |
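| /* Register the given pipe fd (set non-blocking) with a new eio handle. */ |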
| static void _setup_eio(int fd) |
| { |
| eio_obj_t *eio_obj; |
| |
| fd_set_nonblocking(fd); |
| |
| eio_obj = eio_obj_create(fd, &msg_ops, NULL); |
| msg_handle = eio_handle_create(0); |
| eio_new_initial_obj(msg_handle, eio_obj); |
| } |
| |
| static void _on_sigint(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGINT. Ignoring."); |
| } |
| |
| static void _on_sigterm(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGTERM. Ignoring."); |
| } |
| |
| static void _on_sigchld(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGCHLD. Ignoring"); |
| } |
| |
| static void _on_sigquit(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGQUIT. Ignoring."); |
| } |
| |
| static void _on_sighup(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGHUP. Ignoring."); |
| } |
| |
| static void _on_sigusr1(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGUSR1. Ignoring."); |
| } |
| |
| static void _on_sigusr2(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGUSR2. Ignoring."); |
| } |
| |
| static void _on_sigpipe(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| /* debug5 to avoid polluting the SCRIPT debug flag */ |
| debug5("Caught SIGPIPE. Ignoring."); |
| } |
| |
| static void _on_sigxcpu(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGXCPU. Ignoring."); |
| } |
| |
| static void _on_sigabrt(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGABRT. Ignoring."); |
| } |
| |
| static void _on_sigalrm(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| log_flag(SCRIPT, "Caught SIGALRM. Ignoring."); |
| } |
| |
| static void _init_slurmscriptd_conmgr(void) |
| { |
| if (slurm_conf.slurmctld_params) |
| conmgr_set_params(slurm_conf.slurmctld_params); |
| |
| conmgr_init(0, 0); |
| |
| /* |
| * Ignore signals. slurmscriptd should only handle requests directly |
| * from slurmctld. |
| */ |
| conmgr_add_work_signal(SIGINT, _on_sigint, NULL); |
| conmgr_add_work_signal(SIGTERM, _on_sigterm, NULL); |
| conmgr_add_work_signal(SIGCHLD, _on_sigchld, NULL); |
| conmgr_add_work_signal(SIGQUIT, _on_sigquit, NULL); |
| conmgr_add_work_signal(SIGHUP, _on_sighup, NULL); |
| conmgr_add_work_signal(SIGUSR1, _on_sigusr1, NULL); |
| conmgr_add_work_signal(SIGUSR2, _on_sigusr2, NULL); |
| conmgr_add_work_signal(SIGPIPE, _on_sigpipe, NULL); |
| conmgr_add_work_signal(SIGXCPU, _on_sigxcpu, NULL); |
| conmgr_add_work_signal(SIGABRT, _on_sigabrt, NULL); |
| conmgr_add_work_signal(SIGALRM, _on_sigalrm, NULL); |
| |
| conmgr_run(false); |
| } |
| |
| __attribute__((noreturn)) |
| extern void slurmscriptd_run_slurmscriptd(int argc, char **argv, |
| char *binary_path) |
| { |
| ssize_t i; |
| int rc = SLURM_ERROR, ack; |
| |
| slurmscriptd_writefd = SLURMSCRIPT_WRITE_FD; |
| slurmscriptd_readfd = SLURMSCRIPT_READ_FD; |
| |
| _change_proc_name(argc, argv, "slurmscriptd"); |
| |
| /* Test communications with slurmctld. */ |
| ack = SLURM_SUCCESS; |
| i = write(slurmscriptd_writefd, &ack, sizeof(int)); |
| if (i != sizeof(int)) { |
| error("%s: slurmscriptd: failed to send return code to slurmctld: %m", |
| __func__); |
| _exit(1); |
| } |
| i = read(slurmscriptd_readfd, &rc, sizeof(int)); |
| if (i < 0) { |
| error("%s: slurmscriptd: Can not read ack from slurmctld: %m", |
| __func__); |
| _exit(1); |
| } else if (i != sizeof(int)) { |
| error("%s: slurmscriptd: slurmctld failed to send expected ack: received %zd bytes when %zd bytes was expected.", |
| __func__, i, sizeof(int)); |
| _exit(1); |
| } |
| |
| debug("slurmscriptd: Got ack from slurmctld"); |
| |
| i = write(slurmscriptd_writefd, &ack, sizeof(int)); |
| if (i != sizeof(int)) |
| fatal("%s: Failed to send initialization code to slurmctld", |
| __func__); |
| |
| _init_slurmscriptd_conmgr(); |
| |
| debug("Initialization successful"); |
| |
| slurm_mutex_init(&powersave_script_count_mutex); |
| slurm_mutex_init(&write_mutex); |
| if ((run_command_init(0, NULL, binary_path) != SLURM_SUCCESS) && |
| binary_path && binary_path[0]) |
| fatal("%s: Unable to reliably execute %s", |
| __func__, binary_path); |
| |
| _setup_eio(slurmscriptd_readfd); |
| |
| debug("%s: started", __func__); |
| eio_handle_mainloop(msg_handle); |
| debug("%s: finished", __func__); |
| |
| #ifdef MEMORY_LEAK_DEBUG |
| track_script_fini(); |
| slurm_mutex_destroy(&powersave_script_count_mutex); |
| #endif |
| |
| /* We never want to return from here, only exit. */ |
| _exit(0); |
| } |
| |
| static void *_slurmctld_listener_thread(void *x) |
| { |
| debug("%s: started listening to slurmscriptd", __func__); |
| eio_handle_mainloop(msg_handle); |
| debug("%s: finished", __func__); |
| |
| return NULL; |
| } |
| |
| static void _wait_for_all_scripts(void) |
| { |
| int last_pc = 0; |
| struct timespec ts = {0, 0}; |
| |
| /* |
| * Wait until all script complete messages have been processed or until |
| * the readfd is closed, in which case we know we'll never get more |
| * messages from slurmscriptd. |
| */ |
| slurm_mutex_lock(&script_count_mutex); |
| while (slurmctld_readfd > 0) { |
| if (!script_count) |
| break; |
| if (last_pc != script_count) |
| info("waiting for %d running processes", script_count); |
| last_pc = script_count; |
| ts.tv_sec = time(NULL) + 2; |
| slurm_cond_timedwait(&script_count_cond, &script_count_mutex, |
| &ts); |
| } |
| slurm_mutex_unlock(&script_count_mutex); |
| } |
| |
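| /* |
| * Shut down the slurmscriptd child: flush scripts, send |
| * SLURMSCRIPTD_SHUTDOWN, wait for outstanding script completions, and then |
| * reap the child. If the shutdown message could not be sent, fall back to |
| * run_command_waitpid_timeout() so we do not wait forever. |
| */ |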
| static void _kill_slurmscriptd(void) |
| { |
| int status; |
| int rc; |
| |
| if (slurmscriptd_pid <= 0) { |
| error("%s: slurmscriptd_pid < 0, we don't know the PID of slurmscriptd.", |
| __func__); |
| return; |
| } |
| |
| shutting_down = true; |
| slurmscriptd_flush(); |
| |
| /* Tell slurmscriptd to shutdown, then wait for it to finish. */ |
| rc = _send_to_slurmscriptd(SLURMSCRIPTD_SHUTDOWN, NULL, false, NULL, |
| NULL); |
| |
| if (rc == SLURM_SUCCESS) |
| _wait_for_all_scripts(); |
| |
| if (rc != SLURM_SUCCESS) { |
| /* Shutdown signal failed. Try to reap slurmscriptd now. */ |
| if (waitpid(slurmscriptd_pid, &status, WNOHANG) == 0) { |
| /* |
| * slurmscriptd is not reaped and we cannot send a |
| * shutdown signal to slurmscriptd; kill it so we know |
| * that we won't wait forever. |
| */ |
| run_command_waitpid_timeout("slurmscriptd", |
| slurmscriptd_pid, |
| &status, 10 * MSEC_IN_SEC, |
| 0, 0, NULL); |
| } |
| } else { |
| while (waitpid(slurmscriptd_pid, &status, 0) < 0) { |
| if (errno == EINTR) |
| continue; |
| error("%s: Unable to reap slurmscriptd child process", |
| __func__); |
| break; |
| } |
| } |
| } |
| |
| /* |
| * Initialize a run_script_msg_t. This doesn't set every field in |
| * run_script_msg_t, only the ones that would otherwise be duplicated at |
| * every call site. Use this when you need to allocate run_script_msg on |
| * the heap. |
| * |
| * Return a heap-allocated structure that must be free'd with |
| * slurmscriptd_free_run_script_msg(). |
| */ |
| static run_script_msg_t *_init_run_script_msg(char **env, |
| char *script_name, |
| char *script_path, |
| script_type_t script_type, |
| uint32_t timeout) |
| { |
| run_script_msg_t *run_script_msg; |
| |
| run_script_msg = xmalloc(sizeof(*run_script_msg)); |
| run_script_msg->env = env_array_copy((const char **) env); |
| run_script_msg->script_name = xstrdup(script_name); |
| run_script_msg->script_path = xstrdup(script_path); |
| run_script_msg->script_type = script_type; |
| run_script_msg->timeout = timeout; |
| |
| return run_script_msg; |
| } |
| |
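| /* |
| * Unpack the job_info_msg_t carried in bb_msg->extra_buf (packed as a |
| * RESPONSE_JOB_INFO message), or return NULL if no extra data was sent. |
| * The temporary buffer only borrows bb_msg's data, so detach it before |
| * freeing the buffer. |
| */ |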
| static job_info_msg_t *_unpack_bb_job_info(bb_script_info_msg_t *bb_msg) |
| { |
| buf_t *extra_buf; |
| slurm_msg_t *extra_msg; |
| job_info_msg_t *job_info; |
| |
| if (!bb_msg->extra_buf_size) |
| return NULL; |
| |
| extra_msg = xmalloc(sizeof *extra_msg); |
| slurm_msg_t_init(extra_msg); |
| extra_msg->protocol_version = SLURM_PROTOCOL_VERSION; |
| extra_msg->msg_type = RESPONSE_JOB_INFO; |
| extra_buf = create_buf(bb_msg->extra_buf, bb_msg->extra_buf_size); |
| unpack_msg(extra_msg, extra_buf); |
| job_info = extra_msg->data; |
| extra_msg->data = NULL; |
| |
| /* create_buf() does not duplicate the data, it just points to it. */ |
| extra_buf->head = NULL; |
| FREE_NULL_BUFFER(extra_buf); |
| slurm_free_msg(extra_msg); |
| |
| return job_info; |
| } |
| |
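| /* |
| * Receive the bb_script_info_msg_t sent by slurmscriptd and copy the config |
| * values it carries into slurm_conf so the burst_buffer plugin and logging |
| * can be initialized in burst_buffer.lua mode. |
| */ |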
| static void _init_bb_script_config(char **function, uint32_t *job_id, |
| job_info_msg_t **job_info) |
| { |
| bb_script_info_msg_t *bb_msg = NULL; |
| |
| if (_recv_bb_script_msg(&bb_msg) != SLURM_SUCCESS) |
| fatal("Failed to receive burst buffer script msg"); |
| if (!bb_msg->function || !bb_msg->function[0]) |
| fatal_abort("%s: Invalid NULL function", __func__); |
| |
| *function = bb_msg->function; |
| *job_id = bb_msg->job_id; |
| *job_info = _unpack_bb_job_info(bb_msg); |
| |
| slurm_conf.cluster_name = bb_msg->cluster_name; |
| slurm_conf.slurmctld_debug = bb_msg->slurmctld_debug; |
| slurm_conf.slurmctld_logfile = bb_msg->slurmctld_logfile; |
| slurm_conf.log_fmt = bb_msg->log_fmt; |
| slurm_conf.plugindir = bb_msg->plugindir; |
| slurm_conf.slurm_user_name = bb_msg->slurm_user_name; |
| slurm_conf.slurm_user_id = bb_msg->slurm_user_id; |
| |
| /* |
| * We copied the pointers in bb_msg, so only free bb_msg, not its |
| * contents. |
| */ |
| xfree(bb_msg); |
| } |
| |
| extern void slurmscriptd_flush(void) |
| { |
| _send_to_slurmscriptd(SLURMSCRIPTD_REQUEST_FLUSH, NULL, true, NULL, |
| NULL); |
| |
| _wait_for_all_scripts(); |
| } |
| |
| extern void slurmscriptd_handle_bb_lua_mode(int argc, char **argv) |
| { |
| int exit_code = 127; |
| char *function = NULL; |
| uint32_t job_id = 0; |
| char *proc_name = "burst_buffer.lua"; |
| char *resp = NULL; |
| char **script_argv = NULL; |
| int script_argc = 0; |
| /* The lock is required for update_logging() */ |
| slurmctld_lock_t config_write_lock = { |
| .conf = WRITE_LOCK, |
| }; |
| /* |
| * Only log errors until we read the config and update to the configured |
| * level. |
| */ |
| log_options_t log_opts = LOG_OPTS_STDERR_ONLY; |
| job_info_msg_t *job_info = NULL; |
| |
| setpgid(0, 0); |
| closeall(3); /* Do this before initializing logging */ |
| /* coverity[leaked_handle] */ |
| |
| /* Logging will go to stdout/stderr until we call update_logging(). */ |
| log_init(proc_name, log_opts, LOG_DAEMON, NULL); |
| |
| /* |
| * Change our process name and make it so running_in_slurmctld() and |
| * run_in_daemon() return false. |
| */ |
| if (argc < RUN_COMMAND_LAUNCHER_ARGC) { |
| fatal("%s: Unexpected argc=%d, it should be >= %d", |
| __func__, argc, RUN_COMMAND_LAUNCHER_ARGC); |
| } |
| |
| script_argc = argc - RUN_COMMAND_LAUNCHER_ARGC; |
| /* _change_proc_name overwrites argv. Copy the args that we need. */ |
| script_argv = slurm_char_array_copy(script_argc, |
| &argv[RUN_COMMAND_LAUNCHER_ARGC]); |
| _change_proc_name(argc, argv, proc_name); |
| |
| /* Minimal config setup: */ |
| init_slurm_conf(&slurm_conf); |
| _init_bb_script_config(&function, &job_id, &job_info); |
| |
| /* |
| * Initialize plugins. |
| */ |
| slurm_conf.bb_type = "burst_buffer/lua"; |
| if (bb_g_init() != SLURM_SUCCESS) |
| fatal("failed to initialize burst_buffer plugin"); |
| |
| /* |
| * update_logging() makes logs go to the slurmctld log file. |
| * Call update_logging() after initializing plugins to avoid seeing some |
| * extra debug logs about loading plugins. |
| */ |
| lock_slurmctld(config_write_lock); |
| update_logging(); |
| unlock_slurmctld(config_write_lock); |
| |
| /* Run the script */ |
| exit_code = bb_g_run_script(function, job_id, script_argc, script_argv, |
| job_info, &resp); |
| if (resp) |
| safe_write(STDOUT_FILENO, resp, strlen(resp)); |
| |
| /* Ignore memory leaks because we are calling exit() */ |
| rwfail: |
| exit(exit_code); |
| } |
| |
| extern void slurmscriptd_flush_job(uint32_t job_id) |
| { |
| flush_job_msg_t *msg = xmalloc(sizeof(*msg)); |
| slurmscriptd_msg_t *send_args = xmalloc(sizeof(*send_args)); |
| |
| msg->job_id = job_id; |
| send_args->msg_data = msg; |
| send_args->msg_type = SLURMSCRIPTD_REQUEST_FLUSH_JOB; |
| |
| slurm_thread_create_detached(_async_send_to_slurmscriptd, send_args); |
| } |
| |
| extern int slurmscriptd_run_mail(char *script_path, uint32_t argc, char **argv, |
| char **env, uint32_t timeout, char **resp) |
| { |
| int status; |
| run_script_msg_t run_script_msg; |
| |
| memset(&run_script_msg, 0, sizeof(run_script_msg)); |
| |
| /* Init run_script_msg */ |
| run_script_msg.argc = argc; |
| run_script_msg.argv = argv; |
| run_script_msg.env = env; |
| run_script_msg.script_name = "MailProg"; |
| run_script_msg.script_path = script_path; |
| run_script_msg.script_type = SLURMSCRIPTD_MAIL; |
| run_script_msg.timeout = timeout; |
| |
| /* Send message; wait for response */ |
| status = _send_to_slurmscriptd(SLURMSCRIPTD_REQUEST_RUN_SCRIPT, |
| &run_script_msg, true, resp, NULL); |
| |
| /* Cleanup */ |
| return status; |
| } |
| |
| extern void slurmscriptd_run_power(char *script_path, char *hosts, |
| char *features, uint32_t job_id, |
| char *script_name, uint32_t timeout, |
| char *tmp_file_env_name, char *tmp_file_str) |
| { |
| run_script_msg_t *run_script_msg; |
| slurmscriptd_msg_t *send_args = xmalloc(sizeof(*send_args)); |
| int argc; |
| char **env, **argv; |
| |
| argc = 3; |
| argv = xcalloc(argc + 1, sizeof(char*)); /* Null terminated */ |
| argv[0] = xstrdup(script_path); |
| argv[1] = xstrdup(hosts); |
| argv[2] = xstrdup(features); |
| |
| env = env_array_create(); |
| env_array_append(&env, "SLURM_CONF", slurm_conf.slurm_conf); |
| if (job_id) |
| env_array_append_fmt(&env, "SLURM_JOB_ID", "%u", job_id); |
| |
| /* Init run_script_msg */ |
| run_script_msg = _init_run_script_msg(NULL, script_name, script_path, |
| SLURMSCRIPTD_POWER, timeout); |
| |
| run_script_msg->argc = argc; |
| run_script_msg->argv = argv; |
| run_script_msg->env = env; |
| run_script_msg->job_id = job_id; |
| run_script_msg->tmp_file_env_name = xstrdup(tmp_file_env_name); |
| run_script_msg->tmp_file_str = xstrdup(tmp_file_str); |
| |
| /* Send message; don't wait for response */ |
| send_args->msg_data = run_script_msg; |
| send_args->msg_type = SLURMSCRIPTD_REQUEST_RUN_SCRIPT; |
| slurm_thread_create_detached(_async_send_to_slurmscriptd, send_args); |
| } |
| |
| extern int slurmscriptd_run_bb_lua(uint32_t job_id, char *function, |
| uint32_t argc, char **argv, uint32_t timeout, |
| buf_t *job_buf, char **resp, |
| bool *track_script_signalled) |
| { |
| int status, rc = SLURM_ERROR; |
| uint32_t extra_buf_size = job_buf ? job_buf->processed : 0; |
| run_script_msg_t run_script_msg; |
| |
| memset(&run_script_msg, 0, sizeof(run_script_msg)); |
| |
| /* Init run_script_msg */ |
| run_script_msg.argc = argc; |
| run_script_msg.argv = argv; |
| run_script_msg.extra_buf = job_buf ? job_buf->head : NULL; |
| run_script_msg.extra_buf_size = extra_buf_size; |
| run_script_msg.job_id = job_id; |
| run_script_msg.script_name = function; /* Shallow copy, do not free */ |
| run_script_msg.script_path = "burst_buffer.lua"; |
| run_script_msg.script_type = SLURMSCRIPTD_BB_LUA; |
| run_script_msg.timeout = timeout; |
| |
| /* Send message; wait for response */ |
| status = _send_to_slurmscriptd(SLURMSCRIPTD_REQUEST_RUN_SCRIPT, |
| &run_script_msg, true, resp, |
| track_script_signalled); |
| |
| if (WIFEXITED(status)) |
| rc = WEXITSTATUS(status); |
| else |
| rc = SLURM_ERROR; |
| |
| return rc; |
| } |
| |
| extern void slurmscriptd_run_prepilog(uint32_t job_id, bool is_epilog, |
| char *script, char **env) |
| { |
| run_script_msg_t *run_script_msg; |
| slurmscriptd_msg_t *send_args = xmalloc(sizeof(*send_args)); |
| char *script_name; |
| script_type_t script_type; |
| int timeout = is_epilog ? |
| slurm_conf.epilog_timeout : slurm_conf.prolog_timeout; |
| |
| if (is_epilog) { |
| script_name = "EpilogSlurmctld"; |
| script_type = SLURMSCRIPTD_EPILOG; |
| } else { |
| script_name = "PrologSlurmctld"; |
| script_type = SLURMSCRIPTD_PROLOG; |
| } |
| |
| run_script_msg = _init_run_script_msg(env, script_name, script, |
| script_type, timeout); |
| run_script_msg->argc = 1; |
| run_script_msg->argv = xcalloc(2, sizeof(char *)); /* NULL terminated */ |
| run_script_msg->argv[0] = xstrdup(script); |
| run_script_msg->job_id = job_id; |
| |
| /* |
| * Because this thread is holding the job write lock, do the write in |
| * a different detached thread so we do not lock up the slurmctld |
| * process if the write is blocked for some reason. |
| */ |
| send_args->msg_data = run_script_msg; |
| send_args->msg_type = SLURMSCRIPTD_REQUEST_RUN_SCRIPT; |
| slurm_thread_create_detached(_async_send_to_slurmscriptd, send_args); |
| } |
| |
| extern int slurmscriptd_run_reboot(char *script_path, uint32_t argc, |
| char **argv) |
| { |
| int status; |
| |
| run_script_msg_t run_script_msg; |
| |
| memset(&run_script_msg, 0, sizeof(run_script_msg)); |
| |
| /* Init run_script_msg */ |
| run_script_msg.argc = argc; |
| run_script_msg.argv = argv; |
| run_script_msg.script_name = "RebootProgram"; |
| run_script_msg.script_path = script_path; |
| run_script_msg.script_type = SLURMSCRIPTD_REBOOT; |
| |
| /* Send message; wait for response */ |
| status = _send_to_slurmscriptd(SLURMSCRIPTD_REQUEST_RUN_SCRIPT, |
| &run_script_msg, true, NULL, NULL); |
| |
| return status; |
| } |
| |
| extern void slurmscriptd_run_resv(char *script_path, uint32_t argc, char **argv, |
| uint32_t timeout, char *script_name) |
| { |
| run_script_msg_t *run_script_msg; |
| slurmscriptd_msg_t *send_args = xmalloc(sizeof(*send_args)); |
| |
| /* Init run_script_msg */ |
| run_script_msg = _init_run_script_msg(NULL, script_name, script_path, |
| SLURMSCRIPTD_RESV, timeout); |
| run_script_msg->argc = argc; |
| run_script_msg->argv = slurm_char_array_copy(argc, argv); |
| |
| /* Send message; don't wait for response */ |
| send_args->msg_data = run_script_msg; |
| send_args->msg_type = SLURMSCRIPTD_REQUEST_RUN_SCRIPT; |
| slurm_thread_create_detached(_async_send_to_slurmscriptd, send_args); |
| } |
| |
| extern void slurmscriptd_update_debug_flags(uint64_t debug_flags) |
| { |
| debug_flags_msg_t msg; |
| |
| memset(&msg, 0, sizeof(msg)); |
| |
| msg.debug_flags = debug_flags; |
| _send_to_slurmscriptd(SLURMSCRIPTD_REQUEST_UPDATE_DEBUG_FLAGS, &msg, |
| false, NULL, NULL); |
| } |
| |
| extern void slurmscriptd_update_log_level(int debug_level, bool log_rotate) |
| { |
| log_msg_t log_msg; |
| |
| memset(&log_msg, 0, sizeof(log_msg)); |
| |
| log_msg.debug_level = (uint32_t) debug_level; |
| log_msg.log_rotate = log_rotate; |
| _send_to_slurmscriptd(SLURMSCRIPTD_REQUEST_UPDATE_LOG, &log_msg, |
| false, NULL, NULL); |
| } |
| |
| extern int slurmscriptd_init(char **argv, char *binary_path) |
| { |
| int to_slurmscriptd[2] = {-1, -1}; |
| int to_slurmctld[2] = {-1, -1}; |
| |
| if ((pipe(to_slurmscriptd) < 0) || (pipe(to_slurmctld) < 0)) |
| fatal("%s: pipe failed: %m", __func__); |
| |
| slurmctld_readfd = to_slurmctld[0]; |
| slurmctld_writefd = to_slurmscriptd[1]; |
| slurmscriptd_readfd = to_slurmscriptd[0]; |
| slurmscriptd_writefd = to_slurmctld[1]; |
| |
| slurmscriptd_pid = fork(); |
| if (slurmscriptd_pid < 0) { /* fork() failed */ |
| fatal("%s: fork() failed: %m", __func__); |
| } else if (slurmscriptd_pid > 0) { /* parent (slurmctld) */ |
| ssize_t i; |
| int rc = SLURM_ERROR, ack; |
| |
| /* |
| * Communication between slurmctld and slurmscriptd happens via |
| * the to_slurmscriptd and to_slurmctld pipes. |
| * slurmctld writes data to slurmscriptd with the |
| * to_slurmscriptd pipe and slurmscriptd writes data to |
| * slurmctld with the to_slurmctld pipe. |
| * If there is a failure with startup, SIGKILL the slurmscriptd |
| * and then exit. The slurmscriptd pid will be adopted and then |
| * reaped by init, so we don't need to call waitpid(). |
| */ |
| if (close(to_slurmscriptd[0]) < 0) { |
| rc = errno; |
| killpg(slurmscriptd_pid, SIGKILL); |
| errno = rc; |
| fatal("%s: slurmctld: Unable to close read to_slurmscriptd in parent: %m", |
| __func__); |
| } |
| if (close(to_slurmctld[1]) < 0) { |
| rc = errno; |
| killpg(slurmscriptd_pid, SIGKILL); |
| errno = rc; |
| fatal("%s: slurmctld: Unable to close write to_slurmctld in parent: %m", |
| __func__); |
| } |
| |
| /* Test communications with slurmscriptd. */ |
| i = read(slurmctld_readfd, &rc, sizeof(int)); |
| if (i < 0) { |
| rc = errno; |
| killpg(slurmscriptd_pid, SIGKILL); |
| errno = rc; |
| fatal_abort("%s: slurmctld: Can not read return code from slurmscriptd: %m", |
| __func__); |
| } else if (i != sizeof(int)) { |
| rc = errno; |
| killpg(slurmscriptd_pid, SIGKILL); |
| errno = rc; |
| fatal_abort("%s: slurmctld: slurmscriptd failed to send return code: %m", |
| __func__); |
| } |
| if (rc != SLURM_SUCCESS) { |
| killpg(slurmscriptd_pid, SIGKILL); |
| fatal_abort("%s: slurmctld: slurmscriptd did not initialize", |
| __func__); |
| } |
| ack = SLURM_SUCCESS; |
| i = write(slurmctld_writefd, &ack, sizeof(int)); |
| if (i != sizeof(int)) { |
| rc = errno; |
| killpg(slurmscriptd_pid, SIGKILL); |
| errno = rc; |
| fatal_abort("%s: slurmctld: failed to send ack to slurmscriptd: %m", |
| __func__); |
| } |
| |
| /* Get slurmscriptd initialization status */ |
| i = read(slurmctld_readfd, &rc, sizeof(int)); |
| if (i < 0) |
| fatal("%s: Cannot read slurmscriptd initialization code", |
| __func__); |
| if (rc != SLURM_SUCCESS) |
| fatal("%s: slurmscriptd initialization failed", |
| __func__); |
| |
| slurm_mutex_init(&script_count_mutex); |
| slurm_mutex_init(&write_mutex); |
| slurm_mutex_init(&script_resp_map_mutex); |
| script_resp_map = xhash_init(_resp_map_key_id, _resp_map_free); |
| _setup_eio(slurmctld_readfd); |
| slurm_thread_create(&slurmctld_listener_tid, |
| _slurmctld_listener_thread, NULL); |
| debug("slurmctld: slurmscriptd fork()'d and initialized."); |
| } else { /* child (slurmscriptd_pid == 0) */ |
| /* |
| * Dup needed file descriptors and re-exec self. |
| * We do not need to closeall() here because it will happen on |
| * the re-exec. |
| */ |
| dup2(slurmscriptd_readfd, SLURMSCRIPT_READ_FD); |
| dup2(slurmscriptd_writefd, SLURMSCRIPT_WRITE_FD); |
| setenv(SLURMSCRIPTD_MODE_ENV, "1", 1); |
| execv(binary_path, argv); |
| fatal("%s: execv() failed: %m", __func__); |
| /* Never returns */ |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern int slurmscriptd_fini(void) |
| { |
| debug("%s starting", __func__); |
| _kill_slurmscriptd(); |
| |
| /* Now shutdown communications. */ |
| eio_signal_shutdown(msg_handle); |
| slurm_thread_join(slurmctld_listener_tid); |
| slurm_mutex_destroy(&script_resp_map_mutex); |
| xhash_clear(script_resp_map); |
| slurm_mutex_destroy(&write_mutex); |
| (void) close(slurmctld_writefd); |
| (void) close(slurmctld_readfd); |
| |
| debug("%s complete", __func__); |
| |
| return SLURM_SUCCESS; |
| } |