/*****************************************************************************\
* job_scheduler.c - manage the scheduling of pending jobs in priority order
* Note there is a global job list (job_list)
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2010 Lawrence Livermore National Security.
* Copyright (C) SchedMD LLC.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette1@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
#include <ctype.h>
#include <errno.h>
#include <poll.h>
#include <signal.h> /* for SIGKILL */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#if HAVE_SYS_PRCTL_H
# include <sys/prctl.h>
#endif
#include "src/common/assoc_mgr.h"
#include "src/common/cpu_frequency.h"
#include "src/common/env.h"
#include "src/common/group_cache.h"
#include "src/common/job_features.h"
#include "src/common/list.h"
#include "src/common/macros.h"
#include "src/common/node_features.h"
#include "src/common/parse_time.h"
#include "src/common/strlcpy.h"
#include "src/common/timers.h"
#include "src/common/track_script.h"
#include "src/common/uid.h"
#include "src/common/xassert.h"
#include "src/common/xstring.h"
#include "src/interfaces/accounting_storage.h"
#include "src/interfaces/acct_gather.h"
#include "src/interfaces/burst_buffer.h"
#include "src/interfaces/gres.h"
#include "src/interfaces/node_features.h"
#include "src/interfaces/preempt.h"
#include "src/interfaces/prep.h"
#include "src/interfaces/select.h"
#include "src/slurmctld/acct_policy.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/fed_mgr.h"
#include "src/slurmctld/gang.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/licenses.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/node_scheduler.h"
#include "src/slurmctld/power_save.h"
#include "src/slurmctld/proc_req.h"
#include "src/slurmctld/reservation.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/state_save.h"
#include "src/stepmgr/gres_stepmgr.h"
#include "src/stepmgr/srun_comm.h"
#include "src/stepmgr/stepmgr.h"
#ifndef CORRESPOND_ARRAY_TASK_CNT
# define CORRESPOND_ARRAY_TASK_CNT 10
#endif
#define BUILD_TIMEOUT 2000000 /* Max build_job_queue() run time in usec */
typedef enum {
ARRAY_SPLIT_BURST_BUFFER,
ARRAY_SPLIT_AFTER_CORR,
} array_split_type_t;
typedef struct {
list_t *job_list;
int pend_cnt_limit;
char *reason_msg;
array_split_type_t type;
} split_job_t;
typedef struct {
bool backfill;
bool clear_start;
int job_prio_pairs;
job_record_t *job_ptr;
list_t *job_queue;
time_t *last_log_time;
time_t now;
int prio_inx;
struct timeval start_tv;
int tested_jobs;
} build_job_queue_for_part_t;
typedef struct {
bool completing;
bitstr_t *eff_cg_bitmap;
time_t recent;
} job_is_comp_t;
typedef struct {
uint32_t prio;
bool set;
} part_prios_same_t;
typedef struct {
char *cg_part_str;
char *cg_part_str_pos;
bitstr_t *eff_cg_bitmap;
} part_reduce_frag_t;
typedef struct {
job_record_t *het_job;
job_record_t *het_job_leader;
job_record_t *job_ptr;
} het_job_ready_t;
typedef struct {
job_record_t *het_job_leader;
int het_job_offset;
batch_job_launch_msg_t *launch_msg_ptr;
} het_job_env_t;
typedef struct {
job_record_t *job_ptr;
char *sep;
bool set_or_flag;
} depend_str_t;
typedef struct {
bool and_failed;
bool changed;
bool has_local_depend;
bool has_unfulfilled;
job_record_t *job_ptr;
bool or_flag;
bool or_satisfied;
} test_job_dep_t;
typedef struct {
uint64_t cume_space_time;
job_record_t *job_ptr;
uint32_t part_cpus_per_node;
} delay_start_t;
typedef struct {
job_record_t *job_ptr;
time_t now;
int rc;
will_run_response_msg_t **resp;
} job_start_data_t;
typedef struct {
int bracket;
bool can_reboot;
char *debug_str;
char *features;
list_t *feature_list;
bool has_xand;
bool has_mor;
int paren;
int rc;
bool skip_validation;
} valid_feature_t;
static batch_job_launch_msg_t *_build_launch_job_msg(job_record_t *job_ptr,
uint16_t protocol_version);
static bool _job_runnable_test1(job_record_t *job_ptr, bool clear_start);
static bool _job_runnable_test2(job_record_t *job_ptr, time_t now,
bool check_min_time);
static bool _scan_depend(list_t *dependency_list, job_record_t *job_ptr);
static void * _sched_agent(void *args);
static void _set_schedule_exit(schedule_exit_t code);
static int _schedule(bool full_queue);
static int _valid_batch_features(job_record_t *job_ptr, bool can_reboot);
static int _valid_feature_list(job_record_t *job_ptr,
valid_feature_t *valid_feature,
bool is_reservation);
static int _valid_node_feature(char *feature, bool can_reboot);
static int build_queue_timeout = BUILD_TIMEOUT;
static int correspond_after_task_cnt = CORRESPOND_ARRAY_TASK_CNT;
static pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sched_cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id_sched = 0;
static bool sched_full_queue = false;
static int sched_requests = 0;
static struct timeval sched_last = {0, 0};
static uint32_t max_array_size = NO_VAL;
static bool bf_hetjob_immediate = false;
static uint16_t bf_hetjob_prio = 0;
static int sched_min_interval = 2;
static int bb_array_stage_cnt = 10;
extern diag_stats_t slurmctld_diag_stats;
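/*
 * list_find_first() callback used by the "singleton" dependency logic:
 * return 1 if another job by the same user with the same name is already
 * running, suspended, or is an older pending job, and is not a component
 * of the same het job.
 */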
static int _find_singleton_job (void *x, void *key)
{
job_record_t *qjob_ptr = (job_record_t *) x;
job_record_t *job_ptr = (job_record_t *) key;
xassert (qjob_ptr->magic == JOB_MAGIC);
/*
 * Match only jobs with the same user and name
 */
if (qjob_ptr->user_id != job_ptr->user_id)
return 0;
if (qjob_ptr->name && job_ptr->name &&
xstrcmp(qjob_ptr->name, job_ptr->name))
return 0;
/*
 * Match an already running/suspended job, or a previously
 * submitted pending job, when the dependent job is not a het
 * job or the two are not part of the same het job
 */
if ((IS_JOB_RUNNING(qjob_ptr) || IS_JOB_SUSPENDED(qjob_ptr) ||
(IS_JOB_PENDING(qjob_ptr) &&
(qjob_ptr->job_id < job_ptr->job_id))) &&
(!job_ptr->het_job_id ||
(job_ptr->het_job_id != qjob_ptr->het_job_id))) {
return 1;
}
return 0;
}
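/*
 * list_for_each() callback over a job's resv_list: append one job queue
 * record per reservation the job may use.
 */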
static int _queue_resv_list(void *x, void *key)
{
job_queue_req_t *job_queue_req = (job_queue_req_t *) key;
job_queue_req->resv_ptr = (slurmctld_resv_t *) x;
if ((job_queue_req->job_ptr->bit_flags & JOB_PART_ASSIGNED) &&
job_queue_req->resv_ptr->part_ptr)
job_queue_req->part_ptr = job_queue_req->resv_ptr->part_ptr;
job_queue_append_internal(job_queue_req);
return 0;
}
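/*
 * Append a job/partition pair to the job queue with the given priority,
 * adding one record per reservation (plus magnetic reservations) as needed.
 */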
static void _job_queue_append(list_t *job_queue, job_record_t *job_ptr,
uint32_t prio)
{
job_queue_req_t job_queue_req = { .job_ptr = job_ptr,
.job_queue = job_queue,
.part_ptr = job_ptr->part_ptr,
.prio = prio };
/* The job requested multiple reservations; process each and end here */
if (job_ptr->resv_list) {
list_for_each(job_ptr->resv_list, _queue_resv_list,
&job_queue_req);
return;
}
job_queue_append_internal(&job_queue_req);
/*
* This means we requested a specific reservation, don't do any magnetic
* ones
*/
if (job_ptr->resv_name)
return;
/*
* For het jobs, backfill makes a plan for each component; however,
* backfill doesn't track magnetic reservations in the plan, so backfill
* can't start hetjobs in a magnetic reservation unless the het job
* explicitly requests the magnetic reservation.
*
* Also, if there is a magnetic reservation that starts in the future,
* backfill will not be able to start the het job if there is a separate
* magnetic reservation queue record for the component. So, don't create
* a separate magnetic reservation queue record for het jobs.
*/
if (job_ptr->het_job_id)
return;
job_resv_append_magnetic(&job_queue_req);
}
/* Job test for ability to run now, excludes partition specific tests */
static bool _job_runnable_test1(job_record_t *job_ptr, bool sched_plugin)
{
bool job_indepen = false;
time_t now = time(NULL);
xassert(job_ptr->magic == JOB_MAGIC);
if (!IS_JOB_PENDING(job_ptr) || IS_JOB_COMPLETING(job_ptr))
return false;
if (IS_JOB_REVOKED(job_ptr))
return false;
if ((job_ptr->details && job_ptr->details->prolog_running) ||
(job_ptr->step_list && list_count(job_ptr->step_list))) {
/* Job's been requeued and the
* previous run hasn't finished yet */
job_ptr->state_reason = WAIT_CLEANING;
xfree(job_ptr->state_desc);
last_job_update = now;
sched_debug3("%pJ. State=PENDING. Reason=Cleaning.", job_ptr);
return false;
}
job_indepen = job_independent(job_ptr);
if (sched_plugin)
job_ptr->start_time = (time_t) 0;
if (job_ptr->priority == 0) { /* held */
if ((job_ptr->state_reason != FAIL_BAD_CONSTRAINTS) &&
(job_ptr->state_reason != FAIL_BURST_BUFFER_OP) &&
(job_ptr->state_reason != WAIT_HELD) &&
(job_ptr->state_reason != WAIT_HELD_USER) &&
(job_ptr->state_reason != WAIT_MAX_REQUEUE) &&
(job_ptr->state_reason != WAIT_RESV_INVALID) &&
(job_ptr->state_reason != WAIT_RESV_DELETED)) {
job_ptr->state_reason = WAIT_HELD;
xfree(job_ptr->state_desc);
last_job_update = now;
}
sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u.",
job_ptr,
job_state_string(job_ptr->job_state),
job_state_reason_string(job_ptr->state_reason),
job_ptr->priority);
return false;
}
if (!job_indepen &&
((job_ptr->state_reason == WAIT_HELD) ||
(job_ptr->state_reason == WAIT_HELD_USER))) {
/* released behind active dependency? */
job_ptr->state_reason = WAIT_DEPENDENCY;
xfree(job_ptr->state_desc);
last_job_update = now;
}
if (!job_indepen) /* can not run now */
return false;
return true;
}
/*
* Job and partition tests for ability to run now
* IN job_ptr - job to test
* IN now - update time
* IN check_min_time - If set, test job's minimum time limit
* otherwise test maximum time limit
*/
static bool _job_runnable_test2(job_record_t *job_ptr, time_t now,
bool check_min_time)
{
int reason;
reason = job_limits_check(&job_ptr, check_min_time);
if ((reason != job_ptr->state_reason) &&
((reason != WAIT_NO_REASON) ||
(job_state_reason_check(job_ptr->state_reason, JSR_PART)))) {
job_ptr->state_reason = reason;
xfree(job_ptr->state_desc);
last_job_update = now;
}
if (reason != WAIT_NO_REASON)
return false;
return true;
}
/*
* Job, reservation and partition tests for ability to run now.
* If a job is submitted to multiple partitions, don't consider partitions
* on which the job would not fit given the current set of nodes in the
* reservation.
* IN job_ptr - job to test
* IN part_ptr - partition to test
*/
static bool _job_runnable_test3(job_record_t *job_ptr, part_record_t *part_ptr)
{
if (job_ptr->resv_ptr && job_ptr->resv_ptr->node_bitmap &&
!(job_ptr->resv_ptr->flags & RESERVE_FLAG_FLEX) &&
part_ptr && part_ptr->node_bitmap &&
(bit_overlap(job_ptr->resv_ptr->node_bitmap, part_ptr->node_bitmap)
< job_ptr->node_cnt_wag))
return false;
return true;
}
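/*
 * list_find_first() callback: return 1 if the dependency is of type
 * SLURM_DEPEND_AFTER_CORRESPOND.
 */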
static int _find_depend_after_corr(void *x, void *arg)
{
depend_spec_t *dep_ptr = x;
if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_CORRESPOND)
return 1;
return 0;
}
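/*
 * Split individual task records off the head of a pending job array until
 * the array's pending task count reaches split_job->pend_cnt_limit. New
 * records are collected in split_job->job_list for later transfer to the
 * global job_list.
 */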
static job_record_t *_split_job_on_schedule_recurse(
job_record_t *job_ptr, split_job_t *split_job)
{
job_record_t *new_job_ptr;
int array_task_id;
if (num_pending_job_array_tasks(job_ptr->array_job_id) >=
split_job->pend_cnt_limit)
return job_ptr;
if (job_ptr->array_recs->task_cnt < 1)
return job_ptr;
array_task_id = bit_ffs(job_ptr->array_recs->task_id_bitmap);
if (array_task_id < 0)
return job_ptr;
if (job_ptr->array_recs->task_cnt == 1) {
job_ptr->array_task_id = array_task_id;
new_job_ptr = job_array_post_sched(job_ptr, false);
if (new_job_ptr != job_ptr) {
if (!split_job->job_list)
split_job->job_list = list_create(NULL);
list_append(split_job->job_list, new_job_ptr);
}
if (job_ptr->details &&
job_ptr->details->dependency &&
job_ptr->details->depend_list)
fed_mgr_submit_remote_dependencies(job_ptr,
false,
false);
return new_job_ptr;
}
job_ptr->array_task_id = array_task_id;
new_job_ptr = job_array_split(job_ptr, false);
debug("%s: Split out %pJ for %s use",
__func__, job_ptr, split_job->reason_msg);
job_state_set(new_job_ptr, JOB_PENDING);
new_job_ptr->start_time = (time_t) 0;
if (!split_job->job_list)
split_job->job_list = list_create(NULL);
list_append(split_job->job_list, new_job_ptr);
/*
* Do NOT clear db_index here, it is handled when task_id_str
* is created elsewhere.
*/
if (split_job->type == ARRAY_SPLIT_BURST_BUFFER)
(void) bb_g_job_validate2(new_job_ptr, NULL);
/*
* See if we need to spawn off any more since the new_job_ptr now has
* ->array_recs.
*/
return _split_job_on_schedule_recurse(new_job_ptr, split_job);
}
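/*
 * list_for_each() callback over job_list: split job array records that need
 * per-task handling, either for burst buffer stage-in or for
 * SLURM_DEPEND_AFTER_CORRESPOND dependencies.
 */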
static int _split_job_on_schedule(void *x, void *arg)
{
job_record_t *job_ptr = x;
split_job_t *split_job = arg;
if (!IS_JOB_PENDING(job_ptr) ||
!job_ptr->array_recs ||
!job_ptr->array_recs->task_id_bitmap ||
(job_ptr->array_task_id != NO_VAL))
return 0;
/*
* Create individual job records for job arrays that need burst buffer
* staging
*/
if (job_ptr->burst_buffer) {
split_job->pend_cnt_limit = bb_array_stage_cnt;
split_job->reason_msg = "burst buffer";
split_job->type = ARRAY_SPLIT_BURST_BUFFER;
job_ptr = _split_job_on_schedule_recurse(job_ptr, split_job);
}
/*
* Create individual job records for job arrays with
* depend_type == SLURM_DEPEND_AFTER_CORRESPOND
*/
if (job_ptr->details &&
job_ptr->details->depend_list &&
list_count(job_ptr->details->depend_list) &&
list_find_first(job_ptr->details->depend_list,
_find_depend_after_corr,
NULL)) {
split_job->pend_cnt_limit = correspond_after_task_cnt;
split_job->reason_msg = "SLURM_DEPEND_AFTER_CORRESPOND";
split_job->type = ARRAY_SPLIT_AFTER_CORR;
/* If another split type is added after this one, reassign job_ptr as above */
(void) _split_job_on_schedule_recurse(job_ptr, split_job);
}
return 0;
}
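/* list_for_each() callback: move each split-off record onto the global
 * job_list */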
static int _transfer_job_list(void *x, void *arg)
{
list_append(job_list, x);
return 0;
}
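/*
 * Append one queue record for the job with the given QOS, using the
 * matching per-pair priority from prio_mult->priority_array when present.
 */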
static int _build_job_queue_for_qos(void *x, void *arg)
{
build_job_queue_for_part_t *setup_job = arg;
job_record_t *job_ptr = setup_job->job_ptr;
job_ptr->qos_ptr = x;
/*
* priority_array index matches part_ptr_list * qos_list
* position: increment inx
*/
setup_job->prio_inx++;
if (!_job_runnable_test2(job_ptr, setup_job->now, setup_job->backfill))
return 0;
setup_job->job_prio_pairs++;
if (job_ptr->prio_mult && job_ptr->prio_mult->priority_array) {
_job_queue_append(setup_job->job_queue, job_ptr,
job_ptr->prio_mult->
priority_array[setup_job->prio_inx]);
} else {
_job_queue_append(setup_job->job_queue, job_ptr,
job_ptr->priority);
}
return 0;
}
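/*
 * Append queue records for the job in the given partition: one per QOS in
 * the job's qos_list, or a single record for its current QOS.
 */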
static int _build_job_queue_for_part(void *x, void *arg)
{
build_job_queue_for_part_t *setup_job = arg;
job_record_t *job_ptr = setup_job->job_ptr;
job_ptr->part_ptr = x;
if (job_ptr->qos_list) {
(void) list_for_each(job_ptr->qos_list,
_build_job_queue_for_qos,
setup_job);
} else {
(void) _build_job_queue_for_qos(job_ptr->qos_ptr, setup_job);
}
return 0;
}
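/*
 * list_for_each() callback: note whether any job is still completing within
 * the CompleteWait window and, if requested, accumulate the affected
 * partitions' nodes in eff_cg_bitmap.
 */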
static int _foreach_job_is_completing(void *x, void *arg)
{
job_record_t *job_ptr = x;
job_is_comp_t *job_is_comp = arg;
if (IS_JOB_COMPLETING(job_ptr) &&
(job_ptr->end_time >= job_is_comp->recent)) {
job_is_comp->completing = true;
/*
* Can return after finding first completing job so long
* as a map of nodes in partitions affected by
* completing jobs is not required.
*/
if (!job_is_comp->eff_cg_bitmap)
return -1;
else if (job_ptr->part_ptr)
bit_or(job_is_comp->eff_cg_bitmap,
job_ptr->part_ptr->node_bitmap);
}
return 0;
}
static int _foreach_part_reduce_frag(void *x, void *arg)
{
part_record_t *part_ptr = x;
part_reduce_frag_t *part_reduce_frag = arg;
if (bit_overlap_any(part_reduce_frag->eff_cg_bitmap,
part_ptr->node_bitmap) &&
(part_ptr->state_up & PARTITION_SCHED)) {
part_ptr->flags |= PART_FLAG_SCHED_FAILED;
if (slurm_conf.slurmctld_debug >= LOG_LEVEL_DEBUG) {
xstrfmtcatat(part_reduce_frag->cg_part_str,
&part_reduce_frag->cg_part_str_pos,
"%s%s",
part_reduce_frag->cg_part_str ? "," : "",
part_ptr->name);
}
}
return 0;
}
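/* Reset per-partition scheduling state at the start of a scheduling cycle */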
static int _foreach_setup_part_sched(void *x, void *arg)
{
part_record_t *part_ptr = x;
part_ptr->num_sched_jobs = 0;
part_ptr->flags &= ~PART_FLAG_SCHED_FAILED;
part_ptr->flags &= ~PART_FLAG_SCHED_CLEARED;
return 0;
}
static int _foreach_setup_resv_sched(void *x, void *arg)
{
slurmctld_resv_t *resv_ptr = x;
resv_ptr->flags &= ~RESERVE_FLAG_SCHED_FAILED;
return 0;
}
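/*
 * list_for_each() callback over job_list: per-job filtering for
 * build_job_queue(). Skips non-runnable jobs, enforces build_queue_timeout,
 * then appends one queue record per partition/QOS pair.
 */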
static int _foreach_build_job_queue(void *x, void *arg)
{
job_record_t *job_ptr = x;
build_job_queue_for_part_t *setup_job = arg;
setup_job->job_ptr = job_ptr;
if (IS_JOB_PENDING(job_ptr)) {
/* Remove backfill flag */
job_ptr->bit_flags &= ~BACKFILL_SCHED;
set_job_failed_assoc_qos_ptr(job_ptr);
acct_policy_handle_accrue_time(job_ptr, false);
if ((job_ptr->state_reason != WAIT_NO_REASON) &&
(job_ptr->state_reason != WAIT_PRIORITY) &&
(job_ptr->state_reason != WAIT_RESOURCES) &&
(job_ptr->state_reason != job_ptr->state_reason_prev_db)) {
job_ptr->state_reason_prev_db = job_ptr->state_reason;
last_job_update = setup_job->now;
}
}
if (((setup_job->tested_jobs % 100) == 0) &&
(slurm_delta_tv(&setup_job->start_tv) >= build_queue_timeout)) {
if (difftime(setup_job->now, *setup_job->last_log_time) > 600) {
/* Log at most once every 10 minutes */
info("%s has run for %d usec, exiting with %d of %d jobs tested, %d job-partition-qos pairs added",
__func__, build_queue_timeout,
setup_job->tested_jobs,
list_count(job_list),
setup_job->job_prio_pairs);
*setup_job->last_log_time = setup_job->now;
}
return -1;
}
setup_job->tested_jobs++;
job_ptr->preempt_in_progress = false; /* initialize */
if (job_ptr->array_recs && setup_job->backfill)
job_ptr->array_recs->pend_run_tasks = 0;
if (job_ptr->resv_list)
job_ptr->resv_ptr = NULL;
if (!_job_runnable_test1(job_ptr, setup_job->clear_start))
return 0;
setup_job->prio_inx = -1;
if (job_ptr->part_ptr_list) {
(void) list_for_each(job_ptr->part_ptr_list,
_build_job_queue_for_part,
setup_job);
} else {
if (job_ptr->part_ptr == NULL) {
part_record_t *part_ptr =
find_part_record(job_ptr->partition);
if (!part_ptr) {
error("Could not find partition %s for %pJ",
job_ptr->partition, job_ptr);
return 0;
}
job_ptr->part_ptr = part_ptr;
error("partition pointer reset for %pJ, part %s",
job_ptr, job_ptr->partition);
job_ptr->bit_flags |= JOB_PART_ASSIGNED;
}
(void) _build_job_queue_for_part(job_ptr->part_ptr, setup_job);
}
return 0;
}
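/*
 * list_for_each() callback for set_job_elig_time(): for each pending job
 * that passes basic partition limit checks, let job_independent() set its
 * eligible time.
 */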
static int _foreach_set_job_elig(void *x, void *arg)
{
job_record_t *job_ptr = x;
time_t now = *(time_t *) arg;
part_record_t *part_ptr = job_ptr->part_ptr;
if (!IS_JOB_PENDING(job_ptr))
return 0;
if (!part_ptr)
return 0;
if (!job_ptr->details ||
(job_ptr->details->begin_time > now))
return 0;
if (!(part_ptr->state_up & PARTITION_SCHED))
return 0;
if ((job_ptr->time_limit != NO_VAL) &&
(job_ptr->time_limit > part_ptr->max_time))
return 0;
if (job_ptr->details->max_nodes &&
((job_ptr->details->max_nodes < part_ptr->min_nodes) ||
(job_ptr->details->min_nodes > part_ptr->max_nodes)))
return 0;
/* Job's eligible time is set in job_independent() */
(void) job_independent(job_ptr);
return 0;
}
extern void job_queue_rec_magnetic_resv(job_queue_rec_t *job_queue_rec)
{
job_record_t *job_ptr;
if (!job_queue_rec->resv_ptr)
return;
xassert(job_queue_rec->job_ptr);
xassert(!job_queue_rec->job_ptr->resv_name);
job_ptr = job_queue_rec->job_ptr;
job_ptr->resv_ptr = job_queue_rec->resv_ptr;
job_ptr->resv_name = xstrdup(job_ptr->resv_ptr->name);
job_ptr->resv_id = job_ptr->resv_ptr->resv_id;
job_queue_rec->job_ptr->bit_flags |= JOB_MAGNETIC;
}
extern void job_queue_rec_resv_list(job_queue_rec_t *job_queue_rec)
{
job_record_t *job_ptr;
if (!job_queue_rec->resv_ptr)
return;
xassert(job_queue_rec->job_ptr);
job_ptr = job_queue_rec->job_ptr;
job_ptr->resv_ptr = job_queue_rec->resv_ptr;
/*
* Do not set the name since we have multiple and we don't want to
* overwrite it.
*/
job_ptr->resv_id = job_ptr->resv_ptr->resv_id;
}
/*
* build_job_queue - build (non-priority ordered) list of pending jobs
* IN clear_start - if set then clear the start_time for pending jobs,
* true when called from sched/backfill or sched/builtin
* IN backfill - true if running backfill scheduler, enforce min time limit
* RET the job queue
* NOTE: the caller must call FREE_NULL_LIST() on RET value to free memory
*/
extern list_t *build_job_queue(bool clear_start, bool backfill)
{
static time_t last_log_time = 0;
split_job_t split_job = { 0 };
build_job_queue_for_part_t setup_job = {
.backfill = backfill,
.clear_start = clear_start,
.last_log_time = &last_log_time,
.now = time(NULL),
.start_tv = { 0, 0 },
};
/* init the timer */
(void) slurm_delta_tv(&setup_job.start_tv);
setup_job.job_queue = list_create(xfree_ptr);
(void) list_for_each(job_list, _split_job_on_schedule, &split_job);
if (split_job.job_list) {
/*
* We can't use list_transfer() because we don't have the same
* destroy function.
*/
(void) list_for_each(split_job.job_list,
_transfer_job_list, NULL);
FREE_NULL_LIST(split_job.job_list);
}
(void) list_for_each(job_list, _foreach_build_job_queue, &setup_job);
return setup_job.job_queue;
}
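/*
 * Illustrative sketch of how the queue API fits together, mirroring what
 * _schedule() below and the sched plugins do (locking and error handling
 * omitted):
 *
 *	list_t *queue = build_job_queue(false, false);
 *	job_queue_rec_t *rec;
 *	sort_job_queue(queue);
 *	while ((rec = list_pop(queue))) {
 *		... attempt to start rec->job_ptr ...
 *		xfree(rec);
 *	}
 *	FREE_NULL_LIST(queue);
 */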
/*
* job_is_completing - Determine if jobs are in the process of completing.
* IN/OUT eff_cg_bitmap - optional bitmap of all relevant completing nodes,
* relevance determined by filtering via CompleteWait;
* if NULL, function will terminate at first completing
* job
* RET - True if any job is in the process of completing AND
* CompleteWait is configured non-zero
* NOTE: This function can reduce resource fragmentation, which is a
* critical issue on Elan interconnect based systems.
*/
extern bool job_is_completing(bitstr_t *eff_cg_bitmap)
{
job_is_comp_t job_is_comp = {
.eff_cg_bitmap = eff_cg_bitmap,
};
if ((job_list == NULL) || (slurm_conf.complete_wait == 0))
return false;
job_is_comp.recent = time(NULL) - slurm_conf.complete_wait;
(void) list_for_each(job_list, _foreach_job_is_completing,
&job_is_comp);
return job_is_comp.completing;
}
/*
* set_job_elig_time - set the eligible time for pending jobs once their
* dependencies are lifted (in job->details->begin_time)
*/
extern void set_job_elig_time(void)
{
slurmctld_lock_t job_write_lock =
{ READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK };
time_t now = time(NULL);
lock_slurmctld(job_write_lock);
(void) list_for_each(job_list, _foreach_set_job_elig, &now);
unlock_slurmctld(job_write_lock);
}
static void _do_diag_stats(long delta_t)
{
if (delta_t > slurmctld_diag_stats.schedule_cycle_max)
slurmctld_diag_stats.schedule_cycle_max = delta_t;
slurmctld_diag_stats.schedule_cycle_sum += delta_t;
slurmctld_diag_stats.schedule_cycle_last = delta_t;
slurmctld_diag_stats.schedule_cycle_counter++;
}
/*
* Queue requests of job scheduler
*/
extern void schedule(bool full_queue)
{
if (slurmctld_config.scheduling_disabled)
return;
slurm_mutex_lock(&sched_mutex);
sched_full_queue |= full_queue;
slurm_cond_broadcast(&sched_cond);
sched_requests++;
slurm_mutex_unlock(&sched_mutex);
}
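/*
 * Note: schedule() only records the request and wakes _sched_agent() below;
 * e.g. schedule(false) queues a default-depth scheduling pass without
 * blocking the caller.
 */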
/* detached thread periodically attempts to schedule jobs */
static void *_sched_agent(void *args)
{
long delta_t;
struct timeval now;
int job_cnt;
bool full_queue;
#if HAVE_SYS_PRCTL_H
if (prctl(PR_SET_NAME, "sched_agent", NULL, NULL, NULL) < 0) {
error("cannot set my name to _sched_agent %m");
}
#endif
while (true) {
slurm_mutex_lock(&sched_mutex);
while (true) {
if (slurmctld_config.shutdown_time) {
slurm_mutex_unlock(&sched_mutex);
return NULL;
}
gettimeofday(&now, NULL);
delta_t = (now.tv_sec - sched_last.tv_sec) *
USEC_IN_SEC;
delta_t += now.tv_usec - sched_last.tv_usec;
if (sched_requests && (delta_t > sched_min_interval)) {
break;
} else if (sched_requests) {
struct timespec ts = {0, 0};
int64_t nsec;
nsec = sched_min_interval + sched_last.tv_usec;
nsec *= NSEC_IN_USEC;
nsec += NSEC_IN_USEC;
ts.tv_sec = sched_last.tv_sec +
(nsec / NSEC_IN_SEC);
ts.tv_nsec = nsec % NSEC_IN_SEC;
slurm_cond_timedwait(&sched_cond,
&sched_mutex, &ts);
} else {
slurm_cond_wait(&sched_cond, &sched_mutex);
}
}
full_queue = sched_full_queue;
sched_full_queue = false;
sched_requests = 0;
slurm_mutex_unlock(&sched_mutex);
job_cnt = _schedule(full_queue);
gettimeofday(&now, NULL);
sched_last.tv_sec = now.tv_sec;
sched_last.tv_usec = now.tv_usec;
if (job_cnt) {
/* jobs were started, save state */
schedule_node_save(); /* Has own locking */
schedule_job_save(); /* Has own locking */
}
}
return NULL;
}
/* Determine if job's deadline specification is still valid, kill job if not
 * job_ptr IN - Job to test
 * func IN - function name used for logging
 * RET - true if valid, false if invalid and the job was cancelled
 */
extern bool deadline_ok(job_record_t *job_ptr, const char *func)
{
time_t now;
char time_str_deadline[256];
bool fail_job = false;
time_t inter;
now = time(NULL);
if ((job_ptr->time_min) && (job_ptr->time_min != NO_VAL)) {
inter = now + job_ptr->time_min * 60;
if (job_ptr->deadline < inter) {
slurm_make_time_str(&job_ptr->deadline,
time_str_deadline,
sizeof(time_str_deadline));
info("%s: %pJ with time_min %u exceeded deadline %s and cancelled",
func, job_ptr, job_ptr->time_min,
time_str_deadline);
fail_job = true;
}
} else if ((job_ptr->time_limit != NO_VAL) &&
(job_ptr->time_limit != INFINITE)) {
inter = now + job_ptr->time_limit * 60;
if (job_ptr->deadline < inter) {
slurm_make_time_str(&job_ptr->deadline,
time_str_deadline,
sizeof(time_str_deadline));
info("%s: %pJ with time_limit %u exceeded deadline %s and cancelled",
func, job_ptr, job_ptr->time_limit,
time_str_deadline);
fail_job = true;
}
}
if (fail_job) {
last_job_update = now;
job_state_set(job_ptr, JOB_DEADLINE);
job_ptr->exit_code = 1;
job_ptr->state_reason = FAIL_DEADLINE;
xfree(job_ptr->state_desc);
job_ptr->start_time = now;
job_ptr->end_time = now;
srun_allocate_abort(job_ptr);
job_completion_logger(job_ptr, false);
return false;
}
return true;
}
/*
* When an array job is rejected for some reason, the remaining array tasks will
* get skipped by both the main scheduler and the backfill scheduler (it's an
* optimization). Hence, their reasons should match the reason of the first job.
* This function sets those reasons.
*
* job_ptr (IN) The current job being evaluated, after it has gone
* through the scheduling loop.
* reject_array_job (IN) A pointer to the first job (array task) in the most
* recently rejected array job. If job_ptr belongs to the
* same array as reject_array_job, then set job_ptr's
* reason to match reject_array_job.
*/
extern void fill_array_reasons(job_record_t *job_ptr,
job_record_t *reject_array_job)
{
if (!reject_array_job || !reject_array_job->array_job_id)
return;
if (job_ptr == reject_array_job)
return;
/*
* If the current job is part of the rejected job array...
* And if the reason isn't properly set yet...
*/
if ((job_ptr->array_job_id == reject_array_job->array_job_id) &&
(job_ptr->state_reason != reject_array_job->state_reason)) {
/* Set the reason for the subsequent array task */
xfree(job_ptr->state_desc);
job_ptr->state_reason = reject_array_job->state_reason;
last_job_update = time(NULL);
debug3("%s: Setting reason of array task %pJ to %s",
__func__, job_ptr,
job_state_reason_string(job_ptr->state_reason));
}
}
static job_queue_rec_t *_create_job_queue_rec(job_queue_req_t *job_queue_req)
{
job_queue_rec_t *job_queue_rec = xmalloc(sizeof(*job_queue_rec));
job_queue_rec->array_task_id = job_queue_req->job_ptr->array_task_id;
job_queue_rec->job_id = job_queue_req->job_ptr->job_id;
job_queue_rec->job_ptr = job_queue_req->job_ptr;
job_queue_rec->part_ptr = job_queue_req->part_ptr;
job_queue_rec->priority = job_queue_req->prio;
job_queue_rec->qos_ptr = job_queue_req->job_ptr->qos_ptr;
job_queue_rec->resv_ptr = job_queue_req->resv_ptr;
return job_queue_rec;
}
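/*
 * Append the request's job/partition pair to the job queue. If the job has
 * a --prefer constraint, two records are added so the preferred features
 * are tested first.
 */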
extern void job_queue_append_internal(job_queue_req_t *job_queue_req)
{
job_queue_rec_t *job_queue_rec;
xassert(job_queue_req);
xassert(job_queue_req->job_ptr);
xassert(job_queue_req->job_queue);
xassert(job_queue_req->part_ptr);
if (job_queue_req->job_ptr->details &&
job_queue_req->job_ptr->details->prefer) {
job_queue_rec = _create_job_queue_rec(job_queue_req);
job_queue_rec->use_prefer = true;
list_append(job_queue_req->job_queue, job_queue_rec);
}
job_queue_rec = _create_job_queue_rec(job_queue_req);
list_append(job_queue_req->job_queue, job_queue_rec);
}
static void _set_features(job_record_t *job_ptr, bool use_prefer)
{
/*
* feature_list_use is a temporary variable and should
* be reset before each use. Do this after the check for
* pending because the job could have started with
* "preferred" job_queue_rec.
*/
if (use_prefer) {
job_ptr->details->features_use =
job_ptr->details->prefer;
job_ptr->details->feature_list_use =
job_ptr->details->prefer_list;
} else {
job_ptr->details->features_use =
job_ptr->details->features;
job_ptr->details->feature_list_use =
job_ptr->details->feature_list;
}
}
static void _set_schedule_exit(schedule_exit_t code)
{
xassert(code < SCHEDULE_EXIT_COUNT);
slurmctld_diag_stats.schedule_exit[code]++;
}
static int _get_nodes_in_reservations(void *x, void *arg)
{
slurmctld_resv_t *resv_ptr = x;
bitstr_t *node_bitmap = arg;
xassert(resv_ptr);
xassert(node_bitmap);
if (resv_ptr->node_bitmap)
bit_or(node_bitmap, resv_ptr->node_bitmap);
return SLURM_SUCCESS;
}
static int _schedule(bool full_queue)
{
list_t *job_queue = NULL;
int job_cnt = 0;
int error_code, i, time_limit, pend_time;
uint32_t job_depth = 0, array_task_id;
job_queue_rec_t *job_queue_rec;
job_record_t *job_ptr = NULL;
part_record_t *part_ptr, *skip_part_ptr = NULL;
bitstr_t *save_avail_node_bitmap;
int bb_wait_cnt = 0;
/* Locks: Read config, write job, write node, read partition */
slurmctld_lock_t job_write_lock =
{ READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
bool is_job_array_head;
static time_t sched_update = 0;
static bool assoc_limit_stop = false;
static int sched_timeout = 0;
static int sched_max_job_start = 0;
static int bf_min_age_reserve = 0;
static uint32_t bf_min_prio_reserve = 0;
static bool bf_licenses = false;
static int def_job_limit = 100;
static int max_jobs_per_part = 0;
static int defer_rpc_cnt = 0;
static bool reduce_completing_frag = false;
time_t now, last_job_sched_start, sched_start;
job_record_t *reject_array_job = NULL;
part_record_t *reject_array_part = NULL;
slurmctld_resv_t *reject_array_resv = NULL;
bool reject_array_use_prefer = false;
bool use_prefer;
bool fail_by_part, wait_on_resv, fail_by_part_non_reserve;
uint32_t deadline_time_limit, save_time_limit = 0;
uint32_t prio_reserve;
DEF_TIMERS;
job_node_select_t job_node_select = { 0 };
static bool ignore_prefer_val = false;
if (slurmctld_config.shutdown_time)
return 0;
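/*
 * Re-parse SchedulerParameters whenever the configuration has changed.
 * For example (hypothetical values), a slurm.conf line such as:
 *   SchedulerParameters=batch_sched_delay=5,default_queue_depth=200,bf_licenses
 * would be picked up here on the next scheduling cycle.
 */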
if (sched_update != slurm_conf.last_update) {
char *tmp_ptr;
if (xstrcasestr(slurm_conf.sched_params, "assoc_limit_stop"))
assoc_limit_stop = true;
else
assoc_limit_stop = false;
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"batch_sched_delay="))) {
batch_sched_delay = atoi(tmp_ptr + 18);
if (batch_sched_delay < 0) {
error("Invalid batch_sched_delay: %d",
batch_sched_delay);
batch_sched_delay = 3;
}
} else {
batch_sched_delay = 3;
}
bb_array_stage_cnt = 10;
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"bb_array_stage_cnt="))) {
int task_cnt = atoi(tmp_ptr + 19);
if (task_cnt > 0)
bb_array_stage_cnt = task_cnt;
}
bf_min_age_reserve = 0;
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"bf_min_age_reserve="))) {
int min_age = atoi(tmp_ptr + 19);
if (min_age > 0)
bf_min_age_reserve = min_age;
}
bf_min_prio_reserve = 0;
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"bf_min_prio_reserve="))) {
int64_t min_prio = (int64_t) atoll(tmp_ptr + 20);
if (min_prio > 0)
bf_min_prio_reserve = (uint32_t) min_prio;
}
bf_licenses = false;
if (xstrcasestr(slurm_conf.sched_params, "bf_licenses")) {
if (!xstrcmp(slurm_conf.schedtype, "sched/builtin"))
error("Ignoring SchedulerParameters=bf_licenses, this option is incompatible with sched/builtin.");
else
bf_licenses = true;
}
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"build_queue_timeout="))) {
build_queue_timeout = atoi(tmp_ptr + 20);
if (build_queue_timeout < 100) {
error("Invalid build_queue_time: %d",
build_queue_timeout);
build_queue_timeout = BUILD_TIMEOUT;
}
} else {
build_queue_timeout = BUILD_TIMEOUT;
}
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"correspond_after_task_cnt="))) {
correspond_after_task_cnt = atoi(tmp_ptr + 26);
if (correspond_after_task_cnt <
CORRESPOND_ARRAY_TASK_CNT) {
error("Invalid correspond_after_task_cnt: %d, the value can't be lower than %d",
correspond_after_task_cnt,
CORRESPOND_ARRAY_TASK_CNT);
correspond_after_task_cnt =
CORRESPOND_ARRAY_TASK_CNT;
}
} else {
correspond_after_task_cnt = CORRESPOND_ARRAY_TASK_CNT;
}
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"default_queue_depth="))) {
def_job_limit = atoi(tmp_ptr + 20);
if (def_job_limit < 0) {
error("ignoring SchedulerParameters: "
"default_queue_depth value of %d",
def_job_limit);
def_job_limit = 100;
}
} else {
def_job_limit = 100;
}
bf_hetjob_prio = 0;
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"bf_hetjob_prio="))) {
tmp_ptr += 15;
if (!xstrncasecmp(tmp_ptr, "min", 3))
bf_hetjob_prio |= HETJOB_PRIO_MIN;
else if (!xstrncasecmp(tmp_ptr, "max", 3))
bf_hetjob_prio |= HETJOB_PRIO_MAX;
else if (!xstrncasecmp(tmp_ptr, "avg", 3))
bf_hetjob_prio |= HETJOB_PRIO_AVG;
else
error("Invalid SchedulerParameters bf_hetjob_prio: %s",
tmp_ptr);
}
bf_hetjob_immediate = false;
if (xstrcasestr(slurm_conf.sched_params, "bf_hetjob_immediate"))
bf_hetjob_immediate = true;
if (bf_hetjob_immediate && !bf_hetjob_prio) {
bf_hetjob_prio |= HETJOB_PRIO_MIN;
info("bf_hetjob_immediate automatically sets bf_hetjob_prio=min");
}
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"partition_job_depth="))) {
max_jobs_per_part = atoi(tmp_ptr + 20);
if (max_jobs_per_part < 0) {
error("ignoring SchedulerParameters: "
"partition_job_depth value of %d",
max_jobs_per_part);
max_jobs_per_part = 0;
}
} else {
max_jobs_per_part = 0;
}
if (xstrcasestr(slurm_conf.sched_params,
"reduce_completing_frag"))
reduce_completing_frag = true;
else
reduce_completing_frag = false;
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"max_rpc_cnt=")))
defer_rpc_cnt = atoi(tmp_ptr + 12);
else if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"max_rpc_count=")))
defer_rpc_cnt = atoi(tmp_ptr + 14);
else
defer_rpc_cnt = 0;
if (defer_rpc_cnt < 0) {
error("Invalid max_rpc_cnt: %d", defer_rpc_cnt);
defer_rpc_cnt = 0;
}
time_limit = slurm_conf.msg_timeout / 2;
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"max_sched_time="))) {
sched_timeout = atoi(tmp_ptr + 15);
if ((sched_timeout <= 0) ||
(sched_timeout > time_limit)) {
error("Invalid max_sched_time: %d",
sched_timeout);
sched_timeout = 0;
}
} else {
sched_timeout = 0;
}
if (sched_timeout == 0) {
sched_timeout = MAX(time_limit, 1);
sched_timeout = MIN(sched_timeout, 2);
}
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"sched_interval="))) {
sched_interval = atoi(tmp_ptr + 15);
if (sched_interval == -1) {
sched_debug("schedule() returning, sched_interval=-1");
/*
 * Exit without setting sched_update. This gets
 * verbose, but it lets a change to this setting
 * take effect on the next scheduling pass.
 *
 * No memory is allocated above this.
 */
return 0;
} else if (sched_interval < 0) {
error("Invalid sched_interval: %d",
sched_interval);
sched_interval = 60;
}
} else {
sched_interval = 60;
}
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"sched_min_interval="))) {
i = atoi(tmp_ptr + 19);
if (i < 0)
error("Invalid sched_min_interval: %d", i);
else
sched_min_interval = i;
} else {
sched_min_interval = 2;
}
if ((tmp_ptr = xstrcasestr(slurm_conf.sched_params,
"sched_max_job_start="))) {
sched_max_job_start = atoi(tmp_ptr + 20);
if (sched_max_job_start < 0) {
error("Invalid sched_max_job_start: %d",
sched_max_job_start);
sched_max_job_start = 0;
}
} else {
sched_max_job_start = 0;
}
if (xstrcasestr(slurm_conf.sched_params,
"ignore_prefer_validation"))
ignore_prefer_val = true;
else
ignore_prefer_val = false;
sched_update = slurm_conf.last_update;
if (slurm_conf.sched_params && strlen(slurm_conf.sched_params))
info("SchedulerParameters=%s", slurm_conf.sched_params);
}
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
if ((defer_rpc_cnt > 0) &&
(slurmctld_config.server_thread_count >= defer_rpc_cnt)) {
sched_debug("schedule() returning, too many RPCs");
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
goto out;
}
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
if (!fed_mgr_sibs_synced()) {
sched_info("schedule() returning, federation siblings not synced yet");
goto out;
}
lock_slurmctld(job_write_lock);
now = time(NULL);
sched_start = now;
last_job_sched_start = now;
START_TIMER;
if (!reduce_completing_frag && job_is_completing(NULL)) {
unlock_slurmctld(job_write_lock);
sched_debug("schedule() returning, some job is still completing");
goto out;
}
(void) list_for_each(part_list, _foreach_setup_part_sched, NULL);
(void) list_for_each(resv_list, _foreach_setup_resv_sched, NULL);
save_avail_node_bitmap = bit_copy(avail_node_bitmap);
/* Avoid resource fragmentation if important */
if (reduce_completing_frag) {
bitstr_t *eff_cg_bitmap = bit_alloc(node_record_count);
if (job_is_completing(eff_cg_bitmap)) {
part_reduce_frag_t part_reduce_frag = {
.eff_cg_bitmap = eff_cg_bitmap,
};
(void) list_for_each(part_list,
_foreach_part_reduce_frag,
&part_reduce_frag);
if (part_reduce_frag.cg_part_str) {
sched_debug("some job is still completing, skipping partitions '%s'",
part_reduce_frag.cg_part_str);
xfree(part_reduce_frag.cg_part_str);
}
}
FREE_NULL_BITMAP(eff_cg_bitmap);
}
sched_debug("Running job scheduler %s.", full_queue ? "for full queue":"for default depth");
job_queue = build_job_queue(false, false);
slurmctld_diag_stats.schedule_queue_len = list_count(job_queue);
sort_job_queue(job_queue);
job_ptr = NULL;
wait_on_resv = false;
while (1) {
/* Run some final guaranteed logic after each job iteration */
if (job_ptr) {
job_resv_clear_magnetic_flag(job_ptr);
fill_array_reasons(job_ptr, reject_array_job);
}
job_queue_rec = list_pop(job_queue);
if (!job_queue_rec) {
_set_schedule_exit(SCHEDULE_EXIT_END);
break;
}
array_task_id = job_queue_rec->array_task_id;
job_ptr = job_queue_rec->job_ptr;
part_ptr = job_queue_rec->part_ptr;
if ((job_ptr->array_task_id != array_task_id) &&
(array_task_id == NO_VAL)) {
/* Job array element started in other partition,
* reset pointer to "master" job array record */
job_ptr = find_job_record(job_ptr->array_job_id);
job_queue_rec->job_ptr = job_ptr;
}
if (!job_ptr ||
!IS_JOB_PENDING(job_ptr) || /* started in other part/qos */
!job_ptr->priority) { /* held from fail in other part/qos */
xfree(job_queue_rec);
continue;
}
use_prefer = job_queue_rec->use_prefer;
_set_features(job_ptr, use_prefer);
if (job_ptr->resv_list)
job_queue_rec_resv_list(job_queue_rec);
else
job_queue_rec_magnetic_resv(job_queue_rec);
if (!_job_runnable_test3(job_ptr, part_ptr)) {
xfree(job_queue_rec);
continue;
}
job_ptr->qos_ptr = job_queue_rec->qos_ptr;
job_ptr->part_ptr = part_ptr;
job_ptr->priority = job_queue_rec->priority;
xfree(job_queue_rec);
job_ptr->last_sched_eval = time(NULL);
if (job_ptr->preempt_in_progress)
continue; /* scheduled in another partition */
if (job_ptr->het_job_id) {
fail_by_part = true;
fail_by_part_non_reserve = false;
goto fail_this_part;
}
if (job_ptr->array_recs && (job_ptr->array_task_id == NO_VAL))
is_job_array_head = true;
else
is_job_array_head = false;
next_task:
if ((time(NULL) - sched_start) >= sched_timeout) {
sched_debug("loop taking too long, breaking out");
_set_schedule_exit(SCHEDULE_EXIT_TIMEOUT);
break;
}
if (sched_max_job_start && (job_cnt >= sched_max_job_start)) {
sched_debug("sched_max_job_start reached, breaking out");
_set_schedule_exit(SCHEDULE_EXIT_MAX_JOB_START);
break;
}
if ((job_ptr->array_task_id != NO_VAL) || job_ptr->array_recs) {
if (reject_array_job &&
(reject_array_job->array_job_id ==
job_ptr->array_job_id) &&
(reject_array_part == part_ptr) &&
(reject_array_resv == job_ptr->resv_ptr) &&
(reject_array_use_prefer == use_prefer))
continue; /* already rejected array element */
/* assume reject whole array for now, clear if OK */
reject_array_job = job_ptr;
reject_array_part = part_ptr;
reject_array_resv = job_ptr->resv_ptr;
reject_array_use_prefer = use_prefer;
if (!job_array_start_test(job_ptr))
continue;
}
if (max_jobs_per_part &&
(max_jobs_per_part < ++job_ptr->part_ptr->num_sched_jobs)) {
if (job_ptr->state_reason == WAIT_NO_REASON) {
xfree(job_ptr->state_desc);
job_ptr->state_reason = WAIT_PRIORITY;
last_job_update = now;
}
if (job_ptr->part_ptr == skip_part_ptr)
continue;
sched_debug2("reached partition %s job limit",
job_ptr->part_ptr->name);
skip_part_ptr = job_ptr->part_ptr;
continue;
}
if (!full_queue && (job_depth++ > def_job_limit)) {
sched_debug("already tested %u jobs, breaking out",
job_depth);
_set_schedule_exit(SCHEDULE_EXIT_MAX_DEPTH);
break;
}
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
if ((defer_rpc_cnt > 0) &&
(slurmctld_config.server_thread_count >= defer_rpc_cnt)) {
sched_debug("schedule() returning, too many RPCs");
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
_set_schedule_exit(SCHEDULE_EXIT_RPC_CNT);
break;
}
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
if (job_limits_check(&job_ptr, false) != WAIT_NO_REASON) {
/* should never happen */
continue;
}
slurmctld_diag_stats.schedule_cycle_depth++;
if (job_ptr->resv_name) {
/*
* If we have a MaxStartDelay we need to make sure we
* don't schedule any jobs that could potentially run to
* avoid starvation of this job.
*/
if (job_ptr->resv_ptr &&
job_ptr->resv_ptr->max_start_delay)
wait_on_resv = true;
if (job_ptr->resv_ptr->flags &
RESERVE_FLAG_SCHED_FAILED) {
job_ptr->state_reason = WAIT_PRIORITY;
xfree(job_ptr->state_desc);
last_job_update = now;
sched_debug3("%pJ. State=PENDING. Reason=Priority. Priority=%u. Resv=%s.",
job_ptr,
job_ptr->priority,
job_ptr->resv_name);
continue;
}
} else if (job_ptr->part_ptr->flags & PART_FLAG_SCHED_FAILED) {
if (!(job_ptr->part_ptr->flags &
PART_FLAG_SCHED_CLEARED)) {
bit_and_not(avail_node_bitmap,
part_ptr->node_bitmap);
job_ptr->part_ptr->flags |=
PART_FLAG_SCHED_CLEARED;
}
if ((job_ptr->state_reason == WAIT_NO_REASON) ||
(job_ptr->state_reason == WAIT_RESOURCES)) {
sched_debug("%pJ unable to schedule in Partition=%s (per PART_FLAG_SCHED_FAILED). State=PENDING. Previous-Reason=%s. Previous-Desc=%s. New-Reason=Priority. Priority=%u.",
job_ptr,
job_ptr->part_ptr->name,
job_state_reason_string(
job_ptr->state_reason),
job_ptr->state_desc,
job_ptr->priority);
job_ptr->state_reason = WAIT_PRIORITY;
xfree(job_ptr->state_desc);
last_job_update = now;
} else {
/*
 * Log that the job cannot run even though we are
 * not overriding the reason
 */
sched_debug2("%pJ. unable to schedule in Partition=%s (per PART_FLAG_SCHED_FAILED). Retaining previous scheduling Reason=%s. Desc=%s. Priority=%u.",
job_ptr,
job_ptr->part_ptr->name,
job_state_reason_string(
job_ptr->state_reason),
job_ptr->state_desc,
job_ptr->priority);
}
last_job_update = now;
continue;
} else if (wait_on_resv &&
(job_ptr->warn_flags & KILL_JOB_RESV)) {
sched_debug("%pJ. State=PENDING. Reason=Priority, Priority=%u. May be able to backfill on MaxStartDelay reservations.",
job_ptr, job_ptr->priority);
continue;
}
/* Test for valid QOS and required nodes on each pass */
if (job_ptr->qos_ptr) {
assoc_mgr_lock_t locks =
{ .assoc = READ_LOCK, .qos = READ_LOCK };
assoc_mgr_lock(&locks);
if (job_ptr->assoc_ptr
&& (accounting_enforce & ACCOUNTING_ENFORCE_QOS)
&& ((job_ptr->qos_ptr->id >= g_qos_count) ||
!job_ptr->assoc_ptr->usage ||
!job_ptr->assoc_ptr->usage->valid_qos ||
!bit_test(job_ptr->assoc_ptr->usage->valid_qos,
job_ptr->qos_ptr->id))
&& !job_ptr->limit_set.qos) {
assoc_mgr_unlock(&locks);
sched_debug("%pJ has invalid QOS", job_ptr);
job_fail_qos(job_ptr, __func__, false);
last_job_update = now;
continue;
} else if (job_ptr->state_reason == FAIL_QOS) {
xfree(job_ptr->state_desc);
job_ptr->state_reason = WAIT_NO_REASON;
last_job_update = now;
}
assoc_mgr_unlock(&locks);
}
deadline_time_limit = 0;
if ((job_ptr->deadline) && (job_ptr->deadline != NO_VAL)) {
if (!deadline_ok(job_ptr, __func__))
continue;
deadline_time_limit = job_ptr->deadline - now;
deadline_time_limit /= 60;
if ((job_ptr->time_limit != NO_VAL) &&
(job_ptr->time_limit != INFINITE)) {
deadline_time_limit = MIN(job_ptr->time_limit,
deadline_time_limit);
} else {
if ((job_ptr->part_ptr->default_time != NO_VAL) &&
(job_ptr->part_ptr->default_time != INFINITE)){
deadline_time_limit = MIN(
job_ptr->part_ptr->default_time,
deadline_time_limit);
} else if ((job_ptr->part_ptr->max_time != NO_VAL) &&
(job_ptr->part_ptr->max_time != INFINITE)){
deadline_time_limit = MIN(
job_ptr->part_ptr->max_time,
deadline_time_limit);
}
}
}
if (job_state_reason_check(job_ptr->state_reason,
JSR_QOS_ASSOC) &&
!acct_policy_job_runnable_pre_select(job_ptr, false))
continue;
if ((job_ptr->state_reason == WAIT_NODE_NOT_AVAIL) &&
job_ptr->details && job_ptr->details->req_node_bitmap &&
!bit_super_set(job_ptr->details->req_node_bitmap,
avail_node_bitmap)) {
continue;
}
if (!job_ptr->part_ptr)
continue;
i = bit_overlap(avail_node_bitmap,
job_ptr->part_ptr->node_bitmap);
if ((job_ptr->details &&
(job_ptr->details->min_nodes != NO_VAL) &&
(job_ptr->details->min_nodes > i)) ||
(!job_ptr->details && (i == 0))) {
/*
* Too many nodes DRAIN, DOWN, or
* reserved for jobs in higher priority partition
*/
job_ptr->state_reason = WAIT_RESOURCES;
xfree(job_ptr->state_desc);
job_ptr->state_desc =
xstrdup_printf("Nodes required for job are DOWN, DRAINED%s or reserved for jobs in higher priority partitions",
bit_overlap(rs_node_bitmap,
job_ptr->part_ptr->
node_bitmap) ? ", REBOOTING" : "");
last_job_update = now;
sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u. Partition=%s.",
job_ptr,
job_state_string(job_ptr->job_state),
job_state_reason_string(
job_ptr->state_reason),
job_ptr->priority, job_ptr->partition);
fail_by_part = true;
fail_by_part_non_reserve = false;
goto fail_this_part;
}
if (assoc_mgr_validate_assoc_id(acct_db_conn,
job_ptr->assoc_id,
accounting_enforce)) {
/* NOTE: This only happens if a user's account is
* disabled between when the job was submitted and
* the time we consider running it. It should be
* very rare. */
sched_info("%pJ has invalid account", job_ptr);
last_job_update = now;
job_ptr->state_reason = FAIL_ACCOUNT;
xfree(job_ptr->state_desc);
continue;
}
last_job_sched_start = MAX(last_job_sched_start,
job_ptr->start_time);
if (deadline_time_limit) {
save_time_limit = job_ptr->time_limit;
job_ptr->time_limit = deadline_time_limit;
}
/* get fed job lock from origin cluster */
if (fed_mgr_job_lock(job_ptr)) {
error_code = ESLURM_FED_JOB_LOCK;
goto skip_start;
}
job_node_select.job_ptr = job_ptr;
error_code = select_nodes(&job_node_select,
false, false,
SLURMDB_JOB_FLAG_SCHED);
if (error_code == SLURM_SUCCESS) {
/*
* If the following fails because of network
* connectivity, the origin cluster should ask
* when it comes back up if the cluster_lock
* cluster actually started the job
*/
fed_mgr_job_start(job_ptr, job_ptr->start_time);
} else {
/*
* Node config unavailable plus state_reason
* FAIL_BAD_CONSTRAINTS causes the job to be held
* later. If job specs were unsatisfied due to
* --prefer, give the opportunity to test the record
* without it in a second attempt by resetting
* state_reason to FAIL_CONSTRAINTS.
*/
if (ignore_prefer_val && job_ptr->details->prefer &&
job_ptr->details->prefer_list &&
(job_ptr->details->prefer_list ==
job_ptr->details->feature_list_use) &&
(error_code ==
ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE) &&
(job_ptr->state_reason == FAIL_BAD_CONSTRAINTS)) {
sched_debug2("StateReason='%s' set after evaluating %pJ in partition %s (maybe unsatisfied due to --prefer while ignore_prefer_validation configured). Re-testing without --prefer if needed.",
job_state_reason_string(job_ptr->state_reason), job_ptr, job_ptr->part_ptr->name);
job_ptr->state_reason = FAIL_CONSTRAINTS;
}
fed_mgr_job_unlock(job_ptr);
}
skip_start:
fail_by_part = false;
fail_by_part_non_reserve = false;
if ((error_code != SLURM_SUCCESS) && deadline_time_limit)
job_ptr->time_limit = save_time_limit;
if (error_code == ESLURM_NODES_BUSY) {
sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u. Partition=%s.",
job_ptr,
job_state_string(job_ptr->job_state),
job_state_reason_string(
job_ptr->state_reason),
job_ptr->priority, job_ptr->partition);
fail_by_part = true;
} else if (error_code == ESLURM_LICENSES_UNAVAILABLE) {
sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u.",
job_ptr,
job_state_string(job_ptr->job_state),
job_state_reason_string(
job_ptr->state_reason),
job_ptr->priority);
if (bf_licenses) {
sched_debug("%pJ is blocked on licenses. Stopping scheduling so license backfill can handle this",
job_ptr);
_set_schedule_exit(SCHEDULE_EXIT_LIC);
break;
}
} else if (error_code == ESLURM_BURST_BUFFER_WAIT) {
if (job_ptr->start_time == 0) {
job_ptr->start_time = last_job_sched_start;
bb_wait_cnt++;
/*
* Since start time wasn't set yet until this
* point, this means that the job hasn't had a
* chance to start stage-in yet. Clear
* reject_array_job so that other jobs in this
* array (if it was an array) may also have
* a chance to have a start time set and
* therefore have a chance to start stage-in.
*/
reject_array_job = NULL;
reject_array_part = NULL;
reject_array_resv = NULL;
}
sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u.",
job_ptr,
job_state_string(job_ptr->job_state),
job_state_reason_string(
job_ptr->state_reason),
job_ptr->priority);
continue;
} else if ((error_code == ESLURM_RESERVATION_BUSY) ||
(error_code == ESLURM_RESERVATION_NOT_USABLE)) {
if (job_ptr->resv_ptr &&
job_ptr->resv_ptr->node_bitmap) {
sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u.",
job_ptr,
job_state_string(job_ptr->job_state),
job_state_reason_string(
job_ptr->state_reason),
job_ptr->priority);
bit_and_not(avail_node_bitmap,
job_ptr->resv_ptr->node_bitmap);
} else {
/*
* The job has no reservation but requires
* nodes that are currently in some reservation
* so just skip over this job and try running
* the next lower priority job
*/
sched_debug3("%pJ. State=%s. Reason=Required nodes are reserved. Priority=%u",
job_ptr,
job_state_string(job_ptr->job_state),
job_ptr->priority);
}
} else if (error_code == ESLURM_FED_JOB_LOCK) {
job_ptr->state_reason = WAIT_FED_JOB_LOCK;
xfree(job_ptr->state_desc);
last_job_update = now;
sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u. Partition=%s. Couldn't get federation job lock.",
job_ptr,
job_state_string(job_ptr->job_state),
job_state_reason_string(
job_ptr->state_reason),
job_ptr->priority, job_ptr->partition);
fail_by_part = true;
} else if (error_code == SLURM_SUCCESS) {
/* job initiated */
sched_debug3("%pJ initiated", job_ptr);
last_job_update = now;
/* Clear assumed rejected array status */
reject_array_job = NULL;
reject_array_part = NULL;
reject_array_resv = NULL;
sched_info("Allocate %pJ NodeList=%s #CPUs=%u Partition=%s",
job_ptr, job_ptr->nodes,
job_ptr->total_cpus,
job_ptr->part_ptr->name);
if (job_ptr->batch_flag == 0)
srun_allocate(job_ptr);
else if (!IS_JOB_CONFIGURING(job_ptr))
launch_job(job_ptr);
rebuild_job_part_list(job_ptr);
job_cnt++;
if (is_job_array_head &&
(job_ptr->array_task_id != NO_VAL)) {
/* Try starting another task of the job array */
job_record_t *tmp = job_ptr;
job_ptr = find_job_record(job_ptr->array_job_id);
if (job_ptr && (job_ptr != tmp) &&
IS_JOB_PENDING(job_ptr) &&
(bb_g_job_test_stage_in(job_ptr, false) ==
1)) {
_set_features(job_ptr, use_prefer);
goto next_task;
}
}
continue;
} else if ((error_code ==
ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE) &&
(job_ptr->resv_ptr)) {
debug("%pJ non-runnable in reservation %s: %s",
job_ptr, job_ptr->resv_ptr->name,
slurm_strerror(error_code));
} else if ((error_code ==
ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE) &&
job_ptr->part_ptr_list) {
debug("%pJ non-runnable in partition %s: %s",
job_ptr, job_ptr->part_ptr->name,
slurm_strerror(error_code));
} else if ((error_code ==
ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE) &&
(job_ptr->state_reason == FAIL_CONSTRAINTS)) {
sched_info("%pJ current node constraints not satisfied",
job_ptr);
/*
* Future node updates may satisfy the constraints, so
* do not hold the job.
*/
} else if (error_code == ESLURM_ACCOUNTING_POLICY) {
sched_debug3("%pJ delayed for accounting policy",
job_ptr);
/* potentially starve this job */
if (assoc_limit_stop)
fail_by_part = true;
} else if (error_code == ESLURM_MAX_POWERED_NODES) {
sched_debug2("%pJ cannot start: %s",
job_ptr, slurm_strerror(error_code));
job_ptr->state_reason = WAIT_MAX_POWERED_NODES;
xfree(job_ptr->state_desc);
} else if (error_code == ESLURM_PORTS_BUSY) {
/*
* This can only happen if using stepd step manager.
* The nodes selected for the job ran out of ports.
*/
fail_by_part = true;
job_ptr->state_reason = WAIT_MPI_PORTS_BUSY;
xfree(job_ptr->state_desc);
sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u. Partition=%s.",
job_ptr,
job_state_string(job_ptr->job_state),
job_state_reason_string(
job_ptr->state_reason),
job_ptr->priority, job_ptr->partition);
} else if ((error_code !=
ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE) &&
(error_code != ESLURM_NODE_NOT_AVAIL) &&
(error_code != ESLURM_INVALID_BURST_BUFFER_REQUEST)){
sched_info("schedule: %pJ non-runnable: %s",
job_ptr, slurm_strerror(error_code));
last_job_update = now;
job_state_set(job_ptr, JOB_PENDING);
job_ptr->state_reason = FAIL_BAD_CONSTRAINTS;
xfree(job_ptr->state_desc);
job_ptr->start_time = job_ptr->end_time = now;
job_ptr->priority = 0;
debug2("%s: setting %pJ to \"%s\" (%s)",
__func__, job_ptr,
job_state_reason_string(job_ptr->state_reason),
slurm_strerror(error_code));
}
if (job_ptr->details && job_ptr->details->req_node_bitmap &&
(bit_set_count(job_ptr->details->req_node_bitmap) >=
job_ptr->details->min_nodes)) {
fail_by_part = false;
/* Do not schedule more jobs on nodes required by this
* job, but don't block the entire queue/partition. */
bit_and_not(avail_node_bitmap,
job_ptr->details->req_node_bitmap);
}
if (fail_by_part && job_ptr->resv_name) {
/*
* If the reservation is not FLEX or ANY_NODES, other
* jobs in this partition can be scheduled.
*
* Jobs submitted to FLEX or ANY_NODES reservations can
* use nodes outside of the reservation. If the
* reservation is FLEX or ANY_NODES, other jobs in
* this partition submitted to other reservations can
* be scheduled.
*
* In both cases, do not schedule more jobs in this
* reservation.
*/
if ((job_ptr->resv_ptr->flags & RESERVE_FLAG_FLEX) ||
(job_ptr->resv_ptr->flags & RESERVE_FLAG_ANY_NODES))
fail_by_part_non_reserve = true;
else
fail_by_part = false;
job_ptr->resv_ptr->flags |= RESERVE_FLAG_SCHED_FAILED;
}
if (fail_by_part && bf_min_age_reserve) {
/* Consider other jobs in this partition if job has been
* waiting for less than bf_min_age_reserve time */
if (job_ptr->details->begin_time == 0) {
fail_by_part = false;
} else {
pend_time = difftime(
now, job_ptr->details->begin_time);
if (pend_time < bf_min_age_reserve)
fail_by_part = false;
}
}
if (!(prio_reserve = acct_policy_get_prio_thresh(
job_ptr, false)))
prio_reserve = bf_min_prio_reserve;
if (fail_by_part && prio_reserve &&
(job_ptr->priority < prio_reserve))
fail_by_part = false;
fail_this_part: if (fail_by_part) {
/* Search for duplicates */
if (job_ptr->part_ptr->flags & PART_FLAG_SCHED_FAILED) {
fail_by_part = false;
break;
}
}
if (fail_by_part) {
/*
* Do not schedule more jobs in this partition or on
* nodes in this partition
*/
job_ptr->part_ptr->flags |= PART_FLAG_SCHED_FAILED;
if (fail_by_part_non_reserve) {
/*
* If a FLEX or ANY_NODES reservation job fails
* by part, remove all nodes that are not in
* reservations from avail_node_bitmap.
*
* Jobs submitted to FLEX or ANY_NODES
* reservations can be scheduled on nodes
* outside of the reservation. If we allowed
* lower priority jobs to be scheduled on nodes
* not in this reservation, they could delay
* the higher priority job submitted to this
* reservation.
*
* We only remove nodes not in reservations,
* so lower priority jobs submitted to other
* reservations can still be scheduled.
*
* We don't mark the partition as being
* cleared. Once the first non-reservation job
* in the partition gets evaluated, which cannot
* be scheduled since non-reserved nodes have
* been removed, the partition's reserved nodes
* will be removed and it will be marked as
* cleared.
*/
bitstr_t *remove_nodes =
bit_alloc(node_record_count);
list_for_each(resv_list,
_get_nodes_in_reservations,
remove_nodes);
bit_not(remove_nodes);
bit_and(remove_nodes,
job_ptr->part_ptr->node_bitmap);
bit_and_not(avail_node_bitmap, remove_nodes);
FREE_NULL_BITMAP(remove_nodes);
} else {
job_ptr->part_ptr->flags |=
PART_FLAG_SCHED_CLEARED;
bit_and_not(avail_node_bitmap,
job_ptr->part_ptr->node_bitmap);
}
}
}
if (bb_wait_cnt)
(void) bb_g_job_try_stage_in();
if (job_ptr)
job_resv_clear_magnetic_flag(job_ptr);
FREE_NULL_BITMAP(avail_node_bitmap);
avail_node_bitmap = save_avail_node_bitmap;
FREE_NULL_LIST(job_queue);
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
if ((slurmctld_config.server_thread_count >= 150) &&
(defer_rpc_cnt == 0)) {
sched_info("%d pending RPCs at cycle end, consider configuring max_rpc_cnt",
slurmctld_config.server_thread_count);
}
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
unlock_slurmctld(job_write_lock);
END_TIMER2(__func__);
_do_diag_stats(DELTA_TIMER);
out:
return job_cnt;
}
/*
* sort_job_queue - sort job_queue in descending priority order
* IN/OUT job_queue - sorted job queue
*/
extern void sort_job_queue(list_t *job_queue)
{
list_sort(job_queue, sort_job_queue2);
}
/* Note this differs from the ListCmpF typedef since we want jobs sorted
 * in order of decreasing priority, then by submit time, and then by
 * increasing job id */
extern int sort_job_queue2(void *x, void *y)
{
job_queue_rec_t *job_rec1 = *(job_queue_rec_t **) x;
job_queue_rec_t *job_rec2 = *(job_queue_rec_t **) y;
het_job_details_t *details = NULL;
bool has_resv1, has_resv2;
static time_t config_update = 0;
static bool preemption_enabled = true;
uint32_t job_id1, job_id2;
uint32_t p1, p2;
/* The following block of code is designed to minimize run time in
* typical configurations for this frequently executed function. */
if (config_update != slurm_conf.last_update) {
preemption_enabled = slurm_preemption_enabled();
config_update = slurm_conf.last_update;
}
if (preemption_enabled) {
if (preempt_g_job_preempt_check(job_rec1, job_rec2))
return -1;
if (preempt_g_job_preempt_check(job_rec2, job_rec1))
return 1;
}
if (bf_hetjob_prio && job_rec1->job_ptr->het_job_id &&
(job_rec1->job_ptr->het_job_id !=
job_rec2->job_ptr->het_job_id)) {
if ((details = job_rec1->job_ptr->het_details))
has_resv1 = details->any_resv;
else
has_resv1 = (job_rec1->job_ptr->resv_id != 0) ||
job_rec1->resv_ptr;
} else
has_resv1 = (job_rec1->job_ptr->resv_id != 0) ||
job_rec1->resv_ptr;
if (bf_hetjob_prio && job_rec2->job_ptr->het_job_id &&
(job_rec2->job_ptr->het_job_id !=
job_rec1->job_ptr->het_job_id)) {
if ((details = job_rec2->job_ptr->het_details))
has_resv2 = details->any_resv;
else
has_resv2 = (job_rec2->job_ptr->resv_id != 0) ||
job_rec2->resv_ptr;
} else
has_resv2 = (job_rec2->job_ptr->resv_id != 0) ||
job_rec2->resv_ptr;
if (has_resv1 && !has_resv2)
return -1;
if (!has_resv1 && has_resv2)
return 1;
if (job_rec1->part_ptr && job_rec2->part_ptr) {
if (bf_hetjob_prio && job_rec1->job_ptr->het_job_id &&
(job_rec1->job_ptr->het_job_id !=
job_rec2->job_ptr->het_job_id)) {
if ((details = job_rec1->job_ptr->het_details))
p1 = details->priority_tier;
else
p1 = job_rec1->part_ptr->priority_tier;
} else
p1 = job_rec1->part_ptr->priority_tier;
if (bf_hetjob_prio && job_rec2->job_ptr->het_job_id &&
(job_rec2->job_ptr->het_job_id !=
job_rec1->job_ptr->het_job_id)) {
if ((details = job_rec2->job_ptr->het_details))
p2 = details->priority_tier;
else
p2 = job_rec2->part_ptr->priority_tier;
} else
p2 = job_rec2->part_ptr->priority_tier;
if (p1 < p2)
return 1;
if (p1 > p2)
return -1;
}
if (bf_hetjob_prio && job_rec1->job_ptr->het_job_id &&
(job_rec1->job_ptr->het_job_id !=
job_rec2->job_ptr->het_job_id)) {
if ((details = job_rec1->job_ptr->het_details))
p1 = details->priority;
else {
if (job_rec1->job_ptr->part_ptr_list &&
job_rec1->job_ptr->prio_mult &&
job_rec1->job_ptr->prio_mult->priority_array)
p1 = job_rec1->priority;
else
p1 = job_rec1->job_ptr->priority;
}
} else {
if (job_rec1->job_ptr->part_ptr_list &&
job_rec1->job_ptr->prio_mult &&
job_rec1->job_ptr->prio_mult->priority_array)
p1 = job_rec1->priority;
else
p1 = job_rec1->job_ptr->priority;
}
if (bf_hetjob_prio && job_rec2->job_ptr->het_job_id &&
(job_rec2->job_ptr->het_job_id !=
job_rec1->job_ptr->het_job_id)) {
if ((details = job_rec2->job_ptr->het_details))
p2 = details->priority;
else {
if (job_rec2->job_ptr->part_ptr_list &&
job_rec2->job_ptr->prio_mult &&
job_rec2->job_ptr->prio_mult->priority_array)
p2 = job_rec2->priority;
else
p2 = job_rec2->job_ptr->priority;
}
} else {
if (job_rec2->job_ptr->part_ptr_list &&
job_rec2->job_ptr->prio_mult &&
job_rec2->job_ptr->prio_mult->priority_array)
p2 = job_rec2->priority;
else
p2 = job_rec2->job_ptr->priority;
}
if (p1 < p2)
return 1;
if (p1 > p2)
return -1;
/* If the priorities are the same, sort by submission time */
if (job_rec1->job_ptr->details && job_rec2->job_ptr->details) {
if (job_rec1->job_ptr->details->submit_time >
job_rec2->job_ptr->details->submit_time)
return 1;
if (job_rec2->job_ptr->details->submit_time >
job_rec1->job_ptr->details->submit_time)
return -1;
}
/* If the submission times are the same, sort by increasing job IDs */
if (job_rec1->array_task_id == NO_VAL)
job_id1 = job_rec1->job_id;
else
job_id1 = job_rec1->job_ptr->array_job_id;
if (job_rec2->array_task_id == NO_VAL)
job_id2 = job_rec2->job_id;
else
job_id2 = job_rec2->job_ptr->array_job_id;
if (job_id1 > job_id2)
return 1;
else if (job_id1 < job_id2)
return -1;
/* If job IDs match compare task IDs */
if (job_rec1->array_task_id > job_rec2->array_task_id)
return 1;
/* Magnetic or multi-reservation. */
if (job_rec1->resv_ptr && job_rec2->resv_ptr &&
(job_rec1->resv_ptr->start_time > job_rec2->resv_ptr->start_time))
return 1;
if (job_rec1->use_prefer && !job_rec2->use_prefer)
return -1;
else if (!job_rec1->use_prefer && job_rec2->use_prefer)
return 1;
return -1;
}
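/*
 * Illustrative ordering (hypothetical jobs): given job A (priority 100,
 * submitted 10:00), job B (priority 200, submitted 10:05) and job C
 * (priority 200, submitted 10:01), list_sort() with sort_job_queue2
 * yields C, B, A: higher priority first (B and C before A), earlier
 * submit time breaking the tie (C before B), and lower job id breaking
 * any remaining tie. A negative return value means the first argument
 * sorts earlier, matching the descending order described above.
 */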
/* The "environment" variable points to one big xmalloc. In order to
 * manipulate the array for a hetjob, we need to split it into an array
 * containing multiple xmalloc variables */
static void _split_env(batch_job_launch_msg_t *launch_msg_ptr)
{
int i;
for (i = 1; i < launch_msg_ptr->envc; i++) {
launch_msg_ptr->environment[i] =
xstrdup(launch_msg_ptr->environment[i]);
}
}
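/*
 * Memory layout sketch (descriptive note, not from the original
 * comments): before the split, environment[0] points to the start of
 * one xmalloc'd region and environment[1..envc-1] point into that same
 * region. After _split_env(), every element with index >= 1 is its own
 * xstrdup'd copy, so helpers such as env_array_overwrite() can free and
 * replace individual entries without corrupting the shared block still
 * anchored at environment[0].
 */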
/* Given a scheduled job, return a pointer to its batch_job_launch_msg_t data */
static batch_job_launch_msg_t *_build_launch_job_msg(job_record_t *job_ptr,
uint16_t protocol_version)
{
char *fail_why = NULL;
batch_job_launch_msg_t *launch_msg_ptr;
/* Initialization of data structures */
launch_msg_ptr = (batch_job_launch_msg_t *)
xmalloc(sizeof(batch_job_launch_msg_t));
launch_msg_ptr->job_id = job_ptr->job_id;
launch_msg_ptr->het_job_id = job_ptr->het_job_id;
launch_msg_ptr->array_job_id = job_ptr->array_job_id;
launch_msg_ptr->array_task_id = job_ptr->array_task_id;
if (!(launch_msg_ptr->script_buf = get_job_script(job_ptr))) {
fail_why = "Unable to load job batch script";
goto job_failed;
}
/*
 * We only want to send the number of tasks if they were explicitly
 * requested: num_tasks could otherwise have been calculated (job_mgr.c
 * _figure_out_num_tasks()), in which case a step requesting less than
 * the full allocation would erroneously be polluted with that
 * calculated task count.
 */
if (job_ptr->bit_flags & JOB_NTASKS_SET)
launch_msg_ptr->ntasks = job_ptr->details->num_tasks;
launch_msg_ptr->container = xstrdup(job_ptr->container);
launch_msg_ptr->cpu_freq_min = job_ptr->details->cpu_freq_min;
launch_msg_ptr->cpu_freq_max = job_ptr->details->cpu_freq_max;
launch_msg_ptr->cpu_freq_gov = job_ptr->details->cpu_freq_gov;
launch_msg_ptr->nodes = xstrdup(job_ptr->nodes);
launch_msg_ptr->overcommit = job_ptr->details->overcommit;
launch_msg_ptr->open_mode = job_ptr->details->open_mode;
launch_msg_ptr->cpus_per_task = job_ptr->details->cpus_per_task;
launch_msg_ptr->pn_min_memory = job_ptr->details->pn_min_memory;
launch_msg_ptr->restart_cnt = job_ptr->restart_cnt;
launch_msg_ptr->profile = job_ptr->profile;
if (make_batch_job_cred(launch_msg_ptr, job_ptr, protocol_version)) {
error("%s: slurm_cred_create failure for %pJ, holding job",
__func__, job_ptr);
slurm_free_job_launch_msg(launch_msg_ptr);
job_mgr_handle_cred_failure(job_ptr);
return NULL;
}
launch_msg_ptr->acctg_freq = xstrdup(job_ptr->details->acctg_freq);
if (job_ptr->part_ptr)
launch_msg_ptr->partition = xstrdup(job_ptr->part_ptr->name);
else
launch_msg_ptr->partition = xstrdup(job_ptr->partition);
launch_msg_ptr->std_err = xstrdup(job_ptr->details->std_err);
launch_msg_ptr->std_in = xstrdup(job_ptr->details->std_in);
launch_msg_ptr->std_out = xstrdup(job_ptr->details->std_out);
launch_msg_ptr->work_dir = xstrdup(job_ptr->details->work_dir);
launch_msg_ptr->argc = job_ptr->details->argc;
launch_msg_ptr->argv = xduparray(job_ptr->details->argc,
job_ptr->details->argv);
launch_msg_ptr->spank_job_env_size = job_ptr->spank_job_env_size;
launch_msg_ptr->spank_job_env = xduparray(job_ptr->spank_job_env_size,
job_ptr->spank_job_env);
launch_msg_ptr->environment = get_job_env(job_ptr,
&launch_msg_ptr->envc);
if (!launch_msg_ptr->container && !launch_msg_ptr->environment) {
fail_why = "Unable to load job environment";
goto job_failed;
}
_split_env(launch_msg_ptr);
if (job_ptr->bit_flags & STEPMGR_ENABLED) {
env_array_overwrite(&launch_msg_ptr->environment,
"SLURM_STEPMGR", job_ptr->batch_host);
/* Update envc since variables may have been added to the environment */
launch_msg_ptr->envc =
PTR_ARRAY_SIZE(launch_msg_ptr->environment) - 1;
}
launch_msg_ptr->job_mem = job_ptr->details->pn_min_memory;
launch_msg_ptr->num_cpu_groups = job_ptr->job_resrcs->cpu_array_cnt;
launch_msg_ptr->cpus_per_node = xmalloc(
sizeof(uint16_t) * job_ptr->job_resrcs->cpu_array_cnt);
memcpy(launch_msg_ptr->cpus_per_node,
job_ptr->job_resrcs->cpu_array_value,
(sizeof(uint16_t) * job_ptr->job_resrcs->cpu_array_cnt));
launch_msg_ptr->cpu_count_reps = xmalloc(
sizeof(uint32_t) * job_ptr->job_resrcs->cpu_array_cnt);
memcpy(launch_msg_ptr->cpu_count_reps,
job_ptr->job_resrcs->cpu_array_reps,
(sizeof(uint32_t) * job_ptr->job_resrcs->cpu_array_cnt));
launch_msg_ptr->account = xstrdup(job_ptr->account);
if (job_ptr->qos_ptr)
launch_msg_ptr->qos = xstrdup(job_ptr->qos_ptr->name);
if (job_ptr->details->oom_kill_step != NO_VAL16)
launch_msg_ptr->oom_kill_step = job_ptr->details->oom_kill_step;
else
launch_msg_ptr->oom_kill_step =
slurm_conf.task_plugin_param & OOM_KILL_STEP;
/*
* Use resv_ptr->name instead of job_ptr->resv_name as the job
* could contain multiple reservation names.
*/
if (job_ptr->resv_ptr)
launch_msg_ptr->resv_name = xstrdup(job_ptr->resv_ptr->name);
launch_msg_ptr->tres_per_task = xstrdup(job_ptr->tres_per_task);
xassert(!fail_why);
return launch_msg_ptr;
job_failed:
/* fatal or kill the job as it can never be recovered */
if (!ignore_state_errors)
fatal("%s: %s for %pJ. Check file system serving StateSaveLocation as that directory may be missing or corrupted. Start with '-i' to ignore this error and kill the afflicted jobs.",
__func__, fail_why, job_ptr);
error("%s: %s for %pJ. %pJ will be killed due to system error.",
__func__, fail_why, job_ptr, job_ptr);
xfree(job_ptr->state_desc);
job_ptr->state_desc = xstrdup(fail_why);
job_ptr->state_reason = FAIL_SYSTEM;
last_job_update = time(NULL);
slurm_free_job_launch_msg(launch_msg_ptr);
/* ignore the return as job is in an unknown state anyway */
job_complete(job_ptr->job_id, slurm_conf.slurm_user_id, false, false,
1);
return NULL;
}
static int _foreach_het_job_ready(void *x, void *arg)
{
job_record_t *het_job = x;
het_job_ready_t *ready_struct = arg;
bool prolog = false;
if (ready_struct->het_job_leader->het_job_id != het_job->het_job_id) {
error("%s: Bad het_job_list for %pJ",
__func__, ready_struct->het_job_leader);
return 0;
}
ready_struct->het_job = het_job;
if (het_job->details)
prolog = het_job->details->prolog_running;
if (prolog || IS_JOB_CONFIGURING(het_job) ||
!test_job_nodes_ready(het_job)) {
ready_struct->het_job_leader = NULL;
return -1;
}
if (!ready_struct->job_ptr->batch_flag ||
(!IS_JOB_RUNNING(ready_struct->job_ptr) &&
!IS_JOB_SUSPENDED(ready_struct->job_ptr))) {
ready_struct->het_job_leader = NULL;
return -1;
}
ready_struct->het_job = NULL;
return 0;
}
/* Validate the job is ready for launch
* RET pointer to batch job to launch or NULL if not ready yet */
static job_record_t *_het_job_ready(job_record_t *job_ptr)
{
het_job_ready_t ready_struct = { 0 };
if (job_ptr->het_job_id == 0) /* Not a hetjob */
return job_ptr;
ready_struct.het_job_leader = find_job_record(job_ptr->het_job_id);
if (!ready_struct.het_job_leader) {
error("Hetjob leader %pJ not found", job_ptr);
return NULL;
}
if (!ready_struct.het_job_leader->het_job_list) {
error("Hetjob leader %pJ lacks het_job_list", job_ptr);
return NULL;
}
ready_struct.job_ptr = job_ptr;
(void) list_for_each(ready_struct.het_job_leader->het_job_list,
_foreach_het_job_ready, &ready_struct);
if (ready_struct.het_job_leader)
log_flag(HETJOB, "Batch hetjob %pJ being launched",
ready_struct.het_job_leader);
else if (ready_struct.het_job)
log_flag(HETJOB, "Batch hetjob %pJ waiting for job to be ready",
ready_struct.het_job);
return ready_struct.het_job_leader;
}
static void _set_job_env(job_record_t *job, batch_job_launch_msg_t *launch)
{
if (job->name)
env_array_overwrite(&launch->environment, "SLURM_JOB_NAME",
job->name);
if (job->details->open_mode) {
/* Propagate mode to spawned job using environment variable */
if (job->details->open_mode == OPEN_MODE_APPEND)
env_array_overwrite(&launch->environment,
"SLURM_OPEN_MODE", "a");
else
env_array_overwrite(&launch->environment,
"SLURM_OPEN_MODE", "t");
}
if (job->details->dependency)
env_array_overwrite(&launch->environment,
"SLURM_JOB_DEPENDENCY",
job->details->dependency);
/* intentionally skipping SLURM_EXPORT_ENV */
if (job->profile) {
char tmp[128] = {0};
acct_gather_profile_to_string_r(job->profile, tmp);
env_array_overwrite(&launch->environment, "SLURM_PROFILE", tmp);
}
if (job->details->acctg_freq)
env_array_overwrite(&launch->environment, "SLURM_ACCTG_FREQ",
job->details->acctg_freq);
if (job->network)
env_array_overwrite(&launch->environment, "SLURM_NETWORK",
job->network);
if (job->details->cpu_freq_min || job->details->cpu_freq_max ||
job->details->cpu_freq_gov) {
char *tmp = cpu_freq_to_cmdline(job->details->cpu_freq_min,
job->details->cpu_freq_max,
job->details->cpu_freq_gov);
if (tmp)
env_array_overwrite(&launch->environment,
"SLURM_CPU_FREQ_REQ", tmp);
xfree(tmp);
}
if (job->details->segment_size)
env_array_overwrite_fmt(&launch->environment,
"SLURM_JOB_SEGMENT_SIZE", "%u",
job->details->segment_size);
/* update size of env in case it changed */
if (launch->environment)
launch->envc = PTR_ARRAY_SIZE(launch->environment) - 1;
}
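/*
 * Illustrative result (hypothetical job): a batch job named "bench"
 * submitted with --open-mode=append and --acctg-freq=30 would get
 * SLURM_JOB_NAME=bench, SLURM_OPEN_MODE=a and SLURM_ACCTG_FREQ=30
 * added to (or overwritten in) its launch environment, after which
 * envc is refreshed to match the possibly enlarged array.
 */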
static int _foreach_set_het_job_env(void *x, void *arg)
{
job_record_t *het_job = x;
het_job_env_t *het_job_env = arg;
job_record_t *het_job_leader = het_job_env->het_job_leader;
int het_job_offset = het_job_env->het_job_offset;
batch_job_launch_msg_t *launch_msg_ptr = het_job_env->launch_msg_ptr;
uint32_t num_cpus = 0;
uint64_t tmp_mem = 0;
char *tmp_str = NULL;
if (het_job_leader->het_job_id != het_job->het_job_id) {
error("%s: Bad het_job_list for %pJ",
__func__, het_job_leader);
return 0;
}
if (het_job->account) {
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_ACCOUNT",
het_job_offset, "%s", het_job->account);
}
if (het_job->job_resrcs) {
tmp_str = uint32_compressed_to_str(
het_job->job_resrcs->cpu_array_cnt,
het_job->job_resrcs->cpu_array_value,
het_job->job_resrcs->cpu_array_reps);
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_CPUS_PER_NODE",
het_job_offset, "%s", tmp_str);
xfree(tmp_str);
}
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_ID",
het_job_offset, "%u", het_job->job_id);
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_NAME",
het_job_offset, "%s", het_job->name);
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_NODELIST",
het_job_offset, "%s", het_job->nodes);
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_NUM_NODES",
het_job_offset, "%u", het_job->node_cnt);
if (het_job->partition) {
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_PARTITION",
het_job_offset, "%s", het_job->partition);
}
if (het_job->qos_ptr) {
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_QOS",
het_job_offset, "%s", het_job->qos_ptr->name);
}
if (het_job->resv_ptr) {
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_JOB_RESERVATION",
het_job_offset, "%s", het_job->resv_ptr->name);
}
if (het_job->details)
tmp_mem = het_job->details->pn_min_memory;
if (tmp_mem & MEM_PER_CPU) {
tmp_mem &= (~MEM_PER_CPU);
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_MEM_PER_CPU",
het_job_offset, "%"PRIu64"", tmp_mem);
} else if (tmp_mem) {
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_MEM_PER_NODE",
het_job_offset, "%"PRIu64"", tmp_mem);
}
if (het_job->details && het_job->details->segment_size)
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment, "SLURM_JOB_SEGMENT_SIZE",
het_job_offset, "%u", het_job->details->segment_size);
if (het_job->details && het_job->job_resrcs) {
/* Both should always be set for active jobs */
struct job_resources *resrcs_ptr = het_job->job_resrcs;
slurm_step_layout_t *step_layout = NULL;
uint16_t cpus_per_task_array[1];
uint32_t cpus_task_reps[1], task_dist;
uint16_t cpus_per_task = 1;
slurm_step_layout_req_t step_layout_req = {
.cpu_count_reps = resrcs_ptr->cpu_array_reps,
.cpus_per_node = resrcs_ptr->cpu_array_value,
.cpus_per_task = cpus_per_task_array,
.cpus_task_reps = cpus_task_reps,
.num_hosts = het_job->node_cnt,
.plane_size = NO_VAL16,
};
cpus_task_reps[0] = het_job->node_cnt;
for (int i = 0; i < resrcs_ptr->cpu_array_cnt; i++) {
num_cpus += resrcs_ptr->cpu_array_value[i] *
resrcs_ptr->cpu_array_reps[i];
}
if ((het_job->details->cpus_per_task > 0) &&
(het_job->details->cpus_per_task != NO_VAL16))
cpus_per_task = het_job->details->cpus_per_task;
cpus_per_task_array[0] = cpus_per_task;
if (het_job->details->num_tasks) {
step_layout_req.num_tasks =
het_job->details->num_tasks;
} else {
step_layout_req.num_tasks = num_cpus /
cpus_per_task;
}
if ((step_layout_req.node_list =
getenvp(launch_msg_ptr->environment,
"SLURM_ARBITRARY_NODELIST"))) {
task_dist = SLURM_DIST_ARBITRARY;
} else {
step_layout_req.node_list = het_job->nodes;
task_dist = SLURM_DIST_BLOCK;
}
step_layout_req.task_dist = task_dist;
step_layout = slurm_step_layout_create(&step_layout_req);
if (step_layout) {
tmp_str = uint16_array_to_str(
step_layout->node_cnt,
step_layout->tasks);
slurm_step_layout_destroy(step_layout);
(void) env_array_overwrite_het_fmt(
&launch_msg_ptr->environment,
"SLURM_TASKS_PER_NODE",
het_job_offset,"%s", tmp_str);
xfree(tmp_str);
}
} else if (IS_JOB_RUNNING(het_job)) {
if (!het_job->details)
error("%s: %pJ has null details member",
__func__, het_job);
if (!het_job->job_resrcs)
error("%s: %pJ has null job_resrcs member",
__func__, het_job);
}
het_job_env->het_job_offset++;
return 0;
}
/*
* Set some hetjob environment variables. This will include information
* about multiple job components (i.e. different slurmctld job records).
*/
static void _set_het_job_env(job_record_t *het_job_leader,
batch_job_launch_msg_t *launch_msg_ptr)
{
int i;
het_job_env_t het_job_env = {
.het_job_leader = het_job_leader,
.het_job_offset = 0,
.launch_msg_ptr = launch_msg_ptr,
};
if (het_job_leader->het_job_id == 0)
return;
if (!launch_msg_ptr->environment) {
error("%pJ lacks environment", het_job_leader);
return;
}
if (!het_job_leader->het_job_list) {
error("Hetjob leader %pJ lacks het_job_list",
het_job_leader);
return;
}
(void) list_for_each(het_job_leader->het_job_list,
_foreach_set_het_job_env,
&het_job_env);
/* Continue support for old hetjob terminology. */
(void) env_array_overwrite_fmt(&launch_msg_ptr->environment,
"SLURM_PACK_SIZE", "%d",
het_job_env.het_job_offset);
(void) env_array_overwrite_fmt(&launch_msg_ptr->environment,
"SLURM_HET_SIZE", "%d",
het_job_env.het_job_offset);
for (i = 0; launch_msg_ptr->environment[i]; i++)
;
launch_msg_ptr->envc = i;
}
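/*
 * Illustrative result for a two-component hetjob, assuming
 * env_array_overwrite_het_fmt() appends a per-component suffix of the
 * form "_HET_GROUP_<offset>": the launch environment would contain
 * SLURM_JOB_ID_HET_GROUP_0, SLURM_JOB_ID_HET_GROUP_1,
 * SLURM_JOB_NODELIST_HET_GROUP_0, SLURM_JOB_NODELIST_HET_GROUP_1,
 * etc., plus SLURM_HET_SIZE=2 (and SLURM_PACK_SIZE=2 for backward
 * compatibility).
 */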
/*
* launch_job - send an RPC to a slurmd to initiate a batch job
* IN job_ptr - pointer to job that will be initiated
*/
extern void launch_job(job_record_t *job_ptr)
{
batch_job_launch_msg_t *launch_msg_ptr;
uint16_t protocol_version = NO_VAL16;
agent_arg_t *agent_arg_ptr;
job_record_t *launch_job_ptr;
node_record_t *node_ptr;
xassert(job_ptr);
xassert(job_ptr->batch_flag);
if (job_ptr->total_cpus == 0)
return;
launch_job_ptr = _het_job_ready(job_ptr);
if (!launch_job_ptr)
return;
if (pick_batch_host(launch_job_ptr) != SLURM_SUCCESS)
return;
node_ptr = find_node_record(job_ptr->batch_host);
if (node_ptr)
protocol_version = node_ptr->protocol_version;
(void)build_batch_step(job_ptr);
launch_msg_ptr = _build_launch_job_msg(launch_job_ptr, protocol_version);
if (launch_msg_ptr == NULL)
return;
if (launch_job_ptr->het_job_id)
_set_het_job_env(launch_job_ptr, launch_msg_ptr);
_set_job_env(launch_job_ptr, launch_msg_ptr);
agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
agent_arg_ptr->protocol_version = protocol_version;
agent_arg_ptr->node_count = 1;
agent_arg_ptr->retry = 0;
xassert(job_ptr->batch_host);
agent_arg_ptr->hostlist = hostlist_create(launch_job_ptr->batch_host);
agent_arg_ptr->msg_type = REQUEST_BATCH_JOB_LAUNCH;
agent_arg_ptr->msg_args = (void *) launch_msg_ptr;
set_agent_arg_r_uid(agent_arg_ptr, SLURM_AUTH_UID_ANY);
/* Launch the RPC via agent */
agent_queue_request(agent_arg_ptr);
}
/*
* make_batch_job_cred - add a job credential to the batch_job_launch_msg
* IN/OUT launch_msg_ptr - batch_job_launch_msg in which job_id, step_id,
* uid and nodes have already been set
* IN job_ptr - pointer to job record
* RET 0 or error code
*/
extern int make_batch_job_cred(batch_job_launch_msg_t *launch_msg_ptr,
job_record_t *job_ptr,
uint16_t protocol_version)
{
slurm_cred_arg_t cred_arg;
job_resources_t *job_resrcs_ptr;
xassert(job_ptr->job_resrcs);
job_resrcs_ptr = job_ptr->job_resrcs;
if (job_ptr->job_resrcs == NULL) {
error("%s: %pJ is missing job_resrcs info",
__func__, job_ptr);
return SLURM_ERROR;
}
setup_cred_arg(&cred_arg, job_ptr);
cred_arg.step_id.job_id = launch_msg_ptr->job_id;
cred_arg.step_id.step_id = SLURM_BATCH_SCRIPT;
cred_arg.step_id.step_het_comp = NO_VAL;
if (job_resrcs_ptr->memory_allocated) {
int batch_inx = job_get_node_inx(
job_ptr->batch_host, job_ptr->node_bitmap);
if (batch_inx == -1) {
error("%s: Invalid batch host %s for %pJ; this should never happen",
__func__, job_ptr->batch_host, job_ptr);
batch_inx = 0;
}
cred_arg.job_mem_alloc = xmalloc(sizeof(uint64_t));
cred_arg.job_mem_alloc[0] =
job_resrcs_ptr->memory_allocated[batch_inx];
cred_arg.job_mem_alloc_rep_count = xmalloc(sizeof(uint64_t));
cred_arg.job_mem_alloc_rep_count[0] = 1;
cred_arg.job_mem_alloc_size = 1;
}
/* cred_arg.step_gres_list = NULL; */
xassert(job_ptr->batch_host);
cred_arg.step_hostlist = job_ptr->batch_host;
cred_arg.step_core_bitmap = job_resrcs_ptr->core_bitmap;
launch_msg_ptr->cred = slurm_cred_create(&cred_arg, false,
protocol_version);
xfree(cred_arg.job_mem_alloc);
xfree(cred_arg.job_mem_alloc_rep_count);
if (launch_msg_ptr->cred)
return SLURM_SUCCESS;
error("slurm_cred_create failure for batch job %u",
cred_arg.step_id.job_id);
return SLURM_ERROR;
}
static int _foreach_depend_list_copy(void *x, void *arg)
{
depend_spec_t *dep_src = x;
list_t **depend_list_dest = arg;
depend_spec_t *dep_dest = xmalloc(sizeof(depend_spec_t));
memcpy(dep_dest, dep_src, sizeof(depend_spec_t));
list_append(*depend_list_dest, dep_dest);
return 0;
}
/*
 * Copy a job's dependency list
 * IN depend_list_src - a job's depend_list
 * RET copy of depend_list_src, must be freed by caller
 */
extern list_t *depended_list_copy(list_t *depend_list_src)
{
list_t *depend_list_dest = NULL;
if (!depend_list_src)
return depend_list_dest;
depend_list_dest = list_create(xfree_ptr);
(void) list_for_each(depend_list_src, _foreach_depend_list_copy,
&depend_list_dest);
return depend_list_dest;
}
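/*
 * Note: _foreach_depend_list_copy() duplicates each depend_spec_t with
 * memcpy(), so the copy is shallow; the job_ptr member of each copied
 * record still aliases the same job_record_t as the source list. That
 * is sufficient here because the destination list owns only the
 * depend_spec_t structures themselves (freed via xfree_ptr).
 */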
static char *_depend_type2str(depend_spec_t *dep_ptr)
{
xassert(dep_ptr);
switch (dep_ptr->depend_type) {
case SLURM_DEPEND_AFTER:
return "after";
case SLURM_DEPEND_AFTER_ANY:
return "afterany";
case SLURM_DEPEND_AFTER_NOT_OK:
return "afternotok";
case SLURM_DEPEND_AFTER_OK:
return "afterok";
case SLURM_DEPEND_AFTER_CORRESPOND:
return "aftercorr";
case SLURM_DEPEND_EXPAND:
return "expand";
case SLURM_DEPEND_BURST_BUFFER:
return "afterburstbuffer";
case SLURM_DEPEND_SINGLETON:
return "singleton";
default:
return "unknown";
}
}
static uint32_t _depend_state_str2state(char *state_str)
{
if (!xstrcasecmp(state_str, "fulfilled"))
return DEPEND_FULFILLED;
if (!xstrcasecmp(state_str, "failed"))
return DEPEND_FAILED;
/* Default to not fulfilled */
return DEPEND_NOT_FULFILLED;
}
static char *_depend_state2str(depend_spec_t *dep_ptr)
{
xassert(dep_ptr);
switch(dep_ptr->depend_state) {
case DEPEND_NOT_FULFILLED:
return "unfulfilled";
case DEPEND_FULFILLED:
return "fulfilled";
case DEPEND_FAILED:
return "failed";
default:
return "unknown";
}
}
static int _foreach_depend_list2str(void *x, void *arg)
{
depend_spec_t *dep_ptr = x;
depend_str_t *depend_str = arg;
job_record_t *job_ptr = depend_str->job_ptr;
/*
* Show non-fulfilled (including failed) dependencies, but don't
* show fulfilled dependencies.
*/
if (dep_ptr->depend_state == DEPEND_FULFILLED)
return 0;
if (dep_ptr->depend_type == SLURM_DEPEND_SINGLETON) {
xstrfmtcat(job_ptr->details->dependency,
"%ssingleton(%s)",
depend_str->sep, _depend_state2str(dep_ptr));
} else {
char *dep_str = _depend_type2str(dep_ptr);
if (dep_ptr->array_task_id == INFINITE)
xstrfmtcat(job_ptr->details->dependency, "%s%s:%u_*",
depend_str->sep, dep_str, dep_ptr->job_id);
else if (dep_ptr->array_task_id == NO_VAL)
xstrfmtcat(job_ptr->details->dependency, "%s%s:%u",
depend_str->sep, dep_str, dep_ptr->job_id);
else
xstrfmtcat(job_ptr->details->dependency, "%s%s:%u_%u",
depend_str->sep, dep_str, dep_ptr->job_id,
dep_ptr->array_task_id);
if (dep_ptr->depend_time)
xstrfmtcat(job_ptr->details->dependency,
"+%u", dep_ptr->depend_time / 60);
xstrfmtcat(job_ptr->details->dependency, "(%s)",
_depend_state2str(dep_ptr));
}
if (depend_str->set_or_flag)
dep_ptr->depend_flags |= SLURM_FLAGS_OR;
if (dep_ptr->depend_flags & SLURM_FLAGS_OR)
depend_str->sep = "?";
else
depend_str->sep = ",";
return 0;
}
static void _depend_list2str(job_record_t *job_ptr, bool set_or_flag)
{
depend_str_t depend_str = {
.job_ptr = job_ptr,
.sep = "",
.set_or_flag = set_or_flag,
};
if (job_ptr->details == NULL)
return;
xfree(job_ptr->details->dependency);
if (job_ptr->details->depend_list == NULL
|| list_count(job_ptr->details->depend_list) == 0)
return;
(void) list_for_each(job_ptr->details->depend_list,
_foreach_depend_list2str,
&depend_str);
}
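/*
 * Example output (hypothetical job IDs): a job submitted with
 * --dependency=afterok:123,afterany:456_*+30 whose dependencies are
 * still pending would have its details->dependency string rebuilt as
 * "afterok:123(unfulfilled),afterany:456_*+30(unfulfilled)"; fulfilled
 * dependencies are omitted entirely.
 */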
/* Print a job's dependency information based upon job_ptr->depend_list */
extern void print_job_dependency(job_record_t *job_ptr, const char *func)
{
if ((job_ptr->details == NULL) ||
(job_ptr->details->depend_list == NULL)) {
info("%s: %pJ has no dependency.", func, job_ptr);
return;
}
_depend_list2str(job_ptr, false);
info("%s: Dependency information for %pJ:\n %s",
func, job_ptr, job_ptr->details->dependency);
}
static int _test_job_dependency_common(
bool is_complete, bool is_completed, bool is_pending,
bool *clear_dep, bool *failure,
job_record_t *job_ptr, struct depend_spec *dep_ptr)
{
int rc = 0;
job_record_t *djob_ptr = dep_ptr->job_ptr;
time_t now = time(NULL);
xassert(clear_dep);
xassert(failure);
if (dep_ptr->depend_type == SLURM_DEPEND_AFTER) {
if (!is_pending) {
if (!dep_ptr->depend_time ||
(djob_ptr->start_time &&
((now - djob_ptr->start_time) >=
dep_ptr->depend_time)) ||
fed_mgr_job_started_on_sib(djob_ptr)) {
*clear_dep = true;
} /* else still depends */
} /* else still depends */
rc = 1;
} else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_ANY) {
if (is_completed)
*clear_dep = true;
/* else still depends */
rc = 1;
} else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_NOT_OK) {
if (djob_ptr->job_state & JOB_SPECIAL_EXIT)
*clear_dep = true;
else if (!is_completed) { /* Still depends */
} else if (!is_complete)
*clear_dep = true;
else
*failure = true;
rc = 1;
} else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_OK) {
if (!is_completed) { /* Still depends */
} else if (is_complete)
*clear_dep = true;
else
*failure = true;
rc = 1;
} else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_CORRESPOND) {
job_record_t *dcjob_ptr = NULL;
if ((job_ptr->array_task_id == NO_VAL) ||
(job_ptr->array_task_id == INFINITE))
dcjob_ptr = NULL;
else
dcjob_ptr = find_job_array_rec(dep_ptr->job_id,
job_ptr->array_task_id);
if (dcjob_ptr) {
if (!IS_JOB_COMPLETED(dcjob_ptr)) { /* Still depends */
} else if (IS_JOB_COMPLETE(dcjob_ptr))
*clear_dep = true;
else
*failure = true;
} else {
if (!is_completed) { /* Still depends */
} else if (is_complete)
*clear_dep = true;
else if (job_ptr->array_recs &&
(job_ptr->array_task_id == NO_VAL)) {
/* Still depends */
} else
*failure = true;
}
rc = 1;
} else if (dep_ptr->depend_type == SLURM_DEPEND_BURST_BUFFER) {
if (is_completed &&
(bb_g_job_test_stage_out(djob_ptr) == 1))
*clear_dep = true;
/* else still depends */
rc = 1;
} else if (dep_ptr->depend_type == SLURM_DEPEND_EXPAND) {
time_t now = time(NULL);
if (is_pending) { /* Still depends */
} else if (is_completed)
*failure = true;
else if ((djob_ptr->end_time != 0) &&
(djob_ptr->end_time > now)) {
job_ptr->time_limit = djob_ptr->end_time - now;
job_ptr->time_limit /= 60; /* sec to min */
*clear_dep = true;
}
if (!*failure && job_ptr->details && djob_ptr->details) {
job_ptr->details->share_res =
djob_ptr->details->share_res;
job_ptr->details->whole_node =
djob_ptr->details->whole_node;
}
rc = 1;
}
return rc;
}
static void _test_dependency_state(depend_spec_t *dep_ptr,
test_job_dep_t *test_job_dep)
{
xassert(test_job_dep);
test_job_dep->or_flag =
(dep_ptr->depend_flags & SLURM_FLAGS_OR) ? true : false;
if (test_job_dep->or_flag) {
if (dep_ptr->depend_state == DEPEND_FULFILLED)
test_job_dep->or_satisfied = true;
else if (dep_ptr->depend_state == DEPEND_NOT_FULFILLED)
test_job_dep->has_unfulfilled = true;
} else { /* AND'd dependencies */
if (dep_ptr->depend_state == DEPEND_FAILED)
test_job_dep->and_failed = true;
else if (dep_ptr->depend_state == DEPEND_NOT_FULFILLED)
test_job_dep->has_unfulfilled = true;
}
}
static int _foreach_test_job_dependency(void *x, void *arg)
{
depend_spec_t *dep_ptr = x;
test_job_dep_t *test_job_dep = arg;
job_record_t *job_ptr = test_job_dep->job_ptr;
job_record_t *djob_ptr;
bool clear_dep = false, failure = false;
bool remote = (dep_ptr->depend_flags & SLURM_FLAGS_REMOTE) ?
true : false;
/*
* If the job id is for a cluster that's not in the federation
* (it's likely the cluster left the federation), then set
* this dependency's state to failed.
*/
if (remote) {
if (fed_mgr_is_origin_job(job_ptr) &&
(dep_ptr->depend_state == DEPEND_NOT_FULFILLED) &&
(dep_ptr->depend_type != SLURM_DEPEND_SINGLETON) &&
(!fed_mgr_is_job_id_in_fed(dep_ptr->job_id))) {
log_flag(DEPENDENCY, "%s: %pJ dependency %s:%u failed due to job_id not in federation.",
__func__, job_ptr,
_depend_type2str(dep_ptr),
dep_ptr->job_id);
test_job_dep->changed = true;
dep_ptr->depend_state = DEPEND_FAILED;
}
}
if ((dep_ptr->depend_state != DEPEND_NOT_FULFILLED) || remote) {
_test_dependency_state(dep_ptr, test_job_dep);
return 0;
}
/* Test local, unfulfilled dependency: */
test_job_dep->has_local_depend = true;
dep_ptr->job_ptr = find_job_array_rec(dep_ptr->job_id,
dep_ptr->array_task_id);
djob_ptr = dep_ptr->job_ptr;
if ((dep_ptr->depend_type == SLURM_DEPEND_SINGLETON) &&
job_ptr->name) {
if (list_find_first(job_list, _find_singleton_job,
job_ptr) ||
!fed_mgr_is_singleton_satisfied(job_ptr,
dep_ptr, true)) {
/* Still depends */
} else
clear_dep = true;
} else if (!djob_ptr || (djob_ptr->magic != JOB_MAGIC) ||
((djob_ptr->job_id != dep_ptr->job_id) &&
(djob_ptr->array_job_id != dep_ptr->job_id))) {
/* job is gone, dependency lifted */
clear_dep = true;
} else {
bool is_complete, is_completed, is_pending;
/* Special case, apply test to job array as a whole */
if (dep_ptr->array_task_id == INFINITE) {
is_complete = test_job_array_complete(
dep_ptr->job_id);
is_completed = test_job_array_completed(
dep_ptr->job_id);
is_pending = test_job_array_pending(
dep_ptr->job_id);
} else {
/* Normal job */
is_complete = IS_JOB_COMPLETE(djob_ptr);
is_completed = IS_JOB_COMPLETED(djob_ptr);
is_pending = IS_JOB_PENDING(djob_ptr) ||
IS_JOB_CONFIGURING(djob_ptr);
}
if (!_test_job_dependency_common(
is_complete, is_completed, is_pending,
&clear_dep, &failure,
job_ptr, dep_ptr))
failure = true;
}
if (failure) {
dep_ptr->depend_state = DEPEND_FAILED;
test_job_dep->changed = true;
log_flag(DEPENDENCY, "%s: %pJ dependency %s:%u failed.",
__func__, job_ptr, _depend_type2str(dep_ptr),
dep_ptr->job_id);
} else if (clear_dep) {
dep_ptr->depend_state = DEPEND_FULFILLED;
test_job_dep->changed = true;
log_flag(DEPENDENCY, "%s: %pJ dependency %s:%u fulfilled.",
__func__, job_ptr, _depend_type2str(dep_ptr),
dep_ptr->job_id);
}
_test_dependency_state(dep_ptr, test_job_dep);
return 0;
}
/*
* Determine if a job's dependencies are met
* Inputs: job_ptr
* Outputs: was_changed (optional) -
* If it exists, set it to true if at least 1 dependency changed
* state, otherwise false.
* RET: NO_DEPEND = no dependencies
* LOCAL_DEPEND = local dependencies remain
* FAIL_DEPEND = failure (job completion code not per dependency),
* delete the job
* REMOTE_DEPEND = only remote dependencies remain
*/
extern int test_job_dependency(job_record_t *job_ptr, bool *was_changed)
{
test_job_dep_t test_job_dep = {
.job_ptr = job_ptr,
};
int results = NO_DEPEND;
if ((job_ptr->details == NULL) ||
(job_ptr->details->depend_list == NULL) ||
(list_count(job_ptr->details->depend_list) == 0)) {
job_ptr->bit_flags &= ~JOB_DEPENDENT;
if (was_changed)
*was_changed = false;
return NO_DEPEND;
}
(void) list_for_each(job_ptr->details->depend_list,
_foreach_test_job_dependency,
&test_job_dep);
if (test_job_dep.or_satisfied &&
(job_ptr->state_reason == WAIT_DEP_INVALID)) {
job_ptr->state_reason = WAIT_NO_REASON;
xfree(job_ptr->state_desc);
last_job_update = time(NULL);
}
if (test_job_dep.or_satisfied ||
(!test_job_dep.or_flag &&
!test_job_dep.and_failed &&
!test_job_dep.has_unfulfilled)) {
/* Dependency fulfilled */
fed_mgr_remove_remote_dependencies(job_ptr);
job_ptr->bit_flags &= ~JOB_DEPENDENT;
/*
* Don't flush the list if this job isn't on the origin - that
* means that we were called from
* fed_mgr_test_remote_dependencies() and need to send back the
* dependency list to the origin.
*/
if (fed_mgr_is_origin_job(job_ptr))
list_flush(job_ptr->details->depend_list);
_depend_list2str(job_ptr, false);
results = NO_DEPEND;
log_flag(DEPENDENCY, "%s: %pJ dependency fulfilled",
__func__, job_ptr);
} else {
if (test_job_dep.changed) {
_depend_list2str(job_ptr, false);
if (slurm_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
print_job_dependency(job_ptr, __func__);
}
job_ptr->bit_flags |= JOB_DEPENDENT;
acct_policy_remove_accrue_time(job_ptr, false);
if (test_job_dep.and_failed ||
(test_job_dep.or_flag && !test_job_dep.has_unfulfilled))
/* Dependency failed */
results = FAIL_DEPEND;
else
/* Still dependent */
results = test_job_dep.has_local_depend ? LOCAL_DEPEND :
REMOTE_DEPEND;
}
if (was_changed)
*was_changed = test_job_dep.changed;
return results;
}
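/*
 * Caller sketch (illustrative, not from the original source):
 *
 *	bool changed = false;
 *	switch (test_job_dependency(job_ptr, &changed)) {
 *	case NO_DEPEND:		// all dependencies satisfied
 *	case LOCAL_DEPEND:	// keep waiting on local jobs
 *	case REMOTE_DEPEND:	// keep waiting on sibling clusters
 *		break;
 *	case FAIL_DEPEND:	// can never be satisfied
 *		handle_invalid_dependency(job_ptr);
 *		break;
 *	}
 */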
/* Given a new job dependency specification, expand job array specifications
* into a collection of task IDs that update_job_dependency can parse.
* (e.g. "after:123_[4-5]" to "after:123_4:123_5")
 * Returns NULL if not a valid job array specification.
* Returned value must be xfreed. */
static char *_xlate_array_dep(char *new_depend)
{
char *new_array_dep = NULL, *array_tmp, *jobid_ptr = NULL, *sep;
bitstr_t *array_bitmap;
int i;
uint32_t job_id;
int32_t t, t_first, t_last;
if (strstr(new_depend, "_[") == NULL)
return NULL; /* No job array expressions */
if (max_array_size == NO_VAL) {
max_array_size = slurm_conf.max_array_sz;
}
for (i = 0; new_depend[i]; i++) {
xstrfmtcat(new_array_dep, "%c", new_depend[i]);
if ((new_depend[i] >= '0') && (new_depend[i] <= '9')) {
if (jobid_ptr == NULL)
jobid_ptr = new_depend + i;
} else if ((new_depend[i] == '_') && (new_depend[i+1] == '[') &&
(jobid_ptr != NULL)) {
job_id = (uint32_t) atol(jobid_ptr);
i += 2; /* Skip over "_[" */
array_tmp = xstrdup(new_depend + i);
sep = strchr(array_tmp, ']');
if (sep)
sep[0] = '\0';
array_bitmap = bit_alloc(max_array_size);
if ((sep == NULL) ||
(bit_unfmt(array_bitmap, array_tmp) != 0) ||
((t_first = bit_ffs(array_bitmap)) == -1)) {
/* Invalid format */
xfree(array_tmp);
FREE_NULL_BITMAP(array_bitmap);
xfree(new_array_dep);
return NULL;
}
i += (sep - array_tmp); /* Move to location of ']' */
xfree(array_tmp);
t_last = bit_fls(array_bitmap);
for (t = t_first; t <= t_last; t++) {
if (!bit_test(array_bitmap, t))
continue;
if (t == t_first) {
xstrfmtcat(new_array_dep, "%d", t);
} else {
xstrfmtcat(new_array_dep, ":%u_%d",
job_id, t);
}
}
FREE_NULL_BITMAP(array_bitmap);
jobid_ptr = NULL;
} else {
jobid_ptr = NULL;
}
}
return new_array_dep;
}
/* Copy dependent job's TRES options into another job's options */
static void _copy_tres_opts(job_record_t *job_ptr, job_record_t *dep_job_ptr)
{
xfree(job_ptr->cpus_per_tres);
job_ptr->cpus_per_tres = xstrdup(dep_job_ptr->cpus_per_tres);
xfree(job_ptr->tres_per_job);
job_ptr->tres_per_job = xstrdup(dep_job_ptr->tres_per_job);
xfree(job_ptr->tres_per_node);
job_ptr->tres_per_node = xstrdup(dep_job_ptr->tres_per_node);
xfree(job_ptr->tres_per_socket);
job_ptr->tres_per_socket = xstrdup(dep_job_ptr->tres_per_socket);
xfree(job_ptr->tres_per_task);
job_ptr->tres_per_task = xstrdup(dep_job_ptr->tres_per_task);
xfree(job_ptr->mem_per_tres);
job_ptr->mem_per_tres = xstrdup(dep_job_ptr->mem_per_tres);
}
static int _find_dependency(void *arg, void *key)
{
/* Does arg (dependency in the list) match key (new dependency)? */
depend_spec_t *dep_ptr = (depend_spec_t *)arg;
depend_spec_t *new_dep = (depend_spec_t *)key;
return (dep_ptr->job_id == new_dep->job_id) &&
(dep_ptr->array_task_id == new_dep->array_task_id) &&
(dep_ptr->depend_type == new_dep->depend_type);
}
extern depend_spec_t *find_dependency(job_record_t *job_ptr,
depend_spec_t *dep_ptr)
{
if (!job_ptr->details || !job_ptr->details->depend_list)
return NULL;
return list_find_first(job_ptr->details->depend_list,
_find_dependency, dep_ptr);
}
/*
* Add a new dependency to the list, ensuring that the list is unique.
* Dependencies are uniquely identified by a combination of job_id and
* depend_type.
*/
static void _add_dependency_to_list(list_t *depend_list,
depend_spec_t *dep_ptr)
{
if (!list_find_first(depend_list, _find_dependency, dep_ptr))
list_append(depend_list, dep_ptr);
}
static int _parse_depend_state(char **str_ptr, uint32_t *depend_state)
{
char *sep_ptr;
if ((sep_ptr = strchr(*str_ptr, '('))) {
/* Get the whole string before ")", convert to state */
char *paren = strchr(*str_ptr, ')');
if (!paren)
return SLURM_ERROR;
else
*paren = '\0';
sep_ptr++; /* skip over "(" */
*depend_state = _depend_state_str2state(sep_ptr);
/* Don't allow depend_fulfilled as a string. */
if (*depend_state != DEPEND_FAILED)
*depend_state = DEPEND_NOT_FULFILLED;
*str_ptr = paren + 1; /* skip over ")" */
} else
*depend_state = DEPEND_NOT_FULFILLED;
return SLURM_SUCCESS;
}
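/*
 * Example (hypothetical input): for *str_ptr == "(failed)?after:124",
 * this sets *depend_state to DEPEND_FAILED and advances *str_ptr past
 * the closing parenthesis to "?after:124". A state string other than
 * "failed" (e.g. "fulfilled") is deliberately coerced to
 * DEPEND_NOT_FULFILLED, and a missing ")" yields SLURM_ERROR.
 */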
static job_record_t *_find_dependent_job_ptr(uint32_t job_id,
uint32_t *array_task_id)
{
job_record_t *dep_job_ptr;
if (*array_task_id == NO_VAL) {
dep_job_ptr = find_job_record(job_id);
if (!dep_job_ptr)
dep_job_ptr = find_job_array_rec(job_id, INFINITE);
if (dep_job_ptr &&
(dep_job_ptr->array_job_id == job_id) &&
((dep_job_ptr->array_task_id != NO_VAL) ||
(dep_job_ptr->array_recs != NULL)))
*array_task_id = INFINITE;
} else
dep_job_ptr = find_job_array_rec(job_id, *array_task_id);
return dep_job_ptr;
}
/*
* job_ptr - job that is getting a new dependency
* dep_job_ptr - pointer to the job that job_ptr wants to depend on
* - This can be NULL, for example if it's a remote dependency. That's okay.
* job_id - job_id of the dependency string
* array_task_id - array_task_id of the dependency string
* - Equals NO_VAL if the dependency isn't a job array.
* - Equals INFINITE if the dependency is the whole job array.
* - Otherwise this equals a specific task of the job array (0, 1, 2, etc.)
*
* RET true if job_ptr is the same job as the new dependency, false otherwise.
*
* Example:
* scontrol update jobid=123 dependency=afterok:456_5
*
* job_ptr points to the job record for jobid=123.
* dep_job_ptr points to the job record for 456_5.
* job_id == 456. (This is probably different from dep_job_ptr->job_id.)
* array_task_id == 5.
*/
static bool _depends_on_same_job(job_record_t *job_ptr,
job_record_t *dep_job_ptr,
uint32_t job_id, uint32_t array_task_id)
{
if (array_task_id == INFINITE) {
/* job_ptr wants to set a dependency on a whole job array */
if ((job_ptr->array_task_id != NO_VAL) ||
(job_ptr->array_recs)) {
/*
* job_ptr is a specific task in a job array, or is
* the meta job of a job array.
* Test if job_ptr belongs to the array indicated by
* the dependency string's "job_id"
*/
return (job_ptr->array_job_id == job_id);
} else {
/* job_ptr is a normal job */
return (job_ptr == dep_job_ptr);
}
} else {
/* Doesn't depend on a whole job array; test normally */
return (job_ptr == dep_job_ptr);
}
}
/*
* The new dependency format is:
*
* <type:job_id[:job_id][,type:job_id[:job_id]]> or
* <type:job_id[:job_id][?type:job_id[:job_id]]>
*
 * This function parses all job IDs within a single dependency type.
 * One char past the end of the valid job IDs is returned in (*sep_ptr2).
 * Set (*rc) to ESLURM_DEPENDENCY for invalid job IDs.
*/
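/*
 * Illustrative parse (hypothetical IDs): for the job IDs in
 * "afterok:123:124_*+30(failed)" this adds job 123 (plain), then job
 * array 124 as a whole with a 30 minute depend_time and an initial
 * state of DEPEND_FAILED, and returns with (*sep_ptr2) pointing at the
 * terminating '\0'.
 */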
static void _parse_dependency_jobid_new(job_record_t *job_ptr,
list_t *new_depend_list, char *sep_ptr,
char **sep_ptr2, char *tok,
uint16_t depend_type, int select_hetero,
int *rc)
{
depend_spec_t *dep_ptr;
job_record_t *dep_job_ptr = NULL;
int expand_cnt = 0;
uint32_t job_id, array_task_id, depend_state;
char *tmp = NULL;
int depend_time = 0;
while (!(*rc)) {
job_id = strtol(sep_ptr, &tmp, 10);
if ((tmp != NULL) && (tmp[0] == '_')) {
if (tmp[1] == '*') {
array_task_id = INFINITE;
tmp += 2; /* Past "_*" */
} else {
array_task_id = strtol(tmp+1,
&tmp, 10);
}
} else
array_task_id = NO_VAL;
if ((tmp == NULL) || (job_id == 0) ||
((tmp[0] != '\0') && (tmp[0] != ',') &&
(tmp[0] != '?') && (tmp[0] != ':') &&
(tmp[0] != '+') && (tmp[0] != '('))) {
*rc = ESLURM_DEPENDENCY;
break;
}
dep_job_ptr = _find_dependent_job_ptr(job_id, &array_task_id);
if (!dep_job_ptr && fed_mgr_is_origin_job_id(job_id) &&
((depend_type == SLURM_DEPEND_AFTER_OK) ||
(depend_type == SLURM_DEPEND_AFTER_NOT_OK))) {
/*
* Reject the job since we won't be able to check if
* job dependency was fulfilled or not.
*/
*rc = ESLURM_DEPENDENCY;
break;
}
/*
* _find_dependent_job_ptr() may modify array_task_id, so check
* if the job is the same after that.
*/
if (_depends_on_same_job(job_ptr, dep_job_ptr, job_id,
array_task_id)) {
*rc = ESLURM_DEPENDENCY;
break;
}
if ((depend_type == SLURM_DEPEND_EXPAND) &&
((expand_cnt++ > 0) || (dep_job_ptr == NULL) ||
(!IS_JOB_RUNNING(dep_job_ptr)) ||
(dep_job_ptr->qos_id != job_ptr->qos_id) ||
(dep_job_ptr->part_ptr == NULL) ||
(job_ptr->part_ptr == NULL) ||
(dep_job_ptr->part_ptr != job_ptr->part_ptr))) {
/*
* Expand only jobs in the same QOS and partition
*/
*rc = ESLURM_DEPENDENCY;
break;
}
if (tmp[0] == '+') {
sep_ptr = &tmp[1]; /* skip over "+" */
depend_time = strtol(sep_ptr, &tmp, 10);
if (depend_time <= 0) {
*rc = ESLURM_DEPENDENCY;
break;
}
depend_time *= 60;
}
if (_parse_depend_state(&tmp, &depend_state)) {
*rc = ESLURM_DEPENDENCY;
break;
}
if (depend_type == SLURM_DEPEND_EXPAND) {
assoc_mgr_lock_t locks = { .tres = READ_LOCK };
job_details_t *detail_ptr = job_ptr->details;
multi_core_data_t *mc_ptr = detail_ptr->mc_ptr;
gres_job_state_validate_t gres_js_val = {
.cpus_per_task =
&detail_ptr->orig_cpus_per_task,
.max_nodes = &detail_ptr->max_nodes,
.min_cpus = &detail_ptr->min_cpus,
.min_nodes = &detail_ptr->min_nodes,
.ntasks_per_node = &detail_ptr->ntasks_per_node,
.ntasks_per_socket = &mc_ptr->ntasks_per_socket,
.ntasks_per_tres = &detail_ptr->ntasks_per_tres,
.num_tasks = &detail_ptr->num_tasks,
.sockets_per_node = &mc_ptr->sockets_per_node,
.gres_list = &job_ptr->gres_list_req,
};
job_ptr->details->expanding_jobid = job_id;
if (select_hetero == 0) {
/*
* GRES per node of this job must match
* the job being expanded. Other options
* are ignored.
*/
_copy_tres_opts(job_ptr, dep_job_ptr);
}
gres_js_val.cpus_per_tres = job_ptr->cpus_per_tres;
gres_js_val.mem_per_tres = job_ptr->mem_per_tres;
gres_js_val.tres_freq = job_ptr->tres_freq;
gres_js_val.tres_per_job = job_ptr->tres_per_job;
gres_js_val.tres_per_node = job_ptr->tres_per_node;
gres_js_val.tres_per_socket = job_ptr->tres_per_socket;
gres_js_val.tres_per_task = job_ptr->tres_per_task;
FREE_NULL_LIST(job_ptr->gres_list_req);
(void) gres_job_state_validate(&gres_js_val);
assoc_mgr_lock(&locks);
gres_stepmgr_set_job_tres_cnt(
job_ptr->gres_list_req,
job_ptr->details->min_nodes,
job_ptr->tres_req_cnt,
true);
xfree(job_ptr->tres_req_str);
job_ptr->tres_req_str =
assoc_mgr_make_tres_str_from_array(
job_ptr->tres_req_cnt,
TRES_STR_FLAG_SIMPLE, true);
assoc_mgr_unlock(&locks);
}
dep_ptr = xmalloc(sizeof(depend_spec_t));
dep_ptr->array_task_id = array_task_id;
dep_ptr->depend_type = depend_type;
if (job_ptr->fed_details && !fed_mgr_is_origin_job_id(job_id)) {
if (depend_type == SLURM_DEPEND_EXPAND) {
error("%s: Job expansion not permitted for remote jobs",
__func__);
*rc = ESLURM_DEPENDENCY;
xfree(dep_ptr);
break;
}
/* The dependency is on a remote cluster */
dep_ptr->depend_flags |= SLURM_FLAGS_REMOTE;
dep_job_ptr = NULL;
}
if (dep_job_ptr) { /* job still active */
if (array_task_id == NO_VAL)
dep_ptr->job_id = dep_job_ptr->job_id;
else
dep_ptr->job_id = dep_job_ptr->array_job_id;
} else
dep_ptr->job_id = job_id;
dep_ptr->job_ptr = dep_job_ptr;
dep_ptr->depend_time = depend_time;
dep_ptr->depend_state = depend_state;
_add_dependency_to_list(new_depend_list, dep_ptr);
if (tmp[0] != ':')
break;
sep_ptr = tmp + 1; /* skip over ":" */
}
*sep_ptr2 = tmp;
}
/*
 * The old dependency format is a comma-separated list of job IDs.
 * Parse a single job ID.
 * One char past the end of a valid job ID will be returned in (*sep_ptr).
 * For an invalid job ID, (*rc) will be set to ESLURM_DEPENDENCY.
*/
static void _parse_dependency_jobid_old(job_record_t *job_ptr,
list_t *new_depend_list, char **sep_ptr,
char *tok, int *rc)
{
depend_spec_t *dep_ptr;
job_record_t *dep_job_ptr = NULL;
uint32_t job_id, array_task_id;
char *tmp = NULL;
job_id = strtol(tok, &tmp, 10);
if ((tmp != NULL) && (tmp[0] == '_')) {
if (tmp[1] == '*') {
array_task_id = INFINITE;
tmp += 2; /* Past "_*" */
} else {
array_task_id = strtol(tmp+1, &tmp, 10);
}
} else {
array_task_id = NO_VAL;
}
*sep_ptr = tmp;
if ((tmp == NULL) || (job_id == 0) ||
((tmp[0] != '\0') && (tmp[0] != ','))) {
*rc = ESLURM_DEPENDENCY;
return;
}
/*
* _find_dependent_job_ptr() may modify array_task_id, so check
* if the job is the same after that.
*/
dep_job_ptr = _find_dependent_job_ptr(job_id, &array_task_id);
if (_depends_on_same_job(job_ptr, dep_job_ptr, job_id, array_task_id)) {
*rc = ESLURM_DEPENDENCY;
return;
}
dep_ptr = xmalloc(sizeof(depend_spec_t));
dep_ptr->array_task_id = array_task_id;
dep_ptr->depend_type = SLURM_DEPEND_AFTER_ANY;
if (job_ptr->fed_details &&
!fed_mgr_is_origin_job_id(job_id)) {
/* The dependency is on a remote cluster */
dep_ptr->depend_flags |= SLURM_FLAGS_REMOTE;
dep_job_ptr = NULL;
}
if (dep_job_ptr) {
if (array_task_id == NO_VAL) {
dep_ptr->job_id = dep_job_ptr->job_id;
} else {
dep_ptr->job_id = dep_job_ptr->array_job_id;
}
} else
dep_ptr->job_id = job_id;
dep_ptr->job_ptr = dep_job_ptr; /* Can be NULL */
_add_dependency_to_list(new_depend_list, dep_ptr);
}
static int _foreach_update_job_dependency_list(void *x, void *arg)
{
depend_spec_t *dep_ptr = x, *job_depend;
test_job_dep_t *test_job_dep = arg;
job_record_t *job_ptr = test_job_dep->job_ptr;
/*
* If the dependency is marked as remote, then it wasn't updated
* by the sibling cluster. Skip it.
*/
if (dep_ptr->depend_flags & SLURM_FLAGS_REMOTE)
return 0;
/*
* Find the dependency in job_ptr that matches this one.
* Then update job_ptr's dependency state (not fulfilled,
* fulfilled, or failed) to match this one.
*/
job_depend = list_find_first(job_ptr->details->depend_list,
_find_dependency,
dep_ptr);
if (!job_depend) {
/*
* This can happen if the job's dependency is updated
* and the update doesn't get to the sibling before
* the sibling sends back an update to the origin (us).
*/
log_flag(DEPENDENCY, "%s: Cannot find dependency %s:%u for %pJ, it may have been cleared before we got here.",
__func__, _depend_type2str(dep_ptr),
dep_ptr->job_id, job_ptr);
return 0;
}
/*
* If the dependency is already fulfilled, don't update it.
* Otherwise update the dependency state.
*/
if ((job_depend->depend_state == DEPEND_FULFILLED) ||
(job_depend->depend_state == dep_ptr->depend_state))
return 0;
if (job_depend->depend_type == SLURM_DEPEND_SINGLETON) {
/*
* We need to update the singleton dependency with
* the cluster bit, but test_job_dependency() will test
* if it is fulfilled, so don't change the depend_state
* here.
*/
job_depend->singleton_bits |= dep_ptr->singleton_bits;
if (!fed_mgr_is_singleton_satisfied(job_ptr, job_depend,
false))
return 0;
}
job_depend->depend_state = dep_ptr->depend_state;
test_job_dep->changed = true;
return 0;
}
extern bool update_job_dependency_list(job_record_t *job_ptr,
list_t *new_depend_list)
{
test_job_dep_t test_job_dep = {
.job_ptr = job_ptr,
};
xassert(job_ptr);
xassert(job_ptr->details);
xassert(job_ptr->details->depend_list);
(void) list_for_each(new_depend_list,
_foreach_update_job_dependency_list,
&test_job_dep);
return test_job_dep.changed;
}
static int _foreach_handle_job_dependency_updates(void *x, void *arg)
{
depend_spec_t *dep_ptr = x;
test_job_dep_t *test_job_dep = arg;
_test_dependency_state(dep_ptr, test_job_dep);
return 0;
}
extern int handle_job_dependency_updates(void *object, void *arg)
{
job_record_t *job_ptr = (job_record_t *) object;
time_t now = time(NULL);
test_job_dep_t test_job_dep = {
.job_ptr = job_ptr,
};
xassert(job_ptr->details);
xassert(job_ptr->details->depend_list);
/*
* Check the depend_state of each dependency.
* All dependencies are OR'd or AND'd - we don't allow a mix.
* OR'd dependencies:
* - If one dependency succeeded, the whole thing passes.
* - If there is at least one unfulfilled dependency,
* the job is still dependent.
* - All dependencies failed == dependency never satisfied.
* AND'd dependencies:
* - One failure == dependency never satisfied
* - One+ not fulfilled == still dependent
* - All succeeded == dependency fulfilled
*/
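/*
 * For example (hypothetical IDs): "afterok:1?afterok:2" is satisfied
 * as soon as either dependency is fulfilled and fails only when both
 * have failed, while "afterok:1,afterok:2" fails as soon as either one
 * fails and is satisfied only when both are fulfilled.
 */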
(void) list_for_each(job_ptr->details->depend_list,
_foreach_handle_job_dependency_updates,
&test_job_dep);
if (test_job_dep.or_satisfied ||
(!test_job_dep.or_flag &&
!test_job_dep.and_failed &&
!test_job_dep.has_unfulfilled)) {
/* Dependency fulfilled */
fed_mgr_remove_remote_dependencies(job_ptr);
job_ptr->bit_flags &= ~JOB_DEPENDENT;
list_flush(job_ptr->details->depend_list);
if ((job_ptr->state_reason == WAIT_DEP_INVALID) ||
(job_ptr->state_reason == WAIT_DEPENDENCY)) {
job_ptr->state_reason = WAIT_NO_REASON;
xfree(job_ptr->state_desc);
last_job_update = now;
}
_depend_list2str(job_ptr, false);
fed_mgr_job_requeue(job_ptr);
} else {
_depend_list2str(job_ptr, false);
job_ptr->bit_flags |= JOB_DEPENDENT;
acct_policy_remove_accrue_time(job_ptr, false);
if (test_job_dep.and_failed ||
(test_job_dep.or_flag && !test_job_dep.has_unfulfilled)) {
/* Dependency failed */
handle_invalid_dependency(job_ptr);
} else {
/* Still dependent */
job_ptr->state_reason = WAIT_DEPENDENCY;
xfree(job_ptr->state_desc);
last_job_update = now;
}
}
if (slurm_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
print_job_dependency(job_ptr, __func__);
return SLURM_SUCCESS;
}
/*
* Parse a job dependency string and use it to establish a "depend_spec"
* list of dependencies. We accept both old format (a single job ID) and
* new format (e.g. "afterok:123:124,after:128").
* IN job_ptr - job record to have dependency and depend_list updated
* IN new_depend - new dependency description
* RET returns an error code from slurm_errno.h
*/
extern int update_job_dependency(job_record_t *job_ptr, char *new_depend)
{
static int select_hetero = -1;
int rc = SLURM_SUCCESS;
uint16_t depend_type = 0;
char *tok, *new_array_dep, *sep_ptr, *sep_ptr2 = NULL;
list_t *new_depend_list = NULL;
depend_spec_t *dep_ptr;
bool or_flag = false;
if (job_ptr->details == NULL)
return EINVAL;
if (select_hetero == -1) {
/*
* Determine if the select plugin supports heterogeneous
* GRES allocations (count differ by node): 1=yes, 0=no
*/
if (xstrstr(slurm_conf.select_type, "cons_tres"))
select_hetero = 1;
else
select_hetero = 0;
}
/* Clear dependencies on NULL, "0", or empty dependency input */
job_ptr->details->expanding_jobid = 0;
if ((new_depend == NULL) || (new_depend[0] == '\0') ||
((new_depend[0] == '0') && (new_depend[1] == '\0'))) {
xfree(job_ptr->details->dependency);
FREE_NULL_LIST(job_ptr->details->depend_list);
return rc;
}
new_depend_list = list_create(xfree_ptr);
if ((new_array_dep = _xlate_array_dep(new_depend)))
tok = new_array_dep;
else
tok = new_depend;
/* validate new dependency string */
while (rc == SLURM_SUCCESS) {
/* test singleton dependency flag */
if (xstrncasecmp(tok, "singleton", 9) == 0) {
uint32_t state;
tok += 9; /* skip past "singleton" */
depend_type = SLURM_DEPEND_SINGLETON;
if (_parse_depend_state(&tok, &state)) {
rc = ESLURM_DEPENDENCY;
break;
}
if (disable_remote_singleton &&
!fed_mgr_is_origin_job(job_ptr)) {
/* Singleton disabled for non-origin cluster */
} else {
dep_ptr = xmalloc(sizeof(depend_spec_t));
dep_ptr->depend_state = state;
dep_ptr->depend_type = depend_type;
/* dep_ptr->job_id = 0; set by xmalloc */
/* dep_ptr->job_ptr = NULL; set by xmalloc */
/* dep_ptr->singleton_bits = 0;set by xmalloc */
_add_dependency_to_list(new_depend_list,
dep_ptr);
}
if (tok[0] == ',') {
tok++;
continue;
} else if (tok[0] == '?') {
tok++;
or_flag = true;
continue;
}
if (tok[0] != '\0')
rc = ESLURM_DEPENDENCY;
break;
}
/* Test for old format, just a job ID */
sep_ptr = strchr(tok, ':');
if ((sep_ptr == NULL) && (tok[0] >= '0') && (tok[0] <= '9')) {
_parse_dependency_jobid_old(job_ptr, new_depend_list,
&sep_ptr, tok, &rc);
if (rc)
break;
if (sep_ptr && (sep_ptr[0] == ',')) {
tok = sep_ptr + 1;
continue;
} else {
break;
}
} else if (sep_ptr == NULL) {
rc = ESLURM_DEPENDENCY;
break;
}
/* New format, <test>:job_ID */
if (!xstrncasecmp(tok, "afternotok:", 11))
depend_type = SLURM_DEPEND_AFTER_NOT_OK;
else if (!xstrncasecmp(tok, "aftercorr:", 10))
depend_type = SLURM_DEPEND_AFTER_CORRESPOND;
else if (!xstrncasecmp(tok, "afterany:", 9))
depend_type = SLURM_DEPEND_AFTER_ANY;
else if (!xstrncasecmp(tok, "afterok:", 8))
depend_type = SLURM_DEPEND_AFTER_OK;
else if (!xstrncasecmp(tok, "afterburstbuffer:", 11))
depend_type = SLURM_DEPEND_BURST_BUFFER;
else if (!xstrncasecmp(tok, "after:", 6))
depend_type = SLURM_DEPEND_AFTER;
else if (!xstrncasecmp(tok, "expand:", 7)) {
if (!permit_job_expansion()) {
rc = ESLURM_NOT_SUPPORTED;
break;
}
depend_type = SLURM_DEPEND_EXPAND;
} else {
rc = ESLURM_DEPENDENCY;
break;
}
sep_ptr++; /* skip over ":" */
_parse_dependency_jobid_new(job_ptr, new_depend_list, sep_ptr,
&sep_ptr2, tok, depend_type,
select_hetero, &rc);
if (sep_ptr2 && (sep_ptr2[0] == ',')) {
tok = sep_ptr2 + 1;
} else if (sep_ptr2 && (sep_ptr2[0] == '?')) {
tok = sep_ptr2 + 1;
or_flag = true;
} else {
break;
}
}
if (rc == SLURM_SUCCESS) {
/* test for circular dependencies (e.g. A -> B -> A) */
(void) _scan_depend(NULL, job_ptr);
if (_scan_depend(new_depend_list, job_ptr))
rc = ESLURM_CIRCULAR_DEPENDENCY;
}
if (rc == SLURM_SUCCESS) {
FREE_NULL_LIST(job_ptr->details->depend_list);
job_ptr->details->depend_list = new_depend_list;
_depend_list2str(job_ptr, or_flag);
if (slurm_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
print_job_dependency(job_ptr, __func__);
} else {
FREE_NULL_LIST(new_depend_list);
}
xfree(new_array_dep);
return rc;
}
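/*
 * list_for_each() callback to test one dependency record for a circular
 * dependency. Sets test_job_dep->changed and returns -1 to stop the scan
 * when a cycle is found; returns 0 to continue otherwise.
 */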
static int _foreach_scan_depend(void *x, void *arg)
{
depend_spec_t *dep_ptr = x;
test_job_dep_t *test_job_dep = arg;
job_record_t *job_ptr = test_job_dep->job_ptr;
if (dep_ptr->job_id == 0) /* Singleton */
return 0;
	/*
	 * We can't test for circular dependencies if the job_ptr
	 * wasn't found - the job may not be on this cluster, the
	 * job may have already been purged when the dependency was
	 * submitted, or the job may simply never have existed.
	 */
if (!dep_ptr->job_ptr)
return 0;
if ((test_job_dep->changed = _depends_on_same_job(
job_ptr, dep_ptr->job_ptr,
dep_ptr->job_id,
dep_ptr->array_task_id)))
return -1;
else if (dep_ptr->job_ptr->magic != JOB_MAGIC)
return 0; /* purged job, ptr not yet cleared */
else if (!IS_JOB_FINISHED(dep_ptr->job_ptr) &&
dep_ptr->job_ptr->details &&
dep_ptr->job_ptr->details->depend_list) {
test_job_dep->changed = _scan_depend(
dep_ptr->job_ptr->details->depend_list,
job_ptr);
if (test_job_dep->changed) {
info("circular dependency: %pJ is dependent upon %pJ",
dep_ptr->job_ptr, job_ptr);
return -1;
}
}
return 0;
}
/* Return true if the job job_ptr is found in dependency_list.
 * Pass a NULL dependency list to reset the recursion counter.
 * Executes recursively for each dependent job. */
static bool _scan_depend(list_t *dependency_list, job_record_t *job_ptr)
{
static int job_counter = 0;
test_job_dep_t test_job_dep = {
.job_ptr = job_ptr,
};
if (dependency_list == NULL) {
job_counter = 0;
return false;
} else if (job_counter++ >= max_depend_depth) {
return false;
}
xassert(job_ptr);
(void) list_for_each(dependency_list,
_foreach_scan_depend,
&test_job_dep);
return test_job_dep.changed;
}
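/*
 * list_for_each() callback to accumulate into cume_space_time the
 * estimated resource usage (CPU count * time limit) of each pending job
 * of equal or higher priority in the same partition.
 */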
static int _foreach_delayed_job_start_time(void *x, void *arg)
{
job_record_t *job_q_ptr = x;
delay_start_t *delay_start = arg;
job_record_t *job_ptr = delay_start->job_ptr;
uint32_t job_size_cpus, job_size_nodes, job_time;
if (!IS_JOB_PENDING(job_q_ptr) || !job_q_ptr->details ||
(job_q_ptr->part_ptr != job_ptr->part_ptr) ||
(job_q_ptr->priority < job_ptr->priority) ||
(job_q_ptr->job_id == job_ptr->job_id) ||
(IS_JOB_REVOKED(job_q_ptr)))
return 0;
if (job_q_ptr->details->min_nodes == NO_VAL)
job_size_nodes = 1;
else
job_size_nodes = job_q_ptr->details->min_nodes;
if (job_q_ptr->details->min_cpus == NO_VAL)
job_size_cpus = 1;
else
job_size_cpus = job_q_ptr->details->min_cpus;
job_size_cpus = MAX(job_size_cpus,
(job_size_nodes * delay_start->part_cpus_per_node));
if (job_q_ptr->time_limit == NO_VAL)
job_time = job_q_ptr->part_ptr->max_time;
else
job_time = job_q_ptr->time_limit;
delay_start->cume_space_time += job_size_cpus * job_time;
return 0;
}
/* If there are higher priority queued jobs in this job's partition, then
* delay the job's expected initiation time as needed to run those jobs.
* NOTE: This is only a rough estimate of the job's start time as it ignores
* job dependencies, feature requirements, specific node requirements, etc. */
static void _delayed_job_start_time(job_record_t *job_ptr)
{
uint32_t part_node_cnt, part_cpu_cnt;
delay_start_t delay_start = {
.job_ptr = job_ptr,
.part_cpus_per_node = 1,
};
if (job_ptr->part_ptr == NULL)
return;
part_node_cnt = job_ptr->part_ptr->total_nodes;
part_cpu_cnt = job_ptr->part_ptr->total_cpus;
if (part_cpu_cnt > part_node_cnt)
delay_start.part_cpus_per_node = part_cpu_cnt / part_node_cnt;
(void) list_for_each(job_list,
_foreach_delayed_job_start_time,
&delay_start);
	delay_start.cume_space_time /= part_cpu_cnt; /* Factor out size */
delay_start.cume_space_time *= 60; /* Minutes to seconds */
debug2("Increasing estimated start of %pJ by %"PRIu64" secs",
job_ptr, delay_start.cume_space_time);
job_ptr->start_time += delay_start.cume_space_time;
}
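/*
 * list_for_each() callback to append a preemptable job's ID to the
 * will-run response, creating the preemptee_job_id list on first use.
 */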
static int _foreach_add_to_preemptee_job_id(void *x, void *arg)
{
job_record_t *job_ptr = x;
will_run_response_msg_t *resp_data = arg;
uint32_t *preemptee_jid = xmalloc(sizeof(uint32_t));
(*preemptee_jid) = job_ptr->job_id;
if (!resp_data->preemptee_job_id)
resp_data->preemptee_job_id = list_create(xfree_ptr);
list_append(resp_data->preemptee_job_id, preemptee_jid);
return 0;
}
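/*
 * list_for_each() callback to test if the job could start in the given
 * partition. On success (or a fatal error), record the result and return
 * -1 to stop scanning partitions; return 0 to try the next partition.
 */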
static int _foreach_job_start_data_part(void *x, void *arg)
{
part_record_t *part_ptr = x;
job_start_data_t *job_start_data = arg;
job_record_t *job_ptr = job_start_data->job_ptr;
bitstr_t *active_bitmap = NULL, *avail_bitmap = NULL;
bitstr_t *resv_bitmap = NULL;
uint32_t min_nodes, max_nodes, req_nodes;
int rc2 = SLURM_SUCCESS;
time_t start_res, orig_start_time = (time_t) 0;
list_t *preemptee_candidates = NULL, *preemptee_job_list = NULL;
bool resv_overlap = false;
resv_exc_t resv_exc = { 0 };
job_start_data->rc = SLURM_SUCCESS;
if (!part_ptr) {
job_start_data->rc = ESLURM_INVALID_PARTITION_NAME;
return -1;
}
if (job_ptr->details->req_nodes && job_ptr->details->req_nodes[0]) {
if (node_name2bitmap(job_ptr->details->req_nodes, false,
&avail_bitmap, NULL)) {
job_start_data->rc = ESLURM_INVALID_NODE_NAME;
return -1;
}
} else {
/* assume all nodes available to job for testing */
avail_bitmap = node_conf_get_active_bitmap();
}
/* Consider only nodes in this job's partition */
if (part_ptr->node_bitmap)
bit_and(avail_bitmap, part_ptr->node_bitmap);
else
job_start_data->rc = ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE;
if (job_req_node_filter(job_ptr, avail_bitmap, true))
job_start_data->rc = ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE;
if (job_ptr->details->exc_node_bitmap) {
bit_and_not(avail_bitmap, job_ptr->details->exc_node_bitmap);
}
if (job_ptr->details->req_node_bitmap) {
if (!bit_super_set(job_ptr->details->req_node_bitmap,
avail_bitmap)) {
job_start_data->rc =
ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE;
}
}
/* Enforce reservation: access control, time and nodes */
if (job_ptr->details->begin_time &&
(job_ptr->details->begin_time > job_start_data->now))
start_res = job_ptr->details->begin_time;
else
start_res = job_start_data->now;
rc2 = job_test_resv(job_ptr, &start_res, true, &resv_bitmap,
&resv_exc, &resv_overlap, false);
if (rc2 != SLURM_SUCCESS) {
FREE_NULL_BITMAP(avail_bitmap);
reservation_delete_resv_exc_parts(&resv_exc);
job_start_data->rc = rc2;
if ((rc2 == ESLURM_INVALID_QOS) ||
(rc2 == ESLURM_INVALID_PARTITION_NAME))
return 0;
return -1;
}
bit_and(avail_bitmap, resv_bitmap);
FREE_NULL_BITMAP(resv_bitmap);
/* Only consider nodes that are not DOWN or DRAINED */
bit_and(avail_bitmap, avail_node_bitmap);
if (job_start_data->rc == SLURM_SUCCESS) {
int test_fini = -1;
uint8_t save_share_res, save_whole_node;
/* On BlueGene systems don't adjust the min/max node limits
here. We are working on midplane values. */
min_nodes = MAX(job_ptr->details->min_nodes,
part_ptr->min_nodes);
if (job_ptr->details->max_nodes == 0)
max_nodes = part_ptr->max_nodes;
else
max_nodes = MIN(job_ptr->details->max_nodes,
part_ptr->max_nodes);
max_nodes = MIN(max_nodes, 500000); /* prevent overflows */
if (!job_ptr->limit_set.tres[TRES_ARRAY_NODE] &&
job_ptr->details->max_nodes)
req_nodes = max_nodes;
else
req_nodes = min_nodes;
preemptee_candidates = slurm_find_preemptable_jobs(job_ptr);
		/* The orig_start_time is based upon the backfill
		 * scheduler data and considers all higher priority jobs.
		 * The logic below only considers currently running jobs,
		 * so the expected start time will almost certainly be
		 * earlier and not as accurate, but this algorithm is
		 * much faster. */
orig_start_time = job_ptr->start_time;
build_active_feature_bitmap(job_ptr, avail_bitmap,
&active_bitmap);
if (active_bitmap) {
job_start_data->rc = select_g_job_test(
job_ptr, active_bitmap,
min_nodes, max_nodes, req_nodes,
SELECT_MODE_WILL_RUN,
preemptee_candidates,
&preemptee_job_list,
&resv_exc,
NULL);
if (job_start_data->rc == SLURM_SUCCESS) {
FREE_NULL_BITMAP(avail_bitmap);
avail_bitmap = active_bitmap;
active_bitmap = NULL;
test_fini = 1;
} else {
FREE_NULL_BITMAP(active_bitmap);
save_share_res = job_ptr->details->share_res;
save_whole_node = job_ptr->details->whole_node;
job_ptr->details->share_res = 0;
job_ptr->details->whole_node |=
WHOLE_NODE_REQUIRED;
test_fini = 0;
}
}
if (test_fini != 1) {
job_start_data->rc = select_g_job_test(
job_ptr, avail_bitmap,
min_nodes, max_nodes, req_nodes,
SELECT_MODE_WILL_RUN,
preemptee_candidates,
&preemptee_job_list,
&resv_exc,
NULL);
if (test_fini == 0) {
job_ptr->details->share_res = save_share_res;
job_ptr->details->whole_node = save_whole_node;
}
}
}
if (job_start_data->rc == SLURM_SUCCESS) {
will_run_response_msg_t *resp_data;
resp_data = xmalloc(sizeof(will_run_response_msg_t));
resp_data->job_id = job_ptr->job_id;
resp_data->proc_cnt = job_ptr->total_cpus;
_delayed_job_start_time(job_ptr);
resp_data->start_time = MAX(job_ptr->start_time,
orig_start_time);
resp_data->start_time = MAX(resp_data->start_time, start_res);
job_ptr->start_time = 0; /* restore pending job start time */
resp_data->node_list = bitmap2node_name(avail_bitmap);
resp_data->part_name = xstrdup(part_ptr->name);
if (preemptee_job_list)
(void) list_for_each(preemptee_job_list,
_foreach_add_to_preemptee_job_id,
resp_data);
*job_start_data->resp = resp_data;
} else {
job_start_data->rc = ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE;
}
FREE_NULL_LIST(preemptee_candidates);
FREE_NULL_LIST(preemptee_job_list);
FREE_NULL_BITMAP(avail_bitmap);
reservation_delete_resv_exc_parts(&resv_exc);
if (job_start_data->rc)
return 0;
return -1;
}
/*
* Determine if a pending job will run using only the specified nodes, build
* response message and return SLURM_SUCCESS on success. Otherwise return an
* error code. Caller must free response message.
*/
extern int job_start_data(job_record_t *job_ptr,
will_run_response_msg_t **resp)
{
job_start_data_t job_start_data = {
.job_ptr = job_ptr,
.now = time(NULL),
.resp = resp,
};
if (job_ptr == NULL)
return ESLURM_INVALID_JOB_ID;
	/*
	 * NOTE: Do not use IS_JOB_PENDING since that does not take the
	 * COMPLETING flag into account, which we need here since we do
	 * not want to schedule a requeued job until it has actually
	 * finished completing the first time.
	 */
if ((job_ptr->details == NULL) || (job_ptr->job_state != JOB_PENDING))
return ESLURM_DISABLED;
if (job_ptr->part_ptr_list)
(void) list_for_each(job_ptr->part_ptr_list,
_foreach_job_start_data_part,
&job_start_data);
else
(void) _foreach_job_start_data_part(job_ptr->part_ptr,
&job_start_data);
return job_start_data.rc;
}
/*
* epilog_slurmctld - execute the epilog_slurmctld for a job that has just
* terminated.
* IN job_ptr - pointer to job that has been terminated
*/
extern void epilog_slurmctld(job_record_t *job_ptr)
{
xassert(verify_lock(JOB_LOCK, WRITE_LOCK));
prep_g_epilog_slurmctld(job_ptr);
}
/*
* Determine which nodes must be rebooted for a job
* IN job_ptr - pointer to job that will be initiated
* IN/OUT reboot_features - features that should be applied to the node on
* reboot. Caller must xfree().
* RET bitmap of nodes requiring a reboot for NodeFeaturesPlugin or NULL if none
*/
extern bitstr_t *node_features_reboot(job_record_t *job_ptr,
char **reboot_features)
{
bitstr_t *active_bitmap = NULL, *boot_node_bitmap = NULL;
bitstr_t *feature_node_bitmap, *tmp_bitmap;
xassert(reboot_features);
xassert(!(*reboot_features)); /* It needs to start out NULL */
if ((node_features_g_count() == 0) ||
!node_features_g_user_update(job_ptr->user_id))
return NULL;
	/*
	 * Check if all features are supported with AND/OR combinations
	 */
build_active_feature_bitmap(job_ptr, job_ptr->node_bitmap,
&active_bitmap);
if (active_bitmap == NULL) /* All nodes have desired features */
return NULL;
FREE_NULL_BITMAP(active_bitmap);
	/*
	 * If some MOR/XAND option is used, filter out only the first set
	 * of features for the NodeFeaturesPlugin
	 */
feature_node_bitmap = node_features_g_get_node_bitmap();
if (feature_node_bitmap == NULL) /* No nodes under NodeFeaturesPlugin */
return NULL;
*reboot_features = node_features_g_job_xlate(
job_ptr->details->features_use,
job_ptr->details->feature_list_use,
job_ptr->node_bitmap);
tmp_bitmap = build_active_feature_bitmap2(*reboot_features);
boot_node_bitmap = bit_copy(job_ptr->node_bitmap);
bit_and(boot_node_bitmap, feature_node_bitmap);
FREE_NULL_BITMAP(feature_node_bitmap);
if (tmp_bitmap) {
bit_and_not(boot_node_bitmap, tmp_bitmap);
FREE_NULL_BITMAP(tmp_bitmap);
}
if (bit_ffs(boot_node_bitmap) == -1)
FREE_NULL_BITMAP(boot_node_bitmap);
return boot_node_bitmap;
}
/*
 * Build and queue a REQUEST_REBOOT_NODES agent request for the given
 * nodes, optionally requesting that the specified features be made
 * active after the reboot.
 */
static void _send_reboot_msg(bitstr_t *node_bitmap, char *features,
uint16_t protocol_version)
{
agent_arg_t *reboot_agent_args = NULL;
reboot_msg_t *reboot_msg;
hostlist_t *hostlist;
reboot_agent_args = xmalloc(sizeof(agent_arg_t));
reboot_agent_args->msg_type = REQUEST_REBOOT_NODES;
reboot_agent_args->retry = 0;
reboot_agent_args->node_count = 0;
reboot_agent_args->protocol_version = protocol_version;
if ((hostlist = bitmap2hostlist(node_bitmap))) {
reboot_agent_args->hostlist = hostlist;
reboot_agent_args->node_count = hostlist_count(hostlist);
}
reboot_msg = xmalloc(sizeof(reboot_msg_t));
slurm_init_reboot_msg(reboot_msg, false);
reboot_agent_args->msg_args = reboot_msg;
reboot_msg->features = xstrdup(features);
set_agent_arg_r_uid(reboot_agent_args, SLURM_AUTH_UID_ANY);
agent_queue_request(reboot_agent_args);
}
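/*
 * Reboot the given nodes, either through the power save logic when it
 * is enabled or by sending a reboot request directly to the nodes.
 */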
static void _do_reboot(bool power_save_on, bitstr_t *node_bitmap,
job_record_t *job_ptr, char *reboot_features,
uint16_t protocol_version)
{
xassert(node_bitmap);
if (bit_ffs(node_bitmap) == -1)
return;
if (power_save_on)
power_job_reboot(node_bitmap, job_ptr, reboot_features);
else
_send_reboot_msg(node_bitmap, reboot_features,
protocol_version);
if (get_log_level() >= LOG_LEVEL_DEBUG) {
char *nodes = bitmap2node_name(node_bitmap);
if (nodes) {
debug("%s: reboot nodes %s features %s",
__func__, nodes,
reboot_features ? "reboot_features" : "N/A");
} else {
error("%s: bitmap2nodename", __func__);
}
xfree(nodes);
}
}
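/*
 * Set each node's active features to the translated post-reboot feature
 * set so that other jobs wanting those features can be scheduled on the
 * rebooting nodes without issuing additional reboot requests.
 */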
static void _set_reboot_features_active(bitstr_t *node_bitmap,
char *reboot_features)
{
node_record_t *node_ptr;
for (int i = 0; (node_ptr = next_node_bitmap(node_bitmap, &i)); i++) {
char *tmp_feature;
tmp_feature = node_features_g_node_xlate(reboot_features,
node_ptr->features_act,
node_ptr->features, i);
xfree(node_ptr->features_act);
node_ptr->features_act = tmp_feature;
(void) update_node_active_features(node_ptr->name,
node_ptr->features_act,
FEATURE_MODE_IND);
}
}
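/*
 * reboot_job_nodes - Reboot the compute nodes allocated to a job.
 * Also change the modes of KNL nodes for node_features/knl_generic plugin.
 * IN job_ptr - pointer to job that will be initiated
 */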
extern void reboot_job_nodes(job_record_t *job_ptr)
{
node_record_t *node_ptr;
time_t now = time(NULL);
bitstr_t *boot_node_bitmap = NULL, *feature_node_bitmap = NULL;
bitstr_t *non_feature_node_bitmap = NULL;
char *reboot_features = NULL;
uint16_t protocol_version = SLURM_PROTOCOL_VERSION;
static bool power_save_on = false;
static time_t sched_update = 0;
static bool logged = false;
if (sched_update != slurm_conf.last_update) {
power_save_on = power_save_test();
sched_update = slurm_conf.last_update;
}
if ((job_ptr->details == NULL) || (job_ptr->node_bitmap == NULL))
return;
if (job_ptr->reboot)
boot_node_bitmap = bit_copy(job_ptr->node_bitmap);
else
boot_node_bitmap = node_features_reboot(job_ptr,
&reboot_features);
if (!logged && boot_node_bitmap &&
(!power_save_on &&
((slurm_conf.reboot_program == NULL) ||
(slurm_conf.reboot_program[0] == '\0')))) {
info("%s: Preparing node reboot without power saving and RebootProgram",
__func__);
logged = true;
}
if (boot_node_bitmap &&
job_ptr->details->features_use &&
node_features_g_user_update(job_ptr->user_id)) {
non_feature_node_bitmap = bit_copy(boot_node_bitmap);
/*
* node_features_g_job_xlate is called from
* node_features_reboot, which we may have already called.
* Avoid calling node_features_g_job_xlate twice.
*/
if (!reboot_features) {
reboot_features = node_features_g_job_xlate(
job_ptr->details->features_use,
job_ptr->details->feature_list_use,
job_ptr->node_bitmap);
}
if (reboot_features)
feature_node_bitmap = node_features_g_get_node_bitmap();
if (feature_node_bitmap)
bit_and(feature_node_bitmap, non_feature_node_bitmap);
if (!feature_node_bitmap ||
(bit_ffs(feature_node_bitmap) == -1)) {
/* No KNL nodes to reboot */
FREE_NULL_BITMAP(feature_node_bitmap);
} else {
bit_and_not(non_feature_node_bitmap,
feature_node_bitmap);
if (bit_ffs(non_feature_node_bitmap) == -1) {
/* No non-KNL nodes to reboot */
FREE_NULL_BITMAP(non_feature_node_bitmap);
}
}
}
if (feature_node_bitmap) {
/*
* Update node features now to avoid a race where a
* second job may request that this node gets rebooted
* (in order to get a new active feature) *after* the
* first reboot request but *before* slurmd actually
* starts up. If that would happen then the second job
* would stay configuring forever, waiting for the node
* to reboot even though the node already rebooted.
*
* By setting the node's active features right now, any
* other job that wants that active feature can be
* scheduled onto this node, which will also already be
* rebooting, so those other jobs won't send additional
* reboot requests to change the feature.
*/
_set_reboot_features_active(feature_node_bitmap,
reboot_features);
}
	/*
	 * Assume the power save thread will handle the boot if any of the
	 * nodes are cloud nodes. In the KNL/features case, the node is
	 * being rebooted rather than being brought up from a powered
	 * down state.
	 */
if ((boot_node_bitmap == NULL) ||
bit_overlap_any(cloud_node_bitmap, job_ptr->node_bitmap)) {
/* launch_job() when all nodes have booted */
if (bit_overlap_any(power_down_node_bitmap,
job_ptr->node_bitmap) ||
bit_overlap_any(booting_node_bitmap,
job_ptr->node_bitmap)) {
/* Reset job start time when nodes are booted */
job_state_set_flag(job_ptr, (JOB_CONFIGURING |
JOB_POWER_UP_NODE));
job_ptr->wait_all_nodes = 1;
}
goto cleanup;
}
/* Reset job start time when nodes are booted */
job_state_set_flag(job_ptr, (JOB_CONFIGURING | JOB_POWER_UP_NODE));
/* launch_job() when all nodes have booted */
job_ptr->wait_all_nodes = 1;
/* Modify state information for all nodes, KNL and others */
for (int i = 0; (node_ptr = next_node_bitmap(boot_node_bitmap, &i));
i++) {
if (protocol_version > node_ptr->protocol_version)
protocol_version = node_ptr->protocol_version;
if (IS_NODE_POWERED_DOWN(node_ptr)) {
node_ptr->node_state &= (~NODE_STATE_POWERED_DOWN);
clusteracct_storage_g_node_up(acct_db_conn, node_ptr,
now);
}
node_ptr->node_state |= NODE_STATE_NO_RESPOND;
node_ptr->node_state |= NODE_STATE_POWERING_UP;
bit_clear(avail_node_bitmap, i);
bit_clear(power_down_node_bitmap, i);
bit_set(booting_node_bitmap, i);
node_ptr->boot_req_time = now;
}
if (feature_node_bitmap) {
/* Reboot nodes to change KNL NUMA and/or MCDRAM mode */
_do_reboot(power_save_on, feature_node_bitmap, job_ptr,
reboot_features, protocol_version);
bit_and_not(boot_node_bitmap, feature_node_bitmap);
}
if (non_feature_node_bitmap) {
/* Reboot nodes with no feature changes */
_do_reboot(power_save_on, non_feature_node_bitmap, job_ptr,
NULL, protocol_version);
bit_and_not(boot_node_bitmap, non_feature_node_bitmap);
}
if (job_ptr->reboot) {
/* Reboot the remaining nodes blindly as per direct request */
_do_reboot(power_save_on, boot_node_bitmap, job_ptr, NULL,
protocol_version);
}
cleanup:
xfree(reboot_features);
FREE_NULL_BITMAP(boot_node_bitmap);
FREE_NULL_BITMAP(non_feature_node_bitmap);
FREE_NULL_BITMAP(feature_node_bitmap);
}
/*
* Deferring this setup ensures that all calling paths into select_nodes()
* have had a chance to update all appropriate job records.
* This works since select_nodes() will always be holding the job_write lock,
* and thus this new thread will be blocked waiting to acquire job_write
* until that has completed.
* For HetJobs in particular, this is critical to ensure that all components
* have been setup properly before prolog_slurmctld actually runs.
*/
static void *_start_prolog_slurmctld_thread(void *x)
{
slurmctld_lock_t node_write_lock = {
.conf = READ_LOCK, .job = WRITE_LOCK,
.node = WRITE_LOCK, .fed = READ_LOCK };
uint32_t *job_id = (uint32_t *) x;
job_record_t *job_ptr;
lock_slurmctld(node_write_lock);
if (!(job_ptr = find_job_record(*job_id))) {
error("%s: missing JobId=%u", __func__, *job_id);
unlock_slurmctld(node_write_lock);
return NULL;
}
prep_g_prolog_slurmctld(job_ptr);
/*
* No async prolog_slurmctld threads running, so decrement now to move
* on with the job launch.
*/
if (!job_ptr->prep_prolog_cnt) {
debug2("%s: no async prolog_slurmctld running", __func__);
prolog_running_decr(job_ptr);
}
unlock_slurmctld(node_write_lock);
xfree(job_id);
return NULL;
}
/*
* prolog_slurmctld - execute the prolog_slurmctld for a job that has just
* been allocated resources.
* IN job_ptr - pointer to job that will be initiated
*/
extern void prolog_slurmctld(job_record_t *job_ptr)
{
uint32_t *job_id;
xassert(verify_lock(JOB_LOCK, WRITE_LOCK));
if (!prep_g_required(PREP_PROLOG_SLURMCTLD))
return;
job_ptr->details->prolog_running++;
job_state_set_flag(job_ptr, JOB_CONFIGURING);
job_id = xmalloc(sizeof(*job_id));
*job_id = job_ptr->job_id;
slurm_thread_create_detached(_start_prolog_slurmctld_thread, job_id);
}
/* Decrement a job's prolog_running counter and launch the job if zero */
extern void prolog_running_decr(job_record_t *job_ptr)
{
xassert(verify_lock(JOB_LOCK, WRITE_LOCK));
xassert(verify_lock(FED_LOCK, READ_LOCK));
if (!job_ptr)
return;
if (job_ptr->details && job_ptr->details->prolog_running &&
(--job_ptr->details->prolog_running > 0))
return;
	/* A federated job notified the origin that it is to be requeued;
	 * we need to wait for this job to be cancelled. */
if (job_ptr->job_state & JOB_REQUEUE_FED)
return;
if (IS_JOB_CONFIGURING(job_ptr) && test_job_nodes_ready(job_ptr)) {
info("%s: Configuration for %pJ is complete",
__func__, job_ptr);
job_config_fini(job_ptr);
if (job_ptr->batch_flag &&
(IS_JOB_RUNNING(job_ptr) || IS_JOB_SUSPENDED(job_ptr))) {
launch_job(job_ptr);
}
}
}
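/* list_for_each() callback to deep copy one job_feature_t record */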
static int _foreach_feature_list_copy(void *x, void *arg)
{
job_feature_t *feat_src = x, *feat_dest;
list_t **feature_list_dest = arg;
feat_dest = xmalloc(sizeof(job_feature_t));
memcpy(feat_dest, feat_src, sizeof(job_feature_t));
if (feat_src->node_bitmap_active)
feat_dest->node_bitmap_active =
bit_copy(feat_src->node_bitmap_active);
if (feat_src->node_bitmap_avail)
feat_dest->node_bitmap_avail =
bit_copy(feat_src->node_bitmap_avail);
feat_dest->name = xstrdup(feat_src->name);
list_append(*feature_list_dest, feat_dest);
return 0;
}
/*
 * Copy a job's feature list
 * IN feature_list_src - a job's feature_list
 * RET copy of feature_list_src, must be freed by caller
 */
extern list_t *feature_list_copy(list_t *feature_list_src)
{
list_t *feature_list_dest = NULL;
if (!feature_list_src)
return feature_list_dest;
feature_list_dest = list_create(feature_list_delete);
(void) list_for_each(feature_list_src,
_foreach_feature_list_copy,
&feature_list_dest);
return feature_list_dest;
}
/*
* IN/OUT convert_to_matching_or -
* If at least one changeable feature is requested, then all the nodes
* in the job allocation need to match the same feature set.
*
* As an input: if true, then mark all '|' operators as matching OR, and also
* imply that it is surrounded by brackets by setting bracket=1 for all the
* features except the last one. The AND operators are still treated as normal
* AND (not XAND), as if they were surrounded by parentheses within the
* brackets.
*
 * As an output: if multiple changeable features are requested and
 * OR ('|') was requested, then set this to true.
*
* This is needed for the scheduling logic with parentheses and matching OR.
*/
static int _feature_string2list(char *features, char *debug_str,
list_t **feature_list,
bool *convert_to_matching_or)
{
int rc = SLURM_SUCCESS;
int bracket = 0, count = 0, i, paren = 0;
int brack_set_count = 0;
char *tmp_requested;
char *str_ptr, *feature = NULL;
bool has_changeable = false;
bool has_or = false;
bool has_asterisk = false;
xassert(feature_list);
	/* Use of comma separators is a common error. Replace them with '&' */
while ((str_ptr = strstr(features, ",")))
str_ptr[0] = '&';
tmp_requested = xstrdup(features);
*feature_list = list_create(feature_list_delete);
for (i = 0; ; i++) {
job_feature_t *feat;
if (tmp_requested[i] == '*') {
tmp_requested[i] = '\0';
count = strtol(&tmp_requested[i+1], &str_ptr, 10);
if (!bracket)
has_asterisk = true;
if ((feature == NULL) || (count <= 0) || (paren != 0)) {
verbose("%s constraint invalid, '*' must be requested with a positive integer, and after a feature or parentheses: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
i = str_ptr - tmp_requested - 1;
} else if (tmp_requested[i] == '&') {
tmp_requested[i] = '\0';
if (feature == NULL) {
verbose("%s constraint requested '&' without a feature: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
feat = xmalloc(sizeof(job_feature_t));
feat->bracket = *convert_to_matching_or ? 1 : bracket;
feat->name = xstrdup(feature);
feat->changeable = node_features_g_changeable_feature(
feature);
feat->count = count;
feat->paren = paren;
has_changeable |= feat->changeable;
if (paren || *convert_to_matching_or)
feat->op_code = FEATURE_OP_AND;
else if (bracket)
feat->op_code = FEATURE_OP_XAND;
else
feat->op_code = FEATURE_OP_AND;
list_append(*feature_list, feat);
feature = NULL;
count = 0;
} else if (tmp_requested[i] == '|') {
bool changeable;
tmp_requested[i] = '\0';
if (feature == NULL) {
verbose("%s constraint requested '|' without a feature: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
changeable = node_features_g_changeable_feature(
feature);
feat = xmalloc(sizeof(job_feature_t));
feat->bracket = *convert_to_matching_or ? 1 : bracket;
feat->name = xstrdup(feature);
feat->changeable = changeable;
feat->count = count;
feat->paren = paren;
has_changeable |= changeable;
has_or = true;
/*
* The if-else-if is like this for priority:
* - paren is highest priority
* - then bracket
* - then outside of paren/bracket
*/
if (paren && !(*convert_to_matching_or))
feat->op_code = FEATURE_OP_OR;
else if (bracket || changeable ||
(*convert_to_matching_or))
feat->op_code = FEATURE_OP_MOR;
else
feat->op_code = FEATURE_OP_OR;
list_append(*feature_list, feat);
feature = NULL;
count = 0;
} else if (tmp_requested[i] == '[') {
tmp_requested[i] = '\0';
if ((feature != NULL) || bracket || paren) {
verbose("%s constraint has imbalanced brackets: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
bracket++;
brack_set_count++;
if (brack_set_count > 1) {
verbose("%s constraint has more than one set of brackets: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
} else if (tmp_requested[i] == ']') {
tmp_requested[i] = '\0';
if ((feature == NULL) || (bracket == 0) || paren) {
verbose("%s constraint has imbalanced brackets: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
bracket--;
} else if (tmp_requested[i] == '(') {
tmp_requested[i] = '\0';
if ((feature != NULL) || paren) {
verbose("%s constraint has imbalanced parentheses: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
paren++;
} else if (tmp_requested[i] == ')') {
tmp_requested[i] = '\0';
if ((feature == NULL) || (paren == 0)) {
verbose("%s constraint has imbalanced parentheses: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
paren--;
} else if (tmp_requested[i] == '\0') {
if (feature) {
feat = xmalloc(sizeof(job_feature_t));
feat->bracket = bracket;
feat->name = xstrdup(feature);
feat->changeable = node_features_g_changeable_feature(
feature);
feat->count = count;
feat->paren = paren;
feat->op_code = FEATURE_OP_END;
list_append(*feature_list, feat);
has_changeable |= feat->changeable;
}
break;
} else if (feature == NULL) {
feature = &tmp_requested[i];
} else if (i && (tmp_requested[i - 1] == '\0')) {
			/* ')' and ']' must be followed by an operator, not a token. */
verbose("%s constraint has an unexpected character: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
}
if (bracket != 0) {
verbose("%s constraint has unbalanced brackets: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
if (paren != 0) {
verbose("%s constraint has unbalanced parenthesis: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
if (has_asterisk && (list_count(*feature_list) > 1)) {
verbose("%s constraint has '*' outside of brackets with more than one feature: %s",
debug_str, features);
rc = ESLURM_INVALID_FEATURE;
goto fini;
}
*convert_to_matching_or = (has_changeable && has_or);
fini:
if (rc != SLURM_SUCCESS) {
FREE_NULL_LIST(*feature_list);
info("%s invalid constraint: %s",
debug_str, features);
}
xfree(tmp_requested);
return rc;
}
/*
* build_feature_list - Translate a job's feature string into a feature_list
* NOTE: This function is also used for reservations if is_reservation is true
* and for job_desc_msg_t if job_id == 0
* IN details->features
* OUT details->feature_list
* RET error code
*/
extern int build_feature_list(job_record_t *job_ptr, bool prefer,
bool is_reservation)
{
job_details_t *detail_ptr = job_ptr->details;
list_t **feature_list;
int rc;
int feature_err;
bool convert_to_matching_or = false;
valid_feature_t valid_feature = {
.rc = SLURM_SUCCESS,
};
/* no hard constraints */
if (!detail_ptr || (!detail_ptr->features && !detail_ptr->prefer)) {
if (job_ptr->batch_features)
return ESLURM_BATCH_CONSTRAINT;
return SLURM_SUCCESS;
}
if (prefer) {
valid_feature.features = detail_ptr->prefer;
feature_list = &detail_ptr->prefer_list;
feature_err = ESLURM_INVALID_PREFER;
} else {
valid_feature.features = detail_ptr->features;
feature_list = &detail_ptr->feature_list;
feature_err = ESLURM_INVALID_FEATURE;
}
	if (!valid_feature.features) /* The other constraint is non-NULL. */
return SLURM_SUCCESS;
if (*feature_list) /* already processed */
return SLURM_SUCCESS;
if (is_reservation)
valid_feature.debug_str = xstrdup("Reservation");
else if (!job_ptr->job_id)
valid_feature.debug_str = xstrdup("Job specs");
else
valid_feature.debug_str =
xstrdup_printf("JobId=%u", job_ptr->job_id);
valid_feature.can_reboot =
node_features_g_user_update(job_ptr->user_id);
rc = _feature_string2list(valid_feature.features,
valid_feature.debug_str,
feature_list, &convert_to_matching_or);
if (rc != SLURM_SUCCESS) {
rc = feature_err;
goto fini;
}
if (convert_to_matching_or) {
char *str = NULL;
list_t *feature_sets;
/*
* Restructure the list into a format of AND'ing features in
* parentheses and matching OR each parentheses together. The
* current scheduling logic does not know how to handle matching
* OR inside of parentheses; however, it does know how to handle
* matching OR outside of parentheses, so we restructure the
* feature list to a format the scheduling logic understands.
* This is needed for changeable features which need all nodes
* in the job allocation to match the same feature set, so they
* cannot have any boolean OR in the feature list.
*
* For example, "(a|b)&c" becomes "(a&c)|(b&c)"
*
* Restructure only the feature list; leave the original
* constraint expression intact.
*/
feature_sets = job_features_list2feature_sets(
valid_feature.features,
*feature_list,
false);
list_for_each(feature_sets, job_features_set2str, &str);
FREE_NULL_LIST(feature_sets);
FREE_NULL_LIST(*feature_list);
rc = _feature_string2list(str, valid_feature.debug_str,
feature_list,
&convert_to_matching_or);
if (rc != SLURM_SUCCESS) {
/*
* Something went wrong - we should have caught this
* error the first time we called _feature_string2list.
*/
error("%s: Problem converting feature string %s to matching OR list",
__func__, str);
rc = feature_err;
xfree(str);
goto fini;
}
log_flag(NODE_FEATURES, "%s: Converted %sfeature list:'%s' to matching OR:'%s'",
__func__, prefer ? "prefer " : "",
valid_feature.features, str);
xfree(str);
}
if (job_ptr->batch_features) {
detail_ptr->feature_list_use = *feature_list;
detail_ptr->features_use = valid_feature.features;
rc = _valid_batch_features(job_ptr, valid_feature.can_reboot);
detail_ptr->feature_list_use = NULL;
detail_ptr->features_use = NULL;
if (rc != SLURM_SUCCESS)
goto fini;
}
valid_feature.feature_list = *feature_list;
rc = _valid_feature_list(job_ptr, &valid_feature, is_reservation);
if (rc != SLURM_SUCCESS) {
rc = feature_err;
goto fini;
}
fini:
xfree(valid_feature.debug_str);
return rc;
}
/*
* Delete a record from a job's feature_list
*/
extern void feature_list_delete(void *x)
{
job_feature_t *feature_ptr = (job_feature_t *)x;
xfree(feature_ptr->name);
FREE_NULL_BITMAP(feature_ptr->node_bitmap_active);
FREE_NULL_BITMAP(feature_ptr->node_bitmap_avail);
xfree(feature_ptr);
}
static int _match_job_feature(void *x, void *key)
{
job_feature_t *feat = (job_feature_t *) x;
char *tok = (char *) key;
if (!xstrcmp(feat->name, tok)) /* Found matching feature name */
return 1;
return 0;
}
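/*
 * Validate that every feature name in job_ptr->batch_features appears in
 * the job's feature list and is usable on some node. When '|' (OR) is
 * used, it is sufficient for one component to be valid.
 */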
static int _valid_batch_features(job_record_t *job_ptr, bool can_reboot)
{
char *tmp, *tok, *save_ptr = NULL;
int rc = SLURM_SUCCESS;
bool have_or = false, success_or = false;
if (!job_ptr->batch_features)
return SLURM_SUCCESS;
if (!job_ptr->details || !job_ptr->details->feature_list_use)
return ESLURM_BATCH_CONSTRAINT;
if (strchr(job_ptr->batch_features, '|'))
have_or = true;
tmp = xstrdup(job_ptr->batch_features);
tok = strtok_r(tmp, "&|", &save_ptr);
while (tok) {
if (!list_find_first(job_ptr->details->feature_list_use,
_match_job_feature, tok)) {
rc = ESLURM_BATCH_CONSTRAINT;
break;
}
rc = _valid_node_feature(tok, can_reboot);
if (have_or) {
if (rc == SLURM_SUCCESS)
success_or = true;
/* Ignore failure on some OR components */
} else if (rc != SLURM_SUCCESS) {
rc = ESLURM_BATCH_CONSTRAINT;
break;
}
tok = strtok_r(NULL, "&|", &save_ptr);
}
xfree(tmp);
if (have_or && success_or)
return SLURM_SUCCESS;
return rc;
}
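/*
 * list_for_each() callback to validate one feature record: verify the
 * feature exists on some node (unless validation is disabled) and
 * enforce the count usage rules for the XAND and MOR operators.
 */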
static int _foreach_valid_feature_list(void *x, void *arg)
{
job_feature_t *feat_ptr = x;
valid_feature_t *valid_feature = arg;
if ((feat_ptr->op_code == FEATURE_OP_MOR) ||
(feat_ptr->op_code == FEATURE_OP_XAND)) {
valid_feature->bracket = feat_ptr->paren + 1;
}
	/* Track the current parenthesis depth */
	valid_feature->paren = feat_ptr->paren;
if ((valid_feature->rc == SLURM_SUCCESS) &&
!valid_feature->skip_validation) {
valid_feature->rc =
_valid_node_feature(feat_ptr->name,
valid_feature->can_reboot);
if (valid_feature->rc != SLURM_SUCCESS)
verbose("%s feature %s is not usable on any node: %s",
valid_feature->debug_str, feat_ptr->name,
valid_feature->features);
}
if ((feat_ptr->op_code == FEATURE_OP_XAND) && !feat_ptr->count) {
verbose("%s feature %s invalid, count must be used with XAND: %s",
valid_feature->debug_str, feat_ptr->name,
valid_feature->features);
valid_feature->rc = ESLURM_INVALID_FEATURE;
}
if ((feat_ptr->op_code == FEATURE_OP_MOR) && feat_ptr->count) {
verbose("%s feature %s invalid, count must not be used with MOR: %s",
valid_feature->debug_str, feat_ptr->name,
valid_feature->features);
valid_feature->rc = ESLURM_INVALID_FEATURE;
}
/* In brackets, outside of paren */
if ((valid_feature->bracket > valid_feature->paren) &&
((feat_ptr->op_code != FEATURE_OP_MOR) &&
(feat_ptr->op_code != FEATURE_OP_XAND))) {
if (valid_feature->has_xand && !feat_ptr->count) {
valid_feature->rc = ESLURM_INVALID_FEATURE;
verbose("%s feature %s invalid, count must be used with XAND: %s",
valid_feature->debug_str, feat_ptr->name,
valid_feature->features);
}
if (valid_feature->has_mor && feat_ptr->count) {
valid_feature->rc = ESLURM_INVALID_FEATURE;
verbose("%s feature %s invalid, count must not be used with MOR: %s",
valid_feature->debug_str, feat_ptr->name,
valid_feature->features);
}
valid_feature->bracket = 0;
valid_feature->has_xand = false;
valid_feature->has_mor = false;
}
if (feat_ptr->op_code == FEATURE_OP_XAND)
valid_feature->has_xand = true;
if (feat_ptr->op_code == FEATURE_OP_MOR)
valid_feature->has_mor = true;
return 0;
}
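/*
 * Validate a job's feature list, honoring the ignore_prefer_validation
 * and ignore_constraint_validation SchedulerParameters options.
 */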
static int _valid_feature_list(job_record_t *job_ptr,
valid_feature_t *valid_feature,
bool is_reservation)
{
static time_t sched_update = 0;
static bool ignore_prefer_val = false, ignore_constraint_val = false;
bool is_prefer_list, skip_validation;
if (!valid_feature->feature_list) {
debug2("%s feature list is empty",
valid_feature->debug_str);
return valid_feature->rc;
}
if (sched_update != slurm_conf.last_update) {
sched_update = slurm_conf.last_update;
if (xstrcasestr(slurm_conf.sched_params,
"ignore_prefer_validation"))
ignore_prefer_val = true;
else
ignore_prefer_val = false;
if (xstrcasestr(slurm_conf.sched_params,
"ignore_constraint_validation"))
ignore_constraint_val = true;
else
ignore_constraint_val = false;
}
is_prefer_list = (valid_feature->feature_list ==
job_ptr->details->prefer_list);
skip_validation = (is_prefer_list && ignore_prefer_val) ||
(!is_prefer_list && ignore_constraint_val);
valid_feature->skip_validation = skip_validation;
(void) list_for_each(valid_feature->feature_list,
_foreach_valid_feature_list,
valid_feature);
if (valid_feature->rc == SLURM_SUCCESS) {
debug("%s feature list: %s",
valid_feature->debug_str, valid_feature->features);
} else {
if (is_reservation) {
info("Reservation has invalid feature list: %s",
valid_feature->features);
} else {
if (valid_feature->can_reboot)
info("%s has invalid feature list: %s",
valid_feature->debug_str,
valid_feature->features);
else
info("%s has invalid feature list (%s) or the features are not active and this user cannot reboot to update node features",
valid_feature->debug_str,
valid_feature->features);
}
}
return valid_feature->rc;
}
static int _find_feature_in_list(void *x, void *arg)
{
node_feature_t *feature_ptr = x;
char *feature = arg;
if (!xstrcmp(feature_ptr->name, feature))
return 1;
return 0;
}
/* Validate that a job's feature is available on some node(s) */
static int _valid_node_feature(char *feature, bool can_reboot)
{
int rc = ESLURM_INVALID_FEATURE;
list_t *use_list =
can_reboot ? avail_feature_list : active_feature_list;
if (list_find_first(use_list, _find_feature_in_list, feature))
rc = SLURM_SUCCESS;
return rc;
}
#define REBUILD_PENDING SLURM_BIT(0)
#define REBUILD_ACTIVE SLURM_BIT(1)
typedef struct {
uint16_t flags;
job_record_t *job_ptr;
} rebuild_args_t;
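/*
 * list_for_each() callback to append one partition name to
 * job_ptr->partition. For pending jobs, also reset part_ptr to the first
 * partition in the list; for active jobs, skip the in-use partition
 * since it was already placed first.
 */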
static int _build_partition_string(void *object, void *arg)
{
part_record_t *part_ptr = object;
rebuild_args_t *args = arg;
uint16_t flags = args->flags;
job_record_t *job_ptr = args->job_ptr;
if (flags & REBUILD_PENDING) {
job_ptr->part_ptr = part_ptr;
flags &= ~(REBUILD_PENDING);
}
if ((flags & REBUILD_ACTIVE) && (part_ptr == job_ptr->part_ptr))
return SLURM_SUCCESS; /* already added */
if (job_ptr->partition)
xstrcat(job_ptr->partition, ",");
xstrcat(job_ptr->partition, part_ptr->name);
return SLURM_SUCCESS;
}
/* If a job can run in multiple partitions, when it is started we want to
* put the name of the partition used _first_ in that list. When slurmctld
* restarts, that will be used to set the job's part_ptr and that will be
* reported to squeue. We leave all of the partitions in the list though,
* so the job can be requeued and have access to them all. */
extern void rebuild_job_part_list(job_record_t *job_ptr)
{
rebuild_args_t arg = {
.job_ptr = job_ptr,
};
xfree(job_ptr->partition);
if (!job_ptr->part_ptr_list) {
job_ptr->partition = xstrdup(job_ptr->part_ptr->name);
last_job_update = time(NULL);
return;
}
if (IS_JOB_RUNNING(job_ptr) || IS_JOB_SUSPENDED(job_ptr)) {
arg.flags |= REBUILD_ACTIVE;
job_ptr->partition = xstrdup(job_ptr->part_ptr->name);
} else if (IS_JOB_PENDING(job_ptr))
arg.flags |= REBUILD_PENDING;
list_for_each(job_ptr->part_ptr_list, _build_partition_string, &arg);
last_job_update = time(NULL);
}
/* cleanup_completing()
 *
 * Clear the JOB_COMPLETING flag and requeue the job if there
 * is a pending request for it. This function assumes the caller
 * holds the appropriate locks on the job record.
 */
void cleanup_completing(job_record_t *job_ptr, bool requeue)
{
time_t delay;
if (job_ptr->epilog_running || job_ptr->node_cnt)
return;
log_flag(TRACE_JOBS, "%s: %pJ", __func__, job_ptr);
delay = last_job_update - job_ptr->end_time;
if (delay > 60) {
info("%s: %pJ completion process took %ld seconds",
__func__, job_ptr, (long) delay);
}
license_job_return(job_ptr);
gs_job_fini(job_ptr);
delete_step_records(job_ptr);
job_state_unset_flag(job_ptr, JOB_COMPLETING);
job_hold_requeue(job_ptr);
/*
* Clear alloc tres fields after a requeue. job_set_alloc_tres will
* clear the fields when the job is pending and not completing.
*/
if (IS_JOB_PENDING(job_ptr))
job_set_alloc_tres(job_ptr, false);
/* Job could be pending if the job was requeued due to a node failure */
if (IS_JOB_COMPLETED(job_ptr))
fed_mgr_job_complete(job_ptr, job_ptr->exit_code,
job_ptr->start_time);
if (requeue)
batch_requeue_fini(job_ptr);
}
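/* Spawn the main scheduling agent thread, if not already running */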
void main_sched_init(void)
{
if (thread_id_sched)
return;
slurm_thread_create(&thread_id_sched, _sched_agent, NULL);
}
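/* Signal the scheduling agent thread to exit and wait for it */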
void main_sched_fini(void)
{
if (!thread_id_sched)
return;
slurm_mutex_lock(&sched_mutex);
slurm_cond_broadcast(&sched_cond);
slurm_mutex_unlock(&sched_mutex);
slurm_thread_join(thread_id_sched);
}