| /*****************************************************************************\ |
| * job_scheduler.c - manage the scheduling of pending jobs in priority order |
| * Note there is a global job list (job_list) |
| ***************************************************************************** |
| * Copyright (C) 2002-2006 The Regents of the University of California. |
| * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). |
| * Written by Morris Jette <jette1@llnl.gov> |
| * UCRL-CODE-226842. |
| * |
| * This file is part of SLURM, a resource management program. |
| * For details, see <http://www.llnl.gov/linux/slurm/>. |
| * |
| * SLURM is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with SLURM; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #ifdef HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #include <errno.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <unistd.h> |
| |
| #include "src/common/list.h" |
| #include "src/common/macros.h" |
| #include "src/common/node_select.h" |
| #include "src/common/xassert.h" |
| #include "src/common/xstring.h" |
| |
| #include "src/slurmctld/agent.h" |
| #include "src/slurmctld/locks.h" |
| #include "src/slurmctld/node_scheduler.h" |
| #include "src/slurmctld/slurmctld.h" |
| #include "src/slurmctld/srun_comm.h" |
| |
| #define MAX_RETRIES 10 |
| |
/* An entry in the scheduling queue: one pending job and its priority */
struct job_queue {
	int priority;
	struct job_record *job_ptr;
};
| |
| static int _build_job_queue(struct job_queue **job_queue); |
| static void _launch_job(struct job_record *job_ptr); |
| static void _sort_job_queue(struct job_queue *job_queue, |
| int job_queue_size); |
| static char **_xduparray(uint16_t size, char ** array); |
| |
| /* |
| * _build_job_queue - build (non-priority ordered) list of pending jobs |
| * OUT job_queue - pointer to job queue |
| * RET number of entries in job_queue |
| * global: job_list - global list of job records |
| * NOTE: the buffer at *job_queue must be xfreed by the caller |
| */ |
| static int _build_job_queue(struct job_queue **job_queue) |
| { |
| ListIterator job_iterator; |
| struct job_record *job_ptr = NULL; |
| int job_buffer_size, job_queue_size; |
| struct job_queue *my_job_queue; |
| |
	/* Build a list of the pending jobs */
| job_buffer_size = job_queue_size = 0; |
| job_queue[0] = my_job_queue = NULL; |
| |
| job_iterator = list_iterator_create(job_list); |
| while ((job_ptr = (struct job_record *) list_next(job_iterator))) { |
| xassert (job_ptr->magic == JOB_MAGIC); |
| if ((job_ptr->job_state != JOB_PENDING) || |
| (job_ptr->job_state & JOB_COMPLETING) || |
| (job_ptr->priority == 0)) /* held */ |
| continue; |
		if (!job_independent(job_ptr))	/* waiting for another job */
| continue; |
| if (job_buffer_size <= job_queue_size) { |
| job_buffer_size += 50; |
| xrealloc(my_job_queue, job_buffer_size * |
| sizeof(struct job_queue)); |
| } |
| my_job_queue[job_queue_size].job_ptr = job_ptr; |
| my_job_queue[job_queue_size].priority = job_ptr->priority; |
| job_queue_size++; |
| } |
| list_iterator_destroy(job_iterator); |
| |
| job_queue[0] = my_job_queue; |
| return job_queue_size; |
| } |
| |
| /* |
| * job_is_completing - Determine if jobs are in the process of completing. |
| * RET - True of any job is in the process of completing |
| * NOTE: This function can reduce resource fragmentation, which is a |
| * critical issue on Elan interconnect based systems. |
| */ |
| extern bool job_is_completing(void) |
| { |
| bool completing = false; |
| ListIterator job_iterator; |
| struct job_record *job_ptr = NULL; |
| time_t recent = time(NULL) - (slurmctld_conf.kill_wait + 2); |
| |
| job_iterator = list_iterator_create(job_list); |
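	/* Only count jobs that entered the completing state within the
	 * last (kill_wait + 2) seconds; older ones are ignored */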
| while ((job_ptr = (struct job_record *) list_next(job_iterator))) { |
| if ((job_ptr->job_state & JOB_COMPLETING) && |
| (job_ptr->end_time >= recent)) { |
| completing = true; |
| break; |
| } |
| } |
| list_iterator_destroy(job_iterator); |
| |
| return completing; |
| } |
| |
| /* |
| * set_job_elig_time - set the eligible time for pending jobs once their |
| * dependencies are lifted (in job->details->begin_time) |
| */ |
| extern void set_job_elig_time(void) |
| { |
| struct job_record *job_ptr = NULL; |
| struct part_record *part_ptr = NULL; |
| ListIterator job_iterator; |
| slurmctld_lock_t job_write_lock = |
| { READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK }; |
| |
| lock_slurmctld(job_write_lock); |
| job_iterator = list_iterator_create(job_list); |
| while ((job_ptr = (struct job_record *) list_next(job_iterator))) { |
| part_ptr = job_ptr->part_ptr; |
| if (job_ptr->job_state != JOB_PENDING) |
| continue; |
| if (part_ptr == NULL) |
| continue; |
| if ((job_ptr->details == NULL) || job_ptr->details->begin_time) |
| continue; |
| if (part_ptr->state_up == 0) |
| continue; |
| if ((job_ptr->time_limit != NO_VAL) && |
| (job_ptr->time_limit > part_ptr->max_time)) |
| continue; |
| if ((job_ptr->details->max_nodes != 0) && |
| ((job_ptr->details->max_nodes < part_ptr->min_nodes) || |
| (job_ptr->details->min_nodes > part_ptr->max_nodes))) |
| continue; |
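		/* begin_time is set as a side effect of job_independent()
		 * once the job's dependencies are satisfied */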
| if (!job_independent(job_ptr)) |
| continue; |
| } |
| list_iterator_destroy(job_iterator); |
| unlock_slurmctld(job_write_lock); |
| } |
| |
| /* |
| * schedule - attempt to schedule all pending jobs |
| * pending jobs for each partition will be scheduled in priority |
| * order until a request fails |
| * RET count of jobs scheduled |
| * global: job_list - global list of job records |
| * last_job_update - time of last update to job table |
| * Note: We re-build the queue every time. Jobs can not only be added |
| * or removed from the queue, but have their priority or partition |
| * changed with the update_job RPC. In general nodes will be in priority |
| * order (by submit time), so the sorting should be pretty fast. |
| */ |
| int schedule(void) |
| { |
| struct job_queue *job_queue; |
| int i, j, error_code, failed_part_cnt, job_queue_size, job_cnt = 0; |
| struct job_record *job_ptr; |
| struct part_record **failed_parts; |
| /* Locks: Read config, write job, write node, read partition */ |
| slurmctld_lock_t job_write_lock = |
| { READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK }; |
| #ifdef HAVE_BG |
| char *ionodes = NULL; |
| char tmp_char[256]; |
| #endif |
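	/* Cached results of the one-time sched_type test below */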
| static bool wiki_sched = false; |
| static bool wiki_sched_test = false; |
| time_t now = time(NULL); |
| |
| DEF_TIMERS; |
| |
| START_TIMER; |
| /* don't bother trying to avoid fragmentation with sched/wiki */ |
| if (!wiki_sched_test) { |
| char *sched_type = slurm_get_sched_type(); |
| if ((strcmp(sched_type, "sched/wiki") == 0) |
| || (strcmp(sched_type, "sched/wiki2") == 0)) |
| wiki_sched = true; |
| xfree(sched_type); |
| wiki_sched_test = true; |
| } |
| |
| lock_slurmctld(job_write_lock); |
| /* Avoid resource fragmentation if important */ |
| if ((!wiki_sched) && switch_no_frag() && job_is_completing()) { |
| unlock_slurmctld(job_write_lock); |
| debug("schedule() returning, some job still completing"); |
| return SLURM_SUCCESS; |
| } |
| debug("Running job scheduler"); |
| job_queue_size = _build_job_queue(&job_queue); |
| if (job_queue_size == 0) { |
| unlock_slurmctld(job_write_lock); |
| return SLURM_SUCCESS; |
| } |
| _sort_job_queue(job_queue, job_queue_size); |
| |
| failed_part_cnt = 0; |
| failed_parts = NULL; |
| for (i = 0; i < job_queue_size; i++) { |
| job_ptr = job_queue[i].job_ptr; |
| if (job_ptr->priority == 0) /* held */ |
| continue; |
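		/* Skip this job if a higher priority job in the same
		 * partition has already failed to obtain resources */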
| for (j = 0; j < failed_part_cnt; j++) { |
| if (failed_parts[j] == job_ptr->part_ptr) |
| break; |
| } |
| if (j < failed_part_cnt) { |
| job_ptr->state_reason = WAIT_PRIORITY; |
| continue; |
| } |
| |
| error_code = select_nodes(job_ptr, false, NULL); |
| if (error_code == ESLURM_NODES_BUSY) { |
| #ifndef HAVE_BG /* keep trying to schedule jobs in partition */ |
			/* While we use static partitioning on Blue Gene,
| * each job can be scheduled independently without |
| * impacting other jobs with different characteristics |
| * (e.g. node-use [virtual or coprocessor] or conn-type |
| * [mesh, torus, or nav]). Because of this we sort and |
| * then try to schedule every pending job. This does |
| * increase the overhead of this job scheduling cycle, |
| * but the only way to effectively avoid this is to |
| * define each SLURM partition as containing a |
| * single Blue Gene job partition type (e.g. |
| * group all Blue Gene job partitions of type |
| * 2x2x2 coprocessor mesh into a single SLURM |
| * partition, say "co-mesh-222") */ |
| xrealloc(failed_parts, |
| (failed_part_cnt + 1) * |
| sizeof(struct part_record *)); |
| failed_parts[failed_part_cnt++] = |
| job_ptr->part_ptr; |
| #endif |
| } else if (error_code == SLURM_SUCCESS) { |
| /* job initiated */ |
| last_job_update = now; |
| #ifdef HAVE_BG |
| select_g_get_jobinfo(job_ptr->select_jobinfo, |
| SELECT_DATA_IONODES, |
| &ionodes); |
			if (ionodes) {
				snprintf(tmp_char, sizeof(tmp_char), "%s[%s]",
					 job_ptr->nodes, ionodes);
			} else {
				snprintf(tmp_char, sizeof(tmp_char), "%s",
					 job_ptr->nodes);
			}
| info("schedule: JobId=%u BPList=%s", |
| job_ptr->job_id, tmp_char); |
| xfree(ionodes); |
| #else |
| info("schedule: JobId=%u NodeList=%s", |
| job_ptr->job_id, job_ptr->nodes); |
| #endif |
| if (job_ptr->batch_flag) |
| _launch_job(job_ptr); |
| else |
| srun_allocate(job_ptr->job_id); |
| job_cnt++; |
| } else if (error_code != |
| ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE) { |
| info("schedule: JobId=%u non-runnable: %s", |
| job_ptr->job_id, |
| slurm_strerror(error_code)); |
| if (!wiki_sched) { |
| last_job_update = now; |
| job_ptr->job_state = JOB_FAILED; |
| job_ptr->exit_code = 1; |
| job_ptr->state_reason = FAIL_BAD_CONSTRAINTS; |
| job_ptr->start_time = job_ptr->end_time = now; |
| job_completion_logger(job_ptr); |
| delete_job_details(job_ptr); |
| } |
| } |
| } |
| |
| xfree(failed_parts); |
| xfree(job_queue); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("schedule"); |
| return job_cnt; |
| } |
| |
| |
| /* |
| * _sort_job_queue - sort job_queue in decending priority order |
| * IN job_queue_size - count of elements in the job queue |
| * IN/OUT job_queue - pointer to sorted job queue |
| */ |
| static void _sort_job_queue(struct job_queue *job_queue, int job_queue_size) |
| { |
| int i, j, top_prio_inx; |
| int tmp_prio, top_prio; |
| struct job_record *tmp_job_ptr; |
| |
| for (i = 0; i < job_queue_size; i++) { |
| top_prio = job_queue[i].priority; |
| top_prio_inx = i; |
| for (j = (i + 1); j < job_queue_size; j++) { |
| if (top_prio >= job_queue[j].priority) |
| continue; |
| top_prio = job_queue[j].priority; |
| top_prio_inx = j; |
| } |
| if (top_prio_inx == i) |
| continue; |
| tmp_prio = job_queue[i].priority; |
| tmp_job_ptr = job_queue[i].job_ptr; |
| job_queue[i].priority = job_queue[top_prio_inx].priority; |
| job_queue[i].job_ptr = job_queue[top_prio_inx].job_ptr; |
| job_queue[top_prio_inx].priority = tmp_prio; |
| job_queue[top_prio_inx].job_ptr = tmp_job_ptr; |
| } |
| } |
| |
| /* _launch_job - send an RPC to a slurmd to initiate a batch job |
| * IN job_ptr - pointer to job that will be initiated |
| */ |
| static void _launch_job(struct job_record *job_ptr) |
| { |
| batch_job_launch_msg_t *launch_msg_ptr; |
| agent_arg_t *agent_arg_ptr; |
| struct node_record *node_ptr; |
| |
| node_ptr = find_first_node_record(job_ptr->node_bitmap); |
| if (node_ptr == NULL) |
| return; |
| |
| /* Initialization of data structures */ |
| launch_msg_ptr = |
| (batch_job_launch_msg_t *) |
| xmalloc(sizeof(batch_job_launch_msg_t)); |
| launch_msg_ptr->job_id = job_ptr->job_id; |
| launch_msg_ptr->step_id = NO_VAL; |
| launch_msg_ptr->uid = job_ptr->user_id; |
| launch_msg_ptr->gid = job_ptr->group_id; |
| launch_msg_ptr->nprocs = job_ptr->details->num_tasks; |
| launch_msg_ptr->nodes = xstrdup(job_ptr->nodes); |
| launch_msg_ptr->overcommit = job_ptr->details->overcommit; |
| |
| if (make_batch_job_cred(launch_msg_ptr)) { |
| error("aborting batch job %u", job_ptr->job_id); |
		/* FIXME: This is a kludge, but this event indicates a serious
		 * problem with OpenSSL and should never happen. We are
		 * too deep into the job launch to gracefully clean up. */
| job_ptr->end_time = time(NULL); |
| job_ptr->time_limit = 0; |
| xfree(launch_msg_ptr->nodes); |
| xfree(launch_msg_ptr); |
| return; |
| } |
| |
| launch_msg_ptr->err = xstrdup(job_ptr->details->err); |
| launch_msg_ptr->in = xstrdup(job_ptr->details->in); |
| launch_msg_ptr->out = xstrdup(job_ptr->details->out); |
| launch_msg_ptr->work_dir = xstrdup(job_ptr->details->work_dir); |
| launch_msg_ptr->argc = job_ptr->details->argc; |
| launch_msg_ptr->argv = _xduparray(job_ptr->details->argc, |
| job_ptr->details->argv); |
| launch_msg_ptr->script = get_job_script(job_ptr); |
| launch_msg_ptr->environment = |
| get_job_env(job_ptr, &launch_msg_ptr->envc); |
| |
| launch_msg_ptr->num_cpu_groups = job_ptr->num_cpu_groups; |
| launch_msg_ptr->cpus_per_node = xmalloc(sizeof(uint32_t) * |
| job_ptr->num_cpu_groups); |
| memcpy(launch_msg_ptr->cpus_per_node, job_ptr->cpus_per_node, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| launch_msg_ptr->cpu_count_reps = xmalloc(sizeof(uint32_t) * |
| job_ptr->num_cpu_groups); |
| memcpy(launch_msg_ptr->cpu_count_reps, job_ptr->cpu_count_reps, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| |
| launch_msg_ptr->select_jobinfo = select_g_copy_jobinfo( |
| job_ptr->select_jobinfo); |
| |
| agent_arg_ptr = (agent_arg_t *) xmalloc(sizeof(agent_arg_t)); |
| agent_arg_ptr->node_count = 1; |
| agent_arg_ptr->retry = 0; |
| agent_arg_ptr->hostlist = hostlist_create(node_ptr->name); |
| agent_arg_ptr->msg_type = REQUEST_BATCH_JOB_LAUNCH; |
| agent_arg_ptr->msg_args = (void *) launch_msg_ptr; |
| |
| /* Launch the RPC via agent */ |
| agent_queue_request(agent_arg_ptr); |
| } |
| |
/*
 * _xduparray - make an xmalloc'ed copy of an array of strings
 * IN size - number of elements in array
 * IN array - array of strings to duplicate
 * RET the copy (NULL if size is zero), which the caller must free
 */
static char **
_xduparray(uint16_t size, char ** array)
| { |
| int i; |
| char ** result; |
| |
| if (size == 0) |
| return (char **)NULL; |
| |
| result = (char **) xmalloc(sizeof(char *) * size); |
| for (i=0; i<size; i++) |
| result[i] = xstrdup(array[i]); |
| return result; |
| } |
| |
| /* |
| * make_batch_job_cred - add a job credential to the batch_job_launch_msg |
| * IN/OUT launch_msg_ptr - batch_job_launch_msg in which job_id, step_id, |
| * uid and nodes have already been set |
| * RET 0 or error code |
| */ |
| extern int make_batch_job_cred(batch_job_launch_msg_t *launch_msg_ptr) |
| { |
| slurm_cred_arg_t cred_arg; |
| |
| cred_arg.jobid = launch_msg_ptr->job_id; |
| cred_arg.stepid = launch_msg_ptr->step_id; |
| cred_arg.uid = launch_msg_ptr->uid; |
| cred_arg.hostlist = launch_msg_ptr->nodes; |
| cred_arg.alloc_lps_cnt = 0; |
| cred_arg.alloc_lps = NULL; |
| |
| launch_msg_ptr->cred = slurm_cred_create(slurmctld_config.cred_ctx, |
| &cred_arg); |
| |
| if (launch_msg_ptr->cred) |
| return SLURM_SUCCESS; |
| error("slurm_cred_create failure for batch job %u", cred_arg.jobid); |
| return SLURM_ERROR; |
| } |