/*****************************************************************************\
* proc_req.c - process incoming messages to slurmctld
*****************************************************************************
* Copyright (C) SchedMD LLC.
* Copyright (C) 2008-2011 Lawrence Livermore National Security.
* Copyright (C) 2002-2007 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette@llnl.gov>, et al.
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#if HAVE_SYS_PRCTL_H
# include <sys/prctl.h>
#endif
#include "slurm/slurm_errno.h"
#include "src/common/assoc_mgr.h"
#include "src/common/cron.h"
#include "src/common/fd.h"
#include "src/common/fetch_config.h"
#include "src/common/group_cache.h"
#include "src/common/hostlist.h"
#include "src/common/id_util.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/node_features.h"
#include "src/common/pack.h"
#include "src/common/persist_conn.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_pack.h"
#include "src/common/slurm_protocol_socket.h"
#include "src/common/xstring.h"
#include "src/interfaces/acct_gather.h"
#include "src/interfaces/auth.h"
#include "src/interfaces/burst_buffer.h"
#include "src/interfaces/certmgr.h"
#include "src/interfaces/cgroup.h"
#include "src/interfaces/conn.h"
#include "src/interfaces/cred.h"
#include "src/interfaces/gres.h"
#include "src/interfaces/jobacct_gather.h"
#include "src/interfaces/jobcomp.h"
#include "src/interfaces/mcs.h"
#include "src/interfaces/mpi.h"
#include "src/interfaces/node_features.h"
#include "src/interfaces/preempt.h"
#include "src/interfaces/priority.h"
#include "src/interfaces/sched_plugin.h"
#include "src/interfaces/select.h"
#include "src/interfaces/switch.h"
#include "src/interfaces/topology.h"
#include "src/slurmctld/acct_policy.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/fed_mgr.h"
#include "src/slurmctld/gang.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/licenses.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/node_scheduler.h"
#include "src/slurmctld/power_save.h"
#include "src/slurmctld/proc_req.h"
#include "src/slurmctld/read_config.h"
#include "src/slurmctld/reservation.h"
#include "src/slurmctld/rpc_queue.h"
#include "src/slurmctld/sackd_mgr.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/slurmscriptd.h"
#include "src/slurmctld/state_save.h"
#include "src/slurmctld/trigger_mgr.h"
#include "src/stepmgr/srun_comm.h"
#include "src/stepmgr/stepmgr.h"
static pthread_mutex_t rpc_mutex = PTHREAD_MUTEX_INITIALIZER;
#define RPC_TYPE_SIZE 100
static uint16_t rpc_type_id[RPC_TYPE_SIZE] = { 0 };
static uint32_t rpc_type_cnt[RPC_TYPE_SIZE] = { 0 };
static uint64_t rpc_type_time[RPC_TYPE_SIZE] = { 0 };
static uint16_t rpc_type_queued[RPC_TYPE_SIZE] = { 0 };
static uint64_t rpc_type_dropped[RPC_TYPE_SIZE] = { 0 };
static uint16_t rpc_type_cycle_last[RPC_TYPE_SIZE] = { 0 };
static uint16_t rpc_type_cycle_max[RPC_TYPE_SIZE] = { 0 };
#define RPC_USER_SIZE 200
static uint32_t rpc_user_id[RPC_USER_SIZE] = { 0 };
static uint32_t rpc_user_cnt[RPC_USER_SIZE] = { 0 };
static uint64_t rpc_user_time[RPC_USER_SIZE] = { 0 };
static bool do_post_rpc_node_registration = false;
bool running_configless = false;
static pthread_rwlock_t configless_lock = PTHREAD_RWLOCK_INITIALIZER;
static config_response_msg_t *config_for_slurmd = NULL;
static config_response_msg_t *config_for_clients = NULL;
static pthread_mutex_t throttle_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t throttle_cond = PTHREAD_COND_INITIALIZER;
static void _create_het_job_id_set(hostset_t *jobid_hostset,
uint32_t het_job_offset,
char **het_job_id_set);
static void _fill_ctld_conf(slurm_conf_t * build_ptr);
static void _kill_job_on_msg_fail(uint32_t job_id);
static int _is_prolog_finished(uint32_t job_id);
static int _route_msg_to_origin(slurm_msg_t *msg, char *job_id_str,
uint32_t job_id);
static void _throttle_fini(int *active_rpc_cnt);
static void _throttle_start(int *active_rpc_cnt);
extern diag_stats_t slurmctld_diag_stats;
typedef struct {
uid_t request_uid;
uid_t uid;
const char *id;
list_t *step_list;
} find_job_by_container_id_args_t;
typedef struct {
list_t *full_resp_list;
slurm_msg_t *msg;
} foreach_multi_msg_t;
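/*
* record_rpc_stats - accumulate the call count and processing time for an
* RPC, both by message type and by requesting user. A new msg_type or uid
* claims the first unused slot in its table; slot 0 of the user table is
* reserved for uid 0.
*/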
extern void record_rpc_stats(slurm_msg_t *msg, long delta)
{
slurm_mutex_lock(&rpc_mutex);
for (int i = 0; i < RPC_TYPE_SIZE; i++) {
if (rpc_type_id[i] == 0)
rpc_type_id[i] = msg->msg_type;
else if (rpc_type_id[i] != msg->msg_type)
continue;
rpc_type_cnt[i]++;
rpc_type_time[i] += delta;
break;
}
for (int i = 0; i < RPC_USER_SIZE; i++) {
if ((rpc_user_id[i] == 0) && (i != 0))
rpc_user_id[i] = msg->auth_uid;
else if (rpc_user_id[i] != msg->auth_uid)
continue;
rpc_user_cnt[i]++;
rpc_user_time[i] += delta;
break;
}
slurm_mutex_unlock(&rpc_mutex);
}
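/*
* record_rpc_queue_stats - copy rpc_queue counters (queued, dropped and
* processing cycle times) into the per-message-type statistics table.
*/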
extern void record_rpc_queue_stats(slurmctld_rpc_t *q)
{
slurm_mutex_lock(&rpc_mutex);
for (int i = 0; i < RPC_TYPE_SIZE; i++) {
if (!rpc_type_id[i])
rpc_type_id[i] = q->msg_type;
else if (rpc_type_id[i] != q->msg_type)
continue;
rpc_type_queued[i] = q->queued;
rpc_type_dropped[i] = q->dropped;
rpc_type_cycle_last[i] = q->cycle_last;
rpc_type_cycle_max[i] = q->cycle_max;
break;
}
slurm_mutex_unlock(&rpc_mutex);
}
/* These functions prevent certain RPCs from keeping the slurmctld write locks
* constantly set, which can prevent other RPCs and system functions from being
* processed. For example, a steady stream of batch submissions can prevent
* squeue from responding or jobs from being scheduled. */
static void _throttle_start(int *active_rpc_cnt)
{
slurm_mutex_lock(&throttle_mutex);
while (1) {
if (*active_rpc_cnt == 0) {
(*active_rpc_cnt)++;
break;
}
#if 1
slurm_cond_wait(&throttle_cond, &throttle_mutex);
#else
/* While an RPC is being throttled due to a running RPC of the
* same type, do not count that thread against the daemon's
* thread limit. In extreme environments, this logic can result
* in the slurmctld spawning so many pthreads that it exhausts
* system resources and fails. */
server_thread_decr();
slurm_cond_wait(&throttle_cond, &throttle_mutex);
server_thread_incr();
#endif
}
slurm_mutex_unlock(&throttle_mutex);
if (LOTS_OF_AGENTS)
usleep(1000);
else
usleep(1);
}
static void _throttle_fini(int *active_rpc_cnt)
{
slurm_mutex_lock(&throttle_mutex);
(*active_rpc_cnt)--;
slurm_cond_broadcast(&throttle_cond);
slurm_mutex_unlock(&throttle_mutex);
}
/*
* _fill_ctld_conf - make a copy of the current slurm configuration
* this is done with locks held since the data can change at other times
* OUT conf_ptr - place to copy the configuration to
*/
static void _fill_ctld_conf(slurm_conf_t *conf_ptr)
{
slurm_conf_t *conf = &slurm_conf;
uint32_t next_job_id;
xassert(verify_lock(CONF_LOCK, READ_LOCK));
xassert(verify_lock(JOB_LOCK, READ_LOCK));
xassert(verify_lock(PART_LOCK, READ_LOCK));
xassert(verify_lock(FED_LOCK, READ_LOCK));
next_job_id = get_next_job_id(true);
memset(conf_ptr, 0, sizeof(*conf_ptr));
conf_ptr->last_update = time(NULL);
conf_ptr->accounting_storage_enforce =
conf->accounting_storage_enforce;
conf_ptr->accounting_storage_host =
xstrdup(conf->accounting_storage_host);
conf_ptr->accounting_storage_ext_host =
xstrdup(conf->accounting_storage_ext_host);
conf_ptr->accounting_storage_backup_host =
xstrdup(conf->accounting_storage_backup_host);
conf_ptr->accounting_storage_params =
xstrdup(conf->accounting_storage_params);
conf_ptr->accounting_storage_port = conf->accounting_storage_port;
conf_ptr->accounting_storage_tres =
xstrdup(conf->accounting_storage_tres);
conf_ptr->accounting_storage_type =
xstrdup(conf->accounting_storage_type);
conf_ptr->acct_gather_conf = acct_gather_conf_values();
conf_ptr->acct_gather_energy_type =
xstrdup(conf->acct_gather_energy_type);
conf_ptr->acct_gather_filesystem_type =
xstrdup(conf->acct_gather_filesystem_type);
conf_ptr->acct_gather_interconnect_type =
xstrdup(conf->acct_gather_interconnect_type);
conf_ptr->acct_gather_profile_type =
xstrdup(conf->acct_gather_profile_type);
conf_ptr->acct_gather_node_freq = conf->acct_gather_node_freq;
conf_ptr->authinfo = xstrdup(conf->authinfo);
conf_ptr->authtype = xstrdup(conf->authtype);
conf_ptr->authalttypes = xstrdup(conf->authalttypes);
conf_ptr->authalt_params = xstrdup(conf->authalt_params);
conf_ptr->batch_start_timeout = conf->batch_start_timeout;
conf_ptr->boot_time = slurmctld_config.boot_time;
conf_ptr->bb_type = xstrdup(conf->bb_type);
conf_ptr->bcast_exclude = xstrdup(conf->bcast_exclude);
conf_ptr->bcast_parameters = xstrdup(conf->bcast_parameters);
conf_ptr->certmgr_params = xstrdup(conf->certmgr_params);
conf_ptr->certmgr_type = xstrdup(conf->certmgr_type);
if (xstrstr(conf->job_acct_gather_type, "cgroup") ||
xstrstr(conf->proctrack_type, "cgroup") ||
xstrstr(conf->task_plugin, "cgroup"))
conf_ptr->cgroup_conf = cgroup_get_conf_list();
conf_ptr->cli_filter_params = xstrdup(conf->cli_filter_params);
conf_ptr->cli_filter_plugins = xstrdup(conf->cli_filter_plugins);
conf_ptr->cluster_name = xstrdup(conf->cluster_name);
conf_ptr->comm_params = xstrdup(conf->comm_params);
conf_ptr->complete_wait = conf->complete_wait;
conf_ptr->conf_flags = conf->conf_flags;
conf_ptr->control_cnt = conf->control_cnt;
conf_ptr->control_addr = xcalloc(conf->control_cnt + 1, sizeof(char *));
conf_ptr->control_machine = xcalloc(conf->control_cnt + 1,
sizeof(char *));
for (int i = 0; i < conf_ptr->control_cnt; i++) {
conf_ptr->control_addr[i] = xstrdup(conf->control_addr[i]);
conf_ptr->control_machine[i] =
xstrdup(conf->control_machine[i]);
}
conf_ptr->cpu_freq_def = conf->cpu_freq_def;
conf_ptr->cpu_freq_govs = conf->cpu_freq_govs;
conf_ptr->cred_type = xstrdup(conf->cred_type);
conf_ptr->data_parser_parameters =
xstrdup(conf->data_parser_parameters);
conf_ptr->def_mem_per_cpu = conf->def_mem_per_cpu;
conf_ptr->debug_flags = conf->debug_flags;
conf_ptr->dependency_params = xstrdup(conf->dependency_params);
conf_ptr->eio_timeout = conf->eio_timeout;
conf_ptr->enforce_part_limits = conf->enforce_part_limits;
conf_ptr->epilog_cnt = conf->epilog_cnt;
conf_ptr->epilog = xcalloc(conf->epilog_cnt, sizeof(char *));
for (int i = 0; i < conf_ptr->epilog_cnt; i++)
conf_ptr->epilog[i] = xstrdup(conf->epilog[i]);
conf_ptr->epilog_msg_time = conf->epilog_msg_time;
conf_ptr->epilog_slurmctld_cnt = conf->epilog_slurmctld_cnt;
conf_ptr->epilog_slurmctld = xcalloc(conf->epilog_slurmctld_cnt,
sizeof(char *));
for (int i = 0; i < conf_ptr->epilog_slurmctld_cnt; i++)
conf_ptr->epilog_slurmctld[i] =
xstrdup(conf->epilog_slurmctld[i]);
conf_ptr->epilog_timeout = conf->epilog_timeout;
conf_ptr->fed_params = xstrdup(conf->fed_params);
conf_ptr->first_job_id = conf->first_job_id;
conf_ptr->fs_dampening_factor = conf->fs_dampening_factor;
conf_ptr->gres_plugins = xstrdup(conf->gres_plugins);
conf_ptr->group_time = conf->group_time;
conf_ptr->group_force = conf->group_force;
conf_ptr->gpu_freq_def = xstrdup(conf->gpu_freq_def);
conf_ptr->inactive_limit = conf->inactive_limit;
conf_ptr->interactive_step_opts = xstrdup(conf->interactive_step_opts);
conf_ptr->hash_plugin = xstrdup(conf->hash_plugin);
conf_ptr->hash_val = conf->hash_val;
conf_ptr->health_check_interval = conf->health_check_interval;
conf_ptr->health_check_node_state = conf->health_check_node_state;
conf_ptr->health_check_program = xstrdup(conf->health_check_program);
conf_ptr->http_parser_type = xstrdup(conf->http_parser_type);
conf_ptr->job_acct_gather_freq = xstrdup(conf->job_acct_gather_freq);
conf_ptr->job_acct_gather_type = xstrdup(conf->job_acct_gather_type);
conf_ptr->job_acct_gather_params= xstrdup(conf->job_acct_gather_params);
conf_ptr->job_acct_oom_kill = conf->job_acct_oom_kill;
conf_ptr->job_comp_host = xstrdup(conf->job_comp_host);
conf_ptr->job_comp_loc = xstrdup(conf->job_comp_loc);
conf_ptr->job_comp_params = xstrdup(conf->job_comp_params);
conf_ptr->job_comp_port = conf->job_comp_port;
conf_ptr->job_comp_type = xstrdup(conf->job_comp_type);
conf_ptr->job_comp_user = xstrdup(conf->job_comp_user);
conf_ptr->job_container_plugin = xstrdup(conf->job_container_plugin);
conf_ptr->job_defaults_list =
job_defaults_copy(conf->job_defaults_list);
conf_ptr->job_file_append = conf->job_file_append;
conf_ptr->job_requeue = conf->job_requeue;
conf_ptr->job_submit_plugins = xstrdup(conf->job_submit_plugins);
conf_ptr->keepalive_time = conf->keepalive_time;
conf_ptr->kill_wait = conf->kill_wait;
conf_ptr->kill_on_bad_exit = conf->kill_on_bad_exit;
conf_ptr->launch_params = xstrdup(conf->launch_params);
conf_ptr->licenses = xstrdup(conf->licenses);
conf_ptr->log_fmt = conf->log_fmt;
conf_ptr->mail_domain = xstrdup(conf->mail_domain);
conf_ptr->mail_prog = xstrdup(conf->mail_prog);
conf_ptr->max_array_sz = conf->max_array_sz;
conf_ptr->max_batch_requeue = conf->max_batch_requeue;
conf_ptr->max_dbd_msgs = conf->max_dbd_msgs;
conf_ptr->max_job_cnt = conf->max_job_cnt;
conf_ptr->max_job_id = conf->max_job_id;
conf_ptr->max_mem_per_cpu = conf->max_mem_per_cpu;
conf_ptr->max_node_cnt = conf->max_node_cnt;
conf_ptr->max_step_cnt = conf->max_step_cnt;
conf_ptr->max_tasks_per_node = conf->max_tasks_per_node;
conf_ptr->mcs_plugin = xstrdup(conf->mcs_plugin);
conf_ptr->mcs_plugin_params = xstrdup(conf->mcs_plugin_params);
conf_ptr->min_job_age = conf->min_job_age;
conf_ptr->mpi_conf = mpi_g_conf_get_printable();
conf_ptr->mpi_default = xstrdup(conf->mpi_default);
conf_ptr->mpi_params = xstrdup(conf->mpi_params);
conf_ptr->msg_timeout = conf->msg_timeout;
conf_ptr->next_job_id = next_job_id;
conf_ptr->node_features_conf = node_features_g_get_config();
conf_ptr->node_features_plugins = xstrdup(conf->node_features_plugins);
conf_ptr->over_time_limit = conf->over_time_limit;
conf_ptr->plugindir = xstrdup(conf->plugindir);
conf_ptr->plugstack = xstrdup(conf->plugstack);
conf_ptr->preempt_mode = conf->preempt_mode;
conf_ptr->preempt_params = xstrdup(conf->preempt_params);
conf_ptr->preempt_type = xstrdup(conf->preempt_type);
conf_ptr->preempt_exempt_time = conf->preempt_exempt_time;
conf_ptr->prep_params = xstrdup(conf->prep_params);
conf_ptr->prep_plugins = xstrdup(conf->prep_plugins);
conf_ptr->priority_decay_hl = conf->priority_decay_hl;
conf_ptr->priority_calc_period = conf->priority_calc_period;
conf_ptr->priority_favor_small= conf->priority_favor_small;
conf_ptr->priority_flags = conf->priority_flags;
conf_ptr->priority_max_age = conf->priority_max_age;
conf_ptr->priority_params = xstrdup(conf->priority_params);
conf_ptr->priority_reset_period = conf->priority_reset_period;
conf_ptr->priority_type = xstrdup(conf->priority_type);
conf_ptr->priority_weight_age = conf->priority_weight_age;
conf_ptr->priority_weight_assoc = conf->priority_weight_assoc;
conf_ptr->priority_weight_fs = conf->priority_weight_fs;
conf_ptr->priority_weight_js = conf->priority_weight_js;
conf_ptr->priority_weight_part= conf->priority_weight_part;
conf_ptr->priority_weight_qos = conf->priority_weight_qos;
conf_ptr->priority_weight_tres = xstrdup(conf->priority_weight_tres);
conf_ptr->private_data = conf->private_data;
conf_ptr->proctrack_type = xstrdup(conf->proctrack_type);
conf_ptr->prolog_cnt = conf->prolog_cnt;
conf_ptr->prolog = xcalloc(conf->prolog_cnt, sizeof(char *));
for (int i = 0; i < conf_ptr->prolog_cnt; i++)
conf_ptr->prolog[i] = xstrdup(conf->prolog[i]);
conf_ptr->prolog_slurmctld_cnt = conf->prolog_slurmctld_cnt;
conf_ptr->prolog_slurmctld = xcalloc(conf->prolog_slurmctld_cnt,
sizeof(char *));
for (int i = 0; i < conf_ptr->prolog_slurmctld_cnt; i++)
conf_ptr->prolog_slurmctld[i] =
xstrdup(conf->prolog_slurmctld[i]);
conf_ptr->prolog_timeout = conf->prolog_timeout;
conf_ptr->prolog_flags = conf->prolog_flags;
conf_ptr->propagate_prio_process = slurm_conf.propagate_prio_process;
conf_ptr->propagate_rlimits = xstrdup(conf->propagate_rlimits);
conf_ptr->propagate_rlimits_except = xstrdup(conf->
propagate_rlimits_except);
conf_ptr->reboot_program = xstrdup(conf->reboot_program);
conf_ptr->reconfig_flags = conf->reconfig_flags;
conf_ptr->requeue_exit = xstrdup(conf->requeue_exit);
conf_ptr->requeue_exit_hold = xstrdup(conf->requeue_exit_hold);
conf_ptr->resume_fail_program = xstrdup(conf->resume_fail_program);
conf_ptr->resume_program = xstrdup(conf->resume_program);
conf_ptr->resume_rate = conf->resume_rate;
conf_ptr->resume_timeout = conf->resume_timeout;
conf_ptr->resv_epilog = xstrdup(conf->resv_epilog);
conf_ptr->resv_over_run = conf->resv_over_run;
conf_ptr->resv_prolog = xstrdup(conf->resv_prolog);
conf_ptr->ret2service = conf->ret2service;
conf_ptr->sched_params = xstrdup(conf->sched_params);
conf_ptr->sched_logfile = xstrdup(conf->sched_logfile);
conf_ptr->sched_log_level = conf->sched_log_level;
conf_ptr->sched_time_slice = conf->sched_time_slice;
conf_ptr->schedtype = xstrdup(conf->schedtype);
conf_ptr->scron_params = xstrdup(conf->scron_params);
conf_ptr->select_type = xstrdup(conf->select_type);
conf_ptr->select_type_param = conf->select_type_param;
conf_ptr->site_factor_params = xstrdup(conf->site_factor_params);
conf_ptr->site_factor_plugin = xstrdup(conf->site_factor_plugin);
conf_ptr->slurm_user_id = conf->slurm_user_id;
conf_ptr->slurm_user_name = xstrdup(conf->slurm_user_name);
conf_ptr->slurmctld_addr = xstrdup(conf->slurmctld_addr);
conf_ptr->slurmctld_debug = conf->slurmctld_debug;
conf_ptr->slurmctld_logfile = xstrdup(conf->slurmctld_logfile);
conf_ptr->slurmctld_params = xstrdup(conf->slurmctld_params);
conf_ptr->slurmctld_pidfile = xstrdup(conf->slurmctld_pidfile);
conf_ptr->slurmctld_port = conf->slurmctld_port;
conf_ptr->slurmctld_port_count = conf->slurmctld_port_count;
conf_ptr->slurmctld_primary_off_prog =
xstrdup(conf->slurmctld_primary_off_prog);
conf_ptr->slurmctld_primary_on_prog =
xstrdup(conf->slurmctld_primary_on_prog);
conf_ptr->slurmctld_syslog_debug = conf->slurmctld_syslog_debug;
conf_ptr->slurmctld_timeout = conf->slurmctld_timeout;
conf_ptr->slurmd_debug = conf->slurmd_debug;
conf_ptr->slurmd_logfile = xstrdup(conf->slurmd_logfile);
conf_ptr->slurmd_params = xstrdup(conf->slurmd_params);
conf_ptr->slurmd_pidfile = xstrdup(conf->slurmd_pidfile);
conf_ptr->slurmd_port = conf->slurmd_port;
conf_ptr->slurmd_spooldir = xstrdup(conf->slurmd_spooldir);
conf_ptr->slurmd_syslog_debug = conf->slurmd_syslog_debug;
conf_ptr->slurmd_timeout = conf->slurmd_timeout;
conf_ptr->slurmd_user_id = conf->slurmd_user_id;
conf_ptr->slurmd_user_name = xstrdup(conf->slurmd_user_name);
conf_ptr->slurm_conf = xstrdup(conf->slurm_conf);
conf_ptr->srun_epilog = xstrdup(conf->srun_epilog);
conf_ptr->srun_port_range = xmalloc(2 * sizeof(uint16_t));
if (conf->srun_port_range) {
conf_ptr->srun_port_range[0] = conf->srun_port_range[0];
conf_ptr->srun_port_range[1] = conf->srun_port_range[1];
} else {
conf_ptr->srun_port_range[0] = 0;
conf_ptr->srun_port_range[1] = 0;
}
conf_ptr->srun_prolog = xstrdup(conf->srun_prolog);
conf_ptr->state_save_location = xstrdup(conf->state_save_location);
conf_ptr->suspend_exc_nodes = xstrdup(conf->suspend_exc_nodes);
conf_ptr->suspend_exc_parts = xstrdup(conf->suspend_exc_parts);
conf_ptr->suspend_exc_states = xstrdup(conf->suspend_exc_states);
conf_ptr->suspend_program = xstrdup(conf->suspend_program);
conf_ptr->suspend_rate = conf->suspend_rate;
conf_ptr->suspend_time = conf->suspend_time;
conf_ptr->suspend_timeout = conf->suspend_timeout;
conf_ptr->switch_param = xstrdup(conf->switch_param);
conf_ptr->switch_type = xstrdup(conf->switch_type);
conf_ptr->task_epilog = xstrdup(conf->task_epilog);
conf_ptr->task_prolog = xstrdup(conf->task_prolog);
conf_ptr->task_plugin = xstrdup(conf->task_plugin);
conf_ptr->task_plugin_param = conf->task_plugin_param;
conf_ptr->tcp_timeout = conf->tcp_timeout;
conf_ptr->tls_params = xstrdup(conf->tls_params);
conf_ptr->tls_type = xstrdup(conf->tls_type);
conf_ptr->tmp_fs = xstrdup(conf->tmp_fs);
conf_ptr->topology_param = xstrdup(conf->topology_param);
conf_ptr->topology_plugin = xstrdup(conf->topology_plugin);
conf_ptr->tree_width = conf->tree_width;
conf_ptr->wait_time = conf->wait_time;
conf_ptr->unkillable_program = xstrdup(conf->unkillable_program);
conf_ptr->unkillable_timeout = conf->unkillable_timeout;
conf_ptr->url_parser_type = xstrdup(conf->url_parser_type);
conf_ptr->version = xstrdup(SLURM_VERSION_STRING);
conf_ptr->vsize_factor = conf->vsize_factor;
conf_ptr->x11_params = xstrdup(conf->x11_params);
}
/*
* validate_super_user - validate that the uid is authorized at the
* root, SlurmUser, or SLURMDB_ADMIN_SUPER_USER level
* IN uid - user to validate
* RET true if permitted to run, false otherwise
*/
extern bool validate_super_user(uid_t uid)
{
if ((uid == 0) || (uid == slurm_conf.slurm_user_id) ||
assoc_mgr_get_admin_level(acct_db_conn, uid) >=
SLURMDB_ADMIN_SUPER_USER)
return true;
else
return false;
}
/*
* validate_operator - validate that the uid is authorized at the
* root, SlurmUser, or SLURMDB_ADMIN_OPERATOR level
* IN uid - user to validate
* RET true if permitted to run, false otherwise
*/
static bool _validate_operator_internal(uid_t uid, bool locked)
{
slurmdb_admin_level_t level;
if ((uid == 0) || (uid == slurm_conf.slurm_user_id))
return true;
if (locked)
level = assoc_mgr_get_admin_level_locked(acct_db_conn, uid);
else
level = assoc_mgr_get_admin_level(acct_db_conn, uid);
if (level >= SLURMDB_ADMIN_OPERATOR)
return true;
return false;
}
extern bool validate_operator(uid_t uid)
{
return _validate_operator_internal(uid, false);
}
extern bool validate_operator_locked(uid_t uid)
{
return _validate_operator_internal(uid, true);
}
extern bool validate_operator_user_rec(slurmdb_user_rec_t *user)
{
if ((user->uid == 0) ||
(user->uid == slurm_conf.slurm_user_id) ||
(user->admin_level >= SLURMDB_ADMIN_OPERATOR))
return true;
else
return false;
}
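/*
* Attach the client's identity from the auth credential to the request when
* AuthInfo contains "use_client_ids". The AuthInfo check is cached on the
* first call.
*/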
static void _set_identity(slurm_msg_t *msg, void **id)
{
static bool set = false, use_client_ids = false;
if (!set) {
if (xstrstr(slurm_conf.authinfo, "use_client_ids"))
use_client_ids = true;
set = true;
}
if (!use_client_ids)
return;
*id = (void *) auth_g_get_identity(msg->auth_cred);
}
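/* Replace alloc_node with the host name taken from the message's auth data */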
static void _set_hostname(slurm_msg_t *msg, char **alloc_node)
{
xfree(*alloc_node);
*alloc_node = auth_g_get_host(msg);
}
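/*
* _valid_id - validate the UID/GID in a job request. NO_VAL is rejected,
* SLURM_AUTH_NOBODY falls back to the authenticated values, and only
* SlurmUser/root may specify a UID/GID other than their own.
*/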
static int _valid_id(char *caller, job_desc_msg_t *msg, uid_t uid, gid_t gid,
uint16_t protocol_version)
{
if ((msg->user_id == NO_VAL) || (msg->group_id == NO_VAL)) {
/*
* Catch and reject NO_VAL.
*/
error("%s: rejecting requested UID=NO_VAL or GID=NO_VAL as invalid",
caller);
return ESLURM_USER_ID_MISSING;
}
/*
* If UID/GID not given use the authenticated values.
*/
if (msg->user_id == SLURM_AUTH_NOBODY)
msg->user_id = uid;
if (msg->group_id == SLURM_AUTH_NOBODY)
msg->group_id = gid;
if (validate_slurm_user(uid))
return SLURM_SUCCESS;
if (uid != msg->user_id) {
error("%s: Requested UID=%u doesn't match user UID=%u.",
caller, msg->user_id, uid);
return ESLURM_USER_ID_MISSING;
}
/* if GID not given, then use GID from auth */
if (gid != msg->group_id) {
error("%s: Requested GID=%u doesn't match user GID=%u.",
caller, msg->group_id, gid);
return ESLURM_GROUP_ID_MISSING;
}
return SLURM_SUCCESS;
}
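/*
* Rebuild the cached configless config responses (one for slurmd, one for
* clients) when "enable_configless" is set in SlurmctldParameters.
*/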
extern void configless_update(void)
{
if (!xstrcasestr(slurm_conf.slurmctld_params, "enable_configless"))
return;
grab_include_directives();
running_configless = true;
slurm_rwlock_wrlock(&configless_lock);
slurm_free_config_response_msg(config_for_slurmd);
slurm_free_config_response_msg(config_for_clients);
config_for_slurmd = new_config_response(true);
config_for_slurmd->slurmd_spooldir = xstrdup(slurm_conf.slurmd_spooldir);
config_for_clients = new_config_response(false);
slurm_rwlock_unlock(&configless_lock);
}
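/* Free the cached configless responses and the tracked Include files */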
extern void configless_clear(void)
{
slurm_rwlock_wrlock(&configless_lock);
slurm_free_config_response_msg(config_for_slurmd);
slurm_free_config_response_msg(config_for_clients);
FREE_NULL_LIST(conf_includes_list);
slurm_rwlock_unlock(&configless_lock);
}
/* _kill_job_on_msg_fail - The request to create a job record succeeded,
* but the reply message to srun failed. We kill the job to avoid
* leaving it orphaned */
static void _kill_job_on_msg_fail(uint32_t job_id)
{
/* Locks: Write job, write node */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
error("Job allocate response msg send failure, killing JobId=%u",
job_id);
lock_slurmctld(job_write_lock);
job_complete(job_id, slurm_conf.slurm_user_id, false, false, SIGTERM);
unlock_slurmctld(job_write_lock);
}
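/* list_for_each() callback to cancel one hetjob component after a failed
* submission */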
static int _het_job_cancel(void *x, void *arg)
{
job_record_t *job_ptr = (job_record_t *) x;
time_t now = time(NULL);
info("Cancelling aborted hetjob submit: %pJ", job_ptr);
job_state_set(job_ptr, JOB_CANCELLED);
job_ptr->start_time = now;
job_ptr->end_time = now;
job_ptr->exit_code = 1;
job_completion_logger(job_ptr, false);
fed_mgr_job_complete(job_ptr, 0, now);
return 0;
}
/*
* build_alloc_msg - Fill in a resource_allocation_response_msg_t from a
* job_record.
* job_ptr IN - job_record to copy members from.
* error_code IN - error code used for the response.
* job_submit_user_msg IN - user message from job submit plugin.
* RET resource_allocation_response_msg_t filled in.
*/
extern resource_allocation_response_msg_t *build_alloc_msg(
job_record_t *job_ptr, int error_code, char *job_submit_user_msg)
{
int i;
resource_allocation_response_msg_t *alloc_msg =
xmalloc(sizeof(resource_allocation_response_msg_t));
/* send job_ID and node_name_ptr */
if (job_ptr->job_resrcs && job_ptr->job_resrcs->cpu_array_cnt) {
alloc_msg->num_cpu_groups = job_ptr->job_resrcs->cpu_array_cnt;
alloc_msg->cpu_count_reps = xmalloc(sizeof(uint32_t) *
job_ptr->job_resrcs->
cpu_array_cnt);
memcpy(alloc_msg->cpu_count_reps,
job_ptr->job_resrcs->cpu_array_reps,
(sizeof(uint32_t) * job_ptr->job_resrcs->cpu_array_cnt));
alloc_msg->cpus_per_node = xmalloc(sizeof(uint16_t) *
job_ptr->job_resrcs->
cpu_array_cnt);
memcpy(alloc_msg->cpus_per_node,
job_ptr->job_resrcs->cpu_array_value,
(sizeof(uint16_t) * job_ptr->job_resrcs->cpu_array_cnt));
}
alloc_msg->error_code = error_code;
alloc_msg->job_submit_user_msg = xstrdup(job_submit_user_msg);
alloc_msg->job_id = job_ptr->job_id;
alloc_msg->node_cnt = job_ptr->node_cnt;
alloc_msg->node_list = xstrdup(job_ptr->nodes);
if (job_ptr->part_ptr)
alloc_msg->partition = xstrdup(job_ptr->part_ptr->name);
else
alloc_msg->partition = xstrdup(job_ptr->partition);
alloc_msg->batch_host = xstrdup(job_ptr->batch_host);
if (job_ptr->details) {
if (job_ptr->bit_flags & JOB_MEM_SET) {
alloc_msg->pn_min_memory =
job_ptr->details->pn_min_memory;
}
alloc_msg->cpu_freq_min = job_ptr->details->cpu_freq_min;
alloc_msg->cpu_freq_max = job_ptr->details->cpu_freq_max;
alloc_msg->cpu_freq_gov = job_ptr->details->cpu_freq_gov;
alloc_msg->ntasks_per_tres = job_ptr->details->ntasks_per_tres;
alloc_msg->segment_size = job_ptr->details->segment_size;
if (job_ptr->details->mc_ptr) {
alloc_msg->ntasks_per_board =
job_ptr->details->mc_ptr->ntasks_per_board;
alloc_msg->ntasks_per_core =
job_ptr->details->mc_ptr->ntasks_per_core;
alloc_msg->ntasks_per_socket =
job_ptr->details->mc_ptr->ntasks_per_socket;
}
if (job_ptr->details->env_cnt) {
alloc_msg->env_size = job_ptr->details->env_cnt;
alloc_msg->environment =
xcalloc(alloc_msg->env_size + 1,
sizeof(char *));
for (i = 0; i < alloc_msg->env_size; i++) {
alloc_msg->environment[i] =
xstrdup(job_ptr->details->env_sup[i]);
}
}
if (job_ptr->bit_flags & STEPMGR_ENABLED) {
env_array_overwrite(&alloc_msg->environment,
"SLURM_STEPMGR",
job_ptr->batch_host);
alloc_msg->env_size =
PTR_ARRAY_SIZE(alloc_msg->environment) - 1;
}
} else {
/* alloc_msg->pn_min_memory = 0; */
alloc_msg->ntasks_per_board = NO_VAL16;
alloc_msg->ntasks_per_core = NO_VAL16;
alloc_msg->ntasks_per_tres = NO_VAL16;
alloc_msg->ntasks_per_socket = NO_VAL16;
}
if (job_ptr->account)
alloc_msg->account = xstrdup(job_ptr->account);
if (job_ptr->qos_ptr) {
slurmdb_qos_rec_t *qos;
qos = (slurmdb_qos_rec_t *)job_ptr->qos_ptr;
alloc_msg->qos = xstrdup(qos->name);
}
if (job_ptr->resv_name)
alloc_msg->resv_name = xstrdup(job_ptr->resv_name);
set_remote_working_response(alloc_msg, job_ptr,
job_ptr->origin_cluster);
alloc_msg->tres_per_node = xstrdup(job_ptr->tres_per_node);
alloc_msg->tres_per_task = xstrdup(job_ptr->tres_per_task);
alloc_msg->uid = job_ptr->user_id;
alloc_msg->user_name = user_from_job(job_ptr);
alloc_msg->gid = job_ptr->group_id;
alloc_msg->group_name = group_from_job(job_ptr);
return alloc_msg;
}
static void _del_alloc_het_job_msg(void *x)
{
resource_allocation_response_msg_t *alloc_msg;
alloc_msg = (resource_allocation_response_msg_t *) x;
/* NULL out working_cluster_rec since it's pointing to global memory */
alloc_msg->working_cluster_rec = NULL;
slurm_free_resource_allocation_response_msg(alloc_msg);
}
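/* Return true if SchedulerType is sched/backfill (result is cached) */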
static bool _sched_backfill(void)
{
static int backfill = -1;
if (backfill == -1) {
if (!xstrcmp(slurm_conf.schedtype, "sched/backfill"))
backfill = 1;
else
backfill = 0;
}
if (backfill)
return true;
return false;
}
/*
* If any job component has required nodes, those nodes must be excluded
* from all other components to avoid scheduling deadlock
*/
static void _exclude_het_job_nodes(list_t *job_req_list)
{
job_desc_msg_t *job_desc_msg;
list_itr_t *iter;
int het_job_cnt, req_cnt = 0, i;
char **req_nodes, *sep;
het_job_cnt = list_count(job_req_list);
req_nodes = xmalloc(sizeof(char *) * het_job_cnt);
iter = list_iterator_create(job_req_list);
while ((job_desc_msg = list_next(iter))) {
if (!job_desc_msg->req_nodes || !job_desc_msg->req_nodes[0])
continue;
req_nodes[req_cnt++] = job_desc_msg->req_nodes;
}
if (req_cnt) {
list_iterator_reset(iter);
while ((job_desc_msg = list_next(iter))) {
for (i = 0; i < req_cnt; i++) {
if (req_nodes[i] == job_desc_msg->req_nodes)
continue; /* required by this job */
if (job_desc_msg->exc_nodes &&
job_desc_msg->exc_nodes[0])
sep = ",";
else
sep = "";
xstrfmtcat(job_desc_msg->exc_nodes, "%s%s",
sep, req_nodes[i]);
}
}
}
list_iterator_destroy(iter);
xfree(req_nodes);
}
/*
* _create_het_job_id_set - Obtain the het_job_id_set
* het_job_id_set OUT - allocated in the function and must be xfreed
* by the caller.
*/
static void _create_het_job_id_set(hostset_t *jobid_hostset,
uint32_t het_job_offset,
char **het_job_id_set)
{
char *tmp_str = NULL;
char *tmp_offset = NULL;
if (!jobid_hostset)
return;
tmp_str = hostset_ranged_string_xmalloc(jobid_hostset);
tmp_offset = tmp_str;
if (tmp_str[0] == '[') {
tmp_offset = strchr(tmp_str, ']');
if (tmp_offset)
tmp_offset[0] = '\0';
tmp_offset = tmp_str + 1;
}
*het_job_id_set = xstrdup(tmp_offset);
xfree(tmp_str);
}
/* _slurm_rpc_allocate_het_job: process RPC to allocate a hetjob resources */
static void _slurm_rpc_allocate_het_job(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
int error_code = SLURM_SUCCESS, inx, het_job_cnt = -1;
DEF_TIMERS;
job_desc_msg_t *job_desc_msg;
list_t *job_req_list = msg->data;
/* Locks: Read config, write job, write node, read partition */
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
job_record_t *job_ptr, *first_job_ptr = NULL;
char *err_msg = NULL, **job_submit_user_msg = NULL;
list_itr_t *iter;
list_t *submit_job_list = NULL;
uint32_t het_job_id = 0, het_job_offset = 0;
hostset_t *jobid_hostset = NULL;
char tmp_str[32];
list_t *resp = NULL;
char resp_host[INET6_ADDRSTRLEN];
char *het_job_id_set = NULL;
START_TIMER;
if (slurmctld_config.submissions_disabled) {
info("Submissions disabled on system");
error_code = ESLURM_SUBMISSIONS_DISABLED;
goto send_msg;
}
if (!_sched_backfill()) {
info("REQUEST_HET_JOB_ALLOCATION from uid=%u rejected as sched/backfill is not configured",
msg->auth_uid);
error_code = ESLURM_NOT_SUPPORTED;
goto send_msg;
}
if (!job_req_list || (list_count(job_req_list) == 0)) {
info("REQUEST_HET_JOB_ALLOCATION from uid=%u with empty job list",
msg->auth_uid);
error_code = SLURM_ERROR;
goto send_msg;
}
if (msg->address.ss_family != AF_UNSPEC) {
slurm_get_ip_str(&msg->address, resp_host, sizeof(resp_host));
} else {
info("REQUEST_HET_JOB_ALLOCATION from uid=%u, can't get peer addr",
msg->auth_uid);
error_code = SLURM_ERROR;
goto send_msg;
}
sched_debug3("Processing RPC: REQUEST_HET_JOB_ALLOCATION from uid=%u",
msg->auth_uid);
/*
* If any job component has required nodes, those nodes must be excluded
* from all other components to avoid scheduling deadlock
*/
_exclude_het_job_nodes(job_req_list);
het_job_cnt = list_count(job_req_list);
job_submit_user_msg = xmalloc(sizeof(char *) * het_job_cnt);
submit_job_list = list_create(NULL);
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
inx = 0;
iter = list_iterator_create(job_req_list);
while ((job_desc_msg = list_next(iter))) {
/*
* Ignore what was sent in the RPC, only use auth values.
*/
job_desc_msg->user_id = msg->auth_uid;
job_desc_msg->group_id = msg->auth_gid;
_set_hostname(msg, &job_desc_msg->alloc_node);
if ((job_desc_msg->alloc_node == NULL) ||
(job_desc_msg->alloc_node[0] == '\0')) {
error_code = ESLURM_INVALID_NODE_NAME;
error("REQUEST_HET_JOB_ALLOCATION lacks alloc_node from uid=%u",
msg->auth_uid);
break;
}
if (job_desc_msg->array_inx) {
error_code = ESLURM_INVALID_ARRAY;
break;
}
if (job_desc_msg->immediate) {
error_code = ESLURM_CAN_NOT_START_IMMEDIATELY;
break;
}
/* Locks are for job_submit plugin use */
job_desc_msg->het_job_offset = het_job_offset;
error_code = validate_job_create_req(job_desc_msg, msg->auth_uid,
&job_submit_user_msg[inx]);
if (error_code)
break;
dump_job_desc(job_desc_msg);
job_ptr = NULL;
if (!job_desc_msg->resp_host)
job_desc_msg->resp_host = xstrdup(resp_host);
if (het_job_offset) {
/*
* Email notifications disable except for the
* hetjob leader
*/
job_desc_msg->mail_type = 0;
xfree(job_desc_msg->mail_user);
/* license request allowed only on leader */
if (job_desc_msg->licenses) {
xstrfmtcat(job_submit_user_msg[inx],
"%slicense request allowed only on leader job",
job_submit_user_msg[inx] ? "\n" : "");
error("REQUEST_HET_JOB_ALLOCATION from uid=%u, license request on non-leader job",
msg->auth_uid);
error_code = ESLURM_INVALID_LICENSES;
break;
}
}
job_desc_msg->het_job_offset = het_job_offset;
error_code = job_allocate(job_desc_msg, false, false, NULL,
true, msg->auth_uid, false, &job_ptr,
&err_msg, msg->protocol_version);
if (!job_ptr) {
if (error_code == SLURM_SUCCESS)
error_code = SLURM_ERROR;
break;
}
if (error_code && (job_ptr->job_state == JOB_FAILED))
break;
error_code = SLURM_SUCCESS; /* Non-fatal error */
if (het_job_id == 0) {
het_job_id = job_ptr->job_id;
first_job_ptr = job_ptr;
}
snprintf(tmp_str, sizeof(tmp_str), "%u", job_ptr->job_id);
if (jobid_hostset)
hostset_insert(jobid_hostset, tmp_str);
else
jobid_hostset = hostset_create(tmp_str);
job_ptr->het_job_id = het_job_id;
job_ptr->het_job_offset = het_job_offset++;
on_job_state_change(job_ptr, job_ptr->job_state);
list_append(submit_job_list, job_ptr);
inx++;
}
list_iterator_destroy(iter);
if ((error_code == 0) && (!first_job_ptr)) {
error("%s: No error, but no het_job_id", __func__);
error_code = SLURM_ERROR;
}
/* Validate limits on hetjob as a whole */
if ((error_code == SLURM_SUCCESS) &&
(accounting_enforce & ACCOUNTING_ENFORCE_LIMITS) &&
!acct_policy_validate_het_job(submit_job_list)) {
info("Hetjob %u exceeded association/QOS limit for user %u",
het_job_id, msg->auth_uid);
error_code = ESLURM_ACCOUNTING_POLICY;
}
/* Set the het_job_id_set */
_create_het_job_id_set(jobid_hostset, het_job_offset,
&het_job_id_set);
if (first_job_ptr)
first_job_ptr->het_job_list = submit_job_list;
iter = list_iterator_create(submit_job_list);
while ((job_ptr = list_next(iter))) {
job_ptr->het_job_id_set = xstrdup(het_job_id_set);
}
list_iterator_destroy(iter);
xfree(het_job_id_set);
if (error_code) {
/* Cancel remaining job records */
(void) list_for_each(submit_job_list, _het_job_cancel, NULL);
if (!first_job_ptr)
FREE_NULL_LIST(submit_job_list);
} else {
list_itr_t *iter;
inx = 0;
iter = list_iterator_create(submit_job_list);
while ((job_ptr = list_next(iter))) {
if (!resp)
resp = list_create(_del_alloc_het_job_msg);
list_append(resp,
build_alloc_msg(
job_ptr, error_code,
job_submit_user_msg[inx++]));
log_flag(HETJOB, "Submit %pJ", job_ptr);
}
list_iterator_destroy(iter);
}
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
END_TIMER2(__func__);
if (resp) {
if (send_msg_response(msg, RESPONSE_HET_JOB_ALLOCATION, resp))
_kill_job_on_msg_fail(het_job_id);
FREE_NULL_LIST(resp);
} else {
char *aggregate_user_msg;
send_msg:
info("%s: %s ", __func__, slurm_strerror(error_code));
aggregate_user_msg = NULL;
/*
* If job is rejected, add the job submit message to the error
* message to avoid it getting lost. Was saved off earlier.
*/
for (inx = 0; inx < het_job_cnt; inx++) {
char *line = NULL, *last = NULL;
if (!job_submit_user_msg[inx])
continue;
/*
* Break apart any combined sentences and tag with index
*/
line = strtok_r(job_submit_user_msg[inx], "\n", &last);
while (line) {
xstrfmtcat(aggregate_user_msg, "%s%d: %s",
(aggregate_user_msg ? "\n" : ""),
inx, line);
line = strtok_r(NULL, "\n", &last);
}
}
if (aggregate_user_msg) {
char *tmp_err_msg = err_msg;
err_msg = aggregate_user_msg;
if (tmp_err_msg) {
xstrfmtcat(err_msg, "\n%s", tmp_err_msg);
xfree(tmp_err_msg);
}
}
if (err_msg)
slurm_send_rc_err_msg(msg, error_code, err_msg);
else
slurm_send_rc_msg(msg, error_code);
}
xfree(err_msg);
for (inx = 0; inx < het_job_cnt; inx++)
xfree(job_submit_user_msg[inx]);
xfree(job_submit_user_msg);
if (jobid_hostset)
hostset_destroy(jobid_hostset);
schedule_job_save(); /* has own locks */
}
/* _slurm_rpc_allocate_resources: process RPC to allocate resources for
* a job
*/
static void _slurm_rpc_allocate_resources(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
job_desc_msg_t *job_desc_msg = msg->data;
/* Locks: Read config, read job, read node, read partition */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, READ_LOCK, READ_LOCK, READ_LOCK };
/* Locks: Read config, write job, write node, read partition */
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
int immediate = job_desc_msg->immediate;
bool do_unlock = false;
bool reject_job = false;
job_record_t *job_ptr = NULL;
char *err_msg = NULL, *job_submit_user_msg = NULL;
START_TIMER;
if (slurmctld_config.submissions_disabled) {
info("Submissions disabled on system");
error_code = ESLURM_SUBMISSIONS_DISABLED;
reject_job = true;
goto send_msg;
}
/*
* Ignore what was sent in the RPC, only use auth values.
*/
job_desc_msg->user_id = msg->auth_uid;
job_desc_msg->group_id = msg->auth_gid;
sched_debug3("Processing RPC: REQUEST_RESOURCE_ALLOCATION from uid=%u",
msg->auth_uid);
_set_hostname(msg, &job_desc_msg->alloc_node);
_set_identity(msg, &job_desc_msg->id);
/* do RPC call */
if ((job_desc_msg->alloc_node == NULL) ||
(job_desc_msg->alloc_node[0] == '\0')) {
error_code = ESLURM_INVALID_NODE_NAME;
error("REQUEST_RESOURCE_ALLOCATE lacks alloc_node from uid=%u",
msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
/* Locks are for job_submit plugin use */
lock_slurmctld(job_read_lock);
job_desc_msg->het_job_offset = NO_VAL;
error_code = validate_job_create_req(job_desc_msg,
msg->auth_uid, &err_msg);
unlock_slurmctld(job_read_lock);
}
/*
* In validate_job_create_req(), err_msg is currently only modified in
* the call to job_submit_g_submit. We save the err_msg in a temp
* char *job_submit_user_msg because err_msg can be overwritten later
* in the calls to fed_mgr_job_allocate and/or job_allocate, and we
* need the job submit plugin value to build the resource allocation
* response in the call to build_alloc_msg.
*/
if (err_msg) {
job_submit_user_msg = err_msg;
err_msg = NULL;
}
if (error_code) {
reject_job = true;
} else if (msg->address.ss_family != AF_UNSPEC) {
/* resp_host could already be set from a federated cluster */
if (!job_desc_msg->resp_host) {
job_desc_msg->resp_host = xmalloc(INET6_ADDRSTRLEN);
slurm_get_ip_str(&msg->address, job_desc_msg->resp_host,
INET6_ADDRSTRLEN);
}
dump_job_desc(job_desc_msg);
do_unlock = true;
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
if (fed_mgr_fed_rec) {
uint32_t job_id;
if (fed_mgr_job_allocate(msg, job_desc_msg, true,
&job_id, &error_code,
&err_msg)) {
reject_job = true;
} else if (!(job_ptr = find_job_record(job_id))) {
error("%s: can't find fed job that was just created. this should never happen",
__func__);
reject_job = true;
error_code = SLURM_ERROR;
}
} else {
job_desc_msg->het_job_offset = NO_VAL;
error_code = job_allocate(job_desc_msg, immediate,
false, NULL, true,
msg->auth_uid, false,
&job_ptr, &err_msg,
msg->protocol_version);
/* unlock after finished using the job structure
* data */
/* return result */
if (!job_ptr ||
(error_code && job_ptr->job_state == JOB_FAILED))
reject_job = true;
}
END_TIMER2(__func__);
} else {
reject_job = true;
error_code = SLURM_UNKNOWN_FORWARD_ADDR;
}
send_msg:
if (!reject_job) {
resource_allocation_response_msg_t *alloc_msg =
build_alloc_msg(job_ptr, error_code,
job_submit_user_msg);
xassert(job_ptr);
sched_info("%s %pJ NodeList=%s %s",
__func__, job_ptr, job_ptr->nodes, TIME_STR);
/*
* This check really isn't needed, but just doing it
* to be more complete.
*/
if (do_unlock) {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
if (send_msg_response(msg, RESPONSE_RESOURCE_ALLOCATION,
alloc_msg))
_kill_job_on_msg_fail(job_ptr->job_id);
schedule_job_save(); /* has own locks */
schedule_node_save(); /* has own locks */
if (!alloc_msg->node_cnt) /* didn't get an allocation */
queue_job_scheduler();
/* NULL out working_cluster_rec since it's pointing to global
* memory */
alloc_msg->working_cluster_rec = NULL;
slurm_free_resource_allocation_response_msg(alloc_msg);
} else { /* allocate error */
if (do_unlock) {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
info("%s: %s ", __func__, slurm_strerror(error_code));
/*
* If job is rejected, add the job submit message to the error
* message to avoid it getting lost. Was saved off earlier.
*/
if (job_submit_user_msg) {
char *tmp_err_msg = err_msg;
err_msg = job_submit_user_msg;
job_submit_user_msg = NULL;
if (tmp_err_msg) {
xstrfmtcat(err_msg, "\n%s", tmp_err_msg);
xfree(tmp_err_msg);
}
}
if (err_msg)
slurm_send_rc_err_msg(msg, error_code, err_msg);
else
slurm_send_rc_msg(msg, error_code);
}
xfree(err_msg);
xfree(job_submit_user_msg);
}
/* _slurm_rpc_dump_conf - process RPC for Slurm configuration information */
static void _slurm_rpc_dump_conf(slurm_msg_t *msg)
{
DEF_TIMERS;
last_update_msg_t *last_time_msg = msg->data;
slurm_ctl_conf_info_msg_t config_tbl;
/* Locks: Read config, job, partition, fed */
slurmctld_lock_t config_read_lock = {
READ_LOCK, READ_LOCK, NO_LOCK, READ_LOCK, READ_LOCK };
START_TIMER;
lock_slurmctld(config_read_lock);
/* check to see if configuration data has changed */
if ((last_time_msg->last_update - 1) >= slurm_conf.last_update) {
unlock_slurmctld(config_read_lock);
debug2("%s, no change", __func__);
slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
} else {
_fill_ctld_conf(&config_tbl);
unlock_slurmctld(config_read_lock);
END_TIMER2(__func__);
/* send message */
(void) send_msg_response(msg, RESPONSE_BUILD_INFO, &config_tbl);
free_slurm_conf(&config_tbl, false);
}
}
/* _slurm_rpc_dump_jobs - process RPC for job state information */
static void _slurm_rpc_dump_jobs(slurm_msg_t *msg)
{
DEF_TIMERS;
buf_t *buffer = NULL;
job_info_request_msg_t *job_info_request_msg = msg->data;
/* Locks: Read config, job, part, fed */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, NO_LOCK, READ_LOCK, READ_LOCK };
START_TIMER;
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(job_read_lock);
if ((job_info_request_msg->last_update - 1) >= last_job_update) {
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_read_lock);
debug3("%s, no change", __func__);
slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
} else {
if (job_info_request_msg->job_ids) {
buffer = pack_spec_jobs(job_info_request_msg->job_ids,
job_info_request_msg->show_flags,
msg->auth_uid, NO_VAL,
msg->protocol_version);
} else {
buffer = pack_all_jobs(job_info_request_msg->show_flags,
msg->auth_uid, NO_VAL,
msg->protocol_version);
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
#if 0
info("%s, size=%d %s", __func__, dump_size, TIME_STR);
#endif
/* send message */
(void) send_msg_response(msg, RESPONSE_JOB_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
}
/* _slurm_rpc_dump_jobs_user - process RPC for a specific user's job state
* information */
static void _slurm_rpc_dump_jobs_user(slurm_msg_t *msg)
{
DEF_TIMERS;
buf_t *buffer = NULL;
job_user_id_msg_t *job_info_request_msg = msg->data;
/* Locks: Read config, job, part, fed */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, NO_LOCK, READ_LOCK, READ_LOCK };
START_TIMER;
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(job_read_lock);
buffer = pack_all_jobs(job_info_request_msg->show_flags, msg->auth_uid,
job_info_request_msg->user_id,
msg->protocol_version);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
#if 0
info("%s, size=%d %s", __func__, dump_size, TIME_STR);
#endif
/* send message */
(void) send_msg_response(msg, RESPONSE_JOB_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
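/* _slurm_rpc_job_state - process RPC for lightweight job state information */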
static void _slurm_rpc_job_state(slurm_msg_t *msg)
{
DEF_TIMERS;
job_state_request_msg_t *js = msg->data;
job_state_response_msg_t *jsr = NULL;
int rc;
jsr = xmalloc(sizeof(*jsr));
START_TIMER;
/* Do not lock here. Locking is done conditionally in dump_job_state */
rc = dump_job_state(js->count, js->job_ids, &jsr->jobs_count,
&jsr->jobs);
END_TIMER2(__func__);
if (rc) {
slurm_send_rc_msg(msg, rc);
} else {
(void) send_msg_response(msg, RESPONSE_JOB_STATE, jsr);
}
slurm_free_job_state_response_msg(jsr);
}
/* _slurm_rpc_dump_job_single - process RPC for one job's state information */
static void _slurm_rpc_dump_job_single(slurm_msg_t *msg)
{
DEF_TIMERS;
buf_t *buffer = NULL;
job_id_msg_t *job_id_msg = msg->data;
/* Locks: Read config, job, part, and fed */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, NO_LOCK, READ_LOCK, READ_LOCK };
START_TIMER;
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(job_read_lock);
buffer = pack_one_job(job_id_msg->job_id, job_id_msg->show_flags,
msg->auth_uid, msg->protocol_version);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
if (!buffer) {
slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
} else {
(void) send_msg_response(msg, RESPONSE_JOB_INFO, buffer);
}
FREE_NULL_BUFFER(buffer);
}
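/* _slurm_rpc_hostlist_expansion - resolve a node name expression against the
* node table and return the resulting node list */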
static void _slurm_rpc_hostlist_expansion(slurm_msg_t *msg)
{
DEF_TIMERS;
slurmctld_lock_t node_read_lock = {
.node = READ_LOCK,
};
bitstr_t *bitmap = NULL;
char *expanded = NULL;
START_TIMER;
if ((slurm_conf.private_data & PRIVATE_DATA_NODES) &&
(!validate_operator(msg->auth_uid))) {
error("Security violation, REQUEST_HOSTLIST_EXPANSION RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(node_read_lock);
if (!node_name2bitmap(msg->data, false, &bitmap, NULL))
expanded = bitmap2node_name_sortable(bitmap, false);
FREE_NULL_BITMAP(bitmap);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(node_read_lock);
END_TIMER2(__func__);
if (!expanded) {
slurm_send_rc_msg(msg, ESLURM_INVALID_NODE_NAME);
} else {
(void) send_msg_response(msg, RESPONSE_HOSTLIST_EXPANSION,
expanded);
}
xfree(expanded);
}
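/* _slurm_rpc_get_shares - process RPC for association share (fairshare)
* information */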
static void _slurm_rpc_get_shares(slurm_msg_t *msg)
{
DEF_TIMERS;
shares_request_msg_t *req_msg = msg->data;
shares_response_msg_t resp_msg;
START_TIMER;
memset(&resp_msg, 0, sizeof(resp_msg));
assoc_mgr_get_shares(acct_db_conn, msg->auth_uid, req_msg, &resp_msg);
(void) send_msg_response(msg, RESPONSE_SHARE_INFO, &resp_msg);
FREE_NULL_LIST(resp_msg.assoc_shares_list);
/* don't free the resp_msg.tres_names */
END_TIMER2(__func__);
debug2("%s %s", __func__, TIME_STR);
}
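/* _slurm_rpc_get_priority_factors - process RPC for job priority factors */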
static void _slurm_rpc_get_priority_factors(slurm_msg_t *msg)
{
DEF_TIMERS;
priority_factors_response_msg_t resp_msg;
/* Read lock on jobs, nodes, and partitions */
slurmctld_lock_t job_read_lock = {
.job = READ_LOCK,
.node = READ_LOCK,
.part = READ_LOCK,
};
assoc_mgr_lock_t qos_read_locks = {
.qos = READ_LOCK,
};
START_TIMER;
lock_slurmctld(job_read_lock);
assoc_mgr_lock(&qos_read_locks);
resp_msg.priority_factors_list = priority_g_get_priority_factors_list(
msg->auth_uid);
(void) send_msg_response(msg, RESPONSE_PRIORITY_FACTORS, &resp_msg);
assoc_mgr_unlock(&qos_read_locks);
unlock_slurmctld(job_read_lock);
FREE_NULL_LIST(resp_msg.priority_factors_list);
END_TIMER2(__func__);
debug2("%s %s", __func__, TIME_STR);
}
/* _slurm_rpc_end_time - Process RPC for job end time */
static void _slurm_rpc_end_time(slurm_msg_t *msg)
{
DEF_TIMERS;
job_alloc_info_msg_t *time_req_msg = msg->data;
srun_timeout_msg_t timeout_msg;
int rc;
/* Locks: Read job */
slurmctld_lock_t job_read_lock = {
NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
START_TIMER;
lock_slurmctld(job_read_lock);
rc = job_end_time(time_req_msg, &timeout_msg);
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
if (rc != SLURM_SUCCESS) {
slurm_send_rc_msg(msg, rc);
} else {
(void) send_msg_response(msg, SRUN_TIMEOUT, &timeout_msg);
}
debug2("%s JobId=%u %s", __func__, time_req_msg->job_id, TIME_STR);
}
/* _slurm_rpc_get_fed - process RPC for federation state information */
static void _slurm_rpc_get_fed(slurm_msg_t *msg)
{
DEF_TIMERS;
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
START_TIMER;
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(fed_read_lock);
/* send message */
(void) send_msg_response(msg, RESPONSE_FED_INFO, fed_mgr_fed_rec);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(fed_read_lock);
END_TIMER2(__func__);
debug2("%s %s", __func__, TIME_STR);
}
/* _slurm_rpc_dump_nodes - process RPC for node state information */
static void _slurm_rpc_dump_nodes(slurm_msg_t *msg)
{
DEF_TIMERS;
buf_t *buffer;
node_info_request_msg_t *node_req_msg = msg->data;
/* Locks: Read config, write node (reset allocated CPU count in some
* select plugins), read part (for part_is_visible) */
slurmctld_lock_t node_write_lock = {
READ_LOCK, NO_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK };
START_TIMER;
if ((slurm_conf.private_data & PRIVATE_DATA_NODES) &&
(!validate_operator(msg->auth_uid))) {
error("Security violation, REQUEST_NODE_INFO RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(node_write_lock);
select_g_select_nodeinfo_set_all();
if ((node_req_msg->last_update - 1) >= last_node_update) {
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(node_write_lock);
debug3("%s, no change", __func__);
slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
} else {
buffer = pack_all_nodes(node_req_msg->show_flags,
msg->auth_uid, msg->protocol_version);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(node_write_lock);
END_TIMER2(__func__);
/* send message */
(void) send_msg_response(msg, RESPONSE_NODE_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
}
/* _slurm_rpc_dump_node_single - process RPC for state information about one
* node */
static void _slurm_rpc_dump_node_single(slurm_msg_t *msg)
{
DEF_TIMERS;
buf_t *buffer = NULL;
node_info_single_msg_t *node_req_msg = msg->data;
/* Locks: Read config, read node, read part (for part_is_visible) */
slurmctld_lock_t node_write_lock = {
READ_LOCK, NO_LOCK, READ_LOCK, READ_LOCK, NO_LOCK };
START_TIMER;
if ((slurm_conf.private_data & PRIVATE_DATA_NODES) &&
(!validate_operator(msg->auth_uid))) {
error("Security violation, REQUEST_NODE_INFO_SINGLE RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
lock_slurmctld(node_write_lock);
#if 0
/* This function updates each node's alloc_cpus count and too slow for
* our use here. Node write lock is needed if this function is used */
select_g_select_nodeinfo_set_all();
#endif
buffer = pack_one_node(node_req_msg->show_flags, msg->auth_uid,
node_req_msg->node_name, msg->protocol_version);
unlock_slurmctld(node_write_lock);
END_TIMER2(__func__);
/* send message */
(void) send_msg_response(msg, RESPONSE_NODE_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
/* _slurm_rpc_dump_partitions - process RPC for partition state information */
static void _slurm_rpc_dump_partitions(slurm_msg_t *msg)
{
DEF_TIMERS;
buf_t *buffer = NULL;
part_info_request_msg_t *part_req_msg = msg->data;
/* Locks: Read configuration and partition */
slurmctld_lock_t part_read_lock = {
READ_LOCK, NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK };
START_TIMER;
if ((slurm_conf.private_data & PRIVATE_DATA_PARTITIONS) &&
!validate_operator(msg->auth_uid)) {
debug2("Security violation, PARTITION_INFO RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(part_read_lock);
if ((part_req_msg->last_update - 1) >= last_part_update) {
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(part_read_lock);
debug2("%s, no change", __func__);
slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
} else {
buffer = pack_all_part(part_req_msg->show_flags, msg->auth_uid,
msg->protocol_version);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(part_read_lock);
END_TIMER2(__func__);
/* send message */
(void) send_msg_response(msg, RESPONSE_PARTITION_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
}
/* _slurm_rpc_epilog_complete - process RPC noting the completion of
* the epilog denoting the completion of a job in its entirety */
static void _slurm_rpc_epilog_complete(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
static time_t config_update = 0;
static bool defer_sched = false;
DEF_TIMERS;
/* Locks: Read configuration, write job, write node */
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
epilog_complete_msg_t *epilog_msg = msg->data;
job_record_t *job_ptr;
bool run_scheduler = false;
START_TIMER;
if (!validate_slurm_user(msg->auth_uid)) {
error("Security violation, EPILOG_COMPLETE RPC from uid=%u",
msg->auth_uid);
return;
}
/* Only throttle and lock here when the message is not being processed
* through the RPC queue; in that case the lock is already held. */
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
if (config_update != slurm_conf.last_update) {
defer_sched = (xstrcasestr(slurm_conf.sched_params,
"defer"));
config_update = slurm_conf.last_update;
}
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
}
log_flag(ROUTE, "%s: node_name = %s, JobId=%u",
__func__, epilog_msg->node_name, epilog_msg->job_id);
if (job_epilog_complete(epilog_msg->job_id, epilog_msg->node_name,
epilog_msg->return_code))
run_scheduler = true;
job_ptr = find_job_record(epilog_msg->job_id);
if (epilog_msg->return_code)
error("%s: epilog error %pJ Node=%s Err=%s %s",
__func__, job_ptr, epilog_msg->node_name,
slurm_strerror(epilog_msg->return_code), TIME_STR);
else
debug2("%s: %pJ Node=%s %s",
__func__, job_ptr, epilog_msg->node_name, TIME_STR);
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
END_TIMER2(__func__);
/* Functions below provide their own locking */
if (!(msg->flags & CTLD_QUEUE_PROCESSING) && run_scheduler) {
/*
* In defer mode, avoid triggering the scheduler logic
* for every epilog complete message.
* As one epilog message is sent from every node of each
* job at termination, the number of simultaneous schedule
* calls can be very high on a large machine or with a large
* number of managed jobs.
*/
if (!LOTS_OF_AGENTS && !defer_sched)
schedule(false); /* Has own locking */
else
queue_job_scheduler();
schedule_node_save(); /* Has own locking */
schedule_job_save(); /* Has own locking */
}
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
/* _slurm_rpc_job_step_kill - process RPC to cancel an entire job or
* an individual job step */
static void _slurm_rpc_job_step_kill(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
int error_code = SLURM_SUCCESS;
job_step_kill_msg_t *job_step_kill_msg = msg->data;
log_flag(STEPS, "Processing RPC details: REQUEST_CANCEL_JOB_STEP %ps flags=0x%x",
&job_step_kill_msg->step_id, job_step_kill_msg->flags);
_throttle_start(&active_rpc_cnt);
error_code = kill_job_step(job_step_kill_msg, msg->auth_uid);
_throttle_fini(&active_rpc_cnt);
slurm_send_rc_msg(msg, error_code);
}
/* _slurm_rpc_complete_job_allocation - process RPC to note the
* completion of a job allocation */
static void _slurm_rpc_complete_job_allocation(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
complete_job_allocation_msg_t *comp_msg = msg->data;
/* Locks: Write job, write node, read federation */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
job_record_t *job_ptr;
/* init */
START_TIMER;
debug3("Processing RPC details: REQUEST_COMPLETE_JOB_ALLOCATION for JobId=%u rc=%d",
comp_msg->job_id, comp_msg->job_rc);
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(comp_msg->job_id);
log_flag(TRACE_JOBS, "%s: enter %pJ", __func__, job_ptr);
/* Mark job and/or job step complete */
error_code = job_complete(comp_msg->job_id, msg->auth_uid,
false, false, comp_msg->job_rc);
if (error_code) {
if (error_code == ESLURM_INVALID_JOB_ID) {
info("%s: JobId=%d error %s",
__func__, comp_msg->job_id,
slurm_strerror(error_code));
} else {
info("%s: %pJ error %s",
__func__, job_ptr, slurm_strerror(error_code));
}
} else {
debug2("%s: %pJ %s", __func__, job_ptr, TIME_STR);
}
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
END_TIMER2(__func__);
/* return result */
if (error_code) {
slurm_send_rc_msg(msg, error_code);
} else {
slurmctld_diag_stats.jobs_completed++;
slurm_send_rc_msg(msg, SLURM_SUCCESS);
(void) schedule_job_save(); /* Has own locking */
(void) schedule_node_save(); /* Has own locking */
}
log_flag(TRACE_JOBS, "%s: return %pJ", __func__, job_ptr);
}
/* _slurm_rpc_complete_prolog - process RPC to note the
* completion of a prolog */
static void _slurm_rpc_complete_prolog(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
complete_prolog_msg_t *comp_msg = msg->data;
/* Locks: Write job */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
/* init */
START_TIMER;
debug3("Processing RPC details: REQUEST_COMPLETE_PROLOG from JobId=%u",
comp_msg->job_id);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(job_write_lock);
error_code = prolog_complete(comp_msg->job_id, comp_msg->prolog_rc,
comp_msg->node_name);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
/* return result */
if (error_code) {
info("%s JobId=%u: %s ",
__func__, comp_msg->job_id, slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
debug2("%s JobId=%u %s", __func__, comp_msg->job_id, TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
}
/* _slurm_rpc_complete_batch - process RPC from slurmstepd to note the
* completion of a batch script */
static void _slurm_rpc_complete_batch_script(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
int error_code = SLURM_SUCCESS, i;
DEF_TIMERS;
complete_batch_script_msg_t *comp_msg = msg->data;
/* Locks: Write job, write node, read federation */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
bool job_requeue = false;
bool dump_job = false, dump_node = false;
job_record_t *job_ptr = NULL;
char *nodes = comp_msg->node_name;
/* init */
START_TIMER;
debug3("Processing RPC details: REQUEST_COMPLETE_BATCH_SCRIPT for JobId=%u",
comp_msg->job_id);
if (!validate_slurm_user(msg->auth_uid)) {
error("A non superuser %u tried to complete batch JobId=%u",
msg->auth_uid, comp_msg->job_id);
/* Only the slurmstepd can complete a batch script */
END_TIMER2(__func__);
return;
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
}
job_ptr = find_job_record(comp_msg->job_id);
if (job_ptr && job_ptr->batch_host && comp_msg->node_name &&
xstrcmp(job_ptr->batch_host, comp_msg->node_name)) {
/* This can be the result of the slurmd on the batch_host
* failing, but the slurmstepd continuing to run. Then the
* batch job is requeued and started on a different node.
* The end result is one batch complete RPC from each node. */
error("Batch completion for JobId=%u sent from wrong node (%s rather than %s). Was the job requeued due to node failure?",
comp_msg->job_id,
comp_msg->node_name, job_ptr->batch_host);
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
slurm_send_rc_msg(msg, error_code);
return;
}
/*
* Send batch step info to accounting, only if the job is
* still completing.
*
* When a job is requeued because of node failure, and there is no
* epilog, both EPILOG_COMPLETE and COMPLETE_BATCH_SCRIPT_COMPLETE
* messages are sent at the same time and received on different
* threads. EPILOG_COMPLETE will grab a new db_index for the job. So if
* COMPLETE_BATCH_SCRIPT happens after EPILOG_COMPLETE, then adding the
* batch step would happen on the new db instance -- which is incorrect.
* Rather than try to ensure that COMPLETE_BATCH_SCRIPT happens after
* EPILOG_COMPLETE, just throw away the batch step for node failures.
*
* NOTE: Do not use IS_JOB_PENDING since that doesn't take
* into account the COMPLETING FLAG which is valid, but not
* always set yet when the step exits normally.
*/
if (slurm_with_slurmdbd() && job_ptr &&
(job_ptr->job_state != JOB_PENDING)) {
/* This logic was taken from _slurm_rpc_step_complete() */
slurm_step_id_t step_id = { .job_id = job_ptr->job_id,
.step_id = SLURM_BATCH_SCRIPT,
.step_het_comp = NO_VAL };
step_record_t *step_ptr = find_step_record(job_ptr, &step_id);
if (!step_ptr) {
/* Ignore duplicate or late batch complete RPCs */
debug("%s: Ignoring late or duplicate REQUEST_COMPLETE_BATCH_SCRIPT received for job %pJ",
__func__, job_ptr);
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
slurm_send_rc_msg(msg, SLURM_SUCCESS);
return;
} else if (step_ptr->step_id.step_id != SLURM_BATCH_SCRIPT) {
error("%s: %pJ Didn't find batch step, found step %u. This should never happen.",
__func__, job_ptr, step_ptr->step_id.step_id);
} else {
step_ptr->exit_code = comp_msg->job_rc;
jobacctinfo_destroy(step_ptr->jobacct);
step_ptr->jobacct = comp_msg->jobacct;
comp_msg->jobacct = NULL;
step_ptr->state |= JOB_COMPLETING;
jobacct_storage_g_step_complete(acct_db_conn, step_ptr);
delete_step_record(job_ptr, step_ptr);
}
}
/* do RPC call */
/* First, drain the node if there was a fatal error */
if ((comp_msg->slurm_rc == ESLURMD_STEP_NOTRUNNING) ||
(comp_msg->slurm_rc == ESLURM_ALREADY_DONE) ||
(comp_msg->slurm_rc == ESLURMD_CREDENTIAL_REVOKED)) {
/* race condition on job termination, not a real error */
info("slurmd error running JobId=%u from Node(s)=%s: %s",
comp_msg->job_id, nodes,
slurm_strerror(comp_msg->slurm_rc));
comp_msg->slurm_rc = SLURM_SUCCESS;
} else if ((comp_msg->slurm_rc == SLURM_COMMUNICATIONS_SEND_ERROR) ||
(comp_msg->slurm_rc == ESLURM_USER_ID_MISSING) ||
(comp_msg->slurm_rc == ESLURMD_INVALID_ACCT_FREQ) ||
(comp_msg->slurm_rc == ESPANK_JOB_FAILURE)) {
/* Handle non-fatal errors here. All others drain the node. */
error("Slurmd error running JobId=%u on Node(s)=%s: %s",
comp_msg->job_id, nodes,
slurm_strerror(comp_msg->slurm_rc));
} else if (comp_msg->slurm_rc != SLURM_SUCCESS) {
error("slurmd error running JobId=%u on Node(s)=%s: %s",
comp_msg->job_id, nodes,
slurm_strerror(comp_msg->slurm_rc));
slurmctld_diag_stats.jobs_failed++;
if (error_code == SLURM_SUCCESS) {
error_code = drain_nodes(comp_msg->node_name,
"batch job complete failure",
slurm_conf.slurm_user_id);
if ((comp_msg->job_rc != SLURM_SUCCESS) && job_ptr &&
job_ptr->details && job_ptr->details->requeue)
job_requeue = true;
dump_job = true;
dump_node = true;
}
}
/*
 * If we've already sent the SIGTERM signal from
 * _job_check_grace_internal(), assume the job completed due to the
 * signal. This is subject to a race condition: the job may have
 * completed just before we delivered the signal.
 */
if (job_ptr && (job_ptr->bit_flags & GRACE_PREEMPT) &&
job_ptr->details && job_ptr->details->requeue &&
(slurm_job_preempt_mode(job_ptr) == PREEMPT_MODE_REQUEUE))
job_requeue = true;
/* Mark job allocation complete */
i = job_complete(comp_msg->job_id, msg->auth_uid, job_requeue, false,
comp_msg->job_rc);
error_code = MAX(error_code, i);
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
/* this has to be done after the job_complete */
END_TIMER2(__func__);
/* return result */
if (error_code) {
debug2("%s JobId=%u: %s ",
__func__, comp_msg->job_id, slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
debug2("%s JobId=%u %s", __func__, comp_msg->job_id, TIME_STR);
slurmctld_diag_stats.jobs_completed++;
dump_job = true;
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
if (dump_job)
(void) schedule_job_save(); /* Has own locking */
if (dump_node)
(void) schedule_node_save(); /* Has own locking */
}
static void _slurm_rpc_dump_batch_script(slurm_msg_t *msg)
{
DEF_TIMERS;
int rc = SLURM_SUCCESS;
job_record_t *job_ptr;
buf_t *script;
job_id_msg_t *job_id_msg = msg->data;
/* Locks: Read config, read job, read federation */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
START_TIMER;
debug3("Processing RPC details: REQUEST_BATCH_SCRIPT for JobId=%u",
job_id_msg->job_id);
lock_slurmctld(job_read_lock);
if ((job_ptr = find_job_record(job_id_msg->job_id))) {
if (!validate_operator(msg->auth_uid) &&
(job_ptr->user_id != msg->auth_uid)) {
rc = ESLURM_USER_ID_MISSING;
} else {
script = get_job_script(job_ptr);
if (!script)
rc = ESLURM_JOB_SCRIPT_MISSING;
}
} else {
rc = ESLURM_INVALID_JOB_ID;
}
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
if (rc != SLURM_SUCCESS) {
slurm_send_rc_msg(msg, rc);
} else {
(void) send_msg_response(msg, RESPONSE_BATCH_SCRIPT, script);
FREE_NULL_BUFFER(script);
}
}
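/*
 * Lock/unlock callbacks handed to step_create_from_msg() so that the
 * slurmctld locks (and RPC throttling) are taken only when the caller
 * does not already hold them.
 */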
static void _step_create_job_lock(bool lock)
{
static int active_rpc_cnt = 0;
slurmctld_lock_t job_write_lock = {
.job = WRITE_LOCK,
.node = READ_LOCK,
};
if (lock) {
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
} else {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
}
static void _step_create_job_fail_lock(bool lock)
{
static int active_rpc_cnt = 0;
/* Same locks as _slurm_rpc_step_complete */
slurmctld_lock_t job_write_lock = {
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.fed = READ_LOCK,
};
if (lock) {
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
} else {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
}
/* _slurm_rpc_job_step_create - process RPC to create/register a job step
* with the stepmgr */
static void _slurm_rpc_job_step_create(slurm_msg_t *msg)
{
if (!step_create_from_msg(msg, -1,
((!(msg->flags & CTLD_QUEUE_PROCESSING)) ?
_step_create_job_lock :
NULL),
((!(msg->flags & CTLD_QUEUE_PROCESSING)) ?
_step_create_job_fail_lock :
NULL))) {
schedule_job_save(); /* Sets own locks */
}
}
static int _pack_ctld_job_steps(void *x, void *arg)
{
job_record_t *job_ptr = (job_record_t *) x;
pack_step_args_t *args = (pack_step_args_t *) arg;
if ((args->step_id->job_id != NO_VAL) &&
(args->step_id->job_id != job_ptr->job_id) &&
(args->step_id->job_id != job_ptr->array_job_id))
return 0;
args->valid_job = 1;
if (((args->show_flags & SHOW_ALL) == 0) && !args->privileged &&
(job_ptr->part_ptr) &&
part_not_on_list(args->visible_parts, job_ptr->part_ptr))
return 0;
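/* Honor PrivateData=jobs: without privileges a user may only see their
 * own jobs, jobs matching their MCS label, or jobs in accounts they
 * coordinate. */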
if ((slurm_conf.private_data & PRIVATE_DATA_JOBS) &&
(job_ptr->user_id != args->uid) && !args->privileged) {
if (slurm_mcs_get_privatedata()) {
if (mcs_g_check_mcs_label(args->uid,
job_ptr->mcs_label, false))
return 0;
} else if (!assoc_mgr_is_user_acct_coord(acct_db_conn,
args->uid,
job_ptr->account,
false)) {
return 0;
}
}
/*
* Pack a single requested step, or pack all steps.
*/
if (args->step_id->step_id != NO_VAL) {
step_record_t *step_ptr = find_step_record(job_ptr,
args->step_id);
if (!step_ptr)
goto fini;
pack_ctld_job_step_info(step_ptr, args);
} else {
list_for_each(job_ptr->step_list,
pack_ctld_job_step_info,
args);
}
fini:
/*
* Only return stepmgr_jobs if looking for a specific job to avoid
* querying all stepmgr's for all steps.
*/
if ((args->step_id->job_id != NO_VAL) &&
(job_ptr->bit_flags & STEPMGR_ENABLED) &&
IS_JOB_RUNNING(job_ptr)) {
stepmgr_job_info_t *sji = xmalloc(sizeof(*sji));
if (!args->stepmgr_jobs)
args->stepmgr_jobs = list_create(NULL);
sji->job_id = job_ptr->job_id;
sji->stepmgr = job_ptr->batch_host;
list_append(args->stepmgr_jobs, sji);
}
return 0;
}
/* _slurm_rpc_job_step_get_info - process request for job step info */
static void _slurm_rpc_job_step_get_info(slurm_msg_t *msg)
{
DEF_TIMERS;
buf_t *buffer = NULL;
int error_code = SLURM_SUCCESS;
job_step_info_request_msg_t *request = msg->data;
/* Locks: Read config, job, part */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, NO_LOCK, READ_LOCK, NO_LOCK };
START_TIMER;
lock_slurmctld(job_read_lock);
if ((request->last_update - 1) >= last_job_update) {
unlock_slurmctld(job_read_lock);
log_flag(STEPS, "%s: no change", __func__);
error_code = SLURM_NO_CHANGE_IN_DATA;
} else {
bool privileged = validate_operator(msg->auth_uid);
bool skip_visible_parts =
(request->show_flags & SHOW_ALL) || privileged;
pack_step_args_t args = {0};
buffer = init_buf(BUF_SIZE);
args.step_id = &request->step_id;
args.show_flags = request->show_flags;
args.uid = msg->auth_uid;
args.steps_packed = 0;
args.buffer = buffer;
args.privileged = privileged;
args.proto_version = msg->protocol_version;
args.valid_job = false;
args.visible_parts = build_visible_parts(msg->auth_uid,
skip_visible_parts);
args.job_step_list = job_list;
args.pack_job_step_list_func = _pack_ctld_job_steps;
error_code = pack_job_step_info_response_msg(&args);
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
if (error_code) {
/* job_id:step_id not found or other error;
 * error message is printed elsewhere */
log_flag(STEPS, "%s: %s",
__func__, slurm_strerror(error_code));
}
}
if (error_code)
slurm_send_rc_msg(msg, error_code);
else {
(void) send_msg_response(msg, RESPONSE_JOB_STEP_INFO, buffer);
}
FREE_NULL_BUFFER(buffer);
}
/* _slurm_rpc_job_will_run - process RPC to determine if job with given
* configuration can be initiated */
static void _slurm_rpc_job_will_run(slurm_msg_t *msg)
{
/* init */
DEF_TIMERS;
int error_code = SLURM_SUCCESS;
job_record_t *job_ptr = NULL;
job_desc_msg_t *job_desc_msg = msg->data;
/* Locks: Read config, read job, read node, read partition, read fed */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, READ_LOCK, READ_LOCK, READ_LOCK };
/* Locks: Read config, write job, write node, read partition, read fed*/
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
will_run_response_msg_t *resp = NULL;
char *err_msg = NULL, *job_submit_user_msg = NULL;
if (slurmctld_config.submissions_disabled) {
info("Submissions disabled on system");
error_code = ESLURM_SUBMISSIONS_DISABLED;
goto send_reply;
}
START_TIMER;
if ((error_code = _valid_id("REQUEST_JOB_WILL_RUN", job_desc_msg,
msg->auth_uid, msg->auth_gid,
msg->protocol_version)))
goto send_reply;
_set_hostname(msg, &job_desc_msg->alloc_node);
_set_identity(msg, &job_desc_msg->id);
if ((job_desc_msg->alloc_node == NULL)
|| (job_desc_msg->alloc_node[0] == '\0')) {
error_code = ESLURM_INVALID_NODE_NAME;
error("REQUEST_JOB_WILL_RUN lacks alloc_node from uid=%u",
msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
/* Locks are for job_submit plugin use */
lock_slurmctld(job_read_lock);
job_desc_msg->het_job_offset = NO_VAL;
error_code = validate_job_create_req(job_desc_msg,
msg->auth_uid, &err_msg);
unlock_slurmctld(job_read_lock);
}
if (err_msg)
job_submit_user_msg = xstrdup(err_msg);
if (msg->address.ss_family != AF_UNSPEC) {
job_desc_msg->resp_host = xmalloc(INET6_ADDRSTRLEN);
slurm_get_ip_str(&msg->address, job_desc_msg->resp_host,
INET6_ADDRSTRLEN);
dump_job_desc(job_desc_msg);
if (error_code == SLURM_SUCCESS) {
lock_slurmctld(job_write_lock);
if (job_desc_msg->job_id == NO_VAL) {
job_desc_msg->het_job_offset = NO_VAL;
error_code = job_allocate(job_desc_msg, false,
true, &resp, true,
msg->auth_uid, false,
&job_ptr,
&err_msg,
msg->protocol_version);
} else { /* existing job test */
job_ptr = find_job_record(job_desc_msg->job_id);
error_code = job_start_data(job_ptr, &resp);
}
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
}
} else {
error_code = SLURM_UNKNOWN_FORWARD_ADDR;
}
send_reply:
/* return result */
if (error_code) {
debug2("%s: %s", __func__, slurm_strerror(error_code));
if (err_msg)
slurm_send_rc_err_msg(msg, error_code, err_msg);
else
slurm_send_rc_msg(msg, error_code);
} else if (resp) {
resp->job_submit_user_msg = job_submit_user_msg;
job_submit_user_msg = NULL;
(void) send_msg_response(msg, RESPONSE_JOB_WILL_RUN, resp);
slurm_free_will_run_response_msg(resp);
debug2("%s success %s", __func__, TIME_STR);
} else {
debug2("%s success %s", __func__, TIME_STR);
if (job_desc_msg->job_id == NO_VAL)
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
xfree(err_msg);
xfree(job_submit_user_msg);
}
static bool _node_has_feature(node_record_t *node_ptr, char *feature)
{
node_feature_t *node_feature;
if ((node_feature = list_find_first(active_feature_list,
list_find_feature, feature))) {
if (bit_test(node_feature->node_bitmap, node_ptr->index))
return true;
}
return false;
}
#define FUTURE_MAP_FAILED -1 /* failed to map registration to future node */
#define FUTURE_MAP_SUCCESS 0 /* mapped the registration to a future node */
#define FUTURE_MAP_EXISTING 1 /* found an existing mapped node */
/*
* Find available future node to associate slurmd with.
*
* Sets reg_msg->node_name to found node_name so subsequent calls to
* find the node work.
*
* return: FUTURE_MAP_*
*/
static int _find_avail_future_node(slurm_msg_t *msg)
{
node_record_t *node_ptr;
slurm_node_registration_status_msg_t *reg_msg = msg->data;
int rc = FUTURE_MAP_FAILED;
node_ptr = find_node_record2(reg_msg->hostname);
if (node_ptr == NULL) {
int i;
time_t now;
debug2("finding available dynamic future node for %s/%s",
reg_msg->node_name, reg_msg->hostname);
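/* Look for a FUTURE node whose features and hardware (CPUs, boards,
 * sockets, cores, threads) match this registration. */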
for (i = 0; (node_ptr = next_node(&i)); i++) {
char *comm_name = NULL;
if (!IS_NODE_FUTURE(node_ptr))
continue;
if (reg_msg->dynamic_feature &&
!_node_has_feature(
node_ptr, reg_msg->dynamic_feature))
continue;
else if ((node_ptr->cpus != reg_msg->cpus) ||
(node_ptr->boards != reg_msg->boards) ||
(node_ptr->tot_sockets != reg_msg->sockets) ||
(node_ptr->cores != reg_msg->cores) ||
(node_ptr->threads != reg_msg->threads))
continue;
/* Get IP of slurmd */
if (msg->address.ss_family != AF_UNSPEC) {
comm_name = xmalloc(INET6_ADDRSTRLEN);
slurm_get_ip_str(&msg->address, comm_name,
INET6_ADDRSTRLEN);
}
set_node_comm_name(node_ptr, comm_name,
reg_msg->hostname);
now = time(NULL);
node_ptr->node_state = NODE_STATE_IDLE;
node_ptr->node_state |= NODE_STATE_DYNAMIC_FUTURE;
node_ptr->last_response = now;
node_ptr->last_busy = now;
/*
* When 24.11 is no longer supported, remove this if
* block.
*/
if (msg->protocol_version <=
SLURM_24_11_PROTOCOL_VERSION) {
/*
* As we don't validate the node specs until the
* 2nd registration RPC, and slurmd only sends
* instance-like attributes in the 1st
* registration RPC of its lifetime, we need to
* store these values here.
*/
if (reg_msg->instance_id) {
xfree(node_ptr->instance_id);
if (reg_msg->instance_id[0])
node_ptr->instance_id =
xstrdup(reg_msg->instance_id);
}
if (reg_msg->instance_type) {
xfree(node_ptr->instance_type);
if (reg_msg->instance_type[0])
node_ptr->instance_type =
xstrdup(reg_msg->instance_type);
}
}
bit_clear(future_node_bitmap, node_ptr->index);
xfree(comm_name);
clusteracct_storage_g_node_up(acct_db_conn, node_ptr,
now);
rc = FUTURE_MAP_SUCCESS;
break;
}
} else {
debug2("found existing node %s/%s for dynamic future node registration",
reg_msg->node_name, reg_msg->hostname);
rc = FUTURE_MAP_EXISTING;
}
if (node_ptr && (rc != FUTURE_MAP_FAILED)) {
debug2("dynamic future node %s/%s/%s assigned to node %s",
reg_msg->node_name, node_ptr->node_hostname,
node_ptr->comm_name, node_ptr->name);
/*
* We always need to send the hostname back to the slurmd, in
* case the slurmd already registered and we found the node_ptr
* by the node_hostname.
*/
xfree(reg_msg->node_name);
reg_msg->node_name = xstrdup(node_ptr->name);
} else if (rc == FUTURE_MAP_FAILED) {
error("Failed to map %s/%s to an available future node",
reg_msg->node_name, reg_msg->hostname);
}
return rc;
}
static void _slurm_post_rpc_node_registration(void)
{
if (do_post_rpc_node_registration)
clusteracct_storage_g_cluster_tres(acct_db_conn, NULL, NULL, 0,
SLURM_PROTOCOL_VERSION);
do_post_rpc_node_registration = false;
}
/* _slurm_rpc_node_registration - process RPC to determine if a node's
* actual configuration satisfies the configured specification */
static void _slurm_rpc_node_registration(slurm_msg_t *msg)
{
/* init */
DEF_TIMERS;
int error_code = SLURM_SUCCESS;
bool newly_up = false;
bool already_registered = false;
slurm_node_registration_status_msg_t *node_reg_stat_msg = msg->data;
slurmctld_lock_t job_write_lock = {
.conf = READ_LOCK,
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.part = WRITE_LOCK,
.fed = READ_LOCK,
};
START_TIMER;
if (!validate_slurm_user(msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, NODE_REGISTER RPC from uid=%u",
msg->auth_uid);
}
if (msg->protocol_version != SLURM_PROTOCOL_VERSION)
info("Node %s appears to have a different version "
"of Slurm than ours. Please update at your earliest "
"convenience.", node_reg_stat_msg->node_name);
if (error_code == SLURM_SUCCESS) {
/* do RPC call */
if (!(slurm_conf.debug_flags & DEBUG_FLAG_NO_CONF_HASH) &&
(node_reg_stat_msg->hash_val != NO_VAL) &&
(node_reg_stat_msg->hash_val != slurm_conf.hash_val)) {
error("Node %s appears to have a different slurm.conf "
"than the slurmctld. This could cause issues "
"with communication and functionality. "
"Please review both files and make sure they "
"are the same. If this is expected ignore, and "
"set DebugFlags=NO_CONF_HASH in your slurm.conf.",
node_reg_stat_msg->node_name);
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(job_write_lock);
if (node_reg_stat_msg->dynamic_type &&
(node_reg_stat_msg->flags & SLURMD_REG_FLAG_RESP)) {
if (node_reg_stat_msg->dynamic_type == DYN_NODE_FUTURE) {
int rc;
/*
 * A dynamic future node does not know which node
 * it is mapped to, so it cannot yet load its full
 * configuration. slurmctld tells the slurmd which
 * node it is mapped to; the slurmd then loads its
 * configuration based on the mapped name and sends
 * another registration.
 *
 * Subsequent slurmd registrations will carry the
 * mapped node_name.
 */
rc = _find_avail_future_node(msg);
/*
* FUTURE_MAP_SUCCESS: assigned registration to
* a new nodename and the slurmd just needs the
* mapped name so it can register again.
*
* FUTURE_MAP_FAILED: failed to find a future
* node to map to, so just skip validating the
* registration and reply to the slurmd.
*
* FUTURE_MAP_EXISTING: the node is already
* mapped and we need to validate the
* registration.
*/
if (rc != FUTURE_MAP_EXISTING) {
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_write_lock);
if (rc == FUTURE_MAP_FAILED)
error_code = ESLURM_INVALID_NODE_NAME;
goto send_resp;
}
} else if (find_node_record2(
node_reg_stat_msg->node_name)) {
already_registered = true;
} else {
(void) create_dynamic_reg_node(msg);
}
}
validate_jobs_on_node(msg);
error_code = validate_node_specs(msg, &newly_up);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
if (newly_up) {
queue_job_scheduler();
}
}
send_resp:
/* return result */
if (error_code) {
error("%s node=%s: %s",
__func__, node_reg_stat_msg->node_name,
slurm_strerror(error_code));
/*
* Notify slurmd that we got the registration even if we
* consider it to be invalid to avoid having slurmd try to
* register again continuously.
*/
slurm_send_rc_msg(msg, SLURM_SUCCESS);
} else {
debug2("%s complete for %s %s",
__func__, node_reg_stat_msg->node_name, TIME_STR);
/* If the slurmd is requesting a response send it */
if (node_reg_stat_msg->flags & SLURMD_REG_FLAG_RESP) {
slurm_node_reg_resp_msg_t tmp_resp;
memset(&tmp_resp, 0, sizeof(tmp_resp));
/*
* Don't add the assoc_mgr_tres_list here as it could
* get freed later if you do. The pack functions grab
* it for us if it isn't here.
*/
//tmp_resp.tres_list = assoc_mgr_tres_list;
if (node_reg_stat_msg->dynamic_type)
tmp_resp.node_name =
node_reg_stat_msg->node_name;
(void) send_msg_response(msg,
RESPONSE_NODE_REGISTRATION,
&tmp_resp);
} else
slurm_send_rc_msg(msg, SLURM_SUCCESS);
if (!already_registered &&
(node_reg_stat_msg->dynamic_type == DYN_NODE_NORM)) {
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
/* Must be called outside of locks */
clusteracct_storage_g_cluster_tres(
acct_db_conn, NULL, NULL, 0,
SLURM_PROTOCOL_VERSION);
} else {
do_post_rpc_node_registration = true;
}
}
}
}
/* _slurm_rpc_job_alloc_info - process RPC to get details on existing job */
static void _slurm_rpc_job_alloc_info(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
job_record_t *job_ptr;
DEF_TIMERS;
job_alloc_info_msg_t *job_info_msg = msg->data;
resource_allocation_response_msg_t *job_info_resp_msg;
/* Locks: Read config, job, read node */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
START_TIMER;
lock_slurmctld(job_read_lock);
error_code = job_alloc_info(msg->auth_uid, job_info_msg->job_id,
&job_ptr);
END_TIMER2(__func__);
/* return result */
if (error_code || (job_ptr == NULL) || (job_ptr->job_resrcs == NULL)) {
unlock_slurmctld(job_read_lock);
debug2("%s: JobId=%u, uid=%u: %s",
__func__, job_info_msg->job_id, msg->auth_uid,
slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
debug("%s: JobId=%u NodeList=%s %s", __func__,
job_info_msg->job_id, job_ptr->nodes, TIME_STR);
job_info_resp_msg = build_job_info_resp(job_ptr);
set_remote_working_response(job_info_resp_msg, job_ptr,
job_info_msg->req_cluster);
unlock_slurmctld(job_read_lock);
(void) send_msg_response(msg, RESPONSE_JOB_ALLOCATION_INFO,
job_info_resp_msg);
/* NULL out msg->working_cluster_rec because it's pointing to
* the global memory */
job_info_resp_msg->working_cluster_rec = NULL;
slurm_free_resource_allocation_response_msg(job_info_resp_msg);
}
}
static void _het_job_alloc_list_del(void *x)
{
resource_allocation_response_msg_t *job_info_resp_msg;
job_info_resp_msg = (resource_allocation_response_msg_t *) x;
/* NULL out msg->working_cluster_rec because it's pointing to
* the global memory */
job_info_resp_msg->working_cluster_rec = NULL;
slurm_free_resource_allocation_response_msg(job_info_resp_msg);
}
/*
* _slurm_rpc_het_job_alloc_info - process RPC to get details on existing
* hetjob.
*/
static void _slurm_rpc_het_job_alloc_info(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
job_record_t *job_ptr, *het_job;
list_itr_t *iter;
void *working_cluster_rec = NULL;
list_t *resp = NULL;
DEF_TIMERS;
job_alloc_info_msg_t *job_info_msg = msg->data;
resource_allocation_response_msg_t *job_info_resp_msg;
/* Locks: Read config, job, read node */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
START_TIMER;
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(job_read_lock);
error_code = job_alloc_info(msg->auth_uid, job_info_msg->job_id,
&job_ptr);
END_TIMER2(__func__);
/* return result */
if ((error_code == SLURM_SUCCESS) && job_ptr &&
(job_ptr->het_job_id && !job_ptr->het_job_list))
error_code = ESLURM_NOT_HET_JOB_LEADER;
if (error_code || (job_ptr == NULL) || (job_ptr->job_resrcs == NULL)) {
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_read_lock);
debug2("%s: JobId=%u, uid=%u: %s",
__func__, job_info_msg->job_id, msg->auth_uid,
slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
return;
}
debug2("%s: JobId=%u NodeList=%s %s", __func__,
job_info_msg->job_id, job_ptr->nodes, TIME_STR);
if (!job_ptr->het_job_list) {
resp = list_create(_het_job_alloc_list_del);
job_info_resp_msg = build_job_info_resp(job_ptr);
set_remote_working_response(job_info_resp_msg, job_ptr,
job_info_msg->req_cluster);
list_append(resp, job_info_resp_msg);
} else {
resp = list_create(_het_job_alloc_list_del);
iter = list_iterator_create(job_ptr->het_job_list);
while ((het_job = list_next(iter))) {
if (job_ptr->het_job_id != het_job->het_job_id) {
error("%s: Bad het_job_list for %pJ",
__func__, job_ptr);
continue;
}
if (het_job->job_id != job_info_msg->job_id)
(void) job_alloc_info_ptr(msg->auth_uid,
het_job);
job_info_resp_msg = build_job_info_resp(het_job);
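/* Build the working_cluster_rec once and share the same pointer
 * across every component's response. */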
if (working_cluster_rec) {
job_info_resp_msg->working_cluster_rec =
working_cluster_rec;
} else {
set_remote_working_response(job_info_resp_msg,
het_job,
job_info_msg->req_cluster);
working_cluster_rec =
job_info_resp_msg->working_cluster_rec;
}
list_append(resp, job_info_resp_msg);
}
list_iterator_destroy(iter);
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_read_lock);
(void) send_msg_response(msg, RESPONSE_HET_JOB_ALLOCATION, resp);
FREE_NULL_LIST(resp);
}
/* _slurm_rpc_job_sbcast_cred - process RPC to get details on existing job
* plus sbcast credential */
static void _slurm_rpc_job_sbcast_cred(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
job_record_t *job_ptr = NULL;
DEF_TIMERS;
step_alloc_info_msg_t *job_info_msg = msg->data;
job_sbcast_cred_msg_t *job_info_resp_msg = NULL;
char job_id_str[64];
/* Locks: Read config, job, read node */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
START_TIMER;
lock_slurmctld(job_read_lock);
if (job_info_msg->het_job_offset == NO_VAL) {
error_code = job_alloc_info(msg->auth_uid,
job_info_msg->step_id.job_id,
&job_ptr);
} else {
job_ptr = find_het_job_record(job_info_msg->step_id.job_id,
job_info_msg->het_job_offset);
if (job_ptr) {
job_info_msg->step_id.job_id = job_ptr->job_id;
error_code = job_alloc_info(
msg->auth_uid, job_info_msg->step_id.job_id,
&job_ptr);
} else {
error_code = ESLURM_INVALID_JOB_ID;
}
}
if (error_code)
goto error;
if (!job_ptr) {
error_code = ESLURM_INVALID_JOB_ID;
goto error;
}
if (job_ptr->bit_flags & EXTERNAL_JOB) {
error("%s: job step creation disabled for external jobs",
__func__);
slurm_send_rc_msg(msg, ESLURM_NOT_SUPPORTED);
unlock_slurmctld(job_read_lock);
return;
}
if (job_ptr->bit_flags & STEPMGR_ENABLED) {
slurm_send_reroute_msg(msg, NULL, job_ptr->batch_host);
unlock_slurmctld(job_read_lock);
return;
}
if (!validate_operator(msg->auth_uid) &&
(job_ptr->user_id != msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
goto error;
}
error_code = stepmgr_get_job_sbcast_cred_msg(job_ptr,
&job_info_msg->step_id,
msg->protocol_version,
&job_info_resp_msg);
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
if (error_code)
goto error;
info("%s: %s NodeList=%s - %s",
__func__,
slurm_get_selected_step_id(job_id_str, sizeof(job_id_str),
job_info_msg),
job_info_resp_msg->node_list,
TIME_STR);
(void) send_msg_response(msg, RESPONSE_JOB_SBCAST_CRED,
job_info_resp_msg);
slurm_free_sbcast_cred_msg(job_info_resp_msg);
return;
error:
unlock_slurmctld(job_read_lock);
debug2("%s: JobId=%s, uid=%u: %s",
__func__,
slurm_get_selected_step_id(job_id_str,
sizeof(job_id_str),
job_info_msg),
msg->auth_uid,
slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
}
static void _slurm_rpc_sbcast_cred_no_job(slurm_msg_t *msg)
{
job_sbcast_cred_msg_t *cred_resp_msg = NULL;
sbcast_cred_req_msg_t *cred_req_msg = msg->data;
sbcast_cred_arg_t sbcast_arg = { 0 };
sbcast_cred_t *sbcast_cred;
hostlist_t *req_node_list;
char *node_name;
bool node_exists = false;
int rc;
DEF_TIMERS;
START_TIMER;
if (!validate_slurm_user(msg->auth_uid)) {
error("%s: sbcast --no-allocation/-Z credential requested from uid '%u' which is not root/SlurmUser",
__func__, msg->auth_uid);
rc = ESLURM_USER_ID_MISSING;
goto fail;
}
req_node_list = hostlist_create(cred_req_msg->node_list);
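/* Reject the request if any node in the list is unknown. */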
while ((node_name = hostlist_shift(req_node_list))) {
node_exists = find_node_record(node_name);
if (!node_exists) {
debug("%s: sbcast --nodelist contains at least one invalid node '%s'",
__func__, node_name);
free(node_name);
break;
}
free(node_name);
}
FREE_NULL_HOSTLIST(req_node_list);
if (!node_exists) {
(void) slurm_send_rc_msg(msg, ESLURM_INVALID_NODE_NAME);
return;
}
sbcast_arg.nodes = cred_req_msg->node_list;
sbcast_arg.expiration = time(NULL) + HOUR_SECONDS;
if (!(sbcast_cred = create_sbcast_cred(&sbcast_arg, msg->auth_uid,
msg->auth_gid,
msg->protocol_version))) {
error("%s: Could not create sbcast cred for --no-allocate/-Z request",
__func__);
rc = SLURM_ERROR;
goto fail;
}
END_TIMER2(__func__);
cred_resp_msg = xmalloc(sizeof(*cred_resp_msg));
cred_resp_msg->job_id = NO_VAL;
cred_resp_msg->node_list = xstrdup(cred_req_msg->node_list);
cred_resp_msg->sbcast_cred = sbcast_cred;
(void) send_msg_response(msg, RESPONSE_JOB_SBCAST_CRED, cred_resp_msg);
slurm_free_sbcast_cred_msg(cred_resp_msg);
return;
fail:
END_TIMER2(__func__);
(void) slurm_send_rc_msg(msg, rc);
}
/* _slurm_rpc_ping - process ping RPC */
static void _slurm_rpc_ping(slurm_msg_t *msg)
{
/* We could authenticate here, if desired */
/* return result */
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
static void _slurm_rpc_config_request(slurm_msg_t *msg)
{
config_request_msg_t *req = msg->data;
DEF_TIMERS;
START_TIMER;
if (!running_configless) {
error("%s: Rejected request as configless is disabled",
__func__);
slurm_send_rc_msg(msg, ESLURM_CONFIGLESS_DISABLED);
return;
}
if ((req->flags & CONFIG_REQUEST_SLURMD) &&
!validate_slurm_user(msg->auth_uid)) {
error("%s: Rejected request for slurmd configs by uid=%u",
__func__, msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
return;
}
END_TIMER2(__func__);
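/* Hold the configless read lock while the shared config payload is
 * packed and sent. */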
slurm_rwlock_rdlock(&configless_lock);
{
void *data = config_for_clients;
if (req->flags & CONFIG_REQUEST_SLURMD)
data = config_for_slurmd;
(void) send_msg_response(msg, RESPONSE_CONFIG, data);
}
slurm_rwlock_unlock(&configless_lock);
if (req->flags & CONFIG_REQUEST_SACKD)
sackd_mgr_add_node(msg, req->port);
}
/* _slurm_rpc_reconfigure_controller - process RPC to re-initialize
* slurmctld from the configuration file.
* Anything you add to this function must also be added to the
* slurm_reconfigure function inside controller.c; try
* to keep the two in sync.
*/
static void _slurm_rpc_reconfigure_controller(slurm_msg_t *msg)
{
if (!validate_super_user(msg->auth_uid)) {
error("Security violation, RECONFIGURE RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
conn_g_destroy(msg->tls_conn, true);
msg->tls_conn = NULL;
slurm_free_msg(msg);
return;
} else
info("Processing Reconfiguration Request");
reconfigure_slurm(msg);
}
/* _slurm_rpc_takeover - process takeover RPC */
static void _slurm_rpc_takeover(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
/* We could authenticate here, if desired */
if (!validate_super_user(msg->auth_uid)) {
error("Security violation, TAKEOVER RPC from uid=%u",
msg->auth_uid);
error_code = ESLURM_USER_ID_MISSING;
} else {
/* takeover is not possible in controller mode */
/* return success */
info("Performing RPC: REQUEST_TAKEOVER : "
"already in controller mode - skipping");
}
slurm_send_rc_msg(msg, error_code);
}
static void _slurm_rpc_request_control(slurm_msg_t *msg)
{
time_t now = time(NULL);
struct timespec ts = {0, 0};
if (!validate_super_user(msg->auth_uid)) {
error("Security violation, REQUEST_CONTROL RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
return;
}
info("Performing RPC: REQUEST_CONTROL");
slurm_mutex_lock(&slurmctld_config.backup_finish_lock);
/* resume backup mode */
slurmctld_config.resume_backup = true;
/* do RPC call */
if (slurmctld_config.shutdown_time) {
debug2("REQUEST_CONTROL RPC issued when already in progress");
} else {
/* signal clean-up */
pthread_kill(pthread_self(), SIGTERM);
}
/* save_all_state(); performed by _slurmctld_background */
/*
* Wait for the backup to dump state and finish up everything.
* This should happen in _slurmctld_background and then release
* once we know for sure we are in backup mode in run_backup().
* Here we will wait up to CONTROL_TIMEOUT - 1 seconds before we reply.
*/
ts.tv_sec = now + CONTROL_TIMEOUT - 1;
slurm_cond_timedwait(&slurmctld_config.backup_finish_cond,
&slurmctld_config.backup_finish_lock, &ts);
slurm_mutex_unlock(&slurmctld_config.backup_finish_lock);
/*
* jobcomp/elasticsearch saves/loads the state to/from file
* elasticsearch_state. Since the jobcomp API isn't designed with
* save/load state operations, the jobcomp/elasticsearch _save_state()
* is highly coupled to its fini() function. This state doesn't follow
* the same execution path as the rest of the Slurm states, which are
* all independently scheduled in save_all_state(). So we save
* it manually here.
*/
jobcomp_g_fini();
if (slurmctld_config.resume_backup)
error("%s: REQUEST_CONTROL reply but backup not completely done relinquishing control. Old state possible", __func__);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
/* _slurm_rpc_shutdown_controller - process RPC to shutdown slurmctld */
static void _slurm_rpc_shutdown_controller(slurm_msg_t *msg)
{
shutdown_msg_t *shutdown_msg = msg->data;
if (!validate_super_user(msg->auth_uid)) {
error("Security violation, SHUTDOWN RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
return;
}
info("Performing RPC: REQUEST_SHUTDOWN");
if (slurmctld_config.shutdown_time)
debug2("shutdown RPC issued when already in progress");
else {
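/* For a full shutdown, tell every slurmd to shut down before
 * terminating slurmctld itself. */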
if (shutdown_msg->options == SLURMCTLD_SHUTDOWN_ALL) {
slurmctld_lock_t node_read_lock = { .node = READ_LOCK };
lock_slurmctld(node_read_lock);
msg_to_slurmd(REQUEST_SHUTDOWN);
unlock_slurmctld(node_read_lock);
}
/* signal clean-up */
pthread_kill(pthread_self(), SIGTERM);
}
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
static int _foreach_step_match_containerid(void *x, void *arg)
{
find_job_by_container_id_args_t *args = arg;
step_record_t *step_ptr = x;
slurm_step_id_t *step_id;
if (xstrcmp(args->id, step_ptr->container_id))
return SLURM_SUCCESS;
step_id = xmalloc(sizeof(*step_id));
*step_id = step_ptr->step_id;
list_append(args->step_list, step_id);
return SLURM_SUCCESS;
}
static int _foreach_job_filter_steps(void *x, void *arg)
{
find_job_by_container_id_args_t *args = arg;
job_record_t *job_ptr = x;
if ((slurm_conf.private_data & PRIVATE_DATA_JOBS) &&
(job_ptr->user_id != args->request_uid) &&
!validate_operator(args->request_uid)) {
if (slurm_mcs_get_privatedata()) {
if (mcs_g_check_mcs_label(args->request_uid,
job_ptr->mcs_label, false))
return SLURM_SUCCESS;
} else if (!assoc_mgr_is_user_acct_coord(acct_db_conn,
args->request_uid,
job_ptr->account,
false)) {
return SLURM_SUCCESS;
}
}
if ((args->uid != SLURM_AUTH_NOBODY) &&
(args->uid != job_ptr->user_id)) {
/* skipping per non-matching user */
return SLURM_SUCCESS;
}
/* walk steps for matching container_id */
if (list_for_each_ro(job_ptr->step_list,
_foreach_step_match_containerid,
args) < 0)
return SLURM_ERROR;
return SLURM_SUCCESS;
}
static void _find_stepids_by_container_id(uid_t request_uid, uid_t uid,
const char *id, list_t **step_list)
{
slurmctld_lock_t job_read_lock =
{ .conf = READ_LOCK, .job = READ_LOCK };
find_job_by_container_id_args_t args =
{ .request_uid = request_uid, .uid = uid, .id = id };
DEF_TIMERS;
xassert(id && id[0]);
if (!*step_list)
*step_list = list_create((ListDelF) slurm_free_step_id);
args.step_list = *step_list;
START_TIMER;
lock_slurmctld(job_read_lock);
list_for_each_ro(job_list, _foreach_job_filter_steps, &args);
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
}
static void _slurm_rpc_step_by_container_id(slurm_msg_t *msg)
{
container_id_request_msg_t *req = msg->data;
container_id_response_msg_t resp = {0};
int rc = SLURM_UNEXPECTED_MSG_ERROR;
log_flag(PROTOCOL, "%s: got REQUEST_STEP_BY_CONTAINER_ID from %s auth_uid=%u flags=0x%x uid=%u container_id=%s",
__func__, (msg->auth_ids_set ? "validated" : "suspect"),
msg->auth_uid, req->show_flags, req->uid, req->container_id);
if (!msg->auth_ids_set) {
/* this should never happen? */
rc = ESLURM_AUTH_CRED_INVALID;
} else if (!req->container_id || !req->container_id[0]) {
rc = ESLURM_INVALID_CONTAINER_ID;
} else {
if (req->container_id && req->container_id[0])
_find_stepids_by_container_id(msg->auth_uid, req->uid,
req->container_id,
&resp.steps);
(void) send_msg_response(msg, RESPONSE_STEP_BY_CONTAINER_ID,
&resp);
return;
}
slurm_send_rc_msg(msg, rc);
}
/* _slurm_rpc_step_complete - process step completion RPC to note the
* completion of a job step on at least some nodes.
* If the job step is complete, it may
* represent the termination of an entire job */
static void _slurm_rpc_step_complete(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
int rc, rem;
uint32_t step_rc;
DEF_TIMERS;
step_complete_msg_t *req = msg->data;
/* Locks: Write job, write node, read federation */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
/* init */
START_TIMER;
log_flag(STEPS, "Processing RPC details: REQUEST_STEP_COMPLETE for %ps nodes %u-%u rc=%u(%s)",
&req->step_id, req->range_first, req->range_last,
req->step_rc, slurm_strerror(req->step_rc));
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
}
rc = step_partial_comp(req, msg->auth_uid, true, &rem, &step_rc);
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
END_TIMER2(__func__);
log_flag(STEPS, "%s: %ps rc:%s %s",
__func__, &req->step_id, slurm_strerror(rc), TIME_STR);
/* return result */
(void) slurm_send_rc_msg(msg, rc);
if (rc == SLURM_SUCCESS)
(void) schedule_job_save(); /* Has own locking */
}
/* _slurm_rpc_step_layout - return the step layout structure for
* a job step, if it currently exists
*/
static void _slurm_rpc_step_layout(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
slurm_step_id_t *req = msg->data;
slurm_step_layout_t *step_layout = NULL;
/* Locks: Read config, read job, read node */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
job_record_t *job_ptr = NULL;
START_TIMER;
lock_slurmctld(job_read_lock);
error_code = job_alloc_info(msg->auth_uid, req->job_id, &job_ptr);
END_TIMER2(__func__);
/* return result */
if (error_code || (job_ptr == NULL)) {
unlock_slurmctld(job_read_lock);
if (error_code == ESLURM_ACCESS_DENIED) {
error("Security violation, REQUEST_STEP_LAYOUT for JobId=%u from uid=%u",
req->job_id, msg->auth_uid);
} else {
log_flag(STEPS, "%s: JobId=%u, uid=%u: %s",
__func__, req->job_id, msg->auth_uid,
slurm_strerror(error_code));
}
slurm_send_rc_msg(msg, error_code);
return;
}
if (job_ptr->bit_flags & STEPMGR_ENABLED) {
slurm_send_reroute_msg(msg, NULL, job_ptr->batch_host);
unlock_slurmctld(job_read_lock);
return;
}
error_code = stepmgr_get_step_layouts(job_ptr, req, &step_layout);
unlock_slurmctld(job_read_lock);
if (error_code) {
slurm_send_rc_msg(msg, error_code);
return;
}
(void) send_msg_response(msg, RESPONSE_STEP_LAYOUT, step_layout);
slurm_step_layout_destroy(step_layout);
}
/* _slurm_rpc_step_update - update a job step
*/
static void _slurm_rpc_step_update(slurm_msg_t *msg)
{
DEF_TIMERS;
job_record_t *job_ptr;
step_update_request_msg_t *req = msg->data;
/* Locks: Write job */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
int rc;
START_TIMER;
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(req->job_id);
if (job_ptr == NULL) {
error("%s: invalid JobId=%u", __func__, req->job_id);
rc = ESLURM_INVALID_JOB_ID;
goto fail;
}
if ((job_ptr->user_id != msg->auth_uid) &&
!validate_operator(msg->auth_uid) &&
!assoc_mgr_is_user_acct_coord(acct_db_conn, msg->auth_uid,
job_ptr->account, false)) {
error("Security violation, STEP_UPDATE RPC from uid %u",
msg->auth_uid);
rc = ESLURM_USER_ID_MISSING;
goto fail;
}
rc = update_step(req, msg->auth_uid);
fail:
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
slurm_send_rc_msg(msg, rc);
}
/* _slurm_rpc_submit_batch_job - process RPC to submit a batch job */
static void _slurm_rpc_submit_batch_job(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
uint32_t job_id = 0, priority = 0;
job_record_t *job_ptr = NULL;
job_desc_msg_t *job_desc_msg = msg->data;
/* Locks: Read config, read job, read node, read partition, read fed */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, READ_LOCK, READ_LOCK, READ_LOCK };
/* Locks: Read config, write job, write node, read partition, read
* federation */
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
char *err_msg = NULL, *job_submit_user_msg = NULL;
bool reject_job = false;
START_TIMER;
if (slurmctld_config.submissions_disabled) {
info("Submissions disabled on system");
error_code = ESLURM_SUBMISSIONS_DISABLED;
reject_job = true;
goto send_msg;
}
if ((error_code = _valid_id("REQUEST_SUBMIT_BATCH_JOB", job_desc_msg,
msg->auth_uid, msg->auth_gid,
msg->protocol_version))) {
reject_job = true;
goto send_msg;
}
_set_hostname(msg, &job_desc_msg->alloc_node);
_set_identity(msg, &job_desc_msg->id);
if ((job_desc_msg->alloc_node == NULL) ||
(job_desc_msg->alloc_node[0] == '\0')) {
error_code = ESLURM_INVALID_NODE_NAME;
error("REQUEST_SUBMIT_BATCH_JOB lacks alloc_node from uid=%u",
msg->auth_uid);
}
dump_job_desc(job_desc_msg);
if (error_code == SLURM_SUCCESS) {
/* Locks are for job_submit plugin use */
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(job_read_lock);
job_desc_msg->het_job_offset = NO_VAL;
error_code = validate_job_create_req(job_desc_msg,
msg->auth_uid, &err_msg);
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(job_read_lock);
}
/*
* In validate_job_create_req(), err_msg is currently only modified in
* the call to job_submit_g_submit. We save the err_msg in a temp
* char *job_submit_user_msg because err_msg can be overwritten later
* in the calls to fed_mgr_job_allocate and/or job_allocate, and we
* need the job submit plugin value to build the resource allocation
* response in the call to build_alloc_msg.
*/
if (err_msg) {
job_submit_user_msg = err_msg;
err_msg = NULL;
}
if (error_code) {
reject_job = true;
goto send_msg;
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
}
START_TIMER; /* Restart after we have locks */
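/* On a federated cluster let fed_mgr drive the job allocation;
 * otherwise create the job allocation locally. */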
if (fed_mgr_fed_rec) {
if (fed_mgr_job_allocate(msg, job_desc_msg, false,
&job_id, &error_code, &err_msg))
reject_job = true;
} else {
/* Create new job allocation */
job_desc_msg->het_job_offset = NO_VAL;
error_code = job_allocate(job_desc_msg,
job_desc_msg->immediate,
false, NULL, 0, msg->auth_uid, false,
&job_ptr, &err_msg,
msg->protocol_version);
if (!job_ptr ||
(error_code && job_ptr->job_state == JOB_FAILED))
reject_job = true;
else {
job_id = job_ptr->job_id;
priority = job_ptr->priority;
}
if (job_desc_msg->immediate &&
(error_code != SLURM_SUCCESS)) {
error_code = ESLURM_CAN_NOT_START_IMMEDIATELY;
reject_job = true;
}
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
}
send_msg:
END_TIMER2(__func__);
if (reject_job) {
info("%s: %s", __func__, slurm_strerror(error_code));
/*
* If job is rejected, add the job submit message to the error
* message to avoid it getting lost. Was saved off earlier.
*/
if (job_submit_user_msg) {
char *tmp_err_msg = err_msg;
err_msg = job_submit_user_msg;
job_submit_user_msg = NULL;
if (tmp_err_msg) {
xstrfmtcat(err_msg, "\n%s", tmp_err_msg);
xfree(tmp_err_msg);
}
}
if (err_msg)
slurm_send_rc_err_msg(msg, error_code, err_msg);
else
slurm_send_rc_msg(msg, error_code);
} else {
submit_response_msg_t submit_msg = {
.job_id = job_id,
.step_id = SLURM_BATCH_SCRIPT,
.error_code = error_code,
.job_submit_user_msg = job_submit_user_msg,
};
info("%s: JobId=%u InitPrio=%u %s",
__func__, job_id, priority, TIME_STR);
/* send job_ID */
(void) send_msg_response(msg, RESPONSE_SUBMIT_BATCH_JOB,
&submit_msg);
schedule_job_save(); /* Has own locks */
schedule_node_save(); /* Has own locks */
queue_job_scheduler();
}
xfree(err_msg);
xfree(job_submit_user_msg);
}
/* _slurm_rpc_submit_batch_het_job - process RPC to submit a batch hetjob */
static void _slurm_rpc_submit_batch_het_job(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
list_itr_t *iter;
int error_code = SLURM_SUCCESS, alloc_only = 0;
DEF_TIMERS;
uint32_t het_job_id = 0, het_job_offset = 0;
job_record_t *job_ptr = NULL, *first_job_ptr = NULL;
job_desc_msg_t *job_desc_msg;
char *script = NULL;
/* Locks: Read config, read job, read node, read partition */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, READ_LOCK, READ_LOCK, NO_LOCK };
/* Locks: Read config, write job, write node, read partition, read fed */
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
list_t *job_req_list = msg->data;
uint32_t job_uid = NO_VAL;
char *err_msg = NULL, *job_submit_user_msg = NULL;
bool reject_job = false;
list_t *submit_job_list = NULL;
hostset_t *jobid_hostset = NULL;
char tmp_str[32];
char *het_job_id_set = NULL;
START_TIMER;
if (!job_req_list || (list_count(job_req_list) == 0)) {
info("REQUEST_SUBMIT_BATCH_HET_JOB from uid=%u with empty job list",
msg->auth_uid);
error_code = SLURM_ERROR;
reject_job = true;
goto send_msg;
}
if (!_sched_backfill()) {
info("REQUEST_SUBMIT_BATCH_HET_JOB from uid=%u rejected as sched/backfill is not configured",
msg->auth_uid);
error_code = ESLURM_NOT_SUPPORTED;
reject_job = true;
goto send_msg;
}
if (slurmctld_config.submissions_disabled) {
info("Submissions disabled on system");
error_code = ESLURM_SUBMISSIONS_DISABLED;
reject_job = true;
goto send_msg;
}
/*
* If any job component has required nodes, those nodes must be excluded
* from all other components to avoid scheduling deadlock
*/
_exclude_het_job_nodes(job_req_list);
/* Validate the individual request */
lock_slurmctld(job_read_lock); /* Locks for job_submit plugin use */
iter = list_iterator_create(job_req_list);
while ((job_desc_msg = list_next(iter))) {
if (job_uid == NO_VAL)
job_uid = job_desc_msg->user_id;
if ((error_code = _valid_id("REQUEST_SUBMIT_BATCH_JOB",
job_desc_msg, msg->auth_uid,
msg->auth_gid,
msg->protocol_version))) {
reject_job = true;
break;
}
_set_hostname(msg, &job_desc_msg->alloc_node);
_set_identity(msg, &job_desc_msg->id);
if ((job_desc_msg->alloc_node == NULL) ||
(job_desc_msg->alloc_node[0] == '\0')) {
error("REQUEST_SUBMIT_BATCH_HET_JOB lacks alloc_node from uid=%u",
msg->auth_uid);
error_code = ESLURM_INVALID_NODE_NAME;
break;
}
dump_job_desc(job_desc_msg);
job_desc_msg->het_job_offset = het_job_offset;
error_code = validate_job_create_req(job_desc_msg,
msg->auth_uid,
&err_msg);
if (err_msg) {
char *save_ptr = NULL, *tok;
tok = strtok_r(err_msg, "\n", &save_ptr);
while (tok) {
char *sep = "";
if (job_submit_user_msg)
sep = "\n";
xstrfmtcat(job_submit_user_msg, "%s%d: %s",
sep, het_job_offset, tok);
tok = strtok_r(NULL, "\n", &save_ptr);
}
xfree(err_msg);
}
if (error_code != SLURM_SUCCESS) {
reject_job = true;
break;
}
/* license request allowed only on leader */
if (het_job_offset && job_desc_msg->licenses) {
xstrfmtcat(job_submit_user_msg,
"%s%d: license request allowed only on leader job",
job_submit_user_msg ? "\n" : "",
het_job_offset);
error("REQUEST_SUBMIT_BATCH_HET_JOB from uid=%u, license request on non-leader job",
msg->auth_uid);
error_code = ESLURM_INVALID_LICENSES;
reject_job = true;
break;
}
het_job_offset++;
}
list_iterator_destroy(iter);
unlock_slurmctld(job_read_lock);
if (error_code != SLURM_SUCCESS)
goto send_msg;
/*
* In validate_job_create_req, err_msg is currently only modified in
* the call to job_submit_g_submit. We save the err_msg in a temp
* char *job_submit_user_msg because err_msg can be overwritten later
* in the calls to job_allocate, and we need the job submit plugin value
* to build the resource allocation response in the call to
* build_alloc_msg.
*/
if (err_msg) {
job_submit_user_msg = err_msg;
err_msg = NULL;
}
/* Create new job allocations */
submit_job_list = list_create(NULL);
het_job_offset = 0;
_throttle_start(&active_rpc_cnt);
lock_slurmctld(job_write_lock);
START_TIMER; /* Restart after we have locks */
iter = list_iterator_create(job_req_list);
while ((job_desc_msg = list_next(iter))) {
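/* Only the leader component carries the batch script; save it so
 * bb_g_build_het_job_script() can build each component's script
 * from it. */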
if (!script)
script = xstrdup(job_desc_msg->script);
if (het_job_offset && job_desc_msg->script) {
info("%s: Hetjob %u offset %u has script, being ignored",
__func__, het_job_id, het_job_offset);
xfree(job_desc_msg->script);
}
if (het_job_offset) {
/*
* Email notifications disabled except for
* hetjob leader
*/
job_desc_msg->mail_type = 0;
xfree(job_desc_msg->mail_user);
}
if (!job_desc_msg->burst_buffer) {
xfree(job_desc_msg->script);
if (!(job_desc_msg->script = bb_g_build_het_job_script(
script, het_job_offset))) {
error_code =
ESLURM_INVALID_BURST_BUFFER_REQUEST;
reject_job = true;
break;
}
}
job_desc_msg->het_job_offset = het_job_offset;
error_code = job_allocate(job_desc_msg,
job_desc_msg->immediate, false,
NULL, alloc_only, msg->auth_uid,
false, &job_ptr, &err_msg,
msg->protocol_version);
if (!job_ptr ||
(error_code && job_ptr->job_state == JOB_FAILED)) {
reject_job = true;
} else {
if (het_job_id == 0) {
het_job_id = job_ptr->job_id;
first_job_ptr = job_ptr;
alloc_only = 1;
}
snprintf(tmp_str, sizeof(tmp_str), "%u",
job_ptr->job_id);
if (jobid_hostset)
hostset_insert(jobid_hostset, tmp_str);
else
jobid_hostset = hostset_create(tmp_str);
job_ptr->het_job_id = het_job_id;
job_ptr->het_job_offset = het_job_offset++;
job_ptr->batch_flag = 1;
on_job_state_change(job_ptr, job_ptr->job_state);
list_append(submit_job_list, job_ptr);
}
if (job_desc_msg->immediate &&
(error_code != SLURM_SUCCESS)) {
error_code = ESLURM_CAN_NOT_START_IMMEDIATELY;
reject_job = true;
}
if (reject_job)
break;
}
list_iterator_destroy(iter);
xfree(script);
if ((het_job_id == 0) && !reject_job) {
info("%s: No error, but no het_job_id", __func__);
error_code = SLURM_ERROR;
reject_job = true;
}
/* Validate limits on hetjob as a whole */
if (!reject_job &&
(accounting_enforce & ACCOUNTING_ENFORCE_LIMITS) &&
!acct_policy_validate_het_job(submit_job_list)) {
info("Hetjob JobId=%u exceeded association/QOS limit for user %u",
het_job_id, job_uid);
error_code = ESLURM_ACCOUNTING_POLICY;
reject_job = true;
}
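/* Record the complete set of job ids forming this hetjob on every
 * component. */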
_create_het_job_id_set(jobid_hostset, het_job_offset,
&het_job_id_set);
if (first_job_ptr)
first_job_ptr->het_job_list = submit_job_list;
iter = list_iterator_create(submit_job_list);
while ((job_ptr = list_next(iter))) {
job_ptr->het_job_id_set = xstrdup(het_job_id_set);
if (error_code == SLURM_SUCCESS)
log_flag(HETJOB, "Submit %pJ", job_ptr);
}
list_iterator_destroy(iter);
xfree(het_job_id_set);
if (reject_job && submit_job_list) {
(void) list_for_each(submit_job_list, _het_job_cancel, NULL);
if (!first_job_ptr)
FREE_NULL_LIST(submit_job_list);
}
unlock_slurmctld(job_write_lock);
_throttle_fini(&active_rpc_cnt);
send_msg:
END_TIMER2(__func__);
if (reject_job) {
info("%s: %s", __func__, slurm_strerror(error_code));
/*
* If job is rejected, add the job submit message to the error
* message to avoid it getting lost. Was saved off earlier.
*/
if (job_submit_user_msg) {
char *tmp_err_msg = err_msg;
err_msg = job_submit_user_msg;
job_submit_user_msg = NULL;
if (tmp_err_msg) {
xstrfmtcat(err_msg, "\n%s", tmp_err_msg);
xfree(tmp_err_msg);
}
}
if (err_msg)
slurm_send_rc_err_msg(msg, error_code, err_msg);
else
slurm_send_rc_msg(msg, error_code);
} else {
submit_response_msg_t submit_msg = {
.job_id = het_job_id,
.step_id = SLURM_BATCH_SCRIPT,
.error_code = error_code,
.job_submit_user_msg = job_submit_user_msg,
};
info("%s: JobId=%u %s", __func__, het_job_id, TIME_STR);
/* send job_ID */
(void) send_msg_response(msg, RESPONSE_SUBMIT_BATCH_JOB,
&submit_msg);
schedule_job_save(); /* Has own locks */
}
if (jobid_hostset)
hostset_destroy(jobid_hostset);
xfree(err_msg);
xfree(job_submit_user_msg);
}
/* _slurm_rpc_update_job - process RPC to update the configuration of a
* job (e.g. priority)
*/
static void _slurm_rpc_update_job(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
job_desc_msg_t *job_desc_msg = msg->data;
/* Locks: Read config, write job, write node, read partition, read fed*/
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
uid_t uid = msg->auth_uid;
lock_slurmctld(fed_read_lock);
if (!_route_msg_to_origin(msg, job_desc_msg->job_id_str,
job_desc_msg->job_id)) {
unlock_slurmctld(fed_read_lock);
return;
}
unlock_slurmctld(fed_read_lock);
START_TIMER;
/* job_desc_msg->user_id is set when the uid has been overridden with
 * -u <uid> or --uid=<uid>; SLURM_AUTH_NOBODY is the default. Verify that
 * the request has come from an admin. */
if (job_desc_msg->user_id != SLURM_AUTH_NOBODY) {
if (!validate_super_user(uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, REQUEST_UPDATE_JOB RPC from uid=%u",
uid);
/* Send back the error message for this case because
* update_job also sends back an error message */
slurm_send_rc_msg(msg, error_code);
} else {
/* override uid allowed */
uid = job_desc_msg->user_id;
}
}
if (error_code == SLURM_SUCCESS) {
/* do RPC call */
dump_job_desc(job_desc_msg);
/* Ensure everything that may be written to database is lower
* case */
xstrtolower(job_desc_msg->account);
xstrtolower(job_desc_msg->wckey);
/*
* Use UID provided by scontrol. May be overridden with
* -u <uid> or --uid=<uid>
*/
lock_slurmctld(job_write_lock);
if (job_desc_msg->job_id_str)
error_code = update_job_str(msg, uid);
else
error_code = update_job(msg, uid, true);
unlock_slurmctld(job_write_lock);
}
END_TIMER2(__func__);
/* return result */
if (error_code) {
if (job_desc_msg->job_id_str) {
info("%s: JobId=%s uid=%u: %s",
__func__, job_desc_msg->job_id_str, uid,
slurm_strerror(error_code));
} else {
info("%s: JobId=%u uid=%u: %s",
__func__, job_desc_msg->job_id, uid,
slurm_strerror(error_code));
}
} else {
if (job_desc_msg->job_id_str) {
info("%s: complete JobId=%s uid=%u %s",
__func__, job_desc_msg->job_id_str, uid, TIME_STR);
} else {
info("%s: complete JobId=%u uid=%u %s",
__func__, job_desc_msg->job_id, uid, TIME_STR);
}
/* Below functions provide their own locking */
schedule_job_save();
schedule_node_save();
queue_job_scheduler();
}
}
/*
* _slurm_rpc_create_node - process RPC to create node(s).
*/
static void _slurm_rpc_create_node(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
update_node_msg_t *node_msg = msg->data;
char *err_msg = NULL;
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, %s RPC from uid=%u",
rpc_num2string(msg->msg_type), msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
error_code = create_nodes(node_msg, &err_msg);
END_TIMER2(__func__);
}
/* return result */
if (error_code) {
info("%s for %s: %s",
__func__, node_msg->node_names,
slurm_strerror(error_code));
if (err_msg)
slurm_send_rc_err_msg(msg, error_code, err_msg);
else
slurm_send_rc_msg(msg, error_code);
} else {
debug2("%s complete for %s %s",
__func__, node_msg->node_names, TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
xfree(err_msg);
/* Below functions provide their own locks */
schedule_node_save();
validate_all_reservations(false, false);
queue_job_scheduler();
trigger_reconfig();
}
/*
* _slurm_rpc_update_node - process RPC to update the configuration of a
* node (e.g. UP/DOWN)
*/
static void _slurm_rpc_update_node(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
update_node_msg_t *update_node_msg_ptr = msg->data;
/* Locks: Write job, partition and node */
slurmctld_lock_t node_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK };
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, UPDATE_NODE RPC from uid=%u",
msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
/* do RPC call */
lock_slurmctld(node_write_lock);
error_code = update_node(update_node_msg_ptr, msg->auth_uid);
unlock_slurmctld(node_write_lock);
END_TIMER2(__func__);
}
/* return result */
if (error_code) {
info("%s for %s: %s",
__func__, update_node_msg_ptr->node_names,
slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
debug2("%s complete for %s %s",
__func__, update_node_msg_ptr->node_names, TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
/* Below functions provide their own locks */
schedule_node_save();
validate_all_reservations(false, false);
queue_job_scheduler();
trigger_reconfig();
}
/*
* _slurm_rpc_delete_node - process RPC to delete node.
*/
static void _slurm_rpc_delete_node(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
update_node_msg_t *node_msg = msg->data;
char *err_msg = NULL;
DEF_TIMERS;
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, DELETE_NODE RPC from uid=%u",
msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
error_code = delete_nodes(node_msg->node_names, &err_msg);
END_TIMER2(__func__);
}
/* return result */
if (error_code) {
info("%s for %s: %s",
__func__, node_msg->node_names,
slurm_strerror(error_code));
if (err_msg)
slurm_send_rc_err_msg(msg, error_code, err_msg);
else
slurm_send_rc_msg(msg, error_code);
} else {
debug2("%s complete for %s %s",
__func__, node_msg->node_names, TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
xfree(err_msg);
/* Below functions provide their own locks */
schedule_node_save();
validate_all_reservations(false, false);
queue_job_scheduler();
trigger_reconfig();
}
/* _slurm_rpc_update_partition - process RPC to update the configuration
* of a partition (e.g. UP/DOWN) */
static void _slurm_rpc_update_partition(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
update_part_msg_t *part_desc_ptr = msg->data;
/* Locks: Read config, write job, write node, write partition
* NOTE: job write lock due to gang scheduler support */
slurmctld_lock_t part_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, UPDATE_PARTITION RPC from uid=%u",
msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
/* do RPC call */
if (msg->msg_type == REQUEST_CREATE_PARTITION) {
lock_slurmctld(part_write_lock);
error_code = update_part(part_desc_ptr, true);
unlock_slurmctld(part_write_lock);
} else {
lock_slurmctld(part_write_lock);
error_code = update_part(part_desc_ptr, false);
unlock_slurmctld(part_write_lock);
}
END_TIMER2(__func__);
}
/* return result */
if (error_code) {
info("%s partition=%s: %s",
__func__, part_desc_ptr->name, slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
debug2("%s complete for %s %s",
__func__, part_desc_ptr->name, TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
schedule_part_save(); /* Has its own locking */
queue_job_scheduler();
}
}
/* _slurm_rpc_delete_partition - process RPC to delete a partition */
static void _slurm_rpc_delete_partition(slurm_msg_t *msg)
{
/* init */
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
delete_part_msg_t *part_desc_ptr = msg->data;
/* Locks: write job, write node, write partition */
slurmctld_lock_t part_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, DELETE_PARTITION RPC from uid=%u",
msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
/* do RPC call */
lock_slurmctld(part_write_lock);
error_code = delete_partition(part_desc_ptr);
unlock_slurmctld(part_write_lock);
END_TIMER2(__func__);
}
/* return result */
if (error_code) {
info("%s partition=%s: %s",
__func__, part_desc_ptr->name, slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
info("%s complete for %s %s",
__func__, part_desc_ptr->name, TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
save_all_state(); /* Has own locking */
queue_job_scheduler();
}
}
/* _slurm_rpc_resv_create - process RPC to create a reservation */
static void _slurm_rpc_resv_create(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
char *err_msg = NULL;
DEF_TIMERS;
resv_desc_msg_t *resv_desc_ptr = msg->data;
/* Locks: read config, read job, write node, read partition */
slurmctld_lock_t node_write_lock = {
READ_LOCK, READ_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK };
START_TIMER;
if (!validate_operator(msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, CREATE_RESERVATION RPC from uid=%u",
msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
/* do RPC call */
lock_slurmctld(node_write_lock);
error_code = create_resv(resv_desc_ptr, &err_msg);
unlock_slurmctld(node_write_lock);
END_TIMER2(__func__);
}
/* return result */
if (error_code) {
if (resv_desc_ptr->name) {
info("%s reservation=%s: %s",
__func__, resv_desc_ptr->name,
slurm_strerror(error_code));
} else {
info("%s: %s", __func__, slurm_strerror(error_code));
}
slurm_send_rc_err_msg(msg, error_code, err_msg);
} else {
reservation_name_msg_t resv_resp_msg = {
.name = resv_desc_ptr->name,
};
debug2("%s complete for %s %s",
__func__, resv_desc_ptr->name, TIME_STR);
/* send reservation name */
(void) send_msg_response(msg, RESPONSE_CREATE_RESERVATION,
&resv_resp_msg);
queue_job_scheduler();
}
xfree(err_msg);
}
/* _slurm_rpc_resv_update - process RPC to update a reservation */
static void _slurm_rpc_resv_update(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
char *err_msg = NULL;
DEF_TIMERS;
resv_desc_msg_t *resv_desc_ptr = msg->data;
/* Locks: read config, read job, write node, read partition */
slurmctld_lock_t node_write_lock = {
READ_LOCK, READ_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK };
START_TIMER;
lock_slurmctld(node_write_lock);
if (!validate_operator(msg->auth_uid)) {
if (!validate_resv_uid(resv_desc_ptr->name, msg->auth_uid) ||
!(resv_desc_ptr->flags & RESERVE_FLAG_SKIP)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, UPDATE_RESERVATION RPC from uid=%u",
msg->auth_uid);
} else {
resv_desc_msg_t *resv_desc_ptr2 =
xmalloc_nz(sizeof(*resv_desc_ptr2));
slurm_init_resv_desc_msg(resv_desc_ptr2);
/*
* Sanitize the structure since a regular user is doing
* this and is only allowed to skip the reservation and
* not update anything else.
*/
resv_desc_ptr2->name = resv_desc_ptr->name;
resv_desc_ptr->name = NULL;
resv_desc_ptr2->flags = RESERVE_FLAG_SKIP;
slurm_free_resv_desc_msg(resv_desc_ptr);
resv_desc_ptr = msg->data = resv_desc_ptr2;
}
}
if (error_code == SLURM_SUCCESS) {
/* do RPC call */
error_code = update_resv(resv_desc_ptr, &err_msg);
END_TIMER2(__func__);
}
unlock_slurmctld(node_write_lock);
/* return result */
if (error_code) {
info("%s reservation=%s: %s",
__func__, resv_desc_ptr->name, slurm_strerror(error_code));
slurm_send_rc_err_msg(msg, error_code, err_msg);
} else {
debug2("%s complete for %s %s",
__func__, resv_desc_ptr->name, TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
queue_job_scheduler();
}
xfree(err_msg);
}
/* _slurm_rpc_resv_delete - process RPC to delete a reservation */
static void _slurm_rpc_resv_delete(slurm_msg_t *msg)
{
/* init */
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
reservation_name_msg_t *resv_desc_ptr = msg->data;
/* Locks: write job, write node */
slurmctld_lock_t node_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
START_TIMER;
/* node_write_lock needed for validate_resv_uid */
lock_slurmctld(node_write_lock);
if (!validate_operator(msg->auth_uid) &&
!validate_resv_uid(resv_desc_ptr->name, msg->auth_uid)) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, DELETE_RESERVATION RPC from uid=%u",
msg->auth_uid);
} else if (!resv_desc_ptr->name) {
error_code = ESLURM_INVALID_PARTITION_NAME;
error("Invalid DELETE_RESERVATION RPC from uid=%u, name is null",
msg->auth_uid);
}
if (error_code == SLURM_SUCCESS) {
/* do RPC call */
error_code = delete_resv(resv_desc_ptr);
END_TIMER2(__func__);
}
unlock_slurmctld(node_write_lock);
/* return result */
if (error_code) {
info("%s reservation=%s: %s",
__func__, resv_desc_ptr->name, slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
info("%s complete for %s %s",
__func__, resv_desc_ptr->name, TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
queue_job_scheduler();
}
}
/* _slurm_rpc_resv_show - process RPC to dump reservation info */
static void _slurm_rpc_resv_show(slurm_msg_t *msg)
{
resv_info_request_msg_t *resv_req_msg = msg->data;
buf_t *buffer = NULL;
DEF_TIMERS;
/* Locks: read node */
slurmctld_lock_t node_read_lock = {
NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
START_TIMER;
if ((resv_req_msg->last_update - 1) >= last_resv_update) {
debug2("%s, no change", __func__);
slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
} else {
lock_slurmctld(node_read_lock);
buffer = show_resv(msg->auth_uid, msg->protocol_version);
unlock_slurmctld(node_read_lock);
END_TIMER2(__func__);
/* send message */
(void) send_msg_response(msg, RESPONSE_RESERVATION_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
}
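/* _slurm_rpc_node_registration_status - reject node registration RPCs;
 * slurmctld should never receive them (SlurmctldPort == SlurmdPort
 * misconfiguration) */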
static void _slurm_rpc_node_registration_status(slurm_msg_t *msg)
{
error("slurmctld is talking with itself. SlurmctldPort == SlurmdPort");
slurm_send_rc_msg(msg, EINVAL);
}
/* determine if nodes are ready for the job */
static void _slurm_rpc_job_ready(slurm_msg_t *msg)
{
int error_code, result;
job_id_msg_t *id_msg = msg->data;
DEF_TIMERS;
/* Locks: read job */
slurmctld_lock_t job_read_lock = {
NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
START_TIMER;
lock_slurmctld(job_read_lock);
error_code = job_node_ready(id_msg->job_id, &result);
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
if (error_code) {
debug2("%s: %s", __func__, slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
return_code_msg_t rc_msg = {
.return_code = result,
};
debug2("%s(%u)=%d %s",
__func__, id_msg->job_id, result, TIME_STR);
if (!_is_prolog_finished(id_msg->job_id))
(void) send_msg_response(msg, RESPONSE_PROLOG_EXECUTING,
&rc_msg);
else
(void) send_msg_response(msg, RESPONSE_JOB_READY,
&rc_msg);
}
}
/* Check if prolog has already finished */
static int _is_prolog_finished(uint32_t job_id)
{
int is_running = 0;
job_record_t *job_ptr;
slurmctld_lock_t job_read_lock = {
NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
lock_slurmctld(job_read_lock);
job_ptr = find_job_record(job_id);
if (job_ptr) {
is_running = (job_ptr->state_reason != WAIT_PROLOG);
}
unlock_slurmctld(job_read_lock);
return is_running;
}
/* _slurm_rpc_burst_buffer_info - process RPC to dump burst buffer state info */
static void _slurm_rpc_burst_buffer_info(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
buf_t *buffer;
uid_t uid = msg->auth_uid;
DEF_TIMERS;
START_TIMER;
buffer = init_buf(BUF_SIZE);
if (validate_super_user(msg->auth_uid))
uid = 0;
error_code = bb_g_state_pack(uid, buffer, msg->protocol_version);
END_TIMER2(__func__);
if (error_code) {
debug("%s: %s", __func__, slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
(void) send_msg_response(msg, RESPONSE_BURST_BUFFER_INFO,
buffer);
FREE_NULL_BUFFER(buffer);
}
}
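/* _slurm_rpc_suspend - process RPC to suspend or resume a job */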
static void _slurm_rpc_suspend(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
suspend_msg_t *sus_ptr = msg->data;
/* Locks: write job and node */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
job_record_t *job_ptr;
char *op;
START_TIMER;
switch (sus_ptr->op) {
case SUSPEND_JOB:
op = "suspend";
break;
case RESUME_JOB:
op = "resume";
break;
default:
op = "unknown";
}
debug3("Processing RPC details: REQUEST_SUSPEND(%s)", op);
/* Get the job id portion of the job id string; it could be an array id.
 * Currently in a federation, job arrays only run on the origin cluster,
 * so we just want to find whether the array, not a specific task, is on
 * the origin cluster. */
if ((sus_ptr->job_id == NO_VAL) && sus_ptr->job_id_str)
sus_ptr->job_id = strtol(sus_ptr->job_id_str, NULL, 10);
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(sus_ptr->job_id);
/* If the job is found on this cluster, it could be pending on the origin
 * cluster or running on a sibling cluster. If it's not found here, route
 * the request to the origin; otherwise try to suspend the job. If it's
 * pending, an error should be returned. If it's running, the job should
 * be suspended. */
if (!job_ptr && !_route_msg_to_origin(msg, NULL, sus_ptr->job_id)) {
unlock_slurmctld(job_write_lock);
return;
}
if (!job_ptr)
error_code = ESLURM_INVALID_JOB_ID;
else if (fed_mgr_job_started_on_sib(job_ptr)) {
/* Route to the cluster that is running the job. */
slurmdb_cluster_rec_t *dst =
fed_mgr_get_cluster_by_id(
job_ptr->fed_details->cluster_lock);
if (dst) {
slurm_send_reroute_msg(msg, dst, NULL);
info("%s: %s %pJ uid %u routed to %s",
__func__, rpc_num2string(msg->msg_type),
job_ptr, msg->auth_uid, dst->name);
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
return;
}
error("couldn't find cluster by cluster id %d",
job_ptr->fed_details->cluster_lock);
error_code = ESLURM_INVALID_CLUSTER_NAME;
} else if (sus_ptr->job_id_str) {
error_code = job_suspend2(msg, sus_ptr, msg->auth_uid, true,
msg->protocol_version);
} else {
error_code = job_suspend(msg, sus_ptr, msg->auth_uid,
true, msg->protocol_version);
}
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
if (!sus_ptr->job_id_str)
xstrfmtcat(sus_ptr->job_id_str, "%u", sus_ptr->job_id);
if (error_code) {
info("%s(%s) for %s %s",
__func__, op, sus_ptr->job_id_str,
slurm_strerror(error_code));
} else {
info("%s(%s) for %s %s",
__func__, op, sus_ptr->job_id_str, TIME_STR);
schedule_job_save(); /* Has own locking */
if (sus_ptr->op == SUSPEND_JOB)
queue_job_scheduler();
}
}
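/* _slurm_rpc_top_job - process RPC to move a job to the top of the
 * user's job queue */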
static void _slurm_rpc_top_job(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
top_job_msg_t *top_ptr = msg->data;
/* Locks: write job */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
START_TIMER;
lock_slurmctld(job_write_lock);
error_code = job_set_top(msg, top_ptr, msg->auth_uid,
msg->protocol_version);
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
if (error_code) {
info("%s for %s %s",
__func__, top_ptr->job_id_str, slurm_strerror(error_code));
} else {
info("%s for %s %s",
__func__, top_ptr->job_id_str, TIME_STR);
}
}
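/* _slurm_rpc_auth_token - process RPC to generate a JWT authentication
 * token for a user */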
static void _slurm_rpc_auth_token(slurm_msg_t *msg)
{
DEF_TIMERS;
token_request_msg_t *request_msg = msg->data;
token_response_msg_t *resp_data;
char *auth_username = NULL, *username = NULL;
int lifespan = 0;
static int max_lifespan = -1;
START_TIMER;
if (xstrstr(slurm_conf.authalt_params, "disable_token_creation") &&
!validate_slurm_user(msg->auth_uid)) {
error("%s: attempt to retrieve a token while token creation disabled UID=%u",
__func__, msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
if (!auth_is_plugin_type_inited(AUTH_PLUGIN_JWT)) {
slurm_send_rc_msg(msg, ESLURM_PLUGIN_NOT_LOADED);
return;
}
if (max_lifespan == -1) {
char *tmp_ptr;
max_lifespan = 0;
if ((tmp_ptr = xstrcasestr(slurm_conf.authalt_params,
"max_token_lifespan="))) {
max_lifespan = atoi(tmp_ptr + 19);
if (max_lifespan < 1) {
error("Invalid AuthAltParameters max_token_lifespan option, no limit enforced");
max_lifespan = 0;
}
}
}
auth_username = uid_to_string_or_null(msg->auth_uid);
if (request_msg->username) {
if (validate_slurm_user(msg->auth_uid)) {
username = request_msg->username;
} else if (!xstrcmp(request_msg->username, auth_username)) {
/* user explicitly provided their own username */
username = request_msg->username;
} else {
error("%s: attempt to retrieve a token for a different user=%s by UID=%u",
__func__, request_msg->username, msg->auth_uid);
xfree(auth_username);
slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
return;
}
} else if (!auth_username) {
error("%s: attempt to retrieve a token for a missing username by UID=%u",
__func__, msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
return;
} else {
username = auth_username;
}
if (request_msg->lifespan)
lifespan = request_msg->lifespan;
else if (max_lifespan)
lifespan = MIN(DEFAULT_AUTH_TOKEN_LIFESPAN, max_lifespan);
else
lifespan = DEFAULT_AUTH_TOKEN_LIFESPAN;
if (!validate_slurm_user(msg->auth_uid)) {
if ((max_lifespan > 0) && (lifespan > max_lifespan)) {
error("%s: rejecting token lifespan %d for user:%s[%d] requested, exceeds limit of %d",
__func__, request_msg->lifespan, username,
msg->auth_uid, max_lifespan);
xfree(auth_username);
slurm_send_rc_msg(msg, ESLURM_INVALID_TIME_LIMIT);
return;
}
}
resp_data = xmalloc(sizeof(*resp_data));
resp_data->token = auth_g_token_generate(AUTH_PLUGIN_JWT, username,
lifespan);
xfree(auth_username);
END_TIMER2(__func__);
if (!resp_data->token) {
error("%s: error generating auth token: %m", __func__);
xfree(resp_data);
slurm_send_rc_msg(msg, ESLURM_AUTH_UNABLE_TO_GENERATE_TOKEN);
return;
}
(void) send_msg_response(msg, RESPONSE_AUTH_TOKEN, resp_data);
slurm_free_token_response_msg(resp_data);
}
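/* _slurm_rpc_requeue - process RPC to requeue a job */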
static void _slurm_rpc_requeue(slurm_msg_t *msg)
{
int error_code = SLURM_SUCCESS;
DEF_TIMERS;
requeue_msg_t *req_ptr = msg->data;
/* Locks: write job and node */
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
lock_slurmctld(fed_read_lock);
if (!_route_msg_to_origin(msg, req_ptr->job_id_str, req_ptr->job_id)) {
unlock_slurmctld(fed_read_lock);
return;
}
unlock_slurmctld(fed_read_lock);
START_TIMER;
lock_slurmctld(job_write_lock);
if (req_ptr->job_id_str) {
error_code = job_requeue2(msg->auth_uid, req_ptr, msg, false);
} else {
error_code = job_requeue(msg->auth_uid, req_ptr->job_id, msg,
false, req_ptr->flags);
}
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
if (error_code) {
if (!req_ptr->job_id_str)
xstrfmtcat(req_ptr->job_id_str, "%u", req_ptr->job_id);
info("%s: Requeue of JobId=%s returned an error: %s",
__func__, req_ptr->job_id_str, slurm_strerror(error_code));
}
/* Functions below provide their own locking */
schedule_job_save();
}
/* Copy an array of type char **, xmalloc() the array and xstrdup() the
* strings in the array */
extern char **
xduparray(uint32_t size, char ** array)
{
int i;
char ** result;
if (size == 0)
return (char **)NULL;
result = xcalloc(size, sizeof(char *));
for (i=0; i<size; i++)
result[i] = xstrdup(array[i]);
return result;
}
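/* _slurm_rpc_trigger_clear - process RPC to clear event trigger(s) */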
static void _slurm_rpc_trigger_clear(slurm_msg_t *msg)
{
int rc;
trigger_info_msg_t *trigger_ptr = msg->data;
DEF_TIMERS;
START_TIMER;
rc = trigger_clear(msg->auth_uid, trigger_ptr);
END_TIMER2(__func__);
slurm_send_rc_msg(msg, rc);
}
static void _slurm_rpc_trigger_get(slurm_msg_t *msg)
{
trigger_info_msg_t *resp_data;
trigger_info_msg_t *trigger_ptr = msg->data;
DEF_TIMERS;
START_TIMER;
resp_data = trigger_get(msg->auth_uid, trigger_ptr);
END_TIMER2(__func__);
(void) send_msg_response(msg, RESPONSE_TRIGGER_GET, resp_data);
slurm_free_trigger_msg(resp_data);
}
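/* _slurm_rpc_trigger_set - process RPC to register an event trigger */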
static void _slurm_rpc_trigger_set(slurm_msg_t *msg)
{
int rc;
trigger_info_msg_t *trigger_ptr = msg->data;
bool allow_user_triggers = xstrcasestr(slurm_conf.slurmctld_params,
"allow_user_triggers");
bool disable_triggers = xstrcasestr(slurm_conf.slurmctld_params,
"disable_triggers");
DEF_TIMERS;
START_TIMER;
if (disable_triggers) {
rc = ESLURM_DISABLED;
error("Request to set trigger, but disable_triggers is set.");
} else if (validate_slurm_user(msg->auth_uid) || allow_user_triggers) {
rc = trigger_set(msg->auth_uid, msg->auth_gid, trigger_ptr);
} else {
rc = ESLURM_ACCESS_DENIED;
error("Security violation, REQUEST_TRIGGER_SET RPC from uid=%u",
msg->auth_uid);
}
END_TIMER2(__func__);
slurm_send_rc_msg(msg, rc);
}
static void _slurm_rpc_trigger_pull(slurm_msg_t *msg)
{
int rc;
trigger_info_msg_t *trigger_ptr = msg->data;
DEF_TIMERS;
START_TIMER;
/* NOTE: No locking required here, trigger_pull only needs to lock
 * its own internal trigger structure */
if (!validate_slurm_user(msg->auth_uid)) {
rc = ESLURM_USER_ID_MISSING;
error("Security violation, REQUEST_TRIGGER_PULL RPC from uid=%u",
msg->auth_uid);
} else
rc = trigger_pull(trigger_ptr);
END_TIMER2(__func__);
slurm_send_rc_msg(msg, rc);
}
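/* _slurm_rpc_get_topo - process RPC to dump topology information */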
static void _slurm_rpc_get_topo(slurm_msg_t *msg)
{
int rc;
topo_info_response_msg_t *topo_resp_msg;
topo_info_request_msg_t *topo_req_msg = msg->data;
/* Locks: read node lock */
slurmctld_lock_t node_read_lock = {
NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
DEF_TIMERS;
topo_resp_msg = xmalloc(sizeof(topo_info_response_msg_t));
START_TIMER;
lock_slurmctld(node_read_lock);
rc = topology_g_get(TOPO_DATA_TOPOLOGY_PTR, topo_req_msg->name,
&topo_resp_msg->topo_info);
unlock_slurmctld(node_read_lock);
END_TIMER2(__func__);
if (rc) {
slurm_send_rc_msg(msg, rc);
} else {
(void) send_msg_response(msg, RESPONSE_TOPO_INFO,
topo_resp_msg);
}
slurm_free_topo_info_msg(topo_resp_msg);
}
static void _slurm_rpc_get_topo_config(slurm_msg_t *msg)
{
topo_config_response_msg_t *topo_resp_msg;
slurmctld_lock_t node_read_lock = {
.node = READ_LOCK,
};
DEF_TIMERS;
topo_resp_msg = xmalloc(sizeof(*topo_resp_msg));
START_TIMER;
lock_slurmctld(node_read_lock);
topo_resp_msg->config = topology_g_get_config();
unlock_slurmctld(node_read_lock);
END_TIMER2(__func__);
(void) send_msg_response(msg, RESPONSE_TOPO_CONFIG, topo_resp_msg);
slurm_free_topo_config_msg(topo_resp_msg);
}
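/* _slurm_rpc_job_notify - process RPC to deliver a message to a job's
 * srun */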
static void _slurm_rpc_job_notify(slurm_msg_t *msg)
{
int error_code;
/* Locks: read job */
slurmctld_lock_t job_read_lock = {
NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
job_notify_msg_t *notify_msg = msg->data;
job_record_t *job_ptr;
DEF_TIMERS;
START_TIMER;
lock_slurmctld(job_read_lock);
job_ptr = find_job_record(notify_msg->step_id.job_id);
/* If the job is found on this cluster, it could be pending on the origin
 * cluster or running on a sibling cluster. If it's not found here, route
 * the request to the origin. */
if (!job_ptr &&
!_route_msg_to_origin(msg, NULL, notify_msg->step_id.job_id)) {
unlock_slurmctld(job_read_lock);
return;
}
if (!job_ptr)
error_code = ESLURM_INVALID_JOB_ID;
else if (job_ptr->batch_flag &&
fed_mgr_job_started_on_sib(job_ptr)) {
/* Route to the cluster that is running the batch job. srun jobs
* don't need to be routed to the running cluster since the
* origin cluster knows how to contact the listening srun. */
slurmdb_cluster_rec_t *dst =
fed_mgr_get_cluster_by_id(
job_ptr->fed_details->cluster_lock);
if (dst) {
slurm_send_reroute_msg(msg, dst, NULL);
info("%s: %s %pJ uid %u routed to %s",
__func__, rpc_num2string(msg->msg_type),
job_ptr, msg->auth_uid, dst->name);
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
return;
}
error("couldn't find cluster by cluster id %d",
job_ptr->fed_details->cluster_lock);
error_code = ESLURM_INVALID_CLUSTER_NAME;
} else if ((job_ptr->user_id == msg->auth_uid) ||
validate_slurm_user(msg->auth_uid))
error_code = srun_user_message(job_ptr, notify_msg->message);
else {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, REQUEST_JOB_NOTIFY RPC from uid=%u for %pJ owner %u",
msg->auth_uid, job_ptr, job_ptr->user_id);
}
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
slurm_send_rc_msg(msg, error_code);
}
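/* _slurm_rpc_set_debug_flags - process RPC to alter slurmctld's DebugFlags */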
static void _slurm_rpc_set_debug_flags(slurm_msg_t *msg)
{
slurmctld_lock_t config_write_lock =
{ WRITE_LOCK, READ_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
set_debug_flags_msg_t *request_msg = msg->data;
char *flag_string;
if (!validate_super_user(msg->auth_uid)) {
error("set debug flags request from non-super user uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
lock_slurmctld (config_write_lock);
slurm_conf.debug_flags &= (~request_msg->debug_flags_minus);
slurm_conf.debug_flags |= request_msg->debug_flags_plus;
slurm_conf.last_update = time(NULL);
slurmscriptd_update_debug_flags(slurm_conf.debug_flags);
/* Reset cached debug_flags values */
gs_reconfig();
gres_reconfig();
priority_g_reconfig(false);
select_g_reconfigure();
(void) sched_g_reconfig();
unlock_slurmctld (config_write_lock);
flag_string = debug_flags2str(slurm_conf.debug_flags);
info("Set DebugFlags to %s", flag_string ? flag_string : "none");
xfree(flag_string);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
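/* _slurm_rpc_set_debug_level - process RPC to alter slurmctld's log level */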
static void _slurm_rpc_set_debug_level(slurm_msg_t *msg)
{
int debug_level;
slurmctld_lock_t config_write_lock =
{ WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
set_debug_level_msg_t *request_msg = msg->data;
if (!validate_super_user(msg->auth_uid)) {
error("set debug level request from non-super user uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
/* NOTE: not offset by LOG_LEVEL_INFO, since it's inconvenient
* to provide negative values for scontrol */
debug_level = MIN (request_msg->debug_level, (LOG_LEVEL_END - 1));
debug_level = MAX (debug_level, LOG_LEVEL_QUIET);
lock_slurmctld(config_write_lock);
update_log_levels(debug_level, debug_level);
slurmscriptd_update_log_level(debug_level, false);
info("Set debug level to '%s'", log_num2string(debug_level));
slurm_conf.slurmctld_debug = debug_level;
slurm_conf.last_update = time(NULL);
unlock_slurmctld(config_write_lock);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
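/*
 * Build a new node list from update_str and current_str based on the
 * update mode (set/add/remove). Returns an xmalloc()'d string, or NULL
 * if the resulting set is empty.
 */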
static char *_update_hostset_from_mode(char *update_str,
update_mode_t mode,
char *current_str)
{
char *new_str = NULL;
if (mode == UPDATE_SET) {
if (*update_str)
new_str = xstrdup(update_str);
} else {
hostset_t *current_hostset = hostset_create(current_str);
if (mode == UPDATE_ADD) {
hostset_insert(current_hostset, update_str);
} else if (mode == UPDATE_REMOVE) {
hostset_delete(current_hostset, update_str);
} /* If bad mode is sent do nothing */
if (hostset_count(current_hostset))
new_str =
hostset_ranged_string_xmalloc(current_hostset);
hostset_destroy(current_hostset);
}
return new_str;
}
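/*
 * Build a new comma-separated list from update_str and current_str based
 * on the update mode (set/add/remove), optionally normalizing entries to
 * lower case. Returns an xmalloc()'d string, or NULL if the result is
 * empty. An invalid mode leaves the current value unchanged.
 */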
static char *_update_string_from_mode(char *update_str,
update_mode_t mode,
char *current_str,
bool lower_case_normalization)
{
char *new_str = NULL;
if (mode == UPDATE_ADD) {
if (current_str && *current_str) {
list_t *current_list = list_create(xfree_ptr);
slurm_addto_char_list_with_case(
current_list, current_str,
lower_case_normalization);
if (*update_str)
slurm_addto_char_list_with_case(
current_list, update_str,
lower_case_normalization);
new_str = slurm_char_list_to_xstr(current_list);
FREE_NULL_LIST(current_list);
} else if (*update_str) {
new_str = xstrdup(update_str);
}
} else if (mode == UPDATE_REMOVE) {
if (current_str && *current_str) {
list_t *current_list = list_create(xfree_ptr);
list_t *rem_list = list_create(xfree_ptr);
slurm_addto_char_list_with_case(
current_list, current_str,
lower_case_normalization);
slurm_addto_char_list_with_case(
rem_list, update_str,
lower_case_normalization);
slurm_remove_char_list_from_char_list(current_list,
rem_list);
new_str = slurm_char_list_to_xstr(current_list);
FREE_NULL_LIST(current_list);
FREE_NULL_LIST(rem_list);
}
} else if (mode == UPDATE_SET) {
if (*update_str)
new_str = xstrdup(update_str);
} else { /* If bad mode is sent do nothing */
error("bad update mode %d", mode);
if (current_str && *current_str)
new_str = xstrdup(current_str);
}
return new_str;
}
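/*
 * Store new_str (ownership is transferred) into the given slurm_conf
 * setting and reload the power save exclusion configuration.
 */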
static void _set_power_save_settings(char *new_str, char **slurm_conf_setting)
{
slurmctld_lock_t locks = {
.conf = WRITE_LOCK,
.node = READ_LOCK,
.part = READ_LOCK,
};
lock_slurmctld(locks);
xfree(*slurm_conf_setting);
*slurm_conf_setting = new_str;
slurm_conf.last_update = time(NULL);
power_save_exc_setup(); /* Reload power save settings */
unlock_slurmctld(locks);
}
static void _slurm_rpc_set_suspend_exc_nodes(slurm_msg_t *msg)
{
suspend_exc_update_msg_t *update_msg = msg->data;
char *new_str;
if (!validate_super_user(msg->auth_uid)) {
error("set SuspendExcNodes request from non-super user uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
if ((update_msg->mode != UPDATE_SET) &&
(xstrchr(slurm_conf.suspend_exc_nodes, ':') ||
xstrchr(update_msg->update_str, ':'))) {
error("Append and remove from SuspendExcNodes with ':' is not supported. Please use direct assignment instead.");
slurm_send_rc_msg(msg, ESLURM_INVALID_NODE_NAME);
return;
}
new_str = _update_hostset_from_mode(update_msg->update_str,
update_msg->mode,
slurm_conf.suspend_exc_nodes);
if (!xstrcmp(new_str, slurm_conf.suspend_exc_nodes)) {
info("SuspendExcNodes did not change from %s with update: %s",
slurm_conf.suspend_exc_nodes, update_msg->update_str);
xfree(new_str);
} else {
info("Setting SuspendExcNodes to '%s'", new_str);
_set_power_save_settings(new_str,
&slurm_conf.suspend_exc_nodes);
}
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
static void _slurm_rpc_set_suspend_exc_parts(slurm_msg_t *msg)
{
suspend_exc_update_msg_t *update_msg = msg->data;
char *new_str;
if (!validate_super_user(msg->auth_uid)) {
error("set SuspendExcParts request from non-super user uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
new_str = _update_string_from_mode(update_msg->update_str,
update_msg->mode,
slurm_conf.suspend_exc_parts, false);
if (!xstrcmp(new_str, slurm_conf.suspend_exc_parts)) {
info("SuspendExcParts did not change from %s with update: %s",
slurm_conf.suspend_exc_parts, update_msg->update_str);
xfree(new_str);
} else {
info("Setting SuspendExcParts to '%s'", new_str);
_set_power_save_settings(new_str,
&slurm_conf.suspend_exc_parts);
}
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
static void _slurm_rpc_set_suspend_exc_states(slurm_msg_t *msg)
{
suspend_exc_update_msg_t *update_msg = msg->data;
char *new_str;
if (!validate_super_user(msg->auth_uid)) {
error("set SuspendExcStates request from non-super user uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
new_str = _update_string_from_mode(update_msg->update_str,
update_msg->mode,
slurm_conf.suspend_exc_states,
true);
if (!xstrcmp(new_str, slurm_conf.suspend_exc_states)) {
info("SuspendExcStates did not change from %s with update: %s",
slurm_conf.suspend_exc_states, update_msg->update_str);
xfree(new_str);
} else {
info("Setting SuspendExcStates to '%s'", new_str);
_set_power_save_settings(new_str, &slurm_conf.suspend_exc_states);
}
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
static void _slurm_rpc_set_schedlog_level(slurm_msg_t *msg)
{
int schedlog_level;
slurmctld_lock_t config_read_lock =
{ READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
set_debug_level_msg_t *request_msg = msg->data;
log_options_t log_opts = SCHEDLOG_OPTS_INITIALIZER;
if (!validate_super_user(msg->auth_uid)) {
error("set scheduler log level request from non-super user uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
/*
* If slurm_conf.sched_logfile is NULL, then this operation
* will fail, since there is no sched logfile for which to alter
* the log level. (Calling sched_log_alter with a NULL filename
* is likely to cause a segfault at the next sched log call)
* So just give up and return "Operation Disabled"
*/
if (slurm_conf.sched_logfile == NULL) {
error("set scheduler log level failed: no log file!");
slurm_send_rc_msg (msg, ESLURM_DISABLED);
return;
}
schedlog_level = MIN (request_msg->debug_level, (LOG_LEVEL_QUIET + 1));
schedlog_level = MAX (schedlog_level, LOG_LEVEL_QUIET);
lock_slurmctld(config_read_lock);
log_opts.logfile_level = schedlog_level;
sched_log_alter(log_opts, LOG_DAEMON, slurm_conf.sched_logfile);
sched_info("Set scheduler log level to %d", schedlog_level);
slurm_conf.sched_log_level = schedlog_level;
slurm_conf.last_update = time(NULL);
unlock_slurmctld(config_read_lock);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
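/* _slurm_rpc_accounting_update_msg - queue accounting updates pushed from
 * slurmdbd for processing by the acct_update thread */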
static void _slurm_rpc_accounting_update_msg(slurm_msg_t *msg)
{
int rc = SLURM_SUCCESS;
accounting_update_msg_t *update_ptr = msg->data;
DEF_TIMERS;
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error("Update Association request from non-super user uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
if (!update_ptr->update_list || !list_count(update_ptr->update_list)) {
slurm_send_rc_msg(msg, rc);
return;
}
/*
* Before we send an rc we are transferring the update_list to a common
* list to avoid the potential of messages from the dbd getting out of
* order. The list lock here should protect us, as we only access
* this list in list_transfer and list_delete_all.
*/
xassert(slurmctld_config.acct_update_list);
list_transfer(slurmctld_config.acct_update_list,
update_ptr->update_list);
/*
* Send a message back to the caller letting them know we got it.
* Since we have the update list in the order we received it, we should
* be good to respond. There is no need to wait: the end result would be
* the same whether we wait or not, since the update has already
* happened in the database.
*/
slurm_send_rc_msg(msg, rc);
/* Signal acct_update_thread to process list */
slurm_mutex_lock(&slurmctld_config.acct_update_lock);
slurm_cond_broadcast(&slurmctld_config.acct_update_cond);
slurm_mutex_unlock(&slurmctld_config.acct_update_lock);
END_TIMER2(__func__);
if (rc != SLURM_SUCCESS)
error("assoc_mgr_update gave error: %s",
slurm_strerror(rc));
}
/* _slurm_rpc_reboot_nodes - process RPC to schedule nodes reboot */
static void _slurm_rpc_reboot_nodes(slurm_msg_t *msg)
{
int rc;
char *err_msg = NULL;
node_record_t *node_ptr;
reboot_msg_t *reboot_msg = msg->data;
char *nodelist = NULL;
bitstr_t *bitmap = NULL, *cannot_reboot_nodes = NULL;
/* Locks: write node lock */
slurmctld_lock_t node_write_lock = {
NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
time_t now = time(NULL);
DEF_TIMERS;
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error("Security violation, REBOOT_NODES RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
/* do RPC call */
if (reboot_msg)
nodelist = reboot_msg->node_list;
if (!nodelist || !xstrcasecmp(nodelist, "ALL")) {
bitmap = node_conf_get_active_bitmap();
} else {
hostlist_t *hostlist;
if (!(hostlist =
nodespec_to_hostlist(nodelist, true, NULL))) {
error("%s: Bad node list in REBOOT_NODES request: \"%s\"",
__func__, nodelist);
slurm_send_rc_msg(msg, ESLURM_INVALID_NODE_NAME);
return;
} else if ((rc = hostlist2bitmap(hostlist, true, &bitmap))) {
error("%s: Can't find nodes requested in REBOOT_NODES request: \"%s\"",
__func__, nodelist);
FREE_NULL_BITMAP(bitmap);
FREE_NULL_HOSTLIST(hostlist);
slurm_send_rc_msg(msg, ESLURM_INVALID_NODE_NAME);
return;
}
FREE_NULL_HOSTLIST(hostlist);
}
cannot_reboot_nodes = bit_alloc(node_record_count);
lock_slurmctld(node_write_lock);
for (int i = 0; (node_ptr = next_node_bitmap(bitmap, &i)); i++) {
if (IS_NODE_FUTURE(node_ptr) ||
IS_NODE_REBOOT_REQUESTED(node_ptr) ||
IS_NODE_REBOOT_ISSUED(node_ptr) ||
IS_NODE_POWER_DOWN(node_ptr) ||
IS_NODE_POWERED_DOWN(node_ptr) ||
IS_NODE_POWERING_DOWN(node_ptr)) {
bit_clear(bitmap, node_ptr->index);
bit_set(cannot_reboot_nodes, node_ptr->index);
debug2("Skipping reboot of node %s in state %s",
node_ptr->name,
node_state_string(node_ptr->node_state));
continue;
}
node_ptr->node_state |= NODE_STATE_REBOOT_REQUESTED;
if (reboot_msg) {
node_ptr->next_state = reboot_msg->next_state;
if (node_ptr->next_state == NODE_RESUME)
bit_set(rs_node_bitmap, node_ptr->index);
if (reboot_msg->reason) {
xfree(node_ptr->reason);
node_ptr->reason = xstrdup(reboot_msg->reason);
node_ptr->reason_time = now;
node_ptr->reason_uid = msg->auth_uid;
}
if (reboot_msg->flags & REBOOT_FLAGS_ASAP) {
if (!IS_NODE_DRAIN(node_ptr)) {
if (node_ptr->next_state == NO_VAL)
node_ptr->next_state =
NODE_STATE_UNDRAIN;
else
node_ptr->next_state |=
NODE_STATE_UNDRAIN;
}
node_ptr->node_state |= NODE_STATE_DRAIN;
bit_clear(avail_node_bitmap, node_ptr->index);
bit_set(asap_node_bitmap, node_ptr->index);
if (node_ptr->reason == NULL) {
node_ptr->reason =
xstrdup("Reboot ASAP");
node_ptr->reason_time = now;
node_ptr->reason_uid = msg->auth_uid;
}
}
if (!node_ptr->reason) {
node_ptr->reason = xstrdup("reboot requested");
node_ptr->reason_time = now;
node_ptr->reason_uid = msg->auth_uid;
}
}
want_nodes_reboot = true;
}
if (want_nodes_reboot == true)
schedule_node_save();
unlock_slurmctld(node_write_lock);
if (want_nodes_reboot == true) {
nodelist = bitmap2node_name(bitmap);
info("reboot request queued for nodes %s", nodelist);
xfree(nodelist);
}
if (bit_ffs(cannot_reboot_nodes) != -1) {
nodelist = bitmap2node_name(cannot_reboot_nodes);
xstrfmtcat(err_msg, "Skipping reboot of nodes %s due to current node state.",
nodelist);
xfree(nodelist);
}
FREE_NULL_BITMAP(cannot_reboot_nodes);
FREE_NULL_BITMAP(bitmap);
rc = SLURM_SUCCESS;
END_TIMER2(__func__);
slurm_send_rc_err_msg(msg, rc, err_msg);
xfree(err_msg);
}
static void _slurm_rpc_accounting_first_reg(slurm_msg_t *msg)
{
time_t event_time = time(NULL);
DEF_TIMERS;
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error("First Registration request from non-super user uid=%u",
msg->auth_uid);
return;
}
acct_storage_g_send_all(acct_db_conn, event_time, ACCOUNTING_FIRST_REG);
END_TIMER2(__func__);
}
static void _slurm_rpc_accounting_register_ctld(slurm_msg_t *msg)
{
DEF_TIMERS;
START_TIMER;
if (!validate_super_user(msg->auth_uid)) {
error("Registration request from non-super user uid=%u",
msg->auth_uid);
return;
}
clusteracct_storage_g_register_ctld(acct_db_conn,
slurm_conf.slurmctld_port);
END_TIMER2(__func__);
}
static void _clear_rpc_stats(void)
{
slurm_mutex_lock(&rpc_mutex);
memset(rpc_type_cnt, 0, sizeof(rpc_type_cnt));
memset(rpc_type_id, 0, sizeof(rpc_type_id));
memset(rpc_type_time, 0, sizeof(rpc_type_time));
memset(rpc_type_queued, 0, sizeof(rpc_type_queued));
memset(rpc_type_dropped, 0, sizeof(rpc_type_dropped));
memset(rpc_type_cycle_last, 0, sizeof(rpc_type_cycle_last));
memset(rpc_type_cycle_max, 0, sizeof(rpc_type_cycle_max));
memset(rpc_user_cnt, 0, sizeof(rpc_user_cnt));
memset(rpc_user_id, 0, sizeof(rpc_user_id));
memset(rpc_user_time, 0, sizeof(rpc_user_time));
slurm_mutex_unlock(&rpc_mutex);
}
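/* _pack_rpc_stats - pack per-RPC-type and per-user statistics for
 * RESPONSE_STATS_INFO */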
static void _pack_rpc_stats(buf_t *buffer, uint16_t protocol_version)
{
slurm_mutex_lock(&rpc_mutex);
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
uint32_t rpc_count = 0, user_count = 1;
uint8_t queue_enabled = rpc_queue_enabled();
while (rpc_type_id[rpc_count])
rpc_count++;
pack32(rpc_count, buffer);
pack16_array(rpc_type_id, rpc_count, buffer);
pack32_array(rpc_type_cnt, rpc_count, buffer);
pack64_array(rpc_type_time, rpc_count, buffer);
pack8(queue_enabled, buffer);
if (queue_enabled) {
pack16_array(rpc_type_queued, rpc_count, buffer);
pack64_array(rpc_type_dropped, rpc_count, buffer);
pack16_array(rpc_type_cycle_last, rpc_count, buffer);
pack16_array(rpc_type_cycle_max, rpc_count, buffer);
}
/* user_count starts at 1 as root is in index 0 */
while (rpc_user_id[user_count])
user_count++;
pack32(user_count, buffer);
pack32_array(rpc_user_id, user_count, buffer);
pack32_array(rpc_user_cnt, user_count, buffer);
pack64_array(rpc_user_time, user_count, buffer);
agent_pack_pending_rpc_stats(buffer);
}
slurm_mutex_unlock(&rpc_mutex);
}
static void _slurm_rpc_burst_buffer_status(slurm_msg_t *msg)
{
bb_status_resp_msg_t status_resp_msg;
bb_status_req_msg_t *status_req_msg = msg->data;
memset(&status_resp_msg, 0, sizeof(status_resp_msg));
status_resp_msg.status_resp = bb_g_get_status(status_req_msg->argc,
status_req_msg->argv,
msg->auth_uid,
msg->auth_gid);
(void) send_msg_response(msg, RESPONSE_BURST_BUFFER_STATUS,
&status_resp_msg);
xfree(status_resp_msg.status_resp);
}
/* _slurm_rpc_dump_stats - process RPC for statistics information */
static void _slurm_rpc_dump_stats(slurm_msg_t *msg)
{
stats_info_request_msg_t *request_msg = msg->data;
buf_t *buffer = NULL;
if ((request_msg->command_id == STAT_COMMAND_RESET) &&
!validate_operator(msg->auth_uid)) {
error("Security violation: REQUEST_STATS_INFO reset from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
debug3("Processing RPC details: REQUEST_STATS_INFO command=%u",
request_msg->command_id);
if (request_msg->command_id == STAT_COMMAND_RESET) {
reset_stats(1);
_clear_rpc_stats();
slurm_send_rc_msg(msg, SLURM_SUCCESS);
return;
}
buffer = pack_all_stat(msg->protocol_version);
_pack_rpc_stats(buffer, msg->protocol_version);
/* send message */
(void) send_msg_response(msg, RESPONSE_STATS_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
static void _slurm_rpc_dump_licenses(slurm_msg_t *msg)
{
DEF_TIMERS;
license_info_request_msg_t *lic_req_msg = msg->data;
buf_t *buffer = NULL;
START_TIMER;
if ((lic_req_msg->last_update - 1) >= last_license_update) {
/* Don't send unnecessary data */
debug2("%s: no change SLURM_NO_CHANGE_IN_DATA", __func__);
slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
return;
}
buffer = get_all_license_info(msg->protocol_version);
END_TIMER2(__func__);
(void) send_msg_response(msg, RESPONSE_LICENSE_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
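/* _slurm_rpc_kill_job - process RPC to signal or cancel a job, routing
 * the request to the federation origin cluster when appropriate */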
static void _slurm_rpc_kill_job(slurm_msg_t *msg)
{
static int active_rpc_cnt = 0;
DEF_TIMERS;
job_step_kill_msg_t *kill = msg->data;
slurmctld_lock_t fed_job_read_lock =
{NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
slurmctld_lock_t lock = {READ_LOCK, WRITE_LOCK,
WRITE_LOCK, NO_LOCK, READ_LOCK };
int cc;
/*
 * If the cluster is part of a federation and it isn't the origin of the
 * job, and it doesn't know about the federated job, then route the
 * request to the origin cluster via the client. If the cluster does
 * know about the job and owns it, then this cluster will cancel the job
 * and report the cancel back to the origin. If the job does reside on
 * this cluster but isn't owned by it (e.g. pending jobs), then route
 * the request back to the origin to handle it.
 */
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
lock_slurmctld(fed_job_read_lock);
if (fed_mgr_fed_rec) {
uint32_t job_id, origin_id;
job_record_t *job_ptr;
slurmdb_cluster_rec_t *origin;
job_id = strtol(kill->sjob_id, NULL, 10);
origin_id = fed_mgr_get_cluster_id(job_id);
origin = fed_mgr_get_cluster_by_id(origin_id);
/*
* Only reroute to the origin if the connection is up. If it
* isn't, then _signal_job will signal the sibling jobs.
*/
if (origin && origin->fed.send &&
((persist_conn_t *) origin->fed.send)->tls_conn &&
(origin != fed_mgr_cluster_rec) &&
(!(job_ptr = find_job_record(job_id)) ||
(job_ptr && job_ptr->fed_details &&
(job_ptr->fed_details->cluster_lock !=
fed_mgr_cluster_rec->fed.id)))) {
slurmdb_cluster_rec_t *dst =
fed_mgr_get_cluster_by_id(origin_id);
if (!dst) {
error("couldn't find cluster by cluster id %d",
origin_id);
slurm_send_rc_msg(msg, SLURM_ERROR);
} else {
slurm_send_reroute_msg(msg, dst, NULL);
info("%s: REQUEST_KILL_JOB JobId=%s uid %u routed to %s",
__func__, kill->sjob_id, msg->auth_uid,
dst->name);
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(fed_job_read_lock);
return;
}
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING))
unlock_slurmctld(fed_job_read_lock);
START_TIMER;
info("%s: REQUEST_KILL_JOB JobId=%s uid %u",
__func__, kill->sjob_id, msg->auth_uid);
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
_throttle_start(&active_rpc_cnt);
lock_slurmctld(lock);
}
if (kill->sibling) {
uint32_t job_id = strtol(kill->sjob_id, NULL, 10);
cc = fed_mgr_remove_active_sibling(job_id, kill->sibling);
} else {
cc = job_str_signal(kill->sjob_id, kill->signal, kill->flags,
msg->auth_uid, 0);
}
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
unlock_slurmctld(lock);
_throttle_fini(&active_rpc_cnt);
}
if (cc == ESLURM_ALREADY_DONE) {
debug2("%s: job_str_signal() uid=%u JobId=%s sig=%d returned: %s",
__func__, msg->auth_uid, kill->sjob_id,
kill->signal, slurm_strerror(cc));
} else if (cc != SLURM_SUCCESS) {
info("%s: job_str_signal() uid=%u JobId=%s sig=%d returned: %s",
__func__, msg->auth_uid, kill->sjob_id,
kill->signal, slurm_strerror(cc));
} else {
slurmctld_diag_stats.jobs_canceled++;
}
slurm_send_rc_msg(msg, cc);
END_TIMER2(__func__);
}
static char *_str_array2str(char **array, uint32_t cnt)
{
char *ret_str = NULL;
char *pos = NULL;
for (int i = 0; i < cnt; i++) {
if (!pos) /* First string */
xstrfmtcatat(ret_str, &pos, "%s", array[i]);
else
xstrfmtcatat(ret_str, &pos, ",%s", array[i]);
}
return ret_str;
}
static void _log_kill_jobs_rpc(kill_jobs_msg_t *kill_msg)
{
char *job_ids_str = _str_array2str(kill_msg->jobs_array,
kill_msg->jobs_cnt);
verbose("%s filters: account=%s; flags=0x%x; job_name=%s; partition=%s; qos=%s; reservation=%s; signal=%u; state=%d(%s); user_id=%u, user_name=%s; wckey=%s; nodelist=%s; jobs=%s",
rpc_num2string(REQUEST_KILL_JOBS), kill_msg->account,
kill_msg->flags, kill_msg->job_name, kill_msg->partition,
kill_msg->qos, kill_msg->reservation, kill_msg->signal,
kill_msg->state,
kill_msg->state ? job_state_string(kill_msg->state) : "none",
kill_msg->user_id, kill_msg->user_name, kill_msg->wckey,
kill_msg->nodelist, job_ids_str);
xfree(job_ids_str);
}
static void _slurm_rpc_kill_jobs(slurm_msg_t *msg)
{
int rc;
DEF_TIMERS;
kill_jobs_msg_t *kill_msg = msg->data;
kill_jobs_resp_msg_t *kill_msg_resp = NULL;
slurmctld_lock_t lock = {
.conf = READ_LOCK,
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.fed = READ_LOCK,
};
if ((slurm_conf.debug_flags & DEBUG_FLAG_PROTOCOL) ||
(slurm_conf.slurmctld_debug >= LOG_LEVEL_DEBUG2))
_log_kill_jobs_rpc(kill_msg);
START_TIMER;
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
lock_slurmctld(lock);
}
rc = job_mgr_signal_jobs(kill_msg, msg->auth_uid, &kill_msg_resp);
if (!(msg->flags & CTLD_QUEUE_PROCESSING)) {
unlock_slurmctld(lock);
}
END_TIMER2(__func__);
if (rc != SLURM_SUCCESS) {
slurm_send_rc_msg(msg, rc);
} else {
(void) send_msg_response(msg, RESPONSE_KILL_JOBS,
kill_msg_resp);
}
slurm_free_kill_jobs_response_msg(kill_msg_resp);
}
/* _slurm_rpc_assoc_mgr_info()
*
* Pack the assoc_mgr lists and return them to the caller.
*/
static void _slurm_rpc_assoc_mgr_info(slurm_msg_t *msg)
{
DEF_TIMERS;
buf_t *buffer;
START_TIMER;
/* Security is handled in the assoc_mgr */
buffer = assoc_mgr_info_get_pack_msg(msg->data, msg->auth_uid,
acct_db_conn,
msg->protocol_version);
END_TIMER2(__func__);
if (!buffer) {
slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
return;
}
(void) send_msg_response(msg, RESPONSE_ASSOC_MGR_INFO, buffer);
FREE_NULL_BUFFER(buffer);
}
/* Take a persist_msg_t and handle it like a normal slurm_msg_t */
static int _process_persist_conn(void *arg, persist_msg_t *persist_msg,
buf_t **out_buffer)
{
slurm_msg_t msg;
slurmctld_rpc_t *this_rpc = NULL;
persist_conn_t *persist_conn = arg;
*out_buffer = NULL;
slurm_msg_t_init(&msg);
msg.auth_cred = persist_conn->auth_cred;
msg.auth_uid = persist_conn->auth_uid;
msg.auth_gid = persist_conn->auth_gid;
msg.auth_ids_set = persist_conn->auth_ids_set;
msg.conn = persist_conn;
msg.tls_conn = persist_conn->tls_conn;
msg.msg_type = persist_msg->msg_type;
msg.data = persist_msg->data;
msg.protocol_version = persist_conn->version;
if (persist_conn->persist_type == PERSIST_TYPE_ACCT_UPDATE) {
if (msg.msg_type == ACCOUNTING_UPDATE_MSG) {
DEF_TIMERS;
START_TIMER;
_slurm_rpc_accounting_update_msg(&msg);
END_TIMER;
record_rpc_stats(&msg, DELTA_TIMER);
} else {
slurm_send_rc_msg(&msg, EINVAL);
}
} else if ((this_rpc = find_rpc(persist_msg->msg_type))) {
xassert(!this_rpc->keep_msg);
/* directly process the request */
slurmctld_req(&msg, this_rpc);
} else {
error("invalid RPC msg_type=%s",
rpc_num2string(persist_msg->msg_type));
slurm_send_rc_msg(&msg, EINVAL);
}
return SLURM_SUCCESS;
}
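/* _slurm_rpc_persist_init - process RPC to establish a persistent
 * connection (federation sibling or accounting update stream) */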
static void _slurm_rpc_persist_init(slurm_msg_t *msg)
{
DEF_TIMERS;
int rc = SLURM_SUCCESS;
char *comment = NULL;
buf_t *ret_buf;
persist_conn_t *persist_conn = NULL, p_tmp;
persist_init_req_msg_t *persist_init = msg->data;
slurm_addr_t rem_addr;
if (msg->conn)
error("We already have a persistent connect, this should never happen");
START_TIMER;
if (persist_init->version > SLURM_PROTOCOL_VERSION)
persist_init->version = SLURM_PROTOCOL_VERSION;
if (!validate_slurm_user(msg->auth_uid)) {
memset(&p_tmp, 0, sizeof(p_tmp));
p_tmp.cluster_name = persist_init->cluster_name;
p_tmp.version = persist_init->version;
p_tmp.shutdown = &slurmctld_config.shutdown_time;
rc = ESLURM_USER_ID_MISSING;
error("Security violation, REQUEST_PERSIST_INIT RPC from uid=%u",
msg->auth_uid);
goto end_it;
}
persist_conn = xmalloc(sizeof(persist_conn_t));
persist_conn->auth_uid = msg->auth_uid;
persist_conn->auth_gid = msg->auth_gid;
persist_conn->auth_ids_set = msg->auth_ids_set;
persist_conn->auth_cred = msg->auth_cred;
msg->auth_cred = NULL;
persist_conn->cluster_name = persist_init->cluster_name;
persist_init->cluster_name = NULL;
persist_conn->tls_conn = msg->tls_conn;
msg->tls_conn = NULL;
persist_conn->callback_proc = _process_persist_conn;
persist_conn->persist_type = persist_init->persist_type;
persist_conn->rem_port = persist_init->port;
persist_conn->rem_host = xmalloc(INET6_ADDRSTRLEN);
(void) slurm_get_peer_addr(conn_g_get_fd(persist_conn->tls_conn),
&rem_addr);
slurm_get_ip_str(&rem_addr, persist_conn->rem_host, INET6_ADDRSTRLEN);
/* info("got it from %d %s %s(%u)", persist_conn->fd, */
/* persist_conn->cluster_name, */
/* persist_conn->rem_host, persist_conn->rem_port); */
persist_conn->shutdown = &slurmctld_config.shutdown_time;
//persist_conn->timeout = 0; /* we want this to be 0 */
persist_conn->version = persist_init->version;
memcpy(&p_tmp, persist_conn, sizeof(persist_conn_t));
if (persist_init->persist_type == PERSIST_TYPE_FED)
rc = fed_mgr_add_sibling_conn(persist_conn, &comment);
else if (persist_init->persist_type == PERSIST_TYPE_ACCT_UPDATE) {
persist_conn->flags |= PERSIST_FLAG_ALREADY_INITED;
slurm_persist_conn_recv_thread_init(
persist_conn, conn_g_get_fd(persist_conn->tls_conn), -1,
persist_conn);
} else
rc = SLURM_ERROR;
end_it:
/* If people are really hammering the fed_mgr we could get into trouble
* with the persist_conn we sent in, so use the copy instead
*/
ret_buf = slurm_persist_make_rc_msg(&p_tmp, rc, comment, p_tmp.version);
if (slurm_persist_send_msg(&p_tmp, ret_buf) != SLURM_SUCCESS) {
debug("Problem sending response to connection %d uid(%u)",
conn_g_get_fd(p_tmp.tls_conn), msg->auth_uid);
}
if (rc && persist_conn) {
/* Free AFTER message has been sent back to remote */
persist_conn->tls_conn = NULL;
slurm_persist_conn_destroy(persist_conn);
}
xfree(comment);
FREE_NULL_BUFFER(ret_buf);
END_TIMER;
/* Don't free this here, it will be done elsewhere */
//slurm_persist_conn_destroy(persist_conn);
}
static void _slurm_rpc_tls_cert(slurm_msg_t *msg)
{
tls_cert_request_msg_t *req = msg->data;
tls_cert_response_msg_t resp = { 0 };
node_record_t *node = NULL;
bool is_client_auth = false;
if (!validate_slurm_user(msg->auth_uid)) {
error("Security violation, REQUEST_TLS_CERT from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
if (!(node = find_node_record(req->node_name))) {
log_flag(TLS, "%s: Could not find node record. Request might not be from a slurmd node",
__func__);
}
is_client_auth = conn_g_is_client_authenticated(msg->tls_conn);
if (!(resp.signed_cert =
certmgr_g_sign_csr(req->csr, is_client_auth, req->token,
req->node_name))) {
error("%s: Unable to sign certificate signing request.",
__func__);
slurm_send_rc_msg(msg, SLURM_ERROR);
} else if (node) {
node->cert_last_renewal = time(NULL);
}
if (resp.signed_cert) {
log_flag(AUDIT_TLS, "Sending signed certificate back to node \'%s\'",
req->node_name);
}
(void) send_msg_response(msg, RESPONSE_TLS_CERT, &resp);
slurm_free_tls_cert_response_msg_members(&resp);
}
static void _slurm_rpc_sib_job_lock(slurm_msg_t *msg)
{
int rc;
sib_msg_t *sib_msg = msg->data;
if (!msg->conn) {
error("Security violation, SIB_JOB_LOCK RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
rc = fed_mgr_job_lock_set(sib_msg->job_id, sib_msg->cluster_id);
slurm_send_rc_msg(msg, rc);
}
static void _slurm_rpc_sib_job_unlock(slurm_msg_t *msg)
{
int rc;
sib_msg_t *sib_msg = msg->data;
if (!msg->conn) {
error("Security violation, SIB_JOB_UNLOCK RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
rc = fed_mgr_job_lock_unset(sib_msg->job_id, sib_msg->cluster_id);
slurm_send_rc_msg(msg, rc);
}
static void _slurm_rpc_sib_msg(uint32_t uid, slurm_msg_t *msg) {
if (!msg->conn) {
error("Security violation, SIB_SUBMISSION RPC from uid=%u",
uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
fed_mgr_q_sib_msg(msg, uid);
}
static void _slurm_rpc_dependency_msg(uint32_t uid, slurm_msg_t *msg)
{
if (!msg->conn || !validate_slurm_user(uid)) {
error("Security violation, REQUEST_SEND_DEP RPC from uid=%u",
uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
fed_mgr_q_dep_msg(msg);
}
static void _slurm_rpc_update_origin_dep_msg(uint32_t uid, slurm_msg_t *msg)
{
if (!msg->conn || !validate_slurm_user(uid)) {
error("Security violation, REQUEST_UPDATE_ORIGIN_DEP RPC from uid=%u",
uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
fed_mgr_q_update_origin_dep_msg(msg);
}
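/* _build_rc_buf - pack a RESPONSE_SLURM_RC message with the given return
 *	code into a newly allocated buffer, or return NULL on a packing
 *	failure */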
static buf_t *_build_rc_buf(int rc, uint16_t rpc_version)
{
buf_t *buf = NULL;
slurm_msg_t msg;
return_code_msg_t data;
memset(&data, 0, sizeof(data));
data.return_code = rc;
	slurm_msg_t_init(&msg);
	msg.msg_type = RESPONSE_SLURM_RC;
	/* Pack at the requester's protocol version */
	msg.protocol_version = rpc_version;
	msg.data = &data;
buf = init_buf(128);
pack16(msg.msg_type, buf);
if (pack_msg(&msg, buf) != SLURM_SUCCESS)
FREE_NULL_BUFFER(buf);
return buf;
}
/* List delete function to free a buf_t * record */
static void _ctld_free_list_msg(void *x)
{
FREE_NULL_BUFFER(x);
}
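/* _foreach_proc_multi_msg - unpack and process a single sub-message from a
 *	REQUEST_CTLD_MULT_MSG request, appending the packed response to
 *	multi_msg->full_resp_list */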
static int _foreach_proc_multi_msg(void *x, void *arg)
{
buf_t *single_req_buf = x;
foreach_multi_msg_t *multi_msg = arg;
slurm_msg_t *msg = multi_msg->msg;
slurm_msg_t sub_msg;
buf_t *ret_buf;
slurm_msg_t_init(&sub_msg);
sub_msg.protocol_version = msg->protocol_version;
if (unpack16(&sub_msg.msg_type, single_req_buf) ||
unpack_msg(&sub_msg, single_req_buf)) {
error("Sub-message unpack error for REQUEST_CTLD_MULT_MSG %u RPC",
sub_msg.msg_type);
ret_buf = _build_rc_buf(SLURM_ERROR, msg->protocol_version);
list_append(multi_msg->full_resp_list, ret_buf);
return 0;
}
sub_msg.conn = msg->conn;
sub_msg.auth_cred = msg->auth_cred;
ret_buf = NULL;
log_flag(PROTOCOL, "%s: received opcode %s",
__func__, rpc_num2string(sub_msg.msg_type));
switch (sub_msg.msg_type) {
case REQUEST_PING:
ret_buf = _build_rc_buf(SLURM_SUCCESS, msg->protocol_version);
break;
case REQUEST_SIB_MSG:
_slurm_rpc_sib_msg(msg->auth_uid, &sub_msg);
ret_buf = _build_rc_buf(SLURM_SUCCESS, msg->protocol_version);
break;
case REQUEST_SEND_DEP:
_slurm_rpc_dependency_msg(msg->auth_uid, &sub_msg);
ret_buf = _build_rc_buf(SLURM_SUCCESS, msg->protocol_version);
break;
case REQUEST_UPDATE_ORIGIN_DEP:
_slurm_rpc_update_origin_dep_msg(msg->auth_uid, &sub_msg);
ret_buf = _build_rc_buf(SLURM_SUCCESS, msg->protocol_version);
break;
default:
error("%s: Unsupported Message Type:%s",
__func__, rpc_num2string(sub_msg.msg_type));
}
(void) slurm_free_msg_data(sub_msg.msg_type, sub_msg.data);
if (!ret_buf)
ret_buf = _build_rc_buf(SLURM_ERROR, msg->protocol_version);
list_append(multi_msg->full_resp_list, ret_buf);
return 0;
}
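/* _proc_multi_msg - process a REQUEST_CTLD_MULT_MSG containing a list of
 *	packed sub-messages and reply with a matching list of packed
 *	responses */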
static void _proc_multi_msg(slurm_msg_t *msg)
{
ctld_list_msg_t *ctld_req_msg = msg->data;
ctld_list_msg_t ctld_resp_msg = { 0 };
foreach_multi_msg_t multi_msg = {
.msg = msg,
};
if (!msg->conn) {
error("Security violation, REQUEST_CTLD_MULT_MSG RPC from uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, ESLURM_ACCESS_DENIED);
return;
}
multi_msg.full_resp_list = list_create(_ctld_free_list_msg);
(void) list_for_each(ctld_req_msg->my_list,
_foreach_proc_multi_msg,
&multi_msg);
ctld_resp_msg.my_list = multi_msg.full_resp_list;
/* Send message */
(void) send_msg_response(msg, RESPONSE_CTLD_MULT_MSG, &ctld_resp_msg);
FREE_NULL_LIST(multi_msg.full_resp_list);
}
/* Route msg to a federated job's origin cluster.
 * RET SLURM_SUCCESS if the msg was routed, SLURM_ERROR otherwise.
 */
static int _route_msg_to_origin(slurm_msg_t *msg, char *src_job_id_str,
uint32_t src_job_id)
{
xassert(msg);
/* route msg to origin cluster if a federated job */
if (!msg->conn && fed_mgr_fed_rec) {
/* Don't send reroute if coming from a federated cluster (aka
* has a msg->conn). */
uint32_t job_id, origin_id;
if (src_job_id_str)
job_id = strtol(src_job_id_str, NULL, 10);
else
job_id = src_job_id;
origin_id = fed_mgr_get_cluster_id(job_id);
if (origin_id && (origin_id != fed_mgr_cluster_rec->fed.id)) {
slurmdb_cluster_rec_t *dst =
fed_mgr_get_cluster_by_id(origin_id);
if (!dst) {
error("couldn't find cluster by cluster id %d",
origin_id);
slurm_send_rc_msg(msg, SLURM_ERROR);
} else {
slurm_send_reroute_msg(msg, dst, NULL);
info("%s: %s JobId=%u uid %u routed to %s",
__func__, rpc_num2string(msg->msg_type),
job_id, msg->auth_uid, dst->name);
}
return SLURM_SUCCESS;
}
}
return SLURM_ERROR;
}
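/* _slurm_rpc_set_fs_dampening_factor - update FairShareDampeningFactor in
 *	the running configuration (super user only) */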
static void _slurm_rpc_set_fs_dampening_factor(slurm_msg_t *msg)
{
slurmctld_lock_t config_write_lock =
{ WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK, READ_LOCK };
set_fs_dampening_factor_msg_t *request_msg = msg->data;
uint16_t factor;
if (!validate_super_user(msg->auth_uid)) {
error("set FairShareDampeningFactor request from non-super user uid=%u",
msg->auth_uid);
slurm_send_rc_msg(msg, EACCES);
return;
}
factor = request_msg->dampening_factor;
lock_slurmctld(config_write_lock);
slurm_conf.fs_dampening_factor = factor;
slurm_conf.last_update = time(NULL);
priority_g_reconfig(false);
unlock_slurmctld(config_write_lock);
info("Set FairShareDampeningFactor to %u", factor);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
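/* _slurm_rpc_request_crontab - return a user's saved crontab and its
 *	disabled lines from the state save location */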
static void _slurm_rpc_request_crontab(slurm_msg_t *msg)
{
DEF_TIMERS;
int rc = SLURM_SUCCESS;
crontab_request_msg_t *req_msg = msg->data;
buf_t *crontab = NULL;
char *disabled_lines = NULL;
slurmctld_lock_t job_read_lock = { .job = READ_LOCK };
START_TIMER;
debug3("Processing RPC details: REQUEST_CRONTAB for uid=%u",
req_msg->uid);
if (!xstrcasestr(slurm_conf.scron_params, "enable")) {
error("%s: scrontab is disabled on this cluster", __func__);
slurm_send_rc_msg(msg, SLURM_ERROR);
return;
}
lock_slurmctld(job_read_lock);
if ((req_msg->uid != msg->auth_uid) &&
!validate_operator(msg->auth_uid)) {
rc = ESLURM_USER_ID_MISSING;
} else {
char *file = NULL;
xstrfmtcat(file, "%s/crontab/crontab.%u",
slurm_conf.state_save_location, req_msg->uid);
if (!(crontab = create_mmap_buf(file)))
rc = ESLURM_JOB_SCRIPT_MISSING;
else {
int len = strlen(crontab->head) + 1;
disabled_lines = xstrndup(crontab->head + len,
crontab->size - len);
			/*
			 * Remove the extra trailing separator, which would
			 * otherwise be parsed as an extraneous 0.
			 */
			if (disabled_lines && disabled_lines[0]) {
len = strlen(disabled_lines) - 1;
disabled_lines[len] = '\0';
}
}
xfree(file);
}
unlock_slurmctld(job_read_lock);
END_TIMER2(__func__);
if (rc != SLURM_SUCCESS) {
slurm_send_rc_msg(msg, rc);
} else {
crontab_response_msg_t resp_msg = {
.crontab = crontab->head,
.disabled_lines = disabled_lines,
};
(void) send_msg_response(msg, RESPONSE_CRONTAB, &resp_msg);
FREE_NULL_BUFFER(crontab);
xfree(disabled_lines);
}
}
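/* _slurm_rpc_update_crontab - validate and install an updated crontab for a
 *	user and submit the resulting scron jobs */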
static void _slurm_rpc_update_crontab(slurm_msg_t *msg)
{
DEF_TIMERS;
crontab_update_request_msg_t *req_msg = msg->data;
crontab_update_response_msg_t *resp_msg;
/* probably need to mirror _slurm_rpc_dump_batch_script() */
slurmctld_lock_t job_write_lock =
{ READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
START_TIMER;
debug3("Processing RPC details: REQUEST_UPDATE_CRONTAB for uid=%u",
req_msg->uid);
if (!xstrcasestr(slurm_conf.scron_params, "enable")) {
error("%s: scrontab is disabled on this cluster", __func__);
slurm_send_rc_msg(msg, SLURM_ERROR);
return;
}
resp_msg = xmalloc(sizeof(*resp_msg));
resp_msg->err_msg = NULL;
resp_msg->job_submit_user_msg = NULL;
resp_msg->failed_lines = NULL;
resp_msg->return_code = SLURM_SUCCESS;
lock_slurmctld(job_write_lock);
if (((req_msg->uid != msg->auth_uid) ||
(req_msg->gid != msg->auth_gid)) &&
!validate_slurm_user(msg->auth_uid)) {
resp_msg->return_code = ESLURM_USER_ID_MISSING;
}
if (!resp_msg->return_code) {
char *alloc_node = NULL;
void *id = NULL;
_set_hostname(msg, &alloc_node);
_set_identity(msg, &id);
if (!alloc_node || (alloc_node[0] == '\0'))
resp_msg->return_code = ESLURM_INVALID_NODE_NAME;
else
crontab_submit(req_msg, resp_msg, alloc_node, id,
msg->protocol_version);
xfree(alloc_node);
FREE_NULL_IDENTITY(id);
}
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
(void) send_msg_response(msg, RESPONSE_UPDATE_CRONTAB, resp_msg);
slurm_free_crontab_update_response_msg(resp_msg);
}
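/* _slurm_rpc_node_alias_addrs - return the slurm_addr_t of every node in the
 *	requested node list */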
static void _slurm_rpc_node_alias_addrs(slurm_msg_t *msg)
{
DEF_TIMERS;
char *node_list = ((slurm_node_alias_addrs_t *)msg->data)->node_list;
slurm_node_alias_addrs_t alias_addrs = {0};
hostlist_t *hl;
bitstr_t *node_bitmap = NULL;
node_record_t *node_ptr;
slurmctld_lock_t node_read_lock = {
.node = READ_LOCK,
};
START_TIMER;
debug3("Processing RPC details: REQUEST_NODE_ALIAS_ADDRS");
lock_slurmctld(node_read_lock);
if (!(hl = hostlist_create(node_list))) {
error("hostlist_create error for %s: %m",
node_list);
goto end_it;
}
hostlist2bitmap(hl, true, &node_bitmap);
FREE_NULL_HOSTLIST(hl);
if (bit_ffs(node_bitmap) != -1) {
int addr_index = 0;
alias_addrs.node_list = bitmap2node_name_sortable(node_bitmap,
false);
alias_addrs.node_cnt = bit_set_count(node_bitmap);
alias_addrs.node_addrs = xcalloc(alias_addrs.node_cnt,
sizeof(slurm_addr_t));
for (int i = 0; (node_ptr = next_node_bitmap(node_bitmap, &i));
i++) {
slurm_conf_get_addr(
node_ptr->name,
&alias_addrs.node_addrs[addr_index++], 0);
}
}
end_it:
unlock_slurmctld(node_read_lock);
END_TIMER2(__func__);
if (alias_addrs.node_addrs) {
(void) send_msg_response(msg, RESPONSE_NODE_ALIAS_ADDRS, &alias_addrs);
} else {
slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
}
xfree(alias_addrs.node_addrs);
xfree(alias_addrs.node_list);
FREE_NULL_BITMAP(node_bitmap);
}
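/* _slurm_rpc_dbd_relay - relay a persistent accounting message from slurmd
 *	to the accounting storage plugin */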
static void _slurm_rpc_dbd_relay(slurm_msg_t *msg)
{
DEF_TIMERS;
int rc;
persist_msg_t *persist_msg = msg->data;
START_TIMER;
debug3("Processing RPC details: REQUEST_DBD_RELAY");
if (!validate_slurmd_user(msg->auth_uid)) {
error("Security violation, %s RPC from uid=%u",
rpc_num2string(msg->msg_type), msg->auth_uid);
return;
}
rc = acct_storage_g_relay_msg(acct_db_conn, persist_msg);
END_TIMER2(__func__);
slurm_send_rc_msg(msg, rc);
}
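/* Dispatch table mapping RPC message types to their handler functions.
 * Entries with queue_enabled set are deferred to the RPC queue and processed
 * while holding the listed locks. The zeroed entry terminates the table. */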
slurmctld_rpc_t slurmctld_rpcs[] =
{
{
.msg_type = REQUEST_RESOURCE_ALLOCATION,
.func = _slurm_rpc_allocate_resources,
},{
.msg_type = REQUEST_HET_JOB_ALLOCATION,
.func = _slurm_rpc_allocate_het_job,
},{
.msg_type = REQUEST_BUILD_INFO,
.func = _slurm_rpc_dump_conf,
},{
.msg_type = REQUEST_JOB_INFO,
.func = _slurm_rpc_dump_jobs,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = READ_LOCK,
.part = READ_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_JOB_STATE,
.func = _slurm_rpc_job_state,
},{
.msg_type = REQUEST_JOB_USER_INFO,
.func = _slurm_rpc_dump_jobs_user,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = READ_LOCK,
.part = READ_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_JOB_INFO_SINGLE,
.func = _slurm_rpc_dump_job_single,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = READ_LOCK,
.part = READ_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_HOSTLIST_EXPANSION,
.func = _slurm_rpc_hostlist_expansion,
.queue_enabled = true,
.locks = {
.node = READ_LOCK,
},
},{
.msg_type = REQUEST_BATCH_SCRIPT,
.func = _slurm_rpc_dump_batch_script,
},{
.msg_type = REQUEST_SHARE_INFO,
.func = _slurm_rpc_get_shares,
},{
.msg_type = REQUEST_PRIORITY_FACTORS,
.func = _slurm_rpc_get_priority_factors,
},{
.msg_type = REQUEST_JOB_END_TIME,
.func = _slurm_rpc_end_time,
},{
.msg_type = REQUEST_FED_INFO,
.func = _slurm_rpc_get_fed,
.queue_enabled = true,
.locks = {
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_NODE_INFO,
.func = _slurm_rpc_dump_nodes,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.node = WRITE_LOCK,
.part = READ_LOCK,
},
},{
.msg_type = REQUEST_NODE_INFO_SINGLE,
.func = _slurm_rpc_dump_node_single,
},{
.msg_type = REQUEST_PARTITION_INFO,
.func = _slurm_rpc_dump_partitions,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.part = READ_LOCK,
},
},{
.msg_type = MESSAGE_EPILOG_COMPLETE,
.max_per_cycle = 256,
.func = _slurm_rpc_epilog_complete,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = WRITE_LOCK,
.node = WRITE_LOCK,
},
},{
.msg_type = REQUEST_CANCEL_JOB_STEP,
.func = _slurm_rpc_job_step_kill,
},{
.msg_type = REQUEST_COMPLETE_JOB_ALLOCATION,
.func = _slurm_rpc_complete_job_allocation,
},{
.msg_type = REQUEST_COMPLETE_PROLOG,
.func = _slurm_rpc_complete_prolog,
.queue_enabled = true,
.locks = {
.job = WRITE_LOCK,
},
},{
.msg_type = REQUEST_COMPLETE_BATCH_SCRIPT,
.max_per_cycle = 256,
.func = _slurm_rpc_complete_batch_script,
.queue_enabled = true,
.locks = {
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_JOB_STEP_CREATE,
.func = _slurm_rpc_job_step_create,
.skip_stale = true,
.queue_enabled = true,
.locks = {
.job = WRITE_LOCK,
.node = READ_LOCK,
},
},{
.msg_type = REQUEST_JOB_STEP_INFO,
.func = _slurm_rpc_job_step_get_info,
},{
.msg_type = REQUEST_JOB_WILL_RUN,
.func = _slurm_rpc_job_will_run,
},{
.msg_type = REQUEST_SIB_JOB_LOCK,
.func = _slurm_rpc_sib_job_lock,
},{
.msg_type = REQUEST_SIB_JOB_UNLOCK,
.func = _slurm_rpc_sib_job_unlock,
},{
.msg_type = REQUEST_CTLD_MULT_MSG,
.func = _proc_multi_msg,
},{
.msg_type = MESSAGE_NODE_REGISTRATION_STATUS,
.func = _slurm_rpc_node_registration,
.post_func = _slurm_post_rpc_node_registration,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.part = WRITE_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_JOB_ALLOCATION_INFO,
.func = _slurm_rpc_job_alloc_info,
},{
.msg_type = REQUEST_HET_JOB_ALLOC_INFO,
.func = _slurm_rpc_het_job_alloc_info,
.skip_stale = true,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = READ_LOCK,
.node = READ_LOCK,
.part = NO_LOCK,
},
},{
.msg_type = REQUEST_JOB_SBCAST_CRED,
.func = _slurm_rpc_job_sbcast_cred,
},{
.msg_type = REQUEST_SBCAST_CRED_NO_JOB,
.func = _slurm_rpc_sbcast_cred_no_job,
},{
.msg_type = REQUEST_PING,
.func = _slurm_rpc_ping,
},{
.msg_type = REQUEST_RECONFIGURE,
.func = _slurm_rpc_reconfigure_controller,
.keep_msg = true,
},{
.msg_type = REQUEST_CONTROL,
.func = _slurm_rpc_request_control,
},{
.msg_type = REQUEST_TAKEOVER,
.func = _slurm_rpc_takeover,
},{
.msg_type = REQUEST_SHUTDOWN,
.func = _slurm_rpc_shutdown_controller,
},{
.msg_type = REQUEST_SUBMIT_BATCH_JOB,
.max_per_cycle = 256,
.func = _slurm_rpc_submit_batch_job,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.part = READ_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_SUBMIT_BATCH_HET_JOB,
.func = _slurm_rpc_submit_batch_het_job,
},{
.msg_type = REQUEST_UPDATE_JOB,
.func = _slurm_rpc_update_job,
},{
.msg_type = REQUEST_CREATE_NODE,
.func = _slurm_rpc_create_node,
},{
.msg_type = REQUEST_UPDATE_NODE,
.func = _slurm_rpc_update_node,
},{
.msg_type = REQUEST_DELETE_NODE,
.func = _slurm_rpc_delete_node,
},{
.msg_type = REQUEST_CREATE_PARTITION,
.func = _slurm_rpc_update_partition,
},{
.msg_type = REQUEST_UPDATE_PARTITION,
.func = _slurm_rpc_update_partition,
},{
.msg_type = REQUEST_DELETE_PARTITION,
.func = _slurm_rpc_delete_partition,
},{
.msg_type = REQUEST_CREATE_RESERVATION,
.func = _slurm_rpc_resv_create,
},{
.msg_type = REQUEST_UPDATE_RESERVATION,
.func = _slurm_rpc_resv_update,
},{
.msg_type = REQUEST_DELETE_RESERVATION,
.func = _slurm_rpc_resv_delete,
},{
.msg_type = REQUEST_RESERVATION_INFO,
.func = _slurm_rpc_resv_show,
},{
.msg_type = REQUEST_NODE_REGISTRATION_STATUS,
.func = _slurm_rpc_node_registration_status,
},{
.msg_type = REQUEST_SUSPEND,
.func = _slurm_rpc_suspend,
},{
.msg_type = REQUEST_TOP_JOB,
.func = _slurm_rpc_top_job,
},{
.msg_type = REQUEST_AUTH_TOKEN,
.func = _slurm_rpc_auth_token,
},{
.msg_type = REQUEST_JOB_REQUEUE,
.func = _slurm_rpc_requeue,
},{
.msg_type = REQUEST_JOB_READY,
.func = _slurm_rpc_job_ready,
},{
.msg_type = REQUEST_BURST_BUFFER_INFO,
.func = _slurm_rpc_burst_buffer_info,
},{
.msg_type = REQUEST_STEP_BY_CONTAINER_ID,
.func = _slurm_rpc_step_by_container_id,
},{
.msg_type = REQUEST_STEP_COMPLETE,
.max_per_cycle = 256,
.func = _slurm_rpc_step_complete,
.queue_enabled = true,
.locks = {
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_STEP_LAYOUT,
.func = _slurm_rpc_step_layout,
},{
.msg_type = REQUEST_UPDATE_JOB_STEP,
.func = _slurm_rpc_step_update,
},{
.msg_type = REQUEST_CONFIG,
.func = _slurm_rpc_config_request,
},{
.msg_type = REQUEST_TRIGGER_SET,
.func = _slurm_rpc_trigger_set,
},{
.msg_type = REQUEST_TRIGGER_GET,
.func = _slurm_rpc_trigger_get,
},{
.msg_type = REQUEST_TRIGGER_CLEAR,
.func = _slurm_rpc_trigger_clear,
},{
.msg_type = REQUEST_TRIGGER_PULL,
.func = _slurm_rpc_trigger_pull,
},{
.msg_type = REQUEST_JOB_NOTIFY,
.func = _slurm_rpc_job_notify,
},{
.msg_type = REQUEST_SET_DEBUG_FLAGS,
.func = _slurm_rpc_set_debug_flags,
},{
.msg_type = REQUEST_SET_DEBUG_LEVEL,
.func = _slurm_rpc_set_debug_level,
},{
.msg_type = REQUEST_SET_SCHEDLOG_LEVEL,
.func = _slurm_rpc_set_schedlog_level,
},{
.msg_type = REQUEST_SET_SUSPEND_EXC_NODES,
.func = _slurm_rpc_set_suspend_exc_nodes,
},{
.msg_type = REQUEST_SET_SUSPEND_EXC_PARTS,
.func = _slurm_rpc_set_suspend_exc_parts,
},{
.msg_type = REQUEST_SET_SUSPEND_EXC_STATES,
.func = _slurm_rpc_set_suspend_exc_states,
},{
.msg_type = ACCOUNTING_UPDATE_MSG,
.func = _slurm_rpc_accounting_update_msg,
},{
.msg_type = ACCOUNTING_FIRST_REG,
.func = _slurm_rpc_accounting_first_reg,
},{
.msg_type = ACCOUNTING_REGISTER_CTLD,
.func = _slurm_rpc_accounting_register_ctld,
},{
.msg_type = REQUEST_TOPO_CONFIG,
.func = _slurm_rpc_get_topo_config,
},{
.msg_type = REQUEST_TOPO_INFO,
.func = _slurm_rpc_get_topo,
},{
.msg_type = REQUEST_REBOOT_NODES,
.func = _slurm_rpc_reboot_nodes,
},{
.msg_type = REQUEST_STATS_INFO,
.func = _slurm_rpc_dump_stats,
},{
.msg_type = REQUEST_LICENSE_INFO,
.func = _slurm_rpc_dump_licenses,
},{
.msg_type = REQUEST_KILL_JOB,
.max_per_cycle = 256,
.func = _slurm_rpc_kill_job,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_KILL_JOBS,
.max_per_cycle = 256,
.func = _slurm_rpc_kill_jobs,
.queue_enabled = true,
.locks = {
.conf = READ_LOCK,
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.fed = READ_LOCK,
},
},{
.msg_type = REQUEST_ASSOC_MGR_INFO,
.func = _slurm_rpc_assoc_mgr_info,
},{
.msg_type = REQUEST_PERSIST_INIT,
.func = _slurm_rpc_persist_init,
},{
.msg_type = REQUEST_SET_FS_DAMPENING_FACTOR,
.func = _slurm_rpc_set_fs_dampening_factor,
},{
.msg_type = REQUEST_CONTROL_STATUS,
.func = slurm_rpc_control_status,
},{
.msg_type = REQUEST_BURST_BUFFER_STATUS,
.func = _slurm_rpc_burst_buffer_status,
},{
.msg_type = REQUEST_CRONTAB,
.func = _slurm_rpc_request_crontab,
},{
.msg_type = REQUEST_UPDATE_CRONTAB,
.func = _slurm_rpc_update_crontab,
},{
.msg_type = REQUEST_TLS_CERT,
.func = _slurm_rpc_tls_cert,
},{
.msg_type = REQUEST_NODE_ALIAS_ADDRS,
.func = _slurm_rpc_node_alias_addrs,
},{
.msg_type = REQUEST_DBD_RELAY,
.func = _slurm_rpc_dbd_relay,
},{ /* terminate the array. this must be last. */
.msg_type = 0,
.func = NULL,
}
};
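/* find_rpc - return the slurmctld_rpcs[] entry matching msg_type, or NULL if
 *	the message type is not handled here */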
extern slurmctld_rpc_t *find_rpc(uint16_t msg_type)
{
for (slurmctld_rpc_t *q = slurmctld_rpcs; q->msg_type; q++) {
if (q->msg_type == msg_type) {
xassert(q->func);
return q;
}
}
return NULL;
}
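/* slurmctld_req - process an incoming RPC: verify its authentication was
 *	established, optionally log it, discard it if the connection is stale
 *	and the RPC is flagged skip_stale, then dispatch to the handler and
 *	record RPC statistics */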
extern void slurmctld_req(slurm_msg_t *msg, slurmctld_rpc_t *this_rpc)
{
DEF_TIMERS;
int fd = conn_g_get_fd(msg->tls_conn);
fd_set_nonblocking(fd);
if (!msg->auth_ids_set) {
error("%s: received message without previously validated auth",
__func__);
return;
}
	/* Debug the protocol layer */
START_TIMER;
if (slurm_conf.debug_flags & DEBUG_FLAG_PROTOCOL) {
const char *p = rpc_num2string(msg->msg_type);
if (msg->conn) {
info("%s: received opcode %s from persist conn on (%s)%s uid %u",
__func__, p, msg->conn->cluster_name,
msg->conn->rem_host, msg->auth_uid);
} else if (msg->address.ss_family != AF_UNSPEC) {
info("%s: received opcode %s from %pA uid %u",
__func__, p, &msg->address, msg->auth_uid);
} else {
slurm_addr_t cli_addr;
(void) slurm_get_peer_addr(fd, &cli_addr);
info("%s: received opcode %s from %pA uid %u",
__func__, p, &cli_addr, msg->auth_uid);
}
}
debug2("Processing RPC: %s from UID=%u",
rpc_num2string(msg->msg_type), msg->auth_uid);
if (this_rpc->skip_stale && !fd_is_writable(fd)) {
error("Connection is stale, discarding RPC %s",
rpc_num2string(msg->msg_type));
/* do not record RPC stats, we didn't process this */
return;
}
(*(this_rpc->func))(msg);
END_TIMER;
record_rpc_stats(msg, DELTA_TIMER);
}
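/* _srun_agent_launch - queue an agent request to deliver a single message of
 *	the given type to srun on the specified host, capping the protocol
 *	version at what this cluster supports */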
static void _srun_agent_launch(slurm_addr_t *addr, char *tls_cert, char *host,
slurm_msg_type_t type, void *msg_args,
uid_t r_uid, uint16_t protocol_version)
{
agent_arg_t *agent_args = xmalloc(sizeof(agent_arg_t));
agent_args->node_count = 1;
agent_args->retry = 0;
agent_args->addr = addr;
agent_args->hostlist = hostlist_create(host);
agent_args->msg_type = type;
agent_args->msg_args = msg_args;
agent_args->tls_cert = xstrdup(tls_cert);
set_agent_arg_r_uid(agent_args, r_uid);
	/*
	 * A federated job could have been submitted to a higher versioned
	 * origin cluster (job_ptr->start_protocol_ver), so cap the protocol
	 * at the highest version that THIS cluster understands.
	 */
agent_args->protocol_version = MIN(SLURM_PROTOCOL_VERSION,
protocol_version);
agent_queue_request(agent_args);
}
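/* _pending_het_jobs - return true if any component of job_ptr's hetjob is
 *	still pending */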
static bool _pending_het_jobs(job_record_t *job_ptr)
{
job_record_t *het_job_leader, *het_job;
list_itr_t *iter;
bool pending_job = false;
if (job_ptr->het_job_id == 0)
return false;
het_job_leader = stepmgr_ops->find_job_record(job_ptr->het_job_id);
if (!het_job_leader) {
error("Hetjob leader %pJ not found", job_ptr);
return false;
}
if (!het_job_leader->het_job_list) {
error("Hetjob leader %pJ lacks het_job_list",
job_ptr);
return false;
}
iter = list_iterator_create(het_job_leader->het_job_list);
while ((het_job = list_next(iter))) {
if (het_job_leader->het_job_id != het_job->het_job_id) {
error("%s: Bad het_job_list for %pJ",
__func__, het_job_leader);
continue;
}
if (IS_JOB_PENDING(het_job)) {
pending_job = true;
break;
}
}
list_iterator_destroy(iter);
return pending_job;
}
static void _free_srun_alloc(void *x)
{
resource_allocation_response_msg_t *alloc_msg;
alloc_msg = (resource_allocation_response_msg_t *) x;
/* NULL working_cluster_rec because it's pointing to global memory */
alloc_msg->working_cluster_rec = NULL;
slurm_free_resource_allocation_response_msg(alloc_msg);
}
/*
* srun_allocate - notify srun of a resource allocation
* IN job_ptr - job allocated resources
*/
extern void srun_allocate(job_record_t *job_ptr)
{
job_record_t *het_job, *het_job_leader;
resource_allocation_response_msg_t *msg_arg = NULL;
slurm_addr_t *addr;
list_itr_t *iter;
list_t *job_resp_list = NULL;
xassert(job_ptr);
if (!job_ptr || !job_ptr->alloc_resp_port || !job_ptr->alloc_node ||
!job_ptr->resp_host || !job_ptr->job_resrcs ||
!job_ptr->job_resrcs->cpu_array_cnt)
return;
if (tls_enabled() && !job_ptr->alloc_tls_cert)
return;
if (job_ptr->het_job_id == 0) {
addr = xmalloc(sizeof(slurm_addr_t));
slurm_set_addr(addr, job_ptr->alloc_resp_port,
job_ptr->resp_host);
msg_arg = build_alloc_msg(job_ptr, SLURM_SUCCESS, NULL);
log_flag(TLS, "Certificate for allocation response listening socket:\n%s\n",
job_ptr->alloc_tls_cert);
_srun_agent_launch(addr, job_ptr->alloc_tls_cert,
job_ptr->alloc_node,
RESPONSE_RESOURCE_ALLOCATION, msg_arg,
job_ptr->user_id,
job_ptr->start_protocol_ver);
} else if (_pending_het_jobs(job_ptr)) {
return;
} else if ((het_job_leader =
stepmgr_ops->find_job_record(job_ptr->het_job_id))) {
addr = xmalloc(sizeof(slurm_addr_t));
slurm_set_addr(addr, het_job_leader->alloc_resp_port,
het_job_leader->resp_host);
job_resp_list = list_create(_free_srun_alloc);
iter = list_iterator_create(het_job_leader->het_job_list);
while ((het_job = list_next(iter))) {
if (het_job_leader->het_job_id !=
het_job->het_job_id) {
error("%s: Bad het_job_list for %pJ",
__func__, het_job_leader);
continue;
}
msg_arg = build_alloc_msg(het_job, SLURM_SUCCESS,
NULL);
list_append(job_resp_list, msg_arg);
msg_arg = NULL;
}
list_iterator_destroy(iter);
_srun_agent_launch(addr, job_ptr->alloc_tls_cert,
job_ptr->alloc_node,
RESPONSE_HET_JOB_ALLOCATION, job_resp_list,
job_ptr->user_id,
job_ptr->start_protocol_ver);
} else {
error("%s: Can not find hetjob leader %pJ",
__func__, job_ptr);
}
}