blob: e8bd3a46d80920553fbac314713234cf1cecc610 [file] [log] [blame]
/*****************************************************************************
* alloc.c - Slurm scrun job alloc handlers
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include <limits.h>
#include "slurm/slurm.h"
#include "src/common/cpu_frequency.h"
#include "src/common/env.h"
#include "src/common/net.h"
#include "src/common/read_config.h"
#include "src/common/slurm_opt.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/spank.h"
#include "src/common/uid.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/conmgr/conmgr.h"
#include "src/scrun/scrun.h"
/* max number of seconds to delay while waiting for job */
#define MAX_DELAY 60
typedef struct {
const char *var;
int type;
} env_vars_t;
static const env_vars_t env_vars[] = {
{ "SCRUN_ACCOUNT", 'A' },
{ "SCRUN_ACCTG_FREQ", LONG_OPT_ACCTG_FREQ },
{ "SCRUN_BURST_BUFFER", LONG_OPT_BURST_BUFFER_SPEC },
{ "SCRUN_CLUSTER_CONSTRAINT", LONG_OPT_CLUSTER_CONSTRAINT },
{ "SCRUN_CLUSTERS", 'M' },
{ "SCRUN_CONSTRAINT", 'C' },
{ "SCRUN_CORE_SPEC", 'S' },
{ "SCRUN_CPU_BIND", LONG_OPT_CPU_BIND },
{ "SCRUN_CPU_FREQ_REQ", LONG_OPT_CPU_FREQ },
{ "SCRUN_CPUS_PER_GPU", LONG_OPT_CPUS_PER_GPU },
{ "SCRUN_CPUS_PER_TASK", 'c' },
{ "SCRUN_DELAY_BOOT", LONG_OPT_DELAY_BOOT },
{ "SCRUN_DEPENDENCY", 'd' },
{ "SCRUN_DISTRIBUTION", 'm' },
{ "SCRUN_EPILOG", LONG_OPT_EPILOG },
{ "SCRUN_EXACT", LONG_OPT_EXACT },
{ "SCRUN_EXCLUSIVE", LONG_OPT_EXCLUSIVE },
{ "SCRUN_GPU_BIND", LONG_OPT_GPU_BIND },
{ "SCRUN_GPU_FREQ", LONG_OPT_GPU_FREQ },
{ "SCRUN_GPUS", 'G' },
{ "SCRUN_GPUS_PER_NODE", LONG_OPT_GPUS_PER_NODE },
{ "SCRUN_GPUS_PER_SOCKET", LONG_OPT_GPUS_PER_SOCKET },
{ "SCRUN_GPUS_PER_TASK", LONG_OPT_GPUS_PER_TASK },
{ "SCRUN_GRES_FLAGS", LONG_OPT_GRES_FLAGS },
{ "SCRUN_GRES", LONG_OPT_GRES },
{ "SCRUN_HINT", LONG_OPT_HINT },
{ "SCRUN_JOB_NAME", 'J' },
{ "SCRUN_JOB_NODELIST", LONG_OPT_ALLOC_NODELIST },
{ "SCRUN_JOB_NUM_NODES", 'N' },
{ "SCRUN_LABELIO", 'l' },
{ "SCRUN_MEM_BIND", LONG_OPT_MEM_BIND },
{ "SCRUN_MEM_PER_CPU", LONG_OPT_MEM_PER_CPU },
{ "SCRUN_MEM_PER_GPU", LONG_OPT_MEM_PER_GPU },
{ "SCRUN_MEM_PER_NODE", LONG_OPT_MEM },
{ "SCRUN_MPI_TYPE", LONG_OPT_MPI },
{ "SCRUN_NCORES_PER_SOCKET", LONG_OPT_CORESPERSOCKET },
{ "SCRUN_NETWORK", LONG_OPT_NETWORK },
{ "SCRUN_NSOCKETS_PER_NODE", LONG_OPT_SOCKETSPERNODE },
{ "SCRUN_NTASKS", 'n' },
{ "SCRUN_NTASKS_PER_CORE", LONG_OPT_NTASKSPERCORE },
{ "SCRUN_NTASKS_PER_GPU", LONG_OPT_NTASKSPERGPU },
{ "SCRUN_NTASKS_PER_NODE", LONG_OPT_NTASKSPERNODE },
{ "SCRUN_NTASKS_PER_TRES", LONG_OPT_NTASKSPERTRES },
{ "SCRUN_OPEN_MODE", LONG_OPT_OPEN_MODE },
{ "SCRUN_OVERCOMMIT", 'O' },
{ "SCRUN_OVERLAP", LONG_OPT_OVERLAP },
{ "SCRUN_PARTITION", 'p' },
{ "SCRUN_POWER", LONG_OPT_POWER },
{ "SCRUN_PROFILE", LONG_OPT_PROFILE },
{ "SCRUN_PROLOG", LONG_OPT_PROLOG },
{ "SCRUN_QOS", 'q' },
{ "SCRUN_REMOTE_CWD", 'D' },
{ "SCRUN_REQ_SWITCH", LONG_OPT_SWITCH_REQ },
{ "SCRUN_RESERVATION", LONG_OPT_RESERVATION },
{ "SCRUN_SIGNAL", LONG_OPT_SIGNAL },
{ "SCRUN_SLURMD_DEBUG", LONG_OPT_SLURMD_DEBUG },
{ "SCRUN_SPREAD_JOB", LONG_OPT_SPREAD_JOB },
{ "SCRUN_TASK_EPILOG", LONG_OPT_TASK_EPILOG },
{ "SCRUN_TASK_PROLOG", LONG_OPT_TASK_PROLOG },
{ "SCRUN_THREAD_SPEC", LONG_OPT_THREAD_SPEC },
{ "SCRUN_THREADS_PER_CORE", LONG_OPT_THREADSPERCORE },
{ "SCRUN_THREADS", 'T' },
{ "SCRUN_TIMELIMIT", 't' },
{ "SCRUN_TRES_BIND", LONG_OPT_TRES_BIND },
{ "SCRUN_TRES_PER_TASK", LONG_OPT_TRES_PER_TASK },
{ "SCRUN_UNBUFFEREDIO", 'u' },
{ "SCRUN_USE_MIN_NODES", LONG_OPT_USE_MIN_NODES },
{ "SCRUN_WAIT4SWITCH", LONG_OPT_SWITCH_WAIT },
{ "SCRUN_WCKEY", LONG_OPT_WCKEY },
{ "SCRUN_WORKING_DIR", 'D' },
};
#define _set_env_args(state, field, pattern, ...) \
do { \
xassert(state.locked); \
(void) env_array_overwrite_fmt(&state.job_env, field, pattern, \
__VA_ARGS__); \
} while (0)
#define _set_env(state, field, value) \
do { \
const char *v = value; /* avoid field=(null) */ \
xassert(state.locked); \
(void) env_array_overwrite_fmt(&state.job_env, field, "%s", \
((v && v[0]) ? v : "")); \
} while (0)
static int _foreach_env_annotation(void *x, void *arg)
{
config_key_pair_t *key_pair_ptr = x;
char *key = xstrdup_printf("SCRUN_ANNOTATION_%s", key_pair_ptr->name);
xassert(!arg);
xassert(state.locked);
_set_env(state, key, key_pair_ptr->value);
xfree(key);
return SLURM_SUCCESS;
}
static void _script_env(void)
{
xassert(state.locked);
/* variables required for OCI state */
_set_env(state, "SCRUN_OCI_VERSION", state.oci_version);
_set_env(state, "SCRUN_CONTAINER_ID", state.id);
if (state.pid && (state.pid != INFINITE64))
_set_env_args(state, "SCRUN_PID", "%"PRIu64,
(uint64_t) state.pid);
_set_env(state, "SCRUN_BUNDLE", state.bundle);
_set_env(state, "SCRUN_SUBMISSION_BUNDLE", state.orig_bundle);
list_for_each_ro(state.annotations, _foreach_env_annotation, NULL);
_set_env(state, "SCRUN_PID_FILE", state.pid_file);
_set_env(state, "SCRUN_SOCKET", state.anchor_socket);
_set_env(state, "SCRUN_SPOOL_DIR", state.spool_dir);
_set_env(state, "SCRUN_SUBMISSION_CONFIG_FILE", state.config_file);
if ((state.user_id != NO_VAL) && (state.user_id != SLURM_AUTH_NOBODY)) {
/* set user if we know it but we may not in a user namespace */
char *u = uid_to_string_or_null(state.user_id);
_set_env(state, "SCRUN_USER", u);
xfree(u);
_set_env_args(state, "SCRUN_USER_ID", "%u", state.user_id);
}
if ((state.group_id != NO_VAL) &&
(state.group_id != SLURM_AUTH_NOBODY)) {
/* set group if we know it but we may not in a user namespace */
char *u = gid_to_string(state.group_id);
_set_env(state, "SCRUN_GROUP", u);
xfree(u);
_set_env_args(state, "SCRUN_GROUP_ID", "%u", state.group_id);
}
_set_env(state, "SCRUN_ROOT", state.root_dir);
_set_env(state, "SCRUN_ROOTFS_PATH", state.root_path);
_set_env(state, "SCRUN_SUBMISSION_ROOTFS_PATH", state.root_path);
if (log_file)
_set_env(state, "SCRUN_LOG_FILE", log_file);
if (log_format)
_set_env(state, "SCRUN_LOG_FORMAT", log_format);
if (state.tty_size.ws_col)
_set_env_args(state, "SLURM_PTY_WIN_COL", "%hu",
state.tty_size.ws_col);
if (state.tty_size.ws_row)
_set_env_args(state, "SLURM_PTY_WIN_ROW", "%hu",
state.tty_size.ws_row);
}
#undef _set_env
#undef _set_env_args
static int _stage_in()
{
int rc;
if (get_log_level() >= LOG_LEVEL_DEBUG) {
read_lock_state();
debug("%s: BEGIN container %s staging in", __func__, state.id);
unlock_state();
}
rc = stage_in();
if (get_log_level() >= LOG_LEVEL_DEBUG) {
read_lock_state();
debug("%s: END container %s staging in: %s",
__func__, state.id, slurm_strerror(rc));
unlock_state();
}
if (rc) {
read_lock_state();
error("%s: stage_in() for %s failed: %s",
__func__, state.id, slurm_strerror(rc));
unlock_state();
}
return rc;
}
static void *_on_connection(conmgr_fd_t *con, void *arg)
{
debug("%s:[%s] new srun connection",
__func__, conmgr_fd_get_name(con));
/* must return !NULL or connection will be closed */
return con;
}
static int _on_msg(conmgr_fd_t *con, slurm_msg_t *msg, int unpack_rc, void *arg)
{
int rc = SLURM_SUCCESS;
xassert(arg == con);
if (unpack_rc || !msg->auth_ids_set) {
error("%s: [%s] rejecting malformed RPC and closing connection: %s",
__func__, conmgr_fd_get_name(con),
slurm_strerror(unpack_rc));
slurm_free_msg(msg);
return unpack_rc;
}
switch (msg->msg_type)
{
case SRUN_PING:
{
/* if conmgr is alive then always respond success */
rc = slurm_send_rc_msg(msg, SLURM_SUCCESS);
debug("%s:[%s] srun RPC PING has been PONGED",
__func__, conmgr_fd_get_name(con));
break;
}
case SRUN_JOB_COMPLETE:
{
xassert(sizeof(srun_job_complete_msg_t) ==
sizeof(slurm_step_id_t));
slurm_step_id_t *step = msg->data;
debug("%s:[%s] %pS complete srun RPC",
__func__, conmgr_fd_get_name(con), step);
stop_anchor(SLURM_SUCCESS);
break;
}
case SRUN_TIMEOUT:
{
srun_timeout_msg_t *to = msg->data;
debug("%s:[%s] srun RPC %pS timeout at %ld RPC",
__func__, conmgr_fd_get_name(con), &to->step_id,
to->timeout);
stop_anchor(ESLURM_JOB_TIMEOUT_KILLED);
break;
}
case SRUN_USER_MSG:
{
srun_user_msg_t *um = msg->data;
debug("%s:[%s] JobId=%u srun user message RPC",
__func__, conmgr_fd_get_name(con), um->job_id);
print_multi_line_string(um->msg, -1, LOG_LEVEL_INFO);
break;
}
case SRUN_NODE_FAIL:
{
srun_node_fail_msg_t *nf = msg->data;
debug("%s:[%s] srun RPC %pS nodes failed: %s",
__func__, conmgr_fd_get_name(con), &nf->step_id,
nf->nodelist);
stop_anchor(ESLURM_JOB_NODE_FAIL_KILLED);
break;
}
case SRUN_REQUEST_SUSPEND:
{
suspend_msg_t *sus_msg = msg->data;
rc = SLURM_UNEXPECTED_MSG_ERROR;
error("%s:[%s] rejecting srun suspend RPC for %s",
__func__, conmgr_fd_get_name(con), sus_msg->job_id_str);
break;
}
case SRUN_NET_FORWARD:
{
net_forward_msg_t *net = msg->data;
rc = SLURM_UNEXPECTED_MSG_ERROR;
error("%s:[%s] rejecting srun net forward RPC for %s",
__func__, conmgr_fd_get_name(con), net->target);
break;
}
default:
rc = SLURM_UNEXPECTED_MSG_ERROR;
error("%s:[%s] received spurious srun message type: %s",
__func__, conmgr_fd_get_name(con),
rpc_num2string(msg->msg_type));
}
slurm_free_msg(msg);
return rc;
}
static void _on_finish(conmgr_fd_t *con, void *arg)
{
xassert(con == arg);
if (get_log_level() > LOG_LEVEL_DEBUG) {
read_lock_state();
debug("%s: [%s] closed srun connection state=%s",
__func__, conmgr_fd_get_name(con),
slurm_container_status_to_str(state.status));
unlock_state();
}
}
/*
* Listen on srun port to make sure that slurmctld doesn't mark job as dead
* RET port listening on
*/
static uint32_t _setup_listener(void)
{
static const conmgr_events_t events = {
.on_connection = _on_connection,
.on_msg = _on_msg,
.on_finish = _on_finish,
};
uint16_t *ports;
uint16_t port = 0;
int fd = -1;
int rc;
if ((ports = slurm_get_srun_port_range())) {
if (net_stream_listen_ports(&fd, &port,
slurm_get_srun_port_range(),
false) < 0)
fatal("%s: unable to open local listening port. Try increasing range of SrunPortRange in slurm.conf.",
__func__);
} else {
if (net_stream_listen(&fd, &port) < 0)
fatal("%s: unable to open local listening port",
__func__);
}
xassert(port > 0);
debug("%s: listening for srun RPCs on port=%hu", __func__, port);
if ((rc = conmgr_process_fd(CON_TYPE_RPC, fd, fd, &events, CON_FLAG_NONE,
NULL, 0, NULL, NULL)))
fatal("%s: conmgr refused fd=%d: %s",
__func__, fd, slurm_strerror(rc));
return port;
}
static void _pending_callback(uint32_t job_id)
{
info("waiting on pending job allocation %u", job_id);
}
/* check allocation has all nodes ready */
extern void check_allocation(conmgr_callback_args_t conmgr_args, void *arg)
{
/* there must be only 1 thread that will call this at any one time */
static long delay = 1;
bool bail = false;
int rc, job_id;
read_lock_state();
bail = (state.status != CONTAINER_ST_CREATING);
job_id = state.jobid;
unlock_state();
if (bail) {
/*
* Only check allocations while creating. Something else must
* have broke before now so bail out.
*/
debug("%s: bailing due to status %s != %s",
__func__,
slurm_container_status_to_str(state.status),
slurm_container_status_to_str(CONTAINER_ST_CREATING));
stop_anchor(ESLURM_ALREADY_DONE);
return;
}
if (conmgr_args.status != CONMGR_WORK_STATUS_RUN) {
debug("%s: bailing due to callback status %s",
__func__, conmgr_work_status_string(conmgr_args.status));
stop_anchor(ESLURM_ALREADY_DONE);
return;
}
if (get_log_level() >= LOG_LEVEL_DEBUG) {
read_lock_state();
debug("%s: checking JobId=%d for nodes ready",
__func__, state.jobid);
unlock_state();
}
rc = slurm_job_node_ready(job_id);
if ((rc == READY_JOB_ERROR) || (rc == EAGAIN)) {
delay *= 2;
if ((delay < 0) || (delay > MAX_DELAY))
delay = MAX_DELAY;
if (get_log_level() >= LOG_LEVEL_DEBUG) {
read_lock_state();
debug("%s: rechecking JobId=%d for nodes ready in %ld ns",
__func__, state.jobid, delay);
unlock_state();
}
conmgr_add_work_delayed_fifo(check_allocation, NULL, delay, 0);
} else if ((rc == READY_JOB_FATAL) || !(rc & READY_JOB_STATE)) {
/* job failed! */
if (get_log_level() >= LOG_LEVEL_DEBUG) {
read_lock_state();
debug("%s: JobId=%d failed. Bailing on checking for nodes: %s",
__func__, state.jobid, slurm_strerror(rc));
unlock_state();
}
stop_anchor(ESLURM_ALREADY_DONE);
return;
} else {
/* job is ready! */
if (get_log_level() >= LOG_LEVEL_DEBUG) {
read_lock_state();
debug("%s: JobId=%d is ready", __func__, state.jobid);
unlock_state();
}
if ((rc = _stage_in())) {
stop_anchor(rc);
} else {
/* we have a job now. see if creating is done */
conmgr_add_work_fifo(on_allocation, NULL);
}
}
}
static void _alloc_job(void)
{
int rc;
resource_allocation_response_msg_t *alloc = NULL;
salloc_opt_t aopt = { 0 };
slurm_opt_t opt = {
.salloc_opt = &aopt,
};
char *opt_string = NULL;
struct option *spanked = slurm_option_table_create(&opt, &opt_string);
job_desc_msg_t *desc;
slurm_reset_all_options(&opt, true);
for (int i = 0; i < ARRAY_SIZE(env_vars); i++) {
const char *val;
const env_vars_t *e = &env_vars[i];
if ((val = getenv(e->var)) &&
slurm_process_option(&opt, e->type, val, true, false))
fatal("%s: Unable to process environment variable %s=%s",
__func__, e->var, val);
}
/* Process spank env options */
if ((rc = spank_process_env_options()))
fatal("%s: spank_process_env_options() failed: %s",
__func__, slurm_strerror(rc));
slurm_option_table_destroy(spanked);
spanked = NULL;
xfree(opt_string);
desc = slurm_opt_create_job_desc(&opt, true);
xfree(desc->name);
read_lock_state();
desc->name = xstrdup(state.id);
desc->container_id = xstrdup(state.id);
if (state.spank_job_env) {
desc->spank_job_env =
env_array_copy((const char **) state.spank_job_env);
desc->spank_job_env_size = envcount(state.spank_job_env);
}
unlock_state();
if (!desc->min_nodes || (desc->min_nodes == NO_VAL))
desc->min_nodes = 1;
/*
* Avoid giving the user/group as this may be run
* in user namespace as uid 0.
*/
desc->user_id = SLURM_AUTH_NOBODY;
desc->group_id = SLURM_AUTH_NOBODY;
desc->name = xstrdup("scrun");
desc->other_port = _setup_listener();
debug("%s: requesting allocation with %u tasks and %u hosts",
__func__, (desc->num_tasks == NO_VAL ? 1 : desc->num_tasks),
(desc->min_nodes == NO_VAL ? 1 : desc->min_nodes));
alloc = slurm_allocate_resources_blocking(desc, false,
_pending_callback);
if (!alloc)
fatal("Unable to request job allocation: %m");
if (alloc->error_code) {
error("%s: unable to request job allocation: %s",
__func__, slurm_strerror(alloc->error_code));
stop_anchor(alloc->error_code);
}
if (get_log_level() >= LOG_LEVEL_DEBUG) {
char *user = uid_to_string(alloc->uid);
char *group = gid_to_string(alloc->gid);
debug("allocated jobId=%u user[%u]=%s group[%u]=%s",
alloc->job_id, alloc->uid, user, alloc->uid, group);
xfree(user);
xfree(group);
}
write_lock_state();
state.jobid = alloc->job_id;
/* take job env (if any) for srun calls later */
SWAP(state.job_env, alloc->environment);
/* apply SPANK env (if any) */
for (int i = 0; i < opt.spank_job_env_size; i++) {
char *value, *name = NULL;
if ((value = strchr(opt.spank_job_env[i], '='))) {
value[0] = '\0';
value++;
}
xstrfmtcat(name, "SLURM_SPANK_%s", opt.spank_job_env[i]);
env_array_overwrite(&state.job_env, name, value);
xfree(name);
}
xassert(state.user_id == getuid());
state.user_id = alloc->uid;
xassert(state.user_id != SLURM_AUTH_NOBODY);
xassert(state.group_id == getgid());
state.group_id = alloc->gid;
xassert(state.group_id != SLURM_AUTH_NOBODY);
env_array_for_job(&state.job_env, alloc, desc, -1);
unlock_state();
slurm_free_job_desc_msg(desc);
slurm_free_resource_allocation_response_msg(alloc);
}
extern void get_allocation(conmgr_callback_args_t conmgr_args, void *arg)
{
int rc;
job_info_msg_t *jobs = NULL;
int job_id;
char *job_id_str = getenv("SLURM_JOB_ID");
bool existing_allocation = false;
if (job_id_str && job_id_str[0]) {
extern char **environ;
slurm_selected_step_t id = {0};
if ((rc = unfmt_job_id_string(job_id_str, &id, NO_VAL))) {
fatal("%s: invalid SLURM_JOB_ID=%s: %s",
__func__, job_id_str, slurm_strerror(rc));
return;
}
write_lock_state();
state.jobid = job_id = id.step_id.job_id;
state.existing_allocation = existing_allocation = true;
/* scrape SLURM_* from calling env */
state.job_env = env_array_create();
env_array_merge_slurm_spank(&state.job_env,
(const char **) environ);
unlock_state();
debug("Running under existing JobId=%u", job_id);
} else {
_alloc_job();
read_lock_state();
job_id = state.jobid;
unlock_state();
}
/* alloc response is too sparse. get full job info */
rc = slurm_load_job(&jobs, job_id, 0);
if (rc || !jobs || (jobs->record_count <= 0)) {
/* job not found or already died ? */
if ((rc == SLURM_ERROR) && errno)
rc = errno;
error("%s: unable to find JobId=%u: %s",
__func__, job_id, slurm_strerror(rc));
stop_anchor(rc);
return;
}
/* grab the first job */
xassert(jobs->job_array->job_id == job_id);
write_lock_state();
if (existing_allocation) {
xassert(state.user_id == getuid());
state.user_id = jobs->job_array->user_id;
xassert(state.user_id != SLURM_AUTH_NOBODY);
xassert(state.group_id == getgid());
state.group_id = jobs->job_array->group_id;
xassert(state.group_id != SLURM_AUTH_NOBODY);
}
_script_env();
unlock_state();
if (get_log_level() >= LOG_LEVEL_DEBUG) {
read_lock_state();
if (state.job_env)
for (int i = 0; state.job_env[i]; i++)
debug("Job env[%d]=%s", i, state.job_env[i]);
else
debug("JobId=%u did not provide an environment",
job_id);
unlock_state();
}
slurm_free_job_info_msg(jobs);
if (existing_allocation) {
if ((rc = _stage_in()))
stop_anchor(rc);
else
conmgr_add_work_fifo(on_allocation, NULL);
} else {
conmgr_add_work_delayed_fifo(check_allocation, NULL, 0, 1);
}
}