| /*****************************************************************************\ |
| * fed_mgr.c - functions for federations |
| ***************************************************************************** |
| * Copyright (C) SchedMD LLC. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include "config.h" |
| |
| #include <pthread.h> |
| #include <signal.h> |
| |
| #if HAVE_SYS_PRCTL_H |
| # include <sys/prctl.h> |
| #endif |
| |
| #include "src/common/list.h" |
| #include "src/common/macros.h" |
| #include "src/common/parse_time.h" |
| #include "src/common/slurm_protocol_api.h" |
| #include "src/common/slurm_protocol_pack.h" |
| #include "src/common/slurmdbd_defs.h" |
| #include "src/common/state_save.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| #include "src/slurmctld/fed_mgr.h" |
| #include "src/slurmctld/job_scheduler.h" |
| #include "src/slurmctld/locks.h" |
| #include "src/slurmctld/proc_req.h" |
| #include "src/slurmctld/slurmctld.h" |
| #include "src/slurmctld/state_save.h" |
| #include "src/slurmdbd/read_config.h" |
| #include "src/stepmgr/srun_comm.h" |
| |
| #include "src/interfaces/conn.h" |
| |
| #define FED_MGR_STATE_FILE "fed_mgr_state" |
| #define FED_MGR_CLUSTER_ID_BEGIN 26 |
| #define TEST_REMOTE_DEP_FREQ 30 /* seconds */ |
| |
| #define FED_SIBLING_BIT(x) ((uint64_t)1 << (x - 1)) |
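| |
| /* |
| * Illustrative examples (fed cluster ids are 1-based): |
| * FED_SIBLING_BIT(1) == 0x1 and FED_SIBLING_BIT(3) == 0x4, so a job |
| * whose viable siblings are clusters 1 and 3 carries a |
| * siblings_viable bitmap of 0x5. |
| */ |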
| |
| slurmdb_federation_rec_t *fed_mgr_fed_rec = NULL; |
| slurmdb_cluster_rec_t *fed_mgr_cluster_rec = NULL; |
| |
| static pthread_cond_t agent_cond = PTHREAD_COND_INITIALIZER; |
| static pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_t agent_thread_id = (pthread_t) 0; |
| static int agent_queue_size = 0; |
| |
| static pthread_cond_t job_watch_cond = PTHREAD_COND_INITIALIZER; |
| static pthread_mutex_t job_watch_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static bool job_watch_thread_running = false; |
| static bool stop_job_watch_thread = false; |
| |
| static bool inited = false; |
| static pthread_mutex_t open_send_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_mutex_t update_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| static list_t *fed_job_list = NULL; |
| static list_t *fed_job_update_list = NULL; |
| static pthread_t fed_job_update_thread_id = (pthread_t) 0; |
| static pthread_mutex_t fed_job_list_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_cond_t job_update_cond = PTHREAD_COND_INITIALIZER; |
| static pthread_mutex_t job_update_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| static list_t *remote_dep_recv_list = NULL; |
| static pthread_t remote_dep_thread_id = (pthread_t) 0; |
| static pthread_cond_t remote_dep_cond = PTHREAD_COND_INITIALIZER; |
| static pthread_mutex_t remote_dep_recv_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| static list_t *remote_dep_job_list = NULL; |
| static pthread_t dep_job_thread_id = (pthread_t) 0; |
| static pthread_mutex_t dep_job_list_mutex = PTHREAD_MUTEX_INITIALIZER; |
| static pthread_cond_t test_dep_cond = PTHREAD_COND_INITIALIZER; |
| static pthread_mutex_t test_dep_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| static list_t *origin_dep_update_list = NULL; |
| static pthread_t origin_dep_thread_id = (pthread_t) 0; |
| static pthread_cond_t origin_dep_cond = PTHREAD_COND_INITIALIZER; |
| static pthread_mutex_t origin_dep_update_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| typedef struct { |
| buf_t *buffer; |
| uint32_t job_id; |
| time_t last_try; |
| int last_defer; |
| uint16_t msg_type; |
| } agent_queue_t; |
| |
| enum fed_job_update_type { |
| FED_JOB_NONE = 0, |
| FED_JOB_CANCEL, |
| FED_JOB_COMPLETE, |
| FED_JOB_REMOVE_ACTIVE_SIB_BIT, |
| FED_JOB_REQUEUE, |
| FED_JOB_START, |
| FED_JOB_SUBMIT_BATCH, |
| FED_JOB_SUBMIT_INT, |
| FED_JOB_SUBMIT_RESP, |
| FED_JOB_SYNC, |
| FED_JOB_UPDATE, |
| FED_JOB_UPDATE_RESPONSE, |
| FED_SEND_JOB_SYNC, |
| }; |
| |
| typedef struct { |
| uint32_t cluster_lock; |
| uint32_t flags; |
| uint32_t job_id; |
| job_info_msg_t *job_info_msg; |
| uint32_t job_state; |
| job_step_kill_msg_t *kill_msg; |
| bool requeue; |
| uint32_t return_code; |
| uint64_t siblings_active; |
| uint64_t siblings_viable; |
| char *siblings_str; |
| time_t start_time; |
| char *submit_cluster; |
| job_desc_msg_t *submit_desc; |
| uint16_t submit_proto_ver; |
| uint32_t type; |
| uid_t uid; |
| } fed_job_update_info_t; |
| |
| typedef struct { |
| uint32_t cluster_lock; |
| uint32_t job_id; |
| uint64_t siblings_active; |
| uint64_t siblings_viable; |
| uint32_t updating_sibs[MAX_FED_CLUSTERS + 1]; |
| time_t updating_time[MAX_FED_CLUSTERS + 1]; |
| } fed_job_info_t; |
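| |
| /* |
| * Note (inferred from the declarations above): updating_sibs and |
| * updating_time are indexed by 1-based fed cluster id, hence the |
| * MAX_FED_CLUSTERS + 1 slots. |
| */ |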
| |
| /* Local Structs */ |
| typedef struct { |
| job_info_msg_t *job_info_msg; |
| uint32_t sibling_id; |
| char *sibling_name; |
| time_t sync_time; |
| } reconcile_sib_t; |
| |
| /* Local Prototypes */ |
| static int _is_fed_job(job_record_t *job_ptr, uint32_t *origin_id); |
| static uint64_t _get_all_sibling_bits(void); |
| static int _validate_cluster_features(char *spec_features, |
| uint64_t *cluster_bitmap); |
| static int _validate_cluster_names(char *clusters, uint64_t *cluster_bitmap); |
| static void _leave_federation(void); |
| static int _q_send_job_sync(char *sib_name); |
| static slurmdb_federation_rec_t *_state_load(char *state_save_location); |
| static int _sync_jobs(const char *sib_name, job_info_msg_t *job_info_msg, |
| time_t sync_time); |
| static int _q_sib_job_cancel(slurm_msg_t *msg, uint32_t uid); |
| |
| static char *_job_update_type_str(enum fed_job_update_type type) |
| { |
| switch (type) { |
| case FED_JOB_COMPLETE: |
| return "FED_JOB_COMPLETE"; |
| case FED_JOB_CANCEL: |
| return "FED_JOB_CANCEL"; |
| case FED_JOB_REMOVE_ACTIVE_SIB_BIT: |
| return "FED_JOB_REMOVE_ACTIVE_SIB_BIT"; |
| case FED_JOB_REQUEUE: |
| return "FED_JOB_REQUEUE"; |
| case FED_JOB_START: |
| return "FED_JOB_START"; |
| case FED_JOB_SUBMIT_BATCH: |
| return "FED_JOB_SUBMIT_BATCH"; |
| case FED_JOB_SUBMIT_INT: |
| return "FED_JOB_SUBMIT_INT"; |
| case FED_JOB_SUBMIT_RESP: |
| return "FED_JOB_SUBMIT_RESP"; |
| case FED_JOB_SYNC: |
| return "FED_JOB_SYNC"; |
| case FED_JOB_UPDATE: |
| return "FED_JOB_UPDATE"; |
| case FED_JOB_UPDATE_RESPONSE: |
| return "FED_JOB_UPDATE_RESPONSE"; |
| case FED_SEND_JOB_SYNC: |
| return "FED_SEND_JOB_SYNC"; |
| default: |
| return "?"; |
| } |
| } |
| |
| static void _append_job_update(fed_job_update_info_t *job_update_info) |
| { |
| list_append(fed_job_update_list, job_update_info); |
| |
| slurm_mutex_lock(&job_update_mutex); |
| slurm_cond_broadcast(&job_update_cond); |
| slurm_mutex_unlock(&job_update_mutex); |
| } |
| |
| /* Return true if communication failure should be logged. Only log failures |
| * every 10 minutes to avoid filling logs */ |
| static bool _comm_fail_log(slurmdb_cluster_rec_t *cluster) |
| { |
| time_t now = time(NULL); |
| time_t old = now - 600; /* Log failures once every 10 mins */ |
| |
| if (cluster->comm_fail_time < old) { |
| cluster->comm_fail_time = now; |
| return true; |
| } |
| return false; |
| } |
| |
| static int _close_controller_conn(slurmdb_cluster_rec_t *cluster) |
| { |
| int rc = SLURM_SUCCESS; |
| // persist_conn_t *persist_conn = NULL; |
| |
| xassert(cluster); |
| slurm_mutex_lock(&cluster->lock); |
| log_flag(FEDR, "closing sibling conn to %s", cluster->name); |
| |
| /* The recv free of this is handled directly in the persist_conn code, |
| * don't free it here */ |
| // slurm_persist_conn_destroy(cluster->fed.recv); |
| cluster->fed.recv = NULL; |
| slurm_persist_conn_destroy(cluster->fed.send); |
| cluster->fed.send = NULL; |
| |
| log_flag(FEDR, "closed sibling conn to %s", cluster->name); |
| slurm_mutex_unlock(&cluster->lock); |
| |
| return rc; |
| } |
| |
| /* Get the list of jobs that originated from this cluster or the remote |
| * sibling, or that are viable on the remote sibling. |
| * Originating here: so that the remote can determine if the tracker job is |
| * gone. |
| * Originating on the sibling: so that the remote can verify that jobs are |
| * where they're supposed to be. If the sibling doesn't find a job, it can |
| * resubmit the job or take other actions. |
| * Viable on the sibling: because the origin might have been down while the |
| * job was started or cancelled. |
| * |
| * Only get jobs that were submitted prior to sync_time. |
| */ |
| static list_t *_get_sync_jobid_list(uint32_t sib_id, time_t sync_time) |
| { |
| list_t *jobids = NULL; |
| list_itr_t *job_itr; |
| job_record_t *job_ptr; |
| |
| jobids = list_create(xfree_ptr); |
| |
| /* |
| * Only include jobs that: |
| * 1. originate from the remote sibling, |
| * 2. originate from this cluster, or |
| * 3. have the remote sibling in their viable siblings list. |
| */ |
| job_itr = list_iterator_create(job_list); |
| while ((job_ptr = list_next(job_itr))) { |
| uint32_t cluster_id = fed_mgr_get_cluster_id(job_ptr->job_id); |
| if (job_ptr->fed_details && |
| (job_ptr->details && |
| (job_ptr->details->submit_time < sync_time)) && |
| ((cluster_id == sib_id) || |
| (cluster_id == fed_mgr_cluster_rec->fed.id) || |
| (job_ptr->fed_details->siblings_viable & |
| FED_SIBLING_BIT(sib_id)))) { |
| |
| uint32_t *tmp = xmalloc(sizeof(uint32_t)); |
| *tmp = job_ptr->job_id; |
| list_append(jobids, tmp); |
| } |
| } |
| list_iterator_destroy(job_itr); |
| |
| return jobids; |
| } |
| |
| static int _open_controller_conn(slurmdb_cluster_rec_t *cluster, bool locked) |
| { |
| int rc; |
| persist_conn_t *persist_conn = NULL; |
| static int timeout = -1; |
| |
| if (timeout < 0) |
| timeout = slurm_conf.msg_timeout * 1000; |
| |
| if (cluster == fed_mgr_cluster_rec) { |
| info("%s: hey! how did we get here with ourselves?", __func__); |
| return SLURM_ERROR; |
| } |
| |
| if (!locked) |
| slurm_mutex_lock(&cluster->lock); |
| |
| if (!cluster->control_host || !cluster->control_host[0] || |
| !cluster->control_port) { |
| if (cluster->fed.recv) { |
| persist_conn = cluster->fed.recv; |
| cluster->control_port = persist_conn->rem_port; |
| xfree(cluster->control_host); |
| cluster->control_host = xstrdup(persist_conn->rem_host); |
| } else { |
| log_flag(FEDR, "%s: Sibling cluster %s doesn't appear to be up yet, skipping", |
| __func__, cluster->name); |
| if (!locked) |
| slurm_mutex_unlock(&cluster->lock); |
| return SLURM_ERROR; |
| } |
| } |
| |
| log_flag(FEDR, "opening sibling conn to %s[%s:%u]", |
| cluster->name, cluster->control_host, cluster->control_port); |
| |
| if (!cluster->fed.send) { |
| persist_conn = xmalloc(sizeof(*persist_conn)); |
| |
| cluster->fed.send = persist_conn; |
| |
| /* Since this connection is coming from us, make it so ;) */ |
| persist_conn->cluster_name = xstrdup(slurm_conf.cluster_name); |
| persist_conn->persist_type = PERSIST_TYPE_FED; |
| persist_conn->my_port = slurm_conf.slurmctld_port; |
| persist_conn->rem_host = xstrdup(cluster->control_host); |
| persist_conn->rem_port = cluster->control_port; |
| persist_conn->version = cluster->rpc_version; |
| persist_conn->shutdown = &slurmctld_config.shutdown_time; |
| persist_conn->timeout = timeout; /* don't set this to 0; it |
| * could cause deadlock */ |
| } else { |
| persist_conn = cluster->fed.send; |
| |
| /* Perhaps a backup came up, so don't assume it was the same |
| * host or port we had before. |
| */ |
| xfree(persist_conn->rem_host); |
| persist_conn->rem_host = xstrdup(cluster->control_host); |
| persist_conn->rem_port = cluster->control_port; |
| } |
| |
| persist_conn->r_uid = SLURM_AUTH_UID_ANY; |
| |
| rc = slurm_persist_conn_open(persist_conn); |
| if (rc != SLURM_SUCCESS) { |
| if (_comm_fail_log(cluster)) { |
| error("fed_mgr: Unable to open connection to cluster %s using host %s(%u)", |
| cluster->name, |
| persist_conn->rem_host, persist_conn->rem_port); |
| } |
| } else { |
| log_flag(FEDR, "opened sibling conn to %s:%d", |
| cluster->name, |
| conn_g_get_fd(persist_conn->tls_conn)); |
| } |
| |
| if (!locked) |
| slurm_mutex_unlock(&cluster->lock); |
| |
| return rc; |
| } |
| |
| /* The cluster->lock should be locked before this is called */ |
| static int _check_send(slurmdb_cluster_rec_t *cluster) |
| { |
| persist_conn_t *send = cluster->fed.send; |
| |
| if (!send || !send->tls_conn) { |
| return _open_controller_conn(cluster, true); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* fed_mgr read lock needs to be set before coming in here, |
| * not the write lock */ |
| static void _open_persist_sends(void) |
| { |
| list_itr_t *itr; |
| slurmdb_cluster_rec_t *cluster = NULL; |
| persist_conn_t *send = NULL; |
| |
| if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) { |
| log_flag(FEDR, "bailing on empty cluster list"); |
| return; |
| } |
| |
| /* This open_send_mutex acts like a write lock: while we are sending |
| * out these open requests, the other slurmctlds will be replying and |
| * needing to get to the same structures. If we just used the fed_mgr |
| * write lock it would cause deadlock. |
| */ |
| slurm_mutex_lock(&open_send_mutex); |
| itr = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| while ((cluster = list_next(itr))) { |
| if (cluster == fed_mgr_cluster_rec) |
| continue; |
| |
| send = cluster->fed.send; |
| if (!send || !send->tls_conn) |
| _open_controller_conn(cluster, false); |
| } |
| list_iterator_destroy(itr); |
| slurm_mutex_unlock(&open_send_mutex); |
| } |
| |
| static int _send_recv_msg(slurmdb_cluster_rec_t *cluster, slurm_msg_t *req, |
| slurm_msg_t *resp, bool locked) |
| { |
| int rc; |
| |
| xassert(cluster); |
| xassert(req); |
| xassert(resp); |
| |
| slurm_msg_t_init(resp); |
| |
| if (!locked) |
| slurm_mutex_lock(&cluster->lock); |
| |
| rc = _check_send(cluster); |
| if ((rc == SLURM_SUCCESS) && cluster->fed.send) { |
| resp->conn = req->conn = cluster->fed.send; |
| rc = slurm_send_recv_msg(req->conn->tls_conn, req, resp, 0); |
| } |
| if (!locked) |
| slurm_mutex_unlock(&cluster->lock); |
| |
| return rc; |
| } |
| |
| /* Free an agent_queue_t record from a list */ |
| static void _ctld_free_list_msg(void *x) |
| { |
| agent_queue_t *agent_queue_ptr = (agent_queue_t *) x; |
| if (agent_queue_ptr) { |
| FREE_NULL_BUFFER(agent_queue_ptr->buffer); |
| xfree(agent_queue_ptr); |
| } |
| } |
| |
| static int _queue_rpc(slurmdb_cluster_rec_t *cluster, slurm_msg_t *req, |
| uint32_t job_id, bool locked) |
| { |
| agent_queue_t *agent_rec; |
| buf_t *buf; |
| |
| if (!cluster->send_rpc) |
| cluster->send_rpc = list_create(_ctld_free_list_msg); |
| |
| buf = init_buf(1024); |
| pack16(req->msg_type, buf); |
| if (pack_msg(req, buf) != SLURM_SUCCESS) { |
| error("%s: failed to pack msg_type:%u", |
| __func__, req->msg_type); |
| FREE_NULL_BUFFER(buf); |
| return SLURM_ERROR; |
| } |
| |
| /* Queue the RPC and notify the agent of new work */ |
| agent_rec = xmalloc(sizeof(agent_queue_t)); |
| agent_rec->buffer = buf; |
| agent_rec->job_id = job_id; |
| agent_rec->msg_type = req->msg_type; |
| list_append(cluster->send_rpc, agent_rec); |
| slurm_mutex_lock(&agent_mutex); |
| agent_queue_size++; |
| slurm_cond_broadcast(&agent_cond); |
| slurm_mutex_unlock(&agent_mutex); |
| |
| return SLURM_SUCCESS; |
| } |
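| |
| /* |
| * Sketch of the queued buffer layout built above (illustrative, not a |
| * wire-format spec): each agent_queue_t buffer holds |
| * |
| *   pack16(msg_type)   -- read back with unpack16() |
| *   pack_msg(body)     -- protocol-versioned message body |
| * |
| * The agent thread is then expected to bundle these buffers for |
| * delivery to the sibling, which unpacks them in the same order (cf. |
| * _parse_resp_ctld_mult() below for the matching response handling). |
| */ |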
| |
| /* |
| * Close all sibling connections. |
| * The fed_mgr lock must be held before entering. |
| */ |
| static int _close_sibling_conns(void) |
| { |
| list_itr_t *itr; |
| slurmdb_cluster_rec_t *cluster; |
| |
| if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) |
| goto fini; |
| |
| itr = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| while ((cluster = list_next(itr))) { |
| if (cluster == fed_mgr_cluster_rec) |
| continue; |
| _close_controller_conn(cluster); |
| } |
| list_iterator_destroy(itr); |
| |
| fini: |
| return SLURM_SUCCESS; |
| } |
| |
| static void _mark_self_as_drained(void) |
| { |
| list_t *ret_list = NULL; |
| slurmdb_cluster_cond_t cluster_cond; |
| slurmdb_cluster_rec_t cluster_rec; |
| |
| log_flag(FEDR, "%s: setting cluster fedstate to DRAINED", __func__); |
| |
| slurmdb_init_cluster_cond(&cluster_cond, false); |
| slurmdb_init_cluster_rec(&cluster_rec, false); |
| |
| cluster_cond.cluster_list = list_create(NULL); |
| list_append(cluster_cond.cluster_list, fed_mgr_cluster_rec->name); |
| |
| cluster_rec.fed.state = CLUSTER_FED_STATE_INACTIVE | |
| (fed_mgr_cluster_rec->fed.state & ~CLUSTER_FED_STATE_BASE); |
| |
| ret_list = acct_storage_g_modify_clusters(acct_db_conn, |
| slurm_conf.slurm_user_id, |
| &cluster_cond, &cluster_rec); |
| if (!ret_list || !list_count(ret_list)) { |
| error("Failed to set cluster state to drained"); |
| } |
| |
| FREE_NULL_LIST(cluster_cond.cluster_list); |
| FREE_NULL_LIST(ret_list); |
| } |
| |
| static void _remove_self_from_federation(void) |
| { |
| list_t *ret_list = NULL; |
| slurmdb_federation_cond_t fed_cond; |
| slurmdb_federation_rec_t fed_rec; |
| slurmdb_cluster_rec_t cluster_rec; |
| |
| log_flag(FEDR, "%s: removing self from federation %s", |
| __func__, fed_mgr_fed_rec->name); |
| |
| slurmdb_init_federation_cond(&fed_cond, false); |
| slurmdb_init_federation_rec(&fed_rec, false); |
| slurmdb_init_cluster_rec(&cluster_rec, false); |
| |
| fed_cond.federation_list = list_create(NULL); |
| list_append(fed_cond.federation_list, fed_mgr_fed_rec->name); |
| |
| cluster_rec.name = xstrdup_printf("-%s", fed_mgr_cluster_rec->name); |
| fed_rec.cluster_list = list_create(NULL); |
| list_append(fed_rec.cluster_list, &cluster_rec); |
| |
| ret_list = acct_storage_g_modify_federations(acct_db_conn, |
| slurm_conf.slurm_user_id, |
| &fed_cond, &fed_rec); |
| if (!ret_list || !list_count(ret_list)) { |
| error("Failed to remove cluster from federation"); |
| } |
| |
| FREE_NULL_LIST(ret_list); |
| FREE_NULL_LIST(fed_cond.federation_list); |
| FREE_NULL_LIST(fed_rec.cluster_list); |
| xfree(cluster_rec.name); |
| |
| slurmctld_config.scheduling_disabled = false; |
| slurmctld_config.submissions_disabled = false; |
| |
| _leave_federation(); |
| } |
| |
| static int _foreach_job_completed(void *object, void *arg) |
| { |
| job_record_t *job_ptr = (job_record_t *) object; |
| |
| if (IS_JOB_COMPLETED(job_ptr)) |
| return SLURM_SUCCESS; |
| |
| return SLURM_ERROR; |
| } |
| |
| static int _foreach_job_no_requeue(void *object, void *arg) |
| { |
| job_record_t *job_ptr = (job_record_t *) object; |
| |
| if (job_ptr->details) |
| job_ptr->details->requeue = 0; |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static void *_job_watch_thread(void *arg) |
| { |
| struct timespec ts = {0, 0}; |
| slurmctld_lock_t job_write_fed_write_lock = { |
| NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| #if HAVE_SYS_PRCTL_H |
| if (prctl(PR_SET_NAME, "fed_jobw", NULL, NULL, NULL) < 0) { |
| error("%s: cannot set my name to %s %m", __func__, "fed_jobw"); |
| } |
| #endif |
| log_flag(FEDR, "%s: started job_watch thread", __func__); |
| |
| while (!slurmctld_config.shutdown_time && !stop_job_watch_thread) { |
| int tot_jobs, comp_jobs; |
| |
| slurm_mutex_lock(&job_watch_mutex); |
| if (!slurmctld_config.shutdown_time && !stop_job_watch_thread) { |
| ts.tv_sec = time(NULL) + 5; |
| slurm_cond_timedwait(&job_watch_cond, |
| &job_watch_mutex, &ts); |
| } |
| slurm_mutex_unlock(&job_watch_mutex); |
| |
| if (slurmctld_config.shutdown_time || stop_job_watch_thread) |
| break; |
| |
| lock_slurmctld(job_write_fed_write_lock); |
| |
| if (!fed_mgr_cluster_rec) { |
| /* not part of the federation anymore */ |
| unlock_slurmctld(job_write_fed_write_lock); |
| break; |
| } |
| |
| if ((tot_jobs = list_count(job_list)) != |
| (comp_jobs = list_for_each(job_list, _foreach_job_completed, |
| NULL))) { |
| /* list_for_each negates the count if failed. */ |
| int remaining_jobs = tot_jobs + comp_jobs + 1; |
| log_flag(FEDR, "%s: at least %d remaining jobs before being drained and/or removed from the federation", |
| __func__, remaining_jobs); |
| } else { |
| if (fed_mgr_cluster_rec->fed.state & |
| CLUSTER_FED_STATE_REMOVE) { |
| /* prevent federated jobs from being requeued */ |
| list_for_each(job_list, _foreach_job_no_requeue, |
| NULL); |
| _remove_self_from_federation(); |
| } else if (fed_mgr_cluster_rec->fed.state & |
| CLUSTER_FED_STATE_DRAIN) { |
| _mark_self_as_drained(); |
| } |
| |
| unlock_slurmctld(job_write_fed_write_lock); |
| |
| break; |
| } |
| |
| unlock_slurmctld(job_write_fed_write_lock); |
| } |
| |
| slurm_mutex_lock(&job_watch_mutex); |
| job_watch_thread_running = false; |
| slurm_mutex_unlock(&job_watch_mutex); |
| |
| log_flag(FEDR, "%s: exiting job watch thread", __func__); |
| |
| return NULL; |
| } |
| |
| static void _spawn_job_watch_thread(void) |
| { |
| slurm_mutex_lock(&job_watch_mutex); |
| if (!job_watch_thread_running) { |
| /* Detach the thread since it will exit once the cluster is |
| * drained or removed. */ |
| stop_job_watch_thread = false; |
| job_watch_thread_running = true; |
| slurm_thread_create_detached(_job_watch_thread, NULL); |
| } else { |
| info("a job_watch_thread already exists"); |
| } |
| slurm_mutex_unlock(&job_watch_mutex); |
| } |
| |
| static void _remove_job_watch_thread(void) |
| { |
| slurm_mutex_lock(&job_watch_mutex); |
| if (job_watch_thread_running) { |
| stop_job_watch_thread = true; |
| slurm_cond_broadcast(&job_watch_cond); |
| } |
| slurm_mutex_unlock(&job_watch_mutex); |
| } |
| |
| static int _clear_recv_conns(void *object, void *arg) |
| { |
| slurmdb_cluster_rec_t *cluster = (slurmdb_cluster_rec_t *)object; |
| cluster->fed.recv = NULL; |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Must have FED unlocked prior to entering |
| */ |
| static void _fed_mgr_ptr_init(slurmdb_federation_rec_t *db_fed, |
| slurmdb_cluster_rec_t *cluster, |
| uint64_t *added_clusters) |
| { |
| list_itr_t *c_itr; |
| slurmdb_cluster_rec_t *tmp_cluster, *db_cluster; |
| uint32_t cluster_state; |
| int base_state; |
| bool drain_flag; |
| |
| slurmctld_lock_t fed_write_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| xassert(cluster); |
| |
| log_flag(FEDR, "Joining federation %s", db_fed->name); |
| |
| lock_slurmctld(fed_write_lock); |
| if (fed_mgr_fed_rec) { |
| /* we are already part of a federation, preserve existing |
| * connections */ |
| c_itr = list_iterator_create(db_fed->cluster_list); |
| while ((db_cluster = list_next(c_itr))) { |
| if (!xstrcmp(db_cluster->name, |
| slurm_conf.cluster_name)) { |
| fed_mgr_cluster_rec = db_cluster; |
| continue; |
| } |
| if (!(tmp_cluster = |
| fed_mgr_get_cluster_by_name(db_cluster->name))) { |
| *added_clusters |= |
| FED_SIBLING_BIT(db_cluster->fed.id); |
| /* don't worry about destroying the connection |
| * here. It will happen below when we free |
| * fed_mgr_fed_rec (automagically). |
| */ |
| continue; |
| } |
| slurm_mutex_lock(&tmp_cluster->lock); |
| /* transfer over the connections we already have */ |
| db_cluster->fed.send = tmp_cluster->fed.send; |
| tmp_cluster->fed.send = NULL; |
| db_cluster->fed.recv = tmp_cluster->fed.recv; |
| tmp_cluster->fed.recv = NULL; |
| db_cluster->send_rpc = tmp_cluster->send_rpc; |
| tmp_cluster->send_rpc = NULL; |
| db_cluster->fed.sync_sent = |
| tmp_cluster->fed.sync_sent; |
| db_cluster->fed.sync_recvd = |
| tmp_cluster->fed.sync_recvd; |
| slurm_mutex_unlock(&tmp_cluster->lock); |
| |
| list_delete_all(fed_mgr_fed_rec->cluster_list, |
| slurmdb_find_cluster_in_list, |
| db_cluster->name); |
| } |
| list_iterator_destroy(c_itr); |
| |
| /* Remove any existing clusters that were part of the federation |
| * before and are not now. Don't free the recv connection now, |
| * it will get destroyed when the recv thread exits. */ |
| list_for_each(fed_mgr_fed_rec->cluster_list, _clear_recv_conns, |
| NULL); |
| slurmdb_destroy_federation_rec(fed_mgr_fed_rec); |
| } else |
| fed_mgr_cluster_rec = cluster; |
| |
| fed_mgr_fed_rec = db_fed; |
| |
| /* Set scheduling and submissions states */ |
| cluster_state = fed_mgr_cluster_rec->fed.state; |
| base_state = (cluster_state & CLUSTER_FED_STATE_BASE); |
| drain_flag = (cluster_state & CLUSTER_FED_STATE_DRAIN); |
| |
| unlock_slurmctld(fed_write_lock); |
| |
| if (drain_flag) { |
| slurmctld_config.scheduling_disabled = false; |
| slurmctld_config.submissions_disabled = true; |
| |
| /* INACTIVE + DRAIN == DRAINED (already) */ |
| if (base_state == CLUSTER_FED_STATE_ACTIVE) |
| _spawn_job_watch_thread(); |
| |
| } else if (base_state == CLUSTER_FED_STATE_ACTIVE) { |
| slurmctld_config.scheduling_disabled = false; |
| slurmctld_config.submissions_disabled = false; |
| } else if (base_state == CLUSTER_FED_STATE_INACTIVE) { |
| slurmctld_config.scheduling_disabled = true; |
| slurmctld_config.submissions_disabled = true; |
| } |
| if (!drain_flag) |
| _remove_job_watch_thread(); |
| } |
| |
| /* |
| * Must have FED write lock prior to entering |
| */ |
| static void _leave_federation(void) |
| { |
| if (!fed_mgr_fed_rec) |
| return; |
| |
| log_flag(FEDR, "Leaving federation %s", fed_mgr_fed_rec->name); |
| |
| _close_sibling_conns(); |
| _remove_job_watch_thread(); |
| slurmdb_destroy_federation_rec(fed_mgr_fed_rec); |
| fed_mgr_fed_rec = NULL; |
| fed_mgr_cluster_rec = NULL; |
| } |
| |
| static void _persist_callback_fini(void *arg) |
| { |
| persist_conn_t *persist_conn = arg; |
| slurmdb_cluster_rec_t *cluster; |
| slurmctld_lock_t fed_write_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| /* If we are shutting down just return or you will get deadlock since |
| * all these locks are already locked. |
| */ |
| if (!persist_conn || *persist_conn->shutdown) |
| return; |
| lock_slurmctld(fed_write_lock); |
| |
| /* no longer part of a federation */ |
| if (!fed_mgr_fed_rec) { |
| unlock_slurmctld(fed_write_lock); |
| return; |
| } |
| |
| if (!(cluster = |
| fed_mgr_get_cluster_by_name(persist_conn->cluster_name))) { |
| info("Couldn't find cluster %s?", |
| persist_conn->cluster_name); |
| unlock_slurmctld(fed_write_lock); |
| return; |
| } |
| |
| slurm_mutex_lock(&cluster->lock); |
| |
| /* This will get handled at the end of the thread, don't free it here */ |
| cluster->fed.recv = NULL; |
| // persist_conn = cluster->fed.recv; |
| // slurm_persist_conn_close(persist_conn); |
| |
| persist_conn = cluster->fed.send; |
| if (persist_conn) { |
| log_flag(FEDR, "Closing send to sibling cluster %s", |
| cluster->name); |
| slurm_persist_conn_destroy(persist_conn); |
| cluster->fed.send = NULL; |
| } |
| cluster->fed.sync_recvd = false; |
| cluster->fed.sync_sent = false; |
| |
| slurm_mutex_unlock(&cluster->lock); |
| unlock_slurmctld(fed_write_lock); |
| } |
| |
| static void _join_federation(slurmdb_federation_rec_t *fed, |
| slurmdb_cluster_rec_t *cluster, |
| uint64_t *added_clusters) |
| { |
| slurmctld_lock_t fed_read_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| _fed_mgr_ptr_init(fed, cluster, added_clusters); |
| |
| /* We must open the connections after we get out of the |
| * write_lock or we will end up in deadlock. |
| */ |
| lock_slurmctld(fed_read_lock); |
| _open_persist_sends(); |
| unlock_slurmctld(fed_read_lock); |
| } |
| |
| static int _persist_update_job(slurmdb_cluster_rec_t *conn, uint32_t job_id, |
| job_desc_msg_t *data, uid_t uid) |
| { |
| int rc; |
| slurm_msg_t req_msg, tmp_msg; |
| sib_msg_t sib_msg; |
| buf_t *buffer; |
| |
| slurm_msg_t_init(&tmp_msg); |
| tmp_msg.msg_type = REQUEST_UPDATE_JOB; |
| tmp_msg.data = data; |
| tmp_msg.protocol_version = conn->rpc_version; |
| |
| buffer = init_buf(BUF_SIZE); |
| pack_msg(&tmp_msg, buffer); |
| |
| memset(&sib_msg, 0, sizeof(sib_msg)); |
| sib_msg.sib_msg_type = FED_JOB_UPDATE; |
| sib_msg.data_buffer = buffer; |
| sib_msg.data_type = tmp_msg.msg_type; |
| sib_msg.data_version = tmp_msg.protocol_version; |
| sib_msg.req_uid = uid; |
| sib_msg.job_id = job_id; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.protocol_version = tmp_msg.protocol_version; |
| req_msg.data = &sib_msg; |
| |
| rc = _queue_rpc(conn, &req_msg, 0, false); |
| |
| FREE_NULL_BUFFER(buffer); |
| |
| return rc; |
| } |
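| |
| /* |
| * The pattern above -- pack the embedded RPC into a buffer, wrap it in |
| * a sib_msg_t, and queue the result as a REQUEST_SIB_MSG -- is reused |
| * by the other _persist_* helpers below. A minimal sketch of the |
| * envelope (field values illustrative): |
| * |
| *   sib_msg.sib_msg_type = FED_JOB_UPDATE;     // fed-level action |
| *   sib_msg.data_type = REQUEST_UPDATE_JOB;    // embedded RPC type |
| *   sib_msg.data_buffer = buffer;              // packed embedded RPC |
| *   req_msg.msg_type = REQUEST_SIB_MSG;        // outer envelope |
| */ |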
| |
| static int _persist_update_job_resp(slurmdb_cluster_rec_t *conn, |
| uint32_t job_id, uint32_t return_code) |
| { |
| int rc; |
| slurm_msg_t req_msg; |
| sib_msg_t sib_msg; |
| |
| memset(&sib_msg, 0, sizeof(sib_msg)); |
| sib_msg.sib_msg_type = FED_JOB_UPDATE_RESPONSE; |
| sib_msg.job_id = job_id; |
| sib_msg.return_code = return_code; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.protocol_version = conn->rpc_version; |
| req_msg.data = &sib_msg; |
| |
| rc = _queue_rpc(conn, &req_msg, job_id, false); |
| |
| return rc; |
| } |
| |
| /* |
| * Remove a sibling job that won't be scheduled |
| * |
| * IN conn - sibling connection |
| * IN job_id - the job's id |
| * IN job_state - state of job. |
| * IN return_code - return code of job. |
| * IN start_time - time the fed job started |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| */ |
| static int _persist_fed_job_revoke(slurmdb_cluster_rec_t *conn, uint32_t job_id, |
| uint32_t job_state, uint32_t return_code, |
| time_t start_time) |
| { |
| int rc; |
| slurm_msg_t req_msg; |
| sib_msg_t sib_msg; |
| |
| if (!conn->fed.send || |
| (!((persist_conn_t *) conn->fed.send)->tls_conn)) |
| return SLURM_SUCCESS; |
| |
| memset(&sib_msg, 0, sizeof(sib_msg)); |
| sib_msg.sib_msg_type = FED_JOB_COMPLETE; |
| sib_msg.job_id = job_id; |
| sib_msg.job_state = job_state; |
| sib_msg.start_time = start_time; |
| sib_msg.return_code = return_code; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.protocol_version = conn->rpc_version; |
| req_msg.data = &sib_msg; |
| |
| rc = _queue_rpc(conn, &req_msg, job_id, false); |
| |
| return rc; |
| } |
| |
| static int _persist_fed_job_response(slurmdb_cluster_rec_t *conn, uint32_t job_id, |
| uint32_t return_code) |
| { |
| int rc; |
| slurm_msg_t req_msg; |
| sib_msg_t sib_msg; |
| |
| memset(&sib_msg, 0, sizeof(sib_msg)); |
| sib_msg.sib_msg_type = FED_JOB_SUBMIT_RESP; |
| sib_msg.job_id = job_id; |
| sib_msg.return_code = return_code; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.protocol_version = conn->rpc_version; |
| req_msg.data = &sib_msg; |
| |
| rc = _queue_rpc(conn, &req_msg, job_id, false); |
| |
| return rc; |
| } |
| |
| /* |
| * Grab the fed lock on the sibling job. |
| * |
| * This message doesn't need to be queued because the other side just locks the |
| * fed_job_list, checks and gets out -- doesn't need the internal locks. |
| * |
| * IN conn - sibling connection |
| * IN job_id - the job's id |
| * IN cluster_id - cluster id of the cluster locking |
| * IN do_lock - true == lock, false == unlock |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| */ |
| static int _persist_fed_job_lock_bool(slurmdb_cluster_rec_t *conn, |
| uint32_t job_id, uint32_t cluster_id, |
| bool do_lock) |
| { |
| int rc; |
| slurm_msg_t req_msg, resp_msg; |
| |
| slurm_msg_t_init(&req_msg); |
| |
| sib_msg_t sib_msg; |
| memset(&sib_msg, 0, sizeof(sib_msg_t)); |
| sib_msg.job_id = job_id; |
| sib_msg.cluster_id = cluster_id; |
| |
| if (do_lock) |
| req_msg.msg_type = REQUEST_SIB_JOB_LOCK; |
| else |
| req_msg.msg_type = REQUEST_SIB_JOB_UNLOCK; |
| |
| req_msg.protocol_version = conn->rpc_version; |
| req_msg.data = &sib_msg; |
| |
| if (_send_recv_msg(conn, &req_msg, &resp_msg, false)) { |
| rc = SLURM_ERROR; |
| goto end_it; |
| } |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_RC: |
| if ((rc = slurm_get_return_code(resp_msg.msg_type, |
| resp_msg.data))) { |
| errno = rc; |
| rc = SLURM_ERROR; |
| } |
| break; |
| default: |
| errno = SLURM_UNEXPECTED_MSG_ERROR; |
| rc = SLURM_ERROR; |
| break; |
| } |
| |
| end_it: |
| slurm_free_msg_members(&resp_msg); |
| |
| return rc; |
| } |
| |
| static int _persist_fed_job_lock(slurmdb_cluster_rec_t *conn, uint32_t job_id, |
| uint32_t cluster_id) |
| { |
| return _persist_fed_job_lock_bool(conn, job_id, cluster_id, true); |
| } |
| |
| static int _persist_fed_job_unlock(slurmdb_cluster_rec_t *conn, uint32_t job_id, |
| uint32_t cluster_id) |
| { |
| return _persist_fed_job_lock_bool(conn, job_id, cluster_id, false); |
| } |
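| |
| /* |
| * Illustrative (hypothetical) caller sketch: take the origin's fed job |
| * lock before attempting to start the job, and release it again if the |
| * start is abandoned. |
| * |
| *   if (_persist_fed_job_lock(origin, job_id, my_id) != SLURM_SUCCESS) |
| *           return SLURM_ERROR; |
| *   ...attempt to start the job... |
| *   if (start_failed) |
| *           (void) _persist_fed_job_unlock(origin, job_id, my_id); |
| */ |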
| |
| /* |
| * Tell the origin cluster that the job was started |
| * |
| * This message is queued up as an rpc and a fed_job_update so that it can |
| * cancel the siblings without holding up the internal locks. |
| * |
| * IN conn - sibling connection |
| * IN job_id - the job's id |
| * IN cluster_id - cluster id of the cluster that started the job |
| * IN start_time - time the fed job started |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| */ |
| static int _persist_fed_job_start(slurmdb_cluster_rec_t *conn, |
| uint32_t job_id, uint32_t cluster_id, |
| time_t start_time) |
| { |
| int rc; |
| slurm_msg_t req_msg; |
| |
| slurm_msg_t_init(&req_msg); |
| |
| sib_msg_t sib_msg; |
| memset(&sib_msg, 0, sizeof(sib_msg_t)); |
| sib_msg.sib_msg_type = FED_JOB_START; |
| sib_msg.job_id = job_id; |
| sib_msg.cluster_id = cluster_id; |
| sib_msg.start_time = start_time; |
| |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.protocol_version = conn->rpc_version; |
| req_msg.data = &sib_msg; |
| |
| rc = _queue_rpc(conn, &req_msg, job_id, false); |
| |
| return rc; |
| } |
| |
| /* |
| * Send the specified signal to all steps of an existing job |
| * IN conn - sibling connection |
| * IN job_id - the job's id |
| * IN signal - signal number |
| * IN flags - KILL_JOB_* flags (see slurm.h) |
| * IN uid - uid of user making the request. |
| * RET SLURM_SUCCESS on success, SLURM_ERROR otherwise. |
| */ |
| static int _persist_fed_job_cancel(slurmdb_cluster_rec_t *conn, uint32_t job_id, |
| uint16_t signal, uint16_t flags, |
| uid_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| slurm_msg_t req_msg, tmp_msg; |
| sib_msg_t sib_msg; |
| job_step_kill_msg_t kill_req; |
| buf_t *buffer; |
| |
| /* Build and pack a kill_req msg to put in a sib_msg */ |
| memset(&kill_req, 0, sizeof(job_step_kill_msg_t)); |
| kill_req.step_id.job_id = job_id; |
| kill_req.sjob_id = NULL; |
| kill_req.step_id.step_id = NO_VAL; |
| kill_req.step_id.step_het_comp = NO_VAL; |
| kill_req.signal = signal; |
| kill_req.flags = flags; |
| |
| slurm_msg_t_init(&tmp_msg); |
| tmp_msg.msg_type = REQUEST_CANCEL_JOB_STEP; |
| tmp_msg.data = &kill_req; |
| tmp_msg.protocol_version = conn->rpc_version; |
| |
| buffer = init_buf(BUF_SIZE); |
| pack_msg(&tmp_msg, buffer); |
| |
| memset(&sib_msg, 0, sizeof(sib_msg)); |
| sib_msg.sib_msg_type = FED_JOB_CANCEL; |
| sib_msg.data_buffer = buffer; |
| sib_msg.data_type = tmp_msg.msg_type; |
| sib_msg.data_version = tmp_msg.protocol_version; |
| sib_msg.req_uid = uid; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.protocol_version = tmp_msg.protocol_version; |
| req_msg.data = &sib_msg; |
| |
| rc = _queue_rpc(conn, &req_msg, job_id, false); |
| |
| FREE_NULL_BUFFER(buffer); |
| |
| return rc; |
| } |
| |
| /* |
| * Tell the origin cluster to requeue the job |
| * |
| * IN conn - sibling connection |
| * IN job_id - the job's id |
| * IN flags - requeue flags |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| */ |
| static int _persist_fed_job_requeue(slurmdb_cluster_rec_t *conn, |
| uint32_t job_id, uint32_t flags) |
| { |
| int rc; |
| requeue_msg_t requeue_req; |
| slurm_msg_t req_msg, tmp_msg; |
| sib_msg_t sib_msg; |
| buf_t *buffer; |
| |
| xassert(conn); |
| |
| requeue_req.job_id = job_id; |
| requeue_req.job_id_str = NULL; |
| requeue_req.flags = flags; |
| |
| slurm_msg_t_init(&tmp_msg); |
| tmp_msg.msg_type = REQUEST_JOB_REQUEUE; |
| tmp_msg.data = &requeue_req; |
| tmp_msg.protocol_version = conn->rpc_version; |
| |
| buffer = init_buf(BUF_SIZE); |
| pack_msg(&tmp_msg, buffer); |
| |
| memset(&sib_msg, 0, sizeof(sib_msg)); |
| sib_msg.sib_msg_type = FED_JOB_REQUEUE; |
| sib_msg.job_id = job_id; |
| sib_msg.data_buffer = buffer; |
| sib_msg.data_type = tmp_msg.msg_type; |
| sib_msg.data_version = tmp_msg.protocol_version; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.protocol_version = tmp_msg.protocol_version; |
| req_msg.data = &sib_msg; |
| |
| rc = _queue_rpc(conn, &req_msg, job_id, false); |
| |
| FREE_NULL_BUFFER(buffer); |
| |
| return rc; |
| } |
| |
| static int _find_sibling_by_id(void *x, void *key) |
| { |
| slurmdb_cluster_rec_t *object = (slurmdb_cluster_rec_t *)x; |
| uint32_t id = *(uint32_t *)key; |
| |
| if (object->fed.id == id) |
| return 1; |
| |
| return 0; |
| } |
| |
| extern void add_fed_job_info(job_record_t *job_ptr) |
| { |
| fed_job_info_t *job_info; |
| |
| job_info = xmalloc(sizeof(fed_job_info_t)); |
| job_info->job_id = job_ptr->job_id; |
| job_info->siblings_active = job_ptr->fed_details->siblings_active; |
| job_info->siblings_viable = job_ptr->fed_details->siblings_viable; |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (fed_job_list) |
| list_append(fed_job_list, job_info); |
| else |
| xfree(job_info); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| } |
| |
| static int _delete_fed_job_info_by_id(void *object, void *arg) |
| { |
| fed_job_info_t *job_info = (fed_job_info_t *)object; |
| uint32_t job_id = *(uint32_t *)arg; |
| |
| if (job_info->job_id == job_id) |
| return true; |
| |
| return false; |
| } |
| |
| extern void fed_mgr_remove_fed_job_info(uint32_t job_id) |
| { |
| slurm_mutex_lock(&fed_job_list_mutex); |
| |
| if (fed_job_list) |
| list_delete_all(fed_job_list, _delete_fed_job_info_by_id, |
| &job_id); |
| |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| } |
| |
| static int _list_find_fed_job_info_by_jobid(void *x, void *key) |
| { |
| fed_job_info_t *job_info = (fed_job_info_t *)x; |
| uint32_t job_id = *(uint32_t *)key; |
| |
| if (job_info->job_id == job_id) |
| return 1; |
| |
| return 0; |
| } |
| |
| /* Must have fed_job_list_mutex locked before entering */ |
| static fed_job_info_t *_find_fed_job_info(uint32_t job_id) |
| { |
| if (!fed_job_list) |
| return NULL; |
| return list_find_first(fed_job_list, _list_find_fed_job_info_by_jobid, |
| &job_id); |
| } |
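| |
| /* |
| * Illustrative (hypothetical) usage: per the comment above, the caller |
| * must hold fed_job_list_mutex around the lookup and any use of the |
| * result. |
| * |
| *   slurm_mutex_lock(&fed_job_list_mutex); |
| *   if ((job_info = _find_fed_job_info(job_id))) |
| *           job_info->siblings_active &= ~FED_SIBLING_BIT(sib_id); |
| *   slurm_mutex_unlock(&fed_job_list_mutex); |
| */ |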
| |
| static void _destroy_fed_job_update_info(void *object) |
| { |
| fed_job_update_info_t *job_update_info = |
| (fed_job_update_info_t *)object; |
| |
| if (job_update_info) { |
| xfree(job_update_info->siblings_str); |
| xfree(job_update_info->submit_cluster); |
| slurm_free_job_info_msg(job_update_info->job_info_msg); |
| slurm_free_job_step_kill_msg(job_update_info->kill_msg); |
| slurm_free_job_desc_msg(job_update_info->submit_desc); |
| xfree(job_update_info); |
| } |
| } |
| |
| static void _destroy_dep_msg(void *object) |
| { |
| slurm_free_dep_msg((dep_msg_t *)object); |
| } |
| |
| static void _destroy_dep_update_msg(void *object) |
| { |
| slurm_free_dep_update_origin_msg((dep_update_origin_msg_t *)object); |
| } |
| |
| static void _destroy_dep_job(void *object) |
| { |
| job_record_t *job_ptr = (job_record_t *)object; |
| |
| if (job_ptr) { |
| xassert(job_ptr->magic == JOB_MAGIC); |
| xfree(job_ptr->fed_details); |
| xfree(job_ptr->name); |
| if (job_ptr->details) { |
| xassert(job_ptr->details->magic == DETAILS_MAGIC); |
| xfree(job_ptr->details->dependency); |
| FREE_NULL_LIST(job_ptr->details->depend_list); |
| xfree(job_ptr->details); |
| } |
| job_record_free_null_array_recs(job_ptr); |
| job_ptr->magic = 0; |
| job_ptr->job_id = 0; |
| job_ptr->user_id = 0; |
| xfree(job_ptr); |
| } |
| } |
| |
| extern slurmdb_cluster_rec_t *fed_mgr_get_cluster_by_id(uint32_t id) |
| { |
| uint32_t key = id; |
| return list_find_first(fed_mgr_fed_rec->cluster_list, |
| _find_sibling_by_id, &key); |
| } |
| |
| extern slurmdb_cluster_rec_t *fed_mgr_get_cluster_by_name(char *sib_name) |
| { |
| if (!fed_mgr_fed_rec) |
| return NULL; |
| |
| return list_find_first(fed_mgr_fed_rec->cluster_list, |
| slurmdb_find_cluster_in_list, sib_name); |
| } |
| |
| /* |
| * Revoke all sibling jobs except on cluster_id -- the cluster the request |
| * came from |
| * |
| * IN job_id - the job's id |
| * IN cluster_id - cluster id of the cluster that initiated the call. Don't |
| * revoke the job on this cluster -- it's the one that started the fed job. |
| * IN revoke_sibs - bitmap of siblings to revoke, e.g. 0x5 revokes the job on |
| * siblings 1 and 3 (ourself and cluster_id are always skipped). |
| * IN start_time - time the fed job started |
| */ |
| static void _revoke_sibling_jobs(uint32_t job_id, uint32_t cluster_id, |
| uint64_t revoke_sibs, time_t start_time) |
| { |
| int id = 1; |
| |
| if (!fed_mgr_fed_rec) /* Not part of federation anymore */ |
| return; |
| |
| while (revoke_sibs) { |
| if ((revoke_sibs & 1) && |
| (id != fed_mgr_cluster_rec->fed.id) && |
| (id != cluster_id)) { |
| slurmdb_cluster_rec_t *cluster = |
| fed_mgr_get_cluster_by_id(id); |
| if (!cluster) { |
| error("couldn't find cluster rec by id %d", id); |
| goto next_job; |
| } |
| |
| _persist_fed_job_revoke(cluster, job_id, JOB_CANCELLED, |
| 0, start_time); |
| } |
| |
| next_job: |
| revoke_sibs >>= 1; |
| id++; |
| } |
| } |
| |
| static int _remove_sibling_bit(job_record_t *job_ptr, |
| slurmdb_cluster_rec_t *sibling) |
| { |
| uint32_t origin_id; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return ESLURM_JOB_NOT_FEDERATED; |
| |
| job_ptr->fed_details->siblings_active &= |
| ~(FED_SIBLING_BIT(sibling->fed.id)); |
| job_ptr->fed_details->siblings_viable &= |
| ~(FED_SIBLING_BIT(sibling->fed.id)); |
| |
| if (!(job_ptr->fed_details->siblings_viable & |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))) |
| job_state_set_flag(job_ptr, JOB_REVOKED); |
| else if (!job_ptr->fed_details->cluster_lock) |
| job_state_unset_flag(job_ptr, JOB_REVOKED); |
| |
| update_job_fed_details(job_ptr); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Remove all pending federated jobs from the origin cluster. |
| */ |
| static void _cleanup_removed_origin_jobs(void) |
| { |
| list_itr_t *job_itr; |
| job_record_t *job_ptr; |
| time_t now = time(NULL); |
| uint32_t origin_id, sibling_id; |
| uint64_t sibling_bit; |
| |
| if (!fed_mgr_cluster_rec) |
| return; |
| |
| sibling_id = fed_mgr_cluster_rec->fed.id; |
| sibling_bit = FED_SIBLING_BIT(sibling_id); |
| |
| job_itr = list_iterator_create(job_list); |
| while ((job_ptr = list_next(job_itr))) { |
| bool running_remotely = false; |
| uint64_t siblings_viable; |
| |
| if (IS_JOB_COMPLETED(job_ptr)) |
| continue; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| continue; |
| |
| siblings_viable = job_ptr->fed_details->siblings_viable; |
| |
| if (sibling_id == origin_id && |
| job_ptr->fed_details->cluster_lock) |
| running_remotely = true; |
| |
| /* free fed_job_details so it can't call home. */ |
| job_record_free_fed_details(&job_ptr->fed_details); |
| |
| /* allow running/completing jobs to finish. */ |
| if (IS_JOB_COMPLETED(job_ptr) || |
| IS_JOB_COMPLETING(job_ptr) || |
| IS_JOB_RUNNING(job_ptr)) |
| continue; |
| |
| /* If this cluster is the job's only viable sibling then just |
| * let it run as a non-federated job. The origin should remove |
| * the tracking job. */ |
| if (!(siblings_viable & ~sibling_bit)) |
| continue; |
| |
| /* |
| * If a job is pending and not revoked on the origin cluster |
| * leave it as a non-federated job. |
| */ |
| if ((origin_id == sibling_id) && |
| IS_JOB_PENDING(job_ptr) && !IS_JOB_REVOKED(job_ptr)) |
| continue; |
| |
| /* Free the resp_host so that the srun doesn't get |
| * signaled about the job going away. The job could |
| * still run on another sibling. */ |
| if (running_remotely || |
| (origin_id != sibling_id)) |
| xfree(job_ptr->resp_host); |
| |
| job_state_set(job_ptr, (JOB_CANCELLED | JOB_REVOKED)); |
| job_ptr->start_time = now; |
| job_ptr->end_time = now; |
| job_completion_logger(job_ptr, false); |
| } |
| list_iterator_destroy(job_itr); |
| |
| /* Don't test these jobs for remote dependencies anymore */ |
| if (remote_dep_job_list) { |
| log_flag(FEDR, "%s: Remove all jobs in remote_dep_job_list", |
| __func__); |
| slurm_mutex_lock(&dep_job_list_mutex); |
| list_flush(remote_dep_job_list); |
| slurm_mutex_unlock(&dep_job_list_mutex); |
| } |
| } |
| |
| /* |
| * Remove all pending jobs that originated from the given cluster. |
| */ |
| static void _cleanup_removed_cluster_jobs(slurmdb_cluster_rec_t *cluster) |
| { |
| list_itr_t *job_itr; |
| job_record_t *job_ptr; |
| time_t now = time(NULL); |
| uint32_t origin_id, sibling_id; |
| uint64_t origin_bit, sibling_bit; |
| |
| if (!fed_mgr_cluster_rec) |
| return; |
| |
| sibling_id = fed_mgr_cluster_rec->fed.id; |
| sibling_bit = FED_SIBLING_BIT(sibling_id); |
| |
| job_itr = list_iterator_create(job_list); |
| while ((job_ptr = list_next(job_itr))) { |
| uint64_t siblings_viable; |
| |
| if (IS_JOB_COMPLETED(job_ptr)) |
| continue; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| continue; |
| |
| origin_bit = FED_SIBLING_BIT(origin_id); |
| siblings_viable = job_ptr->fed_details->siblings_viable; |
| |
| /* Remove cluster from viable,active siblings */ |
| _remove_sibling_bit(job_ptr, cluster); |
| |
| /* Remove jobs that: |
| * 1. originated from the removed cluster, |
| * 2. are origin jobs tracking a job locked by the removed cluster, or |
| * 3. had the removed cluster as their only viable sibling. */ |
| if (origin_id == cluster->fed.id || |
| |
| (job_ptr->fed_details && |
| origin_id == sibling_id && |
| job_ptr->fed_details->cluster_lock == cluster->fed.id) || |
| |
| (siblings_viable & FED_SIBLING_BIT(cluster->fed.id) && |
| !(siblings_viable & ~FED_SIBLING_BIT(cluster->fed.id)))) { |
| |
| /* |
| * If this job originated from the origin (which is |
| * being removed), the origin is not a viable sibling, |
| * and at least one other sibling is viable, then let |
| * the job remain a federated job to be scheduled |
| * amongst its siblings. |
| */ |
| if (IS_JOB_PENDING(job_ptr) && |
| (origin_id == cluster->fed.id) && |
| !(siblings_viable & origin_bit) && |
| (siblings_viable & ~sibling_bit)) |
| continue; |
| |
| /* free fed_job_details so it can't call home. */ |
| job_record_free_fed_details(&job_ptr->fed_details); |
| |
| /* |
| * If this job originated from the origin (which is |
| * being removed) and this sibling is the only viable |
| * sibling then let it run/pend as a non-federated job. |
| */ |
| if ((origin_id == cluster->fed.id) && |
| !(siblings_viable & ~sibling_bit)) |
| continue; |
| |
| if (!(IS_JOB_COMPLETED(job_ptr) || |
| IS_JOB_COMPLETING(job_ptr) || |
| IS_JOB_RUNNING(job_ptr))) { |
| |
| /* Free the resp_host so that the srun doesn't |
| * get signaled about the job going away. The |
| * job could still run on another sibling. */ |
| xfree(job_ptr->resp_host); |
| |
| job_state_set(job_ptr, (JOB_CANCELLED | |
| JOB_REVOKED)); |
| job_ptr->start_time = now; |
| job_ptr->end_time = now; |
| job_ptr->state_reason = WAIT_NO_REASON; |
| xfree(job_ptr->state_desc); |
| job_completion_logger(job_ptr, false); |
| } |
| } |
| } |
| list_iterator_destroy(job_itr); |
| } |
| |
| static void _handle_removed_clusters(slurmdb_federation_rec_t *db_fed, |
| uint64_t *removed_clusters) |
| { |
| list_itr_t *itr; |
| slurmdb_cluster_rec_t *tmp_cluster = NULL; |
| |
| itr = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| while ((tmp_cluster = list_next(itr))) { |
| if (tmp_cluster->name && |
| !(list_find_first(db_fed->cluster_list, |
| slurmdb_find_cluster_in_list, |
| tmp_cluster->name))) { |
| info("cluster %s was removed from the federation", |
| tmp_cluster->name); |
| *removed_clusters |= |
| FED_SIBLING_BIT(tmp_cluster->fed.id); |
| _cleanup_removed_cluster_jobs(tmp_cluster); |
| } |
| } |
| list_iterator_destroy(itr); |
| } |
| |
| /* Parse a RESPONSE_CTLD_MULT_MSG message and return a bitmap with a bit set |
| * for every successful sub-operation */ |
| static bitstr_t *_parse_resp_ctld_mult(slurm_msg_t *resp_msg) |
| { |
| ctld_list_msg_t *ctld_resp_msg = resp_msg->data; |
| list_itr_t *iter = NULL; |
| bitstr_t *success_bits; |
| slurm_msg_t sub_msg; |
| return_code_msg_t *rc_msg; |
| buf_t *single_resp_buf = NULL; |
| int resp_cnt, resp_inx = -1; |
| |
| xassert(resp_msg->msg_type == RESPONSE_CTLD_MULT_MSG); |
| |
| if (!ctld_resp_msg->my_list) { |
| error("%s: RESPONSE_CTLD_MULT_MSG has no list component", |
| __func__); |
| return NULL; |
| } |
| |
| resp_cnt = list_count(ctld_resp_msg->my_list); |
| success_bits = bit_alloc(resp_cnt); |
| iter = list_iterator_create(ctld_resp_msg->my_list); |
| while ((single_resp_buf = list_next(iter))) { |
| resp_inx++; |
| slurm_msg_t_init(&sub_msg); |
| if (unpack16(&sub_msg.msg_type, single_resp_buf) || |
| unpack_msg(&sub_msg, single_resp_buf)) { |
| error("%s: Sub-message unpack error for Message Type:%s", |
| __func__, rpc_num2string(sub_msg.msg_type)); |
| continue; |
| } |
| |
| if (sub_msg.msg_type != RESPONSE_SLURM_RC) { |
| error("%s: Unexpected Message Type:%s", |
| __func__, rpc_num2string(sub_msg.msg_type)); |
| } else { |
| rc_msg = (return_code_msg_t *) sub_msg.data; |
| if (rc_msg->return_code == SLURM_SUCCESS) |
| bit_set(success_bits, resp_inx); |
| } |
| (void) slurm_free_msg_data(sub_msg.msg_type, sub_msg.data); |
| |
| } |
| |
| return success_bits; |
| } |
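| |
| /* |
| * Illustrative (hypothetical) caller sketch: test the returned bitmap |
| * in the same order the RPCs were queued; clear bits mark operations |
| * that still need to be retried. |
| * |
| *   bitstr_t *success_bits = _parse_resp_ctld_mult(&resp_msg); |
| *   if (success_bits) { |
| *           if (bit_test(success_bits, 0)) |
| *                   ;  // first queued RPC succeeded |
| *           FREE_NULL_BITMAP(success_bits); |
| *   } |
| */ |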
| |
| static int _fed_mgr_job_allocate_sib(char *sib_name, job_desc_msg_t *job_desc, |
| uint16_t start_protocol_version, |
| bool interactive_job) |
| { |
| int error_code = SLURM_SUCCESS; |
| job_record_t *job_ptr = NULL; |
| char *err_msg = NULL; |
| bool reject_job = false; |
| slurmdb_cluster_rec_t *sibling; |
| uid_t uid = 0; |
| slurm_msg_t response_msg; |
| slurm_msg_t_init(&response_msg); |
| |
| xassert(sib_name); |
| xassert(job_desc); |
| |
| if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) { |
| error_code = ESLURM_INVALID_CLUSTER_NAME; |
| error("Invalid sibling name"); |
| } else if ((job_desc->alloc_node == NULL) || |
| (job_desc->alloc_node[0] == '\0')) { |
| error_code = ESLURM_INVALID_NODE_NAME; |
| error("REQUEST_SUBMIT_BATCH_JOB lacks alloc_node"); |
| } |
| |
| if (error_code == SLURM_SUCCESS) |
| error_code = validate_job_create_req(job_desc, uid, &err_msg); |
| |
| if (error_code) { |
| reject_job = true; |
| goto send_msg; |
| } |
| |
| /* Create new job allocation */ |
| job_desc->het_job_offset = NO_VAL; |
| error_code = job_allocate(job_desc, job_desc->immediate, false, NULL, |
| interactive_job, uid, false, &job_ptr, |
| &err_msg, start_protocol_version); |
| if (!job_ptr || |
| (error_code && job_ptr->job_state == JOB_FAILED)) |
| reject_job = true; |
| |
| if (job_desc->immediate && |
| (error_code != SLURM_SUCCESS)) |
| error_code = ESLURM_CAN_NOT_START_IMMEDIATELY; |
| |
| send_msg: |
| /* Send response back about origin jobid if an error occurred. */ |
| if (reject_job) |
| _persist_fed_job_response(sibling, job_desc->job_id, error_code); |
| else { |
| if (!(job_ptr->fed_details->siblings_viable & |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))) |
| job_state_set_flag(job_ptr, JOB_REVOKED); |
| |
| add_fed_job_info(job_ptr); |
| schedule_job_save(); /* Has own locks */ |
| schedule_node_save(); /* Has own locks */ |
| queue_job_scheduler(); |
| } |
| |
| xfree(err_msg); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static void _do_fed_job_complete(job_record_t *job_ptr, uint32_t job_state, |
| uint32_t exit_code, time_t start_time) |
| { |
| if (job_ptr->job_state & JOB_REQUEUE_FED) { |
| /* Remove JOB_REQUEUE_FED and JOB_COMPLETING once the |
| * sibling reports that the sibling job is done. Leave other |
| * state flags in place; JOB_SPECIAL_EXIT may be among them. */ |
| job_state_unset_flag(job_ptr, (JOB_REQUEUE_FED | JOB_COMPLETING)); |
| batch_requeue_fini(job_ptr); |
| } else { |
| fed_mgr_job_revoke(job_ptr, true, job_state, exit_code, |
| start_time); |
| } |
| } |
| |
| static void _handle_fed_job_complete(fed_job_update_info_t *job_update_info) |
| { |
| job_record_t *job_ptr; |
| |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK }; |
| |
| lock_slurmctld(job_write_lock); |
| if (!(job_ptr = find_job_record(job_update_info->job_id))) { |
| error("%s: failed to find job_record for fed JobId=%u", |
| __func__, job_update_info->job_id); |
| } else if (!job_ptr->fed_details) { |
| debug2("%s: %pJ not federated anymore", __func__, job_ptr); |
| } else if (IS_JOB_RUNNING(job_ptr)) { |
| /* |
| * The job could have started between the time that the origin |
| * sent the complete message and now. |
| */ |
| slurm_msg_t msg; |
| sib_msg_t sib_msg = {0}; |
| job_step_kill_msg_t *kill_req; |
| |
| /* Build and pack a kill_req msg to put in a sib_msg */ |
| kill_req = xmalloc(sizeof(job_step_kill_msg_t)); |
| kill_req->step_id.job_id = job_update_info->job_id; |
| kill_req->sjob_id = NULL; |
| kill_req->step_id.step_id = SLURM_BATCH_SCRIPT; |
| kill_req->step_id.step_het_comp = NO_VAL; |
| kill_req->signal = SIGKILL; |
| kill_req->flags = 0; |
| |
| sib_msg.data = kill_req; |
| |
| slurm_msg_t_init(&msg); |
| msg.data = &sib_msg; |
| |
| log_flag(FEDR, "%s: %pJ running now, just going to cancel it.", |
| __func__, job_ptr); |
| |
| _q_sib_job_cancel(&msg, job_update_info->uid); |
| } else { |
| _do_fed_job_complete(job_ptr, job_update_info->job_state, |
| job_update_info->return_code, |
| job_update_info->start_time); |
| } |
| |
| unlock_slurmctld(job_write_lock); |
| } |
| |
| static void _handle_fed_job_cancel(fed_job_update_info_t *job_update_info) |
| { |
| kill_job_step(job_update_info->kill_msg, job_update_info->uid); |
| } |
| |
| static void |
| _handle_fed_job_remove_active_sib_bit(fed_job_update_info_t *job_update_info) |
| { |
| fed_job_info_t *job_info; |
| job_record_t *job_ptr; |
| slurmdb_cluster_rec_t *sibling; |
| |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| lock_slurmctld(job_write_lock); |
| if (!(job_ptr = find_job_record(job_update_info->job_id))) { |
| error("%s: failed to find job_record for fed JobId=%u", |
| __func__, job_update_info->job_id); |
| unlock_slurmctld(job_write_lock); |
| return; |
| } else if (!job_ptr->fed_details) { |
| debug2("%s: %pJ not federated anymore", __func__, job_ptr); |
| unlock_slurmctld(job_write_lock); |
| return; |
| } |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (!(job_info = _find_fed_job_info(job_update_info->job_id))) { |
| error("%s: failed to find fed job info for fed JobId=%u", |
| __func__, job_update_info->job_id); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| unlock_slurmctld(job_write_lock); |
| return; |
| } |
| |
| sibling = fed_mgr_get_cluster_by_name(job_update_info->siblings_str); |
| if (sibling) { |
| job_info->siblings_active &= |
| ~(FED_SIBLING_BIT(sibling->fed.id)); |
| job_ptr->fed_details->siblings_active = |
| job_info->siblings_active; |
| update_job_fed_details(job_ptr); |
| } |
| |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| unlock_slurmctld(job_write_lock); |
| } |
| |
| static void _handle_fed_job_requeue(fed_job_update_info_t *job_update_info) |
| { |
| int rc; |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK }; |
| |
| lock_slurmctld(job_write_lock); |
| if ((rc = job_requeue(job_update_info->uid, job_update_info->job_id, |
| NULL, false, job_update_info->flags))) |
| error("failed to requeue fed JobId=%u - rc:%d", |
| job_update_info->job_id, rc); |
| unlock_slurmctld(job_write_lock); |
| } |
| |
| /* |
| * Job has been started, revoke the sibling jobs. |
| * |
| * This is the common code between queued and local job starts. |
| * This can be done when the job is starting on the origin cluster without |
| * queueing because the cluster already has the job write lock and fed read |
| * lock. |
| * |
| * Must have fed_job_list mutex locked and job write_lock set. |
| */ |
| static void _fed_job_start_revoke(fed_job_info_t *job_info, |
| job_record_t *job_ptr, time_t start_time) |
| { |
| uint32_t cluster_lock; |
| uint64_t old_active; |
| uint64_t old_viable; |
| |
| cluster_lock = job_info->cluster_lock; |
| old_active = job_info->siblings_active; |
| old_viable = job_info->siblings_viable; |
| |
| job_ptr->fed_details->cluster_lock = cluster_lock; |
| job_ptr->fed_details->siblings_active = |
| job_info->siblings_active = |
| FED_SIBLING_BIT(cluster_lock); |
| update_job_fed_details(job_ptr); |
| |
| if (old_active & ~FED_SIBLING_BIT(cluster_lock)) { |
| /* There are siblings that need to be removed */ |
| log_flag(FEDR, "%s: %pJ is running on cluster id %d, revoking remote siblings (active:%"PRIu64" viable:%"PRIu64")", |
| __func__, job_ptr, cluster_lock, old_active, |
| old_viable); |
| |
| _revoke_sibling_jobs(job_ptr->job_id, cluster_lock, |
| old_active, start_time); |
| } |
| } |
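| |
| /* |
| * Worked example of the bitmap arithmetic above (illustrative only): |
| * with FED_SIBLING_BIT(x) == ((uint64_t) 1 << (x - 1)), a job viable |
| * on clusters {1, 3} has siblings_active == 0x5. If cluster 1 takes |
| * the cluster lock, siblings_active collapses to |
| * FED_SIBLING_BIT(1) == 0x1, and old_active & ~FED_SIBLING_BIT(1) == |
| * 0x4 is non-zero, so the sibling job on cluster 3 is revoked. |
| */ |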
| |
| static void _handle_fed_job_start(fed_job_update_info_t *job_update_info) |
| { |
| fed_job_info_t *job_info; |
| job_record_t *job_ptr; |
| |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK }; |
| |
| lock_slurmctld(job_write_lock); |
| if (!(job_ptr = find_job_record(job_update_info->job_id))) { |
| error("%s: failed to find job_record for fed JobId=%u", |
| __func__, job_update_info->job_id); |
| unlock_slurmctld(job_write_lock); |
| return; |
| } else if (!job_ptr->fed_details) { |
| debug2("%s: %pJ not federated anymore", __func__, job_ptr); |
| unlock_slurmctld(job_write_lock); |
| return; |
| } |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (!(job_info = |
| _find_fed_job_info(job_update_info->job_id))) { |
| error("%s: failed to find fed job info for fed JobId=%u", |
| __func__, job_update_info->job_id); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| unlock_slurmctld(job_write_lock); |
| return; |
| } |
| |
| _fed_job_start_revoke(job_info, job_ptr, job_update_info->start_time); |
| |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| if (job_info->cluster_lock != fed_mgr_cluster_rec->fed.id) { |
| log_flag(FEDR, "%s: %pJ is running remotely, revoking origin tracking job", |
| __func__, job_ptr); |
| |
| /* leave as pending so that it will stay around */ |
| fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0, |
| job_update_info->start_time); |
| } |
| |
| unlock_slurmctld(job_write_lock); |
| } |
| |
| static int _list_find_jobid(void *x, void *key) |
| { |
| uint32_t src_jobid = *(uint32_t *)x; |
| uint32_t key_jobid = *(uint32_t *)key; |
| |
| if (src_jobid == key_jobid) |
| return 1; |
| return 0; |
| } |
| |
| static void _handle_fed_job_submission(fed_job_update_info_t *job_update_info) |
| { |
| job_record_t *job_ptr; |
| bool interactive_job = |
| (job_update_info->type == FED_JOB_SUBMIT_INT); |
| |
| slurmctld_lock_t job_write_lock = { |
| READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK }; |
| |
| log_flag(FEDR, "%s: submitting %s sibling JobId=%u from %s", |
| __func__, (interactive_job) ? "interactive" : "batch", |
| job_update_info->submit_desc->job_id, |
| job_update_info->submit_cluster); |
| |
| /* do this outside the job write lock */ |
| delete_job_desc_files(job_update_info->job_id); |
| lock_slurmctld(job_write_lock); |
| |
| if ((job_ptr = find_job_record(job_update_info->job_id))) { |
| debug("Found existing fed %pJ, going to requeue/unlink it", |
| job_ptr); |
| /* Delete job quickly */ |
| job_state_set_flag(job_ptr, JOB_REVOKED); |
| unlink_job_record(job_ptr); |
| |
| /* |
| * Make sure that the file delete request is purged from list |
| * -- added from purge_job_record() -- before job is allocated |
| * again. |
| */ |
| list_delete_all(purge_files_list, _list_find_jobid, |
| &job_update_info->job_id); |
| |
| } |
| |
| _fed_mgr_job_allocate_sib(job_update_info->submit_cluster, |
| job_update_info->submit_desc, |
| job_update_info->submit_proto_ver, |
| interactive_job); |
| unlock_slurmctld(job_write_lock); |
| } |
| |
| static void _handle_fed_job_update(fed_job_update_info_t *job_update_info) |
| { |
| int rc; |
| slurm_msg_t msg; |
| slurm_msg_t_init(&msg); |
| job_desc_msg_t *job_desc = job_update_info->submit_desc; |
| slurmdb_cluster_rec_t *sibling; |
| |
| slurmctld_lock_t job_write_lock = { |
| READ_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK, READ_LOCK }; |
| slurmctld_lock_t fed_read_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| xassert(job_desc); |
| |
| /* scontrol always sets job_id_str */ |
| job_desc->job_id = job_update_info->job_id; |
| msg.data = job_desc; |
| |
| lock_slurmctld(job_write_lock); |
| rc = update_job(&msg, job_update_info->uid, false); |
| unlock_slurmctld(job_write_lock); |
| |
| lock_slurmctld(fed_read_lock); |
| if (!(sibling = |
| fed_mgr_get_cluster_by_name(job_update_info->submit_cluster))) { |
| error("Invalid sibling name"); |
| } else { |
| _persist_update_job_resp(sibling, job_update_info->job_id, rc); |
| } |
| unlock_slurmctld(fed_read_lock); |
| } |
| |
| static void |
| _handle_fed_job_update_response(fed_job_update_info_t *job_update_info) |
| { |
| fed_job_info_t *job_info; |
| slurmdb_cluster_rec_t *sibling; |
| |
| slurmctld_lock_t fed_read_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (!(job_info = _find_fed_job_info(job_update_info->job_id))) { |
| error("%s: failed to find fed job info for fed JobId=%u", |
| __func__, job_update_info->job_id); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| return; |
| } |
| |
| lock_slurmctld(fed_read_lock); |
| |
| if (!(sibling = |
| fed_mgr_get_cluster_by_name(job_update_info->submit_cluster))) { |
| error("Invalid sibling name"); |
| unlock_slurmctld(fed_read_lock); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| return; |
| } |
| |
| if (job_info->updating_sibs[sibling->fed.id]) |
| job_info->updating_sibs[sibling->fed.id]--; |
| else |
| error("%s this should never happen", __func__); |
| |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| unlock_slurmctld(fed_read_lock); |
| } |
| |
| static int _handle_fed_job_sync(fed_job_update_info_t *job_update_info) |
| { |
| int rc = SLURM_SUCCESS; |
| |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| lock_slurmctld(job_write_lock); |
| |
| rc = _sync_jobs(job_update_info->submit_cluster, |
| job_update_info->job_info_msg, |
| job_update_info->start_time); |
| |
| unlock_slurmctld(job_write_lock); |
| |
| return rc; |
| } |
| |
| /* Have to send the job sync from the job_update thread so that it can |
| * independently get the job read lock. */ |
| static int _handle_fed_send_job_sync(fed_job_update_info_t *job_update_info) |
| { |
| int rc = SLURM_SUCCESS; |
| list_t *jobids = NULL; |
| slurm_msg_t req_msg, job_msg; |
| sib_msg_t sib_msg = {0}; |
| slurmdb_cluster_rec_t *sibling; |
| buf_t *job_buffer = NULL, *buffer = NULL; |
| time_t sync_time = 0; |
| char *sib_name = job_update_info->submit_cluster; |
| |
| slurmctld_lock_t job_read_lock = { |
| READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| lock_slurmctld(job_read_lock); |
| |
| if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) { |
| error("%s: Invalid sibling name %s", __func__, sib_name); |
| unlock_slurmctld(job_read_lock); |
| return SLURM_ERROR; |
| } |
| |
| slurm_mutex_lock(&sibling->lock); |
| if (!sibling->rpc_version && sibling->fed.recv) { |
| sibling->rpc_version = |
| ((persist_conn_t *) sibling->fed.recv)->version; |
| } |
| slurm_mutex_unlock(&sibling->lock); |
| |
| if (!sibling->rpc_version) { |
| error("%s: cluster %s doesn't have rpc_version yet.", |
| __func__, sib_name); |
| unlock_slurmctld(job_read_lock); |
| return SLURM_ERROR; |
| } |
| |
| sync_time = time(NULL); |
| jobids = _get_sync_jobid_list(sibling->fed.id, sync_time); |
| job_buffer = pack_spec_jobs(jobids, SHOW_ALL, slurm_conf.slurm_user_id, |
| NO_VAL, sibling->rpc_version); |
| FREE_NULL_LIST(jobids); |
| |
| unlock_slurmctld(job_read_lock); |
| |
| slurm_msg_t_init(&job_msg); |
| job_msg.protocol_version = sibling->rpc_version; |
| job_msg.msg_type = RESPONSE_JOB_INFO; |
| job_msg.data = job_buffer; |
| |
| buffer = init_buf(BUF_SIZE); |
| pack_msg(&job_msg, buffer); |
| |
| sib_msg.sib_msg_type = FED_JOB_SYNC; |
| sib_msg.data_buffer = buffer; |
| sib_msg.data_type = job_msg.msg_type; |
| sib_msg.data_version = job_msg.protocol_version; |
| sib_msg.start_time = sync_time; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.protocol_version = job_msg.protocol_version; |
| req_msg.data = &sib_msg; |
| |
| sibling->fed.sync_sent = true; |
| |
| rc = _queue_rpc(sibling, &req_msg, 0, false); |
| |
| FREE_NULL_BUFFER(job_buffer); |
| FREE_NULL_BUFFER(buffer); |
| |
| return rc; |
| } |
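| |
| /* |
| * For reference, the sync RPC built above nests three layers (an |
| * illustrative sketch of the structures, not a wire-format spec): |
| * |
| * REQUEST_SIB_MSG (packed at sibling->rpc_version) |
| * sib_msg_t { sib_msg_type = FED_JOB_SYNC, |
| * data_type = RESPONSE_JOB_INFO, |
| * data_version = sibling->rpc_version, |
| * data_buffer = <pre-packed RESPONSE_JOB_INFO>, |
| * start_time = sync_time } |
| * |
| * data_version records which protocol version the embedded buffer was |
| * packed with, so the receiver can unpack it correctly. |
| */ |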
| |
| static int _foreach_fed_job_update_info(fed_job_update_info_t *job_update_info) |
| { |
| if (!fed_mgr_cluster_rec) { |
| info("Not part of federation anymore, not performing fed job updates"); |
| return SLURM_SUCCESS; |
| } |
| |
| log_flag(FEDR, "%s: JobId=%u type:%s", |
| __func__, job_update_info->job_id, |
| _job_update_type_str(job_update_info->type)); |
| |
| switch (job_update_info->type) { |
| case FED_JOB_COMPLETE: |
| _handle_fed_job_complete(job_update_info); |
| break; |
| case FED_JOB_CANCEL: |
| _handle_fed_job_cancel(job_update_info); |
| break; |
| case FED_JOB_REMOVE_ACTIVE_SIB_BIT: |
| _handle_fed_job_remove_active_sib_bit(job_update_info); |
| break; |
| case FED_JOB_REQUEUE: |
| _handle_fed_job_requeue(job_update_info); |
| break; |
| case FED_JOB_START: |
| _handle_fed_job_start(job_update_info); |
| break; |
| case FED_JOB_SUBMIT_BATCH: |
| case FED_JOB_SUBMIT_INT: |
| _handle_fed_job_submission(job_update_info); |
| break; |
| case FED_JOB_SYNC: |
| _handle_fed_job_sync(job_update_info); |
| break; |
| case FED_JOB_UPDATE: |
| _handle_fed_job_update(job_update_info); |
| break; |
| case FED_JOB_UPDATE_RESPONSE: |
| _handle_fed_job_update_response(job_update_info); |
| break; |
| case FED_SEND_JOB_SYNC: |
| _handle_fed_send_job_sync(job_update_info); |
| break; |
| default: |
| error("Invalid fed_job type: %d JobId=%u", |
| job_update_info->type, job_update_info->job_id); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static void _update_origin_job_dep(job_record_t *job_ptr, |
| slurmdb_cluster_rec_t *origin) |
| { |
| slurm_msg_t req_msg; |
| dep_update_origin_msg_t dep_update_msg = { 0 }; |
| |
| xassert(job_ptr); |
| xassert(job_ptr->details); |
| xassert(job_ptr->details->depend_list); |
| xassert(origin); |
| |
| if (origin == fed_mgr_cluster_rec) { |
| error("%s: Cannot send dependency update of %pJ to self - were clusters removed then re-added to the federation in a different order?", |
| __func__, job_ptr); |
| return; |
| } |
| |
| dep_update_msg.depend_list = job_ptr->details->depend_list; |
| dep_update_msg.job_id = job_ptr->job_id; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_UPDATE_ORIGIN_DEP; |
| req_msg.data = &dep_update_msg; |
| |
| if (_queue_rpc(origin, &req_msg, 0, false)) |
| error("%s: Failed to send dependency update for %pJ", |
| __func__, job_ptr); |
| } |
| |
| static int _find_local_dep(void *arg, void *key) |
| { |
| depend_spec_t *dep_ptr = (depend_spec_t *) arg; |
| return !(dep_ptr->depend_flags & SLURM_FLAGS_REMOTE); |
| } |
| |
| static int _find_job_by_id(void *arg, void *key) |
| { |
| job_record_t *job_ptr = (job_record_t *) arg; |
| uint32_t job_id = *((uint32_t *) key); |
| return job_ptr->job_id == job_id; |
| } |
| |
| static void _handle_recv_remote_dep(dep_msg_t *remote_dep_info) |
| { |
| /* |
| * update_job_dependency() will: |
| * - read the job list (need job read lock) |
| * - call fed_mgr_is_origin_job_id (need fed read lock) |
| */ |
| int rc, tmp; |
| slurmctld_lock_t job_read_lock = { .job = READ_LOCK, .fed = READ_LOCK }; |
| job_record_t *job_ptr = xmalloc(sizeof *job_ptr); |
| |
| job_ptr->magic = JOB_MAGIC; |
| job_ptr->details = xmalloc(sizeof *(job_ptr->details)); |
| job_ptr->details->magic = DETAILS_MAGIC; |
| job_ptr->job_id = remote_dep_info->job_id; |
| job_ptr->name = remote_dep_info->job_name; |
| job_ptr->user_id = remote_dep_info->user_id; |
| |
| /* Initialize array info. */ |
| job_ptr->array_job_id = remote_dep_info->array_job_id; |
| job_ptr->array_task_id = remote_dep_info->array_task_id; |
| |
| /* |
| * We need to allocate space for job_ptr->array_recs if |
| * it's an array job, but we don't use anything in it. |
| */ |
| if (remote_dep_info->is_array) |
| job_ptr->array_recs = xmalloc(sizeof *(job_ptr->array_recs)); |
| |
| /* |
| * We need to allocate space for fed_details so |
| * other places know this is a fed job, but we don't |
| * need to set anything specific in it. |
| */ |
| job_ptr->fed_details = xmalloc(sizeof *(job_ptr->fed_details)); |
| |
| log_flag(FEDR, "%s: Got job_id: %u, name: \"%s\", array_task_id: %u, dependency: \"%s\", is_array? %s, user_id: %u", |
| __func__, remote_dep_info->job_id, remote_dep_info->job_name, |
| remote_dep_info->array_task_id, remote_dep_info->dependency, |
| remote_dep_info->is_array ? "yes" : "no", |
| remote_dep_info->user_id); |
| |
| /* NULL the string so it doesn't get freed; it's now owned by job_ptr */ |
| remote_dep_info->job_name = NULL; |
| |
| /* Create and validate the dependency. */ |
| lock_slurmctld(job_read_lock); |
| rc = update_job_dependency(job_ptr, remote_dep_info->dependency); |
| unlock_slurmctld(job_read_lock); |
| |
| if (rc) { |
| error("%s: Invalid dependency %s for %pJ: %s", |
| __func__, remote_dep_info->dependency, job_ptr, |
| slurm_strerror(rc)); |
| _destroy_dep_job(job_ptr); |
| } else { |
| job_record_t *tmp_job; |
| list_itr_t *itr; |
| |
| /* |
| * Remove the old reference to this job from remote_dep_job_list |
| * so that we don't continue testing the old dependencies. |
| */ |
| slurm_mutex_lock(&dep_job_list_mutex); |
| itr = list_iterator_create(remote_dep_job_list); |
| while ((tmp_job = list_next(itr))) { |
| if (tmp_job->job_id == job_ptr->job_id) { |
| list_delete_item(itr); |
| break; |
| } |
| } |
| list_iterator_destroy(itr); |
| |
| /* |
| * If we were sent a list of 0 dependencies, that means |
| * the dependency was updated and cleared, so don't |
| * add it to the list to test. Also only add it if |
| * there are dependencies local to this cluster. |
| */ |
| if (list_count(job_ptr->details->depend_list) && |
| list_find_first(job_ptr->details->depend_list, |
| _find_local_dep, &tmp)) |
| list_append(remote_dep_job_list, job_ptr); |
| else |
| _destroy_dep_job(job_ptr); |
| |
| slurm_mutex_unlock(&dep_job_list_mutex); |
| } |
| _destroy_dep_msg(remote_dep_info); |
| } |
| |
| static void _handle_dep_update_origin_msgs(void) |
| { |
| job_record_t *job_ptr; |
| dep_update_origin_msg_t *dep_update_msg; |
| list_t *update_job_list = NULL; |
| slurmctld_lock_t job_write_lock = { |
| .conf = READ_LOCK, .job = WRITE_LOCK, .fed = READ_LOCK }; |
| |
| if (!list_count(origin_dep_update_list)) |
| return; |
| |
| lock_slurmctld(job_write_lock); |
| while ((dep_update_msg = list_pop(origin_dep_update_list))) { |
| if (!(job_ptr = find_job_record(dep_update_msg->job_id))) { |
| /* |
| * Maybe the job was cancelled and purged before |
| * the dependency update got here or was able |
| * to be processed. Regardless, this job doesn't |
| * exist here, so we have to throw out this |
| * dependency update message. |
| */ |
| log_flag(DEPENDENCY, "%s: Could not find job %u, cannot process dependency update. Perhaps the jobs was purged before we got here.", |
| __func__, dep_update_msg->job_id); |
| slurm_free_dep_update_origin_msg(dep_update_msg); |
| continue; |
| } else if (!job_ptr->details || |
| !job_ptr->details->depend_list) { |
| /* |
| * This might happen if the job's dependencies |
| * were updated to be none before the dependency |
| * update came from the sibling cluster. |
| */ |
| log_flag(DEPENDENCY, "%s: %pJ doesn't have dependencies, cannot process dependency update", |
| __func__, job_ptr); |
| slurm_free_dep_update_origin_msg(dep_update_msg); |
| continue; |
| } |
| if (update_job_dependency_list(job_ptr, |
| dep_update_msg->depend_list)) { |
| if (!update_job_list) { |
| update_job_list = list_create(NULL); |
| list_append(update_job_list, job_ptr); |
| } else if (!list_find_first(update_job_list, |
| _find_job_by_id, |
| &job_ptr->job_id)) |
| list_append(update_job_list, job_ptr); |
| } |
| slurm_free_dep_update_origin_msg(dep_update_msg); |
| } |
| if (update_job_list) { |
| list_for_each(update_job_list, handle_job_dependency_updates, |
| NULL); |
| FREE_NULL_LIST(update_job_list); |
| } |
| unlock_slurmctld(job_write_lock); |
| } |
| |
| static void *_test_dep_job_thread(void *arg) |
| { |
| time_t last_test = 0; |
| time_t now; |
| struct timespec ts = {0, 0}; |
| slurmctld_lock_t job_read_lock = { |
| .job = READ_LOCK, .fed = READ_LOCK }; |
| |
| #if HAVE_SYS_PRCTL_H |
| if (prctl(PR_SET_NAME, "fed_test_dep", NULL, NULL, NULL) < 0) { |
| error("%s: cannot set my name to %s %m", __func__, |
| "fed_test_dep"); |
| } |
| #endif |
| |
| while (!slurmctld_config.shutdown_time) { |
| now = time(NULL); |
| |
| /* Only test after joining a federation. */ |
| if (fed_mgr_fed_rec && fed_mgr_cluster_rec && |
| ((now - last_test) > TEST_REMOTE_DEP_FREQ)) { |
| last_test = now; |
| lock_slurmctld(job_read_lock); |
| fed_mgr_test_remote_dependencies(); |
| unlock_slurmctld(job_read_lock); |
| } |
| |
| slurm_mutex_lock(&test_dep_mutex); |
| ts.tv_sec = now + 2; |
| slurm_cond_timedwait(&test_dep_cond, |
| &test_dep_mutex, &ts); |
| slurm_mutex_unlock(&test_dep_mutex); |
| } |
| return NULL; |
| } |
| |
| static void *_origin_dep_update_thread(void *arg) |
| { |
| struct timespec ts = {0, 0}; |
| |
| #if HAVE_SYS_PRCTL_H |
| if (prctl(PR_SET_NAME, "fed_update_dep", NULL, NULL, NULL) < 0) { |
| error("%s: cannot set my name to %s %m", __func__, |
| "fed_update_dep"); |
| } |
| #endif |
| |
| while (!slurmctld_config.shutdown_time) { |
| slurm_mutex_lock(&origin_dep_update_mutex); |
| ts.tv_sec = time(NULL) + 2; |
| slurm_cond_timedwait(&origin_dep_cond, |
| &origin_dep_update_mutex, &ts); |
| slurm_mutex_unlock(&origin_dep_update_mutex); |
| |
| if (slurmctld_config.shutdown_time) |
| break; |
| |
| /* Wait for fed_mgr_init() */ |
| if (!fed_mgr_fed_rec || !fed_mgr_cluster_rec) |
| continue; |
| |
| _handle_dep_update_origin_msgs(); |
| } |
| return NULL; |
| } |
| |
| static void *_remote_dep_recv_thread(void *arg) |
| { |
| struct timespec ts = {0, 0}; |
| dep_msg_t *remote_dep_info; |
| |
| #if HAVE_SYS_PRCTL_H |
| if (prctl(PR_SET_NAME, "fed_remote_dep", NULL, NULL, NULL) < 0) { |
| error("%s: cannot set my name to %s %m", __func__, |
| "fed_remote_dep"); |
| } |
| #endif |
| |
| while (!slurmctld_config.shutdown_time) { |
| slurm_mutex_lock(&remote_dep_recv_mutex); |
| ts.tv_sec = time(NULL) + 2; |
| slurm_cond_timedwait(&remote_dep_cond, |
| &remote_dep_recv_mutex, &ts); |
| slurm_mutex_unlock(&remote_dep_recv_mutex); |
| |
| if (slurmctld_config.shutdown_time) |
| break; |
| |
| /* Wait for fed_mgr_init() */ |
| if (!fed_mgr_fed_rec || !fed_mgr_cluster_rec) |
| continue; |
| |
| while ((remote_dep_info = list_pop(remote_dep_recv_list))) { |
| _handle_recv_remote_dep(remote_dep_info); |
| } |
| } |
| return NULL; |
| } |
| |
| /* Thread to process queued fed job updates */ |
| static void *_fed_job_update_thread(void *arg) |
| { |
| struct timespec ts = {0, 0}; |
| fed_job_update_info_t *job_update_info; |
| |
| #if HAVE_SYS_PRCTL_H |
| if (prctl(PR_SET_NAME, "fed_jobs", NULL, NULL, NULL) < 0) { |
| error("%s: cannot set my name to %s %m", __func__, "fed_jobs"); |
| } |
| #endif |
| |
| while (!slurmctld_config.shutdown_time) { |
| slurm_mutex_lock(&job_update_mutex); |
| ts.tv_sec = time(NULL) + 2; |
| slurm_cond_timedwait(&job_update_cond, |
| &job_update_mutex, &ts); |
| slurm_mutex_unlock(&job_update_mutex); |
| |
| if (slurmctld_config.shutdown_time) |
| break; |
| |
| while ((job_update_info = list_pop(fed_job_update_list))) { |
| _foreach_fed_job_update_info(job_update_info); |
| _destroy_fed_job_update_info(job_update_info); |
| } |
| } |
| |
| return NULL; |
| } |
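| |
| /* |
| * The polling threads above all share the same wakeup idiom: sleep up |
| * to two seconds on a condition variable, then drain a work list. A |
| * producer is expected to wake a thread roughly like this (a sketch |
| * only; the actual enqueue helpers live elsewhere in this file): |
| * |
| * list_append(fed_job_update_list, job_update_info); |
| * slurm_cond_signal(&job_update_cond); |
| * |
| * The timed wait bounds the latency even if a signal is missed. |
| */ |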
| |
| /* Thread to process queued agent RPC requests */ |
| static void *_agent_thread(void *arg) |
| { |
| slurmdb_cluster_rec_t *cluster; |
| struct timespec ts = {0, 0}; |
| list_itr_t *cluster_iter, *rpc_iter; |
| agent_queue_t *rpc_rec; |
| slurm_msg_t req_msg, resp_msg; |
| ctld_list_msg_t ctld_req_msg; |
| bitstr_t *success_bits; |
| int rc, resp_inx, success_size; |
| |
| slurmctld_lock_t fed_read_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| #if HAVE_SYS_PRCTL_H |
| if (prctl(PR_SET_NAME, "fed_agent", NULL, NULL, NULL) < 0) { |
| error("%s: cannot set my name to %s %m", __func__, "fed_agent"); |
| } |
| #endif |
| |
| while (!slurmctld_config.shutdown_time) { |
| /* Wait for new work or re-issue RPCs after 2 second wait */ |
| slurm_mutex_lock(&agent_mutex); |
| if (!slurmctld_config.shutdown_time && !agent_queue_size) { |
| ts.tv_sec = time(NULL) + 2; |
| slurm_cond_timedwait(&agent_cond, &agent_mutex, &ts); |
| } |
| agent_queue_size = 0; |
| slurm_mutex_unlock(&agent_mutex); |
| if (slurmctld_config.shutdown_time) |
| break; |
| |
| lock_slurmctld(fed_read_lock); |
| if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) { |
| unlock_slurmctld(fed_read_lock); |
| continue; |
| } |
| |
| /* Look for work on each cluster */ |
| cluster_iter = list_iterator_create( |
| fed_mgr_fed_rec->cluster_list); |
| while (!slurmctld_config.shutdown_time && |
| (cluster = list_next(cluster_iter))) { |
| time_t now = time(NULL); |
| if ((cluster->send_rpc == NULL) || |
| (list_count(cluster->send_rpc) == 0)) |
| continue; |
| |
| /* Move currently pending RPCs to new list */ |
| ctld_req_msg.my_list = NULL; |
| rpc_iter = list_iterator_create(cluster->send_rpc); |
| while ((rpc_rec = list_next(rpc_iter))) { |
| if ((rpc_rec->last_try + rpc_rec->last_defer) >= |
| now) |
| continue; |
| if (!ctld_req_msg.my_list) |
| ctld_req_msg.my_list = list_create(NULL); |
| list_append(ctld_req_msg.my_list, |
| rpc_rec->buffer); |
| rpc_rec->last_try = now; |
| if (rpc_rec->last_defer == 128) { |
| info("%s: %s JobId=%u request to cluster %s is repeatedly failing", |
| __func__, |
| rpc_num2string(rpc_rec->msg_type), |
| rpc_rec->job_id, cluster->name); |
| rpc_rec->last_defer *= 2; |
| } else if (rpc_rec->last_defer) |
| rpc_rec->last_defer *= 2; |
| else |
| rpc_rec->last_defer = 2; |
| } |
| list_iterator_destroy(rpc_iter); |
| if (!ctld_req_msg.my_list) |
| continue; |
| |
| /* Build, pack and send the combined RPC */ |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_CTLD_MULT_MSG; |
| req_msg.data = &ctld_req_msg; |
| rc = _send_recv_msg(cluster, &req_msg, &resp_msg, |
| false); |
| |
| /* Process the response */ |
| if ((rc == SLURM_SUCCESS) && |
| (resp_msg.msg_type == RESPONSE_CTLD_MULT_MSG)) { |
| /* Remove successfully processed RPCs */ |
| resp_inx = 0; |
| success_bits = _parse_resp_ctld_mult(&resp_msg); |
| success_size = bit_size(success_bits); |
| rpc_iter = list_iterator_create(cluster->send_rpc); |
| while ((rpc_rec = list_next(rpc_iter))) { |
| if (rpc_rec->last_try != now) |
| continue; |
| if (resp_inx >= success_size) { |
| error("%s: bitmap too small (%d >= %d)", |
| __func__, resp_inx, |
| success_size); |
| break; |
| } |
| if (bit_test(success_bits, resp_inx++)) |
| list_delete_item(rpc_iter); |
| } |
| list_iterator_destroy(rpc_iter); |
| FREE_NULL_BITMAP(success_bits); |
| } else { |
| /* Failed to process combined RPC. |
| * Leave all RPCs on the queue. */ |
| if (rc != SLURM_SUCCESS) { |
| if (_comm_fail_log(cluster)) { |
| error("%s: Failed to send RPC: %s", |
| __func__, |
| slurm_strerror(rc)); |
| } else { |
| debug("%s: Failed to send RPC: %s", |
| __func__, |
| slurm_strerror(rc)); |
| } |
| } else if (resp_msg.msg_type == |
| PERSIST_RC) { |
| persist_rc_msg_t *msg; |
| char *err_str; |
| msg = resp_msg.data; |
| if (msg->comment) |
| err_str = msg->comment; |
| else |
| err_str = slurm_strerror(msg->rc); |
| error("%s: failed to process msg: %s", |
| __func__, err_str); |
| } else if (resp_msg.msg_type == |
| RESPONSE_SLURM_RC) { |
| rc = slurm_get_return_code( |
| resp_msg.msg_type, |
| resp_msg.data); |
| error("%s: failed to process msg: %s", |
| __func__, slurm_strerror(rc)); |
| } else { |
| error("%s: Invalid response msg_type: %u", |
| __func__, resp_msg.msg_type); |
| } |
| } |
| (void) slurm_free_msg_data(resp_msg.msg_type, |
| resp_msg.data); |
| |
| FREE_NULL_LIST(ctld_req_msg.my_list); |
| } |
| list_iterator_destroy(cluster_iter); |
| |
| unlock_slurmctld(fed_read_lock); |
| } |
| |
| /* Log the abandoned RPCs */ |
| lock_slurmctld(fed_read_lock); |
| if (!fed_mgr_fed_rec) |
| goto end_it; |
| |
| cluster_iter = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| while ((cluster = list_next(cluster_iter))) { |
| if (cluster->send_rpc == NULL) |
| continue; |
| |
| rpc_iter = list_iterator_create(cluster->send_rpc); |
| while ((rpc_rec = list_next(rpc_iter))) { |
| info("%s: %s JobId=%u request to cluster %s aborted", |
| __func__, rpc_num2string(rpc_rec->msg_type), |
| rpc_rec->job_id, cluster->name); |
| list_delete_item(rpc_iter); |
| } |
| list_iterator_destroy(rpc_iter); |
| FREE_NULL_LIST(cluster->send_rpc); |
| } |
| list_iterator_destroy(cluster_iter); |
| |
| end_it: |
| unlock_slurmctld(fed_read_lock); |
| |
| return NULL; |
| } |
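| |
| /* |
| * Retry/backoff schedule for the agent loop above, derived from the |
| * last_defer handling (a worked example): a freshly queued RPC has |
| * last_defer == 0 and is sent on the next pass; each pass that leaves |
| * it queued sets last_defer to 2, 4, 8, ..., 128 (at which point the |
| * "repeatedly failing" message is logged), then keeps doubling. An |
| * entry is re-sent only once (last_try + last_defer) < now, i.e. |
| * roughly 2s, 4s, 8s, ... after the previous attempt. |
| */ |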
| |
| static void _spawn_threads(void) |
| { |
| slurm_mutex_lock(&agent_mutex); |
| slurm_thread_create(&agent_thread_id, _agent_thread, NULL); |
| slurm_mutex_unlock(&agent_mutex); |
| |
| slurm_mutex_lock(&job_update_mutex); |
| slurm_thread_create(&fed_job_update_thread_id, |
| _fed_job_update_thread, NULL); |
| slurm_mutex_unlock(&job_update_mutex); |
| |
| slurm_mutex_lock(&remote_dep_recv_mutex); |
| slurm_thread_create(&remote_dep_thread_id, |
| _remote_dep_recv_thread, NULL); |
| slurm_mutex_unlock(&remote_dep_recv_mutex); |
| |
| slurm_mutex_lock(&test_dep_mutex); |
| slurm_thread_create(&dep_job_thread_id, _test_dep_job_thread, NULL); |
| slurm_mutex_unlock(&test_dep_mutex); |
| |
| slurm_mutex_lock(&origin_dep_update_mutex); |
| slurm_thread_create(&origin_dep_thread_id, _origin_dep_update_thread, |
| NULL); |
| slurm_mutex_unlock(&origin_dep_update_mutex); |
| } |
| |
| static void _add_missing_fed_job_info(void) |
| { |
| job_record_t *job_ptr; |
| list_itr_t *job_itr; |
| |
| slurmctld_lock_t job_read_lock = { .job = READ_LOCK }; |
| |
| /* Sanity check and add any missing job_info structs */ |
| lock_slurmctld(job_read_lock); |
| job_itr = list_iterator_create(job_list); |
| while ((job_ptr = list_next(job_itr))) { |
| uint32_t origin_id; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| continue; |
| |
| if (!_find_fed_job_info(job_ptr->job_id)) { |
| info("adding missing fed_job_info for job %pJ", |
| job_ptr); |
| add_fed_job_info(job_ptr); |
| } |
| } |
| list_iterator_destroy(job_itr); |
| unlock_slurmctld(job_read_lock); |
| } |
| |
| extern int fed_mgr_init(void *db_conn) |
| { |
| int rc = SLURM_SUCCESS; |
| uint64_t tmp = 0; |
| slurmdb_federation_cond_t fed_cond; |
| list_t *fed_list = NULL; |
| slurmdb_federation_rec_t *fed = NULL, *state_fed = NULL; |
| slurmdb_cluster_rec_t *state_cluster = NULL; |
| |
| slurm_mutex_lock(&init_mutex); |
| |
| if (inited) { |
| slurm_mutex_unlock(&init_mutex); |
| return SLURM_SUCCESS; |
| } |
| |
| if (!slurm_with_slurmdbd()) |
| goto end_it; |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (!fed_job_list) |
| fed_job_list = list_create(xfree_ptr); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| /* |
| * fed_job_update_list should only be appended to and popped from, so we |
| * rely on the list's internal lock. If the code ever changes to iterate |
| * the list, a separate lock will be needed around it. |
| */ |
| if (!fed_job_update_list) |
| fed_job_update_list = list_create(_destroy_fed_job_update_info); |
| |
| /* |
| * remote_dep_recv_list should only be appended to and popped from, so |
| * we rely on the list's internal lock. If the code ever changes to |
| * iterate the list, a separate lock will be needed around it. |
| */ |
| if (!remote_dep_recv_list) |
| remote_dep_recv_list = list_create(_destroy_dep_msg); |
| |
| /* |
| * origin_dep_update_list should only be read from or modified with |
| * list_* functions (such as append, pop, count), so we rely on the |
| * list's internal lock. If the code ever changes to iterate the list, |
| * a separate lock will be needed around it. |
| */ |
| if (!origin_dep_update_list) |
| origin_dep_update_list = list_create(_destroy_dep_update_msg); |
| |
| slurm_mutex_lock(&dep_job_list_mutex); |
| if (!remote_dep_job_list) |
| remote_dep_job_list = list_create(_destroy_dep_job); |
| slurm_mutex_unlock(&dep_job_list_mutex); |
| |
| _spawn_threads(); |
| |
| if (running_cache) { |
| debug("Database appears down, reading federations from state file."); |
| fed = _state_load(slurm_conf.state_save_location); |
| if (!fed) { |
| debug2("No federation state"); |
| rc = SLURM_SUCCESS; |
| goto end_it; |
| } |
| } else { |
| state_fed = _state_load(slurm_conf.state_save_location); |
| if (state_fed) |
| state_cluster = list_find_first( |
| state_fed->cluster_list, |
| slurmdb_find_cluster_in_list, |
| slurm_conf.cluster_name); |
| |
| slurmdb_init_federation_cond(&fed_cond, 0); |
| fed_cond.cluster_list = list_create(NULL); |
| list_append(fed_cond.cluster_list, slurm_conf.cluster_name); |
| |
| fed_list = acct_storage_g_get_federations( |
| db_conn, slurm_conf.slurm_user_id, &fed_cond); |
| FREE_NULL_LIST(fed_cond.cluster_list); |
| if (!fed_list) { |
| error("failed to get a federation list"); |
| rc = SLURM_ERROR; |
| goto end_it; |
| } |
| |
| if (list_count(fed_list) == 1) |
| fed = list_pop(fed_list); |
| else if (list_count(fed_list) > 1) { |
| error("got more federations than expected"); |
| rc = SLURM_ERROR; |
| } |
| FREE_NULL_LIST(fed_list); |
| } |
| |
| if (fed) { |
| slurmdb_cluster_rec_t *cluster = NULL; |
| slurmctld_lock_t fedr_jobw_lock = { |
| NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| if ((cluster = list_find_first(fed->cluster_list, |
| slurmdb_find_cluster_in_list, |
| slurm_conf.cluster_name))) { |
| job_record_t *job_ptr; |
| list_itr_t *itr; |
| |
| _join_federation(fed, cluster, &tmp); |
| |
| /* Find clusters that were removed from the federation |
| * since the last time we got an update */ |
| lock_slurmctld(fedr_jobw_lock); |
| if (state_fed && state_cluster && fed_mgr_fed_rec) |
| _handle_removed_clusters(state_fed, &tmp); |
| |
| /* Send remote dependencies to siblings. */ |
| itr = list_iterator_create(job_list); |
| while ((job_ptr = list_next(itr))) { |
| if (job_ptr->details && |
| job_ptr->details->dependency && |
| list_count(job_ptr->details->depend_list) && |
| fed_mgr_submit_remote_dependencies(job_ptr, |
| false, |
| false)) |
| error("%s: Failed to send %pJ dependencies to some or all siblings", |
| __func__, job_ptr); |
| } |
| list_iterator_destroy(itr); |
| unlock_slurmctld(fedr_jobw_lock); |
| } else { |
| slurmdb_destroy_federation_rec(fed); |
| error("failed to get cluster from federation that we requested"); |
| rc = SLURM_ERROR; |
| } |
| } else if (state_fed && state_cluster) { |
| /* This cluster was removed from the federation while it was down. |
| * Need to clean up its origin jobs. */ |
| slurmctld_lock_t fedw_jobw_lock = { |
| NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| info("self was removed from federation since last start"); |
| lock_slurmctld(fedw_jobw_lock); |
| fed_mgr_cluster_rec = state_cluster; |
| _cleanup_removed_origin_jobs(); |
| fed_mgr_cluster_rec = NULL; |
| unlock_slurmctld(fedw_jobw_lock); |
| } |
| |
| slurmdb_destroy_federation_rec(state_fed); |
| |
| end_it: |
| /* Call whether state file existed or not. */ |
| _add_missing_fed_job_info(); |
| |
| inited = true; |
| slurm_mutex_unlock(&init_mutex); |
| |
| return rc; |
| } |
| |
| extern int fed_mgr_fini(void) |
| { |
| slurmctld_lock_t fed_write_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| slurm_mutex_lock(&init_mutex); |
| inited = false; |
| slurm_mutex_unlock(&init_mutex); |
| |
| lock_slurmctld(fed_write_lock); |
| /* Call _leave_federation() before slurm_persist_conn_recv_server_fini() |
| * as this will NULL out the cluster's recv persistent connection before |
| * _server_fini() actually destroys it. That way the cluster's recv |
| * connection won't be pointing to bad memory. */ |
| _leave_federation(); |
| unlock_slurmctld(fed_write_lock); |
| |
| /* Signal threads to end */ |
| slurm_cond_signal(&agent_cond); |
| slurm_cond_signal(&job_update_cond); |
| slurm_cond_signal(&remote_dep_cond); |
| slurm_cond_signal(&test_dep_cond); |
| slurm_cond_signal(&origin_dep_cond); |
| /* _job_watch_thread signaled by _leave_federation() */ |
| |
| slurm_thread_join(agent_thread_id); |
| slurm_thread_join(fed_job_update_thread_id); |
| slurm_thread_join(remote_dep_thread_id); |
| slurm_thread_join(dep_job_thread_id); |
| slurm_thread_join(origin_dep_thread_id); |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| FREE_NULL_LIST(fed_job_list); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| FREE_NULL_LIST(fed_job_update_list); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static void _handle_dependencies_for_modified_fed(uint64_t added_clusters, |
| uint64_t removed_clusters) |
| { |
| uint32_t origin_id; |
| job_record_t *job_ptr; |
| list_itr_t *itr; |
| depend_spec_t find_dep = { 0 }; |
| |
| xassert(verify_lock(JOB_LOCK, WRITE_LOCK)); |
| xassert(verify_lock(FED_LOCK, READ_LOCK)); |
| |
| if (!fed_mgr_cluster_rec) |
| return; |
| |
| find_dep.depend_type = SLURM_DEPEND_SINGLETON; |
| itr = list_iterator_create(job_list); |
| while ((job_ptr = list_next(itr))) { |
| if (added_clusters && IS_JOB_PENDING(job_ptr) && |
| _is_fed_job(job_ptr, &origin_id) && |
| find_dependency(job_ptr, &find_dep)) |
| fed_mgr_submit_remote_dependencies(job_ptr, true, |
| false); |
| /* |
| * Make sure any remote dependencies are immediately |
| * marked as invalid. |
| */ |
| if (removed_clusters) |
| test_job_dependency(job_ptr, NULL); |
| } |
| list_iterator_destroy(itr); |
| } |
| |
| extern int fed_mgr_update_feds(slurmdb_update_object_t *update) |
| { |
| uint64_t added_clusters = 0, removed_clusters = 0; |
| list_t *feds = NULL; |
| slurmdb_federation_rec_t *fed = NULL; |
| slurmdb_cluster_rec_t *cluster = NULL; |
| slurmctld_lock_t fedr_jobw_lock = { |
| NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| slurmctld_lock_t fedw_jobw_lock = { |
| NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| if (!update->objects) |
| return SLURM_SUCCESS; |
| |
| slurm_mutex_lock(&init_mutex); |
| if (!inited) { |
| slurm_mutex_unlock(&init_mutex); |
| return SLURM_SUCCESS; /* The fed mgr hasn't been started yet |
| * and can't be started from here; don't |
| * worry, it will all get set up later. */ |
| } |
| slurm_mutex_unlock(&init_mutex); |
| /* we only want one update happening at a time. */ |
| slurm_mutex_lock(&update_mutex); |
| log_flag(FEDR, "Got a federation update"); |
| |
| feds = update->objects; |
| |
| /* Find the federation that this cluster is in. If it has changed since |
| * the last update, refresh the stored information, grab the other |
| * clusters in the federation, and establish connections with each of |
| * them. */ |
| |
| /* A remote cluster may also be removed from the federation; that has |
| * to be detected and the connection to the remote closed. */ |
| while ((fed = list_pop(feds))) { |
| if (fed->cluster_list && |
| (cluster = list_find_first(fed->cluster_list, |
| slurmdb_find_cluster_in_list, |
| slurm_conf.cluster_name))) { |
| /* Find clusters that were removed from the federation |
| * since the last time we got an update */ |
| lock_slurmctld(fedr_jobw_lock); |
| if (fed_mgr_fed_rec) |
| _handle_removed_clusters(fed, |
| &removed_clusters); |
| unlock_slurmctld(fedr_jobw_lock); |
| _join_federation(fed, cluster, &added_clusters); |
| |
| if (added_clusters || removed_clusters) { |
| lock_slurmctld(fedr_jobw_lock); |
| log_flag(DEPENDENCY, "%s: Cluster(s) added: 0x%"PRIx64"; removed: 0x%"PRIx64, |
| __func__, added_clusters, |
| removed_clusters); |
| _handle_dependencies_for_modified_fed( |
| added_clusters, |
| removed_clusters); |
| unlock_slurmctld(fedr_jobw_lock); |
| } |
| break; |
| } |
| slurmdb_destroy_federation_rec(fed); |
| } |
| |
| if (!fed && fed_mgr_fed_rec) { |
| log_flag(FEDR, "Not part of any federation"); |
| lock_slurmctld(fedw_jobw_lock); |
| _cleanup_removed_origin_jobs(); |
| _leave_federation(); |
| unlock_slurmctld(fedw_jobw_lock); |
| } |
| slurm_mutex_unlock(&update_mutex); |
| return SLURM_SUCCESS; |
| } |
| |
| static void _pack_fed_job_info(fed_job_info_t *job_info, buf_t *buffer, |
| uint16_t protocol_version) |
| { |
| int i; |
| if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| pack32(job_info->cluster_lock, buffer); |
| pack32(job_info->job_id, buffer); |
| pack64(job_info->siblings_active, buffer); |
| pack64(job_info->siblings_viable, buffer); |
| |
| for (i = 0; i <= MAX_FED_CLUSTERS; i++) |
| pack32(job_info->updating_sibs[i], buffer); |
| for (i = 0; i <= MAX_FED_CLUSTERS; i++) |
| pack_time(job_info->updating_time[i], buffer); |
| } else { |
| error("%s: protocol_version %hu not supported.", |
| __func__, protocol_version); |
| } |
| } |
| |
| static int _unpack_fed_job_info(fed_job_info_t **job_info_pptr, buf_t *buffer, |
| uint16_t protocol_version) |
| { |
| int i; |
| fed_job_info_t *job_info = xmalloc(sizeof(fed_job_info_t)); |
| |
| *job_info_pptr = job_info; |
| |
| if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| safe_unpack32(&job_info->cluster_lock, buffer); |
| safe_unpack32(&job_info->job_id, buffer); |
| safe_unpack64(&job_info->siblings_active, buffer); |
| safe_unpack64(&job_info->siblings_viable, buffer); |
| |
| for (i = 0; i <= MAX_FED_CLUSTERS; i++) |
| safe_unpack32(&job_info->updating_sibs[i], buffer); |
| for (i = 0; i <= MAX_FED_CLUSTERS; i++) |
| safe_unpack_time(&job_info->updating_time[i], buffer); |
| } else { |
| error("%s: protocol_version %hu not supported.", |
| __func__, protocol_version); |
| goto unpack_error; |
| } |
| |
| return SLURM_SUCCESS; |
| |
| unpack_error: |
| xfree(job_info); |
| *job_info_pptr = NULL; |
| return SLURM_ERROR; |
| } |
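| |
| /* |
| * The pack/unpack pair above must stay in lockstep: fields are written |
| * and read in the same order at the same protocol version. A minimal |
| * round-trip sketch (illustrative only, not part of the build): |
| * |
| * buf_t *buf = init_buf(BUF_SIZE); |
| * _pack_fed_job_info(info, buf, SLURM_PROTOCOL_VERSION); |
| * set_buf_offset(buf, 0); |
| * fed_job_info_t *copy = NULL; |
| * _unpack_fed_job_info(&copy, buf, SLURM_PROTOCOL_VERSION); |
| * FREE_NULL_BUFFER(buf); |
| */ |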
| |
| static void _dump_fed_job_list(buf_t *buffer, uint16_t protocol_version) |
| { |
| uint32_t count = NO_VAL; |
| fed_job_info_t *fed_job_info; |
| |
| if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| /* |
| * Need to be in the lock to prevent the window between getting |
| * the count and actually looping on the list. |
| */ |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (fed_job_list) |
| count = list_count(fed_job_list); |
| else |
| count = NO_VAL; |
| |
| pack32(count, buffer); |
| if (count && (count != NO_VAL)) { |
| list_itr_t *itr = list_iterator_create(fed_job_list); |
| while ((fed_job_info = list_next(itr))) { |
| _pack_fed_job_info(fed_job_info, buffer, |
| protocol_version); |
| } |
| list_iterator_destroy(itr); |
| } |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| } else { |
| error("%s: protocol_version %hu not supported.", |
| __func__, protocol_version); |
| } |
| } |
| |
| static list_t *_load_fed_job_list(buf_t *buffer, uint16_t protocol_version) |
| { |
| int i; |
| uint32_t count; |
| fed_job_info_t *tmp_job_info = NULL; |
| list_t *tmp_list = NULL; |
| |
| if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| safe_unpack32(&count, buffer); |
| if (count > NO_VAL) |
| goto unpack_error; |
| if (count != NO_VAL) { |
| tmp_list = list_create(xfree_ptr); |
| |
| for (i = 0; i < count; i++) { |
| if (_unpack_fed_job_info(&tmp_job_info, buffer, |
| protocol_version)) |
| goto unpack_error; |
| list_append(tmp_list, tmp_job_info); |
| } |
| } |
| } else { |
| error("%s: protocol_version %hu not supported.", |
| __func__, protocol_version); |
| } |
| |
| return tmp_list; |
| |
| unpack_error: |
| FREE_NULL_LIST(tmp_list); |
| return NULL; |
| } |
| |
| /* |
| * If this changes, then _pack_dep_msg() in slurm_protocol_pack.c probably |
| * needs to change. |
| */ |
| static void _pack_remote_dep_job(job_record_t *job_ptr, buf_t *buffer, |
| uint16_t protocol_version) |
| { |
| if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| pack32(job_ptr->array_job_id, buffer); |
| pack32(job_ptr->array_task_id, buffer); |
| pack_dep_list(job_ptr->details->depend_list, buffer, |
| protocol_version); |
| packstr(job_ptr->details->dependency, buffer); |
| packbool(job_ptr->array_recs ? true : false, buffer); |
| pack32(job_ptr->job_id, buffer); |
| packstr(job_ptr->name, buffer); |
| pack32(job_ptr->user_id, buffer); |
| } else { |
| error("%s: protocol_version %hu not supported.", |
| __func__, protocol_version); |
| } |
| } |
| |
| /* |
| * If this changes, then _unpack_dep_msg() in slurm_protocol_pack.c probably |
| * needs to change. |
| */ |
| static int _unpack_remote_dep_job(job_record_t **job_pptr, buf_t *buffer, |
| uint16_t protocol_version) |
| { |
| bool is_array; |
| job_record_t *job_ptr; |
| |
| xassert(job_pptr); |
| |
| job_ptr = xmalloc(sizeof *job_ptr); |
| job_ptr->magic = JOB_MAGIC; |
| job_ptr->details = xmalloc(sizeof *(job_ptr->details)); |
| job_ptr->details->magic = DETAILS_MAGIC; |
| job_ptr->fed_details = xmalloc(sizeof *(job_ptr->fed_details)); |
| *job_pptr = job_ptr; |
| |
| if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| safe_unpack32(&job_ptr->array_job_id, buffer); |
| safe_unpack32(&job_ptr->array_task_id, buffer); |
| unpack_dep_list(&job_ptr->details->depend_list, buffer, |
| protocol_version); |
| safe_unpackstr(&job_ptr->details->dependency, buffer); |
| safe_unpackbool(&is_array, buffer); |
| if (is_array) |
| job_ptr->array_recs = |
| xmalloc(sizeof *(job_ptr->array_recs)); |
| safe_unpack32(&job_ptr->job_id, buffer); |
| safe_unpackstr(&job_ptr->name, buffer); |
| safe_unpack32(&job_ptr->user_id, buffer); |
| } else { |
| error("%s: protocol_version %hu not supported.", |
| __func__, protocol_version); |
| goto unpack_error; |
| } |
| |
| return SLURM_SUCCESS; |
| |
| unpack_error: |
| _destroy_dep_job(job_ptr); |
| *job_pptr = NULL; |
| return SLURM_ERROR; |
| } |
| |
| static void _dump_remote_dep_job_list(buf_t *buffer, uint16_t protocol_version) |
| { |
| uint32_t count = NO_VAL; |
| job_record_t *job_ptr; |
| |
| if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| slurm_mutex_lock(&dep_job_list_mutex); |
| if (remote_dep_job_list) |
| count = list_count(remote_dep_job_list); |
| else |
| count = NO_VAL; |
| pack32(count, buffer); |
| if (count && (count != NO_VAL)) { |
| list_itr_t *itr = |
| list_iterator_create(remote_dep_job_list); |
| while ((job_ptr = list_next(itr))) |
| _pack_remote_dep_job(job_ptr, buffer, |
| protocol_version); |
| list_iterator_destroy(itr); |
| } |
| slurm_mutex_unlock(&dep_job_list_mutex); |
| } else { |
| error("%s: protocol_version %hu not supported.", |
| __func__, protocol_version); |
| } |
| } |
| |
| static list_t *_load_remote_dep_job_list(buf_t *buffer, |
| uint16_t protocol_version) |
| { |
| uint32_t count, i; |
| list_t *tmp_list = NULL; |
| job_record_t *job_ptr = NULL; |
| |
| if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) { |
| safe_unpack32(&count, buffer); |
| if (count > NO_VAL) |
| goto unpack_error; |
| if (count != NO_VAL) { |
| tmp_list = list_create(_destroy_dep_job); |
| for (i = 0; i < count; i++) { |
| if (_unpack_remote_dep_job(&job_ptr, buffer, |
| protocol_version)) |
| goto unpack_error; |
| list_append(tmp_list, job_ptr); |
| } |
| } |
| } else { |
| error("%s: protocol_version %hu not supported.", |
| __func__, protocol_version); |
| goto unpack_error; |
| } |
| return tmp_list; |
| |
| unpack_error: |
| FREE_NULL_LIST(tmp_list); |
| return NULL; |
| } |
| |
| extern int fed_mgr_state_save(void) |
| { |
| int error_code = 0; |
| slurmctld_lock_t fed_read_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| buf_t *buffer = init_buf(0); |
| |
| DEF_TIMERS; |
| |
| START_TIMER; |
| |
| /* write header: version, time */ |
| pack16(SLURM_PROTOCOL_VERSION, buffer); |
| pack_time(time(NULL), buffer); |
| |
| lock_slurmctld(fed_read_lock); |
| slurmdb_pack_federation_rec(fed_mgr_fed_rec, SLURM_PROTOCOL_VERSION, |
| buffer); |
| unlock_slurmctld(fed_read_lock); |
| |
| _dump_fed_job_list(buffer, SLURM_PROTOCOL_VERSION); |
| _dump_remote_dep_job_list(buffer, SLURM_PROTOCOL_VERSION); |
| |
| error_code = save_buf_to_state(FED_MGR_STATE_FILE, buffer, NULL); |
| |
| FREE_NULL_BUFFER(buffer); |
| |
| END_TIMER2(__func__); |
| |
| return error_code; |
| } |
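| |
| /* |
| * On-disk layout of FED_MGR_STATE_FILE as written above, and consumed |
| * in the same order by _state_load() below: |
| * |
| * uint16_t SLURM_PROTOCOL_VERSION |
| * time_t save time |
| * slurmdb_federation_rec_t fed_mgr_fed_rec (possibly empty) |
| * fed_job_list dump (count, then fed_job_info_t entries) |
| * remote_dep_job_list dump (count, then packed dependency jobs) |
| */ |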
| |
| static slurmdb_federation_rec_t *_state_load(char *state_save_location) |
| { |
| buf_t *buffer = NULL; |
| char *state_file; |
| time_t buf_time; |
| uint16_t ver = 0; |
| int error_code = SLURM_SUCCESS; |
| slurmdb_federation_rec_t *ret_fed = NULL; |
| list_t *tmp_list = NULL; |
| |
| slurmctld_lock_t job_read_lock = { .job = READ_LOCK }; |
| |
| if (!(buffer = state_save_open(FED_MGR_STATE_FILE, &state_file))) { |
| if ((clustername_existed == 1) && (!ignore_state_errors)) |
| fatal("No fed_mgr state file (%s) to recover", |
| state_file); |
| info("No fed_mgr state file (%s) to recover", state_file); |
| xfree(state_file); |
| return NULL; |
| } |
| xfree(state_file); |
| |
| safe_unpack16(&ver, buffer); |
| |
| debug3("Version in fed_mgr_state header is %u", ver); |
| if (ver > SLURM_PROTOCOL_VERSION || ver < SLURM_MIN_PROTOCOL_VERSION) { |
| if (!ignore_state_errors) |
| fatal("Can not recover fed_mgr state, incompatible version, got %u need > %u <= %u, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.", |
| ver, SLURM_MIN_PROTOCOL_VERSION, |
| SLURM_PROTOCOL_VERSION); |
| error("***********************************************"); |
| error("Can not recover fed_mgr state, incompatible version, " |
| "got %u need > %u <= %u", ver, |
| SLURM_MIN_PROTOCOL_VERSION, SLURM_PROTOCOL_VERSION); |
| error("***********************************************"); |
| FREE_NULL_BUFFER(buffer); |
| return NULL; |
| } |
| |
| safe_unpack_time(&buf_time, buffer); |
| |
| error_code = slurmdb_unpack_federation_rec((void **)&ret_fed, ver, |
| buffer); |
| if (error_code != SLURM_SUCCESS) |
| goto unpack_error; |
| else if (!ret_fed || !ret_fed->name || |
| !list_count(ret_fed->cluster_list)) { |
| slurmdb_destroy_federation_rec(ret_fed); |
| ret_fed = NULL; |
| debug("No feds to retrieve from state"); |
| } else { |
| /* We want to free the connections here since they don't exist |
| * anymore, but they were packed when state was saved. */ |
| slurmdb_cluster_rec_t *cluster; |
| list_itr_t *itr = list_iterator_create( |
| ret_fed->cluster_list); |
| while ((cluster = list_next(itr))) { |
| slurm_persist_conn_destroy(cluster->fed.recv); |
| cluster->fed.recv = NULL; |
| slurm_persist_conn_destroy(cluster->fed.send); |
| cluster->fed.send = NULL; |
| } |
| list_iterator_destroy(itr); |
| } |
| |
| /* Load the saved fed_job_list and transfer its entries to the actual |
| * fed_job_list only when a matching job record still exists. */ |
| if ((tmp_list = _load_fed_job_list(buffer, ver))) { |
| fed_job_info_t *tmp_info; |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (fed_job_list) { |
| lock_slurmctld(job_read_lock); |
| while ((tmp_info = list_pop(tmp_list))) { |
| if (find_job_record(tmp_info->job_id)) |
| list_append(fed_job_list, tmp_info); |
| else |
| xfree(tmp_info); |
| } |
| unlock_slurmctld(job_read_lock); |
| } |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| } |
| FREE_NULL_LIST(tmp_list); |
| |
| /* |
| * Load in remote_dep_job_list and transfer to actual |
| * remote_dep_job_list. If the actual list already has that job, |
| * just throw away this one. |
| */ |
| if ((tmp_list = _load_remote_dep_job_list(buffer, ver))) { |
| job_record_t *tmp_dep_job; |
| slurm_mutex_lock(&dep_job_list_mutex); |
| while ((tmp_dep_job = list_pop(tmp_list))) { |
| if (!remote_dep_job_list) |
| remote_dep_job_list = |
| list_create(_destroy_dep_job); |
| if (!list_find_first(remote_dep_job_list, |
| _find_job_by_id, |
| &tmp_dep_job->job_id) && |
| tmp_dep_job->details->dependency) { |
| list_append(remote_dep_job_list, tmp_dep_job); |
| } /* else it will get free'd with FREE_NULL_LIST */ |
| } |
| slurm_mutex_unlock(&dep_job_list_mutex); |
| } |
| FREE_NULL_LIST(tmp_list); |
| |
| FREE_NULL_BUFFER(buffer); |
| |
| return ret_fed; |
| |
| unpack_error: |
| if (!ignore_state_errors) |
| fatal("Incomplete fed_mgr state file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered."); |
| error("Incomplete fed_mgr state file"); |
| FREE_NULL_BUFFER(buffer); |
| |
| return NULL; |
| } |
| |
| /* |
| * Returns the federated job id: <local id> + (<cluster id> << 26). |
| * Bits 0-25: Local job id |
| * Bits 26-31: Cluster id |
| */ |
| extern uint32_t fed_mgr_get_job_id(uint32_t orig) |
| { |
| if (!fed_mgr_cluster_rec) |
| return orig; |
| return orig + (fed_mgr_cluster_rec->fed.id << FED_MGR_CLUSTER_ID_BEGIN); |
| } |
| |
| /* |
| * Returns the local job id from a federated job id. |
| */ |
| extern uint32_t fed_mgr_get_local_id(uint32_t id) |
| { |
| return id & MAX_JOB_ID; |
| } |
| |
| /* |
| * Returns the cluster id from a federated job id. |
| */ |
| extern uint32_t fed_mgr_get_cluster_id(uint32_t id) |
| { |
| return id >> FED_MGR_CLUSTER_ID_BEGIN; |
| } |
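| |
| /* |
| * Worked example for the three id helpers above: a job with local id |
| * 1234 on cluster id 2 gets the federated id |
| * 1234 + (2 << 26) == 134218962, |
| * from which fed_mgr_get_local_id() recovers 1234 (masking with |
| * MAX_JOB_ID, the low 26 bits) and fed_mgr_get_cluster_id() recovers 2 |
| * (shifting right by FED_MGR_CLUSTER_ID_BEGIN). |
| */ |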
| |
| extern int fed_mgr_add_sibling_conn(persist_conn_t *persist_conn, |
| char **out_buffer) |
| { |
| slurmdb_cluster_rec_t *cluster = NULL; |
| slurmctld_lock_t fed_read_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| lock_slurmctld(fed_read_lock); |
| |
| if (!fed_mgr_fed_rec) { |
| unlock_slurmctld(fed_read_lock); |
| *out_buffer = xstrdup_printf( |
| "no fed_mgr_fed_rec on cluster %s yet.", |
| slurm_conf.cluster_name); |
| /* This really isn't an error. If the cluster doesn't know it |
| * is in a federation this could happen on the initial |
| * connection from a sibling that found out about the addition |
| * before I did. |
| */ |
| debug("%s: %s", __func__, *out_buffer); |
| /* The other side needs to see this as an error though or the |
| * connection won't be completely established. |
| */ |
| return SLURM_ERROR; |
| } |
| |
| if (!fed_mgr_cluster_rec) { |
| unlock_slurmctld(fed_read_lock); |
| *out_buffer = xstrdup_printf( |
| "no fed_mgr_cluster_rec on cluster %s? " |
| "This should never happen", |
| slurm_conf.cluster_name); |
| error("%s: %s", __func__, *out_buffer); |
| return SLURM_ERROR; |
| } |
| |
| if (!(cluster = |
| fed_mgr_get_cluster_by_name(persist_conn->cluster_name))) { |
| unlock_slurmctld(fed_read_lock); |
| *out_buffer = xstrdup_printf( |
| "%s isn't a known sibling of ours, but tried to connect to cluster %s federation %s", |
| persist_conn->cluster_name, slurm_conf.cluster_name, |
| fed_mgr_fed_rec->name); |
| error("%s: %s", __func__, *out_buffer); |
| return SLURM_ERROR; |
| } |
| |
| persist_conn->callback_fini = _persist_callback_fini; |
| persist_conn->flags |= PERSIST_FLAG_ALREADY_INITED; |
| |
| /* If this pointer exists it will be handled by the persist_conn code, |
| * don't free |
| */ |
| //slurm_persist_conn_destroy(cluster->fed.recv); |
| |
| /* Preserve the persist_conn so that the cluster can get the remote |
| * side's hostname and port to talk back to if it doesn't have it yet. |
| * See _open_controller_conn(). |
| * Don't lock the cluster's lock here because an (almost) deadlock |
| * could occur if this cluster is opening a connection to the remote |
| * cluster at the same time the remote cluster is connecting to this |
| * cluster, since both sides will have the mutex locked in order to |
| * send/recv. If it did happen, the connection would eventually |
| * time out and resolve itself. */ |
| cluster->fed.recv = persist_conn; |
| |
| slurm_persist_conn_recv_thread_init(persist_conn, |
| conn_g_get_fd(persist_conn->tls_conn), |
| -1, persist_conn); |
| _q_send_job_sync(cluster->name); |
| |
| unlock_slurmctld(fed_read_lock); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Convert a comma-separated list of cluster names to a bitmap of cluster ids. |
| */ |
| static int _validate_cluster_names(char *clusters, uint64_t *cluster_bitmap) |
| { |
| int rc = SLURM_SUCCESS; |
| uint64_t cluster_ids = 0; |
| list_t *cluster_names = NULL; |
| |
| xassert(clusters); |
| |
| if (!xstrcasecmp(clusters, "all") || |
| (clusters && (*clusters == '\0'))) { |
| cluster_ids = _get_all_sibling_bits(); |
| goto end_it; |
| } |
| |
| cluster_names = list_create(xfree_ptr); |
| if (slurm_addto_char_list(cluster_names, clusters)) { |
| list_itr_t *itr = list_iterator_create(cluster_names); |
| char *cluster_name; |
| slurmdb_cluster_rec_t *sibling; |
| |
| while ((cluster_name = list_next(itr))) { |
| if (!(sibling = |
| fed_mgr_get_cluster_by_name(cluster_name))) { |
| error("didn't find requested cluster name %s in list of federated clusters", |
| cluster_name); |
| rc = SLURM_ERROR; |
| break; |
| } |
| |
| cluster_ids |= FED_SIBLING_BIT(sibling->fed.id); |
| } |
| list_iterator_destroy(itr); |
| } |
| FREE_NULL_LIST(cluster_names); |
| |
| end_it: |
| if (cluster_bitmap) |
| *cluster_bitmap = cluster_ids; |
| |
| return rc; |
| } |
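| |
| /* |
| * Example of the mapping above, with hypothetical sibling names: given |
| * siblings "fed1" (fed.id 1) and "fed3" (fed.id 3), the string |
| * "fed1,fed3" yields FED_SIBLING_BIT(1) | FED_SIBLING_BIT(3) == 0x5, |
| * while "all" (or an empty string) expands to every sibling via |
| * _get_all_sibling_bits(). |
| */ |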
| |
| /* Update a job on remote siblings with the given job_desc. |
| * |
| * IN job_id - job_id of the job to update. |
| * IN job_desc - job_desc to update job_id with. |
| * IN update_sibs - bitmap of siblings to update. |
| * IN uid - uid of the user requesting the update. |
| */ |
| extern int fed_mgr_update_job(uint32_t job_id, job_desc_msg_t *job_desc, |
| uint64_t update_sibs, uid_t uid) |
| { |
| list_itr_t *sib_itr; |
| slurmdb_cluster_rec_t *sibling; |
| fed_job_info_t *job_info; |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (!(job_info = _find_fed_job_info(job_id))) { |
| error("Didn't find JobId=%u in fed_job_list", job_id); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| return SLURM_ERROR; |
| } |
| |
| sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| while ((sibling = list_next(sib_itr))) { |
| /* Local is handled outside */ |
| if (sibling == fed_mgr_cluster_rec) |
| continue; |
| |
| if (!(update_sibs & FED_SIBLING_BIT(sibling->fed.id))) |
| continue; |
| |
| if (_persist_update_job(sibling, job_id, job_desc, uid)) { |
| error("failed to update sibling job on sibling %s", |
| sibling->name); |
| continue; |
| } |
| |
| job_info->updating_sibs[sibling->fed.id]++; |
| job_info->updating_time[sibling->fed.id] = time(NULL); |
| } |
| list_iterator_destroy(sib_itr); |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| return SLURM_SUCCESS; |
| } |
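| |
| /* |
| * Bookkeeping note: each successful _persist_update_job() above |
| * increments updating_sibs[sibling->fed.id]; the matching decrement |
| * happens in _handle_fed_job_update_response() once the sibling |
| * acknowledges the update. |
| */ |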
| |
| /* |
| * Submit sibling jobs to designated siblings. |
| * |
| * Will update job_desc->fed_siblings_active with the successful submissions. |
| * Will not send to siblings that are already in |
| * job_desc->fed_siblings_active. |
| * |
| * IN job_desc - job_desc containing job_id and fed_siblings_viable of job to be |
| * submitted. |
| * IN msg - contains the original job_desc buffer to send to the siblings. |
| * IN alloc_only - true if just an allocation, false if a batch job. |
| * IN dest_sibs - bitmap of viable siblings to submit to. |
| * IN start_protocol_version - protocol version of the original |
| * submission (forwarded to siblings as submit_proto_ver). |
| * RET returns SLURM_SUCCESS if all siblings received the job successfully or |
| * SLURM_ERROR if any siblings failed to receive the job. If a sibling |
| * fails, then the successful siblings will be updated with the correct |
| * sibling bitmap. |
| */ |
| static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg, |
| bool alloc_only, uint64_t dest_sibs, |
| uint16_t start_protocol_version) |
| { |
| int ret_rc = SLURM_SUCCESS; |
| list_itr_t *sib_itr; |
| sib_msg_t sib_msg = {0}; |
| slurmdb_cluster_rec_t *sibling = NULL; |
| slurm_msg_t req_msg; |
| uint16_t last_rpc_version = NO_VAL16; |
| buf_t *buffer = NULL; |
| |
| xassert(job_desc); |
| xassert(msg); |
| |
| sib_msg.data_type = msg->msg_type; |
| sib_msg.fed_siblings = job_desc->fed_siblings_viable; |
| sib_msg.group_id = job_desc->group_id; |
| sib_msg.job_id = job_desc->job_id; |
| sib_msg.resp_host = job_desc->resp_host; |
| sib_msg.submit_host = job_desc->alloc_node; |
| sib_msg.user_id = job_desc->user_id; |
| sib_msg.submit_proto_ver = start_protocol_version; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SIB_MSG; |
| req_msg.data = &sib_msg; |
| |
| sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| while ((sibling = list_next(sib_itr))) { |
| int rc; |
| if (sibling == fed_mgr_cluster_rec) |
| continue; |
| |
| /* Only send to specific siblings */ |
| if (!(dest_sibs & FED_SIBLING_BIT(sibling->fed.id))) |
| continue; |
| |
| /* skip sibling if the sibling already has a job */ |
| if (job_desc->fed_siblings_active & |
| FED_SIBLING_BIT(sibling->fed.id)) |
| continue; |
| |
| if (alloc_only) |
| sib_msg.sib_msg_type = FED_JOB_SUBMIT_INT; |
| else |
| sib_msg.sib_msg_type = FED_JOB_SUBMIT_BATCH; |
| |
| /* Pack message buffer according to sibling's rpc version. A |
| * submission from a client will already have a buffer with the |
| * packed job_desc from the client. If this controller is |
| * submitting new sibling jobs then the buffer needs to be |
		 * packed according to each sibling's rpc_version. */
| if (!msg->buffer && |
| (last_rpc_version != sibling->rpc_version)) { |
| FREE_NULL_BUFFER(buffer); |
| msg->protocol_version = sibling->rpc_version; |
| buffer = init_buf(BUF_SIZE); |
| pack_msg(msg, buffer); |
| sib_msg.data_buffer = buffer; |
| sib_msg.data_version = msg->protocol_version; |
| |
| last_rpc_version = sibling->rpc_version; |
| } |
| |
| /* |
| * We have a buffer which means, we are submitting new sibling |
| * jobs from a client submission. If the sibling is the same or |
| * higher protocol version as the client we just send the packed |
| * buffer from the client, otherwise we need to get an |
| * unmmodified job_desc_t by unpacking the client's job_desc_msg |
| * and repack it at the sibling's version. |
| */ |
| if (msg->buffer) { |
| if (sibling->rpc_version >= msg->protocol_version) { |
| sib_msg.data_buffer = msg->buffer; |
| sib_msg.data_offset = msg->body_offset; |
| sib_msg.data_version = msg->protocol_version; |
| |
| /* Don't pack buffer again unless the version changed */ |
| } else if (last_rpc_version != sibling->rpc_version) { |
| slurm_msg_t tmp_msg; |
| slurm_msg_t_init(&tmp_msg); |
| tmp_msg.msg_type = msg->msg_type; |
| tmp_msg.protocol_version = |
| msg->protocol_version; |
| set_buf_offset(msg->buffer, msg->body_offset); |
| |
| unpack_msg(&tmp_msg, msg->buffer); |
| |
| FREE_NULL_BUFFER(buffer); |
| |
| tmp_msg.protocol_version = sibling->rpc_version; |
| buffer = init_buf(BUF_SIZE); |
| pack_msg(&tmp_msg, buffer); |
| sib_msg.data_buffer = buffer; |
| sib_msg.data_offset = 0; |
				/* buffer was repacked at the sibling's
				 * version, so advertise that version */
				sib_msg.data_version = sibling->rpc_version;
| |
| last_rpc_version = sibling->rpc_version; |
| } |
| } |
| |
| req_msg.protocol_version = sibling->rpc_version; |
| |
| if (!(rc = _queue_rpc(sibling, &req_msg, 0, false))) |
| job_desc->fed_siblings_active |= |
| FED_SIBLING_BIT(sibling->fed.id); |
| ret_rc |= rc; |
| } |
| list_iterator_destroy(sib_itr); |
| |
| FREE_NULL_BUFFER(buffer); |
| |
| return ret_rc; |
| } |
| |
| /* |
| * Prepare and submit new sibling jobs built from an existing job. |
| * |
| * IN job_ptr - job to submit to remote siblings. |
| * IN dest_sibs - bitmap of viable siblings to submit to. |
| */ |
| static int _prepare_submit_siblings(job_record_t *job_ptr, uint64_t dest_sibs) |
| { |
| int rc = SLURM_SUCCESS; |
| uint32_t origin_id; |
| job_desc_msg_t *job_desc; |
| slurm_msg_t msg; |
| |
| xassert(job_ptr); |
| xassert(job_ptr->details); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| log_flag(FEDR, "submitting new siblings for %pJ", job_ptr); |
| |
| if (!(job_desc = copy_job_record_to_job_desc(job_ptr))) |
| return SLURM_ERROR; |
| |
| /* |
| * Since job_ptr could have had defaults filled on the origin cluster, |
| * clear these before sibling submission if default flag is set |
| */ |
| if (job_desc->bitflags & USE_DEFAULT_ACCT) |
| xfree(job_desc->account); |
| if (job_desc->bitflags & USE_DEFAULT_PART) |
| xfree(job_desc->partition); |
| if (job_desc->bitflags & USE_DEFAULT_QOS) |
| xfree(job_desc->qos); |
| if (job_desc->bitflags & USE_DEFAULT_WCKEY) |
| xfree(job_desc->wckey); |
| |
| /* Have to pack job_desc into a buffer. _submit_sibling_jobs will pack |
| * the job_desc according to each sibling's rpc_version. */ |
| slurm_msg_t_init(&msg); |
| msg.msg_type = REQUEST_RESOURCE_ALLOCATION; |
| msg.data = job_desc; |
| |
| if (_submit_sibling_jobs(job_desc, &msg, false, dest_sibs, |
| job_ptr->start_protocol_ver)) |
| error("Failed to submit fed job to siblings"); |
| |
| /* mark this cluster as an active sibling */ |
| if (job_desc->fed_siblings_viable & |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)) |
| job_desc->fed_siblings_active |= |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id); |
| |
| /* Add new active jobs to siblings_active bitmap */ |
| job_ptr->fed_details->siblings_active |= job_desc->fed_siblings_active; |
| update_job_fed_details(job_ptr); |
| |
| /* free the environment since all strings are stored in one |
| * xmalloced buffer */ |
| if (job_desc->environment) { |
| xfree(job_desc->environment[0]); |
| xfree(job_desc->environment); |
| job_desc->env_size = 0; |
| } |
| slurm_free_job_desc_msg(job_desc); |
| |
| return rc; |
| } |
| |
static uint64_t _get_all_sibling_bits(void)
| { |
| list_itr_t *itr; |
| slurmdb_cluster_rec_t *cluster; |
| uint64_t sib_bits = 0; |
| |
| if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) |
| goto fini; |
| |
| itr = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| while ((cluster = list_next(itr))) { |
| sib_bits |= FED_SIBLING_BIT(cluster->fed.id); |
| } |
| list_iterator_destroy(itr); |
| |
| fini: |
| return sib_bits; |
| } |
| |
| static int _remove_inactive_sibs(void *object, void *arg) |
| { |
| slurmdb_cluster_rec_t *sibling = (slurmdb_cluster_rec_t *)object; |
| uint64_t *viable_sibs = (uint64_t *)arg; |
| uint32_t cluster_state = sibling->fed.state; |
| int base_state = (cluster_state & CLUSTER_FED_STATE_BASE); |
| bool drain_flag = (cluster_state & CLUSTER_FED_STATE_DRAIN); |
| |
| if (drain_flag || |
| (base_state == CLUSTER_FED_STATE_INACTIVE)) |
| *viable_sibs &= ~(FED_SIBLING_BIT(sibling->fed.id)); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static uint64_t _get_viable_sibs(char *req_clusters, uint64_t feature_sibs, |
| bool is_array_job, char **err_msg) |
| { |
| uint64_t viable_sibs = _get_all_sibling_bits(); |
| if (req_clusters) |
| _validate_cluster_names(req_clusters, &viable_sibs); |
| if (feature_sibs) |
| viable_sibs &= feature_sibs; |
| |
| /* filter out clusters that are inactive or draining */ |
| list_for_each(fed_mgr_fed_rec->cluster_list, _remove_inactive_sibs, |
| &viable_sibs); |
| |
| if (is_array_job) { /* lock array jobs to local cluster */ |
		uint64_t tmp_viable = viable_sibs &
			FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
| if (viable_sibs && !tmp_viable) { |
| info("federated job arrays must run on local cluster"); |
| if (err_msg) { |
| xfree(*err_msg); |
| xstrfmtcat(*err_msg, "federated job arrays must run on local cluster"); |
| } |
| } |
| viable_sibs = tmp_viable; |
| } |
| |
| return viable_sibs; |
| } |
| |
| static void _add_remove_sibling_jobs(job_record_t *job_ptr) |
| { |
| fed_job_info_t *job_info; |
| uint32_t origin_id = 0; |
| uint64_t new_sibs = 0, old_sibs = 0, add_sibs = 0, |
| rem_sibs = 0, feature_sibs = 0; |
| |
| xassert(job_ptr); |
| |
| origin_id = fed_mgr_get_cluster_id(job_ptr->job_id); |
| |
	/* Remove siblings that are no longer requested and submit to newly
	 * requested siblings. */
| old_sibs = job_ptr->fed_details->siblings_active; |
| |
| _validate_cluster_features(job_ptr->details->cluster_features, |
| &feature_sibs); |
| |
| new_sibs = _get_viable_sibs(job_ptr->clusters, feature_sibs, |
| job_ptr->array_recs ? true : false, NULL); |
| job_ptr->fed_details->siblings_viable = new_sibs; |
| |
| add_sibs = new_sibs & ~old_sibs; |
| rem_sibs = ~new_sibs & old_sibs; |
| |
| if (rem_sibs) { |
| time_t now = time(NULL); |
| _revoke_sibling_jobs(job_ptr->job_id, |
| fed_mgr_cluster_rec->fed.id, |
| rem_sibs, now); |
| if (fed_mgr_is_origin_job(job_ptr) && |
| (rem_sibs & FED_SIBLING_BIT(origin_id))) { |
| fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0, |
| now); |
| } |
| |
| job_ptr->fed_details->siblings_active &= ~rem_sibs; |
| } |
| |
| /* Don't submit new siblings if the job is held */ |
| if (job_ptr->priority != 0 && add_sibs) |
| _prepare_submit_siblings( |
| job_ptr, |
| job_ptr->fed_details->siblings_viable); |
| |
| /* unrevoke the origin job */ |
| if (fed_mgr_is_origin_job(job_ptr) && |
| (add_sibs & FED_SIBLING_BIT(origin_id))) |
| job_state_unset_flag(job_ptr, JOB_REVOKED); |
| |
| /* Can't have the mutex while calling fed_mgr_job_revoke because it will |
| * lock the mutex as well. */ |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if ((job_info = _find_fed_job_info(job_ptr->job_id))) { |
| job_info->siblings_viable = |
| job_ptr->fed_details->siblings_viable; |
| job_info->siblings_active = |
| job_ptr->fed_details->siblings_active; |
| } |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| /* Update where sibling jobs are running */ |
| update_job_fed_details(job_ptr); |
| } |
| |
| static bool _job_has_pending_updates(fed_job_info_t *job_info) |
| { |
	static const int UPDATE_DELAY = 60;
	int i;
	time_t now = time(NULL);

	xassert(job_info);
| |
| for (i = 1; i <= MAX_FED_CLUSTERS; i++) { |
| if (job_info->updating_sibs[i]) { |
| if (job_info->updating_time[i] > (now - UPDATE_DELAY)) { |
| log_flag(FEDR, "JobId=%u is waiting for %d update responses from cluster id %d", |
| job_info->job_id, |
| job_info->updating_sibs[i], i); |
| return true; |
| } else { |
| log_flag(FEDR, "JobId=%u is had pending updates (%d) for cluster id %d, but haven't heard back from it for %ld seconds. Clearing the cluster's updating state", |
| job_info->job_id, |
| job_info->updating_sibs[i], i, |
| now - job_info->updating_time[i]); |
| job_info->updating_sibs[i] = 0; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| /* |
| * Validate requested job cluster features against each cluster's features. |
| * |
| * IN spec_features - cluster features that the job requested. |
| * OUT cluster_bitmap - bitmap of clusters that have matching features. |
 * RET SLURM_ERROR if any requested feature is not found on at least one
 *     cluster, SLURM_SUCCESS otherwise.
| */ |
| static int _validate_cluster_features(char *spec_features, |
| uint64_t *cluster_bitmap) |
| { |
| int rc = SLURM_SUCCESS; |
| bool negative_logic = false; |
| uint64_t feature_sibs = 0; |
| char *feature = NULL; |
| slurmdb_cluster_rec_t *sib; |
| list_t *req_features = NULL; |
| list_itr_t *feature_itr, *sib_itr; |
| |
| if (!spec_features || !fed_mgr_fed_rec) { |
| if (cluster_bitmap) |
| *cluster_bitmap = feature_sibs; |
| return rc; |
| } |
| |
| if (*spec_features == '\0') { |
| if (cluster_bitmap) |
| *cluster_bitmap = _get_all_sibling_bits(); |
| return rc; |
| } |
| |
| req_features = list_create(xfree_ptr); |
| slurm_addto_char_list(req_features, spec_features); |
| |
| feature_itr = list_iterator_create(req_features); |
| sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| |
| feature = list_peek(req_features); |
| if (feature && feature[0] == '!') { |
| feature_sibs = _get_all_sibling_bits(); |
| negative_logic = true; |
| } |
| |
| while ((feature = list_next(feature_itr))) { |
| if (negative_logic && feature[0] == '!') |
| feature++; |
| bool found = false; |
| while ((sib = list_next(sib_itr))) { |
| if (sib->fed.feature_list && |
| list_find_first(sib->fed.feature_list, |
| slurm_find_char_in_list, feature)) { |
| if (negative_logic) { |
| feature_sibs &= |
| ~FED_SIBLING_BIT(sib->fed.id); |
| } else { |
| feature_sibs |= |
| FED_SIBLING_BIT(sib->fed.id); |
| } |
| found = true; |
| } |
| } |
| |
| if (!found) { |
| error("didn't find at least one cluster with the feature '%s'", |
| feature); |
| rc = SLURM_ERROR; |
| goto end_features; |
| } |
| if (negative_logic && !feature_sibs) { |
| error("eliminated all viable clusters with constraint '%s'", |
| feature); |
| rc = SLURM_ERROR; |
| goto end_features; |
| } |
| list_iterator_reset(sib_itr); |
| } |
| end_features: |
| list_iterator_destroy(sib_itr); |
| list_iterator_destroy(feature_itr); |
| FREE_NULL_LIST(req_features); |
| |
| if (cluster_bitmap) |
| *cluster_bitmap = feature_sibs; |
| |
| return rc; |
| } |
| |
| extern void fed_mgr_remove_remote_dependencies(job_record_t *job_ptr) |
| { |
| uint32_t origin_id; |
| |
| if (!_is_fed_job(job_ptr, &origin_id) || |
| !fed_mgr_is_origin_job(job_ptr) || !job_ptr->details) |
| return; |
| |
| fed_mgr_submit_remote_dependencies(job_ptr, false, true); |
| } |
| |
| static int _add_to_send_list(void *object, void *arg) |
| { |
| depend_spec_t *dependency = (depend_spec_t *)object; |
| uint64_t *send_sib_bits = (uint64_t *)arg; |
| uint32_t cluster_id; |
| |
| if ((dependency->depend_type == SLURM_DEPEND_SINGLETON) && |
| !disable_remote_singleton) { |
| *send_sib_bits |= _get_all_sibling_bits(); |
| /* Negative value short-circuits list_for_each */ |
| return -1; |
| } |
| if (!(dependency->depend_flags & SLURM_FLAGS_REMOTE) || |
| (dependency->depend_state != DEPEND_NOT_FULFILLED)) |
| return SLURM_SUCCESS; |
| cluster_id = fed_mgr_get_cluster_id(dependency->job_id); |
| *send_sib_bits |= FED_SIBLING_BIT(cluster_id); |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Send dependencies of job_ptr to siblings. |
| * |
| * If the dependency string is NULL, that means we're telling the siblings |
| * to delete that dependency. Send empty string to indicate that. |
| * |
| * If send_all_sibs == true, then send dependencies to all siblings. Otherwise, |
| * only send dependencies to siblings that own the remote jobs that job_ptr |
| * depends on. I.e., if a sibling doesn't own any jobs that job_ptr depends on, |
| * we won't send job_ptr's dependencies to that sibling. |
| * |
| * If clear_dependencies == true, then clear the dependencies on the siblings |
| * where dependencies reside. In this case, use the job's dependency list to |
| * find out which siblings to send the RPC to if the list is non-NULL. If the |
| * list is NULL, then we have to send to all siblings. |
| */ |
| extern int fed_mgr_submit_remote_dependencies(job_record_t *job_ptr, |
| bool send_all_sibs, |
| bool clear_dependencies) |
| { |
| int rc = SLURM_SUCCESS; |
| uint64_t send_sib_bits = 0; |
| list_itr_t *sib_itr; |
| slurm_msg_t req_msg; |
| dep_msg_t dep_msg = { 0 }; |
| slurmdb_cluster_rec_t *sibling; |
| uint32_t origin_id; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| xassert(job_ptr->details); |
| |
| dep_msg.job_id = job_ptr->job_id; |
| dep_msg.job_name = job_ptr->name; |
| dep_msg.array_job_id = job_ptr->array_job_id; |
| dep_msg.array_task_id = job_ptr->array_task_id; |
| dep_msg.is_array = job_ptr->array_recs ? true : false; |
| dep_msg.user_id = job_ptr->user_id; |
| |
	if (!job_ptr->details->dependency || clear_dependencies)
		/*
		 * Since we have to pack this value, set dependency to an
		 * empty string so there is data to pack; the siblings
		 * interpret an empty string as a request to clear the
		 * dependency.
		 */
		dep_msg.dependency = "";
| else |
| dep_msg.dependency = job_ptr->details->dependency; |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_SEND_DEP; |
| req_msg.data = &dep_msg; |
| |
| if (!job_ptr->details->depend_list) |
| send_all_sibs = true; |
| if (!send_all_sibs) { |
| list_for_each(job_ptr->details->depend_list, |
| _add_to_send_list, &send_sib_bits); |
| } |
| |
| sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list); |
| while ((sibling = list_next(sib_itr))) { |
| if (sibling == fed_mgr_cluster_rec) |
| continue; |
| /* |
| * If we aren't sending the dependency to all siblings and |
| * there isn't a dependency on this sibling, don't send |
| * an RPC to this sibling. |
| */ |
| if (!send_all_sibs && |
| !(send_sib_bits & FED_SIBLING_BIT(sibling->fed.id))) |
| continue; |
| |
| req_msg.protocol_version = sibling->rpc_version; |
| rc |= _queue_rpc(sibling, &req_msg, job_ptr->job_id, false); |
| } |
| list_iterator_destroy(sib_itr); |
| return rc; |
| } |
| |
| /* submit a federated job. |
| * |
| * IN msg - msg that contains packed job_desc msg to send to siblings. |
| * IN job_desc - original job_desc msg. |
| * IN alloc_only - true if requesting just an allocation (srun/salloc). |
| * IN protocol_version - version of the code the caller is using |
| * OUT job_id_ptr - job_id of allocated job |
| * OUT alloc_code - error_code returned from job_allocate |
| * OUT err_msg - error message returned if any |
| * RET returns SLURM_SUCCESS if the allocation was successful, SLURM_ERROR |
| * otherwise. |
| */ |
| extern int fed_mgr_job_allocate(slurm_msg_t *msg, job_desc_msg_t *job_desc, |
| bool alloc_only, |
| uint32_t *job_id_ptr, int *alloc_code, |
| char **err_msg) |
| { |
| uint64_t feature_sibs = 0; |
| job_record_t *job_ptr = NULL; |
| bool job_held = false; |
| |
| xassert(msg); |
| xassert(job_desc); |
| xassert(job_id_ptr); |
| xassert(alloc_code); |
| xassert(err_msg); |
| |
| if (job_desc->job_id != NO_VAL) { |
| error("attempt by uid %u to set JobId=%u. " |
| "specifying a job_id is not allowed when in a federation", |
| msg->auth_uid, job_desc->job_id); |
| *alloc_code = ESLURM_INVALID_JOB_ID; |
| return SLURM_ERROR; |
| } |
| |
| if (_validate_cluster_features(job_desc->cluster_features, |
| &feature_sibs)) { |
| *alloc_code = ESLURM_INVALID_CLUSTER_FEATURE; |
| return SLURM_ERROR; |
| } |
| |
	/* Get the job_id now. We can't submit the job first to get a job_id
	 * because job_allocate() will modify the job_desc. */
| job_desc->job_id = get_next_job_id(false); |
| |
| /* Set viable siblings */ |
| job_desc->fed_siblings_viable = |
| _get_viable_sibs(job_desc->clusters, feature_sibs, |
| (job_desc->array_inx) ? true : false, err_msg); |
| if (!job_desc->fed_siblings_viable) { |
| *alloc_code = ESLURM_FED_NO_VALID_CLUSTERS; |
| return SLURM_ERROR; |
| } |
| |
| /* ensure that fed_siblings_active is clear since this is a new job */ |
| job_desc->fed_siblings_active = 0; |
| |
| /* |
| * Submit local job first. Then submit to all siblings. If the local job |
| * fails, then don't worry about sending to the siblings. |
| */ |
| job_desc->het_job_offset = NO_VAL; |
| *alloc_code = job_allocate(job_desc, job_desc->immediate, false, NULL, |
| alloc_only, msg->auth_uid, false, &job_ptr, |
| err_msg, msg->protocol_version); |
| |
| if (!job_ptr || (*alloc_code && job_ptr->job_state == JOB_FAILED)) { |
		/* There may be a nonzero rc without the job being failed; in
		 * that case the job will sit in the queue, so only bail out
		 * when the job actually failed. */
| info("failed to submit federated job to local cluster"); |
| return SLURM_ERROR; |
| } |
| |
| /* mark this cluster as an active sibling if it's in the viable list */ |
| if (job_desc->fed_siblings_viable & |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)) |
| job_desc->fed_siblings_active |= |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id); |
| |
| /* Job is not eligible on origin cluster - mark as revoked. */ |
| if (!(job_ptr->fed_details->siblings_viable & |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))) |
| job_state_set_flag(job_ptr, JOB_REVOKED); |
| |
| *job_id_ptr = job_ptr->job_id; |
| |
| /* |
| * Don't submit a job with dependencies to siblings - the origin will |
| * test job dependencies and submit the job to siblings when all |
| * dependencies are fulfilled. |
| * job_allocate() calls job_independent() which sets the JOB_DEPENDENT |
| * flag if the job is dependent, so check this after job_allocate(). |
| */ |
| if ((job_desc->priority == 0) || (job_ptr->bit_flags & JOB_DEPENDENT)) |
| job_held = true; |
| |
| if (job_held) { |
| info("Submitted held federated %pJ to %s(self)", |
| job_ptr, fed_mgr_cluster_rec->name); |
| } else { |
| info("Submitted %sfederated %pJ to %s(self)", |
| (!(job_ptr->fed_details->siblings_viable & |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)) ? |
| "tracking " : ""), |
| job_ptr, fed_mgr_cluster_rec->name); |
| } |
| |
| /* Update job before submitting sibling jobs so that it will show the |
| * viable siblings and potentially active local job */ |
| job_ptr->fed_details->siblings_active = job_desc->fed_siblings_active; |
| update_job_fed_details(job_ptr); |
| |
| if (!job_held && _submit_sibling_jobs( |
| job_desc, msg, alloc_only, |
| job_ptr->fed_details->siblings_viable, |
| job_ptr->start_protocol_ver)) |
| info("failed to submit sibling job to one or more siblings"); |
| /* Send remote dependencies to siblings */ |
| if ((job_ptr->bit_flags & JOB_DEPENDENT) && |
| job_ptr->details && job_ptr->details->dependency) |
| if (fed_mgr_submit_remote_dependencies(job_ptr, false, false)) |
| error("%s: %pJ Failed to send remote dependencies to some or all siblings.", |
| __func__, job_ptr); |
| |
| job_ptr->fed_details->siblings_active = job_desc->fed_siblings_active; |
| update_job_fed_details(job_ptr); |
| |
| /* Add record to fed job table */ |
| add_fed_job_info(job_ptr); |
| |
| return SLURM_SUCCESS; |
| } |
| |
/* Tests if the job is a tracker-only federated job.
 * Tracker-only job: a job that shouldn't run on the local cluster but should
 * be kept around to facilitate communications for its sibling jobs on other
 * clusters.
 */
| extern bool fed_mgr_is_tracker_only_job(job_record_t *job_ptr) |
| { |
| bool rc = false; |
| uint32_t origin_id; |
| |
| xassert(job_ptr); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return rc; |
| |
| if (job_ptr->fed_details && |
| (origin_id == fed_mgr_cluster_rec->fed.id) && |
| job_ptr->fed_details->siblings_active && |
| (!(job_ptr->fed_details->siblings_active & |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))) |
| rc = true; |
| |
| if (job_ptr->fed_details && |
| job_ptr->fed_details->cluster_lock && |
| job_ptr->fed_details->cluster_lock != fed_mgr_cluster_rec->fed.id) |
| rc = true; |
| |
| return rc; |
| } |
| |
| /* Return the cluster name for the given cluster id. |
| * Must xfree returned string |
| */ |
| extern char *fed_mgr_get_cluster_name(uint32_t id) |
| { |
| slurmdb_cluster_rec_t *sibling; |
| char *name = NULL; |
| |
| if ((sibling = fed_mgr_get_cluster_by_id(id))) { |
| name = xstrdup(sibling->name); |
| } |
| |
| return name; |
| } |
| |
| static int _is_fed_job(job_record_t *job_ptr, uint32_t *origin_id) |
| { |
| xassert(job_ptr); |
| xassert(origin_id); |
| |
| if (!fed_mgr_cluster_rec) |
| return false; |
| |
| if ((!job_ptr->fed_details) || |
| (!(*origin_id = fed_mgr_get_cluster_id(job_ptr->job_id)))) { |
| debug2("job %pJ not a federated job", job_ptr); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| static int _job_unlock_spec_sibs(job_record_t *job_ptr, uint64_t spec_sibs) |
| { |
| uint32_t cluster_id = fed_mgr_cluster_rec->fed.id; |
| slurmdb_cluster_rec_t *sibling; |
| int sib_id = 1; |
| |
| while (spec_sibs) { |
| if (!(spec_sibs & 1)) |
| goto next_unlock; |
| |
| if (fed_mgr_cluster_rec->fed.id == sib_id) |
| fed_mgr_job_lock_unset(job_ptr->job_id, |
| cluster_id); |
| else if ((sibling = fed_mgr_get_cluster_by_id(sib_id))) |
| _persist_fed_job_unlock(sibling, job_ptr->job_id, |
| cluster_id); |
| next_unlock: |
| spec_sibs >>= 1; |
| sib_id++; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Return SLURM_SUCCESS if all siblings give lock to job; SLURM_ERROR otherwise. |
| */ |
| static int _job_lock_all_sibs(job_record_t *job_ptr) |
| { |
| slurmdb_cluster_rec_t *sibling = NULL; |
| int sib_id = 1; |
| bool all_said_yes = true; |
| uint64_t replied_sibs = 0, tmp_sibs = 0; |
| uint32_t origin_id, cluster_id; |
| |
| xassert(job_ptr); |
| |
| origin_id = fed_mgr_get_cluster_id(job_ptr->job_id); |
| cluster_id = fed_mgr_cluster_rec->fed.id; |
| |
| tmp_sibs = job_ptr->fed_details->siblings_viable & |
| (~FED_SIBLING_BIT(origin_id)); |
| while (tmp_sibs) { |
| if (!(tmp_sibs & 1)) |
| goto next_lock; |
| |
| if (cluster_id == sib_id) { |
| if (!fed_mgr_job_lock_set(job_ptr->job_id, cluster_id)) |
| replied_sibs |= FED_SIBLING_BIT(sib_id); |
| else { |
| all_said_yes = false; |
| break; |
| } |
		} else if (!(sibling = fed_mgr_get_cluster_by_id(sib_id)) ||
			   !sibling->fed.send ||
			   !((persist_conn_t *) sibling->fed.send)->tls_conn) {
| /* |
| * Don't consider clusters that are down. They will sync |
| * up later. |
| */ |
| goto next_lock; |
| } else if (!_persist_fed_job_lock(sibling, job_ptr->job_id, |
| cluster_id)) { |
| replied_sibs |= FED_SIBLING_BIT(sib_id); |
| } else { |
| all_said_yes = false; |
| break; |
| } |
| |
| next_lock: |
| tmp_sibs >>= 1; |
| sib_id++; |
| } |
| |
| /* |
| * Have to talk to at least one other sibling -- if there is one -- to |
| * start the job |
| */ |
| if (all_said_yes && |
| (!(job_ptr->fed_details->siblings_viable & |
| ~FED_SIBLING_BIT(cluster_id)) || |
| (replied_sibs & ~(FED_SIBLING_BIT(cluster_id))))) |
| return SLURM_SUCCESS; |
| |
| /* have to release the lock on those that said yes */ |
| _job_unlock_spec_sibs(job_ptr, replied_sibs); |
| |
| return SLURM_ERROR; |
| } |
| |
static int _slurmdbd_conn_active(void)
| { |
| int active = 0; |
| |
| if (acct_storage_g_get_data(acct_db_conn, ACCT_STORAGE_INFO_CONN_ACTIVE, |
| &active) != SLURM_SUCCESS) |
| active = 0; |
| |
| return active; |
| } |
| |
| /* |
 * Attempt to grab the job's federation cluster lock so that the requesting
 * cluster can attempt to start the job.
| * |
| * IN job - job to lock |
| * RET returns SLURM_SUCCESS if the lock was granted, SLURM_ERROR otherwise |
| */ |
| extern int fed_mgr_job_lock(job_record_t *job_ptr) |
| { |
| int rc = SLURM_SUCCESS; |
| uint32_t origin_id, cluster_id; |
| |
| xassert(job_ptr); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| cluster_id = fed_mgr_cluster_rec->fed.id; |
| |
| log_flag(FEDR, "attempting fed job lock on %pJ by cluster_id %d", |
| job_ptr, cluster_id); |
| |
| if (origin_id != fed_mgr_cluster_rec->fed.id) { |
| persist_conn_t *origin_conn = NULL; |
| slurmdb_cluster_rec_t *origin_cluster; |
| if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) { |
| info("Unable to find origin cluster for %pJ from origin id %d", |
| job_ptr, origin_id); |
| } else |
| origin_conn = |
| (persist_conn_t *) origin_cluster->fed.send; |
| |
| /* Check dbd is up to make sure ctld isn't on an island. */ |
| if (acct_db_conn && _slurmdbd_conn_active() && |
| (!origin_conn || !origin_conn->tls_conn)) { |
| rc = _job_lock_all_sibs(job_ptr); |
| } else if (origin_cluster) { |
| rc = _persist_fed_job_lock(origin_cluster, |
| job_ptr->job_id, |
| cluster_id); |
| } else { |
| rc = SLURM_ERROR; |
| } |
| |
| if (!rc) { |
| job_ptr->fed_details->cluster_lock = cluster_id; |
| fed_mgr_job_lock_set(job_ptr->job_id, cluster_id); |
| } |
| |
| return rc; |
| } |
| |
| /* origin cluster */ |
| rc = fed_mgr_job_lock_set(job_ptr->job_id, cluster_id); |
| |
| return rc; |
| } |
| |
| extern int fed_mgr_job_lock_set(uint32_t job_id, uint32_t cluster_id) |
| { |
| int rc = SLURM_SUCCESS; |
| fed_job_info_t *job_info; |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| |
| log_flag(FEDR, "%s: attempting to set fed JobId=%u lock to %u", |
| __func__, job_id, cluster_id); |
| |
| if (!(job_info = _find_fed_job_info(job_id))) { |
| error("Didn't find JobId=%u in fed_job_list", job_id); |
| rc = SLURM_ERROR; |
| } else if (_job_has_pending_updates(job_info)) { |
| log_flag(FEDR, "%s: cluster %u can't get cluster lock for JobId=%u because it has pending updates", |
| __func__, cluster_id, job_id); |
| rc = SLURM_ERROR; |
| } else if (job_info->cluster_lock && |
| job_info->cluster_lock != cluster_id) { |
| log_flag(FEDR, "%s: fed JobId=%u already locked by cluster %d", |
| __func__, job_id, job_info->cluster_lock); |
| rc = SLURM_ERROR; |
| } else { |
| log_flag(FEDR, "%s: fed JobId=%u locked by %u", |
| __func__, job_id, cluster_id); |
| |
| job_info->cluster_lock = cluster_id; |
| } |
| |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| return rc; |
| } |
| |
| extern bool fed_mgr_job_is_self_owned(job_record_t *job_ptr) |
| { |
| if (!fed_mgr_cluster_rec || !job_ptr->fed_details || |
| (job_ptr->fed_details->cluster_lock == fed_mgr_cluster_rec->fed.id)) |
| return true; |
| |
| return false; |
| } |
| |
| extern bool fed_mgr_job_is_locked(job_record_t *job_ptr) |
| { |
| if (!job_ptr->fed_details || |
| job_ptr->fed_details->cluster_lock) |
| return true; |
| |
| return false; |
| } |
| |
| static void _q_sib_job_start(slurm_msg_t *msg) |
| { |
| sib_msg_t *sib_msg = msg->data; |
| fed_job_update_info_t *job_update_info; |
| |
	/* Queue a job-start update; if this is the origin job, the handler
	 * will revoke the remaining remote sibling jobs. */
| job_update_info = xmalloc(sizeof(fed_job_update_info_t)); |
| job_update_info->type = FED_JOB_START; |
| job_update_info->job_id = sib_msg->job_id; |
| job_update_info->start_time = sib_msg->start_time; |
| job_update_info->cluster_lock = sib_msg->cluster_id; |
| |
| _append_job_update(job_update_info); |
| } |
| |
| extern int fed_mgr_job_lock_unset(uint32_t job_id, uint32_t cluster_id) |
| { |
| int rc = SLURM_SUCCESS; |
	fed_job_info_t *job_info;
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| |
| log_flag(FEDR, "%s: attempting to unlock fed JobId=%u by cluster %u", |
| __func__, job_id, cluster_id); |
| |
| if (!(job_info = _find_fed_job_info(job_id))) { |
| error("Didn't find JobId=%u in fed_job_list", job_id); |
| rc = SLURM_ERROR; |
| } else if (job_info->cluster_lock && |
| job_info->cluster_lock != cluster_id) { |
| error("attempt to unlock sib JobId=%u by cluster %d which doesn't have job lock", |
| job_id, cluster_id); |
| rc = SLURM_ERROR; |
| } else { |
| log_flag(FEDR, "%s: fed JobId=%u unlocked by %u", |
| __func__, job_id, cluster_id); |
| job_info->cluster_lock = 0; |
| } |
| |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| return rc; |
| } |
| |
| /* |
 * Release the job's federation cluster lock so that other clusters can try to
| * start the job. |
| * |
| * IN job - job to unlock |
| * RET returns SLURM_SUCCESS if the lock was released, SLURM_ERROR otherwise |
| */ |
| extern int fed_mgr_job_unlock(job_record_t *job_ptr) |
| { |
| int rc = SLURM_SUCCESS; |
| uint32_t origin_id, cluster_id; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| cluster_id = fed_mgr_cluster_rec->fed.id; |
| |
| log_flag(FEDR, "releasing fed job lock on %pJ by cluster_id %d", |
| job_ptr, cluster_id); |
| |
| if (origin_id != fed_mgr_cluster_rec->fed.id) { |
| persist_conn_t *origin_conn = NULL; |
| slurmdb_cluster_rec_t *origin_cluster; |
| if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) { |
| info("Unable to find origin cluster for %pJ from origin id %d", |
| job_ptr, origin_id); |
| } else { |
| origin_conn = |
| (persist_conn_t *) origin_cluster->fed.send; |
| } |
| |
| if (!origin_conn || !origin_conn->tls_conn) { |
| uint64_t tmp_sibs; |
| tmp_sibs = job_ptr->fed_details->siblings_viable & |
| ~FED_SIBLING_BIT(origin_id); |
| rc = _job_unlock_spec_sibs(job_ptr, tmp_sibs); |
| } else { |
| rc = _persist_fed_job_unlock(origin_cluster, |
| job_ptr->job_id, |
| cluster_id); |
| } |
| |
| if (!rc) { |
| job_ptr->fed_details->cluster_lock = 0; |
| fed_mgr_job_lock_unset(job_ptr->job_id, cluster_id); |
| } |
| |
| return rc; |
| } |
| |
| /* Origin Cluster */ |
| rc = fed_mgr_job_lock_unset(job_ptr->job_id, cluster_id); |
| |
| return rc; |
| } |
| |
| /* |
 * Notify origin cluster that cluster_id started the job.
 *
 * Cancels remaining sibling jobs.
 *
 * IN job_ptr - job_ptr of the job that was started.
 * IN start_time - start time of the job.
 * RET SLURM_SUCCESS if the origin was notified of the start, SLURM_ERROR
 *     otherwise
| */ |
| extern int fed_mgr_job_start(job_record_t *job_ptr, time_t start_time) |
| { |
| int rc = SLURM_SUCCESS; |
| uint32_t origin_id, cluster_id; |
| fed_job_info_t *job_info; |
| |
| xassert(job_ptr); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| cluster_id = fed_mgr_cluster_rec->fed.id; |
| |
| log_flag(FEDR, "start fed %pJ by cluster_id %d", |
| job_ptr, cluster_id); |
| |
| if (origin_id != fed_mgr_cluster_rec->fed.id) { |
| persist_conn_t *origin_conn = NULL; |
| slurmdb_cluster_rec_t *origin_cluster; |
| if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) { |
| info("Unable to find origin cluster for %pJ from origin id %d", |
| job_ptr, origin_id); |
| } else { |
| origin_conn = |
| (persist_conn_t *) origin_cluster->fed.send; |
| } |
| |
| if (!origin_conn || !origin_conn->tls_conn) { |
| uint64_t viable_sibs; |
| viable_sibs = job_ptr->fed_details->siblings_viable; |
| viable_sibs &= ~FED_SIBLING_BIT(origin_id); |
| viable_sibs &= ~FED_SIBLING_BIT(cluster_id); |
| _revoke_sibling_jobs(job_ptr->job_id, |
| fed_mgr_cluster_rec->fed.id, |
| viable_sibs, job_ptr->start_time); |
| rc = SLURM_SUCCESS; |
| } else { |
| rc = _persist_fed_job_start(origin_cluster, |
| job_ptr->job_id, cluster_id, |
| job_ptr->start_time); |
| } |
| |
| if (!rc) { |
| job_ptr->fed_details->siblings_active = |
| FED_SIBLING_BIT(cluster_id); |
| update_job_fed_details(job_ptr); |
| } |
| |
| return rc; |
| |
| } |
| |
| /* Origin Cluster: */ |
| slurm_mutex_lock(&fed_job_list_mutex); |
| |
| if (!(job_info = _find_fed_job_info(job_ptr->job_id))) { |
| error("Didn't find %pJ in fed_job_list", job_ptr); |
| rc = SLURM_ERROR; |
| } else if (!job_info->cluster_lock) { |
| error("attempt to start sib JobId=%u by cluster %u, but it's not locked", |
| job_info->job_id, cluster_id); |
| rc = SLURM_ERROR; |
| } else if (job_info->cluster_lock && |
| (job_info->cluster_lock != cluster_id)) { |
| error("attempt to start sib JobId=%u by cluster %u, which doesn't have job lock", |
| job_info->job_id, cluster_id); |
| rc = SLURM_ERROR; |
| } |
| |
| if (!rc) |
| _fed_job_start_revoke(job_info, job_ptr, start_time); |
| |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| |
| return rc; |
| } |
| |
| /* |
| * Complete the federated job. If the job ran on a cluster other than the |
| * origin_cluster then it notifies the origin cluster that the job finished. |
| * |
| * Tells the origin cluster to revoke the tracking job. |
| * |
| * IN job_ptr - job_ptr of job to complete. |
| * IN return_code - return code of job |
| * IN start_time - start time of the job that actually ran. |
| * RET returns SLURM_SUCCESS if fed job was completed, SLURM_ERROR otherwise |
| */ |
| extern int fed_mgr_job_complete(job_record_t *job_ptr, uint32_t return_code, |
| time_t start_time) |
| { |
| uint32_t origin_id; |
| |
| if (job_ptr->bit_flags & SIB_JOB_FLUSH) |
| return SLURM_SUCCESS; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| log_flag(FEDR, "complete fed %pJ by cluster_id %d", |
| job_ptr, fed_mgr_cluster_rec->fed.id); |
| |
| if (origin_id == fed_mgr_cluster_rec->fed.id) { |
| _revoke_sibling_jobs(job_ptr->job_id, |
| fed_mgr_cluster_rec->fed.id, |
| job_ptr->fed_details->siblings_active, |
| job_ptr->start_time); |
| return SLURM_SUCCESS; |
| } |
| |
| slurmdb_cluster_rec_t *conn = fed_mgr_get_cluster_by_id(origin_id); |
| if (!conn) { |
| info("Unable to find origin cluster for %pJ from origin id %d", |
| job_ptr, origin_id); |
| return SLURM_ERROR; |
| } |
| |
| return _persist_fed_job_revoke(conn, job_ptr->job_id, |
| job_ptr->job_state, return_code, |
| start_time); |
| } |
| |
| /* |
| * Revoke all sibling jobs. |
| * |
| * IN job_ptr - job to revoke sibling jobs from. |
| * RET SLURM_SUCCESS on success, SLURM_ERROR otherwise. |
| */ |
| extern int fed_mgr_job_revoke_sibs(job_record_t *job_ptr) |
| { |
| uint32_t origin_id; |
| time_t now = time(NULL); |
| |
| xassert(verify_lock(JOB_LOCK, READ_LOCK)); |
| xassert(verify_lock(FED_LOCK, READ_LOCK)); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| if (origin_id != fed_mgr_cluster_rec->fed.id) |
| return SLURM_SUCCESS; |
| |
| log_flag(FEDR, "revoke fed %pJ's siblings", job_ptr); |
| |
| _revoke_sibling_jobs(job_ptr->job_id, fed_mgr_cluster_rec->fed.id, |
| job_ptr->fed_details->siblings_active, now); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Revokes the federated job. |
| * |
| * IN job_ptr - job_ptr of job to revoke. |
| * IN job_complete - whether the job is done or not. If completed then sets the |
| * state to JOB_REVOKED | completed_state. JOB_REVOKED otherwise. |
| * IN completed_state - state of completed job. Only use if job_complete==true. |
| * If job_complete==false, then this is unused. |
| * IN exit_code - exit_code of job. |
| * IN start_time - start time of the job that actually ran. |
| * RET returns SLURM_SUCCESS if fed job was completed, SLURM_ERROR otherwise |
| */ |
| extern int fed_mgr_job_revoke(job_record_t *job_ptr, bool job_complete, |
| uint32_t completed_state, uint32_t exit_code, |
| time_t start_time) |
| { |
| uint32_t origin_id; |
| uint32_t state = JOB_REVOKED; |
| |
| if (IS_JOB_COMPLETED(job_ptr)) /* job already completed */ |
| return SLURM_SUCCESS; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| log_flag(FEDR, "revoking fed %pJ (%s)", |
| job_ptr, job_complete ? "REVOKED|CANCELLED" : "REVOKED"); |
| |
| /* Check if the job exited with one of the configured requeue values. */ |
| job_ptr->exit_code = exit_code; |
| if (job_hold_requeue(job_ptr)) { |
| batch_requeue_fini(job_ptr); |
| return SLURM_SUCCESS; |
| } |
| /* |
| * Only set to a "completed" state (i.e., state > JOB_SUSPENDED) |
| * if job_complete is true. |
| */ |
| if (job_complete) { |
| if (completed_state > JOB_SUSPENDED) |
| state |= completed_state; |
| else |
| state |= JOB_CANCELLED; |
| } |
| |
| job_state_set(job_ptr, state); |
| job_ptr->start_time = start_time; |
| job_ptr->end_time = start_time; |
| job_ptr->state_reason = WAIT_NO_REASON; |
| xfree(job_ptr->state_desc); |
| |
| /* |
| * Since the job is purged/revoked quickly on the non-origin side it's |
| * possible that the job_start message has not been sent yet. Send it |
| * now so that the db record gets the uid set -- which the complete |
| * message doesn't send. |
| */ |
| if (!job_ptr->db_index && (origin_id != fed_mgr_cluster_rec->fed.id)) { |
| if (IS_JOB_FINISHED(job_ptr)) |
| jobacct_storage_g_job_start(acct_db_conn, job_ptr); |
| else |
| info("%s: %pJ isn't finished and isn't an origin job (%u != %u) and doesn't have a db_index yet. We aren't sending a start message to the database.", |
| __func__, job_ptr, origin_id, |
| fed_mgr_cluster_rec->fed.id); |
| } |
| |
| job_completion_logger(job_ptr, false); |
| |
| /* Don't remove the origin job */ |
| if (origin_id == fed_mgr_cluster_rec->fed.id) |
| return SLURM_SUCCESS; |
| |
| /* Purge the revoked job -- remote only */ |
| unlink_job_record(job_ptr); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* Convert cluster ids to cluster names. |
| * |
| * RET: return string of comma-separated cluster names. |
| * Must free returned string. |
| */ |
| extern char *fed_mgr_cluster_ids_to_names(uint64_t cluster_ids) |
| { |
| int bit = 1; |
| char *names = NULL; |
| |
| if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) |
| return names; |
| |
| while (cluster_ids) { |
| if (cluster_ids & 1) { |
| slurmdb_cluster_rec_t *sibling; |
| if ((sibling = fed_mgr_get_cluster_by_id(bit))) { |
| xstrfmtcat(names, "%s%s", |
| (names) ? "," : "", sibling->name); |
| } else { |
| error("Couldn't find a sibling cluster with id %d", |
| bit); |
| } |
| } |
| |
| cluster_ids >>= 1; |
| bit++; |
| } |
| |
| return names; |
| } |
| |
| /* |
| * Tests whether a federated job can be requeued. |
| * |
| * If called from the remote cluster (non-origin) then it will send a requeue |
| * request to the origin to have the origin cancel this job. In this case, it |
| * will return success and set the JOB_REQUEUE_FED flag and wait to be killed. |
| * |
| * If it is the origin job, it will also cancel a running remote job. New |
| * federated sibling jobs will be submitted after the job has completed (e.g. |
| * after epilog) in fed_mgr_job_requeue(). |
| * |
| * IN job_ptr - job to requeue. |
| * IN flags - flags for the requeue (e.g. JOB_RECONFIG_FAIL). |
| * RET returns SLURM_SUCCESS if siblings submitted successfully, SLURM_ERROR |
| * otherwise. |
| */ |
| extern int fed_mgr_job_requeue_test(job_record_t *job_ptr, uint32_t flags) |
| { |
| uint32_t origin_id; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| if (origin_id != fed_mgr_cluster_rec->fed.id) { |
| slurmdb_cluster_rec_t *origin_cluster; |
| if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) { |
| error("Unable to find origin cluster for %pJ from origin id %d", |
| job_ptr, origin_id); |
| return SLURM_ERROR; |
| } |
| |
| log_flag(FEDR, "requeuing fed job %pJ on origin cluster %d", |
| job_ptr, origin_id); |
| |
| _persist_fed_job_requeue(origin_cluster, job_ptr->job_id, |
| flags); |
| |
| job_state_set_flag(job_ptr, JOB_REQUEUE_FED); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| log_flag(FEDR, "requeuing fed %pJ by cluster_id %d", |
| job_ptr, fed_mgr_cluster_rec->fed.id); |
| |
	/* If the job is currently running locally, then cancel the running job
	 * and set a flag that it's being requeued. Then when the epilog
	 * complete comes in, submit the siblings to the other clusters.
	 * This has to be checked after the origin check, otherwise the request
	 * would never reach the origin. */
| if (IS_JOB_RUNNING(job_ptr)) |
| return SLURM_SUCCESS; |
| |
| /* If a sibling job is running remotely, then cancel the remote job and |
| * wait till job finishes (e.g. after long epilog) and then resubmit the |
| * siblings in fed_mgr_job_requeue(). */ |
| if (IS_JOB_PENDING(job_ptr) && IS_JOB_REVOKED(job_ptr)) { |
| slurmdb_cluster_rec_t *remote_cluster; |
| if (!(remote_cluster = |
| fed_mgr_get_cluster_by_id( |
| job_ptr->fed_details->cluster_lock))) { |
| error("Unable to find remote cluster for %pJ from cluster lock %d", |
| job_ptr, job_ptr->fed_details->cluster_lock); |
| return SLURM_ERROR; |
| } |
| |
| if (_persist_fed_job_cancel(remote_cluster, job_ptr->job_id, |
| SIGKILL, KILL_FED_REQUEUE, 0)) { |
| error("failed to kill/requeue fed %pJ", |
| job_ptr); |
| } |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Submits requeued sibling jobs. |
| * |
| * IN job_ptr - job to requeue. |
| * RET returns SLURM_SUCCESS if siblings submitted successfully, SLURM_ERROR |
| * otherwise. |
| */ |
| extern int fed_mgr_job_requeue(job_record_t *job_ptr) |
| { |
| int rc = SLURM_SUCCESS; |
| uint32_t origin_id; |
| uint64_t feature_sibs = 0; |
| fed_job_info_t *job_info; |
| |
| xassert(job_ptr); |
| xassert(job_ptr->details); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| log_flag(FEDR, "requeuing fed job %pJ", job_ptr); |
| |
| /* clear where actual siblings were */ |
| job_ptr->fed_details->siblings_active = 0; |
| |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if (!(job_info = _find_fed_job_info(job_ptr->job_id))) { |
| error("%s: failed to find fed job info for fed %pJ", |
| __func__, job_ptr); |
| } |
| |
| /* don't submit siblings for jobs that are held */ |
| if (job_ptr->priority == 0) { |
| job_state_unset_flag(job_ptr, JOB_REQUEUE_FED); |
| |
| update_job_fed_details(job_ptr); |
| |
| /* clear cluster lock */ |
| job_ptr->fed_details->cluster_lock = 0; |
| if (job_info) |
| job_info->cluster_lock = 0; |
| |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| return SLURM_SUCCESS; |
| } |
| |
	/* Don't worry about testing which clusters can start the job the
	 * soonest since they can't start the job for 120 seconds anyway. */
| |
| /* Get new viable siblings since the job might just have one viable |
| * sibling listed if the sibling was the cluster that could start the |
| * job the soonest. */ |
| _validate_cluster_features(job_ptr->details->cluster_features, |
| &feature_sibs); |
| job_ptr->fed_details->siblings_viable = |
| _get_viable_sibs(job_ptr->clusters, feature_sibs, |
| job_ptr->array_recs ? true : false, NULL); |
| |
| _prepare_submit_siblings(job_ptr, |
| job_ptr->fed_details->siblings_viable); |
| |
| job_state_unset_flag(job_ptr, JOB_REQUEUE_FED); |
| |
| if (!(job_ptr->fed_details->siblings_viable & |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))) |
| job_state_set_flag(job_ptr, JOB_REVOKED); |
| else |
| job_state_unset_flag(job_ptr, JOB_REVOKED); |
| |
| /* clear cluster lock */ |
| job_ptr->fed_details->cluster_lock = 0; |
| if (job_info) { |
| job_info->cluster_lock = 0; |
| job_info->siblings_viable = |
| job_ptr->fed_details->siblings_viable; |
| job_info->siblings_active = |
| job_ptr->fed_details->siblings_active; |
| } |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| return rc; |
| } |
| |
/* Cancel sibling jobs on remote clusters; the local cluster is skipped. */
| static int _cancel_sibling_jobs(job_record_t *job_ptr, uint16_t signal, |
| uint16_t flags, uid_t uid, bool kill_viable) |
| { |
| int id = 1; |
| uint64_t tmp_sibs; |
| persist_conn_t *sib_conn; |
| |
| if (kill_viable) { |
| tmp_sibs = job_ptr->fed_details->siblings_viable; |
| flags |= KILL_NO_SIBS; |
| } else { |
| tmp_sibs = job_ptr->fed_details->siblings_active; |
| flags &= ~KILL_NO_SIBS; |
| } |
| |
| while (tmp_sibs) { |
| if ((tmp_sibs & 1) && |
| (id != fed_mgr_cluster_rec->fed.id)) { |
| slurmdb_cluster_rec_t *cluster = |
| fed_mgr_get_cluster_by_id(id); |
| if (!cluster) { |
| error("couldn't find cluster rec by id %d", id); |
| goto next_job; |
| } |
| |
| /* Don't send request to siblings that are down when |
| * killing viables */ |
| sib_conn = (persist_conn_t *) cluster->fed.send; |
| if (kill_viable && (!sib_conn || !sib_conn->tls_conn)) |
| goto next_job; |
| |
| _persist_fed_job_cancel(cluster, job_ptr->job_id, |
| signal, flags, uid); |
| } |
| |
| next_job: |
| tmp_sibs >>= 1; |
| id++; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* Cancel sibling jobs of a federated job |
| * |
| * IN job_ptr - job to cancel |
| * IN signal - signal to send to job |
| * IN flags - KILL_.* flags |
| * IN uid - uid making request |
| * IN kill_viable - if true cancel viable_sibs, if false cancel active_sibs |
| */ |
| extern int fed_mgr_job_cancel(job_record_t *job_ptr, uint16_t signal, |
| uint16_t flags, uid_t uid, bool kill_viable) |
| { |
| uint32_t origin_id; |
| |
| xassert(job_ptr); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return SLURM_SUCCESS; |
| |
| log_flag(FEDR, "cancel fed %pJ by local cluster", job_ptr); |
| |
| _cancel_sibling_jobs(job_ptr, signal, flags, uid, kill_viable); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern bool fed_mgr_job_started_on_sib(job_record_t *job_ptr) |
| { |
| uint32_t origin_id; |
| |
| xassert(job_ptr); |
| |
| /* |
| * When a sibling starts the job, the job becomes revoked on the origin |
| * and the job's cluster_lock is set to that sibling's id. |
	 * Don't use fed_mgr_is_origin_job() because that returns true if
| * _is_fed_job() returns false (the job isn't federated), and that's |
| * the opposite of what we want here. |
| */ |
| return _is_fed_job(job_ptr, &origin_id) && |
| (fed_mgr_cluster_rec->fed.id == origin_id) && |
| IS_JOB_REVOKED(job_ptr) && job_ptr->fed_details->cluster_lock && |
| (job_ptr->fed_details->cluster_lock != |
| fed_mgr_cluster_rec->fed.id); |
| } |
| |
| extern bool fed_mgr_is_job_id_in_fed(uint32_t job_id) |
| { |
| uint32_t cluster_id; |
| |
| if (!fed_mgr_cluster_rec) |
| return false; |
| |
| cluster_id = fed_mgr_get_cluster_id(job_id); |
| if (!cluster_id) |
| return false; |
| |
| return FED_SIBLING_BIT(cluster_id) & _get_all_sibling_bits(); |
| } |
| |
| extern int fed_mgr_is_origin_job(job_record_t *job_ptr) |
| { |
| uint32_t origin_id; |
| |
| xassert(job_ptr); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return true; |
| |
| if (fed_mgr_cluster_rec->fed.id != origin_id) |
| return false; |
| |
| return true; |
| } |
| |
| /* |
| * Use this instead of fed_mgr_is_origin_job if job_ptr is not available. |
| */ |
| extern bool fed_mgr_is_origin_job_id(uint32_t job_id) |
| { |
| uint32_t origin_id = fed_mgr_get_cluster_id(job_id); |
| |
| if (!fed_mgr_cluster_rec || !origin_id) { |
| debug2("%s: job %u is not a federated job", __func__, job_id); |
| return true; |
| } |
| |
| if (fed_mgr_cluster_rec->fed.id == origin_id) |
| return true; |
| return false; |
| } |
| |
| /* |
| * Check if all siblings have fulfilled the singleton dependency. |
| * Return true if all clusters have checked in that they've fulfilled this |
| * singleton dependency. |
| * |
| * IN job_ptr - job with dependency to check |
| * IN dep_ptr - dependency to check. If it's not singleton, just return true. |
| * IN set_cluster_bit - if true, set the bit for this cluster indicating |
| * that this cluster has fulfilled the dependency. |
| */ |
| extern bool fed_mgr_is_singleton_satisfied(job_record_t *job_ptr, |
| depend_spec_t *dep_ptr, |
| bool set_cluster_bit) |
| { |
| uint32_t origin_id; |
| uint64_t sib_bits; |
| |
| xassert(job_ptr); |
| xassert(dep_ptr); |
| |
| if (!_is_fed_job(job_ptr, &origin_id) || disable_remote_singleton) |
| return true; |
| if (dep_ptr->depend_type != SLURM_DEPEND_SINGLETON) { |
| error("%s: Got non-singleton dependency (type %u) for %pJ. This should never happen.", |
| __func__, dep_ptr->depend_type, job_ptr); |
| return true; |
| } |
| |
| /* Set the bit for this cluster indicating that it has been satisfied */ |
| if (set_cluster_bit) |
| dep_ptr->singleton_bits |= |
| FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id); |
| |
| if (fed_mgr_cluster_rec->fed.id != origin_id) { |
| return true; |
| } |
| |
| /* |
| * Only test for current siblings; if a sibling was removed but |
| * previously had passed a singleton dependency, that bit may be |
| * set in dep_ptr->singleton_bits. |
| */ |
| sib_bits = _get_all_sibling_bits(); |
| return (dep_ptr->singleton_bits & sib_bits) == sib_bits; |
| } |
| |
| /* |
| * Update a job's required clusters. |
| * |
| * Results in siblings being removed and added. |
| * |
| * IN job_ptr - job to update. |
| * IN spec_clusters - comma-separated list of cluster names. |
| * RET return SLURM_SUCCESS on success, error code otherwise. |
| */ |
| extern int fed_mgr_update_job_clusters(job_record_t *job_ptr, |
| char *spec_clusters) |
| { |
| int rc = SLURM_SUCCESS; |
| uint32_t origin_id; |
| |
| xassert(job_ptr); |
| xassert(spec_clusters); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) { |
| sched_info("update_job: not a fed job"); |
| rc = SLURM_ERROR; |
| } else if ((!IS_JOB_PENDING(job_ptr)) || |
| job_ptr->fed_details->cluster_lock) { |
| rc = ESLURM_JOB_NOT_PENDING; |
| } else if (!fed_mgr_fed_rec) { |
| sched_info("update_job: setting Clusters on a non-active federated cluster for %pJ", |
| job_ptr); |
| rc = ESLURM_JOB_NOT_FEDERATED; |
| } else if (_validate_cluster_names(spec_clusters, NULL)) { |
| sched_info("update_job: invalid Clusters for %pJ: %s", |
| job_ptr, spec_clusters); |
| rc = ESLURM_INVALID_CLUSTER_NAME; |
| } else { |
| xfree(job_ptr->clusters); |
		if (spec_clusters[0] == '\0')
			sched_info("update_job: cleared Clusters for %pJ",
				   job_ptr);
		else
			job_ptr->clusters = xstrdup(spec_clusters);
| |
| if (fed_mgr_is_origin_job(job_ptr)) |
| _add_remove_sibling_jobs(job_ptr); |
| } |
| |
| return rc; |
| } |
| |
| /* |
| * Update a job's cluster features. |
| * |
| * Results in siblings being removed and added. |
| * |
| * IN job_ptr - job to update cluster features. |
| * IN req_features - comma-separated list of feature names. |
| * RET return SLURM_SUCCESS on success, error code otherwise. |
| */ |
| extern int fed_mgr_update_job_cluster_features(job_record_t *job_ptr, |
| char *req_features) |
| { |
| int rc = SLURM_SUCCESS; |
| uint32_t origin_id; |
| |
| xassert(job_ptr); |
| xassert(req_features); |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) { |
| sched_info("update_job: not a fed job"); |
| rc = SLURM_ERROR; |
| } else if ((!IS_JOB_PENDING(job_ptr)) || |
| job_ptr->fed_details->cluster_lock) { |
| rc = ESLURM_JOB_NOT_PENDING; |
| } else if (!fed_mgr_fed_rec) { |
| sched_info("update_job: setting ClusterFeatures on a non-active federated cluster for %pJ", |
| job_ptr); |
| rc = ESLURM_JOB_NOT_FEDERATED; |
| } else if (_validate_cluster_features(req_features, NULL)) { |
| sched_info("update_job: invalid ClusterFeatures for %pJ", |
| job_ptr); |
| rc = ESLURM_INVALID_CLUSTER_FEATURE; |
| } else { |
| xfree(job_ptr->details->cluster_features); |
		if (req_features[0] == '\0')
			sched_info("update_job: cleared ClusterFeatures for %pJ",
				   job_ptr);
		else
			job_ptr->details->cluster_features =
				xstrdup(req_features);
| |
| if (fed_mgr_is_origin_job(job_ptr)) |
| _add_remove_sibling_jobs(job_ptr); |
| } |
| |
| return rc; |
| } |
| |
| static int _reconcile_fed_job(job_record_t *job_ptr, reconcile_sib_t *rec_sib) |
| { |
| int i; |
| bool found_job = false; |
| job_info_msg_t *remote_jobs_ptr = rec_sib->job_info_msg; |
| uint32_t origin_id = fed_mgr_get_cluster_id(job_ptr->job_id); |
| uint32_t sibling_id = rec_sib->sibling_id; |
| uint64_t sibling_bit = FED_SIBLING_BIT(sibling_id); |
| char *sibling_name = rec_sib->sibling_name; |
| slurm_job_info_t *remote_job = NULL; |
| fed_job_info_t *job_info; |
| |
| xassert(job_ptr); |
| xassert(remote_jobs_ptr); |
| |
| /* |
| * Only look at jobs that: |
	 * 1. originate from the remote sibling, or
	 * 2. originate from this cluster, or
	 * 3. have the remote sibling in their viable-siblings list.
| */ |
| if (!job_ptr->fed_details || |
| !job_ptr->details || |
| (job_ptr->details->submit_time >= rec_sib->sync_time) || |
| IS_JOB_COMPLETED(job_ptr) || IS_JOB_COMPLETING(job_ptr) || |
| ((fed_mgr_get_cluster_id(job_ptr->job_id) != sibling_id) && |
| (!fed_mgr_is_origin_job(job_ptr)) && |
| (!(job_ptr->fed_details->siblings_viable & sibling_bit)))) { |
| return SLURM_SUCCESS; |
| } |
| |
| for (i = 0; i < remote_jobs_ptr->record_count; i++) { |
| remote_job = &remote_jobs_ptr->job_array[i]; |
| if (job_ptr->job_id == remote_job->job_id) { |
| found_job = true; |
| break; |
| } |
| } |
| |
| /* Jobs that originated on the remote sibling */ |
| if (origin_id == sibling_id) { |
| if (!found_job || |
| (remote_job && IS_JOB_COMPLETED(remote_job))) { |
| /* origin job is missing on remote sibling or is |
| * completed. Could have been removed from a clean |
| * start. */ |
| info("%s: origin %pJ is missing (or completed) from origin %s. Killing this copy of the job", |
| __func__, job_ptr, sibling_name); |
| job_ptr->bit_flags |= SIB_JOB_FLUSH; |
| job_signal(job_ptr, SIGKILL, KILL_NO_SIBS, 0, false); |
| } else { |
| info("%s: origin %s still has %pJ", |
| __func__, sibling_name, job_ptr); |
| } |
	/* Jobs that are shared between the two siblings -- not originating
	 * from either one */
| } else if (origin_id != fed_mgr_cluster_rec->fed.id) { |
| if (!found_job) { |
| /* Only care about jobs that are currently there. */ |
| } else if (IS_JOB_PENDING(job_ptr) && IS_JOB_CANCELLED(remote_job)) { |
| info("%s: %pJ is cancelled on sibling %s, must have been cancelled while the origin and sibling were down", |
| __func__, job_ptr, sibling_name); |
| job_state_set(job_ptr, JOB_CANCELLED); |
| job_ptr->start_time = remote_job->start_time; |
| job_ptr->end_time = remote_job->end_time; |
| job_ptr->state_reason = WAIT_NO_REASON; |
| xfree(job_ptr->state_desc); |
| job_completion_logger(job_ptr, false); |
| } else if (IS_JOB_PENDING(job_ptr) && |
| (IS_JOB_RUNNING(remote_job) || |
| IS_JOB_COMPLETING(remote_job))) { |
| info("%s: %pJ is running on sibling %s, must have been started while the origin and sibling were down", |
| __func__, job_ptr, sibling_name); |
| |
| fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED, |
| remote_job->exit_code, |
| job_ptr->start_time); |
			/* return now because job_ptr has been freed */
| return SLURM_SUCCESS; |
| } else if (IS_JOB_PENDING(job_ptr) && |
| (IS_JOB_COMPLETED(remote_job))) { |
| info("%s: %pJ is completed on sibling %s, must have been started and completed while the origin and sibling were down", |
| __func__, job_ptr, sibling_name); |
| |
| fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED, |
| remote_job->exit_code, |
| job_ptr->start_time); |
			/* return now because job_ptr has been freed */
| return SLURM_SUCCESS; |
| } |
| |
| /* Origin Jobs */ |
| } else if (!found_job) { |
| info("%s: didn't find %pJ on cluster %s", |
| __func__, job_ptr, sibling_name); |
| |
| /* Reconcile the job's active-sibling set */ |
| if (!(job_ptr->fed_details->siblings_active & sibling_bit)) { |
| /* The sibling is viable but not active, and there is |
| * no job there. This is ok. */ |
| info("%s: %s is a viable but not active sibling of %pJ. This is ok.", |
| __func__, sibling_name, job_ptr); |
| |
| #if 0 |
| /* Don't submit new sibling jobs if they're not found on the cluster. They could |
| * have been removed while the cluster was down. */ |
| } else if (!job_ptr->fed_details->cluster_lock) { |
| /* If the origin job isn't locked, then submit a sibling |
| * to this cluster. */ |
| /* Only do this if it was an active job. Could have been |
| * removed with --cancel-sibling */ |
| info("%s: %s is an active sibling of %pJ, attempting to submit new sibling job to the cluster.", |
| __func__, sibling_name, job_ptr); |
| _prepare_submit_siblings(job_ptr, sibling_bit); |
| #endif |
| } else if (job_ptr->fed_details->cluster_lock == sibling_id) { |
| /* The origin thinks that the sibling was running the |
| * job. It could have completed while this cluster was |
| * down or the sibling removed it by clearing out jobs |
| * (e.g. slurmctld -c). */ |
| info("%s: origin %pJ was running on sibling %s, but it's not there. Assuming that the job completed", |
| __func__, job_ptr, sibling_name); |
| fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED, 0, |
| job_ptr->start_time); |
| } else { |
| /* The origin job has a lock but it's not on the sibling |
| * being reconciled. The job could have been started by |
| * another cluster while the sibling was down, the |
| * original sibling job submission could have failed, or |
| * the origin started the job on a different sibling |
| * before this sibling went down and came back up (the |
| * normal situation). */ |
| info("%s: origin %pJ is currently locked by sibling %d, this is ok", |
| __func__, job_ptr, |
| job_ptr->fed_details->cluster_lock); |
| job_ptr->fed_details->siblings_active &= ~sibling_bit; |
| } |
| } else if (remote_job) { |
| info("%s: %pJ found on remote sibling %s state:%s", |
| __func__, job_ptr, sibling_name, |
| job_state_string(remote_job->job_state)); |
| |
| if (job_ptr->fed_details->cluster_lock == sibling_id) { |
| if (IS_JOB_COMPLETE(remote_job)) { |
| info("%s: %pJ on sibling %s is already completed, completing the origin job", |
| __func__, job_ptr, sibling_name); |
| fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED, |
| remote_job->exit_code, |
| job_ptr->start_time); |
| } else if (IS_JOB_CANCELLED(remote_job)) { |
| info("%s: %pJ on sibling %s is already cancelled, completing the origin job", |
| __func__, job_ptr, sibling_name); |
| fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED, |
| remote_job->exit_code, |
| job_ptr->start_time); |
| } else if (!IS_JOB_RUNNING(remote_job)) { |
| /* The job could be pending if it was requeued |
| * due to node failure */ |
| info("%s: %pJ on sibling %s has job lock but job is not running (state:%s)", |
| __func__, job_ptr, sibling_name, |
| job_state_string(remote_job->job_state)); |
| } |
| } else if (job_ptr->fed_details->cluster_lock) { |
| /* The remote might have had a sibling job before it |
| * went away and the origin started another job while it |
| * was away. The remote job needs to be revoked. */ |
| info("%s: %pJ found on sibling %s but job is locked by cluster id %d", |
| __func__, job_ptr, sibling_name, |
| job_ptr->fed_details->cluster_lock); |
| |
| if (IS_JOB_PENDING(remote_job)) { |
| info("%s: %pJ is on %s in a pending state but cluster %d has the lock on it -- revoking the remote sibling job", |
| __func__, job_ptr, sibling_name, |
| job_ptr->fed_details->cluster_lock); |
| _revoke_sibling_jobs( |
| job_ptr->job_id, |
| fed_mgr_cluster_rec->fed.id, |
| sibling_bit, |
| job_ptr->start_time); |
| } else { |
| /* should this job get cancelled? Would have to |
| * check cluster_lock before cancelling it to |
| * make sure that it's not there. */ |
| info("%s: %pJ has a lock on sibling id %d, but found a non-pending job on sibling %s.", |
| __func__, job_ptr, |
| job_ptr->fed_details->cluster_lock, |
| sibling_name); |
| |
| _revoke_sibling_jobs( |
| job_ptr->job_id, |
| fed_mgr_cluster_rec->fed.id, |
| sibling_bit, |
| job_ptr->start_time); |
| } |
| } else { |
| if (!(job_ptr->fed_details->siblings_active & |
| sibling_bit)) { |
| info("%s: %pJ on sibling %s but it wasn't in the active list. Adding to active list.", |
| __func__, job_ptr, sibling_name); |
| job_ptr->fed_details->siblings_active |= |
| sibling_bit; |
| } |
| if (IS_JOB_CANCELLED(remote_job)) { |
| info("%s: %pJ is cancelled on sibling %s, must have been cancelled while the origin was down", |
| __func__, job_ptr, sibling_name); |
| job_state_set(job_ptr, JOB_CANCELLED); |
| job_ptr->start_time = remote_job->start_time; |
| job_ptr->end_time = remote_job->end_time; |
| job_ptr->state_reason = WAIT_NO_REASON; |
| xfree(job_ptr->state_desc); |
| job_completion_logger(job_ptr, false); |
| |
| } else if (IS_JOB_COMPLETED(remote_job)) { |
| info("%s: %pJ is completed on sibling %s but the origin cluster wasn't part of starting the job, must have been started while the origin was down", |
| __func__, job_ptr, sibling_name); |
| _do_fed_job_complete(job_ptr, JOB_CANCELLED, |
| remote_job->exit_code, |
| remote_job->start_time); |
| |
| } else if (IS_JOB_RUNNING(remote_job) || |
| IS_JOB_COMPLETING(remote_job)) { |
| info("%s: origin doesn't think that %pJ should be running on sibling %s but it is. %s could have started the job while this cluster was down.", |
| __func__, job_ptr, sibling_name, |
| sibling_name); |
| /* The job was started while we were down. Mark this |
| * job as revoked (RV) and cancel the other siblings. */ |
| fed_job_info_t *job_info; |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if ((job_info = |
| _find_fed_job_info(job_ptr->job_id))) { |
| job_info->cluster_lock = sibling_id; |
| job_ptr->fed_details->cluster_lock = |
| sibling_id; |
| |
| /* Remove sibling jobs */ |
| _fed_job_start_revoke( |
| job_info, job_ptr, |
| remote_job->start_time); |
| |
| /* Mark the job revoked (RV) to track the running job */ |
| fed_mgr_job_revoke( |
| job_ptr, false, |
| JOB_CANCELLED, 0, |
| remote_job->start_time); |
| } |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| } |
| /* else all good */ |
| } |
| } |
| |
| /* Update job_info with updated siblings */ |
| slurm_mutex_lock(&fed_job_list_mutex); |
| if ((job_info = _find_fed_job_info(job_ptr->job_id))) { |
| job_info->siblings_viable = |
| job_ptr->fed_details->siblings_viable; |
| job_info->siblings_active = |
| job_ptr->fed_details->siblings_active; |
| } else { |
| error("%s: failed to find fed job info for fed %pJ", |
| __func__, job_ptr); |
| } |
| slurm_mutex_unlock(&fed_job_list_mutex); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Sync jobs with the given sibling name. |
| * |
| * IN sib_name - name of the sibling to sync with. |
| * IN job_info_msg - job records reported by that sibling. |
| * IN sync_time - time the sibling started the sync; jobs submitted after |
| * this time are not reconciled. |
| */ |
| static int _sync_jobs(const char *sib_name, job_info_msg_t *job_info_msg, |
| time_t sync_time) |
| { |
| list_itr_t *itr; |
| reconcile_sib_t rec_sib = {0}; |
| slurmdb_cluster_rec_t *sib; |
| job_record_t *job_ptr; |
| |
| if (!(sib = fed_mgr_get_cluster_by_name((char *)sib_name))) { |
| error("Couldn't find sibling by name '%s'", sib_name); |
| return SLURM_ERROR; |
| } |
| |
| rec_sib.sibling_id = sib->fed.id; |
| rec_sib.sibling_name = sib->name; |
| rec_sib.job_info_msg = job_info_msg; |
| rec_sib.sync_time = sync_time; |
| |
| itr = list_iterator_create(job_list); |
| while ((job_ptr = list_next(itr))) |
| _reconcile_fed_job(job_ptr, &rec_sib); |
| list_iterator_destroy(itr); |
| |
| sib->fed.sync_recvd = true; |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Remove active sibling from the given job. |
| * |
| * IN job_id - job_id of the job to remove the active sibling from. |
| * IN sib_name - name of the sibling to remove from the job's active siblings. |
| * RET SLURM_SUCCESS on success, error code on error. |
| */ |
| extern int fed_mgr_remove_active_sibling(uint32_t job_id, char *sib_name) |
| { |
| uint32_t origin_id; |
| job_record_t *job_ptr = NULL; |
| slurmdb_cluster_rec_t *sibling; |
| |
| if (!(job_ptr = find_job_record(job_id))) |
| return ESLURM_INVALID_JOB_ID; |
| |
| if (!_is_fed_job(job_ptr, &origin_id)) |
| return ESLURM_JOB_NOT_FEDERATED; |
| |
| if (job_ptr->fed_details->cluster_lock) |
| return ESLURM_JOB_NOT_PENDING; |
| |
| if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) |
| return ESLURM_INVALID_CLUSTER_NAME; |
| |
| if (job_ptr->fed_details->siblings_active & |
| FED_SIBLING_BIT(sibling->fed.id)) { |
| time_t now = time(NULL); |
| if (fed_mgr_cluster_rec == sibling) |
| fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0, |
| now); |
| else |
| _revoke_sibling_jobs(job_ptr->job_id, |
| fed_mgr_cluster_rec->fed.id, |
| FED_SIBLING_BIT(sibling->fed.id), |
| now); |
| job_ptr->fed_details->siblings_active &= |
| ~(FED_SIBLING_BIT(sibling->fed.id)); |
| update_job_fed_details(job_ptr); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
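| /* |
| * The _q_sib_* helpers below each turn an incoming sibling RPC into a |
| * fed_job_update_info_t and append it to the fed job update queue. Where a |
| * helper keeps the RPC payload (sib_msg->data), it NULLs that pointer so the |
| * payload isn't free'd with the message; ownership moves to the queue entry. |
| */ |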
| static int _q_sib_job_submission(slurm_msg_t *msg, bool interactive_job) |
| { |
| fed_job_update_info_t *job_update_info = NULL; |
| sib_msg_t *sib_msg = msg->data; |
| job_desc_msg_t *job_desc = sib_msg->data; |
| job_desc->job_id = sib_msg->job_id; |
| job_desc->fed_siblings_viable = sib_msg->fed_siblings; |
| job_desc->alloc_node = sib_msg->submit_host; |
| job_desc->user_id = sib_msg->user_id; |
| job_desc->group_id = sib_msg->group_id; |
| |
| /* |
| * If the job has a dependency, it won't be submitted to siblings, |
| * or it will be revoked from siblings if it became dependent. |
| * So, the sibling should ignore job_desc->dependency since the |
| * origin cluster manages the dependency. |
| */ |
| xfree(job_desc->dependency); |
| if (interactive_job) |
| job_desc->resp_host = xstrdup(sib_msg->resp_host); |
| |
| /* NULL out the data pointer because we are storing the pointer on the |
| * fed job update queue to be handled later. */ |
| sib_msg->data = NULL; |
| |
| /* Set the protocol version to the client's version so that the |
| * job's start_protocol_version is that of the client and not of |
| * the calling controller. */ |
| job_update_info = xmalloc(sizeof(fed_job_update_info_t)); |
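| /* xmalloc() zero-fills, so any field not set below stays 0/NULL. */ |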
| |
| job_update_info->job_id = job_desc->job_id; |
| job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name); |
| job_update_info->submit_desc = job_desc; |
| job_update_info->submit_proto_ver = sib_msg->submit_proto_ver; |
| |
| if (interactive_job) |
| job_update_info->type = FED_JOB_SUBMIT_INT; |
| else |
| job_update_info->type = FED_JOB_SUBMIT_BATCH; |
| |
| _append_job_update(job_update_info); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static int _q_sib_submit_response(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| sib_msg_t *sib_msg; |
| fed_job_update_info_t *job_update_info = NULL; |
| |
| xassert(msg); |
| xassert(msg->conn); |
| |
| sib_msg = msg->data; |
| |
| /* if failure then remove from active siblings */ |
| if (sib_msg && sib_msg->return_code) { |
| log_flag(FEDR, "%s: cluster %s failed to submit sibling JobId=%u. Removing from active_sibs. (error:%d)", |
| __func__, msg->conn->cluster_name, sib_msg->job_id, |
| sib_msg->return_code); |
| |
| job_update_info = xmalloc(sizeof(fed_job_update_info_t)); |
| job_update_info->job_id = sib_msg->job_id; |
| job_update_info->type = FED_JOB_REMOVE_ACTIVE_SIB_BIT; |
| job_update_info->siblings_str = |
| xstrdup(msg->conn->cluster_name); |
| _append_job_update(job_update_info); |
| } |
| |
| return rc; |
| } |
| |
| static int _q_sib_job_update(slurm_msg_t *msg, uint32_t uid) |
| { |
| sib_msg_t *sib_msg = msg->data; |
| job_desc_msg_t *job_desc = sib_msg->data; |
| fed_job_update_info_t *job_update_info = |
| xmalloc(sizeof(fed_job_update_info_t)); |
| |
| /* NULL out the data pointer because we are storing the pointer on the |
| * fed job update queue to be handled later. */ |
| sib_msg->data = NULL; |
| |
| job_update_info->type = FED_JOB_UPDATE; |
| job_update_info->submit_desc = job_desc; |
| job_update_info->job_id = sib_msg->job_id; |
| job_update_info->uid = uid; |
| job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name); |
| |
| _append_job_update(job_update_info); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static int _q_sib_job_cancel(slurm_msg_t *msg, uint32_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| uint32_t req_uid; |
| sib_msg_t *sib_msg = msg->data; |
| job_step_kill_msg_t *kill_msg = sib_msg->data; |
| fed_job_update_info_t *job_update_info = |
| xmalloc(sizeof(fed_job_update_info_t)); |
| |
| /* NULL out the data pointer because we are storing the pointer on the |
| * fed job update queue to be handled later. */ |
| sib_msg->data = NULL; |
| |
| if (sib_msg->req_uid) |
| req_uid = sib_msg->req_uid; |
| else |
| req_uid = uid; |
| |
| job_update_info->type = FED_JOB_CANCEL; |
| job_update_info->job_id = kill_msg->step_id.job_id; |
| job_update_info->kill_msg = kill_msg; |
| job_update_info->uid = req_uid; |
| |
| _append_job_update(job_update_info); |
| |
| return rc; |
| } |
| |
| static int _q_sib_job_complete(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| sib_msg_t *sib_msg = msg->data; |
| fed_job_update_info_t *job_update_info = |
| xmalloc(sizeof(fed_job_update_info_t)); |
| |
| job_update_info->type = FED_JOB_COMPLETE; |
| job_update_info->job_id = sib_msg->job_id; |
| job_update_info->job_state = sib_msg->job_state; |
| job_update_info->start_time = sib_msg->start_time; |
| job_update_info->return_code = sib_msg->return_code; |
| |
| _append_job_update(job_update_info); |
| |
| return rc; |
| } |
| |
| static int _q_sib_job_update_response(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| sib_msg_t *sib_msg = msg->data; |
| |
| fed_job_update_info_t *job_update_info = |
| xmalloc(sizeof(fed_job_update_info_t)); |
| |
| job_update_info->type = FED_JOB_UPDATE_RESPONSE; |
| job_update_info->job_id = sib_msg->job_id; |
| job_update_info->return_code = sib_msg->return_code; |
| job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name); |
| |
| _append_job_update(job_update_info); |
| |
| return rc; |
| } |
| |
| static int _q_sib_job_requeue(slurm_msg_t *msg, uint32_t uid) |
| { |
| int rc = SLURM_SUCCESS; |
| sib_msg_t *sib_msg = msg->data; |
| requeue_msg_t *req_ptr = sib_msg->data; |
| fed_job_update_info_t *job_update_info = |
| xmalloc(sizeof(fed_job_update_info_t)); |
| |
| job_update_info->type = FED_JOB_REQUEUE; |
| job_update_info->job_id = req_ptr->job_id; |
| job_update_info->flags = req_ptr->flags; |
| job_update_info->uid = uid; |
| |
| _append_job_update(job_update_info); |
| |
| return rc; |
| } |
| |
| static int _q_send_job_sync(char *sib_name) |
| { |
| int rc = SLURM_SUCCESS; |
| fed_job_update_info_t *job_update_info = |
| xmalloc(sizeof(fed_job_update_info_t)); |
| |
| job_update_info->type = FED_SEND_JOB_SYNC; |
| job_update_info->submit_cluster = xstrdup(sib_name); |
| |
| _append_job_update(job_update_info); |
| |
| return rc; |
| } |
| |
| static int _q_sib_job_sync(slurm_msg_t *msg) |
| { |
| int rc = SLURM_SUCCESS; |
| sib_msg_t *sib_msg = msg->data; |
| job_info_msg_t *job_info_msg = sib_msg->data; |
| fed_job_update_info_t *job_update_info = |
| xmalloc(sizeof(fed_job_update_info_t)); |
| |
| /* NULL out the data pointer because we are storing the pointer on the |
| * fed job update queue to be handled later. */ |
| sib_msg->data = NULL; |
| |
| job_update_info->type = FED_JOB_SYNC; |
| job_update_info->job_info_msg = job_info_msg; |
| job_update_info->start_time = sib_msg->start_time; |
| job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name); |
| |
| _append_job_update(job_update_info); |
| |
| return rc; |
| } |
| |
| extern int fed_mgr_q_update_origin_dep_msg(slurm_msg_t *msg) |
| { |
| dep_update_origin_msg_t *update_deps; |
| dep_update_origin_msg_t *update_msg = msg->data; |
| |
| log_flag(FEDR, "%s: Got %s: Job %u", |
| __func__, rpc_num2string(msg->msg_type), update_msg->job_id); |
| |
| /* update_msg will get free'd, so copy it */ |
| update_deps = xmalloc(sizeof *update_deps); |
| update_deps->depend_list = update_msg->depend_list; |
| update_deps->job_id = update_msg->job_id; |
| /* |
| * NULL update_msg->depend_list so it doesn't get free'd; we're |
| * using it later. |
| */ |
| update_msg->depend_list = NULL; |
| |
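| /* |
| * list_append() happens outside the mutex: Slurm's list_t does its own |
| * internal locking (an assumption this comment leans on), so the mutex |
| * here only serializes the condition-variable broadcast. The same |
| * pattern appears in fed_mgr_q_dep_msg() below. |
| */ |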
| list_append(origin_dep_update_list, update_deps); |
| slurm_mutex_lock(&origin_dep_update_mutex); |
| slurm_cond_broadcast(&origin_dep_cond); |
| slurm_mutex_unlock(&origin_dep_update_mutex); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern int fed_mgr_q_dep_msg(slurm_msg_t *msg) |
| { |
| dep_msg_t *remote_dependency; |
| dep_msg_t *dep_msg = msg->data; |
| |
| log_flag(FEDR, "%s: Got %s: Job %u", |
| __func__, rpc_num2string(msg->msg_type), dep_msg->job_id); |
| |
| /* dep_msg will get free'd, so copy it */ |
| remote_dependency = xmalloc(sizeof *remote_dependency); |
| remote_dependency->job_id = dep_msg->job_id; |
| remote_dependency->job_name = dep_msg->job_name; |
| remote_dependency->dependency = dep_msg->dependency; |
| /* NULL strings so they don't get free'd */ |
| dep_msg->job_name = NULL; |
| dep_msg->dependency = NULL; |
| remote_dependency->array_task_id = dep_msg->array_task_id; |
| remote_dependency->array_job_id = dep_msg->array_job_id; |
| remote_dependency->is_array = dep_msg->is_array; |
| remote_dependency->user_id = dep_msg->user_id; |
| |
| list_append(remote_dep_recv_list, remote_dependency); |
| slurm_mutex_lock(&remote_dep_recv_mutex); |
| slurm_cond_broadcast(&remote_dep_cond); |
| slurm_mutex_unlock(&remote_dep_recv_mutex); |
| return SLURM_SUCCESS; |
| } |
| |
| extern int fed_mgr_q_sib_msg(slurm_msg_t *msg, uint32_t rpc_uid) |
| { |
| sib_msg_t *sib_msg = msg->data; |
| |
| log_flag(FEDR, "%s: sib_msg_type:%s", |
| __func__, _job_update_type_str(sib_msg->sib_msg_type)); |
| |
| switch (sib_msg->sib_msg_type) { |
| case FED_JOB_CANCEL: |
| _q_sib_job_cancel(msg, rpc_uid); |
| break; |
| case FED_JOB_COMPLETE: |
| _q_sib_job_complete(msg); |
| break; |
| case FED_JOB_REQUEUE: |
| _q_sib_job_requeue(msg, rpc_uid); |
| break; |
| case FED_JOB_START: |
| _q_sib_job_start(msg); |
| break; |
| case FED_JOB_SUBMIT_BATCH: |
| _q_sib_job_submission(msg, false); |
| break; |
| case FED_JOB_SUBMIT_INT: |
| _q_sib_job_submission(msg, true); |
| break; |
| case FED_JOB_SUBMIT_RESP: |
| _q_sib_submit_response(msg); |
| break; |
| case FED_JOB_SYNC: |
| _q_sib_job_sync(msg); |
| break; |
| case FED_JOB_UPDATE: |
| _q_sib_job_update(msg, rpc_uid); |
| break; |
| case FED_JOB_UPDATE_RESPONSE: |
| _q_sib_job_update_response(msg); |
| break; |
| default: |
| error("%s: invalid sib_msg_type: %d", |
| __func__, sib_msg->sib_msg_type); |
| break; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static int _list_find_not_synced_sib(void *x, void *key) |
| { |
| slurmdb_cluster_rec_t *sib = x; |
| |
| if (sib != fed_mgr_cluster_rec && |
| sib->fed.send && |
| ((persist_conn_t *) sib->fed.send)->tls_conn && |
| !sib->fed.sync_recvd) |
| return 1; |
| |
| return 0; |
| } |
| |
| extern bool fed_mgr_sibs_synced(void) |
| { |
| slurmdb_cluster_rec_t *sib; |
| int dummy = 1; |
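| /* list_find_first() wants a non-NULL key; dummy's value is unused. */ |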
| |
| if (!fed_mgr_fed_rec) |
| return true; |
| |
| if ((sib = list_find_first(fed_mgr_fed_rec->cluster_list, |
| _list_find_not_synced_sib, &dummy))) { |
| debug("%s: sibling %s up but not synced yet", |
| __func__, sib->name); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| extern void fed_mgr_test_remote_dependencies(void) |
| { |
| int rc; |
| uint32_t origin_id; |
| bool was_changed; |
| job_record_t *job_ptr; |
| list_itr_t *itr; |
| slurmdb_cluster_rec_t *origin; |
| |
| xassert(verify_lock(JOB_LOCK, READ_LOCK)); |
| xassert(verify_lock(FED_LOCK, READ_LOCK)); |
| |
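| /* |
| * Checking list_count() before taking dep_job_list_mutex is assumed safe: |
| * list_t is internally locked, and a job added right after the check is |
| * simply picked up on the next periodic pass (TEST_REMOTE_DEP_FREQ). |
| */ |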
| if (!list_count(remote_dep_job_list) || !fed_mgr_fed_rec || |
| !fed_mgr_cluster_rec) |
| return; |
| |
| slurm_mutex_lock(&dep_job_list_mutex); |
| itr = list_iterator_create(remote_dep_job_list); |
| while ((job_ptr = list_next(itr))) { |
| origin_id = fed_mgr_get_cluster_id(job_ptr->job_id); |
| origin = fed_mgr_get_cluster_by_id(origin_id); |
| if (!origin) { |
| /* |
| * The origin probably left the federation. If it comes |
| * back there's no guarantee it will have the same |
| * cluster id as before. |
| */ |
| log_flag(FEDR, "%s: Couldn't find the origin cluster (id %u); it probably left the federation. Stop testing dependency for %pJ.", |
| __func__, origin_id, job_ptr); |
| list_delete_item(itr); |
| continue; |
| } |
| |
| rc = test_job_dependency(job_ptr, &was_changed); |
| if (rc == LOCAL_DEPEND) { |
| if (was_changed) { |
| log_flag(FEDR, "%s: %pJ has at least 1 local dependency left.", |
| __func__, job_ptr); |
| _update_origin_job_dep(job_ptr, origin); |
| } |
| } else if (rc == FAIL_DEPEND) { |
| log_flag(FEDR, "%s: %pJ test_job_dependency() failed, dependency never satisfied.", |
| __func__, job_ptr); |
| _update_origin_job_dep(job_ptr, origin); |
| list_delete_item(itr); |
| } else { /* ((rc == REMOTE_DEPEND) || (rc == NO_DEPEND)) */ |
| log_flag(FEDR, "%s: %pJ has no more dependencies left on this cluster.", |
| __func__, job_ptr); |
| _update_origin_job_dep(job_ptr, origin); |
| list_delete_item(itr); |
| } |
| } |
| list_iterator_destroy(itr); |
| slurm_mutex_unlock(&dep_job_list_mutex); |
| } |