blob: 9b897f2207b4677ac94b491fca5171c9c1b17fd2 [file] [log] [blame]
/*****************************************************************************\
* slurm_protocol_api.c - high-level slurm communication functions
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2010 Lawrence Livermore National Security.
* Copyright (C) SchedMD LLC.
* Copyright (C) 2013 Intel, Inc.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Kevin Tew <tew1@llnl.gov>, et. al.
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"
/* GLOBAL INCLUDES */
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "slurm/slurm_errno.h"
#include "src/common/assoc_mgr.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/net.h"
#include "src/common/pack.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_common.h"
#include "src/common/slurm_protocol_pack.h"
#include "src/common/slurm_protocol_socket.h"
#include "src/common/stepd_proxy.h"
#include "src/common/strlcpy.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/interfaces/accounting_storage.h"
#include "src/interfaces/auth.h"
#include "src/interfaces/conn.h"
#include "src/interfaces/hash.h"
#include "src/conmgr/conmgr.h"
#include "src/slurmdbd/read_config.h"
/*
 * Controller addressing information built from slurm.conf by
 * _slurm_api_get_comm_config() and released by
 * _slurm_api_free_comm_config().
 */
typedef struct {
uint32_t control_cnt; /* number of entries in controller_addr */
slurm_addr_t *controller_addr; /* one address per configured controller */
bool vip_addr_set; /* true when vip_addr was filled in */
slurm_addr_t vip_addr; /* address built from conf->slurmctld_addr */
} slurm_protocol_config_t;
/*
 * Also export these unit-conversion helpers under slurm_-prefixed names.
 * NOTE(review): presumably these aliases form part of the public libslurm
 * symbol set — confirm before removing any of them.
 */
strong_alias(convert_num_unit2, slurm_convert_num_unit2);
strong_alias(convert_num_unit, slurm_convert_num_unit);
strong_alias(revert_num_unit, slurm_revert_num_unit);
strong_alias(get_convert_unit_val, slurm_get_convert_unit_val);
strong_alias(get_unit_type, slurm_get_unit_type);
/* EXTERNAL VARIABLES */
/* #DEFINES */
/* STATIC VARIABLES */
/* STATIC FUNCTIONS */
/* Return the cached key used to verify SLURM_GLOBAL_AUTH_KEY messages. */
static char *_global_auth_key(void);
/* Map generic SLURM_COMMUNICATIONS_* errno values to SLURMCTLD_* ones. */
static void _remap_slurmctld_errno(void);
/*
 * Define slurmdbd_conf here so its existence can be treated as a flag:
 * many getters in this file branch on slurmdbd_conf != NULL.
 * NOTE(review): presumably only slurmdbd ever sets it — confirm.
 */
slurmdbd_conf_t *slurmdbd_conf = NULL;
/**********************************************************************\
* protocol configuration functions
\**********************************************************************/
/*
 * Release a structure obtained from _slurm_api_get_comm_config(), including
 * its controller address array. Passing NULL is a no-op.
 */
static void _slurm_api_free_comm_config(slurm_protocol_config_t *proto_conf)
{
	if (!proto_conf)
		return;

	xfree(proto_conf->controller_addr);
	xfree(proto_conf);
}
/*
* Get communication data structure based upon configuration file
* RET communication information structure, call _slurm_api_free_comm_config
* to release allocated memory
*/
static slurm_protocol_config_t *_slurm_api_get_comm_config(void)
{
slurm_protocol_config_t *proto_conf = NULL;
slurm_conf_t *conf;
uint16_t port;
conf = slurm_conf_lock();
if (!conf->control_cnt ||
!conf->control_addr || !conf->control_addr[0]) {
error("Unable to establish controller machine");
goto cleanup;
}
if (conf->slurmctld_port == 0) {
error("Unable to establish controller port");
goto cleanup;
}
port = slurm_conf.slurmctld_port;
port += (time(NULL) + getpid()) % slurm_conf.slurmctld_port_count;
proto_conf = xmalloc(sizeof(slurm_protocol_config_t));
proto_conf->controller_addr = xcalloc(conf->control_cnt,
sizeof(slurm_addr_t));
proto_conf->control_cnt = conf->control_cnt;
for (int i = 0; i < proto_conf->control_cnt; i++) {
if (conf->control_addr[i]) {
slurm_set_addr(&proto_conf->controller_addr[i],
port, conf->control_addr[i]);
}
}
if (conf->slurmctld_addr) {
proto_conf->vip_addr_set = true;
slurm_set_addr(&proto_conf->vip_addr, port,
conf->slurmctld_addr);
}
cleanup:
slurm_conf_unlock();
return proto_conf;
}
/*
 * Verify the message-body hash carried in the auth credential of a received
 * message.
 *
 * The hash data returned by auth_g_get_data() is one byte of hash plugin
 * type followed by the hash bytes. For HASH_PLUGIN_NONE the "hash" is just
 * the 2-byte msg_type (3 bytes in total). Otherwise the hash is recomputed
 * with hash_g_compute() over header->body_length bytes starting at the
 * buffer's current offset, combined with the network-order msg_type, and
 * compared byte for byte.
 *
 * IN buffer - received message; read offset expected to be at the body
 * IN header - unpacked message header
 * IN/OUT msg - msg->msg_type and msg->auth_index are read; msg->hash_index
 *              is set to the accepted hash type on success
 * IN cred - unpacked auth credential (not consulted when the header has
 *           SLURM_NO_AUTH_CRED)
 * RET SLURM_SUCCESS if the hash is acceptable, otherwise non-zero
 *
 * NOTE(review): the static block_* caches are refreshed without a lock;
 * presumably concurrent callers tolerate this — confirm.
 */
static int _check_hash(buf_t *buffer, header_t *header, slurm_msg_t *msg,
void *cred)
{
char *cred_hash = NULL;
uint32_t cred_hash_len = 0;
int rc;
static time_t config_update = (time_t) -1;
static bool block_null_hash = true;
static bool block_zero_hash = true;
/* No auth also means no hash to verify */
if (header->flags & SLURM_NO_AUTH_CRED)
return SLURM_SUCCESS;
/* Re-parse comm_params only when the configuration has changed */
if (config_update != slurm_conf.last_update) {
block_null_hash = (xstrcasestr(slurm_conf.comm_params,
"block_null_hash"));
block_zero_hash = (xstrcasestr(slurm_conf.comm_params,
"block_zero_hash"));
config_update = slurm_conf.last_update;
}
if (!slurm_get_plugin_hash_enable(msg->auth_index))
return SLURM_SUCCESS;
rc = auth_g_get_data(cred, &cred_hash, &cred_hash_len);
if (cred_hash_len) {
log_flag_hex(NET_RAW, cred_hash, cred_hash_len,
"%s: cred_hash:", __func__);
if (cred_hash[0] == HASH_PLUGIN_NONE) {
/*
* Unfortunately the older versions did not normalize
* msg_type to network-byte order when this was added
* to the payload, so the sequence may be flipped and
* either ordering must be permitted.
*
* NOTE(review): despite the comment above, only the
* network-byte-order sequence is accepted below; confirm
* whether the flipped-order fallback was removed on
* purpose and update either the code or this comment.
*/
uint16_t msg_type_nb = htons(msg->msg_type);
char *type = (char *) &msg_type_nb;
if (block_zero_hash || (cred_hash_len != 3))
rc = SLURM_ERROR;
else if ((cred_hash[1] == type[0]) &&
(cred_hash[2] == type[1]))
msg->hash_index = HASH_PLUGIN_NONE;
else
rc = SLURM_ERROR;
} else {
/* Recompute the hash over the body and compare */
char *data;
uint32_t size = header->body_length;
slurm_hash_t hash = { 0 };
int h_len;
uint16_t msg_type = htons(msg->msg_type);
data = get_buf_data(buffer) + get_buf_offset(buffer);
hash.type = cred_hash[0];
h_len = hash_g_compute(data, size, (char *) &msg_type,
sizeof(msg_type), &hash);
/* +1 accounts for the leading hash-type byte */
if ((h_len + 1) != cred_hash_len ||
memcmp(cred_hash + 1, hash.hash, h_len))
rc = SLURM_ERROR;
else
msg->hash_index = hash.type;
log_flag_hex(NET_RAW, &hash, sizeof(hash),
"%s: hash:", __func__);
}
/*
 * No hash in the credential: reject only when block_null_hash is
 * configured, otherwise keep auth_g_get_data()'s return code.
 */
} else if (block_null_hash)
rc = SLURM_ERROR;
xfree(cred_hash);
return rc;
}
/*
 * Compute the hash to embed in an outgoing message's auth credential.
 *
 * Does nothing (returns 0) when hashing is disabled for msg->auth_index.
 * A non-default msg->hash_index overrides hash->type. For HASH_PLUGIN_NONE
 * the "hash" is simply the network-order msg_type. Otherwise
 * hash_g_compute() is run over the packed data in buffer (up to its
 * current offset) together with the network-order msg_type.
 *
 * IN buffer - packed message body
 * IN msg - outgoing message
 * IN/OUT hash - hash->type selects the algorithm; hash->hash is filled in
 * RET hash length plus one byte for the leading hash type (the layout
 *     _check_hash() expects), 0 if hashing is disabled, or the negative
 *     value returned by a failed hash_g_compute()
 */
static int _compute_hash(buf_t *buffer, slurm_msg_t *msg, slurm_hash_t *hash)
{
	uint16_t net_type;
	int len;

	if (!slurm_get_plugin_hash_enable(msg->auth_index))
		return 0;

	net_type = htons(msg->msg_type);

	if (msg->hash_index != HASH_PLUGIN_DEFAULT)
		hash->type = msg->hash_index;

	if (hash->type != HASH_PLUGIN_NONE) {
		len = hash_g_compute(get_buf_data(buffer),
				     get_buf_offset(buffer),
				     (char *) &net_type, sizeof(net_type),
				     hash);
		if (len < 0)
			return len;
	} else {
		memcpy(hash->hash, &net_type, sizeof(net_type));
		len = sizeof(msg->msg_type);
	}

	/* Reserve room for the hash-type byte that prefixes the hash */
	return len + 1;
}
static int _get_tres_id(char *type, char *name)
{
slurmdb_tres_rec_t tres_rec;
memset(&tres_rec, 0, sizeof(slurmdb_tres_rec_t));
tres_rec.type = type;
tres_rec.name = name;
return assoc_mgr_find_tres_pos(&tres_rec, false);
}
/*
 * Parse one TRES weight entry of the form "<type>[/<name>]=<value>[unit]"
 * and store the weight in weights[] at the TRES position reported by
 * _get_tres_id(). When a unit suffix is given, the value is divided by the
 * factor get_convert_unit_val() returns for that unit.
 *
 * IN/OUT weights - array indexed by TRES position
 *                  NOTE(review): assumed large enough for any position
 *                  _get_tres_id() can return — confirm with callers
 * IN item_str - entry to parse; modified in place by strtok_r()
 * RET SLURM_SUCCESS or SLURM_ERROR (an error is logged)
 */
static int _tres_weight_item(double *weights, char *item_str)
{
	char *type = NULL, *value_str = NULL, *val_unit = NULL, *name = NULL;
	int tres_id;
	double weight_value = 0;

	if (!item_str) {
		error("TRES weight item is null");
		return SLURM_ERROR;
	}

	type = strtok_r(item_str, "=", &value_str);
	if (type == NULL) {
		error("\"%s\" is an invalid TRES weight entry", item_str);
		return SLURM_ERROR;
	}

	/* Split "type/name" into its two components */
	if (strchr(type, '/'))
		type = strtok_r(type, "/", &name);

	if (!value_str || !*value_str) {
		error("\"%s\" is an invalid TRES weight entry", item_str);
		return SLURM_ERROR;
	}

	if ((tres_id = _get_tres_id(type, name)) == -1) {
		/* Echo the name back using the same "/" syntax as the input */
		error("TRES weight '%s%s%s' is not a configured TRES type.",
		      type, (name) ? "/" : "", (name) ? name : "");
		return SLURM_ERROR;
	}

	errno = 0;
	weight_value = strtod(value_str, &val_unit);
	/*
	 * Reject range errors and values with no numeric part at all.
	 * Otherwise an entry such as "cpu=G" would be accepted as weight 0.
	 */
	if (errno || (val_unit == value_str)) {
		error("Unable to convert %s value to double in %s",
		      value_str, __func__);
		return SLURM_ERROR;
	}

	if (val_unit && *val_unit) {
		int base_unit = slurmdb_get_tres_base_unit(type);
		int convert_val = get_convert_unit_val(base_unit, *val_unit);

		if (convert_val == SLURM_ERROR)
			return SLURM_ERROR;
		if (convert_val > 0)
			weight_value /= convert_val;
	}

	weights[tres_id] = weight_value;

	return SLURM_SUCCESS;
}
/* slurm_get_tres_weight_array
* IN weights_str - string of tres and weights to be parsed.
* IN tres_cnt - count of how many tres' are on the system (e.g.
* slurmctld_tres_cnt).
* IN fail - whether to fatal or not if there are parsing errors.
* RET double* of tres weights.
*/
double *slurm_get_tres_weight_array(char *weights_str, int tres_cnt, bool fail)
{
double *weights;
char *tmp_str;
char *token, *last = NULL;
if (!weights_str || !*weights_str || !tres_cnt)
return NULL;
tmp_str = xstrdup(weights_str);
weights = xcalloc(tres_cnt, sizeof(double));
token = strtok_r(tmp_str, ",", &last);
while (token) {
if (_tres_weight_item(weights, token)) {
xfree(weights);
xfree(tmp_str);
if (fail)
fatal("failed to parse tres weights str '%s'",
weights_str);
else
error("failed to parse tres weights str '%s'",
weights_str);
return NULL;
}
token = strtok_r(NULL, ",", &last);
}
xfree(tmp_str);
return weights;
}
/* slurm_get_stepd_loc
 * get path to the slurmstepd
 * 1. configure --sbindir concatenated with slurmstepd.
 * 2. configure --prefix concatenated with /sbin/slurmstepd.
 * RET char * - absolute path to the slurmstepd, MUST be xfreed by caller;
 *	NULL if the build defines neither SBINDIR nor SLURM_PREFIX
 */
extern char *slurm_get_stepd_loc(void)
{
#if defined(SBINDIR)
	return xstrdup_printf("%s/slurmstepd", SBINDIR);
#elif defined(SLURM_PREFIX)
	return xstrdup_printf("%s/sbin/slurmstepd", SLURM_PREFIX);
#else
	/*
	 * Previously control fell off the end of this non-void function here,
	 * which is undefined behavior. Return an explicit NULL instead.
	 */
	return NULL;
#endif
}
/*
 * slurm_get_tmp_fs - report the TmpFS setting from slurm_conf.
 * IN node_name - when non-NULL, the path is expanded for that node with
 *	slurm_conf_expand_slurmd_path(); otherwise it is copied verbatim
 * RET newly allocated string (MUST be xfreed by caller), or NULL when
 *	slurmdbd_conf is set
 */
extern char *slurm_get_tmp_fs(char *node_name)
{
	slurm_conf_t *cfg;
	char *path;

	if (slurmdbd_conf)
		return NULL;

	cfg = slurm_conf_lock();
	path = node_name ?
		slurm_conf_expand_slurmd_path(cfg->tmp_fs, node_name, NULL) :
		xstrdup(cfg->tmp_fs);
	slurm_conf_unlock();

	return path;
}
/*
 * slurm_get_track_wckey - whether WCKey tracking is enabled.
 * RET slurmdbd_conf->track_wckey when slurmdbd_conf is set; otherwise 1 if
 *	CONF_FLAG_WCKEY is set in slurm_conf's conf_flags, else 0
 */
extern uint16_t slurm_get_track_wckey(void)
{
	slurm_conf_t *cfg;
	bool enabled;

	if (slurmdbd_conf)
		return slurmdbd_conf->track_wckey;

	cfg = slurm_conf_lock();
	enabled = (cfg->conf_flags & CONF_FLAG_WCKEY);
	slurm_conf_unlock();

	return enabled ? 1 : 0;
}
/*
 * slurm_with_slurmdbd - true when AccountingStorageType is
 * "accounting_storage/slurmdbd" (case-insensitive).
 *
 * The answer is cached after the first call. Plugins cannot change on
 * reconfigure, so reconfiguration does not invalidate the cache.
 */
bool slurm_with_slurmdbd(void)
{
	static bool is_set = false;
	static bool with_slurmdbd = false;
	slurm_conf_t *cfg;

	if (!is_set) {
		cfg = slurm_conf_lock();
		with_slurmdbd = !xstrcasecmp(cfg->accounting_storage_type,
					     "accounting_storage/slurmdbd");
		is_set = true;
		slurm_conf_unlock();
	}

	return with_slurmdbd;
}
/*
 * Convert AuthInfo to a socket path. Two input formats are accepted:
 *   1) <path>              (old format: no '=' anywhere in opts)
 *   2) socket=<path>[,]    (new format)
 * RET newly allocated path (caller must xfree), or NULL when opts is NULL
 *     or contains '=' without a socket= option
 */
extern char *slurm_auth_opts_to_socket(char *opts)
{
	char *socket_path;

	if (!opts)
		return NULL;

	/* New format: an explicit socket= option wins */
	if ((socket_path = conf_get_opt_str(opts, "socket=")))
		return socket_path;

	/* Old format: the whole string is the socket path */
	if (!strchr(opts, '='))
		return xstrdup(opts);

	return NULL;
}
/* slurm_get_auth_ttl
 * returns the credential Time To Live option ("ttl=") from the AuthInfo
 * parameter; the value is cached in a static after the first lookup
 * RET int - Time To Live in seconds, or 0 if not specified or negative;
 *	values too large for an int are clamped to INT_MAX
 */
extern int slurm_get_auth_ttl(void)
{
	static int ttl = -1;
	char *tmp;
	long val;

	if (ttl >= 0)
		return ttl;

	/* Not cached: AuthInfo may still be set later */
	if (!slurm_conf.authinfo)
		return 0;

	if (!(tmp = strstr(slurm_conf.authinfo, "ttl="))) {
		ttl = 0;
		return ttl;
	}

	/*
	 * strtol() instead of atoi(): atoi() has undefined behavior when the
	 * value does not fit in an int. On overflow strtol() saturates to
	 * LONG_MIN/LONG_MAX, which the clamps below handle.
	 */
	val = strtol(tmp + 4, NULL, 10);
	if (val < 0)
		ttl = 0;
	else if (val > INT_MAX)
		ttl = INT_MAX;
	else
		ttl = (int) val;

	return ttl;
}
/* _global_auth_key
 * Returns the key used for SLURM_GLOBAL_AUTH_KEY messages: AuthInfo when
 * slurmdbd_conf is set, otherwise AccountingStoragePass from slurm_conf.
 * The value is copied into a static 512-byte buffer on the first call
 * (fatal() if it does not fit) and the cached pointer is returned after
 * that.
 * RET char * - storage password, or NULL if none is configured
 */
static char *_global_auth_key(void)
{
	static bool loaded_storage_pass = false;
	static char storage_pass[512] = "\0";
	static char *storage_pass_ptr = NULL;
	slurm_conf_t *cfg;

	if (!loaded_storage_pass) {
		if (!slurmdbd_conf) {
			cfg = slurm_conf_lock();
			if (cfg->accounting_storage_pass) {
				if (strlcpy(storage_pass,
					    cfg->accounting_storage_pass,
					    sizeof(storage_pass)) >=
				    sizeof(storage_pass))
					fatal("AccountingStoragePass is too long");
				storage_pass_ptr = storage_pass;
			}
			slurm_conf_unlock();
		} else if (slurm_conf.authinfo) {
			if (strlcpy(storage_pass, slurm_conf.authinfo,
				    sizeof(storage_pass)) >=
			    sizeof(storage_pass))
				fatal("AuthInfo is too long");
			storage_pass_ptr = storage_pass;
		}
		loaded_storage_pass = true;
	}

	return storage_pass_ptr;
}
/* slurm_get_acct_gather_interconnect_type
 * get AcctGatherInterconnectType from slurm_conf object
 * RET char * - interconnect accounting type, MUST be xfreed by caller;
 *	NULL when slurmdbd_conf is set
 */
char *slurm_get_acct_gather_interconnect_type(void)
{
	slurm_conf_t *cfg;
	char *type;

	if (slurmdbd_conf)
		return NULL;

	cfg = slurm_conf_lock();
	type = xstrdup(cfg->acct_gather_interconnect_type);
	slurm_conf_unlock();

	return type;
}
/*
 * slurm_get_gpu_freq_def - copy of the configured GpuFreqDef value.
 * RET char * - GpuFreqDef value, MUST be xfreed by caller; NULL when
 *	slurmdbd_conf is set
 */
char *slurm_get_gpu_freq_def(void)
{
	slurm_conf_t *cfg;
	char *freq;

	if (slurmdbd_conf)
		return NULL;

	cfg = slurm_conf_lock();
	freq = xstrdup(cfg->gpu_freq_def);
	slurm_conf_unlock();

	return freq;
}
/* slurm_get_preempt_type
 * get PreemptType from slurm_conf object
 * RET char * - preempt type, MUST be xfreed by caller; NULL when
 *	slurmdbd_conf is set
 */
char *slurm_get_preempt_type(void)
{
	slurm_conf_t *cfg;
	char *type;

	if (slurmdbd_conf)
		return NULL;

	cfg = slurm_conf_lock();
	type = xstrdup(cfg->preempt_type);
	slurm_conf_unlock();

	return type;
}
/* slurm_get_select_type
 * get SelectType from slurm_conf object
 * RET char * - select_type, MUST be xfreed by caller; NULL when
 *	slurmdbd_conf is set
 */
char *slurm_get_select_type(void)
{
	slurm_conf_t *cfg;
	char *type;

	if (slurmdbd_conf)
		return NULL;

	cfg = slurm_conf_lock();
	type = xstrdup(cfg->select_type);
	slurm_conf_unlock();

	return type;
}
/* slurm_get_srun_port_range()
 * RET the srun_port_range pointer stored in slurm_conf (not a copy; do not
 *	free), or NULL when slurmdbd_conf is set
 */
uint16_t *slurm_get_srun_port_range(void)
{
	slurm_conf_t *cfg;
	uint16_t *range;

	if (slurmdbd_conf)
		return NULL;

	cfg = slurm_conf_lock();
	range = cfg->srun_port_range;
	slurm_conf_unlock();

	return range;
}
/*
 * Change general slurm communication errors in errno to their
 * slurmctld-specific equivalents; any other errno value is left untouched.
 */
static void _remap_slurmctld_errno(void)
{
	switch (errno) {
	case SLURM_COMMUNICATIONS_CONNECTION_ERROR:
		errno = SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR;
		break;
	case SLURM_COMMUNICATIONS_SEND_ERROR:
		errno = SLURMCTLD_COMMUNICATIONS_SEND_ERROR;
		break;
	case SLURM_COMMUNICATIONS_RECEIVE_ERROR:
		errno = SLURMCTLD_COMMUNICATIONS_RECEIVE_ERROR;
		break;
	case SLURM_COMMUNICATIONS_SHUTDOWN_ERROR:
		errno = SLURMCTLD_COMMUNICATIONS_SHUTDOWN_ERROR;
		break;
	default:
		break;
	}
}
/**********************************************************************\
* general message management functions used by slurmctld, slurmd
\**********************************************************************/
/*
 * Create a listening message-engine socket on the given port.
 *
 * When port is 0 (ephemeral) and the first attempt fails with EADDRINUSE,
 * every port from 10001 through 65535 is tried in turn until one
 * succeeds.
 *
 * IN port - port to bind the msg server to (0 for an ephemeral port)
 * RET int - file descriptor of the listening socket, or negative on failure
 */
int slurm_init_msg_engine_port(uint16_t port)
{
	slurm_addr_t addr;
	const bool ephemeral = (port == 0);
	int fd;

	slurm_setup_addr(&addr, port);
	fd = slurm_init_msg_engine(&addr, ephemeral);

	/* Only scan manually when the ephemeral pool itself was exhausted */
	if ((fd >= 0) || !ephemeral || (errno != EADDRINUSE))
		return fd;

	for (int p = 10001; p < 65536; p++) {
		slurm_set_port(&addr, p);
		if ((fd = slurm_init_msg_engine(&addr, true)) >= 0)
			return fd;
	}

	error("%s: all ephemeral ports, and the range (10001, 65536) are exhausted, cannot establish listening port",
	      __func__);
	return fd;
}
/* slurm_init_msg_engine_ports()
 * Listen on a port chosen by net_stream_listen_ports() from the given
 * range. The chosen fd and port are written to locals and discarded.
 * IN ports - port range passed through to net_stream_listen_ports()
 * RET whatever net_stream_listen_ports() returns
 *	NOTE(review): presumably the listening fd or -1 — confirm in net.c
 */
int slurm_init_msg_engine_ports(uint16_t *ports)
{
	int listen_fd;
	uint16_t bound_port;

	return net_stream_listen_ports(&listen_fd, &bound_port, ports, false);
}
/**********************************************************************\
* msg connection establishment functions used by msg clients
\**********************************************************************/
/*
 * Open a stream socket to addr and wrap it in a client connection via
 * conn_g_create(). On wrapping failure the socket is closed.
 *
 * IN addr - address to connect to
 * IN tls_cert - certificate handed to conn_g_create()
 * IN maybe - value for conn_args_t.maybe
 * RET connection handle, or NULL on failure
 */
static void *_open_msg_conn(slurm_addr_t *addr, char *tls_cert, bool maybe)
{
	void *tls_conn;
	conn_args_t tls_args = {
		.mode = TLS_CONN_CLIENT,
		.cert = tls_cert,
		.maybe = maybe,
	};
	int fd = slurm_open_stream(addr, false);

	if (fd < 0) {
		log_flag(NET, "Unable to connect to address %pA: %m", addr);
		return NULL;
	}

	tls_args.input_fd = fd;
	tls_args.output_fd = fd;

	tls_conn = conn_g_create(&tls_args);
	if (!tls_conn) {
		log_flag(NET, "Unable to create client TLS connection to address %pA on fd %d: %m",
			 addr, fd);
		fd_close(&fd);
		return NULL;
	}

	log_flag(NET, "Successfully opened connection to %pA on fd %d",
		 addr, fd);
	return tls_conn;
}
/*
 * Open a client connection to addr (conn_g_create() with maybe = false).
 * IN addr - address to connect to
 * IN tls_cert - certificate passed through to conn_g_create()
 * RET connection handle, or NULL on failure
 */
extern void *slurm_open_msg_conn(slurm_addr_t *addr, char *tls_cert)
{
return _open_msg_conn(addr, tls_cert, false);
}
/*
 * Same as slurm_open_msg_conn() but passes maybe = true to conn_g_create().
 * NOTE(review): presumably allows falling back when the peer does not use
 * TLS — confirm against the conn plugin documentation.
 */
extern void *slurm_open_msg_conn_maybe(slurm_addr_t *addr, char *tls_cert)
{
return _open_msg_conn(addr, tls_cert, true);
}
/*
 * Open a stream connection to the primary or a backup slurmctld.
 *
 * With comm_cluster_rec, only that record's control address is tried. The
 * address is first resolved from control_host/control_port if still
 * unspecified. Otherwise the VIP address (conf->slurmctld_addr) is tried
 * when configured; failing that, every configured controller is tried in
 * turn starting at *index. Rounds of attempts repeat with a 1 second sleep
 * between them until slurm_conf.msg_timeout seconds have elapsed.
 *
 * IN/OUT index - IN: which controller to start from
 *              - OUT: which controller is connected; reset to 0 after a
 *                failed round. Only used when comm_cluster_rec is NULL
 *                and no VIP address is set.
 * IN comm_cluster_rec - Communication record (host/port/version)
 * RET tls_conn, or NULL on error. errno is set to
 *     SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR after the timeout; it is
 *     not set when the local configuration could not be read.
 */
static void *_open_controller(int *index,
			      slurmdb_cluster_rec_t *comm_cluster_rec)
{
	void *tls_conn = NULL;
	bool retry = false;
	time_t start;
	slurm_protocol_config_t *proto_conf = NULL;

	if (!comm_cluster_rec) {
		/* This means the addr wasn't set up already */
		if (!(proto_conf = _slurm_api_get_comm_config()))
			return NULL;
	}

	start = time(NULL);
	while (true) {
		/* Every round after the first: give up or wait one second */
		if (retry) {
			if ((time(NULL) - start) >= slurm_conf.msg_timeout)
				break;
			sleep(1);
		}
		retry = true;

		if (comm_cluster_rec) {
			if (slurm_addr_is_unspec(
				    &comm_cluster_rec->control_addr)) {
				slurm_set_addr(
					&comm_cluster_rec->control_addr,
					comm_cluster_rec->control_port,
					comm_cluster_rec->control_host);
			}
			if ((tls_conn = slurm_open_msg_conn(
				     &comm_cluster_rec->control_addr, NULL)))
				goto end_it;
			log_flag(NET, "%s: Failed to contact controller(%pA): %m",
				 __func__, &comm_cluster_rec->control_addr);
		} else if (proto_conf->vip_addr_set) {
			if ((tls_conn =
			     slurm_open_msg_conn(&proto_conf->vip_addr,
						 NULL)))
				goto end_it;
			log_flag(NET, "%s: Failed to contact controller(%pA): %m",
				 __func__, &proto_conf->vip_addr);
		} else {
			for (int i = 0; i < proto_conf->control_cnt; i++) {
				int inx = (*index + i) % proto_conf->control_cnt;
				slurm_addr_t *ctrl_addr =
					&proto_conf->controller_addr[inx];

				/* Controllers without an address were skipped */
				if (slurm_addr_is_unspec(ctrl_addr))
					continue;

				if ((tls_conn = slurm_open_msg_conn(ctrl_addr,
								    NULL))) {
					log_flag(NET, "%s: Contacted SlurmctldHost[%d](%pA)",
						 __func__, inx, ctrl_addr);
					*index = inx;
					goto end_it;
				}
				log_flag(NET, "%s: Failed to contact SlurmctldHost[%d](%pA): %m",
					 __func__, inx, ctrl_addr);
			}
			*index = 0;
		}
	}

	_slurm_api_free_comm_config(proto_conf);
	errno = SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR;
	return NULL;

end_it:
	_slurm_api_free_comm_config(proto_conf);
	return tls_conn;
}
/*
 * Open a stream connection to a specific primary or backup slurmctld.
 *
 * IN dest - controller to contact (0=primary, 1=backup, 2=backup2, etc.);
 *           ignored when comm_cluster_rec is given
 * IN comm_cluster_rec - Communication record (host/port/version); when set,
 *           its control address is used (resolved from
 *           control_host/control_port first if unspecified)
 * RET tls_conn or NULL. On connect failure, errno is remapped to the
 *     SLURMCTLD_* communication codes. An out-of-range dest returns NULL
 *     without touching errno.
 */
extern void *slurm_open_controller(int dest,
				   slurmdb_cluster_rec_t *comm_cluster_rec)
{
	slurm_protocol_config_t *proto_conf = NULL;
	slurm_addr_t *addr = NULL;
	void *tls_conn = NULL;

	if (comm_cluster_rec) {
		if (slurm_addr_is_unspec(&comm_cluster_rec->control_addr)) {
			slurm_set_addr(
				&comm_cluster_rec->control_addr,
				comm_cluster_rec->control_port,
				comm_cluster_rec->control_host);
		}
		addr = &comm_cluster_rec->control_addr;
	} else {	/* Some backup slurmctld */
		if (!(proto_conf = _slurm_api_get_comm_config())) {
			debug3("Error: Unable to set default config");
			return NULL;
		}
		/*
		 * controller_addr[] holds exactly control_cnt entries, so a
		 * valid index is strictly below control_cnt. (The previous
		 * "<=" check allowed reading one element past the array.)
		 */
		if ((dest < 0) ||
		    ((uint32_t) dest >= proto_conf->control_cnt))
			goto fini;
		addr = &proto_conf->controller_addr[dest];
	}

	xassert(addr);
	if (!(tls_conn = slurm_open_msg_conn(addr, NULL))) {
		log_flag(NET, "%s: slurm_open_msg_conn(%pA): %m",
			 __func__, addr);
		_remap_slurmctld_errno();
	}

fini:
	_slurm_api_free_comm_config(proto_conf);
	return tls_conn;
}
/*
 * Unpack, authenticate and decode a message already read into buffer.
 *
 * Parses the header and logs an error for any ret_list/forward data, which
 * is discarded: those messages must go through slurm_receive_msgs() /
 * slurm_unpack_msg_and_forward() instead. Unless the header has
 * SLURM_NO_AUTH_CRED, the auth credential is unpacked and verified,
 * against _global_auth_key() when SLURM_GLOBAL_AUTH_KEY is set and against
 * slurm_conf.authinfo otherwise. Finally the body length and hash are
 * checked and the body is unpacked into msg.
 *
 * IN/OUT msg - receives header fields, auth ids and the decoded body; on
 *              success msg->auth_cred owns the credential, on failure it
 *              is set to NULL
 * IN fd - only used to resolve the peer name for log messages
 * IN buffer - message data; its read offset is advanced
 * RET SLURM_SUCCESS or an error code (also stored in errno). Failures
 *     sleep 10ms before returning.
 */
extern int slurm_unpack_received_msg(slurm_msg_t *msg, int fd, buf_t *buffer)
{
header_t header;
int rc;
void *auth_cred = NULL;
char *peer = NULL;
if (slurm_conf.debug_flags & (DEBUG_FLAG_NET | DEBUG_FLAG_NET_RAW)) {
/*
* cache to avoid resolving multiple times
* this call is expensive
*/
peer = fd_resolve_peer(fd);
}
if ((rc = unpack_header(&header, buffer)))
goto total_return;
log_flag(NET_RAW, "%s: [%s] header version=0x%hx flags=0x%hx msg_type=%s(0x%hx) body_length=%ub ret_cnt=%hx forward.cnt=%hu forward.init=0x%hx forward.nodelist=%s forward.timeout=%u forward.tree_width=%hu orig_addr=%pA",
__func__, peer, header.version, header.flags,
rpc_num2string(header.msg_type), header.msg_type,
header.body_length, header.ret_cnt, header.forward.cnt,
header.forward.init, header.forward.nodelist,
header.forward.timeout, header.forward.tree_width,
&header.orig_addr);
/* Single-message receive: responses from other nodes are dropped */
if (header.ret_cnt > 0) {
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] we received more than one message back use slurm_receive_msgs instead",
__func__, peer);
header.ret_cnt = 0;
FREE_NULL_LIST(header.ret_list);
header.ret_list = NULL;
}
/* Forward message to other nodes */
if (header.forward.cnt > 0) {
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] We need to forward this to other nodes use slurm_unpack_msg_and_forward instead",
__func__, peer);
header.forward.cnt = 0;
xfree(header.forward.nodelist);
}
/* Copy over header info before attempting auth */
msg->protocol_version = header.version;
msg->msg_type = header.msg_type;
msg->flags = header.flags;
if (header.flags & SLURM_NO_AUTH_CRED)
goto skip_auth;
if (!(auth_cred = auth_g_unpack(buffer, header.version))) {
int rc2 = errno;
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] auth_g_unpack: %s has authentication error: %s",
__func__, peer, rpc_num2string(header.msg_type),
slurm_strerror(rc2));
rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
goto total_return;
}
msg->auth_index = auth_index(auth_cred);
/* Choose the key the credential is verified against */
if (header.flags & SLURM_GLOBAL_AUTH_KEY) {
rc = auth_g_verify(auth_cred, _global_auth_key());
} else {
rc = auth_g_verify(auth_cred, slurm_conf.authinfo);
}
if (rc != SLURM_SUCCESS) {
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] auth_g_verify: %s has authentication error: %s",
__func__, peer, rpc_num2string(header.msg_type),
slurm_strerror(rc));
auth_g_destroy(auth_cred);
rc = SLURM_PROTOCOL_AUTHENTICATION_ERROR;
goto total_return;
}
auth_g_get_ids(auth_cred, &msg->auth_uid, &msg->auth_gid);
msg->auth_ids_set = true;
skip_auth:
/*
* Unpack message body
*/
msg->body_offset = get_buf_offset(buffer);
/* Length, hash and body must all be valid; any failure rejects the msg */
if ((header.body_length != remaining_buf(buffer)) ||
_check_hash(buffer, &header, msg, auth_cred) ||
(unpack_msg(msg, buffer) != SLURM_SUCCESS)) {
rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
auth_g_destroy(auth_cred);
goto total_return;
}
/* msg now owns the credential */
msg->auth_cred = auth_cred;
rc = SLURM_SUCCESS;
total_return:
destroy_forward(&header.forward);
errno = rc;
if (rc != SLURM_SUCCESS) {
msg->auth_cred = NULL;
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] %s", __func__, peer, slurm_strerror(rc));
usleep(10000); /* Discourage brute force attack */
}
xfree(peer);
return rc;
}
/**********************************************************************\
* receive message functions
\**********************************************************************/
/*
 * Receive and unpack a single message.
 *
 * NOTE: memory allocated for the returned msg must be freed at some point
 * using the slurm_free_functions.
 * If msg->conn is set, the message is read from that persistent connection
 * instead of tls_conn, and the connection is closed on failure. If
 * SLURM_MSG_KEEP_BUFFER is set in msg->flags, the received buffer is
 * stored in msg->buffer instead of being freed.
 * IN tls_conn - connection to read from (unused when msg->conn is set)
 * OUT msg - a slurm_msg struct to be filled in by the function
 * IN timeout - how long to wait in milliseconds; <= 0 means
 *              slurm_conf.msg_timeout seconds
 * RET int - returns 0 on success, -1 on failure and sets errno (the
 *           persistent-connection path returns -1 without setting errno)
 */
extern int slurm_receive_msg(void *tls_conn, slurm_msg_t *msg, int timeout)
{
int fd = -1;
char *buf = NULL;
size_t buflen = 0;
int rc;
buf_t *buffer;
bool keep_buffer = false;
if (msg->flags & SLURM_MSG_KEEP_BUFFER)
keep_buffer = true;
/* Persistent connection: read and unpack via the persist_conn API */
if (msg->conn) {
persist_msg_t persist_msg;
buffer = slurm_persist_recv_msg(msg->conn);
if (!buffer) {
error("%s: No response to persist_init", __func__);
slurm_persist_conn_close(msg->conn);
return SLURM_ERROR;
}
memset(&persist_msg, 0, sizeof(persist_msg_t));
rc = slurm_persist_msg_unpack(msg->conn, &persist_msg, buffer);
if (keep_buffer)
msg->buffer = buffer;
else
FREE_NULL_BUFFER(buffer);
if (rc) {
error("%s: Failed to unpack persist msg", __func__);
slurm_persist_conn_close(msg->conn);
return SLURM_ERROR;
}
msg->msg_type = persist_msg.msg_type;
msg->data = persist_msg.data;
return SLURM_SUCCESS;
}
fd = conn_g_get_fd(tls_conn);
msg->tls_conn = tls_conn;
/* Normalize the timeout and warn about unusually long/short values */
if (timeout <= 0) {
/* convert secs to msec */
timeout = slurm_conf.msg_timeout * MSEC_IN_SEC;
} else if (timeout > (slurm_conf.msg_timeout * MSEC_IN_SEC * 10)) {
/* consider 10x the timeout to be very long */
log_flag(NET, "%s: You are receiving a message with very long timeout of %d seconds",
__func__, (timeout / MSEC_IN_SEC));
} else if (timeout < MSEC_IN_SEC) {
/* consider a less than 1 second to be very short */
error("%s: You are receiving a message with a very short "
"timeout of %d msecs", __func__, timeout);
}
/*
* Receive a msg. slurm_msg_recvfrom() will read the message
* length and allocate space on the heap for a buffer containing
* the message.
*/
if (slurm_msg_recvfrom_timeout(tls_conn, &buf, &buflen, timeout) < 0) {
rc = errno;
/* Never report success for a failed read */
if (!rc)
rc = SLURMCTLD_COMMUNICATIONS_RECEIVE_ERROR;
goto endit;
}
log_flag_hex(NET_RAW, buf, buflen, "%s: read", __func__);
/* buffer takes ownership of buf */
buffer = create_buf(buf, buflen);
rc = slurm_unpack_received_msg(msg, fd, buffer);
if (keep_buffer)
msg->buffer = buffer;
else
FREE_NULL_BUFFER(buffer);
log_flag(NET, "Received message %s from %pA on fd %d",
rpc_num2string(msg->msg_type), &msg->address, fd);
endit:
errno = rc;
/*
* We just set errno, we need to return SLURM_ERROR if not SLURM_SUCCESS
*/
if (rc != SLURM_SUCCESS)
rc = SLURM_ERROR;
return rc;
}
/*
 * Receive a message that may carry the responses of nodes it was
 * forwarded to.
 *
 * NOTE: memory is allocated for the returned list
 * and must be freed at some point using the list_destroy function.
 * IN tls_conn - connection to read from
 * IN steps - how many steps down the tree we have to wait for; here it
 *            only affects the per-step timeout reported in log messages
 * IN timeout - how long to wait in milliseconds; <= 0 means
 *              slurm_conf.msg_timeout seconds
 * RET List - List containing the responses of the children (if any) we
 * forwarded the message to, plus this message itself. List containing
 * type (ret_data_info_t). On failure a RESPONSE_FORWARD_FAILED entry is
 * appended when a list already exists; otherwise NULL is returned. errno
 * is set to the result code in all cases.
 */
extern list_t *slurm_receive_msgs(void *tls_conn, int steps, int timeout)
{
int fd = -1;
char *buf = NULL;
size_t buflen = 0;
header_t header;
int rc;
void *auth_cred = NULL;
slurm_msg_t msg;
buf_t *buffer;
ret_data_info_t *ret_data_info = NULL;
list_t *ret_list = NULL;
int orig_timeout = timeout;
char *peer = NULL;
fd = conn_g_get_fd(tls_conn);
if (slurm_conf.debug_flags & (DEBUG_FLAG_NET | DEBUG_FLAG_NET_RAW)) {
/*
* cache to avoid resolving multiple times
* this call is expensive
*/
peer = fd_resolve_peer(fd);
}
slurm_msg_t_init(&msg);
if (timeout <= 0) {
/* convert secs to msec */
timeout = slurm_conf.msg_timeout * 1000;
orig_timeout = timeout;
}
/* orig_timeout/steps below are only used for the diagnostics that follow */
if (steps) {
orig_timeout = timeout / (2 * steps);
steps--;
}
log_flag(NET, "%s: [%s] orig_timeout was %d we have %d steps and a timeout of %d",
__func__, peer, orig_timeout, steps, timeout);
/* we compare to the orig_timeout here because that is really
* what we are going to wait for each step
*/
if (orig_timeout >= (slurm_conf.msg_timeout * 10000)) {
log_flag(NET, "%s: [%s] Sending a message with timeout's greater than %d seconds, requested timeout is %d seconds",
__func__, peer, (slurm_conf.msg_timeout * 10),
(timeout/1000));
} else if (orig_timeout < 1000) {
log_flag(NET, "%s: [%s] Sending a message with a very short timeout of %d milliseconds each step in the tree has %d milliseconds",
__func__, peer, timeout, orig_timeout);
}
/*
* Receive a msg. slurm_msg_recvfrom() will read the message
* length and allocate space on the heap for a buffer containing
* the message.
*/
if (slurm_msg_recvfrom_timeout(tls_conn, &buf, &buflen, timeout) < 0) {
/* header was never unpacked; make destroy_forward() safe */
forward_init(&header.forward);
rc = errno;
goto total_return;
}
log_flag_hex(NET_RAW, buf, buflen, "%s: [%s] read", __func__, peer);
/* buffer takes ownership of buf */
buffer = create_buf(buf, buflen);
if ((rc = unpack_header(&header, buffer))) {
FREE_NULL_BUFFER(buffer);
goto total_return;
}
/* Take ownership of any responses collected from forwarded nodes */
if (header.ret_cnt > 0) {
if (header.ret_list)
ret_list = header.ret_list;
else
ret_list = list_create(destroy_data_info);
header.ret_cnt = 0;
header.ret_list = NULL;
}
/* Forward message to other nodes */
if (header.forward.cnt > 0) {
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] We need to forward this to other nodes use slurm_unpack_msg_and_forward instead",
__func__, peer);
}
if (header.flags & SLURM_NO_AUTH_CRED)
goto skip_auth;
if (!(auth_cred = auth_g_unpack(buffer, header.version))) {
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] auth_g_unpack: %m", __func__, peer);
FREE_NULL_BUFFER(buffer);
rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
goto total_return;
}
msg.auth_index = auth_index(auth_cred);
/* Choose the key the credential is verified against */
if (header.flags & SLURM_GLOBAL_AUTH_KEY) {
rc = auth_g_verify(auth_cred, _global_auth_key());
} else {
rc = auth_g_verify(auth_cred, slurm_conf.authinfo);
}
if (rc != SLURM_SUCCESS) {
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] auth_g_verify: %s has authentication error: %m",
__func__, peer, rpc_num2string(header.msg_type));
auth_g_destroy(auth_cred);
FREE_NULL_BUFFER(buffer);
rc = SLURM_PROTOCOL_AUTHENTICATION_ERROR;
goto total_return;
}
auth_g_get_ids(auth_cred, &msg.auth_uid, &msg.auth_gid);
msg.auth_ids_set = true;
skip_auth:
/*
* Unpack message body
*/
msg.protocol_version = header.version;
msg.msg_type = header.msg_type;
msg.flags = header.flags;
if ((header.body_length != remaining_buf(buffer)) ||
_check_hash(buffer, &header, &msg, auth_cred) ||
(unpack_msg(&msg, buffer) != SLURM_SUCCESS)) {
auth_g_destroy(auth_cred);
FREE_NULL_BUFFER(buffer);
rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
goto total_return;
}
/* Unlike slurm_receive_msg(), the credential is not kept */
auth_g_destroy(auth_cred);
FREE_NULL_BUFFER(buffer);
rc = SLURM_SUCCESS;
total_return:
destroy_forward(&header.forward);
if (rc != SLURM_SUCCESS) {
/* Record the failure only if there is a list to report it in */
if (ret_list) {
ret_data_info = xmalloc(sizeof(ret_data_info_t));
ret_data_info->err = rc;
ret_data_info->type = RESPONSE_FORWARD_FAILED;
ret_data_info->data = NULL;
list_push(ret_list, ret_data_info);
}
/* peer may have not been resolved already */
if (!peer)
peer = fd_resolve_peer(fd);
error("%s: [%s] failed: %s",
__func__, peer, slurm_strerror(rc));
usleep(10000); /* Discourage brute force attack */
} else {
/* Append this message's own decoded body to the list */
if (!ret_list)
ret_list = list_create(destroy_data_info);
ret_data_info = xmalloc(sizeof(ret_data_info_t));
ret_data_info->err = rc;
ret_data_info->node_name = NULL;
ret_data_info->type = msg.msg_type;
ret_data_info->data = msg.data;
list_push(ret_list, ret_data_info);
}
errno = rc;
xfree(peer);
return ret_list;
}
/*
 * Receive one response message on an already open connection and return it,
 * together with any ret_list the peer packed into the header, as a list of
 * ret_data_info_t.
 *
 * IN tls_conn - connection to read from
 * IN steps - tree depth; when non-zero the per-step share of the timeout
 *	(timeout / (2 * steps)) is computed for the log messages below
 * IN timeout - how long to wait in milliseconds; <= 0 means MessageTimeout
 * RET list of ret_data_info_t. On success the list always exists and its
 *	last entry holds the unpacked message. On failure a
 *	RESPONSE_FORWARD_FAILED entry is appended if a ret_list was unpacked,
 *	otherwise NULL is returned. errno is set to the final return code.
 *
 * The auth credential is unpacked but intentionally not verified: this is
 * the reply path of a connection verified in the opposite direction.
 */
extern list_t *slurm_receive_resp_msgs(void *tls_conn, int steps, int timeout)
{
	int fd = -1;
	char *buf = NULL;
	size_t buflen = 0;
	header_t header;
	int rc;
	void *auth_cred = NULL;
	slurm_msg_t msg;
	buf_t *buffer;
	ret_data_info_t *ret_data_info = NULL;
	list_t *ret_list = NULL;
	int orig_timeout = timeout;
	char *peer = NULL;

	fd = conn_g_get_fd(tls_conn);

	if (slurm_conf.debug_flags & (DEBUG_FLAG_NET | DEBUG_FLAG_NET_RAW)) {
		/*
		 * cache to avoid resolving multiple times
		 * this call is expensive
		 */
		peer = fd_resolve_peer(fd);
	}

	slurm_msg_t_init(&msg);

	if (timeout <= 0) {
		/* convert secs to msec */
		timeout = slurm_conf.msg_timeout * 1000;
		orig_timeout = timeout;
	}

	if (steps) {
		/*
		 * Per-step share of the timeout. Note that only the logging
		 * below uses orig_timeout; the receive itself waits "timeout".
		 */
		orig_timeout = timeout / (2 * steps);
		steps--;
	}

	log_flag(NET, "%s: [%s] orig_timeout was %d we have %d steps and a timeout of %d",
		 __func__, peer, orig_timeout, steps, timeout);
	/*
	 * Compare to the orig_timeout here, because that is what we are
	 * going to wait for each step.
	 * (msg_timeout * 10000 ms is msg_timeout * 10 seconds, the value
	 * printed in the message.)
	 */
	if (orig_timeout >= (slurm_conf.msg_timeout * 10000)) {
		log_flag(NET, "%s: [%s] Sending a message with timeouts greater than %d seconds, requested timeout is %d seconds",
			 __func__, peer, (slurm_conf.msg_timeout * 10),
			 (timeout / 1000));
	} else if (orig_timeout < 1000) {
		log_flag(NET, "%s: [%s] Sending a message with a very short timeout of %d milliseconds, each step in the tree has %d milliseconds",
			 __func__, peer, timeout, orig_timeout);
	}

	/*
	 * Receive a msg. slurm_msg_recvfrom_timeout() will read the message
	 * length and allocate space on the heap for a buffer containing the
	 * message.
	 */
	if (slurm_msg_recvfrom_timeout(tls_conn, &buf, &buflen, timeout) < 0) {
		/* header was never unpacked; make destroy_forward() safe */
		forward_init(&header.forward);
		rc = errno;
		goto total_return;
	}

	log_flag_hex(NET_RAW, buf, buflen, "%s: [%s] read", __func__, peer);

	/* buffer takes ownership of buf */
	buffer = create_buf(buf, buflen);

	if ((rc = unpack_header(&header, buffer))) {
		FREE_NULL_BUFFER(buffer);
		goto total_return;
	}

	/* Adopt the ret_list the peer aggregated from its children, if any */
	if (header.ret_cnt > 0) {
		if (header.ret_list)
			ret_list = header.ret_list;
		else
			ret_list = list_create(destroy_data_info);
		header.ret_cnt = 0;
		header.ret_list = NULL;
	}

	/* Forward message to other nodes */
	if (header.forward.cnt > 0) {
		/* peer may have not been resolved already */
		if (!peer)
			peer = fd_resolve_peer(fd);
		error("%s: [%s] We need to forward this to other nodes use slurm_unpack_msg_and_forward instead",
		      __func__, peer);
	}

	if (header.flags & SLURM_NO_AUTH_CRED)
		goto skip_auth;

	/*
	 * Skip credential verification here. This is on the reply path, so the
	 * connections have been previously verified in the opposite direction.
	 * The credential is still unpacked to advance past it in the buffer.
	 */
	if (!(auth_cred = auth_g_unpack(buffer, header.version))) {
		/* peer may have not been resolved already */
		if (!peer)
			peer = fd_resolve_peer(fd);
		error("%s: [%s] auth_g_unpack: %m", __func__, peer);
		FREE_NULL_BUFFER(buffer);
		rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
		goto total_return;
	}
	auth_g_destroy(auth_cred);
skip_auth:
	/*
	 * Unpack message body
	 */
	msg.protocol_version = header.version;
	msg.msg_type = header.msg_type;
	msg.flags = header.flags;

	/*
	 * NOTE(review): this uses '>' whereas slurm_unpack_msg_and_forward()
	 * requires body_length to equal remaining_buf() exactly, so trailing
	 * bytes are tolerated here and no hash is checked - confirm intended.
	 */
	if ((header.body_length > remaining_buf(buffer)) ||
	    (unpack_msg(&msg, buffer) != SLURM_SUCCESS)) {
		FREE_NULL_BUFFER(buffer);
		rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
		goto total_return;
	}
	FREE_NULL_BUFFER(buffer);
	rc = SLURM_SUCCESS;

total_return:
	destroy_forward(&header.forward);

	if (rc != SLURM_SUCCESS) {
		/* Record the failure in the adopted ret_list, if there is one */
		if (ret_list) {
			ret_data_info = xmalloc(sizeof(ret_data_info_t));
			ret_data_info->err = rc;
			ret_data_info->type = RESPONSE_FORWARD_FAILED;
			ret_data_info->data = NULL;
			list_push(ret_list, ret_data_info);
		}

		/* peer may have not been resolved already */
		if (!peer)
			peer = fd_resolve_peer(fd);
		error("%s: [%s] failed: %s",
		      __func__, peer, slurm_strerror(rc));
		usleep(10000);	/* Discourage brute force attack */
	} else {
		if (!ret_list)
			ret_list = list_create(destroy_data_info);
		/* ownership of msg.data moves into the list entry */
		ret_data_info = xmalloc(sizeof(ret_data_info_t));
		ret_data_info->err = rc;
		ret_data_info->node_name = NULL;
		ret_data_info->type = msg.msg_type;
		ret_data_info->data = msg.data;
		list_push(ret_list, ret_data_info);
	}

	errno = rc;
	xfree(peer);
	return ret_list;

}
/*
 * Unpack a message that was read from fd into buffer, start forwarding it to
 * any nodes listed in its header, verify its auth credential (unless the
 * sender set SLURM_NO_AUTH_CRED) and unpack the body into msg.
 *
 * IN/OUT msg - message to fill in. msg->ret_list is always created here and,
 *	when forwarding, shared with the forwarding subsystem.
 * IN orig_addr - address of the direct peer connection
 * IN fd - connection the message arrived on (used for diagnostics)
 * IN buffer - received data: header, optional auth credential, body
 * RET SLURM_SUCCESS or an error code. errno is set to the same value.
 *	On success msg->auth_cred owns the unpacked credential (NULL when
 *	SLURM_NO_AUTH_CRED was set). On failure msg->msg_type is
 *	RESPONSE_FORWARD_FAILED and msg->auth_cred/msg->data are NULL.
 */
extern int slurm_unpack_msg_and_forward(slurm_msg_t *msg,
					slurm_addr_t *orig_addr, int fd,
					buf_t *buffer)
{
	header_t header;
	int rc;
	void *auth_cred = NULL;
	char *peer = NULL;

	xassert(fd >= 0);

	if (slurm_conf.debug_flags & (DEBUG_FLAG_NET | DEBUG_FLAG_NET_RAW)) {
		/*
		 * cache to avoid resolving multiple times
		 * this call is expensive
		 */
		peer = fd_resolve_peer(fd);
	}

	/* this is the direct peer connection */
	memcpy(&msg->address, orig_addr, sizeof(slurm_addr_t));

	/*
	 * Where the connection originated from. This will change to the
	 * originator if the header is unpacked successfully later.
	 */
	memcpy(&msg->orig_addr, orig_addr, sizeof(slurm_addr_t));

	msg->ret_list = list_create(destroy_data_info);

	if ((rc = unpack_header(&header, buffer)))
		goto total_return;

	if (header.ret_cnt > 0) {
		/* peer may have not been resolved already */
		if (!peer)
			peer = fd_resolve_peer(fd);
		error("%s: [%s] we received more than one message back use slurm_receive_msgs instead",
		      __func__, peer);
		header.ret_cnt = 0;
		FREE_NULL_LIST(header.ret_list);
		header.ret_list = NULL;
	}

	/*
	 * Update orig_addr to where the connection originated from before
	 * the forwarding subsystem relayed it to us.
	 */
	if (!slurm_addr_is_unspec(&header.orig_addr)) {
		memcpy(&msg->orig_addr, &header.orig_addr,
		       sizeof(slurm_addr_t));
	} else {
		memcpy(&header.orig_addr, orig_addr, sizeof(slurm_addr_t));
	}

	/* Forward message to other nodes */
	if (header.forward.cnt > 0) {
		log_flag(NET, "%s: [%s] forwarding to %u nodes",
			 __func__, peer, header.forward.cnt);
		msg->forward_struct = xmalloc(sizeof(forward_struct_t));
		slurm_mutex_init(&msg->forward_struct->forward_mutex);
		slurm_cond_init(&msg->forward_struct->notify, NULL);

		/* Forward a private copy of the still-unparsed remainder */
		msg->forward_struct->buf_len = remaining_buf(buffer);
		msg->forward_struct->buf =
			xmalloc(msg->forward_struct->buf_len);
		memcpy(msg->forward_struct->buf,
		       &buffer->head[buffer->processed],
		       msg->forward_struct->buf_len);

		msg->forward_struct->ret_list = msg->ret_list;

		msg->forward_struct->fwd_cnt = header.forward.cnt;

		if (forward_msg(msg->forward_struct, &header) == SLURM_ERROR) {
			/* peer may have not been resolved already */
			if (!peer)
				peer = fd_resolve_peer(fd);
			error("%s: [%s] problem with forward msg",
			      __func__, peer);
		}
	}

	if (header.flags & SLURM_NO_AUTH_CRED)
		goto skip_auth;

	if (!(auth_cred = auth_g_unpack(buffer, header.version))) {
		/* keep the unpack errno for %m; fd_resolve_peer() may reset it */
		int err = errno;

		/* peer may have not been resolved already */
		if (!peer)
			peer = fd_resolve_peer(fd);
		errno = err;
		error("%s: [%s] auth_g_unpack: %s has authentication error: %m",
		      __func__, peer, rpc_num2string(header.msg_type));
		rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
		goto total_return;
	}
	msg->auth_index = auth_index(auth_cred);
	if (header.flags & SLURM_GLOBAL_AUTH_KEY) {
		rc = auth_g_verify(auth_cred, _global_auth_key());
	} else {
		rc = auth_g_verify(auth_cred, slurm_conf.authinfo);
	}

	if (rc != SLURM_SUCCESS) {
		/* keep the verify errno for %m; fd_resolve_peer() may reset it */
		int err = errno;

		/* peer may have not been resolved already */
		if (!peer)
			peer = fd_resolve_peer(fd);
		errno = err;
		error("%s: [%s] auth_g_verify: %s has authentication error: %m",
		      __func__, peer, rpc_num2string(header.msg_type));
		auth_g_destroy(auth_cred);
		rc = SLURM_PROTOCOL_AUTHENTICATION_ERROR;
		goto total_return;
	}

	auth_g_get_ids(auth_cred, &msg->auth_uid, &msg->auth_gid);
	msg->auth_ids_set = true;

skip_auth:
	/* Unpack message body */
	msg->protocol_version = header.version;
	msg->msg_type = header.msg_type;
	msg->flags = header.flags;

	msg->body_offset = get_buf_offset(buffer);

	if ((header.body_length != remaining_buf(buffer)) ||
	    _check_hash(buffer, &header, msg, auth_cred) ||
	    (unpack_msg(msg, buffer) != SLURM_SUCCESS)) {
		auth_g_destroy(auth_cred);
		rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
		goto total_return;
	}
	/* msg now owns the credential */
	msg->auth_cred = auth_cred;

	rc = SLURM_SUCCESS;

total_return:
	destroy_forward(&header.forward);

	if (rc != SLURM_SUCCESS) {
		/* peer may have not been resolved already */
		if (!peer)
			peer = fd_resolve_peer(fd);
		msg->msg_type = RESPONSE_FORWARD_FAILED;
		msg->auth_cred = NULL;
		msg->data = NULL;
		error("%s: [%s] failed: %s",
		      __func__, peer, slurm_strerror(rc));
		usleep(10000);	/* Discourage brute force attack */
	}

	xfree(peer);
	/*
	 * Set errno last: fd_resolve_peer(), error() and usleep() above may
	 * all modify it, and callers expect errno to match the return code.
	 */
	errno = rc;
	return rc;
}
/**********************************************************************\
* message packing routines
\**********************************************************************/
/*
 * Pack msg into the separate header, auth credential and body buffers of
 * *buffers.
 *
 * IN msg - message to pack. msg->restrict_uid_set must be true (fatal()
 *	otherwise). msg->forward is forward_init()ed (and msg->ret_list
 *	cleared) if it is not initialized yet, and forward.tree_width defaults
 *	to slurm_conf.tree_width.
 * OUT buffers - header and body buffers are allocated here. The auth buffer
 *	is only allocated when msg->flags lacks SLURM_NO_AUTH_CRED.
 * IN block_for_forwarding - if true, forward_wait(msg) is called before the
 *	header is built
 * RET SLURM_SUCCESS, or the slurm_seterrno_ret() result with errno set to
 *	SLURM_UNEXPECTED_MSG_ERROR (hash failure) or
 *	SLURM_PROTOCOL_AUTHENTICATION_ERROR (credential create/pack failure).
 *	On every failure path the buffers allocated here are freed again.
 */
extern int slurm_buffers_pack_msg(slurm_msg_t *msg, msg_bufs_t *buffers,
				  bool block_for_forwarding)
{
	header_t header;
	int rc = SLURM_SUCCESS;
	void *auth_cred = NULL;
	time_t start_time = time(NULL);
	slurm_hash_t hash = { 0 };
	int h_len = 0;

	/* Refuse to build a credential without an explicit decode restriction */
	if (!msg->restrict_uid_set)
		fatal("%s: restrict_uid is not set", __func__);

	/*
	 * Pack message into buffer
	 */
	buffers->body = init_buf(BUF_SIZE);
	pack_msg(msg, buffers->body);
	log_flag_hex(NET_RAW, get_buf_data(buffers->body),
		     get_buf_offset(buffers->body),
		     "%s: packed body", __func__);

	if (msg->flags & SLURM_NO_AUTH_CRED)
		goto skip_auth1;

	/*
	 * Initialize header with Auth credential and message type.
	 * We get the credential now rather than later so the work can
	 * be done in parallel with waiting for message to forward,
	 * but we may need to generate the credential again later if we
	 * wait too long for the incoming message.
	 *
	 * The hash of the packed body is computed first and passed to
	 * auth_g_create() along with its length.
	 */
	h_len = _compute_hash(buffers->body, msg, &hash);
	if (h_len < 0) {
		error("%s: hash_g_compute: %s has error",
		      __func__, rpc_num2string(msg->msg_type));
		FREE_NULL_BUFFER(buffers->body);
		slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
	}
	log_flag_hex(NET_RAW, &hash, sizeof(hash),
		     "%s: hash:", __func__);

	if (msg->flags & SLURM_GLOBAL_AUTH_KEY) {
		auth_cred = auth_g_create(msg->auth_index, _global_auth_key(),
					  msg->restrict_uid, &hash, h_len);
	} else {
		auth_cred = auth_g_create(msg->auth_index, slurm_conf.authinfo,
					  msg->restrict_uid, &hash, h_len);
	}

skip_auth1:
	if (msg->forward.init != FORWARD_INIT) {
		forward_init(&msg->forward);
		msg->ret_list = NULL;
	}

	if (!msg->forward.tree_width)
		msg->forward.tree_width = slurm_conf.tree_width;

	if (block_for_forwarding)
		forward_wait(msg);

	init_header(&header, msg, msg->flags);

	if (msg->flags & SLURM_NO_AUTH_CRED)
		goto skip_auth2;

	/*
	 * If 60 seconds or more passed since entry (e.g. in forward_wait()),
	 * throw the early credential away and create a fresh one, as the
	 * comment above anticipates.
	 */
	if (difftime(time(NULL), start_time) >= 60) {
		auth_g_destroy(auth_cred);

		if (msg->flags & SLURM_GLOBAL_AUTH_KEY) {
			auth_cred = auth_g_create(msg->auth_index,
						  _global_auth_key(),
						  msg->restrict_uid, &hash,
						  h_len);
		} else {
			auth_cred = auth_g_create(msg->auth_index,
						  slurm_conf.authinfo,
						  msg->restrict_uid, &hash,
						  h_len);
		}
	}
	if (auth_cred == NULL) {
		error("%s: auth_g_create: %s has authentication error",
		      __func__, rpc_num2string(msg->msg_type));
		FREE_NULL_BUFFER(buffers->body);
		slurm_seterrno_ret(SLURM_PROTOCOL_AUTHENTICATION_ERROR);
	}

	/*
	 * Pack auth credential
	 */
	buffers->auth = init_buf(BUF_SIZE);
	rc = auth_g_pack(auth_cred, buffers->auth, header.version);
	if (rc) {
		error("%s: auth_g_pack: %s has authentication error: %m",
		      __func__, rpc_num2string(header.msg_type));
		auth_g_destroy(auth_cred);
		FREE_NULL_BUFFER(buffers->auth);
		FREE_NULL_BUFFER(buffers->body);
		slurm_seterrno_ret(SLURM_PROTOCOL_AUTHENTICATION_ERROR);
	}
	/* The packed copy in buffers->auth is all that is needed now */
	auth_g_destroy(auth_cred);

	log_flag_hex(NET_RAW, get_buf_data(buffers->auth),
		     get_buf_offset(buffers->auth),
		     "%s: packed auth_cred", __func__);

skip_auth2:
	/*
	 * Pack and send message
	 * (the header records the final body length, so it is packed last)
	 */
	update_header(&header, get_buf_offset(buffers->body));

	buffers->header = init_buf(BUF_SIZE);
	pack_header(&header, buffers->header);
	log_flag_hex(NET_RAW, get_buf_data(buffers->header),
		     get_buf_offset(buffers->header),
		     "%s: packed header", __func__);

	return rc;
}
/**********************************************************************\
* send message functions
\**********************************************************************/
/*
 * Send a slurm message over an open connection.
 *
 * If msg->conn is set, the message is sent over that persistent connection
 * (tls_conn is not used) and the slurm_persist_send_msg() result is
 * returned. Otherwise the message is packed and sent on tls_conn and the
 * slurm_bufs_sendto() result (the size of the message sent in bytes) is
 * returned.
 *
 * RET >= 0 on success, < 0 on failure. On failure errno holds the error
 *	from the failing pack/send call; it is preserved across the
 *	diagnostic logging done here so that callers can report it.
 */
extern int slurm_send_node_msg(void *tls_conn, slurm_msg_t *msg)
{
	int fd = -1;
	msg_bufs_t buffers = { 0 };
	int rc;
	int err = 0;

	if (msg->conn) {
		persist_msg_t persist_msg;
		buf_t *buffer;
		char *peer = NULL;
		int persist_fd = conn_g_get_fd(msg->conn->tls_conn);

		log_flag(NET, "Sending persist_msg_t %s to %pA on fd %d",
			 rpc_num2string(msg->msg_type), &msg->address,
			 persist_fd);

		memset(&persist_msg, 0, sizeof(persist_msg_t));
		persist_msg.msg_type = msg->msg_type;
		persist_msg.data = msg->data;

		buffer = slurm_persist_msg_pack(msg->conn, &persist_msg);
		if (!buffer) /* pack error */
			return SLURM_ERROR;

		rc = slurm_persist_send_msg(msg->conn, buffer);
		/* Capture errno before cleanup/diagnostics can clobber it */
		err = errno;
		FREE_NULL_BUFFER(buffer);

		if (rc == SLURM_SUCCESS)
			return rc;

		fd = persist_fd;
		if (err == ENOTCONN) {
			if (slurm_conf.debug_flags & DEBUG_FLAG_NET)
				peer = fd_resolve_peer(fd);
			log_flag(NET, "%s: [%s] persistent connection has disappeared for msg_type=%s",
				 __func__, peer, rpc_num2string(msg->msg_type));
		} else {
			peer = fd_resolve_peer(fd);
			/* %m must report the send error, not the lookup's */
			errno = err;
			error("%s: [%s] slurm_persist_send_msg(msg_type=%s) failed: %m",
			      __func__, peer, rpc_num2string(msg->msg_type));
		}
		xfree(peer);
		errno = err;
		return rc;
	}

	fd = conn_g_get_fd(tls_conn);

	log_flag(NET, "Sending message %s to %pA on fd %d",
		 rpc_num2string(msg->msg_type), &msg->address, fd);

	/*
	 * Pack and send message
	 */
	if ((rc = slurm_buffers_pack_msg(msg, &buffers, true))) {
		err = errno;
		goto cleanup;
	}

	rc = slurm_bufs_sendto(tls_conn, &buffers);
	if (rc < 0) {
		err = errno;

		if (err == ENOTCONN) {
			log_flag(NET, "%s: peer has disappeared for msg_type=%s",
				 __func__, rpc_num2string(msg->msg_type));
		} else if (err == EBADF) {
			/* failure of sendto() and peer lookup will never work */
			error("%s: slurm_bufs_sendto(fd=%d) with msg_type=%s failed: %m",
			      __func__, fd, rpc_num2string(msg->msg_type));
		} else {
			char *peer = fd_resolve_path(fd);
			error("%s: [%s] slurm_bufs_sendto(msg_type=%s) failed: %s",
			      __func__, peer, rpc_num2string(msg->msg_type),
			      slurm_strerror(err));
			xfree(peer);
		}
	}

cleanup:
	FREE_NULL_BUFFER(buffers.header);
	FREE_NULL_BUFFER(buffers.auth);
	FREE_NULL_BUFFER(buffers.body);

	/* Hand the original failure back to the caller via errno */
	if (rc < 0)
		errno = err;

	return rc;
}
/*
 * Best-effort send: pack msg and write it to tls_conn, ignoring any pack or
 * send failure. Persistent connections (msg->conn set) are not handled here;
 * in that case nothing is sent.
 */
static void _send_node_msg_maybe(void *tls_conn, slurm_msg_t *msg)
{
	msg_bufs_t bufs = { 0 };

	if (msg->conn)
		return;

	/* Errors are intentionally dropped; callers don't want to know */
	if (!slurm_buffers_pack_msg(msg, &bufs, true))
		(void) slurm_bufs_sendto(tls_conn, &bufs);

	FREE_NULL_BUFFER(bufs.header);
	FREE_NULL_BUFFER(bufs.auth);
	FREE_NULL_BUFFER(bufs.body);
}
/**********************************************************************\
* stream functions
\**********************************************************************/
/* slurm_write_stream
 * Write size bytes from buffer to an open connection, waiting at most
 * MessageTimeout.
 * IN tls_conn - open connection to write to
 * IN buffer - data to send
 * IN size - number of bytes to send
 * RET size_t - bytes sent, or -1 (as size_t) on error
 */
extern size_t slurm_write_stream(void *tls_conn, char *buffer, size_t size)
{
	/* MessageTimeout is configured in seconds; the helper wants ms */
	int timeout_ms = slurm_conf.msg_timeout * 1000;

	return slurm_send_timeout(tls_conn, buffer, size, timeout_ms);
}
/* slurm_read_stream
 * Read up to size bytes from an open connection into buffer, waiting at
 * most MessageTimeout.
 * IN tls_conn - open connection to read from
 * OUT buffer - buffer to receive into
 * IN size - size of buffer
 * RET size_t - bytes read, or -1 (as size_t) on error
 */
extern size_t slurm_read_stream(void *tls_conn, char *buffer, size_t size)
{
	/* MessageTimeout is configured in seconds; the helper wants ms */
	int timeout_ms = slurm_conf.msg_timeout * 1000;

	return slurm_recv_timeout(tls_conn, buffer, size, timeout_ms);
}
/**********************************************************************\
* address conversion and management functions
\**********************************************************************/
/* slurm_get_ip_str
 * given a slurm_address it returns its ip address as a string
 * IN addr - slurm_addr_t to be queried. AF_INET6 addresses are rendered
 *	as IPv6; any other family is treated as AF_INET.
 * OUT ip - textual address as produced by inet_ntop(), or "" if the
 *	conversion fails (e.g. buf_len is too small)
 * IN buf_len - length of ip buffer
 */
void slurm_get_ip_str(slurm_addr_t *addr, char *ip, unsigned int buf_len)
{
	const char *res;

	if (addr->ss_family == AF_INET6) {
		struct sockaddr_in6 *sin = (struct sockaddr_in6 *) addr;
		res = inet_ntop(AF_INET6, &sin->sin6_addr, ip, buf_len);
	} else {
		struct sockaddr_in *sin = (struct sockaddr_in *) addr;
		res = inet_ntop(AF_INET, &sin->sin_addr, ip, buf_len);
	}

	/*
	 * inet_ntop() leaves ip in an unspecified state on failure; make sure
	 * callers never print uninitialized bytes.
	 */
	if (!res && ip && buf_len)
		ip[0] = '\0';
}
/* slurm_get_peer_addr
 * Fetch the address of the remote end of a connection (via getpeername()).
 * IN fd - an open connection
 * OUT slurm_address - receives the peer's address; untouched on failure
 * RET 0 on success, otherwise the errno value left by getpeername()
 */
int slurm_get_peer_addr(int fd, slurm_addr_t *slurm_address)
{
	slurm_addr_t peer = { 0 };
	socklen_t len = sizeof(peer);

	if (getpeername(fd, (struct sockaddr *) &peer, &len) != 0)
		return errno;

	*slurm_address = peer;
	return 0;
}
/**********************************************************************\
* slurm_addr_t pack routines
\**********************************************************************/
/* slurm_pack_addr_array
 * packs an array of slurm_addrs into a buffer
 * IN addr_array - slurm_addr_t[] to pack (size_val entries)
 * IN size_val - how many to pack; written first as a 32-bit count
 * IN/OUT buffer - buffer to pack the slurm_addr_t array into
 */
extern void slurm_pack_addr_array(slurm_addr_t *addr_array, uint32_t size_val,
				  buf_t *buffer)
{
	pack32(size_val, buffer);

	/* Match the index type to the uint32_t count (no signed overflow) */
	for (uint32_t i = 0; i < size_val; i++)
		slurm_pack_addr(&addr_array[i], buffer);
}
/* slurm_unpack_addr_array
 * unpacks an array of slurm_addrs from a buffer
 * OUT addr_array_ptr - newly allocated slurm_addr_t[]; ownership passes to
 *	the caller. Not modified on failure.
 * OUT size_val - number of entries unpacked; set to 0 on failure
 * IN/OUT buffer - buffer to unpack the slurm_addr_t from
 * returns - SLURM_SUCCESS or SLURM_ERROR
 */
extern int slurm_unpack_addr_array(slurm_addr_t **addr_array_ptr,
				   uint32_t *size_val, buf_t *buffer)
{
	slurm_addr_t *addr_array = NULL;

	safe_unpack32(size_val, buffer);
	safe_xcalloc(addr_array, *size_val, sizeof(slurm_addr_t));

	/* Match the index type to the uint32_t count (no signed overflow) */
	for (uint32_t i = 0; i < *size_val; i++) {
		if (slurm_unpack_addr_no_alloc(&addr_array[i], buffer))
			goto unpack_error;
	}

	*addr_array_ptr = addr_array;
	return SLURM_SUCCESS;

unpack_error:
	*size_val = 0;
	xfree(addr_array);
	return SLURM_ERROR;
}
/**********************************************************************\
* simplified communication routines
* They open a connection do work then close the connection all within
* the function
\**********************************************************************/
/*
 * Prepare resp_msg as the reply to msg. The reply gets:
 * - the request's addressing, connection, forwarding and protocol details;
 * - msg_type and data as its payload;
 * - a decode restriction uid derived from who sent the request;
 * - SLURM_NO_AUTH_CRED, so no auth credential is packed.
 */
extern void slurm_resp_msg_init(slurm_msg_t *resp_msg, slurm_msg_t *msg,
				uint16_t msg_type, void *data)
{
	uid_t r_uid;

	slurm_msg_t_init(resp_msg);

	/* Route the reply back the way the request came in */
	resp_msg->address = msg->address;
	resp_msg->orig_addr = msg->orig_addr;
	resp_msg->conn = msg->conn;
	resp_msg->forward = msg->forward;
	resp_msg->forward_struct = msg->forward_struct;
	resp_msg->ret_list = msg->ret_list;

	/* Speak the same protocol and plugins as the requester */
	resp_msg->protocol_version = msg->protocol_version;
	resp_msg->auth_index = msg->auth_index;
	resp_msg->hash_index = msg->hash_index;
	resp_msg->flags = msg->flags;

	/* Payload */
	resp_msg->msg_type = msg_type;
	resp_msg->data = data;

	/*
	 * Extra sanity check: the request's auth ids should always be set.
	 * If for some reason they aren't, restrict the decode to avoid leaking
	 * an unrestricted authentication token.
	 *
	 * Communications initiated by SlurmUser and SlurmdUser are implicitly
	 * trusted. In future releases this won't matter - there's no point
	 * packing an auth token on the reply as it isn't checked - but older
	 * protocol versions still need it for backwards-compatibility.
	 */
	if (!msg->auth_ids_set)
		r_uid = SLURM_AUTH_NOBODY;
	else if ((msg->auth_uid == slurm_conf.slurm_user_id) ||
		 (msg->auth_uid == slurm_conf.slurmd_user_id))
		r_uid = SLURM_AUTH_UID_ANY;
	else
		r_uid = msg->auth_uid;
	slurm_msg_set_r_uid(resp_msg, r_uid);

	/*
	 * Replies carry no auth credential: clients don't need it and already
	 * implicitly trust the connection.
	 */
	resp_msg->flags |= SLURM_NO_AUTH_CRED;
}
/*
 * Reply to source_msg with a message of type msg_type carrying data.
 *
 * The reply goes through source_msg's conmgr connection when it has one
 * (queued write). Otherwise it is sent directly over source_msg->tls_conn
 * or the persistent source_msg->conn.
 *
 * RET SLURM_SUCCESS or an errno-style error code
 */
extern int send_msg_response(slurm_msg_t *source_msg, slurm_msg_type_t msg_type,
			     void *data)
{
	slurm_msg_t resp_msg;
	int rc;

	slurm_resp_msg_init(&resp_msg, source_msg, msg_type, data);

	if (!source_msg->conmgr_con) {
		resp_msg.conn = source_msg->conn;

		if (slurm_send_node_msg(source_msg->tls_conn, &resp_msg) >= 0)
			return SLURM_SUCCESS;

		rc = errno;
		log_flag(NET, "%s: [fd:%d] write response RPC %s failed: %s",
			 __func__, (source_msg->conn ?
				    conn_g_get_fd(source_msg->conn->tls_conn) :
				    conn_g_get_fd(source_msg->tls_conn)),
			 rpc_num2string(msg_type), slurm_strerror(rc));
		return rc;
	}

	/* conmgr owns this connection: hand it the reply to write */
	rc = conmgr_con_queue_write_msg(source_msg->conmgr_con, &resp_msg);
	if (rc)
		log_flag(NET, "%s: [%s] write response RPC %s failure: %s",
			 __func__,
			 conmgr_con_get_name(source_msg->conmgr_con),
			 rpc_num2string(msg_type), slurm_strerror(rc));
	return rc;
}
/* slurm_send_rc_msg
 * Answer the request msg with a RESPONSE_SLURM_RC carrying rc.
 * IN msg - the request being answered
 * IN rc - the return_code to send back to the client
 * RET SLURM_SUCCESS, or SLURM_ERROR with errno set to the send failure
 */
int slurm_send_rc_msg(slurm_msg_t *msg, int rc)
{
	return_code_msg_t rc_msg = { .return_code = rc };
	int send_rc = send_msg_response(msg, RESPONSE_SLURM_RC, &rc_msg);

	if (!send_rc)
		return SLURM_SUCCESS;

	errno = send_rc;
	return SLURM_ERROR;
}
/* slurm_send_rc_err_msg
 * Answer the request msg with a RESPONSE_SLURM_RC_MSG carrying rc and a
 * message for the user.
 * IN msg - the request being answered
 * IN rc - the return_code to send back to the client
 * IN err_msg - message for user
 * RET SLURM_SUCCESS, or SLURM_ERROR with errno set to the send failure
 */
int slurm_send_rc_err_msg(slurm_msg_t *msg, int rc, char *err_msg)
{
	return_code2_msg_t rc_msg = {
		.return_code = rc,
		.err_msg = err_msg,
	};
	int send_rc = send_msg_response(msg, RESPONSE_SLURM_RC_MSG, &rc_msg);

	if (!send_rc)
		return SLURM_SUCCESS;

	errno = send_rc;
	return SLURM_ERROR;
}
/*
 * Best-effort delivery of req to req->address: open a connection, send
 * without waiting for a reply, and close it. A failure to connect is only
 * logged (NET debug flag); send failures are ignored.
 */
extern void slurm_send_msg_maybe(slurm_msg_t *req)
{
	void *conn = slurm_open_msg_conn_maybe(&req->address, req->tls_cert);

	if (conn) {
		_send_node_msg_maybe(conn, req);
		conn_g_destroy(conn, true);
		return;
	}

	log_flag(NET, "%s: slurm_open_msg_conn_maybe(%pA): %m",
		 __func__, &req->address);
}
/*
 * Sends back reroute_msg_t which directs the client to make the request
 * elsewhere.
 *
 * IN msg - msg to respond to.
 * IN cluster_rec - cluster to direct msg to.
 * IN stepmgr - stepmgr value to place in the reroute message.
 * RET SLURM_SUCCESS, or SLURM_ERROR with errno set to the send failure
 */
int slurm_send_reroute_msg(slurm_msg_t *msg,
			   slurmdb_cluster_rec_t *cluster_rec,
			   char *stepmgr)
{
	reroute_msg_t reroute_msg = {
		.working_cluster_rec = cluster_rec,
		.stepmgr = stepmgr,
	};
	int send_rc = send_msg_response(msg, RESPONSE_SLURM_REROUTE_MSG,
					&reroute_msg);

	if (!send_rc)
		return SLURM_SUCCESS;

	errno = send_rc;
	return SLURM_ERROR;
}
/*
 * Send req and receive one response on an already open connection; the
 * connection is left open.
 * IN tls_conn - open connection
 * IN req - a slurm_msg struct to be sent by the function
 * OUT resp - a slurm_msg struct to be filled in by the function
 * IN timeout - how long to wait in milliseconds (0 means the default)
 * RET int - returns 0 on success, -1 on failure and sets errno
 */
extern int slurm_send_recv_msg(void *tls_conn, slurm_msg_t *req,
			       slurm_msg_t *resp, int timeout)
{
	slurm_msg_t_init(resp);

	/*
	 * For persistent connections, make sure the reply is read from the
	 * same connection the request used (it should be already).
	 */
	if (req->conn)
		resp->conn = req->conn;

	/*
	 * Only one message is expected back (no forwarding), so the timeout
	 * is passed through unchanged.
	 */
	if ((slurm_send_node_msg(tls_conn, req) < 0) ||
	    slurm_receive_msg(tls_conn, resp, timeout))
		return -1;

	return 0;
}
static int _foreach_ret_list_hostname_set(void *x, void *arg)
{
ret_data_info_t *ret_data_info = x;
char *name = arg;
if (!ret_data_info->node_name)
ret_data_info->node_name = xstrdup(name);
return 0;
}
/*
 * slurm_send_recv_controller_msg
 * opens a connection to the controller, sends the controller a message,
 * listens for the response, then closes the connection
 *
 * Retry behavior:
 * - If the controller replies that it is in standby mode, the next
 *   SlurmctldHost is tried (pausing slurmctld_timeout / 2 seconds after each
 *   full pass) for up to 1.5 * slurmctld_timeout seconds.
 * - If the controller asks us to back off, the request is resent after
 *   sleeping 1, 2, 3, ... seconds.
 * - A reroute reply carrying a cluster record is followed by resending to
 *   that cluster.
 *
 * IN request_msg - slurm_msg request
 * OUT response_msg - slurm_msg response
 * IN comm_cluster_rec - Communication record (host/port/version)
 * RET int - returns 0 on success, -1 on failure and sets errno
 */
extern int slurm_send_recv_controller_msg(slurm_msg_t * request_msg,
					  slurm_msg_t * response_msg,
					  slurmdb_cluster_rec_t *comm_cluster_rec)
{
	int rc = 0;
	time_t start_time = time(NULL);
	slurm_conf_t *conf;
	bool have_backup;
	uint32_t control_cnt;
	uint16_t slurmctld_timeout;
	static int index = 0;
	slurmdb_cluster_rec_t *save_comm_cluster_rec = comm_cluster_rec;
	int ratelimited = 0;

	/*
	 * Just in case the caller didn't initialize his slurm_msg_t, and
	 * since we KNOW that we are only sending to one node (the controller),
	 * we initialize some forwarding variables to disable forwarding.
	 */
	forward_init(&request_msg->forward);
	request_msg->ret_list = NULL;
	request_msg->forward_struct = NULL;
	slurm_msg_set_r_uid(request_msg, SLURM_AUTH_UID_ANY);

	if (tls_enabled() && running_in_slurmstepd()) {
		return stepd_proxy_send_recv_ctld_msg(request_msg,
						      response_msg);
	}

tryagain:
	if (comm_cluster_rec)
		request_msg->flags |= SLURM_GLOBAL_AUTH_KEY;

	/*
	 * Snapshot everything needed from the config while holding the lock;
	 * conf must not be dereferenced after slurm_conf_unlock().
	 */
	conf = slurm_conf_lock();
	control_cnt = conf->control_cnt;
	slurmctld_timeout = conf->slurmctld_timeout;
	slurm_conf_unlock();
	have_backup = control_cnt > 1;

	while (true) {
		void *tls_conn = NULL;

		if (!(tls_conn = _open_controller(&index, comm_cluster_rec))) {
			rc = -1;
			break;
		}

		if (comm_cluster_rec)
			request_msg->protocol_version =
				comm_cluster_rec->rpc_version;

		rc = slurm_send_recv_msg(tls_conn, request_msg, response_msg,
					 0);
		conn_g_destroy(tls_conn, true);

		if (response_msg->auth_cred) {
			auth_g_destroy(response_msg->auth_cred);
			/* don't hand a dangling pointer back to the caller */
			response_msg->auth_cred = NULL;
		}

		if ((rc == 0) && (!comm_cluster_rec)
		    && (response_msg->msg_type == RESPONSE_SLURM_RC)
		    && ((((return_code_msg_t *) response_msg->data)->return_code
			 == ESLURM_IN_STANDBY_MODE) ||
			(((return_code_msg_t *) response_msg->data)->return_code
			 == ESLURM_IN_STANDBY_USE_BACKUP))
		    && (have_backup)
		    && (difftime(time(NULL), start_time)
			< (slurmctld_timeout + (slurmctld_timeout / 2)))) {
			log_flag(NET, "%s: SlurmctldHost[%d] is in standby, trying next",
				 __func__, index);
			index++;
			/*
			 * After running through all backups, pause to
			 * give the primary some time to come back up.
			 */
			if (index == control_cnt) {
				index = 0;
				sleep(slurmctld_timeout / 2);
			}
			slurm_free_return_code_msg(response_msg->data);
			continue;
		}

		break;
	}

	if (!rc && (response_msg->msg_type == RESPONSE_SLURM_RC) &&
	    ((((return_code_msg_t *) response_msg->data)->return_code)
	     == SLURMCTLD_COMMUNICATIONS_BACKOFF)) {
		ratelimited++;
		/*
		 * slurmctld thinks we're being too chatty.
		 * Sleep a little longer on each attempt (1, 2, 3... seconds)
		 * and try again.
		 */
		verbose("RPC rate limited %d time(s). Sleeping then trying again.",
			ratelimited);
		/* This reply is discarded by the retry; don't leak it */
		slurm_free_return_code_msg(response_msg->data);
		response_msg->data = NULL;
		sleep(ratelimited);
		goto tryagain;
	}

	if (!rc && (response_msg->msg_type == RESPONSE_SLURM_REROUTE_MSG)) {
		reroute_msg_t *rr_msg = response_msg->data;

		if (rr_msg->working_cluster_rec) {
			/*
			 * Don't expect multiple hops but in the case it does
			 * happen, free the previous rr cluster_rec.
			 */
			if (comm_cluster_rec &&
			    (comm_cluster_rec != save_comm_cluster_rec))
				slurmdb_destroy_cluster_rec(comm_cluster_rec);

			comm_cluster_rec = rr_msg->working_cluster_rec;
			slurmdb_setup_cluster_rec(comm_cluster_rec);
			rr_msg->working_cluster_rec = NULL;
			goto tryagain;
		}
	}

	if (comm_cluster_rec != save_comm_cluster_rec)
		slurmdb_destroy_cluster_rec(comm_cluster_rec);

	if (rc != 0)
		_remap_slurmctld_errno();

	return rc;
}
/* slurm_send_recv_node_msg
 * Open a connection to req->address, send req, wait for the reply, then
 * close the connection. Inside slurmstepd with TLS enabled, the exchange is
 * delegated to the stepd proxy instead.
 * IN req - slurm_msg request
 * OUT resp - slurm_msg response
 * IN timeout - how long to wait in milliseconds
 * RET int - returns 0 on success, -1 on failure and sets errno
 */
int slurm_send_recv_node_msg(slurm_msg_t *req, slurm_msg_t *resp, int timeout)
{
	void *tls_conn;

	resp->auth_cred = NULL;

	if (tls_enabled() && running_in_slurmstepd())
		return stepd_proxy_send_recv_node_msg(req, resp, timeout);

	if ((tls_conn = slurm_open_msg_conn(&req->address, req->tls_cert))) {
		int rc = slurm_send_recv_msg(tls_conn, req, resp, timeout);

		conn_g_destroy(tls_conn, true);
		return rc;
	}

	log_flag(NET, "%s: slurm_open_msg_conn(%pA): %m",
		 __func__, &req->address);
	return -1;
}
/* slurm_send_only_controller_msg
 * Open a connection to the controller, send req without waiting for a
 * reply, then close the connection.
 * IN req - slurm_msg request
 * IN comm_cluster_rec - Communication record (host/port/version)
 * RET int - SLURM_SUCCESS or SLURM_ERROR (errno remapped on failure)
 * NOTE: NOT INTENDED TO BE CROSS-CLUSTER
 */
extern int slurm_send_only_controller_msg(slurm_msg_t *req,
					  slurmdb_cluster_rec_t *comm_cluster_rec)
{
	void *tls_conn;
	int index = 0;
	int sent;

	if (tls_enabled() && running_in_slurmstepd())
		return stepd_proxy_send_only_ctld_msg(req);

	/* Open connection to Slurm controller */
	tls_conn = _open_controller(&index, comm_cluster_rec);
	if (!tls_conn) {
		_remap_slurmctld_errno();
		return SLURM_ERROR;
	}

	slurm_msg_set_r_uid(req, slurm_conf.slurm_user_id);

	sent = slurm_send_node_msg(tls_conn, req);
	if (sent >= 0)
		log_flag(NET, "%s: sent %d", __func__, sent);

	conn_g_destroy(tls_conn, true);

	if (sent < 0) {
		_remap_slurmctld_errno();
		return SLURM_ERROR;
	}

	return SLURM_SUCCESS;
}
/*
 * Open a connection to the "address" specified in the slurm msg `req'
 * Then, immediately close the connection w/out waiting for a reply.
 *
 * Returns SLURM_SUCCESS on success SLURM_ERROR (< 0) for failure.
 *
 * DO NOT USE THIS IN NEW CODE
 * Use slurm_send_recv_rc_msg_only_one() or something similar instead.
 *
 * By not waiting for a response message, the message to be transmitted
 * may never be received by the remote end. The remote TCP stack may
 * acknowledge the data while the application itself has not had a chance
 * to receive it. The only way to tell that the application has processed
 * a given packet is for it to send back a message across the socket itself.
 *
 * The receive side looks like: poll() && read(), close(). If the poll() times
 * out, the kernel may still ACK the data while the application has jumped to
 * closing the connection. The send side cannot then distinguish between the
 * close happening as a result of the timeout vs. as a normal message shutdown.
 *
 * This is only one example of the many races inherent in this approach.
 *
 * See "UNIX Network Programming" Volume 1 (Third Edition) Section 7.5 on
 * SO_LINGER for a description of the subtle hazards inherent in abusing
 * TCP as a unidirectional pipe.
 */
int slurm_send_only_node_msg(slurm_msg_t *req)
{
	void *tls_conn = NULL;
	int rc = SLURM_SUCCESS;
	int fd = -1;
	struct pollfd pfd;
	int value = -1;
	int pollrc;

	if (tls_enabled() && running_in_slurmstepd()) {
		return stepd_proxy_send_only_node_msg(req);
	}

	if (!(tls_conn = slurm_open_msg_conn(&req->address, req->tls_cert))) {
		log_flag(NET, "%s: slurm_open_msg_conn(%pA): %m",
			 __func__, &req->address);
		return SLURM_ERROR;
	}

	fd = conn_g_get_fd(tls_conn);

	if ((rc = slurm_send_node_msg(tls_conn, req)) < 0) {
		/*
		 * The send already failed, so there is no delivery to confirm.
		 * Don't wait up to MessageTimeout in poll() below only to
		 * return SLURM_ERROR anyway, and keep the send's errno.
		 */
		int err = errno;

		conn_g_destroy(tls_conn, true);
		errno = err;
		return SLURM_ERROR;
	}

	log_flag(NET, "%s: sent %d", __func__, rc);
	rc = SLURM_SUCCESS;

	/*
	 * Make sure message was received by remote, and that there isn't
	 * an outstanding write() or that the connection has been reset.
	 *
	 * The shutdown() call intentionally falls through to the next block,
	 * the poll() should hit POLLERR which gives the TIOCOUTQ count as an
	 * additional diagnostic element.
	 *
	 * The steps below may result in a false-positive on occasion, in
	 * which case the code path above may opt to retransmit an already
	 * received message. If this is a concern, you should not be using
	 * this function.
	 *
	 * Skip this if running with TLS enabled as it may mess up the internal
	 * session state.
	 */
	if (!tls_enabled() && shutdown(fd, SHUT_WR))
		log_flag(NET, "%s: shutdown call failed: %m", __func__);

again:
	pfd.fd = fd;
	pfd.events = POLLIN;
	pollrc = poll(&pfd, 1, (slurm_conf.msg_timeout * 1000));
	if (pollrc == -1) {
		if (errno == EINTR)
			goto again;
		log_flag(NET, "%s: poll error: %m", __func__);
		conn_g_destroy(tls_conn, true);
		return SLURM_ERROR;
	}

	if (pollrc == 0) {
		if (ioctl(fd, TIOCOUTQ, &value))
			log_flag(NET, "%s: TIOCOUTQ ioctl failed: %m",
				 __func__);
		/* A poll() timeout does not set errno, so there is no %m here */
		log_flag(NET, "%s: poll timed out with %d outstanding",
			 __func__, value);
		conn_g_destroy(tls_conn, true);
		return SLURM_ERROR;
	}

	if (pfd.revents & POLLERR) {
		int sock_err = SLURM_SUCCESS;
		int get_rc;

		/* value is still -1 here: only the timeout branch set it */
		if (ioctl(fd, TIOCOUTQ, &value))
			log_flag(NET, "%s: TIOCOUTQ ioctl failed: %m",
				 __func__);

		if ((get_rc = fd_get_socket_error(fd, &sock_err)))
			log_flag(NET, "%s fd_get_socket_error failed with %s",
				 __func__, slurm_strerror(get_rc));
		else
			log_flag(NET, "%s: poll error with %d outstanding: %s",
				 __func__, value, slurm_strerror(sock_err));

		conn_g_destroy(tls_conn, true);
		return SLURM_ERROR;
	}

	conn_g_destroy(tls_conn, true);
	return rc;
}
/*
 * Fan a message out to every node in a nodelist and collect the replies.
 *
 * IN nodelist - hostlist expression naming the destination nodes; must be
 *	non-NULL and non-empty.
 * IN msg - a slurm_msg struct to be sent by the function
 * IN timeout - how long to wait in milliseconds
 * RET List of ret_data_info_t built by start_msg_tree(), or NULL if the
 *	nodelist is missing/empty or could not be parsed into a hostlist.
 */
list_t *slurm_send_recv_msgs(const char *nodelist, slurm_msg_t *msg, int timeout)
{
	hostlist_t *hosts;
	list_t *replies;

	if (!nodelist || (nodelist[0] == '\0')) {
		error("slurm_send_recv_msgs: no nodelist given");
		return NULL;
	}

	if (!(hosts = hostlist_create(nodelist))) {
		error("slurm_send_recv_msgs: problem creating hostlist");
		return NULL;
	}

	replies = start_msg_tree(hosts, msg, timeout);
	hostlist_destroy(hosts);

	return replies;
}
/*
 * Send a message to msg->address and collect the response list.
 *
 * Connection attempts are retried on ECONNREFUSED (with a one-second pause)
 * or ETIMEDOUT for up to MIN(MessageTimeout, 10) seconds, so hierarchical
 * communication can ride out a slurmd restart.
 *
 * IN msg - a slurm_msg struct to be sent by the function; its ret_list and
 *	forward_struct are reset to NULL before sending.
 * IN name - passed to mark_as_failed_forward() on failure and applied to
 *	every returned entry via _foreach_ret_list_hostname_set.
 * IN timeout - NOTE(review): not referenced here; the receive uses
 *	msg->forward.timeout instead.
 * RET List of responses (presumably ret_data_info_t entries, as for
 *	slurm_send_recv_msgs() -- confirm). On failure the list holds the
 *	entry added by mark_as_failed_forward() and errno is set to
 *	SLURM_COMMUNICATIONS_CONNECTION_ERROR.
 */
list_t *slurm_send_addr_recv_msgs(slurm_msg_t *msg, char *name, int timeout)
{
	uint16_t conn_timeout = MIN(slurm_conf.msg_timeout, 10);
	time_t start = time(NULL);
	time_t now = start;
	void *tls_conn = NULL;
	list_t *ret_list = NULL;
	bool retry_logged = false;

	for (; (now - start) < conn_timeout; now = time(NULL)) {
		int conn_errno;

		tls_conn = slurm_open_msg_conn(&msg->address, msg->tls_cert);
		if (tls_conn)
			break;

		/* Only refused or timed-out connects are worth retrying */
		conn_errno = errno;
		if (conn_errno == ETIMEDOUT) {
			if (!retry_logged)
				log_flag(NET, "Timed out connecting to %pA, retrying...",
					 &msg->address);
		} else if (conn_errno == ECONNREFUSED) {
			if (!retry_logged)
				log_flag(NET, "Connection refused by %pA, retrying...",
					 &msg->address);
			sleep(1);
		} else {
			break;
		}

		retry_logged = true;
	}

	if (!tls_conn) {
		log_flag(NET, "Failed to connect to %pA, %m", &msg->address);
		mark_as_failed_forward(&ret_list, name,
				       SLURM_COMMUNICATIONS_CONNECTION_ERROR);
		errno = SLURM_COMMUNICATIONS_CONNECTION_ERROR;
		return ret_list;
	}

	msg->ret_list = NULL;
	msg->forward_struct = NULL;

	if (slurm_send_node_msg(tls_conn, msg) >= 0)
		ret_list = slurm_receive_msgs(tls_conn, msg->forward.tree_depth,
					      msg->forward.timeout);

	if (!ret_list) {
		/* errno still holds the send/receive failure at this point */
		mark_as_failed_forward(&ret_list, name, errno);
		conn_g_destroy(tls_conn, true);
		errno = SLURM_COMMUNICATIONS_CONNECTION_ERROR;
		return ret_list;
	}

	(void) list_for_each(ret_list, _foreach_ret_list_hostname_set, name);
	conn_g_destroy(tls_conn, true);

	return ret_list;
}
/*
 * Send "req" to the single node at req->address and read back a return
 * code message.
 *
 * Forwarding is explicitly disabled on req (forward_init(), ret_list and
 * forward_struct cleared) in case the caller did not initialize it.
 *
 * IN req - a slurm_msg struct to be sent by the function
 * OUT rc - return code extracted from the response; only written on success
 * IN timeout - how long to wait in milliseconds
 * RET 0 on success, -1 if slurm_send_recv_node_msg() failed.
 */
int slurm_send_recv_rc_msg_only_one(slurm_msg_t *req, int *rc, int timeout)
{
	slurm_msg_t resp;

	slurm_msg_t_init(&resp);

	/* Sending to exactly one node: make sure nothing gets forwarded */
	forward_init(&req->forward);
	req->ret_list = NULL;
	req->forward_struct = NULL;

	if (slurm_send_recv_node_msg(req, &resp, timeout))
		return -1;

	if (resp.auth_cred)
		auth_g_destroy(resp.auth_cred);
	*rc = slurm_get_return_code(resp.msg_type, resp.data);
	slurm_free_msg_data(resp.msg_type, resp.data);

	return 0;
}
/*
 * Send message to controller and get return code.
 * Make use of slurm_send_recv_controller_msg(), which handles
 * support for backup controller and retry during transition.
 *
 * IN req - request to send
 * OUT rc - return code; only written on success
 * IN comm_cluster_rec - Communication record (host/port/version)
 * RET - 0 on success, -1 on failure
 */
extern int slurm_send_recv_controller_rc_msg(slurm_msg_t *req, int *rc,
					     slurmdb_cluster_rec_t *comm_cluster_rec)
{
	slurm_msg_t resp;

	/*
	 * Initialize the response like the other request/response helpers
	 * do, so no field is ever read uninitialized.
	 */
	slurm_msg_t_init(&resp);

	if (slurm_send_recv_controller_msg(req, &resp, comm_cluster_rec))
		return -1;

	*rc = slurm_get_return_code(resp.msg_type, resp.data);
	slurm_free_msg_data(resp.msg_type, resp.data);

	return 0;
}
/*
 * Free a slurm message's members but not the message itself.
 *
 * Every freed pointer member is left NULL, so a repeated call, or a later
 * slurm_free_msg(), does not double-free or touch freed memory.
 *
 * IN msg - message whose members are released; NULL is ignored.
 */
extern void slurm_free_msg_members(slurm_msg_t *msg)
{
	if (!msg)
		return;

	if (msg->auth_cred) {
		auth_g_destroy(msg->auth_cred);
		msg->auth_cred = NULL;
	}
	FREE_NULL_BUFFER(msg->buffer);
	slurm_free_msg_data(msg->msg_type, msg->data);
	msg->data = NULL;
	FREE_NULL_LIST(msg->ret_list);
	xfree(msg->tls_cert);
	conmgr_fd_free_ref(&msg->conmgr_con);
}
/*
 * Free a slurm message: release its members via slurm_free_msg_members(),
 * then the message struct itself. NULL is ignored.
 */
extern void slurm_free_msg(slurm_msg_t *msg)
{
	if (!msg)
		return;

	slurm_free_msg_members(msg);
	xfree(msg);
}
/*
 * Record r_uid in msg->restrict_uid and flag it as explicitly set.
 */
extern void slurm_msg_set_r_uid(slurm_msg_t *msg, uid_t r_uid)
{
	msg->restrict_uid_set = true;
	msg->restrict_uid = r_uid;
}
/*
 * Return the host at index inx of a hostlist expression, as produced by
 * hostlist_nth(). NOTE(review): the returned string presumably must be
 * freed by the caller -- confirm against hostlist_nth().
 */
extern char *nodelist_nth_host(const char *nodelist, int inx)
{
	hostlist_t *hosts;
	char *host_name;

	hosts = hostlist_create(nodelist);
	host_name = hostlist_nth(hosts, inx);
	hostlist_destroy(hosts);

	return host_name;
}
/*
 * Return the position of host "name" within a hostlist expression, as
 * reported by hostlist_find().
 */
extern int nodelist_find(const char *nodelist, const char *name)
{
	hostlist_t *hosts;
	int position;

	hosts = hostlist_create(nodelist);
	position = hostlist_find(hosts, name);
	hostlist_destroy(hosts);

	return position;
}
/*
 * Convert number from one unit to another.
 * By default, will convert num to the largest divisible unit.
 * Appends unit type suffix -- if applicable.
 *
 * IN num: number to convert.
 * OUT buf: buffer to copy converted number into.
 * IN buf_size: size of buffer.
 * IN orig_type: The original type of num.
 * IN spec_type: Type to convert num to. If not NO_VAL, num will be converted
 *	up or down to this unit type and flags are ignored.
 * IN divisor: size of one unit step (e.g. 1024). Automatic scaling
 *	(CONVERT_NUM_UNIT_EXACT or the default mode) is skipped when
 *	divisor < 2, since it would divide/modulo by zero or never terminate.
 * IN flags: flags to control whether to convert exactly or not at all.
 */
extern void convert_num_unit2(double num, char *buf, int buf_size,
			      int orig_type, int spec_type, int divisor,
			      uint32_t flags)
{
	char *unit = "\0KMGTP?";

	if ((int64_t)num == 0) {
		snprintf(buf, buf_size, "0");
		return;
	}

	if (spec_type != NO_VAL) {
		/* spec_type overrides all flags; at most one loop runs */
		while (spec_type < orig_type) {
			num *= divisor;
			orig_type--;
		}
		while (spec_type > orig_type) {
			num /= divisor;
			orig_type++;
		}
	} else if (flags & CONVERT_NUM_UNIT_RAW) {
		orig_type = UNIT_NONE;
	} else if (flags & CONVERT_NUM_UNIT_NO) {
		/* no op */
	} else if (divisor < 2) {
		/*
		 * Auto-scaling needs divisor >= 2: "divisor / 2" below would
		 * be a modulo by zero, and "num >= divisor" would never
		 * become false.
		 */
	} else if (flags & CONVERT_NUM_UNIT_EXACT) {
		/* convert until we would lose precision */
		/* half values (e.g., 2.5G) are still considered precise */
		while ((num >= divisor) && (num < (double) UINT64_MAX) &&
		       (((uint64_t) num % (divisor / 2)) == 0)) {
			num /= divisor;
			orig_type++;
		}
	} else {
		/* aggressively convert values */
		while (num >= divisor) {
			num /= divisor;
			orig_type++;
		}
	}

	if (orig_type < UNIT_NONE || orig_type > UNIT_PETA)
		orig_type = UNIT_UNKNOWN;

	/*
	 * Print as an integer only when num is a whole number that fits in
	 * uint64_t. Converting a negative, NaN or >= 2^64 double to uint64_t
	 * is undefined behavior, so those values take the "%.2f" path.
	 * (double) UINT64_MAX rounds to exactly 2^64.
	 */
	if ((num >= 0.0) && (num < (double) UINT64_MAX)) {
		uint64_t whole = (uint64_t) num;

		if ((double) whole == num) {
			snprintf(buf, buf_size, "%"PRIu64"%c", whole,
				 unit[orig_type]);
			return;
		}
	}

	snprintf(buf, buf_size, "%.2f%c", num, unit[orig_type]);
}
/*
 * convert_num_unit2() with binary (1024-based) unit steps.
 */
extern void convert_num_unit(double num, char *buf, int buf_size,
			     int orig_type, int spec_type, uint32_t flags)
{
	const int binary_divisor = 1024;

	convert_num_unit2(num, buf, buf_size, orig_type, spec_type,
			  binary_divisor, flags);
}
/*
 * Parse a number with an optional binary unit suffix (K, M, G, T, P; case
 * insensitive), e.g. "2K" -> 2048 and "3M" -> 3145728. Each unit step
 * multiplies by 1024, matching the 1024 divisor used by convert_num_unit().
 *
 * IN buf: string to parse. The leading integer is read with atoi(), and
 *	only the final character is examined for a unit suffix.
 * RET the parsed value, saturated to INT_MAX/INT_MIN when it does not fit
 *	in an int, or -1 if buf is NULL.
 */
extern int revert_num_unit(const char *buf)
{
	static const char units[] = "KMGTP";
	const char *suffix;
	size_t len;
	int64_t number;
	int steps;

	if (!buf)
		return -1;

	number = atoi(buf);

	/* An empty string has no last character to inspect */
	len = strlen(buf);
	if (!len)
		return (int) number;

	/* Cast: <ctype.h> functions require an unsigned char value */
	suffix = strchr(units, toupper((unsigned char) buf[len - 1]));
	if (!suffix || !*suffix)
		return (int) number;

	/* K is one factor of 1024, M two factors, and so on */
	steps = (int) (suffix - units) + 1;
	while (steps--) {
		number *= 1024;
		/* Saturate rather than overflow the int return value */
		if (number > INT_MAX)
			return INT_MAX;
		if (number < INT_MIN)
			return INT_MIN;
	}

	return (int) number;
}
/*
 * Return the multiplier (a power of 1024) needed to go from unit index
 * base_unit up to the unit named by convert_to.
 *
 * RET SLURM_ERROR if convert_to is not a valid unit character, 0 when
 *	convert_to is not above base_unit, otherwise 1024^(steps).
 */
extern int get_convert_unit_val(int base_unit, char convert_to)
{
	int target_unit = get_unit_type(convert_to);
	int multiplier = 0;
	int unit;

	if (target_unit == SLURM_ERROR)
		return SLURM_ERROR;

	/* The first step yields 1024; each further step multiplies by 1024 */
	for (unit = base_unit; unit < target_unit; unit++)
		multiplier = multiplier ? (multiplier * 1024) : 1024;

	return multiplier;
}
/*
 * Map a unit character (K, M, G, T, P; case insensitive) to its index:
 * K = 1, M = 2, G = 3, T = 4, P = 5.
 *
 * IN unit: unit character to look up.
 * RET the unit index, or SLURM_ERROR (after logging an error) if unit is
 *	NUL or not a recognized unit.
 */
extern int get_unit_type(char unit)
{
	const char *units = "\0KMGTP";
	const char *tmp_char = NULL;

	/*
	 * Reject NUL first: strchr() would match the string terminator.
	 * Cast to unsigned char because passing a negative char to
	 * toupper() is undefined behavior.
	 */
	if (unit != '\0')
		tmp_char = strchr(units + 1, toupper((unsigned char) unit));

	if (!tmp_char) {
		error("Invalid unit type '%c'. Possible options are '%s'",
		      unit, units + 1);
		return SLURM_ERROR;
	}

	return tmp_char - units;
}
/*
 * slurm_forward_data - send REQUEST_FORWARD_DATA carrying arbitrary data to
 * a unix domain socket address on each node of a nodelist.
 *
 * IN/OUT nodelist: Nodes to forward data to. When more than one reply comes
 *	back and some fail, *nodelist is replaced (xfree + new xmalloc'd
 *	string) by the sorted, ranged list of failed nodes.
 * IN address: address of unix domain socket
 * IN len: length of data
 * IN data: real data
 * RET: 0 if every node succeeded, otherwise the last non-success return
 *	code seen, or SLURM_ERROR if no response list was returned.
 */
extern int slurm_forward_data(
	char **nodelist, char *address, uint32_t len, const char *data)
{
	slurm_msg_t msg;
	forward_data_msg_t req;
	list_t *ret_list;
	ret_data_info_t *ret_data_info = NULL;
	hostlist_t *failed_hosts = NULL;
	bool track_failures;
	int rc = 0;

	slurm_msg_t_init(&msg);

	log_flag(NET, "%s: nodelist=%s, address=%s, len=%u",
		 __func__, *nodelist, address, len);

	req.address = address;
	req.len = len;
	req.data = (char *) data;

	slurm_msg_set_r_uid(&msg, SLURM_AUTH_UID_ANY);
	msg.msg_type = REQUEST_FORWARD_DATA;
	msg.data = &req;

	ret_list = slurm_send_recv_msgs(*nodelist, &msg, 0);
	if (!ret_list) {
		error("slurm_forward_data: no list was returned");
		return SLURM_ERROR;
	}

	/* The nodelist is only rewritten when more than one reply arrived */
	track_failures = (list_count(ret_list) > 1);

	while ((ret_data_info = list_pop(ret_list))) {
		int node_rc = slurm_get_return_code(ret_data_info->type,
						    ret_data_info->data);

		if (node_rc != SLURM_SUCCESS) {
			rc = node_rc;
			if (track_failures) {
				if (failed_hosts)
					hostlist_push_host(
						failed_hosts,
						ret_data_info->node_name);
				else
					failed_hosts = hostlist_create(
						ret_data_info->node_name);
			}
		}

		destroy_data_info(ret_data_info);
	}

	if (failed_hosts) {
		xfree(*nodelist);
		hostlist_sort(failed_hosts);
		*nodelist = hostlist_ranged_string_xmalloc(failed_hosts);
		hostlist_destroy(failed_hosts);
	}

	FREE_NULL_LIST(ret_list);
	return rc;
}
/*
 * Fill *sin with this host's listening address using the given port.
 *
 * The base address is resolved once and cached in a function-local static.
 * If CommunicationParameters contains NoCtldInAddrAny (in slurmctld) or
 * NoInAddrAny (elsewhere), the local hostname is resolved. Otherwise
 * slurm_set_addr() is called with a NULL host. NOTE(review): the static
 * cache is written without any visible locking.
 *
 * OUT sin: address to fill in.
 * IN port: port to set on the returned address.
 */
extern void slurm_setup_addr(slurm_addr_t *sin, uint16_t port)
{
	static slurm_addr_t s_addr = { 0 };

	memset(sin, 0, sizeof(*sin));

	if (slurm_addr_is_unspec(&s_addr)) {
		/* On systems with multiple interfaces we might not
		 * want to get just any address. This is the case on
		 * a Cray system with RSIP.
		 */
		char *var;

		if (running_in_slurmctld())
			var = "NoCtldInAddrAny";
		else
			var = "NoInAddrAny";

		if (xstrcasestr(slurm_conf.comm_params, var)) {
			/*
			 * HOST_NAME_MAX excludes the terminating NUL. Without
			 * the extra byte, a maximum-length hostname makes
			 * gethostname() fail (ENAMETOOLONG) or leaves the
			 * buffer unterminated.
			 */
			char host[HOST_NAME_MAX + 1];

			if (!gethostname(host, sizeof(host))) {
				/* POSIX leaves truncation termination unspecified */
				host[sizeof(host) - 1] = '\0';
				slurm_set_addr(&s_addr, port, host);
			} else
				fatal("%s: Can't get hostname or addr: %m",
				      __func__);
		} else {
			slurm_set_addr(&s_addr, port, NULL);
		}
	}

	memcpy(sin, &s_addr, sizeof(*sin));
	slurm_set_port(sin, port);
	log_flag(NET, "%s: update address to %pA", __func__, sin);
}
/*
 * Map a value in [0, 15] to its lowercase hexadecimal digit character.
 * RET the digit character, or -1 if v is out of range.
 */
extern int slurm_hex_to_char(int v)
{
	static const char digits[] = "0123456789abcdef";

	if ((v < 0) || (v > 15))
		return -1;

	return digits[v];
}
/*
 * Map a hexadecimal digit character ('0'-'9', 'a'-'f', 'A'-'F') to its
 * value in [0, 15].
 *
 * The range checks are explicit rather than using tolower(), because
 * tolower() is undefined for values that are neither EOF nor representable
 * as unsigned char (e.g. a negative plain char).
 *
 * RET the digit value, or -1 if c is not a hexadecimal digit.
 */
extern int slurm_char_to_hex(int c)
{
	if ((c >= '0') && (c <= '9'))
		return c - '0';
	if ((c >= 'a') && (c <= 'f'))
		return c - 'a' + 10;
	if ((c >= 'A') && (c <= 'F'))
		return c - 'A' + 10;

	return -1;
}
/*
 * Ask the controller (via working_cluster_rec) for share information.
 *
 * IN shares_req: request payload sent as REQUEST_SHARE_INFO.
 * OUT shares_resp: on RESPONSE_SHARE_INFO, the response data; set to NULL
 *	when the controller answers with a zero RESPONSE_SLURM_RC.
 * RET SLURM_SUCCESS, or SLURM_ERROR on a send/receive failure. A non-zero
 *	return code or an unexpected message type is reported through
 *	slurm_seterrno_ret().
 */
extern int slurm_associations_get_shares(shares_request_msg_t *shares_req,
					 shares_response_msg_t **shares_resp)
{
	slurm_msg_t req_msg;
	slurm_msg_t resp_msg;
	int rc;

	slurm_msg_t_init(&req_msg);
	slurm_msg_t_init(&resp_msg);

	req_msg.msg_type = REQUEST_SHARE_INFO;
	req_msg.data = shares_req;

	if (slurm_send_recv_controller_msg(&req_msg, &resp_msg,
					   working_cluster_rec) < 0)
		return SLURM_ERROR;

	if (resp_msg.msg_type == RESPONSE_SHARE_INFO) {
		*shares_resp = resp_msg.data;
	} else if (resp_msg.msg_type == RESPONSE_SLURM_RC) {
		rc = ((return_code_msg_t *) resp_msg.data)->return_code;
		slurm_free_return_code_msg(resp_msg.data);
		if (rc)
			slurm_seterrno_ret(rc);
		*shares_resp = NULL;
	} else {
		slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
	}

	return SLURM_SUCCESS;
}