/*****************************************************************************\
* agent.c - parallel background communication functions. This is where
* logic could be placed for broadcast communications.
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2010 Lawrence Livermore National Security.
* Copyright (C) SchedMD LLC.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette1@llnl.gov>, et al.
* Derived from pdsh written by Jim Garlick <garlick1@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*****************************************************************************
* Theory of operation:
*
* The functions below permit slurm to initiate parallel tasks as a
* detached thread and ensure that the work actually happens.
* For example, when a job's time limit is to be changed slurmctld needs
* to notify the slurmd on every node to which the job was allocated.
* We don't want to block slurmctld's primary function (the job update RPC)
* while this work is performed, so it just initiates an agent to do it.
* The agent is passed all details required to perform the work, so it will
* be possible to execute the agent as a pthread, process, or even a daemon
* on some other computer.
*
* The main agent thread creates a separate thread for each node to be
* communicated with up to AGENT_THREAD_COUNT. A special watchdog thread
* sends SIGUSR1 to any threads that have been active (in DSH_ACTIVE state)
* for more than MessageTimeout seconds.
* The agent responds to slurmctld via a function call or an RPC as required.
* For example, informing slurmctld that some node is not responding.
*
* All the state for each thread is maintained in a thd_t struct, which is
* used by the watchdog thread as well as the communication threads.
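*
* Typical caller usage (illustrative sketch only; the msg_type, r_uid and
* any msg_args payload depend upon the RPC being sent, and node_name_range
* stands in for the caller's host range string):
*
*	agent_arg_t *args = xmalloc(sizeof(*args));
*	args->msg_type = REQUEST_PING;
*	args->hostlist = hostlist_create(node_name_range);
*	args->node_count = hostlist_count(args->hostlist);
*	set_agent_arg_r_uid(args, slurm_conf.slurmd_user_id);
*	agent_queue_request(args);
*
* The args structure (including its hostlist and msg_args) is then owned
* and eventually xfree'd by the agent code.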
\*****************************************************************************/
#include "config.h"
#if HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
#include <errno.h>
#include <pthread.h>
#include <pwd.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/parse_time.h"
#include "src/common/run_command.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_socket.h"
#include "src/common/uid.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/interfaces/select.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/ping_nodes.h"
#include "src/slurmctld/sackd_mgr.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/slurmscriptd.h"
#include "src/slurmctld/state_save.h"
#include "src/stepmgr/srun_comm.h"
#define MAX_RETRIES 100
#define MAX_RPC_PACK_CNT 100
#define RPC_PACK_MAX_AGE 1 /* Rebuild data over 1 second old */
#define DUMP_RPC_COUNT 25
#define HOSTLIST_MAX_SIZE 80
#define MAIL_PROG_TIMEOUT 120 /* Timeout in seconds */
#define AGENT_SHUTDOWN_WAIT 3
typedef enum {
DSH_NEW, /* Request not yet started */
DSH_ACTIVE, /* Request in progress */
DSH_DONE, /* Request completed normally */
DSH_NO_RESP, /* Request timed out */
DSH_FAILED, /* Request resulted in error */
DSH_DUP_JOBID /* Request resulted in duplicate job ID error */
} state_t;
typedef struct {
bool work_done; /* assume all threads complete */
int fail_cnt; /* assume no threads failures */
int no_resp_cnt; /* assume all threads respond */
int retry_cnt; /* assume no required retries */
int max_delay;
time_t now;
} thd_complete_t;
typedef struct {
pthread_t thread; /* thread ID */
state_t state; /* thread state */
time_t start_time; /* start time */
time_t end_time; /* end time or delta time
* upon termination */
slurm_addr_t *addr; /* specific addr to send to;
* nodelist is not used if set */
hostlist_t *nodelist; /* list of nodes to send to */
char *nodename; /* node to send to */
list_t *ret_list;
} thd_t;
typedef struct {
pthread_mutex_t thread_mutex; /* agent specific mutex */
pthread_cond_t thread_cond; /* agent specific condition */
uint32_t thread_count; /* number of thread records */
uint32_t threads_active; /* currently active threads */
uint16_t retry; /* if set, keep trying */
thd_t *thread_struct; /* thread structures */
bool get_reply; /* flag if reply expected */
uid_t r_uid; /* receiver UID */
slurm_msg_type_t msg_type; /* RPC to be issued */
void **msg_args_pptr; /* RPC data to be used */
uint16_t msg_flags; /* Flags to be added to msg */
uint16_t protocol_version; /* if set, use this version */
char *tls_cert;
} agent_info_t;
typedef struct {
pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific
* mutex */
pthread_cond_t *thread_cond_ptr;/* pointer to agent specific
* condition */
uint32_t *threads_active_ptr; /* currently active thread ptr */
thd_t *thread_struct_ptr; /* thread structures ptr */
bool get_reply; /* flag if reply expected */
uid_t r_uid; /* receiver UID */
slurm_msg_type_t msg_type; /* RPC to be issued */
void *msg_args_ptr; /* ptr to RPC data to be used */
uint16_t msg_flags; /* Flags to be added to msg */
uint16_t protocol_version; /* if set, use this version */
char *tls_cert;
} task_info_t;
typedef struct {
agent_arg_t* agent_arg_ptr; /* The queued request */
time_t first_attempt; /* Time of first check for batch
* launch RPC *only* */
time_t last_attempt; /* Time of last xmit attempt */
} queued_request_t;
typedef struct {
char *user_name;
char *message;
char **environment; /* MailProg environment variables */
} mail_info_t;
static void _agent_defer(void);
static void _agent_retry(int min_wait, bool wait_too);
static int _batch_launch_defer(queued_request_t *queued_req_ptr);
static void _reboot_from_ctld(agent_arg_t *agent_arg_ptr);
static int _signal_defer(queued_request_t *queued_req_ptr);
static inline int _comm_err(char *node_name, slurm_msg_type_t msg_type);
static void _list_delete_retry(void *retry_entry);
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr);
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx);
static void _notify_slurmctld_jobs(agent_info_t *agent_ptr);
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
int no_resp_cnt, int retry_cnt);
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count);
static void _queue_update_node(char *node_name);
static void _queue_update_srun(slurm_step_id_t *step_id);
static int _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr,
int *count, int *spot);
static void *_thread_per_group_rpc(void *args);
static int _valid_agent_arg(agent_arg_t *agent_arg_ptr);
static void *_wdog(void *args);
static mail_info_t *_mail_alloc(void);
static void _mail_free(void *arg);
static void *_mail_proc(void *arg);
static char *_mail_type_str(uint16_t mail_type);
static char **_build_mail_env(job_record_t *job_ptr, uint32_t mail_type);
static pthread_mutex_t defer_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mail_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER;
static list_t *defer_list = NULL; /* agent_arg_t list for requests
* requiring job write lock */
static list_t *mail_list = NULL; /* pending e-mail requests */
static list_t *retry_list = NULL; /* agent_arg_t list for retry */
static list_t *update_node_list = NULL; /* node list for update */
static pthread_mutex_t update_nodes_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t update_nodes_cond = PTHREAD_COND_INITIALIZER;
static list_t *update_srun_list = NULL;
static pthread_mutex_t update_srun_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t update_srun_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t agent_cnt_cond = PTHREAD_COND_INITIALIZER;
static int agent_cnt = 0;
static int agent_thread_cnt = 0;
static int mail_thread_cnt = 0;
static uint16_t message_timeout = NO_VAL16;
static pthread_mutex_t pending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t pending_cond = PTHREAD_COND_INITIALIZER;
static int pending_wait_time = NO_VAL16;
static bool pending_mail = false;
static pthread_t pending_thread_tid = 0;
static pthread_t nodes_update_tid = 0;
static pthread_t srun_update_tid = 0;
static bool pending_check_defer = false;
static bool run_scheduler = false;
static uint32_t *rpc_stat_counts = NULL, *rpc_stat_types = NULL;
static uint32_t stat_type_count = 0;
static uint32_t rpc_count = 0;
static uint32_t *rpc_type_list;
static char **rpc_host_list = NULL;
static time_t cache_build_time = 0;
/*
* agent - party responsible for transmitting a common RPC in parallel
* across a set of nodes. Use agent_queue_request() if immediate
* execution is not essential.
* IN pointer to agent_arg_t, which is xfree'd (including hostlist
* and msg_args) upon completion
* RET always NULL (function format just for use as pthread)
*/
void *agent(void *args)
{
int i, delay;
pthread_t thread_wdog = 0;
agent_arg_t *agent_arg_ptr = args;
agent_info_t *agent_info_ptr = NULL;
thd_t *thread_ptr;
task_info_t *task_specific_ptr;
time_t begin_time;
bool spawn_retry_agent = false;
int rpc_thread_cnt;
static time_t sched_update = 0;
static bool reboot_from_ctld = false;
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "agent", NULL, NULL, NULL) < 0) {
error("%s: cannot set my name to %s %m", __func__, "agent");
}
#endif
log_flag(AGENT, "%s: Agent_cnt=%d agent_thread_cnt=%d with msg_type=%s retry_list_size=%d",
__func__, agent_cnt, agent_thread_cnt,
rpc_num2string(agent_arg_ptr->msg_type),
retry_list_size());
slurm_mutex_lock(&agent_cnt_mutex);
if (sched_update != slurm_conf.last_update) {
reboot_from_ctld = false;
if (xstrcasestr(slurm_conf.slurmctld_params,
"reboot_from_controller"))
reboot_from_ctld = true;
sched_update = slurm_conf.last_update;
}
rpc_thread_cnt = 2 + MIN(agent_arg_ptr->node_count, AGENT_THREAD_COUNT);
while (1) {
if (slurmctld_config.shutdown_time ||
((agent_thread_cnt+rpc_thread_cnt) <= MAX_SERVER_THREADS)) {
agent_cnt++;
agent_thread_cnt += rpc_thread_cnt;
break;
} else { /* wait for state change and retry */
slurm_cond_wait(&agent_cnt_cond, &agent_cnt_mutex);
}
}
slurm_mutex_unlock(&agent_cnt_mutex);
if (slurmctld_config.shutdown_time)
goto cleanup;
/* basic argument value tests */
begin_time = time(NULL);
if (_valid_agent_arg(agent_arg_ptr))
goto cleanup;
if (reboot_from_ctld &&
(agent_arg_ptr->msg_type == REQUEST_REBOOT_NODES)) {
_reboot_from_ctld(agent_arg_ptr);
goto cleanup;
}
/* initialize the agent data structures */
agent_info_ptr = _make_agent_info(agent_arg_ptr);
thread_ptr = agent_info_ptr->thread_struct;
/* start the watchdog thread */
slurm_thread_create(&thread_wdog, _wdog, agent_info_ptr);
log_flag(AGENT, "%s: New agent thread_count:%d threads_active:%d retry:%c get_reply:%c r_uid:%u msg_type:%s protocol_version:%hu",
__func__, agent_info_ptr->thread_count,
agent_info_ptr->threads_active,
agent_info_ptr->retry ? 'T' : 'F',
agent_info_ptr->get_reply ? 'T' : 'F',
agent_info_ptr->r_uid,
rpc_num2string(agent_arg_ptr->msg_type),
agent_info_ptr->protocol_version);
/* start all the other threads (up to AGENT_THREAD_COUNT active) */
for (i = 0; i < agent_info_ptr->thread_count; i++) {
/* wait until "room" for another thread */
slurm_mutex_lock(&agent_info_ptr->thread_mutex);
while (agent_info_ptr->threads_active >=
AGENT_THREAD_COUNT) {
slurm_cond_wait(&agent_info_ptr->thread_cond,
&agent_info_ptr->thread_mutex);
}
/*
* create thread specific data,
* NOTE: freed from _thread_per_group_rpc()
*/
task_specific_ptr = _make_task_data(agent_info_ptr, i);
slurm_thread_create(&thread_ptr[i].thread,
_thread_per_group_rpc,
task_specific_ptr);
agent_info_ptr->threads_active++;
slurm_mutex_unlock(&agent_info_ptr->thread_mutex);
}
/* Wait for termination of remaining threads */
slurm_thread_join(thread_wdog);
delay = (int) difftime(time(NULL), begin_time);
if (delay > (slurm_conf.msg_timeout * 2)) {
info("agent msg_type=%s ran for %d seconds",
rpc_num2string(agent_arg_ptr->msg_type), delay);
}
slurm_mutex_lock(&agent_info_ptr->thread_mutex);
while (agent_info_ptr->threads_active != 0) {
slurm_cond_wait(&agent_info_ptr->thread_cond,
&agent_info_ptr->thread_mutex);
}
for (i = 0; i < agent_info_ptr->thread_count; i++)
slurm_thread_join(thread_ptr[i].thread);
slurm_mutex_unlock(&agent_info_ptr->thread_mutex);
log_flag(AGENT, "%s: end agent thread_count:%d threads_active:%d retry:%c get_reply:%c msg_type:%s protocol_version:%hu",
__func__, agent_info_ptr->thread_count,
agent_info_ptr->threads_active,
agent_info_ptr->retry ? 'T' : 'F',
agent_info_ptr->get_reply ? 'T' : 'F',
rpc_num2string(agent_arg_ptr->msg_type),
agent_info_ptr->protocol_version);
cleanup:
purge_agent_args(agent_arg_ptr);
if (agent_info_ptr) {
xfree(agent_info_ptr->thread_struct);
xfree(agent_info_ptr);
}
slurm_mutex_lock(&agent_cnt_mutex);
if (agent_cnt > 0) {
agent_cnt--;
} else {
error("agent_cnt underflow");
agent_cnt = 0;
}
if (agent_thread_cnt >= rpc_thread_cnt) {
agent_thread_cnt -= rpc_thread_cnt;
} else {
error("agent_thread_cnt underflow");
agent_thread_cnt = 0;
}
if ((agent_thread_cnt + AGENT_THREAD_COUNT + 2) < MAX_SERVER_THREADS)
spawn_retry_agent = true;
slurm_cond_broadcast(&agent_cnt_cond);
slurm_mutex_unlock(&agent_cnt_mutex);
if (spawn_retry_agent)
agent_trigger(RPC_RETRY_INTERVAL, true, false);
return NULL;
}
/* Basic validity test of agent argument */
static int _valid_agent_arg(agent_arg_t *agent_arg_ptr)
{
int hostlist_cnt;
xassert(agent_arg_ptr);
xassert(agent_arg_ptr->hostlist);
if (agent_arg_ptr->node_count == 0)
return SLURM_ERROR; /* no messages to be sent */
hostlist_cnt = hostlist_count(agent_arg_ptr->hostlist);
if (agent_arg_ptr->node_count != hostlist_cnt) {
error("%s: RPC node_count differs from number of hosts listed (%d!=%d)",
__func__, agent_arg_ptr->node_count, hostlist_cnt);
return SLURM_ERROR; /* no messages to be sent */
}
if (!agent_arg_ptr->r_uid_set) {
error("%s: r_uid not set for message:%u ",
__func__, agent_arg_ptr->msg_type);
return SLURM_ERROR;
}
return SLURM_SUCCESS;
}
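/*
* _make_agent_info - build the agent_info_t and thd_t array for an
* agent_arg_t. Most RPC types use a single thread for the whole hostlist and
* let slurmd forward the message (get_reply set); srun, shutdown and
* reconfigure style messages are split into one thread per node, and an
* explicit addr forces a single direct connection.
*/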
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr)
{
agent_info_t *agent_info_ptr = NULL;
thd_t *thread_ptr = NULL;
int thr_count = 0;
char *name = NULL;
bool split;
agent_info_ptr = xmalloc(sizeof(agent_info_t));
slurm_mutex_init(&agent_info_ptr->thread_mutex);
slurm_cond_init(&agent_info_ptr->thread_cond, NULL);
agent_info_ptr->thread_count = agent_arg_ptr->node_count;
agent_info_ptr->retry = agent_arg_ptr->retry;
agent_info_ptr->threads_active = 0;
thread_ptr = xcalloc(agent_info_ptr->thread_count, sizeof(thd_t));
agent_info_ptr->thread_struct = thread_ptr;
agent_info_ptr->r_uid = agent_arg_ptr->r_uid;
agent_info_ptr->msg_type = agent_arg_ptr->msg_type;
agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args;
agent_info_ptr->msg_flags = agent_arg_ptr->msg_flags;
agent_info_ptr->protocol_version = agent_arg_ptr->protocol_version;
agent_info_ptr->tls_cert = agent_arg_ptr->tls_cert;
if (!agent_info_ptr->thread_count)
return agent_info_ptr;
xassert(agent_arg_ptr->node_count ==
hostlist_count(agent_arg_ptr->hostlist));
if ((agent_arg_ptr->msg_type != REQUEST_JOB_NOTIFY) &&
(agent_arg_ptr->msg_type != REQUEST_REBOOT_NODES) &&
(agent_arg_ptr->msg_type != REQUEST_RECONFIGURE) &&
(agent_arg_ptr->msg_type != REQUEST_RECONFIGURE_SACKD) &&
(agent_arg_ptr->msg_type != REQUEST_RECONFIGURE_WITH_CONFIG) &&
(agent_arg_ptr->msg_type != REQUEST_SHUTDOWN) &&
(agent_arg_ptr->msg_type != SRUN_TIMEOUT) &&
(agent_arg_ptr->msg_type != SRUN_NODE_FAIL) &&
(agent_arg_ptr->msg_type != SRUN_REQUEST_SUSPEND) &&
(agent_arg_ptr->msg_type != SRUN_USER_MSG) &&
(agent_arg_ptr->msg_type != SRUN_STEP_MISSING) &&
(agent_arg_ptr->msg_type != SRUN_STEP_SIGNAL) &&
(agent_arg_ptr->msg_type != SRUN_JOB_COMPLETE)) {
/* Sending message to a possibly large number of slurmd.
* Push all message forwarding to slurmd in order to
* offload as much work from slurmctld as possible. */
split = false;
agent_info_ptr->get_reply = true;
} else {
/* Message is going to one node (for srun) or we want
* it to get processed ASAP (SHUTDOWN or RECONFIGURE).
* Send the message directly to each node. */
split = true;
}
if (agent_arg_ptr->addr || !split) {
thread_ptr[0].state = DSH_NEW;
if (agent_arg_ptr->addr) {
name = hostlist_shift(agent_arg_ptr->hostlist);
thread_ptr[0].addr = agent_arg_ptr->addr;
thread_ptr[0].nodename = xstrdup(name);
if (agent_arg_ptr->node_count > 1)
error("%s: you will only be sending this to %s",
__func__, name);
free(name);
log_flag(AGENT, "%s: sending msg_type %s to node %s",
__func__,
rpc_num2string(agent_arg_ptr->msg_type),
thread_ptr[0].nodename);
} else {
thread_ptr[0].nodelist = agent_arg_ptr->hostlist;
thread_ptr[0].addr = NULL;
if (slurm_conf.debug_flags & DEBUG_FLAG_AGENT) {
char *buf;
buf = hostlist_ranged_string_xmalloc(
agent_arg_ptr->hostlist);
debug("%s: sending msg_type %s to nodes %s",
__func__,
rpc_num2string(agent_arg_ptr->msg_type),
buf);
xfree(buf);
}
}
agent_info_ptr->thread_count = 1;
return agent_info_ptr;
}
hostlist_uniq(agent_arg_ptr->hostlist);
while (thr_count < agent_info_ptr->thread_count) {
name = hostlist_shift(agent_arg_ptr->hostlist);
if (!name) {
debug3("no more nodes to send to");
break;
}
thread_ptr[thr_count].state = DSH_NEW;
thread_ptr[thr_count].addr = NULL;
thread_ptr[thr_count].nodename = xstrdup(name);
log_flag(AGENT, "%s: sending msg_type %s to node %s",
__func__, rpc_num2string(agent_arg_ptr->msg_type),
thread_ptr[thr_count].nodename);
free(name);
thr_count++;
}
agent_info_ptr->thread_count = thr_count;
return agent_info_ptr;
}
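/* _make_task_data - package the agent-wide settings plus one thd_t pointer
* into a task_info_t for a single _thread_per_group_rpc() thread.
* NOTE: freed by _thread_per_group_rpc() */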
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
{
task_info_t *task_info_ptr;
task_info_ptr = xmalloc(sizeof(task_info_t));
task_info_ptr->thread_mutex_ptr = &agent_info_ptr->thread_mutex;
task_info_ptr->thread_cond_ptr = &agent_info_ptr->thread_cond;
task_info_ptr->threads_active_ptr= &agent_info_ptr->threads_active;
task_info_ptr->thread_struct_ptr = &agent_info_ptr->thread_struct[inx];
task_info_ptr->get_reply = agent_info_ptr->get_reply;
task_info_ptr->r_uid = agent_info_ptr->r_uid;
task_info_ptr->msg_type = agent_info_ptr->msg_type;
task_info_ptr->msg_args_ptr = *agent_info_ptr->msg_args_pptr;
task_info_ptr->msg_flags = agent_info_ptr->msg_flags;
task_info_ptr->protocol_version = agent_info_ptr->protocol_version;
task_info_ptr->tls_cert = agent_info_ptr->tls_cert;
return task_info_ptr;
}
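/* _update_wdog_state - fold one thread's state (or one ret_list entry) into
* the per-pass thd_complete_t counters. A thread still in DSH_ACTIVE past its
* end_time is sent SIGUSR1 and given another message_timeout interval. */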
static void _update_wdog_state(thd_t *thread_ptr,
state_t *state,
thd_complete_t *thd_comp)
{
switch (*state) {
case DSH_ACTIVE:
thd_comp->work_done = false;
if (thread_ptr->end_time <= thd_comp->now) {
log_flag(AGENT, "%s: agent thread %lu timed out",
__func__, (unsigned long) thread_ptr->thread);
(void) pthread_kill(thread_ptr->thread, SIGUSR1);
thread_ptr->end_time += message_timeout;
}
break;
case DSH_NEW:
thd_comp->work_done = false;
break;
case DSH_DONE:
if (thd_comp->max_delay < (int)thread_ptr->end_time)
thd_comp->max_delay = (int)thread_ptr->end_time;
break;
case DSH_NO_RESP:
thd_comp->no_resp_cnt++;
thd_comp->retry_cnt++;
break;
case DSH_FAILED:
case DSH_DUP_JOBID:
thd_comp->fail_cnt++;
break;
}
}
/*
* _wdog - Watchdog thread. Send SIGUSR1 to threads which have been active
* for too long.
* IN args - pointer to agent_info_t with info on threads to watch
* Sleep between polls with exponential times (from 0.005 to 1.0 second)
*/
static void *_wdog(void *args)
{
bool srun_agent = false, sack_agent = false;
int i;
agent_info_t *agent_ptr = (agent_info_t *) args;
thd_t *thread_ptr = agent_ptr->thread_struct;
unsigned long usec = 5000;
list_itr_t *itr;
thd_complete_t thd_comp;
ret_data_info_t *ret_data_info = NULL;
if ( (agent_ptr->msg_type == SRUN_JOB_COMPLETE) ||
(agent_ptr->msg_type == SRUN_REQUEST_SUSPEND) ||
(agent_ptr->msg_type == SRUN_STEP_MISSING) ||
(agent_ptr->msg_type == SRUN_STEP_SIGNAL) ||
(agent_ptr->msg_type == SRUN_NODE_FAIL) ||
(agent_ptr->msg_type == SRUN_PING) ||
(agent_ptr->msg_type == SRUN_TIMEOUT) ||
(agent_ptr->msg_type == SRUN_USER_MSG) ||
(agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
(agent_ptr->msg_type == RESPONSE_HET_JOB_ALLOCATION) )
srun_agent = true;
if (agent_ptr->msg_type == REQUEST_RECONFIGURE_SACKD)
sack_agent = true;
thd_comp.max_delay = 0;
while (1) {
thd_comp.work_done = true;/* assume all threads complete */
thd_comp.fail_cnt = 0; /* assume no threads failures */
thd_comp.no_resp_cnt = 0; /* assume all threads respond */
thd_comp.retry_cnt = 0; /* assume no required retries */
thd_comp.now = time(NULL);
usleep(usec);
usec = MIN((usec * 2), 1000000);
slurm_mutex_lock(&agent_ptr->thread_mutex);
for (i = 0; i < agent_ptr->thread_count; i++) {
//info("thread name %s",thread_ptr[i].node_name);
if (!thread_ptr[i].ret_list) {
_update_wdog_state(&thread_ptr[i],
&thread_ptr[i].state,
&thd_comp);
} else {
itr = list_iterator_create(
thread_ptr[i].ret_list);
while ((ret_data_info = list_next(itr))) {
_update_wdog_state(&thread_ptr[i],
&ret_data_info->err,
&thd_comp);
}
list_iterator_destroy(itr);
}
}
if (thd_comp.work_done)
break;
slurm_mutex_unlock(&agent_ptr->thread_mutex);
}
if (sack_agent) {
if (thread_ptr[0].state != DSH_DONE)
sackd_mgr_remove_node(thread_ptr[0].nodename);
} else if (srun_agent) {
_notify_slurmctld_jobs(agent_ptr);
} else if (agent_ptr->msg_type != REQUEST_SHUTDOWN) {
_notify_slurmctld_nodes(agent_ptr,
thd_comp.no_resp_cnt,
thd_comp.retry_cnt);
}
for (i = 0; i < agent_ptr->thread_count; i++) {
FREE_NULL_LIST(thread_ptr[i].ret_list);
xfree(thread_ptr[i].nodename);
}
if (thd_comp.max_delay)
log_flag(AGENT, "%s: agent maximum delay %d seconds",
__func__, thd_comp.max_delay);
slurm_mutex_unlock(&agent_ptr->thread_mutex);
return NULL;
}
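/* _notify_slurmctld_jobs - for srun-bound RPCs, queue an srun response
* notification for the associated job/step if the message was delivered
* (DSH_DONE) */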
static void _notify_slurmctld_jobs(agent_info_t *agent_ptr)
{
slurm_step_id_t step_id = {
.job_id = 0,
.step_id = NO_VAL,
.step_het_comp = NO_VAL,
};
thd_t *thread_ptr = agent_ptr->thread_struct;
if (agent_ptr->msg_type == SRUN_PING) {
srun_ping_msg_t *msg = *agent_ptr->msg_args_pptr;
step_id.job_id = msg->job_id;
} else if (agent_ptr->msg_type == SRUN_TIMEOUT) {
srun_timeout_msg_t *msg = *agent_ptr->msg_args_pptr;
memcpy(&step_id, &msg->step_id, sizeof(step_id));
} else if (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) {
resource_allocation_response_msg_t *msg =
*agent_ptr->msg_args_pptr;
step_id.job_id = msg->job_id;
} else if (agent_ptr->msg_type == RESPONSE_HET_JOB_ALLOCATION) {
list_t *het_alloc_list = *agent_ptr->msg_args_pptr;
resource_allocation_response_msg_t *msg;
if (!het_alloc_list || (list_count(het_alloc_list) == 0))
return;
msg = list_peek(het_alloc_list);
step_id.job_id = msg->job_id;
} else if ((agent_ptr->msg_type == SRUN_JOB_COMPLETE) ||
(agent_ptr->msg_type == SRUN_REQUEST_SUSPEND) ||
(agent_ptr->msg_type == SRUN_STEP_MISSING) ||
(agent_ptr->msg_type == SRUN_STEP_SIGNAL) ||
(agent_ptr->msg_type == SRUN_USER_MSG)) {
return; /* no need to note srun response */
} else if (agent_ptr->msg_type == SRUN_NODE_FAIL) {
return; /* no need to note srun response */
} else {
error("%s: invalid msg_type %u", __func__, agent_ptr->msg_type);
return;
}
if (thread_ptr[0].state == DSH_DONE)
_queue_update_srun(&step_id);
}
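/* _notify_slurmctld_nodes - act upon the RPC results: requeue batch launches
* that got no response, queue retries, record non-responding nodes, drain
* nodes reporting prolog/epilog failures or duplicate job IDs, and note the
* nodes which did respond */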
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
int no_resp_cnt, int retry_cnt)
{
list_itr_t *itr = NULL;
ret_data_info_t *ret_data_info = NULL;
state_t state;
int is_ret_list = 1;
/* Locks: Read config, write node */
slurmctld_lock_t node_write_lock =
{ .conf = READ_LOCK, .node = WRITE_LOCK };
thd_t *thread_ptr = agent_ptr->thread_struct;
int i;
bool locked = false;
/* Notify slurmctld of non-responding nodes */
if (no_resp_cnt) {
/* Update node table data for non-responding nodes */
if (agent_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
/* Requeue the request */
batch_job_launch_msg_t *launch_msg_ptr =
*agent_ptr->msg_args_pptr;
uint32_t job_id = launch_msg_ptr->job_id;
/* Locks: Write job, write node, read federation */
slurmctld_lock_t job_write_lock =
{ .job = WRITE_LOCK,
.node = WRITE_LOCK,
.fed = READ_LOCK };
lock_slurmctld(job_write_lock);
job_complete(job_id, slurm_conf.slurm_user_id,
true, false, 0);
unlock_slurmctld(job_write_lock);
}
}
if (retry_cnt && agent_ptr->retry)
_queue_agent_retry(agent_ptr, retry_cnt);
/* Update last_response on responding nodes */
for (i = 0; i < agent_ptr->thread_count; i++) {
char *down_msg, **node_names;
slurm_msg_type_t resp_type = RESPONSE_SLURM_RC;
if (!thread_ptr[i].ret_list) {
state = thread_ptr[i].state;
is_ret_list = 0;
goto switch_on_state;
}
is_ret_list = 1;
itr = list_iterator_create(thread_ptr[i].ret_list);
while ((ret_data_info = list_next(itr))) {
state = ret_data_info->err;
switch_on_state:
if (is_ret_list) {
node_names = &ret_data_info->node_name;
resp_type = ret_data_info->type;
} else
node_names = &thread_ptr[i].nodename;
if (!is_ret_list && !(*node_names))
goto finished;
switch (state) {
case DSH_NO_RESP:
if (!locked) {
locked = true;
lock_slurmctld(node_write_lock);
}
node_not_resp(*node_names,
thread_ptr[i].start_time,
resp_type);
break;
case DSH_FAILED:
if (!locked) {
locked = true;
lock_slurmctld(node_write_lock);
}
drain_nodes(*node_names, "Prolog/Epilog failure",
slurm_conf.slurm_user_id);
down_msg = ", set to state DRAIN";
error("Prolog/Epilog failure on nodes %s%s",
*node_names, down_msg);
break;
case DSH_DUP_JOBID:
if (!locked) {
locked = true;
lock_slurmctld(node_write_lock);
}
drain_nodes(*node_names, "Duplicate jobid",
slurm_conf.slurm_user_id);
down_msg = ", set to state DRAIN";
error("Duplicate jobid on nodes %s%s",
*node_names, down_msg);
break;
case DSH_DONE:
/*
* Process now if we've already obtained the
* lock. Otherwise delegate to the dedicated
* processing thread.
*/
if (locked) {
node_did_resp(*node_names);
} else {
_queue_update_node(*node_names);
*node_names = NULL;
}
break;
default:
error("unknown state returned for %s",
*node_names);
break;
}
if (!is_ret_list)
goto finished;
}
list_iterator_destroy(itr);
finished: ;
}
if (locked)
unlock_slurmctld(node_write_lock);
if (run_scheduler) {
run_scheduler = false;
/* below functions all have their own locking */
queue_job_scheduler();
}
if ((agent_ptr->msg_type == REQUEST_PING) ||
(agent_ptr->msg_type == REQUEST_HEALTH_CHECK) ||
(agent_ptr->msg_type == REQUEST_ACCT_GATHER_UPDATE) ||
(agent_ptr->msg_type == REQUEST_NODE_REGISTRATION_STATUS))
ping_end();
}
/* Report a communications error for specified node
* This also gets logged as a non-responsive node */
static inline int _comm_err(char *node_name, slurm_msg_type_t msg_type)
{
int rc = 1;
if ((rc = is_node_resp (node_name)))
verbose("agent/is_node_resp: node:%s RPC:%s : %m",
node_name, rpc_num2string(msg_type));
return rc;
}
/* return a value for which WEXITSTATUS() returns 1 */
static int _wif_status(void)
{
static int rc = 0;
int i;
if (rc)
return rc;
rc = 1;
for (i=0; i<64; i++) {
if (WEXITSTATUS(rc))
return rc;
rc = rc << 1;
}
error("Could not identify WEXITSTATUS");
rc = 1;
return rc;
}
/*
* _thread_per_group_rpc - thread to issue an RPC for a group of nodes,
* sending the message to one node and forwarding it to
* the others if necessary.
* IN/OUT args - pointer to task_info_t, xfree'd on completion
*/
static void *_thread_per_group_rpc(void *args)
{
int rc = SLURM_SUCCESS;
slurm_msg_t msg;
task_info_t *task_ptr = (task_info_t *) args;
/* we cache some pointers from task_info_t because we need
* to xfree args before being finished with their use. xfree
* is required for timely termination of this pthread because
* xfree could lock it at the end, preventing a timely
* thread_exit */
pthread_mutex_t *thread_mutex_ptr = task_ptr->thread_mutex_ptr;
pthread_cond_t *thread_cond_ptr = task_ptr->thread_cond_ptr;
uint32_t *threads_active_ptr = task_ptr->threads_active_ptr;
thd_t *thread_ptr = task_ptr->thread_struct_ptr;
state_t thread_state = DSH_NO_RESP;
slurm_msg_type_t msg_type = task_ptr->msg_type;
bool is_kill_msg, srun_agent, sack_agent;
list_t *ret_list = NULL;
list_itr_t *itr;
ret_data_info_t *ret_data_info = NULL;
/* Locks: Write job, write node */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
/* Lock: Read node */
slurmctld_lock_t node_read_lock = {
NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
/* Lock: Write node */
slurmctld_lock_t node_write_lock = {
NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
uint32_t job_id;
xassert(args != NULL);
is_kill_msg = ( (msg_type == REQUEST_KILL_TIMELIMIT) ||
(msg_type == REQUEST_KILL_PREEMPTED) ||
(msg_type == REQUEST_TERMINATE_JOB) );
srun_agent = ( (msg_type == SRUN_PING) ||
(msg_type == SRUN_JOB_COMPLETE) ||
(msg_type == SRUN_STEP_MISSING) ||
(msg_type == SRUN_STEP_SIGNAL) ||
(msg_type == SRUN_TIMEOUT) ||
(msg_type == SRUN_USER_MSG) ||
(msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
(msg_type == SRUN_NODE_FAIL) );
sack_agent = (msg_type == REQUEST_RECONFIGURE_SACKD);
thread_ptr->start_time = time(NULL);
slurm_mutex_lock(thread_mutex_ptr);
thread_ptr->state = DSH_ACTIVE;
thread_ptr->end_time = thread_ptr->start_time + message_timeout;
slurm_mutex_unlock(thread_mutex_ptr);
/* send request message */
slurm_msg_t_init(&msg);
if (task_ptr->protocol_version)
msg.protocol_version = task_ptr->protocol_version;
msg.msg_type = msg_type;
msg.data = task_ptr->msg_args_ptr;
slurm_msg_set_r_uid(&msg, task_ptr->r_uid);
msg.flags |= task_ptr->msg_flags;
msg.tls_cert = task_ptr->tls_cert;
task_ptr->tls_cert = NULL;
if (thread_ptr->nodename)
log_flag(AGENT, "%s: sending %s to %s", __func__,
rpc_num2string(msg_type), thread_ptr->nodename);
else if (slurm_conf.debug_flags & DEBUG_FLAG_AGENT) {
char *tmp_str;
tmp_str = hostlist_ranged_string_xmalloc(thread_ptr->nodelist);
debug("%s: sending %s to %s", __func__,
rpc_num2string(msg_type), tmp_str);
xfree(tmp_str);
}
if (task_ptr->get_reply) {
if (thread_ptr->addr) {
msg.address = *thread_ptr->addr;
if (!(ret_list = slurm_send_addr_recv_msgs(
&msg, thread_ptr->nodename, 0))) {
error("%s: no ret_list given", __func__);
goto cleanup;
}
} else if (thread_ptr->nodelist) {
if (!(ret_list = start_msg_tree(thread_ptr->nodelist,
&msg, 0))) {
error("%s: no ret_list given", __func__);
goto cleanup;
}
} else {
if (!(ret_list = slurm_send_recv_msgs(
thread_ptr->nodename, &msg, 0))) {
error("%s: no ret_list given", __func__);
goto cleanup;
}
}
} else {
if (thread_ptr->addr) {
//info("got the address");
msg.address = *thread_ptr->addr;
} else {
//info("no address given");
xassert(thread_ptr->nodename);
if (slurm_conf_get_addr(thread_ptr->nodename,
&msg.address, msg.flags)
== SLURM_ERROR) {
error("%s: can't find address for host %s, check slurm.conf",
__func__, thread_ptr->nodename);
goto cleanup;
}
}
//info("sending %u to %s", msg_type, thread_ptr->nodename);
if ((msg_type == SRUN_JOB_COMPLETE) ||
(msg_type == SRUN_STEP_SIGNAL)) {
/*
* The srun runs as a single thread, while the kernel
* listen() may be queuing messages for further
* processing. If we get our SYN in the listen queue
* at the same time the last MESSAGE_TASK_EXIT is being
* processed, srun may exit meaning this message is
* never received, leading to a series of error
* messages from slurm_send_only_node_msg().
* So, we use this different function that blindly
* flings the message out and disregards any
* communication problems that may arise.
*/
slurm_send_msg_maybe(&msg);
thread_state = DSH_DONE;
} else if (slurm_send_only_node_msg(&msg) == SLURM_SUCCESS) {
thread_state = DSH_DONE;
} else {
if (!sack_agent && !srun_agent) {
lock_slurmctld(node_read_lock);
_comm_err(thread_ptr->nodename, msg_type);
unlock_slurmctld(node_read_lock);
}
}
goto cleanup;
}
//info("got %d messages back", list_count(ret_list));
itr = list_iterator_create(ret_list);
while ((ret_data_info = list_next(itr))) {
rc = slurm_get_return_code(ret_data_info->type,
ret_data_info->data);
/* SPECIAL CASE: Record node's CPU load */
if (ret_data_info->type == RESPONSE_PING_SLURMD) {
ping_slurmd_resp_msg_t *ping_resp;
ping_resp = (ping_slurmd_resp_msg_t *)
ret_data_info->data;
lock_slurmctld(node_write_lock);
reset_node_load(ret_data_info->node_name,
ping_resp->cpu_load);
reset_node_free_mem(ret_data_info->node_name,
ping_resp->free_mem);
unlock_slurmctld(node_write_lock);
}
/* SPECIAL CASE: Mark node as IDLE if job already complete */
if (is_kill_msg &&
(rc == ESLURMD_KILL_JOB_ALREADY_COMPLETE)) {
kill_job_msg_t *kill_job;
kill_job = (kill_job_msg_t *)
task_ptr->msg_args_ptr;
rc = SLURM_SUCCESS;
lock_slurmctld(job_write_lock);
if (job_epilog_complete(kill_job->step_id.job_id,
ret_data_info->node_name, rc))
run_scheduler = true;
unlock_slurmctld(job_write_lock);
}
/* SPECIAL CASE: Record node's accounting gather data */
if (ret_data_info->type == RESPONSE_ACCT_GATHER_UPDATE) {
lock_slurmctld(node_write_lock);
update_node_record_acct_gather_data(
ret_data_info->data);
unlock_slurmctld(node_write_lock);
}
/* SPECIAL CASE: Kill non-startable batch job (prolog failures
* and duplicate job IDs are handled via the thread state below) */
if ((msg_type == REQUEST_BATCH_JOB_LAUNCH) &&
(rc != SLURM_SUCCESS) && (rc != ESLURMD_PROLOG_FAILED) &&
(rc != ESLURM_DUPLICATE_JOB_ID) &&
(ret_data_info->type != RESPONSE_FORWARD_FAILED)) {
batch_job_launch_msg_t *launch_msg_ptr =
task_ptr->msg_args_ptr;
job_id = launch_msg_ptr->job_id;
info("Killing non-startable batch JobId=%u: %s",
job_id, slurm_strerror(rc));
thread_state = DSH_DONE;
ret_data_info->err = thread_state;
lock_slurmctld(job_write_lock);
job_complete(job_id, slurm_conf.slurm_user_id,
false, false, _wif_status());
unlock_slurmctld(job_write_lock);
continue;
} else if ((msg_type == RESPONSE_RESOURCE_ALLOCATION) &&
(rc == SLURM_COMMUNICATIONS_CONNECTION_ERROR)) {
/* Communication issue to the srun that launched the job.
* Cancel rather than leave a stray-but-empty job
* behind on the allocated nodes. */
resource_allocation_response_msg_t *msg_ptr =
task_ptr->msg_args_ptr;
job_id = msg_ptr->job_id;
info("Killing interactive JobId=%u: %s",
job_id, slurm_strerror(rc));
thread_state = DSH_FAILED;
lock_slurmctld(job_write_lock);
job_complete(job_id, slurm_conf.slurm_user_id,
false, false, _wif_status());
unlock_slurmctld(job_write_lock);
continue;
} else if ((msg_type == RESPONSE_HET_JOB_ALLOCATION) &&
(rc == SLURM_COMMUNICATIONS_CONNECTION_ERROR)) {
/* Communication issue to the srun that launched the job.
* Cancel rather than leave a stray-but-empty job
* behind on the allocated nodes. */
list_t *het_alloc_list = task_ptr->msg_args_ptr;
resource_allocation_response_msg_t *msg_ptr;
if (!het_alloc_list ||
(list_count(het_alloc_list) == 0))
continue;
msg_ptr = list_peek(het_alloc_list);
job_id = msg_ptr->job_id;
info("Killing interactive JobId=%u: %s",
job_id, slurm_strerror(rc));
thread_state = DSH_FAILED;
lock_slurmctld(job_write_lock);
job_complete(job_id, slurm_conf.slurm_user_id,
false, false, _wif_status());
unlock_slurmctld(job_write_lock);
continue;
}
if (msg_type == REQUEST_SIGNAL_TASKS) {
job_record_t *job_ptr;
signal_tasks_msg_t *msg_ptr =
task_ptr->msg_args_ptr;
if ((msg_ptr->signal == SIGCONT) ||
(msg_ptr->signal == SIGSTOP)) {
job_id = msg_ptr->step_id.job_id;
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(job_id);
if (job_ptr == NULL) {
info("%s: invalid JobId=%u",
__func__, job_id);
} else if (rc == SLURM_SUCCESS) {
if (msg_ptr->signal == SIGSTOP) {
job_state_set_flag(job_ptr,
JOB_STOPPED);
} else { // SIGCONT
job_state_unset_flag(
job_ptr, JOB_STOPPED);
}
}
if (job_ptr)
job_state_unset_flag(job_ptr,
JOB_SIGNALING);
unlock_slurmctld(job_write_lock);
}
}
if (((msg_type == REQUEST_SIGNAL_TASKS) ||
(msg_type == REQUEST_TERMINATE_TASKS)) &&
(rc == ESRCH)) {
/* process is already dead, not a real error */
rc = SLURM_SUCCESS;
}
switch (rc) {
case SLURM_SUCCESS:
/* debug("agent processed RPC to node %s", */
/* ret_data_info->node_name); */
thread_state = DSH_DONE;
break;
case SLURM_UNKNOWN_FORWARD_ADDR:
error("We were unable to forward message to '%s'. "
"Make sure the slurm.conf for each slurmd "
"contains all other nodes in your system.",
ret_data_info->node_name);
thread_state = DSH_NO_RESP;
break;
case ESLURMD_EPILOG_FAILED:
error("Epilog failure on host %s, "
"setting DOWN",
ret_data_info->node_name);
thread_state = DSH_FAILED;
break;
case ESLURMD_PROLOG_FAILED:
thread_state = DSH_FAILED;
break;
case ESLURM_DUPLICATE_JOB_ID:
thread_state = DSH_DUP_JOBID;
break;
case ESLURM_INVALID_JOB_ID:
/* Not indicative of a real error */
case ESLURMD_STEP_NOTRUNNING:
/* Not indicative of a real error */
log_flag(AGENT, "%s: RPC to node %s failed, job not running",
__func__, ret_data_info->node_name);
thread_state = DSH_DONE;
break;
default:
if (!sack_agent && !srun_agent) {
if (ret_data_info->err)
errno = ret_data_info->err;
else
errno = rc;
lock_slurmctld(node_read_lock);
rc = _comm_err(ret_data_info->node_name,
msg_type);
unlock_slurmctld(node_read_lock);
}
if (sack_agent || srun_agent)
thread_state = DSH_FAILED;
else if (rc || (ret_data_info->type ==
RESPONSE_FORWARD_FAILED))
/* check if a forward failed */
thread_state = DSH_NO_RESP;
else { /* some failures are harmless, such as a job
* termination request for a job that is
* already finished; just treat those cases
* as done */
thread_state = DSH_DONE;
}
}
ret_data_info->err = thread_state;
}
list_iterator_destroy(itr);
cleanup:
if (!ret_list && (msg_type == REQUEST_SIGNAL_TASKS)) {
job_record_t *job_ptr;
signal_tasks_msg_t *msg_ptr =
task_ptr->msg_args_ptr;
if ((msg_ptr->signal == SIGCONT) ||
(msg_ptr->signal == SIGSTOP)) {
job_id = msg_ptr->step_id.job_id;
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(job_id);
if (job_ptr)
job_state_unset_flag(job_ptr, JOB_SIGNALING);
unlock_slurmctld(job_write_lock);
}
}
xfree(args);
/* handled at end of thread just in case resend is needed */
destroy_forward(&msg.forward);
slurm_mutex_lock(thread_mutex_ptr);
thread_ptr->ret_list = ret_list;
thread_ptr->state = thread_state;
thread_ptr->end_time = (time_t) difftime(time(NULL),
thread_ptr->start_time);
/* Signal completion so another thread can replace us */
(*threads_active_ptr)--;
slurm_cond_signal(thread_cond_ptr);
slurm_mutex_unlock(thread_mutex_ptr);
return NULL;
}
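/* _setup_requeue - scan one thread's ret_list for DSH_NO_RESP entries and
* push those node names onto the retry hostlist, skipping nodes which are
* now DOWN or powered down. Returns 1 once the expected count is reached. */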
static int _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr,
int *count, int *spot)
{
node_record_t *node_ptr;
ret_data_info_t *ret_data_info = NULL;
list_itr_t *itr;
int rc = 0;
itr = list_iterator_create(thread_ptr->ret_list);
while ((ret_data_info = list_next(itr))) {
log_flag(AGENT, "%s: got err of %d",
__func__, ret_data_info->err);
if (ret_data_info->err != DSH_NO_RESP)
continue;
node_ptr = find_node_record(ret_data_info->node_name);
if (node_ptr &&
(IS_NODE_DOWN(node_ptr) ||
IS_NODE_POWERING_DOWN(node_ptr) ||
IS_NODE_POWERED_DOWN(node_ptr))) {
--(*count);
} else if (agent_arg_ptr) {
debug("%s: got the name %s to resend out of %d",
__func__, ret_data_info->node_name, *count);
hostlist_push_host(agent_arg_ptr->hostlist,
ret_data_info->node_name);
++(*spot);
}
if (*spot == *count) {
rc = 1;
break;
}
}
list_iterator_destroy(itr);
return rc;
}
/*
* _queue_agent_retry - Queue any failed RPCs for later replay
* IN agent_info_ptr - pointer to info on completed agent requests
* IN count - number of agent requests which failed, count to requeue
*/
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count)
{
node_record_t *node_ptr;
agent_arg_t *agent_arg_ptr;
queued_request_t *queued_req_ptr = NULL;
thd_t *thread_ptr = agent_info_ptr->thread_struct;
int i, j;
if (count == 0)
return;
/* build agent argument with just the RPCs to retry */
agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
agent_arg_ptr->node_count = count;
agent_arg_ptr->retry = 1;
agent_arg_ptr->hostlist = hostlist_create(NULL);
agent_arg_ptr->msg_type = agent_info_ptr->msg_type;
agent_arg_ptr->msg_args = *(agent_info_ptr->msg_args_pptr);
*(agent_info_ptr->msg_args_pptr) = NULL;
set_agent_arg_r_uid(agent_arg_ptr, agent_info_ptr->r_uid);
j = 0;
for (i = 0; i < agent_info_ptr->thread_count; i++) {
if (!thread_ptr[i].ret_list) {
if (thread_ptr[i].state != DSH_NO_RESP)
continue;
debug("got the name %s to resend",
thread_ptr[i].nodename);
node_ptr = find_node_record(thread_ptr[i].nodename);
if (node_ptr &&
(IS_NODE_DOWN(node_ptr) ||
IS_NODE_POWERING_DOWN(node_ptr) ||
IS_NODE_POWERED_DOWN(node_ptr))) {
/* Do not re-send RPC to DOWN node */
if (count)
count--;
} else {
hostlist_push_host(agent_arg_ptr->hostlist,
thread_ptr[i].nodename);
j++;
}
if (j == count)
break;
} else {
if (_setup_requeue(agent_arg_ptr, &thread_ptr[i],
&count, &j))
break;
}
}
if (count == 0) {
/* All non-responding nodes are DOWN.
* Do not requeue, but discard this RPC */
hostlist_destroy(agent_arg_ptr->hostlist);
*(agent_info_ptr->msg_args_pptr) = agent_arg_ptr->msg_args;
xfree(agent_arg_ptr);
return;
}
if (count != j) {
error("agent: Retry count (%d) != actual count (%d)",
count, j);
agent_arg_ptr->node_count = j;
}
debug2("Queue RPC msg_type=%s, nodes=%d for retry",
rpc_num2string(agent_arg_ptr->msg_type), j);
/* add the request to a list */
queued_req_ptr = xmalloc(sizeof(queued_request_t));
queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
queued_req_ptr->last_attempt = time(NULL);
slurm_mutex_lock(&retry_mutex);
if (retry_list == NULL)
retry_list = list_create(_list_delete_retry);
list_append(retry_list, queued_req_ptr);
slurm_mutex_unlock(&retry_mutex);
}
/*
* _list_delete_retry - delete an entry from the retry list,
* see common/list.h for documentation
*/
static void _list_delete_retry(void *retry_entry)
{
queued_request_t *queued_req_ptr;
if (! retry_entry)
return;
queued_req_ptr = (queued_request_t *) retry_entry;
purge_agent_args(queued_req_ptr->agent_arg_ptr);
xfree(queued_req_ptr);
}
/* Background thread to process the defer and retry lists and pending mail */
static void *_agent_init(void *arg)
{
int min_wait;
bool mail_too;
struct timespec ts = {0, 0};
time_t last_defer_attempt = (time_t) 0;
while (true) {
slurm_mutex_lock(&pending_mutex);
while (!slurmctld_config.shutdown_time &&
!pending_mail && !pending_check_defer &&
(pending_wait_time == NO_VAL16)) {
ts.tv_sec = time(NULL) + 2;
slurm_cond_timedwait(&pending_cond, &pending_mutex,
&ts);
}
if (slurmctld_config.shutdown_time) {
if (!retry_list_size() ||
(slurmctld_config.shutdown_time +
AGENT_SHUTDOWN_WAIT < time(NULL))) {
slurm_mutex_unlock(&pending_mutex);
break;
}
}
mail_too = pending_mail;
min_wait = pending_wait_time;
pending_mail = false;
pending_wait_time = NO_VAL16;
slurm_mutex_unlock(&pending_mutex);
if ((last_defer_attempt + 2 < last_job_update) ||
pending_check_defer) {
last_defer_attempt = time(NULL);
pending_check_defer = false;
_agent_defer();
}
_agent_retry(min_wait, mail_too);
}
return NULL;
}
static int _foreach_node_did_resp(void *x, void *arg)
{
node_did_resp((char *) x);
return 1;
}
/* Background thread to process queued node responses and ping updates */
static void *_agent_nodes_update(void *arg)
{
struct timespec ts = {0, 0};
slurmctld_lock_t node_write_lock =
{ .conf = READ_LOCK, .node = WRITE_LOCK };
slurm_mutex_lock(&update_nodes_mutex);
while (true) {
ts.tv_sec = time(NULL) + 2;
slurm_cond_timedwait(&update_nodes_cond, &update_nodes_mutex,
&ts);
if (slurmctld_config.shutdown_time) {
slurm_mutex_unlock(&update_nodes_mutex);
break;
}
if (list_count(update_node_list)) {
lock_slurmctld(node_write_lock);
list_delete_all(update_node_list,
_foreach_node_did_resp, NULL);
unlock_slurmctld(node_write_lock);
}
ping_nodes_update();
}
return NULL;
}
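/* _queue_update_node - queue a responding node's name for the
* _agent_nodes_update thread, which calls node_did_resp() under the node
* write lock */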
static void _queue_update_node(char *node_name)
{
slurm_mutex_lock(&update_nodes_mutex);
if (!update_node_list)
update_node_list = list_create(xfree_ptr);
list_append(update_node_list, node_name);
slurm_mutex_unlock(&update_nodes_mutex);
}
static int _foreach_srun_response(void *x, void *arg)
{
srun_response(x);
return 1;
}
/* Background thread to process queued srun response notifications */
static void *_agent_srun_update(void *arg)
{
struct timespec ts = {0, 0};
slurmctld_lock_t job_write_lock = { .job = WRITE_LOCK };
slurm_mutex_lock(&update_srun_mutex);
while (true) {
ts.tv_sec = time(NULL) + 2;
slurm_cond_timedwait(&update_srun_cond, &update_srun_mutex,
&ts);
if (slurmctld_config.shutdown_time)
break;
if (!list_count(update_srun_list))
continue;
lock_slurmctld(job_write_lock);
list_delete_all(update_srun_list, _foreach_srun_response, NULL);
unlock_slurmctld(job_write_lock);
}
slurm_mutex_unlock(&update_srun_mutex);
return NULL;
}
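/* _queue_update_srun - queue a step id for the _agent_srun_update thread,
* which calls srun_response() under the job write lock */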
static void _queue_update_srun(slurm_step_id_t *step_id)
{
slurm_step_id_t *queue_step_id = xmalloc(sizeof(*queue_step_id));
memcpy(queue_step_id, step_id, sizeof(*step_id));
list_append(update_srun_list, queue_step_id);
/*
* This may or may not wake the _agent_srun_update thread.
* But - we intentionally do not want to claim the &update_srun_mutex
* here which would be the only way to ensure it was asleep, as that
* would block us from queuing additional work while it was blocked
* waiting for the job write lock.
*/
slurm_cond_signal(&update_srun_cond);
}
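/* agent_init - spawn the background threads that service the retry/defer
* queues, node response updates, and srun response updates */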
extern void agent_init(void)
{
if (pending_thread_tid) {
error("%s: thread already running", __func__);
return;
}
update_srun_list = list_create(xfree_ptr);
slurm_thread_create(&pending_thread_tid, _agent_init, NULL);
slurm_thread_create(&nodes_update_tid, _agent_nodes_update, NULL);
slurm_thread_create(&srun_update_tid, _agent_srun_update, NULL);
}
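/* agent_fini - wake the background threads so they notice shutdown_time,
* join them, then wait up to AGENT_SHUTDOWN_WAIT seconds for any remaining
* agent RPC threads to complete */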
extern void agent_fini(void)
{
struct timespec ts = {0, 0};
int rc = 0;
agent_trigger(999, true, true);
slurm_mutex_lock(&update_nodes_mutex);
slurm_cond_broadcast(&update_nodes_cond);
slurm_mutex_unlock(&update_nodes_mutex);
slurm_mutex_lock(&update_srun_mutex);
slurm_cond_broadcast(&update_srun_cond);
slurm_mutex_unlock(&update_srun_mutex);
slurm_thread_join(pending_thread_tid);
slurm_thread_join(nodes_update_tid);
slurm_thread_join(srun_update_tid);
ts.tv_sec = time(NULL) + AGENT_SHUTDOWN_WAIT;
slurm_mutex_lock(&agent_cnt_mutex);
slurm_cond_broadcast(&agent_cnt_cond);
while (agent_thread_cnt) {
rc = pthread_cond_timedwait(&agent_cnt_cond, &agent_cnt_mutex,
&ts);
if (rc == ETIMEDOUT) {
error("%s: left %d agent threads active", __func__,
agent_thread_cnt);
break;
}
}
slurm_mutex_unlock(&agent_cnt_mutex);
FREE_NULL_LIST(update_srun_list);
}
/*
* agent_trigger - Request processing of pending RPCs
* IN min_wait - Minimum wait time between re-issue of a pending RPC
* IN mail_too - Send pending email too, note this is performed using a
* fork/waitpid, so it can take longer than just creating a pthread
* to send RPCs
* IN check_defer - force defer_list check
*/
extern void agent_trigger(int min_wait, bool mail_too, bool check_defer)
{
log_flag(AGENT, "%s: pending_wait_time=%d->%d mail_too=%c->%c Agent_cnt=%d agent_thread_cnt=%d retry_list_size=%d",
__func__, pending_wait_time, min_wait,
mail_too ? 'T' : 'F', pending_mail ? 'T' : 'F',
agent_cnt, agent_thread_cnt, retry_list_size());
slurm_mutex_lock(&pending_mutex);
if ((pending_wait_time == NO_VAL16) ||
(pending_wait_time > min_wait))
pending_wait_time = min_wait;
if (mail_too)
pending_mail = mail_too;
if (check_defer)
pending_check_defer = check_defer;
slurm_cond_broadcast(&pending_cond);
slurm_mutex_unlock(&pending_mutex);
}
/* agent_pack_pending_rpc_stats - pack counts of pending RPCs into a buffer */
extern void agent_pack_pending_rpc_stats(buf_t *buffer)
{
time_t now;
int i;
queued_request_t *queued_req_ptr = NULL;
agent_arg_t *agent_arg_ptr = NULL;
list_itr_t *list_iter;
now = time(NULL);
if (difftime(now, cache_build_time) <= RPC_PACK_MAX_AGE)
goto pack_it; /* Send cached data */
cache_build_time = now;
if (rpc_stat_counts) { /* Clear existing data */
stat_type_count = 0;
memset(rpc_stat_counts, 0, sizeof(uint32_t) * MAX_RPC_PACK_CNT);
memset(rpc_stat_types, 0, sizeof(uint32_t) * MAX_RPC_PACK_CNT);
rpc_count = 0;
/* the other variables need not be cleared */
} else { /* Allocate buffers for data */
stat_type_count = 0;
rpc_stat_counts = xcalloc(MAX_RPC_PACK_CNT, sizeof(uint32_t));
rpc_stat_types = xcalloc(MAX_RPC_PACK_CNT, sizeof(uint32_t));
rpc_count = 0;
rpc_host_list = xcalloc(DUMP_RPC_COUNT, sizeof(char *));
for (i = 0; i < DUMP_RPC_COUNT; i++) {
rpc_host_list[i] = xmalloc(HOSTLIST_MAX_SIZE);
}
rpc_type_list = xcalloc(DUMP_RPC_COUNT, sizeof(uint32_t));
}
slurm_mutex_lock(&retry_mutex);
if (retry_list) {
list_iter = list_iterator_create(retry_list);
/* iterate through list, find type slot or make a new one */
while ((queued_req_ptr = list_next(list_iter))) {
agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
if (rpc_count < DUMP_RPC_COUNT) {
rpc_type_list[rpc_count] =
agent_arg_ptr->msg_type;
hostlist_ranged_string(agent_arg_ptr->hostlist,
HOSTLIST_MAX_SIZE,
rpc_host_list[rpc_count]);
rpc_count++;
}
for (i = 0; i < MAX_RPC_PACK_CNT; i++) {
if (rpc_stat_types[i] == 0) {
rpc_stat_types[i] =
agent_arg_ptr->msg_type;
stat_type_count++;
} else if (rpc_stat_types[i] !=
agent_arg_ptr->msg_type)
continue;
rpc_stat_counts[i]++;
break;
}
}
list_iterator_destroy(list_iter);
}
slurm_mutex_unlock(&retry_mutex);
pack_it:
pack32_array(rpc_stat_types, stat_type_count, buffer);
pack32_array(rpc_stat_counts, stat_type_count, buffer);
pack32_array(rpc_type_list, rpc_count, buffer);
packstr_array(rpc_host_list, rpc_count, buffer);
}
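/* _agent_defer - with the job write lock held, re-evaluate each deferred
* request: abort it, move it to retry_list once ready, or leave it on
* defer_list for a later pass */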
static void _agent_defer(void)
{
int rc = -1;
queued_request_t *queued_req_ptr = NULL;
agent_arg_t *agent_arg_ptr = NULL;
/* Write lock on jobs */
slurmctld_lock_t job_write_lock = { .job = WRITE_LOCK };
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&defer_mutex);
if (defer_list) {
list_t *tmp_list = NULL;
/* first try to find a new (never tried) record */
while ((queued_req_ptr = list_pop(defer_list))) {
agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
if (agent_arg_ptr->msg_type ==
REQUEST_BATCH_JOB_LAUNCH)
rc = _batch_launch_defer(queued_req_ptr);
else if (agent_arg_ptr->msg_type ==
REQUEST_SIGNAL_TASKS)
rc = _signal_defer(queued_req_ptr);
else
fatal("%s: Invalid message type (%s)",
__func__,
rpc_num2string(agent_arg_ptr->msg_type));
if (rc == -1) { /* abort request */
purge_agent_args(
queued_req_ptr->agent_arg_ptr);
xfree(queued_req_ptr);
} else if (rc == 0) {
/* ready to process now, move to retry_list */
slurm_mutex_lock(&retry_mutex);
if (!retry_list)
retry_list =
list_create(_list_delete_retry);
list_append(retry_list, queued_req_ptr);
slurm_mutex_unlock(&retry_mutex);
} else if (rc == 1) {
if (!tmp_list)
tmp_list =
list_create(_list_delete_retry);
list_append(tmp_list, (void *)queued_req_ptr);
}
}
if (tmp_list) {
list_transfer(defer_list, tmp_list);
FREE_NULL_LIST(tmp_list);
}
}
slurm_mutex_unlock(&defer_mutex);
unlock_slurmctld(job_write_lock);
}
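/* _find_request - list match function: with *key == 0 match a request that
* has never been attempted, otherwise match one whose last attempt predates
* *key */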
static int _find_request(void *x, void *key)
{
queued_request_t *queued_req_ptr = x;
double *before = key;
if (!(*before) && queued_req_ptr->last_attempt == 0)
return 1;
else if (queued_req_ptr->last_attempt < *before)
return 1;
return 0;
}
/* Do the work requested by agent_trigger() (retry pending RPCs).
* This runs from the _agent_init() thread so the job records can be locked */
static void _agent_retry(int min_wait, bool mail_too)
{
time_t now = time(NULL);
queued_request_t *queued_req_ptr = NULL;
agent_arg_t *agent_arg_ptr = NULL;
mail_info_t *mi = NULL;
int list_size = 0, agent_started = 0;
next:
slurm_mutex_lock(&retry_mutex);
if (retry_list && !list_size) {
static time_t last_msg_time = (time_t) 0;
uint32_t msg_type[5] = {0, 0, 0, 0, 0};
int i = 0;
list_size = list_count(retry_list);
if (((list_size > 100) &&
(difftime(now, last_msg_time) > 300)) ||
((list_size > 0) &&
(slurm_conf.debug_flags & DEBUG_FLAG_AGENT))) {
/* Note sizable backlog (retry_list_size()) of work */
list_itr_t *retry_iter;
retry_iter = list_iterator_create(retry_list);
while ((queued_req_ptr = list_next(retry_iter))) {
agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
msg_type[i++] = agent_arg_ptr->msg_type;
if (i == 5)
break;
}
list_iterator_destroy(retry_iter);
info(" retry_list retry_list_size:%d msg_type=%s,%s,%s,%s,%s",
list_size, rpc_num2string(msg_type[0]),
rpc_num2string(msg_type[1]),
rpc_num2string(msg_type[2]),
rpc_num2string(msg_type[3]),
rpc_num2string(msg_type[4]));
last_msg_time = now;
}
}
if (get_agent_thread_count() + AGENT_THREAD_COUNT + 2 >
MAX_SERVER_THREADS) {
/* too much work already */
slurm_mutex_unlock(&retry_mutex);
return;
}
if (retry_list) {
/* first try to find a new (never tried) record */
double key = 0;
queued_req_ptr = list_remove_first(retry_list, _find_request,
&key);
}
if (retry_list && (queued_req_ptr == NULL)) {
/* now try to find a requeue request that is
* relatively old */
double before = difftime(now, min_wait);
queued_req_ptr = list_remove_first(retry_list, _find_request,
&before);
}
slurm_mutex_unlock(&retry_mutex);
if (queued_req_ptr) {
agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
xfree(queued_req_ptr);
if (agent_arg_ptr) {
debug2("Spawning RPC agent for msg_type %s",
rpc_num2string(agent_arg_ptr->msg_type));
slurm_thread_create_detached(agent, agent_arg_ptr);
agent_started++;
} else
error("agent_retry found record with no agent_args");
if ((list_size > agent_started) && !LOTS_OF_AGENTS) {
log_flag(AGENT, "%s: created %d agent, try to start more",
__func__, agent_started);
goto next;
}
} else if (mail_too) {
slurm_mutex_lock(&agent_cnt_mutex);
slurm_mutex_lock(&mail_mutex);
while (mail_list && (agent_thread_cnt < MAX_SERVER_THREADS) &&
(mail_thread_cnt < MAX_MAIL_THREADS)) {
mi = (mail_info_t *) list_dequeue(mail_list);
if (!mi)
break;
mail_thread_cnt++;
agent_thread_cnt++;
slurm_thread_create_detached(_mail_proc, mi);
}
slurm_mutex_unlock(&mail_mutex);
slurm_mutex_unlock(&agent_cnt_mutex);
}
}
/*
* agent_queue_request - put a new request on the queue for execution or
* execute now if not too busy
* IN agent_arg_ptr - the request to enqueue
*/
void agent_queue_request(agent_arg_t *agent_arg_ptr)
{
queued_request_t *queued_req_ptr = NULL;
if ((AGENT_THREAD_COUNT + 2) >= MAX_SERVER_THREADS)
fatal("AGENT_THREAD_COUNT value is too high relative to MAX_SERVER_THREADS");
if (message_timeout == NO_VAL16) {
message_timeout = MAX(slurm_conf.msg_timeout, 30);
}
if (agent_arg_ptr->msg_type == REQUEST_SHUTDOWN) {
pthread_t agent_thread = 0;
slurm_thread_create(&agent_thread, agent, agent_arg_ptr);
slurm_thread_join(agent_thread);
return;
}
queued_req_ptr = xmalloc(sizeof(queued_request_t));
queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
/* queued_req_ptr->last_attempt = 0; Implicit */
if (((agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) &&
(_batch_launch_defer(queued_req_ptr) != 0)) ||
((agent_arg_ptr->msg_type == REQUEST_SIGNAL_TASKS) &&
(_signal_defer(queued_req_ptr) != 0))) {
slurm_mutex_lock(&defer_mutex);
if (defer_list == NULL)
defer_list = list_create(_list_delete_retry);
list_append(defer_list, (void *)queued_req_ptr);
slurm_mutex_unlock(&defer_mutex);
} else {
slurm_mutex_lock(&retry_mutex);
if (retry_list == NULL)
retry_list = list_create(_list_delete_retry);
list_append(retry_list, (void *)queued_req_ptr);
slurm_mutex_unlock(&retry_mutex);
}
/* now process the request in a separate pthread
* (if we can create another pthread to do so) */
agent_trigger(999, false, false);
}
/* agent_purge - purge all pending RPC requests */
extern void agent_purge(void)
{
int i;
slurm_mutex_lock(&retry_mutex);
FREE_NULL_LIST(retry_list);
slurm_mutex_unlock(&retry_mutex);
slurm_mutex_lock(&defer_mutex);
FREE_NULL_LIST(defer_list);
slurm_mutex_unlock(&defer_mutex);
slurm_mutex_lock(&mail_mutex);
FREE_NULL_LIST(mail_list);
slurm_mutex_unlock(&mail_mutex);
slurm_mutex_lock(&update_nodes_mutex);
FREE_NULL_LIST(update_node_list);
slurm_mutex_unlock(&update_nodes_mutex);
xfree(rpc_stat_counts);
xfree(rpc_stat_types);
xfree(rpc_type_list);
if (rpc_host_list) {
for (i = 0; i < DUMP_RPC_COUNT; i++)
xfree(rpc_host_list[i]);
xfree(rpc_host_list);
}
}
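/* Return the current number of active agents (agent_cnt) */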
extern int get_agent_count(void)
{
int cnt;
slurm_mutex_lock(&agent_cnt_mutex);
cnt = agent_cnt;
slurm_mutex_unlock(&agent_cnt_mutex);
return cnt;
}
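/* Return the current number of threads spawned by agents (agent_thread_cnt) */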
extern int get_agent_thread_count(void)
{
int cnt;
slurm_mutex_lock(&agent_cnt_mutex);
cnt = agent_thread_cnt;
slurm_mutex_unlock(&agent_cnt_mutex);
return cnt;
}
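/* Allocate a zeroed mail_info_t record */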
static mail_info_t *_mail_alloc(void)
{
return xmalloc(sizeof(mail_info_t));
}
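/* Destructor for mail_list entries: free a mail_info_t record and its contents */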
static void _mail_free(void *arg)
{
mail_info_t *mi = (mail_info_t *) arg;
if (mi) {
xfree(mi->user_name);
xfree(mi->message);
env_array_free(mi->environment);
xfree(mi);
}
}
/*
 * Initialize the MailProg environment and set the common variables.
 * Additional variables are set in _set_job_time and _set_job_term_info
 * to avoid code duplication.
 */
static char **_build_mail_env(job_record_t *job_ptr, uint32_t mail_type)
{
char **my_env = job_common_env_vars(job_ptr,
((mail_type & MAIL_JOB_END) ||
(mail_type & MAIL_JOB_FAIL)));
setenvf(&my_env, "SLURM_JOB_STATE", "%s",
job_state_string(job_ptr->job_state & JOB_STATE_BASE));
setenvf(&my_env, "SLURM_JOB_MAIL_TYPE", "%s",
_mail_type_str(mail_type));
return my_env;
}
/* process an email request and free the record */
static void *_mail_proc(void *arg)
{
mail_info_t *mi = (mail_info_t *) arg;
int status;
char *result = NULL;
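	/* MailProg is invoked as: <MailProg> -s <subject> <recipient> */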
char *argv[5] = {
slurm_conf.mail_prog, "-s", mi->message, mi->user_name, NULL};
status = slurmscriptd_run_mail(slurm_conf.mail_prog, 5, argv,
mi->environment, MAIL_PROG_TIMEOUT,
&result);
if (status)
		error("MailProg returned an error, its output was '%s'", result);
else if (result && (strlen(result) > 0))
debug("MailProg output was '%s'.", result);
else
debug2("No output from MailProg, exit code=%d", status);
xfree(result);
_mail_free(mi);
slurm_mutex_lock(&agent_cnt_mutex);
slurm_mutex_lock(&mail_mutex);
if (agent_thread_cnt)
agent_thread_cnt--;
else
error("agent_thread_cnt underflow");
if (mail_thread_cnt)
mail_thread_cnt--;
else
error("mail_thread_cnt underflow");
slurm_mutex_unlock(&mail_mutex);
slurm_mutex_unlock(&agent_cnt_mutex);
return NULL;
}
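/* Return a human-readable string for a MAIL_JOB_* mail type */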
static char *_mail_type_str(uint16_t mail_type)
{
if (mail_type == MAIL_INVALID_DEPEND)
return "Invalid dependency";
if (mail_type == MAIL_JOB_BEGIN)
return "Began";
if (mail_type == MAIL_JOB_END)
return "Ended";
if (mail_type == MAIL_JOB_FAIL)
return "Failed";
if (mail_type == MAIL_JOB_REQUEUE)
return "Requeued";
if (mail_type == MAIL_JOB_STAGE_OUT)
return "StageOut/Teardown";
if (mail_type == MAIL_JOB_TIME100)
return "Reached time limit";
if (mail_type == MAIL_JOB_TIME90)
return "Reached 90% of time limit";
if (mail_type == MAIL_JOB_TIME80)
return "Reached 80% of time limit";
if (mail_type == MAIL_JOB_TIME50)
return "Reached 50% of time limit";
return "unknown";
}
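/*
 * Write the job's queued or run time (as appropriate for mail_type) into buf
 * and export the matching SLURM_JOB_*_TIME environment variable for MailProg.
 */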
static void _set_job_time(job_record_t *job_ptr, uint16_t mail_type,
char *buf, int buf_len, char ***env)
{
time_t interval = NO_VAL;
int msg_len;
buf[0] = '\0';
if ((mail_type == MAIL_JOB_BEGIN) && job_ptr->start_time &&
job_ptr->details && job_ptr->details->submit_time) {
interval = job_ptr->start_time - job_ptr->details->submit_time;
snprintf(buf, buf_len, ", Queued time ");
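		/* msg_len == strlen(", Queued time "); secs2time_str() appends after the prefix */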
msg_len = 14;
secs2time_str(interval, buf+msg_len, buf_len-msg_len);
setenvf(env, "SLURM_JOB_QUEUED_TIME", "%s", buf+msg_len);
return;
}
if (((mail_type == MAIL_JOB_END) || (mail_type == MAIL_JOB_FAIL) ||
(mail_type == MAIL_JOB_REQUEUE)) &&
(job_ptr->start_time && job_ptr->end_time)) {
if (job_ptr->suspend_time) {
interval = job_ptr->end_time - job_ptr->suspend_time;
interval += job_ptr->pre_sus_time;
} else
interval = job_ptr->end_time - job_ptr->start_time;
snprintf(buf, buf_len, ", Run time ");
msg_len = 11;
secs2time_str(interval, buf+msg_len, buf_len-msg_len);
setenvf(env, "SLURM_JOB_RUN_TIME", "%s", buf+msg_len);
return;
}
if (((mail_type == MAIL_JOB_TIME100) ||
(mail_type == MAIL_JOB_TIME90) ||
(mail_type == MAIL_JOB_TIME80) ||
(mail_type == MAIL_JOB_TIME50)) && job_ptr->start_time) {
if (job_ptr->suspend_time) {
interval = time(NULL) - job_ptr->suspend_time;
interval += job_ptr->pre_sus_time;
} else
interval = time(NULL) - job_ptr->start_time;
snprintf(buf, buf_len, ", Run time ");
msg_len = 11;
secs2time_str(interval, buf+msg_len, buf_len-msg_len);
setenvf(env, "SLURM_JOB_RUN_TIME", "%s", buf+msg_len);
return;
}
if ((mail_type == MAIL_JOB_STAGE_OUT) && job_ptr->end_time) {
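		/*
		 * MAIL_JOB_STAGE_OUT: the burst buffer stage-out and teardown
		 * operations have completed.
		 */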
interval = time(NULL) - job_ptr->end_time;
snprintf(buf, buf_len, " time ");
		msg_len = 6;	/* == strlen(" time ") */
secs2time_str(interval, buf+msg_len, buf_len-msg_len);
setenvf(env, "SLURM_JOB_STAGE_OUT_TIME", "%s", buf+msg_len);
return;
}
}
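/*
 * For MAIL_JOB_END/MAIL_JOB_FAIL, write the job's termination state and exit
 * code (or signal) into buf and export the SLURM_JOB_EXIT_CODE_* or
 * SLURM_JOB_TERM_SIGNAL_MAX environment variables for MailProg.
 */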
static void _set_job_term_info(job_record_t *job_ptr, uint16_t mail_type,
char *buf, int buf_len, char ***env)
{
buf[0] = '\0';
if ((mail_type == MAIL_JOB_END) || (mail_type == MAIL_JOB_FAIL)) {
uint16_t base_state;
uint32_t exit_status_min, exit_status_max;
int exit_code_min, exit_code_max;
base_state = job_ptr->job_state & JOB_STATE_BASE;
if (job_ptr->array_recs &&
!(job_ptr->mail_type & MAIL_ARRAY_TASKS)) {
/* Summarize array tasks. */
exit_status_min = job_ptr->array_recs->min_exit_code;
exit_status_max = job_ptr->array_recs->max_exit_code;
if (WIFEXITED(exit_status_min) &&
WIFEXITED(exit_status_max)) {
char *state_string;
exit_code_min = WEXITSTATUS(exit_status_min);
exit_code_max = WEXITSTATUS(exit_status_max);
if ((exit_code_min == 0) && (exit_code_max > 0))
state_string = "Mixed";
else {
state_string =
job_state_string(base_state);
}
snprintf(buf, buf_len, ", %s, ExitCode [%d-%d]",
state_string, exit_code_min,
exit_code_max);
setenvf(env, "SLURM_JOB_EXIT_CODE_MIN", "%d",
exit_code_min);
setenvf(env, "SLURM_JOB_EXIT_CODE_MAX", "%d",
exit_code_max);
} else if (WIFSIGNALED(exit_status_max)) {
exit_code_max = WTERMSIG(exit_status_max);
snprintf(buf, buf_len, ", %s, MaxSignal [%d]",
"Mixed", exit_code_max);
setenvf(env, "SLURM_JOB_TERM_SIGNAL_MAX", "%d",
exit_code_max);
} else if (WIFEXITED(exit_status_max)) {
exit_code_max = WEXITSTATUS(exit_status_max);
snprintf(buf, buf_len, ", %s, MaxExitCode [%d]",
"Mixed", exit_code_max);
setenvf(env, "SLURM_JOB_EXIT_CODE_MAX", "%d",
exit_code_max);
} else {
snprintf(buf, buf_len, ", %s",
job_state_string(base_state));
setenvf(env, "SLURM_JOB_EXIT_CODE_MAX", "%s",
"0");
}
if (job_ptr->array_recs->array_flags &
ARRAY_TASK_REQUEUED)
strncat(buf, ", with requeued tasks",
buf_len - strlen(buf) - 1);
} else {
exit_status_max = job_ptr->exit_code;
if (WIFEXITED(exit_status_max)) {
exit_code_max = WEXITSTATUS(exit_status_max);
snprintf(buf, buf_len, ", %s, ExitCode %d",
job_state_string(base_state),
exit_code_max);
setenvf(env, "SLURM_JOB_EXIT_CODE_MAX", "%d",
exit_code_max);
} else {
snprintf(buf, buf_len, ", %s",
job_state_string(base_state));
setenvf(env, "SLURM_JOB_EXIT_CODE_MAX", "%s",
"0");
}
}
} else if (buf_len > 0) {
buf[0] = '\0';
}
}
/*
* mail_job_info - Send e-mail notice of job state change
* IN job_ptr - job identification
 * IN mail_type - job transition type, see MAIL_JOB in slurm.h
*/
extern void mail_job_info(job_record_t *job_ptr, uint16_t mail_type)
{
char job_time[128], term_msg[128];
mail_info_t *mi;
/*
* Send mail only for first component (leader) of a hetjob,
* not the individual job components.
*/
if (job_ptr->het_job_id && (job_ptr->het_job_offset != 0))
return;
mi = _mail_alloc();
mi->user_name = xstrdup(job_ptr->mail_user);
/* Use job array master record, if available */
if (!(job_ptr->mail_type & MAIL_ARRAY_TASKS) &&
(job_ptr->array_task_id != NO_VAL) && !job_ptr->array_recs) {
job_record_t *master_job_ptr;
master_job_ptr = find_job_record(job_ptr->array_job_id);
if (master_job_ptr && master_job_ptr->array_recs)
job_ptr = master_job_ptr;
}
mi->environment = _build_mail_env(job_ptr, mail_type);
_set_job_time(job_ptr, mail_type, job_time, sizeof(job_time),
&mi->environment);
_set_job_term_info(job_ptr, mail_type, term_msg, sizeof(term_msg),
&mi->environment);
if (job_ptr->array_recs && !(job_ptr->mail_type & MAIL_ARRAY_TASKS)) {
mi->message = xstrdup_printf("Slurm Array Summary Job_id=%u_* (%u) Name=%s "
"%s%s",
job_ptr->array_job_id,
job_ptr->job_id, job_ptr->name,
_mail_type_str(mail_type),
term_msg);
} else if (job_ptr->array_task_id != NO_VAL) {
mi->message = xstrdup_printf("Slurm Array Task Job_id=%u_%u (%u) Name=%s "
"%s%s%s",
job_ptr->array_job_id,
job_ptr->array_task_id,
job_ptr->job_id, job_ptr->name,
_mail_type_str(mail_type),
job_time, term_msg);
} else {
mi->message = xstrdup_printf("Slurm Job_id=%u Name=%s %s%s%s",
job_ptr->job_id, job_ptr->name,
_mail_type_str(mail_type),
job_time, term_msg);
}
debug("email msg to %s: %s", mi->user_name, mi->message);
slurm_mutex_lock(&mail_mutex);
if (!mail_list)
mail_list = list_create(_mail_free);
list_enqueue(mail_list, mi);
slurm_mutex_unlock(&mail_mutex);
}
/* Test if a batch launch request should be deferred
* RET -1: abort the request, pending job cancelled
* 0: execute the request now
* 1: defer the request
*/
static int _batch_launch_defer(queued_request_t *queued_req_ptr)
{
agent_arg_t *agent_arg_ptr;
batch_job_launch_msg_t *launch_msg_ptr;
time_t now = time(NULL);
job_record_t *job_ptr;
int nodes_ready = 0, tmp = 0;
agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
if (difftime(now, queued_req_ptr->last_attempt) < 10) {
/* Reduce overhead by only testing once every 10 secs */
return 1;
}
launch_msg_ptr = (batch_job_launch_msg_t *)agent_arg_ptr->msg_args;
job_ptr = find_job_record(launch_msg_ptr->job_id);
if ((job_ptr == NULL) ||
(!IS_JOB_RUNNING(job_ptr) && !IS_JOB_SUSPENDED(job_ptr))) {
info("agent(batch_launch): removed pending request for cancelled JobId=%u",
launch_msg_ptr->job_id);
return -1; /* job cancelled while waiting */
}
if (job_ptr->details && job_ptr->details->prolog_running) {
debug2("%s: JobId=%u still waiting on %u prologs",
__func__, job_ptr->job_id,
job_ptr->details->prolog_running);
return 1;
}
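	/*
	 * Verify node readiness before launching the batch job: if
	 * wait_all_nodes is set, check every allocated node via
	 * job_node_ready(); otherwise just confirm the batch host is not
	 * powered down, powering down, or non-responsive.
	 */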
if (job_ptr->wait_all_nodes) {
(void) job_node_ready(launch_msg_ptr->job_id, &tmp);
if (tmp ==
(READY_JOB_STATE | READY_NODE_STATE | READY_PROLOG_STATE)) {
nodes_ready = 1;
}
} else {
node_record_t *node_ptr;
char *hostname;
hostname = hostlist_deranged_string_xmalloc(
agent_arg_ptr->hostlist);
node_ptr = find_node_record(hostname);
if (node_ptr == NULL) {
error("agent(batch_launch) removed pending request for JobId=%u, missing node %s",
launch_msg_ptr->job_id, hostname);
xfree(hostname);
return -1; /* invalid request?? */
}
xfree(hostname);
if (!IS_NODE_POWERED_DOWN(node_ptr) &&
!IS_NODE_POWERING_DOWN(node_ptr) &&
!IS_NODE_NO_RESPOND(node_ptr)) {
nodes_ready = 1;
}
}
if ((slurm_conf.prolog_flags & PROLOG_FLAG_DEFER_BATCH) &&
(job_ptr->state_reason == WAIT_PROLOG)) {
if (job_ptr->node_bitmap_pr &&
(slurm_conf.debug_flags &
(DEBUG_FLAG_TRACE_JOBS | DEBUG_FLAG_AGENT))) {
char *tmp_pr;
tmp_pr = bitmap2node_name(job_ptr->node_bitmap_pr);
verbose("%s: JobId=%u still waiting on prologs on %s",
__func__, job_ptr->job_id, tmp_pr);
xfree(tmp_pr);
} else {
debug2("%s: JobId=%u still waiting on node prologs",
__func__, job_ptr->job_id);
}
return 1;
}
if (nodes_ready) {
if (IS_JOB_CONFIGURING(job_ptr))
job_config_fini(job_ptr);
queued_req_ptr->last_attempt = (time_t) 0;
return 0;
}
if (queued_req_ptr->last_attempt == 0) {
queued_req_ptr->first_attempt = now;
queued_req_ptr->last_attempt = now;
} else if (difftime(now, queued_req_ptr->first_attempt) >=
slurm_conf.resume_timeout) {
/* Nodes will get marked DOWN and job requeued, if possible */
error("agent waited too long for nodes to respond, abort launch of JobId=%u",
job_ptr->job_id);
return -1;
}
queued_req_ptr->last_attempt = now;
return 1;
}
/* Test if a job signal request should be deferred
* RET -1: abort the request
* 0: execute the request now
* 1: defer the request
*/
static int _signal_defer(queued_request_t *queued_req_ptr)
{
agent_arg_t *agent_arg_ptr;
signal_tasks_msg_t *signal_msg_ptr;
time_t now = time(NULL);
job_record_t *job_ptr;
agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
signal_msg_ptr = (signal_tasks_msg_t *)agent_arg_ptr->msg_args;
job_ptr = find_job_record(signal_msg_ptr->step_id.job_id);
if (job_ptr == NULL) {
info("agent(signal_task): removed pending request for cancelled JobId=%u",
signal_msg_ptr->step_id.job_id);
return -1; /* job cancelled while waiting */
}
if (job_ptr->state_reason != WAIT_PROLOG)
return 0;
if (queued_req_ptr->first_attempt == 0) {
queued_req_ptr->first_attempt = now;
} else if (difftime(now, queued_req_ptr->first_attempt) >=
(2 * slurm_conf.batch_start_timeout)) {
error("agent waited too long for nodes to respond, abort signal of JobId=%u",
job_ptr->job_id);
return -1;
}
return 1;
}
/* Return length of agent's retry_list */
extern int retry_list_size(void)
{
if (retry_list == NULL)
return 0;
return list_count(retry_list);
}
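/*
 * Run RebootProgram from slurmctld itself (via slurmscriptd) for the nodes
 * in agent_arg_ptr->hostlist.
 */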
static void _reboot_from_ctld(agent_arg_t *agent_arg_ptr)
{
char *argv[4], *pname;
uint32_t argc;
int rc, status = 0;
reboot_msg_t *reboot_msg = agent_arg_ptr->msg_args;
if (!agent_arg_ptr->hostlist) {
error("%s: hostlist is NULL", __func__);
return;
}
if ((!slurm_conf.reboot_program) ||
(!slurm_conf.reboot_program[0])) {
error("%s: Requested reboot from slurmctld but RebootProgram is not defined", __func__);
return;
}
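	/* argv: [0]=program name, [1]=node list, [2]=optional features, NULL terminated */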
pname = strrchr(slurm_conf.reboot_program, '/');
if (pname)
argv[0] = pname + 1;
else
argv[0] = slurm_conf.reboot_program;
argv[1] = hostlist_deranged_string_xmalloc(agent_arg_ptr->hostlist);
if (reboot_msg && reboot_msg->features) {
argc = 4;
argv[2] = reboot_msg->features;
argv[3] = NULL;
} else {
argc = 3;
argv[2] = NULL;
}
status = slurmscriptd_run_reboot(slurm_conf.reboot_program, argc, argv);
if (WIFEXITED(status)) {
rc = WEXITSTATUS(status);
		if (rc != 0) {
			error("RebootProgram exit status of %d", rc);
		}
} else if (WIFSIGNALED(status)) {
error("RebootProgram signaled: %s",
strsignal(WTERMSIG(status)));
}
xfree(argv[1]);
}