blob: 706cae58431d2bfba99ae8ad328eae1368ae2566 [file] [log] [blame]
/*****************************************************************************\
* con.c - definitions for connection handlers in connection manager
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include <limits.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#if defined(__linux__)
#include <sys/sysmacros.h>
#endif /* __linux__ */
#include "slurm/slurm.h"
#include "slurm/slurm_errno.h"
#include "src/common/fd.h"
#include "src/common/http.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/net.h"
#include "src/common/pack.h"
#include "src/common/probes.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/slurm_protocol_socket.h"
#include "src/common/slurm_time.h"
#include "src/common/util-net.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/conmgr/conmgr.h"
#include "src/conmgr/delayed.h"
#include "src/conmgr/mgr.h"
#include "src/conmgr/polling.h"
#include "src/conmgr/tls.h"
#include "src/interfaces/conn.h"
#include "src/interfaces/tls.h"
#include "src/interfaces/url_parser.h"
/*
 * NOTE(review): presumably a buffer length for formatted timestamps; it is not
 * referenced anywhere in the visible portion of this file - confirm still used.
 */
#define CTIME_STR_LEN 72
/* Table entry helper: pairs an enumerator with XSTRINGIFY() of the same token */
#define T(type) { type, XSTRINGIFY(type) }
/* Connection type names searched by conmgr_con_type_string() */
static const struct {
conmgr_con_type_t type;
const char *string;
} con_types[] = {
T(CON_TYPE_NONE),
T(CON_TYPE_RAW),
T(CON_TYPE_RPC),
};
#undef T
/* Table entry helper: pairs a flag enumerator with XSTRINGIFY() of the same token */
#define T(flag) { flag, XSTRINGIFY(flag) }
/*
 * Connection flag names searched by _con_flag_string() and con_flags_string().
 * FLAG_NONE must remain the first entry: con_flags_string() starts its scan at
 * index 1 in order to skip it.
 */
static const struct {
con_flags_t flag;
const char *string;
} con_flags[] = {
T(FLAG_NONE),
T(FLAG_ON_DATA_TRIED),
T(FLAG_IS_SOCKET),
T(FLAG_IS_LISTEN),
T(FLAG_WAIT_ON_FINISH),
T(FLAG_CAN_WRITE),
T(FLAG_CAN_READ),
T(FLAG_READ_EOF),
T(FLAG_IS_CONNECTED),
T(FLAG_WORK_ACTIVE),
T(FLAG_RPC_KEEP_BUFFER),
T(FLAG_QUIESCE),
T(FLAG_CAN_QUERY_OUTPUT_BUFFER),
T(FLAG_IS_FIFO),
T(FLAG_IS_CHR),
T(FLAG_TCP_NODELAY),
T(FLAG_WATCH_WRITE_TIMEOUT),
T(FLAG_WATCH_READ_TIMEOUT),
T(FLAG_WATCH_CONNECT_TIMEOUT),
T(FLAG_TLS_SERVER),
T(FLAG_TLS_CLIENT),
T(FLAG_IS_TLS_CONNECTED),
T(FLAG_TLS_FINGERPRINT),
T(FLAG_TLS_WAIT_ON_CLOSE),
T(FLAG_RPC_RECV_FORWARD),
T(FLAG_WAIT_ON_EXTRACT),
};
#undef T
/*
 * State handed to _setup_listen_socket() for each entry of the list walked by
 * conmgr_create_listen_sockets().
 */
typedef struct {
const conmgr_events_t *events; /* callbacks passed to each new listener */
void *arg; /* caller argument passed to each new listener */
conmgr_con_type_t type; /* connection type passed to each new listener */
int rc; /* result of the most recent conmgr_create_listen_socket() call */
conmgr_con_flags_t flags; /* connection flags passed to each new listener */
} socket_listen_init_t;
/*
 * Arguments queued by conmgr_queue_receive_fd() and consumed (and freed) by
 * _receive_fd().
 */
typedef struct {
#define MAGIC_RECEIVE_FD 0xeba8bae0
int magic; /* MAGIC_RECEIVE_FD */
conmgr_con_type_t type; /* type for the connection created from the received fd */
const conmgr_events_t *events; /* callbacks for the new connection */
void *arg; /* caller argument for the new connection */
} receive_fd_args_t;
/*
 * Arguments queued by conmgr_queue_send_fd() and consumed (and freed) by
 * _send_fd().
 */
typedef struct {
#define MAGIC_SEND_FD 0xfbf8e2e0
int magic; /* MAGIC_SEND_FD */
int fd; /* fd to send over con */
} send_fd_args_t;
#define MAGIC_CON_LOG_WORK_ARGS 0xaafb100b
/*
 * NOTE(review): not referenced in the visible portion of this file; field
 * meanings below are limited to what the declarations themselves show.
 */
typedef struct {
int magic; /* MAGIC_CON_LOG_WORK_ARGS */
const char *type; /* NOTE(review): presumably a label for the work being logged - confirm */
int index; /* NOTE(review): presumably position of the work entry - confirm */
probe_log_t *log; /* probe log handle */
} log_con_work_args_t;
/*
 * Assert that a polling type lies strictly between the PCTL_TYPE_INVALID and
 * PCTL_TYPE_INVALID_MAX sentinels.
 */
static void _validate_pctl_type(pollctl_fd_type_t type)
{
xassert(type > PCTL_TYPE_INVALID);
xassert(type < PCTL_TYPE_INVALID_MAX);
}
/*
 * Translate a connection type into the name of its enumerator.
 * IN type - connection type to look up in con_types[]
 * RET static string; fatal_abort() is called when type has no table entry
 */
extern const char *conmgr_con_type_string(conmgr_con_type_t type)
{
	int idx = 0;

	while (idx < ARRAY_SIZE(con_types)) {
		if (con_types[idx].type == type)
			return con_types[idx].string;

		idx++;
	}

	fatal_abort("invalid type");
}
/*
 * Translate exactly one connection flag into the name of its enumerator.
 * IN flag - single flag value to look up in con_flags[]
 * RET static string; fatal_abort() is called when flag has no table entry
 */
static const char *_con_flag_string(con_flags_t flag)
{
	int idx = 0;

	while (idx < ARRAY_SIZE(con_flags)) {
		if (con_flags[idx].flag == flag)
			return con_flags[idx].string;

		idx++;
	}

	fatal_abort("invalid type");
}
/*
 * Render a flag bitmask as "NAME|NAME|..." text.
 * IN flags - bitmask of connection flags
 * RET newly allocated string (caller must xfree()). Bits without a con_flags[]
 *	entry are appended as a single hexadecimal value.
 */
extern char *con_flags_string(const con_flags_t flags)
{
	char *out = NULL, *pos = NULL;
	uint32_t named = 0;

	if (flags == FLAG_NONE)
		return xstrdup(_con_flag_string(FLAG_NONE));

	/* index 0 is FLAG_NONE which was already handled above */
	for (int i = 1; i < ARRAY_SIZE(con_flags); i++) {
		if ((con_flags[i].flag & flags) != con_flags[i].flag)
			continue;

		xstrfmtcatat(out, &pos, "%s%s", (out ? "|" : ""),
			     con_flags[i].string);
		named |= con_flags[i].flag;
	}

	/* report any bits that no table entry accounted for */
	if (flags ^ named)
		xstrfmtcatat(out, &pos, "%s0x%08"PRIx32, (out ? "|" : ""),
			     (flags ^ named));

	return out;
}
/*
* Close all connections (for_each)
* NOTE: must hold mgr.mutex
*/
static int _close_con_for_each(void *x, void *arg)
{
conmgr_fd_t *con = x;
close_con(true, con);
return 1;
}
/* mgr.mutex must be locked when calling this function */
extern void close_all_connections(void)
{
/* close all connections */
list_for_each(mgr.connections, _close_con_for_each, NULL);
list_for_each(mgr.listen_conns, _close_con_for_each, NULL);
}
/*
 * Work callback that closes the input side of the connection it was queued on.
 * IN conmgr_args - callback arguments; only the connection is used
 * IN arg - unused
 */
extern void work_close_con(conmgr_callback_args_t conmgr_args, void *arg)
{
	conmgr_fd_t *con = conmgr_args.con;

	/* locked=false: close_con() acquires mgr.mutex on its own */
	close_con(false, con);
}
/*
 * Stop reading from connection but write out the remaining buffer and finish
 * any queued work
 *
 * IN locked - true if the caller already holds mgr.mutex; otherwise the mutex
 *	is acquired here for the state changes and released again before the
 *	file descriptor itself is unlinked/closed/shut down
 * IN con - connection whose input side is closed
 */
extern void close_con(bool locked, conmgr_fd_t *con)
{
int input_fd = -1;
bool is_same_fd, is_socket, is_listen;
if (!locked)
slurm_mutex_lock(&mgr.mutex);
/* input_fd is set to -1 by a previous close: nothing left to do */
if (con->input_fd < 0) {
xassert(con_flag(con, FLAG_READ_EOF) ||
con_flag(con, FLAG_IS_LISTEN));
xassert(!con_flag(con, FLAG_CAN_READ) ||
con_flag(con, FLAG_IS_LISTEN));
if (!locked)
slurm_mutex_unlock(&mgr.mutex);
log_flag(CONMGR, "%s: [%s] ignoring duplicate close request",
__func__, con->name);
return;
}
log_flag(CONMGR, "%s: [%s] closing input", __func__, con->name);
/*
 * Stop polling read/write to input fd to allow handle_connection() to
 * select what needs to be monitored
 */
con_set_polling(con, PCTL_TYPE_NONE, __func__);
/* mark it as EOF even if it hasn't */
con_set_flag(con, FLAG_READ_EOF);
con_unset_flag(con, FLAG_CAN_READ);
/* drop any unprocessed input buffer */
if (con->in)
set_buf_offset(con->in, 0);
if (con->tls_in)
set_buf_offset(con->tls_in, 0);
/* snapshot state while the mutex is held; it is used after unlocking */
is_same_fd = (con->input_fd == con->output_fd);
is_socket = con_flag(con, FLAG_IS_SOCKET);
is_listen = con_flag(con, FLAG_IS_LISTEN);
/* take over the fd locally and mark the connection's input as closed */
input_fd = con->input_fd;
con->input_fd = -1;
EVENT_SIGNAL(&mgr.watch_sleep);
if (!locked)
slurm_mutex_unlock(&mgr.mutex);
/* unlink listener sockets to avoid leaving ghost socket */
if (is_listen && (con->address.ss_family == AF_LOCAL)) {
struct sockaddr_un *un = (struct sockaddr_un *) &con->address;
if (unlink(un->sun_path))
error("%s: [%s] unable to unlink %s: %m",
__func__, con->name, un->sun_path);
else
log_flag(CONMGR, "%s: [%s] unlinked %s",
__func__, con->name, un->sun_path);
}
/*
 * Listeners and input-only fds are closed outright. When input and output
 * share one fd it must stay open for output, so only the read half of a
 * socket is shut down.
 */
if (is_listen || !is_same_fd) {
fd_close(&input_fd);
} else if (is_socket && shutdown(input_fd, SHUT_RD)) {
/* shutdown input on sockets */
log_flag(CONMGR, "%s: [%s] unable to shutdown incoming socket half: %m",
__func__, con->name);
}
}
/*
 * Resolve the terminal device name behind a file descriptor.
 * IN fd - file descriptor to query
 * RET newly allocated name (caller must xfree()) or NULL on failure
 */
static char *_resolve_tty_name(int fd)
{
	char buf[PATH_MAX] = {0};
	int rc;

	/*
	 * ttyname_r() reports failure through its return value (an error
	 * number) and is not required to set errno, so log the returned code
	 * instead of relying on %m.
	 */
	if ((rc = ttyname_r(fd, buf, (sizeof(buf) - 1)))) {
		log_flag(CONMGR, "%s: unable to resolve tty at fd:%d: %s",
			 __func__, fd, slurm_strerror(rc));
		return NULL;
	}

	return xstrdup(buf);
}
/*
 * Build a human readable description of a file descriptor.
 * IN fd - file descriptor to describe
 * IN stat_ptr - fstat() result for fd
 * RET newly allocated string (caller must xfree()) or NULL if nothing could be
 *	resolved
 *
 * Attempts, in order: the socket's stream address, the path reported by
 * fd_resolve_path(), "pipe" for FIFOs, the tty name or device numbers for
 * character devices and (Linux only) device numbers for block devices.
 */
static char *_resolve_fd(int fd, struct stat *stat_ptr)
{
	char *resolved = NULL;

	if (S_ISSOCK(stat_ptr->st_mode)) {
		slurm_addr_t local = {0};

		if (!slurm_get_stream_addr(fd, &local) &&
		    (local.ss_family != AF_UNSPEC)) {
			resolved = sockaddr_to_string(&local, sizeof(local));

			if (resolved)
				return resolved;
		}
	}

	resolved = fd_resolve_path(fd);
	if (resolved)
		return resolved;

	if (S_ISFIFO(stat_ptr->st_mode))
		return xstrdup_printf("pipe");

	if (S_ISCHR(stat_ptr->st_mode)) {
		if (isatty(fd)) {
			resolved = _resolve_tty_name(fd);

			if (resolved)
				return resolved;
		}

#if defined(__linux__)
		return xstrdup_printf("device:%u.%u", major(stat_ptr->st_dev),
				      minor(stat_ptr->st_dev));
#else /* !__linux__ */
		return xstrdup_printf("device:0x%"PRIx64, stat_ptr->st_dev);
#endif /* !__linux__ */
	}

#if defined(__linux__)
	if (S_ISBLK(stat_ptr->st_mode))
		return xstrdup_printf("block:%u.%u", major(stat_ptr->st_dev),
				      minor(stat_ptr->st_dev));
#endif /* __linux__ */

	return NULL;
}
/*
 * Set connection name if one was not resolved already
 * IN con - connection to name (con->name must be NULL)
 * IN in_stat - fstat() result of con->input_fd
 * IN out_stat - fstat() result of con->output_fd
 *
 * Resulting name is "name(fd:N)" when both directions resolve to the same
 * thing, otherwise "in(fd:N)->out(fd:M)" with an empty "()" standing in for a
 * missing direction.
 */
static void _set_connection_name(conmgr_fd_t *con, struct stat *in_stat,
				 struct stat *out_stat)
{
	xassert(con);
	xassert(!con->name);

	const bool have_input = (con->input_fd >= 0);
	const bool have_output = (con->output_fd >= 0);
	bool single_name = (con->input_fd == con->output_fd);
	char *input_name = NULL, *output_name = NULL;

	if (!have_input && !have_output) {
		con->name = xstrdup("INVALID");
		return;
	}

	if (have_output) {
		/* grab socket peer if possible */
		if (con_flag(con, FLAG_IS_SOCKET))
			output_name = fd_resolve_peer(con->output_fd);

		if (!output_name)
			output_name = _resolve_fd(con->output_fd, out_stat);
	}

	if (have_input)
		input_name = _resolve_fd(con->input_fd, in_stat);

	/* avoid "->" syntax if same on both sides */
	if (input_name && output_name && !xstrcmp(input_name, output_name)) {
		single_name = true;
		xfree(output_name);
	}

	if (single_name) {
		xstrfmtcat(con->name, "%s(fd:%d)", input_name,
			   con->input_fd);
	} else if (have_input && have_output) {
		xstrfmtcat(con->name, "%s(fd:%d)->%s(fd:%d)", input_name,
			   con->input_fd, output_name, con->output_fd);
	} else if (have_input && !have_output) {
		xstrfmtcat(con->name, "%s(fd:%d)->()", input_name,
			   con->input_fd);
	} else if (!have_input && have_output) {
		xstrfmtcat(con->name, "()->%s(fd:%d)", output_name,
			   con->output_fd);
	} else {
		xassert(false);
	}

	xfree(output_name);
	xfree(input_name);
}
/*
 * Debug-build sanity check that the connection's callbacks can serve the
 * requested type. Compiles to nothing when NDEBUG is defined.
 * IN con - connection whose events are checked
 * IN type - connection type about to be applied
 */
static void _check_con_type(conmgr_fd_t *con, conmgr_con_type_t type)
{
#ifndef NDEBUG
	switch (type) {
	case CON_TYPE_RAW:
		/* must have on_data() defined */
		xassert(con->events->on_data);
		break;
	case CON_TYPE_RPC:
		/* must have on_msg() defined */
		xassert(con->events->on_msg);
		break;
	default:
		fatal_abort("invalid type");
	}
#endif /* !NDEBUG */
}
/*
 * Switch a connection to a different connection type.
 * IN con - connection to change
 * IN type - new connection type
 * RET SLURM_SUCCESS (also when type is unchanged) or the error from
 *	net_set_nodelay(). The type change itself is kept even if setting
 *	TCP_NODELAY fails.
 *
 * NOTE(review): conmgr_fd_change_mode() calls this with mgr.mutex held while
 * add_connection() calls it on a connection not yet in any list - confirm the
 * intended locking contract.
 */
extern int fd_change_mode(conmgr_fd_t *con, conmgr_con_type_t type)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	_check_con_type(con, type);

	if (con->type == type) {
		log_flag(CONMGR, "%s: [%s] ignoring unchanged type: %s",
			 __func__, con->name, conmgr_con_type_string(type));
		return SLURM_SUCCESS;
	}

	log_flag(CONMGR, "%s: [%s] changing type: %s->%s pending_reads=%u pending_writes=%u",
		 __func__, con->name, conmgr_con_type_string(con->type),
		 conmgr_con_type_string(type),
		 con->in ? get_buf_offset(con->in) : 0,
		 list_count(con->out));

	/*
	 * Always set TCP_NODELAY for Slurm RPC connections.
	 * The requested type must be tested here: con->type still holds the
	 * previous type until it is assigned below.
	 */
	if (type == CON_TYPE_RPC)
		con_set_flag(con, FLAG_TCP_NODELAY);

	/* New mode should run on_data() if there is anything pending */
	con_unset_flag(con, FLAG_ON_DATA_TRIED);

	/* Reset polling to trigger re-evaluation */
	con_set_polling(con, PCTL_TYPE_NONE, __func__);

	con->type = type;

	if (con_flag(con, FLAG_IS_SOCKET) && con_flag(con, FLAG_TCP_NODELAY) &&
	    (con->output_fd >= 0)) {
		int rc;

		if ((rc = net_set_nodelay(con->output_fd, true, NULL))) {
			log_flag(CONMGR, "%s: [%s] unable to set TCP_NODELAY: %s",
				 __func__, con->name, slurm_strerror(rc));
			return rc;
		}
	}

	return SLURM_SUCCESS;
}
/*
 * Locked wrapper around fd_change_mode().
 * IN con - connection to change
 * IN type - new connection type
 * RET result of fd_change_mode()
 */
extern int conmgr_fd_change_mode(conmgr_fd_t *con, conmgr_con_type_t type)
{
	int status;

	slurm_mutex_lock(&mgr.mutex);

	status = fd_change_mode(con, type);

	/* wake up watch() to send along any pending data */
	EVENT_SIGNAL(&mgr.watch_sleep);

	slurm_mutex_unlock(&mgr.mutex);

	return status;
}
/*
 * Wrap already-open file descriptor(s) in a new connection and register it
 * with the connection manager.
 *
 * IN type - initial connection type (applied via fd_change_mode())
 * IN source - connection this one originates from or NULL. Only consulted to
 *	inherit the Unix socket path when unix_socket_path is NULL.
 * IN input_fd - fd to read from or -1
 * IN output_fd - fd to write to or -1 (may equal input_fd)
 * IN events - callbacks for the connection
 * IN flags - connection flags; bits in FLAGS_MASK_STATE are ignored
 * IN addr - address of the connection or NULL
 * IN addrlen - number of bytes in addr
 * IN is_listen - true if input_fd is a listening socket
 * IN unix_socket_path - path of Unix socket or NULL
 * IN tls_conn - existing TLS state to adopt via tls_adopt() or NULL
 * IN arg - stored as con->new_arg (passed to the on_connection callbacks)
 * RET SLURM_SUCCESS or error. Every error is returned before anything is
 *	allocated; this function never closes the file descriptors.
 *
 * NOTE: acquires mgr.mutex, caller must not hold it.
 */
extern int add_connection(conmgr_con_type_t type,
			  conmgr_fd_t *source, int input_fd,
			  int output_fd,
			  const conmgr_events_t *events,
			  conmgr_con_flags_t flags,
			  const slurm_addr_t *addr,
			  socklen_t addrlen, bool is_listen,
			  const char *unix_socket_path, void *tls_conn,
			  void *arg)
{
	struct stat in_stat = { 0 };
	struct stat out_stat = { 0 };
	conmgr_fd_t *con = NULL;
	bool set_keep_alive, is_socket, is_fifo, is_chr;
	const bool has_in = (input_fd >= 0);
	const bool has_out = (output_fd >= 0);
	const bool is_same = (input_fd == output_fd);
	/* length including terminating NUL, 0 if no path was given */
	const size_t unix_socket_path_len =
		(unix_socket_path ? (strlen(unix_socket_path) + 1): 0);
	static const size_t unix_socket_path_max =
		sizeof(((struct sockaddr_un *) NULL)->sun_path);

	if (unix_socket_path_len &&
	    (unix_socket_path_len > unix_socket_path_max)) {
		log_flag(CONMGR, "%s: Unix domain socket path too long %zu/%zu: %s",
			 __func__, unix_socket_path_len, unix_socket_path_max,
			 unix_socket_path);
		return ENAMETOOLONG;
	}

	/* verify FD is valid and still open */
	if (has_in && fstat(input_fd, &in_stat)) {
		log_flag(CONMGR, "%s: invalid fd:%d: %m", __func__, input_fd);
		return SLURM_COMMUNICATIONS_INVALID_INCOMING_FD;
	}
	if (has_out && fstat(output_fd, &out_stat)) {
		log_flag(CONMGR, "%s: invalid fd:%d: %m", __func__, output_fd);
		return SLURM_COMMUNICATIONS_INVALID_OUTGOING_FD;
	}
	if (!has_in && !has_out) {
		log_flag(CONMGR, "%s: refusing connection without input or output fd",
			 __func__);
		return SLURM_COMMUNICATIONS_INVALID_FD;
	}

	is_socket = (has_in && S_ISSOCK(in_stat.st_mode)) ||
		    (has_out && S_ISSOCK(out_stat.st_mode));
	is_fifo = (has_in && S_ISFIFO(in_stat.st_mode)) ||
		  (has_out && S_ISFIFO(out_stat.st_mode));
	is_chr = (has_in && S_ISCHR(in_stat.st_mode)) ||
		 (has_out && S_ISCHR(out_stat.st_mode));

	set_keep_alive = !unix_socket_path && is_socket && !is_listen;

	/* all connections are non-blocking */
	if (has_in) {
		if (set_keep_alive)
			net_set_keep_alive(input_fd);
		fd_set_nonblocking(input_fd);
	}
	if (!is_same && has_out) {
		fd_set_nonblocking(output_fd);
		if (set_keep_alive)
			net_set_keep_alive(output_fd);
	}

	con = xmalloc(sizeof(*con));
	*con = (conmgr_fd_t){
		.magic = MAGIC_CON_MGR_FD,
		.address = {
			.ss_family = AF_UNSPEC
		},
		.input_fd = input_fd,
		.output_fd = output_fd,
		.events = events,
		.mss = NO_VAL,
		.work = list_create(NULL),
		.write_complete_work = list_create(NULL),
		.new_arg = arg,
		.type = CON_TYPE_NONE,
		.polling_input_fd = PCTL_TYPE_NONE,
		.polling_output_fd = PCTL_TYPE_NONE,
		/* Set flags not related to connection state tracking */
		.flags = (flags & ~FLAGS_MASK_STATE),
	};

	/* save if connection is a socket type to avoid calling fstat() again */
	con_assign_flag(con, FLAG_IS_SOCKET, is_socket);
	con_assign_flag(con, FLAG_IS_LISTEN, is_listen);
	con_assign_flag(con, FLAG_READ_EOF, !has_in);
	con_assign_flag(con, FLAG_IS_FIFO, is_fifo);
	con_assign_flag(con, FLAG_IS_CHR, is_chr);

	/*
	 * FLAG_TLS_FINGERPRINT is mutually exclusive with FLAG_TLS_CLIENT or
	 * FLAG_TLS_SERVER. It is expected that FLAG_TLS_FINGERPRINT may be set
	 * by default, which means we should silently remove
	 * FLAG_TLS_FINGERPRINT if either are set.
	 */
	if (con_flag(con, FLAG_TLS_CLIENT) || con_flag(con, FLAG_TLS_SERVER))
		con_unset_flag(con, FLAG_TLS_FINGERPRINT);

	/* listeners never carry data and therefore get no buffers */
	if (!is_listen) {
		con->in = create_buf(xmalloc(BUFFER_START_SIZE),
				     BUFFER_START_SIZE);
		con->out = list_create((ListDelF) free_buf);
	}

	/* inherit Unix socket path from source connection if none was given */
	if (!unix_socket_path && source &&
	    (source->address.ss_family == AF_LOCAL)) {
		struct sockaddr_un *un = (struct sockaddr_un *) &source->address;

		unix_socket_path = un->sun_path;
	}

	if (unix_socket_path) {
		struct sockaddr_un *un = (struct sockaddr_un *) &con->address;

		xassert(unix_socket_path_len <= unix_socket_path_max);
		xassert(con_flag(con, FLAG_IS_SOCKET));
		xassert(addr->ss_family == AF_LOCAL);

		con->address.ss_family = AF_LOCAL;
		/*
		 * Bound the copy by the destination size. unix_socket_path_len
		 * only describes the path passed as an argument and is 0 when
		 * the path was inherited from source above, which would make
		 * strlcpy() copy nothing.
		 */
		strlcpy(un->sun_path, unix_socket_path, sizeof(un->sun_path));
	} else if (is_socket && (addrlen > 0) && addr) {
		memcpy(&con->address, addr, addrlen);
	}

	if (is_socket && (con->address.ss_family == AF_UNSPEC)) {
		int rc = SLURM_SUCCESS;
		int fd = (has_out ? output_fd : input_fd);

		/*
		 * Listeners record the address they are bound to while every
		 * other socket records its peer. These must be exclusive or a
		 * successfully resolved peer address would be overwritten by
		 * the local address.
		 */
		if (is_listen) {
			if (slurm_get_stream_addr(fd, &con->address))
				log_flag(CONMGR, "%s: [fd:%d] Unable to resolve bind()ed IP: %m",
					 __func__, fd);
		} else if ((rc = slurm_get_peer_addr(fd, &con->address))) {
			log_flag(CONMGR, "%s: [fd:%d] Unable to resolve remote host: %s",
				 __func__, fd, slurm_strerror(rc));
		}
	}

	if (has_out) {
		int bytes = -1;

		/* probe once whether the output buffer depth can be queried */
		if (!fd_get_buffered_output_bytes(con->output_fd, &bytes,
						  con->name)) {
			xassert(bytes >= 0);
			con_set_flag(con, FLAG_CAN_QUERY_OUTPUT_BUFFER);
		}
	}

	_set_connection_name(con, &in_stat, &out_stat);

	fd_change_mode(con, type);

	if (con_flag(con, FLAG_WATCH_CONNECT_TIMEOUT))
		con->last_read = timespec_now();

	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
		char *flags = con_flags_string(con->flags);

		/* file descriptors are signed: -1 marks an absent direction */
		log_flag(CONMGR, "%s: [%s] new connection input_fd=%d output_fd=%d flags=%s",
			 __func__, con->name, input_fd, output_fd, flags);
		xfree(flags);
	}

	if (tls_conn)
		tls_adopt(con, tls_conn);

	slurm_mutex_lock(&mgr.mutex);

	if (is_listen) {
		xassert(con->output_fd <= 0);
		list_append(mgr.listen_conns, con);
	} else {
		list_append(mgr.connections, con);
	}

	/* Attempt to handle connection state immediately */
	handle_connection(true, con);

	/*
	 * interrupt poll () and wake up watch() to eventually re-examine new
	 * connection.
	 */
	pollctl_interrupt(__func__);
	EVENT_SIGNAL(&mgr.watch_sleep);

	slurm_mutex_unlock(&mgr.mutex);

	return SLURM_SUCCESS;
}
/*
 * Work callback that runs the connection's on_listen_connect() (listeners) or
 * on_connection() (all others) callback and stores the returned pointer as
 * con->arg.
 * IN conmgr_args - callback arguments, forwarded to the user callback
 * IN arg - ignored on entry; reused to hold the callback's return value
 *
 * A NULL return from the callback closes the connection.
 */
extern void wrap_on_connection(conmgr_callback_args_t conmgr_args, void *arg)
{
	conmgr_fd_t *con = conmgr_args.con;

	if (!con_flag(con, FLAG_IS_LISTEN)) {
		log_flag(CONMGR, "%s: [%s] BEGIN func=0x%"PRIxPTR,
			 __func__, con->name,
			 (uintptr_t) con->events->on_connection);

		arg = con->events->on_connection(conmgr_args, con->new_arg);

		log_flag(CONMGR, "%s: [%s] END func=0x%"PRIxPTR" arg=0x%"PRIxPTR,
			 __func__, con->name,
			 (uintptr_t) con->events->on_connection,
			 (uintptr_t) arg);
	} else {
		log_flag(CONMGR, "%s: [%s] BEGIN func=0x%"PRIxPTR,
			 __func__, con->name,
			 (uintptr_t) con->events->on_listen_connect);

		arg = con->events->on_listen_connect(conmgr_args, con->new_arg);

		log_flag(CONMGR, "%s: [%s] END func=0x%"PRIxPTR" arg=0x%"PRIxPTR,
			 __func__, con->name,
			 (uintptr_t) con->events->on_listen_connect,
			 (uintptr_t) arg);
	}

	if (arg) {
		/* publish the callback's state pointer and wake up watch() */
		slurm_mutex_lock(&mgr.mutex);
		con->arg = arg;
		EVENT_SIGNAL(&mgr.watch_sleep);
		slurm_mutex_unlock(&mgr.mutex);
		return;
	}

	error("%s: [%s] closing connection due to NULL return from on_connection",
	      __func__, con->name);
	close_con(false, con);
}
/*
 * Register already-open, non-listening file descriptor(s) as a connection.
 * Arguments are forwarded to add_connection() without a source connection or
 * Unix socket path.
 * RET result of add_connection()
 */
extern int conmgr_process_fd(conmgr_con_type_t type, int input_fd,
			     int output_fd, const conmgr_events_t *events,
			     conmgr_con_flags_t flags,
			     const slurm_addr_t *addr, socklen_t addrlen,
			     void *tls_conn, void *arg)
{
	const bool is_listen = false;
	int rc;

	rc = add_connection(type, NULL, input_fd, output_fd, events, flags,
			    addr, addrlen, is_listen, NULL, tls_conn, arg);

	return rc;
}
/*
 * Register an already-listening file descriptor as a listener connection.
 * The listener has no output fd, address, Unix socket path or TLS state.
 * RET result of add_connection()
 */
extern int conmgr_process_fd_listen(int fd, conmgr_con_type_t type,
				    const conmgr_events_t *events,
				    conmgr_con_flags_t flags, void *arg)
{
	const bool is_listen = true;
	const int no_output_fd = -1;
	int rc;

	rc = add_connection(type, NULL, fd, no_output_fd, events, flags, NULL,
			    0, is_listen, NULL, NULL, arg);

	return rc;
}
/*
 * Work callback: receive a file descriptor over the connection's input socket
 * and register it as a new connection.
 * IN conmgr_args - callback arguments (source connection and work status)
 * IN arg - receive_fd_args_t from conmgr_queue_receive_fd(); always freed here
 */
static void _receive_fd(conmgr_callback_args_t conmgr_args, void *arg)
{
	receive_fd_args_t *args = arg;
	conmgr_fd_t *src = conmgr_args.con;
	int fd = -1;

	xassert(args->magic == MAGIC_RECEIVE_FD);
	xassert(src->magic == MAGIC_CON_MGR_FD);

	if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) {
		log_flag(CONMGR, "%s: [%s] Canceled receive new file descriptor",
			 __func__, src->name);
		goto cleanup;
	}

	if (con_flag(src, FLAG_READ_EOF)) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on SHUT_RD input_fd=%d",
			 __func__, src->name, src->input_fd);
		goto cleanup;
	}

	if (src->input_fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on invalid input_fd=%d",
			 __func__, src->name, src->input_fd);
		goto cleanup;
	}

	fd = receive_fd_over_socket(src->input_fd);

	if (fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on input_fd=%d",
			 __func__, src->name, src->input_fd);
		/*
		 * Close source as receive_fd_over_socket() failed and
		 * connection is now in an unknown state
		 */
		close_con(false, src);
		goto cleanup;
	}

	/*
	 * Result intentionally ignored: error already logged by
	 * add_connection() and there is no reason to assume that failing is
	 * due to the state of src.
	 */
	(void) add_connection(args->type, NULL, fd, fd, args->events,
			      CON_FLAG_NONE, NULL, 0, false, NULL, NULL,
			      args->arg);

cleanup:
	/* poison magic to catch reuse of freed arguments */
	args->magic = ~MAGIC_RECEIVE_FD;
	xfree(args);
}
/*
 * Queue work on src to receive a file descriptor and turn it into a new
 * connection.
 * IN src - socket connection the file descriptor will arrive on
 * IN type - type for the new connection
 * IN events - callbacks for the new connection
 * IN arg - caller argument for the new connection
 * RET SLURM_SUCCESS if the work was queued, EAFNOSUPPORT if src is not a
 *	socket, or SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR if the input side
 *	of src is already closed
 */
extern int conmgr_queue_receive_fd(conmgr_fd_t *src, conmgr_con_type_t type,
				   const conmgr_events_t *events, void *arg)
{
	receive_fd_args_t *request = NULL;
	int status = SLURM_ERROR;

	slurm_mutex_lock(&mgr.mutex);

	xassert(src->magic == MAGIC_CON_MGR_FD);
	xassert(type > CON_TYPE_NONE);
	xassert(type < CON_TYPE_MAX);

	/* Reject obviously invalid states immediately */
	if (!con_flag(src, FLAG_IS_SOCKET)) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on non-socket",
			 __func__, src->name);
		status = EAFNOSUPPORT;
		goto unlock;
	}

	if (con_flag(src, FLAG_READ_EOF)) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on SHUT_RD input_fd=%d",
			 __func__, src->name, src->input_fd);
		status = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR;
		goto unlock;
	}

	if (src->input_fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on invalid input_fd=%d",
			 __func__, src->name, src->input_fd);
		status = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR;
		goto unlock;
	}

	/* ownership of request passes to _receive_fd() */
	request = xmalloc_nz(sizeof(*request));
	*request = (receive_fd_args_t) {
		.magic = MAGIC_RECEIVE_FD,
		.type = type,
		.events = events,
		.arg = arg,
	};
	add_work_con_fifo(true, src, _receive_fd, request);
	status = SLURM_SUCCESS;

unlock:
	slurm_mutex_unlock(&mgr.mutex);

	return status;
}
/*
 * Work callback: send a file descriptor over the connection's output socket.
 * IN conmgr_args - callback arguments (connection and work status)
 * IN arg - send_fd_args_t from conmgr_queue_send_fd(); always freed here
 *
 * The local copy of the file descriptor is closed on every path.
 */
static void _send_fd(conmgr_callback_args_t conmgr_args, void *arg)
{
	send_fd_args_t *args = arg;
	conmgr_fd_t *con = conmgr_args.con;
	int fd = args->fd;
	const bool cancelled =
		(conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED);

	xassert(args->magic == MAGIC_SEND_FD);
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (cancelled) {
		log_flag(CONMGR, "%s: [%s] Canceled sending file descriptor %d.",
			 __func__, con->name, fd);
	} else if (con->output_fd >= 0) {
		send_fd_over_socket(con->output_fd, fd);
		log_flag(CONMGR, "%s: [%s] Sent file descriptor %d over output_fd=%d",
			 __func__, con->name, fd, con->output_fd);
	} else {
		log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over invalid output_fd=%d",
			 __func__, con->name, fd, con->output_fd);
	}

	/* always close the file descriptor in this process to avoid leaking */
	fd_close(&fd);

	/* poison magic to catch reuse of freed arguments */
	args->magic = ~MAGIC_SEND_FD;
	xfree(args);
}
/*
 * Queue work on con to send a file descriptor over its output socket.
 * IN con - socket connection to send over
 * IN fd - file descriptor to send; once queued it is closed by _send_fd()
 * RET SLURM_SUCCESS if the work was queued, EINVAL for a negative fd,
 *	EAFNOSUPPORT if con is not a socket, or
 *	SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR if con has no output fd.
 *	On error fd is left open for the caller.
 */
extern int conmgr_queue_send_fd(conmgr_fd_t *con, int fd)
{
	send_fd_args_t *request = NULL;
	int status = SLURM_ERROR;

	slurm_mutex_lock(&mgr.mutex);

	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to send invalid file descriptor %d",
			 __func__, con->name, fd);
		status = EINVAL;
		goto unlock;
	}

	if (!con_flag(con, FLAG_IS_SOCKET)) {
		log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over non-socket",
			 __func__, con->name, fd);
		status = EAFNOSUPPORT;
		goto unlock;
	}

	if (con->output_fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over invalid output_fd=%d",
			 __func__, con->name, fd, con->output_fd);
		status = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR;
		goto unlock;
	}

	/* ownership of request passes to _send_fd() */
	request = xmalloc_nz(sizeof(*request));
	*request = (send_fd_args_t) {
		.magic = MAGIC_SEND_FD,
		.fd = fd,
	};
	add_work_con_fifo(true, con, _send_fd, request);
	status = SLURM_SUCCESS;

unlock:
	slurm_mutex_unlock(&mgr.mutex);

	return status;
}
/*
 * Work callback queued by _close_fd(): close the connection now, or hand the
 * request back to conmgr_queue_close_fd() if FLAG_WORK_ACTIVE is set.
 * IN conmgr_args - callback arguments; only the connection is used
 * IN arg - unused
 */
static void _deferred_close_fd(conmgr_callback_args_t conmgr_args, void *arg)
{
	conmgr_fd_t *con = conmgr_args.con;

	slurm_mutex_lock(&mgr.mutex);

	if (!con_flag(con, FLAG_WORK_ACTIVE)) {
		close_con(true, con);
		slurm_mutex_unlock(&mgr.mutex);
		return;
	}

	/* conmgr_queue_close_fd() takes mgr.mutex itself: release it first */
	slurm_mutex_unlock(&mgr.mutex);
	conmgr_queue_close_fd(con);
}
/*
 * Close the input side of a connection either immediately via close_con() or
 * by queueing _deferred_close_fd() as work on the connection.
 * Caller must hold mgr.mutex lock
 */
static void _close_fd(conmgr_fd_t *con)
{
xassert(con->magic == MAGIC_CON_MGR_FD);
/*
 * NOTE(review): the deferral branch is taken when FLAG_WORK_ACTIVE is NOT
 * set, while the comment inside it describes deferring because work IS
 * active. Confirm which of the two is intended before changing either.
 */
if (!con_flag(con, FLAG_WORK_ACTIVE)) {
/*
 * Defer request to close connection until connection is no
 * longer actively doing work as closing connection would change
 * several variables guaranteed to not change while work is
 * active.
 */
add_work_con_fifo(true, con, _deferred_close_fd, con);
} else {
close_con(true, con);
}
}
/*
 * Request close of a connection's input side.
 * Takes mgr.mutex around _close_fd(), so the caller must not already hold it.
 * IN con - connection to close
 */
extern void conmgr_queue_close_fd(conmgr_fd_t *con)
{
xassert(con->magic == MAGIC_CON_MGR_FD);
slurm_mutex_lock(&mgr.mutex);
_close_fd(con);
slurm_mutex_unlock(&mgr.mutex);
}
/*
 * Request close of the connection behind a reference.
 * IN ref - connection reference; NULL or a reference without a connection is
 *	silently ignored. The reference itself is left untouched.
 */
extern void conmgr_con_queue_close(conmgr_fd_ref_t *ref)
{
	if (ref && ref->con) {
		xassert(ref->magic == MAGIC_CON_MGR_FD_REF);
		xassert(ref->con->magic == MAGIC_CON_MGR_FD);

		slurm_mutex_lock(&mgr.mutex);
		_close_fd(ref->con);
		slurm_mutex_unlock(&mgr.mutex);
	}
}
/*
 * Request close of the connection behind a reference and release the
 * reference via fd_free_ref().
 * IN/OUT ref_ptr - pointer to the reference (must not be NULL); nothing is
 *	done when *ref_ptr is already NULL
 */
extern void conmgr_con_queue_close_free(conmgr_fd_ref_t **ref_ptr)
{
	conmgr_fd_ref_t *held = NULL;

	xassert(ref_ptr);

	held = *ref_ptr;

	/* skip if already released */
	if (!held)
		return;

	xassert(held->magic == MAGIC_CON_MGR_FD_REF);
	xassert(held->con->magic == MAGIC_CON_MGR_FD);

	/* close and release under a single hold of mgr.mutex */
	slurm_mutex_lock(&mgr.mutex);
	_close_fd(held->con);
	fd_free_ref(ref_ptr);
	slurm_mutex_unlock(&mgr.mutex);
}
/*
 * list_find_first_ro() callback: does a connection use the given address?
 * IN x - conmgr_fd_t pointer stored in the list
 * IN key - slurm_addr_t to compare against the connection's address
 * RET 1 on match, 0 otherwise
 *
 * IPv4 compares port and address, IPv6 additionally compares the scope id and
 * Unix sockets compare the path. A matching family other than these three is
 * treated as fatal.
 */
static int _match_socket_address(void *x, void *key)
{
	conmgr_fd_t *con = x;
	const slurm_addr_t *wanted = key;
	const slurm_addr_t *have = &con->address;

	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (wanted->ss_family != have->ss_family)
		return 0;

	switch (wanted->ss_family) {
	case AF_INET:
	{
		const struct sockaddr_in *lhs =
			(const struct sockaddr_in *) wanted;
		const struct sockaddr_in *rhs =
			(const struct sockaddr_in *) have;

		return ((lhs->sin_port == rhs->sin_port) &&
			!memcmp(&lhs->sin_addr.s_addr, &rhs->sin_addr.s_addr,
				sizeof(rhs->sin_addr.s_addr)));
	}
	case AF_INET6:
	{
		const struct sockaddr_in6 *lhs =
			(const struct sockaddr_in6 *) wanted;
		const struct sockaddr_in6 *rhs =
			(const struct sockaddr_in6 *) have;

		return ((lhs->sin6_port == rhs->sin6_port) &&
			(lhs->sin6_scope_id == rhs->sin6_scope_id) &&
			!memcmp(&lhs->sin6_addr.s6_addr,
				&rhs->sin6_addr.s6_addr,
				sizeof(rhs->sin6_addr.s6_addr)));
	}
	case AF_UNIX:
	{
		const struct sockaddr_un *lhs =
			(const struct sockaddr_un *) wanted;
		const struct sockaddr_un *rhs =
			(const struct sockaddr_un *) have;

		return !xstrcmp(lhs->sun_path, rhs->sun_path);
	}
	default:
	{
		fatal_abort("Unexpected ss family type %u",
			    (uint32_t) wanted->ss_family);
	}
	}

	/* Unreachable */
	fatal_abort("This should never happen");
}
/*
 * Check whether any entry of mgr.listen_conns already uses this address.
 * IN addr - address to look for
 * IN addrlen - number of bytes in addr
 * RET true if a listener with a matching address exists
 */
static bool _is_listening(const slurm_addr_t *addr, socklen_t addrlen)
{
	/*
	 * Copy into a full sized, zeroed slurm_addr_t as addr may point at a
	 * shorter address structure than _match_socket_address() reads.
	 */
	slurm_addr_t padded = {0};

	memcpy(&padded, addr, addrlen);

	return (list_find_first_ro(mgr.listen_conns, _match_socket_address,
				   &padded) != NULL);
}
/*
 * Create, bind and listen on a Unix domain socket and register it as a
 * listener connection.
 * IN type - connection type for the listener
 * IN flags - connection flags for the listener
 * IN listen_on - original listen request (only used in log messages)
 * IN unixsock - filesystem path of the socket; an existing file is unlinked
 * IN events - callbacks for the listener
 * IN arg - caller argument for the listener
 * RET result of add_connection(). Failure of socket(), path conversion,
 *	bind() or listen() is fatal.
 */
static int _add_unix_listener(conmgr_con_type_t type, conmgr_con_flags_t flags,
			      const char *listen_on, const char *unixsock,
			      const conmgr_events_t *events, void *arg)
{
	slurm_addr_t bind_addr = { 0 };
	int sock = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);

	if (sock < 0)
		fatal("%s: socket() failed: %m", __func__);

	bind_addr = sockaddr_from_unix_path(unixsock);

	if (bind_addr.ss_family != AF_UNIX)
		fatal("%s: [%s] Invalid Unix socket path: %s",
		      __func__, listen_on, unixsock);

	log_flag(CONMGR, "%s: [%pA] attempting to bind() and listen() UNIX socket",
		 __func__, &bind_addr);

	/* remove stale socket file; a missing file is not an error */
	if (unlink(unixsock) && (errno != ENOENT))
		error("Error unlink(%s): %m", unixsock);

	/* bind() will EINVAL if socklen=sizeof(addr) */
	if (bind(sock, (const struct sockaddr *) &bind_addr,
		 sizeof(struct sockaddr_un)))
		fatal("%s: [%s] Unable to bind UNIX socket: %m",
		      __func__, listen_on);

	fd_set_oob(sock, 0);

	if (listen(sock, SLURM_DEFAULT_LISTEN_BACKLOG) < 0)
		fatal("%s: [%s] unable to listen(): %m",
		      __func__, listen_on);

	return add_connection(type, NULL, sock, -1, events, flags, &bind_addr,
			      sizeof(bind_addr), true, unixsock, NULL, arg);
}
/*
 * Resolve the host/port of a listen request and register a listener
 * connection for every resolved address.
 * IN type - connection type for the listeners
 * IN flags - connection flags for the listeners
 * IN listen_on - original listen request (only used in log messages)
 * IN url - parsed listen request providing host and port
 * IN events - callbacks for the listeners
 * IN arg - caller argument for the listeners
 * RET SLURM_SUCCESS or the first error from add_connection(), which also ends
 *	the loop. Failure to resolve, create, configure, bind or listen is
 *	fatal. Addresses that already have a listener are skipped.
 */
static int _add_socket_listener(conmgr_con_type_t type,
				conmgr_con_flags_t flags, const char *listen_on,
				url_t *url, const conmgr_events_t *events,
				void *arg)
{
	int rc = SLURM_SUCCESS;
	struct addrinfo *addrlist = NULL;

	/* resolve out the host and port if provided */
	if (!(addrlist = xgetaddrinfo(url->host, url->port)))
		fatal("%s: Unable to listen on %s:%s(%s): %m",
		      __func__, url->host, url->port, listen_on);

	/*
	 * Create a socket for every address returned
	 * ipv6 clone of net_stream_listen_ports()
	 */
	for (struct addrinfo *addr = addrlist; !rc && addr != NULL;
	     addr = addr->ai_next) {
		/* clone the address since it will be freed at
		 * end of this loop
		 */
		int fd;
		int one = 1;

		if (_is_listening((const slurm_addr_t *) addr->ai_addr,
				  addr->ai_addrlen)) {
			verbose("%s: ignoring duplicate listen request for %pA",
				__func__, (const slurm_addr_t *) addr->ai_addr);
			continue;
		}

		fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC,
			    addr->ai_protocol);
		if (fd < 0)
			fatal("%s: [%s] Unable to create socket: %m",
			      __func__, addrinfo_to_string(addr));

		/*
		 * activate socket reuse to avoid annoying timing issues
		 * with daemon restarts
		 *
		 * SO_REUSEADDR is a socket level option, so the level must be
		 * SOL_SOCKET. Passing the socket type only worked where
		 * SOCK_STREAM and SOL_SOCKET share the same numeric value.
		 */
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
			       &one, sizeof(one)))
			fatal("%s: [%s] setsockopt(SO_REUSEADDR) failed: %m",
			      __func__, addrinfo_to_string(addr));

		if (bind(fd, addr->ai_addr, addr->ai_addrlen) != 0)
			fatal("%s: [%s] Unable to bind socket: %m",
			      __func__, addrinfo_to_string(addr));

		fd_set_oob(fd, 0);

		rc = listen(fd, SLURM_DEFAULT_LISTEN_BACKLOG);
		if (rc < 0)
			fatal("%s: [%s] unable to listen(): %m",
			      __func__, addrinfo_to_string(addr));

		rc = add_connection(type, NULL, fd, -1, events, flags,
				    (const slurm_addr_t *) addr->ai_addr,
				    addr->ai_addrlen, true, NULL, NULL, arg);
	}

	freeaddrinfo(addrlist);

	return rc;
}
/*
 * Parse a listen request and create the matching listener(s).
 * IN type - connection type for the listener(s)
 * IN flags - connection flags; CON_FLAG_TLS_SERVER is added for https
 * IN listen_on - listen request parsed by url_parser_g_parse()
 * IN events - callbacks for the listener(s)
 * IN arg - caller argument for the listener(s)
 * RET result of the Unix or socket listener helper. A request that can not be
 *	parsed, or https without a TLS plugin, is fatal.
 */
extern int conmgr_create_listen_socket(conmgr_con_type_t type,
				       conmgr_con_flags_t flags,
				       const char *listen_on,
				       const conmgr_events_t *events, void *arg)
{
	const size_t listen_on_len = strlen(listen_on);
	int rc = SLURM_SUCCESS;
	url_t parsed = URL_INITIALIZER;
	/* shadow buffer over the caller's string: nothing is copied or owned */
	buf_t view = {
		.magic = BUF_MAGIC,
		.head = (void *) listen_on,
		.processed = listen_on_len,
		.size = listen_on_len,
		.shadow = true,
	};

	rc = url_parser_g_parse(__func__, &view, &parsed);
	if (rc)
		fatal("%s: Unable to parse %s: %s",
		      __func__, listen_on, slurm_strerror(rc));

	switch (parsed.scheme) {
	case URL_SCHEME_UNIX:
		rc = _add_unix_listener(type, flags, listen_on, parsed.path,
					events, arg);
		break;
	case URL_SCHEME_HTTPS:
		flags |= CON_FLAG_TLS_SERVER;

		if (!tls_available())
			fatal("Cannot create https:// socket because no TLS plugin is available");
		/* fall through */
	case URL_SCHEME_HTTP:
	case URL_SCHEME_INVALID:
		/* requests without a recognized scheme are resolved as host:port */
		rc = _add_socket_listener(type, flags, listen_on, &parsed,
					  events, arg);
		break;
	case URL_SCHEME_INVALID_MAX:
		fatal_abort("should never happen");
	}

	url_free_members(&parsed);

	return rc;
}
/*
 * list_for_each() callback: create the listener(s) for one listen request.
 * IN x - listen request string
 * IN arg - socket_listen_init_t; rc receives the result of the attempt
 * RET SLURM_SUCCESS, or SLURM_ERROR when creating the listener failed
 */
static int _setup_listen_socket(void *x, void *arg)
{
	socket_listen_init_t *state = arg;
	const char *request = (const char *) x;

	state->rc = conmgr_create_listen_socket(state->type, state->flags,
						request, state->events,
						state->arg);

	if (state->rc)
		return SLURM_ERROR;

	return SLURM_SUCCESS;
}
/*
 * Create listeners for every listen request in a list.
 * IN type - connection type for the listeners
 * IN flags - connection flags for the listeners
 * IN hostports - list of listen request strings
 * IN events - callbacks for the listeners
 * IN arg - caller argument for the listeners
 * RET result recorded by the last _setup_listen_socket() call, or 0 if the
 *	callback was never invoked
 */
extern int conmgr_create_listen_sockets(conmgr_con_type_t type,
					conmgr_con_flags_t flags,
					list_t *hostports,
					const conmgr_events_t *events,
					void *arg)
{
	socket_listen_init_t state = {
		.type = type,
		.flags = flags,
		.events = events,
		.arg = arg,
	};

	/* outcome is reported through state.rc instead of the return value */
	(void) list_for_each(hostports, _setup_listen_socket, &state);

	return state.rc;
}
/*
 * Create a non-blocking socket, start connecting it to addr and register it
 * as a connection.
 * IN type - connection type for the new connection
 * IN flags - connection flags for the new connection
 * IN addr - address to connect to (AF_UNIX, AF_INET or AF_INET6)
 * IN addrlen - number of bytes in addr
 * IN events - callbacks for the new connection
 * IN arg - caller argument for the new connection
 * RET result of add_connection(), EAFNOSUPPORT for any other address family,
 *	or the errno of a failed socket()/connect(). SLURM_SUCCESS is also
 *	returned, without creating a connection, when connect() is interrupted
 *	while shutdown has been requested.
 */
extern int conmgr_create_connect_socket(conmgr_con_type_t type,
					conmgr_con_flags_t flags,
					slurm_addr_t *addr, socklen_t addrlen,
					const conmgr_events_t *events,
					void *arg)
{
	int fd = -1, rc = SLURM_ERROR;

	switch (addr->ss_family) {
	case AF_UNIX:
		fd = socket(addr->ss_family, (SOCK_STREAM | SOCK_CLOEXEC), 0);
		break;
	case AF_INET:
	case AF_INET6:
		fd = socket(addr->ss_family, (SOCK_STREAM | SOCK_CLOEXEC),
			    IPPROTO_TCP);
		break;
	default:
		return EAFNOSUPPORT;
	}

	if (fd < 0) {
		rc = errno;
		log_flag(NET, "%s: [%pA] socket() failed: %s",
			 __func__, addr, slurm_strerror(rc));
		return rc;
	}

	/* Set socket as non-blocking to avoid connect() blocking */
	fd_set_nonblocking(fd);

	log_flag(CONMGR, "%s: [%pA(fd:%d)] attempting to connect() new socket",
		 __func__, addr, fd);

	/* keep calling connect() for as long as it gets interrupted */
	while (connect(fd, (const struct sockaddr *) addr, addrlen)) {
		bool stopping;

		rc = errno;

		if (rc != EINTR) {
			/* delayed connect() completion is expected */
			if ((rc == EINPROGRESS) || (rc == EAGAIN) ||
			    (rc == EWOULDBLOCK))
				break;

			log_flag(NET, "%s: [%pA(fd:%d)] connect() failed: %s",
				 __func__, addr, fd, slurm_strerror(rc));
			fd_close(&fd);
			return rc;
		}

		slurm_mutex_lock(&mgr.mutex);
		xassert(mgr.initialized);
		stopping = mgr.shutdown_requested;
		slurm_mutex_unlock(&mgr.mutex);

		if (stopping) {
			log_flag(CONMGR, "%s: [%pA(fd:%d)] connect() interrupted during shutdown. Closing connection.",
				 __func__, addr, fd);
			fd_close(&fd);
			return SLURM_SUCCESS;
		}

		log_flag(CONMGR, "%s: [%pA(fd:%d)] connect() interrupted. Retrying.",
			 __func__, addr, fd);
	}

	return add_connection(type, NULL, fd, fd, events, flags, addr, addrlen,
			      false, NULL, NULL, arg);
}
/*
 * Query the peer credentials of a socket connection.
 *
 * WARNING: caller must not hold mgr.mutex lock
 *
 * IN con - connection to query
 * OUT cred_uid - populated by net_get_peer()
 * OUT cred_gid - populated by net_get_peer()
 * OUT cred_pid - populated by net_get_peer()
 * RET SLURM_SUCCESS, EINVAL if any output pointer is NULL,
 *	SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR if the connection is not a
 *	socket with both directions open (or changed during the query), or
 *	the error returned by net_get_peer()
 */
static int _get_auth_creds(conmgr_fd_t *con, uid_t *cred_uid, gid_t *cred_gid,
			   pid_t *cred_pid)
{
	int input_fd = -1, output_fd = -1;
	int rc;

	xassert(cred_uid);
	xassert(cred_gid);
	xassert(cred_pid);
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (!cred_uid || !cred_gid || !cred_pid)
		return EINVAL;

	/* Snapshot the file descriptors while holding the lock */
	slurm_mutex_lock(&mgr.mutex);

	if (con_flag(con, FLAG_IS_SOCKET)) {
		if (!con_flag(con, FLAG_READ_EOF))
			input_fd = con->input_fd;

		output_fd = con->output_fd;
	}

	slurm_mutex_unlock(&mgr.mutex);

	/* Both sockets must be open to authenticate */
	if ((input_fd < 0) || (output_fd < 0))
		return SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR;

	/* Query is done without the lock held */
	if ((rc = net_get_peer(input_fd, cred_uid, cred_gid, cred_pid)))
		return rc;

	/* Catch connection state changing during kernel queries */
	slurm_mutex_lock(&mgr.mutex);

	if ((input_fd != con->input_fd) || (output_fd != con->output_fd) ||
	    con_flag(con, FLAG_READ_EOF))
		rc = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR;

	slurm_mutex_unlock(&mgr.mutex);

	return rc;
}
/*
 * Get peer credentials for a connection.
 *
 * IN con - connection to query
 * OUT cred_uid, cred_gid, cred_pid - see _get_auth_creds()
 * RET EINVAL if con is NULL, otherwise the result of _get_auth_creds()
 */
extern int conmgr_get_fd_auth_creds(conmgr_fd_t *con, uid_t *cred_uid,
				    gid_t *cred_gid, pid_t *cred_pid)
{
	if (con) {
		xassert(con->magic == MAGIC_CON_MGR_FD);
		return _get_auth_creds(con, cred_uid, cred_gid, cred_pid);
	}

	return EINVAL;
}
/*
 * Get peer credentials for the connection behind a reference.
 *
 * IN con - connection reference to query
 * OUT cred_uid, cred_gid, cred_pid - see _get_auth_creds()
 * RET EINVAL if the reference or its connection is NULL, otherwise the
 *	result of _get_auth_creds()
 */
extern int conmgr_con_get_auth_creds(conmgr_fd_ref_t *con, uid_t *cred_uid,
				     gid_t *cred_gid, pid_t *cred_pid)
{
	if (con && con->con) {
		xassert(con->magic == MAGIC_CON_MGR_FD_REF);
		xassert(con->con->magic == MAGIC_CON_MGR_FD);

		return _get_auth_creds(con->con, cred_uid, cred_gid,
				       cred_pid);
	}

	return EINVAL;
}
/*
 * Get the name of a connection.
 *
 * IN con - connection (must not be NULL)
 * RET con->name (asserted to be a non-empty string)
 */
extern const char *conmgr_fd_get_name(const conmgr_fd_t *con)
{
	const char *name;

	xassert(con->magic == MAGIC_CON_MGR_FD);

	name = con->name;
	xassert(name && name[0]);

	return name;
}
/*
 * Get the name of the connection behind a reference.
 *
 * IN ref - connection reference (may be NULL)
 * RET connection name, or NULL if ref or its connection is NULL
 */
extern const char *conmgr_con_get_name(conmgr_fd_ref_t *ref)
{
	conmgr_fd_t *con;

	if (!ref)
		return NULL;

	xassert(ref->magic == MAGIC_CON_MGR_FD_REF);

	if (!(con = ref->con))
		return NULL;

	xassert(con->magic == MAGIC_CON_MGR_FD);

	return conmgr_fd_get_name(con);
}
/*
 * Build a snapshot of the connection's status flags.
 *
 * Asserts that FLAG_WORK_ACTIVE is set on the connection.
 *
 * IN con - connection to inspect
 * RET status populated from the connection flags. unix_socket points at the
 *	path stored inside con->address when the address family is AF_LOCAL,
 *	otherwise it is NULL.
 */
extern conmgr_fd_status_t conmgr_fd_get_status(conmgr_fd_t *con)
{
	conmgr_fd_status_t status = {
		.unix_socket = NULL,
	};

	xassert(con->magic == MAGIC_CON_MGR_FD);
	xassert(con_flag(con, FLAG_WORK_ACTIVE));

	status.is_socket = con_flag(con, FLAG_IS_SOCKET);
	status.is_listen = con_flag(con, FLAG_IS_LISTEN);
	status.read_eof = con_flag(con, FLAG_READ_EOF);
	status.is_connected = con_flag(con, FLAG_IS_CONNECTED);

	if (con->address.ss_family == AF_LOCAL) {
		/* Path is owned by the connection and not copied */
		struct sockaddr_un *un = (struct sockaddr_un *) &con->address;

		status.unix_socket = un->sun_path;
	}

	return status;
}
/*
 * list_find_first() callback: match a connection by file descriptor.
 *
 * IN x - connection (conmgr_fd_t *) to test
 * IN key - pointer to the int file descriptor being searched for
 * RET 1 if the descriptor is the connection's input_fd or output_fd, else 0
 */
static int _find_by_fd(void *x, void *key)
{
	const conmgr_fd_t *con = x;
	const int *fd_ptr = key;

	if (con->input_fd == *fd_ptr)
		return 1;

	return (con->output_fd == *fd_ptr);
}
/*
 * Find the connection using a given file descriptor.
 *
 * Searches mgr.connections first and then mgr.listen_conns.
 *
 * IN fd - file descriptor to search for
 * RET matching connection or NULL if none found
 */
extern conmgr_fd_t *con_find_by_fd(int fd)
{
	conmgr_fd_t *con = list_find_first(mgr.connections, _find_by_fd, &fd);

	if (!con)
		con = list_find_first(mgr.listen_conns, _find_by_fd, &fd);

	/* mgr.complete_conns don't have input_fd or output_fd */
	return con;
}
/*
 * Log the pending socket error (if any) and close the connection.
 *
 * IN con - connection whose polling failed
 * IN fd - file descriptor the error was reported for
 */
extern void con_close_on_poll_error(conmgr_fd_t *con, int fd)
{
	if (con_flag(con, FLAG_IS_SOCKET)) {
		/* Ask kernel for socket error */
		int err = SLURM_ERROR;
		int rc = fd_get_socket_error(fd, &err);

		if (rc)
			error("%s: [%s] error while getting socket error: %s",
			      __func__, con->name, slurm_strerror(rc));
		else if (err)
			error("%s: [%s] socket error encountered while polling: %s",
			      __func__, con->name, slurm_strerror(err));
	}

	/*
	 * The socket must stop being treated as valid, otherwise poll() would
	 * be called endlessly and fail immediately every time. Close the
	 * connection to drop the file descriptor.
	 */
	close_con(true, con);
}
/*
 * Change the polling type of a single file descriptor.
 *
 * IN fd - file descriptor to change
 * IN/OUT old - currently recorded polling type; set to new on success
 * IN new - requested polling type
 * IN con_name - connection name passed to the pollctl calls
 * IN caller - caller name passed to the pollctl calls
 * RET SLURM_SUCCESS (also when nothing had to change) or the error returned
 *	by the pollctl link/relink/unlink call
 */
static int _set_fd_polling(int fd, pollctl_fd_type_t *old,
			   pollctl_fd_type_t new, const char *con_name,
			   const char *caller)
{
	int rc;

	/* Nothing to do when unsupported or already in the requested state */
	if ((*old == PCTL_TYPE_UNSUPPORTED) || (*old == new))
		return SLURM_SUCCESS;

	/*
	 * *old differs from new at this point, so a request for
	 * PCTL_TYPE_NONE always means there is an existing link to remove.
	 */
	if (new == PCTL_TYPE_NONE)
		rc = pollctl_unlink_fd(fd, con_name, caller);
	else if (*old != PCTL_TYPE_NONE)
		rc = pollctl_relink_fd(fd, new, con_name, caller);
	else
		rc = pollctl_link_fd(fd, new, con_name, caller);

	/* Only record the new state when the change succeeded */
	if (!rc)
		*old = new;

	return rc;
}
/*
 * Log the requested polling change when DEBUG_FLAG_CONMGR is active.
 *
 * IN con - connection being changed
 * IN has_in - true to describe the input file descriptor
 * IN has_out - true to describe the output file descriptor
 * IN type - polling type requested for the connection
 * IN in_type - polling type resolved for the input file descriptor
 * IN out_type - polling type resolved for the output file descriptor
 * IN caller - name of the function requesting the change
 */
static void _log_set_polling(conmgr_fd_t *con, bool has_in, bool has_out,
			     pollctl_fd_type_t type, pollctl_fd_type_t in_type,
			     pollctl_fd_type_t out_type, const char *caller)
{
	const char *op = "maintain";
	char *detail = NULL, *pos = NULL;

	if (!(slurm_conf.debug_flags & DEBUG_FLAG_CONMGR))
		return;

	if (has_in) {
		const char *from = pollctl_type_to_string(con->polling_input_fd);
		const char *to = pollctl_type_to_string(in_type);

		xstrfmtcatat(detail, &pos, " in[%d]:%s", con->input_fd, from);

		if (in_type != con->polling_input_fd) {
			xstrfmtcatat(detail, &pos, "->%s", to);
			op = "changing";
		}
	}

	if (has_out) {
		const char *from =
			pollctl_type_to_string(con->polling_output_fd);
		const char *to = pollctl_type_to_string(out_type);

		xstrfmtcatat(detail, &pos, " out[%d]:%s", con->output_fd,
			     from);

		if (out_type != con->polling_output_fd) {
			xstrfmtcatat(detail, &pos, "->%s", to);
			op = "changing";
		}
	}

	log_flag(CONMGR, "%s->%s: [%s] %s polling:%s%s",
		 caller, XSTRINGIFY(con_set_polling), con->name, op,
		 pollctl_type_to_string(type), (detail ? detail : ""));

	xfree(detail);
}
/*
 * Handle an unrecoverable failure to change polling: log the error and close
 * the connection.
 *
 * IN con - connection that failed
 * IN rc - error from the polling change
 * IN old_type - polling type before the change (for logging)
 * IN new_type - polling type that was requested (for logging)
 * IN fd_name - name of the file descriptor field (for logging)
 * IN fd - file descriptor that failed
 * IN/OUT dst - polling state of fd; set to PCTL_TYPE_UNSUPPORTED unless rc
 *	is EBADF
 * IN caller - name of the function requesting the change (for logging)
 */
static void _on_change_polling_fail(conmgr_fd_t *con, int rc,
				    pollctl_fd_type_t old_type,
				    pollctl_fd_type_t new_type,
				    const char *fd_name, const int fd,
				    pollctl_fd_type_t *dst, const char *caller)
{
	error("%s->%s: [%s] closing connection after change polling %s->%s for %s=%d failed: %s",
	      caller, __func__, con->name, pollctl_type_to_string(old_type),
	      pollctl_type_to_string(new_type), fd_name, fd,
	      slurm_strerror(rc));

	if (rc != EBADF) {
		/* Attempt graceful closing of connection */
		*dst = PCTL_TYPE_UNSUPPORTED;
	} else {
		/* Remove defunct FD immediately */
		if (fd == con->input_fd) {
			con->input_fd = -1;
			con->polling_input_fd = PCTL_TYPE_UNSUPPORTED;
			con_unset_flag(con, FLAG_CAN_READ);
			con_set_flag(con, FLAG_READ_EOF);
		}

		if (fd == con->output_fd) {
			con->output_fd = -1;
			con->polling_output_fd = PCTL_TYPE_UNSUPPORTED;
			con_unset_flag(con, FLAG_CAN_WRITE);
			con_unset_flag(con, FLAG_CAN_QUERY_OUTPUT_BUFFER);
		}
	}

	close_con(true, con);
	close_con_output(true, con);
}
/*
 * React to a non-zero return code from changing the polling of one of the
 * connection's file descriptors.
 *
 * IN con - connection being changed
 * IN rc - error returned by the polling change
 * IN old_type - polling type before the change (for logging)
 * IN new_type - polling type that was requested
 * IN input - true for con->input_fd, false for con->output_fd
 * IN caller - name of the function requesting the change (for logging)
 */
static void _on_change_polling_rc(conmgr_fd_t *con, int rc,
				  pollctl_fd_type_t old_type,
				  pollctl_fd_type_t new_type, bool input,
				  const char *caller)
{
	const char *fd_name;
	pollctl_fd_type_t *dst;
	int fd;

	if (input) {
		fd_name = "input_fd";
		fd = con->input_fd;
		dst = &con->polling_input_fd;
	} else {
		fd_name = "output_fd";
		fd = con->output_fd;
		dst = &con->polling_output_fd;
	}

	if (rc == EEXIST) {
		/*
		 * poll() is already monitoring this file descriptor but conmgr
		 * didn't ask for this poll()ing. conmgr has no idea what mode
		 * poll() is already monitoring, so relink instead to set the
		 * correct mode.
		 */
		log_flag(CONMGR, "%s->%s: [%s] forcing changed polling %s->%s for %s=%d",
			 caller, __func__, con->name,
			 pollctl_type_to_string(old_type),
			 pollctl_type_to_string(new_type), fd_name, fd);

		rc = pollctl_relink_fd(fd, new_type, con->name, __func__);

		if (rc)
			_on_change_polling_fail(con, rc, old_type, new_type,
						fd_name, fd, dst, caller);
		else
			*dst = new_type;
	} else if (rc == ENOENT) {
		/* Record the file descriptor as not being polled */
		log_flag(CONMGR, "%s->%s: [%s] ignoring request to change polling %s->%s for %s=%d",
			 caller, __func__, con->name,
			 pollctl_type_to_string(old_type),
			 pollctl_type_to_string(new_type), fd_name, fd);
		*dst = PCTL_TYPE_NONE;
	} else if (rc == EPERM) {
		/* Never try polling this file descriptor again */
		log_flag(CONMGR, "%s->%s: [%s] polling %s->%s for %s=%d not supported by kernel",
			 caller, __func__, con->name,
			 pollctl_type_to_string(old_type),
			 pollctl_type_to_string(new_type), fd_name, fd);
		*dst = PCTL_TYPE_UNSUPPORTED;
	} else {
		_on_change_polling_fail(con, rc, old_type, new_type, fd_name,
					fd, dst, caller);
	}
}
/*
 * Change the polling mode of a connection.
 *
 * The requested connection-wide type is mapped onto a polling type for the
 * input file descriptor and one for the output file descriptor, which are
 * then applied via _set_fd_polling(). Any failure is handed to
 * _on_change_polling_rc().
 *
 * IN con - connection to change
 * IN type - requested polling type for the whole connection
 * IN caller - name of the calling function (for logging)
 */
extern void con_set_polling(conmgr_fd_t *con, pollctl_fd_type_t type,
const char *caller)
{
int has_in, has_out, in, out, is_same;
int rc_in = SLURM_SUCCESS, rc_out = SLURM_SUCCESS;
pollctl_fd_type_t in_type = PCTL_TYPE_NONE, out_type = PCTL_TYPE_NONE;
_validate_pctl_type(type);
_validate_pctl_type(con->polling_input_fd);
_validate_pctl_type(con->polling_output_fd);
/* Snapshot the file descriptors: negative values mean not present */
in = con->input_fd;
has_in = (in >= 0);
out = con->output_fd;
has_out = (out >= 0);
is_same = (con->input_fd == con->output_fd);
/* Without any file descriptor there is nothing that could be polled */
if (!has_in && !has_out) {
xassert(con->polling_input_fd == PCTL_TYPE_NONE);
xassert(con->polling_output_fd == PCTL_TYPE_NONE);
xassert(type == PCTL_TYPE_NONE);
if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
char *flags = con_flags_string(con->flags);
log_flag(CONMGR, "%s: skipping connection flags=%s",
__func__, flags);
xfree(flags);
}
return;
}
/*
 * Map type to type per in/out. The in/out types are initialized to
 * PCTL_TYPE_NONE above.
 */
switch (type) {
case PCTL_TYPE_UNSUPPORTED:
fatal_abort("should never happen");
case PCTL_TYPE_NONE:
break;
case PCTL_TYPE_CONNECTED:
if (is_same) {
in_type = PCTL_TYPE_CONNECTED;
} else {
in_type = PCTL_TYPE_CONNECTED;
out_type = PCTL_TYPE_CONNECTED;
}
break;
case PCTL_TYPE_READ_ONLY:
in_type = PCTL_TYPE_READ_ONLY;
break;
case PCTL_TYPE_READ_WRITE:
/* A shared descriptor is polled once for both directions */
if (is_same) {
in_type = PCTL_TYPE_READ_WRITE;
} else {
in_type = PCTL_TYPE_READ_ONLY;
out_type = PCTL_TYPE_WRITE_ONLY;
}
break;
case PCTL_TYPE_WRITE_ONLY:
/* A shared descriptor is always tracked via the input side */
if (is_same) {
in_type = PCTL_TYPE_WRITE_ONLY;
} else {
out_type = PCTL_TYPE_WRITE_ONLY;
}
break;
case PCTL_TYPE_LISTEN:
xassert(con_flag(con, FLAG_IS_LISTEN));
in_type = PCTL_TYPE_LISTEN;
break;
case PCTL_TYPE_INVALID:
case PCTL_TYPE_INVALID_MAX:
fatal_abort("should never execute");
}
/* Keep descriptors already marked unsupported in that state */
if (con->polling_output_fd == PCTL_TYPE_UNSUPPORTED)
out_type = PCTL_TYPE_UNSUPPORTED;
if (con->polling_input_fd == PCTL_TYPE_UNSUPPORTED)
in_type = PCTL_TYPE_UNSUPPORTED;
_log_set_polling(con, has_in, has_out, type, in_type, out_type, caller);
if (is_same) {
/* When input and output are the same fd, output_fd is never linked */
xassert((con->polling_output_fd == PCTL_TYPE_NONE) ||
(con->polling_output_fd == PCTL_TYPE_UNSUPPORTED));
rc_in = _set_fd_polling(in, &con->polling_input_fd, in_type,
con->name, caller);
} else {
if (has_in)
rc_in = _set_fd_polling(in, &con->polling_input_fd,
in_type, con->name, caller);
if (has_out)
rc_out = _set_fd_polling(out, &con->polling_output_fd,
out_type, con->name, caller);
}
/* Handle failures only after both descriptors have been attempted */
if (rc_in)
_on_change_polling_rc(con, rc_in, con->polling_input_fd,
in_type, true, caller);
if (rc_out)
_on_change_polling_rc(con, rc_out, con->polling_output_fd,
out_type, false, caller);
}
/*
 * Work callback: extract the file descriptors (and TLS state if any) from a
 * connection and hand them to the function registered in con->on_extract.
 *
 * On success the registered function is called with the resulting conn
 * pointer. On any failure, including cancelled work, the registered function
 * is called with a NULL conn and a status of CONMGR_WORK_STATUS_CANCELLED.
 * The connection is closed via close_con() in both cases.
 *
 * IN conmgr_args - work arguments holding the connection and work status
 * IN arg - unused (asserted to be NULL)
 */
extern void on_extract(conmgr_callback_args_t conmgr_args, void *arg)
{
	int input_fd = -1, output_fd = -1;
	void *conn = NULL;
	conmgr_extract_fd_func_t func = NULL;
	const char *func_name = NULL;
	void *func_arg = NULL;
	conmgr_fd_t *con = conmgr_args.con;

	xassert(!arg);

	slurm_mutex_lock(&mgr.mutex);

	xassert(con->magic == MAGIC_CON_MGR_FD);

	/* Verify extract was requested */
	xassert(con_flag(con, FLAG_WAIT_ON_EXTRACT));
	xassert(con_flag(con, FLAG_WORK_ACTIVE));
	xassert(con->on_extract.func);
	xassert(con->on_extract.func_name);

	/* Polling should already be disabled */
	xassert((con->polling_input_fd == PCTL_TYPE_NONE) ||
		(con->polling_input_fd == PCTL_TYPE_UNSUPPORTED));
	xassert((con->polling_output_fd == PCTL_TYPE_NONE) ||
		(con->polling_output_fd == PCTL_TYPE_UNSUPPORTED));

	/* can't extract safely when work running or not connected */
	xassert(con_flag(con, FLAG_IS_CONNECTED));
	xassert(!(con_flag(con, FLAG_TLS_SERVER) ||
		  con_flag(con, FLAG_TLS_CLIENT)) ||
		con_flag(con, FLAG_IS_TLS_CONNECTED));
	xassert(!con_flag(con, FLAG_TLS_FINGERPRINT));
	xassert(!con_flag(con, FLAG_TLS_WAIT_ON_CLOSE));
	xassert(!con_flag(con, FLAG_WAIT_ON_FINISH));

	/*
	 * assert input/outputs are empty
	 *
	 * NOTE(review): the first assert guards on con->tls_out but checks
	 * con->out; checking con->tls_out looks intended - confirm.
	 */
	xassert(!con->tls_out || list_is_empty(con->out));
	xassert(!con->tls_in || !get_buf_offset(con->tls_in));
	xassert(list_is_empty(con->out));
	xassert(!get_buf_offset(con->in));

	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
		char *flags = con_flags_string(con->flags);
		log_flag(CONMGR, "%s: [%s] BEGIN: extracting input_fd=%d output_fd=%d tls=0x%"PRIxPTR " func=%s(0x%"PRIxPTR") flags=%s",
			 __func__, con->name, con->input_fd, con->output_fd,
			 (uintptr_t) con->tls, con->on_extract.func_name,
			 (uintptr_t) con->on_extract.func_arg, flags);
		xfree(flags);
	}

	/*
	 * Swap out func() and args to allow calling func() on failure
	 * even if the file descriptors have not been extracted
	 */
	SWAP(func, con->on_extract.func);
	SWAP(func_name, con->on_extract.func_name);
	SWAP(func_arg, con->on_extract.func_arg);

	if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED)
		goto failed;

	/* Catch file descriptors being closed for any reason before now */
	if (((con->input_fd < 0) || con_flag(con, FLAG_READ_EOF)) &&
	    (con->output_fd < 0)) {
		log_flag(CONMGR, "%s: [%s] invalid input_fd and output_fd",
			 __func__, con->name);
		goto failed;
	}

	/* clear all polling states */
	con_set_flag(con, FLAG_READ_EOF);
	con_unset_flag(con, FLAG_CAN_READ);
	con_unset_flag(con, FLAG_CAN_WRITE);
	con_unset_flag(con, FLAG_ON_DATA_TRIED);

	/* Remove extraction state from connection */
	SWAP(input_fd, con->input_fd);
	SWAP(output_fd, con->output_fd);
	SWAP(conn, con->tls);

	slurm_mutex_unlock(&mgr.mutex);

	/*
	 * Treat partially shutdown connections as a single file descriptor to
	 * avoid triggering assert()s or failures in interfaces/tls
	 */
	if (input_fd < 0)
		input_fd = output_fd;
	else if (output_fd < 0)
		output_fd = input_fd;

	/* Set file descriptors as blocking by default */
	fd_set_blocking(input_fd);
	if (input_fd != output_fd)
		fd_set_blocking(output_fd);

	if (conn) {
		int rc = EINVAL;

		/*
		 * Assign ownership of the file descriptor to the interface/TLS
		 * connection.
		 */
		if ((rc = tls_g_set_conn_fds(conn, input_fd, output_fd))) {
			log_flag(CONMGR, "%s: [%s] tls_g_set_fds() failed: %s",
				 __func__, con->name, slurm_strerror(rc));
			/* failed path expects mgr.mutex to be held */
			slurm_mutex_lock(&mgr.mutex);
			goto failed;
		}
	} else {
		conn_args_t args = {
			.input_fd = input_fd,
			.output_fd = output_fd,
		};

		/* Create new interface/conn connection from file descriptors */
		if (!(conn = conn_g_create(&args))) {
			log_flag(CONMGR, "%s: [%s] conn_g_create() failed: %m",
				 __func__, con->name);
			/* failed path expects mgr.mutex to be held */
			slurm_mutex_lock(&mgr.mutex);
			goto failed;
		}
	}

	/* Call the requester without holding mgr.mutex */
	func(conmgr_args, conn, func_arg);

	slurm_mutex_lock(&mgr.mutex);

	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
		char *flags = con_flags_string(con->flags);
		log_flag(CONMGR, "%s: [%s] END: extracting input_fd=%d output_fd=%d tls=0x%"PRIxPTR " func=%s(0x%"PRIxPTR") flags=%s",
			 __func__, con->name, input_fd, output_fd,
			 (uintptr_t) conn, func_name, (uintptr_t) func_arg,
			 flags);
		xfree(flags);
	}

	/* Close connection as file descriptors are now extracted */
	con_unset_flag(con, FLAG_WAIT_ON_EXTRACT);
	close_con(true, con);

	slurm_mutex_unlock(&mgr.mutex);
	return;

failed:
	/* mgr.mutex is held by every path that jumps here */
	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
		char *flags = con_flags_string(con->flags);
		/*
		 * con->on_extract was swapped out above, so the requester is
		 * only known via the local func_name/func_arg. File
		 * descriptor 0 is valid, hence the >= 0 comparisons.
		 */
		log_flag(CONMGR, "%s: [%s] FAILED: extracting input_fd=%d output_fd=%d tls=0x%"PRIxPTR " func=%s(0x%"PRIxPTR") flags=%s",
			 __func__, con->name,
			 ((input_fd >= 0) ? input_fd : con->input_fd),
			 ((output_fd >= 0) ? output_fd : con->output_fd),
			 (uintptr_t) (conn ? conn : con->tls),
			 func_name, (uintptr_t) func_arg, flags);
		xfree(flags);
	}

	con_unset_flag(con, FLAG_WAIT_ON_EXTRACT);
	close_con(true, con);

	slurm_mutex_unlock(&mgr.mutex);

	/* Close file descriptors if they were extracted */
	if (!conn) {
		/*
		 * Both variables commonly hold the same descriptor (a single
		 * socket, or a partially shutdown connection collapsed above).
		 * Closing that number twice could close an unrelated
		 * descriptor opened by another thread in between.
		 */
		if (output_fd == input_fd)
			output_fd = -1;

		fd_close(&input_fd);
		fd_close(&output_fd);
	}

	FREE_NULL_CONN(conn);

	/* Always set failed as cancelled */
	conmgr_args.status = CONMGR_WORK_STATUS_CANCELLED;

	/* Notify requester that extract failed */
	func(conmgr_args, NULL, func_arg);
}
/*
 * Request extraction of a connection's file descriptors.
 *
 * Polling of the connection is disabled, the callback is recorded in
 * con->on_extract and handle_connection() is called on the connection.
 *
 * IN con - connection to extract from
 * IN func - callback to record in con->on_extract
 * IN func_name - name of func (for logging)
 * IN func_arg - opaque pointer recorded alongside func
 * RET SLURM_SUCCESS, or EINVAL if con or func is NULL
 */
extern int conmgr_queue_extract_con_fd(conmgr_fd_t *con,
				       conmgr_extract_fd_func_t func,
				       const char *func_name, void *func_arg)
{
	xassert(con);
	xassert(func);

	if (!func || !con)
		return EINVAL;

	slurm_mutex_lock(&mgr.mutex);

	/* Only a single extraction request may be pending per connection */
	xassert(con->magic == MAGIC_CON_MGR_FD);
	xassert(!con_flag(con, FLAG_WAIT_ON_EXTRACT));
	xassert(!con->on_extract.func);
	xassert(!con->on_extract.func_name);
	xassert(!con->on_extract.func_arg);

	/* Disable all polling for this connection */
	con_set_polling(con, PCTL_TYPE_NONE, __func__);

	/* Set extract state on connection */
	con->on_extract.func = func;
	con->on_extract.func_name = func_name;
	con->on_extract.func_arg = func_arg;

	handle_connection(true, con);

	slurm_mutex_unlock(&mgr.mutex);

	return SLURM_SUCCESS;
}
/*
 * Clear FLAG_QUIESCE on a connection and signal mgr.watch_sleep.
 *
 * Does nothing if the connection is not quiesced. Called with mgr.mutex held
 * by conmgr_unquiesce_fd().
 *
 * IN con - connection to unquiesce
 * RET SLURM_SUCCESS always
 */
static int _unquiesce_fd(conmgr_fd_t *con)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (con_flag(con, FLAG_QUIESCE)) {
		con_unset_flag(con, FLAG_QUIESCE);
		EVENT_SIGNAL(&mgr.watch_sleep);

		if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
			char *flags = con_flags_string(con->flags);

			log_flag(CONMGR, "%s: unquiesced connection flags=%s",
				 __func__, flags);
			xfree(flags);
		}
	}

	return SLURM_SUCCESS;
}
/*
 * Unquiesce a connection while holding mgr.mutex.
 *
 * IN con - connection to unquiesce
 * RET EINVAL if con is NULL, otherwise the result of _unquiesce_fd()
 */
extern int conmgr_unquiesce_fd(conmgr_fd_t *con)
{
	int rc = EINVAL;

	if (con) {
		slurm_mutex_lock(&mgr.mutex);
		rc = _unquiesce_fd(con);
		slurm_mutex_unlock(&mgr.mutex);
	}

	return rc;
}
/*
 * Check whether the connection behind a reference is quiesced.
 *
 * IN con - connection reference (may be NULL)
 * RET true if FLAG_QUIESCE is set on the connection, false otherwise or if
 *	the reference or its connection is NULL
 */
extern bool conmgr_con_is_quiesced(conmgr_fd_ref_t *con)
{
	bool is_quiesced = false;

	if (con && con->con) {
		xassert(con->magic == MAGIC_CON_MGR_FD_REF);
		xassert(con->con->magic == MAGIC_CON_MGR_FD);

		/* Flags are read while holding mgr.mutex */
		slurm_mutex_lock(&mgr.mutex);
		is_quiesced = con_flag(con->con, FLAG_QUIESCE);
		slurm_mutex_unlock(&mgr.mutex);
	}

	return is_quiesced;
}
/*
 * Set FLAG_QUIESCE on a connection, disable its polling and signal
 * mgr.watch_sleep.
 *
 * Does nothing if the connection is already quiesced. Called with mgr.mutex
 * held by conmgr_quiesce_fd().
 *
 * IN con - connection to quiesce
 * RET SLURM_SUCCESS always
 */
static int _quiesce_fd(conmgr_fd_t *con)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (!con_flag(con, FLAG_QUIESCE)) {
		con_set_flag(con, FLAG_QUIESCE);
		con_set_polling(con, PCTL_TYPE_NONE, __func__);
		EVENT_SIGNAL(&mgr.watch_sleep);

		if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
			char *flags = con_flags_string(con->flags);

			log_flag(CONMGR, "%s: quiesced connection flags=%s",
				 __func__, flags);
			xfree(flags);
		}
	}

	return SLURM_SUCCESS;
}
/*
 * Quiesce a connection while holding mgr.mutex.
 *
 * IN con - connection to quiesce
 * RET EINVAL if con is NULL, otherwise the result of _quiesce_fd()
 */
extern int conmgr_quiesce_fd(conmgr_fd_t *con)
{
	int rc = EINVAL;

	if (con) {
		slurm_mutex_lock(&mgr.mutex);
		rc = _quiesce_fd(con);
		slurm_mutex_unlock(&mgr.mutex);
	}

	return rc;
}
/*
 * Check if output is considered open for a connection.
 *
 * IN con - connection to check
 * RET true when FLAG_READ_EOF is not set and output_fd is non-negative
 */
static bool _is_output_open(conmgr_fd_t *con)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (con_flag(con, FLAG_READ_EOF))
		return false;

	return (con->output_fd >= 0);
}
/*
 * Check if output is considered open for a connection.
 *
 * Takes mgr.mutex while checking.
 *
 * IN con - connection to check (must not be NULL)
 * RET result of _is_output_open()
 */
extern bool conmgr_fd_is_output_open(conmgr_fd_t *con)
{
	bool is_open;

	xassert(con->magic == MAGIC_CON_MGR_FD);

	slurm_mutex_lock(&mgr.mutex);
	is_open = _is_output_open(con);
	slurm_mutex_unlock(&mgr.mutex);

	return is_open;
}
/*
 * Check if output is considered open for the connection behind a reference.
 *
 * Takes mgr.mutex while checking.
 *
 * IN ref - connection reference (may be NULL)
 * RET result of _is_output_open(), or false if ref or its connection is
 *	NULL
 */
extern bool conmgr_con_is_output_open(conmgr_fd_ref_t *ref)
{
	bool open = false;

	/* A missing reference can not have an open output */
	if (!ref)
		return false;

	xassert(ref->magic == MAGIC_CON_MGR_FD_REF);

	slurm_mutex_lock(&mgr.mutex);

	/* Avoid dereferencing a reference without a connection */
	if (ref->con)
		open = _is_output_open(ref->con);

	slurm_mutex_unlock(&mgr.mutex);

	return open;
}
/*
 * Allocate a new reference to a connection and increment con->refs.
 *
 * conmgr_fd_new_ref() and conmgr_con_link() call this with mgr.mutex held.
 *
 * IN src - connection to reference
 * IN/OUT dst_ptr - must point to a NULL pointer; set to the new reference
 */
extern void fd_new_ref(conmgr_fd_t *src, conmgr_fd_ref_t **dst_ptr)
{
	conmgr_fd_ref_t *ref;

	xassert(src->magic == MAGIC_CON_MGR_FD);
	xassert(dst_ptr);
	xassert(!*dst_ptr);

	ref = xmalloc(sizeof(*ref));
	*dst_ptr = ref;

	*ref = (conmgr_fd_ref_t) {
		.magic = MAGIC_CON_MGR_FD_REF,
		.con = src,
	};

	src->refs++;

	/* Catch the reference count overflowing */
	xassert(src->refs < INT_MAX);
	xassert(src->refs > 0);
}
/*
 * Create a new reference to a connection while holding mgr.mutex.
 *
 * IN con - connection to reference (fatal_abort() if NULL)
 * RET new reference
 */
extern conmgr_fd_ref_t *conmgr_fd_new_ref(conmgr_fd_t *con)
{
	conmgr_fd_ref_t *new_ref = NULL;

	if (con == NULL)
		fatal_abort("con must not be null");

	slurm_mutex_lock(&mgr.mutex);
	fd_new_ref(con, &new_ref);
	slurm_mutex_unlock(&mgr.mutex);

	return new_ref;
}
/*
 * Create an additional reference to the connection behind an existing
 * reference while holding mgr.mutex.
 *
 * IN con - existing connection reference (must not be NULL)
 * RET new reference to the same connection
 */
extern conmgr_fd_ref_t *conmgr_con_link(conmgr_fd_ref_t *con)
{
	conmgr_fd_ref_t *link = NULL;

	xassert(con);
	xassert(con->magic == MAGIC_CON_MGR_FD_REF);
	xassert(con->con->magic == MAGIC_CON_MGR_FD);

	slurm_mutex_lock(&mgr.mutex);
	fd_new_ref(con->con, &link);
	slurm_mutex_unlock(&mgr.mutex);

	return link;
}
/*
 * Release a reference to a connection and decrement con->refs.
 *
 * conmgr_fd_free_ref() calls this with mgr.mutex held.
 *
 * IN/OUT ref_ptr - pointer to the reference to release; the reference is
 *	released with xfree()
 */
extern void fd_free_ref(conmgr_fd_ref_t **ref_ptr)
{
	conmgr_fd_ref_t *released = *ref_ptr;
	conmgr_fd_t *owner = released->con;

	xassert(owner->magic == MAGIC_CON_MGR_FD);
	xassert(released->magic == MAGIC_CON_MGR_FD_REF);

	owner->refs--;

	/* Catch the reference count going out of range */
	xassert(owner->refs < INT_MAX);
	xassert(owner->refs >= 0);

	/* Invalidate the magic so stale use of the reference is detectable */
	released->magic = ~MAGIC_CON_MGR_FD_REF;

	xfree((*ref_ptr));
}
/*
 * Release a connection reference while holding mgr.mutex.
 *
 * IN/OUT ref_ptr - pointer to the reference to release (fatal_abort() if
 *	NULL). Nothing is done when it points to a NULL reference.
 */
extern void conmgr_fd_free_ref(conmgr_fd_ref_t **ref_ptr)
{
	if (ref_ptr == NULL)
		fatal_abort("ref_ptr must not be null");

	/* Only release if not already released */
	if (*ref_ptr) {
		slurm_mutex_lock(&mgr.mutex);
		fd_free_ref(ref_ptr);
		slurm_mutex_unlock(&mgr.mutex);
	}
}
/*
 * Get the connection behind a reference.
 *
 * IN ref - connection reference (may be NULL)
 * RET referenced connection, or NULL if ref is NULL
 */
extern conmgr_fd_t *fd_get_ref(conmgr_fd_ref_t *ref)
{
	if (ref) {
		xassert(ref->magic == MAGIC_CON_MGR_FD_REF);
		xassert(ref->con);

		return ref->con;
	}

	return NULL;
}
/*
 * Get the connection behind a reference.
 *
 * IN ref - connection reference (may be NULL)
 * RET referenced connection, or NULL if ref is NULL
 */
extern conmgr_fd_t *conmgr_fd_get_ref(conmgr_fd_ref_t *ref)
{
	conmgr_fd_t *const con = fd_get_ref(ref);

#ifndef NDEBUG
	/* Sanity check the connection (only compiled without NDEBUG) */
	if (con != NULL) {
		slurm_mutex_lock(&mgr.mutex);

		xassert(con->magic == MAGIC_CON_MGR_FD);
		xassert(con->refs < INT_MAX);
		xassert(con->refs > 0);

		slurm_mutex_unlock(&mgr.mutex);
	}
#endif

	return con;
}
/*
 * Check if the connection behind a reference is flagged for TLS.
 *
 * IN ref - connection reference (connection asserted to be non-NULL)
 * RET true if FLAG_TLS_CLIENT or FLAG_TLS_SERVER is set on the connection
 */
extern bool conmgr_fd_is_tls(conmgr_fd_ref_t *ref)
{
	bool is_tls;
	conmgr_fd_t *con = fd_get_ref(ref);

	xassert(con);

	slurm_mutex_lock(&mgr.mutex);

	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (con_flag(con, FLAG_TLS_CLIENT))
		is_tls = true;
	else
		is_tls = con_flag(con, FLAG_TLS_SERVER);

	slurm_mutex_unlock(&mgr.mutex);

	return is_tls;
}
/*
 * Get the event callbacks and callback argument of a connection.
 *
 * IN ref - connection reference
 * OUT events_ptr - must point to a NULL pointer; set to the connection's
 *	events
 * OUT arg_ptr - must point to a NULL pointer; set to the connection's arg
 * RET SLURM_SUCCESS, or EINVAL if ref, its connection or either output
 *	pointer is NULL
 */
extern int conmgr_con_get_events(conmgr_fd_ref_t *ref,
				 const conmgr_events_t **events_ptr,
				 void **arg_ptr)
{
	/* Reject references without a connection instead of crashing */
	if (!ref || !ref->con)
		return EINVAL;

	xassert(events_ptr);
	xassert(arg_ptr);

	/* Nowhere to store the results */
	if (!events_ptr || !arg_ptr)
		return EINVAL;

	slurm_mutex_lock(&mgr.mutex);

	xassert(!*events_ptr);
	xassert(!*arg_ptr);
	xassert(ref->magic == MAGIC_CON_MGR_FD_REF);
	xassert(ref->con->magic == MAGIC_CON_MGR_FD);

	*events_ptr = ref->con->events;
	*arg_ptr = ref->con->arg;

	slurm_mutex_unlock(&mgr.mutex);

	return SLURM_SUCCESS;
}
/*
 * Replace the event callbacks and callback argument of a connection.
 *
 * IN ref - connection reference
 * IN events - new event callbacks (asserted to be non-NULL)
 * IN arg - new callback argument
 * IN caller - name of the calling function (for logging)
 * RET SLURM_SUCCESS, EINVAL if ref or its connection is NULL, or ESHUTDOWN
 *	if the connection has neither an input nor an output file descriptor
 */
extern int conmgr_con_set_events(conmgr_fd_ref_t *ref,
				 const conmgr_events_t *events, void *arg,
				 const char *caller)
{
	conmgr_fd_t *con;
	int rc;

	if (!ref || !ref->con)
		return EINVAL;

	slurm_mutex_lock(&mgr.mutex);

	xassert(events);
	xassert(ref->magic == MAGIC_CON_MGR_FD_REF);
	xassert(ref->con->magic == MAGIC_CON_MGR_FD);

	con = ref->con;

	if ((con->input_fd < 0) && (con->output_fd < 0)) {
		/* Reject changing connections in process of cleaning up */
		log_flag(CONMGR, "%s->%s: [%s] rejecting changing events:0x%"PRIxPTR"->0x%"PRIxPTR" arg:0x%"PRIxPTR"->0x%"PRIxPTR" for closed connection",
			 caller, __func__, con->name,
			 (uintptr_t) con->events, (uintptr_t) events,
			 (uintptr_t) con->arg, (uintptr_t) arg);

		rc = ESHUTDOWN;
	} else {
		log_flag(CONMGR, "%s->%s: [%s] changing events:0x%"PRIxPTR"->0x%"PRIxPTR" arg:0x%"PRIxPTR"->0x%"PRIxPTR,
			 caller, __func__, con->name,
			 (uintptr_t) con->events, (uintptr_t) events,
			 (uintptr_t) con->arg, (uintptr_t) arg);

		con->events = events;
		con->arg = arg;

		rc = SLURM_SUCCESS;
	}

	slurm_mutex_unlock(&mgr.mutex);

	return rc;
}
#define MAGIC_LIST_BUFFER_STATS 0x4aa19f2f

/* Totals accumulated by _foreach_count_buffer() over a list of buffers */
typedef struct {
	/* always MAGIC_LIST_BUFFER_STATS */
	int magic;
	/* sum of the size of every buffer in the list (bytes) */
	int size;
	/* sum of the offset of every buffer in the list */
	int offset;
} list_buffer_stats_t;
/*
 * list_for_each_ro() callback: add one buffer's offset and size to the
 * running totals.
 *
 * IN x - buffer (buf_t *) to count
 * IN/OUT arg - list_buffer_stats_t totals to update
 * RET SLURM_SUCCESS always
 */
static int _foreach_count_buffer(void *x, void *arg)
{
	list_buffer_stats_t *totals = arg;
	buf_t *buffer = x;

	xassert(buffer->magic == BUF_MAGIC);
	xassert(totals->magic == MAGIC_LIST_BUFFER_STATS);

	totals->size += size_buf(buffer);
	totals->offset += get_buf_offset(buffer);

	return SLURM_SUCCESS;
}
/*
 * list_for_each_ro() callback: log a single queued work item.
 *
 * IN x - work item (work_t *) to log
 * IN/OUT arg - log_con_work_args_t with the target log, the label to print
 *	and the running index (incremented on every call)
 * RET SLURM_SUCCESS always
 */
static int _foreach_log_work(void *x, void *arg)
{
	log_con_work_args_t *args = arg;
	const work_t *work = x;
	char work_str[PRINTF_WORK_CHARS];

	xassert(args->magic == MAGIC_CON_LOG_WORK_ARGS);
	xassert(work->magic == MAGIC_WORK);

	/* logging connection would be redundant */
	printf_work(work, work_str, sizeof(work_str), false);

	probe_log(args->log, "%s[%d]: %s", args->type, args->index, work_str);

	args->index++;

	return SLURM_SUCCESS;
}
/*
 * list_for_each_ro() callback: log the state of one connection followed by
 * its queued work.
 *
 * IN x - connection (conmgr_fd_t *) to log
 * IN arg - probe log (probe_log_t *) to write to
 * RET SLURM_SUCCESS always
 */
static int _foreach_log_connection(void *x, void *arg)
{
conmgr_fd_t *con = x;
probe_log_t *log = arg;
list_buffer_stats_t out_stats = {
.magic = MAGIC_LIST_BUFFER_STATS,
};
list_buffer_stats_t tls_out_stats = {
.magic = MAGIC_LIST_BUFFER_STATS,
};
/* Timestamps and their "@" delimiter stay empty unless a time is set */
char last_read[CTIME_STR_LEN] = "", *last_read_delim = "";
char last_write[CTIME_STR_LEN] = "", *last_write_delim = "";
char *flags = NULL;
xassert(con->magic == MAGIC_CON_MGR_FD);
flags = con_flags_string(con->flags);
if (con->last_read.tv_sec) {
last_read_delim = "@";
timespec_ctime(con->last_read, true, last_read,
sizeof(last_read));
}
if (con->last_write.tv_sec) {
last_write_delim = "@";
timespec_ctime(con->last_write, true, last_write,
sizeof(last_write));
}
/* Total up the pending output buffers (plain and TLS) */
if (con->out)
(void) list_for_each_ro(con->out, _foreach_count_buffer,
&out_stats);
if (con->tls_out)
(void) list_for_each_ro(con->tls_out, _foreach_count_buffer,
&tls_out_stats);
/*
 * Buffers are printed as offset/size and lists of buffers as
 * offset/size[count]. Missing buffers or lists are printed as 0.
 */
probe_log(log, "connection: [%s]+%d flags=%s type=%s input_fd=%d output_fd=%d address=%pA TLS=%c tls_input_buffer=%d/%d tls_output_buffer=%d/%d[%d] input_buffer=%d/%d%s%s output_buffers=%d/%d[%d]%s%s mss=%d extracting=%c polling=%s/%s",
con->name, con->refs, flags, conmgr_con_type_string(con->type),
con->input_fd, con->output_fd, &con->address,
BOOL_CHARIFY(con->tls),
(con->tls_in ? get_buf_offset(con->tls_in) : 0),
(con->tls_in ? size_buf(con->tls_in) : 0), tls_out_stats.offset,
tls_out_stats.size, (con->tls_out ? list_count(con->tls_out) : 0),
(con->in ? get_buf_offset(con->in) : 0),
(con->in ? size_buf(con->in) : 0), last_read_delim, last_read,
out_stats.offset, out_stats.size,
(con->out ? list_count(con->out) : 0), last_write_delim,
last_write, (con->mss == NO_VAL ? 0 : con->mss),
BOOL_CHARIFY(con->on_extract.func),
pollctl_type_to_string(con->polling_input_fd),
pollctl_type_to_string(con->polling_output_fd));
xfree(flags);
/* Log each queued work item with its index in the list */
if (con->work) {
log_con_work_args_t args = {
.magic = MAGIC_CON_LOG_WORK_ARGS,
.type = "work",
.log = log,
};
(void) list_for_each_ro(con->work, _foreach_log_work, &args);
}
if (con->write_complete_work) {
log_con_work_args_t args = {
.magic = MAGIC_CON_LOG_WORK_ARGS,
.type = "write_complete_work",
.log = log,
};
(void) list_for_each_ro(con->write_complete_work,
_foreach_log_work, &args);
}
return SLURM_SUCCESS;
}
/*
 * Log a summary of all connections followed by the state of every listener
 * and connection.
 *
 * Lists that have not been created (NULL) are counted as empty and skipped,
 * as probe_connections() calls this before checking them for NULL.
 *
 * Caller must hold mgr.mutex lock
 *
 * IN log - probe log to write to
 */
static void _probe_verbose(probe_log_t *log)
{
	probe_log(log, "connections:%d/%d listeners:%d complete:%d",
		  (mgr.connections ? list_count(mgr.connections) : 0),
		  mgr.max_connections,
		  (mgr.listen_conns ? list_count(mgr.listen_conns) : 0),
		  (mgr.complete_conns ? list_count(mgr.complete_conns) : 0));

	if (mgr.listen_conns)
		(void) list_for_each_ro(mgr.listen_conns,
					_foreach_log_connection, log);

	if (mgr.connections)
		(void) list_for_each_ro(mgr.connections,
					_foreach_log_connection, log);
}
/*
 * Probe the state of the connection manager's connections.
 *
 * IN log - probe log for verbose output, or NULL to skip logging
 * RET PROBE_RC_DOWN if any of the connection lists is missing,
 *	PROBE_RC_BUSY if mgr_is_accept_deferred() is true, otherwise
 *	PROBE_RC_READY
 */
extern probe_status_t probe_connections(probe_log_t *log)
{
	probe_status_t status;

	slurm_mutex_lock(&mgr.mutex);

	if (log)
		_probe_verbose(log);

	if (mgr.connections && mgr.listen_conns && mgr.complete_conns) {
		if (mgr_is_accept_deferred())
			status = PROBE_RC_BUSY;
		else
			status = PROBE_RC_READY;
	} else {
		status = PROBE_RC_DOWN;
	}

	slurm_mutex_unlock(&mgr.mutex);

	return status;
}