| /*****************************************************************************\ |
| * src/srun/allocate.c - srun functions for managing node allocations |
| * $Id$ |
| ***************************************************************************** |
| * Copyright (C) 2002 The Regents of the University of California. |
| * Produced at Lawrence Livermore National Laboratory (cf. DISCLAIMER). |
| * Written by Mark Grondona <mgrondona@llnl.gov>. |
| * UCRL-CODE-217948. |
| * |
| * This file is part of SLURM, a resource management program. |
| * For details, see <http://www.llnl.gov/linux/slurm/>. |
| * |
| * SLURM is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with SLURM; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #if HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #include <stdlib.h> |
| #include <unistd.h> |
| #include <sys/poll.h> |
| #include <sys/types.h> |
| #include <pwd.h> |
| |
| #include "src/common/log.h" |
| #include "src/common/macros.h" |
| #include "src/common/slurm_auth.h" |
| #include "src/common/slurm_protocol_api.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xsignal.h" |
| #include "src/common/xstring.h" |
| #include "src/common/forward.h" |
| #include "src/common/env.h" |
| |
| #include "src/srun/allocate.h" |
| #include "src/srun/msg.h" |
| #include "src/srun/opt.h" |
| #include "src/srun/attach.h" |
| |
| #define MAX_ALLOC_WAIT 60 /* seconds */ |
| #define MIN_ALLOC_WAIT 5 /* seconds */ |
| #define MAX_RETRIES 10 |
| |
| extern char **environ; |
| |
| /* |
| * Static Prototypes |
| */ |
| static int _accept_msg_connection(slurm_fd slurmctld_fd, |
| resource_allocation_response_msg_t **resp); |
| static int _handle_msg(slurm_msg_t *msg, |
| resource_allocation_response_msg_t **resp); |
| static int _wait_for_alloc_rpc(int sleep_time, |
| resource_allocation_response_msg_t **resp); |
| static void _wait_for_resources(resource_allocation_response_msg_t **resp); |
| static bool _retry(void); |
| static void _intr_handler(int signo); |
| |
| static job_step_create_request_msg_t * _step_req_create(srun_job_t *j); |
| |
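| /* destroy_job is set by _intr_handler() when the user interrupts a |
| * pending allocation request; allocate_job caches the srun job for |
| * use by failure handling (see set_allocate_job()) */ |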
| static sig_atomic_t destroy_job = 0; |
| static srun_job_t *allocate_job = NULL; |
| |
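| /* Perform a "will run" test of the job described by the current srun |
| * options without actually allocating resources. |
| * RET SLURM_SUCCESS if the job could run, else an error code */ |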
| int |
| allocate_test(void) |
| { |
| int rc; |
| job_desc_msg_t *j = job_desc_msg_create_from_opts (NULL); |
| if(!j) |
| return SLURM_ERROR; |
| |
| rc = slurm_job_will_run(j); |
| job_desc_msg_destroy(j); |
| return rc; |
| } |
| |
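| /* Request a resource allocation for the job described by the current |
| * srun options and block until it is granted, the request fails, or |
| * the user interrupts with SIGINT, SIGQUIT or SIGTERM. |
| * RET allocation response message or NULL on error */ |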
| resource_allocation_response_msg_t * |
| allocate_nodes(void) |
| { |
| int rc = 0; |
| static int sigarray[] = { SIGQUIT, SIGINT, SIGTERM, 0 }; |
| SigFunc *oquitf, *ointf, *otermf; |
| sigset_t oset; |
| resource_allocation_response_msg_t *resp = NULL; |
| job_desc_msg_t *j = job_desc_msg_create_from_opts (NULL); |
| |
| if(!j) |
| return NULL; |
| |
| oquitf = xsignal(SIGQUIT, _intr_handler); |
| ointf = xsignal(SIGINT, _intr_handler); |
| otermf = xsignal(SIGTERM, _intr_handler); |
| |
| xsignal_save_mask(&oset); |
| xsignal_unblock(sigarray); |
| |
| /* Do not re-use existing job id when submitting new job |
| * from within a running job */ |
| if ((j->job_id != NO_VAL) && !opt.jobid_set) { |
| info("WARNING: Creating SLURM job allocation from within " |
| "another allocation"); |
| info("WARNING: You are attempting to initiate a second job"); |
| j->job_id = NO_VAL; |
| } |
| |
| while (((rc = slurm_allocate_resources(j, &resp)) < 0) && _retry()) { |
| if (destroy_job) |
| goto done; |
| } |
| |
| if(!resp) |
| goto done; |
| |
| if ((rc == 0) && (resp->node_list == NULL)) { |
| if (resp->error_code) |
| verbose("Warning: %s", slurm_strerror(resp->error_code)); |
| _wait_for_resources(&resp); |
| } |
| |
| done: |
| xsignal_set_mask(&oset); |
| xsignal(SIGINT, ointf); |
| xsignal(SIGTERM, otermf); |
| xsignal(SIGQUIT, oquitf); |
| |
| job_desc_msg_destroy(j); |
| |
| return resp; |
| } |
| |
| /* |
| * Returns jobid if SLURM_JOBID was set in the user's environment |
| * or if --jobid option was given, else returns 0 |
| */ |
| uint32_t |
| jobid_from_env(void) |
| { |
| if (opt.jobid != NO_VAL) |
| return ((uint32_t) opt.jobid); |
| else |
| return (0); |
| } |
| |
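| /* Attach to an existing allocation identified by the SLURM_JOBID |
| * environment variable or the --jobid option. |
| * RET allocation response message, or NULL if no job id was given or |
| * (under --jobid or parallel debug) the allocation could not be |
| * confirmed */ |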
| resource_allocation_response_msg_t * |
| existing_allocation(void) |
| { |
| old_job_alloc_msg_t job; |
| resource_allocation_response_msg_t *resp = NULL; |
| |
| if ((job.job_id = jobid_from_env()) == 0) |
| return NULL; |
| |
| if (slurm_confirm_allocation(&job, &resp) < 0) { |
| if (opt.parallel_debug || opt.jobid_set) |
| return NULL; /* create new allocation as needed */ |
| if (errno == ESLURM_ALREADY_DONE) |
| error ("SLURM job %u has expired.", job.job_id); |
| else |
| error ("Unable to confirm allocation for job %u: %m", |
| job.job_id); |
| info ("Check SLURM_JOBID environment variable " |
| "for expired or invalid job."); |
| exit(1); |
| } |
| |
| return resp; |
| } |
| |
| |
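| /* Block until the pending job referenced by *resp has been allocated |
| * resources, alternately waiting for an RPC from slurmctld and |
| * polling with slurm_confirm_allocation(). |
| * IN/OUT resp: replaced with the new allocation response on return */ |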
| static void |
| _wait_for_resources(resource_allocation_response_msg_t **resp) |
| { |
| old_job_alloc_msg_t old; |
| resource_allocation_response_msg_t *r = *resp; |
| int sleep_time = MIN_ALLOC_WAIT; |
| |
| if (!opt.quiet) |
| info ("job %u queued and waiting for resources", r->job_id); |
| |
| old.job_id = r->job_id; |
| slurm_free_resource_allocation_response_msg(r); |
| |
| /* Keep polling until the job is allocated resources */ |
| while (_wait_for_alloc_rpc(sleep_time, resp) <= 0) { |
| |
| if (slurm_confirm_allocation(&old, resp) >= 0) |
| break; |
| |
| if (slurm_get_errno() == ESLURM_JOB_PENDING) |
| debug3 ("Still waiting for allocation"); |
| else |
| fatal ("Unable to confirm allocation for job %u: %m", |
| old.job_id); |
| |
| if (destroy_job) { |
| verbose("cancelling job %u", old.job_id); |
| slurm_complete_job(old.job_id, 0); |
| debugger_launch_failure(allocate_job); |
| exit(0); |
| } |
| |
| if (sleep_time < MAX_ALLOC_WAIT) |
| sleep_time++; |
| } |
| if (!opt.quiet) |
| info ("job %u has been allocated resources", (*resp)->job_id); |
| } |
| |
| /* Wait up to sleep_time for RPC from slurmctld indicating resource allocation |
| * has occurred. |
| * IN sleep_time: delay in seconds |
| * OUT resp: resource allocation response message |
| * RET 1 if resp is filled in, 0 or -1 otherwise */ |
| static int |
| _wait_for_alloc_rpc(int sleep_time, resource_allocation_response_msg_t **resp) |
| { |
| struct pollfd fds[1]; |
| slurm_fd slurmctld_fd; |
| |
| if ((slurmctld_fd = slurmctld_msg_init()) < 0) { |
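| /* No slurmctld message socket available; just sleep so the |
| * caller falls back to polling slurmctld directly */ |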
| sleep (sleep_time); |
| return (0); |
| } |
| |
| fds[0].fd = slurmctld_fd; |
| fds[0].events = POLLIN; |
| |
| while (poll (fds, 1, (sleep_time * 1000)) < 0) { |
| switch (errno) { |
| case EAGAIN: |
| case EINTR: |
| return (-1); |
| case ENOMEM: |
| case EINVAL: |
| case EFAULT: |
| fatal("poll: %m"); |
| default: |
| error("poll: %m. Continuing..."); |
| } |
| } |
| |
| if (fds[0].revents & POLLIN) |
| return (_accept_msg_connection(slurmctld_fd, resp)); |
| |
| return (0); |
| } |
| |
| /* Accept RPC from slurmctld and process it. |
| * IN slurmctld_fd: file descriptor for slurmctld communications |
| * OUT resp: resource allocation response message |
| * RET 1 if resp is filled in, 0 or SLURM_ERROR otherwise */ |
| static int |
| _accept_msg_connection(slurm_fd slurmctld_fd, |
| resource_allocation_response_msg_t **resp) |
| { |
| slurm_fd fd; |
| slurm_msg_t *msg = NULL; |
| slurm_addr cli_addr; |
| char host[256]; |
| uint16_t port; |
| int rc = 0; |
| List ret_list; |
| |
| fd = slurm_accept_msg_conn(slurmctld_fd, &cli_addr); |
| if (fd < 0) { |
| error("Unable to accept connection: %m"); |
| return rc; |
| } |
| |
| slurm_get_addr(&cli_addr, &port, host, sizeof(host)); |
| debug2("got message connection from %s:%d", host, port); |
| |
| msg = xmalloc(sizeof(slurm_msg_t)); |
| forward_init(&msg->forward, NULL); |
| msg->ret_list = NULL; |
| msg->conn_fd = fd; |
| msg->forward_struct_init = 0; |
| |
| again: |
| ret_list = slurm_receive_msg(fd, msg, 0); |
| |
| if(!ret_list || errno != SLURM_SUCCESS) { |
| if (errno == EINTR) { |
| goto again; |
| } |
| if(ret_list) |
| list_destroy(ret_list); |
| |
| error("_accept_msg_connection[%s]: %m", host); |
| slurm_free_msg(msg); |
| return SLURM_ERROR; |
| } |
| if(list_count(ret_list)>0) { |
| error("_accept_msg_connection: " |
| "got %d from receive, expecting 0", |
| list_count(ret_list)); |
| } |
| msg->ret_list = ret_list; |
| |
| |
| rc = _handle_msg(msg, resp); /* may hand off msg->data via resp */ |
| slurm_free_msg(msg); |
| |
| slurm_close_accepted_conn(fd); |
| return rc; |
| } |
| |
| /* process RPC from slurmctld |
| * IN msg: message received |
| * OUT resp: resource allocation response message |
| * RET 1 if resp is filled in, 0 otherwise */ |
| static int |
| _handle_msg(slurm_msg_t *msg, resource_allocation_response_msg_t **resp) |
| { |
| uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred); |
| uid_t uid = getuid(); |
| uid_t slurm_uid = (uid_t) slurm_get_slurm_user_id(); |
| int rc = 0; |
| srun_timeout_msg_t *to; |
| |
| if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) { |
| error ("Security violation, slurm message from uid %u", |
| (unsigned int) req_uid); |
| return 0; |
| } |
| |
| switch (msg->msg_type) { |
| case SRUN_PING: |
| debug3("slurmctld ping received"); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| slurm_free_srun_ping_msg(msg->data); |
| break; |
| case RESPONSE_RESOURCE_ALLOCATION: |
| debug2("resource allocation response received"); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| *resp = msg->data; |
| rc = 1; |
| break; |
| case SRUN_TIMEOUT: |
| debug2("timeout received"); |
| to = msg->data; |
| timeout_handler(to->timeout); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| slurm_free_srun_timeout_msg(msg->data); |
| break; |
| default: |
| error("received spurious message type: %d\n", |
| msg->msg_type); |
| } |
| return rc; |
| } |
| |
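| /* Determine whether a failed allocation request should be retried, |
| * sleeping progressively longer while the controller is not |
| * responding. |
| * RET true to retry the request, false to give up */ |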
| static bool |
| _retry(void) |
| { |
| static int retries = 0; |
| static char *msg = "Slurm controller not responding, " |
| "sleeping and retrying."; |
| |
| if (errno == ESLURM_ERROR_ON_DESC_TO_RECORD_COPY) { |
| if (retries == 0) |
| error (msg); |
| else if (retries < MAX_RETRIES) |
| debug (msg); |
| else |
| return false; |
| sleep (++retries); |
| } else { |
| error("Unable to allocate resources: %m"); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /* |
| * Signal handler (SIGINT, SIGQUIT, SIGTERM) used while waiting for |
| * resources to become available; sets the destroy_job flag. |
| */ |
| static void |
| _intr_handler(int signo) |
| { |
| destroy_job = 1; |
| } |
| |
| |
| /* |
| * Create a job description structure based on the srun options |
| * (see opt.h). RET the new message or NULL on error. |
| */ |
| job_desc_msg_t * |
| job_desc_msg_create_from_opts (char *script) |
| { |
| job_desc_msg_t *j = xmalloc(sizeof(*j)); |
| |
| slurm_init_job_desc_msg(j); |
| |
| j->contiguous = opt.contiguous; |
| j->features = opt.constraints; |
| j->immediate = opt.immediate; |
| j->name = opt.job_name; |
| j->req_nodes = opt.nodelist; |
| |
| if (opt.distribution == SLURM_DIST_ARBITRARY |
| && !j->req_nodes) { |
| error("With Arbitrary distribution you need to " |
| "specify a nodelist or hostfile with the -w option"); |
| job_desc_msg_destroy(j); /* avoid leaking the partial request */ |
| return NULL; |
| } |
| j->exc_nodes = opt.exc_nodes; |
| j->partition = opt.partition; |
| j->min_nodes = opt.min_nodes; |
| j->user_id = opt.uid; |
| j->dependency = opt.dependency; |
| if (opt.nice) |
| j->nice = NICE_OFFSET + opt.nice; |
| j->exclusive = opt.exclusive; |
| j->group_id = opt.gid; |
| j->mail_type = opt.mail_type; |
| if (opt.mail_user) |
| j->mail_user = xstrdup(opt.mail_user); |
| if (opt.begin) |
| j->begin_time = opt.begin; |
| if (opt.network) |
| j->network = xstrdup(opt.network); |
| if (opt.account) |
| j->account = xstrdup(opt.account); |
| |
| if (opt.hold) |
| j->priority = 0; |
| if (opt.jobid != NO_VAL) |
| j->job_id = opt.jobid; |
| #if SYSTEM_DIMENSIONS |
| if (opt.geometry[0] > 0) { |
| int i; |
| for (i=0; i<SYSTEM_DIMENSIONS; i++) |
| j->geometry[i] = opt.geometry[i]; |
| } |
| #endif |
| |
| if (opt.conn_type != -1) |
| j->conn_type = opt.conn_type; |
| |
| if (opt.no_rotate) |
| j->rotate = 0; |
| |
| if (opt.max_nodes) |
| j->max_nodes = opt.max_nodes; |
| if (opt.mincpus > -1) |
| j->min_procs = opt.mincpus; |
| if (opt.realmem > -1) |
| j->min_memory = opt.realmem; |
| if (opt.tmpdisk > -1) |
| j->min_tmp_disk = opt.tmpdisk; |
| |
| if (opt.overcommit) |
| j->num_procs = opt.min_nodes; |
| else |
| j->num_procs = opt.nprocs * opt.cpus_per_task; |
| |
| if (opt.cpus_set) |
| j->cpus_per_task = opt.cpus_per_task; |
| |
| if (opt.no_kill) |
| j->kill_on_node_fail = 0; |
| if (opt.time_limit > -1) |
| j->time_limit = opt.time_limit; |
| if (opt.share) |
| j->shared = 1; |
| |
| j->port = slurmctld_comm_addr.port; |
| if (slurmctld_comm_addr.hostname) |
| j->host = xstrdup(slurmctld_comm_addr.hostname); |
| else |
| j->host = NULL; |
| |
| if (script) { |
| char *buf = NULL; |
| /* |
| * If script is set then we are building a request for |
| * a batch job |
| */ |
| xassert (opt.batch); |
| |
| if (opt.batch) |
| putenv("ENVIRONMENT=BATCH"); |
| if (opt.overcommit) |
| putenv("SLURM_OVERCOMMIT=1"); |
| if (opt.nprocs_set) { |
| xstrfmtcat(buf, "SLURM_NPROCS=%d", opt.nprocs); |
| putenv(buf); |
| } |
| |
| j->environment = NULL; |
| if (opt.get_user_env) { |
| struct passwd *pw = NULL; |
| pw = getpwuid(opt.uid); |
| if (pw != NULL) { |
| j->environment = |
| env_array_user_default(pw->pw_name); |
| /* FIXME - should we abort if j->environment |
| is NULL? */ |
| } |
| } |
| env_array_merge(&j->environment, (const char **)environ); |
| j->env_size = envcount (j->environment); |
| j->script = script; |
| j->argv = remote_argv; |
| j->argc = remote_argc; |
| j->err = opt.efname; |
| j->in = opt.ifname; |
| j->out = opt.ofname; |
| j->work_dir = opt.cwd; |
| } |
| |
| return (j); |
| } |
| |
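| /* Free a job description created by job_desc_msg_create_from_opts() */ |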
| void |
| job_desc_msg_destroy(job_desc_msg_t *j) |
| { |
| if (j) { |
| xfree(j->account); |
| xfree(j->host); |
| xfree(j); |
| } |
| } |
| |
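| /* Build a job step creation request for job j from the current srun |
| * options. |
| * RET the request message (free with |
| * slurm_free_job_step_create_request_msg()) or NULL on error */ |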
| static job_step_create_request_msg_t * |
| _step_req_create(srun_job_t *j) |
| { |
| job_step_create_request_msg_t *r = xmalloc(sizeof(*r)); |
| r->job_id = j->jobid; |
| r->user_id = opt.uid; |
| |
| /* get the correct number of hosts to run tasks on */ |
| r->node_count = j->nhosts; |
| /* info("send %d or %d? sending %d", opt.max_nodes, */ |
| /* j->nhosts, r->node_count); */ |
| if (r->node_count > j->nhosts) { |
| error("Asking for more nodes than were allocated"); |
| xfree(r); |
| return NULL; |
| } |
| r->cpu_count = opt.overcommit ? r->node_count |
| : (opt.nprocs*opt.cpus_per_task); |
| /* info("%d, %d %d*%d = %d = %d", opt.overcommit, */ |
| /* r->node_count, opt.nprocs, opt.cpus_per_task, */ |
| /* (opt.nprocs*opt.cpus_per_task), r->cpu_count); */ |
| r->num_tasks = opt.nprocs; |
| r->node_list = xstrdup(opt.nodelist); |
| r->network = xstrdup(opt.network); |
| r->name = xstrdup(opt.job_name); |
| if(opt.relative) |
| /* works now, better fix in 1.2 */ |
| r->relative = atoi(opt.relative); |
| else |
| r->relative = (uint16_t)NO_VAL; |
| |
| switch (opt.distribution) { |
| case SLURM_DIST_CYCLIC: |
| r->task_dist = SLURM_DIST_CYCLIC; |
| break; |
| case SLURM_DIST_BLOCK: |
| r->task_dist = SLURM_DIST_BLOCK; |
| break; |
| case SLURM_DIST_ARBITRARY: |
| r->task_dist = SLURM_DIST_ARBITRARY; |
| break; |
| case SLURM_DIST_UNKNOWN: |
| default: |
| r->task_dist = (opt.nprocs <= r->node_count) |
| ? SLURM_DIST_CYCLIC : SLURM_DIST_BLOCK; |
| break; |
| } |
| opt.distribution = r->task_dist; |
| |
| if (slurmctld_comm_addr.port) { |
| r->host = xstrdup(slurmctld_comm_addr.hostname); |
| r->port = slurmctld_comm_addr.port; |
| } |
| |
| return(r); |
| } |
| |
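| /* Create a job step within the job's existing allocation and build |
| * its step and task layout. |
| * IN/OUT job: updated with the step id, credential, switch info and |
| * step layout |
| * IN alloc_resp: resource allocation response for the job |
| * RET 0 on success, -1 on error */ |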
| int |
| create_job_step(srun_job_t *job, |
| resource_allocation_response_msg_t *alloc_resp) |
| { |
| job_step_create_request_msg_t *req = NULL; |
| job_step_create_response_msg_t *resp = NULL; |
| |
| if (!(req = _step_req_create(job))) { |
| error ("Unable to allocate step request message"); |
| return -1; |
| } |
| if ((slurm_job_step_create(req, &resp) < 0) || (resp == NULL)) { |
| error ("Unable to create job step: %m"); |
| slurm_free_job_step_create_request_msg(req); |
| return -1; |
| } |
| |
| job->stepid = resp->job_step_id; |
| job->cred = resp->cred; |
| job->switch_job = resp->switch_job; |
| job->step_layout = step_layout_create(alloc_resp, resp, req); |
| if (!job->step_layout) { |
| error("step_layout not created correctly"); |
| return -1; |
| } |
| |
| /* Number of hosts in job may not have been initialized yet if |
| * --jobid was used or only SLURM_JOBID was set in user env. |
| * Reset the value here just in case. |
| */ |
| job->nhosts = job->step_layout->num_hosts; |
| |
| if (task_layout(job->step_layout) != SLURM_SUCCESS) { |
| error("problem with task layout"); |
| return -1; |
| } |
| |
| /* |
| * Recreate filenames which may depend upon step id |
| */ |
| job_update_io_fnames(job); |
| |
| slurm_free_job_step_create_request_msg(req); |
| |
| return 0; |
| } |
| |
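| /* Record the job structure for later use by this module, e.g. to |
| * notify the parallel debugger if a pending allocation is cancelled */ |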
| void |
| set_allocate_job(srun_job_t *job) |
| { |
| allocate_job = job; |
| return; |
| } |