| /*****************************************************************************\ |
| * proc_req.c - process incoming messages to slurmctld |
| * |
| * $Id$ |
| ***************************************************************************** |
| * Copyright (C) 2002-2007 The Regents of the University of California. |
| * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). |
| * Written by Morris Jette <jette@llnl.gov>, Kevin Tew |
| * <tew1@llnl.gov>, et al. |
| * UCRL-CODE-226842. |
| * |
| * This file is part of SLURM, a resource management program. |
| * For details, see <http://www.llnl.gov/linux/slurm/>. |
| * |
| * SLURM is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with SLURM; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #ifdef HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #ifdef WITH_PTHREADS |
| # include <pthread.h> |
| #endif /* WITH_PTHREADS */ |
| |
| #include <errno.h> |
| #include <signal.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| |
| #include <slurm/slurm_errno.h> |
| |
| #include "src/common/checkpoint.h" |
| #include "src/common/daemonize.h" |
| #include "src/common/fd.h" |
| #include "src/common/hostlist.h" |
| #include "src/common/log.h" |
| #include "src/common/macros.h" |
| #include "src/common/node_select.h" |
| #include "src/common/pack.h" |
| #include "src/common/read_config.h" |
| #include "src/common/slurm_auth.h" |
| #include "src/common/slurm_cred.h" |
| #include "src/common/slurm_protocol_api.h" |
| #include "src/common/switch.h" |
| #include "src/common/xstring.h" |
| #include "src/common/forward.h" |
| |
| #include "src/slurmctld/agent.h" |
| #include "src/slurmctld/locks.h" |
| #include "src/slurmctld/proc_req.h" |
| #include "src/slurmctld/read_config.h" |
| #include "src/slurmctld/sched_plugin.h" |
| #include "src/slurmctld/slurmctld.h" |
| #include "src/slurmctld/state_save.h" |
| #include "src/slurmctld/trigger_mgr.h" |
| |
| static void _fill_ctld_conf(slurm_ctl_conf_t * build_ptr); |
| static inline bool _is_super_user(uid_t uid); |
| static void _kill_job_on_msg_fail(uint32_t job_id); |
| static int _launch_batch_step(job_desc_msg_t *job_desc_msg, |
| uid_t uid, uint32_t *step_id); |
| static int _make_step_cred(struct step_record *step_rec, |
| slurm_cred_t *slurm_cred); |
| inline static void _slurm_rpc_allocate_resources(slurm_msg_t * msg, |
| slurm_addr *addr); |
| inline static void _slurm_rpc_checkpoint(slurm_msg_t * msg); |
| inline static void _slurm_rpc_checkpoint_comp(slurm_msg_t * msg); |
| inline static void _slurm_rpc_delete_partition(slurm_msg_t * msg); |
| inline static void _slurm_rpc_complete_job_allocation(slurm_msg_t * msg); |
| inline static void _slurm_rpc_complete_batch_script(slurm_msg_t * msg); |
| inline static void _slurm_rpc_dump_conf(slurm_msg_t * msg); |
| inline static void _slurm_rpc_dump_jobs(slurm_msg_t * msg); |
| inline static void _slurm_rpc_dump_nodes(slurm_msg_t * msg); |
| inline static void _slurm_rpc_dump_partitions(slurm_msg_t * msg); |
| inline static void _slurm_rpc_epilog_complete(slurm_msg_t * msg); |
| inline static void _slurm_rpc_job_ready(slurm_msg_t * msg); |
| inline static void _slurm_rpc_job_step_kill(slurm_msg_t * msg); |
| inline static void _slurm_rpc_job_step_create(slurm_msg_t * msg); |
| inline static void _slurm_rpc_job_step_get_info(slurm_msg_t * msg); |
| inline static void _slurm_rpc_job_will_run(slurm_msg_t * msg); |
| inline static void _slurm_rpc_node_registration(slurm_msg_t * msg); |
| inline static void _slurm_rpc_node_select_info(slurm_msg_t * msg); |
| inline static void _slurm_rpc_job_alloc_info(slurm_msg_t * msg); |
| inline static void _slurm_rpc_job_alloc_info_lite(slurm_msg_t * msg); |
| inline static void _slurm_rpc_ping(slurm_msg_t * msg); |
| inline static void _slurm_rpc_reconfigure_controller(slurm_msg_t * msg); |
| inline static void _slurm_rpc_requeue(slurm_msg_t * msg); |
| inline static void _slurm_rpc_shutdown_controller(slurm_msg_t * msg); |
| inline static void _slurm_rpc_shutdown_controller_immediate(slurm_msg_t * |
| msg); |
| inline static void _slurm_rpc_step_complete(slurm_msg_t * msg); |
| inline static void _slurm_rpc_step_layout(slurm_msg_t * msg); |
| inline static void _slurm_rpc_submit_batch_job(slurm_msg_t * msg); |
| inline static void _slurm_rpc_suspend(slurm_msg_t * msg); |
| inline static void _slurm_rpc_trigger_clear(slurm_msg_t * msg); |
| inline static void _slurm_rpc_trigger_get(slurm_msg_t * msg); |
| inline static void _slurm_rpc_trigger_set(slurm_msg_t * msg); |
| inline static void _slurm_rpc_update_job(slurm_msg_t * msg); |
| inline static void _slurm_rpc_update_node(slurm_msg_t * msg); |
| inline static void _slurm_rpc_update_partition(slurm_msg_t * msg); |
| inline static void _slurm_rpc_end_time(slurm_msg_t * msg); |
| inline static void _update_cred_key(void); |
| |
| |
| /* |
| * slurmctld_req - Process an individual RPC request |
| * IN/OUT msg - the request message; data associated with the message is freed |
| * IN cli_addr - source address of the request |
| */ |
| void slurmctld_req (slurm_msg_t * msg, slurm_addr *cli_addr) |
| { |
| /* Just to validate the cred */ |
| (void) g_slurm_auth_get_uid(msg->auth_cred); |
| if (g_slurm_auth_errno(msg->auth_cred) != SLURM_SUCCESS) { |
| error("Bad authentication: %s", |
| g_slurm_auth_errstr(g_slurm_auth_errno(msg->auth_cred))); |
| return; |
| } |
| |
| switch (msg->msg_type) { |
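| /* Dispatch to the handler for this RPC type (each handler sends its |
| * own response), then free the unpacked request body. */ |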
| case REQUEST_RESOURCE_ALLOCATION: |
| _slurm_rpc_allocate_resources(msg, cli_addr); |
| slurm_free_job_desc_msg(msg->data); |
| break; |
| case REQUEST_BUILD_INFO: |
| _slurm_rpc_dump_conf(msg); |
| slurm_free_last_update_msg(msg->data); |
| break; |
| case REQUEST_JOB_INFO: |
| _slurm_rpc_dump_jobs(msg); |
| slurm_free_job_info_request_msg(msg->data); |
| break; |
| case REQUEST_JOB_END_TIME: |
| _slurm_rpc_end_time(msg); |
| slurm_free_job_alloc_info_msg(msg->data); |
| break; |
| case REQUEST_NODE_INFO: |
| _slurm_rpc_dump_nodes(msg); |
| slurm_free_node_info_request_msg(msg->data); |
| break; |
| case REQUEST_PARTITION_INFO: |
| _slurm_rpc_dump_partitions(msg); |
| slurm_free_part_info_request_msg(msg->data); |
| break; |
| case MESSAGE_EPILOG_COMPLETE: |
| _slurm_rpc_epilog_complete(msg); |
| slurm_free_epilog_complete_msg(msg->data); |
| break; |
| case REQUEST_CANCEL_JOB_STEP: |
| _slurm_rpc_job_step_kill(msg); |
| slurm_free_job_step_kill_msg(msg->data); |
| break; |
| case REQUEST_COMPLETE_JOB_ALLOCATION: |
| _slurm_rpc_complete_job_allocation(msg); |
| slurm_free_complete_job_allocation_msg(msg->data); |
| break; |
| case REQUEST_COMPLETE_BATCH_SCRIPT: |
| _slurm_rpc_complete_batch_script(msg); |
| slurm_free_complete_batch_script_msg(msg->data); |
| break; |
| case REQUEST_JOB_STEP_CREATE: |
| _slurm_rpc_job_step_create(msg); |
| slurm_free_job_step_create_request_msg(msg->data); |
| break; |
| case REQUEST_JOB_STEP_INFO: |
| _slurm_rpc_job_step_get_info(msg); |
| slurm_free_job_step_info_request_msg(msg->data); |
| break; |
| case REQUEST_JOB_WILL_RUN: |
| _slurm_rpc_job_will_run(msg); |
| slurm_free_job_desc_msg(msg->data); |
| break; |
| case MESSAGE_NODE_REGISTRATION_STATUS: |
| _slurm_rpc_node_registration(msg); |
| slurm_free_node_registration_status_msg(msg->data); |
| break; |
| case REQUEST_JOB_ALLOCATION_INFO: |
| _slurm_rpc_job_alloc_info(msg); |
| slurm_free_job_alloc_info_msg(msg->data); |
| break; |
| case REQUEST_JOB_ALLOCATION_INFO_LITE: |
| _slurm_rpc_job_alloc_info_lite(msg); |
| slurm_free_job_alloc_info_msg(msg->data); |
| break; |
| case REQUEST_PING: |
| _slurm_rpc_ping(msg); |
| /* No body to free */ |
| break; |
| case REQUEST_RECONFIGURE: |
| _slurm_rpc_reconfigure_controller(msg); |
| /* No body to free */ |
| break; |
| case REQUEST_CONTROL: |
| _slurm_rpc_shutdown_controller(msg); |
| /* No body to free */ |
| break; |
| case REQUEST_SHUTDOWN: |
| _slurm_rpc_shutdown_controller(msg); |
| slurm_free_shutdown_msg(msg->data); |
| break; |
| case REQUEST_SHUTDOWN_IMMEDIATE: |
| _slurm_rpc_shutdown_controller_immediate(msg); |
| /* No body to free */ |
| break; |
| case REQUEST_SUBMIT_BATCH_JOB: |
| _slurm_rpc_submit_batch_job(msg); |
| slurm_free_job_desc_msg(msg->data); |
| break; |
| case REQUEST_UPDATE_JOB: |
| _slurm_rpc_update_job(msg); |
| slurm_free_job_desc_msg(msg->data); |
| break; |
| case REQUEST_UPDATE_NODE: |
| _slurm_rpc_update_node(msg); |
| slurm_free_update_node_msg(msg->data); |
| break; |
| case REQUEST_UPDATE_PARTITION: |
| _slurm_rpc_update_partition(msg); |
| slurm_free_update_part_msg(msg->data); |
| break; |
| case REQUEST_DELETE_PARTITION: |
| _slurm_rpc_delete_partition(msg); |
| slurm_free_delete_part_msg(msg->data); |
| break; |
| case REQUEST_NODE_REGISTRATION_STATUS: |
| error("slurmctld is talking with itself. " |
| "SlurmctldPort == SlurmdPort"); |
| slurm_send_rc_msg(msg, EINVAL); |
| break; |
| case REQUEST_CHECKPOINT: |
| _slurm_rpc_checkpoint(msg); |
| slurm_free_checkpoint_msg(msg->data); |
| break; |
| case REQUEST_CHECKPOINT_COMP: |
| _slurm_rpc_checkpoint_comp(msg); |
| slurm_free_checkpoint_comp_msg(msg->data); |
| break; |
| case REQUEST_SUSPEND: |
| _slurm_rpc_suspend(msg); |
| slurm_free_suspend_msg(msg->data); |
| break; |
| case REQUEST_JOB_REQUEUE: |
| _slurm_rpc_requeue(msg); |
| slurm_free_job_id_msg(msg->data); |
| break; |
| case REQUEST_JOB_READY: |
| _slurm_rpc_job_ready(msg); |
| slurm_free_job_id_msg(msg->data); |
| break; |
| case REQUEST_NODE_SELECT_INFO: |
| _slurm_rpc_node_select_info(msg); |
| slurm_free_node_select_msg(msg->data); |
| break; |
| case REQUEST_STEP_COMPLETE: |
| _slurm_rpc_step_complete(msg); |
| slurm_free_step_complete_msg(msg->data); |
| break; |
| case REQUEST_STEP_LAYOUT: |
| _slurm_rpc_step_layout(msg); |
| slurm_free_job_step_id_msg(msg->data); |
| break; |
| case REQUEST_TRIGGER_SET: |
| _slurm_rpc_trigger_set(msg); |
| slurm_free_trigger_msg(msg->data); |
| break; |
| case REQUEST_TRIGGER_GET: |
| _slurm_rpc_trigger_get(msg); |
| slurm_free_trigger_msg(msg->data); |
| break; |
| case REQUEST_TRIGGER_CLEAR: |
| _slurm_rpc_trigger_clear(msg); |
| slurm_free_trigger_msg(msg->data); |
| break; |
| default: |
| error("invalid RPC msg_type=%d", msg->msg_type); |
| slurm_send_rc_msg(msg, EINVAL); |
| break; |
| } |
| } |
| |
| /* |
| * _fill_ctld_conf - make a copy of the current slurm configuration |
| * this must be done with locks set because the data can change at other times |
| * OUT conf_ptr - place to copy the configuration to |
| */ |
| void _fill_ctld_conf(slurm_ctl_conf_t * conf_ptr) |
| { |
| slurm_ctl_conf_t *conf = slurm_conf_lock(); |
| |
| conf_ptr->last_update = time(NULL); |
| conf_ptr->authtype = xstrdup(conf->authtype); |
| conf_ptr->backup_addr = xstrdup(conf->backup_addr); |
| conf_ptr->backup_controller = xstrdup(conf->backup_controller); |
| conf_ptr->cache_groups = conf->cache_groups; |
| conf_ptr->checkpoint_type = xstrdup(conf->checkpoint_type); |
| conf_ptr->control_addr = xstrdup(conf->control_addr); |
| conf_ptr->control_machine = xstrdup(conf->control_machine); |
| conf_ptr->disable_root_jobs = conf->disable_root_jobs; |
| conf_ptr->epilog = xstrdup(conf->epilog); |
| conf_ptr->fast_schedule = conf->fast_schedule; |
| conf_ptr->first_job_id = conf->first_job_id; |
| conf_ptr->inactive_limit = conf->inactive_limit; |
| conf_ptr->job_acct_logfile = xstrdup(conf->job_acct_logfile); |
| conf_ptr->job_acct_freq = conf->job_acct_freq; |
| conf_ptr->job_acct_type = xstrdup(conf->job_acct_type); |
| conf_ptr->job_comp_loc = xstrdup(conf->job_comp_loc); |
| conf_ptr->job_comp_type = xstrdup(conf->job_comp_type); |
| conf_ptr->job_credential_private_key = xstrdup(conf-> |
| job_credential_private_key); |
| conf_ptr->job_credential_public_certificate = xstrdup(conf-> |
| job_credential_public_certificate); |
| conf_ptr->job_file_append = conf->job_file_append; |
| conf_ptr->get_env_timeout = conf->get_env_timeout; |
| conf_ptr->kill_wait = conf->kill_wait; |
| conf_ptr->mail_prog = xstrdup(conf->mail_prog); |
| conf_ptr->max_job_cnt = conf->max_job_cnt; |
| conf_ptr->min_job_age = conf->min_job_age; |
| conf_ptr->mpi_default = xstrdup(conf->mpi_default); |
| conf_ptr->msg_timeout = conf->msg_timeout; |
| conf_ptr->next_job_id = get_next_job_id(); |
| conf_ptr->plugindir = xstrdup(conf->plugindir); |
| conf_ptr->plugstack = xstrdup(conf->plugstack); |
| conf_ptr->proctrack_type = xstrdup(conf->proctrack_type); |
| conf_ptr->prolog = xstrdup(conf->prolog); |
| conf_ptr->propagate_prio_process = |
| slurmctld_conf.propagate_prio_process; |
| conf_ptr->propagate_rlimits = xstrdup(conf->propagate_rlimits); |
| conf_ptr->propagate_rlimits_except = xstrdup(conf-> |
| propagate_rlimits_except); |
| conf_ptr->ret2service = conf->ret2service; |
| conf_ptr->schedport = conf->schedport; |
| conf_ptr->schedrootfltr = conf->schedrootfltr; |
| conf_ptr->schedtype = xstrdup(conf->schedtype); |
| conf_ptr->select_type = xstrdup(conf->select_type); |
| conf_ptr->select_type_param = conf->select_type_param; |
| conf_ptr->slurm_user_id = conf->slurm_user_id; |
| conf_ptr->slurm_user_name = xstrdup(conf->slurm_user_name); |
| conf_ptr->slurmctld_debug = conf->slurmctld_debug; |
| conf_ptr->slurmctld_logfile = xstrdup(conf->slurmctld_logfile); |
| conf_ptr->slurmctld_pidfile = xstrdup(conf->slurmctld_pidfile); |
| conf_ptr->slurmctld_port = conf->slurmctld_port; |
| conf_ptr->slurmctld_timeout = conf->slurmctld_timeout; |
| conf_ptr->slurmd_debug = conf->slurmd_debug; |
| conf_ptr->slurmd_logfile = xstrdup(conf->slurmd_logfile); |
| conf_ptr->slurmd_pidfile = xstrdup(conf->slurmd_pidfile); |
| conf_ptr->slurmd_port = conf->slurmd_port; |
| conf_ptr->slurmd_spooldir = xstrdup(conf->slurmd_spooldir); |
| conf_ptr->slurmd_timeout = conf->slurmd_timeout; |
| conf_ptr->slurm_conf = xstrdup(conf->slurm_conf); |
| conf_ptr->state_save_location = xstrdup(conf->state_save_location); |
| conf_ptr->switch_type = xstrdup(conf->switch_type); |
| conf_ptr->task_epilog = xstrdup(conf->task_epilog); |
| conf_ptr->task_prolog = xstrdup(conf->task_prolog); |
| conf_ptr->task_plugin = xstrdup(conf->task_plugin); |
| conf_ptr->task_plugin_param = conf->task_plugin_param; |
| conf_ptr->tmp_fs = xstrdup(conf->tmp_fs); |
| conf_ptr->wait_time = conf->wait_time; |
| conf_ptr->srun_prolog = xstrdup(conf->srun_prolog); |
| conf_ptr->srun_epilog = xstrdup(conf->srun_epilog); |
| conf_ptr->node_prefix = xstrdup(conf->node_prefix); |
| conf_ptr->tree_width = conf->tree_width; |
| conf_ptr->use_pam = conf->use_pam; |
| conf_ptr->unkillable_program = xstrdup(conf->unkillable_program); |
| conf_ptr->unkillable_timeout = conf->unkillable_timeout; |
| |
| slurm_conf_unlock(); |
| return; |
| } |
| |
| /* return true if supplied uid is a super-user: root, self, or SlurmUser */ |
| static inline bool _is_super_user(uid_t uid) |
| { |
| /* A READ lock on the slurmctld config would be ideal here, but |
| * slurm_user_id should be identical to getuid() anyway (slurmctld |
| * runs as SlurmUser) and privileged calls should come from user |
| * root, so we forgo the locking overhead. */ |
| if ( (uid == 0) || |
| (uid == slurmctld_conf.slurm_user_id) || |
| (uid == getuid()) ) |
| return true; |
| else |
| return false; |
| } |
| |
| /* _kill_job_on_msg_fail - The request to create a job record succeeded, |
| * but the reply message to srun failed. Kill the job to avoid |
| * leaving it orphaned. */ |
| static void _kill_job_on_msg_fail(uint32_t job_id) |
| { |
| /* Locks: Write job, write node */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| |
| error("Job allocate response msg send failure, killing JobId=%u", |
| job_id); |
| lock_slurmctld(job_write_lock); |
| job_complete(job_id, 0, false, 0); |
| unlock_slurmctld(job_write_lock); |
| } |
| |
| /* create a credential for a given job step, return error code */ |
| static int _make_step_cred(struct step_record *step_rec, |
| slurm_cred_t *slurm_cred) |
| { |
| slurm_cred_arg_t cred_arg; |
| |
| cred_arg.jobid = step_rec->job_ptr->job_id; |
| cred_arg.stepid = step_rec->step_id; |
| cred_arg.uid = step_rec->job_ptr->user_id; |
| cred_arg.hostlist = step_rec->step_layout->node_list; |
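| /* Include the per-node allocated CPU (LP) counts in the credential only |
| * when the job's nodes are shared; an exclusive allocation reports a |
| * count of zero. */ |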
| if(step_rec->job_ptr->details->shared == 0) |
| cred_arg.alloc_lps_cnt = 0; |
| else |
| cred_arg.alloc_lps_cnt = step_rec->job_ptr->alloc_lps_cnt; |
| if (cred_arg.alloc_lps_cnt > 0) { |
| cred_arg.alloc_lps = xmalloc(cred_arg.alloc_lps_cnt * |
| sizeof(uint32_t)); |
| memcpy(cred_arg.alloc_lps, step_rec->job_ptr->alloc_lps, |
| cred_arg.alloc_lps_cnt*sizeof(uint32_t)); |
| } else |
| cred_arg.alloc_lps = NULL; |
| |
| *slurm_cred = slurm_cred_create(slurmctld_config.cred_ctx, |
| &cred_arg); |
| xfree(cred_arg.alloc_lps); |
| if (*slurm_cred == NULL) { |
| error("slurm_cred_create error"); |
| return ESLURM_INVALID_JOB_CREDENTIAL; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Set the job's alloc_resp_hostname (and other_hostname) to the string |
| * representation of addr |
| */ |
| static void _set_alloc_resp_hostname (job_desc_msg_t *job, slurm_addr *addr) |
| { |
| char host [256]; |
| uint16_t port; |
| slurm_get_ip_str (addr, &port, host, sizeof (host)); |
| |
| xfree (job->alloc_resp_hostname); |
| job->alloc_resp_hostname = xstrdup (host); |
| xfree (job->other_hostname); |
| job->other_hostname = xstrdup (host); |
| } |
| |
| /* _slurm_rpc_allocate_resources: process RPC to allocate resources for |
| * a job */ |
| static void _slurm_rpc_allocate_resources(slurm_msg_t * msg, slurm_addr *addr) |
| { |
| /* init */ |
| int error_code = SLURM_SUCCESS; |
| slurm_msg_t response_msg; |
| DEF_TIMERS; |
| job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data; |
| resource_allocation_response_msg_t alloc_msg; |
| /* Locks: Read config, write job, write node, read partition */ |
| slurmctld_lock_t job_write_lock = { |
| READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK }; |
| uid_t uid; |
| int immediate = job_desc_msg->immediate; |
| bool do_unlock = false; |
| bool job_waiting = false; |
| struct job_record *job_ptr; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_RESOURCE_ALLOCATION"); |
| |
| _set_alloc_resp_hostname (job_desc_msg, addr); |
| |
| /* do RPC call */ |
| dump_job_desc(job_desc_msg); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if ( (uid != job_desc_msg->user_id) && (!_is_super_user(uid)) ) { |
| error_code = ESLURM_USER_ID_MISSING; |
| error("Security violation, RESOURCE_ALLOCATE from uid=%u", |
| (unsigned int) uid); |
| } |
| |
| if (error_code == SLURM_SUCCESS) { |
| do_unlock = true; |
| lock_slurmctld(job_write_lock); |
| error_code = job_allocate(job_desc_msg, |
| immediate, false, |
| true, uid, &job_ptr); |
| /* unlock after finished using the job structure data */ |
| END_TIMER2("_slurm_rpc_allocate_resources"); |
| } |
| |
| /* return result */ |
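| /* A job held or waiting on unavailable partition limits was still |
| * enqueued; unless an immediate allocation was requested, reply with |
| * the pending job's information rather than an error. */ |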
| if ((error_code == ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE) |
| || (error_code == ESLURM_JOB_HELD)) |
| job_waiting = true; |
| |
| if ((error_code == SLURM_SUCCESS) |
| || ((immediate == 0) && job_waiting)) { |
| xassert(job_ptr); |
| info("_slurm_rpc_allocate_resources JobId=%u NodeList=%s %s", |
| job_ptr->job_id, job_ptr->nodes, TIME_STR); |
| |
| /* send job_ID and node_name_ptr */ |
| alloc_msg.cpu_count_reps = xmalloc(sizeof(uint32_t) * |
| job_ptr->num_cpu_groups); |
| memcpy(alloc_msg.cpu_count_reps, job_ptr->cpu_count_reps, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| alloc_msg.cpus_per_node = xmalloc(sizeof(uint32_t) * |
| job_ptr->num_cpu_groups); |
| memcpy(alloc_msg.cpus_per_node, job_ptr->cpus_per_node, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| alloc_msg.error_code = error_code; |
| alloc_msg.job_id = job_ptr->job_id; |
| alloc_msg.node_cnt = job_ptr->node_cnt; |
| alloc_msg.node_list = xstrdup(job_ptr->nodes); |
| alloc_msg.num_cpu_groups = job_ptr->num_cpu_groups; |
| alloc_msg.select_jobinfo = |
| select_g_copy_jobinfo(job_ptr->select_jobinfo); |
| unlock_slurmctld(job_write_lock); |
| |
| slurm_msg_t_init(&response_msg); |
| response_msg.msg_type = RESPONSE_RESOURCE_ALLOCATION; |
| response_msg.data = &alloc_msg; |
| |
| if (slurm_send_node_msg(msg->conn_fd, &response_msg) < 0) |
| _kill_job_on_msg_fail(job_ptr->job_id); |
| xfree(alloc_msg.cpu_count_reps); |
| xfree(alloc_msg.cpus_per_node); |
| xfree(alloc_msg.node_list); |
| select_g_free_jobinfo(&alloc_msg.select_jobinfo); |
| schedule_job_save(); /* has own locks */ |
| schedule_node_save(); /* has own locks */ |
| } else { /* allocate error */ |
| if (do_unlock) |
| unlock_slurmctld(job_write_lock); |
| info("_slurm_rpc_allocate_resources: %s ", |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } |
| } |
| |
| /* _slurm_rpc_dump_conf - process RPC for Slurm configuration information */ |
| static void _slurm_rpc_dump_conf(slurm_msg_t * msg) |
| { |
| DEF_TIMERS; |
| slurm_msg_t response_msg; |
| last_update_msg_t *last_time_msg = (last_update_msg_t *) msg->data; |
| slurm_ctl_conf_info_msg_t config_tbl; |
| /* Locks: Read config */ |
| slurmctld_lock_t config_read_lock = { |
| READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK }; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_BUILD_INFO"); |
| lock_slurmctld(config_read_lock); |
| |
| /* check to see if configuration data has changed */ |
| if ((last_time_msg->last_update - 1) >= slurmctld_conf.last_update) { |
| unlock_slurmctld(config_read_lock); |
| debug2("_slurm_rpc_dump_conf, no change"); |
| slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA); |
| } else { |
| _fill_ctld_conf(&config_tbl); |
| unlock_slurmctld(config_read_lock); |
| END_TIMER2("_slurm_rpc_dump_conf"); |
| |
| /* init response_msg structure */ |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = RESPONSE_BUILD_INFO; |
| response_msg.data = &config_tbl; |
| |
| /* send message */ |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| free_slurm_conf(&config_tbl, false); |
| } |
| } |
| |
| /* _slurm_rpc_dump_jobs - process RPC for job state information */ |
| static void _slurm_rpc_dump_jobs(slurm_msg_t * msg) |
| { |
| DEF_TIMERS; |
| char *dump; |
| int dump_size; |
| slurm_msg_t response_msg; |
| job_info_request_msg_t *job_info_request_msg = |
| (job_info_request_msg_t *) msg->data; |
| /* Locks: Read job, write partition (for hiding) */ |
| slurmctld_lock_t job_read_lock = { |
| NO_LOCK, READ_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_JOB_INFO"); |
| lock_slurmctld(job_read_lock); |
| |
| if ((job_info_request_msg->last_update - 1) >= last_job_update) { |
| unlock_slurmctld(job_read_lock); |
| debug2("_slurm_rpc_dump_jobs, no change"); |
| slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA); |
| } else { |
| pack_all_jobs(&dump, &dump_size, |
| job_info_request_msg->show_flags, |
| g_slurm_auth_get_uid(msg->auth_cred)); |
| unlock_slurmctld(job_read_lock); |
| END_TIMER2("_slurm_rpc_dump_jobs"); |
| debug2("_slurm_rpc_dump_jobs, size=%d %s", |
| dump_size, TIME_STR); |
| |
| /* init response_msg structure */ |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = RESPONSE_JOB_INFO; |
| response_msg.data = dump; |
| response_msg.data_size = dump_size; |
| |
| /* send message */ |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| xfree(dump); |
| } |
| } |
| |
| /* _slurm_rpc_end_time - Process RPC for job end time */ |
| static void _slurm_rpc_end_time(slurm_msg_t * msg) |
| { |
| DEF_TIMERS; |
| job_alloc_info_msg_t *time_req_msg = |
| (job_alloc_info_msg_t *) msg->data; |
| srun_timeout_msg_t timeout_msg; |
| slurm_msg_t response_msg; |
| int rc; |
| /* Locks: Read job */ |
| slurmctld_lock_t job_read_lock = { |
| NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK }; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST JOB_END_TIME"); |
| lock_slurmctld(job_read_lock); |
| rc = job_end_time(time_req_msg, &timeout_msg); |
| unlock_slurmctld(job_read_lock); |
| END_TIMER2("_slurm_rpc_end_time"); |
| |
| if (rc != SLURM_SUCCESS) { |
| slurm_send_rc_msg(msg, rc); |
| } else { |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = SRUN_TIMEOUT; |
| response_msg.data = &timeout_msg; |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| } |
| debug2("_slurm_rpc_end_time jobid=%u %s", |
| time_req_msg->job_id, TIME_STR); |
| } |
| |
| /* _slurm_rpc_dump_nodes - process RPC for node state information */ |
| static void _slurm_rpc_dump_nodes(slurm_msg_t * msg) |
| { |
| DEF_TIMERS; |
| char *dump; |
| int dump_size; |
| slurm_msg_t response_msg; |
| node_info_request_msg_t *node_req_msg = |
| (node_info_request_msg_t *) msg->data; |
| /* Locks: Read config, read node, write partition (for hiding) */ |
| slurmctld_lock_t node_read_lock = { |
| READ_LOCK, NO_LOCK, READ_LOCK, WRITE_LOCK }; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_NODE_INFO"); |
| lock_slurmctld(node_read_lock); |
| |
| if ((node_req_msg->last_update - 1) >= last_node_update) { |
| unlock_slurmctld(node_read_lock); |
| debug2("_slurm_rpc_dump_nodes, no change"); |
| slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA); |
| } else { |
| pack_all_node(&dump, &dump_size, node_req_msg->show_flags, |
| g_slurm_auth_get_uid(msg->auth_cred)); |
| unlock_slurmctld(node_read_lock); |
| END_TIMER2("_slurm_rpc_dump_nodes"); |
| debug2("_slurm_rpc_dump_nodes, size=%d %s", |
| dump_size, TIME_STR); |
| |
| /* init response_msg structure */ |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = RESPONSE_NODE_INFO; |
| response_msg.data = dump; |
| response_msg.data_size = dump_size; |
| |
| /* send message */ |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| xfree(dump); |
| } |
| } |
| |
| /* _slurm_rpc_dump_partitions - process RPC for partition state information */ |
| static void _slurm_rpc_dump_partitions(slurm_msg_t * msg) |
| { |
| DEF_TIMERS; |
| char *dump; |
| int dump_size; |
| slurm_msg_t response_msg; |
| part_info_request_msg_t *part_req_msg; |
| |
| /* Locks: Read partition */ |
| slurmctld_lock_t part_read_lock = { |
| NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_PARTITION_INFO"); |
| part_req_msg = (part_info_request_msg_t *) msg->data; |
| lock_slurmctld(part_read_lock); |
| |
| if ((part_req_msg->last_update - 1) >= last_part_update) { |
| unlock_slurmctld(part_read_lock); |
| debug2("_slurm_rpc_dump_partitions, no change"); |
| slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA); |
| } else { |
| pack_all_part(&dump, &dump_size, part_req_msg->show_flags, |
| g_slurm_auth_get_uid(msg->auth_cred)); |
| unlock_slurmctld(part_read_lock); |
| END_TIMER2("_slurm_rpc_dump_partitions"); |
| debug2("_slurm_rpc_dump_partitions, size=%d %s", |
| dump_size, TIME_STR); |
| |
| /* init response_msg structure */ |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = RESPONSE_PARTITION_INFO; |
| response_msg.data = dump; |
| response_msg.data_size = dump_size; |
| |
| /* send message */ |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| xfree(dump); |
| } |
| } |
| |
| /* _slurm_rpc_epilog_complete - process RPC noting the completion of |
| * the epilog, which marks the completion of a job in its entirety */ |
| static void _slurm_rpc_epilog_complete(slurm_msg_t * msg) |
| { |
| DEF_TIMERS; |
| /* Locks: Write job, write node */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| uid_t uid; |
| epilog_complete_msg_t *epilog_msg = |
| (epilog_complete_msg_t *) msg->data; |
| bool run_scheduler = false; |
| |
| START_TIMER; |
| debug2("Processing RPC: MESSAGE_EPILOG_COMPLETE"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| error("Security violation, EPILOG_COMPLETE RPC from uid=%u", |
| (unsigned int) uid); |
| return; |
| } |
| |
| lock_slurmctld(job_write_lock); |
| if (job_epilog_complete(epilog_msg->job_id, epilog_msg->node_name, |
| epilog_msg->return_code)) |
| run_scheduler = true; |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_epilog_complete"); |
| |
| if (epilog_msg->return_code) |
| error("_slurm_rpc_epilog_complete JobId=%u Node=%s Err=%s %s", |
| epilog_msg->job_id, epilog_msg->node_name, |
| slurm_strerror(epilog_msg->return_code), TIME_STR); |
| else |
| debug2("_slurm_rpc_epilog_complete JobId=%u Node=%s %s", |
| epilog_msg->job_id, epilog_msg->node_name, |
| TIME_STR); |
| |
| /* Functions below provide their own locking */ |
| if (run_scheduler) { |
| (void) schedule(); |
| schedule_node_save(); |
| schedule_job_save(); |
| } |
| |
| /* NOTE: RPC has no response */ |
| } |
| |
| /* _slurm_rpc_job_step_kill - process RPC to cancel an entire job or |
| * an individual job step */ |
| static void _slurm_rpc_job_step_kill(slurm_msg_t * msg) |
| { |
| /* init */ |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| job_step_kill_msg_t *job_step_kill_msg = |
| (job_step_kill_msg_t *) msg->data; |
| /* Locks: Read config, write job, write node */ |
| slurmctld_lock_t job_write_lock = { |
| READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_CANCEL_JOB_STEP"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| lock_slurmctld(job_write_lock); |
| |
| /* do RPC call */ |
| if (job_step_kill_msg->job_step_id == SLURM_BATCH_SCRIPT) { |
| /* NOTE: SLURM_BATCH_SCRIPT == NO_VAL */ |
| error_code = job_signal(job_step_kill_msg->job_id, |
| job_step_kill_msg->signal, |
| job_step_kill_msg->batch_flag, uid); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_job_step_kill"); |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_job_step_kill JobId=%u: %s", |
| job_step_kill_msg->job_id, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_slurm_rpc_job_step_kill JobId=%u %s", |
| job_step_kill_msg->job_id, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| |
| /* Below function provides its own locking */ |
| schedule_job_save(); |
| } |
| } else { |
| error_code = job_step_signal(job_step_kill_msg->job_id, |
| job_step_kill_msg->job_step_id, |
| job_step_kill_msg->signal, |
| uid); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_job_step_kill"); |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_job_step_kill StepId=%u.%u: %s", |
| job_step_kill_msg->job_id, |
| job_step_kill_msg->job_step_id, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_slurm_rpc_job_step_kill StepId=%u.%u %s", |
| job_step_kill_msg->job_id, |
| job_step_kill_msg->job_step_id, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| |
| /* Below function provides its own locking */ |
| schedule_job_save(); |
| } |
| } |
| |
| } |
| |
| /* _slurm_rpc_complete_job_allocation - process RPC to note the |
| * completion of a job allocation */ |
| static void _slurm_rpc_complete_job_allocation(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| complete_job_allocation_msg_t *comp_msg = |
| (complete_job_allocation_msg_t *) msg->data; |
| /* Locks: Write job, write node */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK |
| }; |
| uid_t uid; |
| bool job_requeue = false; |
| |
| /* init */ |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_COMPLETE_JOB_ALLOCATION %u", |
| comp_msg->job_id); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| |
| lock_slurmctld(job_write_lock); |
| |
| /* do RPC call */ |
| /* Mark job and/or job step complete */ |
| error_code = job_complete(comp_msg->job_id, uid, |
| job_requeue, comp_msg->job_rc); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_complete_job_allocation"); |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_complete_job_allocation JobId=%u: %s ", |
| comp_msg->job_id, slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_complete_job_allocation JobId=%u %s", |
| comp_msg->job_id, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| (void) schedule_job_save(); /* Has own locking */ |
| (void) schedule_node_save(); /* Has own locking */ |
| } |
| } |
| |
| /* _slurm_rpc_complete_batch_script - process RPC from slurmstepd to |
| * note the completion of a batch script */ |
| static void _slurm_rpc_complete_batch_script(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| complete_batch_script_msg_t *comp_msg = |
| (complete_batch_script_msg_t *) msg->data; |
| /* Locks: Write job, write node */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK |
| }; |
| uid_t uid; |
| bool job_requeue = false; |
| bool dump_job = false, dump_node = false; |
| |
| /* init */ |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_COMPLETE_BATCH_SCRIPT %u", |
| comp_msg->job_id); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| |
| if (!_is_super_user(uid)) { |
| /* Only the slurmstepd can complete a batch script */ |
| END_TIMER2("_slurm_rpc_complete_batch_script"); |
| return; |
| } |
| |
| lock_slurmctld(job_write_lock); |
| |
| /* do RPC call */ |
| /* First set node DOWN if fatal error */ |
| if (comp_msg->slurm_rc == ESLURM_ALREADY_DONE) { |
| /* race condition on job termination, not a real error */ |
| info("slurmd error running JobId=%u from node=%s: %s", |
| comp_msg->job_id, |
| comp_msg->node_name, |
| slurm_strerror(comp_msg->slurm_rc)); |
| comp_msg->slurm_rc = SLURM_SUCCESS; |
| } |
| if (comp_msg->slurm_rc != SLURM_SUCCESS) { |
| error("Fatal slurmd error %u running JobId=%u on node=%s: %s", |
| comp_msg->slurm_rc, |
| comp_msg->job_id, |
| comp_msg->node_name, |
| slurm_strerror(comp_msg->slurm_rc)); |
| if (error_code == SLURM_SUCCESS) { |
| update_node_msg_t update_node_msg; |
| update_node_msg.node_names = |
| comp_msg->node_name; |
| update_node_msg.features = NULL; |
| update_node_msg.node_state = NODE_STATE_DOWN; |
| update_node_msg.reason = "step complete failure"; |
| error_code = update_node(&update_node_msg); |
| if (comp_msg->job_rc != SLURM_SUCCESS) |
| job_requeue = true; |
| dump_job = true; |
| dump_node = true; |
| } |
| } |
| |
| /* Mark job allocation complete */ |
| error_code = job_complete(comp_msg->job_id, uid, |
| job_requeue, comp_msg->job_rc); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_complete_batch_script"); |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_complete_batch_script JobId=%u: %s ", |
| comp_msg->job_id, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_complete_batch_script JobId=%u %s", |
| comp_msg->job_id, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| dump_job = true; |
| } |
| if (dump_job) |
| (void) schedule_job_save(); /* Has own locking */ |
| if (dump_node) |
| (void) schedule_node_save(); /* Has own locking */ |
| } |
| |
| /* _slurm_rpc_job_step_create - process RPC to create/register a job step |
| * with the step_mgr */ |
| static void _slurm_rpc_job_step_create(slurm_msg_t * msg) |
| { |
| /* init */ |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| slurm_msg_t resp; |
| struct step_record *step_rec; |
| job_step_create_response_msg_t job_step_resp; |
| job_step_create_request_msg_t *req_step_msg = |
| (job_step_create_request_msg_t *) msg->data; |
| slurm_cred_t slurm_cred = (slurm_cred_t) NULL; |
| /* Locks: Write jobs, read nodes */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_JOB_STEP_CREATE"); |
| |
| dump_step_desc(req_step_msg); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (uid != req_step_msg->user_id) { |
| error("Security violation, JOB_STEP_CREATE RPC from uid=%u " |
| "to run as uid %u", |
| (unsigned int) uid, req_step_msg->user_id); |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| return; |
| } |
| |
| #ifdef HAVE_FRONT_END /* Limited job step support */ |
| /* Non-super users are not permitted to run job steps on a front-end |
| * node; a single slurmd cannot handle a heavy load. */ |
| if (!_is_super_user(uid)) { |
| info("Attempt to execute job step by uid=%u", |
| (unsigned int) uid); |
| slurm_send_rc_msg(msg, ESLURM_BATCH_ONLY); |
| return; |
| } |
| #endif |
| if (error_code == SLURM_SUCCESS) { |
| /* issue the RPC */ |
| lock_slurmctld(job_write_lock); |
| error_code = |
| step_create(req_step_msg, &step_rec, false, false); |
| } |
| if (error_code == SLURM_SUCCESS) |
| error_code = _make_step_cred(step_rec, &slurm_cred); |
| END_TIMER2("_slurm_rpc_job_step_create"); |
| |
| /* return result */ |
| if (error_code) { |
| unlock_slurmctld(job_write_lock); |
| error("_slurm_rpc_job_step_create: %s", |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| slurm_step_layout_t *layout = step_rec->step_layout; |
| |
| info("_slurm_rpc_job_step_create: StepId=%u.%u %s %s", |
| step_rec->job_ptr->job_id, step_rec->step_id, |
| req_step_msg->node_list, TIME_STR); |
| |
| job_step_resp.job_step_id = step_rec->step_id; |
| job_step_resp.step_layout = slurm_step_layout_copy(layout); |
| |
| job_step_resp.cred = slurm_cred; |
| job_step_resp.switch_job = switch_copy_jobinfo( |
| step_rec->switch_job); |
| |
| unlock_slurmctld(job_write_lock); |
| slurm_msg_t_init(&resp); |
| resp.address = msg->address; |
| resp.msg_type = RESPONSE_JOB_STEP_CREATE; |
| resp.data = &job_step_resp; |
| |
| slurm_send_node_msg(msg->conn_fd, &resp); |
| slurm_step_layout_destroy(job_step_resp.step_layout); |
| slurm_cred_destroy(slurm_cred); |
| switch_free_jobinfo(job_step_resp.switch_job); |
| schedule_job_save(); /* Sets own locks */ |
| } |
| } |
| |
| /* _slurm_rpc_job_step_get_info - process request for job step info */ |
| static void _slurm_rpc_job_step_get_info(slurm_msg_t * msg) |
| { |
| DEF_TIMERS; |
| void *resp_buffer = NULL; |
| int resp_buffer_size = 0; |
| int error_code = SLURM_SUCCESS; |
| job_step_info_request_msg_t *request = |
| (job_step_info_request_msg_t *) msg->data; |
| /* Locks: Read job, write partition (for filtering) */ |
| slurmctld_lock_t job_read_lock = { |
| NO_LOCK, READ_LOCK, NO_LOCK, WRITE_LOCK }; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_JOB_STEP_INFO"); |
| |
| lock_slurmctld(job_read_lock); |
| |
| if ((request->last_update - 1) >= last_job_update) { |
| unlock_slurmctld(job_read_lock); |
| debug2("_slurm_rpc_job_step_get_info, no change"); |
| error_code = SLURM_NO_CHANGE_IN_DATA; |
| } else { |
| Buf buffer = init_buf(BUF_SIZE); |
| uid_t uid = g_slurm_auth_get_uid(msg->auth_cred); |
| error_code = pack_ctld_job_step_info_response_msg( |
| request->job_id, request->step_id, |
| uid, request->show_flags, buffer); |
| unlock_slurmctld(job_read_lock); |
| END_TIMER2("_slurm_rpc_job_step_get_info"); |
| if (error_code) { |
| /* job_id:step_id not found or otherwise in error; |
| * the error message is printed elsewhere */ |
| debug2("_slurm_rpc_job_step_get_info: %s", |
| slurm_strerror(error_code)); |
| free_buf(buffer); |
| } else { |
| resp_buffer_size = get_buf_offset(buffer); |
| resp_buffer = xfer_buf_data(buffer); |
| debug2("_slurm_rpc_job_step_get_info size=%d %s", |
| resp_buffer_size, TIME_STR); |
| } |
| } |
| |
| if (error_code) |
| slurm_send_rc_msg(msg, error_code); |
| else { |
| slurm_msg_t response_msg; |
| |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = RESPONSE_JOB_STEP_INFO; |
| response_msg.data = resp_buffer; |
| response_msg.data_size = resp_buffer_size; |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| xfree(resp_buffer); |
| } |
| } |
| |
| /* _slurm_rpc_job_will_run - process RPC to determine if a job with the |
| * given configuration could be initiated */ |
| static void _slurm_rpc_job_will_run(slurm_msg_t * msg) |
| { |
| /* init */ |
| DEF_TIMERS; |
| int error_code = SLURM_SUCCESS; |
| struct job_record *job_ptr; |
| job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data; |
| /* Locks: Write job, read node, read partition */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_JOB_WILL_RUN"); |
| |
| /* do RPC call */ |
| dump_job_desc(job_desc_msg); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if ( (uid != job_desc_msg->user_id) && (!_is_super_user(uid)) ) { |
| error_code = ESLURM_USER_ID_MISSING; |
| error("Security violation, JOB_WILL_RUN RPC from uid=%u", |
| (unsigned int) uid); |
| } |
| |
| if (error_code == SLURM_SUCCESS) { |
| lock_slurmctld(job_write_lock); |
| error_code = job_allocate(job_desc_msg, |
| true, true, true, uid, &job_ptr); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_job_will_run"); |
| } |
| |
| /* return result */ |
| if (error_code) { |
| debug2("_slurm_rpc_job_will_run: %s", |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_job_will_run success %s", TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| } |
| } |
| |
| /* _slurm_rpc_node_registration - process RPC to determine if a node's |
| * actual configuration satisfies the configured specification */ |
| static void _slurm_rpc_node_registration(slurm_msg_t * msg) |
| { |
| /* init */ |
| DEF_TIMERS; |
| int error_code = SLURM_SUCCESS; |
| slurm_node_registration_status_msg_t *node_reg_stat_msg = |
| (slurm_node_registration_status_msg_t *) msg->data; |
| /* Locks: Read config, write job, write node */ |
| slurmctld_lock_t job_write_lock = { |
| READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: MESSAGE_NODE_REGISTRATION_STATUS"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| error_code = ESLURM_USER_ID_MISSING; |
| error("Security violation, NODE_REGISTER RPC from uid=%u", |
| (unsigned int) uid); |
| } |
| if (error_code == SLURM_SUCCESS) { |
| /* do RPC call */ |
| lock_slurmctld(job_write_lock); |
| #ifdef HAVE_FRONT_END /* Operates only on front-end */ |
| error_code = validate_nodes_via_front_end( |
| node_reg_stat_msg->job_count, |
| node_reg_stat_msg->job_id, |
| node_reg_stat_msg->step_id, |
| node_reg_stat_msg->status); |
| #else |
| validate_jobs_on_node(node_reg_stat_msg->node_name, |
| &node_reg_stat_msg->job_count, |
| node_reg_stat_msg->job_id, |
| node_reg_stat_msg->step_id); |
| error_code = |
| validate_node_specs(node_reg_stat_msg->node_name, |
| node_reg_stat_msg->cpus, |
| node_reg_stat_msg->sockets, |
| node_reg_stat_msg->cores, |
| node_reg_stat_msg->threads, |
| node_reg_stat_msg->real_memory_size, |
| node_reg_stat_msg->temporary_disk_space, |
| node_reg_stat_msg->job_count, |
| node_reg_stat_msg->status); |
| #endif |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_node_registration"); |
| } |
| |
| /* return result */ |
| if (error_code) { |
| error("_slurm_rpc_node_registration node=%s: %s", |
| node_reg_stat_msg->node_name, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_node_registration complete for %s %s", |
| node_reg_stat_msg->node_name, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| } |
| } |
| |
| /* _slurm_rpc_job_alloc_info - process RPC to get details on an existing job */ |
| static void _slurm_rpc_job_alloc_info(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| slurm_msg_t response_msg; |
| struct job_record *job_ptr; |
| DEF_TIMERS; |
| job_alloc_info_msg_t *job_info_msg = |
| (job_alloc_info_msg_t *) msg->data; |
| job_alloc_info_response_msg_t job_info_resp_msg; |
| /* Locks: Read job, read node */ |
| slurmctld_lock_t job_read_lock = { |
| NO_LOCK, READ_LOCK, READ_LOCK, NO_LOCK }; |
| uid_t uid; |
| bool do_unlock = false; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_JOB_ALLOCATION_INFO"); |
| |
| /* do RPC call */ |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| do_unlock = true; |
| lock_slurmctld(job_read_lock); |
| error_code = job_alloc_info(uid, job_info_msg->job_id, &job_ptr); |
| END_TIMER2("_slurm_rpc_job_alloc_info"); |
| |
| /* return result */ |
| if (error_code || (job_ptr == NULL)) { |
| if (do_unlock) |
| unlock_slurmctld(job_read_lock); |
| debug2("_slurm_rpc_job_alloc_info: JobId=%u, uid=%u: %s", |
| job_info_msg->job_id, uid, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_slurm_rpc_job_alloc_info JobId=%u NodeList=%s %s", |
| job_info_msg->job_id, job_ptr->nodes, TIME_STR); |
| |
| /* send job_ID and node_name_ptr */ |
| job_info_resp_msg.cpu_count_reps = |
| xmalloc(sizeof(uint32_t) * job_ptr->num_cpu_groups); |
| memcpy(job_info_resp_msg.cpu_count_reps, |
| job_ptr->cpu_count_reps, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| job_info_resp_msg.cpus_per_node = |
| xmalloc(sizeof(uint32_t) * job_ptr->num_cpu_groups); |
| memcpy(job_info_resp_msg.cpus_per_node, job_ptr->cpus_per_node, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| job_info_resp_msg.error_code = error_code; |
| job_info_resp_msg.job_id = job_info_msg->job_id; |
| job_info_resp_msg.node_addr = xmalloc(sizeof(slurm_addr) * |
| job_ptr->node_cnt); |
| memcpy(job_info_resp_msg.node_addr, job_ptr->node_addr, |
| (sizeof(slurm_addr) * job_ptr->node_cnt)); |
| job_info_resp_msg.node_cnt = job_ptr->node_cnt; |
| job_info_resp_msg.node_list = xstrdup(job_ptr->nodes); |
| job_info_resp_msg.num_cpu_groups = job_ptr->num_cpu_groups; |
| job_info_resp_msg.select_jobinfo = |
| select_g_copy_jobinfo(job_ptr->select_jobinfo); |
| unlock_slurmctld(job_read_lock); |
| |
| slurm_msg_t_init(&response_msg); |
| response_msg.msg_type = RESPONSE_JOB_ALLOCATION_INFO; |
| response_msg.data = &job_info_resp_msg; |
| |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| select_g_free_jobinfo(&job_info_resp_msg.select_jobinfo); |
| xfree(job_info_resp_msg.cpu_count_reps); |
| xfree(job_info_resp_msg.cpus_per_node); |
| xfree(job_info_resp_msg.node_addr); |
| xfree(job_info_resp_msg.node_list); |
| } |
| } |
| |
| /* _slurm_rpc_job_alloc_info_lite - process RPC to get a reduced set of |
| * details on an existing job */ |
| static void _slurm_rpc_job_alloc_info_lite(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| slurm_msg_t response_msg; |
| struct job_record *job_ptr; |
| DEF_TIMERS; |
| job_alloc_info_msg_t *job_info_msg = |
| (job_alloc_info_msg_t *) msg->data; |
| resource_allocation_response_msg_t job_info_resp_msg; |
| /* Locks: Read job, read node */ |
| slurmctld_lock_t job_read_lock = { |
| NO_LOCK, READ_LOCK, READ_LOCK, NO_LOCK }; |
| uid_t uid; |
| bool do_unlock = false; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_JOB_ALLOCATION_INFO_LITE"); |
| |
| /* do RPC call */ |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| do_unlock = true; |
| lock_slurmctld(job_read_lock); |
| error_code = job_alloc_info(uid, job_info_msg->job_id, &job_ptr); |
| END_TIMER2("_slurm_rpc_job_alloc_info_lite"); |
| |
| /* return result */ |
| if (error_code || (job_ptr == NULL)) { |
| if (do_unlock) |
| unlock_slurmctld(job_read_lock); |
| debug2("_slurm_rpc_job_alloc_info_lite: JobId=%u, uid=%u: %s", |
| job_info_msg->job_id, uid, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_slurm_rpc_job_alloc_info_lite JobId=%u NodeList=%s %s", |
| job_info_msg->job_id, job_ptr->nodes, TIME_STR); |
| |
| /* send job_ID and node_name_ptr */ |
| job_info_resp_msg.cpu_count_reps = |
| xmalloc(sizeof(uint32_t) * job_ptr->num_cpu_groups); |
| memcpy(job_info_resp_msg.cpu_count_reps, |
| job_ptr->cpu_count_reps, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| job_info_resp_msg.cpus_per_node = |
| xmalloc(sizeof(uint32_t) * job_ptr->num_cpu_groups); |
| memcpy(job_info_resp_msg.cpus_per_node, job_ptr->cpus_per_node, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| job_info_resp_msg.error_code = error_code; |
| job_info_resp_msg.job_id = job_info_msg->job_id; |
| job_info_resp_msg.node_cnt = job_ptr->node_cnt; |
| job_info_resp_msg.node_list = xstrdup(job_ptr->nodes); |
| job_info_resp_msg.num_cpu_groups = job_ptr->num_cpu_groups; |
| job_info_resp_msg.select_jobinfo = |
| select_g_copy_jobinfo(job_ptr->select_jobinfo); |
| unlock_slurmctld(job_read_lock); |
| |
| slurm_msg_t_init(&response_msg); |
| response_msg.msg_type = RESPONSE_JOB_ALLOCATION_INFO_LITE; |
| response_msg.data = &job_info_resp_msg; |
| |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| select_g_free_jobinfo(&job_info_resp_msg.select_jobinfo); |
| xfree(job_info_resp_msg.cpu_count_reps); |
| xfree(job_info_resp_msg.cpus_per_node); |
| xfree(job_info_resp_msg.node_list); |
| } |
| } |
| |
| /* _slurm_rpc_ping - process ping RPC */ |
| static void _slurm_rpc_ping(slurm_msg_t * msg) |
| { |
| /* We could authenticate here, if desired */ |
| |
| /* return result */ |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| } |
| |
| |
| /* _slurm_rpc_reconfigure_controller - process RPC to re-initialize |
| * slurmctld from configuration file */ |
| static void _slurm_rpc_reconfigure_controller(slurm_msg_t * msg) |
| { |
| /* init */ |
| int error_code = SLURM_SUCCESS; |
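| /* Serialize reconfiguration: a request arriving while another is in |
| * progress is rejected with EINPROGRESS below. */ |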
| static bool in_progress = false; |
| |
| DEF_TIMERS; |
| /* Locks: Write configuration, job, node and partition */ |
| slurmctld_lock_t config_write_lock = { |
| WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| info("Processing RPC: REQUEST_RECONFIGURE"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| error("Security violation, RECONFIGURE RPC from uid=%u", |
| (unsigned int) uid); |
| error_code = ESLURM_USER_ID_MISSING; |
| } |
| if (in_progress) |
| error_code = EINPROGRESS; |
| |
| /* do RPC call */ |
| if (error_code == SLURM_SUCCESS) { |
| lock_slurmctld(config_write_lock); |
| in_progress = true; |
| error_code = read_slurm_conf(0); |
| if (error_code == SLURM_SUCCESS) { |
| _update_cred_key(); |
| set_slurmctld_state_loc(); |
| msg_to_slurmd(REQUEST_RECONFIGURE); |
| } |
| in_progress = false; |
| unlock_slurmctld(config_write_lock); |
| trigger_reconfig(); |
| } |
| END_TIMER2("_slurm_rpc_reconfigure_controller"); |
| |
| /* return result */ |
| if (error_code) { |
| error("_slurm_rpc_reconfigure_controller: %s", |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_slurm_rpc_reconfigure_controller: completed %s", |
| TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| slurm_sched_partition_change(); |
| schedule(); /* has its own locks */ |
| save_all_state(); |
| } |
| } |
| |
| /* _slurm_rpc_shutdown_controller - process RPC to shutdown slurmctld */ |
| static void _slurm_rpc_shutdown_controller(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS, i; |
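| /* A non-zero core value in the request triggers an immediate abort |
| * (SIGABRT, producing a core file) with no state save. */ |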
| uint16_t core_arg = 0; |
| shutdown_msg_t *shutdown_msg = (shutdown_msg_t *) msg->data; |
| uid_t uid; |
| /* Locks: Read node */ |
| slurmctld_lock_t node_read_lock = { |
| NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK }; |
| |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| error("Security violation, SHUTDOWN RPC from uid=%u", |
| (unsigned int) uid); |
| error_code = ESLURM_USER_ID_MISSING; |
| } |
| if (error_code); |
| else if (msg->msg_type == REQUEST_CONTROL) { |
| info("Performing RPC: REQUEST_CONTROL"); |
| /* resume backup mode */ |
| slurmctld_config.resume_backup = true; |
| } else { |
| info("Performing RPC: REQUEST_SHUTDOWN"); |
| core_arg = shutdown_msg->core; |
| } |
| |
| /* do RPC call */ |
| if (error_code); |
| else if (core_arg) |
| info("performing immediate shutdown without state save"); |
| else if (slurmctld_config.shutdown_time) |
| debug2("shutdown RPC issued when already in progress"); |
| else { |
| if (msg->msg_type == REQUEST_SHUTDOWN) { |
| /* This means (msg->msg_type != REQUEST_CONTROL) */ |
| lock_slurmctld(node_read_lock); |
| msg_to_slurmd(REQUEST_SHUTDOWN); |
| unlock_slurmctld(node_read_lock); |
| } |
| if (slurmctld_config.thread_id_sig) /* signal clean-up */ |
| pthread_kill(slurmctld_config.thread_id_sig, SIGTERM); |
| else { |
| error("thread_id_sig undefined, hard shutdown"); |
| slurmctld_config.shutdown_time = time(NULL); |
| /* send REQUEST_SHUTDOWN_IMMEDIATE RPC */ |
| slurmctld_shutdown(); |
| } |
| } |
| |
| if (msg->msg_type == REQUEST_CONTROL) { |
| /* Wait for workload to dry up before sending reply. |
| * One thread should remain, this one. */ |
| for (i = 1; i < CONTROL_TIMEOUT; i++) { |
| if (slurmctld_config.server_thread_count <= 1) |
| break; |
| sleep(1); |
| } |
| if (slurmctld_config.server_thread_count > 1) |
| error("REQUEST_CONTROL reply with %d active threads", |
| slurmctld_config.server_thread_count); |
| /* save_all_state(); performed by _slurmctld_background */ |
| } |
| |
| |
| slurm_send_rc_msg(msg, error_code); |
| if ((error_code == SLURM_SUCCESS) && core_arg && |
| (slurmctld_config.thread_id_sig)) |
| pthread_kill(slurmctld_config.thread_id_sig, SIGABRT); |
| } |
| |
| /* _slurm_rpc_shutdown_controller_immediate - process RPC to shutdown |
| * slurmctld */ |
| static void _slurm_rpc_shutdown_controller_immediate(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| uid_t uid; |
| |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| error("Security violation, SHUTDOWN_IMMEDIATE RPC from uid=%u", |
| (unsigned int) uid); |
| error_code = ESLURM_USER_ID_MISSING; |
| } |
| |
| /* do RPC call */ |
| /* No op: just used to knock loose accept RPC thread */ |
| if (error_code == SLURM_SUCCESS) |
| debug("Performing RPC: REQUEST_SHUTDOWN_IMMEDIATE"); |
| } |
| |
| /* _slurm_rpc_step_complete - process step completion RPC to note the |
| * completion of a job step on at least some nodes. |
| * If the job step is complete, it may |
| * represent the termination of an entire job */ |
| static void _slurm_rpc_step_complete(slurm_msg_t *msg) |
| { |
| int error_code = SLURM_SUCCESS, rc, rem, step_rc; |
| DEF_TIMERS; |
| step_complete_msg_t *req = (step_complete_msg_t *)msg->data; |
| /* Locks: Write job, write node */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| uid_t uid; |
| bool job_requeue = false; |
| bool dump_job = false, dump_node = false; |
| |
| /* init */ |
| START_TIMER; |
| debug("Processing RPC: REQUEST_STEP_COMPLETE for %u.%u " |
| "nodes %u-%u rc=%u", |
| req->job_id, req->job_step_id, |
| req->range_first, req->range_last, req->step_rc); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| /* Don't trust RPC, it is not from slurmstepd */ |
| error("Invalid user %d attempted REQUEST_STEP_COMPLETE", |
| uid); |
| return; |
| } |
| |
| lock_slurmctld(job_write_lock); |
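| /* Record this node range as complete; rem is non-zero while portions |
| * of the step have yet to report in. */ |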
| rc = step_partial_comp(req, &rem, &step_rc); |
| |
| if (rc || rem) { /* some error or not totally done */ |
| if (rc) { |
| info("step_partial_comp: %s", |
| slurm_strerror(rc)); |
| } |
| unlock_slurmctld(job_write_lock); |
| slurm_send_rc_msg(msg, rc); |
| if (!rc) /* partition completion */ |
| schedule_job_save(); /* Has own locking */ |
| return; |
| } |
| |
| if (req->job_step_id == SLURM_BATCH_SCRIPT) { |
| /* FIXME: test for error, possibly cause batch job requeue */ |
| error_code = job_complete(req->job_id, uid, job_requeue, |
| step_rc); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_step_complete"); |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_step_complete JobId=%u: %s", |
| req->job_id, slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_step_complete JobId=%u: %s", |
| req->job_id, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| dump_job = true; |
| } |
| } else { |
| error_code = job_step_complete(req->job_id, req->job_step_id, |
| uid, job_requeue, step_rc); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_step_complete"); |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_step_complete 1 StepId=%u.%u %s", |
| req->job_id, req->job_step_id, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_slurm_rpc_step_complete StepId=%u.%u %s", |
| req->job_id, req->job_step_id, |
| TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| dump_job = true; |
| } |
| } |
| if (dump_job) |
| (void) schedule_job_save(); /* Has own locking */ |
| if (dump_node) |
| (void) schedule_node_save(); /* Has own locking */ |
| } |
| |
| /* _slurm_rpc_step_layout - return the step layout structure for |
| * a job step, if it currently exists |
| */ |
| static void _slurm_rpc_step_layout(slurm_msg_t *msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| slurm_msg_t response_msg; |
| DEF_TIMERS; |
| job_step_id_msg_t *req = (job_step_id_msg_t *)msg->data; |
| slurm_step_layout_t *step_layout = NULL; |
| /* Locks: Read job, read node */ |
| slurmctld_lock_t job_read_lock = { |
| NO_LOCK, READ_LOCK, READ_LOCK, NO_LOCK }; |
| uid_t uid = g_slurm_auth_get_uid(msg->auth_cred); |
| struct job_record *job_ptr = NULL; |
| struct step_record *step_ptr = NULL; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_STEP_LAYOUT"); |
| |
| lock_slurmctld(job_read_lock); |
| error_code = job_alloc_info(uid, req->job_id, &job_ptr); |
| END_TIMER2("_slurm_rpc_step_layout"); |
| /* return result */ |
| if (error_code || (job_ptr == NULL)) { |
| unlock_slurmctld(job_read_lock); |
| debug2("_slurm_rpc_step_layout: JobId=%u, uid=%u: %s", |
| req->job_id, uid, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| return; |
| } |
| |
| step_ptr = find_step_record(job_ptr, req->step_id); |
| if (!step_ptr) { |
| unlock_slurmctld(job_read_lock); |
| debug2("_slurm_rpc_step_layout: " |
| "JobId=%u.%u Not Found", |
| req->job_id, req->step_id); |
| slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID); |
| return; |
| } |
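| /* Copy the layout while the read lock is held so the reply can be |
| * built and sent below without holding any slurmctld locks; the copy |
| * is freed after the message has been sent. */ |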
| step_layout = slurm_step_layout_copy(step_ptr->step_layout); |
| unlock_slurmctld(job_read_lock); |
| |
| slurm_msg_t_init(&response_msg); |
| response_msg.msg_type = RESPONSE_STEP_LAYOUT; |
| response_msg.data = step_layout; |
| |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| slurm_step_layout_destroy(step_layout); |
| } |
| |
| /* _slurm_rpc_submit_batch_job - process RPC to submit a batch job */ |
| static void _slurm_rpc_submit_batch_job(slurm_msg_t * msg) |
| { |
| /* init */ |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| uint32_t step_id = 0; |
| struct job_record *job_ptr; |
| slurm_msg_t response_msg; |
| submit_response_msg_t submit_msg; |
| job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data; |
| |
| /* Locks: Write job, read node, read partition */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_SUBMIT_BATCH_JOB"); |
| |
| slurm_msg_t_init(&response_msg); |
| /* do RPC call */ |
| dump_job_desc(job_desc_msg); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if ( (uid != job_desc_msg->user_id) && (!_is_super_user(uid)) ) { |
| /* NOTE: User root can submit a batch job for any other user */ |
| error_code = ESLURM_USER_ID_MISSING; |
| error("Security violation, SUBMIT_JOB from uid=%u", |
| (unsigned int) uid); |
| } |
| if (error_code == SLURM_SUCCESS) { |
| lock_slurmctld(job_write_lock); |
| if (job_desc_msg->job_id != SLURM_BATCH_SCRIPT) { |
| job_ptr = find_job_record(job_desc_msg->job_id); |
| if (job_ptr && IS_JOB_FINISHED(job_ptr)) |
| job_ptr = NULL; |
| } else |
| job_ptr = NULL; |
| |
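| /* A non-NULL job_ptr here means the submission named an existing, |
| * still-active allocation, so the request is handled as a batch step |
| * launched within that allocation rather than as a new job. */ |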
| if (job_ptr) { /* Active job allocation */ |
| #ifdef HAVE_FRONT_END /* Limited job step support */ |
| /* Non-super users are not permitted to run job steps on a front-end |
| * node; a single slurmd cannot handle a heavy load. */ |
| if (!_is_super_user(uid)) { |
| info("Attempt to execute batch job step by uid=%u", |
| (unsigned int) uid); |
| slurm_send_rc_msg(msg, ESLURM_BATCH_ONLY); |
| unlock_slurmctld(job_write_lock); |
| return; |
| } |
| #endif |
| |
| if (job_ptr->user_id != uid) { |
| error("Security violation, uid=%u attempting " |
| "to execute a step within job %u owned " |
| "by user %u", |
| (unsigned int) uid, job_ptr->job_id, |
| job_ptr->user_id); |
| slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); |
| unlock_slurmctld(job_write_lock); |
| return; |
| } |
| error_code = _launch_batch_step(job_desc_msg, uid, |
| &step_id); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_submit_batch_job"); |
| |
| if (error_code != SLURM_SUCCESS) { |
| info("_launch_batch_step: %s", |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_launch_batch_step StepId=%u.%u %s", |
| job_desc_msg->job_id, step_id, |
| TIME_STR); |
| submit_msg.job_id = job_desc_msg->job_id; |
| submit_msg.step_id = step_id; |
| submit_msg.error_code = error_code; |
| response_msg.msg_type = |
| RESPONSE_SUBMIT_BATCH_JOB; |
| |
| response_msg.data = &submit_msg; |
| slurm_send_node_msg(msg->conn_fd, |
| &response_msg); |
| schedule_job_save(); |
| } |
| return; |
| } |
| |
| /* Create new job allocation */ |
| error_code = job_allocate(job_desc_msg, |
| job_desc_msg->immediate, false, |
| false, uid, &job_ptr); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_submit_batch_job"); |
| } |
| |
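| /* ESLURM_JOB_HELD and ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE mean |
| * the job was queued but cannot start yet, so they are treated below |
| * like success and the submitter still receives a job_id. */ |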
| /* return result */ |
| if ((error_code != SLURM_SUCCESS) |
| && (error_code != ESLURM_JOB_HELD) |
| && (error_code != ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE)) { |
| info("_slurm_rpc_submit_batch_job: %s", |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_slurm_rpc_submit_batch_job JobId=%u %s", |
| job_ptr->job_id, TIME_STR); |
| /* send job_ID */ |
| submit_msg.job_id = job_ptr->job_id; |
| submit_msg.step_id = SLURM_BATCH_SCRIPT; |
| submit_msg.error_code = error_code; |
| response_msg.msg_type = RESPONSE_SUBMIT_BATCH_JOB; |
| response_msg.data = &submit_msg; |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| schedule(); /* has own locks */ |
| schedule_job_save(); /* has own locks */ |
| schedule_node_save(); /* has own locks */ |
| } |
| } |
| |
| /* _slurm_rpc_update_job - process RPC to update the configuration of a |
| * job (e.g. priority) */ |
| static void _slurm_rpc_update_job(slurm_msg_t * msg) |
| { |
| /* init */ |
| int error_code; |
| DEF_TIMERS; |
| job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data; |
| /* Locks: Write job, read node, read partition */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_UPDATE_JOB"); |
| |
| /* do RPC call */ |
| dump_job_desc(job_desc_msg); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| lock_slurmctld(job_write_lock); |
| error_code = update_job(job_desc_msg, uid); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_update_job"); |
| |
| /* return result */ |
| if (error_code) { |
| error("_slurm_rpc_update_job JobId=%u: %s", |
| job_desc_msg->job_id, slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_update_job complete JobId=%u %s", |
| job_desc_msg->job_id, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| /* Below functions provide their own locking */ |
| schedule(); |
| schedule_job_save(); |
| schedule_node_save(); |
| } |
| } |
| |
| /* |
| * slurm_drain_nodes - process a request to drain a list of nodes, |
| * no-op for nodes already drained or draining |
| * node_list IN - list of nodes to drain |
| * reason IN - reason to drain the nodes |
| * RET SLURM_SUCCESS or error code |
| * NOTE: This is utilized by plugins rather than via RPC, and it sets |
| * its own locks. |
| */ |
| extern int slurm_drain_nodes(char *node_list, char *reason) |
| { |
| int error_code; |
| DEF_TIMERS; |
| /* Locks: Write node */ |
| slurmctld_lock_t node_write_lock = { |
| NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK }; |
| |
| START_TIMER; |
| lock_slurmctld(node_write_lock); |
| error_code = drain_nodes(node_list, reason); |
| unlock_slurmctld(node_write_lock); |
| END_TIMER2("slurm_drain_nodes"); |
| |
| return error_code; |
| } |
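| |
| /* Usage sketch (hypothetical caller): a select or node health-check |
| * plugin might drain a failing node set with, e.g. |
| * |
| * if (slurm_drain_nodes("tux[0-3]", "ECC memory errors")) |
| * error("drain request failed"); |
| * |
| * The function acquires and releases the node write lock itself. */ |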
| |
| /* |
| * slurm_fail_job - terminate a job due to a launch failure |
| * no-op for jobs already terminated |
| * job_id IN - slurm job id |
| * RET SLURM_SUCCESS or error code |
| * NOTE: This is utilized by plugins rather than via RPC, and it sets |
| * its own locks. |
| */ |
| extern int slurm_fail_job(uint32_t job_id) |
| { |
| int error_code; |
| DEF_TIMERS; |
| /* Locks: Write job and node */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| |
| START_TIMER; |
| lock_slurmctld(job_write_lock); |
| error_code = job_fail(job_id); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("slurm_fail_job"); |
| |
| return error_code; |
| } |
| |
| /* _slurm_rpc_update_node - process RPC to update the configuration of a |
| * node (e.g. UP/DOWN) */ |
| static void _slurm_rpc_update_node(slurm_msg_t * msg) |
| { |
| /* init */ |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| update_node_msg_t *update_node_msg_ptr = |
| (update_node_msg_t *) msg->data; |
| /* Locks: Write job and write node */ |
| slurmctld_lock_t node_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_UPDATE_NODE"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| error_code = ESLURM_USER_ID_MISSING; |
| error("Security violation, UPDATE_NODE RPC from uid=%u", |
| (unsigned int) uid); |
| } |
| |
| if (error_code == SLURM_SUCCESS) { |
| /* do RPC call */ |
| lock_slurmctld(node_write_lock); |
| error_code = update_node(update_node_msg_ptr); |
| unlock_slurmctld(node_write_lock); |
| END_TIMER2("_slurm_rpc_update_node"); |
| } |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_update_node for %s: %s", |
| update_node_msg_ptr->node_names, |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_update_node complete for %s %s", |
| update_node_msg_ptr->node_names, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| } |
| |
| /* Below functions provide their own locks */ |
| if (schedule()) |
| schedule_job_save(); |
| schedule_node_save(); |
| trigger_reconfig(); |
| } |
| |
| /* _slurm_rpc_update_partition - process RPC to update the configuration |
| * of a partition (e.g. UP/DOWN) */ |
| static void _slurm_rpc_update_partition(slurm_msg_t * msg) |
| { |
| /* init */ |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| update_part_msg_t *part_desc_ptr = (update_part_msg_t *) msg->data; |
| /* Locks: Read config, read node, write partition */ |
| slurmctld_lock_t part_write_lock = { |
| READ_LOCK, NO_LOCK, READ_LOCK, WRITE_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_UPDATE_PARTITION"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| error_code = ESLURM_USER_ID_MISSING; |
| error("Security violation, UPDATE_PARTITION RPC from uid=%u", |
| (unsigned int) uid); |
| } |
| |
| if (error_code == SLURM_SUCCESS) { |
| /* do RPC call */ |
| if(part_desc_ptr->hidden == (uint16_t)INFINITE) |
| error_code = select_g_update_block(part_desc_ptr); |
| else if(part_desc_ptr->root_only == (uint16_t)INFINITE) |
| error_code = select_g_update_sub_node(part_desc_ptr); |
| else { |
| lock_slurmctld(part_write_lock); |
| error_code = update_part(part_desc_ptr); |
| unlock_slurmctld(part_write_lock); |
| } |
| END_TIMER2("_slurm_rpc_update_partition"); |
| } |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_update_partition partition=%s: %s", |
| part_desc_ptr->name, slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_update_partition complete for %s %s", |
| part_desc_ptr->name, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| |
| /* NOTE: These functions provide their own locks */ |
| schedule_part_save(); |
| if (schedule()) { |
| schedule_job_save(); |
| schedule_node_save(); |
| } |
| } |
| } |
| |
| /* _slurm_rpc_delete_partition - process RPC to delete a partition */ |
| static void _slurm_rpc_delete_partition(slurm_msg_t * msg) |
| { |
| /* init */ |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| delete_part_msg_t *part_desc_ptr = (delete_part_msg_t *) msg->data; |
| /* Locks: write job, read node, write partition */ |
| slurmctld_lock_t part_write_lock = { |
| NO_LOCK, WRITE_LOCK, READ_LOCK, WRITE_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_DELETE_PARTITION"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| if (!_is_super_user(uid)) { |
| error_code = ESLURM_USER_ID_MISSING; |
| error("Security violation, DELETE_PARTITION RPC from uid=%u", |
| (unsigned int) uid); |
| } |
| |
| if (error_code == SLURM_SUCCESS) { |
| /* do RPC call */ |
| lock_slurmctld(part_write_lock); |
| error_code = delete_partition(part_desc_ptr); |
| unlock_slurmctld(part_write_lock); |
| END_TIMER2("_slurm_rpc_delete_partition"); |
| } |
| |
| /* return result */ |
| if (error_code) { |
| info("_slurm_rpc_delete_partition partition=%s: %s", |
| part_desc_ptr->name, slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| info("_slurm_rpc_delete_partition complete for %s %s", |
| part_desc_ptr->name, TIME_STR); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| |
| /* NOTE: These functions provide their own locks */ |
| schedule(); |
| save_all_state(); |
| |
| } |
| } |
| |
| /* Determine whether the nodes allocated to a job are ready for it to run */ |
| static void _slurm_rpc_job_ready(slurm_msg_t * msg) |
| { |
| int error_code, result; |
| job_id_msg_t *id_msg = (job_id_msg_t *) msg->data; |
| DEF_TIMERS; |
| slurm_msg_t response_msg; |
| return_code_msg_t rc_msg; |
| |
| START_TIMER; |
| error_code = job_node_ready(id_msg->job_id, &result); |
| END_TIMER2("_slurm_rpc_job_ready"); |
| |
| if (error_code) { |
| debug2("_slurm_rpc_job_ready: %s", |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| debug2("_slurm_rpc_job_ready(%u)=%d %s", id_msg->job_id, |
| result, TIME_STR); |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = RESPONSE_JOB_READY; |
| rc_msg.return_code = result; |
| response_msg.data = &rc_msg; |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| } |
| } |
| |
| /* get node select info plugin */ |
| static void _slurm_rpc_node_select_info(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| Buf buffer = NULL; |
| node_info_select_request_msg_t *sel_req_msg = |
| (node_info_select_request_msg_t *) msg->data; |
| slurm_msg_t response_msg; |
| DEF_TIMERS; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_NODE_SELECT_INFO"); |
| error_code = select_g_pack_node_info(sel_req_msg->last_update, |
| &buffer); |
| END_TIMER2("_slurm_rpc_node_select_info"); |
| |
| if (error_code) { |
| debug3("_slurm_rpc_node_select_info: %s", |
| slurm_strerror(error_code)); |
| slurm_send_rc_msg(msg, error_code); |
| } else { |
| /* init response_msg structure */ |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = RESPONSE_NODE_SELECT_INFO; |
| response_msg.data = get_buf_data(buffer); |
| response_msg.data_size = get_buf_offset(buffer); |
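| /* The select plugin returned an already-packed buffer, so its raw |
| * contents and length are placed directly in the response instead of |
| * a structure to be packed at send time. */ |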
| /* send message */ |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| |
| if (buffer) |
| free_buf(buffer); |
| } |
| } |
| |
| /* Reset the job credential key based upon configuration parameters. |
| * NOTE: READ lock_slurmctld config before entry */ |
| static void _update_cred_key(void) |
| { |
| slurm_cred_ctx_key_update(slurmctld_config.cred_ctx, |
| slurmctld_conf.job_credential_private_key); |
| } |
| |
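| /* Process a REQUEST_SUSPEND (suspend or resume) RPC. job_suspend() is |
| * passed msg->conn_fd so that it can send the RPC reply itself; that |
| * is presumably why no slurm_send_rc_msg() call appears below. */ |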
| inline static void _slurm_rpc_suspend(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| suspend_msg_t *sus_ptr = (suspend_msg_t *) msg->data; |
| /* Locks: write job and node */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| uid_t uid; |
| char *op; |
| |
| START_TIMER; |
| switch (sus_ptr->op) { |
| case SUSPEND_JOB: |
| op = "suspend"; |
| break; |
| case RESUME_JOB: |
| op = "resume"; |
| break; |
| default: |
| op = "unknown"; |
| } |
| info("Processing RPC: REQUEST_SUSPEND(%s)", op); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| |
| lock_slurmctld(job_write_lock); |
| error_code = job_suspend(sus_ptr, uid, msg->conn_fd); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_suspend"); |
| |
| if (error_code) { |
| info("_slurm_rpc_suspend(%s) %u: %s", op, |
| sus_ptr->job_id, slurm_strerror(error_code)); |
| } else { |
| info("_slurm_rpc_suspend(%s) for %u %s", op, |
| sus_ptr->job_id, TIME_STR); |
| /* Functions below provide their own locking */ |
| if (sus_ptr->op == SUSPEND_JOB) |
| (void) schedule(); |
| schedule_job_save(); |
| } |
| } |
| |
| inline static void _slurm_rpc_requeue(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| job_id_msg_t *requeue_ptr = (job_id_msg_t *) msg->data; |
| /* Locks: write job and node */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| info("Processing RPC: REQUEST_REQUEUE"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| |
| lock_slurmctld(job_write_lock); |
| error_code = job_requeue(uid, requeue_ptr->job_id, |
| msg->conn_fd); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_requeue"); |
| |
| if (error_code) { |
| info("_slurm_rpc_requeue %u: %s", requeue_ptr->job_id, |
| slurm_strerror(error_code)); |
| } else { |
| info("_slurm_rpc_requeue %u: %s", requeue_ptr->job_id, |
| TIME_STR); |
| /* Functions below provide their own locking */ |
| schedule_job_save(); |
| } |
| } |
| |
| /* Assorted checkpoint operations */ |
| inline static void _slurm_rpc_checkpoint(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| checkpoint_msg_t *ckpt_ptr = (checkpoint_msg_t *) msg->data; |
| /* Locks: write job */ |
| slurmctld_lock_t job_write_lock = { |
| NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK }; |
| uid_t uid; |
| char *op; |
| |
| START_TIMER; |
| switch (ckpt_ptr->op) { |
| case CHECK_ABLE: |
| op = "able"; |
| break; |
| case CHECK_CREATE: |
| op = "create"; |
| break; |
| case CHECK_DISABLE: |
| op = "disable"; |
| break; |
| case CHECK_ENABLE: |
| op = "enable"; |
| break; |
| case CHECK_ERROR: |
| op = "error"; |
| break; |
| case CHECK_RESTART: |
| op = "restart"; |
| break; |
| case CHECK_VACATE: |
| op = "vacate"; |
| break; |
| default: |
| op = "unknown"; |
| } |
| debug2("Processing RPC: REQUEST_CHECKPOINT %s", op); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| |
| /* do RPC call and send reply */ |
| lock_slurmctld(job_write_lock); |
| error_code = job_step_checkpoint(ckpt_ptr, uid, msg->conn_fd); |
| unlock_slurmctld(job_write_lock); |
| END_TIMER2("_slurm_rpc_checkpoint"); |
| |
| if (error_code) { |
| if (ckpt_ptr->step_id == SLURM_BATCH_SCRIPT) |
| info("_slurm_rpc_checkpoint %s %u: %s", op, |
| ckpt_ptr->job_id, slurm_strerror(error_code)); |
| else |
| info("_slurm_rpc_checkpoint %s %u.%u: %s", op, |
| ckpt_ptr->job_id, ckpt_ptr->step_id, |
| slurm_strerror(error_code)); |
| } else { |
| if (ckpt_ptr->step_id == SLURM_BATCH_SCRIPT) |
| info("_slurm_rpc_checkpoint %s for %u %s", op, |
| ckpt_ptr->job_id, TIME_STR); |
| else |
| info("_slurm_rpc_checkpoint %s for %u.%u %s", op, |
| ckpt_ptr->job_id, ckpt_ptr->step_id, TIME_STR); |
| |
| if ((ckpt_ptr->op != CHECK_ABLE) |
| && (ckpt_ptr->op != CHECK_ERROR)) { |
| /* job state changed, save it */ |
| /* NOTE: This function provides its own locks */ |
| schedule_job_save(); |
| } |
| } |
| } |
| |
| inline static void _slurm_rpc_checkpoint_comp(slurm_msg_t * msg) |
| { |
| int error_code = SLURM_SUCCESS; |
| DEF_TIMERS; |
| checkpoint_comp_msg_t *ckpt_ptr = (checkpoint_comp_msg_t *) msg->data; |
| /* Locks: read job */ |
| slurmctld_lock_t job_read_lock = { |
| NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK }; |
| uid_t uid; |
| |
| START_TIMER; |
| debug2("Processing RPC: REQUEST_CHECKPOINT_COMP"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| |
| /* do RPC call and send reply */ |
| lock_slurmctld(job_read_lock); |
| error_code = job_step_checkpoint_comp(ckpt_ptr, uid, msg->conn_fd); |
| unlock_slurmctld(job_read_lock); |
| END_TIMER2("_slurm_rpc_checkpoint_comp"); |
| |
| if (error_code) { |
| info("_slurm_rpc_checkpoint_comp %u.%u: %s", |
| ckpt_ptr->job_id, ckpt_ptr->step_id, |
| slurm_strerror(error_code)); |
| } else { |
| info("_slurm_rpc_checkpoint_comp %u.%u %s", |
| ckpt_ptr->job_id, ckpt_ptr->step_id, TIME_STR); |
| } |
| } |
| |
| /* Duplicate a string array, xmalloc()ing each string separately */ |
| static char ** |
| _xduparray(uint16_t size, char ** array) |
| { |
| int i; |
| char ** result; |
| |
| if (size == 0) |
| return (char **)NULL; |
| |
| result = (char **) xmalloc(sizeof(char *) * size); |
| for (i=0; i<size; i++) |
| result[i] = xstrdup(array[i]); |
| |
| return result; |
| } |
| |
| /* Like _xduparray(), but performs only two xmalloc() calls: one for |
| * the pointer array and one block holding all of the strings. The |
| * output format of this must be identical to |
| * _read_data_array_from_file() */ |
| static char ** |
| _xduparray2(uint16_t size, char ** array) |
| { |
| int i, len = 0; |
| char *ptr, ** result; |
| |
| if (size == 0) |
| return (char **) NULL; |
| |
| for (i=0; i<size; i++) |
| len += (strlen(array[i]) + 1); |
| ptr = xmalloc(len); |
| result = (char **) xmalloc(sizeof(char *) * size); |
| |
| for (i=0; i<size; i++) { |
| result[i] = ptr; |
| len = strlen(array[i]); |
| strcpy(ptr, array[i]); |
| ptr += (len + 1); |
| } |
| |
| return result; |
| } |
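| |
| /* For illustration (hypothetical input): given array = {"ab", "c"}, |
| * _xduparray2() allocates one 5-byte block holding "ab\0c\0" plus one |
| * pointer array; result[0] points at "ab" and result[1] points at "c" |
| * within the same block. Presumably this lets the string storage be |
| * released with a single xfree(result[0]) plus xfree(result) for the |
| * pointer array, matching the layout produced by |
| * _read_data_array_from_file(). */ |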
| |
| |
| /* Return the maximum processor count usable by a job, computed from |
| * its cpus_per_node and cpu_count_reps arrays */ |
| int _max_nprocs(struct job_record *job_ptr) |
| { |
| int i, num, nprocs = 0; |
| if (!job_ptr) return 0; |
| num = job_ptr->num_cpu_groups; |
| for (i = 0; i < num; i++) { |
| nprocs += job_ptr->cpu_count_reps[i]*job_ptr->cpus_per_node[i]; |
| } |
| return nprocs; |
| } |
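| |
| /* For illustration (hypothetical values): with num_cpu_groups = 2, |
| * cpus_per_node = {4, 2} and cpu_count_reps = {3, 1}, the job has |
| * three nodes with 4 CPUs and one node with 2 CPUs, so _max_nprocs() |
| * returns 3*4 + 1*2 = 14. */ |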
| |
| /* _launch_batch_step |
| * IN: job_desc_msg from _slurm_rpc_submit_batch_job(), but with job_id |
| * set, meaning the launch is within a pre-existing allocation. |
| * IN: uid launching this batch job, which has already been validated. |
| * OUT: step_id of the batch step that was created. |
| * RET: SLURM_SUCCESS, or a SLURM error code if the launch fails. |
| */ |
| int _launch_batch_step(job_desc_msg_t *job_desc_msg, uid_t uid, |
| uint32_t *step_id) |
| { |
| struct job_record *job_ptr; |
| time_t now = time(NULL); |
| int error_code = SLURM_SUCCESS; |
| |
| batch_job_launch_msg_t *launch_msg_ptr; |
| agent_arg_t *agent_arg_ptr; |
| struct node_record *node_ptr; |
| |
| /* |
| * Create a job step. Note that a credential is not necessary, |
| * since the slurmctld will be submitting this job directly to |
| * the slurmd. |
| */ |
| job_step_create_request_msg_t req_step_msg; |
| struct step_record *step_rec; |
| |
| /* |
| * As far as the step record in slurmctld goes, we are just |
| * launching a batch script which will be run on a single |
| * processor on a single node. The actual launch request sent |
| * to the slurmd should contain the proper allocation values |
| * for subsequent srun jobs within the batch script. |
| */ |
| req_step_msg.job_id = job_desc_msg->job_id; |
| req_step_msg.user_id = uid; |
| req_step_msg.node_count = 1; |
| req_step_msg.cpu_count = 1; |
| req_step_msg.num_tasks = 1; |
| req_step_msg.relative = 0; |
| req_step_msg.task_dist = SLURM_DIST_CYCLIC; |
| req_step_msg.port = 0; |
| req_step_msg.host = NULL; |
| req_step_msg.name = job_desc_msg->name; |
| req_step_msg.network = NULL; |
| req_step_msg.node_list = NULL; |
| |
| error_code = step_create(&req_step_msg, &step_rec, false, true); |
| xfree(req_step_msg.node_list); /* may be set by step_create */ |
| |
| if (error_code != SLURM_SUCCESS) |
| return error_code; |
| |
| /* |
| * TODO: check all instances of step_record to ensure there's no |
| * problem with a null switch_job_info pointer. |
| */ |
| |
| /* Get the allocation in order to construct the batch job |
| * launch request for the slurmd. |
| */ |
| |
| job_ptr = step_rec->job_ptr; |
| |
| /* TODO: need to address batch job step request options such as |
| * the ability to run a batch job on a subset of the nodes in the |
| * current allocation. |
| * TODO: validate the specific batch job request vs. the |
| * existing allocation. Note that subsequent srun steps within |
| * the batch script will work within the full allocation, but |
| * the batch step options can still provide default settings via |
| * environment variables |
| * |
| * NOTE: for now we are *ignoring* most of the job_desc_msg |
| * allocation-related settings. At some point we |
| * should perform better error-checking, otherwise |
| * the submitter will make some invalid assumptions |
| * about how this job actually ran. |
| */ |
| job_ptr->time_last_active = now; |
| |
| |
| /* Launch the batch job */ |
| node_ptr = find_first_node_record(job_ptr->node_bitmap); |
| if (node_ptr == NULL) { |
| delete_step_record(job_ptr, step_rec->step_id); |
| return ESLURM_INVALID_JOB_ID; |
| } |
| |
| /* Initialization of data structures */ |
| launch_msg_ptr = (batch_job_launch_msg_t *) |
| xmalloc(sizeof(batch_job_launch_msg_t)); |
| launch_msg_ptr->job_id = job_ptr->job_id; |
| launch_msg_ptr->step_id = step_rec->step_id; |
| launch_msg_ptr->gid = job_ptr->group_id; |
| launch_msg_ptr->uid = uid; |
| launch_msg_ptr->nodes = xstrdup(job_ptr->nodes); |
| |
| if (make_batch_job_cred(launch_msg_ptr)) { |
| error("aborting batch step %u.%u", job_ptr->job_id, |
| job_ptr->group_id); |
| xfree(launch_msg_ptr->nodes); |
| xfree(launch_msg_ptr); |
| delete_step_record(job_ptr, step_rec->step_id); |
| return SLURM_ERROR; |
| } |
| |
| launch_msg_ptr->err = xstrdup(job_desc_msg->err); |
| launch_msg_ptr->in = xstrdup(job_desc_msg->in); |
| launch_msg_ptr->out = xstrdup(job_desc_msg->out); |
| launch_msg_ptr->work_dir = xstrdup(job_desc_msg->work_dir); |
| launch_msg_ptr->argc = job_desc_msg->argc; |
| launch_msg_ptr->argv = _xduparray(job_desc_msg->argc, |
| job_desc_msg->argv); |
| launch_msg_ptr->script = xstrdup(job_desc_msg->script); |
| launch_msg_ptr->environment = _xduparray2(job_desc_msg->env_size, |
| job_desc_msg->environment); |
| launch_msg_ptr->envc = job_desc_msg->env_size; |
| |
| /* _max_nprocs() represents the total number of CPUs available |
| * for this step (overcommit not supported yet). If job_desc_msg |
| * contains a reasonable num_procs request, use that value; |
| * otherwise default to the allocation processor request. |
| */ |
| launch_msg_ptr->nprocs = _max_nprocs(job_ptr); |
| if (job_desc_msg->num_procs > 0 && |
| job_desc_msg->num_procs < launch_msg_ptr->nprocs) |
| launch_msg_ptr->nprocs = job_desc_msg->num_procs; |
| if (launch_msg_ptr->nprocs < 0) |
| launch_msg_ptr->nprocs = job_ptr->num_procs; |
| |
| launch_msg_ptr->num_cpu_groups = job_ptr->num_cpu_groups; |
| launch_msg_ptr->cpus_per_node = xmalloc(sizeof(uint32_t) * |
| job_ptr->num_cpu_groups); |
| memcpy(launch_msg_ptr->cpus_per_node, job_ptr->cpus_per_node, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| launch_msg_ptr->cpu_count_reps = xmalloc(sizeof(uint32_t) * |
| job_ptr->num_cpu_groups); |
| memcpy(launch_msg_ptr->cpu_count_reps, job_ptr->cpu_count_reps, |
| (sizeof(uint32_t) * job_ptr->num_cpu_groups)); |
| |
| /* FIXME: for some reason these CPU arrays total all the CPUs |
| * actually allocated, rather than totaling up to the requested |
| * CPU count for the allocation. |
| * This means that SLURM_TASKS_PER_NODE will not match with |
| * SLURM_NPROCS in the batch script environment. |
| */ |
| |
| agent_arg_ptr = (agent_arg_t *) xmalloc(sizeof(agent_arg_t)); |
| agent_arg_ptr->node_count = 1; |
| agent_arg_ptr->retry = 0; |
| agent_arg_ptr->hostlist = hostlist_create(node_ptr->name); |
| agent_arg_ptr->msg_type = REQUEST_BATCH_JOB_LAUNCH; |
| agent_arg_ptr->msg_args = (void *) launch_msg_ptr; |
| |
| /* Launch the RPC via agent */ |
| agent_queue_request(agent_arg_ptr); |
| |
| *step_id = step_rec->step_id; |
| return SLURM_SUCCESS; |
| } |
| |
| inline static void _slurm_rpc_trigger_clear(slurm_msg_t * msg) |
| { |
| int rc; |
| uid_t uid; |
| trigger_info_msg_t * trigger_ptr = (trigger_info_msg_t *) msg->data; |
| DEF_TIMERS; |
| |
| START_TIMER; |
| debug("Processing RPC: REQUEST_TRIGGER_CLEAR"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| |
| rc = trigger_clear(uid, trigger_ptr); |
| END_TIMER2("_slurm_rpc_trigger_clear"); |
| |
| slurm_send_rc_msg(msg, rc); |
| } |
| |
| inline static void _slurm_rpc_trigger_get(slurm_msg_t * msg) |
| { |
| uid_t uid; |
| trigger_info_msg_t *resp_data; |
| trigger_info_msg_t * trigger_ptr = (trigger_info_msg_t *) msg->data; |
| slurm_msg_t response_msg; |
| DEF_TIMERS; |
| |
| START_TIMER; |
| debug("Processing RPC: REQUEST_TRIGGER_GET"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| |
| resp_data = trigger_get(uid, trigger_ptr); |
| END_TIMER2("_slurm_rpc_trigger_get"); |
| |
| slurm_msg_t_init(&response_msg); |
| response_msg.address = msg->address; |
| response_msg.msg_type = RESPONSE_TRIGGER_GET; |
| response_msg.data = resp_data; |
| slurm_send_node_msg(msg->conn_fd, &response_msg); |
| slurm_free_trigger_msg(resp_data); |
| } |
| |
| inline static void _slurm_rpc_trigger_set(slurm_msg_t * msg) |
| { |
| int rc; |
| uid_t uid; |
| gid_t gid; |
| trigger_info_msg_t * trigger_ptr = (trigger_info_msg_t *) msg->data; |
| DEF_TIMERS; |
| |
| START_TIMER; |
| debug("Processing RPC: REQUEST_TRIGGER_SET"); |
| uid = g_slurm_auth_get_uid(msg->auth_cred); |
| gid = g_slurm_auth_get_gid(msg->auth_cred); |
| |
| rc = trigger_set(uid, gid, trigger_ptr); |
| END_TIMER2("_slurm_rpc_trigger_set"); |
| |
| slurm_send_rc_msg(msg, rc); |
| } |