/*****************************************************************************\
* burst_buffer_datawarp.c - Plugin for managing a Cray DataWarp burst_buffer
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#define _GNU_SOURCE
#include <ctype.h>
#include <poll.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#if HAVE_JSON_C_INC
# include <json-c/json.h>
#elif HAVE_JSON_INC
# include <json/json.h>
#endif
#include "slurm/slurm.h"
#include "src/common/assoc_mgr.h"
#include "src/common/bitstring.h"
#include "src/common/fd.h"
#include "src/common/list.h"
#include "src/common/macros.h"
#include "src/common/pack.h"
#include "src/common/parse_config.h"
#include "src/common/run_command.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/state_save.h"
#include "src/common/timers.h"
#include "src/common/uid.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/node_scheduler.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/state_save.h"
#include "src/slurmctld/trigger_mgr.h"
#include "src/plugins/burst_buffer/common/burst_buffer_common.h"
#define _DEBUG 0 /* Detailed debugging information */
#define TIME_SLOP 60 /* Time allowed to synchronize operations between
* threads */
#define MAX_RETRY_CNT 2 /* Hold job if "pre_run" operation fails more than
* 2 times */
/* Script line types */
#define LINE_OTHER 0
#define LINE_BB 1
#define LINE_DW 2
/*
* These variables are required by the burst buffer plugin interface. If they
* are not found in the plugin, the plugin loader will ignore it.
*
* plugin_name - a string giving a human-readable description of the
* plugin. There is no maximum length, but the symbol must refer to
* a valid string.
*
* plugin_type - a string suggesting the type of the plugin or its
* applicability to a particular form of data or method of data handling.
* If the low-level plugin API is used, the contents of this string are
* unimportant and may be anything. Slurm uses the higher-level plugin
* interface which requires this string to be of the form
*
* <application>/<method>
*
* where <application> is a description of the intended application of
* the plugin (e.g., "burst_buffer" for Slurm burst_buffer) and <method> is a
* description of how this plugin satisfies that application. Slurm will only
* load a burst_buffer plugin if the plugin_type string has a prefix of
* "burst_buffer/".
*
* plugin_version - an unsigned 32-bit integer containing the Slurm version
* (major.minor.micro combined into a single number).
*/
const char plugin_name[] = "burst_buffer datawarp plugin";
const char plugin_type[] = "burst_buffer/datawarp";
const uint32_t plugin_version = SLURM_VERSION_NUMBER;
/* Most state information is in a common structure so that we can more
* easily use common functions from multiple burst buffer plugins */
static bb_state_t bb_state;
static uint32_t last_persistent_id = 1;
/* These are defined here so when we link with something other than
* the slurmctld we will have these symbols defined. They will get
* overwritten when linking with the slurmctld.
*/
#if defined (__APPLE__)
extern uint16_t accounting_enforce __attribute__((weak_import));
extern void *acct_db_conn __attribute__((weak_import));
#else
uint16_t accounting_enforce = 0;
void *acct_db_conn = NULL;
#endif
/* Description of each Cray DW configuration entry
*/
typedef struct bb_configs {
uint32_t id;
uint32_t instance;
} bb_configs_t;
/* Description of each Cray DW instance entry, including persistent buffers
*/
typedef struct bb_instances {
uint32_t id;
uint64_t bytes;
uint32_t session;
} bb_instances_t;
/* Description of each Cray DW pool entry
*/
typedef struct bb_pools {
char *id;
char *units;
uint64_t granularity;
uint64_t quantity;
uint64_t free;
} bb_pools_t;
/* Description of each Cray DW session entry
*/
typedef struct bb_sessions {
uint32_t created;
uint32_t id;
char *token;
bool used;
uint32_t user_id;
} bb_sessions_t;
typedef struct {
char **args;
uint32_t job_id;
uint32_t timeout;
uint32_t user_id;
} pre_run_args_t;
typedef struct {
char **args1;
char **args2;
uint64_t bb_size;
uint32_t job_id;
char *pool;
uint32_t user_id;
} stage_args_t;
typedef struct create_buf_data {
char *access; /* Access mode */
bool hurry; /* Set to destroy in a hurry (no stage-out) */
uint32_t job_id; /* Job ID to use */
char *job_script; /* Path to job script */
char *name; /* Name of the persistent burst buffer */
char *pool; /* Name of pool in which to create the buffer */
uint64_t size; /* Size in bytes */
char *type; /* Access type */
uint32_t user_id;
} create_buf_data_t;
#define BB_UNITS_BYTES 1
struct bb_total_size {
int units;
uint64_t capacity;
};
static int _alloc_job_bb(job_record_t *job_ptr, bb_job_t *bb_job,
bool job_ready);
static void _apply_limits(void);
static void * _bb_agent(void *args);
static void _bb_free_configs(bb_configs_t *ents, int num_ent);
static void _bb_free_instances(bb_instances_t *ents, int num_ent);
static void _bb_free_pools(bb_pools_t *ents, int num_ent);
static void _bb_free_sessions(bb_sessions_t *ents, int num_ent);
static bb_configs_t *_bb_get_configs(int *num_ent, bb_state_t *state_ptr,
uint32_t timeout);
static bb_instances_t *_bb_get_instances(int *num_ent, bb_state_t *state_ptr,
uint32_t timeout);
static bb_pools_t *_bb_get_pools(int *num_ent, bb_state_t *state_ptr,
uint32_t timeout);
static bb_sessions_t *_bb_get_sessions(int *num_ent, bb_state_t *state_ptr,
uint32_t timeout);
static int _create_bufs(job_record_t *job_ptr, bb_job_t *bb_job,
bool job_ready);
static void * _create_persistent(void *x);
static void * _destroy_persistent(void *x);
static void _free_create_args(create_buf_data_t *create_args);
static bb_job_t *_get_bb_job(job_record_t *job_ptr);
static bool _have_dw_cmd_opts(bb_job_t *bb_job);
static void _job_queue_del(void *x);
static bb_configs_t *_json_parse_configs_array(json_object *jobj, char *key,
int *num);
static bb_instances_t *_json_parse_instances_array(json_object *jobj, char *key,
int *num);
static struct bb_pools *_json_parse_pools_array(json_object *jobj, char *key,
int *num);
static struct bb_sessions *_json_parse_sessions_array(json_object *jobj,
char *key, int *num);
static void _json_parse_configs_object(json_object *jobj,
bb_configs_t *ent);
static void _json_parse_instances_object(json_object *jobj,
bb_instances_t *ent);
static void _json_parse_pools_object(json_object *jobj, bb_pools_t *ent);
static void _json_parse_sessions_object(json_object *jobj,
bb_sessions_t *ent);
static struct bb_total_size *_json_parse_real_size(json_object *j);
static void _log_script_argv(char **script_argv, char *resp_msg);
static void _load_state(bool init_config);
static int _parse_bb_opts(job_desc_msg_t *job_desc, uint64_t *bb_size,
uid_t submit_uid);
static void _parse_config_links(json_object *instance, bb_configs_t *ent);
static void _parse_instance_capacity(json_object *instance,
bb_instances_t *ent);
static void _parse_instance_links(json_object *instance,
bb_instances_t *ent);
static void _pick_alloc_account(bb_alloc_t *bb_alloc);
static void _purge_bb_files(uint32_t job_id, job_record_t *job_ptr);
static void _purge_vestigial_bufs(void);
static void _python2json(char *buf);
static void _recover_bb_state(void);
static int _queue_stage_in(job_record_t *job_ptr, bb_job_t *bb_job);
static int _queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job);
static void _queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry);
static void _reset_buf_state(uint32_t user_id, uint32_t job_id, char *name,
int new_state, uint64_t buf_size);
static void _save_bb_state(void);
static void _set_assoc_mgr_ptrs(bb_alloc_t *bb_alloc);
static void * _start_pre_run(void *x);
static void * _start_stage_in(void *x);
static void * _start_stage_out(void *x);
static void * _start_teardown(void *x);
static void _test_config(void);
static bool _test_persistent_use_ready(bb_job_t *bb_job,
job_record_t *job_ptr);
static void _timeout_bb_rec(void);
static int _xlate_batch(job_desc_msg_t *job_desc);
static int _xlate_interactive(job_desc_msg_t *job_desc);
/* Convert a Python string to real JSON format. Specifically replace single
* quotes with double quotes and strip leading "u" before the single quotes.
* See: https://github.com/stedolan/jq/issues/312 */
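/* For example, the (illustrative) Python-style string
*   {u'id': u'wlm_pool', u'free': 128}
* is rewritten in place to the valid JSON
*   {"id": "wlm_pool", "free": 128} */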
static void _python2json(char *buf)
{
bool quoted = false;
int i, o;
if (!buf)
return;
for (i = 0, o = 0; ; i++) {
if (buf[i] == '\'') {
buf[o++] = '\"';
quoted = !quoted;
} else if ((buf[i] == 'u') && (buf[i+1] == '\'') && !quoted) {
/* Skip over unicode flag */
} else {
buf[o++] = buf[i];
if (buf[i] == '\0')
break;
}
}
}
/* Log a command's arguments. */
static void _log_script_argv(char **script_argv, char *resp_msg)
{
char *cmd_line = NULL;
int i;
if (!(slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF))
return;
for (i = 0; script_argv[i]; i++) {
if (i)
xstrcat(cmd_line, " ");
xstrcat(cmd_line, script_argv[i]);
}
info("%s", cmd_line);
if (resp_msg && resp_msg[0])
info("%s", resp_msg);
xfree(cmd_line);
}
static void _job_queue_del(void *x)
{
bb_job_queue_rec_t *job_rec = (bb_job_queue_rec_t *) x;
if (job_rec) {
xfree(job_rec);
}
}
/* Purge files we have created for the job.
* bb_state.bb_mutex is locked on function entry.
* job_ptr may be NULL if not found */
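/* For an illustrative JobId=1234 (hash.4 since 1234 % 10 == 4) this removes:
*   <StateSaveLocation>/hash.4/job.1234/client_nids
*   <StateSaveLocation>/hash.4/job.1234/pathfile
*   <StateSaveLocation>/hash.4/job.1234/script (non-batch jobs only) */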
static void _purge_bb_files(uint32_t job_id, job_record_t *job_ptr)
{
char *hash_dir = NULL, *job_dir = NULL;
char *script_file = NULL, *path_file = NULL, *client_nids_file = NULL;
int hash_inx;
hash_inx = job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_id);
(void) mkdir(job_dir, 0700);
xstrfmtcat(client_nids_file, "%s/client_nids", job_dir);
(void) unlink(client_nids_file);
xfree(client_nids_file);
xstrfmtcat(path_file, "%s/pathfile", job_dir);
(void) unlink(path_file);
xfree(path_file);
if (!job_ptr || (job_ptr->batch_flag == 0)) {
xstrfmtcat(script_file, "%s/script", job_dir);
(void) unlink(script_file);
xfree(script_file);
}
(void) unlink(job_dir);
xfree(job_dir);
xfree(hash_dir);
}
/* Validate that our configuration is valid for this plugin type */
static void _test_config(void)
{
if (!bb_state.bb_config.get_sys_state) {
debug("GetSysState is NULL");
bb_state.bb_config.get_sys_state =
xstrdup("/opt/cray/dw_wlm/default/bin/dw_wlm_cli");
}
if (!bb_state.bb_config.get_sys_status) {
debug("GetSysStatus is NULL");
bb_state.bb_config.get_sys_status =
xstrdup("/opt/cray/dws/default/bin/dwstat");
}
}
/* Allocate resources to a job and begin setup/stage-in */
static int _alloc_job_bb(job_record_t *job_ptr, bb_job_t *bb_job,
bool job_ready)
{
int rc = SLURM_SUCCESS;
log_flag(BURST_BUF, "start job allocate %pJ",
job_ptr);
if (bb_job->buf_cnt &&
(_create_bufs(job_ptr, bb_job, job_ready) > 0))
return EAGAIN;
if (bb_job->state < BB_STATE_STAGING_IN) {
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_STAGING_IN);
rc = _queue_stage_in(job_ptr, bb_job);
if (rc != SLURM_SUCCESS) {
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_TEARDOWN);
_queue_teardown(job_ptr->job_id, job_ptr->user_id,
true);
}
}
return rc;
}
/* Perform periodic background activities */
static void *_bb_agent(void *args)
{
/* Locks: write job */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
while (!bb_state.term_flag) {
bb_sleep(&bb_state, AGENT_INTERVAL);
if (!bb_state.term_flag) {
_load_state(false); /* Has own locking */
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&bb_state.bb_mutex);
_timeout_bb_rec();
slurm_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock);
}
_save_bb_state(); /* Has own locks excluding file write */
}
return NULL;
}
/* Given a request size and a pool name (or NULL name for default pool),
* return the required buffer size (rounded up by granularity) */
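/* For example (hypothetical values): a 50GiB request against a pool with
* 16GiB granularity is rounded up to 64GiB, i.e. four allocation units. */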
static uint64_t _set_granularity(uint64_t orig_size, char *bb_pool)
{
burst_buffer_pool_t *pool_ptr;
uint64_t new_size;
int i;
if (!bb_pool || !xstrcmp(bb_pool, bb_state.bb_config.default_pool)) {
new_size = bb_granularity(orig_size,
bb_state.bb_config.granularity);
return new_size;
}
for (i = 0, pool_ptr = bb_state.bb_config.pool_ptr;
i < bb_state.bb_config.pool_cnt; i++, pool_ptr++) {
if (!xstrcmp(bb_pool, pool_ptr->name)) {
new_size = bb_granularity(orig_size,
pool_ptr->granularity);
return new_size;
}
}
debug("Could not find pool %s", bb_pool);
return orig_size;
}
/* Return the burst buffer size specification of a job
 * RET size data structure or NULL if none found
 * NOTE: the returned bb_job_t is cached in bb_state; release it with
 * bb_job_del() */
static bb_job_t *_get_bb_job(job_record_t *job_ptr)
{
char *bb_specs, *bb_hurry, *bb_name, *bb_type, *bb_access, *bb_pool;
char *end_ptr = NULL, *save_ptr = NULL, *sub_tok, *tok;
bool have_bb = false;
uint64_t tmp_cnt;
int inx;
bb_job_t *bb_job;
uint16_t new_bb_state;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return NULL;
if ((bb_job = bb_job_find(&bb_state, job_ptr->job_id)))
return bb_job; /* Cached data */
bb_job = bb_job_alloc(&bb_state, job_ptr->job_id);
bb_job->account = xstrdup(job_ptr->account);
if (job_ptr->part_ptr)
bb_job->partition = xstrdup(job_ptr->part_ptr->name);
if (job_ptr->qos_ptr)
bb_job->qos = xstrdup(job_ptr->qos_ptr->name);
new_bb_state = job_ptr->burst_buffer_state ?
bb_state_num(job_ptr->burst_buffer_state) : BB_STATE_PENDING;
bb_set_job_bb_state(job_ptr, bb_job, new_bb_state);
bb_job->user_id = job_ptr->user_id;
bb_specs = xstrdup(job_ptr->burst_buffer);
tok = strtok_r(bb_specs, "\n", &save_ptr);
while (tok) {
uint32_t bb_flag = 0;
if (tok[0] != '#') {
tok = strtok_r(NULL, "\n", &save_ptr);
continue;
}
if ((tok[1] == 'B') && (tok[2] == 'B'))
bb_flag = BB_FLAG_BB_OP;
else if ((tok[1] == 'D') && (tok[2] == 'W'))
bb_flag = BB_FLAG_DW_OP;
/*
* As of Slurm v18.08 and CLE6.0UP06, the create_persistent and
* destroy_persistent functions are directly supported by dw_wlm_cli.
* The "#BB" format is supported for backward compatibility.
*/
if (bb_flag != 0) {
tok += 3;
while (isspace(tok[0]))
tok++;
}
/*
* Is % symbol replacement required? Only done on "#DW" / "#BB"
* lines.
*/
if (bb_flag && strchr(tok, (int) '%'))
bb_job->need_symbol_replacement = true;
if (bb_flag == BB_FLAG_BB_OP) {
if (!xstrncmp(tok, "create_persistent", 17)) {
have_bb = true;
bb_access = NULL;
bb_name = NULL;
bb_pool = NULL;
bb_type = NULL;
if ((sub_tok = strstr(tok, "access_mode="))) {
bb_access = xstrdup(sub_tok + 12);
sub_tok = strchr(bb_access, ' ');
if (sub_tok)
sub_tok[0] = '\0';
} else if ((sub_tok = strstr(tok, "access="))) {
bb_access = xstrdup(sub_tok + 7);
sub_tok = strchr(bb_access, ' ');
if (sub_tok)
sub_tok[0] = '\0';
}
if ((sub_tok = strstr(tok, "capacity="))) {
tmp_cnt = bb_get_size_num(sub_tok+9, 1);
} else {
tmp_cnt = 0;
}
if ((sub_tok = strstr(tok, "name="))) {
bb_name = xstrdup(sub_tok + 5);
sub_tok = strchr(bb_name, ' ');
if (sub_tok)
sub_tok[0] = '\0';
}
if ((sub_tok = strstr(tok, "pool="))) {
bb_pool = xstrdup(sub_tok + 5);
sub_tok = strchr(bb_pool, ' ');
if (sub_tok)
sub_tok[0] = '\0';
} else {
bb_pool = xstrdup(
bb_state.bb_config.default_pool);
}
if ((sub_tok = strstr(tok, "type="))) {
bb_type = xstrdup(sub_tok + 5);
sub_tok = strchr(bb_type, ' ');
if (sub_tok)
sub_tok[0] = '\0';
}
inx = bb_job->buf_cnt++;
bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
sizeof(bb_buf_t) *
bb_job->buf_cnt);
bb_job->buf_ptr[inx].access = bb_access;
bb_job->buf_ptr[inx].create = true;
bb_job->buf_ptr[inx].flags = bb_flag;
//bb_job->buf_ptr[inx].hurry = false;
bb_job->buf_ptr[inx].name = bb_name;
bb_job->buf_ptr[inx].pool = bb_pool;
tmp_cnt = _set_granularity(tmp_cnt, bb_pool);
bb_job->buf_ptr[inx].size = tmp_cnt;
bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
bb_job->buf_ptr[inx].type = bb_type;
//bb_job->buf_ptr[inx].use = false;
bb_job->persist_add += tmp_cnt;
} else if (!xstrncmp(tok, "destroy_persistent", 18)) {
have_bb = true;
bb_name = NULL;
if ((sub_tok = strstr(tok, "name="))) {
bb_name = xstrdup(sub_tok + 5);
sub_tok = strchr(bb_name, ' ');
if (sub_tok)
sub_tok[0] = '\0';
}
/* if ((sub_tok = strstr(tok, "type="))) { */
/* bb_type = xstrdup(sub_tok + 5); */
/* sub_tok = strchr(bb_type, ' '); */
/* if (sub_tok) */
/* sub_tok[0] = '\0'; */
/* } */
bb_hurry = strstr(tok, "hurry");
inx = bb_job->buf_cnt++;
bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
sizeof(bb_buf_t) *
bb_job->buf_cnt);
//bb_job->buf_ptr[inx].access = NULL;
//bb_job->buf_ptr[inx].create = false;
bb_job->buf_ptr[inx].destroy = true;
bb_job->buf_ptr[inx].flags = bb_flag;
bb_job->buf_ptr[inx].hurry = (bb_hurry != NULL);
bb_job->buf_ptr[inx].name = bb_name;
//bb_job->buf_ptr[inx].pool = NULL;
//bb_job->buf_ptr[inx].size = 0;
bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
//bb_job->buf_ptr[inx].type = NULL;
//bb_job->buf_ptr[inx].use = false;
} else {
/* Ignore other (future) options */
}
}
if (bb_flag == BB_FLAG_DW_OP) {
if (!xstrncmp(tok, "jobdw", 5)) {
have_bb = true;
if ((sub_tok = strstr(tok, "capacity="))) {
tmp_cnt = bb_get_size_num(sub_tok+9, 1);
} else {
tmp_cnt = 0;
}
if ((sub_tok = strstr(tok, "pool="))) {
xfree(bb_job->job_pool);
bb_job->job_pool = xstrdup(sub_tok + 5);
sub_tok = strchr(bb_job->job_pool, ' ');
if (sub_tok)
sub_tok[0] = '\0';
} else {
bb_job->job_pool = xstrdup(
bb_state.bb_config.default_pool);
}
tmp_cnt = _set_granularity(tmp_cnt,
bb_job->job_pool);
bb_job->req_size += tmp_cnt;
bb_job->total_size += tmp_cnt;
bb_job->use_job_buf = true;
} else if (!xstrncmp(tok, "persistentdw", 12)) {
/* Persistent buffer use */
have_bb = true;
bb_name = NULL;
if ((sub_tok = strstr(tok, "name="))) {
bb_name = xstrdup(sub_tok + 5);
sub_tok = strchr(bb_name, ' ');
if (sub_tok)
sub_tok[0] = '\0';
}
inx = bb_job->buf_cnt++;
bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
sizeof(bb_buf_t) *
bb_job->buf_cnt);
//bb_job->buf_ptr[inx].access = NULL;
//bb_job->buf_ptr[inx].create = false;
//bb_job->buf_ptr[inx].destroy = false;
//bb_job->buf_ptr[inx].hurry = false;
bb_job->buf_ptr[inx].name = bb_name;
//bb_job->buf_ptr[inx].size = 0;
bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
//bb_job->buf_ptr[inx].type = NULL;
bb_job->buf_ptr[inx].use = true;
} else if (!xstrncmp(tok, "swap", 4)) {
have_bb = true;
tok += 4;
while (isspace(tok[0]))
tok++;
bb_job->swap_size = strtol(tok, &end_ptr, 10);
if (job_ptr->details &&
job_ptr->details->max_nodes) {
bb_job->swap_nodes =
job_ptr->details->max_nodes;
} else if (job_ptr->details) {
bb_job->swap_nodes =
job_ptr->details->min_nodes;
} else {
bb_job->swap_nodes = 1;
}
tmp_cnt = (uint64_t) bb_job->swap_size *
bb_job->swap_nodes;
if ((sub_tok = strstr(tok, "pool="))) {
xfree(bb_job->job_pool);
bb_job->job_pool = xstrdup(sub_tok + 5);
sub_tok = strchr(bb_job->job_pool, ' ');
if (sub_tok)
sub_tok[0] = '\0';
} else if (!bb_job->job_pool) {
bb_job->job_pool = xstrdup(
bb_state.bb_config.default_pool);
}
tmp_cnt = _set_granularity(tmp_cnt,
bb_job->job_pool);
bb_job->req_size += tmp_cnt;
bb_job->total_size += tmp_cnt;
bb_job->use_job_buf = true;
} else {
/* Ignore stage-in, stage-out, etc. */
}
}
tok = strtok_r(NULL, "\n", &save_ptr);
}
xfree(bb_specs);
if (!have_bb) {
xfree(job_ptr->state_desc);
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xstrfmtcat(job_ptr->state_desc,
"%s: Invalid burst buffer spec (%s)",
plugin_type, job_ptr->burst_buffer);
job_ptr->priority = 0;
info("Invalid burst buffer spec for %pJ (%s)",
job_ptr, job_ptr->burst_buffer);
bb_job_del(&bb_state, job_ptr->job_id);
return NULL;
}
if (!bb_job->job_pool)
bb_job->job_pool = xstrdup(bb_state.bb_config.default_pool);
if (slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF)
bb_job_log(&bb_state, bb_job);
return bb_job;
}
/* At slurmctld startup, for every currently active burst buffer,
* update that user's limit. Also log every recovered buffer */
static void _apply_limits(void)
{
bool emulate_cray = false;
bb_alloc_t *bb_alloc;
int i;
if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
emulate_cray = true;
for (i = 0; i < BB_HASH_SIZE; i++) {
bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
info("Recovered buffer Name:%s User:%u Pool:%s Size:%"PRIu64,
bb_alloc->name, bb_alloc->user_id,
bb_alloc->pool, bb_alloc->size);
_set_assoc_mgr_ptrs(bb_alloc);
bb_limit_add(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state, emulate_cray);
bb_alloc = bb_alloc->next;
}
}
}
/* Write current burst buffer state to a file so that we can preserve account,
* partition, and QOS information of persistent burst buffers as there is no
* place to store that information within the DataWarp data structures */
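/* Each record is packed in this order: account, create_time, id, name,
* partition, pool, qos, user_id and, only with BB_FLAG_EMULATE_CRAY,
* size. _recover_bb_state() must unpack records in the same order. */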
static void _save_bb_state(void)
{
static time_t last_save_time = 0;
static uint32_t high_buffer_size = 16 * 1024;
time_t save_time = time(NULL);
bb_alloc_t *bb_alloc;
uint32_t rec_count = 0;
buf_t *buffer;
int i, count_offset, offset;
uint16_t protocol_version = SLURM_PROTOCOL_VERSION;
if ((bb_state.last_update_time <= last_save_time) &&
!bb_state.term_flag)
return;
/* Build buffer with name/account/partition/qos information for all
* named burst buffers so we can preserve limits across restarts */
buffer = init_buf(high_buffer_size);
pack16(protocol_version, buffer);
count_offset = get_buf_offset(buffer);
pack32(rec_count, buffer);
if (bb_state.bb_ahash) {
slurm_mutex_lock(&bb_state.bb_mutex);
for (i = 0; i < BB_HASH_SIZE; i++) {
bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
if (bb_alloc->name) {
packstr(bb_alloc->account, buffer);
pack_time(bb_alloc->create_time, buffer);
pack32(bb_alloc->id, buffer);
packstr(bb_alloc->name, buffer);
packstr(bb_alloc->partition, buffer);
packstr(bb_alloc->pool, buffer);
packstr(bb_alloc->qos, buffer);
pack32(bb_alloc->user_id, buffer);
if (bb_state.bb_config.flags &
BB_FLAG_EMULATE_CRAY)
pack64(bb_alloc->size, buffer);
rec_count++;
}
bb_alloc = bb_alloc->next;
}
}
save_time = time(NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
offset = get_buf_offset(buffer);
set_buf_offset(buffer, count_offset);
pack32(rec_count, buffer);
set_buf_offset(buffer, offset);
}
if (!save_buf_to_state("burst_buffer_cray_state", buffer, NULL))
last_save_time = save_time;
FREE_NULL_BUFFER(buffer);
}
/* Recover saved burst buffer state and use it to preserve account, partition,
* and QOS information for persistent burst buffers. */
static void _recover_bb_state(void)
{
char *state_file = NULL;
uint16_t protocol_version = NO_VAL16;
uint32_t rec_count = 0;
uint32_t id = 0, user_id = 0;
uint64_t size = 0;
int i;
char *account = NULL, *name = NULL;
char *partition = NULL, *pool = NULL, *qos = NULL;
char *end_ptr = NULL;
time_t create_time = 0;
bb_alloc_t *bb_alloc;
buf_t *buffer;
errno = 0;
buffer = state_save_open("burst_buffer_cray_state", &state_file);
if (!buffer && (errno == ENOENT)) {
info("No burst buffer state file (%s) to recover",
state_file);
xfree(state_file);
return;
}
xfree(state_file);
safe_unpack16(&protocol_version, buffer);
if (protocol_version == NO_VAL16) {
if (!ignore_state_errors)
fatal("Can not recover burst_buffer/datawarp state, data version incompatible, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
error("**********************************************************************");
error("Can not recover burst_buffer/datawarp state, data version incompatible");
error("**********************************************************************");
return;
}
safe_unpack32(&rec_count, buffer);
for (i = 0; i < rec_count; i++) {
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
safe_unpackstr(&account, buffer);
safe_unpack_time(&create_time, buffer);
safe_unpack32(&id, buffer);
safe_unpackstr(&name, buffer);
safe_unpackstr(&partition, buffer);
safe_unpackstr(&pool, buffer);
safe_unpackstr(&qos, buffer);
safe_unpack32(&user_id, buffer);
if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
safe_unpack64(&size, buffer);
}
slurm_mutex_lock(&bb_state.bb_mutex);
if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) {
bb_alloc = bb_alloc_name_rec(&bb_state, name, user_id);
bb_alloc->id = id;
last_persistent_id = MAX(last_persistent_id, id);
if (name && (name[0] >= '0') && (name[0] <= '9')) {
bb_alloc->job_id = strtol(name, &end_ptr, 10);
bb_alloc->array_job_id = bb_alloc->job_id;
bb_alloc->array_task_id = NO_VAL;
}
bb_alloc->seen_time = time(NULL);
bb_alloc->size = size;
} else {
bb_alloc = bb_find_name_rec(name, user_id, &bb_state);
}
if (bb_alloc) {
log_flag(BURST_BUF, "Recovered burst buffer %s from user %u",
bb_alloc->name, bb_alloc->user_id);
xfree(bb_alloc->account);
bb_alloc->account = account;
account = NULL;
bb_alloc->create_time = create_time;
xfree(bb_alloc->partition);
bb_alloc->partition = partition;
partition = NULL;
xfree(bb_alloc->pool);
bb_alloc->pool = pool;
pool = NULL;
xfree(bb_alloc->qos);
bb_alloc->qos = qos;
qos = NULL;
}
slurm_mutex_unlock(&bb_state.bb_mutex);
xfree(account);
xfree(name);
xfree(partition);
xfree(pool);
xfree(qos);
}
info("Recovered state of %d burst buffers", rec_count);
FREE_NULL_BUFFER(buffer);
return;
unpack_error:
if (!ignore_state_errors)
fatal("Incomplete burst buffer data checkpoint file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
error("Incomplete burst buffer data checkpoint file");
xfree(account);
xfree(name);
xfree(partition);
xfree(pool);
xfree(qos);
FREE_NULL_BUFFER(buffer);
return;
}
/* We just found an unexpected session; set its default account, QOS, and
 * partition. Copy the information from any currently existing session for
 * the same user. If none is found, use the user's default account and QOS.
* NOTE: assoc_mgr_locks need to be locked with
* assoc_mgr_lock_t assoc_locks = { READ_LOCK, NO_LOCK, READ_LOCK, NO_LOCK,
* NO_LOCK, NO_LOCK, NO_LOCK };
* before calling this.
*/
static void _pick_alloc_account(bb_alloc_t *bb_alloc)
{
slurmdb_assoc_rec_t assoc_rec;
slurmdb_qos_rec_t qos_rec;
bb_alloc_t *bb_ptr = NULL;
bb_ptr = bb_state.bb_ahash[bb_alloc->user_id % BB_HASH_SIZE];
while (bb_ptr) {
if ((bb_ptr != bb_alloc) &&
(bb_ptr->user_id == bb_alloc->user_id)) {
xfree(bb_alloc->account);
bb_alloc->account = xstrdup(bb_ptr->account);
bb_alloc->assoc_ptr = bb_ptr->assoc_ptr;
xfree(bb_alloc->partition);
bb_alloc->partition = xstrdup(bb_ptr->partition);
xfree(bb_alloc->qos);
bb_alloc->qos = xstrdup(bb_ptr->qos);
bb_alloc->qos_ptr = bb_ptr->qos_ptr;
xfree(bb_alloc->assocs);
bb_alloc->assocs = xstrdup(bb_ptr->assocs);
return;
}
bb_ptr = bb_ptr->next;
}
/* Set default for this user */
bb_alloc->partition = xstrdup(default_part_name);
memset(&assoc_rec, 0, sizeof(slurmdb_assoc_rec_t));
memset(&qos_rec, 0, sizeof(slurmdb_qos_rec_t));
assoc_rec.partition = default_part_name;
assoc_rec.uid = bb_alloc->user_id;
if (assoc_mgr_fill_in_assoc(acct_db_conn, &assoc_rec,
accounting_enforce,
&bb_alloc->assoc_ptr,
true) == SLURM_SUCCESS) {
xfree(bb_alloc->account);
bb_alloc->account = xstrdup(assoc_rec.acct);
xfree(bb_alloc->assocs);
if (bb_alloc->assoc_ptr)
bb_alloc->assocs =
xstrdup_printf(",%u,", bb_alloc->assoc_ptr->id);
assoc_mgr_get_default_qos_info(bb_alloc->assoc_ptr, &qos_rec);
if (assoc_mgr_fill_in_qos(acct_db_conn, &qos_rec,
accounting_enforce,
&bb_alloc->qos_ptr,
true) == SLURM_SUCCESS) {
xfree(bb_alloc->qos);
if (bb_alloc->qos_ptr)
bb_alloc->qos =
xstrdup(bb_alloc->qos_ptr->name);
}
}
}
/* For a given user/partition/account, set its assoc_ptr */
static void _set_assoc_mgr_ptrs(bb_alloc_t *bb_alloc)
{
/* read locks on assoc */
assoc_mgr_lock_t assoc_locks =
{ .assoc = READ_LOCK, .qos = READ_LOCK, .user = READ_LOCK };
slurmdb_assoc_rec_t assoc_rec;
slurmdb_qos_rec_t qos_rec;
memset(&assoc_rec, 0, sizeof(slurmdb_assoc_rec_t));
assoc_rec.acct = bb_alloc->account;
assoc_rec.partition = bb_alloc->partition;
assoc_rec.uid = bb_alloc->user_id;
assoc_mgr_lock(&assoc_locks);
if (assoc_mgr_fill_in_assoc(acct_db_conn, &assoc_rec,
accounting_enforce,
&bb_alloc->assoc_ptr,
true) == SLURM_SUCCESS) {
xfree(bb_alloc->assocs);
if (bb_alloc->assoc_ptr) {
bb_alloc->assocs =
xstrdup_printf(",%u,", bb_alloc->assoc_ptr->id);
}
}
memset(&qos_rec, 0, sizeof(slurmdb_qos_rec_t));
qos_rec.name = bb_alloc->qos;
if (assoc_mgr_fill_in_qos(acct_db_conn, &qos_rec, accounting_enforce,
&bb_alloc->qos_ptr, true) != SLURM_SUCCESS)
verbose("Invalid QOS name: %s",
bb_alloc->qos);
assoc_mgr_unlock(&assoc_locks);
}
/*
* Determine the current actual burst buffer state.
*/
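/* The pool data reported by dw_wlm_cli is JSON along these lines (field
* names match bb_pools_t; the values are illustrative):
*   {"pools": [{"id": "wlm_pool", "units": "bytes",
*               "granularity": 16777216, "quantity": 2048, "free": 1024}]}
* Total pool space is computed below as quantity * granularity and
* unfree space as (quantity - free) * granularity. */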
static void _load_state(bool init_config)
{
static bool first_run = true;
burst_buffer_pool_t *pool_ptr;
bb_configs_t *configs;
bb_instances_t *instances;
bb_pools_t *pools;
bb_sessions_t *sessions;
bb_alloc_t *bb_alloc;
job_record_t *job_ptr;
int num_configs = 0, num_instances = 0, num_pools = 0, num_sessions = 0;
int i, j, pools_inx;
char *end_ptr = NULL;
time_t now = time(NULL);
uint32_t timeout;
assoc_mgr_lock_t assoc_locks = { .assoc = READ_LOCK,
.qos = READ_LOCK,
.user = READ_LOCK };
bool found_pool;
bitstr_t *pools_bitmap;
slurm_mutex_lock(&bb_state.bb_mutex);
timeout = bb_state.bb_config.other_timeout * 1000;
slurm_mutex_unlock(&bb_state.bb_mutex);
/*
* Load the pools information
*/
pools = _bb_get_pools(&num_pools, &bb_state, timeout);
if (pools == NULL) {
error("failed to find DataWarp entries, what now?");
return;
}
pools_bitmap = bit_alloc(bb_state.bb_config.pool_cnt + num_pools);
slurm_mutex_lock(&bb_state.bb_mutex);
if (!bb_state.bb_config.default_pool && (num_pools > 0)) {
info("Setting DefaultPool to %s",
pools[0].id);
bb_state.bb_config.default_pool = xstrdup(pools[0].id);
}
for (i = 0; i < num_pools; i++) {
/* ID: "bytes" */
if (xstrcmp(pools[i].id,
bb_state.bb_config.default_pool) == 0) {
bb_state.bb_config.granularity = pools[i].granularity;
bb_state.total_space = pools[i].quantity *
pools[i].granularity;
bb_state.unfree_space = pools[i].quantity -
pools[i].free;
bb_state.unfree_space *= pools[i].granularity;
continue;
}
found_pool = false;
pool_ptr = bb_state.bb_config.pool_ptr;
for (j = 0; j < bb_state.bb_config.pool_cnt; j++, pool_ptr++) {
if (!xstrcmp(pool_ptr->name, pools[i].id)) {
found_pool = true;
break;
}
}
if (!found_pool) {
if (!first_run) {
info("Newly reported pool %s",
pools[i].id);
}
bb_state.bb_config.pool_ptr
= xrealloc(bb_state.bb_config.pool_ptr,
sizeof(burst_buffer_pool_t) *
(bb_state.bb_config.pool_cnt + 1));
pool_ptr = bb_state.bb_config.pool_ptr +
bb_state.bb_config.pool_cnt;
pool_ptr->name = xstrdup(pools[i].id);
bb_state.bb_config.pool_cnt++;
}
pools_inx = pool_ptr - bb_state.bb_config.pool_ptr;
bit_set(pools_bitmap, pools_inx);
pool_ptr->total_space = pools[i].quantity *
pools[i].granularity;
pool_ptr->granularity = pools[i].granularity;
pool_ptr->unfree_space = pools[i].quantity - pools[i].free;
pool_ptr->unfree_space *= pools[i].granularity;
}
pool_ptr = bb_state.bb_config.pool_ptr;
for (j = 0; j < bb_state.bb_config.pool_cnt; j++, pool_ptr++) {
if (bit_test(pools_bitmap, j) || (pool_ptr->total_space == 0))
continue;
error("Pool %s no longer reported by system, setting size to zero",
pool_ptr->name);
pool_ptr->total_space = 0;
pool_ptr->used_space = 0;
pool_ptr->unfree_space = 0;
}
first_run = false;
slurm_mutex_unlock(&bb_state.bb_mutex);
FREE_NULL_BITMAP(pools_bitmap);
_bb_free_pools(pools, num_pools);
/*
* Load the instances information
*/
instances = _bb_get_instances(&num_instances, &bb_state, timeout);
if (instances == NULL) {
log_flag(BURST_BUF, "No DataWarp instances found");
num_instances = 0; /* Redundant, but fixes CLANG bug */
}
sessions = _bb_get_sessions(&num_sessions, &bb_state, timeout);
assoc_mgr_lock(&assoc_locks);
slurm_mutex_lock(&bb_state.bb_mutex);
bb_state.last_load_time = time(NULL);
for (i = 0; i < num_sessions; i++) {
if (!init_config) {
bb_alloc = bb_find_name_rec(sessions[i].token,
sessions[i].user_id,
&bb_state);
if (bb_alloc) {
bb_alloc->seen_time = bb_state.last_load_time;
continue;
}
if (difftime(now, sessions[i].created) <
bb_state.bb_config.other_timeout) {
/* Newly created in other thread. Give that
* thread a chance to add the entry */
continue;
}
error("Unexpected burst buffer found: %s",
sessions[i].token);
}
bb_alloc = bb_alloc_name_rec(&bb_state, sessions[i].token,
sessions[i].user_id);
bb_alloc->create_time = sessions[i].created;
bb_alloc->id = sessions[i].id;
if ((sessions[i].token != NULL) &&
(sessions[i].token[0] >= '0') &&
(sessions[i].token[0] <= '9')) {
bb_alloc->job_id =
strtol(sessions[i].token, &end_ptr, 10);
job_ptr = find_job_record(bb_alloc->job_id);
if (job_ptr) {
bb_alloc->array_job_id = job_ptr->array_job_id;
bb_alloc->array_task_id = job_ptr->array_task_id;
} else {
bb_alloc->array_task_id = NO_VAL;
}
}
for (j = 0; j < num_instances; j++) {
if (sessions[i].id != instances[j].session)
continue;
bb_alloc->size += instances[j].bytes;
}
bb_alloc->seen_time = bb_state.last_load_time;
if (!init_config) { /* Newly found buffer */
_pick_alloc_account(bb_alloc);
bb_limit_add(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state, false);
}
if (bb_alloc->job_id == 0)
bb_post_persist_create(NULL, bb_alloc, &bb_state);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
assoc_mgr_unlock(&assoc_locks);
_bb_free_sessions(sessions, num_sessions);
_bb_free_instances(instances, num_instances);
if (!init_config)
return;
/*
* Load the configurations information
* NOTE: This information is currently unused
*/
configs = _bb_get_configs(&num_configs, &bb_state, timeout);
if (configs == NULL) {
info("No DataWarp configurations found");
num_configs = 0;
}
_bb_free_configs(configs, num_configs);
_recover_bb_state();
_apply_limits();
bb_state.last_update_time = time(NULL);
return;
}
static int _queue_stage_in(job_record_t *job_ptr, bb_job_t *bb_job)
{
char *hash_dir = NULL, *job_dir = NULL, *job_pool;
char *client_nodes_file_nid = NULL;
char **setup_argv, **data_in_argv;
stage_args_t *stage_args;
int hash_inx = job_ptr->job_id % 10;
int rc = SLURM_SUCCESS;
bb_alloc_t *bb_alloc = NULL;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
if (job_ptr->sched_nodes) {
xstrfmtcat(client_nodes_file_nid, "%s/client_nids", job_dir);
if (bb_write_nid_file(client_nodes_file_nid,
job_ptr->sched_nodes, job_ptr))
xfree(client_nodes_file_nid);
}
setup_argv = xcalloc(20, sizeof(char *)); /* NULL terminated */
setup_argv[0] = xstrdup("dw_wlm_cli");
setup_argv[1] = xstrdup("--function");
setup_argv[2] = xstrdup("setup");
setup_argv[3] = xstrdup("--token");
xstrfmtcat(setup_argv[4], "%u", job_ptr->job_id);
setup_argv[5] = xstrdup("--caller");
setup_argv[6] = xstrdup("SLURM");
setup_argv[7] = xstrdup("--user");
xstrfmtcat(setup_argv[8], "%u", job_ptr->user_id);
setup_argv[9] = xstrdup("--groupid");
xstrfmtcat(setup_argv[10], "%u", job_ptr->group_id);
setup_argv[11] = xstrdup("--capacity");
if (bb_job->job_pool)
job_pool = bb_job->job_pool;
else
job_pool = bb_state.bb_config.default_pool;
xstrfmtcat(setup_argv[12], "%s:%s",
job_pool, bb_get_size_str(bb_job->total_size));
setup_argv[13] = xstrdup("--job");
setup_argv[14] = bb_handle_job_script(job_ptr, bb_job);
if (client_nodes_file_nid) {
#if defined(HAVE_NATIVE_CRAY)
setup_argv[15] = xstrdup("--nidlistfile");
#else
setup_argv[15] = xstrdup("--nodehostnamefile");
#endif
setup_argv[16] = xstrdup(client_nodes_file_nid);
}
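/* The assembled command is equivalent to:
*   dw_wlm_cli --function setup --token <job_id> --caller SLURM
*       --user <uid> --groupid <gid> --capacity <pool>:<size>
*       --job <script> [--nidlistfile|--nodehostnamefile <nid_file>] */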
/*
* Create bb allocation for the job now. Check if it has already been
* created (perhaps it was created but then slurmctld restarted).
* bb_alloc is the structure that is state saved.
* If we wait until the _start_stage_in thread to create bb_alloc,
* we introduce a race condition where the thread could be killed
* (if slurmctld is shut down) before the thread creates
* bb_alloc. That race would mean the burst buffer isn't state saved.
*/
if (!(bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))) {
bb_alloc = bb_alloc_job(&bb_state, job_ptr, bb_job);
bb_alloc->create_time = time(NULL);
}
bb_limit_add(job_ptr->user_id, bb_job->total_size, job_pool, &bb_state,
true);
data_in_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
data_in_argv[0] = xstrdup("dw_wlm_cli");
data_in_argv[1] = xstrdup("--function");
data_in_argv[2] = xstrdup("data_in");
data_in_argv[3] = xstrdup("--token");
xstrfmtcat(data_in_argv[4], "%u", job_ptr->job_id);
data_in_argv[5] = xstrdup("--job");
data_in_argv[6] = bb_handle_job_script(job_ptr, bb_job);
stage_args = xmalloc(sizeof(stage_args_t));
stage_args->bb_size = bb_job->total_size;
stage_args->job_id = job_ptr->job_id;
stage_args->pool = xstrdup(job_pool);
stage_args->user_id = job_ptr->user_id;
stage_args->args1 = setup_argv;
stage_args->args2 = data_in_argv;
slurm_thread_create_detached(_start_stage_in, stage_args);
xfree(hash_dir);
xfree(job_dir);
xfree(client_nodes_file_nid);
return rc;
}
static void *_start_stage_in(void *x)
{
stage_args_t *stage_args = (stage_args_t *) x;
char **setup_argv, **size_argv, **data_in_argv;
char *resp_msg = NULL, *resp_msg2 = NULL, *op = NULL;
uint64_t real_size = 0;
int rc = SLURM_SUCCESS, status = 0, timeout;
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
job_record_t *job_ptr;
bb_alloc_t *bb_alloc = NULL;
bb_job_t *bb_job;
bool get_real_size = false;
DEF_TIMERS;
pthread_t tid = pthread_self();
track_script_rec_add(stage_args->job_id, 0, pthread_self());
run_command_args_t run_command_args = {
.script_path = bb_state.bb_config.get_sys_state,
.status = &status,
.tid = tid,
};
setup_argv = stage_args->args1;
data_in_argv = stage_args->args2;
timeout = bb_state.bb_config.other_timeout * 1000;
op = "setup";
START_TIMER;
run_command_args.max_wait = timeout;
run_command_args.script_argv = setup_argv;
run_command_args.script_type = "setup";
resp_msg = run_command(&run_command_args);
END_TIMER;
info("setup for job JobId=%u ran for %s",
stage_args->job_id, TIME_STR);
if (track_script_killed(pthread_self(), status, true)) {
/* I was killed by slurmtrack, bail out right now */
info("setup for JobId=%u terminated by slurmctld",
stage_args->job_id);
xfree_array(setup_argv);
xfree_array(data_in_argv);
xfree(resp_msg);
xfree(stage_args->pool);
xfree(stage_args);
track_script_remove(pthread_self());
return NULL;
}
track_script_reset_cpid(pthread_self(), 0);
_log_script_argv(setup_argv, resp_msg);
slurm_mutex_lock(&bb_state.bb_mutex);
/*
* The buffer's actual size may be larger than requested by the user.
* Remove the limit here and restore it below based upon the actual size
* (assuming the buffer allocation succeeded; on failure it stays removed).
*/
bb_limit_rem(stage_args->user_id, stage_args->bb_size, stage_args->pool,
&bb_state);
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
/*
* Unlock bb_mutex before locking job_write_lock to avoid
* deadlock, since job_write_lock is always locked first.
*/
slurm_mutex_unlock(&bb_state.bb_mutex);
trigger_burst_buffer();
error("setup for JobId=%u status:%u response:%s",
stage_args->job_id, status,
resp_msg);
rc = SLURM_ERROR;
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(stage_args->job_id);
if (job_ptr)
bb_update_system_comment(job_ptr, "setup", resp_msg, 0);
unlock_slurmctld(job_write_lock);
} else {
bb_job = bb_job_find(&bb_state, stage_args->job_id);
if (!bb_job) {
error("unable to find bb_job record for JobId=%u",
stage_args->job_id);
rc = SLURM_ERROR;
} else if (bb_job->total_size) {
/* Restore limit based upon actual size. */
bb_limit_add(stage_args->user_id, bb_job->total_size,
stage_args->pool, &bb_state, true);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
}
if (rc == SLURM_SUCCESS) {
timeout = bb_state.bb_config.stage_in_timeout * 1000;
xfree(resp_msg);
op = "dws_data_in";
START_TIMER;
/* Overwrite changed parameters */
run_command_args.max_wait = timeout;
run_command_args.script_argv = data_in_argv;
run_command_args.script_type = "dws_data_in";
resp_msg = run_command(&run_command_args);
END_TIMER;
info("dws_data_in for JobId=%u ran for %s",
stage_args->job_id, TIME_STR);
if (track_script_killed(pthread_self(), status, true)) {
/* I was killed by slurmtrack, bail out right now */
info("dws_data_in for JobId=%u terminated by slurmctld",
stage_args->job_id);
xfree_array(setup_argv);
xfree_array(data_in_argv);
xfree(resp_msg);
xfree(stage_args->pool);
xfree(stage_args);
/*
* Don't need to free track_script_rec here,
* it is handled elsewhere since it is still being tracked.
*/
return NULL;
}
track_script_reset_cpid(pthread_self(), 0);
_log_script_argv(data_in_argv, resp_msg);
if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
!strstr(resp_msg, "No matching session")) {
trigger_burst_buffer();
error("dws_data_in for JobId=%u status:%u response:%s",
stage_args->job_id, status,
resp_msg);
rc = SLURM_ERROR;
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(stage_args->job_id);
if (job_ptr)
bb_update_system_comment(job_ptr, "data_in",
resp_msg, 0);
unlock_slurmctld(job_write_lock);
}
}
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job = bb_job_find(&bb_state, stage_args->job_id);
if (bb_job && bb_job->req_size)
get_real_size = true;
slurm_mutex_unlock(&bb_state.bb_mutex);
/* Round up job buffer size based upon DW "equalize_fragments"
* configuration parameter */
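/* A successful real_size response is JSON along these lines (value
* illustrative): {"units": "bytes", "capacity": 161061273600}; only
* the "units" and "capacity" fields are parsed below. */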
if (get_real_size) {
size_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
size_argv[0] = xstrdup("dw_wlm_cli");
size_argv[1] = xstrdup("--function");
size_argv[2] = xstrdup("real_size");
size_argv[3] = xstrdup("--token");
xstrfmtcat(size_argv[4], "%u", stage_args->job_id);
START_TIMER;
run_command_args.max_wait = timeout;
run_command_args.script_argv = size_argv;
run_command_args.script_type = "real_size";
resp_msg2 = run_command(&run_command_args);
END_TIMER;
if ((DELTA_TIMER > 200000) || /* 0.2 secs */
(slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF))
info("real_size ran for %s",
TIME_STR);
if (track_script_killed(pthread_self(), status, true)) {
/* I was killed by slurmtrack, bail out right now */
info("real_size for JobId=%u terminated by slurmctld",
stage_args->job_id);
xfree_array(setup_argv);
xfree_array(data_in_argv);
xfree(resp_msg);
xfree(resp_msg2);
xfree_array(size_argv);
xfree(stage_args->pool);
xfree(stage_args);
/*
* Don't need to free track_script_rec here,
* it is handled elsewhere since it is still being tracked.
*/
return NULL;
}
track_script_reset_cpid(pthread_self(), 0);
/* Use resp_msg2 to preserve resp_msg for error message below */
_log_script_argv(size_argv, resp_msg2);
if (WIFEXITED(status) && (WEXITSTATUS(status) != 0) &&
resp_msg2 &&
(strncmp(resp_msg2, "invalid function", 16) == 0)) {
debug("Old dw_wlm_cli does not support real_size function");
} else if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
error("real_size for JobId=%u status:%u response:%s",
stage_args->job_id,
status, resp_msg2);
} else if (resp_msg2 && resp_msg2[0]) {
json_object *j;
struct bb_total_size *ent;
j = json_tokener_parse(resp_msg2);
if (j == NULL) {
error("json parser failed on \"%s\"",
resp_msg2);
} else {
ent = _json_parse_real_size(j);
json_object_put(j); /* Frees json memory */
if (ent && (ent->units == BB_UNITS_BYTES))
real_size = ent->capacity;
xfree(ent);
}
}
xfree(resp_msg2);
xfree_array(size_argv);
}
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(stage_args->job_id);
if (!job_ptr) {
error("unable to find job record for JobId=%u",
stage_args->job_id);
} else if (rc == SLURM_SUCCESS) {
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job = bb_job_find(&bb_state, stage_args->job_id);
if (bb_job)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_STAGED_IN);
if (bb_job && bb_job->total_size) {
if (real_size > bb_job->req_size) {
info("%pJ total_size increased from %"PRIu64" to %"PRIu64,
job_ptr,
bb_job->req_size, real_size);
bb_job->total_size = real_size;
}
bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
if (bb_alloc) {
bb_alloc->state = BB_STATE_STAGED_IN;
bb_alloc->state_time = time(NULL);
log_flag(BURST_BUF, "Setup/stage-in complete for %pJ",
job_ptr);
queue_job_scheduler();
bb_state.last_update_time = time(NULL);
} else {
error("unable to find bb_alloc record for %pJ",
job_ptr);
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
} else {
xfree(job_ptr->state_desc);
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
plugin_type, op, resp_msg);
job_ptr->priority = 0; /* Hold job */
bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
if (bb_alloc) {
bb_alloc->state_time = time(NULL);
bb_state.last_update_time = time(NULL);
if (bb_state.bb_config.flags &
BB_FLAG_TEARDOWN_FAILURE) {
bb_alloc->state = BB_STATE_TEARDOWN;
_queue_teardown(job_ptr->job_id,
job_ptr->user_id, true);
} else {
bb_alloc->state = BB_STATE_ALLOCATED;
}
} else {
_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
}
}
unlock_slurmctld(job_write_lock);
xfree(resp_msg);
xfree_array(setup_argv);
xfree_array(data_in_argv);
xfree(stage_args->pool);
xfree(stage_args);
track_script_remove(tid);
return NULL;
}
static int _queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job)
{
char *hash_dir = NULL, *job_dir = NULL;
char **post_run_argv, **data_out_argv;
stage_args_t *stage_args;
int hash_inx = bb_job->job_id % 10, rc = SLURM_SUCCESS;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, bb_job->job_id);
data_out_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
data_out_argv[0] = xstrdup("dw_wlm_cli");
data_out_argv[1] = xstrdup("--function");
data_out_argv[2] = xstrdup("data_out");
data_out_argv[3] = xstrdup("--token");
xstrfmtcat(data_out_argv[4], "%u", bb_job->job_id);
data_out_argv[5] = xstrdup("--job");
data_out_argv[6] = bb_handle_job_script(job_ptr, bb_job);
post_run_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
post_run_argv[0] = xstrdup("dw_wlm_cli");
post_run_argv[1] = xstrdup("--function");
post_run_argv[2] = xstrdup("post_run");
post_run_argv[3] = xstrdup("--token");
xstrfmtcat(post_run_argv[4], "%u", bb_job->job_id);
post_run_argv[5] = xstrdup("--job");
post_run_argv[6] = bb_handle_job_script(job_ptr, bb_job);
stage_args = xmalloc(sizeof(stage_args_t));
stage_args->args1 = data_out_argv;
stage_args->args2 = post_run_argv;
stage_args->job_id = bb_job->job_id;
stage_args->user_id = bb_job->user_id;
slurm_thread_create_detached(_start_stage_out, stage_args);
xfree(hash_dir);
xfree(job_dir);
return rc;
}
static void *_start_stage_out(void *x)
{
stage_args_t *stage_args = (stage_args_t *)x;
char **post_run_argv, **data_out_argv, *resp_msg = NULL, *op = NULL;
int rc = SLURM_SUCCESS, status = 0, timeout;
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
job_record_t *job_ptr;
bb_alloc_t *bb_alloc = NULL;
bb_job_t *bb_job = NULL;
DEF_TIMERS
pthread_t tid = pthread_self();
track_script_rec_add(stage_args->job_id, 0, pthread_self());
run_command_args_t run_command_args = {
.script_path = bb_state.bb_config.get_sys_state,
.status = &status,
.tid = tid,
};
data_out_argv = stage_args->args1;
post_run_argv = stage_args->args2;
timeout = bb_state.bb_config.other_timeout * 1000;
op = "dws_post_run";
START_TIMER;
run_command_args.max_wait = timeout;
run_command_args.script_argv = post_run_argv;
run_command_args.script_type = "dws_post_run";
resp_msg = run_command(&run_command_args);
END_TIMER;
if ((DELTA_TIMER > 500000) || /* 0.5 secs */
(slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF)) {
info("dws_post_run for JobId=%u ran for %s",
stage_args->job_id, TIME_STR);
}
if (track_script_killed(pthread_self(), status, true)) {
/* I was killed by slurmtrack, bail out right now */
info("dws_post_run for JobId=%u terminated by slurmctld",
stage_args->job_id);
xfree_array(post_run_argv);
xfree_array(data_out_argv);
xfree(resp_msg);
xfree(stage_args->pool);
xfree(stage_args);
track_script_remove(pthread_self());
return NULL;
}
track_script_reset_cpid(pthread_self(), 0);
_log_script_argv(post_run_argv, resp_msg);
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(stage_args->job_id);
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
trigger_burst_buffer();
error("dws_post_run for JobId=%u status:%u response:%s",
stage_args->job_id, status,
resp_msg);
rc = SLURM_ERROR;
if (job_ptr) {
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xfree(job_ptr->state_desc);
xstrfmtcat(job_ptr->state_desc, "%s: post_run: %s",
plugin_type, resp_msg);
bb_update_system_comment(job_ptr, "post_run",
resp_msg, 1);
}
}
if (!job_ptr) {
error("unable to find job record for JobId=%u",
stage_args->job_id);
} else {
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job = _get_bb_job(job_ptr);
if (bb_job)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_STAGING_OUT);
slurm_mutex_unlock(&bb_state.bb_mutex);
}
unlock_slurmctld(job_write_lock);
if (rc == SLURM_SUCCESS) {
timeout = bb_state.bb_config.stage_out_timeout * 1000;
op = "dws_data_out";
START_TIMER;
xfree(resp_msg);
run_command_args.max_wait = timeout;
run_command_args.script_argv = data_out_argv;
run_command_args.script_type = "dws_data_out";
resp_msg = run_command(&run_command_args);
END_TIMER;
if ((DELTA_TIMER > 1000000) || /* 1 sec */
(slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF)) {
info("dws_data_out for JobId=%u ran for %s",
stage_args->job_id,
TIME_STR);
}
if (track_script_killed(pthread_self(), status, true)) {
/* I was killed by slurmtrack, bail out right now */
info("dws_data_out for JobId=%u terminated by slurmctld",
stage_args->job_id);
xfree_array(post_run_argv);
xfree_array(data_out_argv);
xfree(resp_msg);
xfree(stage_args->pool);
xfree(stage_args);
track_script_remove(pthread_self());
return NULL;
}
track_script_reset_cpid(pthread_self(), 0);
_log_script_argv(data_out_argv, resp_msg);
if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
!strstr(resp_msg, "No matching session")) {
trigger_burst_buffer();
error("dws_data_out for JobId=%u status:%u response:%s",
stage_args->job_id,
status, resp_msg);
rc = SLURM_ERROR;
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(stage_args->job_id);
if (job_ptr) {
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xfree(job_ptr->state_desc);
xstrfmtcat(job_ptr->state_desc,
"%s: stage-out: %s",
plugin_type, resp_msg);
bb_update_system_comment(job_ptr, "data_out",
resp_msg, 1);
}
unlock_slurmctld(job_write_lock);
}
}
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(stage_args->job_id);
if (!job_ptr) {
error("unable to find job record for JobId=%u",
stage_args->job_id);
} else {
if (rc != SLURM_SUCCESS) {
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xfree(job_ptr->state_desc);
xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
plugin_type, op, resp_msg);
} else {
job_state_unset_flag(job_ptr, JOB_STAGE_OUT);
xfree(job_ptr->state_desc);
last_job_update = time(NULL);
}
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job = _get_bb_job(job_ptr);
if ((rc == SLURM_SUCCESS) && bb_job)
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_TEARDOWN);
bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
if (bb_alloc) {
if (rc == SLURM_SUCCESS) {
log_flag(BURST_BUF, "Stage-out/post-run complete for %pJ",
job_ptr);
/* bb_alloc->state = BB_STATE_STAGED_OUT; */
bb_alloc->state = BB_STATE_TEARDOWN;
bb_alloc->state_time = time(NULL);
} else {
if (bb_state.bb_config.flags &
BB_FLAG_TEARDOWN_FAILURE) {
bb_alloc->state = BB_STATE_TEARDOWN;
_queue_teardown(stage_args->job_id,
stage_args->user_id,
false);
} else
bb_alloc->state = BB_STATE_STAGED_IN;
log_flag(BURST_BUF, "Stage-out failed for %pJ",
job_ptr);
}
bb_state.last_update_time = time(NULL);
} else if (bb_job && bb_job->total_size) {
error("unable to find bb record for %pJ",
job_ptr);
}
if (rc == SLURM_SUCCESS) {
_queue_teardown(stage_args->job_id, stage_args->user_id,
false);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
}
unlock_slurmctld(job_write_lock);
xfree(resp_msg);
xfree_array(post_run_argv);
xfree_array(data_out_argv);
xfree(stage_args);
track_script_remove(pthread_self());
return NULL;
}
static void _queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry)
{
struct stat buf;
char *hash_dir = NULL, *job_script = NULL;
char **teardown_argv;
stage_args_t *teardown_args;
int fd, hash_inx = job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
xstrfmtcat(job_script, "%s/job.%u/script", hash_dir, job_id);
if (stat(job_script, &buf) == -1) {
xfree(job_script);
xstrfmtcat(job_script, "%s/burst_buffer_script",
slurm_conf.state_save_location);
if (stat(job_script, &buf) == -1) {
fd = creat(job_script, 0755);
if (fd >= 0) {
int len;
char *dummy_script = "#!/bin/bash\nexit 0\n";
len = strlen(dummy_script) + 1;
if (write(fd, dummy_script, len) != len) {
verbose("write(%s): %m",
job_script);
}
close(fd);
}
}
}
teardown_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
teardown_argv[0] = xstrdup("dw_wlm_cli");
teardown_argv[1] = xstrdup("--function");
teardown_argv[2] = xstrdup("teardown");
teardown_argv[3] = xstrdup("--token");
xstrfmtcat(teardown_argv[4], "%u", job_id);
teardown_argv[5] = xstrdup("--job");
teardown_argv[6] = xstrdup(job_script);
if (hurry)
teardown_argv[7] = xstrdup("--hurry");
teardown_args = xmalloc(sizeof(stage_args_t));
teardown_args->job_id = job_id;
teardown_args->user_id = user_id;
teardown_args->args1 = teardown_argv;
slurm_thread_create_detached(_start_teardown, teardown_args);
xfree(hash_dir);
xfree(job_script);
}
static void *_start_teardown(void *x)
{
static uint32_t previous_job_id = 0;
stage_args_t *teardown_args = (stage_args_t *)x;
char **teardown_argv, *resp_msg = NULL;
int status = 0, timeout;
job_record_t *job_ptr;
bb_alloc_t *bb_alloc = NULL;
bb_job_t *bb_job = NULL;
/* Locks: write job */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
DEF_TIMERS;
bool hurry;
pthread_t tid = pthread_self();
track_script_rec_add(teardown_args->job_id, 0, pthread_self());
run_command_args_t run_command_args = {
.script_path = bb_state.bb_config.get_sys_state,
.status = &status,
.tid = tid,
};
teardown_argv = teardown_args->args1;
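/* Throttle back-to-back teardown attempts for the same job (e.g. a
* teardown requeued below after a failure) before running the script */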
if (previous_job_id == teardown_args->job_id)
sleep(5);
previous_job_id = teardown_args->job_id;
START_TIMER;
timeout = bb_state.bb_config.other_timeout * 1000;
run_command_args.max_wait = timeout;
run_command_args.script_argv = teardown_argv;
run_command_args.script_type = "teardown";
resp_msg = run_command(&run_command_args);
END_TIMER;
info("teardown for JobId=%u ran for %s",
teardown_args->job_id, TIME_STR);
if (track_script_killed(pthread_self(), status, true)) {
/* I was killed by slurmtrack, bail out right now */
info("teardown for JobId=%u terminated by slurmctld",
teardown_args->job_id);
xfree(resp_msg);
xfree_array(teardown_argv);
xfree(teardown_args);
track_script_remove(pthread_self());
return NULL;
}
/* track_script_reset_cpid(pthread_self(), 0); */
_log_script_argv(teardown_argv, resp_msg);
/*
* "Teardown" is run at every termination of every job that _might_
* have a burst buffer, so an error of "token not found" should be
* fairly common and not indicative of a problem.
*/
if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
(!resp_msg ||
(!strstr(resp_msg, "No matching session") &&
!strstr(resp_msg, "token not found")))) {
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&bb_state.bb_mutex);
job_ptr = find_job_record(teardown_args->job_id);
if (job_ptr &&
(bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))) {
bb_alloc->state = BB_STATE_TEARDOWN_FAIL;
}
slurm_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock);
trigger_burst_buffer();
error("teardown for JobId=%u status:%u response:%s",
teardown_args->job_id, status,
resp_msg);
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(teardown_args->job_id);
if (job_ptr) {
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xfree(job_ptr->state_desc);
xstrfmtcat(job_ptr->state_desc, "%s: teardown: %s",
plugin_type, resp_msg);
bb_update_system_comment(job_ptr, "teardown",
resp_msg, 0);
}
unlock_slurmctld(job_write_lock);
hurry = !xstrcmp(teardown_argv[7], "--hurry");
_queue_teardown(teardown_args->job_id, teardown_args->user_id,
hurry);
} else {
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&bb_state.bb_mutex);
job_ptr = find_job_record(teardown_args->job_id);
_purge_bb_files(teardown_args->job_id, job_ptr);
if (job_ptr) {
if ((bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))){
bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state);
(void) bb_free_alloc_rec(&bb_state, bb_alloc);
}
if ((bb_job = _get_bb_job(job_ptr))) {
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_COMPLETE);
bb_job_del(&bb_state, bb_job->job_id);
}
job_state_unset_flag(job_ptr, JOB_STAGE_OUT);
if (!IS_JOB_PENDING(job_ptr) && /* No email if requeue */
(job_ptr->mail_type & MAIL_JOB_STAGE_OUT)) {
/*
* NOTE: If a job uses multiple burst buffer
* plugins, the message will be sent after the
* teardown completes in the first plugin
*/
mail_job_info(job_ptr, MAIL_JOB_STAGE_OUT);
job_ptr->mail_type &= (~MAIL_JOB_STAGE_OUT);
}
} else {
/*
* This will happen when slurmctld restarts and needs
* to clear vestigial buffers
*/
char buf_name[32];
snprintf(buf_name, sizeof(buf_name), "%u",
teardown_args->job_id);
bb_alloc = bb_find_name_rec(buf_name,
teardown_args->user_id,
&bb_state);
if (bb_alloc) {
bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state);
(void) bb_free_alloc_rec(&bb_state, bb_alloc);
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock);
}
xfree(resp_msg);
xfree_array(teardown_argv);
xfree(teardown_args);
track_script_remove(pthread_self());
return NULL;
}
/* Handle timeout of burst buffer events:
* 1. Purge per-job burst buffer records when the stage-out has completed and
* the job has been purged from Slurm
* 2. Test for StageInTimeout events
* 3. Test for StageOutTimeout events
*/
static void _timeout_bb_rec(void)
{
bb_alloc_t **bb_pptr, *bb_alloc = NULL;
job_record_t *job_ptr;
int i;
if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
return;
for (i = 0; i < BB_HASH_SIZE; i++) {
bb_pptr = &bb_state.bb_ahash[i];
bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
if (((bb_alloc->seen_time + TIME_SLOP) <
bb_state.last_load_time) &&
(bb_alloc->state == BB_STATE_TEARDOWN)) {
/*
* Teardown likely complete, but bb_alloc state
* not yet updated; skip the record
*/
} else if ((bb_alloc->seen_time + TIME_SLOP) <
bb_state.last_load_time) {
assoc_mgr_lock_t assoc_locks =
{ .assoc = READ_LOCK,
.qos = READ_LOCK };
/*
* assoc_mgr needs locking to call
* bb_post_persist_delete
*/
if (bb_alloc->job_id == 0) {
info("Persistent burst buffer %s purged",
bb_alloc->name);
} else
log_flag(BURST_BUF, "burst buffer for JobId=%u purged",
bb_alloc->job_id);
bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state);
assoc_mgr_lock(&assoc_locks);
bb_post_persist_delete(bb_alloc, &bb_state);
assoc_mgr_unlock(&assoc_locks);
*bb_pptr = bb_alloc->next;
bb_free_alloc_buf(bb_alloc);
break;
} else if (bb_alloc->state == BB_STATE_COMPLETE) {
job_ptr = find_job_record(bb_alloc->job_id);
if (!job_ptr || IS_JOB_PENDING(job_ptr)) {
/* Job purged or BB preempted */
*bb_pptr = bb_alloc->next;
bb_free_alloc_buf(bb_alloc);
break;
}
}
bb_pptr = &bb_alloc->next;
bb_alloc = bb_alloc->next;
}
}
}
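/*
 * Illustrative directive lines accepted by _parse_bb_opts() below; the
 * names, sizes and pools are hypothetical:
 *   #BB create_persistent name=alpha capacity=100GiB pool=ssd_pool
 *   #BB destroy_persistent name=alpha
 *   #DW jobdw capacity=10GiB access_mode=striped type=scratch
 *   #DW swap 2GiB
 *   #DW stage_out ...
 */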
/* Perform basic burst_buffer option validation */
static int _parse_bb_opts(job_desc_msg_t *job_desc, uint64_t *bb_size,
uid_t submit_uid)
{
char *bb_script, *save_ptr = NULL;
char *bb_name = NULL, *bb_pool, *capacity;
char *end_ptr = NULL, *sub_tok, *tok;
uint64_t tmp_cnt, swap_cnt = 0;
int rc = SLURM_SUCCESS;
bool enable_persist = false, have_bb = false, have_stage_out = false;
xassert(bb_size);
*bb_size = 0;
if (validate_operator(submit_uid) ||
(bb_state.bb_config.flags & BB_FLAG_ENABLE_PERSISTENT))
enable_persist = true;
if (job_desc->script)
rc = _xlate_batch(job_desc);
else
rc = _xlate_interactive(job_desc);
if ((rc != SLURM_SUCCESS) || (!job_desc->burst_buffer))
return rc;
bb_script = xstrdup(job_desc->burst_buffer);
tok = strtok_r(bb_script, "\n", &save_ptr);
while (tok) {
uint32_t bb_flag = 0;
tmp_cnt = 0;
if (tok[0] != '#')
break; /* Quit at first non-comment */
if ((tok[1] == 'B') && (tok[2] == 'B'))
bb_flag = BB_FLAG_BB_OP;
else if ((tok[1] == 'D') && (tok[2] == 'W'))
bb_flag = BB_FLAG_DW_OP;
/*
* Effective with Slurm v18.08 and CLE6.0UP06, the create_persistent
* and destroy_persistent functions are directly supported by
* dw_wlm_cli. The "#BB" format is supported for backward compatibility.
*/
if (bb_flag == BB_FLAG_BB_OP) {
tok += 3;
while (isspace(tok[0]))
tok++;
if (!xstrncmp(tok, "create_persistent", 17) &&
!enable_persist) {
info("User %d disabled from creating persistent burst buffer",
submit_uid);
rc = ESLURM_BURST_BUFFER_PERMISSION;
break;
} else if (!xstrncmp(tok, "create_persistent", 17)) {
have_bb = true;
bb_name = NULL;
bb_pool = NULL;
if ((sub_tok = strstr(tok, "capacity="))) {
tmp_cnt = bb_get_size_num(sub_tok+9, 1);
}
if (tmp_cnt == 0)
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
if ((sub_tok = strstr(tok, "name="))) {
bb_name = xstrdup(sub_tok + 5);
if ((sub_tok = strchr(bb_name, ' ')))
sub_tok[0] = '\0';
} else {
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
}
if (!bb_name ||
((bb_name[0] >= '0') &&
(bb_name[0] <= '9')))
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
xfree(bb_name);
if ((sub_tok = strstr(tok, "pool="))) {
bb_pool = xstrdup(sub_tok + 5);
if ((sub_tok = strchr(bb_pool, ' ')))
sub_tok[0] = '\0';
}
if (!bb_valid_pool_test(&bb_state, bb_pool))
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
*bb_size += _set_granularity(tmp_cnt, bb_pool);
xfree(bb_pool);
if (rc != SLURM_SUCCESS)
break;
} else if (!xstrncmp(tok, "destroy_persistent", 18) &&
!enable_persist) {
info("User %d disabled from destroying persistent burst buffer",
submit_uid);
rc = ESLURM_BURST_BUFFER_PERMISSION;
break;
} else if (!xstrncmp(tok, "destroy_persistent", 18)) {
have_bb = true;
if (!(sub_tok = strstr(tok, "name="))) {
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
break;
}
} else {
/* Ignore other (future) options */
}
}
if (bb_flag == BB_FLAG_DW_OP) {
tok += 3;
while (isspace(tok[0]))
tok++;
if (!xstrncmp(tok, "jobdw", 5) &&
(capacity = strstr(tok, "capacity="))) {
bb_pool = NULL;
have_bb = true;
tmp_cnt = bb_get_size_num(capacity + 9, 1);
if (tmp_cnt == 0) {
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
break;
}
if ((sub_tok = strstr(tok, "pool="))) {
bb_pool = xstrdup(sub_tok + 5);
if ((sub_tok = strchr(bb_pool, ' ')))
sub_tok[0] = '\0';
}
if (!bb_valid_pool_test(&bb_state, bb_pool))
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
*bb_size += _set_granularity(tmp_cnt, bb_pool);
xfree(bb_pool);
} else if (!xstrncmp(tok, "persistentdw", 12)) {
have_bb = true;
} else if (!xstrncmp(tok, "swap", 4)) {
bb_pool = NULL;
have_bb = true;
tok += 4;
while (isspace(tok[0]) && (tok[0] != '\0'))
tok++;
swap_cnt += strtol(tok, &end_ptr, 10);
if ((job_desc->max_nodes == 0) ||
(job_desc->max_nodes == NO_VAL)) {
info("user %u submitted job with swap space specification, but no max node count specification",
job_desc->user_id);
if (job_desc->min_nodes == NO_VAL)
job_desc->min_nodes = 1;
job_desc->max_nodes =
job_desc->min_nodes;
}
tmp_cnt = swap_cnt * job_desc->max_nodes;
if ((sub_tok = strstr(tok, "pool="))) {
bb_pool = xstrdup(sub_tok + 5);
if ((sub_tok = strchr(bb_pool, ' ')))
sub_tok[0] = '\0';
}
if (!bb_valid_pool_test(&bb_state, bb_pool))
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
*bb_size += _set_granularity(tmp_cnt, bb_pool);
xfree(bb_pool);
} else if (!xstrncmp(tok, "stage_out", 9)) {
have_stage_out = true;
} else if (!xstrncmp(tok, "create_persistent", 17) ||
!xstrncmp(tok, "destroy_persistent", 18)) {
/*
* Disable support until Slurm v18.08 to prevent
* user-directed persistent burst buffer changes
* outside of Slurm control.
*/
rc = ESLURM_BURST_BUFFER_PERMISSION;
break;
}
}
tok = strtok_r(NULL, "\n", &save_ptr);
}
xfree(bb_script);
if (!have_bb)
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
if (!have_stage_out) {
/* prevent sending stage out email */
job_desc->mail_type &= (~MAIL_JOB_STAGE_OUT);
}
return rc;
}
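/*
 * Continuation-line example for _xlate_batch() below (hypothetical
 * directive): the script lines
 *   #DW jobdw capacity=10GiB \
 *   #DW access_mode=striped type=scratch
 * are merged into the single burst_buffer line
 *   #DW jobdw capacity=10GiB access_mode=striped type=scratch
 */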
/* Copy a batch job's burst_buffer options into a separate buffer.
* Merge continued lines into a single line. */
static int _xlate_batch(job_desc_msg_t *job_desc)
{
char *script, *save_ptr = NULL, *tok;
int line_type, prev_type = LINE_OTHER;
bool is_cont = false, has_space = false;
int len, rc = SLURM_SUCCESS;
/*
* Any command-line --bb options get added to the script
*/
if (job_desc->burst_buffer) {
rc = _xlate_interactive(job_desc);
if (rc != SLURM_SUCCESS)
return rc;
run_command_add_to_script(&job_desc->script,
job_desc->burst_buffer);
xfree(job_desc->burst_buffer);
}
script = xstrdup(job_desc->script);
tok = strtok_r(script, "\n", &save_ptr);
while (tok) {
if (tok[0] != '#')
break; /* Quit at first non-comment */
if ((tok[1] == 'B') && (tok[2] == 'B'))
line_type = LINE_BB;
else if ((tok[1] == 'D') && (tok[2] == 'W'))
line_type = LINE_DW;
else
line_type = LINE_OTHER;
if (line_type == LINE_OTHER) {
is_cont = false;
} else {
if (is_cont) {
if (line_type != prev_type) {
/*
* Mixing "#DW" with "#BB" on same
* (continued) line, error
*/
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
break;
}
tok += 3; /* Skip "#DW" or "#BB" */
while (has_space && isspace(tok[0]))
tok++; /* Skip duplicate spaces */
} else if (job_desc->burst_buffer) {
xstrcat(job_desc->burst_buffer, "\n");
}
prev_type = line_type;
len = strlen(tok);
if (tok[len - 1] == '\\') {
has_space = isspace(tok[len - 2]);
tok[len - 1] = '\0';
is_cont = true;
} else {
is_cont = false;
}
xstrcat(job_desc->burst_buffer, tok);
}
tok = strtok_r(NULL, "\n", &save_ptr);
}
xfree(script);
if (rc != SLURM_SUCCESS)
xfree(job_desc->burst_buffer);
return rc;
}
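/*
 * Translation example for _xlate_interactive() below (hypothetical values):
 *   --bb "capacity=10GiB,access_mode=striped,pool=ssd_pool"
 * is rewritten as
 *   #DW jobdw capacity=10GiB access_mode=striped pool=ssd_pool
 */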
/* Parse simple interactive burst_buffer options into a format identical to
* burst_buffer options in a batch script file */
static int _xlate_interactive(job_desc_msg_t *job_desc)
{
char *access = NULL, *bb_copy = NULL, *capacity = NULL, *pool = NULL;
char *swap = NULL, *type = NULL;
char *end_ptr = NULL, *sep, *tok;
uint64_t buf_size = 0, swap_cnt = 0;
int i, rc = SLURM_SUCCESS, tok_len;
if (!job_desc->burst_buffer || (job_desc->burst_buffer[0] == '#'))
return rc;
if (strstr(job_desc->burst_buffer, "create_persistent") ||
strstr(job_desc->burst_buffer, "destroy_persistent")) {
/* Create or destroy of persistent burst buffers NOT supported
* via --bb option. Use --bbf or a batch script instead. */
return ESLURM_INVALID_BURST_BUFFER_REQUEST;
}
bb_copy = xstrdup(job_desc->burst_buffer);
if ((tok = strstr(bb_copy, "access="))) {
access = xstrdup(tok + 7);
sep = strchr(access, ',');
if (sep)
sep[0] = '\0';
sep = strchr(access, ' ');
if (sep)
sep[0] = '\0';
tok_len = strlen(access) + 7;
memset(tok, ' ', tok_len);
}
if ((access == NULL) && /* Not set above with "access=" */
(tok = strstr(bb_copy, "access_mode="))) {
access = xstrdup(tok + 12);
sep = strchr(access, ',');
if (sep)
sep[0] = '\0';
sep = strchr(access, ' ');
if (sep)
sep[0] = '\0';
tok_len = strlen(access) + 12;
memset(tok, ' ', tok_len);
}
if ((tok = strstr(bb_copy, "capacity="))) {
buf_size = bb_get_size_num(tok + 9, 1);
if (buf_size == 0) {
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
goto fini;
}
capacity = xstrdup(tok + 9);
sep = strchr(capacity, ',');
if (sep)
sep[0] = '\0';
sep = strchr(capacity, ' ');
if (sep)
sep[0] = '\0';
tok_len = strlen(capacity) + 9;
memset(tok, ' ', tok_len);
}
if ((tok = strstr(bb_copy, "pool="))) {
pool = xstrdup(tok + 5);
sep = strchr(pool, ',');
if (sep)
sep[0] = '\0';
sep = strchr(pool, ' ');
if (sep)
sep[0] = '\0';
tok_len = strlen(pool) + 5;
memset(tok, ' ', tok_len);
}
if ((tok = strstr(bb_copy, "swap="))) {
swap_cnt = strtol(tok + 5, &end_ptr, 10);
if (swap_cnt == 0) {
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
goto fini;
}
swap = xstrdup(tok + 5);
sep = strchr(swap, ',');
if (sep)
sep[0] = '\0';
sep = strchr(swap, ' ');
if (sep)
sep[0] = '\0';
tok_len = strlen(swap) + 5;
memset(tok, ' ', tok_len);
}
if ((tok = strstr(bb_copy, "type="))) {
type = xstrdup(tok + 5);
sep = strchr(type, ',');
if (sep)
sep[0] = '\0';
sep = strchr(type, ' ');
if (sep)
sep[0] = '\0';
tok_len = strlen(type) + 5;
memset(tok, ' ', tok_len);
}
if (rc == SLURM_SUCCESS) {
/* Look for vestigial content. Treating this as an error would
* break backward compatibility. Just log it once for now. */
for (i = 0; bb_copy[i]; i++) {
if (isspace(bb_copy[i]))
continue;
verbose("Unrecognized --bb content: %s",
bb_copy + i);
// rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
// goto fini;
break; /* Log only the first unrecognized token */
}
}
if (rc == SLURM_SUCCESS)
xfree(job_desc->burst_buffer);
if ((rc == SLURM_SUCCESS) && (swap_cnt || buf_size)) {
if (swap_cnt) {
xstrfmtcat(job_desc->burst_buffer,
"#DW swap %"PRIu64"GiB", swap_cnt);
if (pool) {
xstrfmtcat(job_desc->burst_buffer,
" pool=%s", pool);
}
}
if (buf_size) {
if (job_desc->burst_buffer)
xstrfmtcat(job_desc->burst_buffer, "\n");
xstrfmtcat(job_desc->burst_buffer,
"#DW jobdw capacity=%s",
bb_get_size_str(buf_size));
if (access) {
xstrfmtcat(job_desc->burst_buffer,
" access_mode=%s", access);
}
if (pool) {
xstrfmtcat(job_desc->burst_buffer,
" pool=%s", pool);
}
if (type) {
xstrfmtcat(job_desc->burst_buffer,
" type=%s", type);
}
}
}
fini: xfree(access);
xfree(bb_copy);
xfree(capacity);
xfree(pool);
xfree(swap);
xfree(type);
return rc;
}
/*
* Read and validate configuration file.
* Spawn thread to periodically read DataWarp state.
*/
extern int init(void)
{
slurm_mutex_init(&bb_state.bb_mutex);
slurm_mutex_init(&bb_state.term_mutex);
slurm_mutex_lock(&bb_state.bb_mutex);
bb_load_config(&bb_state, (char *)plugin_type); /* Removes "const" */
_test_config();
log_flag(BURST_BUF, "");
bb_alloc_cache(&bb_state);
slurm_thread_create(&bb_state.bb_thread, _bb_agent, NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
extern void fini(void)
{
int pc, last_pc = 0;
while ((pc = run_command_count()) > 0) {
if ((last_pc != 0) && (last_pc != pc)) {
info("waiting for %d running processes",
pc);
}
last_pc = pc;
usleep(100000);
}
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "");
slurm_mutex_lock(&bb_state.term_mutex);
bb_state.term_flag = true;
slurm_cond_signal(&bb_state.term_cond);
slurm_mutex_unlock(&bb_state.term_mutex);
if (bb_state.bb_thread) {
slurm_mutex_unlock(&bb_state.bb_mutex);
slurm_thread_join(bb_state.bb_thread);
slurm_mutex_lock(&bb_state.bb_mutex);
}
bb_clear_config(&bb_state.bb_config, true);
bb_clear_cache(&bb_state);
slurm_mutex_unlock(&bb_state.bb_mutex);
}
static void _pre_queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job)
{
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_POST_RUN);
job_state_set_flag(job_ptr, JOB_STAGE_OUT);
xfree(job_ptr->state_desc);
xstrfmtcat(job_ptr->state_desc, "%s: Stage-out in progress",
plugin_type);
_queue_stage_out(job_ptr, bb_job);
}
/*
* This function should only be called from _purge_vestigial_bufs().
* We need to reset the burst buffer state and restart any threads that may
* have been running before slurmctld was shut down, depending on the state
* that the burst buffer is in.
*/
static void _recover_job_bb(job_record_t *job_ptr, bb_alloc_t *bb_alloc,
time_t defer_time)
{
bb_job_t *bb_job;
uint16_t job_bb_state = bb_state_num(job_ptr->burst_buffer_state);
/*
* Call _get_bb_job() to create a cache of the job's burst buffer info,
* including the state. Many functions call this, so create the cache now;
* we may also need to change the burst buffer state.
* The job burst buffer state is set in job_ptr and in bb_job.
* bb_alloc is used for persistent burst buffers, so bb_alloc->state
* isn't used for job burst buffers.
*/
bb_job = _get_bb_job(job_ptr);
if (!bb_job) {
/* This shouldn't happen. */
error("%s: %pJ does not have a burst buffer specification, tearing down vestigial burst buffer.",
__func__, job_ptr);
_queue_teardown(bb_alloc->job_id, bb_alloc->user_id, false);
return;
}
switch (job_bb_state) {
/*
* First 4 states are specific to persistent burst buffers.
* We shouldn't get here since _purge_vestigial_bufs() handles
* persistent burst buffers.
*/
case BB_STATE_ALLOCATING:
case BB_STATE_ALLOCATED:
case BB_STATE_DELETING:
case BB_STATE_DELETED:
error("%s: Unexpected burst buffer state %s for %pJ",
__func__, job_ptr->burst_buffer_state, job_ptr);
break;
/* Pending states for jobs: */
case BB_STATE_STAGING_IN:
case BB_STATE_STAGED_IN:
case BB_STATE_ALLOC_REVOKE:
/*
* We do not know the state of file staging,
* so teardown the buffer and defer the job
* for at least 60 seconds (for the teardown).
* Also set the burst buffer state back to PENDING.
*/
log_flag(BURST_BUF, "Purging buffer for pending %pJ",
job_ptr);
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_TEARDOWN);
_queue_teardown(bb_alloc->job_id,
bb_alloc->user_id, true);
if (job_ptr->details &&
(job_ptr->details->begin_time < defer_time)){
job_ptr->details->begin_time =
defer_time;
}
break;
/* Running states for jobs: */
case BB_STATE_PRE_RUN:
/*
* slurmctld will call bb_g_job_begin() which will
* handle burst buffers in this state.
*/
break;
case BB_STATE_RUNNING:
case BB_STATE_SUSPEND:
/* Nothing to do here. */
break;
/* Post running states for jobs: */
case BB_STATE_POST_RUN:
case BB_STATE_STAGING_OUT:
case BB_STATE_STAGED_OUT:
log_flag(BURST_BUF, "Restarting burst buffer stage out for %pJ",
job_ptr);
/*
* _pre_queue_stage_out() sets the burst buffer state
* correctly and restarts the needed thread.
*/
_pre_queue_stage_out(job_ptr, bb_job);
break;
case BB_STATE_TEARDOWN:
case BB_STATE_TEARDOWN_FAIL:
log_flag(BURST_BUF, "Restarting burst buffer teardown for %pJ",
job_ptr);
_queue_teardown(bb_alloc->job_id,
bb_alloc->user_id, false);
break;
case BB_STATE_COMPLETE:
/*
* We shouldn't get here since the bb_alloc record is
* removed when the job's bb state is set to
* BB_STATE_COMPLETE during teardown.
*/
log_flag(BURST_BUF, "Clearing burst buffer for completed job %pJ",
job_ptr);
/*
* Subtract the space this burst buffer was allocated
* since _load_state() calls _apply_limits()
* which calls bb_limit_add() for all burst buffers.
*/
bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state);
(void) bb_free_alloc_rec(&bb_state, bb_alloc);
break;
default:
error("%s: Invalid job burst buffer state %s for %pJ",
__func__, job_ptr->burst_buffer_state, job_ptr);
break;
}
}
/* Identify and purge any vestigial buffers (i.e. we have a job buffer, but
* the matching job is either gone or completed OR we have a job buffer and a
* pending job, but don't know the status of stage-in) */
static void _purge_vestigial_bufs(void)
{
bb_alloc_t *bb_alloc = NULL;
time_t defer_time = time(NULL) + 60;
int i;
for (i = 0; i < BB_HASH_SIZE; i++) {
bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
job_record_t *job_ptr = NULL;
if (bb_alloc->job_id)
job_ptr = find_job_record(bb_alloc->job_id);
if (bb_alloc->job_id == 0) {
/* Persistent buffer, do not purge */
} else if (!job_ptr) {
info("Purging vestigial buffer for JobId=%u",
bb_alloc->job_id);
_queue_teardown(bb_alloc->job_id,
bb_alloc->user_id, false);
} else {
_recover_job_bb(job_ptr, bb_alloc, defer_time);
}
bb_alloc = bb_alloc->next;
}
}
}
static bool _is_directive(char *tok)
{
if ((tok[0] == '#') &&
(((tok[1] == 'B') && (tok[2] == 'B')) ||
((tok[1] == 'D') && (tok[2] == 'W')))) {
return true;
}
return false;
}
extern char *bb_p_build_het_job_script(char *script, uint32_t het_job_offset)
{
return bb_common_build_het_job_script(script, het_job_offset,
_is_directive);
}
/*
* Return the total burst buffer size in MB
*/
extern uint64_t bb_p_get_system_size(void)
{
uint64_t size = 0;
slurm_mutex_lock(&bb_state.bb_mutex);
size = bb_state.total_space / (1024 * 1024); /* bytes to MB */
slurm_mutex_unlock(&bb_state.bb_mutex);
return size;
}
/*
* Load the current burst buffer state (e.g. how much space is available now).
* Run at the beginning of each scheduling cycle in order to recognize external
* changes to the burst buffer state (e.g. capacity is added, removed, fails,
* etc.)
*
* init_config IN - true if called as part of slurmctld initialization
* Returns a Slurm errno.
*/
extern int bb_p_load_state(bool init_config)
{
if (!init_config)
return SLURM_SUCCESS;
/* In practice the Cray APIs are too slow to run inline on each
* scheduling cycle. Do so on a periodic basis from _bb_agent(). */
log_flag(BURST_BUF, "");
_load_state(init_config); /* Has own locking */
slurm_mutex_lock(&bb_state.bb_mutex);
bb_set_tres_pos(&bb_state);
_purge_vestigial_bufs();
slurm_mutex_unlock(&bb_state.bb_mutex);
_save_bb_state(); /* Has own locks excluding file write */
return SLURM_SUCCESS;
}
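/*
 * Example for the status function below (typically reached via
 * "scontrol show dwstat"): argv = {"pools"} results in running
 * "dwstat pools" with a two second timeout.
 */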
/*
* Return string containing current burst buffer status
* argc IN - count of status command arguments
* argv IN - status command arguments
* uid - authenticated UID
* gid - authenticated GID
* RET status string, release memory using xfree()
*/
extern char *bb_p_get_status(uint32_t argc, char **argv, uint32_t uid,
uint32_t gid)
{
char *status_resp, **script_argv;
int i, status = 0;
run_command_args_t run_command_args = {
.max_wait = 2000,
.script_path = bb_state.bb_config.get_sys_status,
.script_type = "dwstat",
.status = &status,
};
script_argv = xcalloc((argc + 2), sizeof(char *));
script_argv[0] = "dwstat";
for (i = 0; i < argc; i++)
script_argv[i + 1] = argv[i];
run_command_args.script_argv = script_argv;
status_resp = run_command(&run_command_args);
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
xfree(status_resp);
status_resp = xstrdup("Error running dwstat\n");
}
xfree(script_argv);
return status_resp;
}
/*
* Note configuration may have changed. Handle changes in BurstBufferParameters.
*
* Returns a Slurm errno.
*/
extern int bb_p_reconfig(void)
{
char *old_default_pool;
int i;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "");
old_default_pool = bb_state.bb_config.default_pool;
bb_state.bb_config.default_pool = NULL;
bb_load_config(&bb_state, (char *)plugin_type); /* Remove "const" */
if (!bb_state.bb_config.default_pool)
bb_state.bb_config.default_pool = old_default_pool;
else
xfree(old_default_pool);
_test_config();
slurm_mutex_unlock(&bb_state.bb_mutex);
/* Reconfig is where we make sure the assoc_mgr pointers are correct */
for (i = 0; i < BB_HASH_SIZE; i++) {
bb_alloc_t *bb_alloc = bb_state.bb_ahash[i];
while (bb_alloc) {
_set_assoc_mgr_ptrs(bb_alloc);
bb_alloc = bb_alloc->next;
}
}
return SLURM_SUCCESS;
}
/*
* Pack current burst buffer state information for network transmission to
* user (e.g. "scontrol show burst")
*
* Returns a Slurm errno.
*/
extern int bb_p_state_pack(uid_t uid, buf_t *buffer, uint16_t protocol_version)
{
uint32_t rec_count = 0;
slurm_mutex_lock(&bb_state.bb_mutex);
packstr(bb_state.name, buffer);
bb_pack_state(&bb_state, buffer, protocol_version);
if (((bb_state.bb_config.flags & BB_FLAG_PRIVATE_DATA) == 0) ||
validate_operator(uid))
uid = 0; /* User can see all data */
rec_count = bb_pack_bufs(uid, &bb_state, buffer, protocol_version);
(void) bb_pack_usage(uid, &bb_state, buffer, protocol_version);
log_flag(BURST_BUF, "record_count:%u",
rec_count);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
/*
* Preliminary validation of a job submit request with respect to burst buffer
* options. Performed after setting default account + qos, but prior to
* establishing job ID or creating script file.
*
* Returns a Slurm errno.
*/
extern int bb_p_job_validate(job_desc_msg_t *job_desc, uid_t submit_uid,
char **err_msg)
{
uint64_t bb_size = 0;
int i, rc;
xassert(job_desc);
xassert(job_desc->tres_req_cnt);
xassert(err_msg);
rc = _parse_bb_opts(job_desc, &bb_size, submit_uid);
if (rc != SLURM_SUCCESS)
return rc;
if ((job_desc->burst_buffer == NULL) ||
(job_desc->burst_buffer[0] == '\0'))
return rc;
log_flag(BURST_BUF, "job_user_id:%u, submit_uid:%u",
job_desc->user_id, submit_uid);
log_flag(BURST_BUF, "burst_buffer:%s",
job_desc->burst_buffer);
if (job_desc->user_id == 0) {
info("User root can not allocate burst buffers");
*err_msg = xstrdup("User root can not allocate burst buffers");
return ESLURM_BURST_BUFFER_PERMISSION;
}
slurm_mutex_lock(&bb_state.bb_mutex);
if (bb_state.bb_config.allow_users) {
bool found_user = false;
for (i = 0; bb_state.bb_config.allow_users[i]; i++) {
if (job_desc->user_id ==
bb_state.bb_config.allow_users[i]) {
found_user = true;
break;
}
}
if (!found_user) {
*err_msg = xstrdup("User not found in AllowUsers");
rc = ESLURM_BURST_BUFFER_PERMISSION;
goto fini;
}
}
if (bb_state.bb_config.deny_users) {
bool found_user = false;
for (i = 0; bb_state.bb_config.deny_users[i]; i++) {
if (job_desc->user_id ==
bb_state.bb_config.deny_users[i]) {
found_user = true;
break;
}
}
if (found_user) {
*err_msg = xstrdup("User found in DenyUsers");
rc = ESLURM_BURST_BUFFER_PERMISSION;
goto fini;
}
}
if (bb_state.tres_pos > 0) {
job_desc->tres_req_cnt[bb_state.tres_pos] =
bb_size / (1024 * 1024);
}
fini: slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
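/*
 * The path file parsed by the function below holds newline-separated
 * key=value pairs generated by DataWarp; an illustrative (hypothetical)
 * example line:
 *   DW_JOB_STRIPED=/var/opt/cray/dws/mounts/batch/1234_striped_scratch
 */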
/* Add key=value pairs from the DataWarp "paths" file to the job's environment */
static void _update_job_env(job_record_t *job_ptr, char *file_path)
{
struct stat stat_buf;
char *data_buf = NULL, *start, *sep;
int path_fd, i, inx = 0, env_cnt = 0;
ssize_t read_size;
/* Read the DataWarp generated environment variable file */
path_fd = open(file_path, O_CLOEXEC);
if (path_fd == -1) {
error("open error on file %s: %m",
file_path);
return;
}
if (fstat(path_fd, &stat_buf) == -1) {
error("stat error on file %s: %m",
file_path);
stat_buf.st_size = 2048;
} else if (stat_buf.st_size == 0)
goto fini;
data_buf = xmalloc_nz(stat_buf.st_size + 1);
while (inx < stat_buf.st_size) {
read_size = read(path_fd, data_buf + inx,
stat_buf.st_size - inx);
if (read_size < 0)
data_buf[inx] = '\0';
else
data_buf[inx + read_size] = '\0';
if (read_size > 0) {
inx += read_size;
} else if (read_size == 0) { /* EOF */
break;
} else if (read_size < 0) { /* error */
if ((errno == EAGAIN) || (errno == EINTR))
continue;
error("read error on file %s: %m",
file_path);
break;
}
}
log_flag(BURST_BUF, "%s", data_buf);
/* Get count of environment variables in the file */
env_cnt = 0;
if (data_buf) {
for (i = 0; data_buf[i]; i++) {
if (data_buf[i] == '=')
env_cnt++;
}
}
/* Add to supplemental environment variables (in job record) */
if (env_cnt) {
xrecalloc(job_ptr->details->env_sup,
MAX(job_ptr->details->env_cnt + env_cnt, 1 + env_cnt),
sizeof(char *));
start = data_buf;
for (i = 0; (i < env_cnt) && start[0]; i++) {
sep = strchr(start, '\n');
if (sep)
sep[0] = '\0';
job_ptr->details->env_sup[job_ptr->details->env_cnt++] =
xstrdup(start);
if (sep)
start = sep + 1;
else
break;
}
}
fini: xfree(data_buf);
close(path_fd);
}
/* Return true if the job has #DW options (excluding #BB options) */
static bool _have_dw_cmd_opts(bb_job_t *bb_job)
{
int i;
bb_buf_t *bb_buf;
xassert(bb_job);
if (bb_job->total_size)
return true;
for (i = 0, bb_buf = bb_job->buf_ptr; i < bb_job->buf_cnt;
i++, bb_buf++) {
if (bb_buf->use)
return true;
}
return false;
}
/*
* Secondary validation of a job submit request with respect to burst buffer
* options. Performed after establishing job ID and creating script file.
*
* NOTE: We run several DW APIs at job submit time so that we can notify the
* user immediately if there is some error, although that can be a relatively
* slow operation. The DW API calls here are bounded by the configured
* ValidateTimeout, and any call taking over 0.2 seconds is logged.
*
* NOTE: We do this work inline so the user can be notified immediately if
* there is some problem with their script.
*
* Returns a Slurm errno.
*/
extern int bb_p_job_validate2(job_record_t *job_ptr, char **err_msg)
{
char *hash_dir = NULL, *job_dir = NULL, *script_file = NULL;
char *task_script_file = NULL;
char *resp_msg = NULL, **script_argv;
char *dw_cli_path;
int fd = -1, hash_inx, rc = SLURM_SUCCESS, status = 0;
bb_job_t *bb_job;
uint32_t timeout;
bool using_master_script = false;
DEF_TIMERS;
run_command_args_t run_command_args = {
.script_path = bb_state.bb_config.get_sys_state,
.script_type = "job_process",
.status = &status,
};
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0')) {
if (job_ptr->details->min_nodes == 0)
rc = ESLURM_INVALID_NODE_COUNT;
return rc;
}
/* Initialization */
slurm_mutex_lock(&bb_state.bb_mutex);
if (bb_state.last_load_time == 0) {
/* Assume request is valid for now, can't test it anyway */
info("Burst buffer down, skip tests for %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
bb_job = _get_bb_job(job_ptr);
if (bb_job == NULL) {
slurm_mutex_unlock(&bb_state.bb_mutex);
if (job_ptr->details->min_nodes == 0)
rc = ESLURM_INVALID_NODE_COUNT;
return rc;
}
if ((job_ptr->details->min_nodes == 0) && bb_job->use_job_buf) {
slurm_mutex_unlock(&bb_state.bb_mutex);
return ESLURM_INVALID_BURST_BUFFER_REQUEST;
}
if (!_have_dw_cmd_opts(bb_job)) {
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
log_flag(BURST_BUF, "%pJ", job_ptr);
timeout = bb_state.bb_config.validate_timeout * 1000;
dw_cli_path = xstrdup(bb_state.bb_config.get_sys_state);
slurm_mutex_unlock(&bb_state.bb_mutex);
/* Standard file location for job arrays */
if ((job_ptr->array_task_id != NO_VAL) &&
(job_ptr->array_job_id != job_ptr->job_id)) {
hash_inx = job_ptr->array_job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir,
job_ptr->array_job_id);
(void) mkdir(job_dir, 0700);
xstrfmtcat(script_file, "%s/script", job_dir);
fd = open(script_file, O_RDONLY);
if (fd >= 0) { /* found the script */
close(fd);
using_master_script = true;
} else {
xfree(hash_dir);
}
} else {
hash_inx = job_ptr->job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
(void) mkdir(job_dir, 0700);
xstrfmtcat(script_file, "%s/script", job_dir);
if (job_ptr->batch_flag == 0)
rc = bb_build_bb_script(job_ptr, script_file);
}
/* Run "job_process" function, validates user script */
script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
script_argv[0] = xstrdup("dw_wlm_cli");
script_argv[1] = xstrdup("--function");
script_argv[2] = xstrdup("job_process");
script_argv[3] = xstrdup("--job");
xstrfmtcat(script_argv[4], "%s", script_file);
START_TIMER;
run_command_args.max_wait = timeout;
run_command_args.script_argv = script_argv;
resp_msg = run_command(&run_command_args);
END_TIMER;
if ((DELTA_TIMER > 200000) || /* 0.2 secs */
(slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF))
info("job_process ran for %s",
TIME_STR);
_log_script_argv(script_argv, resp_msg);
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
error("job_process for %pJ status:%u response:%s",
job_ptr, status, resp_msg);
if (err_msg) {
xfree(*err_msg);
xstrfmtcat(*err_msg, "%s: %s", plugin_type, resp_msg);
}
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
}
xfree(resp_msg);
xfree_array(script_argv);
/* Clean-up */
xfree(hash_dir);
xfree(job_dir);
xfree(dw_cli_path);
if (rc != SLURM_SUCCESS) {
slurm_mutex_lock(&bb_state.bb_mutex);
bb_job_del(&bb_state, job_ptr->job_id);
slurm_mutex_unlock(&bb_state.bb_mutex);
} else if (using_master_script) {
/* Job arrays need to have the script file in the "standard"
* location for the remaining logic, so make a hard link */
hash_inx = job_ptr->job_id % 10;
xstrfmtcat(hash_dir, "%s/hash.%d",
slurm_conf.state_save_location, hash_inx);
(void) mkdir(hash_dir, 0700);
xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
xfree(hash_dir);
(void) mkdir(job_dir, 0700);
xstrfmtcat(task_script_file, "%s/script", job_dir);
xfree(job_dir);
if ((link(script_file, task_script_file) != 0) &&
(errno != EEXIST)) {
error("link(%s,%s): %m",
script_file,
task_script_file);
}
}
xfree(task_script_file);
xfree(script_file);
return rc;
}
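/*
 * Illustrative JSON fragment handled by _json_parse_real_size() below;
 * only the "units" and "capacity" keys are examined:
 *   { "units": "bytes", "capacity": 1099511627776 }
 */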
static struct bb_total_size *_json_parse_real_size(json_object *j)
{
enum json_type type;
struct json_object_iter iter;
struct bb_total_size *bb_tot_sz;
const char *p;
bb_tot_sz = xmalloc(sizeof(struct bb_total_size));
json_object_object_foreachC(j, iter) {
type = json_object_get_type(iter.val);
switch (type) {
case json_type_string:
if (!xstrcmp(iter.key, "units")) {
p = json_object_get_string(iter.val);
if (!xstrcmp(p, "bytes")) {
bb_tot_sz->units =
BB_UNITS_BYTES;
}
}
break;
case json_type_int:
if (!xstrcmp(iter.key, "capacity")) {
bb_tot_sz->capacity =
json_object_get_int64(iter.val);
}
break;
default:
break;
}
}
return bb_tot_sz;
}
/*
* Fill in the tres_cnt (in MB) based on the job record
* NOTE: Based upon job-specific burst buffers, excludes persistent buffers
* IN job_ptr - job record
* IN/OUT tres_cnt - fill in this already allocated array with tres_cnts
* IN locked - if the assoc_mgr tres read locked is locked or not
*/
extern void bb_p_job_set_tres_cnt(job_record_t *job_ptr, uint64_t *tres_cnt,
bool locked)
{
bb_job_t *bb_job;
if (!tres_cnt) {
error("No tres_cnt given when looking at %pJ",
job_ptr);
return;
}
if (bb_state.tres_pos < 0) {
/* BB not defined in AccountingStorageTRES */
return;
}
slurm_mutex_lock(&bb_state.bb_mutex);
if ((bb_job = _get_bb_job(job_ptr))) {
tres_cnt[bb_state.tres_pos] =
bb_job->total_size / (1024 * 1024);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
}
/*
* For a given job, return our best guess of when it might be able to start
*/
extern time_t bb_p_job_get_est_start(job_record_t *job_ptr)
{
time_t est_start = time(NULL);
bb_job_t *bb_job;
int rc;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return est_start;
if (job_ptr->array_recs &&
((job_ptr->array_task_id == NO_VAL) ||
(job_ptr->array_task_id == INFINITE))) {
est_start += 300; /* 5 minutes, guess... */
return est_start; /* Can't operate on job array struct */
}
slurm_mutex_lock(&bb_state.bb_mutex);
if (bb_state.last_load_time == 0) {
est_start += 3600; /* 1 hour, guess... */
slurm_mutex_unlock(&bb_state.bb_mutex);
return est_start; /* Burst buffer state not yet loaded */
}
if ((bb_job = _get_bb_job(job_ptr)) == NULL) {
slurm_mutex_unlock(&bb_state.bb_mutex);
return est_start;
}
log_flag(BURST_BUF, "%pJ", job_ptr);
if ((bb_job->persist_add == 0) && (bb_job->swap_size == 0) &&
(bb_job->total_size == 0)) {
/* Only deleting or using persistent buffers */
if (!_test_persistent_use_ready(bb_job, job_ptr))
est_start += 60 * 60; /* one hour, guess... */
} else if (bb_job->state == BB_STATE_PENDING) {
rc = bb_test_size_limit(job_ptr, bb_job, &bb_state,
_queue_teardown);
if (rc == 0) { /* Could start now */
;
} else if (rc == 1) { /* Exceeds configured limits */
est_start += 365 * 24 * 60 * 60;
} else { /* No space currently available */
est_start = MAX(est_start, bb_state.next_end_time);
}
} else { /* Allocation or staging in progress */
est_start++;
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return est_start;
}
/*
* Attempt to allocate resources and begin file staging for pending jobs.
*/
extern int bb_p_job_try_stage_in(list_t *job_queue)
{
bb_job_queue_rec_t *job_rec;
list_t *job_candidates;
list_itr_t *job_iter;
job_record_t *job_ptr;
bb_job_t *bb_job;
int rc;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "Mutex locked");
if (bb_state.last_load_time == 0) {
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
/* Identify candidates to be allocated burst buffers */
job_candidates = list_create(_job_queue_del);
job_iter = list_iterator_create(job_queue);
while ((job_ptr = list_next(job_iter))) {
if (!IS_JOB_PENDING(job_ptr) ||
(job_ptr->start_time == 0) ||
(job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
continue;
if (job_ptr->array_recs &&
((job_ptr->array_task_id == NO_VAL) ||
(job_ptr->array_task_id == INFINITE)))
continue; /* Can't operate on job array struct */
bb_job = _get_bb_job(job_ptr);
if (bb_job == NULL)
continue;
if (bb_job->state == BB_STATE_COMPLETE)
bb_set_job_bb_state(job_ptr, bb_job, /* job requeued */
BB_STATE_PENDING);
else if (bb_job->state >= BB_STATE_POST_RUN)
continue; /* Requeued job still staging out */
job_rec = xmalloc(sizeof(bb_job_queue_rec_t));
job_rec->job_ptr = job_ptr;
job_rec->bb_job = bb_job;
list_push(job_candidates, job_rec);
}
list_iterator_destroy(job_iter);
/* Sort in order of expected start time */
list_sort(job_candidates, bb_job_queue_sort);
bb_set_use_time(&bb_state);
job_iter = list_iterator_create(job_candidates);
while ((job_rec = list_next(job_iter))) {
job_ptr = job_rec->job_ptr;
bb_job = job_rec->bb_job;
if (bb_job->state >= BB_STATE_STAGING_IN)
continue; /* Job was already allocated a buffer */
rc = bb_test_size_limit(job_ptr, bb_job, &bb_state,
_queue_teardown);
if (rc == 0) /* Could start now */
(void) _alloc_job_bb(job_ptr, bb_job, true);
else if (rc == 1) /* Exceeds configured limits */
continue;
else /* No space currently available */
break;
}
list_iterator_destroy(job_iter);
slurm_mutex_unlock(&bb_state.bb_mutex);
FREE_NULL_LIST(job_candidates);
return SLURM_SUCCESS;
}
/*
* Determine if a job's burst buffer stage-in is complete
* job_ptr IN - Job to test
* test_only IN - If false, then attempt to allocate burst buffer if possible
*
* RET: 0 - stage-in is underway
* 1 - stage-in complete
* -1 - stage-in not started or burst buffer in some unexpected state
*/
extern int bb_p_job_test_stage_in(job_record_t *job_ptr, bool test_only)
{
bb_job_t *bb_job = NULL;
int rc = 1;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return 1;
if (job_ptr->array_recs &&
((job_ptr->array_task_id == NO_VAL) ||
(job_ptr->array_task_id == INFINITE)))
return -1; /* Can't operate on job array structure */
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ test_only:%d",
job_ptr, (int) test_only);
if (bb_state.last_load_time != 0)
bb_job = _get_bb_job(job_ptr);
if (bb_job && (bb_job->state == BB_STATE_COMPLETE))
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_PENDING); /* job requeued */
if (bb_job == NULL) {
rc = -1;
} else if (bb_job->state < BB_STATE_STAGING_IN) {
/* Job buffer not allocated, create now if space available */
rc = -1;
if ((test_only == false) &&
(bb_test_size_limit(job_ptr, bb_job, &bb_state,
_queue_teardown) == 0) &&
(_alloc_job_bb(job_ptr, bb_job, false) == SLURM_SUCCESS)) {
rc = 0; /* Setup/stage-in in progress */
}
} else if (bb_job->state == BB_STATE_STAGING_IN) {
rc = 0;
} else if (bb_job->state == BB_STATE_STAGED_IN) {
rc = 1;
} else {
rc = -1; /* Requeued job still staging in */
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/* Attempt to claim burst buffer resources.
* At this time, bb_g_job_test_stage_in() should have been run successfully AND
* the compute nodes selected for the job.
*
* Returns a Slurm errno.
*/
extern int bb_p_job_begin(job_record_t *job_ptr)
{
char *client_nodes_file_nid = NULL;
pre_run_args_t *pre_run_args;
char **pre_run_argv = NULL, **script_argv = NULL;
char *job_dir = NULL, *path_file, *resp_msg;
int arg_inx, hash_inx, rc = SLURM_SUCCESS, status = 0;
bb_job_t *bb_job;
uint32_t timeout;
bool do_pre_run;
DEF_TIMERS;
run_command_args_t run_command_args = {
.script_path = bb_state.bb_config.get_sys_state,
.script_type = "paths",
.status = &status,
};
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return SLURM_SUCCESS;
if ((!job_ptr->job_resrcs || !job_ptr->job_resrcs->nodes) &&
(job_ptr->details->min_nodes != 0)) {
error("%pJ lacks node allocation",
job_ptr);
return SLURM_ERROR;
}
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ",
job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not start %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_ERROR;
}
bb_job = _get_bb_job(job_ptr);
if (!bb_job) {
error("no job record buffer for %pJ",
job_ptr);
xfree(job_ptr->state_desc);
job_ptr->state_desc =
xstrdup("Could not find burst buffer record");
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_ERROR;
}
do_pre_run = _have_dw_cmd_opts(bb_job);
/* Confirm that persistent burst buffers work has been completed */
if ((_create_bufs(job_ptr, bb_job, true) > 0)) {
xfree(job_ptr->state_desc);
job_ptr->state_desc =
xstrdup("Error managing persistent burst buffers");
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_ERROR;
}
hash_inx = job_ptr->job_id % 10;
xstrfmtcat(job_dir, "%s/hash.%d/job.%u",
slurm_conf.state_save_location, hash_inx, job_ptr->job_id);
xstrfmtcat(client_nodes_file_nid, "%s/client_nids", job_dir);
if (do_pre_run)
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_PRE_RUN);
else
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_RUNNING);
slurm_mutex_unlock(&bb_state.bb_mutex);
if (job_ptr->job_resrcs && job_ptr->job_resrcs->nodes &&
bb_write_nid_file(client_nodes_file_nid, job_ptr->job_resrcs->nodes,
job_ptr)) {
xfree(client_nodes_file_nid);
}
/* Run "paths" function, get DataWarp environment variables */
if (do_pre_run) {
/* Setup "paths" operation */
timeout = bb_state.bb_config.validate_timeout * 1000;
script_argv = xcalloc(10, sizeof(char *)); /* NULL terminate */
script_argv[0] = xstrdup("dw_wlm_cli");
script_argv[1] = xstrdup("--function");
script_argv[2] = xstrdup("paths");
script_argv[3] = xstrdup("--job");
xstrfmtcat(script_argv[4], "%s/script", job_dir);
script_argv[5] = xstrdup("--token");
xstrfmtcat(script_argv[6], "%u", job_ptr->job_id);
script_argv[7] = xstrdup("--pathfile");
xstrfmtcat(script_argv[8], "%s/path", job_dir);
path_file = script_argv[8];
START_TIMER;
run_command_args.max_wait = timeout;
run_command_args.script_argv = script_argv;
resp_msg = run_command(&run_command_args);
END_TIMER;
if ((DELTA_TIMER > 200000) || /* 0.2 secs */
(slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF))
info("paths ran for %s",
TIME_STR);
_log_script_argv(script_argv, resp_msg);
#if 1
//FIXME: Cray API returning "job_file_valid True" but exit 1 in some cases
if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
(!resp_msg ||
strncmp(resp_msg, "job_file_valid True", 19))) {
#else
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
#endif
error("paths for %pJ status:%u response:%s",
job_ptr, status, resp_msg);
xfree(resp_msg);
rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
xfree_array(script_argv);
goto fini;
} else {
_update_job_env(job_ptr, path_file);
xfree(resp_msg);
}
xfree_array(script_argv);
/* Setup "pre_run" operation */
pre_run_argv = xcalloc(12, sizeof(char *));
pre_run_argv[0] = xstrdup("dw_wlm_cli");
pre_run_argv[1] = xstrdup("--function");
pre_run_argv[2] = xstrdup("pre_run");
pre_run_argv[3] = xstrdup("--token");
xstrfmtcat(pre_run_argv[4], "%u", job_ptr->job_id);
pre_run_argv[5] = xstrdup("--job");
xstrfmtcat(pre_run_argv[6], "%s/script", job_dir);
arg_inx = 7;
if (client_nodes_file_nid) {
#if defined(HAVE_NATIVE_CRAY)
pre_run_argv[arg_inx++] = xstrdup("--nidlistfile");
#else
pre_run_argv[arg_inx++] = xstrdup("--nodehostnamefile");
#endif
pre_run_argv[arg_inx++] =
xstrdup(client_nodes_file_nid);
}
pre_run_args = xmalloc(sizeof(pre_run_args_t));
pre_run_args->args = pre_run_argv;
pre_run_args->job_id = job_ptr->job_id;
pre_run_args->timeout = bb_state.bb_config.other_timeout * 1000;
pre_run_args->user_id = job_ptr->user_id;
if (job_ptr->details) { /* Defer launch until pre_run completes */
job_ptr->details->prolog_running++;
job_state_set_flag(job_ptr, JOB_CONFIGURING);
}
slurm_thread_create_detached(_start_pre_run, pre_run_args);
}
fini:
xfree(client_nodes_file_nid);
xfree(job_dir);
return rc;
}
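/*
 * Illustrative command lines built by bb_p_job_begin() above; the job ID
 * and state save paths are hypothetical:
 *   dw_wlm_cli --function paths --job .../job.1234/script --token 1234 \
 *       --pathfile .../job.1234/path
 *   dw_wlm_cli --function pre_run --token 1234 --job .../job.1234/script \
 *       --nidlistfile .../job.1234/client_nids
 * (--nodehostnamefile replaces --nidlistfile on non-Cray builds)
 */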
/* Kill job from CONFIGURING state */
static void _kill_job(job_record_t *job_ptr, bool hold_job)
{
last_job_update = time(NULL);
job_ptr->end_time = last_job_update;
if (hold_job)
job_ptr->priority = 0;
build_cg_bitmap(job_ptr);
job_ptr->exit_code = 1;
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xfree(job_ptr->state_desc);
job_ptr->state_desc = xstrdup("Burst buffer pre_run error");
job_state_set(job_ptr, JOB_REQUEUE);
job_completion_logger(job_ptr, true);
job_state_set(job_ptr, (JOB_PENDING | JOB_COMPLETING));
deallocate_nodes(job_ptr, false, false, false);
}
static void *_start_pre_run(void *x)
{
/* Locks: read job */
slurmctld_lock_t job_read_lock = {
NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
/* Locks: write job */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
pre_run_args_t *pre_run_args = (pre_run_args_t *) x;
char *resp_msg = NULL;
bb_job_t *bb_job = NULL;
int status = 0;
job_record_t *job_ptr;
bool run_kill_job = false;
bool hold_job = false, nodes_ready = false;
DEF_TIMERS;
pthread_t tid = pthread_self();
track_script_rec_add(pre_run_args->job_id, 0, pthread_self());
run_command_args_t run_command_args = {
.max_wait = pre_run_args->timeout, /* Already in msec */
.script_argv = pre_run_args->args,
.script_path = bb_state.bb_config.get_sys_state,
.script_type = "dws_pre_run",
.status = &status,
.tid = tid,
};
/* Wait for node boot to complete */
while (!nodes_ready) {
lock_slurmctld(job_read_lock);
job_ptr = find_job_record(pre_run_args->job_id);
if (!job_ptr || IS_JOB_COMPLETED(job_ptr)) {
unlock_slurmctld(job_read_lock);
track_script_remove(pthread_self());
return NULL;
}
if (test_job_nodes_ready(job_ptr))
nodes_ready = true;
unlock_slurmctld(job_read_lock);
if (!nodes_ready)
sleep(60);
}
START_TIMER;
resp_msg = run_command(&run_command_args);
END_TIMER;
if (track_script_killed(pthread_self(), status, true)) {
/* We were killed via track_script, bail out right now */
info("dws_pre_run for JobId=%u terminated by slurmctld",
pre_run_args->job_id);
xfree(resp_msg);
xfree_array(pre_run_args->args);
xfree(pre_run_args);
track_script_remove(pthread_self());
return NULL;
}
/* track_script_reset_cpid(pthread_self(), 0); */
lock_slurmctld(job_write_lock);
slurm_mutex_lock(&bb_state.bb_mutex);
job_ptr = find_job_record(pre_run_args->job_id);
if ((DELTA_TIMER > 500000) || /* 0.5 secs */
(slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF)) {
info("dws_pre_run for %pJ ran for %s",
job_ptr, TIME_STR);
}
if (job_ptr)
bb_job = _get_bb_job(job_ptr);
_log_script_argv(pre_run_args->args, resp_msg);
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
/* Pre-run failure */
trigger_burst_buffer();
error("dws_pre_run for %pJ status:%u response:%s",
job_ptr, status, resp_msg);
if (job_ptr) {
bb_update_system_comment(job_ptr, "pre_run", resp_msg,
0);
if (IS_JOB_RUNNING(job_ptr))
run_kill_job = true;
if (bb_job) {
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_TEARDOWN);
if (bb_job->retry_cnt++ > MAX_RETRY_CNT)
hold_job = true;
}
}
_queue_teardown(pre_run_args->job_id, pre_run_args->user_id,
true);
} else if (bb_job) {
/* Pre-run success and the job's BB record exists */
if (bb_job->state == BB_STATE_ALLOC_REVOKE)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_STAGED_IN);
else
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_RUNNING);
}
if (job_ptr) {
if (run_kill_job)
job_state_unset_flag(job_ptr, JOB_CONFIGURING);
prolog_running_decr(job_ptr);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
if (run_kill_job) {
/* bb_mutex must be unlocked before calling this */
_kill_job(job_ptr, hold_job);
}
unlock_slurmctld(job_write_lock);
xfree(resp_msg);
xfree_array(pre_run_args->args);
xfree(pre_run_args);
track_script_remove(pthread_self());
return NULL;
}
/* Revoke the job's burst buffer allocation, but do not release previously
* allocated resources.
* Executed after bb_p_job_begin() if there was an allocation failure.
*
* Returns a Slurm errno.
*/
extern int bb_p_job_revoke_alloc(job_record_t *job_ptr)
{
bb_job_t *bb_job = NULL;
int rc = SLURM_SUCCESS;
slurm_mutex_lock(&bb_state.bb_mutex);
if (job_ptr)
bb_job = _get_bb_job(job_ptr);
if (bb_job) {
if (bb_job->state == BB_STATE_RUNNING)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_STAGED_IN);
else if (bb_job->state == BB_STATE_PRE_RUN)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_ALLOC_REVOKE);
} else {
rc = SLURM_ERROR;
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/*
* Trigger a job's burst buffer stage-out to begin
*
* Returns a Slurm errno.
*/
extern int bb_p_job_start_stage_out(job_record_t *job_ptr)
{
bb_job_t *bb_job;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return SLURM_SUCCESS;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not stage out %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_ERROR;
}
bb_job = _get_bb_job(job_ptr);
if (!bb_job) {
/* No job buffers. Assuming use of persistent buffers only */
verbose("%pJ bb job record not found",
job_ptr);
} else if (bb_job->state < BB_STATE_RUNNING) {
/* Job never started. Just teardown the buffer */
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_TEARDOWN);
_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
} else if (bb_job->state < BB_STATE_POST_RUN) {
_pre_queue_stage_out(job_ptr, bb_job);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
/*
* Determine if a job's burst buffer post_run operation is complete
*
* RET: 0 - post_run is underway
* 1 - post_run complete
* -1 - fatal error
*/
extern int bb_p_job_test_post_run(job_record_t *job_ptr)
{
bb_job_t *bb_job;
int rc = -1;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return 1;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not post_run %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return -1;
}
bb_job = bb_job_find(&bb_state, job_ptr->job_id);
if (!bb_job) {
/* No job buffers. Assuming use of persistent buffers only */
verbose("%pJ bb job record not found",
job_ptr);
rc = 1;
} else {
if (bb_job->state < BB_STATE_POST_RUN) {
rc = -1;
} else if (bb_job->state > BB_STATE_POST_RUN) {
rc = 1;
} else {
rc = 0;
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/*
* Determine if a job's burst buffer stage-out is complete
*
* RET: 0 - stage-out is underway
* 1 - stage-out complete
* -1 - fatal error
*/
extern int bb_p_job_test_stage_out(job_record_t *job_ptr)
{
bb_job_t *bb_job;
int rc = -1;
if ((job_ptr->burst_buffer == NULL) ||
(job_ptr->burst_buffer[0] == '\0'))
return 1;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not stage-out %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return -1;
}
bb_job = bb_job_find(&bb_state, job_ptr->job_id);
if (!bb_job) {
/*
* This is expected if the burst buffer completed teardown,
* or if only persistent burst buffers were used.
*/
rc = 1;
} else {
if (bb_job->state == BB_STATE_PENDING) {
/*
* The job's BB work never started before the job was
* killed. Alternately, the slurmctld daemon restarted
* after the job's BB work was completed.
*/
rc = 1;
} else if (bb_job->state < BB_STATE_POST_RUN) {
rc = -1;
} else if (bb_job->state > BB_STATE_STAGING_OUT) {
rc = 1;
if (bb_job->state == BB_STATE_COMPLETE)
bb_job_del(&bb_state, bb_job->job_id);
} else {
rc = 0;
}
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return rc;
}
/*
* Terminate any file staging and completely release burst buffer resources
*
* Returns a Slurm errno.
*/
extern int bb_p_job_cancel(job_record_t *job_ptr)
{
bb_job_t *bb_job;
bb_alloc_t *bb_alloc;
slurm_mutex_lock(&bb_state.bb_mutex);
log_flag(BURST_BUF, "%pJ", job_ptr);
if (bb_state.last_load_time == 0) {
info("Burst buffer down, can not cancel %pJ",
job_ptr);
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_ERROR;
}
bb_job = _get_bb_job(job_ptr);
if (!bb_job) {
/* Nothing ever allocated, nothing to clean up */
} else if (bb_job->state == BB_STATE_PENDING) {
bb_set_job_bb_state(job_ptr, bb_job, /* Nothing to clean up */
BB_STATE_COMPLETE);
} else {
/* Note: Persistent burst buffer actions already completed
* for the job are not reversed */
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_TEARDOWN);
bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
if (bb_alloc) {
bb_alloc->state = BB_STATE_TEARDOWN;
bb_alloc->state_time = time(NULL);
bb_state.last_update_time = time(NULL);
}
_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
}
slurm_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS;
}
static void _free_create_args(create_buf_data_t *create_args)
{
if (create_args) {
xfree(create_args->access);
xfree(create_args->job_script);
xfree(create_args->name);
xfree(create_args->pool);
xfree(create_args->type);
xfree(create_args);
}
}
/*
* Create/destroy persistent burst buffers
* job_ptr IN - job to operate upon
* bb_job IN - job's burst buffer data
* job_ready IN - if true, job is ready to run now, if false then do not
* delete persistent buffers
* Returns count of buffer create/destroy requests which are pending
*/
static int _create_bufs(job_record_t *job_ptr, bb_job_t *bb_job,
bool job_ready)
{
create_buf_data_t *create_args;
bb_buf_t *buf_ptr;
bb_alloc_t *bb_alloc;
int i, hash_inx, rc = 0;
xassert(bb_job);
for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
i++, buf_ptr++) {
if ((buf_ptr->state == BB_STATE_ALLOCATING) ||
(buf_ptr->state == BB_STATE_DELETING)) {
rc++;
} else if (buf_ptr->state != BB_STATE_PENDING) {
; /* Nothing to do */
} else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
buf_ptr->create) { /* Create the buffer */
bb_alloc = bb_find_name_rec(buf_ptr->name,
job_ptr->user_id,
&bb_state);
if (bb_alloc &&
(bb_alloc->user_id != job_ptr->user_id)) {
info("Attempt by %pJ user %u to create duplicate persistent burst buffer named %s and currently owned by user %u",
job_ptr, job_ptr->user_id,
buf_ptr->name, bb_alloc->user_id);
job_ptr->priority = 0;
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xfree(job_ptr->state_desc);
job_ptr->state_desc = xstrdup(
"Burst buffer create_persistent error");
buf_ptr->state = BB_STATE_COMPLETE;
bb_update_system_comment(
job_ptr, "create_persistent",
"Duplicate buffer name", 0);
rc++;
break;
} else if (bb_alloc) {
/* Duplicate create likely result of requeue */
debug("Attempt by %pJ to create duplicate persistent burst buffer named %s",
job_ptr, buf_ptr->name);
buf_ptr->create = false; /* Creation complete */
if (bb_job->persist_add >= bb_alloc->size) {
bb_job->persist_add -= bb_alloc->size;
} else {
error("Persistent buffer size underflow for %pJ",
job_ptr);
bb_job->persist_add = 0;
}
continue;
}
rc++;
if (!buf_ptr->pool) {
buf_ptr->pool =
xstrdup(bb_state.bb_config.default_pool);
}
bb_limit_add(job_ptr->user_id, buf_ptr->size,
buf_ptr->pool, &bb_state, true);
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_ALLOCATING);
buf_ptr->state = BB_STATE_ALLOCATING;
create_args = xmalloc(sizeof(create_buf_data_t));
create_args->access = xstrdup(buf_ptr->access);
create_args->job_id = job_ptr->job_id;
create_args->name = xstrdup(buf_ptr->name);
create_args->pool = xstrdup(buf_ptr->pool);
create_args->size = buf_ptr->size;
create_args->type = xstrdup(buf_ptr->type);
create_args->user_id = job_ptr->user_id;
slurm_thread_create_detached(_create_persistent,
create_args);
} else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
buf_ptr->destroy && job_ready) {
/* Delete the buffer */
bb_alloc = bb_find_name_rec(buf_ptr->name,
job_ptr->user_id,
&bb_state);
if (!bb_alloc) {
/* Ignore request if named buffer not found */
info("No burst buffer with name '%s' found for %pJ",
buf_ptr->name, job_ptr);
continue;
}
rc++;
if ((bb_alloc->user_id != job_ptr->user_id) &&
!validate_super_user(job_ptr->user_id)) {
info("Attempt by user %u %pJ to destroy buffer %s owned by user %u",
job_ptr->user_id, job_ptr,
buf_ptr->name, bb_alloc->user_id);
job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
xstrfmtcat(job_ptr->state_desc,
"%s: Delete buffer %s permission "
"denied",
plugin_type, buf_ptr->name);
job_ptr->priority = 0; /* Hold job */
continue;
}
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_DELETING);
buf_ptr->state = BB_STATE_DELETING;
create_args = xmalloc(sizeof(create_buf_data_t));
create_args->hurry = buf_ptr->hurry;
create_args->job_id = job_ptr->job_id;
hash_inx = job_ptr->job_id % 10;
xstrfmtcat(create_args->job_script,
"%s/hash.%d/job.%u/script",
slurm_conf.state_save_location,
hash_inx, job_ptr->job_id);
create_args->name = xstrdup(buf_ptr->name);
create_args->user_id = job_ptr->user_id;
slurm_thread_create_detached(_destroy_persistent,
create_args);
} else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
buf_ptr->destroy) {
rc++;
} else if ((buf_ptr->flags != BB_FLAG_BB_OP) &&
buf_ptr->use) {
/*
* Persistent buffer not created or destroyed, but used.
* Just check for existence
*/
bb_alloc = bb_find_name_rec(buf_ptr->name,
job_ptr->user_id,
&bb_state);
if (bb_alloc && (bb_alloc->state == BB_STATE_ALLOCATED))
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_ALLOCATED);
else
rc++;
}
}
return rc;
}
/* Test for the existence of persistent burst buffers to be used (but not
 * created) by this job. Return true if they are all ready */
static bool _test_persistent_use_ready(bb_job_t *bb_job,
job_record_t *job_ptr)
{
int i, not_ready_cnt = 0;
bb_alloc_t *bb_alloc;
bb_buf_t *buf_ptr;
xassert(bb_job);
for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
i++, buf_ptr++) {
if (buf_ptr->create || buf_ptr->destroy)
continue;
bb_alloc = bb_find_name_rec(buf_ptr->name, job_ptr->user_id,
&bb_state);
if (bb_alloc && (bb_alloc->state == BB_STATE_ALLOCATED)) {
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_ALLOCATED);
} else {
not_ready_cnt++;
break;
}
}
return (not_ready_cnt == 0);
}
/* Reset data structures based upon a change in buffer state
 * IN user_id - User affected
 * IN job_id - Job affected
 * IN name - Buffer name
 * IN new_state - New buffer state
 * IN buf_size - Size of created burst buffer only, used to decrement the
 * remaining space requirement for the job
 */
static void _reset_buf_state(uint32_t user_id, uint32_t job_id, char *name,
int new_state, uint64_t buf_size)
{
bb_buf_t *buf_ptr;
bb_job_t *bb_job;
int i, old_state;
bool active_buf = false;
bb_job = bb_job_find(&bb_state, job_id);
if (!bb_job) {
error("Could not find job record for JobId=%u",
job_id);
return;
}
/* Update the buffer's state in job record */
for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
i++, buf_ptr++) {
if (xstrcmp(name, buf_ptr->name))
continue;
old_state = buf_ptr->state;
buf_ptr->state = new_state;
if ((old_state == BB_STATE_ALLOCATING) &&
(new_state == BB_STATE_PENDING)) {
bb_limit_rem(user_id, buf_ptr->size, buf_ptr->pool,
&bb_state);
}
if ((old_state == BB_STATE_DELETING) &&
(new_state == BB_STATE_PENDING)) {
bb_limit_rem(user_id, buf_ptr->size, buf_ptr->pool,
&bb_state);
}
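/* Note: persistent buffer names start with a non-digit; names that
 * begin with a digit identify job-specific buffers (named by their
 * numeric job ID), which are not tracked in persist_add */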
if ((old_state == BB_STATE_ALLOCATING) &&
(new_state == BB_STATE_ALLOCATED) &&
((name[0] < '0') || (name[0] > '9'))) {
buf_ptr->create = false; /* Buffer creation complete */
if (bb_job->persist_add >= buf_size) {
bb_job->persist_add -= buf_size;
} else {
error("Persistent buffer size underflow for JobId=%u",
job_id);
bb_job->persist_add = 0;
}
}
break;
}
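/* Check whether any buffer remains in a transitional state; if not,
 * the job's overall burst buffer state can advance */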
for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
i++, buf_ptr++) {
old_state = buf_ptr->state;
if ((old_state == BB_STATE_PENDING) ||
(old_state == BB_STATE_ALLOCATING) ||
(old_state == BB_STATE_DELETING) ||
(old_state == BB_STATE_TEARDOWN) ||
(old_state == BB_STATE_TEARDOWN_FAIL)) {
active_buf = true;
break;
}
}
if (!active_buf) {
job_record_t *job_ptr = find_job_record(job_id);
if (bb_job->state == BB_STATE_ALLOCATING)
bb_set_job_bb_state(job_ptr, bb_job,
BB_STATE_ALLOCATED);
else if (bb_job->state == BB_STATE_DELETING)
bb_set_job_bb_state(job_ptr, bb_job, BB_STATE_DELETED);
queue_job_scheduler();
}
}
/* Create a persistent burst buffer based upon user specifications. */
static void *_create_persistent(void *x)
{
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
create_buf_data_t *create_args = (create_buf_data_t *) x;
job_record_t *job_ptr;
bb_alloc_t *bb_alloc;
char **script_argv, *resp_msg;
int i, status = 0;
uint32_t timeout;
DEF_TIMERS;
pthread_t tid = pthread_self();
track_script_rec_add(create_args->job_id, 0, pthread_self());
run_command_args_t run_command_args = {
.script_path = bb_state.bb_config.get_sys_state,
.script_type = "create_persistent",
.status = &status,
.tid = tid,
};
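/* Build the create_persistent command line; the result is of the form:
 * dw_wlm_cli --function create_persistent -c CLI -t <name> -u <user_id>
 * -C <pool>:<size> [-a <access>] [-T <type>] */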
script_argv = xcalloc(20, sizeof(char *)); /* NULL terminated */
script_argv[0] = xstrdup("dw_wlm_cli");
script_argv[1] = xstrdup("--function");
script_argv[2] = xstrdup("create_persistent");
script_argv[3] = xstrdup("-c");
script_argv[4] = xstrdup("CLI");
script_argv[5] = xstrdup("-t"); /* name */
script_argv[6] = xstrdup(create_args->name);
script_argv[7] = xstrdup("-u"); /* user ID */
xstrfmtcat(script_argv[8], "%u", create_args->user_id);
script_argv[9] = xstrdup("-C"); /* configuration */
xstrfmtcat(script_argv[10], "%s:%"PRIu64"",
create_args->pool, create_args->size);
slurm_mutex_lock(&bb_state.bb_mutex);
timeout = bb_state.bb_config.other_timeout * 1000;
slurm_mutex_unlock(&bb_state.bb_mutex);
i = 11;
if (create_args->access) {
script_argv[i++] = xstrdup("-a");
script_argv[i++] = xstrdup(create_args->access);
}
if (create_args->type) {
script_argv[i++] = xstrdup("-T");
script_argv[i++] = xstrdup(create_args->type);
}
/* NOTE: There is an optional group ID parameter available, but it is
 * currently not used by Slurm */
START_TIMER;
run_command_args.max_wait = timeout;
run_command_args.script_argv = script_argv;
resp_msg = run_command(&run_command_args);
_log_script_argv(script_argv, resp_msg);
xfree_array(script_argv);
END_TIMER;
info("create_persistent of %s ran for %s",
create_args->name, TIME_STR);
if (track_script_killed(pthread_self(), status, true)) {
/* I was killed by slurmtrack, bail out right now */
info("create_persistent for JobId=%u terminated by slurmctld",
create_args->job_id);
xfree(resp_msg);
_free_create_args(create_args);
track_script_remove(pthread_self());
return NULL;
}
/* track_script_reset_cpid(pthread_self(), 0); */
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
trigger_burst_buffer();
error("For JobId=%u Name=%s status:%u response:%s",
create_args->job_id,
create_args->name, status, resp_msg);
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(create_args->job_id);
if (!job_ptr) {
error("unable to find job record for JobId=%u",
create_args->job_id);
} else {
job_ptr->state_reason = FAIL_BAD_CONSTRAINTS;
job_ptr->priority = 0;
xfree(job_ptr->state_desc);
xstrfmtcat(job_ptr->state_desc, "%s",
resp_msg);
bb_update_system_comment(job_ptr, "create_persistent",
resp_msg, 0);
}
slurm_mutex_lock(&bb_state.bb_mutex);
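/* Return the failed buffer to pending state; _reset_buf_state() also
 * releases the space reserved when the create was started */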
_reset_buf_state(create_args->user_id, create_args->job_id,
create_args->name, BB_STATE_PENDING, 0);
bb_state.last_update_time = time(NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock);
} else if (resp_msg && strstr(resp_msg, "created")) {
assoc_mgr_lock_t assoc_locks =
{ .assoc = READ_LOCK, .qos = READ_LOCK };
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(create_args->job_id);
if (!job_ptr) {
error("unable to find job record for JobId=%u",
create_args->job_id);
}
assoc_mgr_lock(&assoc_locks);
slurm_mutex_lock(&bb_state.bb_mutex);
_reset_buf_state(create_args->user_id, create_args->job_id,
create_args->name, BB_STATE_ALLOCATED,
create_args->size);
bb_alloc = bb_alloc_name_rec(&bb_state, create_args->name,
create_args->user_id);
bb_alloc->size = create_args->size;
bb_alloc->pool = xstrdup(create_args->pool);
if (job_ptr) {
bb_alloc->account = xstrdup(job_ptr->account);
if (job_ptr->assoc_ptr) {
/* Only add the direct association id
* here, we don't need to keep track
* of the tree.
*/
slurmdb_assoc_rec_t *assoc = job_ptr->assoc_ptr;
bb_alloc->assoc_ptr = assoc;
xfree(bb_alloc->assocs);
bb_alloc->assocs = xstrdup_printf(
",%u,", assoc->id);
}
if (job_ptr->qos_ptr) {
slurmdb_qos_rec_t *qos_ptr = job_ptr->qos_ptr;
bb_alloc->qos_ptr = qos_ptr;
bb_alloc->qos = xstrdup(qos_ptr->name);
}
if (job_ptr->part_ptr) {
bb_alloc->partition =
xstrdup(job_ptr->part_ptr->name);
}
}
if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) {
bb_alloc->create_time = time(NULL);
bb_alloc->id = ++last_persistent_id;
} else {
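/* Recover the new buffer's ID and creation time from a fresh
 * "show_sessions" query, matching the session token against the
 * buffer name */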
bb_sessions_t *sessions;
int num_sessions = 0;
sessions = _bb_get_sessions(&num_sessions, &bb_state,
timeout);
for (i = 0; i < num_sessions; i++) {
if (xstrcmp(sessions[i].token,
create_args->name))
continue;
bb_alloc->create_time = sessions[i].created;
bb_alloc->id = sessions[i].id;
break;
}
_bb_free_sessions(sessions, num_sessions);
}
(void) bb_post_persist_create(job_ptr, bb_alloc, &bb_state);
bb_state.last_update_time = time(NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
assoc_mgr_unlock(&assoc_locks);
unlock_slurmctld(job_write_lock);
}
xfree(resp_msg);
_free_create_args(create_args);
track_script_remove(pthread_self());
return NULL;
}
/* Destroy a persistent burst buffer */
static void *_destroy_persistent(void *x)
{
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
create_buf_data_t *destroy_args = (create_buf_data_t *) x;
job_record_t *job_ptr;
bb_alloc_t *bb_alloc;
char **script_argv, *resp_msg;
int status = 0;
uint32_t timeout;
DEF_TIMERS;
pthread_t tid = pthread_self();
track_script_rec_add(destroy_args->job_id, 0, pthread_self());
run_command_args_t run_command_args = {
.script_path = bb_state.bb_config.get_sys_state,
.script_type = "destroy_persistent",
.status = &status,
.tid = tid,
};
slurm_mutex_lock(&bb_state.bb_mutex);
bb_alloc = bb_find_name_rec(destroy_args->name, destroy_args->user_id,
&bb_state);
if (!bb_alloc) {
info("No burst buffer with name '%s' found for JobId=%u",
destroy_args->name, destroy_args->job_id);
}
timeout = bb_state.bb_config.other_timeout * 1000;
slurm_mutex_unlock(&bb_state.bb_mutex);
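/* Build the teardown command line; the result is of the form:
 * dw_wlm_cli --function teardown --token <name> --job <script_path>
 * [--hurry] */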
script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
script_argv[0] = xstrdup("dw_wlm_cli");
script_argv[1] = xstrdup("--function");
script_argv[2] = xstrdup("teardown");
script_argv[3] = xstrdup("--token"); /* name */
script_argv[4] = xstrdup(destroy_args->name);
script_argv[5] = xstrdup("--job"); /* script */
script_argv[6] = xstrdup(destroy_args->job_script);
if (destroy_args->hurry)
script_argv[7] = xstrdup("--hurry");
START_TIMER;
run_command_args.max_wait = timeout;
run_command_args.script_argv = script_argv;
resp_msg = run_command(&run_command_args);
_log_script_argv(script_argv, resp_msg);
xfree_array(script_argv);
END_TIMER;
info("destroy_persistent of %s ran for %s",
destroy_args->name, TIME_STR);
if (track_script_killed(pthread_self(), status, true)) {
/* I was killed by slurmtrack, bail out right now */
info("destroy_persistent for JobId=%u terminated by slurmctld",
destroy_args->job_id);
xfree(resp_msg);
_free_create_args(destroy_args);
track_script_remove(pthread_self());
return NULL;
}
/* track_script_reset_cpid(pthread_self(), 0); */
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
trigger_burst_buffer();
error("destroy_persistent for JobId=%u Name=%s status:%u response:%s",
destroy_args->job_id,
destroy_args->name, status, resp_msg);
lock_slurmctld(job_write_lock);
job_ptr = find_job_record(destroy_args->job_id);
if (!job_ptr) {
error("unable to find job record for JobId=%u",
destroy_args->job_id);
} else {
bb_update_system_comment(job_ptr, "teardown",
resp_msg, 0);
job_ptr->state_reason = FAIL_BAD_CONSTRAINTS;
xfree(job_ptr->state_desc);
xstrfmtcat(job_ptr->state_desc, "%s",
resp_msg);
}
slurm_mutex_lock(&bb_state.bb_mutex);
_reset_buf_state(destroy_args->user_id, destroy_args->job_id,
destroy_args->name, BB_STATE_PENDING, 0);
bb_state.last_update_time = time(NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock);
} else {
assoc_mgr_lock_t assoc_locks =
{ .assoc = READ_LOCK, .qos = READ_LOCK };
/*
* job_write_lock needed for _reset_buf_state() since it will
* call bb_set_job_bb_state() to modify
* job_ptr->burst_buffer_state
*/
lock_slurmctld(job_write_lock);
/* assoc_mgr needs locking to call bb_post_persist_delete */
if (bb_alloc)
assoc_mgr_lock(&assoc_locks);
slurm_mutex_lock(&bb_state.bb_mutex);
_reset_buf_state(destroy_args->user_id, destroy_args->job_id,
destroy_args->name, BB_STATE_DELETED, 0);
/* Modify internal buffer record for purging */
if (bb_alloc) {
bb_alloc->state = BB_STATE_COMPLETE;
bb_alloc->job_id = destroy_args->job_id;
bb_alloc->state_time = time(NULL);
bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
bb_alloc->pool, &bb_state);
(void) bb_post_persist_delete(bb_alloc, &bb_state);
(void) bb_free_alloc_rec(&bb_state, bb_alloc);
}
bb_state.last_update_time = time(NULL);
slurm_mutex_unlock(&bb_state.bb_mutex);
if (bb_alloc)
assoc_mgr_unlock(&assoc_locks);
unlock_slurmctld(job_write_lock);
}
xfree(resp_msg);
_free_create_args(destroy_args);
track_script_remove(pthread_self());
return NULL;
}
/* _bb_get_configs()
*
* Handle the JSON stream with configuration info (instance use details).
*/
static bb_configs_t *
_bb_get_configs(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
{
bb_configs_t *ents = NULL;
json_object *j;
json_object_iter iter;
int status = 0;
DEF_TIMERS;
char *resp_msg;
char **script_argv;
run_command_args_t run_command_args = {
.max_wait = timeout,
.script_path = state_ptr->bb_config.get_sys_state,
.script_type = "show_configurations",
.status = &status,
};
script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
script_argv[0] = xstrdup("dw_wlm_cli");
script_argv[1] = xstrdup("--function");
script_argv[2] = xstrdup("show_configurations");
START_TIMER;
run_command_args.script_argv = script_argv;
resp_msg = run_command(&run_command_args);
END_TIMER;
log_flag(BURST_BUF, "show_configurations ran for %s",
TIME_STR);
_log_script_argv(script_argv, resp_msg);
xfree_array(script_argv);
#if 0
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
#else
//FIXME: Cray bug: API returning error if no configurations, use above code when fixed
if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
(!resp_msg || (resp_msg[0] != '{'))) {
#endif
trigger_burst_buffer();
error("show_configurations status:%u response:%s",
status, resp_msg);
}
if (resp_msg == NULL) {
info("%s returned no configurations",
state_ptr->bb_config.get_sys_state);
return ents;
}
_python2json(resp_msg);
j = json_tokener_parse(resp_msg);
if (j == NULL) {
error("json parser failed on \"%s\"",
resp_msg);
xfree(resp_msg);
return ents;
}
xfree(resp_msg);
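/* The response is expected to hold a single top-level key whose value
 * is the array of configuration entries */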
json_object_object_foreachC(j, iter) {
if (ents) {
error("Multiple configuration objects");
break;
}
ents = _json_parse_configs_array(j, iter.key, num_ent);
}
json_object_put(j); /* Frees json memory */
return ents;
}
/* _bb_get_instances()
*
* Handle the JSON stream with instance info (resource reservations).
*/
static bb_instances_t *
_bb_get_instances(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
{
bb_instances_t *ents = NULL;
json_object *j;
json_object_iter iter;
int status = 0;
DEF_TIMERS;
char *resp_msg;
char **script_argv;
run_command_args_t run_command_args = {
.max_wait = timeout,
.script_path = state_ptr->bb_config.get_sys_state,
.script_type = "show_instances",
.status = &status,
};
script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
script_argv[0] = xstrdup("dw_wlm_cli");
script_argv[1] = xstrdup("--function");
script_argv[2] = xstrdup("show_instances");
START_TIMER;
run_command_args.script_argv = script_argv;
resp_msg = run_command(&run_command_args);
END_TIMER;
log_flag(BURST_BUF, "show_instances ran for %s",
TIME_STR);
_log_script_argv(script_argv, resp_msg);
xfree_array(script_argv);
#if 0
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
#else
//FIXME: Cray bug: API returning error if no instances, use above code when fixed
if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
(!resp_msg || (resp_msg[0] != '{'))) {
#endif
trigger_burst_buffer();
error("show_instances status:%u response:%s",
status, resp_msg);
}
if (resp_msg == NULL) {
info("%s returned no instances",
state_ptr->bb_config.get_sys_state);
return ents;
}
_python2json(resp_msg);
j = json_tokener_parse(resp_msg);
if (j == NULL) {
error("json parser failed on \"%s\"",
resp_msg);
xfree(resp_msg);
return ents;
}
xfree(resp_msg);
json_object_object_foreachC(j, iter) {
if (ents) {
error("Multiple instance objects");
break;
}
ents = _json_parse_instances_array(j, iter.key, num_ent);
}
json_object_put(j); /* Frees json memory */
return ents;
}
/* _bb_get_pools()
*
* Handle the JSON stream with resource pool info (available resource type).
*/
static bb_pools_t *
_bb_get_pools(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
{
bb_pools_t *ents = NULL;
json_object *j;
json_object_iter iter;
int status = 0;
DEF_TIMERS;
char *resp_msg;
char **script_argv;
run_command_args_t run_command_args = {
.max_wait = timeout,
.script_path = state_ptr->bb_config.get_sys_state,
.script_type = "pools",
.status = &status,
};
script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
script_argv[0] = xstrdup("dw_wlm_cli");
script_argv[1] = xstrdup("--function");
script_argv[2] = xstrdup("pools");
START_TIMER;
run_command_args.script_argv = script_argv;
resp_msg = run_command(&run_command_args);
END_TIMER;
if (slurm_conf.debug_flags & DEBUG_FLAG_BURST_BUF) {
/* Only log pools data if different to limit volume of logs */
static uint32_t last_csum = 0;
uint32_t i, resp_csum = 0;
debug("pools ran for %s",
TIME_STR);
for (i = 0; resp_msg && resp_msg[i]; i++)
resp_csum += ((i * resp_msg[i]) % 1000000);
if (last_csum != resp_csum)
_log_script_argv(script_argv, resp_msg);
last_csum = resp_csum;
}
xfree_array(script_argv);
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
trigger_burst_buffer();
error("pools status:%u response:%s",
status, resp_msg);
}
if (resp_msg == NULL) {
error("%s returned no pools",
state_ptr->bb_config.get_sys_state);
return ents;
}
_python2json(resp_msg);
j = json_tokener_parse(resp_msg);
if (j == NULL) {
error("json parser failed on \"%s\"",
resp_msg);
xfree(resp_msg);
return ents;
}
xfree(resp_msg);
json_object_object_foreachC(j, iter) {
if (ents) {
error("Multiple pool objects");
break;
}
ents = _json_parse_pools_array(j, iter.key, num_ent);
}
json_object_put(j); /* Frees json memory */
return ents;
}
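/* _bb_get_sessions()
 *
 * Handle the JSON stream with session info (buffer ownership and
 * creation times).
 */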
static bb_sessions_t *
_bb_get_sessions(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
{
bb_sessions_t *ents = NULL;
json_object *j;
json_object_iter iter;
int status = 0;
DEF_TIMERS;
char *resp_msg;
char **script_argv;
run_command_args_t run_command_args = {
.max_wait = timeout,
.script_path = state_ptr->bb_config.get_sys_state,
.script_type = "show_sessions",
.status = &status,
};
script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
script_argv[0] = xstrdup("dw_wlm_cli");
script_argv[1] = xstrdup("--function");
script_argv[2] = xstrdup("show_sessions");
START_TIMER;
run_command_args.script_argv = script_argv;
resp_msg = run_command(&run_command_args);
END_TIMER;
log_flag(BURST_BUF, "show_sessions ran for %s",
TIME_STR);
_log_script_argv(script_argv, resp_msg);
xfree_array(script_argv);
#if 0
if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
#else
//FIXME: Cray bug: API returning error if no sessions, use above code when fixed
if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
(!resp_msg || (resp_msg[0] != '{'))) {
#endif
trigger_burst_buffer();
error("show_sessions status:%u response:%s",
status, resp_msg);
}
if (resp_msg == NULL) {
info("%s returned no sessions",
state_ptr->bb_config.get_sys_state);
return ents;
}
_python2json(resp_msg);
j = json_tokener_parse(resp_msg);
if (j == NULL) {
error("json parser failed on \"%s\"",
resp_msg);
xfree(resp_msg);
return ents;
}
xfree(resp_msg);
json_object_object_foreachC(j, iter) {
if (ents) {
error("Multiple session objects");
break;
}
ents = _json_parse_sessions_array(j, iter.key, num_ent);
}
json_object_put(j); /* Frees json memory */
return ents;
}
/* _bb_free_configs()
*/
static void
_bb_free_configs(bb_configs_t *ents, int num_ent)
{
xfree(ents);
}
/* _bb_free_instances()
*/
static void
_bb_free_instances(bb_instances_t *ents, int num_ent)
{
xfree(ents);
}
/* _bb_free_pools()
*/
static void
_bb_free_pools(bb_pools_t *ents, int num_ent)
{
int i;
for (i = 0; i < num_ent; i++) {
xfree(ents[i].id);
xfree(ents[i].units);
}
xfree(ents);
}
/* _bb_free_sessions()
*/
static void
_bb_free_sessions(bb_sessions_t *ents, int num_ent)
{
int i;
for (i = 0; i < num_ent; i++) {
xfree(ents[i].token);
}
xfree(ents);
}
/* _json_parse_configs_array()
*/
static bb_configs_t *
_json_parse_configs_array(json_object *jobj, char *key, int *num)
{
json_object *jarray;
int i;
json_object *jvalue;
bb_configs_t *ents;
jarray = jobj;
json_object_object_get_ex(jobj, key, &jarray);
*num = json_object_array_length(jarray);
ents = xcalloc(*num, sizeof(bb_configs_t));
for (i = 0; i < *num; i++) {
jvalue = json_object_array_get_idx(jarray, i);
_json_parse_configs_object(jvalue, &ents[i]);
}
return ents;
}
/* _json_parse_instances_array()
*/
static bb_instances_t *
_json_parse_instances_array(json_object *jobj, char *key, int *num)
{
json_object *jarray;
int i;
json_object *jvalue;
bb_instances_t *ents;
jarray = jobj;
json_object_object_get_ex(jobj, key, &jarray);
*num = json_object_array_length(jarray);
ents = xcalloc(*num, sizeof(bb_instances_t));
for (i = 0; i < *num; i++) {
jvalue = json_object_array_get_idx(jarray, i);
_json_parse_instances_object(jvalue, &ents[i]);
}
return ents;
}
/* _json_parse_pools_array()
*/
static bb_pools_t *
_json_parse_pools_array(json_object *jobj, char *key, int *num)
{
json_object *jarray;
int i;
json_object *jvalue;
bb_pools_t *ents;
jarray = jobj;
json_object_object_get_ex(jobj, key, &jarray);
*num = json_object_array_length(jarray);
ents = xcalloc(*num, sizeof(bb_pools_t));
for (i = 0; i < *num; i++) {
jvalue = json_object_array_get_idx(jarray, i);
_json_parse_pools_object(jvalue, &ents[i]);
}
return ents;
}
/* _json_parse_sessions_array()
*/
static bb_sessions_t *
_json_parse_sessions_array(json_object *jobj, char *key, int *num)
{
json_object *jarray;
int i;
json_object *jvalue;
bb_sessions_t *ents;
jarray = jobj;
json_object_object_get_ex(jobj, key, &jarray);
*num = json_object_array_length(jarray);
ents = xcalloc(*num, sizeof(bb_sessions_t));
for (i = 0; i < *num; i++) {
jvalue = json_object_array_get_idx(jarray, i);
_json_parse_sessions_object(jvalue, &ents[i]);
}
return ents;
}
/* Parse "links" object in the "configuration" object */
static void
_parse_config_links(json_object *instance, bb_configs_t *ent)
{
enum json_type type;
struct json_object_iter iter;
int64_t x;
json_object_object_foreachC(instance, iter) {
type = json_object_get_type(iter.val);
switch (type) {
case json_type_int:
x = json_object_get_int64(iter.val);
if (!xstrcmp(iter.key, "instance"))
ent->instance = x;
break;
default:
break;
}
}
}
/* _json_parse_configs_object()
*/
static void
_json_parse_configs_object(json_object *jobj, bb_configs_t *ent)
{
enum json_type type;
struct json_object_iter iter;
int64_t x;
json_object_object_foreachC(jobj, iter) {
type = json_object_get_type(iter.val);
switch (type) {
case json_type_object:
if (xstrcmp(iter.key, "links") == 0)
_parse_config_links(iter.val, ent);
break;
case json_type_int:
x = json_object_get_int64(iter.val);
if (xstrcmp(iter.key, "id") == 0) {
ent->id = x;
}
break;
default:
break;
}
}
}
/* Parse "capacity" object in the "instance" object */
static void
_parse_instance_capacity(json_object *instance, bb_instances_t *ent)
{
enum json_type type;
struct json_object_iter iter;
int64_t x;
json_object_object_foreachC(instance, iter) {
type = json_object_get_type(iter.val);
switch (type) {
case json_type_int:
x = json_object_get_int64(iter.val);
if (!xstrcmp(iter.key, "bytes"))
ent->bytes = x;
break;
default:
break;
}
}
}
/* Parse "links" object in the "instance" object */
static void
_parse_instance_links(json_object *instance, bb_instances_t *ent)
{
enum json_type type;
struct json_object_iter iter;
int64_t x;
json_object_object_foreachC(instance, iter) {
type = json_object_get_type(iter.val);
switch (type) {
case json_type_int:
x = json_object_get_int64(iter.val);
if (!xstrcmp(iter.key, "session"))
ent->session = x;
break;
default:
break;
}
}
}
/* _json_parse_instances_object()
*/
static void
_json_parse_instances_object(json_object *jobj, bb_instances_t *ent)
{
enum json_type type;
struct json_object_iter iter;
int64_t x;
json_object_object_foreachC(jobj, iter) {
type = json_object_get_type(iter.val);
switch (type) {
case json_type_object:
if (xstrcmp(iter.key, "capacity") == 0)
_parse_instance_capacity(iter.val, ent);
else if (xstrcmp(iter.key, "links") == 0)
_parse_instance_links(iter.val, ent);
break;
case json_type_int:
x = json_object_get_int64(iter.val);
if (xstrcmp(iter.key, "id") == 0) {
ent->id = x;
}
break;
default:
break;
}
}
}
/* _json_parse_pools_object()
*/
static void
_json_parse_pools_object(json_object *jobj, bb_pools_t *ent)
{
enum json_type type;
struct json_object_iter iter;
int64_t x;
const char *p;
json_object_object_foreachC(jobj, iter) {
type = json_object_get_type(iter.val);
switch (type) {
case json_type_int:
x = json_object_get_int64(iter.val);
if (xstrcmp(iter.key, "granularity") == 0) {
ent->granularity = x;
} else if (xstrcmp(iter.key, "quantity") == 0) {
ent->quantity = x;
} else if (xstrcmp(iter.key, "free") == 0) {
ent->free = x;
}
break;
case json_type_string:
p = json_object_get_string(iter.val);
if (xstrcmp(iter.key, "id") == 0) {
ent->id = xstrdup(p);
} else if (xstrcmp(iter.key, "units") == 0) {
ent->units = xstrdup(p);
}
break;
default:
break;
}
}
}
/* _json_parse_sessions_object()
*/
static void
_json_parse_sessions_object(json_object *jobj, bb_sessions_t *ent)
{
enum json_type type;
struct json_object_iter iter;
int64_t x;
const char *p;
json_object_object_foreachC(jobj, iter) {
type = json_object_get_type(iter.val);
switch (type) {
case json_type_int:
x = json_object_get_int64(iter.val);
if (xstrcmp(iter.key, "created") == 0) {
ent->created = x;
} else if (xstrcmp(iter.key, "id") == 0) {
ent->id = x;
} else if (xstrcmp(iter.key, "owner") == 0) {
ent->user_id = x;
}
break;
case json_type_string:
p = json_object_get_string(iter.val);
if (xstrcmp(iter.key, "token") == 0) {
ent->token = xstrdup(p);
}
break;
default:
break;
}
}
}
/*
* Run a script in the burst buffer plugin
*
* func IN - script function to run
* jobid IN - job id for which we are running the script (0 if not for a job)
* argc IN - number of arguments to pass to script
* argv IN - argument list to pass to script
* resp_msg OUT - string returned by script
*
* Returns the status of the script.
*/
extern int bb_p_run_script(char *func, uint32_t job_id, uint32_t argc,
char **argv, job_info_msg_t *job_info,
char **resp_msg)
{
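/* Not used by this plugin; report success */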
return 0;
}
/*
* Translate a burst buffer string to its equivalent TRES string
* (e.g. "cray:2G,generic:4M" -> "1004=2048,1005=4")
* Caller must xfree the return value
*/
extern char *bb_p_xlate_bb_2_tres_str(char *burst_buffer)
{
char *save_ptr = NULL, *sep, *tmp, *tok;
char *result = NULL;
uint64_t size, total = 0;
if (!burst_buffer || (bb_state.tres_id < 1))
return result;
tmp = xstrdup(burst_buffer);
tok = strtok_r(tmp, ",", &save_ptr);
while (tok) {
sep = strchr(tok, ':');
if (sep) {
if (!xstrncmp(tok, "cray:", 5))
tok += 5;
else
tok = NULL;
}
if (tok) {
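/* TRES counts are in MB; round the size up to the next MB */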
uint64_t mb_xlate = 1024 * 1024;
size = bb_get_size_num(tok,
bb_state.bb_config.granularity);
total += ROUNDUP(size, mb_xlate);
}
tok = strtok_r(NULL, ",", &save_ptr);
}
xfree(tmp);
if (total)
xstrfmtcat(result, "%d=%"PRIu64, bb_state.tres_id, total);
return result;
}