blob: 529925ee6a71257266770a69c3365a0942695eba [file] [log] [blame]
/****************************************************************************\
* slurmdbd_agent.c - functions for the agent that talks to the SlurmDBD
*****************************************************************************
* Copyright (C) SchedMD LLC.
* Copyright (C) 2008-2010 Lawrence Livermore National Security.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette1@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "src/common/slurm_xlator.h"
#include "src/common/fd.h"
#include "src/common/slurmdbd_pack.h"
#include "src/common/xstring.h"
#include "slurmdbd_agent.h"
/*
 * Policy applied by _max_dbd_msg_action() when the agent queue approaches
 * slurm_conf.max_dbd_msgs. Selected by slurmdbd_agent_config_setup().
 */
enum {
/* Purge step start/complete records from the queue to make room */
MAX_DBD_ACTION_DISCARD,
/* Save the queue with _save_dbd_state() and then call fatal() */
MAX_DBD_ACTION_EXIT
};
/*
 * Argument handed to _get_my_list() while the agent collects queued buffers
 * into one multi-message request.
 */
typedef struct {
/* Running total: caller-provided seed plus size_buf() of each visited buffer */
uint32_t msg_size;
/* List that receives every buffer accepted into the batch */
list_t *my_list;
} foreach_get_my_list_t;
/* Connection used by the agent; installed by slurmdbd_agent_set_conn() */
persist_conn_t *slurmdbd_conn = NULL;
/* Trailer written after (and checked after) every record in dbd.messages */
#define DBD_MAGIC 0xDEAD3219
/* Upper bound on the message types listed by _print_agent_list_msg_types() */
#define DEBUG_PRINT_MAX_MSG_TYPES 10
/* Policy used when SlurmctldParameters has no max_dbd_msg_action= option */
#define MAX_DBD_DEFAULT_ACTION MAX_DBD_ACTION_DISCARD
/* agent_lock guards agent_list and agent_running */
static pthread_mutex_t agent_lock = PTHREAD_MUTEX_INITIALIZER;
/* Broadcast when a message is queued or the agent is asked to shut down */
static pthread_cond_t agent_cond = PTHREAD_COND_INITIALIZER;
/* Signaled by _agent() right before the thread returns */
static pthread_cond_t shutdown_cond = PTHREAD_COND_INITIALIZER;
/* Queue of packed buffers waiting to be sent to the SlurmDBD */
static list_t *agent_list = NULL;
/* Thread id of the agent; 0 means no agent thread has been created */
static pthread_t agent_tid = 0;
/* Set by slurmdbd_agent_send_recv() so the agent yields slurmdbd_lock */
static bool halt_agent = 0;
/* Non-zero requests agent shutdown; slurmdbd_conn->shutdown points here */
static time_t slurmdbd_shutdown = 0;
/* True while _agent() is executing */
static bool agent_running = 0;
/* slurmdbd_lock serializes use of slurmdbd_conn */
static pthread_mutex_t slurmdbd_lock = PTHREAD_MUTEX_INITIALIZER;
/* Signaled by slurmdbd_agent_send_recv() to resume a halted agent */
static pthread_cond_t slurmdbd_cond = PTHREAD_COND_INITIALIZER;
/* Active MAX_DBD_ACTION_* policy; see slurmdbd_agent_config_setup() */
static int max_dbd_msg_action = MAX_DBD_DEFAULT_ACTION;
/* Result collected while unpacking one or more return-code responses. */
typedef struct {
/*
 * DBD_ID_RC messages that asked for the job script and/or environment;
 * created lazily by _add_sending_script_env() and consumed by
 * _process_id_rc_list().
 */
list_t *id_rc_list;
/* Return code of the most recently unpacked response */
int rc;
} rc_msg_t;
/* Defined outside this file; called from _sending_script_env(). */
extern int jobacct_storage_p_job_heavy(void *db_conn, job_record_t *job_ptr);
/*
 * list_for_each() callback: for one DBD_ID_RC response, flag the matching job
 * so that its batch script and/or environment are sent, then attempt to send
 * them with jobacct_storage_p_job_heavy().
 *
 * IN x - dbd_id_rc_msg_t describing what the SlurmDBD asked for
 * IN arg - unused
 * RET always 0 so that the list traversal continues
 */
static int _sending_script_env(void *x, void *arg)
{
	dbd_id_rc_msg_t *id_msg = x;
	job_record_t *job;

	xassert(id_msg);

	job = find_job_record(id_msg->job_id);
	if (!job)
		return 0;

	xassert(job->details);

	/* Only request the script if storing job scripts is enabled */
	if ((id_msg->flags & JOB_SEND_SCRIPT) &&
	    (slurm_conf.conf_flags & CONF_FLAG_SJS) &&
	    job->details->script_hash) {
		job->bit_flags |= JOB_SEND_SCRIPT;
	}

	/* Only request the environment if storing job envs is enabled */
	if ((id_msg->flags & JOB_SEND_ENV) &&
	    (slurm_conf.conf_flags & CONF_FLAG_SJE) &&
	    job->details->env_hash) {
		job->bit_flags |= JOB_SEND_ENV;
	}

	/* The flags are cleared only once the send has succeeded */
	if (jobacct_storage_p_job_heavy(slurmdbd_conn, job) == SLURM_SUCCESS)
		job->bit_flags &= ~(JOB_SEND_SCRIPT | JOB_SEND_ENV);

	return 0;
}
/*
 * Defer a DBD_ID_RC response that requests the job script/environment.
 *
 * The caller holds agent_lock, so jobacct_storage_p_job_heavy() must not be
 * called from here: it ends up in slurmdbd_agent_send(), which takes the same
 * lock and would deadlock. The message is parked on rc_msg->id_rc_list and
 * handled later instead.
 *
 * IN id_ptr - response to examine
 * IN/OUT rc_msg - id_rc_list is created on first use and receives id_ptr
 * RET true if id_ptr was queued (the list now owns it), false otherwise
 */
static bool _add_sending_script_env(dbd_id_rc_msg_t *id_ptr, rc_msg_t *rc_msg)
{
	xassert(id_ptr);

	if ((id_ptr->flags & (JOB_SEND_SCRIPT | JOB_SEND_ENV)) == 0)
		return false;

	if (rc_msg->id_rc_list == NULL)
		rc_msg->id_rc_list = list_create(slurmdbd_free_id_rc_msg);

	list_append(rc_msg->id_rc_list, id_ptr);

	return true;
}
/*
 * Run _sending_script_env() on every deferred DBD_ID_RC response while
 * holding the slurmctld job write lock, then destroy the list.
 *
 * IN id_rc_list - list built by _add_sending_script_env(); may be NULL
 */
static void _process_id_rc_list(list_t *id_rc_list)
{
	slurmctld_lock_t job_write_lock = { .job = WRITE_LOCK };

	if (id_rc_list) {
		lock_slurmctld(job_write_lock);
		(void) list_for_each(id_rc_list, _sending_script_env, NULL);
		unlock_slurmctld(job_write_lock);

		FREE_NULL_LIST(id_rc_list);
	}
}
/*
 * Unpack one SlurmDBD response and extract its return code.
 *
 * IN rpc_version - protocol version used to unpack the response
 * IN buffer - packed response
 * IN/OUT rc_msg - receives DBD_ID_RC messages that need follow-up work
 *	(see _add_sending_script_env())
 * RET the unpack error if unpacking failed, otherwise the return code
 *	carried by a DBD_ID_RC or PERSIST_RC response
 */
static int _unpack_return_code(uint16_t rpc_version, buf_t *buffer,
			       rc_msg_t *rc_msg)
{
	persist_rc_msg_t *msg;
	dbd_id_rc_msg_t *id_msg;
	persist_msg_t resp;
	int rc;

	xassert(rc_msg);

	memset(&resp, 0, sizeof(persist_msg_t));
	rc = unpack_slurmdbd_msg(&resp, rpc_version, buffer);
	if (rc != SLURM_SUCCESS) {
		error("unpack message error");
		return rc;
	}

	switch (resp.msg_type) {
	case DBD_ID_RC:
		id_msg = resp.data;
		rc = id_msg->return_code;

		log_flag(PROTOCOL, "msg_type:DBD_ID_RC return_code:%s JobId=%u db_index=%"PRIu64,
			 slurm_strerror(rc), id_msg->job_id,
			 id_msg->db_index);

		/* When queued, rc_msg->id_rc_list takes ownership of id_msg */
		if (!_add_sending_script_env(id_msg, rc_msg))
			slurmdbd_free_id_rc_msg(id_msg);

		if (rc != SLURM_SUCCESS)
			error("DBD_ID_RC is %d", rc);
		break;
	case PERSIST_RC:
		msg = resp.data;
		rc = msg->rc;

		log_flag(PROTOCOL, "msg_type:PERSIST_RC return_code:%s ret_info:%hu flags=%#x comment:%s",
			 slurm_strerror(rc), msg->ret_info,
			 msg->flags, msg->comment);

		if (rc != SLURM_SUCCESS) {
			if (msg->ret_info == DBD_REGISTER_CTLD &&
			    slurm_conf.accounting_storage_enforce) {
				/*
				 * Registration was refused while associations
				 * are enforced: nothing could ever run.
				 */
				error("PERSIST_RC is %d from %s(%u): %s",
				      rc,
				      slurmdbd_msg_type_2_str(msg->ret_info,
							      1),
				      msg->ret_info,
				      msg->comment);
				fatal("You need to add this cluster to accounting if you want to enforce associations, or no jobs will ever run.");
			} else {
				debug("PERSIST_RC is %d from %s(%u): %s",
				      rc,
				      slurmdbd_msg_type_2_str(msg->ret_info,
							      1),
				      msg->ret_info,
				      msg->comment);
			}
		}
		slurm_persist_free_rc_msg(msg);
		break;
	default:
		/*
		 * Report the type that was actually received.
		 * NOTE(review): rc still holds the successful unpack result
		 * here and resp.data is not released - confirm whether an
		 * unexpected type should be reported as an error and freed.
		 */
		error("bad message type %s != PERSIST_RC",
		      slurmdbd_msg_type_2_str(resp.msg_type, true));
	}

	return rc;
}
/*
 * Receive a single response from the SlurmDBD and return the code it holds.
 *
 * IN/OUT rc_msg - forwarded to _unpack_return_code()
 * RET SLURM_ERROR if nothing could be received, otherwise the result of
 *	_unpack_return_code()
 */
static int _get_return_code(rc_msg_t *rc_msg)
{
	int rc;
	buf_t *reply = slurm_persist_recv_msg(slurmdbd_conn);

	if (!reply)
		return SLURM_ERROR;

	rc = _unpack_return_code(slurmdbd_conn->version, reply, rc_msg);
	FREE_NULL_BUFFER(reply);

	return rc;
}
/*
 * list_for_each() callback over the per-message responses of a
 * DBD_GOT_MULT_MSG reply. Each successful response removes (and frees) the
 * message at the head of agent_list.
 *
 * IN x - packed response for one message
 * IN/OUT arg - rc_msg_t; rc is set to this response's return code
 * RET 0 to continue, -1 to stop at the first non-successful response
 */
static int _get_return_codes(void *x, void *arg)
{
	rc_msg_t *rc_msg = arg;
	buf_t *response = x;
	buf_t *sent;

	rc_msg->rc = _unpack_return_code(slurmdbd_conn->version, response,
					 rc_msg);
	if (rc_msg->rc != SLURM_SUCCESS)
		return -1;

	sent = list_dequeue(agent_list);
	if (!sent) {
		error("DBD_GOT_MULT_MSG unpack message error");
		return 0;
	}
	FREE_NULL_BUFFER(sent);

	return 0;
}
static int _handle_mult_rc_ret(void)
{
buf_t *buffer;
uint16_t msg_type;
persist_rc_msg_t *msg = NULL;
dbd_list_msg_t *list_msg = NULL;
int rc = SLURM_ERROR;
rc_msg_t rc_msg = { 0 };
buffer = slurm_persist_recv_msg(slurmdbd_conn);
if (buffer == NULL)
return rc;
safe_unpack16(&msg_type, buffer);
switch (msg_type) {
case DBD_GOT_MULT_MSG:
if (slurmdbd_unpack_list_msg(
&list_msg, slurmdbd_conn->version,
DBD_GOT_MULT_MSG, buffer)
!= SLURM_SUCCESS) {
error("unpack message error");
break;
}
slurm_mutex_lock(&agent_lock);
if (agent_list) {
list_for_each(list_msg->my_list, _get_return_codes,
&rc_msg);
}
slurm_mutex_unlock(&agent_lock);
rc = rc_msg.rc;
_process_id_rc_list(rc_msg.id_rc_list);
slurmdbd_free_list_msg(list_msg);
break;
case PERSIST_RC:
if (slurm_persist_unpack_rc_msg(
&msg, buffer, slurmdbd_conn->version)
== SLURM_SUCCESS) {
rc = msg->rc;
if (rc != SLURM_SUCCESS) {
if (msg->ret_info == DBD_REGISTER_CTLD &&
slurm_conf.accounting_storage_enforce) {
error("PERSIST_RC is %d from "
"%s(%u): %s",
rc,
slurmdbd_msg_type_2_str(
msg->ret_info, 1),
msg->ret_info,
msg->comment);
fatal("You need to add this cluster "
"to accounting if you want to "
"enforce associations, or no "
"jobs will ever run.");
} else
debug("PERSIST_RC is %d from "
"%s(%u): %s",
rc,
slurmdbd_msg_type_2_str(
msg->ret_info, 1),
msg->ret_info,
msg->comment);
}
slurm_persist_free_rc_msg(msg);
} else
error("unpack message error");
break;
default:
error("bad message type %s != PERSIST_RC",
slurmdbd_msg_type_2_str(msg_type, true));
}
unpack_error:
FREE_NULL_BUFFER(buffer);
return rc;
}
/****************************************************************************
* Functions for agent to manage queue of pending message for the Slurm DBD
****************************************************************************/
/*
 * Read one record from the state file. A record is a uint32_t length, that
 * many bytes of payload and a trailing uint32_t DBD_MAGIC.
 *
 * IN fd - open state file positioned at the start of a record
 * RET buffer holding the payload with its offset set to the payload length,
 *	or NULL at end of file or on any error
 */
static buf_t *_load_dbd_rec(int fd)
{
	const ssize_t len_size = sizeof(uint32_t);
	uint32_t msg_size, magic;
	ssize_t remaining, got;
	char *cursor;
	buf_t *buffer;

	got = read(fd, &msg_size, len_size);
	if (got == 0)
		return NULL;	/* clean end of file */
	if (got != len_size) {
		error("state recover error: %m");
		return NULL;
	}
	if (msg_size > MAX_BUF_SIZE) {
		error("state recover error, msg_size=%u", msg_size);
		return NULL;
	}

	buffer = init_buf((int) msg_size);
	set_buf_offset(buffer, msg_size);
	cursor = get_buf_data(buffer);

	remaining = msg_size;
	while (remaining != 0) {
		got = read(fd, cursor, remaining);
		if ((got == -1) && (errno == EINTR))
			continue;	/* interrupted, try again */
		if ((got <= 0) || (got > remaining)) {
			error("state recover error: %m");
			FREE_NULL_BUFFER(buffer);
			return NULL;
		}
		cursor += got;
		remaining -= got;
	}

	got = read(fd, &magic, len_size);
	if ((got != len_size) || (magic != DBD_MAGIC)) {
		error("state recover error");
		FREE_NULL_BUFFER(buffer);
		return NULL;
	}

	return buffer;
}
/*
 * Recover pending RPCs from "<StateSaveLocation>/dbd.messages" into
 * agent_list.
 *
 * The first record of the file is a packed "VER<protocol version>" string.
 * Every following record is a packed message; when the file was written with
 * a different protocol version each message is unpacked and repacked with
 * SLURM_PROTOCOL_VERSION before being queued.
 */
static void _load_dbd_state(void)
{
	char *dbd_fname = NULL;
	char *ver_str = NULL;
	buf_t *buffer;
	int fd, recovered = 0;
	uint16_t rpc_version = 0;

	xstrfmtcat(dbd_fname, "%s/dbd.messages", slurm_conf.state_save_location);
	fd = open(dbd_fname, O_RDONLY);
	if (fd < 0) {
		/* don't print an error message if there is no file */
		if (errno == ENOENT)
			debug4("There is no state save file to open by name %s",
			       dbd_fname);
		else
			error("Opening state save file %s: %m",
			      dbd_fname);
		xfree(dbd_fname);
		return;
	}

	buffer = _load_dbd_rec(fd);
	if (buffer == NULL)
		goto end_it;

	/*
	 * The offset is left at the end of the buffer (as needed for sending),
	 * so rewind it before unpacking.
	 */
	set_buf_offset(buffer, 0);
	safe_unpackstr(&ver_str, buffer);
	debug3("Version string in dbd_state header is %s", ver_str);

unpack_error:
	/* The header record is never queued, whether or not it unpacked */
	FREE_NULL_BUFFER(buffer);
	buffer = NULL;

	if (ver_str) {
		/*
		 * Only look past the prefix once we know it is there; a short
		 * or corrupt header must not be read beyond its terminator.
		 */
		if (!strncmp(ver_str, "VER", 3))
			rpc_version = slurm_atoul(ver_str + 3);
		else
			error("Unrecognized version string '%s' in %s",
			      ver_str, dbd_fname);
		xfree(ver_str);
	}

	while ((buffer = _load_dbd_rec(fd))) {
		if (rpc_version != SLURM_PROTOCOL_VERSION) {
			/*
			 * unpack and repack with new PROTOCOL_VERSION just so
			 * we keep things up to date.
			 * NOTE(review): msg.data produced by the unpack is
			 * not released here - confirm whether it should be.
			 */
			persist_msg_t msg = {0};
			int rc;

			set_buf_offset(buffer, 0);
			rc = unpack_slurmdbd_msg(&msg, rpc_version, buffer);
			FREE_NULL_BUFFER(buffer);
			if (rc == SLURM_SUCCESS)
				buffer = pack_slurmdbd_msg(
					&msg, SLURM_PROTOCOL_VERSION);
			else
				buffer = NULL;
		}

		if (!buffer) {
			/* conversion failed, skip this record */
			error("no buffer given");
			continue;
		}

		list_enqueue(agent_list, buffer);
		recovered++;
	}

end_it:
	verbose("recovered %d pending RPCs", recovered);
	(void) close(fd);
	xfree(dbd_fname);
}
/*
 * Append one record to the state file: a uint32_t payload length, the
 * payload (the first get_buf_offset() bytes of buffer) and a trailing
 * uint32_t DBD_MAGIC.
 *
 * IN fd - state file opened for writing
 * IN buffer - packed message to store
 * RET SLURM_SUCCESS or SLURM_ERROR
 */
static int _save_dbd_rec(int fd, buf_t *buffer)
{
	ssize_t size, wrote;
	uint32_t msg_size = get_buf_offset(buffer);
	uint32_t magic = DBD_MAGIC;
	char *msg = get_buf_data(buffer);

	size = sizeof(msg_size);
	wrote = write(fd, &msg_size, size);
	if (wrote != size) {
		error("state save error: %m");
		return SLURM_ERROR;
	}

	/*
	 * Loop on the number of bytes still to be written. msg_size is used
	 * as that remaining count, so it must not be compared with the size
	 * of the previous write: doing so ends the loop early after a large
	 * partial write and leaves a truncated record behind.
	 */
	while (msg_size > 0) {
		wrote = write(fd, msg, msg_size);
		if (wrote > 0) {
			msg += wrote;
			msg_size -= wrote;
		} else if ((wrote == -1) && (errno == EINTR)) {
			continue;	/* interrupted, try again */
		} else {
			error("state save error: %m");
			return SLURM_ERROR;
		}
	}

	size = sizeof(magic);
	wrote = write(fd, &magic, size);
	if (wrote != size) {
		error("state save error: %m");
		return SLURM_ERROR;
	}

	return SLURM_SUCCESS;
}
static void _save_dbd_state(void)
{
char *dbd_fname = NULL;
buf_t *buffer;
int fd, rc, wrote = 0;
uint16_t msg_type;
uint32_t offset;
xstrfmtcat(dbd_fname, "%s/dbd.messages", slurm_conf.state_save_location);
(void) unlink(dbd_fname); /* clear save state */
fd = open(dbd_fname, O_WRONLY | O_CREAT | O_TRUNC, 0600);
if (fd < 0) {
error("Creating state save file %s", dbd_fname);
} else if (list_count(agent_list)) {
char curr_ver_str[10];
snprintf(curr_ver_str, sizeof(curr_ver_str),
"VER%d", SLURM_PROTOCOL_VERSION);
buffer = init_buf(strlen(curr_ver_str));
packstr(curr_ver_str, buffer);
rc = _save_dbd_rec(fd, buffer);
FREE_NULL_BUFFER(buffer);
if (rc != SLURM_SUCCESS)
goto end_it;
while ((buffer = list_dequeue(agent_list))) {
/*
* We do not want to store registration messages. If an
* admin puts in an incorrect cluster name we can get a
* deadlock unless they add the bogus cluster name to
* the accounting system.
*/
offset = get_buf_offset(buffer);
if (offset < 2) {
FREE_NULL_BUFFER(buffer);
continue;
}
set_buf_offset(buffer, 0);
(void) unpack16(&msg_type, buffer); /* checked by offset */
set_buf_offset(buffer, offset);
if (msg_type == DBD_REGISTER_CTLD) {
FREE_NULL_BUFFER(buffer);
continue;
}
rc = _save_dbd_rec(fd, buffer);
FREE_NULL_BUFFER(buffer);
if (rc != SLURM_SUCCESS)
break;
wrote++;
}
}
end_it:
if (fd >= 0) {
verbose("saved %d pending RPCs", wrote);
rc = fsync_and_close(fd, "dbd.messages");
if (rc)
error("error from fsync_and_close");
}
xfree(dbd_fname);
}
/*
 * list_delete_all() callback used to purge queued records from the agent
 * queue.
 *
 * IN x - queued buffer
 * IN arg - pointer to the uint16_t purge type: DBD_STEP_START matches step
 *	start and step complete records, DBD_JOB_START matches job start
 *	records
 * RET 1 if the buffer matches and should be deleted, otherwise 0
 */
static int _purge_agent_list_req(void *x, void *arg)
{
	buf_t *buffer = x;
	uint16_t purge_type = *(uint16_t *) arg;
	uint32_t saved_offset = get_buf_offset(buffer);
	uint16_t msg_type;

	/* Too short to even hold a message type */
	if (saved_offset < 2)
		return 0;

	/* Peek at the message type, then restore the offset */
	set_buf_offset(buffer, 0);
	(void) unpack16(&msg_type, buffer); /* checked by offset */
	set_buf_offset(buffer, saved_offset);

	if (purge_type == DBD_STEP_START) {
		if ((msg_type == DBD_STEP_START) ||
		    (msg_type == DBD_STEP_COMPLETE))
			return 1;
		return 0;
	}

	if (purge_type == DBD_JOB_START)
		return (msg_type == DBD_JOB_START) ? 1 : 0;

	error("unknown purge type %d", purge_type);
	return 0;
}
/*
 * Apply the configured max_dbd_msg_action policy to the agent queue.
 *
 * MAX_DBD_ACTION_EXIT: once the count reaches slurm_conf.max_dbd_msgs the
 * queue is saved to disk and fatal() is called.
 * MAX_DBD_ACTION_DISCARD: once the count reaches max_dbd_msgs - 1, step
 * records are purged from the queue.
 *
 * IN/OUT msg_cnt - current queue length; reduced by the number of purged
 *	records
 */
static void _max_dbd_msg_action(uint32_t *msg_cnt)
{
	uint16_t purge_type = DBD_STEP_START;
	int purged = 0;

	if (max_dbd_msg_action == MAX_DBD_ACTION_EXIT) {
		if (*msg_cnt >= slurm_conf.max_dbd_msgs) {
			_save_dbd_state();
			fatal("agent queue is full (%u), not continuing until slurmdbd is able to process messages.",
			      *msg_cnt);
		}
		return;
	}

	/* MAX_DBD_ACTION_DISCARD */
	if (*msg_cnt < (slurm_conf.max_dbd_msgs - 1))
		return;

	purged = list_delete_all(agent_list, _purge_agent_list_req,
				 &purge_type);
	*msg_cnt -= purged;
	info("purge %d step records", purged);
}
/*
 * list_for_each_max() callback: append the message type name of one queued
 * buffer to a comma separated string.
 *
 * IN x - queued buffer
 * IN/OUT arg - address of the xmalloc'ed string being built (char **). The
 *	address is needed because appending may move the string.
 * RET SLURM_SUCCESS, or SLURM_ERROR if the buffer is too short to hold a
 *	message type
 */
static int _print_agent_list_msg_type(void *x, void *arg)
{
	buf_t *buffer = (buf_t *) x;
	char **mlist = (char **) arg;
	uint16_t msg_type;
	uint32_t offset = get_buf_offset(buffer);

	if (offset < 2)
		return SLURM_ERROR;

	/* Peek at the message type, then restore the offset */
	set_buf_offset(buffer, 0);
	(void) unpack16(&msg_type, buffer); /* checked by offset */
	set_buf_offset(buffer, offset);

	xstrfmtcat(*mlist, "%s%s", ((*mlist && (*mlist)[0]) ? ", " : ""),
		   slurmdbd_msg_type_2_str(msg_type, 1));

	return SLURM_SUCCESS;
}

/*
 * Prints an info line listing msg types of the dbd agent list
 *
 * At most DEBUG_PRINT_MAX_MSG_TYPES types are listed; ", ..." is appended
 * when further messages were left unprinted.
 */
static void _print_agent_list_msg_types(void)
{
	/* pre-allocate a large enough buffer to handle most lists */
	char *mlist = xmalloc(2048);
	int processed, max_msgs = DEBUG_PRINT_MAX_MSG_TYPES;

	/*
	 * Hand over the address of mlist so that a reallocation performed
	 * while appending is visible here as well.
	 */
	if ((processed = list_for_each_max(agent_list, &max_msgs,
					   _print_agent_list_msg_type,
					   &mlist, true, true)) < 0) {
		error("unable to create msg type list");
		xfree(mlist);
		return;
	}

	/* append "..." to indicate there are further unprinted messages */
	if (max_msgs)
		xstrcat(mlist, ", ...");

	info("slurmdbd agent_count=%d msg_types_agent_list:%s",
	     (processed + max_msgs), mlist);
	xfree(mlist);
}
/*
 * list_for_each_max() callback: add a queued buffer to the batch being built
 * unless doing so would push the batch past MAX_MSG_SIZE.
 *
 * IN x - queued buffer
 * IN/OUT arg - foreach_get_my_list_t accumulator
 * RET 0 if the buffer was added, -1 once the size limit is exceeded
 */
static int _get_my_list(void *x, void *arg)
{
	foreach_get_my_list_t *batch = arg;
	buf_t *candidate = x;

	batch->msg_size += size_buf(candidate);
	if (batch->msg_size <= MAX_MSG_SIZE) {
		list_enqueue(batch->my_list, candidate);
		return 0;
	}

	return -1;
}
/*
 * Body of the agent thread.
 *
 * Loops until *slurmdbd_conn->shutdown becomes non-zero. Each iteration
 * (re)opens the connection if needed, then sends either the buffer at the
 * head of agent_list or, when more than one buffer is queued, a batch packed
 * as a single DBD_SEND_MULT_MSG, and processes the reply. Buffers stay on
 * agent_list until a successful reply has been handled. On exit the
 * remaining queue is written out with _save_dbd_state() and agent_list is
 * destroyed.
 *
 * IN x - unused
 * RET always NULL
 */
static void *_agent(void *x)
{
int rc;
uint32_t cnt;
buf_t *buffer;
struct timespec abs_time;
/* Time of the most recent failure, 0 after a successful exchange */
static time_t fail_time = 0;
persist_msg_t list_req = {0};
dbd_list_msg_t list_msg;
DEF_TIMERS;
slurm_mutex_lock(&agent_lock);
agent_running = true;
slurm_mutex_unlock(&agent_lock);
/* Request envelope reused for every multi-message batch */
list_req.msg_type = DBD_SEND_MULT_MSG;
list_req.conn = slurmdbd_conn;
list_req.data = &list_msg;
memset(&list_msg, 0, sizeof(dbd_list_msg_t));
log_flag(DBD_AGENT, "slurmdbd agent_count=%d with msg_type=%s",
list_count(agent_list),
slurmdbd_msg_type_2_str(list_req.msg_type, 1));
while (*slurmdbd_conn->shutdown == 0) {
slurm_mutex_lock(&slurmdbd_lock);
/*
 * slurmdbd_agent_send_recv() raises halt_agent before it takes
 * slurmdbd_lock; waiting here releases the lock until that caller
 * signals slurmdbd_cond.
 */
if (halt_agent) {
log_flag(DBD_AGENT, "slurmdbd agent halt with agent_count=%d",
list_count(agent_list));
slurm_cond_wait(&slurmdbd_cond, &slurmdbd_lock);
}
START_TIMER;
/* Attempt a reconnect only if 10 seconds passed since the last failure */
if (!slurmdbd_conn->tls_conn &&
(difftime(time(NULL), fail_time) >= 10)) {
/* The connection to Slurm DBD is not open */
dbd_conn_check_and_reopen(slurmdbd_conn);
if (!slurmdbd_conn->tls_conn) {
fail_time = time(NULL);
log_flag(DBD_AGENT, "slurmdbd disconnected with agent_count=%d",
list_count(agent_list));
}
}
slurm_mutex_lock(&agent_lock);
cnt = list_count(agent_list);
/*
 * Nothing to send, no connection, or still within 10 seconds of the
 * last failure: apply the queue limit policy and wait up to 10 seconds
 * on agent_cond.
 */
if ((cnt == 0) || !slurmdbd_conn->tls_conn ||
(fail_time && (difftime(time(NULL), fail_time) < 10))) {
slurm_mutex_unlock(&slurmdbd_lock);
_max_dbd_msg_action(&cnt);
END_TIMER2("slurmdbd agent: sleep");
abs_time.tv_sec = time(NULL) + 10;
abs_time.tv_nsec = 0;
if (*slurmdbd_conn->shutdown != 0) {
slurm_mutex_unlock(&agent_lock);
break;
}
log_flag(AGENT, "slurmdbd agent sleeping with agent_count=%d",
list_count(agent_list));
slurm_cond_timedwait(&agent_cond, &agent_lock,
&abs_time);
slurm_mutex_unlock(&agent_lock);
continue;
} else if (((cnt > 0) && ((cnt % 100) == 0)) ||
(slurm_conf.debug_flags & DEBUG_FLAG_DBD_AGENT))
info("agent_count:%d", cnt);
/* Leave item on the queue until processing complete */
if (agent_list) {
if (cnt > 1) {
/*
 * Batch up to max_rpcs queued buffers; _get_my_list() stops the
 * traversal once the accumulated size exceeds MAX_MSG_SIZE. The
 * batch list is created without a delete function.
 */
int max_rpcs = 1000;
foreach_get_my_list_t args = {
.msg_size = sizeof(list_req),
.my_list = list_create(NULL),
};
list_msg.my_list = args.my_list;
list_for_each_max(agent_list, &max_rpcs,
_get_my_list, &args, 1, true);
buffer = pack_slurmdbd_msg(
&list_req, SLURM_PROTOCOL_VERSION);
} else
buffer = list_peek(agent_list);
} else
buffer = NULL;
slurm_mutex_unlock(&agent_lock);
if (buffer == NULL) {
slurm_mutex_unlock(&slurmdbd_lock);
slurm_mutex_lock(&assoc_cache_mutex);
if (slurmdbd_conn->tls_conn &&
(running_cache != RUNNING_CACHE_STATE_NOTRUNNING))
slurm_cond_signal(&assoc_cache_cond);
slurm_mutex_unlock(&assoc_cache_mutex);
END_TIMER2("slurmdbd agent: empty buffer");
continue;
}
/* NOTE: agent_lock is clear here, so we can add more
* requests to the queue while waiting for this RPC to
* complete. */
rc = slurm_persist_send_msg(slurmdbd_conn, buffer);
if (rc != SLURM_SUCCESS) {
if (*slurmdbd_conn->shutdown) {
slurm_mutex_unlock(&slurmdbd_lock);
END_TIMER2("slurmdbd agent: shutdown");
break;
}
error("Failure sending message: %d: %m", rc);
} else if (list_msg.my_list) {
/* A batch was sent: expect a multi-message reply */
rc = _handle_mult_rc_ret();
} else {
/* A single message was sent: expect a single return code */
rc_msg_t rc_msg = { 0 };
rc = _get_return_code(&rc_msg);
_process_id_rc_list(rc_msg.id_rc_list);
if (rc == EAGAIN) {
if (*slurmdbd_conn->shutdown) {
slurm_mutex_unlock(&slurmdbd_lock);
END_TIMER2("slurmdbd agent: EAGAIN on shutdown");
break;
}
error("Failure with "
"message need to resend: %d: %m", rc);
}
}
slurm_mutex_unlock(&slurmdbd_lock);
/* Signal assoc_cache_cond when connected and running_cache is active */
slurm_mutex_lock(&assoc_cache_mutex);
if (slurmdbd_conn->tls_conn &&
(running_cache != RUNNING_CACHE_STATE_NOTRUNNING))
slurm_cond_signal(&assoc_cache_cond);
slurm_mutex_unlock(&assoc_cache_mutex);
slurm_mutex_lock(&agent_lock);
if (agent_list && (rc == SLURM_SUCCESS)) {
/*
* If we sent a mult_msg we just need to free buffer,
* we don't need to requeue, just mark list_msg.my_list
* as NULL as that is the sign we sent a mult_msg.
*/
if (list_msg.my_list) {
if (list_msg.my_list != agent_list)
FREE_NULL_LIST(list_msg.my_list);
list_msg.my_list = NULL;
} else
buffer = list_dequeue(agent_list);
FREE_NULL_BUFFER(buffer);
fail_time = 0;
} else {
/* We need to free a mult_msg even on failure */
if (list_msg.my_list) {
if (list_msg.my_list != agent_list)
FREE_NULL_LIST(list_msg.my_list);
list_msg.my_list = NULL;
FREE_NULL_BUFFER(buffer);
}
/* Remember the failure so the next attempts are delayed */
fail_time = time(NULL);
if (slurm_conf.debug_flags & DEBUG_FLAG_DBD_AGENT) {
info("slurmdbd agent failed with rc:%d",
rc);
_print_agent_list_msg_types();
}
}
slurm_mutex_unlock(&agent_lock);
END_TIMER2("slurmdbd agent: full loop");
}
/* Shutdown: persist whatever is still queued and tear the queue down */
slurm_mutex_lock(&agent_lock);
_save_dbd_state();
log_flag(AGENT, "slurmdbd agent ending with agent_count=%d",
list_count(agent_list));
FREE_NULL_LIST(agent_list);
agent_running = false;
slurm_cond_signal(&shutdown_cond);
slurm_mutex_unlock(&agent_lock);
return NULL;
}
/*
 * Create the agent queue (recovering any saved state) and the agent thread,
 * whichever of the two does not exist yet. Both callers in this file invoke
 * it with agent_lock held.
 */
static void _create_agent(void)
{
	xassert(running_in_slurmctld());

	/*
	 * Clear any earlier shutdown request: the agent thread does nothing
	 * if the connection was closed and then opened again.
	 */
	slurmdbd_shutdown = 0;

	if (!agent_list) {
		agent_list = list_create(slurmdbd_free_buffer);
		_load_dbd_state();
	}

	if (!agent_tid)
		slurm_thread_create(&agent_tid, _agent, NULL);
}
/*
 * Ask the agent thread to stop and wait for it to finish. Does nothing when
 * no agent thread exists.
 */
static void _shutdown_agent(void)
{
	if (agent_tid) {
		/* A non-zero value is what ends the loop in _agent() */
		slurmdbd_shutdown = time(NULL);

		/* Wake the agent in case it is sleeping on agent_cond */
		slurm_mutex_lock(&agent_lock);
		if (agent_running)
			slurm_cond_broadcast(&agent_cond);
		slurm_mutex_unlock(&agent_lock);

		slurm_thread_join(agent_tid);
	}
}
/****************************************************************************
* Socket open/close/read/write functions
****************************************************************************/
/*
 * Install the connection the agent talks over and make sure the agent is
 * running. Only acts when running in slurmctld.
 *
 * IN pc - persistent connection to the SlurmDBD; its shutdown pointer is
 *	redirected to this file's shutdown flag
 */
extern void slurmdbd_agent_set_conn(persist_conn_t *pc)
{
	bool need_agent;

	if (!running_in_slurmctld())
		return;

	slurm_mutex_lock(&slurmdbd_lock);
	slurmdbd_conn = pc;
	slurmdbd_shutdown = 0;
	slurmdbd_conn->shutdown = &slurmdbd_shutdown;
	slurm_mutex_unlock(&slurmdbd_lock);

	slurm_mutex_lock(&agent_lock);
	need_agent = (agent_tid == 0) || (agent_list == NULL);
	if (need_agent) {
		_create_agent();
	} else {
		/* Thread and queue already exist: only recover saved state */
		_load_dbd_state();
	}
	slurm_mutex_unlock(&agent_lock);
}
/*
 * Stop the agent thread and forget the connection it was using. Only acts
 * when running in slurmctld.
 */
extern void slurmdbd_agent_rem_conn(void)
{
	if (running_in_slurmctld()) {
		_shutdown_agent();

		slurm_mutex_lock(&slurmdbd_lock);
		slurmdbd_conn = NULL;
		slurm_mutex_unlock(&slurmdbd_lock);
	}
}
/*
 * Send an RPC to the SlurmDBD over the agent's connection and wait for the
 * response.
 *
 * IN rpc_version - protocol version handed to dbd_conn_send_recv_direct()
 * IN/OUT req - request; its conn field is overwritten with the agent's
 *	connection
 * OUT resp - response
 * RET ESLURM_DB_CONNECTION_INVALID if no connection is set, otherwise the
 *	result of dbd_conn_send_recv_direct()
 */
extern int slurmdbd_agent_send_recv(uint16_t rpc_version,
				    persist_msg_t *req,
				    persist_msg_t *resp)
{
	int rc;

	xassert(req);
	xassert(resp);

	/*
	 * The agent may be sending at any time. Raising halt_agent before
	 * taking the mutex lets this request go first; the flag is lowered
	 * again as soon as the mutex is ours.
	 */
	halt_agent = 1;
	slurm_mutex_lock(&slurmdbd_lock);
	halt_agent = 0;

	if (slurmdbd_conn) {
		if (req->conn && (req->conn != slurmdbd_conn))
			error("We are overriding the connection!!!!!");
		req->conn = slurmdbd_conn;

		rc = dbd_conn_send_recv_direct(rpc_version, req, resp);
	} else {
		rc = ESLURM_DB_CONNECTION_INVALID;
	}

	/* Let a halted agent continue */
	slurm_cond_signal(&slurmdbd_cond);
	slurm_mutex_unlock(&slurmdbd_lock);

	return rc;
}
/*
 * Send an RPC to the SlurmDBD. Do not wait for the reply. The RPC
 * will be queued and processed later if the SlurmDBD is not responding.
 *
 * IN rpc_version - protocol version, used here for logging only
 * IN req - message to pack and queue
 * RET SLURM_SUCCESS, or SLURM_ERROR if the message could not be packed, the
 *	agent could not be created or the queue is full
 */
extern int slurmdbd_agent_send(uint16_t rpc_version, persist_msg_t *req)
{
	buf_t *buffer;
	uint32_t cnt;
	/* Signed, since it carries SLURM_ERROR and is returned as int */
	int rc = SLURM_SUCCESS;
	static time_t syslog_time = 0;

	xassert(running_in_slurmctld());
	xassert(slurm_conf.max_dbd_msgs);

	log_flag(PROTOCOL, "msg_type:%s protocol_version:%hu agent_count:%d",
		 slurmdbd_msg_type_2_str(req->msg_type, 1),
		 rpc_version, list_count(agent_list));

	buffer = slurm_persist_msg_pack(
		slurmdbd_conn, (persist_msg_t *)req);
	if (!buffer)	/* pack error */
		return SLURM_ERROR;

	slurm_mutex_lock(&agent_lock);
	if ((agent_tid == 0) || (agent_list == NULL)) {
		_create_agent();
		if ((agent_tid == 0) || (agent_list == NULL)) {
			slurm_mutex_unlock(&agent_lock);
			FREE_NULL_BUFFER(buffer);
			return SLURM_ERROR;
		}
	}

	cnt = list_count(agent_list);
	if ((cnt >= (slurm_conf.max_dbd_msgs / 2)) &&
	    (difftime(time(NULL), syslog_time) > 120)) {
		/* Record critical error every 120 seconds */
		syslog_time = time(NULL);
		error("agent queue filling (%u), MaxDBDMsgs=%u, RESTART SLURMDBD NOW",
		      cnt, slurm_conf.max_dbd_msgs);
		syslog(LOG_CRIT, "*** RESTART SLURMDBD NOW ***");
		(slurmdbd_conn->trigger_callbacks.dbd_fail)();
	}

	/* Handle action */
	_max_dbd_msg_action(&cnt);

	if (cnt < slurm_conf.max_dbd_msgs) {
		list_enqueue(agent_list, buffer);
	} else {
		error("agent queue is full (%u), discarding %s:%u request",
		      cnt,
		      slurmdbd_msg_type_2_str(req->msg_type, 1),
		      req->msg_type);
		(slurmdbd_conn->trigger_callbacks.acct_full)();
		FREE_NULL_BUFFER(buffer);
		rc = SLURM_ERROR;
	}

	/* Wake the agent so that it looks at the queue */
	slurm_cond_broadcast(&agent_cond);
	slurm_mutex_unlock(&agent_lock);

	return rc;
}
/*
 * Report whether the connection to the slurmdbd is active, i.e. a connection
 * has been set and its tls_conn is non-NULL.
 *
 * RET true if active, false otherwise
 */
extern bool slurmdbd_conn_active(void)
{
	return (slurmdbd_conn && slurmdbd_conn->tls_conn);
}
/*
 * RET the number of messages currently held in the agent queue
 */
extern int slurmdbd_agent_queue_count(void)
{
	int pending = list_count(agent_list);

	return pending;
}
/*
 * Derive the agent's settings from the configuration:
 * - slurm_conf.max_dbd_msgs gets a default when it is unset (0);
 * - max_dbd_msg_action is taken from the "max_dbd_msg_action=" option of
 *   SlurmctldParameters ("discard" or "exit", case insensitive), falling
 *   back to MAX_DBD_DEFAULT_ACTION when the option is absent. Any other
 *   value is fatal.
 */
extern void slurmdbd_agent_config_setup(void)
{
	static const char action_key[] = "max_dbd_msg_action=";
	char *match;
	char *type;
	char *comma;

	/*
	 * Whatever our max job count is multiplied by 2 plus node count
	 * multiplied by 4 or DEFAULT_MAX_DBD_MSGS which ever is bigger.
	 */
	if (!slurm_conf.max_dbd_msgs)
		slurm_conf.max_dbd_msgs =
			MAX(DEFAULT_MAX_DBD_MSGS,
			    ((slurm_conf.max_job_cnt * 2) +
			     (node_record_count * 4)));

	match = xstrcasestr(slurm_conf.slurmctld_params, action_key);
	if (!match) {
		max_dbd_msg_action = MAX_DBD_DEFAULT_ACTION;
		return;
	}

	/* Copy the value and cut it off at the next option separator */
	type = xstrdup(match + (sizeof(action_key) - 1));
	comma = strchr(type, ',');
	if (comma)
		*comma = '\0';

	if (!xstrcasecmp(type, "discard"))
		max_dbd_msg_action = MAX_DBD_ACTION_DISCARD;
	else if (!xstrcasecmp(type, "exit"))
		max_dbd_msg_action = MAX_DBD_ACTION_EXIT;
	else
		fatal("Unknown SlurmctldParameters option for max_dbd_msg_action '%s'",
		      type);

	xfree(type);
}