| /*****************************************************************************\ |
| * allocate.c - allocate nodes for a job or step with supplied constraints |
| ***************************************************************************** |
| * Copyright (C) 2002-2007 The Regents of the University of California. |
| * Copyright (C) 2008-2009 Lawrence Livermore National Security. |
| * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). |
| * Written by Morris Jette <jette1@llnl.gov>. |
| * CODE-OCEC-09-009. All rights reserved. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include <ctype.h> |
| #include <errno.h> |
| #include <netinet/in.h> /* for ntohs() */ |
| #include <poll.h> |
| #include <stdbool.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <sys/types.h> |
| #include <time.h> |
| #include <unistd.h> |
| |
| #ifndef __USE_XOPEN_EXTENDED |
| extern pid_t getsid(pid_t pid); /* missing from <unistd.h> */ |
| #endif |
| |
| #include "slurm/slurm.h" |
| #include "src/common/fd.h" |
| #include "src/common/forward.h" |
| #include "src/common/hostlist.h" |
| #include "src/common/parse_time.h" |
| #include "src/common/read_config.h" |
| #include "src/common/slurm_protocol_api.h" |
| #include "src/common/slurm_protocol_defs.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| |
| #include "src/interfaces/auth.h" |
| #include "src/interfaces/certgen.h" |
| #include "src/interfaces/certmgr.h" |
| #include "src/interfaces/conn.h" |
| |
| #define BUFFER_SIZE 1024 |
| #define MAX_ALLOC_WAIT 60 /* seconds */ |
| #define MIN_ALLOC_WAIT 5 /* seconds */ |
| |
| typedef struct { |
| slurm_addr_t address; |
| int fd; |
| char *hostname; |
| uint16_t port; |
| } listen_t; |
| |
| typedef struct { |
| slurmdb_cluster_rec_t *cluster; |
| job_desc_msg_t *req; |
| int *will_run_rc; |
| list_t *resp_msg_list; |
| } load_willrun_req_struct_t; |
| |
| static int _handle_rc_msg(slurm_msg_t *msg); |
| static listen_t *_create_allocation_response_socket(void); |
| static void _destroy_allocation_response_socket(listen_t *listen); |
| static void _wait_for_allocation_response(uint32_t job_id, |
| const listen_t *listen, |
| uint16_t msg_type, int timeout, |
| void **resp); |
| static int _job_will_run_cluster(job_desc_msg_t *req, |
| will_run_response_msg_t **will_run_resp, |
| slurmdb_cluster_rec_t *cluster); |
| |
| /* |
| * slurm_allocate_resources - allocate resources for a job request |
| * IN job_desc_msg - description of resource allocation request |
| * OUT slurm_alloc_msg - response to request |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| * NOTE: free the response using slurm_free_resource_allocation_response_msg() |
| */ |
| int |
| slurm_allocate_resources (job_desc_msg_t *req, |
| resource_allocation_response_msg_t **resp) |
| { |
| int rc; |
| slurm_msg_t req_msg; |
| slurm_msg_t resp_msg; |
| |
| slurm_msg_t_init(&req_msg); |
| slurm_msg_t_init(&resp_msg); |
| /* |
	 * set the session ID for this request
| */ |
| if (req->alloc_sid == NO_VAL) |
| req->alloc_sid = getsid(0); |
| |
| req_msg.msg_type = REQUEST_RESOURCE_ALLOCATION; |
| req_msg.data = req; |
| |
| rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg, |
| working_cluster_rec); |
| |
| if (rc == SLURM_ERROR) |
| return SLURM_ERROR; |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_RC: |
| if (_handle_rc_msg(&resp_msg) < 0) |
| return SLURM_ERROR; |
| *resp = NULL; |
| break; |
| case RESPONSE_RESOURCE_ALLOCATION: |
| *resp = (resource_allocation_response_msg_t *) resp_msg.data; |
| break; |
| default: |
| slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * slurm_allocate_resources_blocking |
| * allocate resources for a job request. This call will block until |
| * the allocation is granted, or the specified timeout limit is reached. |
| * IN req - description of resource allocation request |
| * IN timeout - amount of time, in seconds, to wait for a response before |
| * giving up. |
| * A timeout of zero will wait indefinitely. |
| * IN pending_callback - If the allocation cannot be granted immediately, |
| * the controller will put the job in the PENDING state. If |
 *                       pending_callback is not NULL, it will be called with the job_id
| * of the pending job as the sole parameter. |
| * |
 * RET allocation structure on success, NULL on error, with errno set to
| * indicate the error (errno will be ETIMEDOUT if the timeout is reached |
| * with no allocation granted) |
| * NOTE: free the response using slurm_free_resource_allocation_response_msg() |
| */ |
| resource_allocation_response_msg_t * |
| slurm_allocate_resources_blocking (const job_desc_msg_t *user_req, |
| time_t timeout, |
| void(*pending_callback)(uint32_t job_id)) |
| { |
| int rc; |
| slurm_msg_t req_msg; |
| slurm_msg_t resp_msg; |
| resource_allocation_response_msg_t *resp = NULL; |
| uint32_t job_id; |
| job_desc_msg_t *req; |
| listen_t *listen = NULL; |
| int errnum = SLURM_SUCCESS; |
| bool already_done = false; |
| |
| slurm_msg_t_init(&req_msg); |
| slurm_msg_t_init(&resp_msg); |
| |
| /* make a copy of the user's job description struct so that we |
| * can make changes before contacting the controller */ |
| req = xmalloc(sizeof(job_desc_msg_t)); |
| memcpy(req, user_req, sizeof(job_desc_msg_t)); |
| |
| /* |
	 * set the session ID for this request
| */ |
| if (req->alloc_sid == NO_VAL) |
| req->alloc_sid = getsid(0); |
| |
| if (!req->immediate) { |
| listen = _create_allocation_response_socket(); |
| if (listen == NULL) { |
| xfree(req); |
| return NULL; |
| } |
| req->alloc_resp_port = listen->port; |
| } |
| |
	if (tls_enabled()) {
		if (!(req->alloc_tls_cert = conn_g_get_own_public_cert())) {
			error("Could not get self signed certificate for allocation response");
			if (listen)
				_destroy_allocation_response_socket(listen);
			xfree(req);
			return NULL;
		}
	}
| |
| req_msg.msg_type = REQUEST_RESOURCE_ALLOCATION; |
| req_msg.data = req; |
| |
| rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg, |
| working_cluster_rec); |
| xfree(req->alloc_tls_cert); |
| |
| if (rc == SLURM_ERROR) { |
| int errnum = errno; |
| destroy_forward(&req_msg.forward); |
| destroy_forward(&resp_msg.forward); |
| if (!req->immediate) |
| _destroy_allocation_response_socket(listen); |
| xfree(req); |
| errno = errnum; |
| return NULL; |
| } |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_RC: |
| if (_handle_rc_msg(&resp_msg) < 0) { |
| /* will reach this when the allocation fails */ |
| errnum = errno; |
| } else { |
| /* shouldn't get here */ |
| errnum = SLURM_ERROR; |
| } |
| break; |
| case RESPONSE_RESOURCE_ALLOCATION: |
		/* The controller has acknowledged our request.
		 * Check whether an allocation has been granted yet. */
| resp = (resource_allocation_response_msg_t *) resp_msg.data; |
| if (resp->node_cnt > 0) { |
| /* yes, allocation has been granted */ |
| errno = SLURM_SUCCESS; |
| } else if (!req->immediate) { |
| if (resp->error_code != SLURM_SUCCESS) |
| info("%s", slurm_strerror(resp->error_code)); |
| /* no, we need to wait for a response */ |
| |
| /* print out any user messages before we wait. */ |
| print_multi_line_string(resp->job_submit_user_msg, |
| -1, LOG_LEVEL_INFO); |
| |
| job_id = resp->job_id; |
| slurm_free_resource_allocation_response_msg(resp); |
| if (pending_callback != NULL) |
| pending_callback(job_id); |
| _wait_for_allocation_response(job_id, listen, |
| RESPONSE_RESOURCE_ALLOCATION, |
| timeout, (void **) &resp); |
| /* If NULL, we didn't get the allocation in |
| the time desired, so just free the job id */ |
| if ((resp == NULL) && (errno != ESLURM_ALREADY_DONE)) { |
| errnum = errno; |
| slurm_complete_job(job_id, -1); |
| } |
| if ((resp == NULL) && (errno == ESLURM_ALREADY_DONE)) |
| already_done = true; |
| } |
| break; |
| default: |
| errnum = SLURM_UNEXPECTED_MSG_ERROR; |
| resp = NULL; |
| } |
| |
| destroy_forward(&req_msg.forward); |
| destroy_forward(&resp_msg.forward); |
| if (!req->immediate) |
| _destroy_allocation_response_socket(listen); |
| xfree(req); |
| if (!resp && already_done && (errnum == SLURM_SUCCESS)) |
| errnum = ESLURM_ALREADY_DONE; |
| errno = errnum; |
| return resp; |
| } |
| |
| static int _foreach_log_will_run_resp(void *x, void *key) |
| { |
| will_run_response_msg_t *will_run_resp = x; |
| char buf[256]; |
| slurm_make_time_str(&will_run_resp->start_time, buf, sizeof(buf)); |
| debug("Job %u to start at %s on cluster %s using %u processors on nodes %s in partition %s", |
| will_run_resp->job_id, buf, will_run_resp->cluster_name, |
| will_run_resp->proc_cnt, will_run_resp->node_list, |
| will_run_resp->part_name); |
| |
| if (will_run_resp->preemptee_job_id) { |
| list_itr_t *itr; |
| uint32_t *job_id_ptr; |
| char *job_list = NULL, *sep = ""; |
| itr = list_iterator_create(will_run_resp->preemptee_job_id); |
| while ((job_id_ptr = list_next(itr))) { |
| if (job_list) |
| sep = ","; |
| xstrfmtcat(job_list, "%s%u", sep, *job_id_ptr); |
| } |
| list_iterator_destroy(itr); |
| debug(" Preempts: %s", job_list); |
| xfree(job_list); |
| } |
| |
| return 0; |
| } |
| |
| static void log_will_run_resps(list_t *list) |
| { |
| if (get_log_level() < LOG_LEVEL_DEBUG) |
| return; |
| |
| list_for_each(list, _foreach_log_will_run_resp, NULL); |
| } |
| |
| static void *_load_willrun_thread(void *args) |
| { |
| load_willrun_req_struct_t *load_args = |
| (load_willrun_req_struct_t *)args; |
| slurmdb_cluster_rec_t *cluster = load_args->cluster; |
| will_run_response_msg_t *new_msg = NULL; |
| |
| if (!_job_will_run_cluster(load_args->req, &new_msg, cluster)) { |
| list_append(load_args->resp_msg_list, new_msg); |
| new_msg->cluster_name = xstrdup(cluster->name); |
| } else { |
| debug("Problem with submit to cluster %s: %m", cluster->name); |
| *(load_args->will_run_rc) = errno; |
| } |
| xfree(args); |
| |
| return NULL; |
| } |
| |
| extern int slurm_sort_will_run_resp(void *a, void *b) |
| { |
| will_run_response_msg_t *resp_a = *(will_run_response_msg_t **) a; |
| will_run_response_msg_t *resp_b = *(will_run_response_msg_t **) b; |
| |
| if (resp_a->start_time < resp_b->start_time) |
| return -1; |
| else if (resp_a->start_time > resp_b->start_time) |
| return 1; |
| |
| if (list_count(resp_a->preemptee_job_id) < |
| list_count(resp_b->preemptee_job_id)) |
| return -1; |
| else if (list_count(resp_a->preemptee_job_id) > |
| list_count(resp_b->preemptee_job_id)) |
| return 1; |
| |
| if (!xstrcmp(slurm_conf.cluster_name, resp_a->cluster_name)) |
| return -1; |
| else if (!xstrcmp(slurm_conf.cluster_name, resp_b->cluster_name)) |
| return 1; |
| |
| return 0; |
| } |
| |
| static int _fed_job_will_run(job_desc_msg_t *req, |
| will_run_response_msg_t **will_run_resp, |
| slurmdb_federation_rec_t *fed) |
| { |
| list_t *resp_msg_list; |
| int pthread_count = 0, i, will_run_rc = SLURM_SUCCESS; |
| pthread_t *load_thread = 0; |
| load_willrun_req_struct_t *load_args; |
| list_itr_t *iter; |
| slurmdb_cluster_rec_t *cluster; |
| list_t *req_clusters = NULL; |
| |
| xassert(req); |
| xassert(will_run_resp); |
| |
| *will_run_resp = NULL; |
| |
| /* |
| * If a subset of clusters was specified then only do a will_run to |
| * those clusters, otherwise check all clusters in the federation. |
| */ |
| if (req->clusters && xstrcasecmp(req->clusters, "all")) { |
| req_clusters = list_create(xfree_ptr); |
| slurm_addto_char_list(req_clusters, req->clusters); |
| } |
| |
| /* Spawn one pthread per cluster to collect job information */ |
| resp_msg_list = list_create(slurm_free_will_run_response_msg); |
| load_thread = xcalloc(list_count(fed->cluster_list), sizeof(pthread_t)); |
| iter = list_iterator_create(fed->cluster_list); |
| while ((cluster = (slurmdb_cluster_rec_t *)list_next(iter))) { |
| if ((cluster->control_host == NULL) || |
| (cluster->control_host[0] == '\0')) |
| continue; /* Cluster down */ |
| |
| if (req_clusters && |
| !list_find_first(req_clusters, slurm_find_char_in_list, |
| cluster->name)) |
| continue; |
| |
| load_args = xmalloc(sizeof(load_willrun_req_struct_t)); |
| load_args->cluster = cluster; |
| load_args->req = req; |
| load_args->resp_msg_list = resp_msg_list; |
| load_args->will_run_rc = &will_run_rc; |
| slurm_thread_create(&load_thread[pthread_count], |
| _load_willrun_thread, load_args); |
| pthread_count++; |
| } |
| list_iterator_destroy(iter); |
| FREE_NULL_LIST(req_clusters); |
| |
| /* Wait for all pthreads to complete */ |
| for (i = 0; i < pthread_count; i++) |
| slurm_thread_join(load_thread[i]); |
| xfree(load_thread); |
| |
| list_sort(resp_msg_list, slurm_sort_will_run_resp); |
| log_will_run_resps(resp_msg_list); |
| *will_run_resp = list_pop(resp_msg_list); |
| FREE_NULL_LIST(resp_msg_list); |
| |
| if (!*will_run_resp) { |
| errno = will_run_rc; |
| return SLURM_ERROR; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* Get total node count and lead job ID from RESPONSE_HET_JOB_ALLOCATION */ |
| static void _het_job_alloc_test(list_t *resp, uint32_t *node_cnt, |
| uint32_t *job_id) |
| { |
| resource_allocation_response_msg_t *alloc; |
| uint32_t inx = 0, het_job_node_cnt = 0, het_job_id = 0; |
| list_itr_t *iter; |
| |
| xassert(resp); |
| iter = list_iterator_create(resp); |
| while ((alloc = (resource_allocation_response_msg_t *)list_next(iter))){ |
| het_job_node_cnt += alloc->node_cnt; |
| if (het_job_id == 0) |
| het_job_id = alloc->job_id; |
| print_multi_line_string(alloc->job_submit_user_msg, |
| inx, LOG_LEVEL_INFO); |
| inx++; |
| } |
| list_iterator_destroy(iter); |
| |
| *job_id = het_job_id; |
| *node_cnt = het_job_node_cnt; |
| } |
| |
| /* |
| * slurm_allocate_het_job_blocking |
| * allocate resources for a list of job requests. This call will block |
| * until the entire allocation is granted, or the specified timeout limit |
| * is reached. |
| * IN req - List of resource allocation requests |
| * IN timeout - amount of time, in seconds, to wait for a response before |
| * giving up. |
| * A timeout of zero will wait indefinitely. |
| * IN pending_callback - If the allocation cannot be granted immediately, |
| * the controller will put the job in the PENDING state. If |
 *                       pending_callback is not NULL, it will be called with the job_id
| * of the pending job as the sole parameter. |
| * |
 * RET List of allocation structures on success, NULL on error, with errno set to
| * indicate the error (errno will be ETIMEDOUT if the timeout is reached |
| * with no allocation granted) |
| * NOTE: free the response using list_destroy() |
| */ |
| list_t *slurm_allocate_het_job_blocking( |
| list_t *job_req_list, time_t timeout, |
| void (*pending_callback) (uint32_t job_id)) |
| { |
| int rc; |
| slurm_msg_t req_msg; |
| slurm_msg_t resp_msg; |
| list_t *resp = NULL; |
| job_desc_msg_t *req; |
| listen_t *listen = NULL; |
| int errnum = SLURM_SUCCESS; |
| list_itr_t *iter; |
| bool immediate_flag = false; |
| uint32_t node_cnt = 0, job_id = 0; |
| bool already_done = false; |
| char *alloc_tls_cert = NULL; |
| |
| slurm_msg_t_init(&req_msg); |
| slurm_msg_t_init(&resp_msg); |
| |
| /* |
| * set node name and session ID for this request |
| */ |
| |
| if (!immediate_flag) { |
| listen = _create_allocation_response_socket(); |
| if (listen == NULL) |
| return NULL; |
| } |
| |
	if (tls_enabled()) {
		if (!(alloc_tls_cert = conn_g_get_own_public_cert())) {
			error("Could not get self signed certificate for allocation response");
			if (listen)
				_destroy_allocation_response_socket(listen);
			return NULL;
		}
	}
| |
| iter = list_iterator_create(job_req_list); |
| while ((req = (job_desc_msg_t *) list_next(iter))) { |
| if (req->alloc_sid == NO_VAL) |
| req->alloc_sid = getsid(0); |
| if (listen) |
| req->alloc_resp_port = listen->port; |
| |
| if (req->immediate) |
| immediate_flag = true; |
| req->alloc_tls_cert = xstrdup(alloc_tls_cert); |
| } |
| list_iterator_destroy(iter); |
| |
| xfree(alloc_tls_cert); |
| |
| req_msg.msg_type = REQUEST_HET_JOB_ALLOCATION; |
| req_msg.data = job_req_list; |
| |
| rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg, |
| working_cluster_rec); |
| |
| if (rc == SLURM_ERROR) { |
| int errnum = errno; |
| destroy_forward(&req_msg.forward); |
| destroy_forward(&resp_msg.forward); |
| if (listen) |
| _destroy_allocation_response_socket(listen); |
| errno = errnum; |
| return NULL; |
| } |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_RC: |
| if (_handle_rc_msg(&resp_msg) < 0) { |
| /* will reach this when the allocation fails */ |
| errnum = errno; |
| } else { |
| /* shouldn't get here */ |
| errnum = SLURM_ERROR; |
| } |
| break; |
| case RESPONSE_HET_JOB_ALLOCATION: |
		/* The controller has acknowledged our request.
		 * Check whether an allocation has been granted yet. */
| resp = resp_msg.data; |
| _het_job_alloc_test(resp, &node_cnt, &job_id); |
| if (node_cnt > 0) { |
| /* yes, allocation has been granted */ |
| errno = SLURM_SUCCESS; |
| } else if (immediate_flag) { |
| debug("Immediate allocation not granted"); |
| } else { |
			/* no allocation yet; wait for a response */
| FREE_NULL_LIST(resp); |
| if (pending_callback != NULL) |
| pending_callback(job_id); |
| _wait_for_allocation_response(job_id, listen, |
| RESPONSE_HET_JOB_ALLOCATION, |
| timeout, (void **) &resp); |
| /* If NULL, we didn't get the allocation in |
| * the time desired, so just free the job id */ |
| if ((resp == NULL) && (errno != ESLURM_ALREADY_DONE)) { |
| errnum = errno; |
| slurm_complete_job(job_id, -1); |
| } |
| if ((resp == NULL) && (errno == ESLURM_ALREADY_DONE)) |
| already_done = true; |
| } |
| break; |
| default: |
| errnum = SLURM_UNEXPECTED_MSG_ERROR; |
| } |
| |
| destroy_forward(&req_msg.forward); |
| destroy_forward(&resp_msg.forward); |
| if (listen) |
| _destroy_allocation_response_socket(listen); |
| if (!resp && already_done && (errnum == SLURM_SUCCESS)) |
| errnum = ESLURM_ALREADY_DONE; |
| errno = errnum; |
| |
| return resp; |
| } |
| |
| /* |
| * slurm_job_will_run - determine if a job would execute immediately if |
| * submitted now |
| * IN job_desc_msg - description of resource allocation request |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| */ |
| int slurm_job_will_run(job_desc_msg_t *req) |
| { |
| will_run_response_msg_t *will_run_resp = NULL; |
| char buf[256]; |
| int rc; |
| void *ptr = NULL; |
| |
| /* |
	 * If working_cluster_rec is defined, then it has already gone through
| * slurmdb_get_first_avail_cluster() to find the cluster to go to. |
| */ |
| if (!working_cluster_rec && !slurm_load_federation(&ptr) && |
| cluster_in_federation(ptr, slurm_conf.cluster_name)) |
| rc = _fed_job_will_run(req, &will_run_resp, ptr); |
| else |
| rc = slurm_job_will_run2(req, &will_run_resp); |
| |
| if (will_run_resp) |
| print_multi_line_string( |
| will_run_resp->job_submit_user_msg, |
| -1, LOG_LEVEL_INFO); |
| |
| if ((rc == 0) && will_run_resp) { |
| char *cluster_name = NULL; |
| slurm_make_time_str(&will_run_resp->start_time, |
| buf, sizeof(buf)); |
| |
| if (working_cluster_rec) |
| cluster_name = working_cluster_rec->name; |
| else if (will_run_resp->cluster_name) |
| cluster_name = will_run_resp->cluster_name; |
| |
| info("Job %u to start at %s%s%s a using %u processors on nodes %s in partition %s", |
| will_run_resp->job_id, buf, |
| cluster_name ? " on cluster " : "", |
| cluster_name ? cluster_name : "", |
| will_run_resp->proc_cnt, |
| will_run_resp->node_list, |
| will_run_resp->part_name); |
| if (will_run_resp->preemptee_job_id) { |
| list_itr_t *itr; |
| uint32_t *job_id_ptr; |
| char *job_list = NULL, *sep = ""; |
| itr = list_iterator_create(will_run_resp-> |
| preemptee_job_id); |
| while ((job_id_ptr = list_next(itr))) { |
| if (job_list) |
| sep = ","; |
| xstrfmtcat(job_list, "%s%u", sep, *job_id_ptr); |
| } |
| list_iterator_destroy(itr); |
| info(" Preempts: %s", job_list); |
| xfree(job_list); |
| } |
| |
| slurm_free_will_run_response_msg(will_run_resp); |
| } |
| |
| if (ptr) |
| slurm_destroy_federation_rec(ptr); |
| |
| return rc; |
| } |
| |
| /* |
| * slurm_het_job_will_run - determine if a heterogeneous job would execute |
| * immediately if submitted now |
| * IN job_req_list - List of job_desc_msg_t structures describing the resource |
| * allocation request |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| */ |
| extern int slurm_het_job_will_run(list_t *job_req_list) |
| { |
| job_desc_msg_t *req; |
| will_run_response_msg_t *will_run_resp; |
| char buf[256], *sep = ""; |
| int rc = SLURM_SUCCESS, inx = 0; |
| list_itr_t *iter, *itr; |
| time_t first_start = (time_t) 0; |
| uint32_t first_job_id = 0, tot_proc_count = 0, *job_id_ptr; |
| hostset_t *hs = NULL; |
| char *job_list = NULL; |
| |
| if (!job_req_list || (list_count(job_req_list) == 0)) { |
| error("No job descriptors input"); |
| return SLURM_ERROR; |
| } |
| |
| iter = list_iterator_create(job_req_list); |
| while ((req = (job_desc_msg_t *) list_next(iter))) { |
| will_run_resp = NULL; |
| rc = slurm_job_will_run2(req, &will_run_resp); |
| |
| if (will_run_resp) |
| print_multi_line_string( |
| will_run_resp->job_submit_user_msg, |
| inx, LOG_LEVEL_INFO); |
| |
| if ((rc == SLURM_SUCCESS) && will_run_resp) { |
| if (first_job_id == 0) |
| first_job_id = will_run_resp->job_id; |
| if ((first_start == 0) || |
| (first_start < will_run_resp->start_time)) |
| first_start = will_run_resp->start_time; |
| tot_proc_count += will_run_resp->proc_cnt; |
| if (hs) |
| hostset_insert(hs, will_run_resp->node_list); |
| else |
| hs = hostset_create(will_run_resp->node_list); |
| |
| if (will_run_resp->preemptee_job_id) { |
| itr = list_iterator_create(will_run_resp-> |
| preemptee_job_id); |
| while ((job_id_ptr = list_next(itr))) { |
| if (job_list) |
| sep = ","; |
| xstrfmtcat(job_list, "%s%u", sep, |
| *job_id_ptr); |
| } |
| list_iterator_destroy(itr); |
| } |
| |
| slurm_free_will_run_response_msg(will_run_resp); |
| } |
| if (rc != SLURM_SUCCESS) |
| break; |
| inx++; |
| } |
| list_iterator_destroy(iter); |
| |
| if (rc == SLURM_SUCCESS) { |
| char *node_list = NULL; |
| |
| if (hs) |
| node_list = hostset_ranged_string_xmalloc(hs); |
| slurm_make_time_str(&first_start, buf, sizeof(buf)); |
| info("Job %u to start at %s using %u processors on %s", |
| first_job_id, buf, tot_proc_count, node_list); |
| if (job_list) |
| info(" Preempts: %s", job_list); |
| |
| xfree(node_list); |
| } |
| |
| if (hs) |
| hostset_destroy(hs); |
| xfree(job_list); |
| |
| return rc; |
| } |
| |
| /* |
| * slurm_job_will_run2 - determine if a job would execute immediately if |
| * submitted now |
| * IN job_desc_msg - description of resource allocation request |
| * OUT will_run_resp - job run time data |
| * free using slurm_free_will_run_response_msg() |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| */ |
| int slurm_job_will_run2 (job_desc_msg_t *req, |
| will_run_response_msg_t **will_run_resp) |
| { |
| return _job_will_run_cluster(req, will_run_resp, working_cluster_rec); |
| } |
| |
| static int _job_will_run_cluster(job_desc_msg_t *req, |
| will_run_response_msg_t **will_run_resp, |
| slurmdb_cluster_rec_t *cluster) |
| { |
| slurm_msg_t req_msg, resp_msg; |
| int rc; |
| /* req.immediate = true; implicit */ |
| |
| slurm_msg_t_init(&req_msg); |
| req_msg.msg_type = REQUEST_JOB_WILL_RUN; |
| req_msg.data = req; |
| |
| rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg, cluster); |
| |
| if (rc < 0) |
| return SLURM_ERROR; |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_RC: |
| if (_handle_rc_msg(&resp_msg) < 0) |
| return SLURM_ERROR; |
| break; |
| case RESPONSE_JOB_WILL_RUN: |
| *will_run_resp = (will_run_response_msg_t *) resp_msg.data; |
| break; |
| default: |
| slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); |
| break; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * slurm_job_step_create - create a job step for a given job id |
| * IN slurm_step_alloc_req_msg - description of job step request |
| * OUT slurm_step_alloc_resp_msg - response to request |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| * NOTE: free the response using slurm_free_job_step_create_response_msg |
| */ |
| int |
| slurm_job_step_create (job_step_create_request_msg_t *req, |
| job_step_create_response_msg_t **resp) |
| { |
| slurm_msg_t req_msg, resp_msg; |
| int delay = 0, rc, retry = 0; |
| char *stepmgr_nodename = NULL; |
| |
| slurm_msg_t_init(&req_msg); |
| slurm_msg_t_init(&resp_msg); |
| req_msg.msg_type = REQUEST_JOB_STEP_CREATE; |
| req_msg.data = req; |
| |
| re_send: |
| /* xstrdup() to be consistent with reroute and be able to free. */ |
| if ((stepmgr_nodename = xstrdup(getenv("SLURM_STEPMGR")))) { |
| trystepmgr: |
| slurm_msg_set_r_uid(&req_msg, slurm_conf.slurmd_user_id); |
| |
| if (slurm_conf_get_addr(stepmgr_nodename, &req_msg.address, |
| req_msg.flags)) { |
| /* |
| * The node isn't in the conf, see if the |
| * controller has an address for it. |
| */ |
			slurm_node_alias_addrs_t *alias_addrs = NULL;
| if (!slurm_get_node_alias_addrs(stepmgr_nodename, |
| &alias_addrs)) { |
| add_remote_nodes_to_conf_tbls( |
| alias_addrs->node_list, |
| alias_addrs->node_addrs); |
| } |
| slurm_free_node_alias_addrs(alias_addrs); |
| slurm_conf_get_addr(stepmgr_nodename, &req_msg.address, |
| req_msg.flags); |
| } |
| xfree(stepmgr_nodename); |
| |
| if (slurm_send_recv_node_msg(&req_msg, &resp_msg, 0)) |
| return SLURM_ERROR; |
| } else if (slurm_send_recv_controller_msg(&req_msg, &resp_msg, |
| working_cluster_rec) < 0) { |
| return SLURM_ERROR; |
| } |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_REROUTE_MSG: |
| { |
| reroute_msg_t *rr_msg = resp_msg.data; |
| xfree(stepmgr_nodename); |
| stepmgr_nodename = rr_msg->stepmgr; |
| rr_msg->stepmgr = NULL; |
| if (stepmgr_nodename) |
| goto trystepmgr; |
| else |
| return SLURM_ERROR; |
| break; |
| } |
| case RESPONSE_SLURM_RC: |
| rc = _handle_rc_msg(&resp_msg); |
| if ((rc < 0) && (errno == EAGAIN)) { |
| if (retry++ == 0) { |
| verbose("Slurm is busy, step creation delayed"); |
| delay = (getpid() % 10) + 10; /* 10-19 secs */ |
| } |
| sleep(delay); |
| goto re_send; |
| } |
| if (rc < 0) |
| return SLURM_ERROR; |
| *resp = NULL; |
| break; |
| case RESPONSE_JOB_STEP_CREATE: |
| *resp = (job_step_create_response_msg_t *) resp_msg.data; |
| break; |
| default: |
| slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); |
| break; |
| } |
| |
	return SLURM_SUCCESS;
| } |
| |
| /* |
| * slurm_allocation_lookup - retrieve info for an existing resource allocation |
| * without the addrs and such |
| * IN jobid - job allocation identifier |
| * OUT info - job allocation information |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| * NOTE: free the response using slurm_free_resource_allocation_response_msg() |
| */ |
| extern int slurm_allocation_lookup(uint32_t jobid, |
| resource_allocation_response_msg_t **info) |
| { |
| job_alloc_info_msg_t req; |
| slurm_msg_t req_msg; |
| slurm_msg_t resp_msg; |
| |
| memset(&req, 0, sizeof(req)); |
| req.job_id = jobid; |
| req.req_cluster = slurm_conf.cluster_name; |
| slurm_msg_t_init(&req_msg); |
| slurm_msg_t_init(&resp_msg); |
| req_msg.msg_type = REQUEST_JOB_ALLOCATION_INFO; |
| req_msg.data = &req; |
| |
| if (slurm_send_recv_controller_msg(&req_msg, &resp_msg, |
| working_cluster_rec) < 0) |
| return SLURM_ERROR; |
| |
| req.req_cluster = NULL; |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_RC: |
| if (_handle_rc_msg(&resp_msg) < 0) |
| return SLURM_ERROR; |
| *info = NULL; |
| break; |
| case RESPONSE_JOB_ALLOCATION_INFO: |
| *info = (resource_allocation_response_msg_t *) resp_msg.data; |
| return SLURM_SUCCESS; |
| break; |
| default: |
| slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); |
| break; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * slurm_het_job_lookup - retrieve info for an existing heterogeneous job |
| * allocation without the addrs and such |
| * IN jobid - job allocation identifier |
| * OUT info - job allocation information |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
 * NOTE: returns information for an individual job as well
| * NOTE: free the response using list_destroy() |
| */ |
| extern int slurm_het_job_lookup(uint32_t jobid, list_t **info) |
| { |
| job_alloc_info_msg_t req; |
| slurm_msg_t req_msg; |
| slurm_msg_t resp_msg; |
| char *stepmgr_nodename = NULL; |
| |
| memset(&req, 0, sizeof(req)); |
| req.job_id = jobid; |
| req.req_cluster = slurm_conf.cluster_name; |
| slurm_msg_t_init(&req_msg); |
| slurm_msg_t_init(&resp_msg); |
| req_msg.msg_type = REQUEST_HET_JOB_ALLOC_INFO; |
| req_msg.data = &req; |
| |
| if ((stepmgr_nodename = xstrdup(getenv("SLURM_STEPMGR")))) { |
| slurm_msg_set_r_uid(&req_msg, slurm_conf.slurmd_user_id); |
| |
| if (slurm_conf_get_addr(stepmgr_nodename, &req_msg.address, |
| req_msg.flags)) { |
| /* |
| * The node isn't in the conf, see if the |
| * controller has an address for it. |
| */ |
			slurm_node_alias_addrs_t *alias_addrs = NULL;
| if (!slurm_get_node_alias_addrs(stepmgr_nodename, |
| &alias_addrs)) { |
| add_remote_nodes_to_conf_tbls( |
| alias_addrs->node_list, |
| alias_addrs->node_addrs); |
| } |
| slurm_free_node_alias_addrs(alias_addrs); |
| slurm_conf_get_addr(stepmgr_nodename, &req_msg.address, |
| req_msg.flags); |
| } |
| xfree(stepmgr_nodename); |
| |
| if (slurm_send_recv_node_msg(&req_msg, &resp_msg, 0)) |
| return SLURM_ERROR; |
| } else if (slurm_send_recv_controller_msg(&req_msg, &resp_msg, |
| working_cluster_rec) < 0) { |
| return SLURM_ERROR; |
| } |
| |
| req.req_cluster = NULL; |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_RC: |
| if (_handle_rc_msg(&resp_msg) < 0) |
| return SLURM_ERROR; |
| *info = NULL; |
| break; |
| case RESPONSE_HET_JOB_ALLOCATION: |
| *info = resp_msg.data; |
| return SLURM_SUCCESS; |
| break; |
| default: |
| slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); |
| break; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * slurm_sbcast_lookup - retrieve info for an existing resource allocation |
| * including a credential needed for sbcast. |
| * IN selected_step - filled in with step_id and het_job_offset |
| * OUT info - job allocation information including a credential for sbcast |
| * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set |
| * NOTE: free the "resp" using slurm_free_sbcast_cred_msg |
| */ |
| extern int slurm_sbcast_lookup(slurm_selected_step_t *selected_step, |
| job_sbcast_cred_msg_t **info) |
| { |
| slurm_msg_t req_msg; |
| slurm_msg_t resp_msg; |
| char *stepmgr_nodename = NULL; |
| |
| slurm_msg_t_init(&req_msg); |
| slurm_msg_t_init(&resp_msg); |
| req_msg.msg_type = REQUEST_JOB_SBCAST_CRED; |
| req_msg.data = selected_step; |
| |
| trystepmgr: |
| if (stepmgr_nodename) { |
| slurm_msg_set_r_uid(&req_msg, slurm_conf.slurmd_user_id); |
| |
| if (slurm_conf_get_addr(stepmgr_nodename, &req_msg.address, |
| req_msg.flags)) { |
| /* |
| * The node isn't in the conf, see if the |
| * controller has an address for it. |
| */ |
			slurm_node_alias_addrs_t *alias_addrs = NULL;
| if (!slurm_get_node_alias_addrs(stepmgr_nodename, |
| &alias_addrs)) { |
| add_remote_nodes_to_conf_tbls( |
| alias_addrs->node_list, |
| alias_addrs->node_addrs); |
| } |
| slurm_free_node_alias_addrs(alias_addrs); |
| slurm_conf_get_addr(stepmgr_nodename, &req_msg.address, |
| req_msg.flags); |
| } |
| xfree(stepmgr_nodename); |
| |
| if (slurm_send_recv_node_msg(&req_msg, &resp_msg, 0)) |
| return SLURM_ERROR; |
	} else if (slurm_send_recv_controller_msg(&req_msg, &resp_msg,
						  working_cluster_rec) < 0)
| return SLURM_ERROR; |
| |
| switch (resp_msg.msg_type) { |
| case RESPONSE_SLURM_REROUTE_MSG: |
| { |
| reroute_msg_t *rr_msg = resp_msg.data; |
| stepmgr_nodename = rr_msg->stepmgr; |
| rr_msg->stepmgr = NULL; |
| if (stepmgr_nodename) |
| goto trystepmgr; |
| else |
| return SLURM_ERROR; |
| |
| break; |
| } |
| case RESPONSE_SLURM_RC: |
| if (_handle_rc_msg(&resp_msg) < 0) |
| return SLURM_ERROR; |
| *info = NULL; |
| break; |
| case RESPONSE_JOB_SBCAST_CRED: |
| *info = (job_sbcast_cred_msg_t *)resp_msg.data; |
| return SLURM_SUCCESS; |
| break; |
| default: |
| slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); |
| break; |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Handle a return code message type. |
 * If the return code is nonzero, sets errno to the return code and returns < 0.
 * Otherwise, returns 0 (SLURM_SUCCESS).
| */ |
| static int |
| _handle_rc_msg(slurm_msg_t *msg) |
| { |
| int rc = ((return_code_msg_t *) msg->data)->return_code; |
| slurm_free_return_code_msg(msg->data); |
| |
| if (rc) |
| slurm_seterrno_ret(rc); |
| else |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Read a Slurm hostfile specified by "filename". "filename" must contain |
| * a list of Slurm NodeNames, one per line, comma separated, or * notation. |
 * Reads up to "n" hostnames from the file and returns a hostlist ranged
 * string representing the contents of the file.
 * This is a helper function; it does not contact any Slurm daemons.
 *
 * Returns NULL if there are fewer than "n" hostnames in the file, or if an
 * error occurs. If "n" == NO_VAL then the entire file is read in.
| * |
| * Returned string must be freed with free(). |
| */ |
| char *slurm_read_hostfile(const char *filename, int n) |
| { |
| FILE *fp = NULL; |
| char in_line[BUFFER_SIZE]; /* input line */ |
| int i, j; |
| int line_size; |
| int line_num = 0; |
| hostlist_t *hostlist = NULL; |
| char *nodelist = NULL, *end_part = NULL; |
| char *asterisk, *tmp_text = NULL, *save_ptr = NULL, *host_name; |
| int total_file_len = 0; |
| |
| if (filename == NULL || strlen(filename) == 0) |
| return NULL; |
| |
| if ((fp = fopen(filename, "r")) == NULL) { |
| error("slurm_allocate_resources error opening file %s, %m", |
| filename); |
| return NULL; |
| } |
| |
| hostlist = hostlist_create(NULL); |
| if (hostlist == NULL) { |
| fclose(fp); |
| return NULL; |
| } |
| |
| while (fgets(in_line, BUFFER_SIZE, fp) != NULL) { |
| |
| line_size = strlen(in_line); |
| for (i = 0; i < line_size; i++) { |
| if (in_line[i] == '\n') { |
| in_line[i] = '\0'; |
| break; |
| } |
| if (in_line[i] == '\0') |
| break; |
| if (in_line[i] != '#') |
| continue; |
| if ((i > 0) && (in_line[i - 1] == '\\')) { |
| for (j = i; j < line_size; j++) { |
| in_line[j - 1] = in_line[j]; |
| } |
| line_size--; |
| continue; |
| } |
| in_line[i] = '\0'; |
| break; |
| } |
| |
| /* |
		 * Get the string length again in case it changed in
		 * the loop above.
| */ |
| line_size = strlen(in_line); |
| total_file_len += line_size; |
| |
| /* |
| * If there was an end section from before set it up to be on |
| * the front of this next chunk. |
| */ |
| if (end_part) { |
| tmp_text = end_part; |
| end_part = NULL; |
| } |
| |
| if (line_size == (BUFFER_SIZE - 1)) { |
| /* |
| * If we filled up the buffer get the end past the last |
| * comma. We will tack it on the next pass through. |
| */ |
| char *last_comma = strrchr(in_line, ','); |
| if (!last_comma) { |
| error("Line %d, of hostfile %s too long", |
| line_num, filename); |
| fclose(fp); |
| hostlist_destroy(hostlist); |
| return NULL; |
| } |
| end_part = xstrdup(last_comma + 1); |
| *last_comma = '\0'; |
| } else |
| line_num++; |
| |
| xstrcat(tmp_text, in_line); |
| |
| /* Skip this line */ |
| if (tmp_text[0] == '\0') |
| continue; |
| |
| if (!isalpha(tmp_text[0]) && !isdigit(tmp_text[0])) { |
| error("Invalid hostfile %s contents on line %d", |
| filename, line_num); |
| fclose(fp); |
| hostlist_destroy(hostlist); |
| xfree(end_part); |
| xfree(tmp_text); |
| return NULL; |
| } |
| |
| host_name = strtok_r(tmp_text, ",", &save_ptr); |
| while (host_name) { |
| if ((asterisk = strchr(host_name, '*')) && |
| (i = atoi(asterisk + 1))) { |
| asterisk[0] = '\0'; |
| |
| if (n != (int) NO_VAL) |
| i = MIN(i, |
| n - hostlist_count(hostlist)); |
| /* |
| * Don't forget the extra space potentially |
| * needed |
| */ |
| total_file_len += strlen(host_name) * i; |
| |
| for (j = 0; j < i; j++) |
| hostlist_push_host(hostlist, host_name); |
| } else { |
| hostlist_push_host(hostlist, host_name); |
| } |
| host_name = strtok_r(NULL, ",", &save_ptr); |
| |
| if ((n != (int) NO_VAL) && |
| (hostlist_count(hostlist) == n)) |
| break; |
| } |
| xfree(tmp_text); |
| |
| if ((n != (int)NO_VAL) && (hostlist_count(hostlist) == n)) |
| break; |
| } |
| fclose(fp); |
| |
| if (hostlist_count(hostlist) <= 0) { |
| error("Hostlist is empty!"); |
| goto cleanup_hostfile; |
| } |
| if (hostlist_count(hostlist) < n) { |
| error("Too few NodeNames in Slurm Hostfile"); |
| goto cleanup_hostfile; |
| } |
| |
| total_file_len += 1024; |
| nodelist = (char *)malloc(total_file_len); |
| if (!nodelist) { |
| error("Nodelist xmalloc failed"); |
| goto cleanup_hostfile; |
| } |
| |
| if (hostlist_ranged_string(hostlist, total_file_len, nodelist) == -1) { |
| error("Hostlist is too long for the allocate RPC!"); |
| free(nodelist); |
| nodelist = NULL; |
| goto cleanup_hostfile; |
| } |
| |
| debug2("Hostlist from SLURM_HOSTFILE = %s", nodelist); |
| |
| cleanup_hostfile: |
| hostlist_destroy(hostlist); |
| xfree(end_part); |
| xfree(tmp_text); |
| |
| return nodelist; |
| } |
| |
| /*************************************************************************** |
| * Support functions for slurm_allocate_resources_blocking() |
| ***************************************************************************/ |
| static listen_t *_create_allocation_response_socket(void) |
| { |
| listen_t *listen = NULL; |
| uint16_t *ports; |
| |
| listen = xmalloc(sizeof(listen_t)); |
| |
| if ((ports = slurm_get_srun_port_range())) |
| listen->fd = slurm_init_msg_engine_ports(ports); |
| else |
| listen->fd = slurm_init_msg_engine_port(0); |
| |
| if (listen->fd < 0) { |
| error("slurm_init_msg_engine_port error %m"); |
| xfree(listen); |
| return NULL; |
| } |
| |
| if (slurm_get_stream_addr(listen->fd, &listen->address) < 0) { |
| error("slurm_get_stream_addr error %m"); |
| close(listen->fd); |
| xfree(listen); |
| return NULL; |
| } |
| listen->hostname = xshort_hostname(); |
| |
| if ((listen->address.ss_family == AF_INET) || |
| (listen->address.ss_family == AF_INET6)) { |
| listen->port = slurm_get_port(&listen->address); |
| } else { |
| error("%s: address family not supported", __func__); |
| _destroy_allocation_response_socket(listen); |
| return NULL; |
| } |
| |
| fd_set_nonblocking(listen->fd); |
| |
| return listen; |
| } |
| |
| static void _destroy_allocation_response_socket(listen_t *listen) |
| { |
| xassert(listen != NULL); |
| |
| close(listen->fd); |
| if (listen->hostname) |
| xfree(listen->hostname); |
| xfree(listen); |
| } |
| |
| /* process RPC from slurmctld |
| * IN msg: message received |
| * OUT resp: resource allocation response message or List of them |
 * RET 1 if resp is filled in, 2 if the message should be ignored, 0 otherwise */
| static int _handle_msg(slurm_msg_t *msg, uint16_t msg_type, void **resp, |
| uint32_t job_id) |
| { |
| uid_t req_uid; |
| uid_t uid = getuid(); |
| int rc = 0; |
| |
| req_uid = auth_g_get_uid(msg->auth_cred); |
| |
| if ((req_uid != slurm_conf.slurm_user_id) && (req_uid != 0) && |
| (req_uid != uid)) { |
| error ("Security violation, slurm message from uid %u", |
| req_uid); |
| return 0; |
| } |
| |
| if (msg->msg_type == msg_type) { |
| debug2("resource allocation response received"); |
| slurm_send_rc_msg(msg, SLURM_SUCCESS); |
| *resp = msg->data; /* transfer payload to response */ |
| msg->data = NULL; |
| rc = 1; |
| } else if (msg->msg_type == SRUN_JOB_COMPLETE) { |
| srun_job_complete_msg_t *job_comp_msg = msg->data; |
| if (job_comp_msg->job_id == job_id) { |
| info("Job has been cancelled"); |
| } else { |
| verbose("Ignoring SRUN_JOB_COMPLETE message for JobId=%u (our JobId=%u)", |
| job_comp_msg->job_id, job_id); |
| rc = 2; |
| } |
| } else { |
| error("%s: received spurious message type: %s", |
| __func__, rpc_num2string(msg->msg_type)); |
| rc = 2; |
| } |
| return rc; |
| } |
| |
| /* Accept RPC from slurmctld and process it. |
 * IN listen_fd: file descriptor on which to accept the slurmctld connection
 * IN msg_type: RESPONSE_RESOURCE_ALLOCATION or RESPONSE_HET_JOB_ALLOCATION
 * OUT resp: resource allocation response message or List
 * RET 1 if resp is filled in, 2 if the message should be ignored,
 *     SLURM_ERROR on receive failure, 0 otherwise */
| static int _accept_msg_connection(int listen_fd, uint16_t msg_type, void **resp, |
| uint32_t job_id) |
| { |
| void *tls_conn = NULL; |
| slurm_msg_t *msg = NULL; |
| slurm_addr_t cli_addr; |
| int rc = 0; |
| |
| if (!(tls_conn = slurm_accept_msg_conn(listen_fd, &cli_addr))) |
| return 0; |
| |
| debug2("got message connection from %pA", &cli_addr); |
| |
| msg = xmalloc(sizeof(slurm_msg_t)); |
| slurm_msg_t_init(msg); |
| |
| if ((rc = slurm_receive_msg(tls_conn, msg, 0)) != 0) { |
| slurm_free_msg(msg); |
| |
| if (errno == EINTR) { |
| conn_g_destroy(tls_conn, true); |
| *resp = NULL; |
| return 0; |
| } |
| |
| error("%s[%pA]: %m", __func__, &cli_addr); |
| conn_g_destroy(tls_conn, true); |
| return SLURM_ERROR; |
| } |
| |
| rc = _handle_msg(msg, msg_type, resp, job_id); /* xfer payload */ |
| |
| slurm_free_msg(msg); |
| |
| conn_g_destroy(tls_conn, true); |
| return rc; |
| } |
| |
| /* Wait up to sleep_time for RPC from slurmctld indicating resource allocation |
| * has occurred. |
| * IN sleep_time: delay in seconds (0 means unbounded wait) |
 * RET -1: error, 0: timeout, 1: ready to read */
| static int _wait_for_alloc_rpc(const listen_t *listen, int sleep_time) |
| { |
| struct pollfd fds[1]; |
| int rc; |
| int timeout_ms; |
| |
| if (listen == NULL) { |
| error("Listening port not found"); |
| sleep(MAX(sleep_time, 1)); |
| return -1; |
| } |
| |
| fds[0].fd = listen->fd; |
| fds[0].events = POLLIN; |
| |
| if (sleep_time != 0) |
| timeout_ms = sleep_time * 1000; |
| else |
| timeout_ms = -1; |
| |
| while ((rc = poll(fds, 1, timeout_ms)) < 0) { |
| switch (errno) { |
| case EAGAIN: |
| case EINTR: |
| return -1; |
| case EBADF: |
| case ENOMEM: |
| case EINVAL: |
| case EFAULT: |
| error("poll: %m"); |
| return -1; |
| default: |
| error("poll: %m. Continuing..."); |
| } |
| } |
| |
| if (rc == 0) { /* poll timed out */ |
| errno = ETIMEDOUT; |
| } else if (fds[0].revents & POLLIN) { |
| return 1; |
| } |
| |
| return 0; |
| } |
| |
| static void _wait_for_allocation_response(uint32_t job_id, |
| const listen_t *listen, |
| uint16_t msg_type, int timeout, |
| void **resp) |
| { |
| int errnum, rc; |
| |
| info("job %u queued and waiting for resources", job_id); |
| *resp = NULL; |
| while (true) { |
| if ((rc = _wait_for_alloc_rpc(listen, timeout)) != 1) |
| break; |
| |
| if ((rc = _accept_msg_connection(listen->fd, msg_type, resp, |
| job_id)) != 2) |
| break; |
| } |
| if (rc <= 0) { |
| errnum = errno; |
| /* Maybe the resource allocation response RPC got lost |
| * in the mail; surely it should have arrived by now. |
| * Let's see if the controller thinks that the allocation |
| * has been granted. |
| */ |
| if (msg_type == RESPONSE_RESOURCE_ALLOCATION) { |
| if (slurm_allocation_lookup(job_id, |
| (resource_allocation_response_msg_t **) |
| resp) >= 0) |
| return; |
| } else if (msg_type == RESPONSE_HET_JOB_ALLOCATION) { |
| if (slurm_het_job_lookup(job_id, (list_t **) resp) >= 0) |
| return; |
| } else { |
| error("%s: Invalid msg_type (%u)", __func__, msg_type); |
| } |
| |
| if (errno == ESLURM_JOB_PENDING) { |
| debug3("Still waiting for allocation"); |
| errno = errnum; |
| return; |
| } else { |
| debug3("Unable to confirm allocation for job %u: %m", |
| job_id); |
| return; |
| } |
| } |
| info("job %u has been allocated resources", job_id); |
| return; |
| } |