/*****************************************************************************\
* fed_mgr.c - functions for federations
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#include <pthread.h>
#include <signal.h>
#if HAVE_SYS_PRCTL_H
# include <sys/prctl.h>
#endif
#include "src/common/list.h"
#include "src/common/macros.h"
#include "src/common/parse_time.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_pack.h"
#include "src/common/slurmdbd_defs.h"
#include "src/common/state_save.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/slurmctld/fed_mgr.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/proc_req.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/state_save.h"
#include "src/slurmdbd/read_config.h"
#include "src/stepmgr/srun_comm.h"
#include "src/interfaces/conn.h"
#define FED_MGR_STATE_FILE "fed_mgr_state"
#define FED_MGR_CLUSTER_ID_BEGIN 26
#define TEST_REMOTE_DEP_FREQ 30 /* seconds */
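/*
* Map a 1-based federation cluster id to its bit in a sibling bitmap,
* e.g. FED_SIBLING_BIT(1) == 0x1 and FED_SIBLING_BIT(3) == 0x4. Membership
* is tested as (siblings & FED_SIBLING_BIT(id)).
*/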
#define FED_SIBLING_BIT(x) ((uint64_t)1 << (x - 1))
slurmdb_federation_rec_t *fed_mgr_fed_rec = NULL;
slurmdb_cluster_rec_t *fed_mgr_cluster_rec = NULL;
static pthread_cond_t agent_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t agent_thread_id = (pthread_t) 0;
static int agent_queue_size = 0;
static pthread_cond_t job_watch_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t job_watch_mutex = PTHREAD_MUTEX_INITIALIZER;
static bool job_watch_thread_running = false;
static bool stop_job_watch_thread = false;
static bool inited = false;
static pthread_mutex_t open_send_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t update_mutex = PTHREAD_MUTEX_INITIALIZER;
static list_t *fed_job_list = NULL;
static list_t *fed_job_update_list = NULL;
static pthread_t fed_job_update_thread_id = (pthread_t) 0;
static pthread_mutex_t fed_job_list_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t job_update_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t job_update_mutex = PTHREAD_MUTEX_INITIALIZER;
static list_t *remote_dep_recv_list = NULL;
static pthread_t remote_dep_thread_id = (pthread_t) 0;
static pthread_cond_t remote_dep_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t remote_dep_recv_mutex = PTHREAD_MUTEX_INITIALIZER;
static list_t *remote_dep_job_list = NULL;
static pthread_t dep_job_thread_id = (pthread_t) 0;
static pthread_mutex_t dep_job_list_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t test_dep_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t test_dep_mutex = PTHREAD_MUTEX_INITIALIZER;
static list_t *origin_dep_update_list = NULL;
static pthread_t origin_dep_thread_id = (pthread_t) 0;
static pthread_cond_t origin_dep_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t origin_dep_update_mutex = PTHREAD_MUTEX_INITIALIZER;
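/* Work item appended to a sibling's send_rpc list by _queue_rpc(); it holds
* the packed RPC (plus msg_type and job_id) until the message can be sent to
* that sibling. */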
typedef struct {
buf_t *buffer;
uint32_t job_id;
time_t last_try;
int last_defer;
uint16_t msg_type;
} agent_queue_t;
enum fed_job_update_type {
FED_JOB_NONE = 0,
FED_JOB_CANCEL,
FED_JOB_COMPLETE,
FED_JOB_REMOVE_ACTIVE_SIB_BIT,
FED_JOB_REQUEUE,
FED_JOB_START,
FED_JOB_SUBMIT_BATCH,
FED_JOB_SUBMIT_INT,
FED_JOB_SUBMIT_RESP,
FED_JOB_SYNC,
FED_JOB_UPDATE,
FED_JOB_UPDATE_RESPONSE,
FED_SEND_JOB_SYNC,
};
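/* One queued federated-job update. Entries are appended to
* fed_job_update_list by _append_job_update() and dispatched according to
* 'type' in _foreach_fed_job_update_info(). */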
typedef struct {
uint32_t cluster_lock;
uint32_t flags;
uint32_t job_id;
job_info_msg_t *job_info_msg;
uint32_t job_state;
job_step_kill_msg_t *kill_msg;
bool requeue;
uint32_t return_code;
uint64_t siblings_active;
uint64_t siblings_viable;
char *siblings_str;
time_t start_time;
char *submit_cluster;
job_desc_msg_t *submit_desc;
uint16_t submit_proto_ver;
uint32_t type;
uid_t uid;
} fed_job_update_info_t;
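/* Per-job federation state cached in fed_job_list and protected by
* fed_job_list_mutex: the cluster holding the job lock, the active/viable
* sibling bitmaps, and per-sibling update bookkeeping. */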
typedef struct {
uint32_t cluster_lock;
uint32_t job_id;
uint64_t siblings_active;
uint64_t siblings_viable;
uint32_t updating_sibs[MAX_FED_CLUSTERS + 1];
time_t updating_time[MAX_FED_CLUSTERS + 1];
} fed_job_info_t;
/* Local Structs */
typedef struct {
job_info_msg_t *job_info_msg;
uint32_t sibling_id;
char *sibling_name;
time_t sync_time;
} reconcile_sib_t;
/* Local Prototypes */
static int _is_fed_job(job_record_t *job_ptr, uint32_t *origin_id);
static uint64_t _get_all_sibling_bits();
static int _validate_cluster_features(char *spec_features,
uint64_t *cluster_bitmap);
static int _validate_cluster_names(char *clusters, uint64_t *cluster_bitmap);
static void _leave_federation(void);
static int _q_send_job_sync(char *sib_name);
static slurmdb_federation_rec_t *_state_load(char *state_save_location);
static int _sync_jobs(const char *sib_name, job_info_msg_t *job_info_msg,
time_t sync_time);
static int _q_sib_job_cancel(slurm_msg_t *msg, uint32_t uid);
static char *_job_update_type_str(enum fed_job_update_type type)
{
switch (type) {
case FED_JOB_COMPLETE:
return "FED_JOB_COMPLETE";
case FED_JOB_CANCEL:
return "FED_JOB_CANCEL";
case FED_JOB_REMOVE_ACTIVE_SIB_BIT:
return "FED_JOB_REMOVE_ACTIVE_SIB_BIT";
case FED_JOB_REQUEUE:
return "FED_JOB_REQUEUE";
case FED_JOB_START:
return "FED_JOB_START";
case FED_JOB_SUBMIT_BATCH:
return "FED_JOB_SUBMIT_BATCH";
case FED_JOB_SUBMIT_INT:
return "FED_JOB_SUBMIT_INT";
case FED_JOB_SUBMIT_RESP:
return "FED_JOB_SUBMIT_RESP";
case FED_JOB_SYNC:
return "FED_JOB_SYNC";
case FED_JOB_UPDATE:
return "FED_JOB_UPDATE";
case FED_JOB_UPDATE_RESPONSE:
return "FED_JOB_UPDATE_RESPONSE";
case FED_SEND_JOB_SYNC:
return "FED_SEND_JOB_SYNC";
default:
return "?";
}
}
static void _append_job_update(fed_job_update_info_t *job_update_info)
{
list_append(fed_job_update_list, job_update_info);
slurm_mutex_lock(&job_update_mutex);
slurm_cond_broadcast(&job_update_cond);
slurm_mutex_unlock(&job_update_mutex);
}
/* Return true if communication failure should be logged. Only log failures
* every 10 minutes to avoid filling logs */
static bool _comm_fail_log(slurmdb_cluster_rec_t *cluster)
{
time_t now = time(NULL);
time_t old = now - 600; /* Log failures once every 10 mins */
if (cluster->comm_fail_time < old) {
cluster->comm_fail_time = now;
return true;
}
return false;
}
static int _close_controller_conn(slurmdb_cluster_rec_t *cluster)
{
int rc = SLURM_SUCCESS;
// persist_conn_t *persist_conn = NULL;
xassert(cluster);
slurm_mutex_lock(&cluster->lock);
log_flag(FEDR, "closing sibling conn to %s", cluster->name);
/* The recv free of this is handled directly in the persist_conn code,
* don't free it here */
// slurm_persist_conn_destroy(cluster->fed.recv);
cluster->fed.recv = NULL;
slurm_persist_conn_destroy(cluster->fed.send);
cluster->fed.send = NULL;
log_flag(FEDR, "closed sibling conn to %s", cluster->name);
slurm_mutex_unlock(&cluster->lock);
return rc;
}
/* Get list of jobs that originated from this cluster and the remote sibling and
* that are viable between the two siblings.
* Originating here: so that the remote can determine if the tracker job is gone
* Originating sib: so that the remote can verify jobs are where they're supposed to
* be. If the sibling doesn't find a job, the sibling can resubmit the job or
* take other actions.
* Viable sib: because the origin might be down and the job was started or
* cancelled while the origin was down.
*
* Only get jobs that were submitted prior to sync_time
*/
static list_t *_get_sync_jobid_list(uint32_t sib_id, time_t sync_time)
{
list_t *jobids = NULL;
list_itr_t *job_itr;
job_record_t *job_ptr;
jobids = list_create(xfree_ptr);
/*
* Only look at jobs that:
* 1. originate from the remote sibling
* 2. originate from this cluster
* 3. if the sibling is in the job's viable list.
*/
job_itr = list_iterator_create(job_list);
while ((job_ptr = list_next(job_itr))) {
uint32_t cluster_id = fed_mgr_get_cluster_id(job_ptr->job_id);
if (job_ptr->fed_details &&
(job_ptr->details &&
(job_ptr->details->submit_time < sync_time)) &&
((cluster_id == sib_id) ||
(cluster_id == fed_mgr_cluster_rec->fed.id) ||
(job_ptr->fed_details->siblings_viable &
FED_SIBLING_BIT(sib_id)))) {
uint32_t *tmp = xmalloc(sizeof(uint32_t));
*tmp = job_ptr->job_id;
list_append(jobids, tmp);
}
}
list_iterator_destroy(job_itr);
return jobids;
}
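/* Open (or refresh) the persistent send connection to a sibling's slurmctld.
* If 'locked' is true the caller already holds cluster->lock. */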
static int _open_controller_conn(slurmdb_cluster_rec_t *cluster, bool locked)
{
int rc;
persist_conn_t *persist_conn = NULL;
static int timeout = -1;
if (timeout < 0)
timeout = slurm_conf.msg_timeout * 1000;
if (cluster == fed_mgr_cluster_rec) {
info("%s: hey! how did we get here with ourselves?", __func__);
return SLURM_ERROR;
}
if (!locked)
slurm_mutex_lock(&cluster->lock);
if (!cluster->control_host || !cluster->control_host[0] ||
!cluster->control_port) {
if (cluster->fed.recv) {
persist_conn = cluster->fed.recv;
cluster->control_port = persist_conn->rem_port;
xfree(cluster->control_host);
cluster->control_host = xstrdup(persist_conn->rem_host);
} else {
log_flag(FEDR, "%s: Sibling cluster %s doesn't appear to be up yet, skipping",
__func__, cluster->name);
if (!locked)
slurm_mutex_unlock(&cluster->lock);
return SLURM_ERROR;
}
}
log_flag(FEDR, "opening sibling conn to %s[%s:%u]",
cluster->name, cluster->control_host, cluster->control_port);
if (!cluster->fed.send) {
persist_conn = xmalloc(sizeof(*persist_conn));
cluster->fed.send = persist_conn;
/* Since this connection is coming from us, make it so ;) */
persist_conn->cluster_name = xstrdup(slurm_conf.cluster_name);
persist_conn->persist_type = PERSIST_TYPE_FED;
persist_conn->my_port = slurm_conf.slurmctld_port;
persist_conn->rem_host = xstrdup(cluster->control_host);
persist_conn->rem_port = cluster->control_port;
persist_conn->version = cluster->rpc_version;
persist_conn->shutdown = &slurmctld_config.shutdown_time;
persist_conn->timeout = timeout; /* don't set this to 0, it
* could cause deadlock */
} else {
persist_conn = cluster->fed.send;
/* Perhaps a backup came up, so don't assume it was the same
* host or port we had before.
*/
xfree(persist_conn->rem_host);
persist_conn->rem_host = xstrdup(cluster->control_host);
persist_conn->rem_port = cluster->control_port;
}
persist_conn->r_uid = SLURM_AUTH_UID_ANY;
rc = slurm_persist_conn_open(persist_conn);
if (rc != SLURM_SUCCESS) {
if (_comm_fail_log(cluster)) {
error("fed_mgr: Unable to open connection to cluster %s using host %s(%u)",
cluster->name,
persist_conn->rem_host, persist_conn->rem_port);
}
} else {
log_flag(FEDR, "opened sibling conn to %s:%d",
cluster->name,
conn_g_get_fd(persist_conn->tls_conn));
}
if (!locked)
slurm_mutex_unlock(&cluster->lock);
return rc;
}
/* The cluster->lock should be locked before this is called */
static int _check_send(slurmdb_cluster_rec_t *cluster)
{
persist_conn_t *send = cluster->fed.send;
if (!send || !send->tls_conn) {
return _open_controller_conn(cluster, true);
}
return SLURM_SUCCESS;
}
/* fed_mgr read lock needs to be set before coming in here,
* not the write lock */
static void _open_persist_sends(void)
{
list_itr_t *itr;
slurmdb_cluster_rec_t *cluster = NULL;
persist_conn_t *send = NULL;
if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) {
log_flag(FEDR, "bailing on empty cluster list");
return;
}
/* This open_send_mutex effectively acts as a write lock: while we are
* sending out these open requests the other slurmctlds will be replying
* and needing to get to the structures. If we just used the fed_mgr write
* lock it would cause deadlock.
*/
slurm_mutex_lock(&open_send_mutex);
itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
while ((cluster = list_next(itr))) {
if (cluster == fed_mgr_cluster_rec)
continue;
send = cluster->fed.send;
if (!send || !send->tls_conn)
_open_controller_conn(cluster, false);
}
list_iterator_destroy(itr);
slurm_mutex_unlock(&open_send_mutex);
}
static int _send_recv_msg(slurmdb_cluster_rec_t *cluster, slurm_msg_t *req,
slurm_msg_t *resp, bool locked)
{
int rc;
xassert(cluster);
xassert(req);
xassert(resp);
slurm_msg_t_init(resp);
if (!locked)
slurm_mutex_lock(&cluster->lock);
rc = _check_send(cluster);
if ((rc == SLURM_SUCCESS) && cluster->fed.send) {
resp->conn = req->conn = cluster->fed.send;
rc = slurm_send_recv_msg(req->conn->tls_conn, req, resp, 0);
}
if (!locked)
slurm_mutex_unlock(&cluster->lock);
return rc;
}
/* Free buf_t record from a list */
static void _ctld_free_list_msg(void *x)
{
agent_queue_t *agent_queue_ptr = (agent_queue_t *) x;
if (agent_queue_ptr) {
FREE_NULL_BUFFER(agent_queue_ptr->buffer);
xfree(agent_queue_ptr);
}
}
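/* Pack 'req' into a buffer, append it to the sibling's send_rpc queue, and
* signal agent_cond so the agent knows there is new work to send. */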
static int _queue_rpc(slurmdb_cluster_rec_t *cluster, slurm_msg_t *req,
uint32_t job_id, bool locked)
{
agent_queue_t *agent_rec;
buf_t *buf;
if (!cluster->send_rpc)
cluster->send_rpc = list_create(_ctld_free_list_msg);
buf = init_buf(1024);
pack16(req->msg_type, buf);
if (pack_msg(req, buf) != SLURM_SUCCESS) {
error("%s: failed to pack msg_type:%u",
__func__, req->msg_type);
FREE_NULL_BUFFER(buf);
return SLURM_ERROR;
}
/* Queue the RPC and notify the agent of new work */
agent_rec = xmalloc(sizeof(agent_queue_t));
agent_rec->buffer = buf;
agent_rec->job_id = job_id;
agent_rec->msg_type = req->msg_type;
list_append(cluster->send_rpc, agent_rec);
slurm_mutex_lock(&agent_mutex);
agent_queue_size++;
slurm_cond_broadcast(&agent_cond);
slurm_mutex_unlock(&agent_mutex);
return SLURM_SUCCESS;
}
/*
* close all sibling conns
* must lock before entering.
*/
static int _close_sibling_conns(void)
{
list_itr_t *itr;
slurmdb_cluster_rec_t *cluster;
if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
goto fini;
itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
while ((cluster = list_next(itr))) {
if (cluster == fed_mgr_cluster_rec)
continue;
_close_controller_conn(cluster);
}
list_iterator_destroy(itr);
fini:
return SLURM_SUCCESS;
}
static void _mark_self_as_drained(void)
{
list_t *ret_list = NULL;
slurmdb_cluster_cond_t cluster_cond;
slurmdb_cluster_rec_t cluster_rec;
log_flag(FEDR, "%s: setting cluster fedstate to DRAINED", __func__);
slurmdb_init_cluster_cond(&cluster_cond, false);
slurmdb_init_cluster_rec(&cluster_rec, false);
cluster_cond.cluster_list = list_create(NULL);
list_append(cluster_cond.cluster_list, fed_mgr_cluster_rec->name);
cluster_rec.fed.state = CLUSTER_FED_STATE_INACTIVE |
(fed_mgr_cluster_rec->fed.state & ~CLUSTER_FED_STATE_BASE);
ret_list = acct_storage_g_modify_clusters(acct_db_conn,
slurm_conf.slurm_user_id,
&cluster_cond, &cluster_rec);
if (!ret_list || !list_count(ret_list)) {
error("Failed to set cluster state to drained");
}
FREE_NULL_LIST(cluster_cond.cluster_list);
FREE_NULL_LIST(ret_list);
}
static void _remove_self_from_federation(void)
{
list_t *ret_list = NULL;
slurmdb_federation_cond_t fed_cond;
slurmdb_federation_rec_t fed_rec;
slurmdb_cluster_rec_t cluster_rec;
log_flag(FEDR, "%s: removing self from federation %s",
__func__, fed_mgr_fed_rec->name);
slurmdb_init_federation_cond(&fed_cond, false);
slurmdb_init_federation_rec(&fed_rec, false);
slurmdb_init_cluster_rec(&cluster_rec, false);
fed_cond.federation_list = list_create(NULL);
list_append(fed_cond.federation_list, fed_mgr_fed_rec->name);
cluster_rec.name = xstrdup_printf("-%s", fed_mgr_cluster_rec->name);
fed_rec.cluster_list = list_create(NULL);
list_append(fed_rec.cluster_list, &cluster_rec);
ret_list = acct_storage_g_modify_federations(acct_db_conn,
slurm_conf.slurm_user_id,
&fed_cond, &fed_rec);
if (!ret_list || !list_count(ret_list)) {
error("Failed to remove federation from list");
}
FREE_NULL_LIST(ret_list);
FREE_NULL_LIST(fed_cond.federation_list);
FREE_NULL_LIST(fed_rec.cluster_list);
xfree(cluster_rec.name);
slurmctld_config.scheduling_disabled = false;
slurmctld_config.submissions_disabled = false;
_leave_federation();
}
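/* list_for_each() callback: returns SLURM_ERROR for a job that has not yet
* completed, which stops the iteration and negates the returned count (see
* _job_watch_thread()). */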
static int _foreach_job_completed(void *object, void *arg)
{
job_record_t *job_ptr = (job_record_t *) object;
if (IS_JOB_COMPLETED(job_ptr))
return SLURM_SUCCESS;
return SLURM_ERROR;
}
static int _foreach_job_no_requeue(void *object, void *arg)
{
job_record_t *job_ptr = (job_record_t *) object;
if (job_ptr->details)
job_ptr->details->requeue = 0;
return SLURM_SUCCESS;
}
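/* Wake up roughly every 5 seconds and check whether all jobs have completed.
* Once they have, either remove this cluster from the federation or mark it
* DRAINED, depending on the CLUSTER_FED_STATE_REMOVE/DRAIN flags, then exit. */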
static void *_job_watch_thread(void *arg)
{
struct timespec ts = {0, 0};
slurmctld_lock_t job_write_fed_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "fed_jobw", NULL, NULL, NULL) < 0) {
error("%s: cannot set my name to %s %m", __func__, "fed_jobw");
}
#endif
log_flag(FEDR, "%s: started job_watch thread", __func__);
while (!slurmctld_config.shutdown_time && !stop_job_watch_thread) {
int tot_jobs, comp_jobs;
slurm_mutex_lock(&job_watch_mutex);
if (!slurmctld_config.shutdown_time && !stop_job_watch_thread) {
ts.tv_sec = time(NULL) + 5;
slurm_cond_timedwait(&job_watch_cond,
&job_watch_mutex, &ts);
}
slurm_mutex_unlock(&job_watch_mutex);
if (slurmctld_config.shutdown_time || stop_job_watch_thread)
break;
lock_slurmctld(job_write_fed_write_lock);
if (!fed_mgr_cluster_rec) {
/* not part of the federation anymore */
unlock_slurmctld(job_write_fed_write_lock);
break;
}
if ((tot_jobs = list_count(job_list)) !=
(comp_jobs = list_for_each(job_list, _foreach_job_completed,
NULL))) {
/* list_for_each negates the count if failed. */
int remaining_jobs = tot_jobs + comp_jobs + 1;
log_flag(FEDR, "%s: at least %d remaining jobs before being drained and/or removed from the federation",
__func__, remaining_jobs);
} else {
if (fed_mgr_cluster_rec->fed.state &
CLUSTER_FED_STATE_REMOVE) {
/* prevent federated jobs from being requeued */
list_for_each(job_list, _foreach_job_no_requeue,
NULL);
_remove_self_from_federation();
} else if (fed_mgr_cluster_rec->fed.state &
CLUSTER_FED_STATE_DRAIN) {
_mark_self_as_drained();
}
unlock_slurmctld(job_write_fed_write_lock);
break;
}
unlock_slurmctld(job_write_fed_write_lock);
}
slurm_mutex_lock(&job_watch_mutex);
job_watch_thread_running = false;
slurm_mutex_unlock(&job_watch_mutex);
log_flag(FEDR, "%s: exiting job watch thread", __func__);
return NULL;
}
static void _spawn_job_watch_thread()
{
slurm_mutex_lock(&job_watch_mutex);
if (!job_watch_thread_running) {
/* Detach the thread since it will exit once the cluster is
* drained or removed. */
stop_job_watch_thread = false;
job_watch_thread_running = true;
slurm_thread_create_detached(_job_watch_thread, NULL);
} else {
info("a job_watch_thread already exists");
}
slurm_mutex_unlock(&job_watch_mutex);
}
static void _remove_job_watch_thread()
{
slurm_mutex_lock(&job_watch_mutex);
if (job_watch_thread_running) {
stop_job_watch_thread = true;
slurm_cond_broadcast(&job_watch_cond);
}
slurm_mutex_unlock(&job_watch_mutex);
}
static int _clear_recv_conns(void *object, void *arg)
{
slurmdb_cluster_rec_t *cluster = (slurmdb_cluster_rec_t *)object;
cluster->fed.recv = NULL;
return SLURM_SUCCESS;
}
/*
* Must have FED unlocked prior to entering
*/
static void _fed_mgr_ptr_init(slurmdb_federation_rec_t *db_fed,
slurmdb_cluster_rec_t *cluster,
uint64_t *added_clusters)
{
list_itr_t *c_itr;
slurmdb_cluster_rec_t *tmp_cluster, *db_cluster;
uint32_t cluster_state;
int base_state;
bool drain_flag;
slurmctld_lock_t fed_write_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
xassert(cluster);
log_flag(FEDR, "Joining federation %s", db_fed->name);
lock_slurmctld(fed_write_lock);
if (fed_mgr_fed_rec) {
/* we are already part of a federation, preserve existing
* connections */
c_itr = list_iterator_create(db_fed->cluster_list);
while ((db_cluster = list_next(c_itr))) {
if (!xstrcmp(db_cluster->name,
slurm_conf.cluster_name)) {
fed_mgr_cluster_rec = db_cluster;
continue;
}
if (!(tmp_cluster =
fed_mgr_get_cluster_by_name(db_cluster->name))) {
*added_clusters |=
FED_SIBLING_BIT(db_cluster->fed.id);
/* don't worry about destroying the connection
* here. It will happen below when we free
* fed_mgr_fed_rec (automagically).
*/
continue;
}
slurm_mutex_lock(&tmp_cluster->lock);
/* transfer over the connections we already have */
db_cluster->fed.send = tmp_cluster->fed.send;
tmp_cluster->fed.send = NULL;
db_cluster->fed.recv = tmp_cluster->fed.recv;
tmp_cluster->fed.recv = NULL;
db_cluster->send_rpc = tmp_cluster->send_rpc;
tmp_cluster->send_rpc = NULL;
db_cluster->fed.sync_sent =
tmp_cluster->fed.sync_sent;
db_cluster->fed.sync_recvd =
tmp_cluster->fed.sync_recvd;
slurm_mutex_unlock(&tmp_cluster->lock);
list_delete_all(fed_mgr_fed_rec->cluster_list,
slurmdb_find_cluster_in_list,
db_cluster->name);
}
list_iterator_destroy(c_itr);
/* Remove any existing clusters that were part of the federation
* before and are not now. Don't free the recv connection now,
* it will get destroyed when the recv thread exits. */
list_for_each(fed_mgr_fed_rec->cluster_list, _clear_recv_conns,
NULL);
slurmdb_destroy_federation_rec(fed_mgr_fed_rec);
} else
fed_mgr_cluster_rec = cluster;
fed_mgr_fed_rec = db_fed;
/* Set scheduling and submissions states */
cluster_state = fed_mgr_cluster_rec->fed.state;
base_state = (cluster_state & CLUSTER_FED_STATE_BASE);
drain_flag = (cluster_state & CLUSTER_FED_STATE_DRAIN);
unlock_slurmctld(fed_write_lock);
if (drain_flag) {
slurmctld_config.scheduling_disabled = false;
slurmctld_config.submissions_disabled = true;
/* INACTIVE + DRAIN == DRAINED (already) */
if (base_state == CLUSTER_FED_STATE_ACTIVE)
_spawn_job_watch_thread();
} else if (base_state == CLUSTER_FED_STATE_ACTIVE) {
slurmctld_config.scheduling_disabled = false;
slurmctld_config.submissions_disabled = false;
} else if (base_state == CLUSTER_FED_STATE_INACTIVE) {
slurmctld_config.scheduling_disabled = true;
slurmctld_config.submissions_disabled = true;
}
if (!drain_flag)
_remove_job_watch_thread();
}
/*
* Must have FED write lock prior to entering
*/
static void _leave_federation(void)
{
if (!fed_mgr_fed_rec)
return;
log_flag(FEDR, "Leaving federation %s", fed_mgr_fed_rec->name);
_close_sibling_conns();
_remove_job_watch_thread();
slurmdb_destroy_federation_rec(fed_mgr_fed_rec);
fed_mgr_fed_rec = NULL;
fed_mgr_cluster_rec = NULL;
}
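/* Called when a sibling's persistent connection is being torn down: forget
* the cached recv pointer (it is freed by its own thread), destroy our send
* connection to that sibling and reset its sync_sent/sync_recvd flags. */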
static void _persist_callback_fini(void *arg)
{
persist_conn_t *persist_conn = arg;
slurmdb_cluster_rec_t *cluster;
slurmctld_lock_t fed_write_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
/* If we are shutting down just return or you will get deadlock since
* all these locks are already locked.
*/
if (!persist_conn || *persist_conn->shutdown)
return;
lock_slurmctld(fed_write_lock);
/* shutting down */
if (!fed_mgr_fed_rec) {
unlock_slurmctld(fed_write_lock);
return;
}
if (!(cluster =
fed_mgr_get_cluster_by_name(persist_conn->cluster_name))) {
info("Couldn't find cluster %s?",
persist_conn->cluster_name);
unlock_slurmctld(fed_write_lock);
return;
}
slurm_mutex_lock(&cluster->lock);
/* This will get handled at the end of the thread, don't free it here */
cluster->fed.recv = NULL;
// persist_conn = cluster->fed.recv;
// slurm_persist_conn_close(persist_conn);
persist_conn = cluster->fed.send;
if (persist_conn) {
log_flag(FEDR, "Closing send to sibling cluster %s",
cluster->name);
slurm_persist_conn_destroy(persist_conn);
cluster->fed.send = NULL;
}
cluster->fed.sync_recvd = false;
cluster->fed.sync_sent = false;
slurm_mutex_unlock(&cluster->lock);
unlock_slurmctld(fed_write_lock);
}
static void _join_federation(slurmdb_federation_rec_t *fed,
slurmdb_cluster_rec_t *cluster,
uint64_t *added_clusters)
{
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
_fed_mgr_ptr_init(fed, cluster, added_clusters);
/* We must open the connections after we get out of the
* write_lock or we will end up in deadlock.
*/
lock_slurmctld(fed_read_lock);
_open_persist_sends();
unlock_slurmctld(fed_read_lock);
}
static int _persist_update_job(slurmdb_cluster_rec_t *conn, uint32_t job_id,
job_desc_msg_t *data, uid_t uid)
{
int rc;
slurm_msg_t req_msg, tmp_msg;
sib_msg_t sib_msg;
buf_t *buffer;
slurm_msg_t_init(&tmp_msg);
tmp_msg.msg_type = REQUEST_UPDATE_JOB;
tmp_msg.data = data;
tmp_msg.protocol_version = conn->rpc_version;
buffer = init_buf(BUF_SIZE);
pack_msg(&tmp_msg, buffer);
memset(&sib_msg, 0, sizeof(sib_msg));
sib_msg.sib_msg_type = FED_JOB_UPDATE;
sib_msg.data_buffer = buffer;
sib_msg.data_type = tmp_msg.msg_type;
sib_msg.data_version = tmp_msg.protocol_version;
sib_msg.req_uid = uid;
sib_msg.job_id = job_id;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.protocol_version = tmp_msg.protocol_version;
req_msg.data = &sib_msg;
rc = _queue_rpc(conn, &req_msg, 0, false);
FREE_NULL_BUFFER(buffer);
return rc;
}
static int _persist_update_job_resp(slurmdb_cluster_rec_t *conn,
uint32_t job_id, uint32_t return_code)
{
int rc;
slurm_msg_t req_msg;
sib_msg_t sib_msg;
slurm_msg_t_init(&req_msg);
memset(&sib_msg, 0, sizeof(sib_msg));
sib_msg.sib_msg_type = FED_JOB_UPDATE_RESPONSE;
sib_msg.job_id = job_id;
sib_msg.return_code = return_code;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.protocol_version = conn->rpc_version;
req_msg.data = &sib_msg;
rc = _queue_rpc(conn, &req_msg, job_id, false);
return rc;
}
/*
* Remove a sibling job that won't be scheduled
*
* IN conn - sibling connection
* IN job_id - the job's id
* IN job_state - state of job.
* IN return_code - return code of job.
* IN start_time - time the fed job started
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
*/
static int _persist_fed_job_revoke(slurmdb_cluster_rec_t *conn, uint32_t job_id,
uint32_t job_state, uint32_t return_code,
time_t start_time)
{
int rc;
slurm_msg_t req_msg;
sib_msg_t sib_msg;
if (!conn->fed.send ||
(!((persist_conn_t *) conn->fed.send)->tls_conn))
return SLURM_SUCCESS;
slurm_msg_t_init(&req_msg);
memset(&sib_msg, 0, sizeof(sib_msg));
sib_msg.sib_msg_type = FED_JOB_COMPLETE;
sib_msg.job_id = job_id;
sib_msg.job_state = job_state;
sib_msg.start_time = start_time;
sib_msg.return_code = return_code;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.protocol_version = conn->rpc_version;
req_msg.data = &sib_msg;
rc = _queue_rpc(conn, &req_msg, job_id, false);
return rc;
}
static int _persist_fed_job_response(slurmdb_cluster_rec_t *conn, uint32_t job_id,
uint32_t return_code)
{
int rc;
slurm_msg_t req_msg;
sib_msg_t sib_msg;
slurm_msg_t_init(&req_msg);
memset(&sib_msg, 0, sizeof(sib_msg));
sib_msg.sib_msg_type = FED_JOB_SUBMIT_RESP;
sib_msg.job_id = job_id;
sib_msg.return_code = return_code;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.protocol_version = conn->rpc_version;
req_msg.data = &sib_msg;
rc = _queue_rpc(conn, &req_msg, job_id, false);
return rc;
}
/*
* Grab the fed lock on the sibling job.
*
* This message doesn't need to be queued because the other side just locks the
* fed_job_list, checks and gets out -- doesn't need the internal locks.
*
* IN conn - sibling connection
* IN job_id - the job's id
* IN cluster_id - cluster id of the cluster locking
* IN do_lock - true == lock, false == unlock
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
*/
static int _persist_fed_job_lock_bool(slurmdb_cluster_rec_t *conn,
uint32_t job_id, uint32_t cluster_id,
bool do_lock)
{
int rc;
slurm_msg_t req_msg, resp_msg;
slurm_msg_t_init(&req_msg);
sib_msg_t sib_msg;
memset(&sib_msg, 0, sizeof(sib_msg_t));
sib_msg.job_id = job_id;
sib_msg.cluster_id = cluster_id;
if (do_lock)
req_msg.msg_type = REQUEST_SIB_JOB_LOCK;
else
req_msg.msg_type = REQUEST_SIB_JOB_UNLOCK;
req_msg.protocol_version = conn->rpc_version;
req_msg.data = &sib_msg;
if (_send_recv_msg(conn, &req_msg, &resp_msg, false)) {
rc = SLURM_ERROR;
goto end_it;
}
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_RC:
if ((rc = slurm_get_return_code(resp_msg.msg_type,
resp_msg.data))) {
errno = rc;
rc = SLURM_ERROR;
}
break;
default:
errno = SLURM_UNEXPECTED_MSG_ERROR;
rc = SLURM_ERROR;
break;
}
end_it:
slurm_free_msg_members(&resp_msg);
return rc;
}
static int _persist_fed_job_lock(slurmdb_cluster_rec_t *conn, uint32_t job_id,
uint32_t cluster_id)
{
return _persist_fed_job_lock_bool(conn, job_id, cluster_id, true);
}
static int _persist_fed_job_unlock(slurmdb_cluster_rec_t *conn, uint32_t job_id,
uint32_t cluster_id)
{
return _persist_fed_job_lock_bool(conn, job_id, cluster_id, false);
}
/*
* Tell the origin cluster that the job was started
*
* This message is queued up as an rpc and a fed_job_update so that it can
* cancel the siblings without holding up the internal locks.
*
* IN conn - sibling connection
* IN job_id - the job's id
* IN cluster_id - cluster id of the cluster that started the job
* IN start_time - time the fed job started
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
*/
static int _persist_fed_job_start(slurmdb_cluster_rec_t *conn,
uint32_t job_id, uint32_t cluster_id,
time_t start_time)
{
int rc;
slurm_msg_t req_msg;
slurm_msg_t_init(&req_msg);
sib_msg_t sib_msg;
memset(&sib_msg, 0, sizeof(sib_msg_t));
sib_msg.sib_msg_type = FED_JOB_START;
sib_msg.job_id = job_id;
sib_msg.cluster_id = cluster_id;
sib_msg.start_time = start_time;
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.protocol_version = conn->rpc_version;
req_msg.data = &sib_msg;
rc = _queue_rpc(conn, &req_msg, job_id, false);
return rc;
}
/*
* Send the specified signal to all steps of an existing job on a sibling
* IN conn - sibling connection
* IN job_id - the job's id
* IN signal - signal number
* IN flags - KILL_JOB_* flags
* IN uid - uid of user making the request.
* RET SLURM_SUCCESS on success, SLURM_ERROR otherwise.
*/
static int _persist_fed_job_cancel(slurmdb_cluster_rec_t *conn, uint32_t job_id,
uint16_t signal, uint16_t flags,
uid_t uid)
{
int rc = SLURM_SUCCESS;
slurm_msg_t req_msg, tmp_msg;
sib_msg_t sib_msg;
job_step_kill_msg_t kill_req;
buf_t *buffer;
/* Build and pack a kill_req msg to put in a sib_msg */
memset(&kill_req, 0, sizeof(job_step_kill_msg_t));
kill_req.step_id.job_id = job_id;
kill_req.sjob_id = NULL;
kill_req.step_id.step_id = NO_VAL;
kill_req.step_id.step_het_comp = NO_VAL;
kill_req.signal = signal;
kill_req.flags = flags;
slurm_msg_t_init(&tmp_msg);
tmp_msg.msg_type = REQUEST_CANCEL_JOB_STEP;
tmp_msg.data = &kill_req;
tmp_msg.protocol_version = conn->rpc_version;
buffer = init_buf(BUF_SIZE);
pack_msg(&tmp_msg, buffer);
memset(&sib_msg, 0, sizeof(sib_msg));
sib_msg.sib_msg_type = FED_JOB_CANCEL;
sib_msg.data_buffer = buffer;
sib_msg.data_type = tmp_msg.msg_type;
sib_msg.data_version = tmp_msg.protocol_version;
sib_msg.req_uid = uid;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.protocol_version = tmp_msg.protocol_version;
req_msg.data = &sib_msg;
rc = _queue_rpc(conn, &req_msg, job_id, false);
FREE_NULL_BUFFER(buffer);
return rc;
}
/*
* Tell the origin cluster to requeue the job
*
* IN conn - sibling connection
* IN job_id - the job's id
* IN flags - requeue flags
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
*/
static int _persist_fed_job_requeue(slurmdb_cluster_rec_t *conn,
uint32_t job_id, uint32_t flags)
{
int rc;
requeue_msg_t requeue_req;
slurm_msg_t req_msg, tmp_msg;
sib_msg_t sib_msg;
buf_t *buffer;
xassert(conn);
requeue_req.job_id = job_id;
requeue_req.job_id_str = NULL;
requeue_req.flags = flags;
slurm_msg_t_init(&tmp_msg);
tmp_msg.msg_type = REQUEST_JOB_REQUEUE;
tmp_msg.data = &requeue_req;
tmp_msg.protocol_version = conn->rpc_version;
buffer = init_buf(BUF_SIZE);
pack_msg(&tmp_msg, buffer);
memset(&sib_msg, 0, sizeof(sib_msg));
sib_msg.sib_msg_type = FED_JOB_REQUEUE;
sib_msg.job_id = job_id;
sib_msg.data_buffer = buffer;
sib_msg.data_type = tmp_msg.msg_type;
sib_msg.data_version = tmp_msg.protocol_version;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.protocol_version = tmp_msg.protocol_version;
req_msg.data = &sib_msg;
rc = _queue_rpc(conn, &req_msg, job_id, false);
FREE_NULL_BUFFER(buffer);
return rc;
}
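/* list_find_first() callback: match a sibling cluster record by fed.id */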
static int _find_sibling_by_id(void *x, void *key)
{
slurmdb_cluster_rec_t *object = (slurmdb_cluster_rec_t *)x;
uint32_t id = *(uint32_t *)key;
if (object->fed.id == id)
return 1;
return 0;
}
extern void add_fed_job_info(job_record_t *job_ptr)
{
fed_job_info_t *job_info;
job_info = xmalloc(sizeof(fed_job_info_t));
job_info->job_id = job_ptr->job_id;
job_info->siblings_active = job_ptr->fed_details->siblings_active;
job_info->siblings_viable = job_ptr->fed_details->siblings_viable;
slurm_mutex_lock(&fed_job_list_mutex);
if (fed_job_list)
list_append(fed_job_list, job_info);
else
xfree(job_info);
slurm_mutex_unlock(&fed_job_list_mutex);
}
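/* list_delete_all() callback: match a fed_job_info_t record by job_id */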
static int _delete_fed_job_info_by_id(void *object, void *arg)
{
fed_job_info_t *job_info = (fed_job_info_t *)object;
uint32_t job_id = *(uint32_t *)arg;
if (job_info->job_id == job_id)
return true;
return false;
}
extern void fed_mgr_remove_fed_job_info(uint32_t job_id)
{
slurm_mutex_lock(&fed_job_list_mutex);
if (fed_job_list)
list_delete_all(fed_job_list, _delete_fed_job_info_by_id,
&job_id);
slurm_mutex_unlock(&fed_job_list_mutex);
}
static int _list_find_fed_job_info_by_jobid(void *x, void *key)
{
fed_job_info_t *job_info = (fed_job_info_t *)x;
uint32_t job_id = *(uint32_t *)key;
if (job_info->job_id == job_id)
return 1;
return 0;
}
/* Must have fed_job_list_mutex locked before entering */
static fed_job_info_t *_find_fed_job_info(uint32_t job_id)
{
if (!fed_job_list)
return NULL;
return list_find_first(fed_job_list, _list_find_fed_job_info_by_jobid,
&job_id);
}
static void _destroy_fed_job_update_info(void *object)
{
fed_job_update_info_t *job_update_info =
(fed_job_update_info_t *)object;
if (job_update_info) {
xfree(job_update_info->siblings_str);
xfree(job_update_info->submit_cluster);
slurm_free_job_info_msg(job_update_info->job_info_msg);
slurm_free_job_step_kill_msg(job_update_info->kill_msg);
slurm_free_job_desc_msg(job_update_info->submit_desc);
xfree(job_update_info);
}
}
static void _destroy_dep_msg(void *object)
{
slurm_free_dep_msg((dep_msg_t *)object);
}
static void _destroy_dep_update_msg(void *object)
{
slurm_free_dep_update_origin_msg((dep_update_origin_msg_t *)object);
}
static void _destroy_dep_job(void *object)
{
job_record_t *job_ptr = (job_record_t *)object;
if (job_ptr) {
xassert(job_ptr->magic == JOB_MAGIC);
xfree(job_ptr->fed_details);
xfree(job_ptr->name);
if (job_ptr->details) {
xassert(job_ptr->details->magic == DETAILS_MAGIC);
xfree(job_ptr->details->dependency);
FREE_NULL_LIST(job_ptr->details->depend_list);
xfree(job_ptr->details);
}
job_record_free_null_array_recs(job_ptr);
job_ptr->magic = 0;
job_ptr->job_id = 0;
job_ptr->user_id = 0;
xfree(job_ptr);
}
}
extern slurmdb_cluster_rec_t *fed_mgr_get_cluster_by_id(uint32_t id)
{
uint32_t key = id;
return list_find_first(fed_mgr_fed_rec->cluster_list,
_find_sibling_by_id, &key);
}
extern slurmdb_cluster_rec_t *fed_mgr_get_cluster_by_name(char *sib_name)
{
if (!fed_mgr_fed_rec)
return NULL;
return list_find_first(fed_mgr_fed_rec->cluster_list,
slurmdb_find_cluster_in_list, sib_name);
}
/*
* Revoke all sibling jobs except on cluster_id -- the cluster the request
* came from.
*
* IN job_id - id of the job to revoke
* IN cluster_id - cluster id of cluster that initiated call. Don't revoke
* the job on this cluster -- it's the one that started the fed job.
* IN revoke_sibs - bitmap of siblings to revoke.
* IN start_time - time the fed job started
*/
static void _revoke_sibling_jobs(uint32_t job_id, uint32_t cluster_id,
uint64_t revoke_sibs, time_t start_time)
{
int id = 1;
if (!fed_mgr_fed_rec) /* Not part of federation anymore */
return;
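/* Walk the revoke bitmap one sibling id at a time (ids start at 1), shifting
* it right each pass; skip this cluster and the requesting cluster. */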
while (revoke_sibs) {
if ((revoke_sibs & 1) &&
(id != fed_mgr_cluster_rec->fed.id) &&
(id != cluster_id)) {
slurmdb_cluster_rec_t *cluster =
fed_mgr_get_cluster_by_id(id);
if (!cluster) {
error("couldn't find cluster rec by id %d", id);
goto next_job;
}
_persist_fed_job_revoke(cluster, job_id, JOB_CANCELLED,
0, start_time);
}
next_job:
revoke_sibs >>= 1;
id++;
}
}
static int _remove_sibling_bit(job_record_t *job_ptr,
slurmdb_cluster_rec_t *sibling)
{
uint32_t origin_id;
if (!_is_fed_job(job_ptr, &origin_id))
return ESLURM_JOB_NOT_FEDERATED;
job_ptr->fed_details->siblings_active &=
~(FED_SIBLING_BIT(sibling->fed.id));
job_ptr->fed_details->siblings_viable &=
~(FED_SIBLING_BIT(sibling->fed.id));
if (!(job_ptr->fed_details->siblings_viable &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
job_state_set_flag(job_ptr, JOB_REVOKED);
else if (!job_ptr->fed_details->cluster_lock)
job_state_unset_flag(job_ptr, JOB_REVOKED);
update_job_fed_details(job_ptr);
return SLURM_SUCCESS;
}
/*
* Remove all pending federated jobs from the origin cluster.
*/
static void _cleanup_removed_origin_jobs(void)
{
list_itr_t *job_itr;
job_record_t *job_ptr;
time_t now = time(NULL);
uint32_t origin_id, sibling_id;
uint64_t sibling_bit;
if (!fed_mgr_cluster_rec)
return;
sibling_id = fed_mgr_cluster_rec->fed.id;
sibling_bit = FED_SIBLING_BIT(sibling_id);
job_itr = list_iterator_create(job_list);
while ((job_ptr = list_next(job_itr))) {
bool running_remotely = false;
uint64_t siblings_viable;
if (IS_JOB_COMPLETED(job_ptr))
continue;
if (!_is_fed_job(job_ptr, &origin_id))
continue;
siblings_viable = job_ptr->fed_details->siblings_viable;
if (sibling_id == origin_id &&
job_ptr->fed_details->cluster_lock)
running_remotely = true;
/* free fed_job_details so it can't call home. */
job_record_free_fed_details(&job_ptr->fed_details);
/* allow running/completing jobs to finish. */
if (IS_JOB_COMPLETED(job_ptr) ||
IS_JOB_COMPLETING(job_ptr) ||
IS_JOB_RUNNING(job_ptr))
continue;
/* If this cluster is the job's only viable sibling then just let
* it run as a non-federated job. The origin should remove the
* tracking job. */
if (!(siblings_viable & ~sibling_bit))
continue;
/*
* If a job is pending and not revoked on the origin cluster
* leave it as a non-federated job.
*/
if ((origin_id == sibling_id) &&
IS_JOB_PENDING(job_ptr) && !IS_JOB_REVOKED(job_ptr))
continue;
/* Free the resp_host so that the srun doesn't get
* signaled about the job going away. The job could
* still run on another sibling. */
if (running_remotely ||
(origin_id != sibling_id))
xfree(job_ptr->resp_host);
job_state_set(job_ptr, (JOB_CANCELLED | JOB_REVOKED));
job_ptr->start_time = now;
job_ptr->end_time = now;
job_completion_logger(job_ptr, false);
}
list_iterator_destroy(job_itr);
/* Don't test these jobs for remote dependencies anymore */
if (remote_dep_job_list) {
log_flag(FEDR, "%s: Remove all jobs in remote_dep_job_list",
__func__);
slurm_mutex_lock(&dep_job_list_mutex);
list_flush(remote_dep_job_list);
slurm_mutex_unlock(&dep_job_list_mutex);
}
}
/*
* Remove all pending jobs that originated from the given cluster.
*/
static void _cleanup_removed_cluster_jobs(slurmdb_cluster_rec_t *cluster)
{
list_itr_t *job_itr;
job_record_t *job_ptr;
time_t now = time(NULL);
uint32_t origin_id, sibling_id;
uint64_t origin_bit, sibling_bit;
if (!fed_mgr_cluster_rec)
return;
sibling_id = fed_mgr_cluster_rec->fed.id;
sibling_bit = FED_SIBLING_BIT(sibling_id);
job_itr = list_iterator_create(job_list);
while ((job_ptr = list_next(job_itr))) {
uint64_t siblings_viable;
if (IS_JOB_COMPLETED(job_ptr))
continue;
if (!_is_fed_job(job_ptr, &origin_id))
continue;
origin_bit = FED_SIBLING_BIT(origin_id);
siblings_viable = job_ptr->fed_details->siblings_viable;
/* Remove cluster from viable,active siblings */
_remove_sibling_bit(job_ptr, cluster);
/* Remove jobs that
* 1. originated from the removed cluster
* 2. are origin jobs tracking a job running on the removed cluster
* 3. have the removed cluster as their only viable sibling */
if (origin_id == cluster->fed.id ||
(job_ptr->fed_details &&
origin_id == sibling_id &&
job_ptr->fed_details->cluster_lock == cluster->fed.id) ||
(siblings_viable & FED_SIBLING_BIT(cluster->fed.id) &&
!(siblings_viable & ~FED_SIBLING_BIT(cluster->fed.id)))) {
/*
* If this job originated from the origin (which is
* being removed) and the origin is not a viable sibling
* and there is more than one sibling then let the job
* remain as a federated job to be scheduled among
* its siblings.
*/
if (IS_JOB_PENDING(job_ptr) &&
(origin_id == cluster->fed.id) &&
!(siblings_viable & origin_bit) &&
(siblings_viable & ~sibling_bit))
continue;
/* free fed_job_details so it can't call home. */
job_record_free_fed_details(&job_ptr->fed_details);
/*
* If this job originated from the origin (which is
* being removed) and this sibling is the only viable
* sibling then let it run/pend as a non-federated job.
*/
if ((origin_id == cluster->fed.id) &&
!(siblings_viable & ~sibling_bit))
continue;
if (!(IS_JOB_COMPLETED(job_ptr) ||
IS_JOB_COMPLETING(job_ptr) ||
IS_JOB_RUNNING(job_ptr))) {
/* Free the resp_host so that the srun doesn't
* get signaled about the job going away. The
* job could still run on another sibling. */
xfree(job_ptr->resp_host);
job_state_set(job_ptr, (JOB_CANCELLED |
JOB_REVOKED));
job_ptr->start_time = now;
job_ptr->end_time = now;
job_ptr->state_reason = WAIT_NO_REASON;
xfree(job_ptr->state_desc);
job_completion_logger(job_ptr, false);
}
}
}
list_iterator_destroy(job_itr);
}
static void _handle_removed_clusters(slurmdb_federation_rec_t *db_fed,
uint64_t *removed_clusters)
{
list_itr_t *itr;
slurmdb_cluster_rec_t *tmp_cluster = NULL;
itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
while ((tmp_cluster = list_next(itr))) {
if (tmp_cluster->name &&
!(list_find_first(db_fed->cluster_list,
slurmdb_find_cluster_in_list,
tmp_cluster->name))) {
info("cluster %s was removed from the federation",
tmp_cluster->name);
*removed_clusters |=
FED_SIBLING_BIT(tmp_cluster->fed.id);
_cleanup_removed_cluster_jobs(tmp_cluster);
}
}
list_iterator_destroy(itr);
}
/* Parse a RESPONSE_CTLD_MULT_MSG message and return a bit set for every
* successful operation */
bitstr_t *_parse_resp_ctld_mult(slurm_msg_t *resp_msg)
{
ctld_list_msg_t *ctld_resp_msg = resp_msg->data;
list_itr_t *iter = NULL;
bitstr_t *success_bits;
slurm_msg_t sub_msg;
return_code_msg_t *rc_msg;
buf_t *single_resp_buf = NULL;
int resp_cnt, resp_inx = -1;
xassert(resp_msg->msg_type == RESPONSE_CTLD_MULT_MSG);
if (!ctld_resp_msg->my_list) {
error("%s: RESPONSE_CTLD_MULT_MSG has no list component",
__func__);
return NULL;
}
resp_cnt = list_count(ctld_resp_msg->my_list);
success_bits = bit_alloc(resp_cnt);
iter = list_iterator_create(ctld_resp_msg->my_list);
while ((single_resp_buf = list_next(iter))) {
resp_inx++;
slurm_msg_t_init(&sub_msg);
if (unpack16(&sub_msg.msg_type, single_resp_buf) ||
unpack_msg(&sub_msg, single_resp_buf)) {
error("%s: Sub-message unpack error for Message Type:%s",
__func__, rpc_num2string(sub_msg.msg_type));
continue;
}
if (sub_msg.msg_type != RESPONSE_SLURM_RC) {
error("%s: Unexpected Message Type:%s",
__func__, rpc_num2string(sub_msg.msg_type));
} else {
rc_msg = (return_code_msg_t *) sub_msg.data;
if (rc_msg->return_code == SLURM_SUCCESS)
bit_set(success_bits, resp_inx);
}
(void) slurm_free_msg_data(sub_msg.msg_type, sub_msg.data);
}
return success_bits;
}
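/* Allocate a sibling job submitted to us by cluster 'sib_name'. On failure
* the error code is reported back to the submitting sibling via
* _persist_fed_job_response(); on success the job is added to fed_job_list
* and the scheduler is queued. */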
static int _fed_mgr_job_allocate_sib(char *sib_name, job_desc_msg_t *job_desc,
uint16_t start_protocol_version,
bool interactive_job)
{
int error_code = SLURM_SUCCESS;
job_record_t *job_ptr = NULL;
char *err_msg = NULL;
bool reject_job = false;
slurmdb_cluster_rec_t *sibling;
uid_t uid = 0;
slurm_msg_t response_msg;
slurm_msg_t_init(&response_msg);
xassert(sib_name);
xassert(job_desc);
if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) {
error_code = ESLURM_INVALID_CLUSTER_NAME;
error("Invalid sibling name");
} else if ((job_desc->alloc_node == NULL) ||
(job_desc->alloc_node[0] == '\0')) {
error_code = ESLURM_INVALID_NODE_NAME;
error("REQUEST_SUBMIT_BATCH_JOB lacks alloc_node");
}
if (error_code == SLURM_SUCCESS)
error_code = validate_job_create_req(job_desc, uid, &err_msg);
if (error_code) {
reject_job = true;
goto send_msg;
}
/* Create new job allocation */
job_desc->het_job_offset = NO_VAL;
error_code = job_allocate(job_desc, job_desc->immediate, false, NULL,
interactive_job, uid, false, &job_ptr,
&err_msg, start_protocol_version);
if (!job_ptr ||
(error_code && job_ptr->job_state == JOB_FAILED))
reject_job = true;
if (job_desc->immediate &&
(error_code != SLURM_SUCCESS))
error_code = ESLURM_CAN_NOT_START_IMMEDIATELY;
send_msg:
/* Send response back about origin jobid if an error occurred. */
if (reject_job)
_persist_fed_job_response(sibling, job_desc->job_id, error_code);
else {
if (!(job_ptr->fed_details->siblings_viable &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
job_state_set_flag(job_ptr, JOB_REVOKED);
add_fed_job_info(job_ptr);
schedule_job_save(); /* Has own locks */
schedule_node_save(); /* Has own locks */
queue_job_scheduler();
}
xfree(err_msg);
return SLURM_SUCCESS;
}
static void _do_fed_job_complete(job_record_t *job_ptr, uint32_t job_state,
uint32_t exit_code, time_t start_time)
{
if (job_ptr->job_state & JOB_REQUEUE_FED) {
/* Remove JOB_REQUEUE_FED and JOB_COMPLETING once
* sibling reports that sibling job is done. Leave other
* state in place. JOB_SPECIAL_EXIT may be in the
* states. */
job_state_unset_flag(job_ptr, (JOB_PENDING | JOB_COMPLETING));
batch_requeue_fini(job_ptr);
} else {
fed_mgr_job_revoke(job_ptr, true, job_state, exit_code,
start_time);
}
}
static void _handle_fed_job_complete(fed_job_update_info_t *job_update_info)
{
job_record_t *job_ptr;
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
lock_slurmctld(job_write_lock);
if (!(job_ptr = find_job_record(job_update_info->job_id))) {
error("%s: failed to find job_record for fed JobId=%u",
__func__, job_update_info->job_id);
} else if (!job_ptr->fed_details) {
debug2("%s: %pJ not federated anymore", __func__, job_ptr);
} else if (IS_JOB_RUNNING(job_ptr)) {
/*
* The job could have started between the time that the origin
* sent the complete message and now.
*/
slurm_msg_t msg;
sib_msg_t sib_msg = {0};
job_step_kill_msg_t *kill_req;
/* Build and pack a kill_req msg to put in a sib_msg */
kill_req = xmalloc(sizeof(job_step_kill_msg_t));
kill_req->step_id.job_id = job_update_info->job_id;
kill_req->sjob_id = NULL;
kill_req->step_id.step_id = SLURM_BATCH_SCRIPT;
kill_req->step_id.step_het_comp = NO_VAL;
kill_req->signal = SIGKILL;
kill_req->flags = 0;
sib_msg.data = kill_req;
slurm_msg_t_init(&msg);
msg.data = &sib_msg;
log_flag(FEDR, "%s: %pJ running now, just going to cancel it.",
__func__, job_ptr);
_q_sib_job_cancel(&msg, job_update_info->uid);
} else {
_do_fed_job_complete(job_ptr, job_update_info->job_state,
job_update_info->return_code,
job_update_info->start_time);
}
unlock_slurmctld(job_write_lock);
}
static void _handle_fed_job_cancel(fed_job_update_info_t *job_update_info)
{
kill_job_step(job_update_info->kill_msg, job_update_info->uid);
}
static void
_handle_fed_job_remove_active_sib_bit(fed_job_update_info_t *job_update_info)
{
fed_job_info_t *job_info;
job_record_t *job_ptr;
slurmdb_cluster_rec_t *sibling;
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
lock_slurmctld(job_write_lock);
if (!(job_ptr = find_job_record(job_update_info->job_id))) {
error("%s: failed to find job_record for fed JobId=%u",
__func__, job_update_info->job_id);
unlock_slurmctld(job_write_lock);
return;
} else if (!job_ptr->fed_details) {
debug2("%s: %pJ not federated anymore", __func__, job_ptr);
unlock_slurmctld(job_write_lock);
return;
}
slurm_mutex_lock(&fed_job_list_mutex);
if (!(job_info = _find_fed_job_info(job_update_info->job_id))) {
error("%s: failed to find fed job info for fed JobId=%u",
__func__, job_update_info->job_id);
slurm_mutex_unlock(&fed_job_list_mutex);
unlock_slurmctld(job_write_lock);
return;
}
sibling = fed_mgr_get_cluster_by_name(job_update_info->siblings_str);
if (sibling) {
job_info->siblings_active &=
~(FED_SIBLING_BIT(sibling->fed.id));
job_ptr->fed_details->siblings_active =
job_info->siblings_active;
update_job_fed_details(job_ptr);
}
slurm_mutex_unlock(&fed_job_list_mutex);
unlock_slurmctld(job_write_lock);
}
static void _handle_fed_job_requeue(fed_job_update_info_t *job_update_info)
{
int rc;
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
lock_slurmctld(job_write_lock);
if ((rc = job_requeue(job_update_info->uid, job_update_info->job_id,
NULL, false, job_update_info->flags)))
error("failed to requeue fed JobId=%u - rc:%d",
job_update_info->job_id, rc);
unlock_slurmctld(job_write_lock);
}
/*
* Job has been started, revoke the sibling jobs.
*
* This is the common code between queued and local job starts.
* This can be done when the job is starting on the origin cluster without
* queueing because the cluster already has the job write lock and fed read
* lock.
*
* Must have fed_job_list mutex locked and job write_lock set.
*/
static void _fed_job_start_revoke(fed_job_info_t *job_info,
job_record_t *job_ptr, time_t start_time)
{
uint32_t cluster_lock;
uint64_t old_active;
uint64_t old_viable;
cluster_lock = job_info->cluster_lock;
old_active = job_info->siblings_active;
old_viable = job_info->siblings_viable;
job_ptr->fed_details->cluster_lock = cluster_lock;
job_ptr->fed_details->siblings_active =
job_info->siblings_active =
FED_SIBLING_BIT(cluster_lock);
update_job_fed_details(job_ptr);
if (old_active & ~FED_SIBLING_BIT(cluster_lock)) {
/* There are siblings that need to be removed */
log_flag(FEDR, "%s: %pJ is running on cluster id %d, revoking remote siblings (active:%"PRIu64" viable:%"PRIu64")",
__func__, job_ptr, cluster_lock, old_active,
old_viable);
_revoke_sibling_jobs(job_ptr->job_id, cluster_lock,
old_active, start_time);
}
}
static void _handle_fed_job_start(fed_job_update_info_t *job_update_info)
{
fed_job_info_t *job_info;
job_record_t *job_ptr;
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
lock_slurmctld(job_write_lock);
if (!(job_ptr = find_job_record(job_update_info->job_id))) {
error("%s: failed to find job_record for fed JobId=%u",
__func__, job_update_info->job_id);
unlock_slurmctld(job_write_lock);
return;
} else if (!job_ptr->fed_details) {
debug2("%s: %pJ not federated anymore", __func__, job_ptr);
unlock_slurmctld(job_write_lock);
return;
}
slurm_mutex_lock(&fed_job_list_mutex);
if (!(job_info =
_find_fed_job_info(job_update_info->job_id))) {
error("%s: failed to find fed job info for fed JobId=%u",
__func__, job_update_info->job_id);
slurm_mutex_unlock(&fed_job_list_mutex);
unlock_slurmctld(job_write_lock);
return;
}
_fed_job_start_revoke(job_info, job_ptr, job_update_info->start_time);
slurm_mutex_unlock(&fed_job_list_mutex);
if (job_info->cluster_lock != fed_mgr_cluster_rec->fed.id) {
log_flag(FEDR, "%s: %pJ is running remotely, revoking origin tracking job",
__func__, job_ptr);
/* leave as pending so that it will stay around */
fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0,
job_update_info->start_time);
}
unlock_slurmctld(job_write_lock);
}
static int _list_find_jobid(void *x, void *key)
{
uint32_t src_jobid = *(uint32_t *)x;
uint32_t key_jobid = *(uint32_t *)key;
if (src_jobid == key_jobid)
return 1;
return 0;
}
static void _handle_fed_job_submission(fed_job_update_info_t *job_update_info)
{
job_record_t *job_ptr;
bool interactive_job =
(job_update_info->type == FED_JOB_SUBMIT_INT) ?
true : false;
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
log_flag(FEDR, "%s: submitting %s sibling JobId=%u from %s",
__func__, (interactive_job) ? "interactive" : "batch",
job_update_info->submit_desc->job_id,
job_update_info->submit_cluster);
/* do this outside the job write lock */
delete_job_desc_files(job_update_info->job_id);
lock_slurmctld(job_write_lock);
if ((job_ptr = find_job_record(job_update_info->job_id))) {
debug("Found existing fed %pJ, going to requeue/unlink it",
job_ptr);
/* Delete job quickly */
job_state_set_flag(job_ptr, JOB_REVOKED);
unlink_job_record(job_ptr);
/*
* Make sure that the file delete request is purged from list
* -- added from purge_job_record() -- before job is allocated
* again.
*/
list_delete_all(purge_files_list, _list_find_jobid,
&job_update_info->job_id);
}
_fed_mgr_job_allocate_sib(job_update_info->submit_cluster,
job_update_info->submit_desc,
job_update_info->submit_proto_ver,
interactive_job);
unlock_slurmctld(job_write_lock);
}
static void _handle_fed_job_update(fed_job_update_info_t *job_update_info)
{
int rc;
slurm_msg_t msg;
slurm_msg_t_init(&msg);
job_desc_msg_t *job_desc = job_update_info->submit_desc;
slurmdb_cluster_rec_t *sibling;
slurmctld_lock_t job_write_lock = {
READ_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK, READ_LOCK };
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
xassert(job_desc);
/* scontrol always sets job_id_str */
job_desc->job_id = job_update_info->job_id;
msg.data = job_desc;
lock_slurmctld(job_write_lock);
rc = update_job(&msg, job_update_info->uid, false);
unlock_slurmctld(job_write_lock);
lock_slurmctld(fed_read_lock);
if (!(sibling =
fed_mgr_get_cluster_by_name(job_update_info->submit_cluster))) {
error("Invalid sibling name");
} else {
_persist_update_job_resp(sibling, job_update_info->job_id, rc);
}
unlock_slurmctld(fed_read_lock);
}
static void
_handle_fed_job_update_response(fed_job_update_info_t *job_update_info)
{
fed_job_info_t *job_info;
slurmdb_cluster_rec_t *sibling;
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
slurm_mutex_lock(&fed_job_list_mutex);
if (!(job_info = _find_fed_job_info(job_update_info->job_id))) {
error("%s: failed to find fed job info for fed JobId=%u",
__func__, job_update_info->job_id);
slurm_mutex_unlock(&fed_job_list_mutex);
return;
}
lock_slurmctld(fed_read_lock);
if (!(sibling =
fed_mgr_get_cluster_by_name(job_update_info->submit_cluster))) {
error("Invalid sibling name");
unlock_slurmctld(fed_read_lock);
slurm_mutex_unlock(&fed_job_list_mutex);
return;
}
if (job_info->updating_sibs[sibling->fed.id])
job_info->updating_sibs[sibling->fed.id]--;
else
error("%s this should never happen", __func__);
slurm_mutex_unlock(&fed_job_list_mutex);
unlock_slurmctld(fed_read_lock);
}
static int _handle_fed_job_sync(fed_job_update_info_t *job_update_info)
{
int rc = SLURM_SUCCESS;
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
lock_slurmctld(job_write_lock);
rc = _sync_jobs(job_update_info->submit_cluster,
job_update_info->job_info_msg,
job_update_info->start_time);
unlock_slurmctld(job_write_lock);
return rc;
}
/* Have to send the job sync from the job_update thread so that it can
* independently get the job read lock. */
static int _handle_fed_send_job_sync(fed_job_update_info_t *job_update_info)
{
int rc = SLURM_SUCCESS;
list_t *jobids = NULL;
slurm_msg_t req_msg, job_msg;
sib_msg_t sib_msg = {0};
slurmdb_cluster_rec_t *sibling;
buf_t *job_buffer = NULL, *buffer = NULL;
time_t sync_time = 0;
char *sib_name = job_update_info->submit_cluster;
slurmctld_lock_t job_read_lock = {
READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
lock_slurmctld(job_read_lock);
if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) {
error("%s: Invalid sibling name %s", __func__, sib_name);
unlock_slurmctld(job_read_lock);
return SLURM_ERROR;
}
slurm_mutex_lock(&sibling->lock);
if (!sibling->rpc_version && sibling->fed.recv) {
sibling->rpc_version =
((persist_conn_t *) sibling->fed.recv)->version;
}
slurm_mutex_unlock(&sibling->lock);
if (!sibling->rpc_version) {
error("%s: cluster %s doesn't have rpc_version yet.",
__func__, sib_name);
unlock_slurmctld(job_read_lock);
return SLURM_ERROR;
}
sync_time = time(NULL);
jobids = _get_sync_jobid_list(sibling->fed.id, sync_time);
job_buffer = pack_spec_jobs(jobids, SHOW_ALL, slurm_conf.slurm_user_id,
NO_VAL, sibling->rpc_version);
FREE_NULL_LIST(jobids);
unlock_slurmctld(job_read_lock);
slurm_msg_t_init(&job_msg);
job_msg.protocol_version = sibling->rpc_version;
job_msg.msg_type = RESPONSE_JOB_INFO;
job_msg.data = job_buffer;
buffer = init_buf(BUF_SIZE);
pack_msg(&job_msg, buffer);
memset(&sib_msg, 0, sizeof(sib_msg_t));
sib_msg.sib_msg_type = FED_JOB_SYNC;
sib_msg.data_buffer = buffer;
sib_msg.data_type = job_msg.msg_type;
sib_msg.data_version = job_msg.protocol_version;
sib_msg.start_time = sync_time;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.protocol_version = job_msg.protocol_version;
req_msg.data = &sib_msg;
sibling->fed.sync_sent = true;
rc = _queue_rpc(sibling, &req_msg, 0, false);
FREE_NULL_BUFFER(job_buffer);
FREE_NULL_BUFFER(buffer);
return rc;
}
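/* Dispatch a queued fed job update to the handler for its type. */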
static int _foreach_fed_job_update_info(fed_job_update_info_t *job_update_info)
{
if (!fed_mgr_cluster_rec) {
info("Not part of federation anymore, not performing fed job updates");
return SLURM_SUCCESS;
}
log_flag(FEDR, "%s: JobId=%u type:%s",
__func__, job_update_info->job_id,
_job_update_type_str(job_update_info->type));
switch (job_update_info->type) {
case FED_JOB_COMPLETE:
_handle_fed_job_complete(job_update_info);
break;
case FED_JOB_CANCEL:
_handle_fed_job_cancel(job_update_info);
break;
case FED_JOB_REMOVE_ACTIVE_SIB_BIT:
_handle_fed_job_remove_active_sib_bit(job_update_info);
break;
case FED_JOB_REQUEUE:
_handle_fed_job_requeue(job_update_info);
break;
case FED_JOB_START:
_handle_fed_job_start(job_update_info);
break;
case FED_JOB_SUBMIT_BATCH:
case FED_JOB_SUBMIT_INT:
_handle_fed_job_submission(job_update_info);
break;
case FED_JOB_SYNC:
_handle_fed_job_sync(job_update_info);
break;
case FED_JOB_UPDATE:
_handle_fed_job_update(job_update_info);
break;
case FED_JOB_UPDATE_RESPONSE:
_handle_fed_job_update_response(job_update_info);
break;
case FED_SEND_JOB_SYNC:
_handle_fed_send_job_sync(job_update_info);
break;
default:
error("Invalid fed_job type: %d JobId=%u",
job_update_info->type, job_update_info->job_id);
}
return SLURM_SUCCESS;
}
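/* Send the job's updated dependency list back to its origin cluster. */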
static void _update_origin_job_dep(job_record_t *job_ptr,
slurmdb_cluster_rec_t *origin)
{
slurm_msg_t req_msg;
dep_update_origin_msg_t dep_update_msg = { 0 };
xassert(job_ptr);
xassert(job_ptr->details);
xassert(job_ptr->details->depend_list);
xassert(origin);
if (origin == fed_mgr_cluster_rec) {
error("%s: Cannot send dependency update of %pJ to self - were clusters removed then re-added to the federation in a different order?",
__func__, job_ptr);
return;
}
dep_update_msg.depend_list = job_ptr->details->depend_list;
dep_update_msg.job_id = job_ptr->job_id;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_UPDATE_ORIGIN_DEP;
req_msg.data = &dep_update_msg;
if (_queue_rpc(origin, &req_msg, 0, false))
error("%s: Failed to send dependency update for %pJ",
__func__, job_ptr);
}
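/* list_find_first() callback: match a dependency that is local to this
 * cluster (i.e. not flagged as remote). */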
static int _find_local_dep(void *arg, void *key)
{
depend_spec_t *dep_ptr = (depend_spec_t *) arg;
return !(dep_ptr->depend_flags & SLURM_FLAGS_REMOTE);
}
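/* list_find_first() callback: match a job record by job id. */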
static int _find_job_by_id(void *arg, void *key)
{
job_record_t *job_ptr = (job_record_t *) arg;
uint32_t job_id = *((uint32_t *) key);
return job_ptr->job_id == job_id;
}
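/* Build a minimal job record for a dependency sent from the origin cluster,
 * validate the dependency string, and track the job in remote_dep_job_list
 * if it still has dependencies local to this cluster. */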
static void _handle_recv_remote_dep(dep_msg_t *remote_dep_info)
{
/*
* update_job_dependency() will:
* - read the job list (need job read lock)
* - call fed_mgr_is_origin_job_id (need fed read lock)
*/
int rc, tmp;
slurmctld_lock_t job_read_lock = { .job = READ_LOCK, .fed = READ_LOCK };
job_record_t *job_ptr = xmalloc(sizeof *job_ptr);
job_ptr->magic = JOB_MAGIC;
job_ptr->details = xmalloc(sizeof *(job_ptr->details));
job_ptr->details->magic = DETAILS_MAGIC;
job_ptr->job_id = remote_dep_info->job_id;
job_ptr->name = remote_dep_info->job_name;
job_ptr->user_id = remote_dep_info->user_id;
/*
* Initialize array info. Allocate space for job_ptr->array_recs if the
* job is an array so it is recognized as one; nothing in array_recs is
* actually used.
*/
job_ptr->array_job_id = remote_dep_info->array_job_id;
job_ptr->array_task_id = remote_dep_info->array_task_id;
if (remote_dep_info->is_array)
job_ptr->array_recs = xmalloc(sizeof *(job_ptr->array_recs));
/*
* We need to allocate space for fed_details so
* other places know this is a fed job, but we don't
* need to set anything specific in it.
*/
job_ptr->fed_details = xmalloc(sizeof *(job_ptr->fed_details));
log_flag(FEDR, "%s: Got job_id: %u, name: \"%s\", array_task_id: %u, dependency: \"%s\", is_array? %s, user_id: %u",
__func__, remote_dep_info->job_id, remote_dep_info->job_name,
remote_dep_info->array_task_id, remote_dep_info->dependency,
remote_dep_info->is_array ? "yes" : "no",
remote_dep_info->user_id);
/* NULL the string so it doesn't get freed, since it is now owned by job_ptr */
remote_dep_info->job_name = NULL;
/* Create and validate the dependency. */
lock_slurmctld(job_read_lock);
rc = update_job_dependency(job_ptr, remote_dep_info->dependency);
unlock_slurmctld(job_read_lock);
if (rc) {
error("%s: Invalid dependency %s for %pJ: %s",
__func__, remote_dep_info->dependency, job_ptr,
slurm_strerror(rc));
_destroy_dep_job(job_ptr);
} else {
job_record_t *tmp_job;
list_itr_t *itr;
/*
* Remove the old reference to this job from remote_dep_job_list
* so that we don't continue testing the old dependencies.
*/
slurm_mutex_lock(&dep_job_list_mutex);
itr = list_iterator_create(remote_dep_job_list);
while ((tmp_job = list_next(itr))) {
if (tmp_job->job_id == job_ptr->job_id) {
list_delete_item(itr);
break;
}
}
list_iterator_destroy(itr);
/*
* If we were sent a list of 0 dependencies, that means
* the dependency was updated and cleared, so don't
* add it to the list to test. Also only add it if
* there are dependencies local to this cluster.
*/
if (list_count(job_ptr->details->depend_list) &&
list_find_first(job_ptr->details->depend_list,
_find_local_dep, &tmp))
list_append(remote_dep_job_list, job_ptr);
else
_destroy_dep_job(job_ptr);
slurm_mutex_unlock(&dep_job_list_mutex);
}
_destroy_dep_msg(remote_dep_info);
}
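/* On the origin cluster, apply queued dependency updates received from
 * siblings to the corresponding job records. */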
static void _handle_dep_update_origin_msgs(void)
{
job_record_t *job_ptr;
dep_update_origin_msg_t *dep_update_msg;
list_t *update_job_list = NULL;
slurmctld_lock_t job_write_lock = {
.conf = READ_LOCK, .job = WRITE_LOCK, .fed = READ_LOCK };
if (!list_count(origin_dep_update_list))
return;
lock_slurmctld(job_write_lock);
while ((dep_update_msg = list_pop(origin_dep_update_list))) {
if (!(job_ptr = find_job_record(dep_update_msg->job_id))) {
/*
* Maybe the job was cancelled and purged before
* the dependency update got here or was able
* to be processed. Regardless, this job doesn't
* exist here, so we have to throw out this
* dependency update message.
*/
log_flag(DEPENDENCY, "%s: Could not find job %u, cannot process dependency update. Perhaps the jobs was purged before we got here.",
__func__, dep_update_msg->job_id);
slurm_free_dep_update_origin_msg(dep_update_msg);
continue;
} else if (!job_ptr->details ||
!job_ptr->details->depend_list) {
/*
* This might happen if the job's dependencies
* were updated to be none before the dependency
* update came from the sibling cluster.
*/
log_flag(DEPENDENCY, "%s: %pJ doesn't have dependencies, cannot process dependency update",
__func__, job_ptr);
slurm_free_dep_update_origin_msg(dep_update_msg);
continue;
}
if (update_job_dependency_list(job_ptr,
dep_update_msg->depend_list)) {
if (!update_job_list) {
update_job_list = list_create(NULL);
list_append(update_job_list, job_ptr);
} else if (!list_find_first(update_job_list,
_find_job_by_id,
&job_ptr->job_id))
list_append(update_job_list, job_ptr);
}
slurm_free_dep_update_origin_msg(dep_update_msg);
}
if (update_job_list) {
list_for_each(update_job_list, handle_job_dependency_updates,
NULL);
FREE_NULL_LIST(update_job_list);
}
unlock_slurmctld(job_write_lock);
}
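/* Thread to periodically test remote dependencies while part of a
 * federation. */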
static void *_test_dep_job_thread(void *arg)
{
time_t last_test = 0;
time_t now;
struct timespec ts = {0, 0};
slurmctld_lock_t job_read_lock = {
.job = READ_LOCK, .fed = READ_LOCK };
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "fed_test_dep", NULL, NULL, NULL) < 0) {
error("%s: cannot set my name to %s %m", __func__,
"fed_test_dep");
}
#endif
while (!slurmctld_config.shutdown_time) {
now = time(NULL);
/* Only test after joining a federation. */
if (fed_mgr_fed_rec && fed_mgr_cluster_rec &&
((now - last_test) > TEST_REMOTE_DEP_FREQ)) {
last_test = now;
lock_slurmctld(job_read_lock);
fed_mgr_test_remote_dependencies();
unlock_slurmctld(job_read_lock);
}
slurm_mutex_lock(&test_dep_mutex);
ts.tv_sec = now + 2;
slurm_cond_timedwait(&test_dep_cond,
&test_dep_mutex, &ts);
slurm_mutex_unlock(&test_dep_mutex);
}
return NULL;
}
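/* Thread to apply queued dependency updates from siblings to origin jobs on
 * this cluster. */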
static void *_origin_dep_update_thread(void *arg)
{
struct timespec ts = {0, 0};
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "fed_update_dep", NULL, NULL, NULL) < 0) {
error("%s: cannot set my name to %s %m", __func__,
"fed_update_dep");
}
#endif
while (!slurmctld_config.shutdown_time) {
slurm_mutex_lock(&origin_dep_update_mutex);
ts.tv_sec = time(NULL) + 2;
slurm_cond_timedwait(&origin_dep_cond,
&origin_dep_update_mutex, &ts);
slurm_mutex_unlock(&origin_dep_update_mutex);
if (slurmctld_config.shutdown_time)
break;
/* Wait for fed_mgr_init() */
if (!fed_mgr_fed_rec || !fed_mgr_cluster_rec)
continue;
_handle_dep_update_origin_msgs();
}
return NULL;
}
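/* Thread to process remote dependency messages received from origin
 * clusters. */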
static void *_remote_dep_recv_thread(void *arg)
{
struct timespec ts = {0, 0};
dep_msg_t *remote_dep_info;
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "fed_remote_dep", NULL, NULL, NULL) < 0) {
error("%s: cannot set my name to %s %m", __func__,
"fed_remote_dep");
}
#endif
while (!slurmctld_config.shutdown_time) {
slurm_mutex_lock(&remote_dep_recv_mutex);
ts.tv_sec = time(NULL) + 2;
slurm_cond_timedwait(&remote_dep_cond,
&remote_dep_recv_mutex, &ts);
slurm_mutex_unlock(&remote_dep_recv_mutex);
if (slurmctld_config.shutdown_time)
break;
/* Wait for fed_mgr_init() */
if (!fed_mgr_fed_rec || !fed_mgr_cluster_rec)
continue;
while ((remote_dep_info = list_pop(remote_dep_recv_list))) {
_handle_recv_remote_dep(remote_dep_info);
}
}
return NULL;
}
/* Thread to process queued fed job updates received from sibling clusters */
static void *_fed_job_update_thread(void *arg)
{
struct timespec ts = {0, 0};
fed_job_update_info_t *job_update_info;
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "fed_jobs", NULL, NULL, NULL) < 0) {
error("%s: cannot set my name to %s %m", __func__, "fed_jobs");
}
#endif
while (!slurmctld_config.shutdown_time) {
slurm_mutex_lock(&job_update_mutex);
ts.tv_sec = time(NULL) + 2;
slurm_cond_timedwait(&job_update_cond,
&job_update_mutex, &ts);
slurm_mutex_unlock(&job_update_mutex);
if (slurmctld_config.shutdown_time)
break;
while ((job_update_info = list_pop(fed_job_update_list))) {
_foreach_fed_job_update_info(job_update_info);
_destroy_fed_job_update_info(job_update_info);
}
}
return NULL;
}
/* Thread to manage queued agent RPC requests to sibling clusters */
static void *_agent_thread(void *arg)
{
slurmdb_cluster_rec_t *cluster;
struct timespec ts = {0, 0};
list_itr_t *cluster_iter, *rpc_iter;
agent_queue_t *rpc_rec;
slurm_msg_t req_msg, resp_msg;
ctld_list_msg_t ctld_req_msg;
bitstr_t *success_bits;
int rc, resp_inx, success_size;
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "fed_agent", NULL, NULL, NULL) < 0) {
error("%s: cannot set my name to %s %m", __func__, "fed_agent");
}
#endif
while (!slurmctld_config.shutdown_time) {
/* Wait for new work or re-issue RPCs after 2 second wait */
slurm_mutex_lock(&agent_mutex);
if (!slurmctld_config.shutdown_time && !agent_queue_size) {
ts.tv_sec = time(NULL) + 2;
slurm_cond_timedwait(&agent_cond, &agent_mutex, &ts);
}
agent_queue_size = 0;
slurm_mutex_unlock(&agent_mutex);
if (slurmctld_config.shutdown_time)
break;
lock_slurmctld(fed_read_lock);
if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) {
unlock_slurmctld(fed_read_lock);
continue;
}
/* Look for work on each cluster */
cluster_iter = list_iterator_create(
fed_mgr_fed_rec->cluster_list);
while (!slurmctld_config.shutdown_time &&
(cluster = list_next(cluster_iter))) {
time_t now = time(NULL);
if ((cluster->send_rpc == NULL) ||
(list_count(cluster->send_rpc) == 0))
continue;
/* Move currently pending RPCs to new list */
ctld_req_msg.my_list = NULL;
rpc_iter = list_iterator_create(cluster->send_rpc);
while ((rpc_rec = list_next(rpc_iter))) {
if ((rpc_rec->last_try + rpc_rec->last_defer) >=
now)
continue;
if (!ctld_req_msg.my_list)
ctld_req_msg.my_list = list_create(NULL);
list_append(ctld_req_msg.my_list,
rpc_rec->buffer);
rpc_rec->last_try = now;
if (rpc_rec->last_defer == 128) {
info("%s: %s JobId=%u request to cluster %s is repeatedly failing",
__func__,
rpc_num2string(rpc_rec->msg_type),
rpc_rec->job_id, cluster->name);
rpc_rec->last_defer *= 2;
} else if (rpc_rec->last_defer)
rpc_rec->last_defer *= 2;
else
rpc_rec->last_defer = 2;
}
list_iterator_destroy(rpc_iter);
if (!ctld_req_msg.my_list)
continue;
/* Build, pack and send the combined RPC */
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_CTLD_MULT_MSG;
req_msg.data = &ctld_req_msg;
rc = _send_recv_msg(cluster, &req_msg, &resp_msg,
false);
/* Process the response */
if ((rc == SLURM_SUCCESS) &&
(resp_msg.msg_type == RESPONSE_CTLD_MULT_MSG)) {
/* Remove successfully processed RPCs */
resp_inx = 0;
success_bits = _parse_resp_ctld_mult(&resp_msg);
success_size = bit_size(success_bits);
rpc_iter = list_iterator_create(cluster->
send_rpc);
while ((rpc_rec = list_next(rpc_iter))) {
if (rpc_rec->last_try != now)
continue;
if (resp_inx >= success_size) {
error("%s: bitmap too small (%d >= %d)",
__func__, resp_inx,
success_size);
break;
}
if (bit_test(success_bits, resp_inx++))
list_delete_item(rpc_iter);
}
list_iterator_destroy(rpc_iter);
FREE_NULL_BITMAP(success_bits);
} else {
/* Failed to process combined RPC.
* Leave all RPCs on the queue. */
if (rc != SLURM_SUCCESS) {
if (_comm_fail_log(cluster)) {
error("%s: Failed to send RPC: %s",
__func__,
slurm_strerror(rc));
} else {
debug("%s: Failed to send RPC: %s",
__func__,
slurm_strerror(rc));
}
} else if (resp_msg.msg_type ==
PERSIST_RC) {
persist_rc_msg_t *msg;
char *err_str;
msg = resp_msg.data;
if (msg->comment)
err_str = msg->comment;
else
err_str = slurm_strerror(msg->rc);
error("%s: failed to process msg: %s",
__func__, err_str);
} else if (resp_msg.msg_type ==
RESPONSE_SLURM_RC) {
rc = slurm_get_return_code(
resp_msg.msg_type,
resp_msg.data);
error("%s: failed to process msg: %s",
__func__, slurm_strerror(rc));
} else {
error("%s: Invalid response msg_type: %u",
__func__, resp_msg.msg_type);
}
}
(void) slurm_free_msg_data(resp_msg.msg_type,
resp_msg.data);
FREE_NULL_LIST(ctld_req_msg.my_list);
}
list_iterator_destroy(cluster_iter);
unlock_slurmctld(fed_read_lock);
}
/* Log the abandoned RPCs */
lock_slurmctld(fed_read_lock);
if (!fed_mgr_fed_rec)
goto end_it;
cluster_iter = list_iterator_create(fed_mgr_fed_rec->cluster_list);
while ((cluster = list_next(cluster_iter))) {
if (cluster->send_rpc == NULL)
continue;
rpc_iter = list_iterator_create(cluster->send_rpc);
while ((rpc_rec = list_next(rpc_iter))) {
info("%s: %s JobId=%u request to cluster %s aborted",
__func__, rpc_num2string(rpc_rec->msg_type),
rpc_rec->job_id, cluster->name);
list_delete_item(rpc_iter);
}
list_iterator_destroy(rpc_iter);
FREE_NULL_LIST(cluster->send_rpc);
}
list_iterator_destroy(cluster_iter);
end_it:
unlock_slurmctld(fed_read_lock);
return NULL;
}
static void _spawn_threads(void)
{
slurm_mutex_lock(&agent_mutex);
slurm_thread_create(&agent_thread_id, _agent_thread, NULL);
slurm_mutex_unlock(&agent_mutex);
slurm_mutex_lock(&job_update_mutex);
slurm_thread_create(&fed_job_update_thread_id,
_fed_job_update_thread, NULL);
slurm_mutex_unlock(&job_update_mutex);
slurm_mutex_lock(&remote_dep_recv_mutex);
slurm_thread_create(&remote_dep_thread_id,
_remote_dep_recv_thread, NULL);
slurm_mutex_unlock(&remote_dep_recv_mutex);
slurm_mutex_lock(&test_dep_mutex);
slurm_thread_create(&dep_job_thread_id, _test_dep_job_thread, NULL);
slurm_mutex_unlock(&test_dep_mutex);
slurm_mutex_lock(&origin_dep_update_mutex);
slurm_thread_create(&origin_dep_thread_id, _origin_dep_update_thread,
NULL);
slurm_mutex_unlock(&origin_dep_update_mutex);
}
static void _add_missing_fed_job_info()
{
job_record_t *job_ptr;
list_itr_t *job_itr;
slurmctld_lock_t job_read_lock = { .job = READ_LOCK };
/* Sanity check and add any missing job_info structs */
lock_slurmctld(job_read_lock);
job_itr = list_iterator_create(job_list);
while ((job_ptr = list_next(job_itr))) {
uint32_t origin_id;
if (!_is_fed_job(job_ptr, &origin_id))
continue;
if (!_find_fed_job_info(job_ptr->job_id)) {
info("adding missing fed_job_info for job %pJ",
job_ptr);
add_fed_job_info(job_ptr);
}
}
list_iterator_destroy(job_itr);
unlock_slurmctld(job_read_lock);
}
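/* Initialize federation management: create the tracking lists, spawn the
 * worker threads, load saved federation state or fetch it from the database,
 * and join this cluster's federation (if any). */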
extern int fed_mgr_init(void *db_conn)
{
int rc = SLURM_SUCCESS;
uint64_t tmp = 0;
slurmdb_federation_cond_t fed_cond;
list_t *fed_list = NULL;
slurmdb_federation_rec_t *fed = NULL, *state_fed = NULL;
slurmdb_cluster_rec_t *state_cluster = NULL;
slurm_mutex_lock(&init_mutex);
if (inited) {
slurm_mutex_unlock(&init_mutex);
return SLURM_SUCCESS;
}
if (!slurm_with_slurmdbd())
goto end_it;
slurm_mutex_lock(&fed_job_list_mutex);
if (!fed_job_list)
fed_job_list = list_create(xfree_ptr);
slurm_mutex_unlock(&fed_job_list_mutex);
/*
* fed_job_update_list should only be appended to and popped from.
* So rely on the list's lock. If there are ever changes to iterate the
* list, then a lock will be needed around the list.
*/
if (!fed_job_update_list)
fed_job_update_list = list_create(_destroy_fed_job_update_info);
/*
* remote_dep_recv_list should only be appended to and popped from.
* So rely on the list's lock. If there are ever changes to iterate the
* list, then a lock will be needed around the list.
*/
if (!remote_dep_recv_list)
remote_dep_recv_list = list_create(_destroy_dep_msg);
/*
* origin_dep_update_list should only be read from or modified with
* list_* functions (such as append, pop, count).
* So rely on the list's lock. If there are ever changes to iterate the
* list, then a lock will be needed around the list.
*/
if (!origin_dep_update_list)
origin_dep_update_list = list_create(_destroy_dep_update_msg);
slurm_mutex_lock(&dep_job_list_mutex);
if (!remote_dep_job_list)
remote_dep_job_list = list_create(_destroy_dep_job);
slurm_mutex_unlock(&dep_job_list_mutex);
_spawn_threads();
if (running_cache) {
debug("Database appears down, reading federations from state file.");
fed = _state_load(slurm_conf.state_save_location);
if (!fed) {
debug2("No federation state");
rc = SLURM_SUCCESS;
goto end_it;
}
} else {
state_fed = _state_load(slurm_conf.state_save_location);
if (state_fed)
state_cluster = list_find_first(
state_fed->cluster_list,
slurmdb_find_cluster_in_list,
slurm_conf.cluster_name);
slurmdb_init_federation_cond(&fed_cond, 0);
fed_cond.cluster_list = list_create(NULL);
list_append(fed_cond.cluster_list, slurm_conf.cluster_name);
fed_list = acct_storage_g_get_federations(
db_conn, slurm_conf.slurm_user_id, &fed_cond);
FREE_NULL_LIST(fed_cond.cluster_list);
if (!fed_list) {
error("failed to get a federation list");
rc = SLURM_ERROR;
goto end_it;
}
if (list_count(fed_list) == 1)
fed = list_pop(fed_list);
else if (list_count(fed_list) > 1) {
error("got more federations than expected");
rc = SLURM_ERROR;
}
FREE_NULL_LIST(fed_list);
}
if (fed) {
slurmdb_cluster_rec_t *cluster = NULL;
slurmctld_lock_t fedr_jobw_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
if ((cluster = list_find_first(fed->cluster_list,
slurmdb_find_cluster_in_list,
slurm_conf.cluster_name))) {
job_record_t *job_ptr;
list_itr_t *itr;
_join_federation(fed, cluster, &tmp);
/* Find clusters that were removed from the federation
* since the last time we got an update */
lock_slurmctld(fedr_jobw_lock);
if (state_fed && state_cluster && fed_mgr_fed_rec)
_handle_removed_clusters(state_fed, &tmp);
/* Send remote dependencies to siblings. */
itr = list_iterator_create(job_list);
while ((job_ptr = list_next(itr))) {
if (job_ptr->details &&
job_ptr->details->dependency &&
list_count(job_ptr->details->depend_list) &&
fed_mgr_submit_remote_dependencies(job_ptr,
false,
false))
error("%s: Failed to send %pJ dependencies to some or all siblings",
__func__, job_ptr);
}
list_iterator_destroy(itr);
unlock_slurmctld(fedr_jobw_lock);
} else {
slurmdb_destroy_federation_rec(fed);
error("failed to get cluster from federation that we requested");
rc = SLURM_ERROR;
}
} else if (state_fed && state_cluster) {
/* This cluster was removed from the federation while it was down.
 * Need to clean up its jobs. */
slurmctld_lock_t fedw_jobw_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
info("self was removed from federation since last start");
lock_slurmctld(fedw_jobw_lock);
fed_mgr_cluster_rec = state_cluster;
_cleanup_removed_origin_jobs();
fed_mgr_cluster_rec = NULL;
unlock_slurmctld(fedw_jobw_lock);
}
slurmdb_destroy_federation_rec(state_fed);
end_it:
/* Call whether state file existed or not. */
_add_missing_fed_job_info();
inited = true;
slurm_mutex_unlock(&init_mutex);
return rc;
}
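/* Leave the federation, signal and join the worker threads, and free the
 * tracking lists. */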
extern int fed_mgr_fini(void)
{
slurmctld_lock_t fed_write_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
slurm_mutex_lock(&init_mutex);
inited = false;
slurm_mutex_unlock(&init_mutex);
lock_slurmctld(fed_write_lock);
/* Call _leave_federation() before slurm_persist_conn_recv_server_fini()
* as this will NULL out the cluster's recv persistent connection before
* _server_fini() actually destroys it. That way the cluster's recv
* connection won't be pointing to bad memory. */
_leave_federation();
unlock_slurmctld(fed_write_lock);
/* Signal threads to end */
slurm_cond_signal(&agent_cond);
slurm_cond_signal(&job_update_cond);
slurm_cond_signal(&remote_dep_cond);
slurm_cond_signal(&test_dep_cond);
slurm_cond_signal(&origin_dep_cond);
/* _job_watch_thread signaled by _leave_federation() */
slurm_thread_join(agent_thread_id);
slurm_thread_join(fed_job_update_thread_id);
slurm_thread_join(remote_dep_thread_id);
slurm_thread_join(dep_job_thread_id);
slurm_thread_join(origin_dep_thread_id);
slurm_mutex_lock(&fed_job_list_mutex);
FREE_NULL_LIST(fed_job_list);
slurm_mutex_unlock(&fed_job_list_mutex);
FREE_NULL_LIST(fed_job_update_list);
return SLURM_SUCCESS;
}
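/* After clusters are added to or removed from the federation, resend
 * singleton dependencies to the new siblings and re-test dependencies that
 * may now reference removed clusters. */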
static void _handle_dependencies_for_modified_fed(uint64_t added_clusters,
uint64_t removed_clusters)
{
uint32_t origin_id;
job_record_t *job_ptr;
list_itr_t *itr;
depend_spec_t find_dep = { 0 };
xassert(verify_lock(JOB_LOCK, WRITE_LOCK));
xassert(verify_lock(FED_LOCK, READ_LOCK));
if (!fed_mgr_cluster_rec)
return;
find_dep.depend_type = SLURM_DEPEND_SINGLETON;
itr = list_iterator_create(job_list);
while ((job_ptr = list_next(itr))) {
if (added_clusters && IS_JOB_PENDING(job_ptr) &&
_is_fed_job(job_ptr, &origin_id) &&
find_dependency(job_ptr, &find_dep))
fed_mgr_submit_remote_dependencies(job_ptr, true,
false);
/*
* Make sure any remote dependencies are immediately
* marked as invalid.
*/
if (removed_clusters)
test_job_dependency(job_ptr, NULL);
}
list_iterator_destroy(itr);
}
extern int fed_mgr_update_feds(slurmdb_update_object_t *update)
{
uint64_t added_clusters = 0, removed_clusters = 0;
list_t *feds = NULL;
slurmdb_federation_rec_t *fed = NULL;
slurmdb_cluster_rec_t *cluster = NULL;
slurmctld_lock_t fedr_jobw_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
slurmctld_lock_t fedw_jobw_lock = {
NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
if (!update->objects)
return SLURM_SUCCESS;
slurm_mutex_lock(&init_mutex);
if (!inited) {
slurm_mutex_unlock(&init_mutex);
return SLURM_SUCCESS; /* We haven't started the fed mgr yet and
                       * can't start it from here; everything
                       * will get set up later. */
}
slurm_mutex_unlock(&init_mutex);
/* we only want one update happening at a time. */
slurm_mutex_lock(&update_mutex);
log_flag(FEDR, "Got a federation update");
feds = update->objects;
/* Find the federation that this cluster is in. If it has changed since
 * the last update, refresh the stored information, grab the other
 * clusters in the federation, and establish connections with each of
 * them. If a remote cluster was removed from the federation, that must
 * be detected and its connection closed. */
while ((fed = list_pop(feds))) {
if (fed->cluster_list &&
(cluster = list_find_first(fed->cluster_list,
slurmdb_find_cluster_in_list,
slurm_conf.cluster_name))) {
/* Find clusters that were removed from the federation
* since the last time we got an update */
lock_slurmctld(fedr_jobw_lock);
if (fed_mgr_fed_rec)
_handle_removed_clusters(fed,
&removed_clusters);
unlock_slurmctld(fedr_jobw_lock);
_join_federation(fed, cluster, &added_clusters);
if (added_clusters || removed_clusters) {
lock_slurmctld(fedr_jobw_lock);
log_flag(DEPENDENCY, "%s: Cluster(s) added: 0x%"PRIx64"; removed: 0x%"PRIx64,
__func__, added_clusters,
removed_clusters);
_handle_dependencies_for_modified_fed(
added_clusters,
removed_clusters);
unlock_slurmctld(fedr_jobw_lock);
}
break;
}
slurmdb_destroy_federation_rec(fed);
}
if (!fed && fed_mgr_fed_rec) {
log_flag(FEDR, "Not part of any federation");
lock_slurmctld(fedw_jobw_lock);
_cleanup_removed_origin_jobs();
_leave_federation();
unlock_slurmctld(fedw_jobw_lock);
}
slurm_mutex_unlock(&update_mutex);
return SLURM_SUCCESS;
}
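/* Pack/unpack fed_job_info_t records for the fed_mgr state file. */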
static void _pack_fed_job_info(fed_job_info_t *job_info, buf_t *buffer,
uint16_t protocol_version)
{
int i;
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
pack32(job_info->cluster_lock, buffer);
pack32(job_info->job_id, buffer);
pack64(job_info->siblings_active, buffer);
pack64(job_info->siblings_viable, buffer);
for (i = 0; i <= MAX_FED_CLUSTERS; i++)
pack32(job_info->updating_sibs[i], buffer);
for (i = 0; i <= MAX_FED_CLUSTERS; i++)
pack_time(job_info->updating_time[i], buffer);
} else {
error("%s: protocol_version %hu not supported.",
__func__, protocol_version);
}
}
static int _unpack_fed_job_info(fed_job_info_t **job_info_pptr, buf_t *buffer,
uint16_t protocol_version)
{
int i;
fed_job_info_t *job_info = xmalloc(sizeof(fed_job_info_t));
*job_info_pptr = job_info;
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
safe_unpack32(&job_info->cluster_lock, buffer);
safe_unpack32(&job_info->job_id, buffer);
safe_unpack64(&job_info->siblings_active, buffer);
safe_unpack64(&job_info->siblings_viable, buffer);
for (i = 0; i <= MAX_FED_CLUSTERS; i++)
safe_unpack32(&job_info->updating_sibs[i], buffer);
for (i = 0; i <= MAX_FED_CLUSTERS; i++)
safe_unpack_time(&job_info->updating_time[i], buffer);
} else {
error("%s: protocol_version %hu not supported.",
__func__, protocol_version);
goto unpack_error;
}
return SLURM_SUCCESS;
unpack_error:
xfree(job_info);
*job_info_pptr = NULL;
return SLURM_ERROR;
}
static void _dump_fed_job_list(buf_t *buffer, uint16_t protocol_version)
{
uint32_t count = NO_VAL;
fed_job_info_t *fed_job_info;
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
/*
* Need to be in the lock to prevent the window between getting
* the count and actually looping on the list.
*/
slurm_mutex_lock(&fed_job_list_mutex);
if (fed_job_list)
count = list_count(fed_job_list);
else
count = NO_VAL;
pack32(count, buffer);
if (count && (count != NO_VAL)) {
list_itr_t *itr = list_iterator_create(fed_job_list);
while ((fed_job_info = list_next(itr))) {
_pack_fed_job_info(fed_job_info, buffer,
protocol_version);
}
list_iterator_destroy(itr);
}
slurm_mutex_unlock(&fed_job_list_mutex);
} else {
error("%s: protocol_version %hu not supported.",
__func__, protocol_version);
}
}
static list_t *_load_fed_job_list(buf_t *buffer, uint16_t protocol_version)
{
int i;
uint32_t count;
fed_job_info_t *tmp_job_info = NULL;
list_t *tmp_list = NULL;
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
safe_unpack32(&count, buffer);
if (count > NO_VAL)
goto unpack_error;
if (count != NO_VAL) {
tmp_list = list_create(xfree_ptr);
for (i = 0; i < count; i++) {
if (_unpack_fed_job_info(&tmp_job_info, buffer,
protocol_version))
goto unpack_error;
list_append(tmp_list, tmp_job_info);
}
}
} else {
error("%s: protocol_version %hu not supported.",
__func__, protocol_version);
}
return tmp_list;
unpack_error:
FREE_NULL_LIST(tmp_list);
return NULL;
}
/*
* If this changes, then _pack_dep_msg() in slurm_protocol_pack.c probably
* needs to change.
*/
static void _pack_remote_dep_job(job_record_t *job_ptr, buf_t *buffer,
uint16_t protocol_version)
{
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
pack32(job_ptr->array_job_id, buffer);
pack32(job_ptr->array_task_id, buffer);
pack_dep_list(job_ptr->details->depend_list, buffer,
protocol_version);
packstr(job_ptr->details->dependency, buffer);
packbool(job_ptr->array_recs ? true : false, buffer);
pack32(job_ptr->job_id, buffer);
packstr(job_ptr->name, buffer);
pack32(job_ptr->user_id, buffer);
} else {
error("%s: protocol_version %hu not supported.",
__func__, protocol_version);
}
}
/*
* If this changes, then _unpack_dep_msg() in slurm_protocol_pack.c probably
* needs to change.
*/
static int _unpack_remote_dep_job(job_record_t **job_pptr, buf_t *buffer,
uint16_t protocol_version)
{
bool is_array;
job_record_t *job_ptr;
xassert(job_pptr);
job_ptr = xmalloc(sizeof *job_ptr);
job_ptr->magic = JOB_MAGIC;
job_ptr->details = xmalloc(sizeof *(job_ptr->details));
job_ptr->details->magic = DETAILS_MAGIC;
job_ptr->fed_details = xmalloc(sizeof *(job_ptr->fed_details));
*job_pptr = job_ptr;
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
safe_unpack32(&job_ptr->array_job_id, buffer);
safe_unpack32(&job_ptr->array_task_id, buffer);
unpack_dep_list(&job_ptr->details->depend_list, buffer,
protocol_version);
safe_unpackstr(&job_ptr->details->dependency, buffer);
safe_unpackbool(&is_array, buffer);
if (is_array)
job_ptr->array_recs =
xmalloc(sizeof *(job_ptr->array_recs));
safe_unpack32(&job_ptr->job_id, buffer);
safe_unpackstr(&job_ptr->name, buffer);
safe_unpack32(&job_ptr->user_id, buffer);
} else {
error("%s: protocol_version %hu not supported.",
__func__, protocol_version);
goto unpack_error;
}
return SLURM_SUCCESS;
unpack_error:
_destroy_dep_job(job_ptr);
*job_pptr = NULL;
return SLURM_ERROR;
}
static void _dump_remote_dep_job_list(buf_t *buffer, uint16_t protocol_version)
{
uint32_t count = NO_VAL;
job_record_t *job_ptr;
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
slurm_mutex_lock(&dep_job_list_mutex);
if (remote_dep_job_list)
count = list_count(remote_dep_job_list);
else
count = NO_VAL;
pack32(count, buffer);
if (count && (count != NO_VAL)) {
list_itr_t *itr =
list_iterator_create(remote_dep_job_list);
while ((job_ptr = list_next(itr)))
_pack_remote_dep_job(job_ptr, buffer,
protocol_version);
list_iterator_destroy(itr);
}
slurm_mutex_unlock(&dep_job_list_mutex);
} else {
error("%s: protocol_version %hu not supported.",
__func__, protocol_version);
}
}
static list_t *_load_remote_dep_job_list(buf_t *buffer,
uint16_t protocol_version)
{
uint32_t count, i;
list_t *tmp_list = NULL;
job_record_t *job_ptr = NULL;
if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
safe_unpack32(&count, buffer);
if (count > NO_VAL)
goto unpack_error;
if (count != NO_VAL) {
tmp_list = list_create(_destroy_dep_job);
for (i = 0; i < count; i++) {
if (_unpack_remote_dep_job(&job_ptr, buffer,
protocol_version))
goto unpack_error;
list_append(tmp_list, job_ptr);
}
}
} else {
error("%s: protocol_version %hu not supported.",
__func__, protocol_version);
goto unpack_error;
}
return tmp_list;
unpack_error:
FREE_NULL_LIST(tmp_list);
return NULL;
}
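/* Save fed_mgr state: a header (protocol version and time), the federation
 * record, the fed_job_list, and the remote_dep_job_list. */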
extern int fed_mgr_state_save(void)
{
int error_code = 0;
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
buf_t *buffer = init_buf(0);
DEF_TIMERS;
START_TIMER;
/* write header: version, time */
pack16(SLURM_PROTOCOL_VERSION, buffer);
pack_time(time(NULL), buffer);
lock_slurmctld(fed_read_lock);
slurmdb_pack_federation_rec(fed_mgr_fed_rec, SLURM_PROTOCOL_VERSION,
buffer);
unlock_slurmctld(fed_read_lock);
_dump_fed_job_list(buffer, SLURM_PROTOCOL_VERSION);
_dump_remote_dep_job_list(buffer, SLURM_PROTOCOL_VERSION);
error_code = save_buf_to_state(FED_MGR_STATE_FILE, buffer, NULL);
FREE_NULL_BUFFER(buffer);
END_TIMER2(__func__);
return error_code;
}
static slurmdb_federation_rec_t *_state_load(char *state_save_location)
{
buf_t *buffer = NULL;
char *state_file;
time_t buf_time;
uint16_t ver = 0;
int error_code = SLURM_SUCCESS;
slurmdb_federation_rec_t *ret_fed = NULL;
list_t *tmp_list = NULL;
slurmctld_lock_t job_read_lock = { .job = READ_LOCK };
if (!(buffer = state_save_open(FED_MGR_STATE_FILE, &state_file))) {
if ((clustername_existed == 1) && (!ignore_state_errors))
fatal("No fed_mgr state file (%s) to recover",
state_file);
info("No fed_mgr state file (%s) to recover", state_file);
xfree(state_file);
return NULL;
}
xfree(state_file);
safe_unpack16(&ver, buffer);
debug3("Version in fed_mgr_state header is %u", ver);
if (ver > SLURM_PROTOCOL_VERSION || ver < SLURM_MIN_PROTOCOL_VERSION) {
if (!ignore_state_errors)
fatal("Can not recover fed_mgr state, incompatible version, got %u need > %u <= %u, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.",
ver, SLURM_MIN_PROTOCOL_VERSION,
SLURM_PROTOCOL_VERSION);
error("***********************************************");
error("Can not recover fed_mgr state, incompatible version, "
"got %u need > %u <= %u", ver,
SLURM_MIN_PROTOCOL_VERSION, SLURM_PROTOCOL_VERSION);
error("***********************************************");
FREE_NULL_BUFFER(buffer);
return NULL;
}
safe_unpack_time(&buf_time, buffer);
error_code = slurmdb_unpack_federation_rec((void **)&ret_fed, ver,
buffer);
if (error_code != SLURM_SUCCESS)
goto unpack_error;
else if (!ret_fed || !ret_fed->name ||
!list_count(ret_fed->cluster_list)) {
slurmdb_destroy_federation_rec(ret_fed);
ret_fed = NULL;
debug("No feds to retrieve from state");
} else {
/* We want to free the connections here since they don't exist
* anymore, but they were packed when state was saved. */
slurmdb_cluster_rec_t *cluster;
list_itr_t *itr = list_iterator_create(
ret_fed->cluster_list);
while ((cluster = list_next(itr))) {
slurm_persist_conn_destroy(cluster->fed.recv);
cluster->fed.recv = NULL;
slurm_persist_conn_destroy(cluster->fed.send);
cluster->fed.send = NULL;
}
list_iterator_destroy(itr);
}
/* Load in fed_job_list and transfer objects to the actual fed_job_list only
 * if there is an actual job record for the job */
if ((tmp_list = _load_fed_job_list(buffer, ver))) {
fed_job_info_t *tmp_info;
slurm_mutex_lock(&fed_job_list_mutex);
if (fed_job_list) {
lock_slurmctld(job_read_lock);
while ((tmp_info = list_pop(tmp_list))) {
if (find_job_record(tmp_info->job_id))
list_append(fed_job_list, tmp_info);
else
xfree(tmp_info);
}
unlock_slurmctld(job_read_lock);
}
slurm_mutex_unlock(&fed_job_list_mutex);
}
FREE_NULL_LIST(tmp_list);
/*
* Load in remote_dep_job_list and transfer to actual
* remote_dep_job_list. If the actual list already has that job,
* just throw away this one.
*/
if ((tmp_list = _load_remote_dep_job_list(buffer, ver))) {
job_record_t *tmp_dep_job;
slurm_mutex_lock(&dep_job_list_mutex);
while ((tmp_dep_job = list_pop(tmp_list))) {
if (!remote_dep_job_list)
remote_dep_job_list =
list_create(_destroy_dep_job);
if (!list_find_first(remote_dep_job_list,
_find_job_by_id,
&tmp_dep_job->job_id) &&
tmp_dep_job->details->dependency) {
list_append(remote_dep_job_list, tmp_dep_job);
} /* else it will get free'd with FREE_NULL_LIST */
}
slurm_mutex_unlock(&dep_job_list_mutex);
}
FREE_NULL_LIST(tmp_list);
FREE_NULL_BUFFER(buffer);
return ret_fed;
unpack_error:
if (!ignore_state_errors)
fatal("Incomplete fed_mgr state file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
error("Incomplete fed_mgr state file");
FREE_NULL_BUFFER(buffer);
return NULL;
}
/*
* Returns the federated job id (<local id> + (<cluster id> << 26)).
* Bits 0-25: Local job id
* Bits 26-31: Cluster id
*/
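/*
 * Example: with FED_MGR_CLUSTER_ID_BEGIN = 26, cluster id 2 and local id
 * 1234 give a federated id of 1234 + (2 << 26) = 134218962;
 * fed_mgr_get_local_id() and fed_mgr_get_cluster_id() below recover the
 * local id and cluster id respectively.
 */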
extern uint32_t fed_mgr_get_job_id(uint32_t orig)
{
if (!fed_mgr_cluster_rec)
return orig;
return orig + (fed_mgr_cluster_rec->fed.id << FED_MGR_CLUSTER_ID_BEGIN);
}
/*
* Returns the local job id from a federated job id.
*/
extern uint32_t fed_mgr_get_local_id(uint32_t id)
{
return id & MAX_JOB_ID;
}
/*
* Returns the cluster id from a federated job id.
*/
extern uint32_t fed_mgr_get_cluster_id(uint32_t id)
{
return id >> FED_MGR_CLUSTER_ID_BEGIN;
}
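/* Register a new incoming persistent connection from a sibling controller
 * and queue a job sync to that sibling. */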
extern int fed_mgr_add_sibling_conn(persist_conn_t *persist_conn,
char **out_buffer)
{
slurmdb_cluster_rec_t *cluster = NULL;
slurmctld_lock_t fed_read_lock = {
NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
lock_slurmctld(fed_read_lock);
if (!fed_mgr_fed_rec) {
unlock_slurmctld(fed_read_lock);
*out_buffer = xstrdup_printf(
"no fed_mgr_fed_rec on cluster %s yet.",
slurm_conf.cluster_name);
/* This really isn't an error. If the cluster doesn't know it
* is in a federation this could happen on the initial
* connection from a sibling that found out about the addition
* before I did.
*/
debug("%s: %s", __func__, *out_buffer);
/* The other side needs to see this as an error though or the
* connection won't be completely established.
*/
return SLURM_ERROR;
}
if (!fed_mgr_cluster_rec) {
unlock_slurmctld(fed_read_lock);
*out_buffer = xstrdup_printf(
"no fed_mgr_cluster_rec on cluster %s? "
"This should never happen",
slurm_conf.cluster_name);
error("%s: %s", __func__, *out_buffer);
return SLURM_ERROR;
}
if (!(cluster =
fed_mgr_get_cluster_by_name(persist_conn->cluster_name))) {
unlock_slurmctld(fed_read_lock);
*out_buffer = xstrdup_printf(
"%s isn't a known sibling of ours, but tried to connect to cluster %s federation %s",
persist_conn->cluster_name, slurm_conf.cluster_name,
fed_mgr_fed_rec->name);
error("%s: %s", __func__, *out_buffer);
return SLURM_ERROR;
}
persist_conn->callback_fini = _persist_callback_fini;
persist_conn->flags |= PERSIST_FLAG_ALREADY_INITED;
/* If this pointer exists it will be handled by the persist_conn code,
* don't free
*/
//slurm_persist_conn_destroy(cluster->fed.recv);
/* Preserve the persist_conn so that the cluster can get the remote
 * side's hostname and port to talk back to if it doesn't have it yet.
 * See _open_controller_conn().
 * Don't lock the cluster's lock here because an (almost) deadlock
 * could occur if this cluster is opening a connection to the remote
 * cluster at the same time the remote cluster is connecting to this
 * cluster, since both sides would hold the mutex in order to
 * send/recv. If that did happen, the connection would eventually
 * time out and resolve itself. */
cluster->fed.recv = persist_conn;
slurm_persist_conn_recv_thread_init(persist_conn,
conn_g_get_fd(persist_conn
->tls_conn),
-1, persist_conn);
_q_send_job_sync(cluster->name);
unlock_slurmctld(fed_read_lock);
return SLURM_SUCCESS;
}
/*
* Convert a comma-separated list of cluster names to a bitmap of cluster ids.
*/
static int _validate_cluster_names(char *clusters, uint64_t *cluster_bitmap)
{
int rc = SLURM_SUCCESS;
uint64_t cluster_ids = 0;
list_t *cluster_names = NULL;
xassert(clusters);
if (!xstrcasecmp(clusters, "all") ||
(clusters && (*clusters == '\0'))) {
cluster_ids = _get_all_sibling_bits();
goto end_it;
}
cluster_names = list_create(xfree_ptr);
if (slurm_addto_char_list(cluster_names, clusters)) {
list_itr_t *itr = list_iterator_create(cluster_names);
char *cluster_name;
slurmdb_cluster_rec_t *sibling;
while ((cluster_name = list_next(itr))) {
if (!(sibling =
fed_mgr_get_cluster_by_name(cluster_name))) {
error("didn't find requested cluster name %s in list of federated clusters",
cluster_name);
rc = SLURM_ERROR;
break;
}
cluster_ids |= FED_SIBLING_BIT(sibling->fed.id);
}
list_iterator_destroy(itr);
}
FREE_NULL_LIST(cluster_names);
end_it:
if (cluster_bitmap)
*cluster_bitmap = cluster_ids;
return rc;
}
/* Send a job update to the sibling jobs on the specified clusters.
*
* IN job_id - job_id of the job to update.
* IN job_desc - job_desc to update job_id with.
* IN update_sibs - bitmap of siblings to update.
* IN uid - uid of the user requesting the update.
*/
extern int fed_mgr_update_job(uint32_t job_id, job_desc_msg_t *job_desc,
uint64_t update_sibs, uid_t uid)
{
list_itr_t *sib_itr;
slurmdb_cluster_rec_t *sibling;
fed_job_info_t *job_info;
slurm_mutex_lock(&fed_job_list_mutex);
if (!(job_info = _find_fed_job_info(job_id))) {
error("Didn't find JobId=%u in fed_job_list", job_id);
slurm_mutex_unlock(&fed_job_list_mutex);
return SLURM_ERROR;
}
sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
while ((sibling = list_next(sib_itr))) {
/* Local is handled outside */
if (sibling == fed_mgr_cluster_rec)
continue;
if (!(update_sibs & FED_SIBLING_BIT(sibling->fed.id)))
continue;
if (_persist_update_job(sibling, job_id, job_desc, uid)) {
error("failed to update sibling job on sibling %s",
sibling->name);
continue;
}
job_info->updating_sibs[sibling->fed.id]++;
job_info->updating_time[sibling->fed.id] = time(NULL);
}
list_iterator_destroy(sib_itr);
slurm_mutex_unlock(&fed_job_list_mutex);
return SLURM_SUCCESS;
}
/*
* Submit sibling jobs to designated siblings.
*
* Will update job_desc->fed_siblings_active with the successful submissions.
* Will not send to siblings if they are in
* job_desc->fed_details->siblings_active.
*
* IN job_desc - job_desc containing job_id and fed_siblings_viable of job to be
* submitted.
* IN msg - contains the original job_desc buffer to send to the siblings.
* IN alloc_only - true if just an allocation. false if a batch job.
* IN dest_sibs - bitmap of viable siblings to submit to.
* RET returns SLURM_SUCCESS if all siblings received the job successfully or
* SLURM_ERROR if any siblings failed to receive the job. If a sibling
* fails, then the successful siblings will be updated with the correct
* sibling bitmap.
*/
static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg,
bool alloc_only, uint64_t dest_sibs,
uint16_t start_protocol_version)
{
int ret_rc = SLURM_SUCCESS;
list_itr_t *sib_itr;
sib_msg_t sib_msg = {0};
slurmdb_cluster_rec_t *sibling = NULL;
slurm_msg_t req_msg;
uint16_t last_rpc_version = NO_VAL16;
buf_t *buffer = NULL;
xassert(job_desc);
xassert(msg);
sib_msg.data_type = msg->msg_type;
sib_msg.fed_siblings = job_desc->fed_siblings_viable;
sib_msg.group_id = job_desc->group_id;
sib_msg.job_id = job_desc->job_id;
sib_msg.resp_host = job_desc->resp_host;
sib_msg.submit_host = job_desc->alloc_node;
sib_msg.user_id = job_desc->user_id;
sib_msg.submit_proto_ver = start_protocol_version;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SIB_MSG;
req_msg.data = &sib_msg;
sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
while ((sibling = list_next(sib_itr))) {
int rc;
if (sibling == fed_mgr_cluster_rec)
continue;
/* Only send to specific siblings */
if (!(dest_sibs & FED_SIBLING_BIT(sibling->fed.id)))
continue;
/* skip sibling if the sibling already has a job */
if (job_desc->fed_siblings_active &
FED_SIBLING_BIT(sibling->fed.id))
continue;
if (alloc_only)
sib_msg.sib_msg_type = FED_JOB_SUBMIT_INT;
else
sib_msg.sib_msg_type = FED_JOB_SUBMIT_BATCH;
/* Pack message buffer according to sibling's rpc version. A
* submission from a client will already have a buffer with the
* packed job_desc from the client. If this controller is
* submitting new sibling jobs then the buffer needs to be
* packed according to each sibling's rpc_version. */
if (!msg->buffer &&
(last_rpc_version != sibling->rpc_version)) {
FREE_NULL_BUFFER(buffer);
msg->protocol_version = sibling->rpc_version;
buffer = init_buf(BUF_SIZE);
pack_msg(msg, buffer);
sib_msg.data_buffer = buffer;
sib_msg.data_version = msg->protocol_version;
last_rpc_version = sibling->rpc_version;
}
/*
* We have a buffer, which means we are submitting new sibling jobs
* from a client submission. If the sibling is at the same or a higher
* protocol version than the client, we just send the packed buffer
* from the client; otherwise we need to get an unmodified
* job_desc_msg_t by unpacking the client's buffer and repacking it at
* the sibling's version.
*/
if (msg->buffer) {
if (sibling->rpc_version >= msg->protocol_version) {
sib_msg.data_buffer = msg->buffer;
sib_msg.data_offset = msg->body_offset;
sib_msg.data_version = msg->protocol_version;
/* Don't pack buffer again unless the version changed */
} else if (last_rpc_version != sibling->rpc_version) {
slurm_msg_t tmp_msg;
slurm_msg_t_init(&tmp_msg);
tmp_msg.msg_type = msg->msg_type;
tmp_msg.protocol_version =
msg->protocol_version;
set_buf_offset(msg->buffer, msg->body_offset);
unpack_msg(&tmp_msg, msg->buffer);
FREE_NULL_BUFFER(buffer);
tmp_msg.protocol_version = sibling->rpc_version;
buffer = init_buf(BUF_SIZE);
pack_msg(&tmp_msg, buffer);
sib_msg.data_buffer = buffer;
sib_msg.data_offset = 0;
sib_msg.data_version = msg->protocol_version;
last_rpc_version = sibling->rpc_version;
}
}
req_msg.protocol_version = sibling->rpc_version;
if (!(rc = _queue_rpc(sibling, &req_msg, 0, false)))
job_desc->fed_siblings_active |=
FED_SIBLING_BIT(sibling->fed.id);
ret_rc |= rc;
}
list_iterator_destroy(sib_itr);
FREE_NULL_BUFFER(buffer);
return ret_rc;
}
/*
* Prepare and submit new sibling jobs built from an existing job.
*
* IN job_ptr - job to submit to remote siblings.
* IN dest_sibs - bitmap of viable siblings to submit to.
*/
static int _prepare_submit_siblings(job_record_t *job_ptr, uint64_t dest_sibs)
{
int rc = SLURM_SUCCESS;
uint32_t origin_id;
job_desc_msg_t *job_desc;
slurm_msg_t msg;
xassert(job_ptr);
xassert(job_ptr->details);
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
log_flag(FEDR, "submitting new siblings for %pJ", job_ptr);
if (!(job_desc = copy_job_record_to_job_desc(job_ptr)))
return SLURM_ERROR;
/*
* Since job_ptr could have had defaults filled on the origin cluster,
* clear these before sibling submission if default flag is set
*/
if (job_desc->bitflags & USE_DEFAULT_ACCT)
xfree(job_desc->account);
if (job_desc->bitflags & USE_DEFAULT_PART)
xfree(job_desc->partition);
if (job_desc->bitflags & USE_DEFAULT_QOS)
xfree(job_desc->qos);
if (job_desc->bitflags & USE_DEFAULT_WCKEY)
xfree(job_desc->wckey);
/* Have to pack job_desc into a buffer. _submit_sibling_jobs will pack
* the job_desc according to each sibling's rpc_version. */
slurm_msg_t_init(&msg);
msg.msg_type = REQUEST_RESOURCE_ALLOCATION;
msg.data = job_desc;
if (_submit_sibling_jobs(job_desc, &msg, false, dest_sibs,
job_ptr->start_protocol_ver))
error("Failed to submit fed job to siblings");
/* mark this cluster as an active sibling */
if (job_desc->fed_siblings_viable &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))
job_desc->fed_siblings_active |=
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
/* Add new active jobs to siblings_active bitmap */
job_ptr->fed_details->siblings_active |= job_desc->fed_siblings_active;
update_job_fed_details(job_ptr);
/* free the environment since all strings are stored in one
* xmalloced buffer */
if (job_desc->environment) {
xfree(job_desc->environment[0]);
xfree(job_desc->environment);
job_desc->env_size = 0;
}
slurm_free_job_desc_msg(job_desc);
return rc;
}
static uint64_t _get_all_sibling_bits()
{
list_itr_t *itr;
slurmdb_cluster_rec_t *cluster;
uint64_t sib_bits = 0;
if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
goto fini;
itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
while ((cluster = list_next(itr))) {
sib_bits |= FED_SIBLING_BIT(cluster->fed.id);
}
list_iterator_destroy(itr);
fini:
return sib_bits;
}
static int _remove_inactive_sibs(void *object, void *arg)
{
slurmdb_cluster_rec_t *sibling = (slurmdb_cluster_rec_t *)object;
uint64_t *viable_sibs = (uint64_t *)arg;
uint32_t cluster_state = sibling->fed.state;
int base_state = (cluster_state & CLUSTER_FED_STATE_BASE);
bool drain_flag = (cluster_state & CLUSTER_FED_STATE_DRAIN);
if (drain_flag ||
(base_state == CLUSTER_FED_STATE_INACTIVE))
*viable_sibs &= ~(FED_SIBLING_BIT(sibling->fed.id));
return SLURM_SUCCESS;
}
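/* Build the bitmap of siblings a job may run on: start from all siblings,
 * intersect with any requested clusters and cluster features, drop inactive
 * or draining siblings, and restrict array jobs to the local cluster. */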
static uint64_t _get_viable_sibs(char *req_clusters, uint64_t feature_sibs,
bool is_array_job, char **err_msg)
{
uint64_t viable_sibs = _get_all_sibling_bits();
if (req_clusters)
_validate_cluster_names(req_clusters, &viable_sibs);
if (feature_sibs)
viable_sibs &= feature_sibs;
/* filter out clusters that are inactive or draining */
list_for_each(fed_mgr_fed_rec->cluster_list, _remove_inactive_sibs,
&viable_sibs);
if (is_array_job) { /* lock array jobs to local cluster */
uint64_t tmp_viable = viable_sibs &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
if (viable_sibs && !tmp_viable) {
info("federated job arrays must run on local cluster");
if (err_msg) {
xfree(*err_msg);
xstrfmtcat(*err_msg, "federated job arrays must run on local cluster");
}
}
viable_sibs = tmp_viable;
}
return viable_sibs;
}
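/* Recompute a job's viable siblings and reconcile its sibling jobs: revoke
 * copies on siblings that are no longer viable and submit copies to newly
 * viable siblings. */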
static void _add_remove_sibling_jobs(job_record_t *job_ptr)
{
fed_job_info_t *job_info;
uint32_t origin_id = 0;
uint64_t new_sibs = 0, old_sibs = 0, add_sibs = 0,
rem_sibs = 0, feature_sibs = 0;
xassert(job_ptr);
origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
/* Revoke sibling jobs on clusters that are no longer viable and
 * submit sibling jobs to newly viable clusters. */
old_sibs = job_ptr->fed_details->siblings_active;
_validate_cluster_features(job_ptr->details->cluster_features,
&feature_sibs);
new_sibs = _get_viable_sibs(job_ptr->clusters, feature_sibs,
job_ptr->array_recs ? true : false, NULL);
job_ptr->fed_details->siblings_viable = new_sibs;
add_sibs = new_sibs & ~old_sibs;
rem_sibs = ~new_sibs & old_sibs;
if (rem_sibs) {
time_t now = time(NULL);
_revoke_sibling_jobs(job_ptr->job_id,
fed_mgr_cluster_rec->fed.id,
rem_sibs, now);
if (fed_mgr_is_origin_job(job_ptr) &&
(rem_sibs & FED_SIBLING_BIT(origin_id))) {
fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0,
now);
}
job_ptr->fed_details->siblings_active &= ~rem_sibs;
}
/* Don't submit new siblings if the job is held */
if (job_ptr->priority != 0 && add_sibs)
_prepare_submit_siblings(
job_ptr,
job_ptr->fed_details->siblings_viable);
/* unrevoke the origin job */
if (fed_mgr_is_origin_job(job_ptr) &&
(add_sibs & FED_SIBLING_BIT(origin_id)))
job_state_unset_flag(job_ptr, JOB_REVOKED);
/* Can't have the mutex while calling fed_mgr_job_revoke because it will
* lock the mutex as well. */
slurm_mutex_lock(&fed_job_list_mutex);
if ((job_info = _find_fed_job_info(job_ptr->job_id))) {
job_info->siblings_viable =
job_ptr->fed_details->siblings_viable;
job_info->siblings_active =
job_ptr->fed_details->siblings_active;
}
slurm_mutex_unlock(&fed_job_list_mutex);
/* Update where sibling jobs are running */
update_job_fed_details(job_ptr);
}
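/* Return true if the job is still waiting on recent update responses from
 * any sibling; stale entries older than UPDATE_DELAY are cleared. */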
static bool _job_has_pending_updates(fed_job_info_t *job_info)
{
int i;
xassert(job_info);
static const int UPDATE_DELAY = 60;
time_t now = time(NULL);
for (i = 1; i <= MAX_FED_CLUSTERS; i++) {
if (job_info->updating_sibs[i]) {
if (job_info->updating_time[i] > (now - UPDATE_DELAY)) {
log_flag(FEDR, "JobId=%u is waiting for %d update responses from cluster id %d",
job_info->job_id,
job_info->updating_sibs[i], i);
return true;
} else {
log_flag(FEDR, "JobId=%u is had pending updates (%d) for cluster id %d, but haven't heard back from it for %ld seconds. Clearing the cluster's updating state",
job_info->job_id,
job_info->updating_sibs[i], i,
now - job_info->updating_time[i]);
job_info->updating_sibs[i] = 0;
}
}
}
return false;
}
/*
* Validate requested job cluster features against each cluster's features.
*
* IN spec_features - cluster features that the job requested.
* OUT cluster_bitmap - bitmap of clusters that have matching features.
* RET SLURM_ERROR if no cluster has any of the requested features,
* SLURM_SUCCESS otherwise.
*/
static int _validate_cluster_features(char *spec_features,
uint64_t *cluster_bitmap)
{
int rc = SLURM_SUCCESS;
bool negative_logic = false;
uint64_t feature_sibs = 0;
char *feature = NULL;
slurmdb_cluster_rec_t *sib;
list_t *req_features = NULL;
list_itr_t *feature_itr, *sib_itr;
if (!spec_features || !fed_mgr_fed_rec) {
if (cluster_bitmap)
*cluster_bitmap = feature_sibs;
return rc;
}
if (*spec_features == '\0') {
if (cluster_bitmap)
*cluster_bitmap = _get_all_sibling_bits();
return rc;
}
req_features = list_create(xfree_ptr);
slurm_addto_char_list(req_features, spec_features);
feature_itr = list_iterator_create(req_features);
sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
feature = list_peek(req_features);
if (feature && feature[0] == '!') {
feature_sibs = _get_all_sibling_bits();
negative_logic = true;
}
while ((feature = list_next(feature_itr))) {
if (negative_logic && feature[0] == '!')
feature++;
bool found = false;
while ((sib = list_next(sib_itr))) {
if (sib->fed.feature_list &&
list_find_first(sib->fed.feature_list,
slurm_find_char_in_list, feature)) {
if (negative_logic) {
feature_sibs &=
~FED_SIBLING_BIT(sib->fed.id);
} else {
feature_sibs |=
FED_SIBLING_BIT(sib->fed.id);
}
found = true;
}
}
if (!found) {
error("didn't find at least one cluster with the feature '%s'",
feature);
rc = SLURM_ERROR;
goto end_features;
}
if (negative_logic && !feature_sibs) {
error("eliminated all viable clusters with constraint '%s'",
feature);
rc = SLURM_ERROR;
goto end_features;
}
list_iterator_reset(sib_itr);
}
end_features:
list_iterator_destroy(sib_itr);
list_iterator_destroy(feature_itr);
FREE_NULL_LIST(req_features);
if (cluster_bitmap)
*cluster_bitmap = feature_sibs;
return rc;
}
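/* Tell sibling clusters to clear the remote dependencies previously sent for
 * this origin job. */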
extern void fed_mgr_remove_remote_dependencies(job_record_t *job_ptr)
{
uint32_t origin_id;
if (!_is_fed_job(job_ptr, &origin_id) ||
!fed_mgr_is_origin_job(job_ptr) || !job_ptr->details)
return;
fed_mgr_submit_remote_dependencies(job_ptr, false, true);
}
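/* list_for_each() callback: build the bitmap of siblings that need this
 * job's dependency information. */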
static int _add_to_send_list(void *object, void *arg)
{
depend_spec_t *dependency = (depend_spec_t *)object;
uint64_t *send_sib_bits = (uint64_t *)arg;
uint32_t cluster_id;
if ((dependency->depend_type == SLURM_DEPEND_SINGLETON) &&
!disable_remote_singleton) {
*send_sib_bits |= _get_all_sibling_bits();
/* Negative value short-circuits list_for_each */
return -1;
}
if (!(dependency->depend_flags & SLURM_FLAGS_REMOTE) ||
(dependency->depend_state != DEPEND_NOT_FULFILLED))
return SLURM_SUCCESS;
cluster_id = fed_mgr_get_cluster_id(dependency->job_id);
*send_sib_bits |= FED_SIBLING_BIT(cluster_id);
return SLURM_SUCCESS;
}
/*
* Send dependencies of job_ptr to siblings.
*
* If the dependency string is NULL, that means we're telling the siblings
* to delete that dependency. Send empty string to indicate that.
*
* If send_all_sibs == true, then send dependencies to all siblings. Otherwise,
* only send dependencies to siblings that own the remote jobs that job_ptr
* depends on. I.e., if a sibling doesn't own any jobs that job_ptr depends on,
* we won't send job_ptr's dependencies to that sibling.
*
* If clear_dependencies == true, then clear the dependencies on the siblings
* where dependencies reside. In this case, use the job's dependency list to
* find out which siblings to send the RPC to if the list is non-NULL. If the
* list is NULL, then we have to send to all siblings.
*/
extern int fed_mgr_submit_remote_dependencies(job_record_t *job_ptr,
bool send_all_sibs,
bool clear_dependencies)
{
int rc = SLURM_SUCCESS;
uint64_t send_sib_bits = 0;
list_itr_t *sib_itr;
slurm_msg_t req_msg;
dep_msg_t dep_msg = { 0 };
slurmdb_cluster_rec_t *sibling;
uint32_t origin_id;
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
xassert(job_ptr->details);
dep_msg.job_id = job_ptr->job_id;
dep_msg.job_name = job_ptr->name;
dep_msg.array_job_id = job_ptr->array_job_id;
dep_msg.array_task_id = job_ptr->array_task_id;
dep_msg.is_array = job_ptr->array_recs ? true : false;
dep_msg.user_id = job_ptr->user_id;
if (!job_ptr->details->dependency || clear_dependencies)
/*
* Since we have to pack these values, set dependency to empty
* string and set depend_list to an empty list so we have
* data to pack.
*/
dep_msg.dependency = "";
else
dep_msg.dependency = job_ptr->details->dependency;
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_SEND_DEP;
req_msg.data = &dep_msg;
if (!job_ptr->details->depend_list)
send_all_sibs = true;
if (!send_all_sibs) {
list_for_each(job_ptr->details->depend_list,
_add_to_send_list, &send_sib_bits);
}
sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
while ((sibling = list_next(sib_itr))) {
if (sibling == fed_mgr_cluster_rec)
continue;
/*
* If we aren't sending the dependency to all siblings and
* there isn't a dependency on this sibling, don't send
* an RPC to this sibling.
*/
if (!send_all_sibs &&
!(send_sib_bits & FED_SIBLING_BIT(sibling->fed.id)))
continue;
req_msg.protocol_version = sibling->rpc_version;
rc |= _queue_rpc(sibling, &req_msg, job_ptr->job_id, false);
}
list_iterator_destroy(sib_itr);
return rc;
}
/* submit a federated job.
*
* IN msg - msg that contains packed job_desc msg to send to siblings.
* IN job_desc - original job_desc msg.
* IN alloc_only - true if requesting just an allocation (srun/salloc).
* IN protocol_version - version of the code the caller is using
* OUT job_id_ptr - job_id of allocated job
* OUT alloc_code - error_code returned from job_allocate
* OUT err_msg - error message returned if any
* RET returns SLURM_SUCCESS if the allocation was successful, SLURM_ERROR
* otherwise.
*/
extern int fed_mgr_job_allocate(slurm_msg_t *msg, job_desc_msg_t *job_desc,
bool alloc_only,
uint32_t *job_id_ptr, int *alloc_code,
char **err_msg)
{
uint64_t feature_sibs = 0;
job_record_t *job_ptr = NULL;
bool job_held = false;
xassert(msg);
xassert(job_desc);
xassert(job_id_ptr);
xassert(alloc_code);
xassert(err_msg);
if (job_desc->job_id != NO_VAL) {
error("attempt by uid %u to set JobId=%u. "
"specifying a job_id is not allowed when in a federation",
msg->auth_uid, job_desc->job_id);
*alloc_code = ESLURM_INVALID_JOB_ID;
return SLURM_ERROR;
}
if (_validate_cluster_features(job_desc->cluster_features,
&feature_sibs)) {
*alloc_code = ESLURM_INVALID_CLUSTER_FEATURE;
return SLURM_ERROR;
}
/* get job_id now. Can't submit job to get job_id as job_allocate will
* change the job_desc. */
job_desc->job_id = get_next_job_id(false);
/* Set viable siblings */
job_desc->fed_siblings_viable =
_get_viable_sibs(job_desc->clusters, feature_sibs,
(job_desc->array_inx) ? true : false, err_msg);
if (!job_desc->fed_siblings_viable) {
*alloc_code = ESLURM_FED_NO_VALID_CLUSTERS;
return SLURM_ERROR;
}
/* ensure that fed_siblings_active is clear since this is a new job */
job_desc->fed_siblings_active = 0;
/*
* Submit local job first. Then submit to all siblings. If the local job
* fails, then don't worry about sending to the siblings.
*/
job_desc->het_job_offset = NO_VAL;
*alloc_code = job_allocate(job_desc, job_desc->immediate, false, NULL,
alloc_only, msg->auth_uid, false, &job_ptr,
err_msg, msg->protocol_version);
if (!job_ptr || (*alloc_code && job_ptr->job_state == JOB_FAILED)) {
		/* There may be an error code, but the job isn't failed; it
		 * will sit in the queue. */
info("failed to submit federated job to local cluster");
return SLURM_ERROR;
}
/* mark this cluster as an active sibling if it's in the viable list */
if (job_desc->fed_siblings_viable &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))
job_desc->fed_siblings_active |=
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
/* Job is not eligible on origin cluster - mark as revoked. */
if (!(job_ptr->fed_details->siblings_viable &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
job_state_set_flag(job_ptr, JOB_REVOKED);
*job_id_ptr = job_ptr->job_id;
/*
* Don't submit a job with dependencies to siblings - the origin will
* test job dependencies and submit the job to siblings when all
* dependencies are fulfilled.
* job_allocate() calls job_independent() which sets the JOB_DEPENDENT
* flag if the job is dependent, so check this after job_allocate().
*/
if ((job_desc->priority == 0) || (job_ptr->bit_flags & JOB_DEPENDENT))
job_held = true;
if (job_held) {
info("Submitted held federated %pJ to %s(self)",
job_ptr, fed_mgr_cluster_rec->name);
} else {
info("Submitted %sfederated %pJ to %s(self)",
(!(job_ptr->fed_details->siblings_viable &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)) ?
"tracking " : ""),
job_ptr, fed_mgr_cluster_rec->name);
}
/* Update job before submitting sibling jobs so that it will show the
* viable siblings and potentially active local job */
job_ptr->fed_details->siblings_active = job_desc->fed_siblings_active;
update_job_fed_details(job_ptr);
if (!job_held && _submit_sibling_jobs(
job_desc, msg, alloc_only,
job_ptr->fed_details->siblings_viable,
job_ptr->start_protocol_ver))
info("failed to submit sibling job to one or more siblings");
/* Send remote dependencies to siblings */
if ((job_ptr->bit_flags & JOB_DEPENDENT) &&
job_ptr->details && job_ptr->details->dependency)
if (fed_mgr_submit_remote_dependencies(job_ptr, false, false))
error("%s: %pJ Failed to send remote dependencies to some or all siblings.",
__func__, job_ptr);
job_ptr->fed_details->siblings_active = job_desc->fed_siblings_active;
update_job_fed_details(job_ptr);
/* Add record to fed job table */
add_fed_job_info(job_ptr);
return SLURM_SUCCESS;
}
/* Tests if the job is a tracker only federated job.
* Tracker only job: a job that shouldn't run on the local cluster but should be
 * kept around to facilitate communications for its sibling jobs on other
* clusters.
*/
extern bool fed_mgr_is_tracker_only_job(job_record_t *job_ptr)
{
bool rc = false;
uint32_t origin_id;
xassert(job_ptr);
if (!_is_fed_job(job_ptr, &origin_id))
return rc;
if (job_ptr->fed_details &&
(origin_id == fed_mgr_cluster_rec->fed.id) &&
job_ptr->fed_details->siblings_active &&
(!(job_ptr->fed_details->siblings_active &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))))
rc = true;
if (job_ptr->fed_details &&
job_ptr->fed_details->cluster_lock &&
job_ptr->fed_details->cluster_lock != fed_mgr_cluster_rec->fed.id)
rc = true;
return rc;
}
/* Return the cluster name for the given cluster id.
* Must xfree returned string
*/
extern char *fed_mgr_get_cluster_name(uint32_t id)
{
slurmdb_cluster_rec_t *sibling;
char *name = NULL;
if ((sibling = fed_mgr_get_cluster_by_id(id))) {
name = xstrdup(sibling->name);
}
return name;
}
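/*
 * Test whether job_ptr is a federated job.
 *
 * OUT origin_id - id of the job's origin cluster (encoded in the job_id).
 * RET true if the job is federated, false otherwise.
 */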
static int _is_fed_job(job_record_t *job_ptr, uint32_t *origin_id)
{
xassert(job_ptr);
xassert(origin_id);
if (!fed_mgr_cluster_rec)
return false;
if ((!job_ptr->fed_details) ||
(!(*origin_id = fed_mgr_get_cluster_id(job_ptr->job_id)))) {
debug2("job %pJ not a federated job", job_ptr);
return false;
}
return true;
}
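/*
 * Release this cluster's federation job lock on the siblings whose bits are
 * set in spec_sibs. The lock is unset locally for this cluster's own bit and
 * an unlock request is sent to each remote sibling.
 */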
static int _job_unlock_spec_sibs(job_record_t *job_ptr, uint64_t spec_sibs)
{
uint32_t cluster_id = fed_mgr_cluster_rec->fed.id;
slurmdb_cluster_rec_t *sibling;
int sib_id = 1;
while (spec_sibs) {
if (!(spec_sibs & 1))
goto next_unlock;
if (fed_mgr_cluster_rec->fed.id == sib_id)
fed_mgr_job_lock_unset(job_ptr->job_id,
cluster_id);
else if ((sibling = fed_mgr_get_cluster_by_id(sib_id)))
_persist_fed_job_unlock(sibling, job_ptr->job_id,
cluster_id);
next_unlock:
spec_sibs >>= 1;
sib_id++;
}
return SLURM_SUCCESS;
}
/*
* Return SLURM_SUCCESS if all siblings give lock to job; SLURM_ERROR otherwise.
*/
static int _job_lock_all_sibs(job_record_t *job_ptr)
{
slurmdb_cluster_rec_t *sibling = NULL;
int sib_id = 1;
bool all_said_yes = true;
uint64_t replied_sibs = 0, tmp_sibs = 0;
uint32_t origin_id, cluster_id;
xassert(job_ptr);
origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
cluster_id = fed_mgr_cluster_rec->fed.id;
tmp_sibs = job_ptr->fed_details->siblings_viable &
(~FED_SIBLING_BIT(origin_id));
while (tmp_sibs) {
if (!(tmp_sibs & 1))
goto next_lock;
if (cluster_id == sib_id) {
if (!fed_mgr_job_lock_set(job_ptr->job_id, cluster_id))
replied_sibs |= FED_SIBLING_BIT(sib_id);
else {
all_said_yes = false;
break;
}
} else if (!(sibling = fed_mgr_get_cluster_by_id(sib_id)) ||
(!sibling->fed.send) ||
			   (!((persist_conn_t *) sibling->fed.send)->tls_conn)) {
/*
* Don't consider clusters that are down. They will sync
* up later.
*/
goto next_lock;
} else if (!_persist_fed_job_lock(sibling, job_ptr->job_id,
cluster_id)) {
replied_sibs |= FED_SIBLING_BIT(sib_id);
} else {
all_said_yes = false;
break;
}
next_lock:
tmp_sibs >>= 1;
sib_id++;
}
/*
* Have to talk to at least one other sibling -- if there is one -- to
* start the job
*/
if (all_said_yes &&
(!(job_ptr->fed_details->siblings_viable &
~FED_SIBLING_BIT(cluster_id)) ||
(replied_sibs & ~(FED_SIBLING_BIT(cluster_id)))))
return SLURM_SUCCESS;
/* have to release the lock on those that said yes */
_job_unlock_spec_sibs(job_ptr, replied_sibs);
return SLURM_ERROR;
}
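/* Return non-zero if the connection to the slurmdbd is active. */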
static int _slurmdbd_conn_active(void)
{
int active = 0;
if (acct_storage_g_get_data(acct_db_conn, ACCT_STORAGE_INFO_CONN_ACTIVE,
&active) != SLURM_SUCCESS)
active = 0;
return active;
}
/*
* Attempt to grab the job's federation cluster lock so that the requesting
 * cluster can attempt to start the job.
*
* IN job - job to lock
* RET returns SLURM_SUCCESS if the lock was granted, SLURM_ERROR otherwise
*/
extern int fed_mgr_job_lock(job_record_t *job_ptr)
{
int rc = SLURM_SUCCESS;
uint32_t origin_id, cluster_id;
xassert(job_ptr);
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
cluster_id = fed_mgr_cluster_rec->fed.id;
log_flag(FEDR, "attempting fed job lock on %pJ by cluster_id %d",
job_ptr, cluster_id);
if (origin_id != fed_mgr_cluster_rec->fed.id) {
persist_conn_t *origin_conn = NULL;
slurmdb_cluster_rec_t *origin_cluster;
if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
info("Unable to find origin cluster for %pJ from origin id %d",
job_ptr, origin_id);
} else
origin_conn =
(persist_conn_t *) origin_cluster->fed.send;
/* Check dbd is up to make sure ctld isn't on an island. */
if (acct_db_conn && _slurmdbd_conn_active() &&
(!origin_conn || !origin_conn->tls_conn)) {
rc = _job_lock_all_sibs(job_ptr);
} else if (origin_cluster) {
rc = _persist_fed_job_lock(origin_cluster,
job_ptr->job_id,
cluster_id);
} else {
rc = SLURM_ERROR;
}
if (!rc) {
job_ptr->fed_details->cluster_lock = cluster_id;
fed_mgr_job_lock_set(job_ptr->job_id, cluster_id);
}
return rc;
}
/* origin cluster */
rc = fed_mgr_job_lock_set(job_ptr->job_id, cluster_id);
return rc;
}
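/*
 * Record cluster_id as the holder of the federation cluster lock for job_id
 * in fed_job_list.
 *
 * Fails if the job is not in fed_job_list, has pending updates, or is
 * already locked by another cluster.
 *
 * RET SLURM_SUCCESS if the lock was set, SLURM_ERROR otherwise.
 */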
extern int fed_mgr_job_lock_set(uint32_t job_id, uint32_t cluster_id)
{
int rc = SLURM_SUCCESS;
fed_job_info_t *job_info;
slurm_mutex_lock(&fed_job_list_mutex);
log_flag(FEDR, "%s: attempting to set fed JobId=%u lock to %u",
__func__, job_id, cluster_id);
if (!(job_info = _find_fed_job_info(job_id))) {
error("Didn't find JobId=%u in fed_job_list", job_id);
rc = SLURM_ERROR;
} else if (_job_has_pending_updates(job_info)) {
log_flag(FEDR, "%s: cluster %u can't get cluster lock for JobId=%u because it has pending updates",
__func__, cluster_id, job_id);
rc = SLURM_ERROR;
} else if (job_info->cluster_lock &&
job_info->cluster_lock != cluster_id) {
log_flag(FEDR, "%s: fed JobId=%u already locked by cluster %d",
__func__, job_id, job_info->cluster_lock);
rc = SLURM_ERROR;
} else {
log_flag(FEDR, "%s: fed JobId=%u locked by %u",
__func__, job_id, cluster_id);
job_info->cluster_lock = cluster_id;
}
slurm_mutex_unlock(&fed_job_list_mutex);
return rc;
}
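/*
 * Return true if this cluster holds the job's federation cluster lock, or
 * if the job (or this cluster) is not federated.
 */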
extern bool fed_mgr_job_is_self_owned(job_record_t *job_ptr)
{
if (!fed_mgr_cluster_rec || !job_ptr->fed_details ||
(job_ptr->fed_details->cluster_lock == fed_mgr_cluster_rec->fed.id))
return true;
return false;
}
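/*
 * Return true if some cluster holds the job's federation cluster lock, or
 * if the job is not federated.
 */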
extern bool fed_mgr_job_is_locked(job_record_t *job_ptr)
{
if (!job_ptr->fed_details ||
job_ptr->fed_details->cluster_lock)
return true;
return false;
}
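/* Queue a FED_JOB_START notification from a sibling onto the fed job
 * update list to be processed later. */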
static void _q_sib_job_start(slurm_msg_t *msg)
{
sib_msg_t *sib_msg = msg->data;
fed_job_update_info_t *job_update_info;
/* add todo to remove remote siblings if the origin job */
job_update_info = xmalloc(sizeof(fed_job_update_info_t));
job_update_info->type = FED_JOB_START;
job_update_info->job_id = sib_msg->job_id;
job_update_info->start_time = sib_msg->start_time;
job_update_info->cluster_lock = sib_msg->cluster_id;
_append_job_update(job_update_info);
}
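/*
 * Clear the federation cluster lock for job_id in fed_job_list, verifying
 * that cluster_id is the cluster holding the lock.
 *
 * RET SLURM_SUCCESS if the lock was cleared, SLURM_ERROR otherwise.
 */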
extern int fed_mgr_job_lock_unset(uint32_t job_id, uint32_t cluster_id)
{
int rc = SLURM_SUCCESS;
fed_job_info_t * job_info;
slurm_mutex_lock(&fed_job_list_mutex);
log_flag(FEDR, "%s: attempting to unlock fed JobId=%u by cluster %u",
__func__, job_id, cluster_id);
if (!(job_info = _find_fed_job_info(job_id))) {
error("Didn't find JobId=%u in fed_job_list", job_id);
rc = SLURM_ERROR;
} else if (job_info->cluster_lock &&
job_info->cluster_lock != cluster_id) {
error("attempt to unlock sib JobId=%u by cluster %d which doesn't have job lock",
job_id, cluster_id);
rc = SLURM_ERROR;
} else {
log_flag(FEDR, "%s: fed JobId=%u unlocked by %u",
__func__, job_id, cluster_id);
job_info->cluster_lock = 0;
}
slurm_mutex_unlock(&fed_job_list_mutex);
return rc;
}
/*
 * Release the job's federation cluster lock so that other clusters can try to
* start the job.
*
* IN job - job to unlock
* RET returns SLURM_SUCCESS if the lock was released, SLURM_ERROR otherwise
*/
extern int fed_mgr_job_unlock(job_record_t *job_ptr)
{
int rc = SLURM_SUCCESS;
uint32_t origin_id, cluster_id;
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
cluster_id = fed_mgr_cluster_rec->fed.id;
log_flag(FEDR, "releasing fed job lock on %pJ by cluster_id %d",
job_ptr, cluster_id);
if (origin_id != fed_mgr_cluster_rec->fed.id) {
persist_conn_t *origin_conn = NULL;
slurmdb_cluster_rec_t *origin_cluster;
if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
info("Unable to find origin cluster for %pJ from origin id %d",
job_ptr, origin_id);
} else {
origin_conn =
(persist_conn_t *) origin_cluster->fed.send;
}
if (!origin_conn || !origin_conn->tls_conn) {
uint64_t tmp_sibs;
tmp_sibs = job_ptr->fed_details->siblings_viable &
~FED_SIBLING_BIT(origin_id);
rc = _job_unlock_spec_sibs(job_ptr, tmp_sibs);
} else {
rc = _persist_fed_job_unlock(origin_cluster,
job_ptr->job_id,
cluster_id);
}
if (!rc) {
job_ptr->fed_details->cluster_lock = 0;
fed_mgr_job_lock_unset(job_ptr->job_id, cluster_id);
}
return rc;
}
/* Origin Cluster */
rc = fed_mgr_job_lock_unset(job_ptr->job_id, cluster_id);
return rc;
}
/*
* Notify origin cluster that cluster_id started job.
*
* Cancels remaining sibling jobs.
*
 * IN job_ptr - job_ptr of the job that was started.
 * IN start_time - start_time of the job.
 * RET returns SLURM_SUCCESS if the origin was notified of the start,
 * SLURM_ERROR otherwise
*/
extern int fed_mgr_job_start(job_record_t *job_ptr, time_t start_time)
{
int rc = SLURM_SUCCESS;
uint32_t origin_id, cluster_id;
fed_job_info_t *job_info;
xassert(job_ptr);
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
cluster_id = fed_mgr_cluster_rec->fed.id;
log_flag(FEDR, "start fed %pJ by cluster_id %d",
job_ptr, cluster_id);
if (origin_id != fed_mgr_cluster_rec->fed.id) {
persist_conn_t *origin_conn = NULL;
slurmdb_cluster_rec_t *origin_cluster;
if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
info("Unable to find origin cluster for %pJ from origin id %d",
job_ptr, origin_id);
} else {
origin_conn =
(persist_conn_t *) origin_cluster->fed.send;
}
if (!origin_conn || !origin_conn->tls_conn) {
uint64_t viable_sibs;
viable_sibs = job_ptr->fed_details->siblings_viable;
viable_sibs &= ~FED_SIBLING_BIT(origin_id);
viable_sibs &= ~FED_SIBLING_BIT(cluster_id);
_revoke_sibling_jobs(job_ptr->job_id,
fed_mgr_cluster_rec->fed.id,
viable_sibs, job_ptr->start_time);
rc = SLURM_SUCCESS;
} else {
rc = _persist_fed_job_start(origin_cluster,
job_ptr->job_id, cluster_id,
job_ptr->start_time);
}
if (!rc) {
job_ptr->fed_details->siblings_active =
FED_SIBLING_BIT(cluster_id);
update_job_fed_details(job_ptr);
}
return rc;
}
/* Origin Cluster: */
slurm_mutex_lock(&fed_job_list_mutex);
if (!(job_info = _find_fed_job_info(job_ptr->job_id))) {
error("Didn't find %pJ in fed_job_list", job_ptr);
rc = SLURM_ERROR;
} else if (!job_info->cluster_lock) {
error("attempt to start sib JobId=%u by cluster %u, but it's not locked",
job_info->job_id, cluster_id);
rc = SLURM_ERROR;
} else if (job_info->cluster_lock &&
(job_info->cluster_lock != cluster_id)) {
error("attempt to start sib JobId=%u by cluster %u, which doesn't have job lock",
job_info->job_id, cluster_id);
rc = SLURM_ERROR;
}
if (!rc)
_fed_job_start_revoke(job_info, job_ptr, start_time);
slurm_mutex_unlock(&fed_job_list_mutex);
return rc;
}
/*
* Complete the federated job. If the job ran on a cluster other than the
* origin_cluster then it notifies the origin cluster that the job finished.
*
* Tells the origin cluster to revoke the tracking job.
*
* IN job_ptr - job_ptr of job to complete.
* IN return_code - return code of job
* IN start_time - start time of the job that actually ran.
* RET returns SLURM_SUCCESS if fed job was completed, SLURM_ERROR otherwise
*/
extern int fed_mgr_job_complete(job_record_t *job_ptr, uint32_t return_code,
time_t start_time)
{
uint32_t origin_id;
if (job_ptr->bit_flags & SIB_JOB_FLUSH)
return SLURM_SUCCESS;
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
log_flag(FEDR, "complete fed %pJ by cluster_id %d",
job_ptr, fed_mgr_cluster_rec->fed.id);
if (origin_id == fed_mgr_cluster_rec->fed.id) {
_revoke_sibling_jobs(job_ptr->job_id,
fed_mgr_cluster_rec->fed.id,
job_ptr->fed_details->siblings_active,
job_ptr->start_time);
return SLURM_SUCCESS;
}
slurmdb_cluster_rec_t *conn = fed_mgr_get_cluster_by_id(origin_id);
if (!conn) {
info("Unable to find origin cluster for %pJ from origin id %d",
job_ptr, origin_id);
return SLURM_ERROR;
}
return _persist_fed_job_revoke(conn, job_ptr->job_id,
job_ptr->job_state, return_code,
start_time);
}
/*
* Revoke all sibling jobs.
*
* IN job_ptr - job to revoke sibling jobs from.
* RET SLURM_SUCCESS on success, SLURM_ERROR otherwise.
*/
extern int fed_mgr_job_revoke_sibs(job_record_t *job_ptr)
{
uint32_t origin_id;
time_t now = time(NULL);
xassert(verify_lock(JOB_LOCK, READ_LOCK));
xassert(verify_lock(FED_LOCK, READ_LOCK));
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
if (origin_id != fed_mgr_cluster_rec->fed.id)
return SLURM_SUCCESS;
log_flag(FEDR, "revoke fed %pJ's siblings", job_ptr);
_revoke_sibling_jobs(job_ptr->job_id, fed_mgr_cluster_rec->fed.id,
job_ptr->fed_details->siblings_active, now);
return SLURM_SUCCESS;
}
/*
* Revokes the federated job.
*
* IN job_ptr - job_ptr of job to revoke.
* IN job_complete - whether the job is done or not. If completed then sets the
* state to JOB_REVOKED | completed_state. JOB_REVOKED otherwise.
* IN completed_state - state of completed job. Only use if job_complete==true.
* If job_complete==false, then this is unused.
* IN exit_code - exit_code of job.
* IN start_time - start time of the job that actually ran.
* RET returns SLURM_SUCCESS if fed job was completed, SLURM_ERROR otherwise
*/
extern int fed_mgr_job_revoke(job_record_t *job_ptr, bool job_complete,
uint32_t completed_state, uint32_t exit_code,
time_t start_time)
{
uint32_t origin_id;
uint32_t state = JOB_REVOKED;
if (IS_JOB_COMPLETED(job_ptr)) /* job already completed */
return SLURM_SUCCESS;
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
log_flag(FEDR, "revoking fed %pJ (%s)",
job_ptr, job_complete ? "REVOKED|CANCELLED" : "REVOKED");
/* Check if the job exited with one of the configured requeue values. */
job_ptr->exit_code = exit_code;
if (job_hold_requeue(job_ptr)) {
batch_requeue_fini(job_ptr);
return SLURM_SUCCESS;
}
/*
* Only set to a "completed" state (i.e., state > JOB_SUSPENDED)
* if job_complete is true.
*/
if (job_complete) {
if (completed_state > JOB_SUSPENDED)
state |= completed_state;
else
state |= JOB_CANCELLED;
}
job_state_set(job_ptr, state);
job_ptr->start_time = start_time;
job_ptr->end_time = start_time;
job_ptr->state_reason = WAIT_NO_REASON;
xfree(job_ptr->state_desc);
/*
* Since the job is purged/revoked quickly on the non-origin side it's
* possible that the job_start message has not been sent yet. Send it
* now so that the db record gets the uid set -- which the complete
* message doesn't send.
*/
if (!job_ptr->db_index && (origin_id != fed_mgr_cluster_rec->fed.id)) {
if (IS_JOB_FINISHED(job_ptr))
jobacct_storage_g_job_start(acct_db_conn, job_ptr);
else
info("%s: %pJ isn't finished and isn't an origin job (%u != %u) and doesn't have a db_index yet. We aren't sending a start message to the database.",
__func__, job_ptr, origin_id,
fed_mgr_cluster_rec->fed.id);
}
job_completion_logger(job_ptr, false);
/* Don't remove the origin job */
if (origin_id == fed_mgr_cluster_rec->fed.id)
return SLURM_SUCCESS;
/* Purge the revoked job -- remote only */
unlink_job_record(job_ptr);
return SLURM_SUCCESS;
}
/* Convert cluster ids to cluster names.
*
* RET: return string of comma-separated cluster names.
* Must free returned string.
*/
extern char *fed_mgr_cluster_ids_to_names(uint64_t cluster_ids)
{
int bit = 1;
char *names = NULL;
if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
return names;
while (cluster_ids) {
if (cluster_ids & 1) {
slurmdb_cluster_rec_t *sibling;
if ((sibling = fed_mgr_get_cluster_by_id(bit))) {
xstrfmtcat(names, "%s%s",
(names) ? "," : "", sibling->name);
} else {
error("Couldn't find a sibling cluster with id %d",
bit);
}
}
cluster_ids >>= 1;
bit++;
}
return names;
}
/*
* Tests whether a federated job can be requeued.
*
* If called from the remote cluster (non-origin) then it will send a requeue
* request to the origin to have the origin cancel this job. In this case, it
* will return success and set the JOB_REQUEUE_FED flag and wait to be killed.
*
* If it is the origin job, it will also cancel a running remote job. New
* federated sibling jobs will be submitted after the job has completed (e.g.
* after epilog) in fed_mgr_job_requeue().
*
* IN job_ptr - job to requeue.
* IN flags - flags for the requeue (e.g. JOB_RECONFIG_FAIL).
* RET returns SLURM_SUCCESS if siblings submitted successfully, SLURM_ERROR
* otherwise.
*/
extern int fed_mgr_job_requeue_test(job_record_t *job_ptr, uint32_t flags)
{
uint32_t origin_id;
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
if (origin_id != fed_mgr_cluster_rec->fed.id) {
slurmdb_cluster_rec_t *origin_cluster;
if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
error("Unable to find origin cluster for %pJ from origin id %d",
job_ptr, origin_id);
return SLURM_ERROR;
}
log_flag(FEDR, "requeuing fed job %pJ on origin cluster %d",
job_ptr, origin_id);
_persist_fed_job_requeue(origin_cluster, job_ptr->job_id,
flags);
job_state_set_flag(job_ptr, JOB_REQUEUE_FED);
return SLURM_SUCCESS;
}
log_flag(FEDR, "requeuing fed %pJ by cluster_id %d",
job_ptr, fed_mgr_cluster_rec->fed.id);
/* If the job is currently running locally, then cancel the running job
* and set a flag that it's being requeued. Then when the epilog
* complete comes in submit the siblings to the other clusters.
* Have to check this after checking for origin else it won't get to the
* origin. */
if (IS_JOB_RUNNING(job_ptr))
return SLURM_SUCCESS;
/* If a sibling job is running remotely, then cancel the remote job and
* wait till job finishes (e.g. after long epilog) and then resubmit the
* siblings in fed_mgr_job_requeue(). */
if (IS_JOB_PENDING(job_ptr) && IS_JOB_REVOKED(job_ptr)) {
slurmdb_cluster_rec_t *remote_cluster;
if (!(remote_cluster =
fed_mgr_get_cluster_by_id(
job_ptr->fed_details->cluster_lock))) {
error("Unable to find remote cluster for %pJ from cluster lock %d",
job_ptr, job_ptr->fed_details->cluster_lock);
return SLURM_ERROR;
}
if (_persist_fed_job_cancel(remote_cluster, job_ptr->job_id,
SIGKILL, KILL_FED_REQUEUE, 0)) {
error("failed to kill/requeue fed %pJ",
job_ptr);
}
}
return SLURM_SUCCESS;
}
/*
* Submits requeued sibling jobs.
*
* IN job_ptr - job to requeue.
* RET returns SLURM_SUCCESS if siblings submitted successfully, SLURM_ERROR
* otherwise.
*/
extern int fed_mgr_job_requeue(job_record_t *job_ptr)
{
int rc = SLURM_SUCCESS;
uint32_t origin_id;
uint64_t feature_sibs = 0;
fed_job_info_t *job_info;
xassert(job_ptr);
xassert(job_ptr->details);
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
log_flag(FEDR, "requeuing fed job %pJ", job_ptr);
/* clear where actual siblings were */
job_ptr->fed_details->siblings_active = 0;
slurm_mutex_lock(&fed_job_list_mutex);
if (!(job_info = _find_fed_job_info(job_ptr->job_id))) {
error("%s: failed to find fed job info for fed %pJ",
__func__, job_ptr);
}
/* don't submit siblings for jobs that are held */
if (job_ptr->priority == 0) {
job_state_unset_flag(job_ptr, JOB_REQUEUE_FED);
update_job_fed_details(job_ptr);
/* clear cluster lock */
job_ptr->fed_details->cluster_lock = 0;
if (job_info)
job_info->cluster_lock = 0;
slurm_mutex_unlock(&fed_job_list_mutex);
return SLURM_SUCCESS;
}
/* Don't worry about testing which clusters can start the job the
* soonest since they can't start the job for 120 seconds anyways. */
/* Get new viable siblings since the job might just have one viable
* sibling listed if the sibling was the cluster that could start the
* job the soonest. */
_validate_cluster_features(job_ptr->details->cluster_features,
&feature_sibs);
job_ptr->fed_details->siblings_viable =
_get_viable_sibs(job_ptr->clusters, feature_sibs,
job_ptr->array_recs ? true : false, NULL);
_prepare_submit_siblings(job_ptr,
job_ptr->fed_details->siblings_viable);
job_state_unset_flag(job_ptr, JOB_REQUEUE_FED);
if (!(job_ptr->fed_details->siblings_viable &
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
job_state_set_flag(job_ptr, JOB_REVOKED);
else
job_state_unset_flag(job_ptr, JOB_REVOKED);
/* clear cluster lock */
job_ptr->fed_details->cluster_lock = 0;
if (job_info) {
job_info->cluster_lock = 0;
job_info->siblings_viable =
job_ptr->fed_details->siblings_viable;
job_info->siblings_active =
job_ptr->fed_details->siblings_active;
}
slurm_mutex_unlock(&fed_job_list_mutex);
return rc;
}
/* Cancel sibling jobs on remote clusters; the local cluster is skipped. */
static int _cancel_sibling_jobs(job_record_t *job_ptr, uint16_t signal,
uint16_t flags, uid_t uid, bool kill_viable)
{
int id = 1;
uint64_t tmp_sibs;
persist_conn_t *sib_conn;
if (kill_viable) {
tmp_sibs = job_ptr->fed_details->siblings_viable;
flags |= KILL_NO_SIBS;
} else {
tmp_sibs = job_ptr->fed_details->siblings_active;
flags &= ~KILL_NO_SIBS;
}
while (tmp_sibs) {
if ((tmp_sibs & 1) &&
(id != fed_mgr_cluster_rec->fed.id)) {
slurmdb_cluster_rec_t *cluster =
fed_mgr_get_cluster_by_id(id);
if (!cluster) {
error("couldn't find cluster rec by id %d", id);
goto next_job;
}
/* Don't send request to siblings that are down when
* killing viables */
sib_conn = (persist_conn_t *) cluster->fed.send;
if (kill_viable && (!sib_conn || !sib_conn->tls_conn))
goto next_job;
_persist_fed_job_cancel(cluster, job_ptr->job_id,
signal, flags, uid);
}
next_job:
tmp_sibs >>= 1;
id++;
}
return SLURM_SUCCESS;
}
/* Cancel sibling jobs of a federated job
*
* IN job_ptr - job to cancel
* IN signal - signal to send to job
* IN flags - KILL_.* flags
* IN uid - uid making request
* IN kill_viable - if true cancel viable_sibs, if false cancel active_sibs
*/
extern int fed_mgr_job_cancel(job_record_t *job_ptr, uint16_t signal,
uint16_t flags, uid_t uid, bool kill_viable)
{
uint32_t origin_id;
xassert(job_ptr);
if (!_is_fed_job(job_ptr, &origin_id))
return SLURM_SUCCESS;
log_flag(FEDR, "cancel fed %pJ by local cluster", job_ptr);
_cancel_sibling_jobs(job_ptr, signal, flags, uid, kill_viable);
return SLURM_SUCCESS;
}
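/*
 * Return true if this is the job's origin cluster and the job has been
 * started by a remote sibling (i.e. it is revoked here and the cluster lock
 * is held by another cluster).
 */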
extern bool fed_mgr_job_started_on_sib(job_record_t *job_ptr)
{
uint32_t origin_id;
xassert(job_ptr);
/*
* When a sibling starts the job, the job becomes revoked on the origin
* and the job's cluster_lock is set to that sibling's id.
	 * Don't use fed_mgr_is_origin_job() because that returns true if
* _is_fed_job() returns false (the job isn't federated), and that's
* the opposite of what we want here.
*/
return _is_fed_job(job_ptr, &origin_id) &&
(fed_mgr_cluster_rec->fed.id == origin_id) &&
IS_JOB_REVOKED(job_ptr) && job_ptr->fed_details->cluster_lock &&
(job_ptr->fed_details->cluster_lock !=
fed_mgr_cluster_rec->fed.id);
}
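/*
 * Return true if job_id's encoded origin cluster id belongs to a cluster in
 * the federation.
 */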
extern bool fed_mgr_is_job_id_in_fed(uint32_t job_id)
{
uint32_t cluster_id;
if (!fed_mgr_cluster_rec)
return false;
cluster_id = fed_mgr_get_cluster_id(job_id);
if (!cluster_id)
return false;
return FED_SIBLING_BIT(cluster_id) & _get_all_sibling_bits();
}
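/*
 * Return true if this cluster is the origin of job_ptr, or if the job is
 * not federated.
 */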
extern int fed_mgr_is_origin_job(job_record_t *job_ptr)
{
uint32_t origin_id;
xassert(job_ptr);
if (!_is_fed_job(job_ptr, &origin_id))
return true;
if (fed_mgr_cluster_rec->fed.id != origin_id)
return false;
return true;
}
/*
* Use this instead of fed_mgr_is_origin_job if job_ptr is not available.
*/
extern bool fed_mgr_is_origin_job_id(uint32_t job_id)
{
uint32_t origin_id = fed_mgr_get_cluster_id(job_id);
if (!fed_mgr_cluster_rec || !origin_id) {
debug2("%s: job %u is not a federated job", __func__, job_id);
return true;
}
if (fed_mgr_cluster_rec->fed.id == origin_id)
return true;
return false;
}
/*
* Check if all siblings have fulfilled the singleton dependency.
* Return true if all clusters have checked in that they've fulfilled this
* singleton dependency.
*
* IN job_ptr - job with dependency to check
* IN dep_ptr - dependency to check. If it's not singleton, just return true.
* IN set_cluster_bit - if true, set the bit for this cluster indicating
* that this cluster has fulfilled the dependency.
*/
extern bool fed_mgr_is_singleton_satisfied(job_record_t *job_ptr,
depend_spec_t *dep_ptr,
bool set_cluster_bit)
{
uint32_t origin_id;
uint64_t sib_bits;
xassert(job_ptr);
xassert(dep_ptr);
if (!_is_fed_job(job_ptr, &origin_id) || disable_remote_singleton)
return true;
if (dep_ptr->depend_type != SLURM_DEPEND_SINGLETON) {
error("%s: Got non-singleton dependency (type %u) for %pJ. This should never happen.",
__func__, dep_ptr->depend_type, job_ptr);
return true;
}
/* Set the bit for this cluster indicating that it has been satisfied */
if (set_cluster_bit)
dep_ptr->singleton_bits |=
FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
if (fed_mgr_cluster_rec->fed.id != origin_id) {
return true;
}
/*
* Only test for current siblings; if a sibling was removed but
* previously had passed a singleton dependency, that bit may be
* set in dep_ptr->singleton_bits.
*/
sib_bits = _get_all_sibling_bits();
return (dep_ptr->singleton_bits & sib_bits) == sib_bits;
}
/*
* Update a job's required clusters.
*
* Results in siblings being removed and added.
*
* IN job_ptr - job to update.
* IN spec_clusters - comma-separated list of cluster names.
* RET return SLURM_SUCCESS on success, error code otherwise.
*/
extern int fed_mgr_update_job_clusters(job_record_t *job_ptr,
char *spec_clusters)
{
int rc = SLURM_SUCCESS;
uint32_t origin_id;
xassert(job_ptr);
xassert(spec_clusters);
if (!_is_fed_job(job_ptr, &origin_id)) {
sched_info("update_job: not a fed job");
rc = SLURM_ERROR;
} else if ((!IS_JOB_PENDING(job_ptr)) ||
job_ptr->fed_details->cluster_lock) {
rc = ESLURM_JOB_NOT_PENDING;
} else if (!fed_mgr_fed_rec) {
sched_info("update_job: setting Clusters on a non-active federated cluster for %pJ",
job_ptr);
rc = ESLURM_JOB_NOT_FEDERATED;
} else if (_validate_cluster_names(spec_clusters, NULL)) {
sched_info("update_job: invalid Clusters for %pJ: %s",
job_ptr, spec_clusters);
rc = ESLURM_INVALID_CLUSTER_NAME;
} else {
xfree(job_ptr->clusters);
if (spec_clusters[0] == '\0')
sched_info("update_job: cleared Clusters for %pJ",
job_ptr);
else if (*spec_clusters)
job_ptr->clusters =
xstrdup(spec_clusters);
if (fed_mgr_is_origin_job(job_ptr))
_add_remove_sibling_jobs(job_ptr);
}
return rc;
}
/*
* Update a job's cluster features.
*
* Results in siblings being removed and added.
*
* IN job_ptr - job to update cluster features.
* IN req_features - comma-separated list of feature names.
* RET return SLURM_SUCCESS on success, error code otherwise.
*/
extern int fed_mgr_update_job_cluster_features(job_record_t *job_ptr,
char *req_features)
{
int rc = SLURM_SUCCESS;
uint32_t origin_id;
xassert(job_ptr);
xassert(req_features);
if (!_is_fed_job(job_ptr, &origin_id)) {
sched_info("update_job: not a fed job");
rc = SLURM_ERROR;
} else if ((!IS_JOB_PENDING(job_ptr)) ||
job_ptr->fed_details->cluster_lock) {
rc = ESLURM_JOB_NOT_PENDING;
} else if (!fed_mgr_fed_rec) {
sched_info("update_job: setting ClusterFeatures on a non-active federated cluster for %pJ",
job_ptr);
rc = ESLURM_JOB_NOT_FEDERATED;
} else if (_validate_cluster_features(req_features, NULL)) {
sched_info("update_job: invalid ClusterFeatures for %pJ",
job_ptr);
rc = ESLURM_INVALID_CLUSTER_FEATURE;
} else {
xfree(job_ptr->details->cluster_features);
if (req_features[0] == '\0')
sched_info("update_job: cleared ClusterFeatures for %pJ",
job_ptr);
else if (*req_features)
job_ptr->details->cluster_features =
xstrdup(req_features);
if (fed_mgr_is_origin_job(job_ptr))
_add_remove_sibling_jobs(job_ptr);
}
return rc;
}
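/*
 * Reconcile a single local job against the job info reported by a sibling
 * (e.g. after the sibling reconnects). Completes, revokes, or updates the
 * job's active sibling list as needed and refreshes its fed_job_list entry.
 */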
static int _reconcile_fed_job(job_record_t *job_ptr, reconcile_sib_t *rec_sib)
{
int i;
bool found_job = false;
job_info_msg_t *remote_jobs_ptr = rec_sib->job_info_msg;
uint32_t origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
uint32_t sibling_id = rec_sib->sibling_id;
uint64_t sibling_bit = FED_SIBLING_BIT(sibling_id);
char *sibling_name = rec_sib->sibling_name;
slurm_job_info_t *remote_job = NULL;
fed_job_info_t *job_info;
xassert(job_ptr);
xassert(remote_jobs_ptr);
/*
* Only look at jobs that:
* 1. originate from the remote sibling
* 2. originate from this cluster
	 * 3. have the sibling in their viable sibling list.
*/
if (!job_ptr->fed_details ||
!job_ptr->details ||
(job_ptr->details->submit_time >= rec_sib->sync_time) ||
IS_JOB_COMPLETED(job_ptr) || IS_JOB_COMPLETING(job_ptr) ||
((fed_mgr_get_cluster_id(job_ptr->job_id) != sibling_id) &&
(!fed_mgr_is_origin_job(job_ptr)) &&
(!(job_ptr->fed_details->siblings_viable & sibling_bit)))) {
return SLURM_SUCCESS;
}
for (i = 0; i < remote_jobs_ptr->record_count; i++) {
remote_job = &remote_jobs_ptr->job_array[i];
if (job_ptr->job_id == remote_job->job_id) {
found_job = true;
break;
}
}
/* Jobs that originated on the remote sibling */
if (origin_id == sibling_id) {
if (!found_job ||
(remote_job && IS_JOB_COMPLETED(remote_job))) {
/* origin job is missing on remote sibling or is
* completed. Could have been removed from a clean
* start. */
info("%s: origin %pJ is missing (or completed) from origin %s. Killing this copy of the job",
__func__, job_ptr, sibling_name);
job_ptr->bit_flags |= SIB_JOB_FLUSH;
job_signal(job_ptr, SIGKILL, KILL_NO_SIBS, 0, false);
} else {
info("%s: origin %s still has %pJ",
__func__, sibling_name, job_ptr);
}
	/* Jobs that are shared between the two siblings -- not originating from
* either one */
} else if (origin_id != fed_mgr_cluster_rec->fed.id) {
if (!found_job) {
/* Only care about jobs that are currently there. */
} else if (IS_JOB_PENDING(job_ptr) && IS_JOB_CANCELLED(remote_job)) {
info("%s: %pJ is cancelled on sibling %s, must have been cancelled while the origin and sibling were down",
__func__, job_ptr, sibling_name);
job_state_set(job_ptr, JOB_CANCELLED);
job_ptr->start_time = remote_job->start_time;
job_ptr->end_time = remote_job->end_time;
job_ptr->state_reason = WAIT_NO_REASON;
xfree(job_ptr->state_desc);
job_completion_logger(job_ptr, false);
} else if (IS_JOB_PENDING(job_ptr) &&
(IS_JOB_RUNNING(remote_job) ||
IS_JOB_COMPLETING(remote_job))) {
info("%s: %pJ is running on sibling %s, must have been started while the origin and sibling were down",
__func__, job_ptr, sibling_name);
fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
remote_job->exit_code,
job_ptr->start_time);
			/* return now because job_ptr has been free'd */
return SLURM_SUCCESS;
} else if (IS_JOB_PENDING(job_ptr) &&
(IS_JOB_COMPLETED(remote_job))) {
info("%s: %pJ is completed on sibling %s, must have been started and completed while the origin and sibling were down",
__func__, job_ptr, sibling_name);
fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
remote_job->exit_code,
job_ptr->start_time);
			/* return now because job_ptr has been free'd */
return SLURM_SUCCESS;
}
/* Origin Jobs */
} else if (!found_job) {
info("%s: didn't find %pJ on cluster %s",
__func__, job_ptr, sibling_name);
/* Remove from active siblings */
if (!(job_ptr->fed_details->siblings_active & sibling_bit)) {
/* The sibling is a viable sibling but the sibling is
* not active and there is no job there. This is ok. */
info("%s: %s is a viable but not active sibling of %pJ. This is ok.",
__func__, sibling_name, job_ptr);
#if 0
/* Don't submit new sibling jobs if they're not found on the cluster. They could
* have been removed while the cluster was down. */
} else if (!job_ptr->fed_details->cluster_lock) {
/* If the origin job isn't locked, then submit a sibling
* to this cluster. */
/* Only do this if it was an active job. Could have been
* removed with --cancel-sibling */
info("%s: %s is an active sibling of %pJ, attempting to submit new sibling job to the cluster.",
__func__, sibling_name, job_ptr);
_prepare_submit_siblings(job_ptr, sibling_bit);
#endif
} else if (job_ptr->fed_details->cluster_lock == sibling_id) {
/* The origin thinks that the sibling was running the
* job. It could have completed while this cluster was
* down or the sibling removed it by clearing out jobs
* (e.g. slurmctld -c). */
info("%s: origin %pJ was running on sibling %s, but it's not there. Assuming that the job completed",
__func__, job_ptr, sibling_name);
fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED, 0,
job_ptr->start_time);
} else {
/* The origin job has a lock but it's not on the sibling
* being reconciled. The job could have been started by
* another cluster while the sibling was down. Or the
* original sibling job submission could have failed. Or
			 * the origin started the job on a different sibling
			 * before this sibling went down and came back up
			 * (normal situation). */
info("%s: origin %pJ is currently locked by sibling %d, this is ok",
__func__, job_ptr,
job_ptr->fed_details->cluster_lock);
job_ptr->fed_details->siblings_active &= ~sibling_bit;
}
} else if (remote_job) {
info("%s: %pJ found on remote sibling %s state:%s",
__func__, job_ptr, sibling_name,
job_state_string(remote_job->job_state));
if (job_ptr->fed_details->cluster_lock == sibling_id) {
if (IS_JOB_COMPLETE(remote_job)) {
info("%s: %pJ on sibling %s is already completed, completing the origin job",
__func__, job_ptr, sibling_name);
fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
remote_job->exit_code,
job_ptr->start_time);
} else if (IS_JOB_CANCELLED(remote_job)) {
info("%s: %pJ on sibling %s is already cancelled, completing the origin job",
__func__, job_ptr, sibling_name);
fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
remote_job->exit_code,
job_ptr->start_time);
} else if (!IS_JOB_RUNNING(remote_job)) {
/* The job could be pending if it was requeued
* due to node failure */
info("%s: %pJ on sibling %s has job lock but job is not running (state:%s)",
__func__, job_ptr, sibling_name,
job_state_string(remote_job->job_state));
}
} else if (job_ptr->fed_details->cluster_lock) {
/* The remote might have had a sibling job before it
* went away and the origin started another job while it
* was away. The remote job needs to be revoked. */
info("%s: %pJ found on sibling %s but job is locked by cluster id %d",
__func__, job_ptr, sibling_name,
job_ptr->fed_details->cluster_lock);
if (IS_JOB_PENDING(remote_job)) {
info("%s: %pJ is on %s in a pending state but cluster %d has the lock on it -- revoking the remote sibling job",
__func__, job_ptr, sibling_name,
job_ptr->fed_details->cluster_lock);
_revoke_sibling_jobs(
job_ptr->job_id,
fed_mgr_cluster_rec->fed.id,
sibling_bit,
job_ptr->start_time);
} else {
/* should this job get cancelled? Would have to
* check cluster_lock before cancelling it to
* make sure that it's not there. */
info("%s: %pJ has a lock on sibling id %d, but found a non-pending job on sibling %s.",
__func__, job_ptr,
job_ptr->fed_details->cluster_lock,
sibling_name);
_revoke_sibling_jobs(
job_ptr->job_id,
fed_mgr_cluster_rec->fed.id,
sibling_bit,
job_ptr->start_time);
}
} else {
if (!(job_ptr->fed_details->siblings_active &
sibling_bit)) {
info("%s: %pJ on sibling %s but it wasn't in the active list. Adding to active list.",
__func__, job_ptr, sibling_name);
job_ptr->fed_details->siblings_active |=
sibling_bit;
}
if (IS_JOB_CANCELLED(remote_job)) {
info("%s: %pJ is cancelled on sibling %s, must have been cancelled while the origin was down",
__func__, job_ptr, sibling_name);
job_state_set(job_ptr, JOB_CANCELLED);
job_ptr->start_time = remote_job->start_time;
job_ptr->end_time = remote_job->end_time;
job_ptr->state_reason = WAIT_NO_REASON;
xfree(job_ptr->state_desc);
job_completion_logger(job_ptr, false);
} else if (IS_JOB_COMPLETED(remote_job)) {
info("%s: %pJ is completed on sibling %s but the origin cluster wasn't part of starting the job, must have been started while the origin was down",
__func__, job_ptr, sibling_name);
_do_fed_job_complete(job_ptr, JOB_CANCELLED,
remote_job->exit_code,
remote_job->start_time);
} else if (IS_JOB_RUNNING(remote_job) ||
IS_JOB_COMPLETING(remote_job)) {
info("%s: origin doesn't think that %pJ should be running on sibling %s but it is. %s could have started the job while this cluster was down.",
__func__, job_ptr, sibling_name,
sibling_name);
/* Job was started while we were down. Set this
* job to RV and cancel other siblings */
fed_job_info_t *job_info;
slurm_mutex_lock(&fed_job_list_mutex);
if ((job_info =
_find_fed_job_info(job_ptr->job_id))) {
job_info->cluster_lock = sibling_id;
job_ptr->fed_details->cluster_lock =
sibling_id;
/* Remove sibling jobs */
_fed_job_start_revoke(
job_info, job_ptr,
remote_job->start_time);
/* Set job as RV to track running job */
fed_mgr_job_revoke(
job_ptr, false,
JOB_CANCELLED, 0,
remote_job->start_time);
}
slurm_mutex_unlock(&fed_job_list_mutex);
}
/* else all good */
}
}
/* Update job_info with updated siblings */
slurm_mutex_lock(&fed_job_list_mutex);
if ((job_info = _find_fed_job_info(job_ptr->job_id))) {
job_info->siblings_viable =
job_ptr->fed_details->siblings_viable;
job_info->siblings_active =
job_ptr->fed_details->siblings_active;
} else {
error("%s: failed to find fed job info for fed %pJ",
__func__, job_ptr);
}
slurm_mutex_unlock(&fed_job_list_mutex);
return SLURM_SUCCESS;
}
/*
* Sync jobs with the given sibling name.
*
* IN sib_name - name of the sibling to sync with.
*/
static int _sync_jobs(const char *sib_name, job_info_msg_t *job_info_msg,
time_t sync_time)
{
list_itr_t *itr;
reconcile_sib_t rec_sib = {0};
slurmdb_cluster_rec_t *sib;
job_record_t *job_ptr;
if (!(sib = fed_mgr_get_cluster_by_name((char *)sib_name))) {
error("Couldn't find sibling by name '%s'", sib_name);
return SLURM_ERROR;
}
rec_sib.sibling_id = sib->fed.id;
rec_sib.sibling_name = sib->name;
rec_sib.job_info_msg = job_info_msg;
rec_sib.sync_time = sync_time;
itr = list_iterator_create(job_list);
while ((job_ptr = list_next(itr)))
_reconcile_fed_job(job_ptr, &rec_sib);
list_iterator_destroy(itr);
sib->fed.sync_recvd = true;
return SLURM_SUCCESS;
}
/*
* Remove active sibling from the given job.
*
* IN job_id - job_id of job to remove active sibling from.
 * IN sib_name - name of the sibling to remove from the job's active siblings.
* RET SLURM_SUCCESS on success, error code on error.
*/
extern int fed_mgr_remove_active_sibling(uint32_t job_id, char *sib_name)
{
uint32_t origin_id;
job_record_t *job_ptr = NULL;
slurmdb_cluster_rec_t *sibling;
if (!(job_ptr = find_job_record(job_id)))
return ESLURM_INVALID_JOB_ID;
if (!_is_fed_job(job_ptr, &origin_id))
return ESLURM_JOB_NOT_FEDERATED;
if (job_ptr->fed_details->cluster_lock)
return ESLURM_JOB_NOT_PENDING;
if (!(sibling = fed_mgr_get_cluster_by_name(sib_name)))
return ESLURM_INVALID_CLUSTER_NAME;
if (job_ptr->fed_details->siblings_active &
FED_SIBLING_BIT(sibling->fed.id)) {
time_t now = time(NULL);
if (fed_mgr_cluster_rec == sibling)
fed_mgr_job_revoke(job_ptr, false, 0, JOB_CANCELLED,
now);
else
_revoke_sibling_jobs(job_ptr->job_id,
fed_mgr_cluster_rec->fed.id,
FED_SIBLING_BIT(sibling->fed.id),
now);
job_ptr->fed_details->siblings_active &=
~(FED_SIBLING_BIT(sibling->fed.id));
update_job_fed_details(job_ptr);
}
return SLURM_SUCCESS;
}
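/*
 * Queue a sibling job submission (batch or interactive) received from the
 * origin cluster onto the fed job update list to be processed later.
 */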
static int _q_sib_job_submission(slurm_msg_t *msg, bool interactive_job)
{
fed_job_update_info_t *job_update_info = NULL;
sib_msg_t *sib_msg = msg->data;
job_desc_msg_t *job_desc = sib_msg->data;
job_desc->job_id = sib_msg->job_id;
job_desc->fed_siblings_viable = sib_msg->fed_siblings;
job_desc->alloc_node = sib_msg->submit_host;
job_desc->user_id = sib_msg->user_id;
job_desc->group_id = sib_msg->group_id;
	/*
	 * If the job has a dependency, it won't be submitted to siblings
	 * until the dependency is fulfilled, or it will be revoked from
	 * siblings if it became dependent. So the sibling should ignore
	 * job_desc->dependency since it's handled entirely by the origin.
	 */
xfree(job_desc->dependency);
if (interactive_job)
job_desc->resp_host = xstrdup(sib_msg->resp_host);
/* NULL out the data pointer because we are storing the pointer on the
* fed job update queue to be handled later. */
sib_msg->data = NULL;
	/* Set the protocol version to the client's version so that the
	 * job's start_protocol_version is the client's and not the calling
	 * controller's. */
job_update_info = xmalloc(sizeof(fed_job_update_info_t));
job_update_info->job_id = job_desc->job_id;
job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);
job_update_info->submit_desc = job_desc;
job_update_info->submit_proto_ver = sib_msg->submit_proto_ver;
if (interactive_job)
job_update_info->type = FED_JOB_SUBMIT_INT;
else
job_update_info->type = FED_JOB_SUBMIT_BATCH;
_append_job_update(job_update_info);
return SLURM_SUCCESS;
}
static int _q_sib_submit_response(slurm_msg_t *msg)
{
int rc = SLURM_SUCCESS;
sib_msg_t *sib_msg;
fed_job_update_info_t *job_update_info = NULL;
xassert(msg);
xassert(msg->conn);
sib_msg = msg->data;
/* if failure then remove from active siblings */
if (sib_msg && sib_msg->return_code) {
log_flag(FEDR, "%s: cluster %s failed to submit sibling JobId=%u. Removing from active_sibs. (error:%d)",
__func__, msg->conn->cluster_name, sib_msg->job_id,
sib_msg->return_code);
job_update_info = xmalloc(sizeof(fed_job_update_info_t));
job_update_info->job_id = sib_msg->job_id;
job_update_info->type = FED_JOB_REMOVE_ACTIVE_SIB_BIT;
job_update_info->siblings_str =
xstrdup(msg->conn->cluster_name);
_append_job_update(job_update_info);
}
return rc;
}
static int _q_sib_job_update(slurm_msg_t *msg, uint32_t uid)
{
sib_msg_t *sib_msg = msg->data;
job_desc_msg_t *job_desc = sib_msg->data;
fed_job_update_info_t *job_update_info =
xmalloc(sizeof(fed_job_update_info_t));
/* NULL out the data pointer because we are storing the pointer on the
* fed job update queue to be handled later. */
sib_msg->data = NULL;
job_update_info->type = FED_JOB_UPDATE;
job_update_info->submit_desc = job_desc;
job_update_info->job_id = sib_msg->job_id;
job_update_info->uid = uid;
job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);
_append_job_update(job_update_info);
return SLURM_SUCCESS;
}
static int _q_sib_job_cancel(slurm_msg_t *msg, uint32_t uid)
{
int rc = SLURM_SUCCESS;
uint32_t req_uid;
sib_msg_t *sib_msg = msg->data;
job_step_kill_msg_t *kill_msg = sib_msg->data;
fed_job_update_info_t *job_update_info =
xmalloc(sizeof(fed_job_update_info_t));
/* NULL out the data pointer because we are storing the pointer on the
* fed job update queue to be handled later. */
sib_msg->data = NULL;
if (sib_msg->req_uid)
req_uid = sib_msg->req_uid;
else
req_uid = uid;
job_update_info->type = FED_JOB_CANCEL;
job_update_info->job_id = kill_msg->step_id.job_id;
job_update_info->kill_msg = kill_msg;
job_update_info->uid = req_uid;
_append_job_update(job_update_info);
return rc;
}
static int _q_sib_job_complete(slurm_msg_t *msg)
{
int rc = SLURM_SUCCESS;
sib_msg_t *sib_msg = msg->data;
fed_job_update_info_t *job_update_info =
xmalloc(sizeof(fed_job_update_info_t));
job_update_info->type = FED_JOB_COMPLETE;
job_update_info->job_id = sib_msg->job_id;
job_update_info->job_state = sib_msg->job_state;
job_update_info->start_time = sib_msg->start_time;
job_update_info->return_code = sib_msg->return_code;
_append_job_update(job_update_info);
return rc;
}
static int _q_sib_job_update_response(slurm_msg_t *msg)
{
int rc = SLURM_SUCCESS;
sib_msg_t *sib_msg = msg->data;
fed_job_update_info_t *job_update_info =
xmalloc(sizeof(fed_job_update_info_t));
job_update_info->type = FED_JOB_UPDATE_RESPONSE;
job_update_info->job_id = sib_msg->job_id;
job_update_info->return_code = sib_msg->return_code;
job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);
_append_job_update(job_update_info);
return rc;
}
static int _q_sib_job_requeue(slurm_msg_t *msg, uint32_t uid)
{
int rc = SLURM_SUCCESS;
sib_msg_t *sib_msg = msg->data;
requeue_msg_t *req_ptr = sib_msg->data;
fed_job_update_info_t *job_update_info =
xmalloc(sizeof(fed_job_update_info_t));
job_update_info->type = FED_JOB_REQUEUE;
job_update_info->job_id = req_ptr->job_id;
job_update_info->flags = req_ptr->flags;
job_update_info->uid = uid;
_append_job_update(job_update_info);
return rc;
}
static int _q_send_job_sync(char *sib_name)
{
int rc = SLURM_SUCCESS;
fed_job_update_info_t *job_update_info =
xmalloc(sizeof(fed_job_update_info_t));
job_update_info->type = FED_SEND_JOB_SYNC;
job_update_info->submit_cluster = xstrdup(sib_name);
_append_job_update(job_update_info);
return rc;
}
static int _q_sib_job_sync(slurm_msg_t *msg)
{
int rc = SLURM_SUCCESS;
sib_msg_t *sib_msg = msg->data;
job_info_msg_t *job_info_msg = sib_msg->data;
fed_job_update_info_t *job_update_info =
xmalloc(sizeof(fed_job_update_info_t));
/* NULL out the data pointer because we are storing the pointer on the
* fed job update queue to be handled later. */
sib_msg->data = NULL;
job_update_info->type = FED_JOB_SYNC;
job_update_info->job_info_msg = job_info_msg;
job_update_info->start_time = sib_msg->start_time;
job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);
_append_job_update(job_update_info);
return rc;
}
extern int fed_mgr_q_update_origin_dep_msg(slurm_msg_t *msg)
{
dep_update_origin_msg_t *update_deps;
dep_update_origin_msg_t *update_msg = msg->data;
log_flag(FEDR, "%s: Got %s: Job %u",
__func__, rpc_num2string(msg->msg_type), update_msg->job_id);
/* update_msg will get free'd, so copy it */
update_deps = xmalloc(sizeof *update_deps);
update_deps->depend_list = update_msg->depend_list;
update_deps->job_id = update_msg->job_id;
/*
* NULL update_msg->depend_list so it doesn't get free'd; we're
* using it later.
*/
update_msg->depend_list = NULL;
list_append(origin_dep_update_list, update_deps);
slurm_mutex_lock(&origin_dep_update_mutex);
slurm_cond_broadcast(&origin_dep_cond);
slurm_mutex_unlock(&origin_dep_update_mutex);
return SLURM_SUCCESS;
}
extern int fed_mgr_q_dep_msg(slurm_msg_t *msg)
{
dep_msg_t *remote_dependency;
dep_msg_t *dep_msg = msg->data;
log_flag(FEDR, "%s: Got %s: Job %u",
__func__, rpc_num2string(msg->msg_type), dep_msg->job_id);
/* dep_msg will get free'd, so copy it */
remote_dependency = xmalloc(sizeof *remote_dependency);
remote_dependency->job_id = dep_msg->job_id;
remote_dependency->job_name = dep_msg->job_name;
remote_dependency->dependency = dep_msg->dependency;
/* NULL strings so they don't get free'd */
dep_msg->job_name = NULL;
dep_msg->dependency = NULL;
remote_dependency->array_task_id = dep_msg->array_task_id;
remote_dependency->array_job_id = dep_msg->array_job_id;
remote_dependency->is_array = dep_msg->is_array;
remote_dependency->user_id = dep_msg->user_id;
list_append(remote_dep_recv_list, remote_dependency);
slurm_mutex_lock(&remote_dep_recv_mutex);
slurm_cond_broadcast(&remote_dep_cond);
slurm_mutex_unlock(&remote_dep_recv_mutex);
return SLURM_SUCCESS;
}
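/*
 * Dispatch an incoming sibling message to the proper queueing handler based
 * on its sib_msg_type.
 */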
extern int fed_mgr_q_sib_msg(slurm_msg_t *msg, uint32_t rpc_uid)
{
sib_msg_t *sib_msg = msg->data;
log_flag(FEDR, "%s: sib_msg_type:%s",
__func__, _job_update_type_str(sib_msg->sib_msg_type));
switch (sib_msg->sib_msg_type) {
case FED_JOB_CANCEL:
_q_sib_job_cancel(msg, rpc_uid);
break;
case FED_JOB_COMPLETE:
_q_sib_job_complete(msg);
break;
case FED_JOB_REQUEUE:
_q_sib_job_requeue(msg, rpc_uid);
break;
case FED_JOB_START:
_q_sib_job_start(msg);
break;
case FED_JOB_SUBMIT_BATCH:
_q_sib_job_submission(msg, false);
break;
case FED_JOB_SUBMIT_INT:
_q_sib_job_submission(msg, true);
break;
case FED_JOB_SUBMIT_RESP:
_q_sib_submit_response(msg);
break;
case FED_JOB_SYNC:
_q_sib_job_sync(msg);
break;
case FED_JOB_UPDATE:
_q_sib_job_update(msg, rpc_uid);
break;
case FED_JOB_UPDATE_RESPONSE:
_q_sib_job_update_response(msg);
break;
default:
error("%s: invalid sib_msg_type: %d",
__func__, sib_msg->sib_msg_type);
break;
}
return SLURM_SUCCESS;
}
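/*
 * list_find_first() callback: return 1 if the sibling is up (has a send
 * connection) but has not completed a job sync yet, 0 otherwise.
 */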
static int _list_find_not_synced_sib(void *x, void *key)
{
slurmdb_cluster_rec_t *sib = x;
if (sib != fed_mgr_cluster_rec &&
sib->fed.send &&
((persist_conn_t *) sib->fed.send)->tls_conn &&
!sib->fed.sync_recvd)
return 1;
return 0;
}
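/*
 * Return true if every connected sibling has completed a job sync, or if
 * this cluster is not part of a federation.
 */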
extern bool fed_mgr_sibs_synced(void)
{
slurmdb_cluster_rec_t *sib;
int dummy = 1;
if (!fed_mgr_fed_rec)
return true;
if ((sib = list_find_first(fed_mgr_fed_rec->cluster_list,
_list_find_not_synced_sib, &dummy))) {
debug("%s: sibling %s up but not synced yet",
__func__, sib->name);
return false;
}
return true;
}
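/*
 * Test the dependencies of remote (non-origin) jobs tracked on this cluster
 * and report changes back to each job's origin cluster. Jobs whose local
 * dependencies are finished (fulfilled or failed) or whose origin cluster is
 * gone are removed from remote_dep_job_list.
 */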
extern void fed_mgr_test_remote_dependencies(void)
{
int rc;
uint32_t origin_id;
bool was_changed;
job_record_t *job_ptr;
list_itr_t *itr;
slurmdb_cluster_rec_t *origin;
xassert(verify_lock(JOB_LOCK, READ_LOCK));
xassert(verify_lock(FED_LOCK, READ_LOCK));
if (!list_count(remote_dep_job_list) || !fed_mgr_fed_rec ||
!fed_mgr_cluster_rec)
return;
slurm_mutex_lock(&dep_job_list_mutex);
itr = list_iterator_create(remote_dep_job_list);
while ((job_ptr = list_next(itr))) {
origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
origin = fed_mgr_get_cluster_by_id(origin_id);
if (!origin) {
/*
* The origin probably left the federation. If it comes
* back there's no guarantee it will have the same
* cluster id as before.
*/
log_flag(FEDR, "%s: Couldn't find the origin cluster (id %u); it probably left the federation. Stop testing dependency for %pJ.",
__func__, origin_id, job_ptr);
list_delete_item(itr);
continue;
}
rc = test_job_dependency(job_ptr, &was_changed);
if (rc == LOCAL_DEPEND) {
if (was_changed) {
log_flag(FEDR, "%s: %pJ has at least 1 local dependency left.",
__func__, job_ptr);
_update_origin_job_dep(job_ptr, origin);
}
} else if (rc == FAIL_DEPEND) {
log_flag(FEDR, "%s: %pJ test_job_dependency() failed, dependency never satisfied.",
__func__, job_ptr);
_update_origin_job_dep(job_ptr, origin);
list_delete_item(itr);
} else { /* ((rc == REMOTE_DEPEND) || (rc == NO_DEPEND)) */
log_flag(FEDR, "%s: %pJ has no more dependencies left on this cluster.",
__func__, job_ptr);
_update_origin_job_dep(job_ptr, origin);
list_delete_item(itr);
}
}
list_iterator_destroy(itr);
slurm_mutex_unlock(&dep_job_list_mutex);
}