| /*****************************************************************************\ |
| * src/slurmd/slurmd/req.c - slurmd request handling |
| ***************************************************************************** |
| * Copyright (C) 2002-2007 The Regents of the University of California. |
| * Copyright (C) 2008-2010 Lawrence Livermore National Security. |
| * Portions Copyright (C) 2010-2013 SchedMD LLC. |
| * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). |
| * Written by Mark Grondona <mgrondona@llnl.gov>. |
| * CODE-OCEC-09-009. All rights reserved. |
| * |
| * This file is part of SLURM, a resource management program. |
| * For details, see <http://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * SLURM is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with SLURM; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| #if HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #include <fcntl.h> |
| #include <pthread.h> |
| #include <sched.h> |
| #include <signal.h> |
| #include <string.h> |
| #include <sys/param.h> /* MAXPATHLEN */ |
| #include <unistd.h> |
| #include <stdlib.h> |
| #include <sys/poll.h> |
| #include <sys/stat.h> |
| #include <sys/types.h> |
| #include <sys/wait.h> |
| #include <sys/un.h> |
| #include <utime.h> |
| #include <grp.h> |
| |
| #include "src/common/cpu_frequency.h" |
| #include "src/common/env.h" |
| #include "src/common/fd.h" |
| #include "src/common/forward.h" |
| #include "src/common/gres.h" |
| #include "src/common/hostlist.h" |
| #include "src/common/list.h" |
| #include "src/common/log.h" |
| #include "src/common/macros.h" |
| #include "src/common/node_select.h" |
| #include "src/common/read_config.h" |
| #include "src/common/slurm_auth.h" |
| #include "src/common/slurm_cred.h" |
| #include "src/common/slurm_acct_gather_energy.h" |
| #include "src/common/slurm_jobacct_gather.h" |
| #include "src/common/slurm_protocol_defs.h" |
| #include "src/common/slurm_protocol_api.h" |
| #include "src/common/slurm_protocol_interface.h" |
| #include "src/common/stepd_api.h" |
| #include "src/common/uid.h" |
| #include "src/common/util-net.h" |
| #include "src/common/xstring.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/plugstack.h" |
| |
| #include "src/slurmd/slurmd/get_mach_stat.h" |
| #include "src/slurmd/slurmd/slurmd.h" |
| #include "src/slurmd/slurmd/xcpu.h" |
| |
| #include "src/slurmd/common/job_container_plugin.h" |
| #include "src/slurmd/common/proctrack.h" |
| #include "src/slurmd/common/run_script.h" |
| #include "src/slurmd/common/reverse_tree_math.h" |
| #include "src/slurmd/common/slurmstepd_init.h" |
| #include "src/slurmd/common/task_plugin.h" |
| |
| #define _LIMIT_INFO 0 |
| |
| #define RETRY_DELAY 15 /* retry every 15 seconds */ |
| #define MAX_RETRY 240 /* retry 240 times (one hour max) */ |
| |
| #ifndef MAXHOSTNAMELEN |
| #define MAXHOSTNAMELEN 64 |
| #endif |
| |
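| /* Cached supplementary group IDs for a user (see _gids_cache_lookup) */ |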
| typedef struct { |
| int ngids; |
| gid_t *gids; |
| } gids_t; |
| |
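| /* Memory limits (in MB) recorded per job step, kept in job_limits_list */ |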
| typedef struct { |
| uint32_t job_id; |
| uint32_t step_id; |
| uint32_t job_mem; |
| uint32_t step_mem; |
| } job_mem_limits_t; |
| |
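| /* Identifies a job step whose slurmstepd is still being launched */ |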
| typedef struct { |
| uint32_t job_id; |
| uint32_t step_id; |
| } starting_step_t; |
| |
| typedef struct { |
| uint32_t job_id; |
| uint16_t msg_timeout; |
| bool *prolog_fini; |
| pthread_cond_t *timer_cond; |
| pthread_mutex_t *timer_mutex; |
| } timer_struct_t; |
| |
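| /* Job identity and environment details passed to the prolog/epilog */ |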
| typedef struct { |
| uint32_t jobid; |
| uint32_t step_id; |
| char *node_list; |
| char *partition; |
| char *resv_id; |
| char **spank_job_env; |
| uint32_t spank_job_env_size; |
| uid_t uid; |
| char *user_name; |
| } job_env_t; |
| |
| static int _abort_step(uint32_t job_id, uint32_t step_id); |
| static char **_build_env(job_env_t *job_env); |
| static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc); |
| static void _destroy_env(char **env); |
| static int _get_grouplist(char **user_name, uid_t my_uid, gid_t my_gid, |
| int *ngroups, gid_t **groups); |
| static bool _is_batch_job_finished(uint32_t job_id); |
| static void _job_limits_free(void *x); |
| static int _job_limits_match(void *x, void *key); |
| static bool _job_still_running(uint32_t job_id); |
| static int _kill_all_active_steps(uint32_t jobid, int sig, bool batch); |
| static int _launch_job_fail(uint32_t job_id, uint32_t slurm_rc); |
| static void _note_batch_job_finished(uint32_t job_id); |
| static int _step_limits_match(void *x, void *key); |
| static int _terminate_all_steps(uint32_t jobid, bool batch); |
| static void _rpc_launch_tasks(slurm_msg_t *); |
| static void _rpc_abort_job(slurm_msg_t *); |
| static void _rpc_batch_job(slurm_msg_t *msg, bool new_msg); |
| static void _rpc_prolog(slurm_msg_t *msg); |
| static void _rpc_job_notify(slurm_msg_t *); |
| static void _rpc_signal_tasks(slurm_msg_t *); |
| static void _rpc_checkpoint_tasks(slurm_msg_t *); |
| static void _rpc_complete_batch(slurm_msg_t *); |
| static void _rpc_terminate_tasks(slurm_msg_t *); |
| static void _rpc_timelimit(slurm_msg_t *); |
| static void _rpc_reattach_tasks(slurm_msg_t *); |
| static void _rpc_signal_job(slurm_msg_t *); |
| static void _rpc_suspend_job(slurm_msg_t *msg); |
| static void _rpc_terminate_job(slurm_msg_t *); |
| static void _rpc_update_time(slurm_msg_t *); |
| static void _rpc_shutdown(slurm_msg_t *msg); |
| static void _rpc_reconfig(slurm_msg_t *msg); |
| static void _rpc_reboot(slurm_msg_t *msg); |
| static void _rpc_pid2jid(slurm_msg_t *msg); |
| static int _rpc_file_bcast(slurm_msg_t *msg); |
| static int _rpc_ping(slurm_msg_t *); |
| static int _rpc_health_check(slurm_msg_t *); |
| static int _rpc_acct_gather_update(slurm_msg_t *); |
| static int _rpc_acct_gather_energy(slurm_msg_t *); |
| static int _rpc_step_complete(slurm_msg_t *msg); |
| static int _rpc_stat_jobacct(slurm_msg_t *msg); |
| static int _rpc_list_pids(slurm_msg_t *msg); |
| static int _rpc_daemon_status(slurm_msg_t *msg); |
| static int _run_prolog(job_env_t *job_env); |
| static int _run_epilog(job_env_t *job_env); |
| static void _rpc_forward_data(slurm_msg_t *msg); |
| |
| |
| static bool _pause_for_job_completion(uint32_t jobid, char *nodes, |
| int maxtime); |
| static bool _slurm_authorized_user(uid_t uid); |
| static void _sync_messages_kill(kill_job_msg_t *req); |
| static int _waiter_init (uint32_t jobid); |
| static int _waiter_complete (uint32_t jobid); |
| |
| static bool _steps_completed_now(uint32_t jobid); |
| static int _valid_sbcast_cred(file_bcast_msg_t *req, uid_t req_uid, |
| uint16_t block_no, uint32_t *job_id); |
| static void _wait_state_completed(uint32_t jobid, int max_delay); |
| static slurmstepd_info_t *_get_job_step_info(uint32_t jobid); |
| static long _get_job_uid(uint32_t jobid); |
| |
| static gids_t *_gids_cache_lookup(char *user, gid_t gid); |
| |
| static int _add_starting_step(slurmd_step_type_t type, void *req); |
| static int _remove_starting_step(slurmd_step_type_t type, void *req); |
| static int _compare_starting_steps(void *s0, void *s1); |
| static int _wait_for_starting_step(uint32_t job_id, uint32_t step_id); |
| static bool _step_is_starting(uint32_t job_id, uint32_t step_id); |
| |
| static void _add_job_running_prolog(uint32_t job_id); |
| static void _remove_job_running_prolog(uint32_t job_id); |
| static int _compare_job_running_prolog(void *s0, void *s1); |
| static void _wait_for_job_running_prolog(uint32_t job_id); |
| |
| /* |
| * List of threads waiting for jobs to complete |
| */ |
| static List waiters; |
| |
| static pthread_mutex_t launch_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static time_t startup = 0; /* daemon startup time */ |
| static time_t last_slurmctld_msg = 0; |
| |
| static pthread_mutex_t job_limits_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static List job_limits_list = NULL; |
| static bool job_limits_loaded = false; |
| |
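| /* Small ring buffer of recently finished batch job IDs, consulted by |
| * _is_batch_job_finished() to avoid acting on already completed jobs */ |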
| #define FINI_JOB_CNT 32 |
| static pthread_mutex_t fini_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static uint32_t fini_job_id[FINI_JOB_CNT]; |
| static int next_fini_job_inx = 0; |
| |
| /* NUM_PARALLEL_SUSPEND controls the number of jobs suspended/resumed |
| * at one time as well as the number of jobsteps per job that can be |
| * suspended at one time */ |
| #define NUM_PARALLEL_SUSPEND 8 |
| static pthread_mutex_t suspend_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static uint32_t job_suspend_array[NUM_PARALLEL_SUSPEND]; |
| static int job_suspend_size = 0; |
| |
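| /* Serializes the "first run of this job on this node" check |
| * (slurm_cred_jobid_cached) with starting the job prolog */ |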
| static pthread_mutex_t prolog_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
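| /* Dispatch an incoming RPC to its handler and free the unpacked |
| * message body */ |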
| void |
| slurmd_req(slurm_msg_t *msg) |
| { |
| int rc; |
| |
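| /* A NULL msg simply resets this module's cached state: note the |
| * daemon startup time and discard the waiter and job-limit lists. */ |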
| if (msg == NULL) { |
| if (startup == 0) |
| startup = time(NULL); |
| if (waiters) { |
| list_destroy(waiters); |
| waiters = NULL; |
| } |
| slurm_mutex_lock(&job_limits_mutex); |
| if (job_limits_list) { |
| list_destroy(job_limits_list); |
| job_limits_list = NULL; |
| job_limits_loaded = false; |
| } |
| slurm_mutex_unlock(&job_limits_mutex); |
| return; |
| } |
| |
| switch(msg->msg_type) { |
| case REQUEST_LAUNCH_PROLOG: |
| debug2("Processing RPC: REQUEST_LAUNCH_PROLOG"); |
| _rpc_prolog(msg); |
| last_slurmctld_msg = time(NULL); |
| slurm_free_prolog_launch_msg(msg->data); |
| break; |
| case REQUEST_BATCH_JOB_LAUNCH: |
| debug2("Processing RPC: REQUEST_BATCH_JOB_LAUNCH"); |
| /* Mutex locking moved into _rpc_batch_job() due to the |
| * very slow prolog on Blue Gene systems. Only batch |
| * jobs are supported on Blue Gene (no job steps). */ |
| _rpc_batch_job(msg, true); |
| last_slurmctld_msg = time(NULL); |
| slurm_free_job_launch_msg(msg->data); |
| break; |
| case REQUEST_LAUNCH_TASKS: |
| debug2("Processing RPC: REQUEST_LAUNCH_TASKS"); |
| slurm_mutex_lock(&launch_mutex); |
| _rpc_launch_tasks(msg); |
| slurm_free_launch_tasks_request_msg(msg->data); |
| slurm_mutex_unlock(&launch_mutex); |
| break; |
| case REQUEST_SIGNAL_TASKS: |
| debug2("Processing RPC: REQUEST_SIGNAL_TASKS"); |
| _rpc_signal_tasks(msg); |
| slurm_free_kill_tasks_msg(msg->data); |
| break; |
| case REQUEST_CHECKPOINT_TASKS: |
| debug2("Processing RPC: REQUEST_CHECKPOINT_TASKS"); |
| _rpc_checkpoint_tasks(msg); |
| slurm_free_checkpoint_tasks_msg(msg->data); |
| break; |
| case REQUEST_TERMINATE_TASKS: |
| debug2("Processing RPC: REQUEST_TERMINATE_TASKS"); |
| _rpc_terminate_tasks(msg); |
| slurm_free_kill_tasks_msg(msg->data); |
| break; |
| case REQUEST_KILL_PREEMPTED: |
| debug2("Processing RPC: REQUEST_KILL_PREEMPTED"); |
| last_slurmctld_msg = time(NULL); |
| _rpc_timelimit(msg); |
| slurm_free_timelimit_msg(msg->data); |
| break; |
| case REQUEST_KILL_TIMELIMIT: |
| debug2("Processing RPC: REQUEST_KILL_TIMELIMIT"); |
| last_slurmctld_msg = time(NULL); |
| _rpc_timelimit(msg); |
| slurm_free_timelimit_msg(msg->data); |
| break; |
| case REQUEST_REATTACH_TASKS: |
| debug2("Processing RPC: REQUEST_REATTACH_TASKS"); |
| _rpc_reattach_tasks(msg); |
| slurm_free_reattach_tasks_request_msg(msg->data); |
| break; |
| case REQUEST_SIGNAL_JOB: |
| debug2("Processing RPC: REQUEST_SIGNAL_JOB"); |
| _rpc_signal_job(msg); |
| slurm_free_signal_job_msg(msg->data); |
| break; |
| case REQUEST_SUSPEND_INT: |
| debug2("Processing RPC: REQUEST_SUSPEND_INT"); |
| _rpc_suspend_job(msg); |
| last_slurmctld_msg = time(NULL); |
| slurm_free_suspend_int_msg(msg->data); |
| break; |
| case REQUEST_ABORT_JOB: |
| debug2("Processing RPC: REQUEST_ABORT_JOB"); |
| last_slurmctld_msg = time(NULL); |
| _rpc_abort_job(msg); |
| slurm_free_kill_job_msg(msg->data); |
| break; |
| case REQUEST_TERMINATE_JOB: |
| debug2("Processing RPC: REQUEST_TERMINATE_JOB"); |
| last_slurmctld_msg = time(NULL); |
| _rpc_terminate_job(msg); |
| slurm_free_kill_job_msg(msg->data); |
| break; |
| case REQUEST_COMPLETE_BATCH_SCRIPT: |
| debug2("Processing RPC: REQUEST_COMPLETE_BATCH_SCRIPT"); |
| _rpc_complete_batch(msg); |
| slurm_free_complete_batch_script_msg(msg->data); |
| break; |
| case REQUEST_UPDATE_JOB_TIME: |
| debug2("Processing RPC: REQUEST_UPDATE_JOB_TIME"); |
| _rpc_update_time(msg); |
| last_slurmctld_msg = time(NULL); |
| slurm_free_update_job_time_msg(msg->data); |
| break; |
| case REQUEST_SHUTDOWN: |
| debug2("Processing RPC: REQUEST_SHUTDOWN"); |
| _rpc_shutdown(msg); |
| slurm_free_shutdown_msg(msg->data); |
| break; |
| case REQUEST_RECONFIGURE: |
| debug2("Processing RPC: REQUEST_RECONFIGURE"); |
| _rpc_reconfig(msg); |
| last_slurmctld_msg = time(NULL); |
| /* No body to free */ |
| break; |
| case REQUEST_REBOOT_NODES: |
| debug2("Processing RPC: REQUEST_REBOOT_NODES"); |
| _rpc_reboot(msg); |
| slurm_free_reboot_msg(msg->data); |
| break; |
| case REQUEST_NODE_REGISTRATION_STATUS: |
| debug2("Processing RPC: REQUEST_NODE_REGISTRATION_STATUS"); |
| /* Treat as ping (for slurmctld agent, just return SUCCESS) */ |
| rc = _rpc_ping(msg); |
| last_slurmctld_msg = time(NULL); |
| /* No body to free */ |
| /* Then initiate a separate node registration */ |
| if (rc == SLURM_SUCCESS) |
| send_registration_msg(SLURM_SUCCESS, true); |
| break; |
| case REQUEST_PING: |
| _rpc_ping(msg); |
| last_slurmctld_msg = time(NULL); |
| /* No body to free */ |
| break; |
| case REQUEST_HEALTH_CHECK: |
| debug2("Processing RPC: REQUEST_HEALTH_CHECK"); |
| _rpc_health_check(msg); |
| last_slurmctld_msg = time(NULL); |
| /* No body to free */ |
| break; |
| case REQUEST_ACCT_GATHER_UPDATE: |
| debug2("Processing RPC: REQUEST_ACCT_GATHER_UPDATE"); |
| _rpc_acct_gather_update(msg); |
| last_slurmctld_msg = time(NULL); |
| /* No body to free */ |
| break; |
| case REQUEST_ACCT_GATHER_ENERGY: |
| debug2("Processing RPC: REQUEST_ACCT_GATHER_ENERGY"); |
| _rpc_acct_gather_energy(msg); |
| slurm_free_acct_gather_energy_req_msg(msg->data); |
| break; |
| case REQUEST_JOB_ID: |
| _rpc_pid2jid(msg); |
| slurm_free_job_id_request_msg(msg->data); |
| break; |
| case REQUEST_FILE_BCAST: |
| rc = _rpc_file_bcast(msg); |
| slurm_send_rc_msg(msg, rc); |
| slurm_free_file_bcast_msg(msg->data); |
| break; |
| case REQUEST_STEP_COMPLETE: |
| rc = _rpc_step_complete(msg); |
| slurm_free_step_complete_msg(msg->data); |
| break; |
| case REQUEST_JOB_STEP_STAT: |
| rc = _rpc_stat_jobacct(msg); |
| slurm_free_job_step_id_msg(msg->data); |
| break; |
| case REQUEST_JOB_STEP_PIDS: |
| rc = _rpc_list_pids(msg); |
| slurm_free_job_step_id_msg(msg->data); |
| break; |
| case REQUEST_DAEMON_STATUS: |
| _rpc_daemon_status(msg); |
| /* No body to free */ |
| break; |
| case REQUEST_JOB_NOTIFY: |
| _rpc_job_notify(msg); |
| slurm_free_job_notify_msg(msg->data); |
| break; |
| case REQUEST_FORWARD_DATA: |
| _rpc_forward_data(msg); |
| slurm_free_forward_data_msg(msg->data); |
| break; |
| case REQUEST_SUSPEND: /* Defunct, see REQUEST_SUSPEND_INT */ |
| default: |
| error("slurmd_req: invalid request msg type %d", |
| msg->msg_type); |
| slurm_send_rc_msg(msg, EINVAL); |
| break; |
| } |
| return; |
| } |
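| |
| /* Pack an abbreviated slurmd configuration and write it, length-prefixed, |
| * to the given file descriptor (the pipe to a new slurmstepd). |
| * Returns 0 on success or -1 on write failure. */ |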
| static int _send_slurmd_conf_lite (int fd, slurmd_conf_t *cf) |
| { |
| int len; |
| Buf buffer = init_buf(0); |
| slurm_mutex_lock(&cf->config_mutex); |
| pack_slurmd_conf_lite(cf, buffer); |
| slurm_mutex_unlock(&cf->config_mutex); |
| len = get_buf_offset(buffer); |
| safe_write(fd, &len, sizeof(int)); |
| safe_write(fd, get_buf_data(buffer), len); |
| free_buf(buffer); |
| return (0); |
| rwfail: |
| return (-1); |
| } |
| |
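| /* Write the initialization stream expected by a freshly exec'd slurmstepd: |
| * step type, reverse-tree layout, slurmd configuration, client and self |
| * addresses, GRES and CPU-frequency data, the packed request message, and |
| * finally the user's group list. Returns 0 on success or an errno value. */ |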
| static int |
| _send_slurmstepd_init(int fd, slurmd_step_type_t type, void *req, |
| slurm_addr_t *cli, slurm_addr_t *self, |
| hostset_t step_hset) |
| { |
| int len = 0; |
| Buf buffer = NULL; |
| slurm_msg_t msg; |
| uid_t uid = (uid_t)-1; |
| gid_t gid = (gid_t)-1; |
| gids_t *gids = NULL; |
| |
| int rank; |
| int parent_rank, children, depth, max_depth; |
| char *parent_alias = NULL; |
| char *user_name = NULL; |
| slurm_addr_t parent_addr = {0}; |
| char pwd_buffer[PW_BUF_SIZE]; |
| struct passwd pwd, *pwd_result; |
| |
| slurm_msg_t_init(&msg); |
| /* send type over to slurmstepd */ |
| safe_write(fd, &type, sizeof(int)); |
| |
| /* step_hset can be NULL for batch scripts, or if the user is |
| * the SlurmUser and the job credential did not validate in |
| * _check_job_credential(). If the job credential did not validate, |
| * then it did not come from the controller and there is no reason |
| * to send step completion messages to the controller. |
| */ |
| if (step_hset == NULL) { |
| if (type == LAUNCH_TASKS) { |
| info("task rank unavailable due to invalid job " |
| "credential, step completion RPC impossible"); |
| } |
| rank = -1; |
| parent_rank = -1; |
| children = 0; |
| depth = 0; |
| max_depth = 0; |
| } else if ((type == LAUNCH_TASKS) && |
| (((launch_tasks_request_msg_t *)req)->alias_list)) { |
| /* In the cloud, each task talks directly to the slurmctld |
| * since node addressing is abnormal */ |
| rank = 0; |
| parent_rank = -1; |
| children = 0; |
| depth = 0; |
| max_depth = 0; |
| } else { |
| #ifndef HAVE_FRONT_END |
| int count; |
| count = hostset_count(step_hset); |
| rank = hostset_find(step_hset, conf->node_name); |
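| /* Compute this node's parent, children and depth in the reverse |
| * (fan-in) tree used to aggregate step completion messages. */ |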
| reverse_tree_info(rank, count, REVERSE_TREE_WIDTH, |
| &parent_rank, &children, |
| &depth, &max_depth); |
| if (rank > 0) { /* rank 0 talks directly to the slurmctld */ |
| int rc; |
| /* Find the slurm_addr_t of this node's parent slurmd |
| * in the step host list */ |
| parent_alias = hostset_nth(step_hset, parent_rank); |
| rc = slurm_conf_get_addr(parent_alias, &parent_addr); |
| if (rc != SLURM_SUCCESS) { |
| error("Failed looking up address for " |
| "NodeName %s", parent_alias); |
| /* parent_rank = -1; */ |
| } |
| } |
| #else |
| /* In FRONT_END mode, one slurmd pretends to be all |
| * NodeNames, so we can't compare conf->node_name |
| * to the NodeNames in step_hset. Just send step complete |
| * RPC directly to the controller. |
| */ |
| rank = 0; |
| parent_rank = -1; |
| children = 0; |
| depth = 0; |
| max_depth = 0; |
| #endif |
| } |
| debug3("slurmstepd rank %d (%s), parent rank %d (%s), " |
| "children %d, depth %d, max_depth %d", |
| rank, conf->node_name, |
| parent_rank, parent_alias ? parent_alias : "NONE", |
| children, depth, max_depth); |
| if (parent_alias) |
| free(parent_alias); |
| |
| /* send reverse-tree info to the slurmstepd */ |
| safe_write(fd, &rank, sizeof(int)); |
| safe_write(fd, &parent_rank, sizeof(int)); |
| safe_write(fd, &children, sizeof(int)); |
| safe_write(fd, &depth, sizeof(int)); |
| safe_write(fd, &max_depth, sizeof(int)); |
| safe_write(fd, &parent_addr, sizeof(slurm_addr_t)); |
| |
| /* send conf over to slurmstepd */ |
| if (_send_slurmd_conf_lite(fd, conf) < 0) |
| goto rwfail; |
| |
| /* send cli address over to slurmstepd */ |
| buffer = init_buf(0); |
| slurm_pack_slurm_addr(cli, buffer); |
| len = get_buf_offset(buffer); |
| safe_write(fd, &len, sizeof(int)); |
| safe_write(fd, get_buf_data(buffer), len); |
| free_buf(buffer); |
| buffer = NULL; |
| |
| /* send self address over to slurmstepd */ |
| if (self) { |
| buffer = init_buf(0); |
| slurm_pack_slurm_addr(self, buffer); |
| len = get_buf_offset(buffer); |
| safe_write(fd, &len, sizeof(int)); |
| safe_write(fd, get_buf_data(buffer), len); |
| free_buf(buffer); |
| buffer = NULL; |
| |
| } else { |
| len = 0; |
| safe_write(fd, &len, sizeof(int)); |
| } |
| |
| /* Send GRES information to slurmstepd */ |
| gres_plugin_send_stepd(fd); |
| |
| /* send cpu_frequency info to slurmstepd */ |
| cpu_freq_send_info(fd); |
| |
| /* send req over to slurmstepd */ |
| switch(type) { |
| case LAUNCH_BATCH_JOB: |
| gid = (gid_t)((batch_job_launch_msg_t *)req)->gid; |
| uid = (uid_t)((batch_job_launch_msg_t *)req)->uid; |
| user_name = ((batch_job_launch_msg_t *)req)->user_name; |
| msg.msg_type = REQUEST_BATCH_JOB_LAUNCH; |
| break; |
| case LAUNCH_TASKS: |
| /* |
| * The validity of req->uid was verified against the |
| * auth credential in _rpc_launch_tasks(). req->gid |
| * has NOT yet been checked! |
| */ |
| gid = (gid_t)((launch_tasks_request_msg_t *)req)->gid; |
| uid = (uid_t)((launch_tasks_request_msg_t *)req)->uid; |
| user_name = ((launch_tasks_request_msg_t *)req)->user_name; |
| msg.msg_type = REQUEST_LAUNCH_TASKS; |
| break; |
| default: |
| error("Was sent a task I didn't understand"); |
| break; |
| } |
| buffer = init_buf(0); |
| msg.data = req; |
| msg.protocol_version = SLURM_PROTOCOL_VERSION; |
| pack_msg(&msg, buffer); |
| len = get_buf_offset(buffer); |
| safe_write(fd, &len, sizeof(int)); |
| safe_write(fd, get_buf_data(buffer), len); |
| free_buf(buffer); |
| buffer = NULL; |
| |
| #ifdef HAVE_NATIVE_CRAY |
| /* Avoid calling getpwuid_r() on a native Cray system: it is slow |
| * on the compute nodes and the user has, in theory, already been |
| * verified earlier. |
| */ |
| if (!user_name) { |
| #endif |
| /* send cached group ids array for the relevant uid */ |
| debug3("_send_slurmstepd_init: call to getpwuid_r"); |
| if (slurm_getpwuid_r(uid, &pwd, pwd_buffer, PW_BUF_SIZE, |
| &pwd_result) || (pwd_result == NULL)) { |
| error("%s: getpwuid_r: %m", __func__); |
| len = 0; |
| safe_write(fd, &len, sizeof(int)); |
| errno = ESLURMD_UID_NOT_FOUND; |
| return errno; |
| } |
| debug3("_send_slurmstepd_init: return from getpwuid_r"); |
| gid = pwd_result->pw_gid; |
| if (!user_name) |
| user_name = pwd_result->pw_name; |
| #ifdef HAVE_NATIVE_CRAY |
| } |
| #endif |
| if (!user_name) { |
| /* Sanity check since gids_cache_lookup will fail |
| * with a NULL. */ |
| error("_send_slurmstepd_init No user name for %d: %m", uid); |
| len = 0; |
| safe_write(fd, &len, sizeof(int)); |
| errno = ESLURMD_UID_NOT_FOUND; |
| return errno; |
| } |
| |
| if ((gids = _gids_cache_lookup(user_name, gid))) { |
| int i; |
| uint32_t tmp32; |
| safe_write(fd, &gids->ngids, sizeof(int)); |
| for (i = 0; i < gids->ngids; i++) { |
| tmp32 = (uint32_t)gids->gids[i]; |
| safe_write(fd, &tmp32, sizeof(uint32_t)); |
| } |
| } else { |
| len = 0; |
| safe_write(fd, &len, sizeof(int)); |
| } |
| return 0; |
| |
| rwfail: |
| if (buffer) |
| free_buf(buffer); |
| error("_send_slurmstepd_init failed"); |
| return errno; |
| } |
| |
| |
| /* |
| * Fork and exec the slurmstepd, then send the slurmstepd its |
| * initialization data. Then wait for slurmstepd to send an "ok" |
| * message before returning. When the "ok" message is received, |
| * the slurmstepd has created and begun listening on its unix |
| * domain socket. |
| * |
| * Note that this code forks twice and it is the grandchild that |
| * becomes the slurmstepd process, so the slurmstepd's parent process |
| * will be init, not slurmd. |
| */ |
| static int |
| _forkexec_slurmstepd(slurmd_step_type_t type, void *req, |
| slurm_addr_t *cli, slurm_addr_t *self, |
| const hostset_t step_hset) |
| { |
| pid_t pid; |
| int to_stepd[2] = {-1, -1}; |
| int to_slurmd[2] = {-1, -1}; |
| |
| if (pipe(to_stepd) < 0 || pipe(to_slurmd) < 0) { |
| error("_forkexec_slurmstepd pipe failed: %m"); |
| return SLURM_FAILURE; |
| } |
| |
| if (_add_starting_step(type, req)) { |
| error("_forkexec_slurmstepd failed in _add_starting_step: %m"); |
| return SLURM_FAILURE; |
| } |
| |
| if ((pid = fork()) < 0) { |
| error("_forkexec_slurmstepd: fork: %m"); |
| close(to_stepd[0]); |
| close(to_stepd[1]); |
| close(to_slurmd[0]); |
| close(to_slurmd[1]); |
| _remove_starting_step(type, req); |
| return SLURM_FAILURE; |
| } else if (pid > 0) { |
| int rc = 0; |
| #ifndef SLURMSTEPD_MEMCHECK |
| int i; |
| time_t start_time = time(NULL); |
| #endif |
| /* |
| * Parent sends initialization data to the slurmstepd |
| * over the to_stepd pipe, and waits for the return code |
| * reply on the to_slurmd pipe. |
| */ |
| if (close(to_stepd[0]) < 0) |
| error("Unable to close read to_stepd in parent: %m"); |
| if (close(to_slurmd[1]) < 0) |
| error("Unable to close write to_slurmd in parent: %m"); |
| |
| if ((rc = _send_slurmstepd_init(to_stepd[1], type, |
| req, cli, self, |
| step_hset)) != 0) { |
| error("Unable to init slurmstepd"); |
| goto done; |
| } |
| |
| /* If running under valgrind/memcheck, this pipe doesn't work |
| * correctly so just skip it. */ |
| #ifndef SLURMSTEPD_MEMCHECK |
| i = read(to_slurmd[0], &rc, sizeof(int)); |
| if (i < 0) { |
| error("Can not read return code from slurmstepd: %m"); |
| rc = SLURM_FAILURE; |
| } else if (i != sizeof(int)) { |
| error("slurmstepd failed to send return code"); |
| rc = SLURM_FAILURE; |
| } else { |
| int delta_time = time(NULL) - start_time; |
| if (delta_time > 5) { |
| info("Warning: slurmstepd startup took %d sec, " |
| "possible file system problem or full " |
| "memory", delta_time); |
| } |
| } |
| #endif |
| done: |
| if (_remove_starting_step(type, req)) |
| error("Error cleaning up starting_step list"); |
| |
| /* Reap child */ |
| if (waitpid(pid, NULL, 0) < 0) |
| error("Unable to reap slurmd child process"); |
| if (close(to_stepd[1]) < 0) |
| error("close write to_stepd in parent: %m"); |
| if (close(to_slurmd[0]) < 0) |
| error("close read to_slurmd in parent: %m"); |
| return rc; |
| } else { |
| #ifndef SLURMSTEPD_MEMCHECK |
| char *const argv[2] = { (char *)conf->stepd_loc, NULL}; |
| #else |
| char *const argv[3] = {"memcheck", |
| (char *)conf->stepd_loc, NULL}; |
| #endif |
| int failed = 0; |
| /* inform slurmstepd about our config */ |
| setenv("SLURM_CONF", conf->conffile, 1); |
| |
| /* |
| * Child forks and exits |
| */ |
| if (setsid() < 0) { |
| error("_forkexec_slurmstepd: setsid: %m"); |
| failed = 1; |
| } |
| if ((pid = fork()) < 0) { |
| error("_forkexec_slurmstepd: " |
| "Unable to fork grandchild: %m"); |
| failed = 2; |
| } else if (pid > 0) { /* child */ |
| exit(0); |
| } |
| |
| /* |
| * Grandchild exec's the slurmstepd |
| * |
| * If the slurmd is being shut down or restarted before the |
| * pipes are created, the old conf->lfd could have been reused |
| * for one of them; closing it would make the dup2 below fail, |
| * so only shut down the listen socket if it is distinct. |
| */ |
| if ((to_stepd[0] != conf->lfd) |
| && (to_slurmd[1] != conf->lfd)) |
| slurm_shutdown_msg_engine(conf->lfd); |
| |
| if (close(to_stepd[1]) < 0) |
| error("close write to_stepd in grandchild: %m"); |
| if (close(to_slurmd[0]) < 0) |
| error("close read to_slurmd in parent: %m"); |
| |
| (void) close(STDIN_FILENO); /* ignore return */ |
| if (dup2(to_stepd[0], STDIN_FILENO) == -1) { |
| error("dup2 over STDIN_FILENO: %m"); |
| exit(1); |
| } |
| fd_set_close_on_exec(to_stepd[0]); |
| (void) close(STDOUT_FILENO); /* ignore return */ |
| if (dup2(to_slurmd[1], STDOUT_FILENO) == -1) { |
| error("dup2 over STDOUT_FILENO: %m"); |
| exit(1); |
| } |
| fd_set_close_on_exec(to_slurmd[1]); |
| (void) close(STDERR_FILENO); /* ignore return */ |
| if (dup2(devnull, STDERR_FILENO) == -1) { |
| error("dup2 /dev/null to STDERR_FILENO: %m"); |
| exit(1); |
| } |
| fd_set_noclose_on_exec(STDERR_FILENO); |
| log_fini(); |
| if (!failed) { |
| execvp(argv[0], argv); |
| error("exec of slurmstepd failed: %m"); |
| } |
| exit(2); |
| } |
| } |
| |
| |
| /* |
| * The job(step) credential is the only place to get a definitive |
| * list of the nodes allocated to a job step. We need to return |
| * a hostset_t of the nodes. Validate the incoming RPC, updating |
| * job_mem as needed. |
| */ |
| static int |
| _check_job_credential(launch_tasks_request_msg_t *req, uid_t uid, |
| int node_id, hostset_t *step_hset, |
| uint16_t protocol_version) |
| { |
| slurm_cred_arg_t arg; |
| hostset_t s_hset = NULL; |
| bool user_ok = _slurm_authorized_user(uid); |
| bool verified = true; |
| int host_index = -1; |
| int rc; |
| slurm_cred_t *cred = req->cred; |
| uint32_t jobid = req->job_id; |
| uint32_t stepid = req->job_step_id; |
| int tasks_to_launch = req->tasks_to_launch[node_id]; |
| uint32_t job_cpus = 0, step_cpus = 0; |
| |
| /* |
| * First call slurm_cred_verify() so that all valid |
| * credentials are checked |
| */ |
| rc = slurm_cred_verify(conf->vctx, cred, &arg, protocol_version); |
| if (rc < 0) { |
| verified = false; |
| if ((!user_ok) || (errno != ESLURMD_INVALID_JOB_CREDENTIAL)) |
| return SLURM_ERROR; |
| else { |
| debug("_check_job_credential slurm_cred_verify failed:" |
| " %m, but continuing anyway."); |
| } |
| } |
| req->job_core_spec = arg.job_core_spec; |
| |
| /* If the requesting uid is the SlurmUser or root and the credential |
| * is bad, do not attempt any further validation of it */ |
| if (!verified) { |
| *step_hset = NULL; |
| if (rc >= 0) { |
| if ((s_hset = hostset_create(arg.step_hostlist))) |
| *step_hset = s_hset; |
| slurm_cred_free_args(&arg); |
| } |
| return SLURM_SUCCESS; |
| } |
| |
| if ((arg.jobid != jobid) || (arg.stepid != stepid)) { |
| error("job credential for %u.%u, expected %u.%u", |
| arg.jobid, arg.stepid, jobid, stepid); |
| goto fail; |
| } |
| |
| if (arg.uid != uid) { |
| error("job credential created for uid %ld, expected %ld", |
| (long) arg.uid, (long) uid); |
| goto fail; |
| } |
| |
| /* |
| * Check that credential is valid for this host |
| */ |
| if (!(s_hset = hostset_create(arg.step_hostlist))) { |
| error("Unable to parse credential hostlist: `%s'", |
| arg.step_hostlist); |
| goto fail; |
| } |
| |
| if (!hostset_within(s_hset, conf->node_name)) { |
| error("Invalid job %u.%u credential for user %u: " |
| "host %s not in hostset %s", |
| arg.jobid, arg.stepid, arg.uid, |
| conf->node_name, arg.step_hostlist); |
| goto fail; |
| } |
| |
| if ((arg.job_nhosts > 0) && (tasks_to_launch > 0)) { |
| uint32_t hi, i, i_first_bit=0, i_last_bit=0, j; |
| bool cpu_log = slurm_get_debug_flags() & DEBUG_FLAG_CPU_BIND; |
| |
| #ifdef HAVE_FRONT_END |
| host_index = 0; /* It is always 0 for front end systems */ |
| #else |
| hostset_t j_hset; |
| /* Determine the CPU count based upon this node's index into |
| * the _job's_ allocation (job's hostlist and core_bitmap) */ |
| if (!(j_hset = hostset_create(arg.job_hostlist))) { |
| error("Unable to parse credential hostlist: `%s'", |
| arg.job_hostlist); |
| goto fail; |
| } |
| host_index = hostset_find(j_hset, conf->node_name); |
| hostset_destroy(j_hset); |
| |
| if ((host_index < 0) || (host_index >= arg.job_nhosts)) { |
| error("job cr credential invalid host_index %d for " |
| "job %u", host_index, arg.jobid); |
| goto fail; |
| } |
| #endif |
| |
| if (cpu_log) { |
| char *per_job = "", *per_step = ""; |
| uint32_t job_mem = arg.job_mem_limit; |
| uint32_t step_mem = arg.step_mem_limit; |
| if (job_mem & MEM_PER_CPU) { |
| job_mem &= (~MEM_PER_CPU); |
| per_job = "_per_CPU"; |
| } |
| if (step_mem & MEM_PER_CPU) { |
| step_mem &= (~MEM_PER_CPU); |
| per_step = "_per_CPU"; |
| } |
| info("===================="); |
| info("step_id:%u.%u job_mem:%uMB%s step_mem:%uMB%s", |
| arg.jobid, arg.stepid, job_mem, per_job, |
| step_mem, per_step); |
| } |
| |
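| /* Locate this node's span of bits in the job core bitmap. The |
| * credential describes node layouts as run-length encoded groups |
| * (sockets_per_node, cores_per_socket, sock_core_rep_count), so walk |
| * the groups until the one containing this node's index is found. */ |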
| hi = host_index + 1; /* change from 0-origin to 1-origin */ |
| for (i=0; hi; i++) { |
| if (hi > arg.sock_core_rep_count[i]) { |
| i_first_bit += arg.sockets_per_node[i] * |
| arg.cores_per_socket[i] * |
| arg.sock_core_rep_count[i]; |
| hi -= arg.sock_core_rep_count[i]; |
| } else { |
| i_first_bit += arg.sockets_per_node[i] * |
| arg.cores_per_socket[i] * |
| (hi - 1); |
| i_last_bit = i_first_bit + |
| arg.sockets_per_node[i] * |
| arg.cores_per_socket[i]; |
| break; |
| } |
| } |
| /* Now count the allocated processors */ |
| for (i=i_first_bit, j=0; i<i_last_bit; i++, j++) { |
| char *who_has = NULL; |
| if (bit_test(arg.job_core_bitmap, i)) { |
| job_cpus++; |
| who_has = "Job"; |
| } |
| if (bit_test(arg.step_core_bitmap, i)) { |
| step_cpus++; |
| who_has = "Step"; |
| } |
| if (cpu_log && who_has) { |
| info("JobNode[%u] CPU[%u] %s alloc", |
| host_index, j, who_has); |
| } |
| } |
| if (cpu_log) |
| info("===================="); |
| if (step_cpus == 0) { |
| error("cons_res: zero processors allocated to step"); |
| step_cpus = 1; |
| } |
| /* NOTE: step_cpus is the count of allocated resources |
| * (typically cores). Convert to CPU count as needed */ |
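| /* A hypothetical example: with 16 configured CPUs (threads) and 8 |
| * allocated cores on this node, the factor is 2 and both counts |
| * are doubled. */ |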
| if (i_last_bit <= i_first_bit) |
| error("step credential has no CPUs selected"); |
| else { |
| i = conf->cpus / (i_last_bit - i_first_bit); |
| if (i > 1) { |
| info("Scaling CPU count by factor of " |
| "%d (%u/(%u-%u)", |
| i, conf->cpus, i_last_bit, i_first_bit); |
| step_cpus *= i; |
| job_cpus *= i; |
| } |
| } |
| if (tasks_to_launch > step_cpus) { |
| /* This is expected with the --overcommit option |
| * or hyperthreads */ |
| debug("cons_res: More than one tasks per logical " |
| "processor (%d > %u) on host [%u.%u %ld %s] ", |
| tasks_to_launch, step_cpus, arg.jobid, |
| arg.stepid, (long) arg.uid, arg.step_hostlist); |
| } |
| } else { |
| step_cpus = 1; |
| job_cpus = 1; |
| } |
| |
| /* Overwrite any memory limits in the RPC with the limits recorded |
| * in the job credential, and reset the CPU count on this node to |
| * the correct value. */ |
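| /* Worked example (values in MB, matching the logging above): a step |
| * limit of (MEM_PER_CPU | 1024) with step_cpus = 4 yields a 4096 MB |
| * step_mem_lim on this node. */ |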
| if (arg.step_mem_limit) { |
| if (arg.step_mem_limit & MEM_PER_CPU) { |
| req->step_mem_lim = arg.step_mem_limit & |
| (~MEM_PER_CPU); |
| req->step_mem_lim *= step_cpus; |
| } else |
| req->step_mem_lim = arg.step_mem_limit; |
| } else { |
| if (arg.job_mem_limit & MEM_PER_CPU) { |
| req->step_mem_lim = arg.job_mem_limit & |
| (~MEM_PER_CPU); |
| req->step_mem_lim *= job_cpus; |
| } else |
| req->step_mem_lim = arg.job_mem_limit; |
| } |
| if (arg.job_mem_limit & MEM_PER_CPU) { |
| req->job_mem_lim = arg.job_mem_limit & (~MEM_PER_CPU); |
| req->job_mem_lim *= job_cpus; |
| } else |
| req->job_mem_lim = arg.job_mem_limit; |
| req->node_cpus = step_cpus; |
| #if 0 |
| info("%u.%u node_id:%d mem orig:%u cpus:%u limit:%u", |
| jobid, stepid, node_id, arg.job_mem_limit, |
| step_cpus, req->job_mem_lim); |
| #endif |
| |
| *step_hset = s_hset; |
| slurm_cred_free_args(&arg); |
| return SLURM_SUCCESS; |
| |
| fail: |
| if (s_hset) |
| hostset_destroy(s_hset); |
| *step_hset = NULL; |
| slurm_cred_free_args(&arg); |
| slurm_seterrno_ret(ESLURMD_INVALID_JOB_CREDENTIAL); |
| } |
| |
| |
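| /* Handle a REQUEST_LAUNCH_TASKS RPC: verify the requester and the job |
| * credential, run the prolog on the first use of the job on this node, |
| * record any memory limits, then fork/exec a slurmstepd for the step. */ |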
| static void |
| _rpc_launch_tasks(slurm_msg_t *msg) |
| { |
| int errnum = SLURM_SUCCESS; |
| uint16_t port; |
| char host[MAXHOSTNAMELEN]; |
| uid_t req_uid; |
| launch_tasks_request_msg_t *req = msg->data; |
| bool super_user = false; |
| #ifndef HAVE_FRONT_END |
| bool first_job_run; |
| #endif |
| slurm_addr_t self; |
| slurm_addr_t *cli = &msg->orig_addr; |
| socklen_t adlen; |
| hostset_t step_hset = NULL; |
| job_mem_limits_t *job_limits_ptr; |
| int nodeid = 0; |
| #ifndef HAVE_FRONT_END |
| /* It is always 0 for front end systems */ |
| nodeid = nodelist_find(req->complete_nodelist, conf->node_name); |
| #endif |
| req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| memcpy(&req->orig_addr, &msg->orig_addr, sizeof(slurm_addr_t)); |
| |
| task_g_slurmd_launch_request(req->job_id, req, nodeid); |
| |
| super_user = _slurm_authorized_user(req_uid); |
| |
| if ((super_user == false) && (req_uid != req->uid)) { |
| error("launch task request from uid %u", |
| (unsigned int) req_uid); |
| errnum = ESLURM_USER_ID_MISSING; /* or invalid user */ |
| goto done; |
| } |
| |
| slurm_get_ip_str(cli, &port, host, sizeof(host)); |
| info("launch task %u.%u request from %u.%u@%s (port %hu)", req->job_id, |
| req->job_step_id, req->uid, req->gid, host, port); |
| |
| /* This may have been set by a previous request; overwrite it here |
| * so that messages for this launch are routed correctly. */ |
| env_array_overwrite(&req->env, "SLURM_SRUN_COMM_HOST", host); |
| req->envc = envcount(req->env); |
| |
| #ifndef HAVE_FRONT_END |
| slurm_mutex_lock(&prolog_mutex); |
| first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id); |
| #endif |
| if (_check_job_credential(req, req_uid, nodeid, &step_hset, |
| msg->protocol_version) < 0) { |
| errnum = errno; |
| error("Invalid job credential from %ld@%s: %m", |
| (long) req_uid, host); |
| #ifndef HAVE_FRONT_END |
| slurm_mutex_unlock(&prolog_mutex); |
| #endif |
| goto done; |
| } |
| |
| #ifndef HAVE_FRONT_END |
| if (first_job_run) { |
| int rc; |
| job_env_t job_env; |
| |
| slurm_cred_insert_jobid(conf->vctx, req->job_id); |
| _add_job_running_prolog(req->job_id); |
| slurm_mutex_unlock(&prolog_mutex); |
| |
| if (container_g_create(req->job_id)) |
| error("container_g_create(%u): %m", req->job_id); |
| |
| memset(&job_env, 0, sizeof(job_env_t)); |
| |
| job_env.jobid = req->job_id; |
| job_env.step_id = req->job_step_id; |
| job_env.node_list = req->complete_nodelist; |
| job_env.partition = req->partition; |
| job_env.spank_job_env = req->spank_job_env; |
| job_env.spank_job_env_size = req->spank_job_env_size; |
| job_env.uid = req->uid; |
| job_env.user_name = req->user_name; |
| rc = _run_prolog(&job_env); |
| if (rc) { |
| int term_sig, exit_status; |
| if (WIFSIGNALED(rc)) { |
| exit_status = 0; |
| term_sig = WTERMSIG(rc); |
| } else { |
| exit_status = WEXITSTATUS(rc); |
| term_sig = 0; |
| } |
| error("[job %u] prolog failed status=%d:%d", |
| req->job_id, exit_status, term_sig); |
| errnum = ESLURMD_PROLOG_FAILED; |
| goto done; |
| } |
| /* Since the job could have been killed while the prolog was |
| * running, test if the credential has since been revoked |
| * and exit as needed. */ |
| if (slurm_cred_revoked(conf->vctx, req->cred)) { |
| info("Job %u already killed, do not launch step %u.%u", |
| req->job_id, req->job_id, req->job_step_id); |
| errnum = ESLURMD_CREDENTIAL_REVOKED; |
| goto done; |
| } |
| } else { |
| slurm_mutex_unlock(&prolog_mutex); |
| _wait_for_job_running_prolog(req->job_id); |
| } |
| #endif |
| |
| if (req->job_mem_lim || req->step_mem_lim) { |
| step_loc_t step_info; |
| slurm_mutex_lock(&job_limits_mutex); |
| if (!job_limits_list) |
| job_limits_list = list_create(_job_limits_free); |
| step_info.jobid = req->job_id; |
| step_info.stepid = req->job_step_id; |
| job_limits_ptr = list_find_first (job_limits_list, |
| _step_limits_match, |
| &step_info); |
| if (!job_limits_ptr) { |
| job_limits_ptr = xmalloc(sizeof(job_mem_limits_t)); |
| job_limits_ptr->job_id = req->job_id; |
| job_limits_ptr->job_mem = req->job_mem_lim; |
| job_limits_ptr->step_id = req->job_step_id; |
| job_limits_ptr->step_mem = req->step_mem_lim; |
| #if _LIMIT_INFO |
| info("AddLim step:%u.%u job_mem:%u step_mem:%u", |
| job_limits_ptr->job_id, job_limits_ptr->step_id, |
| job_limits_ptr->job_mem, |
| job_limits_ptr->step_mem); |
| #endif |
| list_append(job_limits_list, job_limits_ptr); |
| } |
| slurm_mutex_unlock(&job_limits_mutex); |
| } |
| |
| adlen = sizeof(self); |
| _slurm_getsockname(msg->conn_fd, (struct sockaddr *)&self, &adlen); |
| |
| debug3("_rpc_launch_tasks: call to _forkexec_slurmstepd"); |
| errnum = _forkexec_slurmstepd(LAUNCH_TASKS, (void *)req, cli, &self, |
| step_hset); |
| debug3("_rpc_launch_tasks: return from _forkexec_slurmstepd"); |
| |
| done: |
| if (step_hset) |
| hostset_destroy(step_hset); |
| |
| if (slurm_send_rc_msg(msg, errnum) < 0) { |
| char addr_str[32]; |
| slurm_print_slurm_addr(&msg->address, addr_str, |
| sizeof(addr_str)); |
| error("_rpc_launch_tasks: unable to send return code to " |
| "address:port=%s msg_type=%u: %m", |
| addr_str, msg->msg_type); |
| |
| /* |
| * Rewind credential so that srun may perform retry |
| */ |
| slurm_cred_rewind(conf->vctx, req->cred); /* ignore errors */ |
| |
| } else if (errnum == SLURM_SUCCESS) { |
| save_cred_state(conf->vctx); |
| task_g_slurmd_reserve_resources(req->job_id, req, nodeid); |
| } |
| |
| /* |
| * If job prolog failed, indicate failure to slurmctld |
| */ |
| if (errnum == ESLURMD_PROLOG_FAILED) |
| send_registration_msg(errnum, false); |
| } |
| |
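| /* Append a prolog failure message to the batch job's stderr (or stdout) |
| * file so the user can see why the job never started. */ |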
| static void |
| _prolog_error(batch_job_launch_msg_t *req, int rc) |
| { |
| char *err_name_ptr, err_name[256], path_name[MAXPATHLEN]; |
| char *fmt_char; |
| int fd; |
| |
| if (req->std_err || req->std_out) { |
| if (req->std_err) |
| strncpy(err_name, req->std_err, sizeof(err_name)); |
| else |
| strncpy(err_name, req->std_out, sizeof(err_name)); |
| if ((fmt_char = strchr(err_name, (int) '%')) && |
| (fmt_char[1] == 'j') && !strchr(fmt_char+1, (int) '%')) { |
| char tmp_name[256]; |
| fmt_char[1] = 'u'; |
| snprintf(tmp_name, sizeof(tmp_name), err_name, |
| req->job_id); |
| strncpy(err_name, tmp_name, sizeof(err_name)); |
| } |
| } else { |
| snprintf(err_name, sizeof(err_name), "slurm-%u.out", |
| req->job_id); |
| } |
| err_name_ptr = err_name; |
| if (err_name_ptr[0] == '/') |
| snprintf(path_name, MAXPATHLEN, "%s", err_name_ptr); |
| else if (req->work_dir) |
| snprintf(path_name, MAXPATHLEN, "%s/%s", |
| req->work_dir, err_name_ptr); |
| else |
| snprintf(path_name, MAXPATHLEN, "/%s", err_name_ptr); |
| |
| if ((fd = open(path_name, (O_CREAT|O_APPEND|O_WRONLY), 0644)) == -1) { |
| error("Unable to open %s: %s", path_name, |
| slurm_strerror(errno)); |
| return; |
| } |
| snprintf(err_name, sizeof(err_name), |
| "Error running slurm prolog: %d\n", WEXITSTATUS(rc)); |
| safe_write(fd, err_name, strlen(err_name)); |
| if (fchown(fd, (uid_t) req->uid, (gid_t) req->gid) == -1) { |
| error("%s: Couldn't change fd owner to %u:%u: %m", |
| __func__, req->uid, req->gid); |
| } |
| rwfail: |
| close(fd); |
| } |
| |
| /* Load the user's environment on this machine if requested, i.e. if |
| * the SLURM_GET_USER_ENV=1 environment variable is set */ |
| static void |
| _get_user_env(batch_job_launch_msg_t *req) |
| { |
| struct passwd pwd, *pwd_ptr = NULL; |
| char pwd_buf[PW_BUF_SIZE]; |
| char **new_env; |
| int i; |
| |
| for (i=0; i<req->envc; i++) { |
| if (strcmp(req->environment[i], "SLURM_GET_USER_ENV=1") == 0) |
| break; |
| } |
| if (i >= req->envc) |
| return; /* don't need to load env */ |
| |
| if (slurm_getpwuid_r(req->uid, &pwd, pwd_buf, PW_BUF_SIZE, &pwd_ptr) |
| || (pwd_ptr == NULL)) { |
| error("%s: getpwuid_r(%u):%m", __func__, req->uid); |
| } else { |
| verbose("get env for user %s here", pwd.pw_name); |
| /* Permit up to 120 second delay before using cache file */ |
| new_env = env_array_user_default(pwd.pw_name, 120, 0); |
| if (new_env) { |
| env_array_merge(&new_env, |
| (const char **) req->environment); |
| env_array_free(req->environment); |
| req->environment = new_env; |
| req->envc = envcount(new_env); |
| } else { |
| /* One option is to kill the job, but it's |
| * probably better to try running with what |
| * we have. */ |
| error("Unable to get user's local environment, " |
| "running only with passed environment"); |
| } |
| } |
| } |
| |
| /* The RPC currently contains a memory size limit, but we load the |
| * value from the job credential to be certain it has not been |
| * altered by the user */ |
| static void |
| _set_batch_job_limits(slurm_msg_t *msg) |
| { |
| int i; |
| uint32_t alloc_lps = 0, last_bit = 0; |
| bool cpu_log = slurm_get_debug_flags() & DEBUG_FLAG_CPU_BIND; |
| slurm_cred_arg_t arg; |
| batch_job_launch_msg_t *req = (batch_job_launch_msg_t *)msg->data; |
| |
| if (slurm_cred_get_args(req->cred, &arg) != SLURM_SUCCESS) |
| return; |
| req->job_core_spec = arg.job_core_spec; /* Prevent user reset */ |
| |
| if (cpu_log) { |
| char *per_job = ""; |
| uint32_t job_mem = arg.job_mem_limit; |
| if (job_mem & MEM_PER_CPU) { |
| job_mem &= (~MEM_PER_CPU); |
| per_job = "_per_CPU"; |
| } |
| info("===================="); |
| info("batch_job:%u job_mem:%uMB%s", req->job_id, |
| job_mem, per_job); |
| } |
| if (cpu_log || (arg.job_mem_limit & MEM_PER_CPU)) { |
| if (arg.job_nhosts > 0) { |
| last_bit = arg.sockets_per_node[0] * |
| arg.cores_per_socket[0]; |
| for (i=0; i<last_bit; i++) { |
| if (!bit_test(arg.job_core_bitmap, i)) |
| continue; |
| if (cpu_log) |
| info("JobNode[0] CPU[%u] Job alloc",i); |
| alloc_lps++; |
| } |
| } |
| if (cpu_log) |
| info("===================="); |
| if (alloc_lps == 0) { |
| error("_set_batch_job_limit: alloc_lps is zero"); |
| alloc_lps = 1; |
| } |
| |
| /* NOTE: alloc_lps is the count of allocated resources |
| * (typically cores). Convert to CPU count as needed */ |
| if (last_bit < 1) |
| error("Batch job credential allocates no CPUs"); |
| else { |
| i = conf->cpus / last_bit; |
| if (i > 1) |
| alloc_lps *= i; |
| } |
| } |
| |
| if (arg.job_mem_limit & MEM_PER_CPU) { |
| req->job_mem = arg.job_mem_limit & (~MEM_PER_CPU); |
| req->job_mem *= alloc_lps; |
| } else |
| req->job_mem = arg.job_mem_limit; |
| |
| slurm_cred_free_args(&arg); |
| } |
| |
| /* These functions prevent a possible race condition in which the batch |
| * script's complete RPC is processed before its launch_successful |
| * response has been handled. */ |
| static bool _is_batch_job_finished(uint32_t job_id) |
| { |
| bool found_job = false; |
| int i; |
| |
| slurm_mutex_lock(&fini_mutex); |
| for (i = 0; i < FINI_JOB_CNT; i++) { |
| if (fini_job_id[i] == job_id) { |
| found_job = true; |
| break; |
| } |
| } |
| slurm_mutex_unlock(&fini_mutex); |
| |
| return found_job; |
| } |
| static void _note_batch_job_finished(uint32_t job_id) |
| { |
| slurm_mutex_lock(&fini_mutex); |
| fini_job_id[next_fini_job_inx] = job_id; |
| if (++next_fini_job_inx >= FINI_JOB_CNT) |
| next_fini_job_inx = 0; |
| slurm_mutex_unlock(&fini_mutex); |
| } |
| |
| /* Notify slurmctld that we have finished running the prolog. |
| * This is needed on systems that don't use srun to launch their tasks. |
| */ |
| static void _notify_slurmctld_prolog_fini( |
| uint32_t job_id, uint32_t prolog_return_code) |
| { |
| int rc; |
| slurm_msg_t req_msg; |
| complete_prolog_msg_t req; |
| |
| slurm_msg_t_init(&req_msg); |
| req.job_id = job_id; |
| req.prolog_rc = prolog_return_code; |
| |
| req_msg.msg_type = REQUEST_COMPLETE_PROLOG; |
| req_msg.data = &req; |
| |
| if ((slurm_send_recv_controller_rc_msg(&req_msg, &rc) < 0) || |
| (rc != SLURM_SUCCESS)) |
| error("Error sending prolog completion notification: %m"); |
| } |
| |
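| /* Handle a REQUEST_LAUNCH_PROLOG RPC: acknowledge the request, create the |
| * job container, run the prolog if this is the first use of the job on |
| * this node, and notify slurmctld of the prolog result unless |
| * PrologFlags=NoHold is configured. */ |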
| static void _rpc_prolog(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data; |
| job_env_t job_env; |
| bool first_job_run; |
| |
| if (req == NULL) |
| return; |
| |
| if (slurm_send_rc_msg(msg, rc) < 0) { |
| error("Error starting prolog: %m"); |
| } |
| if (rc) { |
| int term_sig, exit_status; |
| if (WIFSIGNALED(rc)) { |
| exit_status = 0; |
| term_sig = WTERMSIG(rc); |
| } else { |
| exit_status = WEXITSTATUS(rc); |
| term_sig = 0; |
| } |
| error("[job %u] prolog start failed status=%d:%d", |
| req->job_id, exit_status, term_sig); |
| rc = ESLURMD_PROLOG_FAILED; |
| } |
| |
| if (container_g_create(req->job_id)) |
| error("container_g_create(%u): %m", req->job_id); |
| |
| slurm_mutex_lock(&prolog_mutex); |
| first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id); |
| |
| if (first_job_run) { |
| slurm_cred_insert_jobid(conf->vctx, req->job_id); |
| _add_job_running_prolog(req->job_id); |
| slurm_mutex_unlock(&prolog_mutex); |
| |
| memset(&job_env, 0, sizeof(job_env_t)); |
| |
| job_env.jobid = req->job_id; |
| job_env.step_id = 0; /* not available */ |
| job_env.node_list = req->nodes; |
| job_env.partition = req->partition; |
| job_env.spank_job_env = req->spank_job_env; |
| job_env.spank_job_env_size = req->spank_job_env_size; |
| job_env.uid = req->uid; |
| #if defined(HAVE_BG) |
| select_g_select_jobinfo_get(req->select_jobinfo, |
| SELECT_JOBDATA_BLOCK_ID, |
| &job_env.resv_id); |
| #elif defined(HAVE_ALPS_CRAY) |
| job_env.resv_id = select_g_select_jobinfo_xstrdup( |
| req->select_jobinfo, SELECT_PRINT_RESV_ID); |
| #endif |
| rc = _run_prolog(&job_env); |
| |
| if (rc) { |
| int term_sig, exit_status; |
| if (WIFSIGNALED(rc)) { |
| exit_status = 0; |
| term_sig = WTERMSIG(rc); |
| } else { |
| exit_status = WEXITSTATUS(rc); |
| term_sig = 0; |
| } |
| error("[job %u] prolog failed status=%d:%d", |
| req->job_id, exit_status, term_sig); |
| rc = ESLURMD_PROLOG_FAILED; |
| } |
| } else |
| slurm_mutex_unlock(&prolog_mutex); |
| |
| if (!(slurmctld_conf.prolog_flags & PROLOG_FLAG_NOHOLD)) |
| _notify_slurmctld_prolog_fini(req->job_id, rc); |
| } |
| |
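| /* Handle a batch job launch request: verify the sender, reject revoked |
| * credentials, run the prolog if needed, then fork/exec a slurmstepd to |
| * run the batch script. Launch failures are reported back to slurmctld. */ |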
| static void |
| _rpc_batch_job(slurm_msg_t *msg, bool new_msg) |
| { |
| batch_job_launch_msg_t *req = (batch_job_launch_msg_t *)msg->data; |
| bool first_job_run; |
| int rc = SLURM_SUCCESS; |
| bool replied = false, revoked; |
| slurm_addr_t *cli = &msg->orig_addr; |
| |
| if (new_msg) { |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| if (!_slurm_authorized_user(req_uid)) { |
| error("Security violation, batch launch RPC from uid %d", |
| req_uid); |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| goto done; |
| } |
| } |
| slurm_cred_handle_reissue(conf->vctx, req->cred); |
| if (slurm_cred_revoked(conf->vctx, req->cred)) { |
| error("Job %u already killed, do not launch batch job", |
| req->job_id); |
| rc = ESLURMD_CREDENTIAL_REVOKED; /* job already ran */ |
| goto done; |
| } |
| |
| task_g_slurmd_batch_request(req->job_id, req); /* determine task affinity */ |
| |
| slurm_mutex_lock(&prolog_mutex); |
| first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id); |
| |
| /* BlueGene prolog waits for partition boot and is very slow. |
| * On any system we might need to load environment variables |
| * for Moab (see --get-user-env), which could also be slow. |
| * Just reply now and send a separate kill job request if the |
| * prolog or launch fails. */ |
| replied = true; |
| if (new_msg && (slurm_send_rc_msg(msg, rc) < 1)) { |
| /* The slurmctld is no longer waiting for a reply. |
| * This typically indicates that the slurmd was |
| * blocked from memory and/or CPUs and the slurmctld |
| * has requeued the batch job request. */ |
| error("Could not confirm batch launch for job %u, " |
| "aborting request", req->job_id); |
| rc = SLURM_COMMUNICATIONS_SEND_ERROR; |
| slurm_mutex_unlock(&prolog_mutex); |
| goto done; |
| } |
| |
| /* |
| * Insert jobid into credential context to denote that |
| * we've now "seen" an instance of the job |
| */ |
| if (first_job_run) { |
| job_env_t job_env; |
| slurm_cred_insert_jobid(conf->vctx, req->job_id); |
| _add_job_running_prolog(req->job_id); |
| slurm_mutex_unlock(&prolog_mutex); |
| |
| memset(&job_env, 0, sizeof(job_env_t)); |
| |
| job_env.jobid = req->job_id; |
| job_env.step_id = req->step_id; |
| job_env.node_list = req->nodes; |
| job_env.partition = req->partition; |
| job_env.spank_job_env = req->spank_job_env; |
| job_env.spank_job_env_size = req->spank_job_env_size; |
| job_env.uid = req->uid; |
| job_env.user_name = req->user_name; |
| /* |
| * Run job prolog on this node |
| */ |
| #if defined(HAVE_BG) |
| select_g_select_jobinfo_get(req->select_jobinfo, |
| SELECT_JOBDATA_BLOCK_ID, |
| &job_env.resv_id); |
| #elif defined(HAVE_ALPS_CRAY) |
| job_env.resv_id = select_g_select_jobinfo_xstrdup( |
| req->select_jobinfo, SELECT_PRINT_RESV_ID); |
| #endif |
| if (container_g_create(req->job_id)) |
| error("container_g_create(%u): %m", req->job_id); |
| rc = _run_prolog(&job_env); |
| xfree(job_env.resv_id); |
| if (rc) { |
| int term_sig, exit_status; |
| if (WIFSIGNALED(rc)) { |
| exit_status = 0; |
| term_sig = WTERMSIG(rc); |
| } else { |
| exit_status = WEXITSTATUS(rc); |
| term_sig = 0; |
| } |
| error("[job %u] prolog failed status=%d:%d", |
| req->job_id, exit_status, term_sig); |
| _prolog_error(req, rc); |
| rc = ESLURMD_PROLOG_FAILED; |
| goto done; |
| } |
| } else { |
| slurm_mutex_unlock(&prolog_mutex); |
| _wait_for_job_running_prolog(req->job_id); |
| } |
| |
| _get_user_env(req); |
| _set_batch_job_limits(msg); |
| |
| /* Since the job could have been killed while the prolog was |
| * running (especially on BlueGene, where partition booting can |
| * take minutes), test whether the credential has since been |
| * revoked and exit as needed. */ |
| if (slurm_cred_revoked(conf->vctx, req->cred)) { |
| info("Job %u already killed, do not launch batch job", |
| req->job_id); |
| rc = ESLURMD_CREDENTIAL_REVOKED; /* job already ran */ |
| goto done; |
| } |
| |
| slurm_mutex_lock(&launch_mutex); |
| if (req->step_id == SLURM_BATCH_SCRIPT) |
| info("Launching batch job %u for UID %d", |
| req->job_id, req->uid); |
| else |
| info("Launching batch job %u.%u for UID %d", |
| req->job_id, req->step_id, req->uid); |
| |
| debug3("_rpc_batch_job: call to _forkexec_slurmstepd"); |
| rc = _forkexec_slurmstepd(LAUNCH_BATCH_JOB, (void *)req, cli, NULL, |
| (hostset_t)NULL); |
| debug3("_rpc_batch_job: return from _forkexec_slurmstepd: %d", rc); |
| |
| slurm_mutex_unlock(&launch_mutex); |
| |
| /* On a busy system, slurmstepd may take a while to respond; |
| * if the job was cancelled in the interim, run through the |
| * abort logic below. */ |
| revoked = slurm_cred_revoked(conf->vctx, req->cred); |
| if (revoked && _is_batch_job_finished(req->job_id)) { |
| /* If configured with select/serial and the batch job already |
| * completed, consider the job successfully launched and do |
| * not repeat termination logic below, which in the worst case |
| * just slows things down with another message. */ |
| revoked = false; |
| } |
| if (revoked) { |
| info("Job %u killed while launch was in progress", |
| req->job_id); |
| sleep(1); /* give slurmstepd time to create |
| * the communication socket */ |
| _terminate_all_steps(req->job_id, true); |
| rc = ESLURMD_CREDENTIAL_REVOKED; |
| goto done; |
| } |
| |
| done: |
| if (!replied) { |
| if (new_msg && (slurm_send_rc_msg(msg, rc) < 1)) { |
| /* The slurmctld is no longer waiting for a reply. |
| * This typically indicates that the slurmd was |
| * blocked from memory and/or CPUs and the slurmctld |
| * has requeued the batch job request. */ |
| error("Could not confirm batch launch for job %u, " |
| "aborting request", req->job_id); |
| rc = SLURM_COMMUNICATIONS_SEND_ERROR; |
| } else { |
| /* No need to initiate separate reply below */ |
| rc = SLURM_SUCCESS; |
| } |
| } |
| if (rc != SLURM_SUCCESS) { |
| /* prolog or job launch failure, |
| * tell slurmctld that the job failed */ |
| if (req->step_id == SLURM_BATCH_SCRIPT) |
| _launch_job_fail(req->job_id, rc); |
| else |
| _abort_step(req->job_id, req->step_id); |
| } |
| |
| /* |
| * If job prolog failed or we could not reply, |
| * initiate message to slurmctld with current state |
| */ |
| if ((rc == ESLURMD_PROLOG_FAILED) || |
| (rc == SLURM_COMMUNICATIONS_SEND_ERROR)) |
| send_registration_msg(rc, false); |
| } |
| /* |
| * Send notification message to batch job |
| */ |
| static void |
| _rpc_job_notify(slurm_msg_t *msg) |
| { |
| job_notify_msg_t *req = msg->data; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| long job_uid; |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd = NULL; |
| int step_cnt = 0; |
| int fd; |
| |
| debug("_rpc_job_notify, uid = %d, jobid = %u", req_uid, req->job_id); |
| job_uid = _get_job_uid(req->job_id); |
| if (job_uid < 0) |
| goto no_job; |
| |
| /* |
| * check that the requesting user is the job owner, the SlurmUser, or root |
| */ |
| if ((req_uid != job_uid) && (!_slurm_authorized_user(req_uid))) { |
| error("Security violation: job_notify(%u) from uid %d", |
| req->job_id, req_uid); |
| return; |
| } |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((stepd = list_next(i))) { |
| if ((stepd->jobid != req->job_id) || |
| (stepd->stepid != SLURM_BATCH_SCRIPT)) { |
| continue; |
| } |
| |
| step_cnt++; |
| |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) { |
| debug3("Unable to connect to step %u.%u", |
| stepd->jobid, stepd->stepid); |
| continue; |
| } |
| |
| info("send notification to job %u.%u", |
| stepd->jobid, stepd->stepid); |
| if (stepd_notify_job(fd, req->message) < 0) |
| debug("notify jobid=%u failed: %m", stepd->jobid); |
| close(fd); |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| |
| no_job: |
| if (step_cnt == 0) { |
| debug2("Can't find jobid %u to send notification message", |
| req->job_id); |
| } |
| } |
| |
| static int |
| _launch_job_fail(uint32_t job_id, uint32_t slurm_rc) |
| { |
| complete_batch_script_msg_t comp_msg; |
| struct requeue_msg req_msg; |
| slurm_msg_t resp_msg; |
| int rc; |
| |
| slurm_msg_t_init(&resp_msg); |
| |
| if (slurm_rc == ESLURMD_CREDENTIAL_REVOKED) { |
| comp_msg.job_id = job_id; |
| comp_msg.job_rc = INFINITE; |
| comp_msg.slurm_rc = slurm_rc; |
| comp_msg.node_name = conf->node_name; |
| comp_msg.jobacct = NULL; /* unused */ |
| resp_msg.msg_type = REQUEST_COMPLETE_BATCH_SCRIPT; |
| resp_msg.data = &comp_msg; |
| } else { |
| req_msg.job_id = job_id; |
| req_msg.state = JOB_REQUEUE_HOLD; |
| resp_msg.msg_type = REQUEST_JOB_REQUEUE; |
| resp_msg.data = &req_msg; |
| } |
| |
| return slurm_send_recv_controller_rc_msg(&resp_msg, &rc); |
| } |
| |
| static int |
| _abort_step(uint32_t job_id, uint32_t step_id) |
| { |
| step_complete_msg_t resp; |
| slurm_msg_t resp_msg; |
| int rc, rc2; |
| |
| slurm_msg_t_init(&resp_msg); |
| |
| resp.job_id = job_id; |
| resp.job_step_id = step_id; |
| resp.range_first = 0; |
| resp.range_last = 0; |
| resp.step_rc = 1; |
| resp.jobacct = jobacctinfo_create(NULL); |
| resp_msg.msg_type = REQUEST_STEP_COMPLETE; |
| resp_msg.data = &resp; |
| rc2 = slurm_send_recv_controller_rc_msg(&resp_msg, &rc); |
| /* Note: we are ignoring the RPC return code */ |
| jobacctinfo_destroy(resp.jobacct); |
| return rc2; |
| } |
| |
| static void |
| _rpc_reconfig(slurm_msg_t *msg) |
| { |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| |
| if (!_slurm_authorized_user(req_uid)) |
| error("Security violation, reconfig RPC from uid %d", |
| req_uid); |
| else |
| kill(conf->pid, SIGHUP); |
| forward_wait(msg); |
| /* Never return a message; slurmctld does not expect one */ |
| } |
| |
| static void |
| _rpc_shutdown(slurm_msg_t *msg) |
| { |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| |
| forward_wait(msg); |
| if (!_slurm_authorized_user(req_uid)) |
| error("Security violation, shutdown RPC from uid %d", |
| req_uid); |
| else { |
| if (kill(conf->pid, SIGTERM) != 0) |
| error("kill(%u,SIGTERM): %m", conf->pid); |
| } |
| |
| /* Never return a message; slurmctld does not expect one */ |
| } |
| |
| static void |
| _rpc_reboot(slurm_msg_t *msg) |
| { |
| char *reboot_program, *sp; |
| slurm_ctl_conf_t *cfg; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| int exit_code; |
| |
| if (!_slurm_authorized_user(req_uid)) |
| error("Security violation, reboot RPC from uid %d", |
| req_uid); |
| else { |
| cfg = slurm_conf_lock(); |
| reboot_program = cfg->reboot_program; |
| if (reboot_program) { |
| sp = strchr(reboot_program, ' '); |
| if (sp) |
| sp = xstrndup(reboot_program, |
| (sp - reboot_program)); |
| else |
| sp = xstrdup(reboot_program); |
| if (access(sp, R_OK | X_OK) < 0) |
| error("Cannot run RebootProgram [%s]: %m", sp); |
| else if ((exit_code = system(reboot_program))) |
| error("system(%s) returned %d", reboot_program, |
| exit_code); |
| xfree(sp); |
| } else |
| error("RebootProgram isn't defined in config"); |
| slurm_conf_unlock(); |
| } |
| |
| /* Never return a message; slurmctld does not expect one */ |
| /* slurm_send_rc_msg(msg, rc); */ |
| } |
| |
| static void _job_limits_free(void *x) |
| { |
| xfree(x); |
| } |
| |
| |
| static int _job_limits_match(void *x, void *key) |
| { |
| job_mem_limits_t *job_limits_ptr = (job_mem_limits_t *) x; |
| uint32_t *job_id = (uint32_t *) key; |
| if (job_limits_ptr->job_id == *job_id) |
| return 1; |
| return 0; |
| } |
| |
| static int _step_limits_match(void *x, void *key) |
| { |
| job_mem_limits_t *job_limits_ptr = (job_mem_limits_t *) x; |
| step_loc_t *step_ptr = (step_loc_t *) key; |
| |
| if ((job_limits_ptr->job_id == step_ptr->jobid) && |
| (job_limits_ptr->step_id == step_ptr->stepid)) |
| return 1; |
| return 0; |
| } |
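| |
| /* |
| * Both match callbacks above follow the ListFindF signature from |
| * src/common/list.h. Illustrative sketch (not compiled) of how a |
| * job's limit records are looked up and later purged; the job id |
| * value is assumed for illustration only: |
| * |
| *    uint32_t job_id = 1234; |
| *    job_mem_limits_t *limits; |
| * |
| *    slurm_mutex_lock(&job_limits_mutex); |
| *    limits = list_find_first(job_limits_list, _job_limits_match, |
| *                             &job_id); |
| *    if (limits == NULL) |
| *        debug2("no memory limit recorded for job %u", job_id); |
| *    list_delete_all(job_limits_list, _job_limits_match, &job_id); |
| *    slurm_mutex_unlock(&job_limits_mutex); |
| */ |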
| |
| /* Call only with job_limits_mutex locked */ |
| static void |
| _load_job_limits(void) |
| { |
| List steps; |
| ListIterator step_iter; |
| step_loc_t *stepd; |
| int fd; |
| job_mem_limits_t *job_limits_ptr; |
| slurmstepd_info_t *stepd_info_ptr; |
| |
| if (!job_limits_list) |
| job_limits_list = list_create(_job_limits_free); |
| job_limits_loaded = true; |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| step_iter = list_iterator_create(steps); |
| while ((stepd = list_next(step_iter))) { |
| job_limits_ptr = list_find_first(job_limits_list, |
| _step_limits_match, stepd); |
| if (job_limits_ptr) /* already processed */ |
| continue; |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) |
| continue; /* step completed */ |
| stepd_info_ptr = stepd_get_info(fd); |
| if (stepd_info_ptr && |
| (stepd_info_ptr->job_mem_limit || |
| stepd_info_ptr->step_mem_limit)) { |
| /* create entry for this job */ |
| job_limits_ptr = xmalloc(sizeof(job_mem_limits_t)); |
| job_limits_ptr->job_id = stepd->jobid; |
| job_limits_ptr->step_id = stepd->stepid; |
| job_limits_ptr->job_mem = stepd_info_ptr-> |
| job_mem_limit; |
| job_limits_ptr->step_mem = stepd_info_ptr-> |
| step_mem_limit; |
| #if _LIMIT_INFO |
| info("RecLim step:%u.%u job_mem:%u step_mem:%u", |
| job_limits_ptr->job_id, job_limits_ptr->step_id, |
| job_limits_ptr->job_mem, |
| job_limits_ptr->step_mem); |
| #endif |
| list_append(job_limits_list, job_limits_ptr); |
| } |
| xfree(stepd_info_ptr); |
| close(fd); |
| } |
| list_iterator_destroy(step_iter); |
| list_destroy(steps); |
| } |
| |
| static void |
| _cancel_step_mem_limit(uint32_t job_id, uint32_t step_id) |
| { |
| slurm_msg_t msg; |
| job_notify_msg_t notify_req; |
| job_step_kill_msg_t kill_req; |
| |
| /* NOTE: Batch jobs may have no srun to get this message */ |
| slurm_msg_t_init(&msg); |
| notify_req.job_id = job_id; |
| notify_req.job_step_id = step_id; |
| notify_req.message = "Exceeded job memory limit"; |
| msg.msg_type = REQUEST_JOB_NOTIFY; |
| msg.data = ¬ify_req; |
| slurm_send_only_controller_msg(&msg); |
| |
| kill_req.job_id = job_id; |
| kill_req.job_step_id = step_id; |
| kill_req.signal = SIGKILL; |
| kill_req.flags = (uint16_t) 0; |
| msg.msg_type = REQUEST_CANCEL_JOB_STEP; |
| msg.data = &kill_req; |
| slurm_send_only_controller_msg(&msg); |
| } |
| |
| /* Enforce job memory limits here in slurmd. Step memory limits are |
| * enforced within slurmstepd (using jobacct_gather plugin). */ |
| static void |
| _enforce_job_mem_limit(void) |
| { |
| List steps; |
| ListIterator step_iter, job_limits_iter; |
| job_mem_limits_t *job_limits_ptr; |
| step_loc_t *stepd; |
| int fd, i, job_inx, job_cnt; |
| uint16_t vsize_factor; |
| uint64_t step_rss, step_vsize; |
| job_step_id_msg_t acct_req; |
| job_step_stat_t *resp = NULL; |
| struct job_mem_info { |
| uint32_t job_id; |
| uint32_t mem_limit; /* MB */ |
| uint32_t mem_used; /* MB */ |
| uint32_t vsize_limit; /* MB */ |
| uint32_t vsize_used; /* MB */ |
| }; |
| struct job_mem_info *job_mem_info_ptr = NULL; |
| |
| slurm_mutex_lock(&job_limits_mutex); |
| if (!job_limits_loaded) |
| _load_job_limits(); |
| if (list_count(job_limits_list) == 0) { |
| slurm_mutex_unlock(&job_limits_mutex); |
| return; |
| } |
| |
| /* Build table of job limits, use highest mem limit recorded */ |
| job_mem_info_ptr = xmalloc((list_count(job_limits_list) + 1) * |
| sizeof(struct job_mem_info)); |
| job_cnt = 0; |
| job_limits_iter = list_iterator_create(job_limits_list); |
| while ((job_limits_ptr = list_next(job_limits_iter))) { |
| if (job_limits_ptr->job_mem == 0) /* no job limit */ |
| continue; |
| for (i=0; i<job_cnt; i++) { |
| if (job_mem_info_ptr[i].job_id != |
| job_limits_ptr->job_id) |
| continue; |
| job_mem_info_ptr[i].mem_limit = MAX( |
| job_mem_info_ptr[i].mem_limit, |
| job_limits_ptr->job_mem); |
| break; |
| } |
| if (i < job_cnt) /* job already found & recorded */ |
| continue; |
| job_mem_info_ptr[job_cnt].job_id = job_limits_ptr->job_id; |
| job_mem_info_ptr[job_cnt].mem_limit = job_limits_ptr->job_mem; |
| job_cnt++; |
| } |
| list_iterator_destroy(job_limits_iter); |
| slurm_mutex_unlock(&job_limits_mutex); |
| |
| vsize_factor = slurm_get_vsize_factor(); |
| for (i=0; i<job_cnt; i++) { |
| job_mem_info_ptr[i].vsize_limit = job_mem_info_ptr[i]. |
| mem_limit; |
| job_mem_info_ptr[i].vsize_limit *= (vsize_factor / 100.0); |
| } |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| step_iter = list_iterator_create(steps); |
| while ((stepd = list_next(step_iter))) { |
| for (job_inx=0; job_inx<job_cnt; job_inx++) { |
| if (job_mem_info_ptr[job_inx].job_id == stepd->jobid) |
| break; |
| } |
| if (job_inx >= job_cnt) |
| continue; /* job/step not being tracked */ |
| |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) |
| continue; /* step completed */ |
| acct_req.job_id = stepd->jobid; |
| acct_req.step_id = stepd->stepid; |
| resp = xmalloc(sizeof(job_step_stat_t)); |
| |
| if (!stepd->stepd_info) |
| stepd->stepd_info = stepd_get_info(fd); |
| |
| if ((!stepd_stat_jobacct( |
| fd, &acct_req, resp, |
| stepd->stepd_info->protocol_version)) && |
| (resp->jobacct)) { |
| /* resp->jobacct is NULL if accounting is disabled */ |
| jobacctinfo_getinfo((struct jobacctinfo *) |
| resp->jobacct, |
| JOBACCT_DATA_TOT_RSS, |
| &step_rss, |
| stepd->stepd_info-> |
| protocol_version); |
| jobacctinfo_getinfo((struct jobacctinfo *) |
| resp->jobacct, |
| JOBACCT_DATA_TOT_VSIZE, |
| &step_vsize, |
| stepd->stepd_info-> |
| protocol_version); |
| #if _LIMIT_INFO |
| info("Step:%u.%u RSS:%"PRIu64" KB VSIZE:%"PRIu64" KB", |
| stepd->jobid, stepd->stepid, |
| step_rss, step_vsize); |
| #endif |
| step_rss /= 1024; /* KB to MB */ |
| step_rss = MAX(step_rss, 1); |
| job_mem_info_ptr[job_inx].mem_used += step_rss; |
| step_vsize /= 1024; /* KB to MB */ |
| step_vsize = MAX(step_vsize, 1); |
| job_mem_info_ptr[job_inx].vsize_used += step_vsize; |
| } |
| slurm_free_job_step_stat(resp); |
| close(fd); |
| } |
| list_iterator_destroy(step_iter); |
| list_destroy(steps); |
| |
| for (i=0; i<job_cnt; i++) { |
| if (job_mem_info_ptr[i].mem_used == 0) { |
| /* no steps found, |
| * purge records for all steps of this job */ |
| slurm_mutex_lock(&job_limits_mutex); |
| list_delete_all(job_limits_list, _job_limits_match, |
| &job_mem_info_ptr[i].job_id); |
| slurm_mutex_unlock(&job_limits_mutex); |
| break; |
| } |
| |
| if ((job_mem_info_ptr[i].mem_limit != 0) && |
| (job_mem_info_ptr[i].mem_used > |
| job_mem_info_ptr[i].mem_limit)) { |
| info("Job %u exceeded memory limit (%u>%u), " |
| "cancelling it", job_mem_info_ptr[i].job_id, |
| job_mem_info_ptr[i].mem_used, |
| job_mem_info_ptr[i].mem_limit); |
| _cancel_step_mem_limit(job_mem_info_ptr[i].job_id, |
| NO_VAL); |
| } else if ((job_mem_info_ptr[i].vsize_limit != 0) && |
| (job_mem_info_ptr[i].vsize_used > |
| job_mem_info_ptr[i].vsize_limit)) { |
| info("Job %u exceeded virtual memory limit (%u>%u), " |
| "cancelling it", job_mem_info_ptr[i].job_id, |
| job_mem_info_ptr[i].vsize_used, |
| job_mem_info_ptr[i].vsize_limit); |
| _cancel_step_mem_limit(job_mem_info_ptr[i].job_id, |
| NO_VAL); |
| } |
| } |
| xfree(job_mem_info_ptr); |
| } |
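| |
| /* Worked example of the limit arithmetic above (values assumed for |
| * illustration only): with a recorded job memory limit of 2048 MB and |
| * VSizeFactor=150, the enforced limits become |
| * |
| *    mem_limit   = 2048 MB |
| *    vsize_limit = 2048 * (150 / 100.0) = 3072 MB |
| * |
| * Step RSS and VSIZE values are reported by slurmstepd in KB and |
| * converted to MB before being summed into the per-job totals. */ |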
| |
| static int |
| _rpc_ping(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| static bool first_msg = true; |
| |
| if (!_slurm_authorized_user(req_uid)) { |
| error("Security violation, ping RPC from uid %d", |
| req_uid); |
| if (first_msg) { |
| error("Do you have SlurmUser configured as uid %d?", |
| req_uid); |
| } |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| } |
| first_msg = false; |
| |
| if (rc != SLURM_SUCCESS) { |
| /* Return result. If the reply can't be sent this indicates |
| * 1. The network is broken OR |
| * 2. slurmctld has died OR |
| * 3. slurmd was paged out due to full memory |
| * If the reply request fails, we send a registration message |
| * to slurmctld in hopes of avoiding having the node set DOWN |
| * due to slurmd paging and not being able to respond in a |
| * timely fashion. */ |
| if (slurm_send_rc_msg(msg, rc) < 0) { |
| error("Error responding to ping: %m"); |
| send_registration_msg(SLURM_SUCCESS, false); |
| } |
| } else { |
| slurm_msg_t resp_msg; |
| ping_slurmd_resp_msg_t ping_resp; |
| get_cpu_load(&ping_resp.cpu_load); |
| slurm_msg_t_copy(&resp_msg, msg); |
| resp_msg.msg_type = RESPONSE_PING_SLURMD; |
| resp_msg.data = &ping_resp; |
| |
| slurm_send_node_msg(msg->conn_fd, &resp_msg); |
| } |
| |
| /* Take this opportunity to enforce any job memory limits */ |
| _enforce_job_mem_limit(); |
| return rc; |
| } |
| |
| static int |
| _rpc_health_check(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| |
| if (!_slurm_authorized_user(req_uid)) { |
| error("Security violation, health check RPC from uid %d", |
| req_uid); |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| } |
| |
| /* Return result. If the reply can't be sent this indicates that |
| * 1. The network is broken OR |
| * 2. slurmctld has died OR |
| * 3. slurmd was paged out due to full memory |
| * If the reply request fails, we send a registration message to |
| * slurmctld in hopes of avoiding having the node set DOWN due to |
| * slurmd paging and not being able to respond in a timely fashion. */ |
| if (slurm_send_rc_msg(msg, rc) < 0) { |
| error("Error responding to ping: %m"); |
| send_registration_msg(SLURM_SUCCESS, false); |
| } |
| |
| if ((rc == SLURM_SUCCESS) && (conf->health_check_program)) { |
| char *env[1] = { NULL }; |
| rc = run_script("health_check", conf->health_check_program, |
| 0, 60, env, 0); |
| } |
| |
| /* Take this opportunity to enforce any job memory limits */ |
| _enforce_job_mem_limit(); |
| return rc; |
| } |
| |
| |
| static int |
| _rpc_acct_gather_update(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| static bool first_msg = true; |
| |
| if (!_slurm_authorized_user(req_uid)) { |
| error("Security violation, acct_gather_update RPC from uid %d", |
| req_uid); |
| if (first_msg) { |
| error("Do you have SlurmUser configured as uid %d?", |
| req_uid); |
| } |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| } |
| first_msg = false; |
| |
| if (rc != SLURM_SUCCESS) { |
| /* Return result. If the reply can't be sent this indicates |
| * 1. The network is broken OR |
| * 2. slurmctld has died OR |
| * 3. slurmd was paged out due to full memory |
| * If the reply request fails, we send a registration message |
| * to slurmctld in hopes of avoiding having the node set DOWN |
| * due to slurmd paging and not being able to respond in a |
| * timely fashion. */ |
| if (slurm_send_rc_msg(msg, rc) < 0) { |
| error("Error responding to ping: %m"); |
| send_registration_msg(SLURM_SUCCESS, false); |
| } |
| } else { |
| slurm_msg_t resp_msg; |
| acct_gather_node_resp_msg_t acct_msg; |
| |
| /* Update node energy usage data */ |
| acct_gather_energy_g_update_node_energy(); |
| |
| memset(&acct_msg, 0, sizeof(acct_gather_node_resp_msg_t)); |
| acct_msg.node_name = conf->node_name; |
| acct_msg.energy = acct_gather_energy_alloc(); |
| acct_gather_energy_g_get_data( |
| ENERGY_DATA_STRUCT, acct_msg.energy); |
| |
| slurm_msg_t_copy(&resp_msg, msg); |
| resp_msg.msg_type = RESPONSE_ACCT_GATHER_UPDATE; |
| resp_msg.data = &acct_msg; |
| |
| slurm_send_node_msg(msg->conn_fd, &resp_msg); |
| |
| acct_gather_energy_destroy(acct_msg.energy); |
| } |
| return rc; |
| } |
| |
| static int |
| _rpc_acct_gather_energy(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| static bool first_msg = true; |
| |
| if (!_slurm_authorized_user(req_uid)) { |
| error("Security violation, acct_gather_update RPC from uid %d", |
| req_uid); |
| if (first_msg) { |
| error("Do you have SlurmUser configured as uid %d?", |
| req_uid); |
| } |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| } |
| first_msg = false; |
| |
| if (rc != SLURM_SUCCESS) { |
| if (slurm_send_rc_msg(msg, rc) < 0) |
| error("Error responding to energy request: %m"); |
| } else { |
| slurm_msg_t resp_msg; |
| acct_gather_node_resp_msg_t acct_msg; |
| time_t now = time(NULL), last_poll = 0; |
| int data_type = ENERGY_DATA_STRUCT; |
| acct_gather_energy_req_msg_t *req = msg->data; |
| |
| acct_gather_energy_g_get_data(ENERGY_DATA_LAST_POLL, |
| &last_poll); |
| |
| /* If more than delta seconds have passed since the last |
| * poll, force a new poll. |
| */ |
| if ((now - last_poll) > req->delta) |
| data_type = ENERGY_DATA_JOULES_TASK; |
| |
| memset(&acct_msg, 0, sizeof(acct_gather_node_resp_msg_t)); |
| acct_msg.energy = acct_gather_energy_alloc(); |
| acct_gather_energy_g_get_data(data_type, acct_msg.energy); |
| |
| slurm_msg_t_copy(&resp_msg, msg); |
| resp_msg.msg_type = RESPONSE_ACCT_GATHER_ENERGY; |
| resp_msg.data = &acct_msg; |
| |
| slurm_send_node_msg(msg->conn_fd, &resp_msg); |
| |
| acct_gather_energy_destroy(acct_msg.energy); |
| } |
| return rc; |
| } |
| |
| static int |
| _signal_jobstep(uint32_t jobid, uint32_t stepid, uid_t req_uid, |
| uint32_t signal) |
| { |
| int fd, rc = SLURM_SUCCESS; |
| slurmstepd_info_t *step; |
| |
| fd = stepd_connect(conf->spooldir, conf->node_name, jobid, stepid); |
| if (fd == -1) { |
| debug("signal for nonexistant %u.%u stepd_connect failed: %m", |
| jobid, stepid); |
| return ESLURM_INVALID_JOB_ID; |
| } |
| if ((step = stepd_get_info(fd)) == NULL) { |
| debug("signal for nonexistent job %u.%u requested", |
| jobid, stepid); |
| close(fd); |
| return ESLURM_INVALID_JOB_ID; |
| } |
| |
| if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { |
| debug("kill req from uid %ld for job %u.%u owned by uid %ld", |
| (long) req_uid, jobid, stepid, (long) step->uid); |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| goto done2; |
| } |
| |
| #ifdef HAVE_AIX |
| # ifdef SIGMIGRATE |
| # ifdef SIGSOUND |
| /* SIGMIGRATE and SIGSOUND are used to initiate job checkpoint on AIX. |
| * These signals are not sent to the entire process group, but just a |
| * single process, namely the PMD. */ |
| if (signal == SIGMIGRATE || signal == SIGSOUND) { |
| rc = stepd_signal_task_local(fd, signal, 0); |
| goto done2; |
| } |
| # endif |
| # endif |
| #endif |
| |
| rc = stepd_signal_container(fd, signal); |
| if (rc == -1) |
| rc = ESLURMD_JOB_NOTRUNNING; |
| |
| done2: |
| xfree(step); |
| close(fd); |
| return rc; |
| } |
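| |
| /* Typical use of _signal_jobstep() from an RPC handler (illustrative |
| * sketch; the job/step IDs and uid are assumed to come from a decoded |
| * request): |
| * |
| *    int rc = _signal_jobstep(req->job_id, req->job_step_id, |
| *                             req_uid, SIGTERM); |
| *    if (rc != SLURM_SUCCESS) |
| *        error("signal %u.%u: %s", req->job_id, req->job_step_id, |
| *              slurm_strerror(rc)); |
| *    slurm_send_rc_msg(msg, rc); |
| */ |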
| |
| static void |
| _rpc_signal_tasks(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| kill_tasks_msg_t *req = (kill_tasks_msg_t *) msg->data; |
| |
| #ifdef HAVE_XCPU |
| if (!_slurm_authorized_user(req_uid)) { |
| error("REQUEST_SIGNAL_TASKS not support with XCPU system"); |
| return ESLURM_NOT_SUPPORTED; |
| } |
| #endif |
| |
| debug("Sending signal %u to step %u.%u", req->signal, req->job_id, |
| req->job_step_id); |
| rc = _signal_jobstep(req->job_id, req->job_step_id, req_uid, |
| req->signal); |
| slurm_send_rc_msg(msg, rc); |
| } |
| |
| static void |
| _rpc_checkpoint_tasks(slurm_msg_t *msg) |
| { |
| int fd; |
| int rc = SLURM_SUCCESS; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| checkpoint_tasks_msg_t *req = (checkpoint_tasks_msg_t *) msg->data; |
| slurmstepd_info_t *step; |
| |
| fd = stepd_connect(conf->spooldir, conf->node_name, |
| req->job_id, req->job_step_id); |
| if (fd == -1) { |
| debug("checkpoint for nonexistant %u.%u stepd_connect " |
| "failed: %m", req->job_id, req->job_step_id); |
| rc = ESLURM_INVALID_JOB_ID; |
| goto done; |
| } |
| if ((step = stepd_get_info(fd)) == NULL) { |
| debug("checkpoint for nonexistent job %u.%u requested", |
| req->job_id, req->job_step_id); |
| rc = ESLURM_INVALID_JOB_ID; |
| goto done2; |
| } |
| |
| if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { |
| debug("checkpoint req from uid %ld for job %u.%u owned by " |
| "uid %ld", (long) req_uid, req->job_id, req->job_step_id, |
| (long) step->uid); |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| goto done3; |
| } |
| |
| rc = stepd_checkpoint(fd, req->timestamp, req->image_dir); |
| if (rc == -1) |
| rc = ESLURMD_JOB_NOTRUNNING; |
| |
| done3: |
| xfree(step); |
| done2: |
| close(fd); |
| done: |
| slurm_send_rc_msg(msg, rc); |
| } |
| |
| static void |
| _rpc_terminate_tasks(slurm_msg_t *msg) |
| { |
| kill_tasks_msg_t *req = (kill_tasks_msg_t *) msg->data; |
| int rc = SLURM_SUCCESS; |
| int fd; |
| uid_t req_uid; |
| slurmstepd_info_t *step; |
| |
| debug3("Entering _rpc_terminate_tasks"); |
| fd = stepd_connect(conf->spooldir, conf->node_name, |
| req->job_id, req->job_step_id); |
| if (fd == -1) { |
| debug("kill for nonexistant job %u.%u stepd_connect " |
| "failed: %m", req->job_id, req->job_step_id); |
| rc = ESLURM_INVALID_JOB_ID; |
| goto done; |
| } |
| if (!(step = stepd_get_info(fd))) { |
| debug("kill for nonexistent job %u.%u requested", |
| req->job_id, req->job_step_id); |
| rc = ESLURM_INVALID_JOB_ID; |
| goto done2; |
| } |
| |
| req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { |
| debug("kill req from uid %ld for job %u.%u owned by uid %ld", |
| (long) req_uid, req->job_id, req->job_step_id, |
| (long) step->uid); |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| goto done3; |
| } |
| |
| rc = stepd_terminate(fd); |
| if (rc == -1) |
| rc = ESLURMD_JOB_NOTRUNNING; |
| |
| done3: |
| xfree(step); |
| done2: |
| close(fd); |
| done: |
| slurm_send_rc_msg(msg, rc); |
| } |
| |
| static int |
| _rpc_step_complete(slurm_msg_t *msg) |
| { |
| step_complete_msg_t *req = (step_complete_msg_t *)msg->data; |
| int rc = SLURM_SUCCESS; |
| int fd; |
| uid_t req_uid; |
| |
| debug3("Entering _rpc_step_complete"); |
| fd = stepd_connect(conf->spooldir, conf->node_name, |
| req->job_id, req->job_step_id); |
| if (fd == -1) { |
| error("stepd_connect to %u.%u failed: %m", |
| req->job_id, req->job_step_id); |
| rc = ESLURM_INVALID_JOB_ID; |
| goto done; |
| } |
| |
| /* step completion messages are only allowed from other slurmstepd, |
| so only root or SlurmUser is allowed here */ |
| req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| if (!_slurm_authorized_user(req_uid)) { |
| debug("step completion from uid %ld for job %u.%u", |
| (long) req_uid, req->job_id, req->job_step_id); |
| rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ |
| goto done2; |
| } |
| |
| rc = stepd_completion(fd, req); |
| if (rc == -1) |
| rc = ESLURMD_JOB_NOTRUNNING; |
| |
| done2: |
| close(fd); |
| done: |
| slurm_send_rc_msg(msg, rc); |
| |
| return rc; |
| } |
| |
| /* Get list of active jobs and steps, xfree returned value */ |
| static char * |
| _get_step_list(void) |
| { |
| char tmp[64]; |
| char *step_list = NULL; |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd; |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((stepd = list_next(i))) { |
| int fd; |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) |
| continue; |
| if (stepd_state(fd) == SLURMSTEPD_NOT_RUNNING) { |
| debug("stale domain socket for stepd %u.%u ", |
| stepd->jobid, stepd->stepid); |
| close(fd); |
| continue; |
| } |
| close(fd); |
| |
| if (step_list) |
| xstrcat(step_list, ", "); |
| if (stepd->stepid == NO_VAL) { |
| snprintf(tmp, sizeof(tmp), "%u", |
| stepd->jobid); |
| xstrcat(step_list, tmp); |
| } else { |
| snprintf(tmp, sizeof(tmp), "%u.%u", |
| stepd->jobid, stepd->stepid); |
| xstrcat(step_list, tmp); |
| } |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| |
| if (step_list == NULL) |
| xstrcat(step_list, "NONE"); |
| return step_list; |
| } |
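| |
| /* Example of the string built above (values illustrative only): a node |
| * running the batch script of job 100 plus two steps of job 101 might |
| * yield "100, 101.0, 101.1", while a node with no steps yields "NONE". |
| * The caller owns the returned buffer and must release it with xfree(). */ |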
| |
| static int |
| _rpc_daemon_status(slurm_msg_t *msg) |
| { |
| slurm_msg_t resp_msg; |
| slurmd_status_t *resp = NULL; |
| |
| resp = xmalloc(sizeof(slurmd_status_t)); |
| resp->actual_cpus = conf->actual_cpus; |
| resp->actual_boards = conf->actual_boards; |
| resp->actual_sockets = conf->actual_sockets; |
| resp->actual_cores = conf->actual_cores; |
| resp->actual_threads = conf->actual_threads; |
| resp->actual_real_mem = conf->real_memory_size; |
| resp->actual_tmp_disk = conf->tmp_disk_space; |
| resp->booted = startup; |
| resp->hostname = xstrdup(conf->node_name); |
| resp->step_list = _get_step_list(); |
| resp->last_slurmctld_msg = last_slurmctld_msg; |
| resp->pid = conf->pid; |
| resp->slurmd_debug = conf->debug_level; |
| resp->slurmd_logfile = xstrdup(conf->logfile); |
| resp->version = xstrdup(SLURM_VERSION_STRING); |
| |
| slurm_msg_t_copy(&resp_msg, msg); |
| resp_msg.msg_type = RESPONSE_SLURMD_STATUS; |
| resp_msg.data = resp; |
| slurm_send_node_msg(msg->conn_fd, &resp_msg); |
| slurm_free_slurmd_status(resp); |
| return SLURM_SUCCESS; |
| } |
| |
| static int |
| _rpc_stat_jobacct(slurm_msg_t *msg) |
| { |
| job_step_id_msg_t *req = (job_step_id_msg_t *)msg->data; |
| slurm_msg_t resp_msg; |
| job_step_stat_t *resp = NULL; |
| int fd; |
| uid_t req_uid; |
| long job_uid; |
| slurmstepd_info_t *stepd_info = NULL; |
| uint16_t protocol_version; |
| |
| debug3("Entering _rpc_stat_jobacct"); |
| /* job accounting stat requests may come from the job owner as |
| well as root or SlurmUser */ |
| req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| |
| stepd_info = _get_job_step_info(req->job_id); |
| if (!stepd_info) { |
| error("stat_jobacct For invalid job_id: %u", |
| req->job_id); |
| if (msg->conn_fd >= 0) |
| slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID); |
| return ESLURM_INVALID_JOB_ID; |
| } |
| |
| protocol_version = stepd_info->protocol_version; |
| job_uid = stepd_info->uid; |
| xfree(stepd_info); |
| |
| if (job_uid < 0) { |
| error("stat_jobacct for invalid job_id: %u", |
| req->job_id); |
| if (msg->conn_fd >= 0) |
| slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID); |
| return ESLURM_INVALID_JOB_ID; |
| } |
| |
| /* |
| * check that the requesting user is the job owner, SlurmUser, or root |
| */ |
| if ((req_uid != job_uid) && (!_slurm_authorized_user(req_uid))) { |
| error("stat_jobacct from uid %ld for job %u " |
| "owned by uid %ld", |
| (long) req_uid, req->job_id, job_uid); |
| |
| if (msg->conn_fd >= 0) { |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| return ESLURM_USER_ID_MISSING;/* or bad in this case */ |
| } |
| } |
| |
| resp = xmalloc(sizeof(job_step_stat_t)); |
| resp->step_pids = xmalloc(sizeof(job_step_pids_t)); |
| resp->step_pids->node_name = xstrdup(conf->node_name); |
| slurm_msg_t_copy(&resp_msg, msg); |
| resp->return_code = SLURM_SUCCESS; |
| fd = stepd_connect(conf->spooldir, conf->node_name, |
| req->job_id, req->step_id); |
| if (fd == -1) { |
| error("stepd_connect to %u.%u failed: %m", |
| req->job_id, req->step_id); |
| slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID); |
| slurm_free_job_step_stat(resp); |
| return ESLURM_INVALID_JOB_ID; |
| |
| } |
| |
| if (stepd_stat_jobacct(fd, req, resp, protocol_version) |
| == SLURM_ERROR) { |
| debug("accounting for nonexistent job %u.%u requested", |
| req->job_id, req->step_id); |
| } |
| |
| /* FIX ME: This should probably happen inside stepd_stat_jobacct() |
| * so that more information about the pids can be gathered. |
| */ |
| if (stepd_list_pids(fd, &resp->step_pids->pid, |
| &resp->step_pids->pid_cnt) == SLURM_ERROR) { |
| debug("No pids for nonexistent job %u.%u requested", |
| req->job_id, req->step_id); |
| } |
| |
| close(fd); |
| |
| resp_msg.msg_type = RESPONSE_JOB_STEP_STAT; |
| resp_msg.data = resp; |
| |
| slurm_send_node_msg(msg->conn_fd, &resp_msg); |
| slurm_free_job_step_stat(resp); |
| return SLURM_SUCCESS; |
| } |
| |
| static int |
| _rpc_list_pids(slurm_msg_t *msg) |
| { |
| job_step_id_msg_t *req = (job_step_id_msg_t *)msg->data; |
| slurm_msg_t resp_msg; |
| job_step_pids_t *resp = NULL; |
| int fd; |
| uid_t req_uid; |
| long job_uid; |
| |
| debug3("Entering _rpc_list_pids"); |
| /* pid list requests may come from the job owner as well as root |
| or SlurmUser */ |
| req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| |
| job_uid = _get_job_uid(req->job_id); |
| |
| if (job_uid < 0) { |
| error("stat_pid for invalid job_id: %u", |
| req->job_id); |
| if (msg->conn_fd >= 0) |
| slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID); |
| return ESLURM_INVALID_JOB_ID; |
| } |
| |
| /* |
| * check that the requesting user is the job owner, SlurmUser, or root |
| */ |
| if ((req_uid != job_uid) |
| && (!_slurm_authorized_user(req_uid))) { |
| error("stat_pid from uid %ld for job %u " |
| "owned by uid %ld", |
| (long) req_uid, req->job_id, job_uid); |
| |
| if (msg->conn_fd >= 0) { |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| return ESLURM_USER_ID_MISSING;/* or bad in this case */ |
| } |
| } |
| |
| resp = xmalloc(sizeof(job_step_pids_t)); |
| slurm_msg_t_copy(&resp_msg, msg); |
| resp->node_name = xstrdup(conf->node_name); |
| resp->pid_cnt = 0; |
| resp->pid = NULL; |
| fd = stepd_connect(conf->spooldir, conf->node_name, |
| req->job_id, req->step_id); |
| if (fd == -1) { |
| error("stepd_connect to %u.%u failed: %m", |
| req->job_id, req->step_id); |
| slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID); |
| slurm_free_job_step_pids(resp); |
| return ESLURM_INVALID_JOB_ID; |
| |
| } |
| |
| if (stepd_list_pids(fd, &resp->pid, &resp->pid_cnt) == SLURM_ERROR) { |
| debug("No pids for nonexistent job %u.%u requested", |
| req->job_id, req->step_id); |
| } |
| |
| close(fd); |
| |
| resp_msg.msg_type = RESPONSE_JOB_STEP_PIDS; |
| resp_msg.data = resp; |
| |
| slurm_send_node_msg(msg->conn_fd, &resp_msg); |
| slurm_free_job_step_pids(resp); |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * For the specified job_id: reply to slurmctld, |
| * sleep(configured kill_wait), then send SIGKILL |
| */ |
| static void |
| _rpc_timelimit(slurm_msg_t *msg) |
| { |
| uid_t uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| kill_job_msg_t *req = msg->data; |
| int nsteps, rc; |
| |
| if (!_slurm_authorized_user(uid)) { |
| error ("Security violation: rpc_timelimit req from uid %d", |
| uid); |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| return; |
| } |
| |
| /* |
| * Indicate to slurmctld that we've received the message |
| */ |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| slurm_close_accepted_conn(msg->conn_fd); |
| msg->conn_fd = -1; |
| |
| if (req->step_id != NO_VAL) { |
| slurm_ctl_conf_t *cf; |
| int delay; |
| /* A jobstep has timed out: |
| * - send the container a SIG_TIME_LIMIT or SIG_PREEMPTED |
| * to log the event |
| * - send a SIGCONT to resume any suspended tasks |
| * - send a SIGTERM to begin termination |
| * - sleep KILL_WAIT |
| * - send a SIGKILL to clean up |
| */ |
| if (msg->msg_type == REQUEST_KILL_TIMELIMIT) { |
| rc = _signal_jobstep(req->job_id, req->step_id, uid, |
| SIG_TIME_LIMIT); |
| } else { |
| rc = _signal_jobstep(req->job_id, req->step_id, uid, |
| SIG_PREEMPTED); |
| } |
| if (rc != SLURM_SUCCESS) |
| return; |
| rc = _signal_jobstep(req->job_id, req->step_id, uid, SIGCONT); |
| if (rc != SLURM_SUCCESS) |
| return; |
| rc = _signal_jobstep(req->job_id, req->step_id, uid, SIGTERM); |
| if (rc != SLURM_SUCCESS) |
| return; |
| cf = slurm_conf_lock(); |
| delay = MAX(cf->kill_wait, 5); |
| slurm_conf_unlock(); |
| sleep(delay); |
| _signal_jobstep(req->job_id, req->step_id, uid, SIGKILL); |
| return; |
| } |
| |
| if (msg->msg_type == REQUEST_KILL_TIMELIMIT) |
| _kill_all_active_steps(req->job_id, SIG_TIME_LIMIT, true); |
| else /* (msg->msg_type == REQUEST_KILL_PREEMPTED) */ |
| _kill_all_active_steps(req->job_id, SIG_PREEMPTED, true); |
| nsteps = xcpu_signal(SIGTERM, req->nodes) + |
| _kill_all_active_steps(req->job_id, SIGTERM, false); |
| verbose( "Job %u: timeout: sent SIGTERM to %d active steps", |
| req->job_id, nsteps ); |
| |
| /* Revoke credential, send SIGKILL, run epilog, etc. */ |
| _rpc_terminate_job(msg); |
| } |
| |
| static void _rpc_pid2jid(slurm_msg_t *msg) |
| { |
| job_id_request_msg_t *req = (job_id_request_msg_t *) msg->data; |
| slurm_msg_t resp_msg; |
| job_id_response_msg_t resp; |
| bool found = false; |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd; |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((stepd = list_next(i))) { |
| int fd; |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) |
| continue; |
| if (stepd_pid_in_container(fd, req->job_pid) |
| || req->job_pid == stepd_daemon_pid(fd)) { |
| slurm_msg_t_copy(&resp_msg, msg); |
| resp.job_id = stepd->jobid; |
| resp.return_code = SLURM_SUCCESS; |
| found = true; |
| close(fd); |
| break; |
| } |
| close(fd); |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| |
| if (found) { |
| debug3("_rpc_pid2jid: pid(%u) found in %u", |
| req->job_pid, resp.job_id); |
| resp_msg.address = msg->address; |
| resp_msg.msg_type = RESPONSE_JOB_ID; |
| resp_msg.data = &resp; |
| |
| slurm_send_node_msg(msg->conn_fd, &resp_msg); |
| } else { |
| debug3("_rpc_pid2jid: pid(%u) not found", req->job_pid); |
| slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID); |
| } |
| } |
| |
| /* Creates an array of group ids and stores in it the list of groups |
| * that user my_uid belongs to. The pointer to the list is returned |
| * in groups and the count of gids in ngroups. The caller must free |
| * the group list array pointed to by groups */ |
| static int |
| _get_grouplist(char **user_name, uid_t my_uid, gid_t my_gid, |
| int *ngroups, gid_t **groups) |
| { |
| if (!*user_name) |
| *user_name = uid_to_string(my_uid); |
| |
| if (!*user_name) { |
| error("sbcast: Could not find uid %ld", (long)my_uid); |
| return -1; |
| } |
| |
| *groups = (gid_t *) xmalloc(*ngroups * sizeof(gid_t)); |
| |
| if (getgrouplist(*user_name, my_gid, *groups, ngroups) < 0) { |
| *groups = xrealloc(*groups, *ngroups * sizeof(gid_t)); |
| getgrouplist(*user_name, my_gid, *groups, ngroups); |
| } |
| |
| return 0; |
| |
| } |
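| |
| /* getgrouplist(3) reports the required array size through *ngroups when |
| * the supplied buffer is too small, which is why the call above is |
| * retried after xrealloc(). Illustrative sketch of a caller (the uid/gid |
| * values and variable names are assumptions, not taken from this file): |
| * |
| *    int   ngroups = 16; |
| *    gid_t *groups = NULL; |
| *    char  *user_name = NULL; |
| * |
| *    if (_get_grouplist(&user_name, uid, gid, &ngroups, &groups) == 0) { |
| *        setgroups(ngroups, groups); |
| *        xfree(groups); |
| *    } |
| *    xfree(user_name); |
| */ |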
| |
| /* Validate sbcast credential. |
| * NOTE: We can only perform the full credential validation once with |
| * Munge without generating a credential replay error |
| * RET SLURM_SUCCESS or an error code */ |
| static int |
| _valid_sbcast_cred(file_bcast_msg_t *req, uid_t req_uid, uint16_t block_no, |
| uint32_t *job_id) |
| { |
| int rc = SLURM_SUCCESS; |
| char *nodes = NULL; |
| hostset_t hset = NULL; |
| |
| *job_id = NO_VAL; |
| rc = extract_sbcast_cred(conf->vctx, req->cred, block_no, |
| job_id, &nodes); |
| if (rc != 0) { |
| error("Security violation: Invalid sbcast_cred from uid %d", |
| req_uid); |
| return ESLURMD_INVALID_JOB_CREDENTIAL; |
| } |
| |
| if (!(hset = hostset_create(nodes))) { |
| error("Unable to parse sbcast_cred hostlist %s", nodes); |
| rc = ESLURMD_INVALID_JOB_CREDENTIAL; |
| } else if (!hostset_within(hset, conf->node_name)) { |
| error("Security violation: sbcast_cred from %d has " |
| "bad hostset %s", req_uid, nodes); |
| rc = ESLURMD_INVALID_JOB_CREDENTIAL; |
| } |
| if (hset) |
| hostset_destroy(hset); |
| xfree(nodes); |
| |
| /* print_sbcast_cred(req->cred); */ |
| |
| return rc; |
| } |
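| |
| /* The hostset test above is the standard way to confirm that this node |
| * appears in a credential's node list. Minimal sketch using the API from |
| * src/common/hostlist.h (the node names are made up for illustration): |
| * |
| *    hostset_t hset = hostset_create("tux[1-4]"); |
| *    if (hset) { |
| *        if (hostset_within(hset, "tux3")) |
| *            debug("node is covered by the credential"); |
| *        hostset_destroy(hset); |
| *    } |
| */ |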
| |
| static int |
| _rpc_file_bcast(slurm_msg_t *msg) |
| { |
| file_bcast_msg_t *req = msg->data; |
| int fd, flags, offset, inx, rc; |
| int ngroups = 16; |
| gid_t *groups; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| gid_t req_gid = g_slurm_auth_get_gid(msg->auth_cred, NULL); |
| pid_t child; |
| uint32_t job_id; |
| |
| #if 0 |
| info("last_block=%u force=%u modes=%o", |
| req->last_block, req->force, req->modes); |
| info("uid=%u gid=%u atime=%lu mtime=%lu block_len[0]=%u", |
| req->uid, req->gid, req->atime, req->mtime, req->block_len); |
| #if 0 |
| /* when the file being transferred is binary, the following line |
| * can break the terminal output for slurmd */ |
| info("req->block[0]=%s, @ %lu", \ |
| req->block[0], (unsigned long) &req->block); |
| #endif |
| #endif |
| |
| rc = _valid_sbcast_cred(req, req_uid, req->block_no, &job_id); |
| if ((rc != SLURM_SUCCESS) && !_slurm_authorized_user(req_uid)) |
| return rc; |
| |
| if (req->block_no == 1) { |
| info("sbcast req_uid=%u job_id=%u fname=%s block_no=%u", |
| req_uid, job_id, req->fname, req->block_no); |
| } else { |
| debug("sbcast req_uid=%u job_id=%u fname=%s block_no=%u", |
| req_uid, job_id, req->fname, req->block_no); |
| } |
| |
| if ((rc = _get_grouplist(&req->user_name, req_uid, |
| req_gid, &ngroups, &groups)) < 0) { |
| error("sbcast: getgrouplist(%u): %m", req_uid); |
| return rc; |
| } |
| |
| if ((req->block_no == 1) && (rc = container_g_create(job_id))) { |
| error("sbcast: container_g_create(%u): %m", job_id); |
| return rc; |
| } |
| |
| child = fork(); |
| if (child == -1) { |
| error("sbcast: fork failure"); |
| return errno; |
| } else if (child > 0) { |
| waitpid(child, &rc, 0); |
| xfree(groups); |
| return WEXITSTATUS(rc); |
| } |
| |
| /* container_g_add_pid() needs to be called from the forked (child) |
| process to avoid a race condition: the child could create a file |
| or detach itself before the parent had a chance to add its pid |
| to the container. |
| */ |
| if (container_g_add_pid(job_id, getpid(), req_uid) != SLURM_SUCCESS) |
| error("container_g_add_pid(%u): %m", job_id); |
| |
| /* The child actually performs the I/O and exits with |
| * a return code, do not return! */ |
| |
| /*********************************************************************\ |
| * NOTE: It would be best to do an exec() immediately after the fork() |
| * in order to help prevent a possible deadlock in the child process |
| * due to locks being set at the time of the fork and being freed by |
| * the parent process, but not freed by the child process. Performing |
| * the work inline is done for simplicity. Note that the logging |
| * performed by error() should be safe due to the use of |
| * atfork_install_handlers() as defined in src/common/log.c. |
| * Change the code below with caution. |
| \*********************************************************************/ |
| |
| if (setgroups(ngroups, groups) < 0) { |
| error("sbcast: uid: %u setgroups: %s", req_uid, |
| strerror(errno)); |
| exit(errno); |
| } |
| |
| if (setgid(req_gid) < 0) { |
| error("sbcast: uid:%u setgid(%u): %s", req_uid, req_gid, |
| strerror(errno)); |
| exit(errno); |
| } |
| if (setuid(req_uid) < 0) { |
| error("sbcast: getuid(%u): %s", req_uid, strerror(errno)); |
| exit(errno); |
| } |
| |
| flags = O_WRONLY; |
| if (req->block_no == 1) { |
| flags |= O_CREAT; |
| if (req->force) |
| flags |= O_TRUNC; |
| else |
| flags |= O_EXCL; |
| } else |
| flags |= O_APPEND; |
| |
| fd = open(req->fname, flags, 0700); |
| if (fd == -1) { |
| error("sbcast: uid:%u can't open `%s`: %s", |
| req_uid, req->fname, strerror(errno)); |
| exit(errno); |
| } |
| |
| offset = 0; |
| while (req->block_len - offset) { |
| inx = write(fd, &req->block[offset], |
| (req->block_len - offset)); |
| if (inx == -1) { |
| if ((errno == EINTR) || (errno == EAGAIN)) |
| continue; |
| error("sbcast: uid:%u can't write `%s`: %s", |
| req_uid, req->fname, strerror(errno)); |
| close(fd); |
| exit(errno); |
| } |
| offset += inx; |
| } |
| if (req->last_block && fchmod(fd, (req->modes & 0777))) { |
| error("sbcast: uid:%u can't chmod `%s`: %s", |
| req_uid, req->fname, strerror(errno)); |
| } |
| if (req->last_block && fchown(fd, req->uid, req->gid)) { |
| error("sbcast: uid:%u can't chown `%s`: %s", |
| req_uid, req->fname, strerror(errno)); |
| } |
| close(fd); |
| fd = 0; |
| if (req->last_block && req->atime) { |
| struct utimbuf time_buf; |
| time_buf.actime = req->atime; |
| time_buf.modtime = req->mtime; |
| if (utime(req->fname, &time_buf)) { |
| error("sbcast: uid:%u can't utime `%s`: %s", |
| req_uid, req->fname, strerror(errno)); |
| } |
| } |
| exit(SLURM_SUCCESS); |
| } |
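| |
| /* The write loop above copes with short writes and EINTR/EAGAIN. The |
| * same pattern expressed as a stand-alone helper (illustrative sketch |
| * only; no such helper exists in this file): |
| * |
| *    static int _write_all(int fd, const char *buf, size_t len) |
| *    { |
| *        size_t off = 0; |
| *        ssize_t n; |
| * |
| *        while (off < len) { |
| *            n = write(fd, buf + off, len - off); |
| *            if (n == -1) { |
| *                if ((errno == EINTR) || (errno == EAGAIN)) |
| *                    continue; |
| *                return -1; |
| *            } |
| *            off += n; |
| *        } |
| *        return 0; |
| *    } |
| */ |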
| |
| static void |
| _rpc_reattach_tasks(slurm_msg_t *msg) |
| { |
| reattach_tasks_request_msg_t *req = msg->data; |
| reattach_tasks_response_msg_t *resp = |
| xmalloc(sizeof(reattach_tasks_response_msg_t)); |
| slurm_msg_t resp_msg; |
| int rc = SLURM_SUCCESS; |
| uint16_t port = 0; |
| char host[MAXHOSTNAMELEN]; |
| slurm_addr_t ioaddr; |
| void *job_cred_sig; |
| uint32_t len; |
| int fd; |
| uid_t req_uid; |
| slurmstepd_info_t *step = NULL; |
| slurm_addr_t *cli = &msg->orig_addr; |
| uint32_t nodeid = (uint32_t)NO_VAL; |
| |
| slurm_msg_t_copy(&resp_msg, msg); |
| fd = stepd_connect(conf->spooldir, conf->node_name, |
| req->job_id, req->job_step_id); |
| if (fd == -1) { |
| debug("reattach for nonexistent job %u.%u stepd_connect" |
| " failed: %m", req->job_id, req->job_step_id); |
| rc = ESLURM_INVALID_JOB_ID; |
| goto done; |
| } |
| if ((step = stepd_get_info(fd)) == NULL) { |
| debug("reattach for nonexistent job %u.%u requested", |
| req->job_id, req->job_step_id); |
| rc = ESLURM_INVALID_JOB_ID; |
| goto done2; |
| } |
| nodeid = step->nodeid; |
| debug2("_rpc_reattach_tasks: nodeid %d in the job step", nodeid); |
| |
| req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { |
| error("uid %ld attempt to attach to job %u.%u owned by %ld", |
| (long) req_uid, req->job_id, req->job_step_id, |
| (long) step->uid); |
| rc = EPERM; |
| goto done3; |
| } |
| |
| memset(resp, 0, sizeof(reattach_tasks_response_msg_t)); |
| slurm_get_ip_str(cli, &port, host, sizeof(host)); |
| |
| /* |
| * Set response address by resp_port and client address |
| */ |
| memcpy(&resp_msg.address, cli, sizeof(slurm_addr_t)); |
| if (req->num_resp_port > 0) { |
| port = req->resp_port[nodeid % req->num_resp_port]; |
| slurm_set_addr(&resp_msg.address, port, NULL); |
| } |
| |
| /* |
| * Set IO address by io_port and client address |
| */ |
| memcpy(&ioaddr, cli, sizeof(slurm_addr_t)); |
| |
| if (req->num_io_port > 0) { |
| port = req->io_port[nodeid % req->num_io_port]; |
| slurm_set_addr(&ioaddr, port, NULL); |
| } |
| |
| /* |
| * Get the signature of the job credential. slurmstepd will need |
| * this to prove its identity when it connects back to srun. |
| */ |
| slurm_cred_get_signature(req->cred, (char **)(&job_cred_sig), &len); |
| if (len != SLURM_IO_KEY_SIZE) { |
| error("Incorrect slurm cred signature length"); |
| goto done3; |
| } |
| |
| resp->gtids = NULL; |
| resp->local_pids = NULL; |
| /* Following call fills in gtids and local_pids when successful */ |
| rc = stepd_attach(fd, &ioaddr, &resp_msg.address, job_cred_sig, resp); |
| if (rc != SLURM_SUCCESS) { |
| debug2("stepd_attach call failed"); |
| goto done3; |
| } |
| done3: |
| xfree(step); |
| done2: |
| close(fd); |
| done: |
| debug2("update step addrs rc = %d", rc); |
| resp_msg.data = resp; |
| resp_msg.msg_type = RESPONSE_REATTACH_TASKS; |
| resp->node_name = xstrdup(conf->node_name); |
| resp->return_code = rc; |
| debug2("node %s sending rc = %d", conf->node_name, rc); |
| |
| slurm_send_node_msg(msg->conn_fd, &resp_msg); |
| slurm_free_reattach_tasks_response_msg(resp); |
| } |
| |
| static slurmstepd_info_t *_get_job_step_info(uint32_t jobid) |
| { |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd; |
| slurmstepd_info_t *info = NULL; |
| int fd; |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((stepd = list_next(i))) { |
| if (stepd->jobid != jobid) { |
| /* multiple jobs expected on shared nodes */ |
| continue; |
| } |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) { |
| debug3("Unable to connect to step %u.%u", |
| stepd->jobid, stepd->stepid); |
| continue; |
| } |
| |
| info = stepd_get_info(fd); |
| close(fd); |
| if (info == NULL) { |
| debug("stepd_get_info failed %u.%u: %m", |
| stepd->jobid, stepd->stepid); |
| continue; |
| } |
| break; |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| |
| return info; |
| } |
| |
| static long |
| _get_job_uid(uint32_t jobid) |
| { |
| slurmstepd_info_t *info = NULL; |
| long uid = -1; |
| |
| if ((info = _get_job_step_info(jobid))) { |
| uid = (long)info->uid; |
| xfree(info); |
| } |
| |
| return uid; |
| } |
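| |
| /* Typical authorization pattern built on _get_job_uid() (illustrative |
| * sketch; the request fields are assumed to come from a decoded RPC): |
| * |
| *    long job_uid = _get_job_uid(req->job_id); |
| *    if (job_uid < 0) |
| *        return ESLURM_INVALID_JOB_ID; |
| *    if ((req_uid != job_uid) && !_slurm_authorized_user(req_uid)) |
| *        return ESLURM_USER_ID_MISSING; |
| * |
| * A negative job_uid means no step of that job is running on this node. |
| */ |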
| |
| /* |
| * _kill_all_active_steps - signals the container of all steps of a job |
| * jobid IN - id of job to signal |
| * sig IN - signal to send |
| * batch IN - if true signal batch script, otherwise skip it |
| * RET count of signaled job steps (plus batch script, if applicable) |
| */ |
| static int |
| _kill_all_active_steps(uint32_t jobid, int sig, bool batch) |
| { |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd; |
| int step_cnt = 0; |
| int fd; |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((stepd = list_next(i))) { |
| if (stepd->jobid != jobid) { |
| /* multiple jobs expected on shared nodes */ |
| debug3("Step from other job: jobid=%u (this jobid=%u)", |
| stepd->jobid, jobid); |
| continue; |
| } |
| |
| if ((stepd->stepid == SLURM_BATCH_SCRIPT) && (!batch)) |
| continue; |
| |
| step_cnt++; |
| |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) { |
| debug3("Unable to connect to step %u.%u", |
| stepd->jobid, stepd->stepid); |
| continue; |
| } |
| |
| debug2("container signal %d to job %u.%u", |
| sig, jobid, stepd->stepid); |
| if (stepd_signal_container(fd, sig) < 0) |
| debug("kill jobid=%u failed: %m", jobid); |
| close(fd); |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| if (step_cnt == 0) |
| debug2("No steps in jobid %u to send signal %d", jobid, sig); |
| return step_cnt; |
| } |
| |
| /* |
| * _terminate_all_steps - terminates the containers of all steps of a job |
| * jobid IN - id of job to terminate |
| * batch IN - if true terminate batch script, otherwise skip it |
| * RET count of terminated job steps (plus batch script, if applicable) |
| */ |
| static int |
| _terminate_all_steps(uint32_t jobid, bool batch) |
| { |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd; |
| int step_cnt = 0; |
| int fd; |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((stepd = list_next(i))) { |
| if (stepd->jobid != jobid) { |
| /* multiple jobs expected on shared nodes */ |
| debug3("Step from other job: jobid=%u (this jobid=%u)", |
| stepd->jobid, jobid); |
| continue; |
| } |
| |
| if ((stepd->stepid == SLURM_BATCH_SCRIPT) && (!batch)) |
| continue; |
| |
| step_cnt++; |
| |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) { |
| debug3("Unable to connect to step %u.%u", |
| stepd->jobid, stepd->stepid); |
| continue; |
| } |
| |
| debug2("terminate job step %u.%u", jobid, stepd->stepid); |
| if (stepd_terminate(fd) < 0) |
| debug("kill jobid=%u.%u failed: %m", jobid, |
| stepd->stepid); |
| close(fd); |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| if (step_cnt == 0) |
| debug2("No steps in job %u to terminate", jobid); |
| return step_cnt; |
| } |
| |
| static bool |
| _job_still_running(uint32_t job_id) |
| { |
| bool retval = false; |
| List steps; |
| ListIterator i; |
| step_loc_t *s = NULL; |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((s = list_next(i))) { |
| if (s->jobid == job_id) { |
| int fd; |
| fd = stepd_connect(s->directory, s->nodename, |
| s->jobid, s->stepid); |
| if (fd == -1) |
| continue; |
| if (stepd_state(fd) != SLURMSTEPD_NOT_RUNNING) { |
| retval = true; |
| close(fd); |
| break; |
| } |
| close(fd); |
| } |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| |
| return retval; |
| } |
| |
| /* |
| * Wait until all job steps are in SLURMSTEPD_NOT_RUNNING state. |
| * This indicates that switch_g_job_postfini has completed and |
| * freed the switch windows (as needed only for Federation switch). |
| */ |
| static void |
| _wait_state_completed(uint32_t jobid, int max_delay) |
| { |
| char *switch_type = slurm_get_switch_type(); |
| int i; |
| |
| if (strcmp(switch_type, "switch/federation")) { |
| xfree(switch_type); |
| return; |
| } |
| xfree(switch_type); |
| |
| for (i=0; i<max_delay; i++) { |
| if (_steps_completed_now(jobid)) |
| break; |
| sleep(1); |
| } |
| if (i >= max_delay) |
| error("timed out waiting for job %u to complete", jobid); |
| } |
| |
| static bool |
| _steps_completed_now(uint32_t jobid) |
| { |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd; |
| bool rc = true; |
| |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((stepd = list_next(i))) { |
| if (stepd->jobid == jobid) { |
| int fd; |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) |
| continue; |
| if (stepd_state(fd) != SLURMSTEPD_NOT_RUNNING) { |
| rc = false; |
| close(fd); |
| break; |
| } |
| close(fd); |
| } |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| |
| return rc; |
| } |
| |
| /* |
| * Send epilog complete message to currently active controller. |
| * Returns SLURM_SUCCESS if message sent successfully, |
| * SLURM_ERROR if the epilog complete message fails to be sent. |
| */ |
| static int |
| _epilog_complete(uint32_t jobid, int rc) |
| { |
| int ret = SLURM_SUCCESS; |
| slurm_msg_t msg; |
| epilog_complete_msg_t req; |
| |
| slurm_msg_t_init(&msg); |
| |
| req.job_id = jobid; |
| req.return_code = rc; |
| req.node_name = conf->node_name; |
| |
| msg.msg_type = MESSAGE_EPILOG_COMPLETE; |
| msg.data = &req; |
| |
| /* Note: No return code to this message; slurmctld will resend the |
| * TERMINATE_JOB request if the message send fails */ |
| if (slurm_send_only_controller_msg(&msg) < 0) { |
| error("Unable to send epilog complete message: %m"); |
| ret = SLURM_ERROR; |
| } else { |
| debug ("Job %u: sent epilog complete msg: rc = %d", jobid, rc); |
| } |
| |
| return ret; |
| } |
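| |
| /* Typical call sequence (illustrative sketch): once all steps of a job |
| * have terminated and the epilog has run, the terminate handler reports |
| * the epilog result back to slurmctld, e.g. assuming an epilog-running |
| * helper such as _run_epilog(): |
| * |
| *    rc = _run_epilog(&job_env); |
| *    _epilog_complete(req->job_id, rc); |
| */ |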
| |
| |
| /* |
| * Send a signal through the appropriate slurmstepds for each job step |
| * belonging to a given job allocation. |
| */ |
| static void |
| _rpc_signal_job(slurm_msg_t *msg) |
| { |
| signal_job_msg_t *req = msg->data; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| long job_uid; |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd = NULL; |
| int step_cnt = 0; |
| int fd; |
| |
| #ifdef HAVE_XCPU |
| if (!_slurm_authorized_user(req_uid)) { |
| error("REQUEST_SIGNAL_JOB not supported with XCPU system"); |
| if (msg->conn_fd >= 0) { |
| slurm_send_rc_msg(msg, ESLURM_NOT_SUPPORTED); |
| if (slurm_close_accepted_conn(msg->conn_fd) < 0) |
| error ("_rpc_signal_job: close(%d): %m", |
| msg->conn_fd); |
| msg->conn_fd = -1; |
| } |
| return; |
| } |
| #endif |
| |
| debug("_rpc_signal_job, uid = %d, signal = %d", req_uid, req->signal); |
| job_uid = _get_job_uid(req->job_id); |
| if (job_uid < 0) |
| goto no_job; |
| |
| /* |
| * check that the requesting user is the job owner, SlurmUser, or root |
| */ |
| if ((req_uid != job_uid) && (!_slurm_authorized_user(req_uid))) { |
| error("Security violation: kill_job(%u) from uid %d", |
| req->job_id, req_uid); |
| if (msg->conn_fd >= 0) { |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| if (slurm_close_accepted_conn(msg->conn_fd) < 0) |
| error ("_rpc_signal_job: close(%d): %m", |
| msg->conn_fd); |
| msg->conn_fd = -1; |
| } |
| return; |
| } |
| |
| /* |
| * Loop through all job steps for this job and signal the |
| * step's process group through the slurmstepd. |
| */ |
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| while ((stepd = list_next(i))) { |
| if (stepd->jobid != req->job_id) { |
| /* multiple jobs expected on shared nodes */ |
| debug3("Step from other job: jobid=%u (this jobid=%u)", |
| stepd->jobid, req->job_id); |
| continue; |
| } |
| |
| if (stepd->stepid == SLURM_BATCH_SCRIPT) { |
| debug2("batch script itself not signalled"); |
| continue; |
| } |
| |
| step_cnt++; |
| |
| fd = stepd_connect(stepd->directory, stepd->nodename, |
| stepd->jobid, stepd->stepid); |
| if (fd == -1) { |
| debug3("Unable to connect to step %u.%u", |
| stepd->jobid, stepd->stepid); |
| continue; |
| } |
| |
| debug2(" signal %d to job %u.%u", |
| req->signal, stepd->jobid, stepd->stepid); |
| if (stepd_signal_container(fd, req->signal) < 0) |
| debug("signal jobid=%u failed: %m", stepd->jobid); |
| close(fd); |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| |
| no_job: |
| if (step_cnt == 0) { |
| debug2("No steps in jobid %u to send signal %d", |
| req->job_id, req->signal); |
| } |
| |
| /* |
| * At this point, if connection still open, we send controller |
| * a "success" reply to indicate that we've recvd the msg. |
| */ |
| if (msg->conn_fd >= 0) { |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| if (slurm_close_accepted_conn(msg->conn_fd) < 0) |
| error ("_rpc_signal_job: close(%d): %m", msg->conn_fd); |
| msg->conn_fd = -1; |
| } |
| } |
| |
| /* if a lock is granted to the job then return 1; return 0 if the |
| * lock for the job is already taken or no more locks are available */ |
| static int |
| _get_suspend_job_lock(uint32_t jobid) |
| { |
| int i, spot = -1; |
| pthread_mutex_lock(&suspend_mutex); |
| |
| for (i = 0; i < job_suspend_size; i++) { |
| if (job_suspend_array[i] == -1) { |
| spot = i; |
| continue; |
| } |
| if (job_suspend_array[i] == jobid) { |
| /* another thread already has the lock */ |
| pthread_mutex_unlock(&suspend_mutex); |
| return 0; |
| } |
| } |
| i = 0; |
| if (spot != -1) { |
| /* nobody has the lock and a previously used slot is now free */ |
| job_suspend_array[spot] = jobid; |
| i = 1; |
| } else if (job_suspend_size < NUM_PARALLEL_SUSPEND) { |
| /* a new lock is available */ |
| job_suspend_array[job_suspend_size++] = jobid; |
| i = 1; |
| } |
| pthread_mutex_unlock(&suspend_mutex); |
| return i; |
| } |
| |
| static void |
| _unlock_suspend_job(uint32_t jobid) |
| { |
| int i; |
| pthread_mutex_lock(&suspend_mutex); |
| for (i = 0; i < job_suspend_size; i++) { |
| if (job_suspend_array[i] == jobid) |
| job_suspend_array[i] = -1; |
| } |
| pthread_mutex_unlock(&suspend_mutex); |
| } |
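| |
| /* The two helpers above implement a small fixed-size lock table keyed |
| * by job id. Acquire/release pattern as used by _rpc_suspend_job() |
| * below (sketch): |
| * |
| *    while (!_get_suspend_job_lock(job_id)) |
| *        sleep(1); |
| *    ... suspend or resume all local steps ... |
| *    _unlock_suspend_job(job_id); |
| */ |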
| |
| /* |
| * Send a job suspend/resume request through the appropriate slurmstepds for |
| * each job step belonging to a given job allocation. |
| */ |
| static void |
| _rpc_suspend_job(slurm_msg_t *msg) |
| { |
| suspend_int_msg_t *req = msg->data; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| List steps; |
| ListIterator i; |
| step_loc_t *stepd; |
| int step_cnt = 0; |
| int first_time, rc = SLURM_SUCCESS; |
| |
| if ((req->op != SUSPEND_JOB) && (req->op != RESUME_JOB)) { |
| error("REQUEST_SUSPEND_INT: bad op code %u", req->op); |
| rc = ESLURM_NOT_SUPPORTED; |
| } |
| |
| /* |
| * check that requesting user ID is the SLURM UID or root |
| */ |
| if (!_slurm_authorized_user(req_uid)) { |
| error("Security violation: suspend_job(%u) from uid %d", |
| req->job_id, req_uid); |
| rc = ESLURM_USER_ID_MISSING; |
| } |
| |
| /* send a response now, which will include any errors |
| * detected with the request */ |
| if (msg->conn_fd >= 0) { |
| slurm_send_rc_msg(msg, rc); |
| if (slurm_close_accepted_conn(msg->conn_fd) < 0) |
| error("_rpc_suspend_job: close(%d): %m", |
| msg->conn_fd); |
| msg->conn_fd = -1; |
| } |
| if (rc != SLURM_SUCCESS) |
| return; |
| |
| /* now we can focus on performing the requested action, |
| * which could take a few seconds to complete */ |
| debug("_rpc_suspend_job jobid=%u uid=%d action=%s", req->job_id, |
| req_uid, req->op == SUSPEND_JOB ? "suspend" : "resume"); |
| |
| /* Try to get a thread lock for this job. If the lock |
| * is not available then sleep and try again */ |
| first_time = 1; |
| while (!_get_suspend_job_lock(req->job_id)) { |
| first_time = 0; |
| debug3("suspend lock sleep for %u", req->job_id); |
| sleep(1); |
| } |
| |
| /* If suspending and the lock was obtained on the first try, sleep |
| * for one second to give any launch requests a chance to start; |
| * this avoids a race condition that would effectively cause the |
| * suspend request to be ignored because "there's no job to |
| * suspend" */ |
| if (first_time && (req->op == SUSPEND_JOB)) { |
| debug3("suspend first sleep for %u", req->job_id); |
| sleep(1); |
| } |
| |
| if ((req->op == SUSPEND_JOB) && (req->indf_susp)) |
| switch_g_job_suspend(req->switch_info, 5); |
| |
| /* Release or reclaim resources bound to these tasks (task affinity) */ |
| if (req->op == SUSPEND_JOB) { |
| (void) task_g_slurmd_suspend_job(req->job_id); |
| } else { |
| (void) task_g_slurmd_resume_job(req->job_id); |
| } |
| |
| /* |
| * Loop through all job steps and call stepd_suspend or stepd_resume |
| * as appropriate. Since the "suspend" action contains a 'sleep 1', |
| * suspend multiple jobsteps in parallel. |
| */ |
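| /* |
| * Each pass of the outer loop below opens up to NUM_PARALLEL_SUSPEND |
| * stepd connections, acts on that batch, then closes the descriptors. |
| * A pass that opens fewer than NUM_PARALLEL_SUSPEND connections is the |
| * last one. |
| */ |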
| steps = stepd_available(conf->spooldir, conf->node_name); |
| i = list_iterator_create(steps); |
| |
| while (1) { |
| int x, fdi, fd[NUM_PARALLEL_SUSPEND]; |
| fdi = 0; |
| while ((stepd = list_next(i))) { |
| if (stepd->jobid != req->job_id) { |
| /* multiple jobs expected on shared nodes */ |
| debug3("Step from other job: jobid=%u " |
| "(this jobid=%u)", |
| stepd->jobid, req->job_id); |
| continue; |
| } |
| step_cnt++; |
| |
| fd[fdi] = stepd_connect(stepd->directory, |
| stepd->nodename, stepd->jobid, |
| stepd->stepid); |
| if (fd[fdi] == -1) { |
| debug3("Unable to connect to step %u.%u", |
| stepd->jobid, stepd->stepid); |
| continue; |
| } |
| |
| fdi++; |
| if (fdi >= NUM_PARALLEL_SUSPEND) |
| break; |
| } |
| /* check for open connections */ |
| if (fdi == 0) |
| break; |
| |
| if (req->op == SUSPEND_JOB) { |
| stepd_suspend(fd, fdi, req->job_id); |
| } else { |
| /* "resume" remains a serial action (for now) */ |
| for (x = 0; x < fdi; x++) { |
| debug2("Resuming job %u (cached step count %d)", |
| req->job_id, x); |
| if (stepd_resume(fd[x]) < 0) { |
| debug("Resume of job %u failed: %m", |
| req->job_id); |
| } |
| } |
| } |
| for (x = 0; x < fdi; x++) |
| /* fd may have been closed by stepd_suspend */ |
| if (fd[x] != -1) |
| close(fd[x]); |
| |
| /* check for no more jobs */ |
| if (fdi < NUM_PARALLEL_SUSPEND) |
| break; |
| } |
| list_iterator_destroy(i); |
| list_destroy(steps); |
| |
| if ((req->op == RESUME_JOB) && (req->indf_susp)) |
| switch_g_job_resume(req->switch_info, 5); |
| |
| _unlock_suspend_job(req->job_id); |
| |
| if (step_cnt == 0) { |
| debug2("No steps in jobid %u to suspend/resume", |
| req->job_id); |
| } |
| } |
| |
| /* Job shouldn't even be running here, abort it immediately */ |
| static void |
| _rpc_abort_job(slurm_msg_t *msg) |
| { |
| kill_job_msg_t *req = msg->data; |
| uid_t uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| job_env_t job_env; |
| |
| debug("_rpc_abort_job, uid = %d", uid); |
| /* |
| * check that requesting user ID is the SLURM UID |
| */ |
| if (!_slurm_authorized_user(uid)) { |
| error("Security violation: abort_job(%u) from uid %d", |
| req->job_id, uid); |
| if (msg->conn_fd >= 0) |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| return; |
| } |
| |
| task_g_slurmd_release_resources(req->job_id); |
| |
| /* |
| * "revoke" all future credentials for this jobid |
| */ |
| if (slurm_cred_revoke(conf->vctx, req->job_id, req->time, |
| req->start_time) < 0) { |
| debug("revoking cred for job %u: %m", req->job_id); |
| } else { |
| save_cred_state(conf->vctx); |
| debug("credential for job %u revoked", req->job_id); |
| } |
| |
| /* |
| * At this point, if connection still open, we send controller |
| * a "success" reply to indicate that we've recvd the msg. |
| */ |
| if (msg->conn_fd >= 0) { |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| if (slurm_close_accepted_conn(msg->conn_fd) < 0) |
| error ("rpc_abort_job: close(%d): %m", msg->conn_fd); |
| msg->conn_fd = -1; |
| } |
| |
| if ((xcpu_signal(SIGKILL, req->nodes) + |
| _kill_all_active_steps(req->job_id, SIG_ABORT, true)) ) { |
| /* |
| * Block until all user processes are complete. |
| */ |
| _pause_for_job_completion (req->job_id, req->nodes, 0); |
| } |
| |
| /* |
| * Begin expiration period for cached information about job. |
| * If expiration period has already begun, then do not run |
| * the epilog again, as that script has already been executed |
| * for this job. |
| */ |
| if (slurm_cred_begin_expiration(conf->vctx, req->job_id) < 0) { |
| debug("Not running epilog for jobid %d: %m", req->job_id); |
| return; |
| } |
| |
| save_cred_state(conf->vctx); |
| |
| memset(&job_env, 0, sizeof(job_env_t)); |
| |
| job_env.jobid = req->job_id; |
| job_env.node_list = req->nodes; |
| job_env.spank_job_env = req->spank_job_env; |
| job_env.spank_job_env_size = req->spank_job_env_size; |
| job_env.uid = req->job_uid; |
| |
| #if defined(HAVE_BG) |
| select_g_select_jobinfo_get(req->select_jobinfo, |
| SELECT_JOBDATA_BLOCK_ID, |
| &job_env.resv_id); |
| #elif defined(HAVE_ALPS_CRAY) |
| job_env.resv_id = select_g_select_jobinfo_xstrdup(req->select_jobinfo, |
| SELECT_PRINT_RESV_ID); |
| #endif |
| |
| _run_epilog(&job_env); |
| |
| if (container_g_delete(req->job_id)) |
| error("container_g_delete(%u): %m", req->job_id); |
| |
| xfree(job_env.resv_id); |
| } |
| |
| /* This is a variant of _rpc_terminate_job for use with select/serial */ |
| static void |
| _rpc_terminate_batch_job(uint32_t job_id, uint32_t user_id, char *node_name) |
| { |
| int rc = SLURM_SUCCESS; |
| int nsteps = 0; |
| int delay; |
| time_t now = time(NULL); |
| slurm_ctl_conf_t *cf; |
| job_env_t job_env; |
| |
| task_g_slurmd_release_resources(job_id); |
| |
| if (_waiter_init(job_id) == SLURM_ERROR) |
| return; |
| |
| /* |
| * "revoke" all future credentials for this jobid |
| */ |
| _note_batch_job_finished(job_id); |
| if (slurm_cred_revoke(conf->vctx, job_id, now, now) < 0) { |
| debug("revoking cred for job %u: %m", job_id); |
| } else { |
| save_cred_state(conf->vctx); |
| debug("credential for job %u revoked", job_id); |
| } |
| |
| /* |
| * Tasks might be stopped (possibly by a debugger) |
| * so send SIGCONT first. |
| */ |
| _kill_all_active_steps(job_id, SIGCONT, true); |
| if (errno == ESLURMD_STEP_SUSPENDED) { |
| /* |
| * If the job step is currently suspended, we don't |
| * bother with a "nice" termination. |
| */ |
| debug2("Job is currently suspended, terminating"); |
| nsteps = _terminate_all_steps(job_id, true); |
| } else { |
| nsteps = _kill_all_active_steps(job_id, SIGTERM, true); |
| } |
| |
| #ifndef HAVE_AIX |
| if ((nsteps == 0) && !conf->epilog) { |
| slurm_cred_begin_expiration(conf->vctx, job_id); |
| save_cred_state(conf->vctx); |
| _waiter_complete(job_id); |
| if (container_g_delete(job_id)) |
| error("container_g_delete(%u): %m", job_id); |
| return; |
| } |
| #endif |
| |
| /* |
| * Check for corpses |
| */ |
| cf = slurm_conf_lock(); |
| delay = MAX(cf->kill_wait, 5); |
| slurm_conf_unlock(); |
| if (!_pause_for_job_completion(job_id, NULL, delay) && |
| _terminate_all_steps(job_id, true) ) { |
| /* |
| * Block until all user processes are complete. |
| */ |
| _pause_for_job_completion(job_id, NULL, 0); |
| } |
| |
| /* |
| * Begin expiration period for cached information about job. |
| * If expiration period has already begun, then do not run |
| * the epilog again, as that script has already been executed |
| * for this job. |
| */ |
| if (slurm_cred_begin_expiration(conf->vctx, job_id) < 0) { |
| debug("Not running epilog for jobid %d: %m", job_id); |
| goto done; |
| } |
| |
| save_cred_state(conf->vctx); |
| |
| memset(&job_env, 0, sizeof(job_env_t)); |
| |
| job_env.jobid = job_id; |
| job_env.node_list = node_name; |
| job_env.uid = (uid_t)user_id; |
| /* NOTE: We lack the job's SPANK environment variables */ |
| rc = _run_epilog(&job_env); |
| if (rc) { |
| int term_sig, exit_status; |
| if (WIFSIGNALED(rc)) { |
| exit_status = 0; |
| term_sig = WTERMSIG(rc); |
| } else { |
| exit_status = WEXITSTATUS(rc); |
| term_sig = 0; |
| } |
| error("[job %u] epilog failed status=%d:%d", |
| job_id, exit_status, term_sig); |
| rc = ESLURMD_EPILOG_FAILED; |
| } else |
| debug("completed epilog for jobid %u", job_id); |
| if (container_g_delete(job_id)) |
| error("container_g_delete(%u): %m", job_id); |
| |
| done: |
| _wait_state_completed(job_id, 5); |
| _waiter_complete(job_id); |
| } |
| |
| /* This complete batch RPC came from slurmstepd because we have select/serial |
| * configured. Terminate the job here. Forward the batch completion RPC to |
| * slurmctld and possibly get a new batch launch RPC in response. */ |
| static void |
| _rpc_complete_batch(slurm_msg_t *msg) |
| { |
| int i, rc, msg_rc; |
| slurm_msg_t req_msg, resp_msg; |
| uid_t uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| complete_batch_script_msg_t *req = msg->data; |
| |
| if (!_slurm_authorized_user(uid)) { |
| error("Security violation: complete_batch(%u) from uid %d", |
| req->job_id, uid); |
| if (msg->conn_fd >= 0) |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| return; |
| } |
| |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| _rpc_terminate_batch_job(req->job_id, req->user_id, req->node_name); |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_COMPLETE_BATCH_JOB; |
| req_msg.data = msg->data; |
| for (i = 0; i <= MAX_RETRY; i++) { |
| msg_rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg); |
| if (msg_rc == SLURM_SUCCESS) |
| break; |
| info("Retrying job complete RPC for job %u", req->job_id); |
| sleep(RETRY_DELAY); |
| } |
| if (i > MAX_RETRY) { |
| error("Unable to send job complete message: %m"); |
| return; |
| } |
| |
| if (resp_msg.msg_type == RESPONSE_SLURM_RC) { |
| last_slurmctld_msg = time(NULL); |
| rc = ((return_code_msg_t *) resp_msg.data)->return_code; |
| slurm_free_return_code_msg(resp_msg.data); |
| if (rc) { |
| error("complete_batch for job %u: %s", req->job_id, |
| slurm_strerror(rc)); |
| } |
| return; |
| } |
| |
| if (resp_msg.msg_type != REQUEST_BATCH_JOB_LAUNCH) { |
| error("Invalid response msg_type (%u) to complete_batch RPC " |
| "for job %u", resp_msg.msg_type, req->job_id); |
| return; |
| } |
| |
| /* (resp_msg.msg_type == REQUEST_BATCH_JOB_LAUNCH) */ |
| debug2("Processing RPC: REQUEST_BATCH_JOB_LAUNCH"); |
| last_slurmctld_msg = time(NULL); |
| _rpc_batch_job(&resp_msg, false); |
| slurm_free_job_launch_msg(resp_msg.data); |
| } |
| |
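| /* Handle a job termination request from slurmctld: revoke credentials, |
| * signal and (if needed) forcibly terminate all steps, run the epilog, |
| * then report epilog completion back to the controller. */ |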
| static void |
| _rpc_terminate_job(slurm_msg_t *msg) |
| { |
| #ifndef HAVE_AIX |
| bool have_spank = false; |
| #endif |
| int rc = SLURM_SUCCESS; |
| kill_job_msg_t *req = msg->data; |
| uid_t uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| int nsteps = 0; |
| int delay; |
| job_env_t job_env; |
| |
| debug("_rpc_terminate_job, uid = %d", uid); |
| /* |
| * check that requesting user ID is the SLURM UID |
| */ |
| if (!_slurm_authorized_user(uid)) { |
| error("Security violation: kill_job(%u) from uid %d", |
| req->job_id, uid); |
| if (msg->conn_fd >= 0) |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| return; |
| } |
| |
| task_g_slurmd_release_resources(req->job_id); |
| |
| /* |
| * Initialize a "waiter" thread for this jobid. If another |
| * thread is already waiting on termination of this job, |
| * _waiter_init() will return SLURM_ERROR. In this case, just |
| * notify slurmctld that we recvd the message successfully, |
| * then exit this thread. |
| */ |
| if (_waiter_init(req->job_id) == SLURM_ERROR) { |
| if (msg->conn_fd >= 0) { |
| /* Whether or not the step has started yet, send |
| * a success reply to let the controller know we |
| * got this request. |
| */ |
| slurm_send_rc_msg (msg, SLURM_SUCCESS); |
| } |
| return; |
| } |
| |
| /* |
| * "revoke" all future credentials for this jobid |
| */ |
| if (slurm_cred_revoke(conf->vctx, req->job_id, req->time, |
| req->start_time) < 0) { |
| debug("revoking cred for job %u: %m", req->job_id); |
| } else { |
| save_cred_state(conf->vctx); |
| debug("credential for job %u revoked", req->job_id); |
| } |
| |
| /* |
| * Before signalling steps, if the job has any steps that are still |
| * in the process of forking, exec'ing, or checking in with slurmd, |
| * wait on a condition variable for the step start. Otherwise a |
| * slow-starting step can miss the job termination message and run |
| * indefinitely. |
| */ |
| if (_step_is_starting(req->job_id, NO_VAL)) { |
| if (msg->conn_fd >= 0) { |
| /* If the step hasn't started yet just send a |
| * success to let the controller know we got |
| * this request. |
| */ |
| debug("sent SUCCESS, waiting for step to start"); |
| slurm_send_rc_msg (msg, SLURM_SUCCESS); |
| if (slurm_close_accepted_conn(msg->conn_fd) < 0) |
| error ( "rpc_kill_job: close(%d): %m", |
| msg->conn_fd); |
| msg->conn_fd = -1; |
| } |
| if (_wait_for_starting_step(req->job_id, NO_VAL)) { |
| /* |
| * There's currently no case in which we enter this |
| * error condition. If there were, it's hard to say |
| * whether to proceed with the job termination. |
| */ |
| error("Error in _wait_for_starting_step"); |
| } |
| } |
| if (IS_JOB_NODE_FAILED(req) || IS_JOB_PENDING(req)) /* requeued */ |
| _kill_all_active_steps(req->job_id, SIG_NODE_FAIL, true); |
| else if (IS_JOB_FAILED(req)) |
| _kill_all_active_steps(req->job_id, SIG_FAILURE, true); |
| |
| /* |
| * Tasks might be stopped (possibly by a debugger) |
| * so send SIGCONT first. |
| */ |
| xcpu_signal(SIGCONT, req->nodes); |
| _kill_all_active_steps(req->job_id, SIGCONT, true); |
| if (errno == ESLURMD_STEP_SUSPENDED) { |
| /* |
| * If the job step is currently suspended, we don't |
| * bother with a "nice" termination. |
| */ |
| debug2("Job is currently suspended, terminating"); |
| nsteps = xcpu_signal(SIGKILL, req->nodes) + |
| _terminate_all_steps(req->job_id, true); |
| } else { |
| nsteps = xcpu_signal(SIGTERM, req->nodes) + |
| _kill_all_active_steps(req->job_id, SIGTERM, true); |
| } |
| |
| #ifndef HAVE_AIX |
| if ((nsteps == 0) && !conf->epilog) { |
| struct stat stat_buf; |
| if (conf->plugstack && (stat(conf->plugstack, &stat_buf) == 0)) |
| have_spank = true; |
| } |
| /* |
| * If there are currently no active job steps and no |
| * configured epilog to run, bypass asynchronous reply and |
| * notify slurmctld that we have already completed this |
| * request. We need to send current switch state on AIX |
| * systems, so this bypass cannot be used. |
| */ |
| if ((nsteps == 0) && !conf->epilog && !have_spank) { |
| debug4("sent ALREADY_COMPLETE"); |
| if (msg->conn_fd >= 0) { |
| slurm_send_rc_msg(msg, |
| ESLURMD_KILL_JOB_ALREADY_COMPLETE); |
| } |
| slurm_cred_begin_expiration(conf->vctx, req->job_id); |
| save_cred_state(conf->vctx); |
| _waiter_complete(req->job_id); |
| |
| /* |
| * The controller needs to get MESSAGE_EPILOG_COMPLETE to bring |
| * the job out of "completing" state. Otherwise, the job |
| * could remain "completing" unnecessarily, until the request |
| * to terminate is resent. |
| */ |
| _sync_messages_kill(req); |
| if (msg->conn_fd < 0) { |
| /* The epilog complete message processing on |
| * slurmctld is equivalent to that of an |
| * ESLURMD_KILL_JOB_ALREADY_COMPLETE reply above */ |
| _epilog_complete(req->job_id, rc); |
| } |
| if (container_g_delete(req->job_id)) |
| error("container_g_delete(%u): %m", req->job_id); |
| return; |
| } |
| #endif |
| |
| /* |
| * At this point, if connection still open, we send controller |
| * a "success" reply to indicate that we've recvd the msg. |
| */ |
| if (msg->conn_fd >= 0) { |
| debug4("sent SUCCESS"); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| if (slurm_close_accepted_conn(msg->conn_fd) < 0) |
| error ("rpc_kill_job: close(%d): %m", msg->conn_fd); |
| msg->conn_fd = -1; |
| } |
| |
| /* |
| * Check for corpses |
| */ |
| delay = MAX(conf->kill_wait, 5); |
| if ( !_pause_for_job_completion (req->job_id, req->nodes, delay) && |
| (xcpu_signal(SIGKILL, req->nodes) + |
| _terminate_all_steps(req->job_id, true)) ) { |
| /* |
| * Block until all user processes are complete. |
| */ |
| _pause_for_job_completion (req->job_id, req->nodes, 0); |
| } |
| |
| /* |
| * Begin expiration period for cached information about job. |
| * If expiration period has already begun, then do not run |
| * the epilog again, as that script has already been executed |
| * for this job. |
| */ |
| if (slurm_cred_begin_expiration(conf->vctx, req->job_id) < 0) { |
| debug("Not running epilog for jobid %d: %m", req->job_id); |
| goto done; |
| } |
| |
| save_cred_state(conf->vctx); |
| |
| memset(&job_env, 0, sizeof(job_env_t)); |
| |
| job_env.jobid = req->job_id; |
| job_env.node_list = req->nodes; |
| job_env.spank_job_env = req->spank_job_env; |
| job_env.spank_job_env_size = req->spank_job_env_size; |
| job_env.uid = req->job_uid; |
| |
| #if defined(HAVE_BG) |
| select_g_select_jobinfo_get(req->select_jobinfo, |
| SELECT_JOBDATA_BLOCK_ID, |
| &job_env.resv_id); |
| #elif defined(HAVE_ALPS_CRAY) |
| job_env.resv_id = select_g_select_jobinfo_xstrdup(req->select_jobinfo, |
| SELECT_PRINT_RESV_ID); |
| #endif |
| rc = _run_epilog(&job_env); |
| xfree(job_env.resv_id); |
| |
| if (rc) { |
| int term_sig, exit_status; |
| if (WIFSIGNALED(rc)) { |
| exit_status = 0; |
| term_sig = WTERMSIG(rc); |
| } else { |
| exit_status = WEXITSTATUS(rc); |
| term_sig = 0; |
| } |
| error("[job %u] epilog failed status=%d:%d", |
| req->job_id, exit_status, term_sig); |
| rc = ESLURMD_EPILOG_FAILED; |
| } else |
| debug("completed epilog for jobid %u", req->job_id); |
| if (container_g_delete(req->job_id)) |
| error("container_g_delete(%u): %m", req->job_id); |
| |
| done: |
| _wait_state_completed(req->job_id, 5); |
| _waiter_complete(req->job_id); |
| _sync_messages_kill(req); |
| _epilog_complete(req->job_id, rc); |
| } |
| |
| /* On a parallel job, every slurmd may send the EPILOG_COMPLETE |
| * message to the slurmctld at the same time, resulting in lost |
| * messages. We add a delay here to spread out the message traffic |
| * assuming synchronized clocks across the cluster. |
| * Allow 10 msec processing time in slurmctld for each RPC. */ |
| static void _sync_messages_kill(kill_job_msg_t *req) |
| { |
| int host_cnt, host_inx; |
| char *host; |
| hostset_t hosts; |
| int epilog_msg_time; |
| |
| hosts = hostset_create(req->nodes); |
| host_cnt = hostset_count(hosts); |
| if (host_cnt <= 64) |
| goto fini; |
| if (conf->hostname == NULL) |
| goto fini; /* should never happen */ |
| |
| for (host_inx=0; host_inx<host_cnt; host_inx++) { |
| host = hostset_shift(hosts); |
| if (host == NULL) |
| break; |
| if (strcmp(host, conf->node_name) == 0) { |
| free(host); |
| break; |
| } |
| free(host); |
| } |
| epilog_msg_time = slurm_get_epilog_msg_time(); |
| _delay_rpc(host_inx, host_cnt, epilog_msg_time); |
| |
| fini: hostset_destroy(hosts); |
| } |
| |
| /* Delay a message based upon the host index, total host count and RPC_TIME. |
| * This logic depends upon synchronized clocks across the cluster. */ |
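| /* |
| * Illustrative example (values assumed, not from the configuration): |
| * with usec_per_rpc = 10000 (10 msec) and host_cnt = 1000, tot_time is |
| * 10 sec. The host at host_inx = 250 sleeps until the 2.5 sec point of |
| * the current 10 sec window, so the replies from all hosts are spread |
| * evenly over the window instead of arriving at once. |
| */ |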
| static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc) |
| { |
| struct timeval tv1; |
| uint32_t cur_time; /* current time in usec (just 9 digits) */ |
| uint32_t tot_time; /* total time expected for all RPCs */ |
| uint32_t offset_time; /* relative time within tot_time */ |
| uint32_t target_time; /* desired time to issue the RPC */ |
| uint32_t delta_time; |
| |
| again: if (gettimeofday(&tv1, NULL)) { |
| usleep(host_inx * usec_per_rpc); |
| return; |
| } |
| |
| cur_time = ((tv1.tv_sec % 1000) * 1000000) + tv1.tv_usec; |
| tot_time = host_cnt * usec_per_rpc; |
| offset_time = cur_time % tot_time; |
| target_time = host_inx * usec_per_rpc; |
| if (target_time < offset_time) |
| delta_time = target_time - offset_time + tot_time; |
| else |
| delta_time = target_time - offset_time; |
| if (usleep(delta_time)) { |
| if (errno == EINVAL) /* usleep for more than 1 sec */ |
| usleep(900000); |
| /* errno == EINTR */ |
| goto again; |
| } |
| } |
| |
| /* |
| * Returns true if "uid" is a "slurm authorized user" - i.e. uid == 0 |
| * or uid == slurm user id at this time. |
| */ |
| static bool |
| _slurm_authorized_user(uid_t uid) |
| { |
| return ((uid == (uid_t) 0) || (uid == conf->slurm_user_id)); |
| } |
| |
| |
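| /* |
| * One "waiter" entry is recorded per job being terminated so that only |
| * a single thread handles termination of a given job at a time; see |
| * _waiter_init() and _waiter_complete() below. |
| */ |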
| struct waiter { |
| uint32_t jobid; |
| pthread_t thd; |
| }; |
| |
| |
| static struct waiter * |
| _waiter_create(uint32_t jobid) |
| { |
| struct waiter *wp = xmalloc(sizeof(struct waiter)); |
| |
| wp->jobid = jobid; |
| wp->thd = pthread_self(); |
| |
| return wp; |
| } |
| |
| static int _find_waiter(struct waiter *w, uint32_t *jp) |
| { |
| return (w->jobid == *jp); |
| } |
| |
| static void _waiter_destroy(struct waiter *wp) |
| { |
| xfree(wp); |
| } |
| |
| static int _waiter_init (uint32_t jobid) |
| { |
| if (!waiters) |
| waiters = list_create((ListDelF) _waiter_destroy); |
| |
| /* |
| * Exit this thread if another thread is waiting on job |
| */ |
| if (list_find_first (waiters, (ListFindF) _find_waiter, &jobid)) |
| return SLURM_ERROR; |
| else |
| list_append(waiters, _waiter_create(jobid)); |
| |
| return (SLURM_SUCCESS); |
| } |
| |
| static int _waiter_complete (uint32_t jobid) |
| { |
| return (list_delete_all (waiters, (ListFindF) _find_waiter, &jobid)); |
| } |
| |
| /* |
| * Like _wait_for_procs(), but only wait for up to max_time seconds. |
| * If max_time == 0, send SIGKILL to the tasks repeatedly. |
| * |
| * Returns true if all job processes are gone |
| */ |
| static bool |
| _pause_for_job_completion (uint32_t job_id, char *nodes, int max_time) |
| { |
| int sec = 0; |
| int pause = 1; |
| bool rc = false; |
| |
| while ((sec < max_time) || (max_time == 0)) { |
| rc = (_job_still_running (job_id) || |
| xcpu_signal(0, nodes)); |
| if (!rc) |
| break; |
| if ((max_time == 0) && (sec > 1)) { |
| xcpu_signal(SIGKILL, nodes); |
| _terminate_all_steps(job_id, true); |
| } |
| if (sec > 10) { |
| /* Reduce logging frequency about unkillable tasks */ |
| if (max_time) |
| pause = MIN((max_time - sec), 10); |
| else |
| pause = 10; |
| } |
| sleep(pause); |
| sec += pause; |
| } |
| |
| /* |
| * Return true if job is NOT running |
| */ |
| return (!rc); |
| } |
| |
| /* |
| * Does nothing and returns SLURM_SUCCESS (if uid authenticates). |
| * |
| * Timelimit is not currently used in the slurmd or slurmstepd. |
| */ |
| static void |
| _rpc_update_time(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| |
| if ((req_uid != conf->slurm_user_id) && (req_uid != 0)) { |
| rc = ESLURM_USER_ID_MISSING; |
| error("Security violation, uid %d can't update time limit", |
| req_uid); |
| goto done; |
| } |
| |
| /* if (shm_update_job_timelimit(req->job_id, req->expiration_time) < 0) { */ |
| /* error("updating lifetime for job %u: %m", req->job_id); */ |
| /* rc = ESLURM_INVALID_JOB_ID; */ |
| /* } else */ |
| /* debug("reset job %u lifetime", req->job_id); */ |
| |
| done: |
| slurm_send_rc_msg(msg, rc); |
| } |
| |
| /* NOTE: call _destroy_env() to free returned value */ |
| static char ** |
| _build_env(job_env_t *job_env) |
| { |
| char **env = xmalloc(sizeof(char *)); |
| bool user_name_set = 0; |
| |
| env[0] = NULL; |
| if (!valid_spank_job_env(job_env->spank_job_env, |
| job_env->spank_job_env_size, |
| job_env->uid)) { |
| /* If SPANK job environment is bad, log it and do not use */ |
| job_env->spank_job_env_size = 0; |
| job_env->spank_job_env = (char **) NULL; |
| } |
| if (job_env->spank_job_env_size) |
| env_array_merge(&env, (const char **) job_env->spank_job_env); |
| |
| slurm_mutex_lock(&conf->config_mutex); |
| setenvf(&env, "SLURMD_NODENAME", "%s", conf->node_name); |
| setenvf(&env, "SLURM_CONF", conf->conffile); |
| slurm_mutex_unlock(&conf->config_mutex); |
| |
| setenvf(&env, "SLURM_CLUSTER_NAME", "%s", conf->cluster_name); |
| setenvf(&env, "SLURM_JOB_ID", "%u", job_env->jobid); |
| setenvf(&env, "SLURM_JOB_UID", "%u", job_env->uid); |
| |
| #ifndef HAVE_NATIVE_CRAY |
| /* uid_to_string on a cray is a heavy call, so try to avoid it */ |
| if (!job_env->user_name) { |
| job_env->user_name = uid_to_string(job_env->uid); |
| user_name_set = 1; |
| } |
| #endif |
| |
| setenvf(&env, "SLURM_JOB_USER", "%s", job_env->user_name); |
| if (user_name_set) |
| xfree(job_env->user_name); |
| |
| setenvf(&env, "SLURM_JOBID", "%u", job_env->jobid); |
| setenvf(&env, "SLURM_UID", "%u", job_env->uid); |
| if (job_env->node_list) |
| setenvf(&env, "SLURM_NODELIST", "%s", job_env->node_list); |
| |
| if (job_env->partition) |
| setenvf(&env, "SLURM_JOB_PARTITION", "%s", job_env->partition); |
| |
| if (job_env->resv_id) { |
| #if defined(HAVE_BG) |
| setenvf(&env, "MPIRUN_PARTITION", "%s", job_env->resv_id); |
| # ifdef HAVE_BGP |
| /* Needed for HTC jobs */ |
| setenvf(&env, "SUBMIT_POOL", "%s", job_env->resv_id); |
| # endif |
| #elif defined(HAVE_ALPS_CRAY) |
| setenvf(&env, "BASIL_RESERVATION_ID", "%s", job_env->resv_id); |
| #endif |
| } |
| return env; |
| } |
| |
| static void |
| _destroy_env(char **env) |
| { |
| int i=0; |
| |
| if (env) { |
| for(i=0; env[i]; i++) { |
| xfree(env[i]); |
| } |
| xfree(env); |
| } |
| |
| return; |
| } |
| |
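| /* |
| * Fork and exec "<stepd_loc> spank <mode>" (mode is "prolog" or |
| * "epilog"), feed it the slurmd configuration over a pipe on its stdin, |
| * and wait up to 120 seconds for the spank plugins to complete. |
| */ |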
| static int |
| _run_spank_job_script (const char *mode, char **env, uint32_t job_id, uid_t uid) |
| { |
| pid_t cpid; |
| int status = 0; |
| int pfds[2]; |
| |
| if (pipe (pfds) < 0) { |
| error ("_run_spank_job_script: pipe: %m"); |
| return (-1); |
| } |
| |
| fd_set_close_on_exec (pfds[1]); |
| |
| debug ("Calling %s spank %s", conf->stepd_loc, mode); |
| if ((cpid = fork ()) < 0) { |
| error ("executing spank %s: %m", mode); |
| return (-1); |
| } |
| if (cpid == 0) { |
| /* Run slurmstepd spank [prolog|epilog] */ |
| char *argv[4] = { |
| (char *) conf->stepd_loc, |
| "spank", |
| (char *) mode, |
| NULL }; |
| |
| /* container_g_add_pid() needs to be called in the |
| forked (child) side of the fork to avoid a race |
| condition where this process could create a file or |
| detach itself from a child before the parent has |
| added the pid to the container. |
| */ |
| if (container_g_add_pid(job_id, getpid(), getuid()) |
| != SLURM_SUCCESS) |
| error("container_g_add_pid(%u): %m", job_id); |
| |
| if (dup2 (pfds[0], STDIN_FILENO) < 0) |
| fatal ("dup2: %m"); |
| #ifdef SETPGRP_TWO_ARGS |
| setpgrp(0, 0); |
| #else |
| setpgrp(); |
| #endif |
| execve (argv[0], argv, env); |
| error ("execve(%s): %m", argv[0]); |
| exit (127); |
| } |
| |
| close (pfds[0]); |
| |
| if (_send_slurmd_conf_lite (pfds[1], conf) < 0) |
| error ("Failed to send slurmd conf to slurmstepd\n"); |
| close (pfds[1]); |
| |
| /* |
| * Wait for up to 120s for all spank plugins to complete: |
| */ |
| if (waitpid_timeout (mode, cpid, &status, 120) < 0) { |
| error ("spank/%s timed out after 120s", mode); |
| return (-1); |
| } |
| |
| if (status) |
| error ("spank/%s returned status 0x%04x", mode, status); |
| |
| /* |
| * No longer need SPANK option env vars in environment |
| */ |
| spank_clear_remote_options_env (env); |
| |
| return (status); |
| } |
| |
| static int _run_job_script(const char *name, const char *path, |
| uint32_t jobid, int timeout, char **env, uid_t uid) |
| { |
| bool have_spank = false; |
| struct stat stat_buf; |
| int status = 0, rc; |
| |
| /* |
| * Always run both spank prolog/epilog and real prolog/epilog script, |
| * even if spank plugins fail. (May want to alter this in the future) |
| * If both "script" mechanisms fail, prefer to return the "real" |
| * prolog/epilog status. |
| */ |
| if (conf->plugstack && (stat(conf->plugstack, &stat_buf) == 0)) |
| have_spank = true; |
| if (have_spank) |
| status = _run_spank_job_script(name, env, jobid, uid); |
| if ((rc = run_script(name, path, jobid, timeout, env, uid))) |
| status = rc; |
| return (status); |
| } |
| |
| #ifdef HAVE_BG |
| /* a slow prolog is expected on bluegene systems */ |
| static int |
| _run_prolog(job_env_t *job_env) |
| { |
| int rc; |
| char *my_prolog; |
| char **my_env; |
| |
| my_env = _build_env(job_env); |
| setenvf(&my_env, "SLURM_STEP_ID", "%u", job_env->step_id); |
| |
| slurm_mutex_lock(&conf->config_mutex); |
| my_prolog = xstrdup(conf->prolog); |
| slurm_mutex_unlock(&conf->config_mutex); |
| |
| rc = _run_job_script("prolog", my_prolog, job_env->jobid, |
| -1, my_env, job_env->uid); |
| _remove_job_running_prolog(job_env->jobid); |
| xfree(my_prolog); |
| _destroy_env(my_env); |
| |
| return rc; |
| } |
| #else |
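| /* |
| * Timer thread: if the prolog has not finished within |
| * MAX(2, msg_timeout - 2) seconds, send a REQUEST_JOB_NOTIFY to the |
| * controller so the user can see that the prolog appears to be hung on |
| * this node. |
| */ |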
| static void *_prolog_timer(void *x) |
| { |
| int delay_time, rc = SLURM_SUCCESS; |
| struct timespec abs_time; |
| slurm_msg_t msg; |
| job_notify_msg_t notify_req; |
| char srun_msg[128]; |
| timer_struct_t *timer_struct = (timer_struct_t *) x; |
| |
| delay_time = MAX(2, (timer_struct->msg_timeout - 2)); |
| abs_time.tv_sec = time(NULL) + delay_time; |
| abs_time.tv_nsec = 0; |
| slurm_mutex_lock(timer_struct->timer_mutex); |
| if (!timer_struct->prolog_fini) { |
| rc = pthread_cond_timedwait(timer_struct->timer_cond, |
| timer_struct->timer_mutex, |
| &abs_time); |
| } |
| slurm_mutex_unlock(timer_struct->timer_mutex); |
| |
| if (rc != ETIMEDOUT) |
| return NULL; |
| |
| slurm_msg_t_init(&msg); |
| snprintf(srun_msg, sizeof(srun_msg), "Prolog hung on node %s", |
| conf->node_name); |
| notify_req.job_id = timer_struct->job_id; |
| notify_req.job_step_id = NO_VAL; |
| notify_req.message = srun_msg; |
| msg.msg_type = REQUEST_JOB_NOTIFY; |
| msg.data = ¬ify_req; |
| slurm_send_only_controller_msg(&msg); |
| return NULL; |
| } |
| |
| static int |
| _run_prolog(job_env_t *job_env) |
| { |
| int rc, diff_time; |
| char *my_prolog; |
| time_t start_time = time(NULL); |
| static uint16_t msg_timeout = 0; |
| pthread_t timer_id; |
| pthread_attr_t timer_attr; |
| pthread_cond_t timer_cond = PTHREAD_COND_INITIALIZER; |
| pthread_mutex_t timer_mutex = PTHREAD_MUTEX_INITIALIZER; |
| timer_struct_t timer_struct; |
| bool prolog_fini = false; |
| char **my_env; |
| |
| my_env = _build_env(job_env); |
| setenvf(&my_env, "SLURM_STEP_ID", "%u", job_env->step_id); |
| |
| if (msg_timeout == 0) |
| msg_timeout = slurm_get_msg_timeout(); |
| |
| slurm_mutex_lock(&conf->config_mutex); |
| my_prolog = xstrdup(conf->prolog); |
| slurm_mutex_unlock(&conf->config_mutex); |
| |
| slurm_attr_init(&timer_attr); |
| timer_struct.job_id = job_env->jobid; |
| timer_struct.msg_timeout = msg_timeout; |
| timer_struct.prolog_fini = &prolog_fini; |
| timer_struct.timer_cond = &timer_cond; |
| timer_struct.timer_mutex = &timer_mutex; |
| pthread_create(&timer_id, &timer_attr, &_prolog_timer, &timer_struct); |
| rc = _run_job_script("prolog", my_prolog, job_env->jobid, |
| -1, my_env, job_env->uid); |
| slurm_mutex_lock(&timer_mutex); |
| prolog_fini = true; |
| pthread_cond_broadcast(&timer_cond); |
| slurm_mutex_unlock(&timer_mutex); |
| |
| diff_time = difftime(time(NULL), start_time); |
| if (diff_time >= (msg_timeout / 2)) { |
| info("prolog for job %u ran for %d seconds", |
| job_env->jobid, diff_time); |
| } |
| |
| _remove_job_running_prolog(job_env->jobid); |
| xfree(my_prolog); |
| _destroy_env(my_env); |
| |
| pthread_join(timer_id, NULL); |
| return rc; |
| } |
| #endif |
| |
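| /* |
| * Run the configured epilog (plus any spank epilog) for a job, after |
| * first waiting for any still-running prolog for the same job to finish. |
| */ |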
| static int |
| _run_epilog(job_env_t *job_env) |
| { |
| time_t start_time = time(NULL); |
| static uint16_t msg_timeout = 0; |
| int error_code, diff_time; |
| char *my_epilog; |
| char **my_env = _build_env(job_env); |
| |
| if (msg_timeout == 0) |
| msg_timeout = slurm_get_msg_timeout(); |
| |
| slurm_mutex_lock(&conf->config_mutex); |
| my_epilog = xstrdup(conf->epilog); |
| slurm_mutex_unlock(&conf->config_mutex); |
| |
| _wait_for_job_running_prolog(job_env->jobid); |
| error_code = _run_job_script("epilog", my_epilog, job_env->jobid, |
| -1, my_env, job_env->uid); |
| xfree(my_epilog); |
| _destroy_env(my_env); |
| |
| diff_time = difftime(time(NULL), start_time); |
| if (diff_time >= (msg_timeout / 2)) { |
| info("epilog for job %u ran for %d seconds", |
| job_env->jobid, diff_time); |
| } |
| |
| return error_code; |
| } |
| |
| |
| /**********************************************************************/ |
| /* Because calling initgroups(2) in Linux 2.4/2.6 looks very costly, */ |
| /* we cache the group access list and call setgroups(2). */ |
| /**********************************************************************/ |
| |
| typedef struct gid_cache_s { |
| char *user; |
| gid_t gid; |
| gids_t *gids; |
| struct gid_cache_s *next; |
| } gids_cache_t; |
| |
| #define GIDS_HASH_LEN 64 |
| static gids_cache_t *gids_hashtbl[GIDS_HASH_LEN] = {NULL}; |
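| /* |
| * The table is indexed by _gids_hashtbl_idx(), a simple additive hash |
| * over the bytes of the user name reduced modulo GIDS_HASH_LEN; each |
| * bucket holds a singly linked chain of (user, gid) entries. |
| */ |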
| |
| |
| static gids_t * |
| _alloc_gids(int n, gid_t *gids) |
| { |
| gids_t *new; |
| |
| new = (gids_t *)xmalloc(sizeof(gids_t)); |
| new->ngids = n; |
| new->gids = gids; |
| return new; |
| } |
| |
| static void |
| _dealloc_gids(gids_t *p) |
| { |
| xfree(p->gids); |
| xfree(p); |
| } |
| |
| static gids_cache_t * |
| _alloc_gids_cache(char *user, gid_t gid, gids_t *gids, gids_cache_t *next) |
| { |
| gids_cache_t *p; |
| |
| p = (gids_cache_t *)xmalloc(sizeof(gids_cache_t)); |
| p->user = xstrdup(user); |
| p->gid = gid; |
| p->gids = gids; |
| p->next = next; |
| return p; |
| } |
| |
| static void |
| _dealloc_gids_cache(gids_cache_t *p) |
| { |
| xfree(p->user); |
| _dealloc_gids(p->gids); |
| xfree(p); |
| } |
| |
| static int |
| _gids_hashtbl_idx(char *user) |
| { |
| unsigned char *p = (unsigned char *)user; |
| unsigned int x = 0; |
| |
| while (*p) { |
| x += (unsigned int)*p; |
| p++; |
| } |
| return x % GIDS_HASH_LEN; |
| } |
| |
| static void |
| _gids_cache_purge(void) |
| { |
| int i; |
| gids_cache_t *p, *q; |
| |
| for (i=0; i<GIDS_HASH_LEN; i++) { |
| p = gids_hashtbl[i]; |
| while (p) { |
| q = p->next; |
| _dealloc_gids_cache(p); |
| p = q; |
| } |
| gids_hashtbl[i] = NULL; |
| } |
| } |
| |
| static gids_t * |
| _gids_cache_lookup(char *user, gid_t gid) |
| { |
| int idx; |
| gids_cache_t *p; |
| |
| idx = _gids_hashtbl_idx(user); |
| p = gids_hashtbl[idx]; |
| while (p) { |
| if (strcmp(p->user, user) == 0 && p->gid == gid) { |
| return p->gids; |
| } |
| p = p->next; |
| } |
| return NULL; |
| } |
| |
| static void |
| _gids_cache_register(char *user, gid_t gid, gids_t *gids) |
| { |
| int idx; |
| gids_cache_t *p, *q; |
| |
| idx = _gids_hashtbl_idx(user); |
| q = gids_hashtbl[idx]; |
| p = _alloc_gids_cache(user, gid, gids, q); |
| gids_hashtbl[idx] = p; |
| debug2("Cached group access list for %s/%d", user, gid); |
| } |
| |
| static gids_t * |
| _getgroups(void) |
| { |
| int n; |
| gid_t *gg; |
| |
| if ((n = getgroups(0, NULL)) < 0) { |
| error("getgroups:_getgroups: %m"); |
| return NULL; |
| } |
| gg = (gid_t *)xmalloc(n * sizeof(gid_t)); |
| if (getgroups(n, gg) == -1) { |
| error("_getgroups: couldn't get %d groups: %m", n); |
| xfree(gg); |
| return NULL; |
| } |
| return _alloc_gids(n, gg); |
| } |
| |
| extern void |
| destroy_starting_step(void *x) |
| { |
| xfree(x); |
| } |
| |
| |
| |
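| /* |
| * Prime the GID cache by walking the password database and calling |
| * initgroups() for each user (or purge the cache when "cache" is zero). |
| * The daemon's own supplementary groups are saved first and restored |
| * with setgroups() once the scan completes. |
| */ |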
| extern void |
| init_gids_cache(int cache) |
| { |
| struct passwd *pwd; |
| int ngids; |
| gid_t *orig_gids; |
| gids_t *gids; |
| #ifdef HAVE_AIX |
| FILE *fp = NULL; |
| struct passwd pw; |
| char buf[BUF_SIZE]; |
| #elif defined (__APPLE__) || defined (__CYGWIN__) |
| #else |
| struct passwd pw; |
| char buf[BUF_SIZE]; |
| #endif |
| |
| if (!cache) { |
| _gids_cache_purge(); |
| return; |
| } |
| |
| if ((ngids = getgroups(0, NULL)) < 0) { |
| error("getgroups: init_gids_cache: %m"); |
| return; |
| } |
| orig_gids = (gid_t *)xmalloc(ngids * sizeof(gid_t)); |
| if (getgroups(ngids, orig_gids) == -1) { |
| error("init_gids_cache: couldn't get %d groups: %m", ngids); |
| xfree(orig_gids); |
| return; |
| } |
| |
| #ifdef HAVE_AIX |
| setpwent_r(&fp); |
| while (!getpwent_r(&pw, buf, BUF_SIZE, &fp)) { |
| pwd = &pw; |
| #else |
| setpwent(); |
| #if defined (__sun) |
| while ((pwd = getpwent_r(&pw, buf, BUF_SIZE)) != NULL) { |
| #elif defined (__APPLE__) || defined (__CYGWIN__) |
| while ((pwd = getpwent()) != NULL) { |
| #else |
| |
| while (!getpwent_r(&pw, buf, BUF_SIZE, &pwd)) { |
| #endif |
| #endif |
| if (_gids_cache_lookup(pwd->pw_name, pwd->pw_gid)) |
| continue; |
| if (initgroups(pwd->pw_name, pwd->pw_gid)) { |
| if ((errno == EPERM) && (getuid() != (uid_t) 0)) |
| debug("initgroups:init_gids_cache: %m"); |
| else |
| error("initgroups:init_gids_cache: %m"); |
| continue; |
| } |
| if ((gids = _getgroups()) == NULL) |
| continue; |
| _gids_cache_register(pwd->pw_name, pwd->pw_gid, gids); |
| } |
| #ifdef HAVE_AIX |
| endpwent_r(&fp); |
| #else |
| endpwent(); |
| #endif |
| |
| setgroups(ngids, orig_gids); |
| xfree(orig_gids); |
| } |
| |
| |
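| /* |
| * Record a step that is being launched so that RPCs arriving before its |
| * slurmstepd socket exists can block in _wait_for_starting_step() until |
| * the step has checked in. |
| */ |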
| static int |
| _add_starting_step(slurmd_step_type_t type, void *req) |
| { |
| starting_step_t *starting_step; |
| int rc = SLURM_SUCCESS; |
| |
| /* Add the step info to a list of starting processes that |
| cannot reliably be contacted. */ |
| slurm_mutex_lock(&conf->starting_steps_lock); |
| starting_step = xmalloc(sizeof(starting_step_t)); |
| if (!starting_step) { |
| error("_add_starting_step failed to allocate memory"); |
| rc = SLURM_FAILURE; |
| goto fail; |
| } |
| switch(type) { |
| case LAUNCH_BATCH_JOB: |
| starting_step->job_id = |
| ((batch_job_launch_msg_t *)req)->job_id; |
| starting_step->step_id = |
| ((batch_job_launch_msg_t *)req)->step_id; |
| break; |
| case LAUNCH_TASKS: |
| starting_step->job_id = |
| ((launch_tasks_request_msg_t *)req)->job_id; |
| starting_step->step_id = |
| ((launch_tasks_request_msg_t *)req)->job_step_id; |
| break; |
| default: |
| error("_add_starting_step called with an invalid type"); |
| rc = SLURM_FAILURE; |
| xfree(starting_step); |
| goto fail; |
| } |
| if (!list_append(conf->starting_steps, starting_step)) { |
| error("_add_starting_step failed to allocate memory for list"); |
| rc = SLURM_FAILURE; |
| xfree(starting_step); |
| goto fail; |
| } |
| |
| fail: |
| slurm_mutex_unlock(&conf->starting_steps_lock); |
| return rc; |
| } |
| |
| |
| static int |
| _remove_starting_step(slurmd_step_type_t type, void *req) |
| { |
| uint32_t job_id, step_id; |
| ListIterator iter; |
| starting_step_t *starting_step; |
| int rc = SLURM_SUCCESS; |
| bool found = false; |
| |
| slurm_mutex_lock(&conf->starting_steps_lock); |
| |
| switch(type) { |
| case LAUNCH_BATCH_JOB: |
| job_id = ((batch_job_launch_msg_t *)req)->job_id; |
| step_id = ((batch_job_launch_msg_t *)req)->step_id; |
| break; |
| case LAUNCH_TASKS: |
| job_id = ((launch_tasks_request_msg_t *)req)->job_id; |
| step_id = ((launch_tasks_request_msg_t *)req)->job_step_id; |
| break; |
| default: |
| error("_remove_starting_step called with an invalid type"); |
| rc = SLURM_FAILURE; |
| goto fail; |
| } |
| |
| iter = list_iterator_create(conf->starting_steps); |
| while ((starting_step = list_next(iter))) { |
| if (starting_step->job_id == job_id && |
| starting_step->step_id == step_id) { |
| starting_step = list_remove(iter); |
| xfree(starting_step); |
| |
| found = true; |
| pthread_cond_broadcast(&conf->starting_steps_cond); |
| break; |
| } |
| } |
| if (!found) { |
| error("_remove_starting_step: step not found"); |
| rc = SLURM_FAILURE; |
| } |
| fail: |
| slurm_mutex_unlock(&conf->starting_steps_lock); |
| return rc; |
| } |
| |
| |
| |
| static int _compare_starting_steps(void *listentry, void *key) |
| { |
| starting_step_t *step0 = (starting_step_t *)listentry; |
| starting_step_t *step1 = (starting_step_t *)key; |
| |
| if (step1->step_id != NO_VAL) |
| return (step0->job_id == step1->job_id && |
| step0->step_id == step1->step_id); |
| else |
| return (step0->job_id == step1->job_id); |
| } |
| |
| |
| /* Wait for a step to get far enough in the launch process to have |
| a socket open, ready to handle RPC calls. Pass step_id = NO_VAL |
| to wait on any step for the given job. */ |
| |
| static int _wait_for_starting_step(uint32_t job_id, uint32_t step_id) |
| { |
| starting_step_t starting_step; |
| starting_step.job_id = job_id; |
| starting_step.step_id = step_id; |
| int num_passes = 0; |
| |
| slurm_mutex_lock(&conf->starting_steps_lock); |
| |
| while (list_find_first( conf->starting_steps, |
| &_compare_starting_steps, |
| &starting_step )) { |
| if (num_passes == 0) { |
| if (step_id != NO_VAL) |
| debug( "Blocked waiting for step %d.%d", |
| job_id, step_id); |
| else |
| debug( "Blocked waiting for job %d, all steps", |
| job_id); |
| } |
| num_passes++; |
| |
| pthread_cond_wait(&conf->starting_steps_cond, |
| &conf->starting_steps_lock); |
| } |
| if (num_passes > 0) { |
| if (step_id != NO_VAL) |
| debug( "Finished wait for step %d.%d", |
| job_id, step_id); |
| else |
| debug( "Finished wait for job %d, all steps", |
| job_id); |
| } |
| slurm_mutex_unlock(&conf->starting_steps_lock); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| |
| /* Return true if the step has not yet confirmed that its socket to |
| handle RPC calls has been created. Pass step_id = NO_VAL |
| to return true if any of the job's steps are still starting. */ |
| static bool _step_is_starting(uint32_t job_id, uint32_t step_id) |
| { |
| starting_step_t starting_step; |
| starting_step.job_id = job_id; |
| starting_step.step_id = step_id; |
| bool ret = false; |
| |
| slurm_mutex_lock(&conf->starting_steps_lock); |
| |
| if (list_find_first( conf->starting_steps, |
| &_compare_starting_steps, |
| &starting_step )) { |
| ret = true; |
| } |
| |
| slurm_mutex_unlock(&conf->starting_steps_lock); |
| return ret; |
| } |
| |
| /* Add this job to the list of jobs currently running their prolog */ |
| static void _add_job_running_prolog(uint32_t job_id) |
| { |
| uint32_t *job_running_prolog; |
| |
| /* Add the job to a list of jobs whose prologs are running */ |
| slurm_mutex_lock(&conf->prolog_running_lock); |
| job_running_prolog = xmalloc(sizeof(uint32_t)); |
| if (!job_running_prolog) { |
| error("_add_job_running_prolog failed to allocate memory"); |
| goto fail; |
| } |
| |
| *job_running_prolog = job_id; |
| if (!list_append(conf->prolog_running_jobs, job_running_prolog)) { |
| error("_add_job_running_prolog failed to append job to list"); |
| xfree(job_running_prolog); |
| } |
| |
| fail: |
| slurm_mutex_unlock(&conf->prolog_running_lock); |
| } |
| |
| /* Remove this job from the list of jobs currently running their prolog */ |
| static void _remove_job_running_prolog(uint32_t job_id) |
| { |
| ListIterator iter; |
| uint32_t *job_running_prolog; |
| bool found = false; |
| |
| slurm_mutex_lock(&conf->prolog_running_lock); |
| |
| iter = list_iterator_create(conf->prolog_running_jobs); |
| while ((job_running_prolog = list_next(iter))) { |
| if (*job_running_prolog == job_id) { |
| job_running_prolog = list_remove(iter); |
| xfree(job_running_prolog); |
| |
| found = true; |
| pthread_cond_broadcast(&conf->prolog_running_cond); |
| break; |
| } |
| } |
| if (!found) |
| error("_remove_job_running_prolog: job not found"); |
| |
| slurm_mutex_unlock(&conf->prolog_running_lock); |
| } |
| |
| static int _compare_job_running_prolog(void *listentry, void *key) |
| { |
| uint32_t *job0 = (uint32_t *)listentry; |
| uint32_t *job1 = (uint32_t *)key; |
| |
| return (*job0 == *job1); |
| } |
| |
| /* Wait for the job's prolog to complete */ |
| static void _wait_for_job_running_prolog(uint32_t job_id) |
| { |
| debug( "Waiting for job %d's prolog to complete", job_id); |
| slurm_mutex_lock(&conf->prolog_running_lock); |
| |
| while (list_find_first( conf->prolog_running_jobs, |
| &_compare_job_running_prolog, |
| &job_id )) { |
| |
| pthread_cond_wait(&conf->prolog_running_cond, |
| &conf->prolog_running_lock); |
| } |
| |
| slurm_mutex_unlock(&conf->prolog_running_lock); |
| debug( "Finished wait for job %d's prolog to complete", job_id); |
| } |
| |
| |
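| /* |
| * Forward arbitrary data to a UNIX domain socket on this node. The |
| * payload written to the socket is: the requesting uid (uint32_t, |
| * network byte order), the data length (uint32_t, network byte order), |
| * then the raw data bytes. |
| */ |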
| static void |
| _rpc_forward_data(slurm_msg_t *msg) |
| { |
| forward_data_msg_t *req = (forward_data_msg_t *)msg->data; |
| uint32_t req_uid; |
| struct sockaddr_un sa; |
| int fd = -1, rc; |
| |
| debug3("Entering _rpc_forward_data, address: %s, len: %u", |
| req->address, req->len); |
| |
| /* sanity check */ |
| if (strlen(req->address) > sizeof(sa.sun_path) - 1) { |
| rc = EINVAL; |
| goto done; |
| } |
| |
| /* connect to specified address */ |
| fd = socket(AF_UNIX, SOCK_STREAM, 0); |
| if (fd < 0) { |
| error("failed creating UNIX domain socket: %m"); |
| goto done; |
| } |
| memset(&sa, 0, sizeof(sa)); |
| sa.sun_family = AF_UNIX; |
| strcpy(sa.sun_path, req->address); |
| while (((rc = connect(fd, (struct sockaddr *)&sa, SUN_LEN(&sa))) < 0) && |
| (errno == EINTR)); |
| if (rc < 0) { |
| debug2("failed connecting to specified socket '%s': %m", |
| req->address); |
| goto done; |
| } |
| |
| req_uid = (uint32_t)g_slurm_auth_get_uid(msg->auth_cred, NULL); |
| /* |
| * Although this traffic is always local, we still convert to network |
| * byte order to stay consistent with pack/unpack. |
| */ |
| req_uid = htonl(req_uid); |
| safe_write(fd, &req_uid, sizeof(uint32_t)); |
| req_uid = htonl(req->len); |
| safe_write(fd, &req_uid, sizeof(uint32_t)); |
| safe_write(fd, req->data, req->len); |
| |
| rwfail: |
| done: |
| if (fd >= 0) |
| close(fd); |
| rc = errno; |
| slurm_send_rc_msg(msg, rc); |
| } |
| |
| |