/*****************************************************************************\
* allocate.c - allocate nodes for a job or step with supplied constraints
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2009 Lawrence Livermore National Security.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette1@llnl.gov>.
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include <ctype.h>
#include <errno.h>
#include <netinet/in.h> /* for ntohs() */
#include <poll.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#ifndef __USE_XOPEN_EXTENDED
extern pid_t getsid(pid_t pid); /* missing from <unistd.h> */
#endif
#include "slurm/slurm.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/hostlist.h"
#include "src/common/parse_time.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/interfaces/auth.h"
#include "src/interfaces/certgen.h"
#include "src/interfaces/certmgr.h"
#include "src/interfaces/conn.h"
#define BUFFER_SIZE 1024
#define MAX_ALLOC_WAIT 60 /* seconds */
#define MIN_ALLOC_WAIT 5 /* seconds */
typedef struct {
slurm_addr_t address;
int fd;
char *hostname;
uint16_t port;
} listen_t;
typedef struct {
slurmdb_cluster_rec_t *cluster;
job_desc_msg_t *req;
int *will_run_rc;
list_t *resp_msg_list;
} load_willrun_req_struct_t;
static int _handle_rc_msg(slurm_msg_t *msg);
static listen_t *_create_allocation_response_socket(void);
static void _destroy_allocation_response_socket(listen_t *listen);
static void _wait_for_allocation_response(uint32_t job_id,
const listen_t *listen,
uint16_t msg_type, int timeout,
void **resp);
static int _job_will_run_cluster(job_desc_msg_t *req,
will_run_response_msg_t **will_run_resp,
slurmdb_cluster_rec_t *cluster);
/*
* slurm_allocate_resources - allocate resources for a job request
* IN job_desc_msg - description of resource allocation request
* OUT slurm_alloc_msg - response to request
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
* NOTE: free the response using slurm_free_resource_allocation_response_msg()
*/
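/*
 * Example usage (illustrative sketch only, kept in a comment so it is not
 * compiled with this file): it assumes <slurm/slurm.h>, <stdio.h> and
 * <unistd.h> are available and that slurm_init(NULL) has been called; the
 * request fields set below are arbitrary values chosen for demonstration.
 *
 *	static int request_one_node(void)
 *	{
 *		job_desc_msg_t desc;
 *		resource_allocation_response_msg_t *alloc = NULL;
 *
 *		slurm_init_job_desc_msg(&desc);
 *		desc.min_nodes = 1;
 *		desc.user_id = getuid();
 *		desc.group_id = getgid();
 *
 *		if (slurm_allocate_resources(&desc, &alloc) != SLURM_SUCCESS) {
 *			slurm_perror("slurm_allocate_resources");
 *			return SLURM_ERROR;
 *		}
 *		if (alloc) {
 *			// node_cnt == 0 means the job was queued, not started
 *			printf("JobId=%u node_cnt=%u\n",
 *			       alloc->job_id, alloc->node_cnt);
 *			slurm_free_resource_allocation_response_msg(alloc);
 *		}
 *		return SLURM_SUCCESS;
 *	}
 */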
int
slurm_allocate_resources (job_desc_msg_t *req,
resource_allocation_response_msg_t **resp)
{
int rc;
slurm_msg_t req_msg;
slurm_msg_t resp_msg;
slurm_msg_t_init(&req_msg);
slurm_msg_t_init(&resp_msg);
/*
 * set the session id for this request if not already provided
 */
if (req->alloc_sid == NO_VAL)
req->alloc_sid = getsid(0);
req_msg.msg_type = REQUEST_RESOURCE_ALLOCATION;
req_msg.data = req;
rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg,
working_cluster_rec);
if (rc == SLURM_ERROR)
return SLURM_ERROR;
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_RC:
if (_handle_rc_msg(&resp_msg) < 0)
return SLURM_ERROR;
*resp = NULL;
break;
case RESPONSE_RESOURCE_ALLOCATION:
*resp = (resource_allocation_response_msg_t *) resp_msg.data;
break;
default:
slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
}
return SLURM_SUCCESS;
}
/*
* slurm_allocate_resources_blocking
* allocate resources for a job request. This call will block until
* the allocation is granted, or the specified timeout limit is reached.
* IN req - description of resource allocation request
* IN timeout - amount of time, in seconds, to wait for a response before
* giving up.
* A timeout of zero will wait indefinitely.
* IN pending_callback - If the allocation cannot be granted immediately,
* the controller will put the job in the PENDING state. If
* pending callback is not NULL, it will be called with the job_id
* of the pending job as the sole parameter.
*
* RET allocation structure on success, NULL on error with errno set to
* indicate the error (errno will be ETIMEDOUT if the timeout is reached
* with no allocation granted)
* NOTE: free the response using slurm_free_resource_allocation_response_msg()
*/
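/*
 * Example usage (illustrative sketch only): the callback name, timeout and
 * request fields below are hypothetical; the same setup as in the previous
 * example (<slurm/slurm.h> included, slurm_init(NULL) called) is assumed.
 *
 *	static void _pending_cb(uint32_t job_id)
 *	{
 *		printf("JobId=%u is pending\n", job_id);
 *	}
 *
 *	static void blocking_alloc_example(void)
 *	{
 *		job_desc_msg_t desc;
 *		resource_allocation_response_msg_t *alloc;
 *
 *		slurm_init_job_desc_msg(&desc);
 *		desc.min_nodes = 1;
 *		desc.user_id = getuid();
 *		desc.group_id = getgid();
 *
 *		// Wait up to 5 minutes; a timeout of 0 waits indefinitely
 *		alloc = slurm_allocate_resources_blocking(&desc, 300,
 *							  _pending_cb);
 *		if (!alloc) {
 *			// errno is ETIMEDOUT if the timeout expired
 *			slurm_perror("slurm_allocate_resources_blocking");
 *			return;
 *		}
 *		printf("JobId=%u granted %u node(s)\n",
 *		       alloc->job_id, alloc->node_cnt);
 *		slurm_free_resource_allocation_response_msg(alloc);
 *	}
 */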
resource_allocation_response_msg_t *
slurm_allocate_resources_blocking (const job_desc_msg_t *user_req,
time_t timeout,
void(*pending_callback)(uint32_t job_id))
{
int rc;
slurm_msg_t req_msg;
slurm_msg_t resp_msg;
resource_allocation_response_msg_t *resp = NULL;
uint32_t job_id;
job_desc_msg_t *req;
listen_t *listen = NULL;
int errnum = SLURM_SUCCESS;
bool already_done = false;
slurm_msg_t_init(&req_msg);
slurm_msg_t_init(&resp_msg);
/* make a copy of the user's job description struct so that we
* can make changes before contacting the controller */
req = xmalloc(sizeof(job_desc_msg_t));
memcpy(req, user_req, sizeof(job_desc_msg_t));
/*
 * set the session id for this request if not already provided
 */
if (req->alloc_sid == NO_VAL)
req->alloc_sid = getsid(0);
if (!req->immediate) {
listen = _create_allocation_response_socket();
if (listen == NULL) {
xfree(req);
return NULL;
}
req->alloc_resp_port = listen->port;
}
if (tls_enabled()) {
if (!(req->alloc_tls_cert = conn_g_get_own_public_cert())) {
error("Could not get self-signed certificate for allocation response");
if (!req->immediate)
_destroy_allocation_response_socket(listen);
xfree(req);
return NULL;
}
}
req_msg.msg_type = REQUEST_RESOURCE_ALLOCATION;
req_msg.data = req;
rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg,
working_cluster_rec);
xfree(req->alloc_tls_cert);
if (rc == SLURM_ERROR) {
int errnum = errno;
destroy_forward(&req_msg.forward);
destroy_forward(&resp_msg.forward);
if (!req->immediate)
_destroy_allocation_response_socket(listen);
xfree(req);
errno = errnum;
return NULL;
}
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_RC:
if (_handle_rc_msg(&resp_msg) < 0) {
/* will reach this when the allocation fails */
errnum = errno;
} else {
/* shouldn't get here */
errnum = SLURM_ERROR;
}
break;
case RESPONSE_RESOURCE_ALLOCATION:
/* Yay, the controller has acknowledged our request!
 * Now test whether we actually have an allocation yet. */
resp = (resource_allocation_response_msg_t *) resp_msg.data;
if (resp->node_cnt > 0) {
/* yes, allocation has been granted */
errno = SLURM_SUCCESS;
} else if (!req->immediate) {
if (resp->error_code != SLURM_SUCCESS)
info("%s", slurm_strerror(resp->error_code));
/* no, we need to wait for a response */
/* print out any user messages before we wait. */
print_multi_line_string(resp->job_submit_user_msg,
-1, LOG_LEVEL_INFO);
job_id = resp->job_id;
slurm_free_resource_allocation_response_msg(resp);
if (pending_callback != NULL)
pending_callback(job_id);
_wait_for_allocation_response(job_id, listen,
RESPONSE_RESOURCE_ALLOCATION,
timeout, (void **) &resp);
/* If NULL, we didn't get the allocation in
the time desired, so just free the job id */
if ((resp == NULL) && (errno != ESLURM_ALREADY_DONE)) {
errnum = errno;
slurm_complete_job(job_id, -1);
}
if ((resp == NULL) && (errno == ESLURM_ALREADY_DONE))
already_done = true;
}
break;
default:
errnum = SLURM_UNEXPECTED_MSG_ERROR;
resp = NULL;
}
destroy_forward(&req_msg.forward);
destroy_forward(&resp_msg.forward);
if (!req->immediate)
_destroy_allocation_response_socket(listen);
xfree(req);
if (!resp && already_done && (errnum == SLURM_SUCCESS))
errnum = ESLURM_ALREADY_DONE;
errno = errnum;
return resp;
}
static int _foreach_log_will_run_resp(void *x, void *key)
{
will_run_response_msg_t *will_run_resp = x;
char buf[256];
slurm_make_time_str(&will_run_resp->start_time, buf, sizeof(buf));
debug("Job %u to start at %s on cluster %s using %u processors on nodes %s in partition %s",
will_run_resp->job_id, buf, will_run_resp->cluster_name,
will_run_resp->proc_cnt, will_run_resp->node_list,
will_run_resp->part_name);
if (will_run_resp->preemptee_job_id) {
list_itr_t *itr;
uint32_t *job_id_ptr;
char *job_list = NULL, *sep = "";
itr = list_iterator_create(will_run_resp->preemptee_job_id);
while ((job_id_ptr = list_next(itr))) {
if (job_list)
sep = ",";
xstrfmtcat(job_list, "%s%u", sep, *job_id_ptr);
}
list_iterator_destroy(itr);
debug(" Preempts: %s", job_list);
xfree(job_list);
}
return 0;
}
static void log_will_run_resps(list_t *list)
{
if (get_log_level() < LOG_LEVEL_DEBUG)
return;
list_for_each(list, _foreach_log_will_run_resp, NULL);
}
static void *_load_willrun_thread(void *args)
{
load_willrun_req_struct_t *load_args =
(load_willrun_req_struct_t *)args;
slurmdb_cluster_rec_t *cluster = load_args->cluster;
will_run_response_msg_t *new_msg = NULL;
if (!_job_will_run_cluster(load_args->req, &new_msg, cluster)) {
list_append(load_args->resp_msg_list, new_msg);
new_msg->cluster_name = xstrdup(cluster->name);
} else {
debug("Problem with submit to cluster %s: %m", cluster->name);
*(load_args->will_run_rc) = errno;
}
xfree(args);
return NULL;
}
extern int slurm_sort_will_run_resp(void *a, void *b)
{
will_run_response_msg_t *resp_a = *(will_run_response_msg_t **) a;
will_run_response_msg_t *resp_b = *(will_run_response_msg_t **) b;
if (resp_a->start_time < resp_b->start_time)
return -1;
else if (resp_a->start_time > resp_b->start_time)
return 1;
if (list_count(resp_a->preemptee_job_id) <
list_count(resp_b->preemptee_job_id))
return -1;
else if (list_count(resp_a->preemptee_job_id) >
list_count(resp_b->preemptee_job_id))
return 1;
if (!xstrcmp(slurm_conf.cluster_name, resp_a->cluster_name))
return -1;
else if (!xstrcmp(slurm_conf.cluster_name, resp_b->cluster_name))
return 1;
return 0;
}
static int _fed_job_will_run(job_desc_msg_t *req,
will_run_response_msg_t **will_run_resp,
slurmdb_federation_rec_t *fed)
{
list_t *resp_msg_list;
int pthread_count = 0, i, will_run_rc = SLURM_SUCCESS;
pthread_t *load_thread = 0;
load_willrun_req_struct_t *load_args;
list_itr_t *iter;
slurmdb_cluster_rec_t *cluster;
list_t *req_clusters = NULL;
xassert(req);
xassert(will_run_resp);
*will_run_resp = NULL;
/*
* If a subset of clusters was specified then only do a will_run to
* those clusters, otherwise check all clusters in the federation.
*/
if (req->clusters && xstrcasecmp(req->clusters, "all")) {
req_clusters = list_create(xfree_ptr);
slurm_addto_char_list(req_clusters, req->clusters);
}
/* Spawn one pthread per cluster to collect job information */
resp_msg_list = list_create(slurm_free_will_run_response_msg);
load_thread = xcalloc(list_count(fed->cluster_list), sizeof(pthread_t));
iter = list_iterator_create(fed->cluster_list);
while ((cluster = (slurmdb_cluster_rec_t *)list_next(iter))) {
if ((cluster->control_host == NULL) ||
(cluster->control_host[0] == '\0'))
continue; /* Cluster down */
if (req_clusters &&
!list_find_first(req_clusters, slurm_find_char_in_list,
cluster->name))
continue;
load_args = xmalloc(sizeof(load_willrun_req_struct_t));
load_args->cluster = cluster;
load_args->req = req;
load_args->resp_msg_list = resp_msg_list;
load_args->will_run_rc = &will_run_rc;
slurm_thread_create(&load_thread[pthread_count],
_load_willrun_thread, load_args);
pthread_count++;
}
list_iterator_destroy(iter);
FREE_NULL_LIST(req_clusters);
/* Wait for all pthreads to complete */
for (i = 0; i < pthread_count; i++)
slurm_thread_join(load_thread[i]);
xfree(load_thread);
list_sort(resp_msg_list, slurm_sort_will_run_resp);
log_will_run_resps(resp_msg_list);
*will_run_resp = list_pop(resp_msg_list);
FREE_NULL_LIST(resp_msg_list);
if (!*will_run_resp) {
errno = will_run_rc;
return SLURM_ERROR;
}
return SLURM_SUCCESS;
}
/* Get total node count and lead job ID from RESPONSE_HET_JOB_ALLOCATION */
static void _het_job_alloc_test(list_t *resp, uint32_t *node_cnt,
uint32_t *job_id)
{
resource_allocation_response_msg_t *alloc;
uint32_t inx = 0, het_job_node_cnt = 0, het_job_id = 0;
list_itr_t *iter;
xassert(resp);
iter = list_iterator_create(resp);
while ((alloc = (resource_allocation_response_msg_t *)list_next(iter))){
het_job_node_cnt += alloc->node_cnt;
if (het_job_id == 0)
het_job_id = alloc->job_id;
print_multi_line_string(alloc->job_submit_user_msg,
inx, LOG_LEVEL_INFO);
inx++;
}
list_iterator_destroy(iter);
*job_id = het_job_id;
*node_cnt = het_job_node_cnt;
}
/*
* slurm_allocate_het_job_blocking
* allocate resources for a list of job requests. This call will block
* until the entire allocation is granted, or the specified timeout limit
* is reached.
* IN req - List of resource allocation requests
* IN timeout - amount of time, in seconds, to wait for a response before
* giving up.
* A timeout of zero will wait indefinitely.
* IN pending_callback - If the allocation cannot be granted immediately,
* the controller will put the job in the PENDING state. If
* pending callback is not NULL, it will be called with the job_id
* of the pending job as the sole parameter.
*
* RET List of allocation structures on success, NULL on error with errno set
* to indicate the error (errno will be ETIMEDOUT if the timeout is reached
* with no allocation granted)
* NOTE: free the response using list_destroy()
*/
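/*
 * Example usage (illustrative sketch only): building job_req_list relies on
 * list_create()/list_append()/FREE_NULL_LIST() from "src/common/list.h",
 * which are internal to the Slurm source tree (this mirrors how salloc
 * assembles a het job request); the component sizes are arbitrary.
 *
 *	static void het_alloc_example(void)
 *	{
 *		job_desc_msg_t comp1, comp2;
 *		list_t *job_req_list, *resp;
 *
 *		slurm_init_job_desc_msg(&comp1);
 *		comp1.min_nodes = 1;
 *		slurm_init_job_desc_msg(&comp2);
 *		comp2.min_nodes = 2;
 *
 *		job_req_list = list_create(NULL);
 *		list_append(job_req_list, &comp1);
 *		list_append(job_req_list, &comp2);
 *
 *		// Block for up to 10 minutes for the entire allocation
 *		resp = slurm_allocate_het_job_blocking(job_req_list, 600, NULL);
 *		if (!resp)
 *			slurm_perror("slurm_allocate_het_job_blocking");
 *		else
 *			FREE_NULL_LIST(resp); // one response per component
 *		FREE_NULL_LIST(job_req_list);
 *	}
 */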
list_t *slurm_allocate_het_job_blocking(
list_t *job_req_list, time_t timeout,
void (*pending_callback) (uint32_t job_id))
{
int rc;
slurm_msg_t req_msg;
slurm_msg_t resp_msg;
list_t *resp = NULL;
job_desc_msg_t *req;
listen_t *listen = NULL;
int errnum = SLURM_SUCCESS;
list_itr_t *iter;
bool immediate_flag = false;
uint32_t node_cnt = 0, job_id = 0;
bool already_done = false;
char *alloc_tls_cert = NULL;
slurm_msg_t_init(&req_msg);
slurm_msg_t_init(&resp_msg);
/*
 * create a socket on which to listen for the allocation response
 */
if (!immediate_flag) {
listen = _create_allocation_response_socket();
if (listen == NULL)
return NULL;
}
if (tls_enabled()) {
if (!(alloc_tls_cert = conn_g_get_own_public_cert())) {
error("Could not get self-signed certificate for allocation response");
if (listen)
_destroy_allocation_response_socket(listen);
return NULL;
}
}
iter = list_iterator_create(job_req_list);
while ((req = (job_desc_msg_t *) list_next(iter))) {
if (req->alloc_sid == NO_VAL)
req->alloc_sid = getsid(0);
if (listen)
req->alloc_resp_port = listen->port;
if (req->immediate)
immediate_flag = true;
req->alloc_tls_cert = xstrdup(alloc_tls_cert);
}
list_iterator_destroy(iter);
xfree(alloc_tls_cert);
req_msg.msg_type = REQUEST_HET_JOB_ALLOCATION;
req_msg.data = job_req_list;
rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg,
working_cluster_rec);
if (rc == SLURM_ERROR) {
int errnum = errno;
destroy_forward(&req_msg.forward);
destroy_forward(&resp_msg.forward);
if (listen)
_destroy_allocation_response_socket(listen);
errno = errnum;
return NULL;
}
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_RC:
if (_handle_rc_msg(&resp_msg) < 0) {
/* will reach this when the allocation fails */
errnum = errno;
} else {
/* shouldn't get here */
errnum = SLURM_ERROR;
}
break;
case RESPONSE_HET_JOB_ALLOCATION:
/* Yay, the controller has acknowledged our request!
 * Now test whether we actually have an allocation yet. */
resp = resp_msg.data;
_het_job_alloc_test(resp, &node_cnt, &job_id);
if (node_cnt > 0) {
/* yes, allocation has been granted */
errno = SLURM_SUCCESS;
} else if (immediate_flag) {
debug("Immediate allocation not granted");
} else {
/* no allocation yet; user messages were logged above, wait for a response */
FREE_NULL_LIST(resp);
if (pending_callback != NULL)
pending_callback(job_id);
_wait_for_allocation_response(job_id, listen,
RESPONSE_HET_JOB_ALLOCATION,
timeout, (void **) &resp);
/* If NULL, we didn't get the allocation in
* the time desired, so just free the job id */
if ((resp == NULL) && (errno != ESLURM_ALREADY_DONE)) {
errnum = errno;
slurm_complete_job(job_id, -1);
}
if ((resp == NULL) && (errno == ESLURM_ALREADY_DONE))
already_done = true;
}
break;
default:
errnum = SLURM_UNEXPECTED_MSG_ERROR;
}
destroy_forward(&req_msg.forward);
destroy_forward(&resp_msg.forward);
if (listen)
_destroy_allocation_response_socket(listen);
if (!resp && already_done && (errnum == SLURM_SUCCESS))
errnum = ESLURM_ALREADY_DONE;
errno = errnum;
return resp;
}
/*
* slurm_job_will_run - determine if a job would execute immediately if
* submitted now
* IN job_desc_msg - description of resource allocation request
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
*/
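/*
 * Example usage (illustrative sketch only): the request fields are arbitrary;
 * the expected start time, node list and any preemptions are reported through
 * the logging functions, so there is nothing for the caller to free.
 *
 *	static void will_run_example(void)
 *	{
 *		job_desc_msg_t desc;
 *
 *		slurm_init_job_desc_msg(&desc);
 *		desc.min_nodes = 4;
 *		desc.user_id = getuid();
 *		desc.group_id = getgid();
 *
 *		if (slurm_job_will_run(&desc) != SLURM_SUCCESS)
 *			slurm_perror("slurm_job_will_run");
 *	}
 */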
int slurm_job_will_run(job_desc_msg_t *req)
{
will_run_response_msg_t *will_run_resp = NULL;
char buf[256];
int rc;
void *ptr = NULL;
/*
* If working_cluster_rec is defined, then it's already gone through
* slurmdb_get_first_avail_cluster() to find the cluster to go to.
*/
if (!working_cluster_rec && !slurm_load_federation(&ptr) &&
cluster_in_federation(ptr, slurm_conf.cluster_name))
rc = _fed_job_will_run(req, &will_run_resp, ptr);
else
rc = slurm_job_will_run2(req, &will_run_resp);
if (will_run_resp)
print_multi_line_string(
will_run_resp->job_submit_user_msg,
-1, LOG_LEVEL_INFO);
if ((rc == 0) && will_run_resp) {
char *cluster_name = NULL;
slurm_make_time_str(&will_run_resp->start_time,
buf, sizeof(buf));
if (working_cluster_rec)
cluster_name = working_cluster_rec->name;
else if (will_run_resp->cluster_name)
cluster_name = will_run_resp->cluster_name;
info("Job %u to start at %s%s%s a using %u processors on nodes %s in partition %s",
will_run_resp->job_id, buf,
cluster_name ? " on cluster " : "",
cluster_name ? cluster_name : "",
will_run_resp->proc_cnt,
will_run_resp->node_list,
will_run_resp->part_name);
if (will_run_resp->preemptee_job_id) {
list_itr_t *itr;
uint32_t *job_id_ptr;
char *job_list = NULL, *sep = "";
itr = list_iterator_create(will_run_resp->
preemptee_job_id);
while ((job_id_ptr = list_next(itr))) {
if (job_list)
sep = ",";
xstrfmtcat(job_list, "%s%u", sep, *job_id_ptr);
}
list_iterator_destroy(itr);
info(" Preempts: %s", job_list);
xfree(job_list);
}
slurm_free_will_run_response_msg(will_run_resp);
}
if (ptr)
slurm_destroy_federation_rec(ptr);
return rc;
}
/*
* slurm_het_job_will_run - determine if a heterogeneous job would execute
* immediately if submitted now
* IN job_req_list - List of job_desc_msg_t structures describing the resource
* allocation request
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
*/
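/*
 * Example usage (illustrative sketch only): as with the het job allocation
 * example above, the request list is built with the internal
 * list_create()/list_append() helpers and the component sizes are arbitrary.
 *
 *	static void het_will_run_example(void)
 *	{
 *		job_desc_msg_t comp1, comp2;
 *		list_t *job_req_list = list_create(NULL);
 *
 *		slurm_init_job_desc_msg(&comp1);
 *		comp1.min_nodes = 1;
 *		slurm_init_job_desc_msg(&comp2);
 *		comp2.min_nodes = 2;
 *		list_append(job_req_list, &comp1);
 *		list_append(job_req_list, &comp2);
 *
 *		// Expected start time and node list are logged via info()
 *		if (slurm_het_job_will_run(job_req_list) != SLURM_SUCCESS)
 *			slurm_perror("slurm_het_job_will_run");
 *		FREE_NULL_LIST(job_req_list);
 *	}
 */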
extern int slurm_het_job_will_run(list_t *job_req_list)
{
job_desc_msg_t *req;
will_run_response_msg_t *will_run_resp;
char buf[256], *sep = "";
int rc = SLURM_SUCCESS, inx = 0;
list_itr_t *iter, *itr;
time_t first_start = (time_t) 0;
uint32_t first_job_id = 0, tot_proc_count = 0, *job_id_ptr;
hostset_t *hs = NULL;
char *job_list = NULL;
if (!job_req_list || (list_count(job_req_list) == 0)) {
error("No job descriptors input");
return SLURM_ERROR;
}
iter = list_iterator_create(job_req_list);
while ((req = (job_desc_msg_t *) list_next(iter))) {
will_run_resp = NULL;
rc = slurm_job_will_run2(req, &will_run_resp);
if (will_run_resp)
print_multi_line_string(
will_run_resp->job_submit_user_msg,
inx, LOG_LEVEL_INFO);
if ((rc == SLURM_SUCCESS) && will_run_resp) {
if (first_job_id == 0)
first_job_id = will_run_resp->job_id;
if ((first_start == 0) ||
(first_start < will_run_resp->start_time))
first_start = will_run_resp->start_time;
tot_proc_count += will_run_resp->proc_cnt;
if (hs)
hostset_insert(hs, will_run_resp->node_list);
else
hs = hostset_create(will_run_resp->node_list);
if (will_run_resp->preemptee_job_id) {
itr = list_iterator_create(will_run_resp->
preemptee_job_id);
while ((job_id_ptr = list_next(itr))) {
if (job_list)
sep = ",";
xstrfmtcat(job_list, "%s%u", sep,
*job_id_ptr);
}
list_iterator_destroy(itr);
}
slurm_free_will_run_response_msg(will_run_resp);
}
if (rc != SLURM_SUCCESS)
break;
inx++;
}
list_iterator_destroy(iter);
if (rc == SLURM_SUCCESS) {
char *node_list = NULL;
if (hs)
node_list = hostset_ranged_string_xmalloc(hs);
slurm_make_time_str(&first_start, buf, sizeof(buf));
info("Job %u to start at %s using %u processors on %s",
first_job_id, buf, tot_proc_count, node_list);
if (job_list)
info(" Preempts: %s", job_list);
xfree(node_list);
}
if (hs)
hostset_destroy(hs);
xfree(job_list);
return rc;
}
/*
* slurm_job_will_run2 - determine if a job would execute immediately if
* submitted now
* IN job_desc_msg - description of resource allocation request
* OUT will_run_resp - job run time data
* free using slurm_free_will_run_response_msg()
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
*/
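/*
 * Example usage (illustrative sketch only): unlike slurm_job_will_run(), the
 * caller receives the will_run_response_msg_t and must free it;
 * slurm_make_time_str() comes from "src/common/parse_time.h", as used
 * elsewhere in this file.
 *
 *	static void will_run2_example(job_desc_msg_t *desc)
 *	{
 *		will_run_response_msg_t *resp = NULL;
 *		char buf[256];
 *
 *		if (slurm_job_will_run2(desc, &resp) != SLURM_SUCCESS) {
 *			slurm_perror("slurm_job_will_run2");
 *			return;
 *		}
 *		slurm_make_time_str(&resp->start_time, buf, sizeof(buf));
 *		printf("JobId=%u could start at %s on %s\n",
 *		       resp->job_id, buf, resp->node_list);
 *		slurm_free_will_run_response_msg(resp);
 *	}
 */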
int slurm_job_will_run2 (job_desc_msg_t *req,
will_run_response_msg_t **will_run_resp)
{
return _job_will_run_cluster(req, will_run_resp, working_cluster_rec);
}
static int _job_will_run_cluster(job_desc_msg_t *req,
will_run_response_msg_t **will_run_resp,
slurmdb_cluster_rec_t *cluster)
{
slurm_msg_t req_msg, resp_msg;
int rc;
/* req.immediate = true; implicit */
slurm_msg_t_init(&req_msg);
req_msg.msg_type = REQUEST_JOB_WILL_RUN;
req_msg.data = req;
rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg, cluster);
if (rc < 0)
return SLURM_ERROR;
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_RC:
if (_handle_rc_msg(&resp_msg) < 0)
return SLURM_ERROR;
break;
case RESPONSE_JOB_WILL_RUN:
*will_run_resp = (will_run_response_msg_t *) resp_msg.data;
break;
default:
slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
break;
}
return SLURM_SUCCESS;
}
/*
* slurm_job_step_create - create a job step for a given job id
* IN slurm_step_alloc_req_msg - description of job step request
* OUT slurm_step_alloc_resp_msg - response to request
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
* NOTE: free the response using slurm_free_job_step_create_response_msg
*/
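/*
 * Example usage (illustrative sketch only): the request structure is assumed
 * to be populated by the caller, since the exact members used to express the
 * job id and task layout vary between Slurm versions; only the call and
 * cleanup pattern is shown.
 *
 *	static int step_create_example(job_step_create_request_msg_t *req)
 *	{
 *		job_step_create_response_msg_t *resp = NULL;
 *
 *		// May retry internally with a delay if the controller is busy
 *		if (slurm_job_step_create(req, &resp) != SLURM_SUCCESS) {
 *			slurm_perror("slurm_job_step_create");
 *			return SLURM_ERROR;
 *		}
 *		if (resp) {
 *			// step layout, credential, etc. live in resp
 *			slurm_free_job_step_create_response_msg(resp);
 *		}
 *		return SLURM_SUCCESS;
 *	}
 */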
int
slurm_job_step_create (job_step_create_request_msg_t *req,
job_step_create_response_msg_t **resp)
{
slurm_msg_t req_msg, resp_msg;
int delay = 0, rc, retry = 0;
char *stepmgr_nodename = NULL;
slurm_msg_t_init(&req_msg);
slurm_msg_t_init(&resp_msg);
req_msg.msg_type = REQUEST_JOB_STEP_CREATE;
req_msg.data = req;
re_send:
/* xstrdup() to be consistent with reroute and be able to free. */
if ((stepmgr_nodename = xstrdup(getenv("SLURM_STEPMGR")))) {
trystepmgr:
slurm_msg_set_r_uid(&req_msg, slurm_conf.slurmd_user_id);
if (slurm_conf_get_addr(stepmgr_nodename, &req_msg.address,
req_msg.flags)) {
/*
* The node isn't in the conf, see if the
* controller has an address for it.
*/
slurm_node_alias_addrs_t *alias_addrs;
if (!slurm_get_node_alias_addrs(stepmgr_nodename,
&alias_addrs)) {
add_remote_nodes_to_conf_tbls(
alias_addrs->node_list,
alias_addrs->node_addrs);
}
slurm_free_node_alias_addrs(alias_addrs);
slurm_conf_get_addr(stepmgr_nodename, &req_msg.address,
req_msg.flags);
}
xfree(stepmgr_nodename);
if (slurm_send_recv_node_msg(&req_msg, &resp_msg, 0))
return SLURM_ERROR;
} else if (slurm_send_recv_controller_msg(&req_msg, &resp_msg,
working_cluster_rec) < 0) {
return SLURM_ERROR;
}
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_REROUTE_MSG:
{
reroute_msg_t *rr_msg = resp_msg.data;
xfree(stepmgr_nodename);
stepmgr_nodename = rr_msg->stepmgr;
rr_msg->stepmgr = NULL;
if (stepmgr_nodename)
goto trystepmgr;
else
return SLURM_ERROR;
break;
}
case RESPONSE_SLURM_RC:
rc = _handle_rc_msg(&resp_msg);
if ((rc < 0) && (errno == EAGAIN)) {
if (retry++ == 0) {
verbose("Slurm is busy, step creation delayed");
delay = (getpid() % 10) + 10; /* 10-19 secs */
}
sleep(delay);
goto re_send;
}
if (rc < 0)
return SLURM_ERROR;
*resp = NULL;
break;
case RESPONSE_JOB_STEP_CREATE:
*resp = (job_step_create_response_msg_t *) resp_msg.data;
break;
default:
slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
break;
}
return SLURM_SUCCESS ;
}
/*
* slurm_allocation_lookup - retrieve info for an existing resource allocation
* without the addrs and such
* IN jobid - job allocation identifier
* OUT info - job allocation information
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
* NOTE: free the response using slurm_free_resource_allocation_response_msg()
*/
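/*
 * Example usage (illustrative sketch only): the job id would normally come
 * from a prior allocation or from the SLURM_JOB_ID environment variable.
 *
 *	static void lookup_example(uint32_t job_id)
 *	{
 *		resource_allocation_response_msg_t *info = NULL;
 *
 *		if (slurm_allocation_lookup(job_id, &info) != SLURM_SUCCESS) {
 *			slurm_perror("slurm_allocation_lookup");
 *			return;
 *		}
 *		if (info) {
 *			printf("JobId=%u is allocated %s\n",
 *			       info->job_id, info->node_list);
 *			slurm_free_resource_allocation_response_msg(info);
 *		}
 *	}
 */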
extern int slurm_allocation_lookup(uint32_t jobid,
resource_allocation_response_msg_t **info)
{
job_alloc_info_msg_t req;
slurm_msg_t req_msg;
slurm_msg_t resp_msg;
memset(&req, 0, sizeof(req));
req.job_id = jobid;
req.req_cluster = slurm_conf.cluster_name;
slurm_msg_t_init(&req_msg);
slurm_msg_t_init(&resp_msg);
req_msg.msg_type = REQUEST_JOB_ALLOCATION_INFO;
req_msg.data = &req;
if (slurm_send_recv_controller_msg(&req_msg, &resp_msg,
working_cluster_rec) < 0)
return SLURM_ERROR;
req.req_cluster = NULL;
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_RC:
if (_handle_rc_msg(&resp_msg) < 0)
return SLURM_ERROR;
*info = NULL;
break;
case RESPONSE_JOB_ALLOCATION_INFO:
*info = (resource_allocation_response_msg_t *) resp_msg.data;
return SLURM_SUCCESS;
break;
default:
slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
break;
}
return SLURM_SUCCESS;
}
/*
* slurm_het_job_lookup - retrieve info for an existing heterogeneous job
* allocation without the addrs and such
* IN jobid - job allocation identifier
* OUT info - job allocation information
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
* NOTE: returns information for an individual job as well
* NOTE: free the response using list_destroy()
*/
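/*
 * Example usage (illustrative sketch only): the returned list holds one
 * resource_allocation_response_msg_t per het job component; iterating it uses
 * the internal list API from "src/common/list.h", as done elsewhere in this
 * file.
 *
 *	static void het_lookup_example(uint32_t job_id)
 *	{
 *		list_t *resp = NULL;
 *		list_itr_t *iter;
 *		resource_allocation_response_msg_t *alloc;
 *
 *		if (slurm_het_job_lookup(job_id, &resp) != SLURM_SUCCESS) {
 *			slurm_perror("slurm_het_job_lookup");
 *			return;
 *		}
 *		if (!resp)
 *			return;
 *		iter = list_iterator_create(resp);
 *		while ((alloc = list_next(iter)))
 *			printf("JobId=%u component on %s\n",
 *			       alloc->job_id, alloc->node_list);
 *		list_iterator_destroy(iter);
 *		FREE_NULL_LIST(resp);
 *	}
 */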
extern int slurm_het_job_lookup(uint32_t jobid, list_t **info)
{
job_alloc_info_msg_t req;
slurm_msg_t req_msg;
slurm_msg_t resp_msg;
char *stepmgr_nodename = NULL;
memset(&req, 0, sizeof(req));
req.job_id = jobid;
req.req_cluster = slurm_conf.cluster_name;
slurm_msg_t_init(&req_msg);
slurm_msg_t_init(&resp_msg);
req_msg.msg_type = REQUEST_HET_JOB_ALLOC_INFO;
req_msg.data = &req;
if ((stepmgr_nodename = xstrdup(getenv("SLURM_STEPMGR")))) {
slurm_msg_set_r_uid(&req_msg, slurm_conf.slurmd_user_id);
if (slurm_conf_get_addr(stepmgr_nodename, &req_msg.address,
req_msg.flags)) {
/*
* The node isn't in the conf, see if the
* controller has an address for it.
*/
slurm_node_alias_addrs_t *alias_addrs;
if (!slurm_get_node_alias_addrs(stepmgr_nodename,
&alias_addrs)) {
add_remote_nodes_to_conf_tbls(
alias_addrs->node_list,
alias_addrs->node_addrs);
}
slurm_free_node_alias_addrs(alias_addrs);
slurm_conf_get_addr(stepmgr_nodename, &req_msg.address,
req_msg.flags);
}
xfree(stepmgr_nodename);
if (slurm_send_recv_node_msg(&req_msg, &resp_msg, 0))
return SLURM_ERROR;
} else if (slurm_send_recv_controller_msg(&req_msg, &resp_msg,
working_cluster_rec) < 0) {
return SLURM_ERROR;
}
req.req_cluster = NULL;
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_RC:
if (_handle_rc_msg(&resp_msg) < 0)
return SLURM_ERROR;
*info = NULL;
break;
case RESPONSE_HET_JOB_ALLOCATION:
*info = resp_msg.data;
return SLURM_SUCCESS;
break;
default:
slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
break;
}
return SLURM_SUCCESS;
}
/*
* slurm_sbcast_lookup - retrieve info for an existing resource allocation
* including a credential needed for sbcast.
* IN selected_step - filled in with step_id and het_job_offset
* OUT info - job allocation information including a credential for sbcast
* RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
* NOTE: free the "resp" using slurm_free_sbcast_cred_msg
*/
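/*
 * Example usage (illustrative sketch only): the slurm_selected_step_t member
 * names used below (step_id.job_id, het_job_offset, array_task_id) and the
 * node_list member of job_sbcast_cred_msg_t reflect one layout of those
 * structures and should be treated as assumptions; NO_VAL marks fields that
 * are not being selected on.
 *
 *	static void sbcast_lookup_example(uint32_t job_id)
 *	{
 *		slurm_selected_step_t sel = {0};
 *		job_sbcast_cred_msg_t *cred_msg = NULL;
 *
 *		sel.step_id.job_id = job_id;
 *		sel.step_id.step_id = NO_VAL;
 *		sel.het_job_offset = NO_VAL;
 *		sel.array_task_id = NO_VAL;
 *
 *		if (slurm_sbcast_lookup(&sel, &cred_msg) != SLURM_SUCCESS) {
 *			slurm_perror("slurm_sbcast_lookup");
 *			return;
 *		}
 *		printf("sbcast credential covers nodes %s\n",
 *		       cred_msg->node_list);
 *		slurm_free_sbcast_cred_msg(cred_msg);
 *	}
 */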
extern int slurm_sbcast_lookup(slurm_selected_step_t *selected_step,
job_sbcast_cred_msg_t **info)
{
slurm_msg_t req_msg;
slurm_msg_t resp_msg;
char *stepmgr_nodename = NULL;
slurm_msg_t_init(&req_msg);
slurm_msg_t_init(&resp_msg);
req_msg.msg_type = REQUEST_JOB_SBCAST_CRED;
req_msg.data = selected_step;
trystepmgr:
if (stepmgr_nodename) {
slurm_msg_set_r_uid(&req_msg, slurm_conf.slurmd_user_id);
if (slurm_conf_get_addr(stepmgr_nodename, &req_msg.address,
req_msg.flags)) {
/*
* The node isn't in the conf, see if the
* controller has an address for it.
*/
slurm_node_alias_addrs_t *alias_addrs;
if (!slurm_get_node_alias_addrs(stepmgr_nodename,
&alias_addrs)) {
add_remote_nodes_to_conf_tbls(
alias_addrs->node_list,
alias_addrs->node_addrs);
}
slurm_free_node_alias_addrs(alias_addrs);
slurm_conf_get_addr(stepmgr_nodename, &req_msg.address,
req_msg.flags);
}
xfree(stepmgr_nodename);
if (slurm_send_recv_node_msg(&req_msg, &resp_msg, 0))
return SLURM_ERROR;
} else if (slurm_send_recv_controller_msg(
&req_msg,
&resp_msg,working_cluster_rec) < 0)
return SLURM_ERROR;
switch (resp_msg.msg_type) {
case RESPONSE_SLURM_REROUTE_MSG:
{
reroute_msg_t *rr_msg = resp_msg.data;
stepmgr_nodename = rr_msg->stepmgr;
rr_msg->stepmgr = NULL;
if (stepmgr_nodename)
goto trystepmgr;
else
return SLURM_ERROR;
break;
}
case RESPONSE_SLURM_RC:
if (_handle_rc_msg(&resp_msg) < 0)
return SLURM_ERROR;
*info = NULL;
break;
case RESPONSE_JOB_SBCAST_CRED:
*info = (job_sbcast_cred_msg_t *)resp_msg.data;
return SLURM_SUCCESS;
break;
default:
slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
break;
}
return SLURM_SUCCESS;
}
/*
* Handle a return code message type.
* If the return code is nonzero, sets errno to that code and returns < 0.
* Otherwise, returns 0 (SLURM_SUCCESS).
*/
static int
_handle_rc_msg(slurm_msg_t *msg)
{
int rc = ((return_code_msg_t *) msg->data)->return_code;
slurm_free_return_code_msg(msg->data);
if (rc)
slurm_seterrno_ret(rc);
else
return SLURM_SUCCESS;
}
/*
* Read a Slurm hostfile specified by "filename". "filename" must contain
* a list of Slurm NodeNames, one per line, comma separated, or * notation.
* Reads up to "n" number of hostnames from the file. Returns a string
* representing a hostlist ranged string of the contents of the file.
* This is a helper function, it does not contact any Slurm daemons.
*
* Returns a string representing the hostlist. Returns NULL if there are fewer
* than "n" hostnames in the file, or if an error occurs. If "n" ==
* NO_VAL then the entire file is read in
*
* Returned string must be freed with free().
*/
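/*
 * Example usage (illustrative sketch only): the file name comes from the
 * SLURM_HOSTFILE environment variable, mirroring how srun consumes this
 * helper; passing NO_VAL reads the whole file.
 *
 *	static void hostfile_example(void)
 *	{
 *		char *hostfile = getenv("SLURM_HOSTFILE");
 *		char *nodelist;
 *
 *		if (!hostfile)
 *			return;
 *		if ((nodelist = slurm_read_hostfile(hostfile, (int) NO_VAL))) {
 *			printf("nodelist: %s\n", nodelist);
 *			free(nodelist); // allocated with malloc(), not xmalloc()
 *		}
 *	}
 */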
char *slurm_read_hostfile(const char *filename, int n)
{
FILE *fp = NULL;
char in_line[BUFFER_SIZE]; /* input line */
int i, j;
int line_size;
int line_num = 0;
hostlist_t *hostlist = NULL;
char *nodelist = NULL, *end_part = NULL;
char *asterisk, *tmp_text = NULL, *save_ptr = NULL, *host_name;
int total_file_len = 0;
if (filename == NULL || strlen(filename) == 0)
return NULL;
if ((fp = fopen(filename, "r")) == NULL) {
error("slurm_allocate_resources error opening file %s, %m",
filename);
return NULL;
}
hostlist = hostlist_create(NULL);
if (hostlist == NULL) {
fclose(fp);
return NULL;
}
while (fgets(in_line, BUFFER_SIZE, fp) != NULL) {
line_size = strlen(in_line);
for (i = 0; i < line_size; i++) {
if (in_line[i] == '\n') {
in_line[i] = '\0';
break;
}
if (in_line[i] == '\0')
break;
if (in_line[i] != '#')
continue;
if ((i > 0) && (in_line[i - 1] == '\\')) {
for (j = i; j < line_size; j++) {
in_line[j - 1] = in_line[j];
}
line_size--;
continue;
}
in_line[i] = '\0';
break;
}
/*
* Get the string length again just in case it changed in
* the above loop
*/
line_size = strlen(in_line);
total_file_len += line_size;
/*
* If there was an end section from before set it up to be on
* the front of this next chunk.
*/
if (end_part) {
tmp_text = end_part;
end_part = NULL;
}
if (line_size == (BUFFER_SIZE - 1)) {
/*
* If we filled up the buffer get the end past the last
* comma. We will tack it on the next pass through.
*/
char *last_comma = strrchr(in_line, ',');
if (!last_comma) {
error("Line %d, of hostfile %s too long",
line_num, filename);
fclose(fp);
hostlist_destroy(hostlist);
return NULL;
}
end_part = xstrdup(last_comma + 1);
*last_comma = '\0';
} else
line_num++;
xstrcat(tmp_text, in_line);
/* Skip this line */
if (tmp_text[0] == '\0')
continue;
if (!isalpha(tmp_text[0]) && !isdigit(tmp_text[0])) {
error("Invalid hostfile %s contents on line %d",
filename, line_num);
fclose(fp);
hostlist_destroy(hostlist);
xfree(end_part);
xfree(tmp_text);
return NULL;
}
host_name = strtok_r(tmp_text, ",", &save_ptr);
while (host_name) {
if ((asterisk = strchr(host_name, '*')) &&
(i = atoi(asterisk + 1))) {
asterisk[0] = '\0';
if (n != (int) NO_VAL)
i = MIN(i,
n - hostlist_count(hostlist));
/*
* Don't forget the extra space potentially
* needed
*/
total_file_len += strlen(host_name) * i;
for (j = 0; j < i; j++)
hostlist_push_host(hostlist, host_name);
} else {
hostlist_push_host(hostlist, host_name);
}
host_name = strtok_r(NULL, ",", &save_ptr);
if ((n != (int) NO_VAL) &&
(hostlist_count(hostlist) == n))
break;
}
xfree(tmp_text);
if ((n != (int)NO_VAL) && (hostlist_count(hostlist) == n))
break;
}
fclose(fp);
if (hostlist_count(hostlist) <= 0) {
error("Hostlist is empty!");
goto cleanup_hostfile;
}
if (hostlist_count(hostlist) < n) {
error("Too few NodeNames in Slurm Hostfile");
goto cleanup_hostfile;
}
total_file_len += 1024;
nodelist = (char *)malloc(total_file_len);
if (!nodelist) {
error("Nodelist xmalloc failed");
goto cleanup_hostfile;
}
if (hostlist_ranged_string(hostlist, total_file_len, nodelist) == -1) {
error("Hostlist is too long for the allocate RPC!");
free(nodelist);
nodelist = NULL;
goto cleanup_hostfile;
}
debug2("Hostlist from SLURM_HOSTFILE = %s", nodelist);
cleanup_hostfile:
hostlist_destroy(hostlist);
xfree(end_part);
xfree(tmp_text);
return nodelist;
}
/***************************************************************************
* Support functions for slurm_allocate_resources_blocking()
***************************************************************************/
static listen_t *_create_allocation_response_socket(void)
{
listen_t *listen = NULL;
uint16_t *ports;
listen = xmalloc(sizeof(listen_t));
if ((ports = slurm_get_srun_port_range()))
listen->fd = slurm_init_msg_engine_ports(ports);
else
listen->fd = slurm_init_msg_engine_port(0);
if (listen->fd < 0) {
error("slurm_init_msg_engine_port error %m");
xfree(listen);
return NULL;
}
if (slurm_get_stream_addr(listen->fd, &listen->address) < 0) {
error("slurm_get_stream_addr error %m");
close(listen->fd);
xfree(listen);
return NULL;
}
listen->hostname = xshort_hostname();
if ((listen->address.ss_family == AF_INET) ||
(listen->address.ss_family == AF_INET6)) {
listen->port = slurm_get_port(&listen->address);
} else {
error("%s: address family not supported", __func__);
_destroy_allocation_response_socket(listen);
return NULL;
}
fd_set_nonblocking(listen->fd);
return listen;
}
static void _destroy_allocation_response_socket(listen_t *listen)
{
xassert(listen != NULL);
close(listen->fd);
if (listen->hostname)
xfree(listen->hostname);
xfree(listen);
}
/* process RPC from slurmctld
* IN msg: message received
* OUT resp: resource allocation response message or List of them
* RET 1 if resp is filled in, 2 if the message was ignored, 0 otherwise */
static int _handle_msg(slurm_msg_t *msg, uint16_t msg_type, void **resp,
uint32_t job_id)
{
uid_t req_uid;
uid_t uid = getuid();
int rc = 0;
req_uid = auth_g_get_uid(msg->auth_cred);
if ((req_uid != slurm_conf.slurm_user_id) && (req_uid != 0) &&
(req_uid != uid)) {
error ("Security violation, slurm message from uid %u",
req_uid);
return 0;
}
if (msg->msg_type == msg_type) {
debug2("resource allocation response received");
slurm_send_rc_msg(msg, SLURM_SUCCESS);
*resp = msg->data; /* transfer payload to response */
msg->data = NULL;
rc = 1;
} else if (msg->msg_type == SRUN_JOB_COMPLETE) {
srun_job_complete_msg_t *job_comp_msg = msg->data;
if (job_comp_msg->job_id == job_id) {
info("Job has been cancelled");
} else {
verbose("Ignoring SRUN_JOB_COMPLETE message for JobId=%u (our JobId=%u)",
job_comp_msg->job_id, job_id);
rc = 2;
}
} else {
error("%s: received spurious message type: %s",
__func__, rpc_num2string(msg->msg_type));
rc = 2;
}
return rc;
}
/* Accept RPC from slurmctld and process it.
* IN slurmctld_fd: file descriptor for slurmctld communications
* IN msg_type: RESPONSE_RESOURCE_ALLOCATION or RESPONSE_HET_JOB_ALLOCATION
* OUT resp: resource allocation response message or List
* RET 1 if resp is filled in, 2 if the message was ignored,
* SLURM_ERROR on receive error, 0 otherwise */
static int _accept_msg_connection(int listen_fd, uint16_t msg_type, void **resp,
uint32_t job_id)
{
void *tls_conn = NULL;
slurm_msg_t *msg = NULL;
slurm_addr_t cli_addr;
int rc = 0;
if (!(tls_conn = slurm_accept_msg_conn(listen_fd, &cli_addr)))
return 0;
debug2("got message connection from %pA", &cli_addr);
msg = xmalloc(sizeof(slurm_msg_t));
slurm_msg_t_init(msg);
if ((rc = slurm_receive_msg(tls_conn, msg, 0)) != 0) {
slurm_free_msg(msg);
if (errno == EINTR) {
conn_g_destroy(tls_conn, true);
*resp = NULL;
return 0;
}
error("%s[%pA]: %m", __func__, &cli_addr);
conn_g_destroy(tls_conn, true);
return SLURM_ERROR;
}
rc = _handle_msg(msg, msg_type, resp, job_id); /* xfer payload */
slurm_free_msg(msg);
conn_g_destroy(tls_conn, true);
return rc;
}
/* Wait up to sleep_time for RPC from slurmctld indicating resource allocation
* has occurred.
* IN sleep_time: delay in seconds (0 means unbounded wait)
* RET -1: error, 0: timeout, 1: ready to read */
static int _wait_for_alloc_rpc(const listen_t *listen, int sleep_time)
{
struct pollfd fds[1];
int rc;
int timeout_ms;
if (listen == NULL) {
error("Listening port not found");
sleep(MAX(sleep_time, 1));
return -1;
}
fds[0].fd = listen->fd;
fds[0].events = POLLIN;
if (sleep_time != 0)
timeout_ms = sleep_time * 1000;
else
timeout_ms = -1;
while ((rc = poll(fds, 1, timeout_ms)) < 0) {
switch (errno) {
case EAGAIN:
case EINTR:
return -1;
case EBADF:
case ENOMEM:
case EINVAL:
case EFAULT:
error("poll: %m");
return -1;
default:
error("poll: %m. Continuing...");
}
}
if (rc == 0) { /* poll timed out */
errno = ETIMEDOUT;
} else if (fds[0].revents & POLLIN) {
return 1;
}
return 0;
}
static void _wait_for_allocation_response(uint32_t job_id,
const listen_t *listen,
uint16_t msg_type, int timeout,
void **resp)
{
int errnum, rc;
info("job %u queued and waiting for resources", job_id);
*resp = NULL;
while (true) {
if ((rc = _wait_for_alloc_rpc(listen, timeout)) != 1)
break;
if ((rc = _accept_msg_connection(listen->fd, msg_type, resp,
job_id)) != 2)
break;
}
if (rc <= 0) {
errnum = errno;
/* Maybe the resource allocation response RPC got lost
* in the mail; surely it should have arrived by now.
* Let's see if the controller thinks that the allocation
* has been granted.
*/
if (msg_type == RESPONSE_RESOURCE_ALLOCATION) {
if (slurm_allocation_lookup(job_id,
(resource_allocation_response_msg_t **)
resp) >= 0)
return;
} else if (msg_type == RESPONSE_HET_JOB_ALLOCATION) {
if (slurm_het_job_lookup(job_id, (list_t **) resp) >= 0)
return;
} else {
error("%s: Invalid msg_type (%u)", __func__, msg_type);
}
if (errno == ESLURM_JOB_PENDING) {
debug3("Still waiting for allocation");
errno = errnum;
return;
} else {
debug3("Unable to confirm allocation for job %u: %m",
job_id);
return;
}
}
info("job %u has been allocated resources", job_id);
return;
}