/*****************************************************************************\
* burst_buffer_lua.c - Plugin for managing burst buffers with lua
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#define _GNU_SOURCE
#include <ctype.h>
#include <lauxlib.h>
#include <lua.h>
#include <lualib.h>
#include <stdlib.h>
#include <unistd.h>
#include "slurm/slurm.h"
#include "src/common/assoc_mgr.h"
#include "src/common/data.h"
#include "src/common/fd.h"
#include "src/common/run_command.h"
#include "src/common/slurm_protocol_pack.h"
#include "src/common/state_save.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"
#include "src/interfaces/serializer.h"
#include "src/lua/slurm_lua.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/node_scheduler.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/slurmscriptd.h"
#include "src/slurmctld/trigger_mgr.h"
#include "src/plugins/burst_buffer/common/burst_buffer_common.h"
/* Script directive */
#define DEFAULT_DIRECTIVE_STR "BB_LUA"
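/*
* Example (illustrative; the pool name is hypothetical and must match a pool
* reported by slurm_bb_pools) of a directive line in a batch script:
*
* #BB_LUA capacity=1GB pool=pool1
*
* Lines starting with '#' followed by the directive are copied into the
* job's burst_buffer string and parsed by _parse_bb_opts() and _get_bb_job().
*/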
/* Hold job if pre_run fails more times than MAX_RETRY_CNT */
#define MAX_RETRY_CNT 2
/* Used for the polling hooks "test_data_{in|out}" */
#define SLURM_BB_BUSY "BUSY"
/*
* Limit the number of burst buffer APIs allowed to run in parallel so that we
* don't exceed process or system resource limits (such as number of processes
* or max open files) when we run scripts through slurmscriptd. We limit this
* per "stage" (stage in, pre run, stage out, teardown) so that if we hit the
* maximum in stage in (for example) we won't block all jobs from completing.
* We also do this so that if 1000+ jobs complete or get cancelled all at
* once they won't all run teardown at the same time.
*/
#define MAX_BURST_BUFFERS_PER_STAGE 128
/*
* These variables are required by the burst buffer plugin interface. If they
* are not found in the plugin, the plugin loader will ignore it.
*
* plugin_name - a string giving a human-readable description of the
* plugin. There is no maximum length, but the symbol must refer to
* a valid string.
*
* plugin_type - a string suggesting the type of the plugin or its
* applicability to a particular form of data or method of data handling.
* If the low-level plugin API is used, the contents of this string are
* unimportant and may be anything. Slurm uses the higher-level plugin
* interface which requires this string to be of the form
*
* <application>/<method>
*
* where <application> is a description of the intended application of
* the plugin (e.g., "burst_buffer" for Slurm burst_buffer) and <method> is a
* description of how this plugin satisfies that application. Slurm will only
* load a burst_buffer plugin if the plugin_type string has a prefix of
* "burst_buffer/".
*
* plugin_version - an unsigned 32-bit integer containing the Slurm version
* (major.minor.micro combined into a single number).
*/
const char plugin_name[] = "burst_buffer lua plugin";
const char plugin_type[] = "burst_buffer/lua";
const uint32_t plugin_version = SLURM_VERSION_NUMBER;
/*
* Most state information is in a common structure so that we can more
* easily use common functions from multiple burst buffer plugins.
*/
static bb_state_t bb_state;
static char *directive_str;
static int directive_len = 0;
static char *lua_script_path;
static const char *req_fxns[] = {
"slurm_bb_job_process",
"slurm_bb_pools",
"slurm_bb_job_teardown",
"slurm_bb_setup",
"slurm_bb_data_in",
"slurm_bb_test_data_in",
"slurm_bb_real_size",
"slurm_bb_paths",
"slurm_bb_pre_run",
"slurm_bb_post_run",
"slurm_bb_data_out",
"slurm_bb_test_data_out",
"slurm_bb_get_status",
NULL
};
/* Keep this in sync with req_fxns */
typedef enum {
SLURM_BB_JOB_PROCESS = 0,
SLURM_BB_POOLS,
SLURM_BB_JOB_TEARDOWN,
SLURM_BB_SETUP,
SLURM_BB_DATA_IN,
SLURM_BB_TEST_DATA_IN,
SLURM_BB_REAL_SIZE,
SLURM_BB_PATHS,
SLURM_BB_PRE_RUN,
SLURM_BB_POST_RUN,
SLURM_BB_DATA_OUT,
SLURM_BB_TEST_DATA_OUT,
SLURM_BB_GET_STATUS,
SLURM_BB_OP_MAX
} bb_op_e;
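/*
* Minimal sketch (illustrative, not a complete burst_buffer.lua) of one of
* the functions listed above. Per _handle_lua_return(), the first return
* value must be a numeric status (0 == SLURM_SUCCESS) and an optional second
* return value is a message string; argument lists vary by function and are
* omitted here.
*
* function slurm_bb_job_process(...)
*     return 0, "job script accepted"
* end
*/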
/* Description of each pool entry */
typedef struct bb_pools {
char *name;
uint64_t granularity;
uint64_t quantity;
uint64_t free;
} bb_pools_t;
typedef struct {
int i;
int num_pools;
bb_pools_t *pools;
} data_pools_arg_t;
typedef struct {
uint64_t bb_size;
uint32_t gid;
bool hurry;
uint32_t job_id;
char *job_script;
char *pool;
uint32_t uid;
} stage_args_t;
typedef struct {
uint32_t argc;
char **argv;
} status_args_t;
typedef struct {
uint32_t argc;
char **argv;
bool get_job_ptr;
bool have_job_lock;
uint32_t job_id;
job_record_t *job_ptr;
const char *lua_func;
char **resp_msg;
uint32_t timeout;
bool *track_script_signal;
bool with_scriptd;
} run_lua_args_t;
typedef void (*init_argv_f_t)(stage_args_t *stage_args, int *argc_p,
char ***argv_p);
typedef struct {
init_argv_f_t init_argv;
bb_op_e op_type;
int (*run_func)(stage_args_t *stage_args,
init_argv_f_t init_argv,
const char *op, uint32_t job_id, uint32_t timeout,
char **resp_msg);
uint32_t timeout;
} bb_func_t;
static int lua_thread_cnt = 0;
pthread_mutex_t lua_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Function prototypes */
static bb_job_t *_get_bb_job(job_record_t *job_ptr);
static void _queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry,
uint32_t group_id);
static void _fail_stage(stage_args_t *stage_args, const char *op, int rc,
char *resp_msg);
static void _init_data_in_argv(stage_args_t *stage_args, int *argc_p,
char ***argv_p);
static int _get_lua_thread_cnt(void)
{
int cnt;
slurm_mutex_lock(&lua_thread_mutex);
cnt = lua_thread_cnt;
slurm_mutex_unlock(&lua_thread_mutex);
return cnt;
}
static void _incr_lua_thread_cnt(void)
{
slurm_mutex_lock(&lua_thread_mutex);
lua_thread_cnt++;
slurm_mutex_unlock(&lua_thread_mutex);
}
static void _decr_lua_thread_cnt(void)
{
slurm_mutex_lock(&lua_thread_mutex);
lua_thread_cnt--;
slurm_mutex_unlock(&lua_thread_mutex);
}
static void _loadscript_extra(lua_State *st)
{
/* local setup */
/*
* We may add functions later (like job_submit/lua and cli_filter/lua),
* but for now we don't have any.
*/
//slurm_lua_table_register(st, NULL, slurm_functions);
/* Push this string onto the top of the stack */
lua_pushstring(st, SLURM_BB_BUSY);
/*
* Add an entry to the table at index -2 (second from the top of the
* stack) with key == "SLURM_BB_BUSY" and value == SLURM_BB_BUSY.
* Also pop the value SLURM_BB_BUSY from the stack.
*/
lua_setfield(st, -2, "SLURM_BB_BUSY");
/* This must always be done after we register the slurm_functions */
/*
* This pops the table from the top of the stack and assigns it to the
* global name "slurm", which is what enables using slurm.<key> or
* slurm['key'] to access values from the table.
*/
lua_setglobal(st, "slurm");
}
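/*
* Sketch (illustrative) of how burst_buffer.lua can use the constant
* registered above: a polling hook returns slurm.SLURM_BB_BUSY as its
* message string (with a success status) so that _run_test_data_inout()
* sleeps for poll_interval and calls it again. transfer_done() is a
* hypothetical site-specific helper.
*
* function slurm_bb_test_data_in(...)
*     if not transfer_done() then
*         return 0, slurm.SLURM_BB_BUSY
*     end
*     return 0
* end
*/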
static int _lua_job_info_field(lua_State *L, const job_info_t *job_info,
const char *name)
{
/*
* Be careful with 64-bit numbers.
* Lua prior to 5.3 stored all numbers as 64-bit floating point numbers,
* which can cause loss of precision.
* Lua 5.3 onward can store 64-bit signed integers (but not unsigned
* integers), and Lua will also convert between its integer and floating
* point data types for certain operations - see section 3.4.3
* "Coercions and Conversions" of the Lua manual.
*/
if (!job_info) {
error("_job_info_field: job_info is NULL");
lua_pushnil(L);
} else if (!xstrcmp(name, "account")) {
lua_pushstring(L, job_info->account);
} else if (!xstrcmp(name, "accrue_time")) {
lua_pushinteger(L, job_info->accrue_time);
} else if (!xstrcmp(name, "admin_comment")) {
lua_pushstring(L, job_info->admin_comment);
} else if (!xstrcmp(name, "alloc_node")) {
lua_pushstring(L, job_info->alloc_node);
} else if (!xstrcmp(name, "alloc_sid")) {
lua_pushinteger(L, job_info->alloc_sid);
} else if (!xstrcmp(name, "array_job_id")) {
lua_pushinteger(L, job_info->array_job_id);
} else if (!xstrcmp(name, "array_task_id")) {
lua_pushinteger(L, job_info->array_task_id);
} else if (!xstrcmp(name, "array_max_tasks")) {
lua_pushinteger(L, job_info->array_max_tasks);
} else if (!xstrcmp(name, "array_task_str")) {
lua_pushstring(L, job_info->array_task_str);
} else if (!xstrcmp(name, "assoc_id")) {
lua_pushinteger(L, job_info->assoc_id);
} else if (!xstrcmp(name, "batch_features")) {
lua_pushstring(L, job_info->batch_features);
} else if (!xstrcmp(name, "batch_flag")) {
lua_pushinteger(L, job_info->batch_flag);
} else if (!xstrcmp(name, "batch_host")) {
lua_pushstring(L, job_info->batch_host);
/* Ignore bitflags */
} else if (!xstrcmp(name, "boards_per_node")) {
lua_pushinteger(L, job_info->boards_per_node);
} else if (!xstrcmp(name, "burst_buffer")) {
lua_pushstring(L, job_info->burst_buffer);
} else if (!xstrcmp(name, "burst_buffer_state")) {
lua_pushstring(L, job_info->burst_buffer_state);
} else if (!xstrcmp(name, "cluster")) {
lua_pushstring(L, job_info->cluster);
} else if (!xstrcmp(name, "cluster_features")) {
lua_pushstring(L, job_info->cluster_features);
} else if (!xstrcmp(name, "command")) {
lua_pushstring(L, job_info->command);
} else if (!xstrcmp(name, "comment")) {
lua_pushstring(L, job_info->comment);
} else if (!xstrcmp(name, "container")) {
lua_pushstring(L, job_info->container);
} else if (!xstrcmp(name, "container_id")) {
lua_pushstring(L, job_info->container_id);
} else if (!xstrcmp(name, "contiguous")) {
lua_pushinteger(L, job_info->contiguous);
} else if (!xstrcmp(name, "core_spec")) {
lua_pushinteger(L, job_info->core_spec);
} else if (!xstrcmp(name, "cores_per_socket")) {
lua_pushinteger(L, job_info->cores_per_socket);
} else if (!xstrcmp(name, "billable_tres")) {
lua_pushnumber(L, job_info->billable_tres);
} else if (!xstrcmp(name, "cpus_per_task")) {
lua_pushinteger(L, job_info->cpus_per_task);
} else if (!xstrcmp(name, "cpu_freq_min")) {
lua_pushinteger(L, job_info->cpu_freq_min);
} else if (!xstrcmp(name, "cpu_freq_max")) {
lua_pushinteger(L, job_info->cpu_freq_max);
} else if (!xstrcmp(name, "cpu_freq_gov")) {
lua_pushinteger(L, job_info->cpu_freq_gov);
} else if (!xstrcmp(name, "cpus_per_tres")) {
lua_pushstring(L, job_info->cpus_per_tres);
} else if (!xstrcmp(name, "cronspec")) {
lua_pushstring(L, job_info->cronspec);
} else if (!xstrcmp(name, "deadline")) {
lua_pushinteger(L, job_info->deadline);
} else if (!xstrcmp(name, "delay_boot")) {
lua_pushinteger(L, job_info->delay_boot);
} else if (!xstrcmp(name, "dependency")) {
lua_pushstring(L, job_info->dependency);
} else if (!xstrcmp(name, "derived_ec")) {
lua_pushinteger(L, job_info->derived_ec);
} else if (!xstrcmp(name, "eligible_time")) {
lua_pushinteger(L, job_info->eligible_time);
} else if (!xstrcmp(name, "end_time")) {
lua_pushinteger(L, job_info->end_time);
} else if (!xstrcmp(name, "exc_nodes")) {
lua_pushstring(L, job_info->exc_nodes);
/* Ignore exc_node_inx */
} else if (!xstrcmp(name, "exit_code")) {
lua_pushinteger(L, job_info->exit_code);
} else if (!xstrcmp(name, "features")) {
lua_pushstring(L, job_info->features);
} else if (!xstrcmp(name, "fed_origin_str")) {
lua_pushstring(L, job_info->fed_origin_str);
/* Ignore fed_siblings_active */
} else if (!xstrcmp(name, "fed_siblings_active_str")) {
lua_pushstring(L, job_info->fed_siblings_active_str);
/* Ignore fed_siblings_viable */
} else if (!xstrcmp(name, "fed_siblings_viable_str")) {
lua_pushstring(L, job_info->fed_siblings_viable_str);
} else if (!xstrcmp(name, "gres_detail_cnt")) {
lua_pushinteger(L, job_info->gres_detail_cnt);
} else if (!xstrcmp(name, "gres_detail_str")) {
if (!job_info->gres_detail_cnt)
lua_pushnil(L);
else {
/*
* Add a table: key=index i+1, value=gres_detail_str[i]
* (index=i+1 because Lua is one-indexed)
*/
lua_newtable(L);
for (int i = 0; i < job_info->gres_detail_cnt; i++) {
lua_pushinteger(L, i+1);
lua_pushstring(L, job_info->gres_detail_str[i]);
/*
* Adds this key-value pair to the table which
* is 3 from the top of the stack
*/
lua_settable(L, -3);
}
}
} else if (!xstrcmp(name, "gres_total")) {
lua_pushstring(L, job_info->gres_total);
} else if (!xstrcmp(name, "group_id")) {
lua_pushinteger(L, job_info->group_id);
} else if (!xstrcmp(name, "het_job_id")) {
lua_pushinteger(L, job_info->het_job_id);
} else if (!xstrcmp(name, "het_job_id_set")) {
lua_pushstring(L, job_info->het_job_id_set);
} else if (!xstrcmp(name, "het_job_offset")) {
lua_pushinteger(L, job_info->het_job_offset);
} else if (!xstrcmp(name, "job_id")) {
lua_pushinteger(L, job_info->job_id);
/* Ignore job_resrcs */
} else if (!xstrcmp(name, "job_state")) {
lua_pushinteger(L, job_info->job_state);
} else if (!xstrcmp(name, "last_sched_eval")) {
lua_pushinteger(L, job_info->last_sched_eval);
} else if (!xstrcmp(name, "licenses")) {
lua_pushstring(L, job_info->licenses);
} else if (!xstrcmp(name, "mail_type")) {
lua_pushinteger(L, job_info->mail_type);
} else if (!xstrcmp(name, "mail_user")) {
lua_pushstring(L, job_info->mail_user);
} else if (!xstrcmp(name, "max_cpus")) {
lua_pushinteger(L, job_info->max_cpus);
} else if (!xstrcmp(name, "max_nodes")) {
lua_pushinteger(L, job_info->max_nodes);
} else if (!xstrcmp(name, "mcs_label")) {
lua_pushstring(L, job_info->mcs_label);
} else if (!xstrcmp(name, "mem_per_tres")) {
lua_pushstring(L, job_info->mem_per_tres);
} else if (!xstrcmp(name, "min_mem_per_node")) {
if ((job_info->pn_min_memory != NO_VAL64) &&
!(job_info->pn_min_memory & MEM_PER_CPU))
lua_pushinteger(L, job_info->pn_min_memory);
else
lua_pushnil(L);
} else if (!xstrcmp(name, "min_mem_per_cpu")) {
if ((job_info->pn_min_memory != NO_VAL64) &&
(job_info->pn_min_memory & MEM_PER_CPU))
lua_pushinteger(L,
job_info->pn_min_memory & ~MEM_PER_CPU);
else
lua_pushnil(L);
} else if (!xstrcmp(name, "name")) {
lua_pushstring(L, job_info->name);
} else if (!xstrcmp(name, "network")) {
lua_pushstring(L, job_info->network);
} else if (!xstrcmp(name, "nodes")) {
lua_pushstring(L, job_info->nodes);
} else if (!xstrcmp(name, "nice")) {
lua_pushinteger(L, job_info->nice);
/* Ignore node_inx */
} else if (!xstrcmp(name, "ntasks_per_core")) {
lua_pushinteger(L, job_info->ntasks_per_core);
} else if (!xstrcmp(name, "ntasks_per_tres")) {
lua_pushinteger(L, job_info->ntasks_per_tres);
} else if (!xstrcmp(name, "ntasks_per_node")) {
lua_pushinteger(L, job_info->ntasks_per_node);
} else if (!xstrcmp(name, "ntasks_per_socket")) {
lua_pushinteger(L, job_info->ntasks_per_socket);
} else if (!xstrcmp(name, "ntasks_per_board")) {
lua_pushinteger(L, job_info->ntasks_per_board);
} else if (!xstrcmp(name, "num_cpus")) {
lua_pushinteger(L, job_info->num_cpus);
} else if (!xstrcmp(name, "num_nodes")) {
lua_pushinteger(L, job_info->num_nodes);
} else if (!xstrcmp(name, "num_tasks")) {
lua_pushinteger(L, job_info->num_tasks);
} else if (!xstrcmp(name, "partition")) {
lua_pushstring(L, job_info->partition);
} else if (!xstrcmp(name, "prefer")) {
lua_pushstring(L, job_info->prefer);
/* Ignore pn_min_memory - use min_mem_per_node|cpu instead */
} else if (!xstrcmp(name, "pn_min_cpus")) {
lua_pushinteger(L, job_info->pn_min_cpus);
} else if (!xstrcmp(name, "pn_min_tmp_disk")) {
lua_pushinteger(L, job_info->pn_min_tmp_disk);
} else if (!xstrcmp(name, "preempt_time")) {
lua_pushinteger(L, job_info->preempt_time);
} else if (!xstrcmp(name, "preemptable_time")) {
lua_pushinteger(L, job_info->preemptable_time);
} else if (!xstrcmp(name, "pre_sus_time")) {
lua_pushinteger(L, job_info->pre_sus_time);
} else if (!xstrcmp(name, "priority")) {
lua_pushinteger(L, job_info->priority);
} else if (!xstrcmp(name, "profile")) {
lua_pushinteger(L, job_info->profile);
} else if (!xstrcmp(name, "qos")) {
lua_pushstring(L, job_info->qos);
} else if (!xstrcmp(name, "reboot")) {
lua_pushinteger(L, job_info->reboot);
} else if (!xstrcmp(name, "req_nodes")) {
lua_pushstring(L, job_info->req_nodes);
/* Ignore req_node_inx */
} else if (!xstrcmp(name, "req_switch")) {
lua_pushinteger(L, job_info->req_switch);
} else if (!xstrcmp(name, "requeue")) {
lua_pushinteger(L, job_info->requeue);
} else if (!xstrcmp(name, "resize_time")) {
lua_pushinteger(L, job_info->resize_time);
} else if (!xstrcmp(name, "restart_cnt")) {
lua_pushinteger(L, job_info->restart_cnt);
} else if (!xstrcmp(name, "resv_name")) {
lua_pushstring(L, job_info->resv_name);
} else if (!xstrcmp(name, "sched_nodes")) {
lua_pushstring(L, job_info->sched_nodes);
} else if (!xstrcmp(name, "selinux_context")) {
lua_pushstring(L, job_info->selinux_context);
} else if (!xstrcmp(name, "shared")) {
lua_pushinteger(L, job_info->shared);
} else if (!xstrcmp(name, "site_factor")) {
lua_pushinteger(L, job_info->site_factor);
} else if (!xstrcmp(name, "sockets_per_board")) {
lua_pushinteger(L, job_info->sockets_per_board);
} else if (!xstrcmp(name, "sockets_per_node")) {
lua_pushinteger(L, job_info->sockets_per_node);
} else if (!xstrcmp(name, "start_time")) {
lua_pushinteger(L, job_info->start_time);
} else if (!xstrcmp(name, "start_protocol_ver")) {
lua_pushinteger(L, job_info->start_protocol_ver);
} else if (!xstrcmp(name, "state_desc")) {
lua_pushstring(L, job_info->state_desc);
} else if (!xstrcmp(name, "state_reason")) {
lua_pushinteger(L, job_info->state_reason);
} else if (!xstrcmp(name, "std_err")) {
lua_pushstring(L, job_info->std_err);
} else if (!xstrcmp(name, "std_in")) {
lua_pushstring(L, job_info->std_in);
} else if (!xstrcmp(name, "std_out")) {
lua_pushstring(L, job_info->std_out);
} else if (!xstrcmp(name, "submit_time")) {
lua_pushinteger(L, job_info->submit_time);
} else if (!xstrcmp(name, "suspend_time")) {
lua_pushinteger(L, job_info->suspend_time);
} else if (!xstrcmp(name, "system_comment")) {
lua_pushstring(L, job_info->system_comment);
} else if (!xstrcmp(name, "time_limit")) {
lua_pushinteger(L, job_info->time_limit);
} else if (!xstrcmp(name, "time_min")) {
lua_pushinteger(L, job_info->time_min);
} else if (!xstrcmp(name, "threads_per_core")) {
lua_pushinteger(L, job_info->threads_per_core);
} else if (!xstrcmp(name, "tres_bind")) {
lua_pushstring(L, job_info->tres_bind);
} else if (!xstrcmp(name, "tres_freq")) {
lua_pushstring(L, job_info->tres_freq);
} else if (!xstrcmp(name, "tres_per_job")) {
lua_pushstring(L, job_info->tres_per_job);
} else if (!xstrcmp(name, "tres_per_node")) {
lua_pushstring(L, job_info->tres_per_node);
} else if (!xstrcmp(name, "tres_per_socket")) {
lua_pushstring(L, job_info->tres_per_socket);
} else if (!xstrcmp(name, "tres_per_task")) {
lua_pushstring(L, job_info->tres_per_task);
} else if (!xstrcmp(name, "tres_req_str")) {
lua_pushstring(L, job_info->tres_req_str);
} else if (!xstrcmp(name, "tres_alloc_str")) {
lua_pushstring(L, job_info->tres_alloc_str);
} else if (!xstrcmp(name, "user_id")) {
lua_pushinteger(L, job_info->user_id);
/*
* user_name is not guaranteed to be set, but is accurate when it is.
* See slurm_job_info_t in slurm.h. This is for performance reasons,
* as we are avoiding using a job_write_lock to set it in job_info
* before it is packed, and we are avoiding doing a lookup with UID
* multiple times per job in the lua script. If performance is not a
* concern and username is needed, the script may do a lookup using
* the UID.
*/
} else if (!xstrcmp(name, "user_name")) {
lua_pushstring(L, job_info->user_name);
} else if (!xstrcmp(name, "wait4switch")) {
lua_pushinteger(L, job_info->wait4switch);
} else if (!xstrcmp(name, "wckey")) {
lua_pushstring(L, job_info->wckey);
} else if (!xstrcmp(name, "work_dir")) {
lua_pushstring(L, job_info->work_dir);
} else {
lua_pushnil(L);
}
return 1;
}
/*
* Arguments are passed to us on the stack of L:
* (1) The table (second from the top of the stack)
* (2) The key (top of the stack)
*/
static int _job_info_field_index(lua_State *L)
{
const char *name = luaL_checkstring(L, 2);
job_info_t *job_info;
/* Pushes the metatable of the table onto the stack */
lua_getmetatable(L, -2);
/*
* Pushes metatable["_job_info_ptr"] onto the stack, which is just a
* pointer to job_info.
*/
lua_getfield(L, -1, "_job_info_ptr");
/* Now we can get the pointer to job_info from the top of the stack */
job_info = lua_touserdata(L, -1);
return _lua_job_info_field(L, job_info, name);
}
/*
* This function results in a single metatable on the stack:
*
* <metatable> = {
* __index = _job_info_field_index
* _job_info_ptr = job_info
* }
*
* The metatable defines the behavior of special operations on the table.
* The __index operation is called when indexing job_info in lua.
* When burst_buffer.lua does this:
* job_info["some_value"]
* Then the function _job_info_field_index is called with the table and key
* as the arguments:
* (1) This job_info table, (2) "some_value"
*
* See the lua manual section 2.4 Metatables and Metamethods for the various
* table operations (which are identified by keys in the metatable).
*
* Negative indices reference the top of the stack:
* stack[-1] is the top of the stack, stack[-2] is the second from the top of
* the stack, etc.
*/
static void _push_job_info(job_info_t *job_info, lua_State *L)
{
/*
* Push a table onto the stack
* Stack after this call:
* -1 table1
*/
lua_newtable(L);
/*
* Push a second table onto the stack.
* Stack after this call:
* -1 table2
* -2 table1
*/
lua_newtable(L);
/*
* Push a c function to the stack.
* Stack after this call:
* -1 function = _job_info_field_index
* -2 table2
* -3 table1
*/
lua_pushcfunction(L, _job_info_field_index);
/*
* - Pop _job_info_field_index
* - table2["__index"] = _job_info_field_index
* Stack after this call:
* -1 table2 = {
* __index = _job_info_field_index
* }
* -2 table1
*/
lua_setfield(L, -2, "__index");
/*
* The next two calls store the job_info in the metatable, so the index
* function knows which struct it's getting data for.
* lightuserdata represents a C pointer (a void*).
*/
/*
* Pushes lightuserdata onto the stack.
* Stack after this call:
* -1 userdata = pointer to job_info
* -2 table2 = {
* __index = _job_info_field_index
* }
* -3 table1
*/
lua_pushlightuserdata(L, job_info);
/*
* lua_setfield() pops the userdata (pointer to job_info) from the top
* of the stack and assigns it to the table second from the top:
* - table2["_job_info_ptr"] = userdata
* Stack after this call:
* -1 table2 = {
* __index = _job_info_field_index
* _job_info_ptr = job_info
* }
* -2 table1
*/
lua_setfield(L, -2, "_job_info_ptr");
/*
* Pops a value from the stack and sets it as the new metatable for
* the value at the given index (second from the top value on the stack)
* - Pop table2 from the stack
* - table1 metatable = table2
* Stack after this call:
* -1 table1 metatable = {
* __index = _job_info_field_index
* _job_info_ptr = job_info
* }
*/
lua_setmetatable(L, -2);
}
static int _handle_lua_return_code(lua_State *L, const char *lua_func)
{
/* Return code is always the bottom of the stack. */
if (!lua_isnumber(L, 1)) {
error("%s: %s returned a non-numeric return code, returning error",
__func__, lua_func);
return SLURM_ERROR;
} else {
return lua_tonumber(L, 1);
}
}
static void _print_lua_rc_msg(int rc, const char *lua_func, uint32_t job_id,
char *resp_msg)
{
/*
* Some burst buffer APIs don't run for a specific job. But if they
* do run for a specific job, log the job ID.
*/
if (job_id)
log_flag(BURST_BUF, "%s for JobId=%u returned, status=%d, response=%s",
lua_func, job_id, rc, resp_msg);
else
log_flag(BURST_BUF, "%s returned, status=%d, response=%s",
lua_func, rc, resp_msg);
}
static int _handle_lua_return(lua_State *L, const char *lua_func,
uint32_t job_id, char **ret_str)
{
int rc = SLURM_SUCCESS;
int num_stack_elems = lua_gettop(L);
if (!num_stack_elems) {
log_flag(BURST_BUF, "%s finished and didn't return anything",
lua_func);
return rc; /* No results, return success. */
}
/* Bottom of the stack should be the return code. */
rc = _handle_lua_return_code(L, lua_func);
if (num_stack_elems > 1) {
/*
* Multiple results. Right now we only consider up to 2 results,
* and the second should be a string.
*/
xassert(ret_str);
if (lua_isstring(L, 2)) {
xfree(*ret_str);
/*
* Valgrind thinks that we leak this lua_tostring() by
* calling xstrdup and not free'ing the string on the
* lua stack, but lua will garbage collect it after
* we pop it off the stack.
*/
*ret_str = xstrdup(lua_tostring(L, 2));
} else {
/* Don't know how to handle non-strings here. */
error("%s: Cannot handle non-string as second return value for lua function %s.",
__func__, lua_func);
rc = SLURM_ERROR;
}
}
if (ret_str)
_print_lua_rc_msg(rc, lua_func, job_id, *ret_str);
else
_print_lua_rc_msg(rc, lua_func, job_id, NULL);
/* Pop everything from the stack. */
lua_pop(L, num_stack_elems);
return rc;
}
static int _start_lua_script(const char *func, uint32_t job_id, uint32_t argc,
char **argv, job_info_msg_t *job_info,
char **resp_msg)
{
/*
* We don't make lua_State L or lua_script_last_loaded static.
* If they were static, then only 1 thread could use them at a time.
* This would be problematic for performance since these
* calls can possibly last a long time. By not making them static it
* means we can let these calls run in parallel, but it also means
* we don't preserve the previous script. Therefore, we have to
* reload the script every time even if the script hasn't changed.
* Also, if there is ever a problem loading the script then we can't
* fall back to the old script.
*/
lua_State *L = NULL;
time_t lua_script_last_loaded = (time_t) 0;
int rc, i;
errno = 0;
rc = slurm_lua_loadscript(&L, "burst_buffer/lua",
lua_script_path, req_fxns,
&lua_script_last_loaded, _loadscript_extra,
resp_msg);
if (rc != SLURM_SUCCESS)
return rc;
/*
* All lua script functions should have been verified during
* initialization:
*/
lua_getglobal(L, func);
if (lua_isnil(L, -1)) {
error("%s: Couldn't find function %s",
__func__, func);
lua_close(L);
return SLURM_ERROR;
}
for (i = 0; i < argc; i++)
lua_pushstring(L, argv[i]);
if (job_info) {
job_info_t *info = &job_info->job_array[0];
_push_job_info(info, L);
argc++;
}
slurm_lua_stack_dump("burst_buffer/lua", "before lua_pcall", L);
/* Run the lua command and tell the calling thread when it's done. */
if ((rc = lua_pcall(L, argc, LUA_MULTRET, 0)) != 0) {
error("%s: %s", lua_script_path, lua_tostring(L, -1));
rc = SLURM_ERROR;
lua_pop(L, lua_gettop(L));
} else {
slurm_lua_stack_dump("burst_buffer/lua", "after lua_pcall, before returns have been popped", L);
rc = _handle_lua_return(L, func, job_id, resp_msg);
}
slurm_lua_stack_dump("burst_buffer/lua", "after lua_pcall, after returns have been popped", L);
lua_close(L);
return rc;
}
/*
* Call a function in burst_buffer.lua.
*/
static int _run_lua_script(run_lua_args_t *args)
{
int rc;
buf_t *info_buf = NULL;
list_t *job_ids = NULL;
job_record_t *job_ptr;
slurmctld_lock_t job_read_lock = {
.conf = READ_LOCK, .job = READ_LOCK,
};
if (args->get_job_ptr) {
if (!args->have_job_lock)
lock_slurmctld(job_read_lock);
if (args->job_ptr)
job_ptr = args->job_ptr;
else {
job_ptr = find_job_record(args->job_id);
if (!job_ptr) {
error("Unable to find job record for JobId=%u, cannot run %s",
args->job_id, args->lua_func);
if (args->resp_msg)
*args->resp_msg =
xstrdup_printf("Unable to find job record for JobId=%u, cannot run %s",
args->job_id,
args->lua_func);
rc = SLURM_ERROR;
if (!args->have_job_lock)
unlock_slurmctld(job_read_lock);
return rc;
}
}
job_ids = list_create(NULL);
list_append(job_ids, &job_ptr->job_id);
info_buf = pack_spec_jobs(job_ids, SHOW_DETAIL,
slurm_conf.slurm_user_id, NO_VAL,
SLURM_PROTOCOL_VERSION);
if (!args->have_job_lock)
unlock_slurmctld(job_read_lock);
}
_incr_lua_thread_cnt();
if (args->with_scriptd) {
rc = slurmscriptd_run_bb_lua(args->job_id,
(char *) args->lua_func,
args->argc,
args->argv,
args->timeout,
info_buf,
args->resp_msg,
args->track_script_signal);
} else {
job_info_msg_t *job_info = NULL;
if (info_buf) {
slurm_msg_t *info_msg = xmalloc(sizeof *info_msg);
slurm_msg_t_init(info_msg);
info_msg->protocol_version = SLURM_PROTOCOL_VERSION;
info_msg->msg_type = RESPONSE_JOB_INFO;
/*
* Since we are directly unpacking the buffer that we
* just packed, we need to reset the "processed" value
* of the buffer because unpacking starts from there.
*/
set_buf_offset(info_buf, 0);
unpack_msg(info_msg, info_buf);
job_info = info_msg->data;
info_msg->data = NULL;
slurm_free_msg(info_msg);
/*
* If info_buf is non-NULL then we should always have
* gotten a job. This assert catches if something
* went wrong with the pack or unpack.
*/
xassert(job_info->record_count);
}
rc = _start_lua_script(args->lua_func, args->job_id, args->argc,
args->argv, job_info, args->resp_msg);
slurm_free_job_info_msg(job_info);
}
_decr_lua_thread_cnt();
FREE_NULL_LIST(job_ids);
FREE_NULL_BUFFER(info_buf);
return rc;
}
/*
* Write current burst buffer state to a file.
*/
static void _save_bb_state(void)
{
static time_t last_save_time = 0;
static uint32_t high_buffer_size = 16 * 1024;
time_t save_time = time(NULL);
bb_alloc_t *bb_alloc;
uint32_t rec_count = 0;
buf_t *buffer;
int i, count_offset, offset;
uint16_t protocol_version = SLURM_PROTOCOL_VERSION;
if ((bb_state.last_update_time <= last_save_time) &&
!bb_state.term_flag)
return;
buffer = init_buf(high_buffer_size);
pack16(protocol_version, buffer);
count_offset = get_buf_offset(buffer);
pack32(rec_count, buffer);
/* Each allocated burst buffer is in bb_state.bb_ahash */
if (bb_state.bb_ahash) {
slurm_mutex_lock(&bb_state.bb_mutex);
for (i = 0; i < BB_HASH_SIZE; i++) {
bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
packstr(bb_alloc->account, buffer);
pack_time(bb_alloc->create_time, buffer);
pack32(bb_alloc->id, buffer);
packstr(bb_alloc->name, buffer);
packstr(bb_alloc->partition, buffer);
packstr(bb_alloc->pool, buffer);
packstr(bb_alloc->qos, buffer);
pack32(bb_alloc->user_id, buffer);
pack32(bb_alloc->group_id, buffer);
pack64(bb_alloc->size, buffer);
rec_count++;
bb_alloc = bb_alloc->next;
}
}
save_time = time(NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
offset = get_buf_offset(buffer);
set_buf_offset(buffer, count_offset);
pack32(rec_count, buffer);
set_buf_offset(buffer, offset);
}
if (!save_buf_to_state("burst_buffer_lua_state", buffer, NULL))
last_save_time = save_time;
FREE_NULL_BUFFER(buffer);
}
static void _recover_bb_state(void)
{
char *state_file = NULL;
uint16_t protocol_version = NO_VAL16;
uint32_t rec_count = 0;
uint32_t id = 0, user_id = 0, group_id = 0;
uint64_t size = 0;
int i;
char *account = NULL, *name = NULL;
char *partition = NULL, *pool = NULL, *qos = NULL;
char *end_ptr = NULL;
time_t create_time = 0;
bb_alloc_t *bb_alloc;
buf_t *buffer;
errno = 0;
buffer = state_save_open("burst_buffer_lua_state", &state_file);
if (!buffer && errno == ENOENT) {
info("No burst buffer state file (%s) to recover",
state_file);
xfree(state_file);
return;
}
xfree(state_file);
safe_unpack16(&protocol_version, buffer);
if (protocol_version == NO_VAL16) {
if (!ignore_state_errors)
fatal("Can not recover burst_buffer/lua state, data version incompatible, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
error("**********************************************************************");
error("Can not recover burst_buffer/lua state, data version incompatible");
error("**********************************************************************");
return;
}
safe_unpack32(&rec_count, buffer);
for (i = 0; i < rec_count; i++) {
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
safe_unpackstr(&account, buffer);
safe_unpack_time(&create_time, buffer);
safe_unpack32(&id, buffer);
safe_unpackstr(&name, buffer);
safe_unpackstr(&partition, buffer);
safe_unpackstr(&pool, buffer);
safe_unpackstr(&qos, buffer);
safe_unpack32(&user_id, buffer);
safe_unpack32(&group_id, buffer);
safe_unpack64(&size, buffer);
}
slurm_mutex_lock(&bb_state.bb_mutex);
bb_alloc = bb_alloc_name_rec(&bb_state, name, user_id);
bb_alloc->group_id = group_id;
bb_alloc->id = id;
if (name && (name[0] >='0') && (name[0] <='9')) {
bb_alloc->job_id = strtol(name, &end_ptr, 10);
bb_alloc->array_job_id = bb_alloc->job_id;
bb_alloc->array_task_id = NO_VAL;
}
bb_alloc->seen_time = time(NULL);
bb_alloc->size = size;
log_flag(BURST_BUF, "Recovered burst buffer %s from user %u",
bb_alloc->name, bb_alloc->user_id);
bb_alloc->account = account;
account = NULL;
bb_alloc->create_time = create_time;
bb_alloc->partition = partition;
partition = NULL;
bb_alloc->pool = pool;
pool = NULL;
bb_alloc->qos = qos;
qos = NULL;
slurm_mutex_unlock(&bb_state.bb_mutex);
xfree(name);
}
info("Recovered state of %d burst buffers", rec_count);
FREE_NULL_BUFFER(buffer);
return;
unpack_error:
if (!ignore_state_errors)
fatal("Incomplete burst buffer data checkpoint file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
error("Incomplete burst buffer data checkpoint file");
xfree(account);
xfree(name);
xfree(partition);
xfree(qos);
FREE_NULL_BUFFER(buffer);
return;
}
/* For a given user/partition/account, set its assoc_ptr */
static void _set_assoc_mgr_ptrs(bb_alloc_t *bb_alloc)
{
slurmdb_assoc_rec_t assoc_rec;
slurmdb_qos_rec_t qos_rec;
memset(&assoc_rec, 0, sizeof(slurmdb_assoc_rec_t));
assoc_rec.acct = bb_alloc->account;
assoc_rec.partition = bb_alloc->partition;
assoc_rec.uid = bb_alloc->user_id;
if (assoc_mgr_fill_in_assoc(acct_db_conn, &assoc_rec,
accounting_enforce,
&bb_alloc->assoc_ptr,
true) == SLURM_SUCCESS) {
xfree(bb_alloc->assocs);
if (bb_alloc->assoc_ptr) {
bb_alloc->assocs =
xstrdup_printf(",%u,", bb_alloc->assoc_ptr->id);
}
}
memset(&qos_rec, 0, sizeof(slurmdb_qos_rec_t));
qos_rec.name = bb_alloc->qos;
if (assoc_mgr_fill_in_qos(acct_db_conn, &qos_rec, accounting_enforce,
&bb_alloc->qos_ptr, true) != SLURM_SUCCESS)
verbose("Invalid QOS name: %s",
bb_alloc->qos);
}
static void _apply_limits(void)
{
bb_alloc_t *bb_alloc;
/* Read locks on assoc, qos, and user */
assoc_mgr_lock_t assoc_locks =
{ .assoc = READ_LOCK, .qos = READ_LOCK, .user = READ_LOCK };
assoc_mgr_lock(&assoc_locks);
slurm_mutex_lock(&bb_state.bb_mutex);
for (int i = 0; i < BB_HASH_SIZE; i++) {
bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
info("Recovered buffer Name:%s User:%u Pool:%s Size:%"PRIu64,
bb_alloc->name, bb_alloc->user_id,
bb_alloc->pool, bb_alloc->size);
_set_assoc_mgr_ptrs(bb_alloc);
bb_limit_add(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state, true);
bb_alloc = bb_alloc->next;
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
assoc_mgr_unlock(&assoc_locks);
}
static void _bb_free_pools(bb_pools_t *pools, int num_ent)
{
for (int i = 0; i < num_ent; i++)
xfree(pools[i].name);
xfree(pools);
}
static int _data_get_val_from_key(data_t *data, char *key, data_type_t type,
bool required, void *out_val)
{
int rc = SLURM_SUCCESS;
data_t *data_tmp = NULL;
char **val_str;
int64_t *val_int;
data_tmp = data_key_get(data, key);
if (!data_tmp) {
if (required)
return SLURM_ERROR;
return SLURM_SUCCESS; /* Not specified */
}
if (data_get_type(data_tmp) != type) {
error("%s: %s is the wrong data type", __func__, key);
return SLURM_ERROR;
}
switch (type) {
case DATA_TYPE_STRING:
val_str = out_val;
*val_str = xstrdup(data_get_string(data_tmp));
break;
case DATA_TYPE_INT_64:
val_int = out_val;
*val_int = data_get_int(data_tmp);
break;
default:
rc = SLURM_ERROR;
break;
}
return rc;
}
/*
* IN data - A dictionary describing a pool.
* IN/OUT arg - pointer to data_pools_arg_t. This function populates a pool
* in arg->pools.
*
* RET data_for_each_cmd_t
*/
static data_for_each_cmd_t _foreach_parse_pool(data_t *data, void *arg)
{
data_pools_arg_t *data_arg = arg;
data_for_each_cmd_t rc = DATA_FOR_EACH_CONT;
bb_pools_t *pools = data_arg->pools;
int i = data_arg->i;
if (i >= data_arg->num_pools) {
/* This should never happen. */
error("%s: Got more pools than are in the dict. Cannot parse pools.",
__func__);
rc = DATA_FOR_EACH_FAIL;
goto next;
}
pools[i].free = NO_VAL64;
pools[i].granularity = NO_VAL64;
pools[i].quantity = NO_VAL64;
if (_data_get_val_from_key(data, "id", DATA_TYPE_STRING, true,
&pools[i].name) != SLURM_SUCCESS) {
error("%s: Failure parsing id", __func__);
rc = DATA_FOR_EACH_FAIL;
goto next;
}
if (_data_get_val_from_key(data, "free", DATA_TYPE_INT_64, false,
&pools[i].free) != SLURM_SUCCESS) {
error("%s: Failure parsing free", __func__);
rc = DATA_FOR_EACH_FAIL;
goto next;
}
if (_data_get_val_from_key(data, "granularity", DATA_TYPE_INT_64, false,
&pools[i].granularity) != SLURM_SUCCESS) {
error("%s: Failure parsing granularity", __func__);
rc = DATA_FOR_EACH_FAIL;
goto next;
}
if (_data_get_val_from_key(data, "quantity", DATA_TYPE_INT_64, false,
&pools[i].quantity) != SLURM_SUCCESS) {
error("%s: Failure parsing quantity", __func__);
rc = DATA_FOR_EACH_FAIL;
goto next;
}
next:
data_arg->i += 1;
return rc;
}
static int _run_lua_script_wrapper(run_lua_args_t *run_lua_args)
{
int rc;
DEF_TIMERS;
START_TIMER;
rc = _run_lua_script(run_lua_args);
END_TIMER;
if (run_lua_args->job_id)
log_flag(BURST_BUF, "%s for JobId=%u ran for %s",
run_lua_args->lua_func, run_lua_args->job_id,
TIME_STR);
else
log_flag(BURST_BUF, "%s ran for %s",
run_lua_args->lua_func, TIME_STR);
return rc;
}
static int _run_lua_stage_script(stage_args_t *stage_args,
init_argv_f_t init_argv,
const char *op,
uint32_t job_id, uint32_t timeout,
char **resp_msg)
{
bool track_script_signal = false;
int rc;
int argc = 0;
char **argv = NULL;
run_lua_args_t run_lua_args = { 0 };
init_argv(stage_args, &argc, &argv);
run_lua_args.argc = argc;
run_lua_args.argv = argv;
run_lua_args.get_job_ptr = true;
run_lua_args.job_id = job_id;
run_lua_args.lua_func = op;
run_lua_args.resp_msg = resp_msg;
run_lua_args.timeout = timeout;
run_lua_args.track_script_signal = &track_script_signal;
run_lua_args.with_scriptd = true;
rc = _run_lua_script_wrapper(&run_lua_args);
xfree_array(argv);
if (track_script_signal) {
/* Killed by slurmctld, exit now. */
info("%s for JobId=%u terminated by slurmctld",
op, stage_args->job_id);
return SLURM_ERROR;
}
if (rc != SLURM_SUCCESS) {
_fail_stage(stage_args, op, rc, *resp_msg);
return SLURM_ERROR;
}
return rc;
}
static int _run_stage_ops(bb_func_t *stage_ops, int op_count,
stage_args_t *stage_args)
{
for (int i = 0; i < op_count; i++) {
int rc;
char *resp_msg = NULL;
const char *op;
bb_op_e op_type = stage_ops[i].op_type;
op = req_fxns[op_type];
rc = stage_ops[i].run_func(stage_args,
stage_ops[i].init_argv,
op,
stage_args->job_id,
stage_ops[i].timeout,
&resp_msg);
xfree(resp_msg);
if (rc != SLURM_SUCCESS)
return rc;
}
return SLURM_SUCCESS;
}
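/*
* Sketch (illustrative; pool name and numbers are hypothetical) of the JSON
* that _bb_get_pools() expects slurm_bb_pools() to return. Each element is
* parsed by _foreach_parse_pool() above: "id" is required, while "free",
* "granularity" and "quantity" are optional.
*
* {"pools":
*   [
*     {"id": "pool1", "quantity": 1024, "granularity": 1, "free": 1024}
*   ]
* }
*/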
static bb_pools_t *_bb_get_pools(int *num_pools, uint32_t timeout, int *out_rc)
{
int rc;
char *resp_msg = NULL;
const char *lua_func_name = req_fxns[SLURM_BB_POOLS];
bb_pools_t *pools = NULL;
data_pools_arg_t arg;
data_t *data = NULL;
data_t *data_tmp = NULL;
run_lua_args_t run_lua_args = {
.lua_func = lua_func_name,
.resp_msg = &resp_msg,
.timeout = timeout,
};
*num_pools = 0;
/* Call lua function. */
rc = _run_lua_script_wrapper(&run_lua_args);
*out_rc = rc;
if (rc != SLURM_SUCCESS) {
trigger_burst_buffer();
return NULL;
}
if (!resp_msg) {
/* This is okay - pools are not required. */
return NULL;
}
rc = serialize_g_string_to_data(&data, resp_msg, strlen(resp_msg),
MIME_TYPE_JSON);
if ((rc != SLURM_SUCCESS) || !data) {
error("%s: Problem parsing \"%s\": %s",
__func__, resp_msg, slurm_strerror(rc));
goto cleanup;
}
data_tmp = data_resolve_dict_path(data, "/pools");
if (!data_tmp || (data_get_type(data_tmp) != DATA_TYPE_LIST)) {
error("%s: Did not find pools dictionary; problem parsing \"%s\"",
__func__, resp_msg);
goto cleanup;
}
*num_pools = (int) data_get_list_length(data_tmp);
if (*num_pools == 0) {
error("%s: No pools found, problem parsing \"%s\"",
__func__, resp_msg);
goto cleanup;
}
pools = xcalloc(*num_pools, sizeof(*pools));
arg.num_pools = *num_pools;
arg.pools = pools;
arg.i = 0;
rc = data_list_for_each(data_tmp, _foreach_parse_pool, &arg);
if (rc <= 0) {
error("%s: Failed to parse pools: \"%s\"", __func__, resp_msg);
goto cleanup;
}
cleanup:
xfree(resp_msg);
FREE_NULL_DATA(data);
return pools;
}
static int _load_pools(uint32_t timeout)
{
static bool first_run = true;
bool have_new_pools = false;
int num_pools = 0, i, j, pools_inx, rc;
burst_buffer_pool_t *pool_ptr;
bb_pools_t *pools;
bitstr_t *pools_bitmap;
/* Load the pools information from burst_buffer.lua. */
pools = _bb_get_pools(&num_pools, timeout, &rc);
if (rc != SLURM_SUCCESS) {
error("Get pools returned error %d, cannot use pools unless get pools returns success",
rc);
return SLURM_ERROR;
}
if (!pools) {
/* Pools are not required. */
return SLURM_SUCCESS;
}
slurm_mutex_lock(&bb_state.bb_mutex);
pools_bitmap = bit_alloc(bb_state.bb_config.pool_cnt + num_pools);
/* Put found pools into bb_state.bb_config.pool_ptr. */
for (i = 0; i < num_pools; i++) {
bool found_pool = false;
pool_ptr = bb_state.bb_config.pool_ptr;
for (j = 0; j < bb_state.bb_config.pool_cnt; j++, pool_ptr++) {
if (!xstrcmp(pool_ptr->name, pools[i].name)) {
found_pool = true;
break;
}
}
if (!found_pool) {
have_new_pools = true;
/* This is a new pool. Add it to bb_state. */
if (!first_run)
info("Newly reported pool %s", pools[i].name);
bb_state.bb_config.pool_ptr =
xrealloc(bb_state.bb_config.pool_ptr,
sizeof(burst_buffer_pool_t) *
(bb_state.bb_config.pool_cnt + 1));
pool_ptr = bb_state.bb_config.pool_ptr +
bb_state.bb_config.pool_cnt;
pool_ptr->name = xstrdup(pools[i].name);
bb_state.bb_config.pool_cnt++;
}
pools_inx = pool_ptr - bb_state.bb_config.pool_ptr;
bit_set(pools_bitmap, pools_inx);
if (!pools[i].granularity ||
(pools[i].granularity == NO_VAL64)) {
if (first_run || !found_pool)
log_flag(BURST_BUF, "Granularity cannot be zero. Setting granularity to 1 for pool %s",
pool_ptr->name);
pools[i].granularity = 1;
}
if (pools[i].quantity == NO_VAL64) {
if (first_run || !found_pool)
log_flag(BURST_BUF, "Quantity unset for pool %s, setting to zero",
pool_ptr->name);
pools[i].quantity = 0;
}
pool_ptr->total_space = pools[i].quantity *
pools[i].granularity;
pool_ptr->granularity = pools[i].granularity;
/*
* Set unfree space. We use pool_ptr->used_space to track
* usage of pools within Slurm and this plugin also always
* updates pool_ptr->unfree_space at the same time. But we
* have unfree_space as a way for the burst buffer API to say
* that something external to Slurm is using space, or as a
* way to not allow some space to be used.
*/
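/*
* For example (illustrative numbers): quantity=100, free=90 and
* granularity=2 give unfree_space = (100 - 90) * 2 = 20, while
* total_space = 100 * 2 = 200.
*/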
if (pools[i].free != NO_VAL64) {
if (pools[i].quantity >= pools[i].free) {
pool_ptr->unfree_space =
pools[i].quantity - pools[i].free;
pool_ptr->unfree_space *= pools[i].granularity;
} else {
error("Underflow on pool=%s: Free space=%"PRIu64" bigger than quantity=%"PRIu64", setting free space equal to quantity",
pools[i].name, pools[i].free,
pools[i].quantity);
pool_ptr->unfree_space = 0;
}
} else if (!found_pool) {
/*
* Free space not specified. This is a new pool since
* found_pool==false, so set unfree space to 0. Don't
* change unfree space for pools that already exist if
* it wasn't specified.
*/
pool_ptr->unfree_space = 0;
}
}
pool_ptr = bb_state.bb_config.pool_ptr;
for (j = 0; j < bb_state.bb_config.pool_cnt; j++, pool_ptr++) {
if (bit_test(pools_bitmap, j) || (pool_ptr->total_space == 0)) {
if (have_new_pools)
log_flag(BURST_BUF, "Pool name=%s, granularity=%"PRIu64", total_space=%"PRIu64", used_space=%"PRIu64", unfree_space=%"PRIu64,
pool_ptr->name, pool_ptr->granularity,
pool_ptr->total_space,
pool_ptr->used_space,
pool_ptr->unfree_space);
continue;
}
error("Pool %s is no longer reported by the system, setting size to zero",
pool_ptr->name);
pool_ptr->total_space = 0;
pool_ptr->used_space = 0;
pool_ptr->unfree_space = 0;
}
first_run = false;
slurm_mutex_unlock(&bb_state.bb_mutex);
FREE_NULL_BITMAP(pools_bitmap);
_bb_free_pools(pools, num_pools);
return SLURM_SUCCESS;
}
/*
* Set a new job state reason if needed and update the job state reason
* description. Pass WAIT_NO_REASON as new_state to leave the existing
* state reason unchanged (the description is still updated).
*/
static void _set_job_state_desc(job_record_t *job_ptr, int new_state,
const char *fmt, ...)
{
va_list ap;
char *buf;
if (new_state != WAIT_NO_REASON) {
job_ptr->state_reason = new_state;
}
xfree(job_ptr->state_desc);
va_start(ap, fmt);
buf = vxstrfmt(fmt, ap);
va_end(ap);
if (!job_ptr->priority) {
/*
* Make it clear that the job is held, in addition to the
* rest of the reason.
*/
xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
plugin_type, job_state_reason_string(WAIT_HELD),
buf);
} else {
xstrfmtcat(job_ptr->state_desc, "%s: %s", plugin_type, buf);
}
xfree(buf);
}
static void _fail_stage(stage_args_t *stage_args, const char *op,
int rc, char *resp_msg)
{
uint32_t job_id = stage_args->job_id;
bb_job_t *bb_job = NULL;
job_record_t *job_ptr = NULL;
slurmctld_lock_t job_write_lock = { .job = WRITE_LOCK };
error("%s for JobId=%u failed, status=%d, response=%s.",
op, job_id, rc, resp_msg);
trigger_burst_buffer();
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&bb_state.bb_mutex);
job_ptr = find_job_record(job_id);
if (!job_ptr) {
error("%s: Could not find JobId=%u", __func__, job_id);
goto fini;
}
bb_update_system_comment(job_ptr, (char *) op, resp_msg, 0);
job_ptr->priority = 0; /* Hold job */
_set_job_state_desc(job_ptr, FAIL_BURST_BUFFER_OP,
"%s failed: %s", op, resp_msg);
if (bb_state.bb_config.flags & BB_FLAG_TEARDOWN_FAILURE) {
bb_job = bb_job_find(&bb_state, job_ptr->job_id);
if (bb_job)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_TEARDOWN);
_queue_teardown(job_ptr->job_id,
job_ptr->user_id,
stage_args->hurry,
job_ptr->group_id);
}
fini:
slurm_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock);
}
static int _run_post_run(stage_args_t *stage_args, init_argv_f_t init_argv,
const char *op, uint32_t job_id, uint32_t timeout,
char **resp_msg)
{
job_record_t *job_ptr = NULL;
bb_job_t *bb_job = NULL;
slurmctld_lock_t job_write_lock = { .job = WRITE_LOCK };
if (_run_lua_stage_script(stage_args, init_argv, op, job_id, timeout,
resp_msg) != SLURM_SUCCESS)
return SLURM_ERROR;
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(stage_args->job_id);
if (!job_ptr) {
error("unable to find job record for JobId=%u",
stage_args->job_id);
} else {
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job = _get_bb_job(job_ptr);
if (bb_job)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_STAGING_OUT);
slurm_mutex_unlock(&bb_state.bb_mutex);
}
unlock_slurmctld(job_write_lock);
return SLURM_SUCCESS;
}
static int _run_test_data_inout(stage_args_t *stage_args,
init_argv_f_t init_argv, const char *op,
uint32_t job_id, uint32_t timeout,
char **resp_msg)
{
time_t start_time = time(NULL);
while (true) {
bool term_flag;
slurm_mutex_lock(&bb_state.term_mutex);
term_flag = bb_state.term_flag;
slurm_mutex_unlock(&bb_state.term_mutex);
if (term_flag)
return SLURM_ERROR;
if (_run_lua_stage_script(stage_args, init_argv, op, job_id,
timeout, resp_msg) != SLURM_SUCCESS)
return SLURM_ERROR;
if (!xstrcasecmp(*resp_msg, SLURM_BB_BUSY)) {
uint64_t elapsed_time = time(NULL) - start_time;
/* Use the timeout for the entire polling period */
if (elapsed_time >= timeout) {
log_flag(BURST_BUF, "%s: Polling exceeded time limit of %u seconds",
op, timeout);
_fail_stage(stage_args, op, SLURM_ERROR,
"Poll exceeded time limit");
return SLURM_ERROR;
}
/* time_t is a long int */
log_flag(BURST_BUF, "%s: Poll elapsed time=%"PRIu64", timeout=%u seconds",
op, elapsed_time, timeout);
bb_sleep(&bb_state, bb_state.bb_config.poll_interval);
xfree(*resp_msg);
continue;
}
break;
}
return SLURM_SUCCESS;
}
static void *_start_stage_out(void *x)
{
stage_args_t *stage_out_args = x;
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
job_record_t *job_ptr;
bb_job_t *bb_job = NULL;
bb_func_t stage_out_ops[] = {
{
.init_argv = _init_data_in_argv, /* Same as data_in */
.op_type = SLURM_BB_POST_RUN,
.run_func = _run_post_run,
.timeout = bb_state.bb_config.other_timeout,
},
{
.init_argv = _init_data_in_argv, /* Same as data_in */
.op_type = SLURM_BB_DATA_OUT,
.run_func = _run_lua_stage_script,
.timeout = bb_state.bb_config.stage_out_timeout,
},
{
.init_argv = _init_data_in_argv, /* Same as data in */
.op_type = SLURM_BB_TEST_DATA_OUT,
.run_func = _run_test_data_inout,
.timeout = bb_state.bb_config.stage_out_timeout,
},
};
stage_out_args->hurry = false;
if (_run_stage_ops(stage_out_ops, ARRAY_SIZE(stage_out_ops),
stage_out_args) != SLURM_SUCCESS)
goto fini;
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(stage_out_args->job_id);
if (!job_ptr) {
error("unable to find job record for JobId=%u",
stage_out_args->job_id);
} else {
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job = _get_bb_job(job_ptr);
job_state_unset_flag(job_ptr, JOB_STAGE_OUT);
xfree(job_ptr->state_desc);
last_job_update = time(NULL);
log_flag(BURST_BUF, "Stage-out/post-run complete for %pJ",
job_ptr);
if (bb_job)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_TEARDOWN);
_queue_teardown(stage_out_args->job_id,
stage_out_args->uid, false,
stage_out_args->gid);
slurm_mutex_unlock(&bb_state.bb_mutex);
}
unlock_slurmctld(job_write_lock);
fini:
xfree(stage_out_args->job_script);
xfree(stage_out_args);
return NULL;
}
static void _queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job)
{
stage_args_t *stage_out_args;
stage_out_args = xmalloc(sizeof *stage_out_args);
stage_out_args->job_id = bb_job->job_id;
stage_out_args->uid = bb_job->user_id;
stage_out_args->gid = job_ptr->group_id;
stage_out_args->job_script = bb_handle_job_script(job_ptr, bb_job);
slurm_thread_create_detached(_start_stage_out, stage_out_args);
}
static void _pre_queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job)
{
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_POST_RUN);
job_state_set_flag(job_ptr, JOB_STAGE_OUT);
_set_job_state_desc(job_ptr, WAIT_NO_REASON, "Stage-out in progress");
_queue_stage_out(job_ptr, bb_job);
}
static void _load_state(bool init_config)
{
uint32_t timeout;
timeout = bb_state.bb_config.other_timeout;
if (_load_pools(timeout) != SLURM_SUCCESS)
return;
bb_state.last_load_time = time(NULL);
if (!init_config)
return;
/* Load allocated burst buffers from state files. */
_recover_bb_state();
_apply_limits();
bb_state.last_update_time = time(NULL);
return;
}
/* Perform periodic background activities */
static void *_bb_agent(void *args)
{
while (!bb_state.term_flag) {
bb_sleep(&bb_state, AGENT_INTERVAL);
if (!bb_state.term_flag) {
_load_state(false); /* Has own locking */
}
_save_bb_state(); /* Has own locks excluding file write */
}
/* Wait for lua threads to finish, then save state once more. */
while (_get_lua_thread_cnt())
usleep(100000); /* 100 ms */
_save_bb_state();
return NULL;
}
/*
* Copy a batch job's burst_buffer options into a separate buffer.
* Merge continued lines into a single line.
*/
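/*
* For example (illustrative arguments), these two script lines:
*
* #BB_LUA stage_in source=/scratch/in \
* #BB_LUA destination=/bb/in
*
* are merged into a single burst_buffer line:
*
* #BB_LUA stage_in source=/scratch/in destination=/bb/in
*/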
static int _xlate_batch(job_desc_msg_t *job_desc)
{
char *script, *save_ptr = NULL, *tok;
bool is_cont = false, has_space = false;
int len, rc = SLURM_SUCCESS;
xassert(directive_str);
/*
* Any command line --bb options get added to the script
*/
if (job_desc->burst_buffer) {
run_command_add_to_script(&job_desc->script,
job_desc->burst_buffer);
xfree(job_desc->burst_buffer);
}
script = xstrdup(job_desc->script);
tok = strtok_r(script, "\n", &save_ptr);
while (tok) {
if (tok[0] != '#')
break; /* Quit at first non-comment */
if (xstrncmp(tok + 1, directive_str, directive_len)) {
/* Skip lines without a burst buffer directive. */
is_cont = false;
} else {
if (is_cont) {
/*
* Continuation of the previous line. Add to
* the previous line without the newline and
* without repeating the directive.
*/
tok += directive_len + 1; /* Add 1 for '#' */
while (has_space && isspace(tok[0]))
tok++; /* Skip extra spaces */
} else if (job_desc->burst_buffer) {
xstrcat(job_desc->burst_buffer, "\n");
}
len = strlen(tok);
if (tok[len - 1] == '\\') {
/* Next line is a continuation of this line. */
has_space = isspace(tok[len - 2]);
tok[len - 1] = '\0';
is_cont = true;
} else {
is_cont = false;
}
xstrcat(job_desc->burst_buffer, tok);
}
tok = strtok_r(NULL, "\n", &save_ptr);
}
xfree(script);
if (rc != SLURM_SUCCESS)
xfree(job_desc->burst_buffer);
return rc;
}
/*
* Given a request size and a pool name, return the required buffer size
* (rounded up by granularity). If no pool name is given then return 0.
*/
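/*
* For example (illustrative numbers): with granularity 1048576, a request of
* 2500000 is rounded up to 3145728.
*/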
static uint64_t _set_granularity(uint64_t orig_size, char *bb_pool)
{
burst_buffer_pool_t *pool_ptr;
int i;
if (!bb_pool)
return 0;
for (i = 0, pool_ptr = bb_state.bb_config.pool_ptr;
i < bb_state.bb_config.pool_cnt; i++, pool_ptr++) {
if (!xstrcmp(bb_pool, pool_ptr->name)) {
if (!pool_ptr->granularity) {
/*
* This should never happen if the pools were
* initialized correctly; if it does, the pool was
* initialized incorrectly. Forcing granularity to
* 1 avoids a divide-by-zero when rounding sizes.
*/
error("%s: Invalid granularity of 0 for pool %s. Setting granularity=1.",
__func__, pool_ptr->name);
pool_ptr->granularity = 1;
}
return bb_granularity(orig_size, pool_ptr->granularity);
}
}
debug("Could not find pool %s", bb_pool);
return orig_size;
}
/*
* IN tok - a line in a burst buffer specification containing "capacity="
* IN capacity_ptr - pointer to the first character after "capacity=" within tok
* OUT pool - malloc'd string with the pool name; the caller is responsible
* for freeing it
* OUT size - return the number specified after "capacity="
*/
static int _parse_capacity(char *tok, char *capacity_ptr, char **pool,
uint64_t *size)
{
char *sub_tok;
xassert(size);
xassert(pool);
*size = bb_get_size_num(capacity_ptr, 1);
if ((sub_tok = strstr(tok, "pool="))) {
*pool = xstrdup(sub_tok + 5);
sub_tok = strchr(*pool, ' ');
if (sub_tok)
sub_tok[0] = '\0';
} else {
error("%s: Must specify pool with capacity for burst buffer",
plugin_type);
return SLURM_ERROR;
}
return SLURM_SUCCESS;
}
/* Perform basic burst_buffer option validation */
static int _parse_bb_opts(job_desc_msg_t *job_desc, uint64_t *bb_size,
uid_t submit_uid)
{
char *bb_script, *save_ptr = NULL;
char *capacity;
char *tok;
int rc = SLURM_SUCCESS;
bool have_bb = false;
xassert(bb_size);
*bb_size = 0;
if (!directive_str) {
error("%s: We don't have a directive! Can't parse burst buffer request",
__func__);
return SLURM_ERROR;
}
/*
* Combine command line options with script, and copy the script to
* job_desc->burst_buffer.
*/
if (job_desc->script)
rc = _xlate_batch(job_desc);
if ((rc != SLURM_SUCCESS) || (!job_desc->burst_buffer))
return rc;
/*
* Now validate that burst buffer was requested and get the pool and
* size if specified.
*/
bb_script = xstrdup(job_desc->burst_buffer);
tok = strtok_r(bb_script, "\n", &save_ptr);
while (tok) {
if (tok[0] != '#')
break; /* Quit at first non-comment */
tok++; /* Skip '#' */
if (xstrncmp(tok, directive_str, directive_len)) {
/* Skip lines without a burst buffer directive. */
tok = strtok_r(NULL, "\n", &save_ptr);
continue;
}
/*
* We only require that the directive is here.
* Specifying a pool is optional. Any other needed validation
* can be done by the burst_buffer.lua script.
*/
have_bb = true;
tok += directive_len; /* Skip the directive string. */
while (isspace(tok[0]))
tok++;
if ((capacity = strstr(tok, "capacity="))) {
char *tmp_pool = NULL;
uint64_t tmp_cnt = 0;
/*
* Lock bb_mutex since we iterate through pools in
* bb_state in bb_valid_pool_test() and
* _set_granularity().
*/
slurm_mutex_lock(&bb_state.bb_mutex);
if ((rc = _parse_capacity(tok, capacity + 9, &tmp_pool,
&tmp_cnt)) != SLURM_SUCCESS) {
have_bb = false;
} else if (tmp_cnt == 0) {
error("%s: Invalid capacity (must be greater than 0) in burst buffer line:%s",
plugin_type, tok);
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
} else if (!bb_valid_pool_test(&bb_state, tmp_pool)) {
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
} else
*bb_size += _set_granularity(tmp_cnt, tmp_pool);
slurm_mutex_unlock(&bb_state.bb_mutex);
xfree(tmp_pool);
if (rc != SLURM_SUCCESS)
break;
}
tok = strtok_r(NULL, "\n", &save_ptr);
}
xfree(bb_script);
if (!have_bb)
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
return rc;
}
/* Note: bb_mutex is locked on entry */
static bb_job_t *_get_bb_job(job_record_t *job_ptr)
{
char *bb_specs;
char *save_ptr = NULL, *sub_tok, *tok;
bool have_bb = false;
uint16_t new_bb_state;
bb_job_t *bb_job;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return NULL;
if ((bb_job = bb_job_find(&bb_state, job_ptr->job_id)))
return bb_job; /* Cached data */
if (!directive_str) {
error("%s: We don't have a directive! Can't parse burst buffer request",
__func__);
return NULL;
}
bb_job = bb_job_alloc(&bb_state, job_ptr->job_id);
bb_job->account = xstrdup(job_ptr->account);
if (job_ptr->part_ptr)
bb_job->partition = xstrdup(job_ptr->part_ptr->name);
if (job_ptr->qos_ptr)
bb_job->qos = xstrdup(job_ptr->qos_ptr->name);
new_bb_state = job_ptr->burst_buffer_state ?
bb_state_num(job_ptr->burst_buffer_state) : BB_STATE_PENDING;
bb_set_job_bb_state(job_ptr, bb_job, new_bb_state);
bb_job->user_id = job_ptr->user_id;
bb_specs = xstrdup(job_ptr->burst_buffer);
tok = strtok_r(bb_specs, "\n", &save_ptr);
while (tok) {
/* Skip lines that don't have a burst buffer directive. */
if ((tok[0] != '#') ||
xstrncmp(tok + 1, directive_str, directive_len)) {
tok = strtok_r(NULL, "\n", &save_ptr);
continue;
}
/*
* We only require that the directive is here.
* Specifying a pool is optional. Any other needed validation
* can be done by the burst_buffer.lua script.
*/
have_bb = true;
/*
* Is % symbol replacement required? Only done on lines with
* directive_str
*/
if (strchr(tok, (int) '%'))
bb_job->need_symbol_replacement = true;
tok += directive_len + 1; /* Add 1 for the '#' character. */
while (isspace(tok[0]))
tok++;
if ((sub_tok = strstr(tok, "capacity="))) {
char *tmp_pool = NULL;
uint64_t tmp_cnt = 0;
if (_parse_capacity(tok, sub_tok + 9, &tmp_pool,
&tmp_cnt) != SLURM_SUCCESS) {
have_bb = false;
xfree(tmp_pool);
break;
}
xfree(bb_job->job_pool);
bb_job->job_pool = tmp_pool;
tmp_cnt = _set_granularity(tmp_cnt, bb_job->job_pool);
bb_job->req_size += tmp_cnt;
bb_job->total_size += tmp_cnt;
bb_job->use_job_buf = true;
}
tok = strtok_r(NULL, "\n", &save_ptr);
}
xfree(bb_specs);
if (!have_bb) {
job_ptr->priority = 0;
_set_job_state_desc(job_ptr, FAIL_BURST_BUFFER_OP,
"Invalid burst buffer spec (%s)",
job_ptr->burst_buffer);
info("Invalid burst buffer spec for %pJ (%s)",
job_ptr, job_ptr->burst_buffer);
bb_job_del(&bb_state, job_ptr->job_id);
return NULL;
}
if (slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF)
bb_job_log(&bb_state, bb_job);
return bb_job;
}
/* Validate burst buffer configuration */
static void _test_config(void)
{
/* 24-day max time limit. (2073600 seconds) */
static uint32_t max_timeout = (60 * 60 * 24 * 24);
if (bb_state.bb_config.get_sys_state) {
error("%s: found get_sys_state which is unused in this plugin, unsetting",
plugin_type);
xfree(bb_state.bb_config.get_sys_state);
}
if (bb_state.bb_config.get_sys_status) {
error("%s: found get_sys_status which is unused in this plugin, unsetting",
plugin_type);
xfree(bb_state.bb_config.get_sys_status);
}
if (bb_state.bb_config.flags & BB_FLAG_ENABLE_PERSISTENT) {
error("%s: found flags=EnablePersistent: persistent burst buffers don't exist in this plugin, setting DisablePersistent",
plugin_type);
bb_state.bb_config.flags &= (~BB_FLAG_ENABLE_PERSISTENT);
bb_state.bb_config.flags |= BB_FLAG_DISABLE_PERSISTENT;
}
if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) {
error("%s: found flags=EmulateCray which is invalid for this plugin, unsetting",
plugin_type);
bb_state.bb_config.flags &= (~BB_FLAG_EMULATE_CRAY);
}
if (bb_state.bb_config.directive_str)
directive_str = bb_state.bb_config.directive_str;
else
directive_str = DEFAULT_DIRECTIVE_STR;
directive_len = strlen(directive_str);
if (bb_state.bb_config.default_pool) {
error("%s: found DefaultPool=%s, but DefaultPool is unused for this plugin, unsetting",
plugin_type, bb_state.bb_config.default_pool);
xfree(bb_state.bb_config.default_pool);
}
/*
* Burst buffer APIs that would use ValidateTimeout
* (slurm_bb_job_process and slurm_bb_paths) are actually called
* directly from slurmctld, not through SlurmScriptd. Because of this,
* they cannot be killed, so there is no timeout for them. Therefore,
* ValidateTimeout doesn't matter in this plugin.
*/
if (bb_state.bb_config.validate_timeout &&
(bb_state.bb_config.validate_timeout != DEFAULT_VALIDATE_TIMEOUT))
info("%s: ValidateTimeout is not used in this plugin, ignoring",
plugin_type);
/*
* Test time limits. In order to prevent overflow when converting
* the time limits in seconds to milliseconds (multiply by 1000),
* the maximum value for time limits is 2073600 seconds (24 days).
* 2073600 * 1000 is still less than the maximum 32-bit signed integer.
*/
if (bb_state.bb_config.other_timeout > max_timeout) {
error("%s: OtherTimeout=%u exceeds maximum allowed timeout=%u, setting OtherTimeout to maximum",
plugin_type, bb_state.bb_config.other_timeout,
max_timeout);
bb_state.bb_config.other_timeout = max_timeout;
}
if (bb_state.bb_config.stage_in_timeout > max_timeout) {
error("%s: StageInTimeout=%u exceeds maximum allowed timeout=%u, setting StageInTimeout to maximum",
plugin_type, bb_state.bb_config.stage_in_timeout,
max_timeout);
bb_state.bb_config.stage_in_timeout = max_timeout;
}
if (bb_state.bb_config.stage_out_timeout > max_timeout) {
error("%s: StageOutTimeout=%u exceeds maximum allowed timeout=%u, setting StageOutTimeout to maximum",
plugin_type, bb_state.bb_config.stage_out_timeout,
max_timeout);
bb_state.bb_config.stage_out_timeout = max_timeout;
}
}
extern int init(void)
{
int rc;
lua_State *L = NULL;
time_t lua_script_last_loaded = (time_t) 0;
if ((rc = slurm_lua_init()) != SLURM_SUCCESS)
return rc;
lua_script_path = get_extra_conf_path("burst_buffer.lua");
serializer_required(MIME_TYPE_JSON);
/*
* slurmscriptd calls bb_g_init() and then bb_g_run_script(). We only
* need to initialize lua to run the script. We don't want
* slurmscriptd to read from or write to the state save location, nor
* do we need slurmscriptd to load the configuration file.
*/
if (!running_in_slurmctld()) {
return SLURM_SUCCESS;
}
/* Check that the script can load successfully */
rc = slurm_lua_loadscript(&L, "burst_buffer/lua",
lua_script_path, req_fxns,
&lua_script_last_loaded, _loadscript_extra,
NULL);
if (rc != SLURM_SUCCESS)
return rc;
else
lua_close(L);
slurm_mutex_init(&lua_thread_mutex);
slurm_mutex_init(&bb_state.bb_mutex);
slurm_mutex_init(&bb_state.term_mutex);
slurm_mutex_lock(&bb_state.bb_mutex);
bb_load_config(&bb_state, (char *)plugin_type); /* Removes "const" */
_test_config();
log_flag(BURST_BUF, "");
bb_alloc_cache(&bb_state);
slurm_thread_create(&bb_state.bb_thread, _bb_agent, NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
extern void fini(void)
{
int thread_cnt, last_thread_cnt = 0;
/*
* Signal threads to stop. _bb_agent will do one more state save after
* all threads have completed.
*/
slurm_mutex_lock(&bb_state.term_mutex);
if (bb_state.term_flag) {
slurm_mutex_unlock(&bb_state.term_mutex);
return;
}
bb_state.term_flag = true;
slurm_cond_broadcast(&bb_state.term_cond);
slurm_mutex_unlock(&bb_state.term_mutex);
/* Wait for all running scripts to finish. */
while ((thread_cnt = _get_lua_thread_cnt())) {
if ((last_thread_cnt != 0) && (thread_cnt != last_thread_cnt))
info("Waiting for %d lua script threads", thread_cnt);
last_thread_cnt = thread_cnt;
usleep(100000); /* 100 ms */
}
log_flag(BURST_BUF, "");
if (bb_state.bb_thread) {
slurm_thread_join(bb_state.bb_thread);
bb_state.bb_thread = 0;
}
slurm_mutex_lock(&bb_state.bb_mutex);
bb_clear_config(&bb_state.bb_config, true);
bb_clear_cache(&bb_state);
slurm_mutex_unlock(&bb_state.bb_mutex);
slurm_mutex_destroy(&lua_thread_mutex);
slurm_lua_fini();
xfree(lua_script_path);
}
static void _free_orphan_alloc_rec(void *x)
{
bb_alloc_t *rec = (bb_alloc_t *)x;
bb_limit_rem(rec->user_id, rec->size, rec->pool, &bb_state);
(void) bb_free_alloc_rec(&bb_state, rec);
}
/*
* This function should only be called from _purge_vestigial_bufs().
* We need to reset the burst buffer state and restart any threads that may
* have been running before slurmctld was shut down, depending on the state
* that the burst buffer is in.
*/
static void _recover_job_bb(job_record_t *job_ptr, bb_alloc_t *bb_alloc,
time_t defer_time, list_t *orphan_rec_list)
{
bb_job_t *bb_job;
uint16_t job_bb_state = bb_state_num(job_ptr->burst_buffer_state);
/*
* Call _get_bb_job() to create a cache of the job's burst buffer info,
* including the state. Lots of functions will call this so do it now to
* create the cache, and we may need to change the burst buffer state.
* The job burst buffer state is set in job_ptr and in bb_job.
*/
bb_job = _get_bb_job(job_ptr);
if (!bb_job) {
/* This shouldn't happen. */
error("%s: %pJ does not have a burst buffer specification, tearing down vestigial burst buffer.",
__func__, job_ptr);
_queue_teardown(bb_alloc->job_id, bb_alloc->user_id, false,
bb_alloc->group_id);
return;
}
switch(job_bb_state) {
/*
* First 4 states are specific to persistent burst buffers,
* which aren't used in burst_buffer/lua.
*/
case BB_STATE_ALLOCATING:
case BB_STATE_ALLOCATED:
case BB_STATE_DELETING:
case BB_STATE_DELETED:
error("%s: Unexpected burst buffer state %s for %pJ",
__func__, job_ptr->burst_buffer_state, job_ptr);
break;
/* Pending states for jobs: */
case BB_STATE_STAGING_IN:
case BB_STATE_STAGED_IN:
case BB_STATE_ALLOC_REVOKE:
/*
* We do not know the state of staging,
* so teardown the buffer and defer the job
* for at least 60 seconds (for the teardown).
* Also set the burst buffer state back to PENDING.
*/
log_flag(BURST_BUF, "Purging buffer for pending %pJ",
job_ptr);
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_TEARDOWN);
_queue_teardown(bb_alloc->job_id,
bb_alloc->user_id, true,
bb_alloc->group_id);
if (job_ptr->details &&
(job_ptr->details->begin_time < defer_time)) {
job_ptr->details->begin_time = defer_time;
}
break;
/* Running states for jobs: */
case BB_STATE_PRE_RUN:
/*
* slurmctld will call bb_g_job_begin() which will
* handle burst buffers in this state.
*/
break;
case BB_STATE_RUNNING:
case BB_STATE_SUSPEND:
/* Nothing to do here. */
break;
case BB_STATE_POST_RUN:
case BB_STATE_STAGING_OUT:
case BB_STATE_STAGED_OUT:
log_flag(BURST_BUF, "Restarting burst buffer stage out for %pJ",
job_ptr);
/*
* _pre_queue_stage_out() sets the burst buffer state
* correctly and restarts the needed thread.
*/
_pre_queue_stage_out(job_ptr, bb_job);
break;
case BB_STATE_TEARDOWN:
case BB_STATE_TEARDOWN_FAIL:
log_flag(BURST_BUF, "Restarting burst buffer teardown for %pJ",
job_ptr);
_queue_teardown(bb_alloc->job_id,
bb_alloc->user_id, false,
bb_alloc->group_id);
break;
case BB_STATE_COMPLETE:
/*
* We shouldn't get here since the bb_alloc record is
* removed when the job's bb state is set to
* BB_STATE_COMPLETE during teardown.
*/
log_flag(BURST_BUF, "Clearing burst buffer for completed job %pJ",
job_ptr);
list_append(orphan_rec_list, bb_alloc);
break;
default:
error("%s: Invalid job burst buffer state %s for %pJ",
__func__, job_ptr->burst_buffer_state, job_ptr);
break;
}
}
/*
* Identify and purge any vestigial buffers (i.e. we have a job buffer, but
* the matching job is either gone or completed OR we have a job buffer and a
* pending job, but don't know the status of stage-in)
*/
static void _purge_vestigial_bufs(void)
{
list_t *orphan_rec_list = list_create(_free_orphan_alloc_rec);
bb_alloc_t *bb_alloc = NULL;
time_t defer_time = time(NULL) + 60;
int i;
for (i = 0; i < BB_HASH_SIZE; i++) {
bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
job_record_t *job_ptr = NULL;
if (bb_alloc->job_id == 0) {
/* This should not happen */
error("Burst buffer without a job found, removing buffer.");
list_append(orphan_rec_list, bb_alloc);
} else if (!(job_ptr =
find_job_record(bb_alloc->job_id))) {
info("Purging vestigial buffer for JobId=%u",
bb_alloc->job_id);
_queue_teardown(bb_alloc->job_id,
bb_alloc->user_id, false,
bb_alloc->group_id);
} else {
_recover_job_bb(job_ptr, bb_alloc, defer_time,
orphan_rec_list);
}
bb_alloc = bb_alloc->next;
}
}
FREE_NULL_LIST(orphan_rec_list);
}
static bool _is_directive(char *tok)
{
xassert(directive_str);
if ((tok[0] == '#') && !xstrncmp(tok + 1, directive_str, directive_len))
return true;
return false;
}
extern char *bb_p_build_het_job_script(char *script, uint32_t het_job_offset)
{
return bb_common_build_het_job_script(script, het_job_offset,
_is_directive);
}
/*
* Return the total burst buffer size in MB
*/
extern uint64_t bb_p_get_system_size(void)
{
uint64_t size = 0;
/*
* Add up the space of all the pools.
* Don't add bb_state.total_space - it is always zero since we don't
* use DefaultPool in this plugin.
* Even though the pools in this plugin are really unitless and can
* be used for a lot more than just "bytes", we have to convert to MB
* to satisfy the burst buffer plugin API.
*/
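/*
 * Worked example (hypothetical pool sizes): two pools with total_space of
 * 1073741824 and 2147483648 sum to 3221225472, which the division below
 * reports as 3072 MB.
 */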
slurm_mutex_lock(&bb_state.bb_mutex);
for (int i = 0; i < bb_state.bb_config.pool_cnt; i++) {
size += bb_state.bb_config.pool_ptr[i].total_space;
}
slurm_mutex_unlock(&bb_state.bb_mutex);
size /= (1024 * 1024); /* to MB */
return size;
}
/*
* Load the current burst buffer state (e.g. how much space is available now).
* Run at the beginning of each scheduling cycle in order to recognize external
* changes to the burst buffer state (e.g. capacity is added, removed, fails,
* etc.)
*
* init_config IN - true if called as part of slurmctld initialization
* Returns a Slurm errno.
*/
extern int bb_p_load_state(bool init_config)
{
if (!init_config)
return SLURM_SUCCESS;
log_flag(BURST_BUF, "");
_load_state(init_config); /* Has own locking */
slurm_mutex_lock(&bb_state.bb_mutex);
bb_set_tres_pos(&bb_state);
_purge_vestigial_bufs();
slurm_mutex_unlock(&bb_state.bb_mutex);
_save_bb_state(); /* Has own locks excluding file write */
return SLURM_SUCCESS;
}
/*
* Return string containing current burst buffer status
* argc IN - count of status command arguments
* argv IN - status command arguments
* uid - authenticated UID
* gid - authenticated GID
* RET status string, release memory using xfree()
*/
extern char *bb_p_get_status(uint32_t argc, char **argv, uint32_t uid,
uint32_t gid)
{
char **pass_argv;
char *status_resp = NULL;
uint32_t pass_argc;
run_lua_args_t run_lua_args;
pass_argc = argc + 2;
pass_argv = xcalloc(pass_argc + 1, sizeof(char *));
pass_argv[0] = xstrdup_printf("%u", uid);
pass_argv[1] = xstrdup_printf("%u", gid);
for (int i = 0; i < argc; i++)
pass_argv[i + 2] = xstrdup(argv[i]);
memset(&run_lua_args, 0, sizeof run_lua_args);
run_lua_args.argc = pass_argc;
run_lua_args.argv = pass_argv;
run_lua_args.lua_func = req_fxns[SLURM_BB_GET_STATUS];
run_lua_args.resp_msg = &status_resp;
run_lua_args.timeout = bb_state.bb_config.other_timeout;
run_lua_args.with_scriptd = true;
if (_run_lua_script_wrapper(&run_lua_args) != SLURM_SUCCESS) {
xfree(status_resp);
status_resp = xstrdup("Error running slurm_bb_get_status\n");
}
xfree_array(pass_argv);
return status_resp;
}
/*
* Note configuration may have changed. Handle changes in BurstBufferParameters.
*
* Returns a Slurm errno.
*/
extern int bb_p_reconfig(void)
{
/* read locks on assoc */
assoc_mgr_lock_t assoc_locks =
{ .assoc = READ_LOCK, .qos = READ_LOCK, .user = READ_LOCK };
assoc_mgr_lock(&assoc_locks);
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "");
bb_load_config(&bb_state, (char *) plugin_type); /* Remove "const" */
_test_config();
/* Reconfig is where we make sure the assoc_mgr pointers are correct */
for (int i = 0; i < BB_HASH_SIZE; i++) {
bb_alloc_t *bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
_set_assoc_mgr_ptrs(bb_alloc);
bb_alloc = bb_alloc->next;
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
assoc_mgr_unlock(&assoc_locks);
return SLURM_SUCCESS;
}
/*
* Pack current burst buffer state information for network transmission to
* user (e.g. "scontrol show burst")
*
* Returns a Slurm errno.
*/
extern int bb_p_state_pack(uid_t uid, buf_t *buffer, uint16_t protocol_version)
{
uint32_t rec_count = 0;
slurm_mutex_lock(&bb_state.bb_mutex);
packstr(bb_state.name, buffer);
bb_pack_state(&bb_state, buffer, protocol_version);
if (((bb_state.bb_config.flags & BB_FLAG_PRIVATE_DATA) == 0) ||
validate_operator(uid))
uid = 0; /* User can see all data */
rec_count = bb_pack_bufs(uid, &bb_state, buffer, protocol_version);
(void) bb_pack_usage(uid, &bb_state, buffer, protocol_version);
log_flag(BURST_BUF, "record_count:%u", rec_count);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
/*
* Preliminary validation of a job submit request with respect to burst buffer
* options. Performed after setting default account + qos, but prior to
* establishing job ID or creating script file.
*
* Returns a Slurm errno.
*/
extern int bb_p_job_validate(job_desc_msg_t *job_desc, uid_t submit_uid,
char **err_msg)
{
uint64_t bb_size = 0;
int rc;
xassert(job_desc);
xassert(job_desc->tres_req_cnt);
xassert(err_msg);
rc = _parse_bb_opts(job_desc, &bb_size, submit_uid);
if (rc != SLURM_SUCCESS)
return rc;
if ((job_desc->burst_buffer == NULL) ||
(job_desc->burst_buffer[0] == '\0'))
return rc;
log_flag(BURST_BUF, "job_user_id:%u, submit_uid:%u",
job_desc->user_id, submit_uid);
log_flag(BURST_BUF, "burst_buffer:\n%s",
job_desc->burst_buffer);
if (job_desc->user_id == 0) {
info("User root can not allocate burst buffers");
*err_msg = xstrdup("User root can not allocate burst buffers");
return ESLURM_BURST_BUFFER_PERMISSION;
}
slurm_mutex_lock(&bb_state.bb_mutex);
if (bb_state.bb_config.allow_users) {
bool found_user = false;
for (int i = 0; bb_state.bb_config.allow_users[i]; i++) {
if (job_desc->user_id ==
bb_state.bb_config.allow_users[i]) {
found_user = true;
break;
}
}
if (!found_user) {
*err_msg = xstrdup("User not found in AllowUsers");
rc = ESLURM_BURST_BUFFER_PERMISSION;
goto fini;
}
}
if (bb_state.bb_config.deny_users) {
bool found_user = false;
for (int i = 0; bb_state.bb_config.deny_users[i]; i++) {
if (job_desc->user_id ==
bb_state.bb_config.deny_users[i]) {
found_user = true;
break;
}
}
if (found_user) {
*err_msg = xstrdup("User found in DenyUsers");
rc = ESLURM_BURST_BUFFER_PERMISSION;
goto fini;
}
}
if (bb_state.tres_pos > 0) {
job_desc->tres_req_cnt[bb_state.tres_pos] =
bb_size / (1024 * 1024);
}
fini:
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/*
* Secondary validation of a job submit request with respect to burst buffer
* options. Performed after establishing job ID and creating script file.
*
* Returns a Slurm errno.
*/
extern int bb_p_job_validate2(job_record_t *job_ptr, char **err_msg)
{
const char *lua_func_name = req_fxns[SLURM_BB_JOB_PROCESS];
uint32_t argc;
char **argv;
int rc = SLURM_SUCCESS, fd = -1, hash_inx;
char *hash_dir = NULL, *job_dir = NULL, *script_file = NULL;
char *task_script_file = NULL, *resp_msg = NULL;
bool using_master_script = false;
bb_job_t *bb_job;
run_lua_args_t run_lua_args;
/* Initialization */
slurm_mutex_lock(&bb_state.bb_mutex);
if (bb_state.last_load_time == 0) {
/* Assume request is valid for now, can't test it anyway */
info("Burst buffer down, skip tests for %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
bb_job = _get_bb_job(job_ptr);
if (bb_job == NULL) {
/* No burst buffer specification */
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
if (job_ptr->details->min_nodes == 0) {
/*
* Since persistent burst buffers aren't allowed in this
* plugin, 0-node jobs are never allowed to have burst buffers.
*/
slurm_mutex_unlock(&bb_state.bb_mutex);
return ESLURM_INVALID_BURST_BUFFER_REQUEST;
}
log_flag(BURST_BUF, "%pJ", job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
/* Standard file location for job arrays */
if ((job_ptr->array_task_id != NO_VAL) &&
(job_ptr->array_job_id != job_ptr->job_id)) {
hash_inx = job_ptr->array_job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir,
job_ptr->array_job_id);
(void) mkdir(job_dir, 0700);
xstrfmtcat(script_file, "%s/script", job_dir);
fd = open(script_file, 0);
if (fd >= 0) { /* found the script */
close(fd);
using_master_script = true;
} else {
xfree(hash_dir);
}
} else {
hash_inx = job_ptr->job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
(void) mkdir(job_dir, 0700);
xstrfmtcat(script_file, "%s/script", job_dir);
if (job_ptr->batch_flag == 0) {
rc = bb_build_bb_script(job_ptr, script_file);
if (rc != SLURM_SUCCESS) {
/*
* There was an error writing to the script,
* and that error was logged by
* bb_build_bb_script(). Bail out now.
*/
goto fini;
}
}
}
/* Run "job_process" function, validates user script */
argc = 3;
argv = xcalloc(argc + 1, sizeof(char *));
argv[0] = xstrdup_printf("%s", script_file);
argv[1] = xstrdup_printf("%u", job_ptr->user_id);
argv[2] = xstrdup_printf("%u", job_ptr->group_id);
memset(&run_lua_args, 0, sizeof run_lua_args);
run_lua_args.argc = argc;
run_lua_args.argv = argv;
run_lua_args.get_job_ptr = true;
run_lua_args.have_job_lock = true;
run_lua_args.job_id = job_ptr->job_id;
run_lua_args.job_ptr = job_ptr;
run_lua_args.lua_func = lua_func_name;
run_lua_args.resp_msg = &resp_msg;
rc = _run_lua_script_wrapper(&run_lua_args);
xfree_array(argv);
if (rc) {
if (err_msg && resp_msg) {
xfree(*err_msg);
xstrfmtcat(*err_msg, "%s: %s",
plugin_type, resp_msg);
}
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
}
xfree(resp_msg);
fini:
/* Clean up */
xfree(hash_dir);
xfree(job_dir);
if (rc != SLURM_SUCCESS) {
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job_del(&bb_state, job_ptr->job_id);
slurm_mutex_unlock(&bb_state.bb_mutex);
} else if (using_master_script) {
/*
* Job arrays need to have script file in the "standard"
* location for the remaining logic. Make hard link.
*/
hash_inx = job_ptr->job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
xfree(hash_dir);
(void) mkdir(job_dir, 0700);
xstrfmtcat(task_script_file, "%s/script", job_dir);
xfree(job_dir);
if ((link(script_file, task_script_file) != 0) &&
(errno != EEXIST)) {
error("%s: link(%s,%s): %m",
__func__, script_file, task_script_file);
}
}
xfree(task_script_file);
xfree(script_file);
return rc;
}
/*
 * Fill in the tres_cnt (in MB) based on the job record
 * NOTE: Based upon job-specific burst buffers, excludes persistent buffers
 * IN job_ptr - job record
 * IN/OUT tres_cnt - fill in this already allocated array with tres_cnts
 * IN locked - true if the assoc_mgr tres read lock is already held
*/
extern void bb_p_job_set_tres_cnt(job_record_t *job_ptr, uint64_t *tres_cnt,
bool locked)
{
bb_job_t *bb_job;
if (!tres_cnt) {
error("No tres_cnt given when looking at %pJ",
job_ptr);
}
if (bb_state.tres_pos < 0) {
/* BB not defined in AccountingStorageTRES */
return;
}
slurm_mutex_lock(&bb_state.bb_mutex);
if ((bb_job = _get_bb_job(job_ptr))) {
tres_cnt[bb_state.tres_pos] =
bb_job->total_size / (1024 * 1024);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
}
/*
 * For a given job, return our best guess as to when it might be able to start
*/
extern time_t bb_p_job_get_est_start(job_record_t *job_ptr)
{
time_t est_start = time(NULL);
bb_job_t *bb_job;
int rc;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return est_start;
if (job_ptr->array_recs &&
((job_ptr->array_task_id == NO_VAL) ||
(job_ptr->array_task_id == INFINITE))) {
/* Can't operate on job array. Guess 5 minutes. */
est_start += 300;
return est_start;
}
slurm_mutex_lock(&bb_state.bb_mutex);
if (bb_state.last_load_time == 0) {
/*
* The plugin hasn't successfully loaded yet, so we can't know.
* Guess 1 hour.
*/
est_start += 3600;
slurm_mutex_unlock(&bb_state.bb_mutex);
return est_start;
}
if (!(bb_job = _get_bb_job(job_ptr))) {
/* No bb_job record; we can't know. */
slurm_mutex_unlock(&bb_state.bb_mutex);
return est_start;
}
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_job->state == BB_STATE_PENDING) {
if (bb_job->job_pool && bb_job->req_size)
rc = bb_test_size_limit(job_ptr, bb_job, &bb_state,
NULL);
else
rc = 0;
if (rc == 0) { /* Could start now. */
;
} else if (rc == 1) { /* Exceeds configured limits */
est_start += 365 * 24 * 60 * 60;
} else {
est_start = MAX(est_start, bb_state.next_end_time);
}
} else {
/* Allocation or staging already in progress; it should start very soon */
est_start++;
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return est_start;
}
/*
* If the job (x) should be allocated a burst buffer, add it to the
* job_candidates list (arg).
*/
static int _identify_bb_candidate(void *x, void *arg)
{
job_record_t *job_ptr = (job_record_t *) x;
list_t *job_candidates = arg;
bb_job_t *bb_job;
bb_job_queue_rec_t *job_rec;
if (!IS_JOB_PENDING(job_ptr) || (job_ptr->start_time == 0) ||
(job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return SLURM_SUCCESS;
if (job_ptr->array_recs &&
((job_ptr->array_task_id == NO_VAL) ||
(job_ptr->array_task_id == INFINITE)))
return SLURM_SUCCESS; /* Can't operate on job array struct */
bb_job = _get_bb_job(job_ptr);
if (bb_job == NULL)
return SLURM_SUCCESS;
if (bb_job->state == BB_STATE_COMPLETE) {
/* Job requeued or slurmctld restarted during stage-in */
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_PENDING);
} else if (bb_job->state >= BB_STATE_POST_RUN) {
/* Requeued job still staging out */
return SLURM_SUCCESS;
}
job_rec = xmalloc(sizeof(bb_job_queue_rec_t));
job_rec->job_ptr = job_ptr;
job_rec->bb_job = bb_job;
list_push(job_candidates, job_rec);
return SLURM_SUCCESS;
}
/*
* Purge files we have created for the job.
* bb_state.bb_mutex is locked on function entry.
* job_ptr may be NULL if not found
*/
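/*
 * Illustrative paths removed for a hypothetical JobId=123 with
 * StateSaveLocation=/var/spool/slurmctld:
 *   /var/spool/slurmctld/hash.3/job.123/pathfile
 *   /var/spool/slurmctld/hash.3/job.123/script  (only if the job is gone or
 *                                                is not a batch job)
 */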
static void _purge_bb_files(uint32_t job_id, job_record_t *job_ptr)
{
char *hash_dir = NULL, *job_dir = NULL;
char *script_file = NULL, *path_file = NULL;
int hash_inx;
hash_inx = job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_id);
(void) mkdir(job_dir, 0700);
xstrfmtcat(path_file, "%s/pathfile", job_dir);
(void) unlink(path_file);
xfree(path_file);
if (!job_ptr || (job_ptr->batch_flag == 0)) {
xstrfmtcat(script_file, "%s/script", job_dir);
(void) unlink(script_file);
xfree(script_file);
}
(void) unlink(job_dir);
xfree(job_dir);
xfree(hash_dir);
}
static void *_start_teardown(void *x)
{
int rc, retry_count = 0;
uint32_t timeout, argc;
char *resp_msg = NULL;
char **argv;
bool track_script_signal = false;
stage_args_t *teardown_args = x;
job_record_t *job_ptr;
bb_alloc_t *bb_alloc = NULL;
bb_job_t *bb_job = NULL;
/* Locks: write job */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
run_lua_args_t run_lua_args;
argc = 5;
argv = xcalloc(argc + 1, sizeof(char *)); /* NULL-terminated */
argv[0] = xstrdup_printf("%u", teardown_args->job_id);
argv[1] = xstrdup_printf("%s", teardown_args->job_script);
argv[2] = xstrdup_printf("%s", teardown_args->hurry ? "true" : "false");
argv[3] = xstrdup_printf("%u", teardown_args->uid);
argv[4] = xstrdup_printf("%u", teardown_args->gid);
timeout = bb_state.bb_config.other_timeout;
memset(&run_lua_args, 0, sizeof run_lua_args);
run_lua_args.argc = argc;
run_lua_args.argv = argv;
run_lua_args.job_id = teardown_args->job_id;
run_lua_args.lua_func = req_fxns[SLURM_BB_JOB_TEARDOWN];
run_lua_args.resp_msg = &resp_msg;
run_lua_args.timeout = timeout;
run_lua_args.track_script_signal = &track_script_signal;
run_lua_args.with_scriptd = true;
/* Run lua "teardown" function */
while (1) {
rc = _run_lua_script_wrapper(&run_lua_args);
if (track_script_signal) {
/* Killed by slurmctld, exit now. */
info("teardown for JobId=%u terminated by slurmctld",
teardown_args->job_id);
goto fini;
}
if (rc != SLURM_SUCCESS) {
int sleep_time = 10; /* Arbitrary time */
/*
* To prevent an infinite loop of teardown failures,
* limit the number of times we retry teardown and
* sleep in between tries.
* Give up trying teardown if it fails after retrying
* a certain number of times.
*/
trigger_burst_buffer();
if (retry_count >= MAX_RETRY_CNT) {
error("Teardown for JobId=%u failed %d times. We won't retry teardown anymore. Removing burst buffer.",
teardown_args->job_id, retry_count);
break;
} else {
error("Teardown for JobId=%u failed. status: %d, response: %s. Retrying after %d seconds. Current retry count=%d, max retries=%d",
teardown_args->job_id, rc, resp_msg,
sleep_time, retry_count, MAX_RETRY_CNT);
retry_count++;
lock_slurmctld(job_write_lock);
job_ptr =
find_job_record(teardown_args->job_id);
if (job_ptr) {
_set_job_state_desc(
job_ptr, FAIL_BURST_BUFFER_OP,
"teardown failed: %s",
resp_msg);
bb_update_system_comment(job_ptr,
"teardown",
resp_msg, 0);
}
unlock_slurmctld(job_write_lock);
sleep(sleep_time);
}
} else {
break; /* Success, break out of loop */
}
}
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&bb_state.bb_mutex);
job_ptr = find_job_record(teardown_args->job_id);
_purge_bb_files(teardown_args->job_id, job_ptr);
if (job_ptr) {
if ((bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))){
bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state);
(void) bb_free_alloc_rec(&bb_state, bb_alloc);
}
if ((bb_job = _get_bb_job(job_ptr)))
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_COMPLETE);
job_state_unset_flag(job_ptr, JOB_STAGE_OUT);
if (!IS_JOB_PENDING(job_ptr) && /* No email if requeue */
(job_ptr->mail_type & MAIL_JOB_STAGE_OUT)) {
mail_job_info(job_ptr, MAIL_JOB_STAGE_OUT);
job_ptr->mail_type &= (~MAIL_JOB_STAGE_OUT);
}
} else {
/*
* This will happen when slurmctld restarts and needs
* to clear vestigial buffers
*/
char buf_name[32];
snprintf(buf_name, sizeof(buf_name), "%u",
teardown_args->job_id);
bb_alloc = bb_find_name_rec(buf_name,
teardown_args->uid,
&bb_state);
if (bb_alloc) {
bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state);
(void) bb_free_alloc_rec(&bb_state, bb_alloc);
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock);
fini:
xfree(resp_msg);
xfree(teardown_args->job_script);
xfree(teardown_args);
xfree_array(argv);
return NULL;
}
static void _queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry,
uint32_t group_id)
{
char *hash_dir = NULL, *job_script = NULL;
int hash_inx = job_id % 10;
struct stat buf;
stage_args_t *teardown_args;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
xstrfmtcat(job_script, "%s/job.%u/script", hash_dir, job_id);
if (stat(job_script, &buf) == -1) {
int fd = creat(job_script, 0755);
if (fd >= 0) {
int len;
char *dummy_script = "#!/bin/bash\nexit 0\n";
len = strlen(dummy_script) + 1;
if (write(fd, dummy_script, len) != len) {
verbose("%s: write(%s): %m",
__func__, job_script);
}
close(fd);
}
}
teardown_args = xmalloc(sizeof *teardown_args);
teardown_args->job_id = job_id;
teardown_args->uid = user_id;
teardown_args->gid = group_id;
teardown_args->job_script = job_script;
teardown_args->hurry = hurry;
slurm_thread_create_detached(_start_teardown, teardown_args);
xfree(hash_dir);
}
static int _run_real_size(stage_args_t *stage_args, init_argv_f_t init_argv,
const char *op, uint32_t job_id, uint32_t timeout,
char **resp_msg)
{
bool get_real_size = false;
uint64_t real_size = 0;
bb_job_t *bb_job = NULL;
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job = bb_job_find(&bb_state, stage_args->job_id);
if (bb_job && bb_job->req_size)
get_real_size = true;
slurm_mutex_unlock(&bb_state.bb_mutex);
if (!get_real_size)
return SLURM_SUCCESS;
if (_run_lua_stage_script(stage_args, init_argv, op, job_id, timeout,
resp_msg) != SLURM_SUCCESS)
return SLURM_ERROR;
if (*resp_msg && (**resp_msg != '\0')) {
char *end_ptr = NULL;
real_size = strtoull(*resp_msg, &end_ptr, 10);
if ((real_size == ULLONG_MAX) || (end_ptr[0] != '\0')) {
error("%s return value=\"%s\" is invalid, discarding result",
op, *resp_msg);
real_size = 0;
}
stage_args->bb_size = real_size;
}
return SLURM_SUCCESS;
}
static void _init_real_size_argv(stage_args_t *stage_args, int *argc_p,
char ***argv_p)
{
char **argv = NULL;
int argc;
argc = 3;
argv = xcalloc(argc + 1, sizeof(char *)); /* NULL terminated */
argv[0] = xstrdup_printf("%u", stage_args->job_id);
argv[1] = xstrdup_printf("%u", stage_args->uid);
argv[2] = xstrdup_printf("%u", stage_args->gid);
*argc_p = argc;
*argv_p = argv;
}
static void _init_data_in_argv(stage_args_t *stage_args, int *argc_p,
char ***argv_p)
{
char **argv = NULL;
int argc;
argc = 4;
argv = xcalloc(argc + 1, sizeof (char *)); /* NULL-terminated */
argv[0] = xstrdup_printf("%u", stage_args->job_id);
argv[1] = xstrdup_printf("%s", stage_args->job_script);
argv[2] = xstrdup_printf("%u", stage_args->uid);
argv[3] = xstrdup_printf("%u", stage_args->gid);
*argc_p = argc;
*argv_p = argv;
}
static void _init_setup_argv(stage_args_t *stage_args, int *argc_p,
char ***argv_p)
{
char **argv = NULL;
int argc;
argc = 6;
argv = xcalloc(argc + 1, sizeof (char *)); /* NULL-terminated */
argv[0] = xstrdup_printf("%u", stage_args->job_id);
argv[1] = xstrdup_printf("%u", stage_args->uid);
argv[2] = xstrdup_printf("%u", stage_args->gid);
argv[3] = xstrdup_printf("%s", stage_args->pool);
argv[4] = xstrdup_printf("%"PRIu64, stage_args->bb_size);
argv[5] = xstrdup_printf("%s", stage_args->job_script);
*argc_p = argc;
*argv_p = argv;
}
static void *_start_stage_in(void *x)
{
stage_args_t *stage_in_args = x;
uint64_t real_size = 0;
uint64_t orig_real_size = stage_in_args->bb_size;
job_record_t *job_ptr;
slurmctld_lock_t job_write_lock = { .job = WRITE_LOCK };
bb_func_t stage_in_ops[] = {
{
.init_argv = _init_setup_argv,
.op_type = SLURM_BB_SETUP,
.run_func = _run_lua_stage_script,
.timeout = bb_state.bb_config.other_timeout,
},
{
.init_argv = _init_data_in_argv,
.op_type = SLURM_BB_DATA_IN,
.run_func = _run_lua_stage_script,
.timeout = bb_state.bb_config.stage_in_timeout,
},
{
.init_argv = _init_data_in_argv, /* Same as data in */
.op_type = SLURM_BB_TEST_DATA_IN,
.run_func = _run_test_data_inout,
.timeout = bb_state.bb_config.stage_in_timeout,
},
{
.init_argv = _init_real_size_argv,
.op_type = SLURM_BB_REAL_SIZE,
.run_func = _run_real_size,
.timeout = bb_state.bb_config.stage_in_timeout,
},
};
stage_in_args->hurry = true;
if (_run_stage_ops(stage_in_ops, ARRAY_SIZE(stage_in_ops),
stage_in_args) != SLURM_SUCCESS)
goto fini;
real_size = stage_in_args->bb_size; /* Updated by _run_real_size */
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&bb_state.bb_mutex);
job_ptr = find_job_record(stage_in_args->job_id);
if (!job_ptr) {
error("unable to find job record for JobId=%u",
stage_in_args->job_id);
} else {
bb_job_t *bb_job;
bb_alloc_t *bb_alloc = NULL;
bb_job = bb_job_find(&bb_state, stage_in_args->job_id);
if (bb_job)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_STAGED_IN);
if (bb_job && bb_job->total_size) {
/*
* Adjust total size to real size if real size
* returns something bigger.
*/
if (real_size > bb_job->req_size) {
log_flag(BURST_BUF, "%pJ total_size increased from %"PRIu64" to %"PRIu64,
job_ptr, bb_job->req_size, real_size);
bb_job->total_size = real_size;
bb_limit_rem(stage_in_args->uid,
orig_real_size,
stage_in_args->pool, &bb_state);
/* Restore limit based upon actual size. */
bb_limit_add(stage_in_args->uid,
bb_job->total_size,
stage_in_args->pool, &bb_state,
true);
}
bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
if (bb_alloc) {
if (bb_alloc->size != bb_job->total_size) {
/*
* bb_alloc is state saved, so we need
* to update bb_alloc in case slurmctld
* restarts.
*/
bb_alloc->size = bb_job->total_size;
bb_state.last_update_time = time(NULL);
}
} else {
error("unable to find bb_alloc record for %pJ",
job_ptr);
}
}
log_flag(BURST_BUF, "Setup/stage-in complete for %pJ", job_ptr);
queue_job_scheduler();
}
slurm_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock);
fini:
xfree(stage_in_args->job_script);
xfree(stage_in_args->pool);
xfree(stage_in_args);
return NULL;
}
static int _queue_stage_in(job_record_t *job_ptr, bb_job_t *bb_job)
{
char *hash_dir = NULL, *job_dir = NULL;
int hash_inx = job_ptr->job_id % 10;
stage_args_t *stage_in_args;
bb_alloc_t *bb_alloc = NULL;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
stage_in_args = xmalloc(sizeof *stage_in_args);
stage_in_args->job_id = job_ptr->job_id;
stage_in_args->uid = job_ptr->user_id;
stage_in_args->gid = job_ptr->group_id;
if (bb_job->job_pool)
stage_in_args->pool = xstrdup(bb_job->job_pool);
else
stage_in_args->pool = NULL;
stage_in_args->bb_size = bb_job->total_size;
stage_in_args->job_script = bb_handle_job_script(job_ptr, bb_job);
/*
* Create bb allocation for the job now. Check if it has already been
* created (perhaps it was created but then slurmctld restarted).
* bb_alloc is the structure that is state saved.
 * If we waited until the _start_stage_in thread runs to create bb_alloc,
 * we would introduce a race condition where the thread could be killed
 * (if slurmctld is shut down) before it creates bb_alloc. That race
 * would mean the burst buffer isn't state saved.
*/
if (!(bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))) {
bb_alloc = bb_alloc_job(&bb_state, job_ptr, bb_job);
bb_alloc->create_time = time(NULL);
}
bb_limit_add(job_ptr->user_id, bb_job->total_size, bb_job->job_pool,
&bb_state, true);
slurm_thread_create_detached(_start_stage_in, stage_in_args);
xfree(hash_dir);
xfree(job_dir);
return SLURM_SUCCESS;
}
static void _alloc_job_bb(job_record_t *job_ptr, bb_job_t *bb_job,
bool job_ready)
{
log_flag(BURST_BUF, "start job allocate %pJ", job_ptr);
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_STAGING_IN);
_queue_stage_in(job_ptr, bb_job);
}
static int _try_alloc_job_bb(void *x, void *arg)
{
bb_job_queue_rec_t *job_rec = (bb_job_queue_rec_t *) x;
job_record_t *job_ptr = job_rec->job_ptr;
bb_job_t *bb_job = job_rec->bb_job;
int rc;
if (bb_job->state >= BB_STATE_STAGING_IN)
return SLURM_SUCCESS; /* Job was already allocated a buffer */
if (bb_job->job_pool && bb_job->req_size)
rc = bb_test_size_limit(job_ptr, bb_job, &bb_state, NULL);
else
rc = 0;
if (rc == 0) {
/*
* Job could start now. Allocate burst buffer and continue to
* the next job.
*/
_alloc_job_bb(job_ptr, bb_job, true);
rc = SLURM_SUCCESS;
} else if (rc == 1) /* Exceeds configured limits, try next job */
rc = SLURM_SUCCESS;
else /* No space currently available, break out of loop */
rc = SLURM_ERROR;
return rc;
}
/*
* Attempt to allocate resources and begin file staging for pending jobs.
*/
extern int bb_p_job_try_stage_in(list_t *job_queue)
{
list_t *job_candidates;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "Mutex locked");
if (bb_state.last_load_time == 0) {
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
/* Identify candidates to be allocated burst buffers */
job_candidates = list_create(xfree_ptr);
list_for_each(job_queue, _identify_bb_candidate, job_candidates);
/* Sort in order of expected start time */
list_sort(job_candidates, bb_job_queue_sort);
/* Try to allocate burst buffers for these jobs. */
list_for_each(job_candidates, _try_alloc_job_bb, NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
FREE_NULL_LIST(job_candidates);
return SLURM_SUCCESS;
}
/*
* Determine if a job's burst buffer stage-in is complete
* job_ptr IN - Job to test
* test_only IN - If false, then attempt to allocate burst buffer if possible
*
* RET: 0 - stage-in is underway
* 1 - stage-in complete
* -1 - stage-in not started or burst buffer in some unexpected state
*/
extern int bb_p_job_test_stage_in(job_record_t *job_ptr, bool test_only)
{
bb_job_t *bb_job = NULL;
int rc = 1;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return 1;
if (job_ptr->array_recs &&
((job_ptr->array_task_id == NO_VAL) ||
(job_ptr->array_task_id == INFINITE)))
return -1; /* Can't operate on job array structure */
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ test_only:%d",
job_ptr, (int) test_only);
if (bb_state.last_load_time != 0)
bb_job = _get_bb_job(job_ptr);
if (bb_job && (bb_job->state == BB_STATE_COMPLETE))
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_PENDING); /* job requeued */
if (bb_job == NULL) {
rc = -1;
} else if (bb_job->state < BB_STATE_STAGING_IN) {
/* Job buffer not allocated, create now if space available */
rc = -1;
if (test_only)
goto fini;
if (bb_job->job_pool && bb_job->req_size) {
if ((bb_test_size_limit(job_ptr, bb_job, &bb_state,
NULL) == 0)) {
_alloc_job_bb(job_ptr, bb_job, false);
rc = 0; /* Setup/stage-in in progress */
}
} else {
_alloc_job_bb(job_ptr, bb_job, false);
rc = 0; /* Setup/stage-in in progress */
}
} else if (bb_job->state == BB_STATE_STAGING_IN) {
rc = 0;
} else if (bb_job->state == BB_STATE_STAGED_IN) {
rc = 1;
} else {
rc = -1; /* Requeued job still staging in */
}
fini:
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/* Add key=value pairs from file_path to the job's environment */
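/*
 * Illustrative file contents (variable names and values are hypothetical;
 * whatever the lua "paths" function wrote is what gets parsed here, one
 * NAME=value pair per line):
 *   BB_JOB_PATH=/mnt/bb/job_123
 *   BB_SCRATCH_PATH=/mnt/bb/scratch_123
 */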
static void _update_job_env(job_record_t *job_ptr, char *file_path)
{
struct stat stat_buf;
char *data_buf = NULL, *start, *sep;
int path_fd, i, inx = 0, env_cnt = 0;
ssize_t read_size;
/* Read the environment variables file */
path_fd = open(file_path, 0);
if (path_fd == -1) {
error("open error on file %s: %m",
file_path);
return;
}
fd_set_close_on_exec(path_fd);
if (fstat(path_fd, &stat_buf) == -1) {
error("stat error on file %s: %m",
file_path);
stat_buf.st_size = 2048;
} else if (stat_buf.st_size == 0)
goto fini;
data_buf = xmalloc_nz(stat_buf.st_size + 1);
while (inx < stat_buf.st_size) {
read_size = read(path_fd, data_buf + inx,
stat_buf.st_size - inx);
if (read_size < 0)
data_buf[inx] = '\0';
else
data_buf[inx + read_size] = '\0';
if (read_size > 0) {
inx += read_size;
} else if (read_size == 0) { /* EOF */
break;
} else if (read_size < 0) { /* error */
if ((errno == EAGAIN) || (errno == EINTR))
continue;
error("read error on file %s: %m",
file_path);
break;
}
}
log_flag(BURST_BUF, "%s", data_buf);
/* Get count of environment variables in the file */
env_cnt = 0;
if (data_buf) {
for (i = 0; data_buf[i]; i++) {
if (data_buf[i] == '=')
env_cnt++;
}
}
/* Add to supplemental environment variables (in job record) */
if (env_cnt) {
xrecalloc(job_ptr->details->env_sup,
MAX(job_ptr->details->env_cnt + env_cnt, 1 + env_cnt),
sizeof(char *));
start = data_buf;
for (i = 0; (i < env_cnt) && start[0]; i++) {
sep = strchr(start, '\n');
if (sep)
sep[0] = '\0';
job_ptr->details->env_sup[job_ptr->details->env_cnt++] =
xstrdup(start);
if (sep)
start = sep + 1;
else
break;
}
}
fini: xfree(data_buf);
close(path_fd);
}
/* Kill job from CONFIGURING state */
static void _kill_job(job_record_t *job_ptr, bool hold_job, char *resp_msg)
{
last_job_update = time(NULL);
job_ptr->end_time = last_job_update;
if (hold_job)
job_ptr->priority = 0;
build_cg_bitmap(job_ptr);
job_ptr->exit_code = 1;
_set_job_state_desc(job_ptr, FAIL_BURST_BUFFER_OP,
"%s failed: %s", req_fxns[SLURM_BB_PRE_RUN],
resp_msg);
job_state_set(job_ptr, JOB_REQUEUE);
job_completion_logger(job_ptr, true);
job_state_set(job_ptr, (JOB_PENDING | JOB_COMPLETING));
deallocate_nodes(job_ptr, false, false, false);
}
static void *_start_pre_run(void *x)
{
int rc;
uint32_t timeout, argc;
bool nodes_ready = false, run_kill_job = false, hold_job = false;
bool track_script_signal = false;
char *resp_msg = NULL;
const char *op;
char **argv;
bb_job_t *bb_job = NULL;
job_record_t *job_ptr;
/* Locks: read job */
slurmctld_lock_t job_read_lock = {
NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
/* Locks: write job */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
stage_args_t *pre_run_args = x;
run_lua_args_t run_lua_args;
argc = 4;
argv = xcalloc(argc + 1, sizeof (char *)); /* NULL-terminated */
argv[0] = xstrdup_printf("%u", pre_run_args->job_id);
argv[1] = xstrdup_printf("%s", pre_run_args->job_script);
argv[2] = xstrdup_printf("%u", pre_run_args->uid);
argv[3] = xstrdup_printf("%u", pre_run_args->gid);
/* Wait for node boot to complete. */
while (!nodes_ready) {
lock_slurmctld(job_read_lock);
job_ptr = find_job_record(pre_run_args->job_id);
if (!job_ptr || IS_JOB_COMPLETED(job_ptr)) {
unlock_slurmctld(job_read_lock);
goto fini;
}
if (test_job_nodes_ready(job_ptr))
nodes_ready = true;
unlock_slurmctld(job_read_lock);
if (!nodes_ready)
sleep(60);
}
timeout = bb_state.bb_config.other_timeout;
op = req_fxns[SLURM_BB_PRE_RUN];
memset(&run_lua_args, 0, sizeof run_lua_args);
run_lua_args.argc = argc;
run_lua_args.argv = argv;
run_lua_args.get_job_ptr = true;
run_lua_args.job_id = pre_run_args->job_id;
run_lua_args.lua_func = op;
run_lua_args.resp_msg = &resp_msg;
run_lua_args.timeout = timeout;
run_lua_args.track_script_signal = &track_script_signal;
run_lua_args.with_scriptd = true;
rc = _run_lua_script_wrapper(&run_lua_args);
if (track_script_signal) {
/* Killed by slurmctld, exit now. */
info("%s for JobId=%u terminated by slurmctld",
op, pre_run_args->job_id);
goto fini;
}
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&bb_state.bb_mutex);
job_ptr = find_job_record(pre_run_args->job_id);
if (job_ptr)
bb_job = _get_bb_job(job_ptr);
if (rc != SLURM_SUCCESS) {
/* pre_run failure */
trigger_burst_buffer();
error("%s failed for JobId=%u", op, pre_run_args->job_id);
if (job_ptr) {
bb_update_system_comment(job_ptr, "pre_run", resp_msg,
0);
if (IS_JOB_RUNNING(job_ptr))
run_kill_job = true;
if (bb_job) {
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_TEARDOWN);
if (bb_job->retry_cnt++ > MAX_RETRY_CNT)
hold_job = true;
}
}
_queue_teardown(pre_run_args->job_id, pre_run_args->uid, true,
pre_run_args->gid);
} else if (bb_job) {
/* pre_run success and the job's BB record exists */
if (bb_job->state == BB_STATE_ALLOC_REVOKE)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_STAGED_IN);
else
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_RUNNING);
}
if (job_ptr) {
if (run_kill_job)
job_state_unset_flag(job_ptr, JOB_CONFIGURING);
prolog_running_decr(job_ptr);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
if (run_kill_job) {
/* bb_mutex must be unlocked before calling this */
_kill_job(job_ptr, hold_job, resp_msg);
}
unlock_slurmctld(job_write_lock);
fini:
xfree(resp_msg);
xfree(pre_run_args->job_script);
xfree(pre_run_args);
xfree_array(argv);
return NULL;
}
/* Attempt to claim burst buffer resources.
* At this time, bb_g_job_test_stage_in() should have been run successfully AND
* the compute nodes selected for the job.
*
* Returns a Slurm errno.
*/
extern int bb_p_job_begin(job_record_t *job_ptr)
{
char *path_file = NULL;
char *job_dir = NULL, *resp_msg = NULL, *job_script = NULL;
int hash_inx = job_ptr->job_id % 10;
int rc = SLURM_SUCCESS;
uint32_t argc;
char **argv;
bb_job_t *bb_job;
stage_args_t *pre_run_args;
run_lua_args_t run_lua_args;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return SLURM_SUCCESS;
if (!job_ptr->job_resrcs || !job_ptr->job_resrcs->nodes) {
error("%pJ lacks node allocation",
job_ptr);
return SLURM_ERROR;
}
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not start %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_ERROR;
}
bb_job = _get_bb_job(job_ptr);
if (!bb_job) {
error("no job record buffer for %pJ", job_ptr);
_set_job_state_desc(job_ptr, FAIL_BURST_BUFFER_OP,
"Could not find burst buffer record");
_queue_teardown(job_ptr->job_id, job_ptr->user_id, true,
job_ptr->group_id);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_ERROR;
}
xstrfmtcat(job_dir, "%s/hash.%d/job.%u",
slurm_conf.state_save_location, hash_inx, job_ptr->job_id);
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_PRE_RUN);
slurm_mutex_unlock(&bb_state.bb_mutex);
xstrfmtcat(job_script, "%s/script", job_dir);
/* Create an empty "path" file which can be used by lua. */
xstrfmtcat(path_file, "%s/path", job_dir);
bb_write_file(path_file, "");
/* Initialize args and run the "paths" function. */
argc = 5;
argv = xcalloc(argc + 1, sizeof (char *)); /* NULL-terminated */
argv[0] = xstrdup_printf("%u", job_ptr->job_id);
argv[1] = xstrdup_printf("%s", job_script);
argv[2] = xstrdup_printf("%s", path_file);
argv[3] = xstrdup_printf("%u", job_ptr->user_id);
argv[4] = xstrdup_printf("%u", job_ptr->group_id);
memset(&run_lua_args, 0, sizeof run_lua_args);
run_lua_args.argc = argc;
run_lua_args.argv = argv;
run_lua_args.get_job_ptr = true;
run_lua_args.have_job_lock = true;
run_lua_args.job_id = job_ptr->job_id;
run_lua_args.job_ptr = job_ptr;
run_lua_args.lua_func = req_fxns[SLURM_BB_PATHS];
run_lua_args.resp_msg = &resp_msg;
run_lua_args.timeout = 0;
rc = _run_lua_script_wrapper(&run_lua_args);
/* resp_msg already logged by _run_lua_script. */
xfree(resp_msg);
xfree_array(argv);
if (rc != SLURM_SUCCESS) {
error("paths for %pJ failed", job_ptr);
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
goto fini;
} else {
_update_job_env(job_ptr, path_file);
}
/* Setup for the "pre_run" function. */
pre_run_args = xmalloc(sizeof *pre_run_args);
pre_run_args->job_id = job_ptr->job_id;
pre_run_args->job_script = job_script; /* Point at malloc'd string */
job_script = NULL; /* Prevent the xfree() below from freeing the string now owned by pre_run_args */
pre_run_args->uid = job_ptr->user_id;
pre_run_args->gid = job_ptr->group_id;
if (job_ptr->details) { /* Defer launch until completion */
job_ptr->details->prolog_running++;
job_state_set_flag(job_ptr, JOB_CONFIGURING);
}
slurm_thread_create_detached(_start_pre_run, pre_run_args);
fini:
xfree(job_script);
xfree(path_file);
xfree(job_dir);
return rc;
}
/* Revoke the burst buffer allocation, but do not release the previously
 * allocated resources.
 * Executed after bb_p_job_begin() if there was an allocation failure.
*
* Returns a Slurm errno.
*/
extern int bb_p_job_revoke_alloc(job_record_t *job_ptr)
{
bb_job_t *bb_job = NULL;
int rc = SLURM_SUCCESS;
slurm_mutex_lock(&bb_state.bb_mutex);
if (job_ptr)
bb_job = _get_bb_job(job_ptr);
if (bb_job) {
if (bb_job->state == BB_STATE_RUNNING)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_STAGED_IN);
else if (bb_job->state == BB_STATE_PRE_RUN)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_ALLOC_REVOKE);
} else {
rc = SLURM_ERROR;
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/*
* Trigger a job's burst buffer stage-out to begin
*
* Returns a Slurm errno.
*/
extern int bb_p_job_start_stage_out(job_record_t *job_ptr)
{
int rc = SLURM_SUCCESS;
bb_job_t *bb_job;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return SLURM_SUCCESS;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not stage out %pJ",
job_ptr);
rc = SLURM_ERROR;
goto fini;
}
bb_job = _get_bb_job(job_ptr);
if (!bb_job) {
/* No job buffers. */
error("%pJ bb job record not found", job_ptr);
rc = SLURM_ERROR;
goto fini;
} else if (bb_job->state < BB_STATE_RUNNING) {
/* Job never started. Just teardown the buffer */
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_TEARDOWN);
_queue_teardown(job_ptr->job_id, job_ptr->user_id, true,
job_ptr->group_id);
} else if (bb_job->state < BB_STATE_POST_RUN) {
_pre_queue_stage_out(job_ptr, bb_job);
}
fini:
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/*
* Determine if a job's burst buffer post_run operation is complete
*
* RET: 0 - post_run is underway
* 1 - post_run complete
* -1 - fatal error
*/
extern int bb_p_job_test_post_run(job_record_t *job_ptr)
{
bb_job_t *bb_job;
int rc = -1;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return 1;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not post_run %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return -1;
}
bb_job = bb_job_find(&bb_state, job_ptr->job_id);
if (!bb_job) {
error("%pJ bb job record not found, assuming post run is complete",
job_ptr);
rc = 1;
} else {
if (bb_job->state < BB_STATE_POST_RUN) {
rc = -1;
} else if (bb_job->state > BB_STATE_POST_RUN) {
rc = 1;
} else {
rc = 0;
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/*
* Determine if a job's burst buffer stage-out is complete
*
* RET: 0 - stage-out is underway
* 1 - stage-out complete
* -1 - fatal error
*/
extern int bb_p_job_test_stage_out(job_record_t *job_ptr)
{
bb_job_t *bb_job;
int rc = -1;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return 1;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not stage-out %pJ", job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return -1;
}
bb_job = bb_job_find(&bb_state, job_ptr->job_id);
if (!bb_job) {
/* This is expected if the burst buffer completed teardown */
rc = 1;
} else {
/*
* bb_g_job_test_stage_out() is called when purging old jobs
* from slurmctld and when testing for dependencies.
* We don't want the job to be purged until teardown is done
* (teardown happens right after stage_out). Once teardown is
* done the state will be BB_STATE_COMPLETE. We also free
* bb_job so it doesn't stay around forever.
*/
if (bb_job->state == BB_STATE_PENDING) {
/*
* No job BB work started before job was killed.
* Alternately slurmctld daemon restarted after the
* job's BB work was completed.
*/
rc = 1;
} else if (bb_job->state < BB_STATE_POST_RUN) {
rc = -1;
} else if (bb_job->state == BB_STATE_COMPLETE) {
bb_job_del(&bb_state, bb_job->job_id);
rc = 1;
} else {
rc = 0;
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/*
* Terminate any file staging and completely release burst buffer resources
*
* Returns a Slurm errno.
*/
extern int bb_p_job_cancel(job_record_t *job_ptr)
{
bb_job_t *bb_job;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not cancel %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_ERROR;
}
bb_job = bb_job_find(&bb_state, job_ptr->job_id);
if (!bb_job) {
/* Nothing ever allocated, nothing to clean up */
} else if (bb_job->state == BB_STATE_PENDING) {
bb_set_job_bb_state(job_ptr, bb_job, /* Nothing to clean up */
BB_STATE_COMPLETE);
} else if (bb_job->state == BB_STATE_COMPLETE) {
/* Teardown already done. */
} else {
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_TEARDOWN);
_queue_teardown(job_ptr->job_id, job_ptr->user_id, true,
job_ptr->group_id);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
/*
* Run a script in the burst buffer plugin
*
* func IN - script function to run
* jobid IN - job id for which we are running the script (0 if not for a job)
* argc IN - number of arguments to pass to script
* argv IN - argument list to pass to script
* resp_msg OUT - string returned by script
*
* Returns the status of the script.
*/
extern int bb_p_run_script(char *func, uint32_t job_id, uint32_t argc,
char **argv, job_info_msg_t *job_info,
char **resp_msg)
{
return _start_lua_script(func, job_id, argc, argv, job_info, resp_msg);
}
/*
 * Translate a burst buffer string to its equivalent TRES string
* For example:
* "bb/lua=2M" -> "1004=2"
* Caller must xfree the return value
*/
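/*
 * Worked rounding example (TRES id 1004 as above, assuming a configured
 * granularity of 1): a "lua:1500K" token parses to 1536000 bytes, which
 * ROUNDUP() to 1 MB units turns into 2, giving the TRES string "1004=2".
 */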
extern char *bb_p_xlate_bb_2_tres_str(char *burst_buffer)
{
char *save_ptr = NULL, *sep, *tmp, *tok;
char *result = NULL;
uint64_t size, total = 0;
if (!burst_buffer || (bb_state.tres_id < 1))
return result;
tmp = xstrdup(burst_buffer);
tok = strtok_r(tmp, ",", &save_ptr);
while (tok) {
sep = strchr(tok, ':');
if (sep) {
if (!xstrncmp(tok, "lua:", 4))
tok += 4;
else
tok = NULL;
}
if (tok) {
uint64_t mb_xlate = 1024 * 1024;
size = bb_get_size_num(tok,
bb_state.bb_config.granularity);
total += ROUNDUP(size, mb_xlate);
}
tok = strtok_r(NULL, ",", &save_ptr);
}
xfree(tmp);
if (total)
xstrfmtcat(result, "%d=%"PRIu64, bb_state.tres_id, total);
return result;
}