/*****************************************************************************\
* controller.c - main control machine daemon for slurm
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2010 Lawrence Livermore National Security.
* Copyright (C) SchedMD LLC.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette1@llnl.gov>, Kevin Tew <tew1@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#if HAVE_SYS_PRCTL_H
# include <sys/prctl.h>
#endif
#include <errno.h>
#include <getopt.h>
#include <grp.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <unistd.h>
#include "slurm/slurm_errno.h"
#include "src/common/assoc_mgr.h"
#include "src/common/daemonize.h"
#include "src/common/extra_constraints.h"
#include "src/common/fd.h"
#include "src/common/group_cache.h"
#include "src/common/hostlist.h"
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/pack.h"
#include "src/common/port_mgr.h"
#include "src/common/proc_args.h"
#include "src/common/read_config.h"
#include "src/common/ref.h"
#include "src/common/run_command.h"
#include "src/common/sluid.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_socket.h"
#include "src/common/slurm_rlimits_info.h"
#include "src/common/state_save.h"
#include "src/common/timers.h"
#include "src/common/track_script.h"
#include "src/common/uid.h"
#include "src/common/util-net.h"
#include "src/common/xstring.h"
#include "src/common/xsystemd.h"
#include "src/conmgr/conmgr.h"
#include "src/interfaces/accounting_storage.h"
#include "src/interfaces/acct_gather_profile.h"
#include "src/interfaces/auth.h"
#include "src/interfaces/burst_buffer.h"
#include "src/interfaces/certmgr.h"
#include "src/interfaces/cgroup.h"
#include "src/interfaces/conn.h"
#include "src/interfaces/gres.h"
#include "src/interfaces/hash.h"
#include "src/interfaces/job_submit.h"
#include "src/interfaces/jobacct_gather.h"
#include "src/interfaces/jobcomp.h"
#include "src/interfaces/mcs.h"
#include "src/interfaces/mpi.h"
#include "src/interfaces/node_features.h"
#include "src/interfaces/preempt.h"
#include "src/interfaces/prep.h"
#include "src/interfaces/priority.h"
#include "src/interfaces/sched_plugin.h"
#include "src/interfaces/select.h"
#include "src/interfaces/serializer.h"
#include "src/interfaces/site_factor.h"
#include "src/interfaces/switch.h"
#include "src/interfaces/topology.h"
#include "src/slurmctld/acct_policy.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/fed_mgr.h"
#include "src/slurmctld/gang.h"
#include "src/slurmctld/heartbeat.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/licenses.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/ping_nodes.h"
#include "src/slurmctld/power_save.h"
#include "src/slurmctld/proc_req.h"
#include "src/slurmctld/rate_limit.h"
#include "src/slurmctld/read_config.h"
#include "src/slurmctld/reservation.h"
#include "src/slurmctld/rpc_queue.h"
#include "src/slurmctld/sackd_mgr.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/slurmscriptd.h"
#include "src/slurmctld/state_save.h"
#include "src/slurmctld/trigger_mgr.h"
#include "src/stepmgr/srun_comm.h"
#include "src/stepmgr/stepmgr.h"
decl_static_data(usage_txt);
#define SLURMCTLD_CONMGR_DEFAULT_MAX_CONNECTIONS 50
#define MIN_CHECKIN_TIME 3 /* Nodes have this number of seconds to
* check-in before we ping them */
#define SHUTDOWN_WAIT 2 /* Time to wait for backup server shutdown */
#define JOB_COUNT_INTERVAL 30 /* Time to update running job count */
#define DEV_TTY_PATH "/dev/tty"
#define DEV_NULL_PATH "/dev/null"
/**************************************************************************\
* To test for memory leaks, set MEMORY_LEAK_DEBUG to 1 using
* "configure --enable-memory-leak-debug" then execute
*
* $ valgrind --tool=memcheck --leak-check=yes --num-callers=40 \
* --leak-resolution=high ./slurmctld -Dc >valg.ctld.out 2>&1
*
* Then exercise the slurmctld functionality before executing
* > scontrol shutdown
*
* Note that --enable-memory-leak-debug will cause the daemon to
* unload the shared objects at exit, which prevents valgrind
* from displaying the stack where the eventual leaks may be.
* It is always best to test with and without --enable-memory-leak-debug.
*
* On some systems _keyvalue_regex_init() will generate two blocks "definitely
* lost", both of size zero.
* On some systems dlopen() will generate a small number of "definitely
* lost" blocks that are not cleared by dlclose().
* On some systems, pthread_create() will generate a small number of
* "possibly lost" blocks.
* Otherwise the report should be free of errors. Remember to reset
* MEMORY_LEAK_DEBUG to 0 for production use (non-seamless backup
* controller use).
\**************************************************************************/
uint32_t slurm_daemon = IS_SLURMCTLD;
/* Log to stderr and syslog until this process becomes a daemon */
log_options_t log_opts = LOG_OPTS_INITIALIZER;
/* Scheduler Log options */
log_options_t sched_log_opts = SCHEDLOG_OPTS_INITIALIZER;
/* Global variables */
bool preempt_send_user_signal = false;
uint16_t accounting_enforce = 0;
void * acct_db_conn = NULL;
int backup_inx;
int batch_sched_delay = 3;
bool cloud_dns = false;
uint32_t cluster_cpus = 0;
time_t control_time = 0;
bool disable_remote_singleton = false;
int max_depend_depth = 10;
time_t last_proc_req_start = 0;
uint32_t max_powered_nodes = NO_VAL;
bool ping_nodes_now = false;
pthread_cond_t purge_thread_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t purge_thread_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t check_bf_running_lock = PTHREAD_MUTEX_INITIALIZER;
int sched_interval = 60;
slurmctld_config_t slurmctld_config = {0};
diag_stats_t slurmctld_diag_stats;
bool slurmctld_primary = true;
bool want_nodes_reboot = true;
int slurmctld_tres_cnt = 0;
slurmdb_cluster_rec_t *response_cluster_rec = NULL;
uint16_t running_cache = RUNNING_CACHE_STATE_NOTRUNNING;
pthread_mutex_t assoc_cache_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t assoc_cache_cond = PTHREAD_COND_INITIALIZER;
/* Local variables */
static pthread_t assoc_cache_thread = (pthread_t) 0;
static char binary[PATH_MAX];
static int bu_rc = SLURM_SUCCESS;
static int bu_thread_cnt = 0;
static pthread_cond_t bu_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t bu_mutex = PTHREAD_MUTEX_INITIALIZER;
static bool daemonize = true;
static bool setwd = false;
static int debug_level = 0;
static char * debug_logfile = NULL;
static bool dump_core = false;
static int job_sched_cnt = 0;
static int main_argc = 0;
static char **main_argv = NULL;
static uint32_t max_server_threads = MAX_SERVER_THREADS;
static time_t next_stats_reset = 0;
static int new_nice = 0;
static bool original = true;
static int pidfd = -1;
/*
* 0 = use no saved state information
* 1 = recover saved job state,
* node DOWN/DRAIN state & reason information
* 2 = recover state saved from last shutdown
*/
static int recover = 1;
static pthread_mutex_t sched_cnt_mutex = PTHREAD_MUTEX_INITIALIZER;
static char * slurm_conf_filename;
static int reconfig_rc = SLURM_SUCCESS;
static bool reconfig = false;
static list_t *reconfig_reqs = NULL;
static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t shutdown_cond = PTHREAD_COND_INITIALIZER;
static bool under_systemd = false;
/* Array of listening sockets */
static struct {
pthread_mutex_t mutex;
int count;
int *fd;
conmgr_fd_t **cons;
bool standby_mode;
bool quiesced;
} listeners = {
.mutex = PTHREAD_MUTEX_INITIALIZER,
.quiesced = true,
};
static int _accounting_cluster_ready();
static int _accounting_mark_all_nodes_down(char *reason);
static void * _assoc_cache_mgr(void *no_data);
static int _controller_index(void);
static void _create_clustername_file(void);
static void _flush_rpcs(void);
static void _get_fed_updates();
static void _init_config(void);
static void _init_pidfile(void);
static int _init_tres(void);
static void _kill_old_slurmctld(void);
static void _open_ports(void);
static void _parse_commandline(int argc, char **argv);
static void _post_reconfig(void);
static void * _purge_files_thread(void *no_data);
static void *_acct_update_thread(void *no_data);
static void _remove_assoc(slurmdb_assoc_rec_t *rec);
static void _remove_qos(slurmdb_qos_rec_t *rec);
static void _restore_job_dependencies(void);
static void _run_primary_prog(bool primary_on);
static void _send_future_cloud_to_db();
static void _service_connection(conmgr_callback_args_t conmgr_args,
int input_fd, int output_fd, void *tls_conn,
void *arg);
static void _set_work_dir(void);
static int _shutdown_backup_controller(void);
static void * _slurmctld_background(void *no_data);
static void _test_thread_limit(void);
static int _try_to_reconfig(void);
static void _update_assoc(slurmdb_assoc_rec_t *rec);
static void _update_diag_job_state_counts(void);
static void _update_cluster_tres(void);
static void _update_nice(void);
static void _update_pidfile(void);
static void _update_qos(slurmdb_qos_rec_t *rec);
static void _usage(void);
static void _verify_clustername(void);
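/*
 * Send the reconfigure result to every queued reconfigure request and
 * release the associated message and connection.
 */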
static void _send_reconfig_replies(void)
{
slurm_msg_t *msg = NULL;
while ((msg = list_pop(reconfig_reqs))) {
/* Must avoid sending reply via msg->conmgr_con */
xassert(!msg->conmgr_con);
(void) slurm_send_rc_msg(msg, reconfig_rc);
conn_g_destroy(msg->tls_conn, true);
slurm_free_msg(msg);
}
}
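/*
 * Quiesce conmgr, attempt to re-exec a new slurmctld, then either hand
 * control to the new process (_exit()) or resume normal operation.
 */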
static void _attempt_reconfig(void)
{
info("Attempting to reconfigure");
/*
* Reconfigure requires all connections to be fully processed before
* continuing, as the file descriptors will be closed during fork() and
* the parent process will call _exit() instead of finishing their
* processing if the new slurmctld process starts successfully.
*/
conmgr_quiesce(__func__);
/*
* Send the RC to requesters now when in foreground mode, as slurmctld is
* about to call exec(), which will close the connections.
*/
if (!daemonize && !under_systemd)
_send_reconfig_replies();
reconfig_rc = _try_to_reconfig();
_send_reconfig_replies();
if (!reconfig_rc) {
info("Relinquishing control to new child");
_exit(0);
}
recover = 2;
/*
* Reconfigure failed, which means this process needs to resume
* processing connections.
*/
conmgr_unquiesce(__func__);
}
static void _on_sigint(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
info("Terminate signal SIGINT received");
slurmctld_shutdown();
}
static void _on_sigterm(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
info("Terminate signal SIGTERM received");
slurmctld_shutdown();
}
static void _on_sigchld(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
debug5("Caught SIGCHLD. Ignoring");
}
static void _on_sigquit(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
info("Terminate signal SIGQUIT received");
slurmctld_shutdown();
}
static void _on_sighup(conmgr_callback_args_t conmgr_args, void *arg)
{
bool standby_mode;
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
info("Reconfigure signal (SIGHUP) received");
slurm_mutex_lock(&listeners.mutex);
standby_mode = listeners.standby_mode;
slurm_mutex_unlock(&listeners.mutex);
if (standby_mode) {
backup_on_sighup();
return;
}
reconfig = true;
slurmctld_shutdown();
}
static void _on_sigusr1(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
debug5("Caught SIGUSR1. Ignoring.");
}
static void _on_sigusr2(conmgr_callback_args_t conmgr_args, void *arg)
{
static const slurmctld_lock_t conf_write_lock = {
.conf = WRITE_LOCK,
};
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
info("Logrotate signal (SIGUSR2) received");
lock_slurmctld(conf_write_lock);
update_logging();
if (slurmctld_primary)
slurmscriptd_update_log_level(slurm_conf.slurmctld_debug, true);
unlock_slurmctld(conf_write_lock);
/*
* This can happen when jobcomp hasn't been initialized yet, so call it
* here. It is a NOOP if it has already been initialized.
*/
if (slurmctld_primary && jobcomp_g_init() && jobcomp_g_set_location())
error("%s: JobComp set location operation failed on SIGUSR2",
__func__);
}
static void _on_sigpipe(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
debug5("Caught SIGPIPE. Ignoring.");
}
static void _on_sigxcpu(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
debug5("Caught SIGXCPU. Ignoring.");
}
static void _on_sigabrt(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
info("SIGABRT received");
slurmctld_shutdown();
dump_core = true;
}
static void _on_sigalrm(conmgr_callback_args_t conmgr_args, void *arg)
{
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
return;
debug5("Caught SIGALRM. Ignoring.");
}
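/* Register conmgr work for every signal handled by slurmctld */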
static void _register_signal_handlers(conmgr_callback_args_t conmgr_args,
void *arg)
{
conmgr_add_work_signal(SIGINT, _on_sigint, NULL);
conmgr_add_work_signal(SIGTERM, _on_sigterm, NULL);
conmgr_add_work_signal(SIGCHLD, _on_sigchld, NULL);
conmgr_add_work_signal(SIGQUIT, _on_sigquit, NULL);
conmgr_add_work_signal(SIGHUP, _on_sighup, NULL);
conmgr_add_work_signal(SIGUSR1, _on_sigusr1, NULL);
conmgr_add_work_signal(SIGUSR2, _on_sigusr2, NULL);
conmgr_add_work_signal(SIGPIPE, _on_sigpipe, NULL);
conmgr_add_work_signal(SIGXCPU, _on_sigxcpu, NULL);
conmgr_add_work_signal(SIGABRT, _on_sigabrt, NULL);
conmgr_add_work_signal(SIGALRM, _on_sigalrm, NULL);
}
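/*
 * Redirect stdin/stdout/stderr to /dev/null (and, on Linux when not
 * daemonizing, try to re-attach stdout/stderr to the controlling TTY).
 */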
static void _reopen_stdio(void)
{
int devnull = -1;
if ((devnull = open(DEV_NULL_PATH, O_RDWR)) < 0)
fatal_abort("Unable to open %s: %m", DEV_NULL_PATH);
dup2(devnull, STDIN_FILENO);
dup2(devnull, STDOUT_FILENO);
dup2(devnull, STDERR_FILENO);
if (devnull > STDERR_FILENO)
fd_close(&devnull);
#ifdef __linux__
if (isatty(STDOUT_FILENO) && !daemonize) {
int tty = -1;
if ((tty = open(DEV_TTY_PATH, O_WRONLY)) > 0 && isatty(tty)) {
dup2(tty, STDOUT_FILENO);
dup2(tty, STDERR_FILENO);
}
if (tty > STDERR_FILENO)
fd_close(&tty);
}
#endif /* __linux__ */
}
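/*
 * (Re-)open the connection to accounting storage, register this
 * slurmctld with it, and record/verify the cluster ID.
 */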
static void _init_db_conn(void)
{
int rc;
/*
* errno is used to get an error when establishing the persistent
* connection. Set errno to 0 here to avoid a previous value for
* errno causing us to think there was a problem.
* FIXME: Stop using errno for control flow here.
*/
errno = 0;
if (acct_db_conn)
acct_storage_g_close_connection(&acct_db_conn);
acct_db_conn = acct_storage_g_get_connection(
0, NULL, false, slurm_conf.cluster_name);
rc = clusteracct_storage_g_register_ctld(acct_db_conn,
slurm_conf.slurmctld_port);
if (rc & RC_AS_CLUSTER_ID) {
uint16_t id = rc & ~RC_AS_CLUSTER_ID;
if (slurm_conf.cluster_id && (id != slurm_conf.cluster_id)) {
fatal("CLUSTER ID MISMATCH.\n"
"slurmctld has been started with \"ClusterID=%u\" from the state files in StateSaveLocation, but the DBD thinks it should be \"%u\".\n"
"Running multiple clusters from a shared StateSaveLocation WILL CAUSE CORRUPTION.\n"
"Remove %s/clustername to override this safety check if this is intentional.",
slurm_conf.cluster_id, id,
slurm_conf.state_save_location);
} else if (!slurm_conf.cluster_id) {
slurm_conf.cluster_id = id;
_create_clustername_file();
} else {
clustername_existed = 1;
}
}
}
/*
* Retry connecting to the dbd and initializing assoc_mgr until success, or
* fatal on shutdown.
*/
static void _retry_init_db_conn(assoc_init_args_t *args)
{
while (true) {
struct timespec ts = timespec_now();
ts.tv_sec += 2;
slurm_mutex_lock(&shutdown_mutex);
slurm_cond_timedwait(&shutdown_cond, &shutdown_mutex, &ts);
slurm_mutex_unlock(&shutdown_mutex);
if (slurmctld_config.shutdown_time)
fatal("slurmdbd must be up at slurmctld start time");
error("Retrying initial connection to slurmdbd");
_init_db_conn();
if (!slurm_conf.cluster_id) {
error("Still don't know my ClusterID");
continue;
}
if (!assoc_mgr_init(acct_db_conn, args, errno))
break;
}
}
/* main - slurmctld main function, start various threads and process RPCs */
int main(int argc, char **argv)
{
int error_code;
struct timeval start, now;
struct stat stat_buf;
struct rlimit rlim;
/* Locks: Write configuration, job, node, and partition */
slurmctld_lock_t config_write_lock = {
WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
prep_callbacks_t prep_callbacks = {
.prolog_slurmctld = prep_prolog_slurmctld_callback,
.epilog_slurmctld = prep_epilog_slurmctld_callback,
};
bool backup_has_control = false;
bool slurmscriptd_mode = false;
char *conf_file;
stepmgr_ops_t stepmgr_ops = {0};
stepmgr_ops.agent_queue_request = agent_queue_request;
stepmgr_ops.find_job_array_rec = find_job_array_rec;
stepmgr_ops.find_job_record = find_job_record;
stepmgr_ops.job_config_fini = job_config_fini;
stepmgr_ops.last_job_update = &last_job_update;
stepmgr_init(&stepmgr_ops);
main_argc = argc;
main_argv = argv;
if (getenv("SLURMCTLD_RECONF"))
original = false;
if (getenv(SLURMSCRIPTD_MODE_ENV))
slurmscriptd_mode = true;
/*
* Make sure we have no extra open files which
* would be propagated to spawned tasks.
*/
if (original || slurmscriptd_mode) {
closeall(slurmscriptd_mode ? SLURMSCRIPT_CLOSEALL :
(STDERR_FILENO + 1));
}
if (slurmscriptd_mode)
_reopen_stdio();
/*
* Establish initial configuration
*/
_init_config();
_parse_commandline(argc, argv);
log_init(argv[0], log_opts, LOG_DAEMON, NULL);
sched_log_init(argv[0], sched_log_opts, LOG_DAEMON, NULL);
/*
* Must pass in an explicit filename to slurm_conf_init() to avoid
* the "configless" mode of operation kicking in if no file is
* currently available.
*/
if (!(conf_file = slurm_conf_filename))
if (!(conf_file = getenv("SLURM_CONF")))
conf_file = default_slurm_config_file;
slurm_conf_init(conf_file);
lock_slurmctld(config_write_lock);
update_logging();
unlock_slurmctld(config_write_lock);
if (slurmscriptd_mode) {
/* Cleanup env */
(void) unsetenv(SLURMSCRIPTD_MODE_ENV);
/* Execute in slurmscriptd mode. */
become_slurm_user();
slurmscriptd_run_slurmscriptd(argc, argv, binary);
}
if (original && under_systemd &&
(slurm_conf.slurm_user_id != getuid())) {
/*
* Sanity check that we are running as the SlurmUser.
* If not, call fatal() to prevent changing the permissions of the
* state save files and/or losing the state save.
*/
fatal("Running user ID does not match the SlurmUser. Check that SlurmUser in slurm.conf and User in the slurmctld unit file match.");
}
memset(&slurmctld_diag_stats, 0, sizeof(slurmctld_diag_stats));
/*
* Calculate speed of gettimeofday() for sdiag.
* Large delays indicate the Linux vDSO is not in use, which
* will lead to significant scheduler performance issues.
*/
gettimeofday(&start, NULL);
for (int i = 0; i < 1000; i++)
gettimeofday(&now, NULL);
slurmctld_diag_stats.latency = (now.tv_sec - start.tv_sec) * 1000000;
slurmctld_diag_stats.latency += now.tv_usec - start.tv_usec;
if (slurmctld_diag_stats.latency > 200)
error("High latency for 1000 calls to gettimeofday(): %d microseconds",
slurmctld_diag_stats.latency);
/*
* Verify that the cluster name from the conf matches the value in the
* spool dir; exit if inconsistent to protect the state files from
* corruption. This needs to be done before we kill the old slurmctld,
* just in case we fail.
*/
_verify_clustername();
_update_nice();
if (original)
_kill_old_slurmctld();
for (int i = 0; i < 3; i++)
fd_set_close_on_exec(i);
if (original && daemonize) {
if (xdaemon())
error("daemon(): %m");
sched_debug("slurmctld starting");
}
if (slurm_conf.slurmctld_params)
conmgr_set_params(slurm_conf.slurmctld_params);
conmgr_init(0, SLURMCTLD_CONMGR_DEFAULT_MAX_CONNECTIONS);
conmgr_add_work_fifo(_register_signal_handlers, NULL);
conmgr_run(false);
if (auth_g_init() != SLURM_SUCCESS)
fatal("failed to initialize auth plugin");
if (hash_g_init() != SLURM_SUCCESS)
fatal("failed to initialize hash plugin");
if (conn_g_init() != SLURM_SUCCESS)
fatal("Failed to initialize tls plugin");
if (certmgr_g_init() != SLURM_SUCCESS)
fatal("Failed to initialize certmgr plugin");
if (serializer_g_init() != SLURM_SUCCESS)
fatal("Failed to initialize serialization plugins.");
if (original && !under_systemd) {
/*
* Need to create pidfile here in case we setuid() below
* (init_pidfile() exits if it can't initialize pid file).
* On Linux we also need to make this setuid job explicitly
* able to write a core dump.
*/
_init_pidfile();
become_slurm_user();
}
reconfig_reqs = list_create(NULL);
rate_limit_init();
rpc_queue_init();
/* open ports must happen after become_slurm_user() */
_open_ports();
/*
* Create StateSaveLocation directory if necessary.
*/
set_slurmctld_state_loc();
if (daemonize || setwd)
_set_work_dir();
if (stat(slurm_conf.mail_prog, &stat_buf) != 0)
error("Configured MailProg is invalid");
if (!slurm_conf.accounting_storage_type) {
if (slurm_conf.job_acct_gather_type)
error("Job accounting information gathered, but not stored");
} else if (!slurm_conf.job_acct_gather_type)
info("Job accounting information stored, but details not gathered");
#ifdef PR_SET_DUMPABLE
if (prctl(PR_SET_DUMPABLE, 1) < 0)
debug ("Unable to set dumpable to 1");
#endif /* PR_SET_DUMPABLE */
/* Warn if the stack size is not unlimited */
if ((getrlimit(RLIMIT_STACK, &rlim) == 0) &&
(rlim.rlim_cur != RLIM_INFINITY))
info("Stack size set to %ld", rlim.rlim_max);
test_core_limit();
_test_thread_limit();
/*
* This creates a thread to listen to slurmscriptd, so this needs to
* happen after we block signals so that thread doesn't catch any
* signals.
*/
slurmscriptd_init(argv, binary);
if ((run_command_init(argc, argv, binary) != SLURM_SUCCESS) &&
binary[0])
fatal("%s: Unable to reliably execute %s", __func__, binary);
accounting_enforce = slurm_conf.accounting_storage_enforce;
if (slurm_with_slurmdbd()) {
/* we need job_list not to be NULL */
init_job_conf();
}
if (accounting_enforce && !slurm_with_slurmdbd()) {
accounting_enforce = 0;
slurm_conf.conf_flags &= (~CONF_FLAG_WCKEY);
slurm_conf.accounting_storage_enforce = 0;
error("You can not have AccountingStorageEnforce set for AccountingStorageType='%s'",
slurm_conf.accounting_storage_type);
}
info("slurmctld version %s started on cluster %s(%u)",
SLURM_VERSION_STRING, slurm_conf.cluster_name,
slurm_conf.cluster_id);
if ((error_code = gethostname_short(slurmctld_config.node_name_short,
HOST_NAME_MAX)))
fatal("getnodename_short error %s", slurm_strerror(error_code));
if ((error_code = gethostname(slurmctld_config.node_name_long,
HOST_NAME_MAX)))
fatal("getnodename error %s", slurm_strerror(error_code));
/* init job credential stuff */
if (cred_g_init() != SLURM_SUCCESS)
fatal("failed to initialize cred plugin");
/* Must set before plugins are loaded. */
backup_inx = _controller_index();
if (backup_inx == -1) {
error("This host (%s/%s) not a valid controller",
slurmctld_config.node_name_short,
slurmctld_config.node_name_long);
exit(1);
}
if (backup_inx > 0) {
slurmctld_primary = false;
if (xstrcasestr(slurm_conf.sched_params,
"no_backup_scheduling"))
slurmctld_config.scheduling_disabled = true;
}
if (!original && !slurmctld_primary) {
info("Restarted while operating as primary, resuming operation as primary.");
backup_has_control = true;
}
/*
* Initialize plugins.
* If running configuration test, report ALL failures.
*/
if (select_g_init() != SLURM_SUCCESS)
fatal("failed to initialize node selection plugin");
/* gres_init() must follow select_g_init() */
if (gres_init() != SLURM_SUCCESS)
fatal("failed to initialize gres plugin");
if (preempt_g_init() != SLURM_SUCCESS)
fatal("failed to initialize preempt plugin");
if (acct_gather_conf_init() != SLURM_SUCCESS)
fatal("failed to initialize acct_gather plugins");
if (jobacct_gather_init() != SLURM_SUCCESS)
fatal("failed to initialize jobacct_gather plugin");
if (job_submit_g_init(false) != SLURM_SUCCESS)
fatal("failed to initialize job_submit plugin");
if (prep_g_init(&prep_callbacks) != SLURM_SUCCESS)
fatal("failed to initialize prep plugin");
if (node_features_g_init() != SLURM_SUCCESS)
fatal("failed to initialize node_features plugin");
if (mpi_g_daemon_init() != SLURM_SUCCESS)
fatal("Failed to initialize MPI plugins.");
/* Fatal if we use extra_constraints without json serializer */
if (extra_constraints_enabled())
serializer_required(MIME_TYPE_JSON);
if (switch_g_init(true) != SLURM_SUCCESS)
fatal("Failed to initialize switch plugin");
/* Initialize licenses - serializer required to be initialized */
if (license_init(slurm_conf.licenses) != SLURM_SUCCESS)
fatal("Invalid Licenses value: %s", slurm_conf.licenses);
if (original && under_systemd)
xsystemd_change_mainpid(getpid());
while (1) {
bool reconfiguring = reconfig;
/* initialization for each primary<->backup switch */
slurmctld_config.shutdown_time = (time_t) 0;
slurmctld_config.resume_backup = false;
control_time = 0;
reconfig = false;
reconfig_rc = SLURM_SUCCESS;
agent_init();
/* start in primary or backup mode */
if (!slurmctld_primary && !backup_has_control) {
controller_fini_scheduling(); /* make sure scheduling is shut down */
_run_primary_prog(false);
if (acct_storage_g_init() != SLURM_SUCCESS)
fatal("failed to initialize accounting_storage plugin");
if (bb_g_init() != SLURM_SUCCESS)
fatal("failed to initialize burst buffer plugin");
slurm_mutex_lock(&listeners.mutex);
listeners.standby_mode = true;
slurm_mutex_unlock(&listeners.mutex);
/*
* run_backup() will never return unless it is time for
* standby to take control as backup controller
*/
run_backup();
slurm_mutex_lock(&listeners.mutex);
listeners.standby_mode = false;
slurm_mutex_unlock(&listeners.mutex);
(void) _shutdown_backup_controller();
} else {
if (acct_storage_g_init() != SLURM_SUCCESS)
fatal("failed to initialize accounting_storage plugin");
(void) _shutdown_backup_controller();
trigger_primary_ctld_res_ctrl();
ctld_assoc_mgr_init();
/*
* read_slurm_conf() will load the burst buffer state,
* so init the burst buffer plugin early.
*/
if (bb_g_init() != SLURM_SUCCESS)
fatal("failed to initialize burst_buffer plugin");
/* Now recover the remaining state information */
lock_slurmctld(config_write_lock);
if (switch_g_restore(recover))
fatal("failed to initialize switch plugin");
}
/*
* priority_g_init() needs to be called after assoc_mgr_init()
* and before read_slurm_conf() because jobs could be killed
* during read_slurm_conf() and call priority_g_job_end().
*/
if (priority_g_init() != SLURM_SUCCESS)
fatal("failed to initialize priority plugin");
if ((slurmctld_primary || backup_has_control) &&
!reconfiguring) {
if ((error_code = read_slurm_conf(recover))) {
fatal("read_slurm_conf reading %s: %s",
slurm_conf.slurm_conf,
slurm_strerror(error_code));
}
configless_update();
if (conf_includes_list) {
/*
* clear included files so that subsequent conf
* parsings refill it with updated information.
*/
list_flush(conf_includes_list);
}
}
priority_g_thread_start();
if (slurmctld_primary || backup_has_control) {
select_g_select_nodeinfo_set_all();
unlock_slurmctld(config_write_lock);
if (recover == 0) {
slurmctld_init_db = 1;
_accounting_mark_all_nodes_down("cold-start");
}
}
slurm_persist_conn_recv_server_init();
info("Running as primary controller");
if (!reconfiguring) {
_run_primary_prog(true);
control_time = time(NULL);
heartbeat_start();
if (!slurmctld_config.resume_backup && slurmctld_primary)
trigger_primary_ctld_res_op();
}
/* Set stepmgr_ops pointers now that the underlying objects exist */
stepmgr_ops.acct_db_conn = acct_db_conn;
stepmgr_ops.job_list = job_list;
stepmgr_ops.up_node_bitmap = up_node_bitmap;
_accounting_cluster_ready();
_send_future_cloud_to_db();
/*
* call after registering so that the current cluster's
* control_host and control_port will be filled in.
*/
fed_mgr_init(acct_db_conn);
_restore_job_dependencies();
sync_job_priorities();
if (mcs_g_init() != SLURM_SUCCESS)
fatal("failed to initialize mcs plugin");
/*
* create attached thread for state save
*/
slurm_thread_create(&slurmctld_config.thread_id_save,
slurmctld_state_save, NULL);
/*
* create attached thread for node power management
*/
power_save_init();
/*
* create attached thread for purging completed job files
*/
slurm_thread_create(&slurmctld_config.thread_id_purge_files,
_purge_files_thread, NULL);
/*
* create attached thread for processing accounting updates
*/
slurm_thread_create(&slurmctld_config.thread_id_acct_update,
_acct_update_thread, NULL);
/*
* If reconfiguring, we need to restart the gang scheduler.
* Otherwise, gang scheduling was already started by
* read_slurm_conf().
*/
if (controller_init_scheduling(reconfiguring) != SLURM_SUCCESS)
fatal("Failed to initialize the various schedulers");
if (!original && !reconfiguring) {
notify_parent_of_success();
if (!under_systemd)
_update_pidfile();
_post_reconfig();
}
/*
* process slurm background activities, could run as pthread
*/
_slurmctld_background(NULL);
controller_fini_scheduling(); /* Stop all scheduling */
rpc_queue_shutdown();
agent_fini();
/* termination of controller */
switch_g_save();
priority_g_fini();
shutdown_state_save();
slurm_mutex_lock(&purge_thread_lock);
slurm_cond_signal(&purge_thread_cond); /* wake up last time */
slurm_mutex_unlock(&purge_thread_lock);
slurm_thread_join(slurmctld_config.thread_id_purge_files);
slurm_thread_join(slurmctld_config.thread_id_save);
slurm_mutex_lock(&slurmctld_config.acct_update_lock);
slurm_cond_broadcast(&slurmctld_config.acct_update_cond);
slurm_mutex_unlock(&slurmctld_config.acct_update_lock);
slurm_thread_join(slurmctld_config.thread_id_acct_update);
/* kill all scripts run by the slurmctld */
track_script_flush();
slurmscriptd_flush();
run_command_shutdown();
bb_g_fini();
mcs_g_fini();
fed_mgr_fini();
ctld_assoc_mgr_fini();
/* Save any pending state save RPCs */
acct_storage_g_close_connection(&acct_db_conn);
acct_storage_g_fini();
slurm_persist_conn_recv_server_fini();
power_save_fini();
/* attempt reconfig here */
if (reconfig) {
_attempt_reconfig();
continue;
}
config_power_mgr_fini();
/* stop the heartbeat last */
heartbeat_stop();
/*
* Run SlurmctldPrimaryOffProg only if we are the primary
* (backup_inx == 0). The backup controllers (backup_inx > 0)
* already run it when dropping to standby mode.
*/
if (slurmctld_primary)
_run_primary_prog(false);
if (slurmctld_config.resume_backup == false)
break;
/* primary controller doesn't resume backup mode */
if (slurmctld_config.resume_backup && slurmctld_primary)
break;
/* The backup is now meant to relinquish control */
if (slurmctld_config.resume_backup && !slurmctld_primary)
backup_has_control = false;
recover = 2;
/*
* We need to re-initialize run_command after
* run_command_shutdown() was called. Pass NULL since we do
* not want to change the script launcher location.
*/
(void) run_command_init(0, NULL, NULL);
}
slurmscriptd_fini();
jobcomp_g_fini();
/*
* Since the pidfile is created as user root (its owner is
* later changed to SlurmUser), SlurmUser may not be able to
* remove it, so this is not necessarily an error.
*/
if (!under_systemd && (unlink(slurm_conf.slurmctld_pidfile) < 0)) {
verbose("Unable to remove pidfile '%s': %m",
slurm_conf.slurmctld_pidfile);
}
#ifdef MEMORY_LEAK_DEBUG
{
/*
* This should purge all allocated memory.
* Anything left over represents a leak.
*/
xassert(list_is_empty(reconfig_reqs));
FREE_NULL_LIST(reconfig_reqs);
agent_purge();
/* Purge our local data structures */
configless_clear();
job_fini();
part_fini(); /* part_fini() must precede node_fini() */
node_fini();
mpi_fini();
node_features_g_fini();
resv_fini();
trigger_fini();
assoc_mgr_fini(1);
reserve_port_config(NULL, NULL);
/* Some plugins are needed to purge job/node data structures,
* so unplug them after those data structures are purged */
gres_fini();
job_submit_g_fini(false);
prep_g_fini();
preempt_g_fini();
jobacct_gather_fini();
acct_gather_conf_destroy();
select_g_fini();
topology_g_destroy_config();
topology_g_fini();
auth_g_fini();
hash_g_fini();
conn_g_fini();
certmgr_g_fini();
switch_g_fini();
site_factor_g_fini();
/* purge remaining data structures */
group_cache_purge();
getnameinfo_cache_purge();
license_free();
FREE_NULL_LIST(slurmctld_config.acct_update_list);
cred_g_fini();
slurm_conf_destroy();
cluster_rec_free();
track_script_fini();
cgroup_conf_destroy();
usleep(500000);
serializer_g_fini();
bit_cache_fini();
}
#endif
conmgr_request_shutdown();
conmgr_fini();
rate_limit_shutdown();
log_fini();
sched_log_fini();
if (dump_core)
abort();
else
exit(0);
}
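/* list_find_first() callback: match a db event record by node name */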
static int _find_node_event(void *x, void *key)
{
slurmdb_event_rec_t *event = x;
char *node_name = key;
return !xstrcmp(event->node_name, node_name);
}
/*
* Create db down events for FUTURE and CLOUD+POWERED_DOWN nodes
*/
static void _send_future_cloud_to_db()
{
time_t now = time(NULL);
slurmdb_event_rec_t *event = NULL;
list_t *event_list = NULL;
bool check_db = !running_cache;
node_record_t *node_ptr;
for (int i = 0; (node_ptr = next_node(&i)); i++) {
if (!IS_NODE_FUTURE(node_ptr) &&
!IS_NODE_POWERED_DOWN(node_ptr))
continue;
/*
* If the DBD is up, then try to avoid making duplicate
* g_node_down() calls by reconciling with the db. If it's not
* up, just send the down events to preserve the startup time
* stamps.
*/
if (check_db && !event_list) {
slurmdb_event_cond_t event_cond = {0};
event_cond.event_type = SLURMDB_EVENT_NODE;
event_cond.cond_flags = SLURMDB_EVENT_COND_OPEN;
event_cond.cluster_list = list_create(xfree_ptr);
list_append(event_cond.cluster_list,
xstrdup(slurm_conf.cluster_name));
event_cond.format_list = list_create(NULL);
list_append(event_cond.format_list, "node_name");
event_cond.state_list = list_create(xfree_ptr);
list_append(event_cond.state_list,
xstrdup_printf("%u", NODE_STATE_FUTURE));
list_append(event_cond.state_list,
xstrdup_printf("%"PRIu64,
NODE_STATE_POWERED_DOWN));
event_list = acct_storage_g_get_events(acct_db_conn,
getuid(),
&event_cond);
if (!event_list)
check_db = false;
FREE_NULL_LIST(event_cond.cluster_list);
FREE_NULL_LIST(event_cond.format_list);
FREE_NULL_LIST(event_cond.state_list);
}
if (event_list &&
(event = list_find_first(event_list, _find_node_event,
node_ptr->name))) {
/* Open event record already exists, don't send again */
continue;
}
clusteracct_storage_g_node_down(
acct_db_conn, node_ptr, now,
IS_NODE_FUTURE(node_ptr) ? "Future" : "Powered down",
slurm_conf.slurm_user_id);
}
FREE_NULL_LIST(event_list);
}
/* initialization of common slurmctld configuration */
static void _init_config(void)
{
struct rlimit rlim;
rlimits_use_max_nofile();
if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
rlim.rlim_cur = rlim.rlim_max;
(void) setrlimit(RLIMIT_CORE, &rlim);
}
if (getrlimit(RLIMIT_STACK, &rlim) == 0) {
/* slurmctld can spawn lots of pthreads.
* Set the (per thread) stack size to a
* more "reasonable" value to avoid running
* out of virtual memory and dying */
rlim.rlim_cur = rlim.rlim_max;
(void) setrlimit(RLIMIT_STACK, &rlim);
}
if (getrlimit(RLIMIT_DATA, &rlim) == 0) {
rlim.rlim_cur = rlim.rlim_max;
(void) setrlimit(RLIMIT_DATA, &rlim);
}
memset(&slurmctld_config, 0, sizeof(slurmctld_config_t));
FREE_NULL_LIST(slurmctld_config.acct_update_list);
slurmctld_config.acct_update_list =
list_create(slurmdb_destroy_update_object);
slurm_mutex_init(&slurmctld_config.acct_update_lock);
slurm_mutex_init(&slurmctld_config.thread_count_lock);
slurm_mutex_init(&slurmctld_config.backup_finish_lock);
slurm_mutex_lock(&slurmctld_config.acct_update_lock);
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
slurm_mutex_lock(&slurmctld_config.backup_finish_lock);
slurm_cond_init(&slurmctld_config.acct_update_cond, NULL);
slurm_cond_init(&slurmctld_config.backup_finish_cond, NULL);
slurm_cond_init(&slurmctld_config.thread_count_cond, NULL);
slurmctld_config.boot_time = time(NULL);
slurmctld_config.resume_backup = false;
slurmctld_config.server_thread_count = 0;
slurmctld_config.shutdown_time = (time_t) 0;
slurmctld_config.thread_id_main = pthread_self();
slurmctld_config.scheduling_disabled = false;
slurmctld_config.submissions_disabled = false;
track_script_init();
slurmctld_config.thread_id_main = (pthread_t) 0;
slurm_mutex_unlock(&slurmctld_config.backup_finish_lock);
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
slurm_mutex_unlock(&slurmctld_config.acct_update_lock);
}
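/*
 * Re-exec slurmctld to apply a new configuration. Listening sockets,
 * the pidfile fd, and the auth fd are passed to the child via the
 * environment and inherited file descriptors. Unless running attached,
 * fork() first and wait for the child to report success over a pipe.
 */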
static int _try_to_reconfig(void)
{
extern char **environ;
char **child_env;
pid_t pid;
int to_parent[2] = {-1, -1};
int *skip_close = NULL, skip_index = 0, auth_fd = -1;
child_env = env_array_copy((const char **) environ);
setenvf(&child_env, "SLURMCTLD_RECONF", "1");
if (pidfd != -1) {
setenvf(&child_env, "SLURMCTLD_RECONF_PIDFD", "%d", pidfd);
fd_set_noclose_on_exec(pidfd);
}
slurm_mutex_lock(&listeners.mutex);
/*
* Need space in array for:
* - to_parent[1]
* - pidfd
* - auth fd
* - terminator (-1)
* - listeners.count number of listening sockets
*/
skip_close = xcalloc((listeners.count + 4), sizeof(*skip_close));
if (listeners.count) {
char *ports = NULL, *pos = NULL;
setenvf(&child_env, "SLURMCTLD_RECONF_LISTEN_COUNT", "%d",
listeners.count);
for (int i = 0; i < listeners.count; i++) {
xstrfmtcatat(ports, &pos, "%d,", listeners.fd[i]);
if (listeners.fd[i] >= 0) {
fd_set_noclose_on_exec(listeners.fd[i]);
skip_close[skip_index++] = listeners.fd[i];
}
}
setenvf(&child_env, "SLURMCTLD_RECONF_LISTEN_FDS", "%s", ports);
xfree(ports);
}
slurm_mutex_unlock(&listeners.mutex);
if ((auth_fd = auth_g_get_reconfig_fd(AUTH_PLUGIN_SLURM)) >= 0)
skip_close[skip_index++] = auth_fd;
for (int i = 0; i < 3; i++)
fd_set_noclose_on_exec(i);
if (!daemonize && !under_systemd) {
/*
* If in attached mode, the slurmctld does not fork() so it
* does not change its PID. The slurmctld needs to call
* slurmscriptd_fini() to reap the slurmscriptd PID, otherwise
* the slurmscriptd PID would be a defunct entry in the process
* table.
* For detached mode, the parent slurmctld needs to keep the
* slurmscriptd running in order to recover if the child
* slurmctld fails to start. If the child slurmctld starts
* successfully, then when the parent slurmctld shuts down the
* corresponding slurmscriptd is reparented to init, shuts itself
* down, and init will reap the PID for us.
*/
slurmscriptd_fini();
goto start_child;
}
if (pipe(to_parent))
fatal("%s: pipe() failed: %m", __func__);
setenvf(&child_env, "SLURMCTLD_RECONF_PARENT_FD", "%d", to_parent[1]);
if ((pid = fork()) < 0) {
fatal("%s: fork() failed: %m", __func__);
} else if (pid > 0) {
pid_t grandchild_pid;
int rc;
/*
* Close the write end of the pipe in this (parent) process so the
* read() below will return immediately if the child process fatal()s.
* Otherwise we'd be stuck here indefinitely, as the kernel would
* assume another internal thread might still write to the pipe.
*/
(void) close(to_parent[1]);
safe_read(to_parent[0], &grandchild_pid, sizeof(pid_t));
info("Relinquishing control to new slurmctld process");
/*
* Ensure child has exited.
* Grandchild should be owned by init.
*/
if (under_systemd) {
waitpid(pid, &rc, 0);
xsystemd_change_mainpid(grandchild_pid);
}
xfree(skip_close);
return SLURM_SUCCESS;
rwfail:
close(to_parent[0]);
env_array_free(child_env);
waitpid(pid, &rc, 0);
info("Resuming operation, reconfigure failed.");
xfree(skip_close);
return SLURM_ERROR;
}
start_child:
if (to_parent[1] >= 0)
skip_close[skip_index++] = to_parent[1];
if (pidfd >= 0)
skip_close[skip_index++] = pidfd;
skip_close[skip_index] = -1;
closeall_except(3, skip_close);
/*
* This second fork() ensures that the new grandchild's parent is init,
* which avoids a nuisance warning from systemd of:
* "Supervising process 123456 which is not our child. We'll most likely not notice when it exits"
*/
if (under_systemd) {
if ((pid = fork()) < 0)
fatal("fork() failed: %m");
else if (pid)
exit(0);
}
execve(binary, main_argv, child_env);
fatal("execv() failed: %m");
}
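/*
 * Tell the parent slurmctld that this re-exec'd process started
 * successfully by writing our PID to the inherited pipe.
 */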
extern void notify_parent_of_success(void)
{
char *parent_fd_env = getenv("SLURMCTLD_RECONF_PARENT_FD");
pid_t pid = getpid();
int fd = -1;
static bool notified = false;
if (original || !parent_fd_env || notified)
return;
notified = true;
fd = atoi(parent_fd_env);
info("child started successfully");
safe_write(fd, &pid, sizeof(pid_t));
(void) close(fd);
return;
rwfail:
error("failed to notify parent, may have two processes running now");
(void) close(fd);
}
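/*
 * Queue an incoming reconfigure request RPC and raise SIGHUP to start
 * the reconfigure; the reply is sent once the attempt completes.
 */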
extern void reconfigure_slurm(slurm_msg_t *msg)
{
xassert(msg);
list_append(reconfig_reqs, msg);
pthread_kill(pthread_self(), SIGHUP);
}
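/* Push the new configuration out to slurmd and sackd daemons */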
static void _post_reconfig(void)
{
if (running_configless) {
configless_update();
push_reconfig_to_slurmd();
sackd_mgr_push_reconfig();
} else {
msg_to_slurmd(REQUEST_RECONFIGURE);
}
}
/* Request that the job scheduler execute soon (typically within seconds) */
extern void queue_job_scheduler(void)
{
slurm_mutex_lock(&sched_cnt_mutex);
job_sched_cnt++;
slurm_mutex_unlock(&sched_cnt_mutex);
}
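/* conmgr callback: a listening socket was successfully registered */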
static void *_on_listen_connect(conmgr_fd_t *con, void *arg)
{
const int *i_ptr = arg;
const int i = *i_ptr;
int rc = EINVAL;
debug3("%s: [%s] Successfully opened RPC listener",
__func__, conmgr_fd_get_name(con));
slurm_mutex_lock(&listeners.mutex);
xassert(!listeners.cons[i]);
listeners.cons[i] = con;
if (!listeners.quiesced &&
(rc = conmgr_unquiesce_fd(listeners.cons[i])))
fatal_abort("%s: conmgr_unquiesce_fd(%s) failed: %s",
__func__, conmgr_fd_get_name(con),
slurm_strerror(rc));
slurm_mutex_unlock(&listeners.mutex);
return arg;
}
static void _on_listen_finish(conmgr_fd_t *con, void *arg)
{
int *i_ptr = arg;
const int i = *i_ptr;
debug3("%s: [%s] Closed RPC listener",
__func__, conmgr_fd_get_name(con));
slurm_mutex_lock(&listeners.mutex);
xassert(listeners.cons[i] == con);
listeners.cons[i] = NULL;
slurm_mutex_unlock(&listeners.mutex);
xfree(i_ptr);
}
static void *_on_primary_connection(conmgr_fd_t *con, void *arg)
{
debug3("%s: [%s] PRIMARY: New RPC connection",
__func__, conmgr_fd_get_name(con));
return con;
}
static void _on_primary_finish(conmgr_fd_t *con, void *arg)
{
debug3("%s: [%s] PRIMARY: RPC connection closed",
__func__, conmgr_fd_get_name(con));
}
/*
* Process incoming primary RPCs.
*
* WARNING: conmgr will read all available incoming data and could process
* multiple RPCs on a single connection, but the current RPC model of the
* controller is to have only 1 incoming RPC, then reply and close the
* connection. This is not ideal if the RPC handler tries to read from the
* extracted fd, since conmgr may have already read data the handler expects.
* This currently does not appear to be an issue, but it may become one until
* all of the RPC handlers are fully converted to conmgr.
*/
static int _on_primary_msg(conmgr_fd_t *con, slurm_msg_t *msg, void *arg)
{
int rc = SLURM_SUCCESS;
if (!msg->auth_ids_set)
fatal_abort("this should never happen");
log_flag(AUDIT_RPCS, "[%s] msg_type=%s uid=%u client=[%pA] protocol=%u",
conmgr_fd_get_name(con), rpc_num2string(msg->msg_type),
msg->auth_uid, &msg->address, msg->protocol_version);
/*
* Check msg against the rate limit. Tell client to retry in a second
* to minimize controller disruption.
*/
if (rate_limit_exceeded(msg)) {
rc = slurm_send_rc_msg(msg, SLURMCTLD_COMMUNICATIONS_BACKOFF);
slurm_free_msg(msg);
} else {
/*
* The fd will be extracted from conmgr, so the conmgr
* connection ref should be removed from msg first.
*/
conmgr_fd_free_ref(&msg->conmgr_con);
if ((rc = conmgr_queue_extract_con_fd(
con, _service_connection,
XSTRINGIFY(_service_connection), msg)))
error("%s: [%s] Extracting FDs failed: %s",
__func__, conmgr_fd_get_name(con),
slurm_strerror(rc));
}
return rc;
}
static void *_on_connection(conmgr_fd_t *con, void *arg)
{
bool standby_mode;
slurm_mutex_lock(&listeners.mutex);
standby_mode = listeners.standby_mode;
slurm_mutex_unlock(&listeners.mutex);
if (!standby_mode)
return _on_primary_connection(con, arg);
else
return on_backup_connection(con, arg);
}
static void _on_finish(conmgr_fd_t *con, void *arg)
{
bool standby_mode;
slurm_mutex_lock(&listeners.mutex);
standby_mode = listeners.standby_mode;
slurm_mutex_unlock(&listeners.mutex);
if (!standby_mode)
return _on_primary_finish(con, arg);
else
return on_backup_finish(con, arg);
}
static int _on_msg(conmgr_fd_t *con, slurm_msg_t *msg, int unpack_rc, void *arg)
{
bool standby_mode;
if ((unpack_rc == SLURM_PROTOCOL_AUTHENTICATION_ERROR) ||
!msg->auth_ids_set) {
/*
* Avoid closing the connection immediately on authentication
* failure; reply (with authentication disabled) to give the sender
* a hint to fix their authentication issue.
*/
msg->flags |= SLURM_NO_AUTH_CRED;
slurm_send_rc_msg(msg, SLURM_PROTOCOL_AUTHENTICATION_ERROR);
slurm_free_msg(msg);
return SLURM_SUCCESS;
} else if (unpack_rc) {
error("%s: [%s] rejecting malformed RPC and closing connection: %s",
__func__, conmgr_fd_get_name(con),
slurm_strerror(unpack_rc));
slurm_free_msg(msg);
return unpack_rc;
}
slurm_mutex_lock(&listeners.mutex);
standby_mode = listeners.standby_mode;
slurm_mutex_unlock(&listeners.mutex);
if (!standby_mode)
return _on_primary_msg(con, msg, arg);
else
return on_backup_msg(con, msg, arg);
}
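/* Stop accepting new connections on all RPC listener sockets */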
extern void listeners_quiesce(void)
{
slurm_mutex_lock(&listeners.mutex);
if (listeners.quiesced) {
slurm_mutex_unlock(&listeners.mutex);
return;
}
for (int i = 0; i < listeners.count; i++) {
int rc;
if (!listeners.cons[i])
continue;
/* This should always work */
if ((rc = conmgr_quiesce_fd(listeners.cons[i])))
fatal_abort("%s: conmgr_quiesce_fd(%s) failed: %s",
__func__,
conmgr_fd_get_name(listeners.cons[i]),
slurm_strerror(rc));
}
listeners.quiesced = true;
slurm_mutex_unlock(&listeners.mutex);
}
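/* Resume accepting new connections on all RPC listener sockets */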
extern void listeners_unquiesce(void)
{
slurm_mutex_lock(&listeners.mutex);
if (!listeners.quiesced) {
slurm_mutex_unlock(&listeners.mutex);
return;
}
for (int i = 0; i < listeners.count; i++) {
int rc;
if (!listeners.cons[i])
continue;
/* This should always work */
if ((rc = conmgr_unquiesce_fd(listeners.cons[i])))
fatal_abort("%s: conmgr_unquiesce_fd(%s) failed: %s",
__func__,
conmgr_fd_get_name(listeners.cons[i]),
slurm_strerror(rc));
}
listeners.quiesced = false;
slurm_mutex_unlock(&listeners.mutex);
}
/*
* _open_ports - Open all ports for the slurmctld to listen on.
*/
static void _open_ports(void)
{
static const conmgr_events_t events = {
.on_listen_connect = _on_listen_connect,
.on_listen_finish = _on_listen_finish,
.on_connection = _on_connection,
.on_msg = _on_msg,
.on_finish = _on_finish,
};
slurm_mutex_lock(&listeners.mutex);
/* initialize ports for RPCs */
if (original) {
if (!(listeners.count = slurm_conf.slurmctld_port_count))
fatal("slurmctld port count is zero");
listeners.fd = xcalloc(listeners.count, sizeof(*listeners.fd));
listeners.cons = xcalloc(listeners.count,
sizeof(*listeners.cons));
for (int i = 0; i < listeners.count; i++) {
listeners.fd[i] = slurm_init_msg_engine_port(
slurm_conf.slurmctld_port + i);
}
} else {
char *pos = getenv("SLURMCTLD_RECONF_LISTEN_FDS");
listeners.count = atoi(getenv("SLURMCTLD_RECONF_LISTEN_COUNT"));
listeners.fd = xcalloc(listeners.count, sizeof(*listeners.fd));
listeners.cons = xcalloc(listeners.count,
sizeof(*listeners.cons));
for (int i = 0; i < listeners.count; i++) {
listeners.fd[i] = strtol(pos, &pos, 10);
pos++; /* skip comma */
}
}
for (uint64_t i = 0; i < listeners.count; i++) {
static conmgr_con_flags_t flags =
(CON_FLAG_RPC_KEEP_BUFFER | CON_FLAG_QUIESCE |
CON_FLAG_WATCH_WRITE_TIMEOUT |
CON_FLAG_WATCH_READ_TIMEOUT |
CON_FLAG_WATCH_CONNECT_TIMEOUT);
int rc, *index_ptr;
index_ptr = xmalloc(sizeof(*index_ptr));
*index_ptr = i;
if (tls_enabled())
flags |= CON_FLAG_TLS_SERVER;
if ((rc = conmgr_process_fd_listen(listeners.fd[i],
CON_TYPE_RPC, &events, flags,
index_ptr))) {
if (rc == SLURM_COMMUNICATIONS_INVALID_FD)
fatal("%s: Unable to listen to file descriptors. Existing slurmctld process likely already is listening on the ports.",
__func__);
fatal("%s: unable to process fd:%d error:%s",
__func__, listeners.fd[i], slurm_strerror(rc));
}
}
slurm_mutex_unlock(&listeners.mutex);
}
/*
* _service_connection - service the RPC
* IN/OUT arg - the slurm_msg_t for the request; freed upon completion
* unless the RPC was queued or its handler keeps the message
*/
static void _service_connection(conmgr_callback_args_t conmgr_args,
int input_fd, int output_fd, void *tls_conn,
void *arg)
{
int rc;
slurm_msg_t *msg = arg;
slurmctld_rpc_t *this_rpc = NULL;
if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) {
debug3("%s: [fd:%d] connection work cancelled",
__func__, input_fd);
goto invalid;
}
if ((input_fd < 0) || (output_fd < 0)) {
error("%s: Rejecting partially open connection input_fd=%d output_fd=%d",
__func__, input_fd, output_fd);
goto invalid;
}
/*
* The fd was extracted from conmgr, so the conmgr connection is
* invalid.
*/
conmgr_fd_free_ref(&msg->conmgr_con);
if (tls_conn) {
msg->tls_conn = tls_conn;
} else {
conn_args_t tls_args = {
.input_fd = input_fd,
.output_fd = output_fd,
};
msg->tls_conn = conn_g_create(&tls_args);
}
server_thread_incr();
if (!(rc = rpc_enqueue(msg))) {
server_thread_decr();
return;
}
if (rc == SLURMCTLD_COMMUNICATIONS_BACKOFF) {
slurm_send_rc_msg(msg, SLURMCTLD_COMMUNICATIONS_BACKOFF);
} else if (rc == SLURMCTLD_COMMUNICATIONS_HARD_DROP) {
slurm_send_rc_msg(msg, SLURMCTLD_COMMUNICATIONS_HARD_DROP);
} else if ((this_rpc = find_rpc(msg->msg_type))) {
/* directly process the request */
slurmctld_req(msg, this_rpc);
} else {
error("invalid RPC msg_type=%s", rpc_num2string(msg->msg_type));
slurm_send_rc_msg(msg, EINVAL);
}
if (!this_rpc || !this_rpc->keep_msg) {
conn_g_destroy(msg->tls_conn, true);
msg->tls_conn = NULL;
log_flag(TLS, "Destroyed server TLS connection for incoming RPC on fd %d->%d",
input_fd, output_fd);
slurm_free_msg(msg);
}
server_thread_decr();
return;
invalid:
/* Cleanup for invalid RPC */
if (!tls_conn) {
if (input_fd != output_fd)
fd_close(&output_fd);
fd_close(&input_fd);
}
slurm_free_msg(msg);
conn_g_destroy(tls_conn, true);
}
/* Decrement slurmctld thread count (as applies to thread limit) */
extern void server_thread_decr(void)
{
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
if (slurmctld_config.server_thread_count > 0)
slurmctld_config.server_thread_count--;
else
error("slurmctld_config.server_thread_count underflow");
slurm_cond_broadcast(&slurmctld_config.thread_count_cond);
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
}
/* Increment slurmctld thread count (as applies to thread limit) */
extern void server_thread_incr(void)
{
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
slurmctld_config.server_thread_count++;
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
}
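/* Notify accounting storage that the cluster and its TRES are ready */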
static int _accounting_cluster_ready(void)
{
return clusteracct_storage_g_cluster_tres(acct_db_conn,
NULL,
NULL,
0,
SLURM_PROTOCOL_VERSION);
}
static int _accounting_mark_all_nodes_down(char *reason)
{
char *state_file;
struct stat stat_buf;
node_record_t *node_ptr;
int i;
time_t event_time;
int rc = SLURM_ERROR;
state_file = xstrdup_printf("%s/node_state",
slurm_conf.state_save_location);
if (stat(state_file, &stat_buf)) {
debug("_accounting_mark_all_nodes_down: could not stat(%s) "
"to record node down time", state_file);
event_time = time(NULL);
} else {
event_time = stat_buf.st_mtime;
}
xfree(state_file);
if ((rc = acct_storage_g_flush_jobs_on_cluster(acct_db_conn,
event_time))
== SLURM_ERROR)
return rc;
for (i = 0; (node_ptr = next_node(&i)); i++) {
if (!node_ptr->name)
continue;
if ((rc = clusteracct_storage_g_node_down(
acct_db_conn, node_ptr, event_time,
reason, slurm_conf.slurm_user_id))
== SLURM_ERROR)
break;
}
return rc;
}
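/*
 * Handle removal of an association: reconfigure burst buffers and hold
 * any jobs still using it.
 */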
static void _remove_assoc(slurmdb_assoc_rec_t *rec)
{
int cnt = 0;
bb_g_reconfig();
cnt = job_hold_by_assoc_id(rec->id);
if (cnt) {
info("Removed association id:%u user:%s, held %u jobs",
rec->id, rec->user, cnt);
} else
debug("Removed association id:%u user:%s", rec->id, rec->user);
}
static int _foreach_part_remove_qos(void *x, void *arg)
{
part_record_t *part_ptr = x;
slurmdb_qos_rec_t *rec = arg;
if (part_ptr->qos_ptr == rec) {
info("Partition %s's QOS %s was just removed, you probably didn't mean for this to happen unless you are also removing the partition.",
part_ptr->name, rec->name);
part_ptr->qos_ptr = NULL;
}
return 0;
}
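/*
 * Handle removal of a QOS: detach it from any partitions, reconfigure
 * burst buffers, and hold any jobs still using it.
 */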
static void _remove_qos(slurmdb_qos_rec_t *rec)
{
int cnt = 0;
slurmctld_lock_t part_write_lock =
{ NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK };
lock_slurmctld(part_write_lock);
if (part_list)
(void) list_for_each(part_list, _foreach_part_remove_qos, rec);
unlock_slurmctld(part_write_lock);
bb_g_reconfig();
cnt = job_hold_by_qos_id(rec->id);
if (cnt) {
info("Removed QOS:%s held %u jobs", rec->name, cnt);
} else
debug("Removed QOS:%s", rec->name);
}
static int _update_assoc_for_each(void *x, void *arg) {
slurmdb_assoc_rec_t *rec = arg;
job_record_t *job_ptr = x;
if ((rec == job_ptr->assoc_ptr) && (IS_JOB_PENDING(job_ptr)))
acct_policy_update_pending_job(job_ptr);
return 0;
}
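/* Re-evaluate pending jobs after an association's limits change */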
static void _update_assoc(slurmdb_assoc_rec_t *rec)
{
/* Write lock on jobs */
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
if (!job_list || !accounting_enforce
|| !(accounting_enforce & ACCOUNTING_ENFORCE_LIMITS))
return;
lock_slurmctld(job_write_lock);
list_for_each(job_list, _update_assoc_for_each, rec);
unlock_slurmctld(job_write_lock);
}
static int _foreach_part_resize_qos(void *x, void *arg)
{
part_record_t *part_ptr = x;
if (part_ptr->allow_qos)
qos_list_build(part_ptr->allow_qos, false,
&part_ptr->allow_qos_bitstr);
if (part_ptr->deny_qos)
qos_list_build(part_ptr->deny_qos, false,
&part_ptr->deny_qos_bitstr);
return 0;
}
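/* Rebuild each partition's allow/deny QOS bitmaps after a QOS resize */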
static void _resize_qos(void)
{
slurmctld_lock_t part_write_lock =
{ NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK };
lock_slurmctld(part_write_lock);
if (part_list)
(void) list_for_each(part_list, _foreach_part_resize_qos, NULL);
unlock_slurmctld(part_write_lock);
}
static int _update_qos_for_each(void *x, void *arg) {
slurmdb_qos_rec_t *rec = arg;
job_record_t *job_ptr = x;
if ((rec == job_ptr->qos_ptr) && (IS_JOB_PENDING(job_ptr)))
acct_policy_update_pending_job(job_ptr);
return 0;
}
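/* Re-evaluate pending jobs after a QOS's limits change */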
static void _update_qos(slurmdb_qos_rec_t *rec)
{
/* Write lock on jobs */
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
if (!job_list || !accounting_enforce
|| !(accounting_enforce & ACCOUNTING_ENFORCE_LIMITS))
return;
lock_slurmctld(job_write_lock);
list_for_each(job_list, _update_qos_for_each, rec);
unlock_slurmctld(job_write_lock);
}
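/*
 * Build the TRES records from AccountingStorageTRES, adding any that
 * are missing from the database when slurmdbd is in use.
 */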
static int _init_tres(void)
{
char *temp_char;
list_t *char_list = NULL;
list_t *add_list = NULL;
slurmdb_tres_rec_t *tres_rec;
slurmdb_update_object_t update_object;
assoc_mgr_lock_t locks = { .tres = READ_LOCK };
if (!slurm_conf.accounting_storage_tres) {
error("No tres defined, this should never happen");
return SLURM_ERROR;
}
char_list = list_create(xfree_ptr);
slurm_addto_char_list(char_list, slurm_conf.accounting_storage_tres);
memset(&update_object, 0, sizeof(slurmdb_update_object_t));
if (!slurm_with_slurmdbd()) {
update_object.type = SLURMDB_ADD_TRES;
update_object.objects = list_create(slurmdb_destroy_tres_rec);
} else if (!g_tres_count)
fatal("You are running with a database but for some reason "
"we have no TRES from it. This should only happen if "
"the database is down and you don't have "
"any state files.");
else if ((g_tres_count < TRES_ARRAY_TOTAL_CNT) ||
(xstrcmp(assoc_mgr_tres_array[TRES_ARRAY_BILLING]->type,
"billing")))
fatal("You are running with a database but for some reason we have less TRES than should be here (%d < %d) and/or the \"billing\" TRES is missing. This should only happen if the database is down after an upgrade.",
g_tres_count, TRES_ARRAY_TOTAL_CNT);
while ((temp_char = list_pop(char_list))) {
tres_rec = xmalloc(sizeof(slurmdb_tres_rec_t));
tres_rec->type = temp_char;
if (!xstrcasecmp(temp_char, "cpu"))
tres_rec->id = TRES_CPU;
else if (!xstrcasecmp(temp_char, "mem"))
tres_rec->id = TRES_MEM;
else if (!xstrcasecmp(temp_char, "energy"))
tres_rec->id = TRES_ENERGY;
else if (!xstrcasecmp(temp_char, "node"))
tres_rec->id = TRES_NODE;
else if (!xstrcasecmp(temp_char, "billing"))
tres_rec->id = TRES_BILLING;
else if (!xstrcasecmp(temp_char, "vmem"))
tres_rec->id = TRES_VMEM;
else if (!xstrcasecmp(temp_char, "pages"))
tres_rec->id = TRES_PAGES;
else if (!xstrncasecmp(temp_char, "bb/", 3)) {
tres_rec->type[2] = '\0';
tres_rec->name = xstrdup(temp_char+3);
if (!tres_rec->name)
fatal("Burst Buffer type tres need to have a "
"name, (i.e. bb/datawarp). You gave %s",
temp_char);
} else if (!xstrncasecmp(temp_char, "gres/", 5)) {
tres_rec->type[4] = '\0';
tres_rec->name = xstrdup(temp_char+5);
if (!tres_rec->name)
fatal("Gres type tres need to have a name, "
"(i.e. Gres/GPU). You gave %s",
temp_char);
} else if (!xstrncasecmp(temp_char, "license/", 8)) {
tres_rec->type[7] = '\0';
tres_rec->name = xstrdup(temp_char+8);
if (!tres_rec->name)
fatal("License type tres need to "
"have a name, (i.e. License/Foo). "
"You gave %s",
temp_char);
} else if (!xstrncasecmp(temp_char, "fs/", 3)) {
tres_rec->type[2] = '\0';
tres_rec->name = xstrdup(temp_char+3);
if (!tres_rec->name)
fatal("Filesystem type tres need to have a name, (i.e. fs/disk). You gave %s",
temp_char);
if (!xstrncasecmp(tres_rec->name, "disk", 4))
tres_rec->id = TRES_FS_DISK;
} else if (!xstrncasecmp(temp_char, "ic/", 3)) {
tres_rec->type[2] = '\0';
tres_rec->name = xstrdup(temp_char+3);
if (!tres_rec->name)
fatal("Interconnect type tres need to have a name, (i.e. ic/ofed). You gave %s",
temp_char);
} else {
fatal("%s: Unknown tres type '%s', acceptable types are Billing,CPU,Energy,FS/,Gres/,IC/,License/,Mem,Node,Pages,VMem",
__func__, temp_char);
xfree(tres_rec->type);
xfree(tres_rec);
}
if (!slurm_with_slurmdbd()) {
if (!tres_rec->id)
fatal("slurmdbd is required to run with TRES %s%s%s. Either setup slurmdbd or remove this TRES from your configuration.",
tres_rec->type, tres_rec->name ? "/" : "",
tres_rec->name ? tres_rec->name : "");
list_append(update_object.objects, tres_rec);
} else if (!tres_rec->id &&
assoc_mgr_fill_in_tres(
acct_db_conn, tres_rec,
ACCOUNTING_ENFORCE_TRES, NULL, 0)
!= SLURM_SUCCESS) {
if (!add_list)
add_list = list_create(
slurmdb_destroy_tres_rec);
info("Couldn't find tres %s%s%s in the database, "
"creating.",
tres_rec->type, tres_rec->name ? "/" : "",
tres_rec->name ? tres_rec->name : "");
list_append(add_list, tres_rec);
} else
slurmdb_destroy_tres_rec(tres_rec);
}
FREE_NULL_LIST(char_list);
if (add_list) {
if (acct_storage_g_add_tres(acct_db_conn,
slurm_conf.slurm_user_id,
add_list) != SLURM_SUCCESS)
fatal("Problem adding tres to the database, "
"can't continue until database is able to "
"make new tres");
/* refresh list here since the updates are not
sent dynamically */
assoc_mgr_refresh_lists(acct_db_conn, ASSOC_MGR_CACHE_TRES);
FREE_NULL_LIST(add_list);
}
if (!slurm_with_slurmdbd()) {
assoc_mgr_update_tres(&update_object, false);
FREE_NULL_LIST(update_object.objects);
}
/* Set up the slurmctld_tres_cnt here (Current code is set to
* not have this ever change).
*/
assoc_mgr_lock(&locks);
slurmctld_tres_cnt = g_tres_count;
assoc_mgr_unlock(&locks);
return SLURM_SUCCESS;
}
/*
* NOTE: the job_write_lock as well as the assoc_mgr TRES Read lock should be
* locked before coming in here.
*/
static int _update_job_tres(void *x, void *arg)
{
job_record_t *job_ptr = x;
xassert(verify_lock(JOB_LOCK, WRITE_LOCK));
/* If this returns 1 it means the positions were
altered so just rebuild it.
*/
if (assoc_mgr_set_tres_cnt_array(&job_ptr->tres_req_cnt,
job_ptr->tres_req_str,
0, true, false, NULL))
job_set_req_tres(job_ptr, true);
if (assoc_mgr_set_tres_cnt_array(&job_ptr->tres_alloc_cnt,
job_ptr->tres_alloc_str,
0, true, false, NULL))
job_set_alloc_tres(job_ptr, true);
update_job_limit_set_tres(&job_ptr->limit_set.tres, slurmctld_tres_cnt);
return 0;
}
/* any association manager locks should be unlocked before hand */
static void _update_cluster_tres(void)
{
/* Write lock on jobs */
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
assoc_mgr_lock_t locks = { .tres = READ_LOCK };
if (!job_list)
return;
lock_slurmctld(job_write_lock);
assoc_mgr_lock(&locks);
list_for_each(job_list, _update_job_tres, NULL);
assoc_mgr_unlock(&locks);
unlock_slurmctld(job_write_lock);
}
static void _update_parts_and_resvs(void)
{
update_assocs_in_resvs();
part_list_update_assoc_lists();
}
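/*
 * Scan all nodes with a pending reboot request and, for those in a safe
 * state (idle with no suspended jobs, down, or never-responded future
 * nodes), mark them DOWN with REBOOT_ISSUED and send a single
 * REQUEST_REBOOT_NODES agent message. Nodes not yet safe to reboot cause
 * want_nodes_reboot to be set again so the request is retried later.
 */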
static void _queue_reboot_msg(void)
{
agent_arg_t *reboot_agent_args = NULL;
node_record_t *node_ptr;
char *host_str;
time_t now = time(NULL);
int i;
bool want_reboot;
want_nodes_reboot = false;
for (i = 0; (node_ptr = next_node(&i)); i++) {
/* Allow nodes in maintenance reservations to reboot
* (they previously could not).
*/
if (!IS_NODE_REBOOT_REQUESTED(node_ptr))
continue; /* No reboot needed */
else if (IS_NODE_REBOOT_ISSUED(node_ptr)) {
debug2("%s: Still waiting for boot of node %s",
__func__, node_ptr->name);
continue;
}
if (IS_NODE_COMPLETING(node_ptr)) {
want_nodes_reboot = true;
continue;
}
/* only active idle nodes, don't reboot
* nodes that are idle but have suspended
* jobs on them
*/
if (IS_NODE_IDLE(node_ptr)
&& !IS_NODE_NO_RESPOND(node_ptr)
&& !IS_NODE_POWERING_UP(node_ptr)
&& node_ptr->sus_job_cnt == 0)
want_reboot = true;
else if (IS_NODE_FUTURE(node_ptr) &&
(node_ptr->last_response == (time_t) 0))
want_reboot = true; /* system just restarted */
else if (IS_NODE_DOWN(node_ptr))
want_reboot = true;
else
want_reboot = false;
if (!want_reboot) {
want_nodes_reboot = true; /* defer reboot */
continue;
}
if (reboot_agent_args == NULL) {
reboot_agent_args = xmalloc(sizeof(agent_arg_t));
reboot_agent_args->msg_type = REQUEST_REBOOT_NODES;
reboot_agent_args->retry = 0;
reboot_agent_args->hostlist = hostlist_create(NULL);
reboot_agent_args->protocol_version =
SLURM_PROTOCOL_VERSION;
}
if (reboot_agent_args->protocol_version
> node_ptr->protocol_version)
reboot_agent_args->protocol_version =
node_ptr->protocol_version;
hostlist_push_host(reboot_agent_args->hostlist, node_ptr->name);
reboot_agent_args->node_count++;
/*
* node_ptr->node_state &= ~NODE_STATE_MAINT;
* The NODE_STATE_MAINT bit will just get set again as long
* as the node remains in the maintenance reservation, so
* don't clear it here because it won't do anything.
*/
node_ptr->node_state &= NODE_STATE_FLAGS;
node_ptr->node_state |= NODE_STATE_DOWN;
node_ptr->node_state &= ~NODE_STATE_REBOOT_REQUESTED;
node_ptr->node_state |= NODE_STATE_REBOOT_ISSUED;
bit_clear(avail_node_bitmap, node_ptr->index);
bit_clear(idle_node_bitmap, node_ptr->index);
/* Unset this as this node is not in reboot ASAP anymore. */
bit_clear(asap_node_bitmap, node_ptr->index);
node_ptr->boot_req_time = now;
set_node_reason(node_ptr, "reboot issued", now);
clusteracct_storage_g_node_down(acct_db_conn, node_ptr, now,
NULL, slurm_conf.slurm_user_id);
}
if (reboot_agent_args != NULL) {
hostlist_uniq(reboot_agent_args->hostlist);
host_str = hostlist_ranged_string_xmalloc(
reboot_agent_args->hostlist);
debug("Issuing reboot request for nodes %s", host_str);
xfree(host_str);
set_agent_arg_r_uid(reboot_agent_args, SLURM_AUTH_UID_ANY);
agent_queue_request(reboot_agent_args);
last_node_update = now;
schedule_node_save();
}
}
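/*
 * Wait up to CONTROL_TIMEOUT seconds for outstanding RPC server threads to
 * finish (one thread is expected to remain when resuming backup mode).
 */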
static void _flush_rpcs(void)
{
struct timespec ts = {0, 0};
struct timeval now;
int exp_thread_cnt = slurmctld_config.resume_backup ? 1 : 0;
/* wait for RPCs to complete */
gettimeofday(&now, NULL);
ts.tv_sec = now.tv_sec + CONTROL_TIMEOUT;
ts.tv_nsec = now.tv_usec * 1000;
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
while (slurmctld_config.server_thread_count > exp_thread_cnt) {
slurm_cond_timedwait(&slurmctld_config.thread_count_cond,
&slurmctld_config.thread_count_lock, &ts);
/* Give up (rather than spin on ETIMEDOUT) once the
* CONTROL_TIMEOUT deadline in ts has passed. */
if (time(NULL) >= ts.tv_sec)
break;
}
if (slurmctld_config.server_thread_count > exp_thread_cnt) {
info("shutdown server_thread_count=%d",
slurmctld_config.server_thread_count);
}
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
}
/*
* _slurmctld_background - process slurmctld background activities
* purge defunct job records, save state, schedule jobs, and
* ping other nodes
*/
static void *_slurmctld_background(void *no_data)
{
static time_t last_sched_time;
static time_t last_config_list_update_time;
static time_t last_full_sched_time;
static time_t last_checkpoint_time;
static time_t last_group_time;
static time_t last_health_check_time;
static time_t last_acct_gather_node_time;
static time_t last_no_resp_msg_time;
static time_t last_ping_node_time = (time_t) 0;
static time_t last_ping_srun_time;
static time_t last_purge_job_time;
static time_t last_resv_time;
static time_t last_timelimit_time;
static time_t last_assert_primary_time;
static time_t last_trigger;
static time_t last_node_acct;
static time_t last_ctld_bu_ping;
static time_t last_uid_update;
time_t now;
int no_resp_msg_interval, ping_interval, purge_job_interval;
DEF_TIMERS;
/* Locks: Read config */
slurmctld_lock_t config_read_lock = {
READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
/* Locks: Read config, read job */
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
/* Locks: Read config, write job, write node, read partition */
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
/* Locks: Write job */
slurmctld_lock_t job_write_lock2 = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
/* Locks: Read config, write job, write node
* (Might kill jobs on nodes set DOWN) */
slurmctld_lock_t node_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
/* Locks: Write node */
slurmctld_lock_t node_write_lock2 = {
NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
/* Locks: Write partition */
slurmctld_lock_t part_write_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK };
/* Locks: Read job and node */
slurmctld_lock_t job_node_read_lock = {
NO_LOCK, READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
/*
* purge_old_job modifies jobs and reads conf info. It can also
* call re_kill_job(), which can modify nodes and reads fed info.
*/
slurmctld_lock_t purge_job_locks = {
.conf = READ_LOCK,
.job = WRITE_LOCK,
.node = WRITE_LOCK,
.fed = READ_LOCK,
};
/* Let the dust settle before doing work */
now = time(NULL);
last_sched_time = last_full_sched_time = now;
last_checkpoint_time = last_group_time = now;
last_purge_job_time = last_trigger = last_health_check_time = now;
last_timelimit_time = last_assert_primary_time = now;
last_no_resp_msg_time = last_resv_time = last_ctld_bu_ping = now;
last_uid_update = now;
last_acct_gather_node_time = now;
last_config_list_update_time = now;
last_ping_srun_time = now;
last_node_acct = now;
debug3("_slurmctld_background pid = %u", getpid());
while (1) {
bool call_schedule = false, full_queue = false;
slurm_mutex_lock(&shutdown_mutex);
if (!slurmctld_config.shutdown_time) {
struct timespec ts = {0, 0};
/* Listen to new incoming RPCs if not shutting down */
listeners_unquiesce();
ts.tv_sec = time(NULL) + 1;
slurm_cond_timedwait(&shutdown_cond, &shutdown_mutex,
&ts);
}
slurm_mutex_unlock(&shutdown_mutex);
now = time(NULL);
START_TIMER;
if (slurm_conf.slurmctld_debug <= 3)
no_resp_msg_interval = 300;
else if (slurm_conf.slurmctld_debug == 4)
no_resp_msg_interval = 60;
else
no_resp_msg_interval = 1;
if ((slurm_conf.min_job_age > 0) &&
(slurm_conf.min_job_age < PURGE_JOB_INTERVAL)) {
/* Purge jobs more quickly, especially for high job flow */
purge_job_interval = MAX(10, slurm_conf.min_job_age);
} else
purge_job_interval = PURGE_JOB_INTERVAL;
if (slurm_conf.slurmd_timeout) {
/* We ping nodes that haven't responded in SlurmdTimeout/3,
* but need to do the test at a higher frequency or we might
* DOWN nodes with times that fall in the gap. */
ping_interval = slurm_conf.slurmd_timeout / 3;
} else {
/* This will just ping non-responding nodes
* and restore them to service */
ping_interval = 100; /* 100 seconds */
}
if (!last_ping_node_time) {
last_ping_node_time = now + (time_t)MIN_CHECKIN_TIME -
ping_interval;
}
if (slurmctld_config.shutdown_time) {
/* Always stop listening when shutdown requested */
listeners_quiesce();
_flush_rpcs();
/*
* Wait for all already accepted connection work to
* finish before continuing on with control loop that
* will unload all the plugins which requires there be
* no active RPCs.
*/
conmgr_quiesce(__func__);
if (!report_locks_set()) {
info("Saving all slurm state");
save_all_state();
} else {
error("Semaphores still set after %d seconds, "
"can not save state", CONTROL_TIMEOUT);
}
/*
* Allow other connections to start processing again as
* the listeners are already quiesced
*/
conmgr_unquiesce(__func__);
break;
}
if (difftime(now, last_resv_time) >= 5) {
lock_slurmctld(node_write_lock);
now = time(NULL);
last_resv_time = now;
if (set_node_maint_mode() > 0)
queue_job_scheduler();
unlock_slurmctld(node_write_lock);
}
if (difftime(now, last_no_resp_msg_time) >=
no_resp_msg_interval) {
lock_slurmctld(node_write_lock2);
now = time(NULL);
last_no_resp_msg_time = now;
node_no_resp_msg();
unlock_slurmctld(node_write_lock2);
}
validate_all_reservations(true, true);
if (difftime(now, last_timelimit_time) >= PERIODIC_TIMEOUT) {
lock_slurmctld(job_write_lock);
now = time(NULL);
last_timelimit_time = now;
debug2("Testing job time limits and checkpoints");
job_time_limit();
job_resv_check();
unlock_slurmctld(job_write_lock);
lock_slurmctld(node_write_lock);
check_node_timers();
unlock_slurmctld(node_write_lock);
}
if (!(slurm_conf.health_check_node_state &
HEALTH_CHECK_START_ONLY) &&
slurm_conf.health_check_interval &&
(difftime(now, last_health_check_time) >=
slurm_conf.health_check_interval) &&
is_ping_done()) {
lock_slurmctld(node_write_lock);
if (slurm_conf.health_check_node_state &
HEALTH_CHECK_CYCLE) {
/* Call run_health_check() on each cycle */
} else {
now = time(NULL);
last_health_check_time = now;
}
run_health_check();
unlock_slurmctld(node_write_lock);
}
if (slurm_conf.acct_gather_node_freq &&
(difftime(now, last_acct_gather_node_time) >=
slurm_conf.acct_gather_node_freq) &&
is_ping_done()) {
lock_slurmctld(node_write_lock);
now = time(NULL);
last_acct_gather_node_time = now;
update_nodes_acct_gather_data();
unlock_slurmctld(node_write_lock);
}
if (((difftime(now, last_ping_node_time) >= ping_interval) ||
ping_nodes_now) && is_ping_done()) {
lock_slurmctld(node_write_lock);
now = time(NULL);
last_ping_node_time = now;
ping_nodes_now = false;
ping_nodes();
unlock_slurmctld(node_write_lock);
}
if (slurm_conf.inactive_limit &&
((now - last_ping_srun_time) >=
(slurm_conf.inactive_limit / 3))) {
lock_slurmctld(job_read_lock);
now = time(NULL);
last_ping_srun_time = now;
debug2("Performing srun ping");
srun_ping();
unlock_slurmctld(job_read_lock);
}
if (want_nodes_reboot) {
lock_slurmctld(node_write_lock);
_queue_reboot_msg();
unlock_slurmctld(node_write_lock);
}
/* Process any pending agent work */
agent_trigger(RPC_RETRY_INTERVAL, true, true);
if (slurm_conf.group_time &&
(difftime(now, last_group_time)
>= slurm_conf.group_time)) {
lock_slurmctld(part_write_lock);
now = time(NULL);
last_group_time = now;
load_part_uid_allow_list(slurm_conf.group_force);
reservation_update_groups(slurm_conf.group_force);
unlock_slurmctld(part_write_lock);
group_cache_cleanup();
}
if (difftime(now, last_purge_job_time) >= purge_job_interval) {
/*
* If backfill is running, it will have a list of
* job_record pointers which could include jobs about to
* be purged. Skip the purge in that case to prevent
* _attempt_backfill() from potentially dereferencing an
* invalid pointer.
*/
slurm_mutex_lock(&check_bf_running_lock);
if (!slurmctld_diag_stats.bf_active) {
lock_slurmctld(purge_job_locks);
now = time(NULL);
last_purge_job_time = now;
debug2("Performing purge of old job records");
purge_old_job();
unlock_slurmctld(purge_job_locks);
}
slurm_mutex_unlock(&check_bf_running_lock);
free_old_jobs();
}
if (difftime(now, last_full_sched_time) >= sched_interval) {
slurm_mutex_lock(&sched_cnt_mutex);
call_schedule = true;
full_queue = true;
job_sched_cnt = 0;
slurm_mutex_unlock(&sched_cnt_mutex);
last_full_sched_time = now;
} else {
slurm_mutex_lock(&sched_cnt_mutex);
if (job_sched_cnt &&
(difftime(now, last_sched_time) >=
batch_sched_delay)) {
call_schedule = true;
job_sched_cnt = 0;
}
slurm_mutex_unlock(&sched_cnt_mutex);
}
if (call_schedule) {
lock_slurmctld(job_write_lock2);
now = time(NULL);
last_sched_time = now;
bb_g_load_state(false); /* May alter job nice/prio */
unlock_slurmctld(job_write_lock2);
schedule(full_queue);
set_job_elig_time();
}
if (difftime(now, last_config_list_update_time) >=
UPDATE_CONFIG_LIST_TIMEOUT) {
last_config_list_update_time = now;
consolidate_config_list(false, false);
}
if (slurm_conf.slurmctld_timeout &&
(difftime(now, last_ctld_bu_ping) >
slurm_conf.slurmctld_timeout)) {
ping_controllers(true);
last_ctld_bu_ping = now;
}
if (difftime(now, last_trigger) > TRIGGER_INTERVAL) {
lock_slurmctld(job_node_read_lock);
now = time(NULL);
last_trigger = now;
trigger_process();
unlock_slurmctld(job_node_read_lock);
}
if (difftime(now, last_checkpoint_time) >=
PERIODIC_CHECKPOINT) {
now = time(NULL);
last_checkpoint_time = now;
debug2("Performing full system state save");
save_all_state();
}
if (difftime(now, last_node_acct) >= PERIODIC_NODE_ACCT) {
/* Report current node state to account for added
* or reconfigured nodes. Locks are done
* inside _accounting_cluster_ready, don't
* lock here. */
now = time(NULL);
last_node_acct = now;
_accounting_cluster_ready();
}
if (difftime(now, slurmctld_diag_stats.job_states_ts) >=
JOB_COUNT_INTERVAL) {
lock_slurmctld(job_read_lock);
_update_diag_job_state_counts();
unlock_slurmctld(job_read_lock);
}
/* Stats will reset at approximately midnight UTC. */
if (last_proc_req_start == 0) {
last_proc_req_start = now;
next_stats_reset = now - (now % 86400) + 86400;
} else if (now >= next_stats_reset) {
next_stats_reset = now - (now % 86400) + 86400;
reset_stats(0);
}
/*
* Reassert this machine as the primary controller.
* A network or security problem could result in
* the backup controller assuming control even
* while the real primary controller is running.
*/
lock_slurmctld(config_read_lock);
if (slurmctld_primary && slurm_conf.slurmctld_timeout &&
(difftime(now, last_assert_primary_time) >=
slurm_conf.slurmctld_timeout)) {
now = time(NULL);
last_assert_primary_time = now;
(void) _shutdown_backup_controller();
}
unlock_slurmctld(config_read_lock);
if (difftime(now, last_uid_update) >= 3600) {
bool uid_set = false;
/*
* Make sure we update the uids in the
* assoc_mgr if there were any users
* with unknown uids at the time of startup.
*/
now = time(NULL);
last_uid_update = now;
assoc_mgr_set_missing_uids(&uid_set);
/*
* If a missing uid was set, schedule a full reservation
* validation to make sure that the reservations are up
* to date.
*/
if (uid_set)
validate_all_reservations(false, true);
}
END_TIMER2(__func__);
}
debug3("_slurmctld_background shutting down");
return NULL;
}
/* save_all_state - save entire slurmctld state for later recovery */
extern void save_all_state(void)
{
/* Each of these functions lock their own databases */
schedule_job_save();
schedule_node_save();
schedule_part_save();
schedule_resv_save();
schedule_trigger_save();
dump_assoc_mgr_state();
fed_mgr_state_save();
}
/* make sure the assoc_mgr is up and running with the most current state */
extern void ctld_assoc_mgr_init(void)
{
assoc_init_args_t assoc_init_arg;
int num_jobs = 0;
slurmctld_lock_t job_read_lock =
{ NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
memset(&assoc_init_arg, 0, sizeof(assoc_init_args_t));
assoc_init_arg.enforce = accounting_enforce;
assoc_init_arg.running_cache = &running_cache;
assoc_init_arg.add_license_notify = license_add_remote;
assoc_init_arg.resize_qos_notify = _resize_qos;
assoc_init_arg.remove_assoc_notify = _remove_assoc;
assoc_init_arg.remove_license_notify = license_remove_remote;
assoc_init_arg.remove_qos_notify = _remove_qos;
assoc_init_arg.sync_license_notify = license_sync_remote;
assoc_init_arg.update_assoc_notify = _update_assoc;
assoc_init_arg.update_license_notify = license_update_remote;
assoc_init_arg.update_qos_notify = _update_qos;
assoc_init_arg.update_cluster_tres = _update_cluster_tres;
assoc_init_arg.update_resvs = _update_parts_and_resvs;
assoc_init_arg.cache_level = ASSOC_MGR_CACHE_ASSOC |
ASSOC_MGR_CACHE_USER |
ASSOC_MGR_CACHE_QOS |
ASSOC_MGR_CACHE_RES |
ASSOC_MGR_CACHE_TRES |
ASSOC_MGR_CACHE_WCKEY;
/* Don't save state but blow away old lists if they exist. */
assoc_mgr_fini(0);
_init_db_conn();
if (assoc_mgr_init(acct_db_conn, &assoc_init_arg, errno)) {
trigger_primary_dbd_fail();
error("Association database appears down, reading from state files.");
if (!slurm_conf.cluster_id ||
(load_assoc_mgr_last_tres() != SLURM_SUCCESS) ||
(load_assoc_mgr_state() != SLURM_SUCCESS)) {
error("Unable to get any information from the state file");
_retry_init_db_conn(&assoc_init_arg);
}
}
if (!slurm_conf.cluster_id) {
slurm_conf.cluster_id = generate_cluster_id();
_create_clustername_file();
}
sluid_init(slurm_conf.cluster_id, 0);
/* Now load the usage from a flat file since it isn't kept in
the database
*/
load_assoc_usage();
load_qos_usage();
lock_slurmctld(job_read_lock);
if (job_list)
num_jobs = list_count(job_list);
unlock_slurmctld(job_read_lock);
_init_tres();
/* This thread is looking for when we get correct data from
the database so we can update the assoc_ptr's in the jobs
*/
if ((running_cache != RUNNING_CACHE_STATE_NOTRUNNING) || num_jobs) {
slurm_thread_create(&assoc_cache_thread,
_assoc_cache_mgr, NULL);
}
}
/* Make sure the assoc_mgr thread is terminated */
extern void ctld_assoc_mgr_fini(void)
{
if (running_cache == RUNNING_CACHE_STATE_NOTRUNNING)
return;
/* break out and end the association cache
* thread since we are shutting down, no reason
* to wait for current info from the database */
slurm_mutex_lock(&assoc_cache_mutex);
running_cache = RUNNING_CACHE_STATE_EXITING;
slurm_cond_signal(&assoc_cache_cond);
slurm_mutex_unlock(&assoc_cache_mutex);
slurm_thread_join(assoc_cache_thread);
}
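/*
 * list_for_each() helper for set_cluster_tres(): look up this node's count
 * for a "gres" TRES record, accumulate it into the cluster-wide total for
 * named GRES (type:name), and record it in the node's tres_cnt array.
 */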
static int _add_node_gres_tres(void *x, void *arg)
{
uint64_t gres_cnt;
int tres_pos;
slurmdb_tres_rec_t *tres_rec_in = x;
node_record_t *node_ptr = arg;
xassert(tres_rec_in);
if (xstrcmp(tres_rec_in->type, "gres"))
return 0;
gres_cnt = gres_node_config_cnt(node_ptr->gres_list, tres_rec_in->name);
/*
* Set the count here for named GRES as we don't store the count the
* same way we do for unnamed GRES.
*/
if (strchr(tres_rec_in->name, ':'))
tres_rec_in->count += gres_cnt;
if ((tres_pos = assoc_mgr_find_tres_pos(tres_rec_in, true)) != -1)
node_ptr->tres_cnt[tres_pos] = gres_cnt;
return 0;
}
/*
* Set the node's billing tres to the highest billing of all partitions that the
* node is a part of.
*/
static void _set_node_billing_tres(node_record_t *node_ptr, uint64_t cpu_count,
bool assoc_mgr_locked)
{
int i;
part_record_t *part_ptr = NULL;
double max_billing = 0;
xassert(node_ptr);
for (i = 0; i < node_ptr->part_cnt; i++) {
double tmp_billing;
part_ptr = node_ptr->part_pptr[i];
if (!part_ptr->billing_weights)
continue;
tmp_billing = assoc_mgr_tres_weighted(
node_ptr->tres_cnt, part_ptr->billing_weights,
slurm_conf.priority_flags, assoc_mgr_locked);
max_billing = MAX(max_billing, tmp_billing);
}
/* Fall back to the node's configured cpu_count if no partition has
* TRESBillingWeights set, since jobs are then billed by their
* allocated CPU count. */
if (!max_billing)
max_billing = cpu_count;
node_ptr->tres_cnt[TRES_ARRAY_BILLING] = max_billing;
}
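/*
 * Recompute cluster-wide TRES counts (CPU, memory, billing, GRES, burst
 * buffer, licenses, etc.) from the current node and partition data, and
 * refresh each node's TRES count array and strings.
 * Node and partition write locks must be held by the caller.
 */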
extern void set_cluster_tres(bool assoc_mgr_locked)
{
node_record_t *node_ptr;
slurmdb_tres_rec_t *tres_rec, *cpu_tres = NULL, *mem_tres = NULL;
int i;
uint64_t cluster_billing = 0;
char *unique_tres = NULL;
assoc_mgr_lock_t locks = {
.qos = WRITE_LOCK,
.tres = WRITE_LOCK };
int active_node_count = 0;
xassert(verify_lock(NODE_LOCK, WRITE_LOCK));
xassert(verify_lock(PART_LOCK, WRITE_LOCK));
if (!assoc_mgr_locked)
assoc_mgr_lock(&locks);
xassert(assoc_mgr_tres_array);
for (i = 0; i < g_tres_count; i++) {
tres_rec = assoc_mgr_tres_array[i];
if (!tres_rec->type) {
error("TRES %d doesn't have a type given, this should never happen",
tres_rec->id);
continue; /* this should never happen */
}
if (unique_tres)
xstrfmtcat(unique_tres, ",%s",
assoc_mgr_tres_name_array[i]);
else
unique_tres = xstrdup(assoc_mgr_tres_name_array[i]);
/* reset them now since we are about to add to them */
tres_rec->count = 0;
if (tres_rec->id == TRES_CPU) {
cpu_tres = tres_rec;
continue;
} else if (tres_rec->id == TRES_MEM) {
mem_tres = tres_rec;
continue;
} else if (!xstrcmp(tres_rec->type, "bb")) {
tres_rec->count = bb_g_get_system_size(tres_rec->name);
continue;
} else if (!xstrcmp(tres_rec->type, "gres")) {
/*
* Skip named GRES as we don't store
* the count the same way we do for unnamed GRES.
*/
if (strchr(tres_rec->name, ':'))
continue;
tres_rec->count =
gres_get_system_cnt(tres_rec->name, true);
if (tres_rec->count == NO_VAL64)
tres_rec->count = 0; /* GRES name not found */
continue;
} else if (!xstrcmp(tres_rec->type, "license")) {
tres_rec->count = get_total_license_cnt(
tres_rec->name);
continue;
}
/* FIXME: set up the other tres here that aren't specific */
}
xfree(slurm_conf.accounting_storage_tres);
slurm_conf.accounting_storage_tres = unique_tres;
cluster_cpus = 0;
for (i = 0; (node_ptr = next_node(&i)); i++) {
uint64_t cpu_count = 0, mem_count = 0;
if (!node_ptr->name)
continue;
active_node_count++;
cpu_count = node_ptr->cpus_efctv;
mem_count = node_ptr->config_ptr->real_memory;
cluster_cpus += cpu_count;
if (mem_tres)
mem_tres->count += mem_count;
if (!node_ptr->tres_cnt)
node_ptr->tres_cnt = xcalloc(slurmctld_tres_cnt,
sizeof(uint64_t));
node_ptr->tres_cnt[TRES_ARRAY_CPU] = cpu_count;
node_ptr->tres_cnt[TRES_ARRAY_MEM] = mem_count;
list_for_each(assoc_mgr_tres_list,
_add_node_gres_tres, node_ptr);
_set_node_billing_tres(node_ptr, cpu_count, true);
cluster_billing += node_ptr->tres_cnt[TRES_ARRAY_BILLING];
xfree(node_ptr->tres_str);
node_ptr->tres_str =
assoc_mgr_make_tres_str_from_array(node_ptr->tres_cnt,
TRES_STR_FLAG_SIMPLE,
true);
xfree(node_ptr->tres_fmt_str);
node_ptr->tres_fmt_str =
assoc_mgr_make_tres_str_from_array(
node_ptr->tres_cnt,
TRES_STR_CONVERT_UNITS,
true);
}
/* FIXME: cluster_cpus probably needs to be removed and handled
* differently in the spots this is used.
*/
if (cpu_tres)
cpu_tres->count = cluster_cpus;
assoc_mgr_tres_array[TRES_ARRAY_NODE]->count = active_node_count;
assoc_mgr_tres_array[TRES_ARRAY_BILLING]->count = cluster_billing;
set_partition_tres(true);
if (!assoc_mgr_locked)
assoc_mgr_unlock(&locks);
}
/*
* slurmctld_shutdown - wake up _slurm_rpc_mgr thread via signal
* RET 0 or error code
*/
int slurmctld_shutdown(void)
{
sched_debug("slurmctld terminating");
slurmctld_config.shutdown_time = time(NULL);
slurm_cond_signal(&shutdown_cond);
pthread_kill(pthread_self(), SIGUSR1);
return SLURM_SUCCESS;
}
/*
* _parse_commandline - parse and process any command line arguments
* IN argc - number of command line arguments
* IN argv - the command line arguments
*/
static void _parse_commandline(int argc, char **argv)
{
int c = 0;
char *tmp_char;
enum {
LONG_OPT_ENUM_START = 0x100,
LONG_OPT_SYSTEMD,
};
static struct option long_options[] = {
{"systemd", no_argument, 0, LONG_OPT_SYSTEMD},
{"version", no_argument, 0, 'V'},
{NULL, 0, 0, 0}
};
if (run_command_is_launcher(argc, argv)) {
char *ctx = getenv("SLURM_SCRIPT_CONTEXT");
if (!xstrcmp(ctx, "burst_buffer.lua")) {
unsetenv("SLURM_SCRIPT_CONTEXT");
slurmscriptd_handle_bb_lua_mode(argc, argv);
_exit(127);
}
run_command_launcher(argc, argv);
_exit(127); /* Should not get here */
}
opterr = 0;
while ((c = getopt_long(argc, argv, "cdDf:hiL:n:rRsvV",
long_options, NULL)) > 0) {
switch (c) {
case 'c':
recover = 0;
break;
case 'D':
daemonize = false;
break;
case 'f':
xfree(slurm_conf_filename);
slurm_conf_filename = xstrdup(optarg);
break;
case 'h':
_usage();
exit(0);
break;
case 'i':
ignore_state_errors = true;
break;
case 'L':
xfree(debug_logfile);
debug_logfile = xstrdup(optarg);
break;
case 'n':
new_nice = strtol(optarg, &tmp_char, 10);
if (tmp_char[0] != '\0') {
error("Invalid option for -n option (nice "
"value), ignored");
new_nice = 0;
}
break;
case 'r':
recover = 1;
break;
case 'R':
recover = 2;
break;
case 's':
setwd = true;
break;
case 'v':
debug_level++;
break;
case 'V':
print_slurm_version();
exit(0);
break;
case LONG_OPT_SYSTEMD:
under_systemd = true;
break;
default:
_usage();
exit(1);
}
}
if (under_systemd && !daemonize)
fatal("--systemd and -D options are mutually exclusive");
/*
* Reconfiguration has historically been equivalent to recover = 1.
* Force defaults in case the original process used '-c', '-i' or '-R'.
*/
if (!original) {
ignore_state_errors = false;
recover = 1;
}
if (under_systemd) {
if (!getenv("NOTIFY_SOCKET"))
fatal("Missing NOTIFY_SOCKET.");
daemonize = false;
setwd = true;
}
/*
* Using setwd() later means a relative path to ourselves may shift.
* Capture /proc/self/exe now and save this for reconfig later.
* Cannot wait to capture it later as Linux will append " (deleted)"
* to the filename if it's been replaced, which would break reconfig
* after an upgrade.
*/
if (argv[0][0] != '/') {
if (readlink("/proc/self/exe", binary, PATH_MAX) < 0)
fatal("%s: readlink failed: %m", __func__);
} else {
strlcpy(binary, argv[0], PATH_MAX);
}
}
static void _usage(void)
{
char *txt;
static_ref_to_cstring(txt, usage_txt);
fprintf(stderr, "%s", txt);
xfree(txt);
}
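/*
 * Thread spawned by _shutdown_backup_controller() for a single backup:
 * send either REQUEST_SHUTDOWN or REQUEST_CONTROL to that controller and
 * record any failure in bu_rc before signalling bu_cond.
 */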
static void *_shutdown_bu_thread(void *arg)
{
int bu_inx, rc = SLURM_SUCCESS, rc2 = SLURM_SUCCESS;
slurm_msg_t req;
bool do_shutdown = false;
shutdown_arg_t *shutdown_arg;
shutdown_msg_t shutdown_msg;
shutdown_arg = arg;
bu_inx = shutdown_arg->index;
do_shutdown = shutdown_arg->shutdown;
xfree(arg);
slurm_msg_t_init(&req);
slurm_msg_set_r_uid(&req, slurm_conf.slurm_user_id);
slurm_set_addr(&req.address, slurm_conf.slurmctld_port,
slurm_conf.control_addr[bu_inx]);
if (do_shutdown) {
req.msg_type = REQUEST_SHUTDOWN;
shutdown_msg.options = SLURMCTLD_SHUTDOWN_CTLD;
req.data = &shutdown_msg;
debug("Requesting shutdown of backup controller %s",
slurm_conf.control_machine[bu_inx]);
} else {
req.msg_type = REQUEST_CONTROL;
debug("Requesting control from backup controller %s",
slurm_conf.control_machine[bu_inx]);
}
if (slurm_send_recv_rc_msg_only_one(&req, &rc2,
(CONTROL_TIMEOUT * 1000)) < 0) {
error("%s:send/recv %s: %m",
__func__, slurm_conf.control_machine[bu_inx]);
rc = SLURM_ERROR;
} else if (rc2 == ESLURM_DISABLED) {
debug("backup controller %s responding",
slurm_conf.control_machine[bu_inx]);
} else if (rc2 == SLURM_SUCCESS) {
debug("backup controller %s has relinquished control",
slurm_conf.control_machine[bu_inx]);
} else {
error("%s (%s): %s", __func__,
slurm_conf.control_machine[bu_inx],
slurm_strerror(rc2));
rc = SLURM_ERROR;
}
slurm_mutex_lock(&bu_mutex);
if (rc != SLURM_SUCCESS)
bu_rc = rc;
bu_thread_cnt--;
slurm_cond_signal(&bu_cond);
slurm_mutex_unlock(&bu_mutex);
return NULL;
}
/*
* Tell the backup_controllers to relinquish control, primary control_machine
* has resumed operation. Messages sent to all controllers in parallel.
* RET 0 or an error code
* NOTE: READ lock_slurmctld config before entry (or be single-threaded)
*/
static int _shutdown_backup_controller(void)
{
int i;
shutdown_arg_t *shutdown_arg;
bu_rc = SLURM_SUCCESS;
/* If we don't have any backups configured just return */
if (slurm_conf.control_cnt == 1)
return bu_rc;
debug2("shutting down backup controllers (my index: %d)", backup_inx);
for (i = 1; i < slurm_conf.control_cnt; i++) {
if (i == backup_inx)
continue; /* No message to self */
if ((slurm_conf.control_addr[i] == NULL) ||
(slurm_conf.control_addr[i][0] == '\0'))
continue;
shutdown_arg = xmalloc(sizeof(*shutdown_arg));
shutdown_arg->index = i;
/*
* need to send actual REQUEST_SHUTDOWN to non-primary ctlds
* in order to have them properly shutdown and not contend
* for primary position, otherwise "takeover" results in
* contention among backups for primary position.
*/
if (i < backup_inx)
shutdown_arg->shutdown = true;
slurm_thread_create_detached(_shutdown_bu_thread,
shutdown_arg);
slurm_mutex_lock(&bu_mutex);
bu_thread_cnt++;
slurm_mutex_unlock(&bu_mutex);
}
slurm_mutex_lock(&bu_mutex);
while (bu_thread_cnt != 0) {
slurm_cond_wait(&bu_cond, &bu_mutex);
}
slurm_mutex_unlock(&bu_mutex);
return bu_rc;
}
/*
* Update log levels given requested levels
* NOTE: Will not turn on originally configured off (quiet) channels
*/
void update_log_levels(int req_slurmctld_debug, int req_syslog_debug)
{
static bool conf_init = false;
static int conf_slurmctld_debug, conf_syslog_debug;
log_options_t log_opts = LOG_OPTS_INITIALIZER;
int slurmctld_debug;
int syslog_debug;
/*
* Keep track of the original debug levels from slurm.conf so that
* `scontrol setdebug` does not turn on non-active logging channels.
* NOTE: It is known that `scontrol reconfigure` will cause an issue
* when reconfigured with a slurm.conf that changes SlurmctldDebug
* from level QUIET to a non-quiet value.
* NOTE: Planned changes to `reconfigure` behavior should make this a
* non-issue in a future release.
*/
if (!conf_init) {
conf_slurmctld_debug = slurm_conf.slurmctld_debug;
conf_syslog_debug = slurm_conf.slurmctld_syslog_debug;
conf_init = true;
}
/*
* NOTE: not offset by LOG_LEVEL_INFO, since it's inconvenient
* to provide negative values for scontrol
*/
slurmctld_debug = MIN(req_slurmctld_debug, (LOG_LEVEL_END - 1));
slurmctld_debug = MAX(slurmctld_debug, LOG_LEVEL_QUIET);
syslog_debug = MIN(req_syslog_debug, (LOG_LEVEL_END - 1));
syslog_debug = MAX(syslog_debug, LOG_LEVEL_QUIET);
if (daemonize)
log_opts.stderr_level = LOG_LEVEL_QUIET;
else
log_opts.stderr_level = slurmctld_debug;
if (slurm_conf.slurmctld_logfile &&
(conf_slurmctld_debug != LOG_LEVEL_QUIET))
log_opts.logfile_level = slurmctld_debug;
else
log_opts.logfile_level = LOG_LEVEL_QUIET;
if (conf_syslog_debug == LOG_LEVEL_QUIET)
log_opts.syslog_level = LOG_LEVEL_QUIET;
else if (slurm_conf.slurmctld_syslog_debug != LOG_LEVEL_END)
log_opts.syslog_level = syslog_debug;
else if (!daemonize)
log_opts.syslog_level = LOG_LEVEL_QUIET;
else if (!slurm_conf.slurmctld_logfile &&
(conf_slurmctld_debug > LOG_LEVEL_QUIET))
log_opts.syslog_level = slurmctld_debug;
else
log_opts.syslog_level = LOG_LEVEL_FATAL;
log_alter(log_opts, LOG_DAEMON, slurm_conf.slurmctld_logfile);
debug("slurmctld log levels: stderr=%s logfile=%s syslog=%s",
log_num2string(log_opts.stderr_level),
log_num2string(log_opts.logfile_level),
log_num2string(log_opts.syslog_level));
}
/*
* Reset slurmctld logging based upon configuration parameters uses common
* slurm_conf data structure
*/
void update_logging(void)
{
int rc;
uid_t slurm_user_id = slurm_conf.slurm_user_id;
gid_t slurm_user_gid = gid_from_uid(slurm_user_id);
xassert(verify_lock(CONF_LOCK, WRITE_LOCK));
/* Preserve execute line arguments (if any) */
if (debug_level) {
slurm_conf.slurmctld_debug = MIN(
(LOG_LEVEL_INFO + debug_level),
(LOG_LEVEL_END - 1));
}
if (slurm_conf.slurmctld_debug != NO_VAL16) {
log_opts.logfile_level = slurm_conf.slurmctld_debug;
}
if (debug_logfile) {
xfree(slurm_conf.slurmctld_logfile);
slurm_conf.slurmctld_logfile = xstrdup(debug_logfile);
}
log_set_timefmt(slurm_conf.log_fmt);
update_log_levels(slurm_conf.slurmctld_debug,
slurm_conf.slurmctld_syslog_debug);
debug("Log file re-opened");
/*
* SchedLogLevel restore
*/
if (slurm_conf.sched_log_level != NO_VAL16)
sched_log_opts.logfile_level = slurm_conf.sched_log_level;
sched_log_alter(sched_log_opts, LOG_DAEMON, slurm_conf.sched_logfile);
if (slurm_conf.slurmctld_logfile) {
rc = chown(slurm_conf.slurmctld_logfile,
slurm_user_id, slurm_user_gid);
if (rc && daemonize) {
error("chown(%s, %u, %u): %m",
slurm_conf.slurmctld_logfile,
slurm_user_id, slurm_user_gid);
}
}
if (slurm_conf.sched_logfile) {
rc = chown(slurm_conf.sched_logfile,
slurm_user_id, slurm_user_gid);
if (rc && daemonize) {
error("chown(%s, %u, %u): %m",
slurm_conf.sched_logfile,
slurm_user_id, slurm_user_gid);
}
}
}
/* Reset slurmctld nice value */
static void _update_nice(void)
{
int cur_nice;
id_t pid;
if (new_nice == 0) /* No change */
return;
pid = getpid();
cur_nice = getpriority(PRIO_PROCESS, pid);
if (cur_nice == new_nice)
return;
if (setpriority(PRIO_PROCESS, pid, new_nice))
error("Unable to reset nice value to %d: %m", new_nice);
}
/*
* Verify that ClusterName from slurm.conf matches the state directory.
* If mismatched, exit immediately to protect state files from corruption.
*/
static void _verify_clustername(void)
{
FILE *fp;
char *filename = NULL;
char name[512] = {0};
xstrfmtcat(filename, "%s/clustername", slurm_conf.state_save_location);
if ((fp = fopen(filename, "r"))) {
char *pipe;
/* read value and compare */
if (!fgets(name, sizeof(name), fp)) {
error("%s: reading cluster name from clustername file",
__func__);
}
fclose(fp);
pipe = xstrchr(name, '|');
if (pipe) {
pipe[0] = '\0';
slurm_conf.cluster_id = slurm_atoul(pipe+1);
}
if (xstrcmp(name, slurm_conf.cluster_name)) {
fatal("CLUSTER NAME MISMATCH.\n"
"slurmctld has been started with \"ClusterName=%s\", but read \"%s\" from the state files in StateSaveLocation.\n"
"Running multiple clusters from a shared StateSaveLocation WILL CAUSE CORRUPTION.\n"
"Remove %s to override this safety check if this is intentional (e.g., the ClusterName has changed).",
slurm_conf.cluster_name, name, filename);
exit(1);
}
}
xfree(filename);
}
static void _create_clustername_file(void)
{
FILE *fp;
char *filename = NULL;
char *tmp_str = xstrdup_printf("%s|%u",
slurm_conf.cluster_name,
slurm_conf.cluster_id);
filename = xstrdup_printf("%s/clustername",
slurm_conf.state_save_location);
info("creating clustername file: ClusterName=%s ClusterID=%u",
slurm_conf.cluster_name, slurm_conf.cluster_id);
clustername_existed = 0;
if (!(fp = fopen(filename, "w"))) {
fatal("%s: failed to create file %s", __func__, filename);
exit(1);
}
if (fputs(tmp_str, fp) < 0) {
fatal("%s: failed to write to file %s", __func__, filename);
exit(1);
}
fclose(fp);
xfree(tmp_str);
xfree(filename);
}
/* Kill the currently running slurmctld
* NOTE: No need to lock the config data since we are still single-threaded */
static void _kill_old_slurmctld(void)
{
int fd;
pid_t oldpid = read_pidfile(slurm_conf.slurmctld_pidfile, &fd);
if (oldpid != (pid_t) 0) {
if (!ignore_state_errors && xstrstr(slurm_conf.slurmctld_params, "no_quick_restart"))
fatal("SlurmctldParameters=no_quick_restart set. Please shutdown your previous slurmctld (pid %ld) before starting a new one. (-i to ignore this message)",
(long) oldpid);
info ("killing old slurmctld[%ld]", (long) oldpid);
kill(oldpid, SIGTERM);
/*
* Wait for previous daemon to terminate
*/
if (fd_get_readw_lock(fd) < 0)
fatal ("unable to wait for readw lock: %m");
(void) close(fd); /* Ignore errors */
}
}
/* NOTE: No need to lock the config data since we are still single-threaded */
static void _init_pidfile(void)
{
if (!xstrcmp(slurm_conf.slurmctld_pidfile, slurm_conf.slurmd_pidfile))
error("SlurmctldPid == SlurmdPid, use different names");
/* Don't close the fd returned here since we need to keep the
* fd open to maintain the write lock */
pidfd = create_pidfile(slurm_conf.slurmctld_pidfile,
slurm_conf.slurm_user_id);
}
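/*
 * When re-exec'd for reconfiguration, recover the pidfile file descriptor
 * passed via the SLURMCTLD_RECONF_PIDFD environment variable and update
 * the pidfile through it.
 */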
static void _update_pidfile(void)
{
char *env = getenv("SLURMCTLD_RECONF_PIDFD");
if (!env) {
debug("%s: missing SLURMCTLD_RECONF_PIDFD envvar", __func__);
return;
}
pidfd = atoi(env);
update_pidfile(pidfd);
}
/*
* set_slurmctld_state_loc - create state save directory as needed and verify access to it
* NOTE: config read lock must be set on entry
*/
extern void set_slurmctld_state_loc(void)
{
int rc;
struct stat st;
const char *path = slurm_conf.state_save_location;
/*
* If state save location does not exist, try to create it.
* Otherwise, ensure path is a directory as expected, and that
* we have permission to write to it.
*/
if (((rc = stat(path, &st)) < 0) && (errno == ENOENT)) {
if (mkdir(path, 0755) < 0)
fatal("mkdir(%s): %m", path);
} else if (rc < 0)
fatal("Unable to stat state save loc: %s: %m", path);
else if (!S_ISDIR(st.st_mode))
fatal("State save loc: %s: Not a directory!", path);
else if (access(path, R_OK|W_OK|X_OK) < 0)
fatal("Incorrect permissions on state save loc: %s", path);
}
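/*
 * list_for_each() helper for _assoc_cache_mgr(): once real data has been
 * loaded from the database, refresh a job's TRES counts and re-resolve its
 * association and QOS pointers from the assoc_mgr caches.
 */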
static int _foreach_cache_update_job(void *x, void *arg)
{
job_record_t *job_ptr = x;
(void) _update_job_tres(job_ptr, NULL);
if (job_ptr->assoc_id) {
slurmdb_assoc_rec_t assoc_rec = {
.id = job_ptr->assoc_id,
};
debug("assoc is %zx (%d) for %pJ",
(size_t)job_ptr->assoc_ptr, job_ptr->assoc_id,
job_ptr);
if (assoc_mgr_fill_in_assoc(
acct_db_conn, &assoc_rec,
accounting_enforce,
&job_ptr->assoc_ptr, true)) {
verbose("Invalid association id %u for %pJ",
job_ptr->assoc_id, job_ptr);
/* not a fatal error, association could have
* been removed */
}
debug("now assoc is %zx (%d) for %pJ",
(size_t)job_ptr->assoc_ptr, job_ptr->assoc_id,
job_ptr);
}
if (job_ptr->qos_list) {
list_flush(job_ptr->qos_list);
char *token, *last = NULL;
char *tmp_qos_req = xstrdup(job_ptr->details->qos_req);
slurmdb_qos_rec_t *qos_ptr = NULL;
token = strtok_r(tmp_qos_req, ",", &last);
while (token) {
slurmdb_qos_rec_t qos_rec = {
.name = token,
};
if ((assoc_mgr_fill_in_qos(
acct_db_conn, &qos_rec,
accounting_enforce,
&qos_ptr,
true)) != SLURM_SUCCESS) {
verbose("Invalid qos (%u) for %pJ",
job_ptr->qos_id, job_ptr);
/* not a fatal error, qos could have
* been removed */
} else
list_append(job_ptr->qos_list, qos_ptr);
token = strtok_r(NULL, ",", &last);
}
xfree(tmp_qos_req);
if (list_count(job_ptr->qos_list)) {
list_sort(job_ptr->qos_list, priority_sort_qos_desc);
/* If we are pending we want the highest prio */
if (IS_JOB_PENDING(job_ptr)) {
job_ptr->qos_ptr = list_peek(job_ptr->qos_list);
job_ptr->qos_id = job_ptr->qos_ptr->id;
} else {
job_ptr->qos_ptr = list_find_first(
job_ptr->qos_list,
slurmdb_find_qos_in_list,
&job_ptr->qos_id);
if (!job_ptr->qos_ptr) {
verbose("Invalid qos (%u) for %pJ from qos_req '%s'",
job_ptr->qos_id,
job_ptr,
job_ptr->details->qos_req);
goto use_qos_id;
}
}
} else
FREE_NULL_LIST(job_ptr->qos_list);
} else if (job_ptr->qos_id) {
use_qos_id: ; /* must be a blank ; for older compilers (el7) */
slurmdb_qos_rec_t qos_rec = {
.id = job_ptr->qos_id,
};
if ((assoc_mgr_fill_in_qos(
acct_db_conn, &qos_rec,
accounting_enforce,
&job_ptr->qos_ptr,
true)) != SLURM_SUCCESS) {
verbose("Invalid qos (%u) for %pJ",
job_ptr->qos_id, job_ptr);
/* not a fatal error, qos could have
* been removed */
}
}
return 0;
}
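/*
 * list_for_each() helper for _assoc_cache_mgr(): rebuild a partition's
 * QOS bitmaps, re-resolve its QOS pointer, and update its association
 * lists after fresh database data has been loaded.
 */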
static int _foreach_cache_update_part(void *x, void *arg)
{
part_record_t *part_ptr = x;
if (part_ptr->allow_qos)
qos_list_build(part_ptr->allow_qos, true,
&part_ptr->allow_qos_bitstr);
if (part_ptr->deny_qos)
qos_list_build(part_ptr->deny_qos, true,
&part_ptr->deny_qos_bitstr);
if (part_ptr->qos_char) {
slurmdb_qos_rec_t qos_rec = {
.name = part_ptr->qos_char,
};
part_ptr->qos_ptr = NULL;
if (assoc_mgr_fill_in_qos(acct_db_conn, &qos_rec,
accounting_enforce,
&part_ptr->qos_ptr,
true) != SLURM_SUCCESS) {
fatal("Partition %s has an invalid qos (%s), "
"please check your configuration",
part_ptr->name, qos_rec.name);
}
}
part_update_assoc_lists(part_ptr, NULL);
return 0;
}
/* _assoc_cache_mgr - hold out until we have real data from the
* database so we can reset the job ptr's assoc ptr's */
static void *_assoc_cache_mgr(void *no_data)
{
/* Write lock on jobs, nodes and partitions */
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
assoc_mgr_lock_t locks =
{ .assoc = READ_LOCK, .qos = WRITE_LOCK, .tres = WRITE_LOCK,
.user = READ_LOCK };
if (running_cache != RUNNING_CACHE_STATE_RUNNING) {
slurm_mutex_lock(&assoc_cache_mutex);
lock_slurmctld(job_write_lock);
/*
* It is ok to have the job_write_lock here as long as
* running_cache != RUNNING_CACHE_STATE_NOTRUNNING. This short
* circuits the association manager to not call callbacks. If
* we come out of cache we need the job_write_lock locked until
* the end to prevent a race condition on the job_list (some
* running without new info and some running with the cached
* info).
*
* Make sure not to have the assoc_mgr or the
* slurmdbd_lock locked when refresh_lists is called or you may
* get deadlock.
*/
assoc_mgr_refresh_lists(acct_db_conn, 0);
if (g_tres_count != slurmctld_tres_cnt) {
info("TRES in database does not match cache (%u != %u). Updating...",
g_tres_count, slurmctld_tres_cnt);
_init_tres();
}
slurm_mutex_unlock(&assoc_cache_mutex);
}
while (running_cache == RUNNING_CACHE_STATE_RUNNING) {
slurm_mutex_lock(&assoc_cache_mutex);
slurm_cond_wait(&assoc_cache_cond, &assoc_cache_mutex);
/* This is here to see if we are exiting. If so then
just return since we are closing down.
*/
if (running_cache == RUNNING_CACHE_STATE_EXITING) {
slurm_mutex_unlock(&assoc_cache_mutex);
return NULL;
}
lock_slurmctld(job_write_lock);
/*
* It is ok to have the job_write_lock here as long as
* running_cache != RUNNING_CACHE_STATE_NOTRUNNING. This short
* circuits the association manager to not call callbacks. If
* we come out of cache we need the job_write_lock locked until
* the end to prevent a race condition on the job_list (some
* running without new info and some running with the cached
* info).
*
* Make sure not to have the assoc_mgr or the
* slurmdbd_lock locked when refresh_lists is called or you may
* get deadlock.
*/
assoc_mgr_refresh_lists(acct_db_conn, 0);
if (g_tres_count != slurmctld_tres_cnt) {
info("TRES in database does not match cache "
"(%u != %u). Updating...",
g_tres_count, slurmctld_tres_cnt);
_init_tres();
}
/*
* If running_cache == RUNNING_CACHE_STATE_LISTS_REFRESHED it
* means the assoc_mgr has deemed all is good but we can't
* actually enforce it until now since _init_tres() could call
* assoc_mgr_refresh_lists() again which makes it so you could
* get deadlock.
*/
if (running_cache == RUNNING_CACHE_STATE_LISTS_REFRESHED)
running_cache = RUNNING_CACHE_STATE_NOTRUNNING;
else if (running_cache == RUNNING_CACHE_STATE_RUNNING)
unlock_slurmctld(job_write_lock);
slurm_mutex_unlock(&assoc_cache_mutex);
}
assoc_mgr_lock(&locks);
if (job_list) {
debug2("got real data from the database refreshing the association ptr's for %d jobs",
list_count(job_list));
(void) list_for_each(job_list, _foreach_cache_update_job, NULL);
}
if (part_list) {
(void) list_for_each(part_list, _foreach_cache_update_part,
NULL);
}
set_cluster_tres(true);
assoc_mgr_unlock(&locks);
/* issuing a reconfig will reset the pointers on the burst
buffers */
bb_g_reconfig();
unlock_slurmctld(job_write_lock);
/* This needs to be after the lock and after we update the
jobs so if we need to send them we are set. */
_accounting_cluster_ready();
_get_fed_updates();
return NULL;
}
/*
* Find this host in the controller index, or return -1 on error.
*/
static int _controller_index(void)
{
int i;
/*
* Slurm internal HA mode (or no HA).
* Each controller is separately defined, and a single hostname is in
* each control_machine entry.
*/
for (i = 0; i < slurm_conf.control_cnt; i++) {
if (slurm_conf.control_machine[i] &&
slurm_conf.control_addr[i] &&
(!xstrcmp(slurmctld_config.node_name_short,
slurm_conf.control_machine[i]) ||
!xstrcmp(slurmctld_config.node_name_long,
slurm_conf.control_machine[i]))) {
return i;
}
}
/*
* External HA mode. Here a single control_addr has been defined,
* but multiple hostnames are in control_machine[0] with comma
* separation. If our hostname matches any of those, we are considered
* to be a valid controller, and which is active must be managed by
* an external HA solution.
*/
if (xstrchr(slurm_conf.control_machine[0], ',')) {
char *token, *last = NULL;
char *tmp_name = xstrdup(slurm_conf.control_machine[0]);
token = strtok_r(tmp_name, ",", &last);
while (token) {
if (!xstrcmp(slurmctld_config.node_name_short, token) ||
!xstrcmp(slurmctld_config.node_name_long, token)) {
xfree(tmp_name);
return 0;
}
token = strtok_r(NULL, ",", &last);
}
xfree(tmp_name);
}
return -1;
}
static void _test_thread_limit(void)
{
#ifdef RLIMIT_NOFILE
struct rlimit rlim[1];
if (getrlimit(RLIMIT_NOFILE, rlim) < 0)
error("Unable to get file count limit");
else if ((rlim->rlim_cur != RLIM_INFINITY) &&
(max_server_threads > rlim->rlim_cur)) {
max_server_threads = rlim->rlim_cur;
info("Reducing max_server_thread to %u due to file count limit "
"of %u", max_server_threads, max_server_threads);
}
#endif
}
static void _set_work_dir(void)
{
bool success = false;
if (slurm_conf.slurmctld_logfile &&
(slurm_conf.slurmctld_logfile[0] == '/')) {
char *slash_ptr, *work_dir;
work_dir = xstrdup(slurm_conf.slurmctld_logfile);
slash_ptr = strrchr(work_dir, '/');
if (slash_ptr == work_dir)
work_dir[1] = '\0';
else
slash_ptr[0] = '\0';
if ((access(work_dir, W_OK) != 0) || (chdir(work_dir) < 0))
error("chdir(%s): %m", work_dir);
else
success = true;
xfree(work_dir);
}
if (!success) {
if ((access(slurm_conf.state_save_location, W_OK) != 0) ||
(chdir(slurm_conf.state_save_location) < 0)) {
error("chdir(%s): %m",
slurm_conf.state_save_location);
} else
success = true;
}
if (!success) {
if ((access("/var/tmp", W_OK) != 0) ||
(chdir("/var/tmp") < 0)) {
error("chdir(/var/tmp): %m");
} else
info("chdir to /var/tmp");
}
}
/*
* _purge_files_thread - separate thread to remove job batch/environ files
* from the state directory. Runs async from purge_old_job() to avoid
* holding locks while the files are removed, which can cause performance
* problems under high throughput conditions.
*
* Uses the purge_cond to wakeup on demand, then works through the global
* purge_files_list of job_ids and removes their files.
*/
static void *_purge_files_thread(void *no_data)
{
int *job_id;
/*
* Use the purge_files_list as a queue. _delete_job_details()
* in job_mgr.c always enqueues (at the end), while
*_purge_files_thread consumes off the front.
*
* There is a potential race condition if the job numbers have
* wrapped between _purge_thread removing the state files and
* get_next_job_id trying to re-assign it. This is mitigated
* the call to _dup_job_file_test() in job_mgr.c ensuring
* there is no existing directory for an id before assigning it.
*/
/*
* pthread_cond_wait requires a lock to release and reclaim.
* the list structure is already handling locking for itself,
* so this lock isn't actually useful, and the thread calling
* pthread_cond_signal isn't required to have the lock. So
* lock it once and hold it until slurmctld shuts down.
*/
slurm_mutex_lock(&purge_thread_lock);
while (!slurmctld_config.shutdown_time) {
slurm_cond_wait(&purge_thread_cond, &purge_thread_lock);
debug2("%s: starting, %d jobs to purge", __func__,
list_count(purge_files_list));
/*
* Use list_dequeue here (instead of list_flush) as it will not
* hold up the list lock when we try to enqueue jobs that need
* to be freed.
*/
while ((job_id = list_dequeue(purge_files_list))) {
debug2("%s: purging files from JobId=%u",
__func__, *job_id);
delete_job_desc_files(*job_id);
xfree(job_id);
}
}
slurm_mutex_unlock(&purge_thread_lock);
return NULL;
}
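/*
 * Process one queued accounting update: federation updates are handed to
 * fed_mgr_update_feds(), everything else goes to the assoc_mgr. Returning
 * 1 tells list_delete_all() to remove the entry from the list.
 */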
static int _acct_update_list_for_each(void *x, void *arg)
{
slurmdb_update_object_t *object = x;
bool locked = false;
switch (object->type) {
case SLURMDB_UPDATE_FEDS:
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "fedmgr", NULL, NULL, NULL) < 0){
error("%s: cannot set my name to %s %m",
__func__, "fedmgr");
}
#endif
fed_mgr_update_feds(object);
break;
default:
(void) assoc_mgr_update_object(x, &locked);
}
/* Always delete it */
return 1;
}
static void *_acct_update_thread(void *no_data)
{
slurm_mutex_lock(&slurmctld_config.acct_update_lock);
while (!slurmctld_config.shutdown_time) {
slurm_cond_wait(&slurmctld_config.acct_update_cond,
&slurmctld_config.acct_update_lock);
(void) list_delete_all(slurmctld_config.acct_update_list,
_acct_update_list_for_each,
NULL);
}
slurm_mutex_unlock(&slurmctld_config.acct_update_lock);
return NULL;
}
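/*
 * Fetch this cluster's federation records from the accounting storage and
 * feed them to fed_mgr_update_feds() so federation state is current.
 */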
static void _get_fed_updates(void)
{
list_t *fed_list = NULL;
slurmdb_update_object_t update = {0};
slurmdb_federation_cond_t fed_cond;
slurmdb_init_federation_cond(&fed_cond, 0);
fed_cond.cluster_list = list_create(NULL);
list_append(fed_cond.cluster_list, slurm_conf.cluster_name);
fed_list = acct_storage_g_get_federations(acct_db_conn,
slurm_conf.slurm_user_id,
&fed_cond);
FREE_NULL_LIST(fed_cond.cluster_list);
if (fed_list) {
update.objects = fed_list;
fed_mgr_update_feds(&update);
}
FREE_NULL_LIST(fed_list);
}
static int _foreach_job_running(void *object, void *arg)
{
job_record_t *job_ptr = object;
if (IS_JOB_PENDING(job_ptr)) {
int job_cnt = (job_ptr->array_recs &&
job_ptr->array_recs->task_cnt) ?
job_ptr->array_recs->task_cnt : 1;
slurmctld_diag_stats.jobs_pending += job_cnt;
}
if (IS_JOB_RUNNING(job_ptr))
slurmctld_diag_stats.jobs_running++;
return SLURM_SUCCESS;
}
static void _update_diag_job_state_counts(void)
{
slurmctld_diag_stats.jobs_running = 0;
slurmctld_diag_stats.jobs_pending = 0;
slurmctld_diag_stats.job_states_ts = time(NULL);
list_for_each_ro(job_list, _foreach_job_running, NULL);
}
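/*
 * Run SlurmctldPrimaryOnProg or SlurmctldPrimaryOffProg (if configured)
 * when this controller gains or loses primary status, and log the
 * program's exit status.
 */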
static void _run_primary_prog(bool primary_on)
{
char *prog_name, *prog_type;
char *argv[2], *sep;
int status = 0;
pid_t cpid;
if (primary_on) {
prog_name = slurm_conf.slurmctld_primary_on_prog;
prog_type = "SlurmctldPrimaryOnProg";
} else {
prog_name = slurm_conf.slurmctld_primary_off_prog;
prog_type = "SlurmctldPrimaryOffProg";
}
if ((prog_name == NULL) || (prog_name[0] == '\0'))
return;
info("%s: Running %s", __func__, prog_type);
sep = strrchr(prog_name, '/');
if (sep)
argv[0] = sep + 1;
else
argv[0] = prog_name;
argv[1] = NULL;
if ((cpid = fork()) < 0) { /* Error */
error("%s fork error: %m", __func__);
return;
}
if (cpid == 0) { /* Child */
closeall(0);
setpgid(0, 0);
execv(prog_name, argv);
_exit(127);
}
waitpid(cpid, &status, 0);
if (status != 0)
error("%s: %s exit status %u:%u", __func__, prog_type,
WEXITSTATUS(status), WTERMSIG(status));
else
info("%s: %s completed successfully", __func__, prog_type);
}
static int _init_dep_job_ptr(void *object, void *arg)
{
depend_spec_t *dep_ptr = object;
dep_ptr->job_ptr = find_job_array_rec(dep_ptr->job_id,
dep_ptr->array_task_id);
return SLURM_SUCCESS;
}
static int _foreach_restore_job_dependencies(void *x, void *arg)
{
job_record_t *job_ptr = x;
if (job_ptr->details && job_ptr->details->depend_list)
list_for_each(job_ptr->details->depend_list,
_init_dep_job_ptr, NULL);
return 0;
}
/*
* Restore dependency job pointers.
*
* test_job_dependency() normally initializes dep_ptr->job_ptr, but a job's
* dependency may be updated before test_job_dependency() is called, so
* dep_ptr->job_ptr needs to be initialized for all jobs up front to allow
* proper circular-dependency testing. Otherwise, if slurmctld is restarted
* and a job dependency is updated immediately, before test_job_dependency()
* is called, it would be possible to create a circular dependency.
*/
static void _restore_job_dependencies(void)
{
slurmctld_lock_t job_fed_lock = {.job = WRITE_LOCK, .fed = READ_LOCK};
lock_slurmctld(job_fed_lock);
(void) list_for_each(job_list, _foreach_restore_job_dependencies, NULL);
unlock_slurmctld(job_fed_lock);
}
/*
* Respond to request for primary/backup slurmctld status
*/
extern void slurm_rpc_control_status(slurm_msg_t *msg)
{
control_status_msg_t status = {
.backup_inx = backup_inx,
.control_time = control_time,
};
(void) send_msg_response(msg, RESPONSE_CONTROL_STATUS, &status);
}
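/*
 * Initialize the scheduling plugin and the main scheduler, optionally
 * starting gang scheduling as well.
 */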
extern int controller_init_scheduling(bool init_gang)
{
int rc = sched_g_init();
if (rc != SLURM_SUCCESS) {
error("failed to initialize sched plugin");
return rc;
}
main_sched_init();
if (init_gang)
gs_init();
return rc;
}
extern void controller_fini_scheduling(void)
{
(void) sched_g_fini();
main_sched_fini();
if (slurm_conf.preempt_mode & PREEMPT_MODE_GANG)
gs_fini();
}
extern void controller_reconfig_scheduling(void)
{
gs_reconfig();
(void) sched_g_reconfig();
}