blob: 1b5e9172fcb4a1056414ec869d80c1eaea44be62 [file] [log] [blame]
/*****************************************************************************\
* rpc_queue.c
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#include <inttypes.h>
#if HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
#include "src/common/data.h"
#include "src/common/list.h"
#include "src/common/macros.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/interfaces/conn.h"
#include "src/interfaces/serializer.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/proc_req.h"
#include "src/slurmctld/state_save.h"
/*
 * Global on/off switch for the experimental RPC queuing subsystem.
 * Cleared in rpc_queue_init() when "enable_rpc_queue" is absent from
 * SlurmctldParameters, and again on rpc_queue_shutdown().
 * Made static: external code must query rpc_queue_enabled() instead of
 * linking against this generically-named symbol.
 */
static bool enabled = true;
/*
 * Worker thread servicing a single RPC queue.
 *
 * Drains q->work while holding the slurmctld locks described by q->locks,
 * releasing them between cycles so other threads can make progress.
 * Per-cycle rate limits (max_per_cycle / max_usec_per_cycle) bound how much
 * work is done under one lock acquisition, and a short sleep between cycles
 * lets additional RPCs accumulate.
 *
 * arg - the slurmctld_rpc_t describing this queue.
 * Returns NULL once q->shutdown is observed.
 */
static void *_rpc_queue_worker(void *arg)
{
	slurmctld_rpc_t *q = (slurmctld_rpc_t *) arg;
	int processed = 0;
	long processed_usec = 0;

#if HAVE_SYS_PRCTL_H
	char *name = xstrdup_printf("rpcq-%u", q->msg_type);
	if (prctl(PR_SET_NAME, name, NULL, NULL, NULL) < 0) {
		/*
		 * FIX: report the name we actually tried to set. The
		 * original logged the literal "sstate", a copy-paste
		 * artifact from the state-save thread.
		 */
		error("%s: cannot set my name to %s %m", __func__, name);
	}
	xfree(name);
#endif

	/*
	 * Acquire on init to simplify the inner loop.
	 * On rpc_queue_init() this will proceed directly to slurm_cond_wait().
	 */
	lock_slurmctld(q->locks);

	/*
	 * Process as many queued messages as possible in one slurmctld_lock()
	 * acquisition, then fall back to sleep until additional work is queued.
	 */
	while (true) {
		slurm_msg_t *msg = NULL;
		bool highload = false;
		long sleep_usec = 0;

		/* apply per-cycle rate limiting, if configured */
		if ((q->max_per_cycle && (processed == q->max_per_cycle)) ||
		    (q->max_usec_per_cycle &&
		     (processed_usec >= q->max_usec_per_cycle)))
			highload = true;
		else
			msg = list_dequeue(q->work);

		if (!msg) {
			/* cycle finished - drop locks before sleeping */
			unlock_slurmctld(q->locks);

			if (processed && q->post_func)
				q->post_func();

			if (processed) {
				/* record per-cycle statistics under the mutex */
				slurm_mutex_lock(&q->mutex);
				q->cycle_last = processed;
				if (processed > q->cycle_max)
					q->cycle_max = processed;
				record_rpc_queue_stats(q);
				slurm_mutex_unlock(&q->mutex);
			}

			/*
			 * Use yield_sleep if there's more work to be done,
			 * otherwise interval if set, otherwise 500 usec.
			 */
			if (highload && (q->yield_sleep > 0))
				sleep_usec = q->yield_sleep;
			else if (q->interval > 0)
				sleep_usec = q->interval;
			else
				sleep_usec = 500;

			/*
			 * Rate limit RPC processing. Ensure that when we
			 * stop processing we don't immediately start again
			 * by inserting a slight delay.
			 *
			 * This encourages additional RPCs to accumulate,
			 * which is desirable as it lowers pressure on the
			 * slurmctld locks.
			 *
			 * This extends the race described below, but this
			 * is handled properly.
			 */
			log_flag(PROTOCOL, "%s(%s): sleeping %ld usec after processing %d/%u msgs (processed_usec=%ld/%d)",
				 __func__, q->msg_name, sleep_usec,
				 processed, q->max_per_cycle,
				 processed_usec, q->max_usec_per_cycle);

			processed = 0;
			processed_usec = 0;
			usleep(sleep_usec);

			slurm_mutex_lock(&q->mutex);
			if (q->shutdown) {
				log_flag(PROTOCOL, "%s(%s): shutting down",
					 __func__, q->msg_name);
				slurm_mutex_unlock(&q->mutex);
				return NULL;
			}
			/*
			 * Verify list is empty. Since list_dequeue() above is
			 * called without the mutex held, there is a race with
			 * rpc_enqueue() that this check will solve.
			 */
			if (!list_count(q->work))
				slurm_cond_wait(&q->cond, &q->mutex);
			slurm_mutex_unlock(&q->mutex);

			log_flag(PROTOCOL, "%s(%s): woke up",
				 __func__, q->msg_name);

			lock_slurmctld(q->locks);
		} else {
			DEF_TIMERS;
			START_TIMER;
			if (q->max_queued) {
				/* mirror rpc_enqueue()'s queued accounting */
				slurm_mutex_lock(&q->mutex);
				q->queued--;
				record_rpc_queue_stats(q);
				slurm_mutex_unlock(&q->mutex);
			}
			msg->flags |= CTLD_QUEUE_PROCESSING;
			q->func(msg);
			conn_g_destroy(msg->tls_conn, true);
			msg->tls_conn = NULL;
			END_TIMER;
			record_rpc_stats(msg, DELTA_TIMER);
			slurm_free_msg(msg);
			processed++;
			processed_usec += DELTA_TIMER;
		}
	}

	/* unreachable - loop only exits via the shutdown return above */
	return NULL;
}
/*
 * Read and parse rpc_queue.yaml from the configuration directory.
 * Returns the parsed root dictionary, or NULL when the file is absent.
 * Parse failures and a non-dictionary root are fatal.
 */
static data_t *_load_config(void)
{
	data_t *parsed = NULL;
	char *path = get_extra_conf_path("rpc_queue.yaml");
	buf_t *contents = create_mmap_buf(path);

	if (!contents) {
		/* missing config file is not an error - use defaults */
		debug("%s: could not load %s, ignoring", __func__, path);
		xfree(path);
		return NULL;
	}

	serializer_required(MIME_TYPE_YAML);

	if (serialize_g_string_to_data(&parsed, contents->head,
				       contents->size, MIME_TYPE_YAML))
		fatal("Failed to decode %s", path);

	FREE_NULL_BUFFER(contents);
	xfree(path);

	if (data_get_type(parsed) != DATA_TYPE_DICT)
		fatal("%s: Unexpected root of rpc_queue.yaml is %s when dictionary expected",
		      __func__, data_get_type_string(parsed));

	return parsed;
}
/*
 * data_list_find_first() callback: match a config entry whose "type"
 * string equals needle (case-insensitive).
 */
static bool _find_msg_name(const data_t *data, void *needle)
{
	const data_t *type;

	if (data_get_type(data) != DATA_TYPE_DICT)
		return false;

	type = data_key_get_const(data, "type");
	if (data_get_type(type) != DATA_TYPE_STRING)
		return false;

	return (xstrcasecmp(data_get_string(type), needle) == 0);
}
/*
 * Apply per-RPC settings from rpc_queue.yaml to a single queue.
 *
 * conf - parsed root dictionary (may be NULL when no config file exists).
 * q    - queue whose msg_name selects the matching "rpc_queue" list entry.
 *
 * Missing or unconvertible fields are silently skipped, leaving the
 * compiled-in defaults intact.
 */
static void _apply_config(data_t *conf, slurmctld_rpc_t *q)
{
	data_t *rpc_queue = NULL, *settings = NULL, *field = NULL;
	int64_t int64_tmp;

	if (!conf || !q)
		return;

	rpc_queue = data_key_get(conf, "rpc_queue");
	if (data_get_type(rpc_queue) != DATA_TYPE_LIST)
		return;

	if (!(settings = data_list_find_first(rpc_queue, _find_msg_name,
					      (void *) q->msg_name)))
		return;

	if ((field = data_key_get(settings, "disabled"))) {
		bool disabled = false;
		/*
		 * FIX: only disable the queue when the value converts
		 * successfully AND is true. Previously any convertible
		 * value - including "disabled: false" - disabled the queue.
		 */
		if (!data_get_bool_converted(field, &disabled) && disabled) {
			q->queue_enabled = false;
			return;
		}
	}

	if ((field = data_key_get(settings, "rl_exempt")))
		(void) data_get_bool_converted(field, &q->rl_exempt);
	if ((field = data_key_get(settings, "hard_drop")))
		(void) data_get_bool_converted(field, &q->hard_drop);
	if ((field = data_key_get(settings, "max_per_cycle")))
		if (!data_get_int_converted(field, &int64_tmp))
			q->max_per_cycle = int64_tmp;
	if ((field = data_key_get(settings, "max_usec_per_cycle")))
		if (!data_get_int_converted(field, &int64_tmp))
			q->max_usec_per_cycle = int64_tmp;
	if ((field = data_key_get(settings, "max_queued")))
		if (!data_get_int_converted(field, &int64_tmp))
			q->max_queued = int64_tmp;
	if ((field = data_key_get(settings, "yield_sleep")))
		if (!data_get_int_converted(field, &int64_tmp))
			q->yield_sleep = int64_tmp;
	if ((field = data_key_get(settings, "interval")))
		if (!data_get_int_converted(field, &int64_tmp))
			q->interval = int64_tmp;
}
/*
 * Start the RPC queuing subsystem if "enable_rpc_queue" is present in
 * SlurmctldParameters: load rpc_queue.yaml overrides and spawn one worker
 * thread per enabled queue in slurmctld_rpcs[].
 */
extern void rpc_queue_init(void)
{
	data_t *config = NULL;

	if (!xstrcasestr(slurm_conf.slurmctld_params, "enable_rpc_queue")) {
		enabled = false;
		return;
	}

	/* logged at error level to make the experimental mode conspicuous */
	error("enabled experimental rpc queuing system");

	config = _load_config();

	for (slurmctld_rpc_t *q = slurmctld_rpcs; q->msg_type; q++) {
		bool previously_enabled = q->queue_enabled;

		q->msg_name = rpc_num2string(q->msg_type);
		_apply_config(config, q);

		/* config may have disabled this queue, check again */
		if (!q->queue_enabled) {
			if (previously_enabled)
				verbose("disabled rpc_queue for %s",
					q->msg_name);
			else if (q->rl_exempt)
				verbose("disabled rate limiting for %s",
					q->msg_name);
			continue;
		}

		q->work = list_create(NULL);
		slurm_cond_init(&q->cond, NULL);
		slurm_mutex_init(&q->mutex);
		q->shutdown = false;

		verbose("starting rpc_queue for %s: max_per_cycle=%u max_usec_per_cycle=%u max_queued=%d hard_drop=%d yield_sleep=%d interval=%d",
			q->msg_name, q->max_per_cycle, q->max_usec_per_cycle,
			q->max_queued, q->hard_drop, q->yield_sleep,
			q->interval);

		slurm_thread_create(&q->thread, _rpc_queue_worker, q);
	}

	FREE_NULL_DATA(config);
}
/*
 * Stop all queue worker threads and release their work lists.
 * Two passes: flag/wake every worker first so they shut down in
 * parallel, then join each one and free its queue.
 */
extern void rpc_queue_shutdown(void)
{
	slurmctld_rpc_t *q;

	if (!enabled)
		return;
	enabled = false;

	/* pass 1: mark every active worker for shutdown and wake it */
	for (q = slurmctld_rpcs; q->msg_type; q++) {
		if (!q->queue_enabled)
			continue;
		slurm_mutex_lock(&q->mutex);
		q->shutdown = true;
		slurm_cond_signal(&q->cond);
		slurm_mutex_unlock(&q->mutex);
	}

	/* pass 2: wait for completion and cleanup */
	for (q = slurmctld_rpcs; q->msg_type; q++) {
		if (!q->queue_enabled)
			continue;
		slurm_thread_join(q->thread);
		FREE_NULL_LIST(q->work);
	}
}
/* Report whether the RPC queuing subsystem is currently active. */
extern bool rpc_queue_enabled(void)
{
	return enabled;
}
/*
 * Hand msg off to its dedicated queue worker, if one exists.
 *
 * Returns SLURM_SUCCESS when queued, ESLURM_NOT_SUPPORTED when the
 * subsystem is off or the message type has no (enabled) queue, or a
 * HARD_DROP/BACKOFF communications error when max_queued is exceeded.
 * Ownership of msg transfers to the worker on SLURM_SUCCESS only.
 */
extern int rpc_enqueue(slurm_msg_t *msg)
{
	if (!enabled)
		return ESLURM_NOT_SUPPORTED;

	for (slurmctld_rpc_t *q = slurmctld_rpcs; q->msg_type; q++) {
		if (q->msg_type != msg->msg_type)
			continue;

		if (!q->queue_enabled)
			break;

		if (q->max_queued) {
			slurm_mutex_lock(&q->mutex);
			if (q->queued >= q->max_queued) {
				q->dropped++;
				record_rpc_queue_stats(q);
				slurm_mutex_unlock(&q->mutex);
				return (q->hard_drop ?
					SLURMCTLD_COMMUNICATIONS_HARD_DROP :
					SLURMCTLD_COMMUNICATIONS_BACKOFF);
			}
			q->queued++;
			record_rpc_queue_stats(q);
			slurm_mutex_unlock(&q->mutex);
		}

		/*
		 * Enqueue before signaling; the worker's post-sleep
		 * list_count() check closes the race with its unlocked
		 * list_dequeue().
		 */
		list_enqueue(q->work, msg);
		slurm_mutex_lock(&q->mutex);
		slurm_cond_signal(&q->cond);
		slurm_mutex_unlock(&q->mutex);
		return SLURM_SUCCESS;
	}

	/* RPC does not have a dedicated queue */
	return ESLURM_NOT_SUPPORTED;
}