blob: c06f301a18bdd48f0be4f230092720c88b9875fc [file] [log] [blame]
/*****************************************************************************\
* con.c - definitions for connection handlers in connection manager
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#define _GNU_SOURCE
#include <limits.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__)
#include <sys/param.h>
#include <sys/ucred.h>
#endif
#if defined(__linux__)
#include <sys/sysmacros.h>
#endif /* __linux__ */
#include "slurm/slurm.h"
#include "slurm/slurm_errno.h"
#include "src/common/fd.h"
#include "src/common/http.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/net.h"
#include "src/common/pack.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_socket.h"
#include "src/common/slurm_time.h"
#include "src/common/util-net.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/conmgr/conmgr.h"
#include "src/conmgr/delayed.h"
#include "src/conmgr/mgr.h"
#include "src/conmgr/polling.h"
#include "src/conmgr/tls.h"
#include "src/interfaces/tls.h"
#include "src/interfaces/url_parser.h"
/*
 * Lookup table from connection type value to its enumerator name, used by
 * conmgr_con_type_string() for logging. T() pairs each value with its
 * stringified identifier.
 */
#define T(type) { type, XSTRINGIFY(type) }
static const struct {
conmgr_con_type_t type;
const char *string;
} con_types[] = {
T(CON_TYPE_NONE),
T(CON_TYPE_RAW),
T(CON_TYPE_RPC),
};
#undef T
/*
 * Lookup table from connection flag value to its enumerator name, used by
 * _con_flag_string() and con_flags_string() for logging.
 *
 * NOTE: FLAG_NONE must remain the first entry: con_flags_string() starts
 * iterating at index 1 to skip it.
 */
#define T(flag) { flag, XSTRINGIFY(flag) }
static const struct {
con_flags_t flag;
const char *string;
} con_flags[] = {
T(FLAG_NONE),
T(FLAG_ON_DATA_TRIED),
T(FLAG_IS_SOCKET),
T(FLAG_IS_LISTEN),
T(FLAG_WAIT_ON_FINISH),
T(FLAG_CAN_WRITE),
T(FLAG_CAN_READ),
T(FLAG_READ_EOF),
T(FLAG_IS_CONNECTED),
T(FLAG_WORK_ACTIVE),
T(FLAG_RPC_KEEP_BUFFER),
T(FLAG_QUIESCE),
T(FLAG_CAN_QUERY_OUTPUT_BUFFER),
T(FLAG_IS_FIFO),
T(FLAG_IS_CHR),
T(FLAG_TCP_NODELAY),
T(FLAG_WATCH_WRITE_TIMEOUT),
T(FLAG_WATCH_READ_TIMEOUT),
T(FLAG_WATCH_CONNECT_TIMEOUT),
T(FLAG_TLS_SERVER),
T(FLAG_TLS_CLIENT),
T(FLAG_IS_TLS_CONNECTED),
T(FLAG_WAIT_ON_FINGERPRINT),
T(FLAG_TLS_WAIT_ON_CLOSE),
T(FLAG_RPC_RECV_FORWARD),
};
#undef T
/*
 * State handed to _setup_listen_socket() for each entry of the host:port list
 * by conmgr_create_listen_sockets().
 */
typedef struct {
const conmgr_events_t *events;
void *arg;
conmgr_con_type_t type;
int rc; /* result of the most recent conmgr_create_listen_socket() call */
conmgr_con_flags_t flags;
} socket_listen_init_t;
/*
 * Arguments queued by conmgr_queue_receive_fd() for _receive_fd(), which
 * creates a new connection from the received fd using these values.
 */
typedef struct {
#define MAGIC_RECEIVE_FD 0xeba8bae0
int magic; /* MAGIC_RECEIVE_FD */
conmgr_con_type_t type;
const conmgr_events_t *events;
void *arg;
} receive_fd_args_t;
/* Arguments queued by conmgr_queue_send_fd() for _send_fd() */
typedef struct {
#define MAGIC_SEND_FD 0xfbf8e2e0
int magic; /* MAGIC_SEND_FD */
int fd; /* fd to send over con */
} send_fd_args_t;
/*
 * Debug check that type lies strictly between the PCTL_TYPE_INVALID and
 * PCTL_TYPE_INVALID_MAX sentinel values.
 */
static void _validate_pctl_type(pollctl_fd_type_t type)
{
	xassert((PCTL_TYPE_INVALID < type) && (type < PCTL_TYPE_INVALID_MAX));
}
/*
 * Translate a connection type into its enumerator name.
 * Aborts when type is not present in con_types[].
 */
extern const char *conmgr_con_type_string(conmgr_con_type_t type)
{
	const int count = ARRAY_SIZE(con_types);

	for (int idx = 0; idx < count; idx++) {
		if (con_types[idx].type != type)
			continue;

		return con_types[idx].string;
	}

	fatal_abort("invalid type");
}
/*
 * Translate a single connection flag into its enumerator name.
 * Aborts when flag is not an exact entry in con_flags[].
 */
static const char *_con_flag_string(con_flags_t flag)
{
	for (int i = 0; i < ARRAY_SIZE(con_flags); i++)
		if (con_flags[i].flag == flag)
			return con_flags[i].string;

	/* this looks up a flag, not a connection type */
	fatal_abort("invalid flag");
}
/*
 * Render a set of connection flags as "NAME|NAME|..." for logging.
 *
 * Any bits without an entry in con_flags[] are appended as one hex value.
 * RET xmalloc()ed string (caller must xfree())
 */
extern char *con_flags_string(const con_flags_t flags)
{
	char *out = NULL, *pos = NULL;
	uint32_t seen = 0;

	if (flags == FLAG_NONE)
		return xstrdup(_con_flag_string(FLAG_NONE));

	/* index 0 is FLAG_NONE which would trivially match any flags */
	for (int idx = 1; idx < ARRAY_SIZE(con_flags); idx++) {
		const con_flags_t bit = con_flags[idx].flag;
		const char *sep = (out ? "|" : "");

		if ((flags & bit) != bit)
			continue;

		xstrfmtcatat(out, &pos, "%s%s", sep, con_flags[idx].string);
		seen |= bit;
	}

	/* report any bits that had no name */
	if (flags ^ seen) {
		const char *sep = (out ? "|" : "");

		xstrfmtcatat(out, &pos, "%s0x%08"PRIx32, sep, (flags ^ seen));
	}

	return out;
}
/*
* Close all connections (for_each)
* NOTE: must hold mgr.mutex
*/
static int _close_con_for_each(void *x, void *arg)
{
conmgr_fd_t *con = x;
close_con(true, con);
return 1;
}
/* mgr.mutex must be locked when calling this function */
extern void close_all_connections(void)
{
/* close all connections */
list_for_each(mgr.connections, _close_con_for_each, NULL);
list_for_each(mgr.listen_conns, _close_con_for_each, NULL);
}
/*
 * Work callback that closes the input side of the connection it runs for.
 * Passes locked=false so close_con() acquires mgr.mutex itself.
 */
extern void work_close_con(conmgr_callback_args_t conmgr_args, void *arg)
{
	conmgr_fd_t *con = conmgr_args.con;

	close_con(false, con);
}
/*
 * Stop reading from connection but write out the remaining buffer and finish
 * any queued work
 *
 * Marks the connection FLAG_READ_EOF, clears FLAG_CAN_READ, stops polling so
 * handle_connection() can choose what to monitor next, discards any
 * unprocessed data in con->in and con->tls_in, and sets con->input_fd to -1.
 * Calls made once input_fd < 0 are logged and ignored.
 *
 * After mgr.mutex is released, the old input fd is:
 *   - closed when the connection is a listener or input/output fds differ,
 *   - shut down for reading (SHUT_RD) when it is a socket shared with output,
 *   - otherwise left open (it is still the output fd).
 * AF_LOCAL listeners also have their socket path unlinked.
 *
 * IN locked - true if caller already holds mgr.mutex
 * IN con - connection whose input side is closed
 */
extern void close_con(bool locked, conmgr_fd_t *con)
{
int input_fd = -1;
bool is_same_fd, is_socket, is_listen;
if (!locked)
slurm_mutex_lock(&mgr.mutex);
if (con->input_fd < 0) {
xassert(con_flag(con, FLAG_READ_EOF) ||
con_flag(con, FLAG_IS_LISTEN));
xassert(!con_flag(con, FLAG_CAN_READ) ||
con_flag(con, FLAG_IS_LISTEN));
if (!locked)
slurm_mutex_unlock(&mgr.mutex);
log_flag(CONMGR, "%s: [%s] ignoring duplicate close request",
__func__, con->name);
return;
}
log_flag(CONMGR, "%s: [%s] closing input", __func__, con->name);
/*
 * Stop polling read/write to input fd to allow handle_connection() to
 * select what needs to be monitored
 */
con_set_polling(con, PCTL_TYPE_NONE, __func__);
/* mark it as EOF even if it hasn't */
con_set_flag(con, FLAG_READ_EOF);
con_unset_flag(con, FLAG_CAN_READ);
/* drop any unprocessed input buffer */
if (con->in)
set_buf_offset(con->in, 0);
if (con->tls_in)
set_buf_offset(con->tls_in, 0);
/* snapshot the state needed after mgr.mutex is released below */
is_same_fd = (con->input_fd == con->output_fd);
is_socket = con_flag(con, FLAG_IS_SOCKET);
is_listen = con_flag(con, FLAG_IS_LISTEN);
input_fd = con->input_fd;
con->input_fd = -1;
EVENT_SIGNAL(&mgr.watch_sleep);
if (!locked)
slurm_mutex_unlock(&mgr.mutex);
/* unlink listener sockets to avoid leaving ghost socket */
if (is_listen && (con->address.ss_family == AF_LOCAL)) {
struct sockaddr_un *un = (struct sockaddr_un *) &con->address;
if (unlink(un->sun_path))
error("%s: [%s] unable to unlink %s: %m",
__func__, con->name, un->sun_path);
else
log_flag(CONMGR, "%s: [%s] unlinked %s",
__func__, con->name, un->sun_path);
}
/* only close fd when output does not still need it */
if (is_listen || !is_same_fd) {
fd_close(&input_fd);
} else if (is_socket && shutdown(input_fd, SHUT_RD)) {
/* shutdown input on sockets */
log_flag(CONMGR, "%s: [%s] unable to shutdown incoming socket half: %m",
__func__, con->name);
}
}
/*
 * Resolve the terminal device name for fd.
 *
 * IN fd - file descriptor expected to reference a tty
 * RET xmalloc()ed name or NULL if it could not be resolved
 */
static char *_resolve_tty_name(int fd)
{
	char buf[PATH_MAX] = {0};
	int rc;

	/*
	 * ttyname_r() reports failure through its return value and is not
	 * required to set errno. Copy the error into errno so %m is accurate.
	 */
	if ((rc = ttyname_r(fd, buf, (sizeof(buf) - 1)))) {
		errno = rc;
		log_flag(CONMGR, "%s: unable to resolve tty at fd:%d: %m",
			 __func__, fd);
		return NULL;
	}

	return xstrdup(buf);
}
/*
 * Build a human readable description of fd for connection naming.
 *
 * Tries, in order:
 *   - the bound address of a socket,
 *   - the resolved path of fd,
 *   - "pipe" for FIFOs,
 *   - the tty name or device numbers for character devices,
 *   - device numbers for block devices (Linux only).
 *
 * IN fd - file descriptor to describe
 * IN stat_ptr - fstat() results for fd
 * RET xmalloc()ed string or NULL if nothing could be resolved
 */
static char *_resolve_fd(int fd, struct stat *stat_ptr)
{
	char *name = NULL;

	if (S_ISSOCK(stat_ptr->st_mode)) {
		slurm_addr_t addr = {0};

		if (!slurm_get_stream_addr(fd, &addr) &&
		    (addr.ss_family != AF_UNSPEC) &&
		    (name = sockaddr_to_string(&addr, sizeof(addr))))
			return name;
	}

	if ((name = fd_resolve_path(fd)))
		return name;

	if (S_ISFIFO(stat_ptr->st_mode))
		return xstrdup_printf("pipe");

	/*
	 * For device special files, the referenced device is st_rdev.
	 * st_dev is the device of the filesystem holding the device node.
	 */
	if (S_ISCHR(stat_ptr->st_mode)) {
		if (isatty(fd) && (name = _resolve_tty_name(fd)))
			return name;
#if defined(__linux__)
		return xstrdup_printf("device:%u.%u", major(stat_ptr->st_rdev),
				      minor(stat_ptr->st_rdev));
#else /* !__linux__ */
		/* dev_t width varies by platform: widen to match PRIx64 */
		return xstrdup_printf("device:0x%"PRIx64,
				      (uint64_t) stat_ptr->st_rdev);
#endif /* !__linux__ */
	}

#if defined(__linux__)
	if (S_ISBLK(stat_ptr->st_mode))
		return xstrdup_printf("block:%u.%u", major(stat_ptr->st_rdev),
				      minor(stat_ptr->st_rdev));
#endif /* __linux__ */

	return NULL;
}
/*
 * Set con->name from descriptions of its input and output fds.
 *
 * Format is "in(fd:N)" when both sides are the same fd or resolve to the same
 * description, "in(fd:N)->out(fd:M)" otherwise, with "()" for a missing side.
 * Unresolvable descriptions are rendered as empty strings.
 *
 * IN con - connection without a name yet
 * IN in_stat - fstat() results for con->input_fd
 * IN out_stat - fstat() results for con->output_fd
 */
static void _set_connection_name(conmgr_fd_t *con, struct stat *in_stat,
				 struct stat *out_stat)
{
	xassert(con);
	xassert(!con->name);

	char *in_str = NULL, *out_str = NULL;
	const bool has_in = (con->input_fd >= 0);
	const bool has_out = (con->output_fd >= 0);
	bool is_same = (con->input_fd == con->output_fd);

	if (!has_in && !has_out) {
		con->name = xstrdup("INVALID");
		return;
	}

	/* grab socket peer if possible */
	if (con_flag(con, FLAG_IS_SOCKET) && has_out)
		out_str = fd_resolve_peer(con->output_fd);
	if (has_out && !out_str)
		out_str = _resolve_fd(con->output_fd, out_stat);
	if (has_in)
		in_str = _resolve_fd(con->input_fd, in_stat);

	/* avoid "->" syntax if same on both sides */
	if (in_str && out_str && !xstrcmp(in_str, out_str)) {
		is_same = true;
		xfree(out_str);
	}

	/* never hand NULL to %s: that is undefined behavior */
	const char *in_name = (in_str ? in_str : "");
	const char *out_name = (out_str ? out_str : "");

	if (is_same) {
		/* shared fd: prefer input description, else keep output's */
		xstrfmtcat(con->name, "%s(fd:%d)",
			   (in_str ? in_str : out_name), con->input_fd);
	} else if (has_in && has_out) {
		xstrfmtcat(con->name, "%s(fd:%d)->%s(fd:%d)", in_name,
			   con->input_fd, out_name, con->output_fd);
	} else if (has_in && !has_out) {
		xstrfmtcat(con->name, "%s(fd:%d)->()", in_name, con->input_fd);
	} else if (!has_in && has_out) {
		xstrfmtcat(con->name, "()->%s(fd:%d)", out_name,
			   con->output_fd);
	} else {
		xassert(false);
	}

	xfree(out_str);
	xfree(in_str);
}
/*
 * Debug-only check that con has the callback required by type: on_data() for
 * CON_TYPE_RAW and on_msg() for CON_TYPE_RPC. Any other type aborts.
 */
static void _check_con_type(conmgr_fd_t *con, conmgr_con_type_t type)
{
#ifndef NDEBUG
	switch (type) {
	case CON_TYPE_RAW:
		xassert(con->events->on_data);
		break;
	case CON_TYPE_RPC:
		xassert(con->events->on_msg);
		break;
	default:
		fatal_abort("invalid type");
	}
#endif /* !NDEBUG */
}
/*
 * Switch con to connection type (RAW or RPC).
 *
 * Callers in this file either hold mgr.mutex or call this before con has been
 * added to the manager's lists.
 *
 * RPC connections always get FLAG_TCP_NODELAY. When the connection is a
 * socket with FLAG_TCP_NODELAY and a valid output fd, TCP_NODELAY is applied
 * to output_fd.
 *
 * IN con - connection to change
 * IN type - requested connection type
 * RET SLURM_SUCCESS or error from net_set_nodelay(). On error con->type has
 *	already been updated.
 */
extern int fd_change_mode(conmgr_fd_t *con, conmgr_con_type_t type)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	_check_con_type(con, type);

	if (con->type == type) {
		log_flag(CONMGR, "%s: [%s] ignoring unchanged type: %s",
			 __func__, con->name, conmgr_con_type_string(type));
		return SLURM_SUCCESS;
	}

	log_flag(CONMGR, "%s: [%s] changing type: %s->%s pending_reads=%u pending_writes=%d",
		 __func__, con->name, conmgr_con_type_string(con->type),
		 conmgr_con_type_string(type),
		 con->in ? get_buf_offset(con->in) : 0,
		 list_count(con->out));

	/*
	 * Always set TCP_NODELAY for Slurm RPC connections. Test the requested
	 * type: con->type still holds the previous type at this point.
	 */
	if (type == CON_TYPE_RPC)
		con_set_flag(con, FLAG_TCP_NODELAY);

	con->type = type;

	if (con_flag(con, FLAG_IS_SOCKET) && con_flag(con, FLAG_TCP_NODELAY) &&
	    (con->output_fd >= 0)) {
		int rc;

		if ((rc = net_set_nodelay(con->output_fd, true, NULL))) {
			log_flag(CONMGR, "%s: [%s] unable to set TCP_NODELAY: %s",
				 __func__, con->name, slurm_strerror(rc));
			return rc;
		}
	}

	return SLURM_SUCCESS;
}
/*
 * Locked wrapper around fd_change_mode() that also wakes watch() so any
 * pending data is processed under the new type.
 */
extern int conmgr_fd_change_mode(conmgr_fd_t *con, conmgr_con_type_t type)
{
	slurm_mutex_lock(&mgr.mutex);

	const int result = fd_change_mode(con, type);

	EVENT_SIGNAL(&mgr.watch_sleep);
	slurm_mutex_unlock(&mgr.mutex);

	return result;
}
/*
 * Create a connection for input_fd/output_fd and register it with the manager.
 *
 * IN type - initial connection type (applied via fd_change_mode())
 * IN source - connection whose AF_LOCAL path is inherited when
 *	unix_socket_path is NULL, or NULL
 * IN input_fd - fd to read from or -1
 * IN output_fd - fd to write to or -1 (may equal input_fd)
 * IN events - connection callbacks
 * IN flags - connection flags (state tracking bits are masked out)
 * IN addr - address of connection or NULL
 * IN addrlen - number of bytes in addr
 * IN is_listen - true if input_fd is a listening socket
 * IN unix_socket_path - Unix socket path or NULL
 * IN tls_conn - existing TLS connection to adopt or NULL
 * IN arg - stored as con->new_arg for the connection callbacks
 * RET SLURM_SUCCESS or error. On error no connection is created and the fds
 *	are not closed: the caller still owns them.
 */
extern int add_connection(conmgr_con_type_t type,
			  conmgr_fd_t *source, int input_fd,
			  int output_fd,
			  const conmgr_events_t *events,
			  conmgr_con_flags_t flags,
			  const slurm_addr_t *addr,
			  socklen_t addrlen, bool is_listen,
			  const char *unix_socket_path, void *tls_conn,
			  void *arg)
{
	struct stat in_stat = { 0 };
	struct stat out_stat = { 0 };
	conmgr_fd_t *con = NULL;
	bool set_keep_alive, is_socket, is_fifo, is_chr;
	const bool has_in = (input_fd >= 0);
	const bool has_out = (output_fd >= 0);
	const bool is_same = (input_fd == output_fd);
	const size_t unix_socket_path_len =
		(unix_socket_path ? (strlen(unix_socket_path) + 1): 0);
	static const size_t unix_socket_path_max =
		sizeof(((struct sockaddr_un *) NULL)->sun_path);

	if (unix_socket_path_len &&
	    (unix_socket_path_len > unix_socket_path_max)) {
		log_flag(CONMGR, "%s: Unix domain socket path too long %zu/%zu: %s",
			 __func__, unix_socket_path_len, unix_socket_path_max,
			 unix_socket_path);
		return ENAMETOOLONG;
	}

	/* verify FD is valid and still open */
	if (has_in && fstat(input_fd, &in_stat)) {
		log_flag(CONMGR, "%s: invalid fd:%d: %m", __func__, input_fd);
		return SLURM_COMMUNICATIONS_INVALID_INCOMING_FD;
	}
	if (has_out && fstat(output_fd, &out_stat)) {
		log_flag(CONMGR, "%s: invalid fd:%d: %m", __func__, output_fd);
		return SLURM_COMMUNICATIONS_INVALID_OUTGOING_FD;
	}

	if (!has_in && !has_out) {
		log_flag(CONMGR, "%s: refusing connection without input or output fd",
			 __func__);
		return SLURM_COMMUNICATIONS_INVALID_FD;
	}

	is_socket = (has_in && S_ISSOCK(in_stat.st_mode)) ||
		    (has_out && S_ISSOCK(out_stat.st_mode));
	is_fifo = (has_in && S_ISFIFO(in_stat.st_mode)) ||
		  (has_out && S_ISFIFO(out_stat.st_mode));
	is_chr = (has_in && S_ISCHR(in_stat.st_mode)) ||
		 (has_out && S_ISCHR(out_stat.st_mode));
	set_keep_alive = !unix_socket_path && is_socket && !is_listen;

	/* all connections are non-blocking */
	if (has_in) {
		if (set_keep_alive)
			net_set_keep_alive(input_fd);
		fd_set_nonblocking(input_fd);
	}
	if (!is_same && has_out) {
		fd_set_nonblocking(output_fd);
		if (set_keep_alive)
			net_set_keep_alive(output_fd);
	}

	con = xmalloc(sizeof(*con));
	*con = (conmgr_fd_t){
		.magic = MAGIC_CON_MGR_FD,
		.address = {
			.ss_family = AF_UNSPEC
		},
		.input_fd = input_fd,
		.output_fd = output_fd,
		.events = events,
		.mss = NO_VAL,
		.work = list_create(NULL),
		.write_complete_work = list_create(NULL),
		.new_arg = arg,
		.type = CON_TYPE_NONE,
		.polling_input_fd = PCTL_TYPE_NONE,
		.polling_output_fd = PCTL_TYPE_NONE,
		/* Set flags not related to connection state tracking */
		.flags = (flags & ~FLAGS_MASK_STATE),
	};

	/* save if connection is a socket type to avoid calling fstat() again */
	con_assign_flag(con, FLAG_IS_SOCKET, is_socket);
	con_assign_flag(con, FLAG_IS_LISTEN, is_listen);
	con_assign_flag(con, FLAG_READ_EOF, !has_in);
	con_assign_flag(con, FLAG_IS_FIFO, is_fifo);
	con_assign_flag(con, FLAG_IS_CHR, is_chr);

	/*
	 * Check for TLS fingerprint if connection is not already flagged as a
	 * TLS connection and the fingerprint callback is present.
	 */
	con_assign_flag(con, FLAG_WAIT_ON_FINGERPRINT,
			(events->on_fingerprint &&
			 !con_flag(con, FLAG_TLS_CLIENT) &&
			 !con_flag(con, FLAG_TLS_SERVER)));

	if (!is_listen) {
		con->in = create_buf(xmalloc(BUFFER_START_SIZE),
				     BUFFER_START_SIZE);
		con->out = list_create((ListDelF) free_buf);
	}

	/* listen on unix socket */
	if (!unix_socket_path && source &&
	    (source->address.ss_family == AF_LOCAL)) {
		struct sockaddr_un *un = (struct sockaddr_un *) &source->address;

		unix_socket_path = un->sun_path;
	}

	if (unix_socket_path) {
		struct sockaddr_un *un = (struct sockaddr_un *) &con->address;

		xassert(unix_socket_path_len <= unix_socket_path_max);
		xassert(con_flag(con, FLAG_IS_SOCKET));
		/* addr may be NULL when the path was inherited from source */
		xassert(!addr || (addr->ss_family == AF_LOCAL));

		con->address.ss_family = AF_LOCAL;
		/*
		 * Bound by the destination size: unix_socket_path_len is 0 when
		 * the path was inherited from source above, and would copy
		 * nothing.
		 */
		strlcpy(un->sun_path, unix_socket_path, sizeof(un->sun_path));
	} else if (is_socket && (addrlen > 0) && addr) {
		memcpy(&con->address, addr, addrlen);
	}

	if (is_socket && (con->address.ss_family == AF_UNSPEC)) {
		int rc = SLURM_SUCCESS;
		int fd = (has_out ? output_fd : input_fd);

		if (!is_listen &&
		    (rc = slurm_get_peer_addr(fd, &con->address))) {
			log_flag(CONMGR, "%s: [fd:%d] Unable to resolve remote host: %s",
				 __func__, fd, slurm_strerror(rc));
		} else if (slurm_get_stream_addr(fd, &con->address)) {
			log_flag(CONMGR, "%s: [fd:%d] Unable to resolve bind()ed IP: %m",
				 __func__, fd);
		}
	}

	/* Name the connection first so later log messages can use it */
	_set_connection_name(con, &in_stat, &out_stat);

	if (has_out) {
		int bytes = -1;

		if (!fd_get_buffered_output_bytes(con->output_fd, &bytes,
						  con->name)) {
			xassert(bytes >= 0);
			con_set_flag(con, FLAG_CAN_QUERY_OUTPUT_BUFFER);
		}
	}

	fd_change_mode(con, type);

	if (con_flag(con, FLAG_WATCH_CONNECT_TIMEOUT))
		con->last_read = timespec_now();

	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
		char *flags_str = con_flags_string(con->flags);

		log_flag(CONMGR, "%s: [%s] new connection input_fd=%d output_fd=%d flags=%s",
			 __func__, con->name, input_fd, output_fd, flags_str);
		xfree(flags_str);
	}

	if (tls_conn)
		tls_adopt(con, tls_conn);

	slurm_mutex_lock(&mgr.mutex);

	if (is_listen) {
		xassert(con->output_fd <= 0);
		list_append(mgr.listen_conns, con);
	} else {
		list_append(mgr.connections, con);
	}

	/* Attempt to handle connection state immediately */
	handle_connection(true, con);

	/*
	 * interrupt poll () and wake up watch() to eventually re-examine new
	 * connection.
	 */
	pollctl_interrupt(__func__);
	EVENT_SIGNAL(&mgr.watch_sleep);
	slurm_mutex_unlock(&mgr.mutex);

	return SLURM_SUCCESS;
}
extern void wrap_on_connection(conmgr_callback_args_t conmgr_args, void *arg)
{
conmgr_fd_t *con = conmgr_args.con;
if (con_flag(con, FLAG_IS_LISTEN)) {
log_flag(CONMGR, "%s: [%s] BEGIN func=0x%"PRIxPTR,
__func__, con->name,
(uintptr_t) con->events->on_listen_connect);
arg = con->events->on_listen_connect(con, con->new_arg);
log_flag(CONMGR, "%s: [%s] END func=0x%"PRIxPTR" arg=0x%"PRIxPTR,
__func__, con->name,
(uintptr_t) con->events->on_listen_connect,
(uintptr_t) arg);
} else {
log_flag(CONMGR, "%s: [%s] BEGIN func=0x%"PRIxPTR,
__func__, con->name,
(uintptr_t) con->events->on_connection);
arg = con->events->on_connection(con, con->new_arg);
log_flag(CONMGR, "%s: [%s] END func=0x%"PRIxPTR" arg=0x%"PRIxPTR,
__func__, con->name,
(uintptr_t) con->events->on_connection,
(uintptr_t) arg);
}
if (!arg) {
error("%s: [%s] closing connection due to NULL return from on_connection",
__func__, con->name);
close_con(false, con);
return;
}
slurm_mutex_lock(&mgr.mutex);
con->arg = arg;
EVENT_SIGNAL(&mgr.watch_sleep);
slurm_mutex_unlock(&mgr.mutex);
}
/*
 * Register an already open, non-listening input/output fd pair as a new
 * connection. No source connection or Unix socket path is used.
 */
extern int conmgr_process_fd(conmgr_con_type_t type, int input_fd,
			     int output_fd, const conmgr_events_t *events,
			     conmgr_con_flags_t flags,
			     const slurm_addr_t *addr, socklen_t addrlen,
			     void *tls_conn, void *arg)
{
	conmgr_fd_t *const no_source = NULL;
	const char *const no_unix_path = NULL;
	const bool listening = false;

	return add_connection(type, no_source, input_fd, output_fd, events,
			      flags, addr, addrlen, listening, no_unix_path,
			      tls_conn, arg);
}
/*
 * Register an already listening socket fd. Listeners have no output fd,
 * address, Unix socket path or TLS connection.
 */
extern int conmgr_process_fd_listen(int fd, conmgr_con_type_t type,
				    const conmgr_events_t *events,
				    conmgr_con_flags_t flags, void *arg)
{
	const int no_output_fd = -1;

	return add_connection(type, NULL, fd, no_output_fd, events, flags,
			      NULL, 0, true, NULL, NULL, arg);
}
/*
 * Work callback queued by conmgr_queue_receive_fd().
 *
 * Receives one fd over src->input_fd and registers it as a new connection
 * using the type, events and arg stored in args. Frees args in all cases.
 */
static void _receive_fd(conmgr_callback_args_t conmgr_args, void *arg)
{
	receive_fd_args_t *args = arg;
	conmgr_fd_t *src = conmgr_args.con;
	int fd = -1;

	xassert(args->magic == MAGIC_RECEIVE_FD);
	xassert(src->magic == MAGIC_CON_MGR_FD);

	if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) {
		log_flag(CONMGR, "%s: [%s] Canceled receive new file descriptor",
			 __func__, src->name);
	} else if (con_flag(src, FLAG_READ_EOF)) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on SHUT_RD input_fd=%d",
			 __func__, src->name, src->input_fd);
	} else if (src->input_fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on invalid input_fd=%d",
			 __func__, src->name, src->input_fd);
	} else if ((fd = receive_fd_over_socket(src->input_fd)) < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on input_fd=%d",
			 __func__, src->name, src->input_fd);
		/*
		 * Close source as receive_fd_over_socket() failed and
		 * connection is now in an unknown state
		 */
		close_con(false, src);
	} else if (add_connection(args->type, NULL, fd, fd, args->events,
				  CON_FLAG_NONE, NULL, 0, false, NULL,
				  NULL, args->arg) != SLURM_SUCCESS) {
		/*
		 * Error already logged by add_connection(). It does not take
		 * ownership of fd on failure, so close it here to avoid a leak.
		 * The failure is not assumed to be due to the state of src.
		 */
		fd_close(&fd);
	}

	args->magic = ~MAGIC_RECEIVE_FD;
	xfree(args);
}
/*
 * Queue work on src to receive an fd over it and wrap that fd as a new
 * connection of type with events/arg.
 *
 * RET SLURM_SUCCESS if queued, EAFNOSUPPORT when src is not a socket, or
 *	SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR when src input is unusable.
 */
extern int conmgr_queue_receive_fd(conmgr_fd_t *src, conmgr_con_type_t type,
				   const conmgr_events_t *events, void *arg)
{
	int status = SLURM_SUCCESS;

	slurm_mutex_lock(&mgr.mutex);

	xassert(src->magic == MAGIC_CON_MGR_FD);
	xassert(type > CON_TYPE_NONE);
	xassert(type < CON_TYPE_MAX);

	/* Reject obviously invalid states immediately */
	if (!con_flag(src, FLAG_IS_SOCKET)) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on non-socket",
			 __func__, src->name);
		status = EAFNOSUPPORT;
	} else if (con_flag(src, FLAG_READ_EOF)) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on SHUT_RD input_fd=%d",
			 __func__, src->name, src->input_fd);
		status = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR;
	} else if (src->input_fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on invalid input_fd=%d",
			 __func__, src->name, src->input_fd);
		status = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR;
	}

	if (status == SLURM_SUCCESS) {
		receive_fd_args_t *work_args = xmalloc_nz(sizeof(*work_args));

		*work_args = (receive_fd_args_t) {
			.magic = MAGIC_RECEIVE_FD,
			.type = type,
			.events = events,
			.arg = arg,
		};

		add_work_con_fifo(true, src, _receive_fd, work_args);
	}

	slurm_mutex_unlock(&mgr.mutex);

	return status;
}
/*
 * Work callback queued by conmgr_queue_send_fd().
 *
 * Sends args->fd over con->output_fd unless the work was cancelled or the
 * output fd is invalid. The local copy of the fd is always closed, and args
 * is always freed.
 */
static void _send_fd(conmgr_callback_args_t conmgr_args, void *arg)
{
	send_fd_args_t *work_args = arg;
	conmgr_fd_t *con = conmgr_args.con;
	int fd = work_args->fd;
	const bool cancelled =
		(conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED);

	xassert(work_args->magic == MAGIC_SEND_FD);
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (cancelled) {
		log_flag(CONMGR, "%s: [%s] Canceled sending file descriptor %d.",
			 __func__, con->name, fd);
	} else if (con->output_fd >= 0) {
		send_fd_over_socket(con->output_fd, fd);
		log_flag(CONMGR, "%s: [%s] Sent file descriptor %d over output_fd=%d",
			 __func__, con->name, fd, con->output_fd);
	} else {
		log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over invalid output_fd=%d",
			 __func__, con->name, fd, con->output_fd);
	}

	/* always close the file descriptor in this process to avoid leaking */
	fd_close(&fd);

	work_args->magic = ~MAGIC_SEND_FD;
	xfree(work_args);
}
/*
 * Queue work on con to send fd over its output socket. On success, ownership
 * of fd passes to the queued work, which closes it after sending.
 *
 * RET SLURM_SUCCESS if queued, EINVAL for an invalid fd, EAFNOSUPPORT when
 *	con is not a socket, or SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR when
 *	con has no output fd.
 */
extern int conmgr_queue_send_fd(conmgr_fd_t *con, int fd)
{
	int status = SLURM_SUCCESS;

	slurm_mutex_lock(&mgr.mutex);

	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to send invalid file descriptor %d",
			 __func__, con->name, fd);
		status = EINVAL;
	} else if (!con_flag(con, FLAG_IS_SOCKET)) {
		log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over non-socket",
			 __func__, con->name, fd);
		status = EAFNOSUPPORT;
	} else if (con->output_fd < 0) {
		log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over invalid output_fd=%d",
			 __func__, con->name, fd, con->output_fd);
		status = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR;
	}

	if (status == SLURM_SUCCESS) {
		send_fd_args_t *work_args = xmalloc_nz(sizeof(*work_args));

		*work_args = (send_fd_args_t) {
			.magic = MAGIC_SEND_FD,
			.fd = fd,
		};

		add_work_con_fifo(true, con, _send_fd, work_args);
	}

	slurm_mutex_unlock(&mgr.mutex);

	return status;
}
/*
 * Work callback queued by _close_fd() to close the connection's input later.
 *
 * If FLAG_WORK_ACTIVE is still set, mgr.mutex is released and the close is
 * re-requested through conmgr_queue_close_fd(), which takes mgr.mutex itself.
 * Otherwise the input side is closed with mgr.mutex held.
 *
 * NOTE(review): if FLAG_WORK_ACTIVE is set while this callback itself runs,
 * this could requeue repeatedly. Confirm against how con work is dispatched.
 */
static void _deferred_close_fd(conmgr_callback_args_t conmgr_args, void *arg)
{
conmgr_fd_t *con = conmgr_args.con;
slurm_mutex_lock(&mgr.mutex);
if (con_flag(con, FLAG_WORK_ACTIVE)) {
/* drop lock first: conmgr_queue_close_fd() locks mgr.mutex */
slurm_mutex_unlock(&mgr.mutex);
conmgr_queue_close_fd(con);
} else {
close_con(true, con);
slurm_mutex_unlock(&mgr.mutex);
}
}
/*
 * Close the input side of con now, or defer it while work is active.
 * Caller must hold mgr.mutex lock.
 */
static void _close_fd(conmgr_fd_t *con)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (con_flag(con, FLAG_WORK_ACTIVE)) {
		/*
		 * Defer request to close connection until connection is no
		 * longer actively doing work as closing connection would change
		 * several variables guaranteed to not change while work is
		 * active.
		 */
		add_work_con_fifo(true, con, _deferred_close_fd, con);
	} else {
		/* no active work: safe to close immediately */
		close_con(true, con);
	}
}
/*
 * Request that the input side of con be closed, now or once active work
 * completes (see _close_fd()). Acquires mgr.mutex.
 */
extern void conmgr_queue_close_fd(conmgr_fd_t *con)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	/* _close_fd() requires mgr.mutex to be held */
	slurm_mutex_lock(&mgr.mutex);
	{
		_close_fd(con);
	}
	slurm_mutex_unlock(&mgr.mutex);
}
/*
 * Request close of the connection behind *ref_ptr, then release the reference
 * via fd_free_ref(). Does nothing if *ref_ptr is already NULL.
 */
extern void conmgr_con_queue_close_free(conmgr_fd_ref_t **ref_ptr)
{
	conmgr_fd_ref_t *ref;
	conmgr_fd_t *con;

	xassert(ref_ptr);

	ref = *ref_ptr;
	if (ref == NULL)
		return; /* already released */

	xassert(ref->magic == MAGIC_CON_MGR_FD_REF);
	con = ref->con;
	xassert(con->magic == MAGIC_CON_MGR_FD);

	slurm_mutex_lock(&mgr.mutex);
	_close_fd(con);
	fd_free_ref(ref_ptr);
	slurm_mutex_unlock(&mgr.mutex);
}
/*
 * list_find_first_ro() matcher.
 *
 * Returns 1 when con->address equals key (a slurm_addr_t *):
 *   - IPv4: port and address,
 *   - IPv6: port, scope id and address,
 *   - Unix: socket path.
 * Aborts on any other address family.
 */
static int _match_socket_address(void *x, void *key)
{
	conmgr_fd_t *con = x;
	const slurm_addr_t *want = key;
	const slurm_addr_t *have = &con->address;

	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (want->ss_family != have->ss_family)
		return 0;

	if (want->ss_family == AF_INET) {
		const struct sockaddr_in *w = (const struct sockaddr_in *) want;
		const struct sockaddr_in *h = (const struct sockaddr_in *) have;

		return ((w->sin_port == h->sin_port) &&
			!memcmp(&w->sin_addr.s_addr, &h->sin_addr.s_addr,
				sizeof(h->sin_addr.s_addr)));
	}

	if (want->ss_family == AF_INET6) {
		const struct sockaddr_in6 *w =
			(const struct sockaddr_in6 *) want;
		const struct sockaddr_in6 *h =
			(const struct sockaddr_in6 *) have;

		return ((w->sin6_port == h->sin6_port) &&
			(w->sin6_scope_id == h->sin6_scope_id) &&
			!memcmp(&w->sin6_addr.s6_addr, &h->sin6_addr.s6_addr,
				sizeof(h->sin6_addr.s6_addr)));
	}

	if (want->ss_family == AF_UNIX) {
		const struct sockaddr_un *w = (const struct sockaddr_un *) want;
		const struct sockaddr_un *h = (const struct sockaddr_un *) have;

		return !xstrcmp(w->sun_path, h->sun_path);
	}

	fatal_abort("Unexpected ss family type %u",
		    (uint32_t) want->ss_family);
}
/*
 * Check whether a listener already exists for addr.
 *
 * IN addr - address to look for (addrlen bytes long)
 * IN addrlen - size of addr in bytes
 * RET true if a connection in mgr.listen_conns matches addr
 */
static bool _is_listening(const slurm_addr_t *addr, socklen_t addrlen)
{
	/*
	 * Copy into a full-size, zeroed slurm_addr_t so the matcher can read
	 * any family's fields. Clamp the length so an oversized addrlen
	 * cannot overflow the stack copy.
	 */
	slurm_addr_t address = {0};
	const size_t copy_len = (((size_t) addrlen > sizeof(address)) ?
				 sizeof(address) : (size_t) addrlen);

	memcpy(&address, addr, copy_len);

	return (list_find_first_ro(mgr.listen_conns, _match_socket_address,
				   &address) != NULL);
}
/*
 * Create, bind and listen on a Unix stream socket at unixsock, then register
 * it as a listening connection.
 *
 * Any existing file at unixsock is unlinked first. Socket creation, bind and
 * listen failures are fatal.
 *
 * RET result of add_connection(). On failure the socket fd is closed.
 */
static int _add_unix_listener(conmgr_con_type_t type, conmgr_con_flags_t flags,
			      const char *listen_on, const char *unixsock,
			      const conmgr_events_t *events, void *arg)
{
	slurm_addr_t addr = { 0 };
	int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
	int rc = EINVAL;

	if (fd < 0)
		fatal("%s: socket() failed: %m", __func__);

	addr = sockaddr_from_unix_path(unixsock);

	if (addr.ss_family != AF_UNIX)
		fatal("%s: [%s] Invalid Unix socket path: %s",
		      __func__, listen_on, unixsock);

	log_flag(CONMGR, "%s: [%pA] attempting to bind() and listen() UNIX socket",
		 __func__, &addr);

	if (unlink(unixsock) && (errno != ENOENT))
		error("Error unlink(%s): %m", unixsock);

	/* bind() will EINVAL if socklen=sizeof(addr) */
	if ((rc = bind(fd, (const struct sockaddr *) &addr,
		       sizeof(struct sockaddr_un))))
		fatal("%s: [%s] Unable to bind UNIX socket: %m",
		      __func__, listen_on);

	fd_set_oob(fd, 0);

	rc = listen(fd, SLURM_DEFAULT_LISTEN_BACKLOG);
	if (rc < 0)
		fatal("%s: [%s] unable to listen(): %m",
		      __func__, listen_on);

	if ((rc = add_connection(type, NULL, fd, -1, events, flags, &addr,
				 sizeof(addr), true, unixsock, NULL, arg))) {
		/* add_connection() does not take ownership of fd on failure */
		fd_close(&fd);
	}

	return rc;
}
/*
 * Resolve url->host/url->port and create one listening socket for every
 * address returned (IPv6 clone of net_stream_listen_ports()).
 *
 * Addresses that already have a listener are skipped. Socket setup failures
 * are fatal. Stops at the first add_connection() failure and closes that
 * socket.
 *
 * RET SLURM_SUCCESS or the error from add_connection()
 */
static int _add_socket_listener(conmgr_con_type_t type,
				conmgr_con_flags_t flags, const char *listen_on,
				url_t *url, const conmgr_events_t *events,
				void *arg)
{
	int rc = SLURM_SUCCESS;
	struct addrinfo *addrlist = NULL;

	/* resolve out the host and port if provided */
	if (!(addrlist = xgetaddrinfo(url->host, url->port)))
		fatal("%s: Unable to listen on %s:%s(%s): %m",
		      __func__, url->host, url->port, listen_on);

	for (struct addrinfo *addr = addrlist; !rc && addr != NULL;
	     addr = addr->ai_next) {
		int fd;
		int one = 1;

		if (_is_listening((const slurm_addr_t *) addr->ai_addr,
				  addr->ai_addrlen)) {
			verbose("%s: ignoring duplicate listen request for %pA",
				__func__, (const slurm_addr_t *) addr->ai_addr);
			continue;
		}

		fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC,
			    addr->ai_protocol);
		if (fd < 0)
			fatal("%s: [%s] Unable to create socket: %m",
			      __func__, addrinfo_to_string(addr));

		/*
		 * activate socket reuse to avoid annoying timing issues
		 * with daemon restarts. SO_REUSEADDR is a socket-level
		 * option, so the level must be SOL_SOCKET (not ai_socktype,
		 * which only coincides with SOL_SOCKET on Linux).
		 */
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
			       &one, sizeof(one)))
			fatal("%s: [%s] setsockopt(SO_REUSEADDR) failed: %m",
			      __func__, addrinfo_to_string(addr));

		if (bind(fd, addr->ai_addr, addr->ai_addrlen) != 0)
			fatal("%s: [%s] Unable to bind socket: %m",
			      __func__, addrinfo_to_string(addr));

		fd_set_oob(fd, 0);

		rc = listen(fd, SLURM_DEFAULT_LISTEN_BACKLOG);
		if (rc < 0)
			fatal("%s: [%s] unable to listen(): %m",
			      __func__, addrinfo_to_string(addr));

		if ((rc = add_connection(type, NULL, fd, -1, events, flags,
					 (const slurm_addr_t *) addr->ai_addr,
					 addr->ai_addrlen, true, NULL, NULL,
					 arg))) {
			/* add_connection() does not own fd on failure */
			fd_close(&fd);
		}
	}

	freeaddrinfo(addrlist);

	return rc;
}
/*
 * Parse listen_on as a URL and create the matching listener(s).
 *
 * unix:// creates a Unix socket listener. https:// adds CON_FLAG_TLS_SERVER
 * and then, like http:// or no scheme, creates TCP listeners. Parse failures
 * are fatal.
 */
extern int conmgr_create_listen_socket(conmgr_con_type_t type,
				       conmgr_con_flags_t flags,
				       const char *listen_on,
				       const conmgr_events_t *events, void *arg)
{
	const size_t listen_on_len = strlen(listen_on);
	url_t url = URL_INITIALIZER;
	/* shadow buffer wrapping listen_on without copying it */
	buf_t buffer = {
		.magic = BUF_MAGIC,
		.head = (void *) listen_on,
		.processed = listen_on_len,
		.size = listen_on_len,
		.shadow = true,
	};
	int rc = url_parser_g_parse(__func__, &buffer, &url);

	if (rc)
		fatal("%s: Unable to parse %s: %s",
		      __func__, listen_on, slurm_strerror(rc));

	switch (url.scheme) {
	case URL_SCHEME_HTTPS:
		flags |= CON_FLAG_TLS_SERVER;
		/* fall through */
	case URL_SCHEME_HTTP:
	case URL_SCHEME_INVALID:
		rc = _add_socket_listener(type, flags, listen_on, &url, events,
					  arg);
		break;
	case URL_SCHEME_UNIX:
		rc = _add_unix_listener(type, flags, listen_on, url.path,
					events, arg);
		break;
	case URL_SCHEME_INVALID_MAX:
		fatal_abort("should never happen");
	}

	url_free_members(&url);

	return rc;
}
/*
 * list_for_each() callback: create listener(s) for the host:port string x.
 * Stores the result in init->rc and returns SLURM_ERROR on failure so
 * iteration stops.
 */
static int _setup_listen_socket(void *x, void *arg)
{
	socket_listen_init_t *init = arg;
	const char *hostport = x;

	init->rc = conmgr_create_listen_socket(init->type, init->flags,
					       hostport, init->events,
					       init->arg);

	if (init->rc)
		return SLURM_ERROR;

	return SLURM_SUCCESS;
}
/*
 * Create listeners for every host:port string in hostports.
 * RET result of the last conmgr_create_listen_socket() call made, or 0 when
 *	hostports is empty.
 */
extern int conmgr_create_listen_sockets(conmgr_con_type_t type,
					conmgr_con_flags_t flags,
					list_t *hostports,
					const conmgr_events_t *events,
					void *arg)
{
	socket_listen_init_t init = { 0 };

	init.type = type;
	init.flags = flags;
	init.events = events;
	init.arg = arg;

	(void) list_for_each(hostports, _setup_listen_socket, &init);

	return init.rc;
}
/*
 * Create a non-blocking stream socket, start connect() to addr, and register
 * it as a new connection (completion of connect() is handled later).
 *
 * IN addr - AF_UNIX, AF_INET or AF_INET6 address to connect to
 * IN addrlen - size of addr in bytes
 * RET SLURM_SUCCESS or error. EAFNOSUPPORT for other families. The socket
 *	is closed on every failure path. If connect() is interrupted during
 *	shutdown, the socket is closed and SLURM_SUCCESS is returned.
 */
extern int conmgr_create_connect_socket(conmgr_con_type_t type,
					conmgr_con_flags_t flags,
					slurm_addr_t *addr, socklen_t addrlen,
					const conmgr_events_t *events,
					void *arg)
{
	int fd = -1, rc = SLURM_ERROR;

	if (addr->ss_family == AF_UNIX) {
		fd = socket(addr->ss_family, (SOCK_STREAM | SOCK_CLOEXEC), 0);
	} else if ((addr->ss_family == AF_INET) ||
		   (addr->ss_family == AF_INET6)) {
		fd = socket(addr->ss_family, (SOCK_STREAM | SOCK_CLOEXEC),
			    IPPROTO_TCP);
	} else {
		return EAFNOSUPPORT;
	}

	if (fd < 0) {
		rc = errno;
		log_flag(NET, "%s: [%pA] socket() failed: %s",
			 __func__, addr, slurm_strerror(rc));
		return rc;
	}

	/* Set socket as non-blocking to avoid connect() blocking */
	fd_set_nonblocking(fd);

	log_flag(CONMGR, "%s: [%pA(fd:%d)] attempting to connect() new socket",
		 __func__, addr, fd);

again:
	if ((rc = connect(fd, (const struct sockaddr *) addr, addrlen))) {
		rc = errno;

		if (rc == EINTR) {
			bool shutdown;

			slurm_mutex_lock(&mgr.mutex);
			xassert(mgr.initialized);
			shutdown = mgr.shutdown_requested;
			slurm_mutex_unlock(&mgr.mutex);

			if (shutdown) {
				log_flag(CONMGR, "%s: [%pA(fd:%d)] connect() interrupted during shutdown. Closing connection.",
					 __func__, addr, fd);
				fd_close(&fd);
				return SLURM_SUCCESS;
			}

			log_flag(CONMGR, "%s: [%pA(fd:%d)] connect() interrupted. Retrying.",
				 __func__, addr, fd);
			goto again;
		}

		/*
		 * Delayed connect() completion is expected on a non-blocking
		 * socket (EINPROGRESS/EAGAIN/EWOULDBLOCK). An interrupted
		 * connect() continues asynchronously, so the retry above may
		 * report EALREADY (still pending) or EISCONN (already done).
		 */
		if ((rc != EINPROGRESS) && (rc != EAGAIN) &&
		    (rc != EWOULDBLOCK) && (rc != EALREADY) &&
		    (rc != EISCONN)) {
			log_flag(NET, "%s: [%pA(fd:%d)] connect() failed: %s",
				 __func__, addr, fd, slurm_strerror(rc));
			fd_close(&fd);
			return rc;
		}
	}

	if ((rc = add_connection(type, NULL, fd, fd, events, flags, addr,
				 addrlen, false, NULL, NULL, arg))) {
		/* add_connection() does not take ownership of fd on failure */
		fd_close(&fd);
	}

	return rc;
}
/*
 * Fetch the peer credentials of a (Unix) socket connection.
 *
 * The input fd is queried, or the output fd when there is no input fd.
 * Uses SO_PEERCRED, or LOCAL_PEERCRED on Apple/FreeBSD/NetBSD.
 *
 * OUT cred_uid/cred_gid/cred_pid - peer identity on success
 * RET SLURM_SUCCESS, EINVAL for NULL arguments,
 *	SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR without any fd, or errno
 *	from getsockopt()
 */
extern int conmgr_get_fd_auth_creds(conmgr_fd_t *con,
				    uid_t *cred_uid, gid_t *cred_gid,
				    pid_t *cred_pid)
{
	int fd;

	xassert(cred_uid);
	xassert(cred_gid);
	xassert(cred_pid);

	if (!con || !cred_uid || !cred_gid || !cred_pid)
		return EINVAL;

	xassert(con->magic == MAGIC_CON_MGR_FD);

	/* prefer input side, fall back to output side */
	fd = con->input_fd;
	if (fd == -1)
		fd = con->output_fd;
	if (fd == -1)
		return SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR;

#if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(__NetBSD__)
	{
		struct ucred peer = { 0 };
		socklen_t peer_len = sizeof(peer);

		if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &peer, &peer_len))
			return errno;

		*cred_uid = peer.uid;
		*cred_gid = peer.gid;
		*cred_pid = peer.pid;
	}
#else
	{
		struct xucred peer = { 0 };
		socklen_t peer_len = sizeof(peer);

		if (getsockopt(fd, 0, LOCAL_PEERCRED, &peer, &peer_len))
			return errno;

		*cred_uid = peer.cr_uid;
		*cred_gid = peer.cr_groups[0];
		*cred_pid = peer.cr_pid;
	}
#endif

	return SLURM_SUCCESS;
}
extern const char *conmgr_fd_get_name(const conmgr_fd_t *con)
{
xassert(con->magic == MAGIC_CON_MGR_FD);
xassert(con->name && con->name[0]);
return con->name;
}
/*
 * Return the name of the connection referenced by ref.
 */
extern const char *conmgr_con_get_name(conmgr_fd_ref_t *ref)
{
	const conmgr_fd_t *con = NULL;

	xassert(ref->magic == MAGIC_CON_MGR_FD_REF);

	con = ref->con;
	xassert(con->magic == MAGIC_CON_MGR_FD);

	return conmgr_fd_get_name(con);
}
/*
 * Snapshot the connection's status flags.
 *
 * unix_socket points into con->address (sun_path) for AF_LOCAL connections and
 * is NULL otherwise, so it is only valid while con is.
 * Debug builds assert that con is valid and that work is active on it.
 */
extern conmgr_fd_status_t conmgr_fd_get_status(conmgr_fd_t *con)
{
	conmgr_fd_status_t status;

	/* Validate con before reading any of its fields */
	xassert(con->magic == MAGIC_CON_MGR_FD);
	xassert(con_flag(con, FLAG_WORK_ACTIVE));

	status = (conmgr_fd_status_t) {
		.is_socket = con_flag(con, FLAG_IS_SOCKET),
		.unix_socket = NULL,
		.is_listen = con_flag(con, FLAG_IS_LISTEN),
		.read_eof = con_flag(con, FLAG_READ_EOF),
		.is_connected = con_flag(con, FLAG_IS_CONNECTED),
	};

	if (con->address.ss_family == AF_LOCAL) {
		struct sockaddr_un *un = (struct sockaddr_un *) &con->address;

		status.unix_socket = un->sun_path;
	}

	return status;
}
/*
 * list_find_first() callback: match a connection that uses the file
 * descriptor pointed to by key as either its input_fd or its output_fd.
 * RET 1 on match, 0 otherwise
 */
static int _find_by_fd(void *x, void *key)
{
	const conmgr_fd_t *con = x;
	const int *wanted = key;

	if (con->input_fd == *wanted)
		return 1;

	return (con->output_fd == *wanted);
}
/*
 * Find the active or listening connection that owns fd.
 * RET matching connection or NULL
 * NOTE(review): walks mgr lists without locking; presumably caller holds
 * mgr.mutex — confirm.
 */
extern conmgr_fd_t *con_find_by_fd(int fd)
{
	conmgr_fd_t *found = list_find_first(mgr.connections, _find_by_fd, &fd);

	if (!found)
		found = list_find_first(mgr.listen_conns, _find_by_fd, &fd);

	/* mgr.complete_conns don't have input_fd or output_fd */
	return found;
}
/*
 * Handle poll() reporting an error on fd: log the pending socket error (for
 * sockets) and close the connection.
 */
extern void con_close_on_poll_error(conmgr_fd_t *con, int fd)
{
	if (con_flag(con, FLAG_IS_SOCKET)) {
		/* Ask kernel for socket error */
		int err = SLURM_ERROR;
		int rc = fd_get_socket_error(fd, &err);

		if (rc) {
			error("%s: [%s] error while getting socket error: %s",
			      __func__, con->name, slurm_strerror(rc));
		} else if (err) {
			error("%s: [%s] socket error encountered while polling: %s",
			      __func__, con->name, slurm_strerror(err));
		}
	}

	/*
	 * The file descriptor must no longer be treated as valid; otherwise
	 * poll() would be called on it endlessly and fail immediately each
	 * time. Close it and remove it from the connection.
	 */
	close_con(true, con);
}
/*
 * Move fd from polling mode *old to mode new via pollctl.
 *
 * Unlinks for PCTL_TYPE_NONE, links when fd was not polled and relinks
 * otherwise. *old is updated only when pollctl reports success. Nothing is
 * done when *old is PCTL_TYPE_UNSUPPORTED or already equals new.
 * RET SLURM_SUCCESS or the pollctl error
 */
static int _set_fd_polling(int fd, pollctl_fd_type_t *old,
			   pollctl_fd_type_t new, const char *con_name,
			   const char *caller)
{
	int rc;

	if ((*old == PCTL_TYPE_UNSUPPORTED) || (*old == new))
		return SLURM_SUCCESS;

	if (new == PCTL_TYPE_NONE) {
		/* *old != PCTL_TYPE_NONE here since (*old == new) returned */
		rc = pollctl_unlink_fd(fd, con_name, caller);
	} else if (*old == PCTL_TYPE_NONE) {
		rc = pollctl_link_fd(fd, new, con_name, caller);
	} else {
		rc = pollctl_relink_fd(fd, new, con_name, caller);
	}

	/* Only record the new mode once pollctl accepted it */
	if (!rc)
		*old = new;

	return rc;
}
/*
 * When CONMGR debugging is enabled, log the current and requested polling
 * modes of con's input/output file descriptors.
 */
static void _log_set_polling(conmgr_fd_t *con, bool has_in, bool has_out,
			     pollctl_fd_type_t type, pollctl_fd_type_t in_type,
			     pollctl_fd_type_t out_type, const char *caller)
{
	char *detail = NULL, *pos = NULL;
	bool changing = false;

	if (!(slurm_conf.debug_flags & DEBUG_FLAG_CONMGR))
		return;

	if (has_in) {
		const char *cur_str =
			pollctl_type_to_string(con->polling_input_fd);
		const char *next_str = pollctl_type_to_string(in_type);

		xstrfmtcatat(detail, &pos, " in[%d]:%s", con->input_fd,
			     cur_str);
		if (con->polling_input_fd != in_type) {
			xstrfmtcatat(detail, &pos, "->%s", next_str);
			changing = true;
		}
	}

	if (has_out) {
		const char *cur_str =
			pollctl_type_to_string(con->polling_output_fd);
		const char *next_str = pollctl_type_to_string(out_type);

		xstrfmtcatat(detail, &pos, " out[%d]:%s", con->output_fd,
			     cur_str);
		if (con->polling_output_fd != out_type) {
			xstrfmtcatat(detail, &pos, "->%s", next_str);
			changing = true;
		}
	}

	log_flag(CONMGR, "%s->%s: [%s] %s polling:%s%s",
		 caller, XSTRINGIFY(con_set_polling), con->name,
		 (changing ? "changing" : "maintain"),
		 pollctl_type_to_string(type), (detail ? detail : ""));

	xfree(detail);
}
/*
 * Handle an unrecoverable polling change failure by closing the connection.
 *
 * On EBADF the defunct fd is detached from con at once: fds set to -1,
 * polling marked unsupported, and read/write capability flags cleared.
 * Otherwise *dst is marked PCTL_TYPE_UNSUPPORTED so the fd is no longer
 * (un)linked while the connection is closed gracefully.
 */
static void _on_change_polling_fail(conmgr_fd_t *con, int rc,
				    pollctl_fd_type_t old_type,
				    pollctl_fd_type_t new_type,
				    const char *fd_name, const int fd,
				    pollctl_fd_type_t *dst, const char *caller)
{
	error("%s->%s: [%s] closing connection after change polling %s->%s for %s=%d failed: %s",
	      caller, __func__, con->name, pollctl_type_to_string(old_type),
	      pollctl_type_to_string(new_type), fd_name, fd,
	      slurm_strerror(rc));

	if (rc != EBADF) {
		/* Attempt graceful closing of connection */
		*dst = PCTL_TYPE_UNSUPPORTED;
	} else {
		/* Remove defunct FD immediately */
		const bool was_input = (con->input_fd == fd);
		const bool was_output = (con->output_fd == fd);

		if (was_input) {
			con->input_fd = -1;
			con->polling_input_fd = PCTL_TYPE_UNSUPPORTED;
			con_unset_flag(con, FLAG_CAN_READ);
			con_set_flag(con, FLAG_READ_EOF);
		}

		if (was_output) {
			con->output_fd = -1;
			con->polling_output_fd = PCTL_TYPE_UNSUPPORTED;
			con_unset_flag(con, FLAG_CAN_WRITE);
			con_unset_flag(con, FLAG_CAN_QUERY_OUTPUT_BUFFER);
		}
	}

	close_con(true, con);
	close_con_output(true, con);
}
/*
 * React to a non-zero return code from changing the polling mode of con's
 * input (input=true) or output file descriptor.
 *
 * EEXIST: force a relink to the requested mode.
 * ENOENT: record the fd as not polled.
 * EPERM: record the fd as unsupported by the kernel.
 * Anything else (or a failed forced relink): close the connection.
 */
static void _on_change_polling_rc(conmgr_fd_t *con, int rc,
				  pollctl_fd_type_t old_type,
				  pollctl_fd_type_t new_type, bool input,
				  const char *caller)
{
	const char *fd_name = NULL;
	int fd = -1;
	pollctl_fd_type_t *dst = NULL;

	if (input) {
		fd_name = "input_fd";
		fd = con->input_fd;
		dst = &con->polling_input_fd;
	} else {
		fd_name = "output_fd";
		fd = con->output_fd;
		dst = &con->polling_output_fd;
	}

	if (rc == EEXIST) {
		/*
		 * poll() already monitors this file descriptor, but conmgr
		 * did not request it, so the current mode is unknown. Relink
		 * to force the correct mode.
		 */
		log_flag(CONMGR, "%s->%s: [%s] forcing changed polling %s->%s for %s=%d",
			 caller, __func__, con->name,
			 pollctl_type_to_string(old_type),
			 pollctl_type_to_string(new_type), fd_name, fd);

		rc = pollctl_relink_fd(fd, new_type, con->name, __func__);
		if (!rc)
			*dst = new_type;
		else
			_on_change_polling_fail(con, rc, old_type, new_type,
						fd_name, fd, dst, caller);
	} else if (rc == ENOENT) {
		log_flag(CONMGR, "%s->%s: [%s] ignoring request to change polling %s->%s for %s=%d",
			 caller, __func__, con->name,
			 pollctl_type_to_string(old_type),
			 pollctl_type_to_string(new_type), fd_name, fd);
		*dst = PCTL_TYPE_NONE;
	} else if (rc == EPERM) {
		log_flag(CONMGR, "%s->%s: [%s] polling %s->%s for %s=%d not supported by kernel",
			 caller, __func__, con->name,
			 pollctl_type_to_string(old_type),
			 pollctl_type_to_string(new_type), fd_name, fd);
		*dst = PCTL_TYPE_UNSUPPORTED;
	} else {
		_on_change_polling_fail(con, rc, old_type, new_type, fd_name,
					fd, dst, caller);
	}
}
/*
 * Update which poll() events are monitored for con's file descriptors.
 *
 * The connection-wide type is mapped onto one mode for input_fd and one for
 * output_fd, which are then applied with _set_fd_polling(). When
 * input_fd == output_fd only input_fd is registered with pollctl, and
 * output_fd polling must stay PCTL_TYPE_NONE or PCTL_TYPE_UNSUPPORTED. A fd
 * whose polling state is PCTL_TYPE_UNSUPPORTED is never changed. Failures are
 * passed to _on_change_polling_rc(), which may close the connection.
 *
 * IN con - connection whose polling_input_fd/polling_output_fd are updated
 * IN type - requested polling mode for the connection as a whole
 * IN caller - name of the calling function, used only for logging
 *
 * NOTE(review): the callers visible in this file call this with mgr.mutex
 * held; presumably that is required — confirm.
 */
extern void con_set_polling(conmgr_fd_t *con, pollctl_fd_type_t type,
			    const char *caller)
{
	int has_in, has_out, in, out, is_same;
	int rc_in = SLURM_SUCCESS, rc_out = SLURM_SUCCESS;
	pollctl_fd_type_t in_type = PCTL_TYPE_NONE, out_type = PCTL_TYPE_NONE;

	_validate_pctl_type(type);
	_validate_pctl_type(con->polling_input_fd);
	_validate_pctl_type(con->polling_output_fd);

	/* A negative fd is treated as not present */
	in = con->input_fd;
	has_in = (in >= 0);
	out = con->output_fd;
	has_out = (out >= 0);
	/* One fd shared by both directions is only ever linked as input_fd */
	is_same = (con->input_fd == con->output_fd);

	if (!has_in && !has_out) {
		/* Nothing to poll: only PCTL_TYPE_NONE is valid here */
		xassert(con->polling_input_fd == PCTL_TYPE_NONE);
		xassert(con->polling_output_fd == PCTL_TYPE_NONE);
		xassert(type == PCTL_TYPE_NONE);

		if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
			char *flags = con_flags_string(con->flags);
			log_flag(CONMGR, "%s: skipping connection flags=%s",
				 __func__, flags);
			xfree(flags);
		}

		return;
	}

	/*
	 * Map type to type per in/out. The in/out types are initialized to
	 * PCTL_TYPE_NONE above.
	 */
	switch (type) {
	case PCTL_TYPE_UNSUPPORTED:
		fatal_abort("should never happen");
	case PCTL_TYPE_NONE:
		break;
	case PCTL_TYPE_CONNECTED:
		if (is_same) {
			in_type = PCTL_TYPE_CONNECTED;
		} else {
			in_type = PCTL_TYPE_CONNECTED;
			out_type = PCTL_TYPE_CONNECTED;
		}
		break;
	case PCTL_TYPE_READ_ONLY:
		in_type = PCTL_TYPE_READ_ONLY;
		break;
	case PCTL_TYPE_READ_WRITE:
		/* Separate fds: read on input_fd, write on output_fd */
		if (is_same) {
			in_type = PCTL_TYPE_READ_WRITE;
		} else {
			in_type = PCTL_TYPE_READ_ONLY;
			out_type = PCTL_TYPE_WRITE_ONLY;
		}
		break;
	case PCTL_TYPE_WRITE_ONLY:
		/* A shared fd is linked via input_fd even for writing */
		if (is_same) {
			in_type = PCTL_TYPE_WRITE_ONLY;
		} else {
			out_type = PCTL_TYPE_WRITE_ONLY;
		}
		break;
	case PCTL_TYPE_LISTEN:
		xassert(con_flag(con, FLAG_IS_LISTEN));
		in_type = PCTL_TYPE_LISTEN;
		break;
	case PCTL_TYPE_INVALID:
	case PCTL_TYPE_INVALID_MAX:
		fatal_abort("should never execute");
	}

	/* Keep fds already marked unsupported as unsupported */
	if (con->polling_output_fd == PCTL_TYPE_UNSUPPORTED)
		out_type = PCTL_TYPE_UNSUPPORTED;
	if (con->polling_input_fd == PCTL_TYPE_UNSUPPORTED)
		in_type = PCTL_TYPE_UNSUPPORTED;

	_log_set_polling(con, has_in, has_out, type, in_type, out_type, caller);

	if (is_same) {
		/* same never link output_fd */
		xassert((con->polling_output_fd == PCTL_TYPE_NONE) ||
			(con->polling_output_fd == PCTL_TYPE_UNSUPPORTED));

		rc_in = _set_fd_polling(in, &con->polling_input_fd, in_type,
					con->name, caller);
	} else {
		if (has_in)
			rc_in = _set_fd_polling(in, &con->polling_input_fd,
						in_type, con->name, caller);
		if (has_out)
			rc_out = _set_fd_polling(out, &con->polling_output_fd,
						 out_type, con->name, caller);
	}

	/*
	 * _set_fd_polling() only updates the stored type on success, so on
	 * failure polling_*_fd still holds the old type passed here.
	 */
	if (rc_in)
		_on_change_polling_rc(con, rc_in, con->polling_input_fd,
				      in_type, true, caller);
	if (rc_out)
		_on_change_polling_rc(con, rc_out, con->polling_output_fd,
				      out_type, false, caller);
}
/*
 * Request that con's file descriptors be extracted and handed to func.
 *
 * Stores a new extract request on con, disables all polling for con and wakes
 * the watch thread to finish the extraction.
 * RET SLURM_SUCCESS, EINVAL for NULL con/func, or EEXIST when an extraction
 *	is already queued for con
 */
extern int conmgr_queue_extract_con_fd(conmgr_fd_t *con,
				       conmgr_extract_fd_func_t func,
				       const char *func_name,
				       void *func_arg)
{
	int rc = EEXIST;

	xassert(con);
	xassert(func);

	if (!con || !func)
		return EINVAL;

	slurm_mutex_lock(&mgr.mutex);
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (!con->extract) {
		extract_fd_t *request = xmalloc_nz(sizeof(*request));

		*request = (extract_fd_t) {
			.magic = MAGIC_EXTRACT_FD,
			.func = func,
			.func_name = func_name,
			.func_arg = func_arg,
			.input_fd = -1,
			.output_fd = -1,
		};
		con->extract = request;

		/* Disable all polling for this connection */
		con_set_polling(con, PCTL_TYPE_NONE, __func__);

		/* wake up watch to finish extraction */
		EVENT_SIGNAL(&mgr.watch_sleep);

		rc = SLURM_SUCCESS;
	}

	slurm_mutex_unlock(&mgr.mutex);
	return rc;
}
/*
 * Release an extract request and clear the caller's pointer to it.
 */
static void _free_extract(extract_fd_t **extract_ptr)
{
	extract_fd_t *victim = *extract_ptr;

	*extract_ptr = NULL;

	xassert(victim->magic == MAGIC_EXTRACT_FD);
	victim->magic = ~MAGIC_EXTRACT_FD;
	xfree(victim);
}
/*
 * Work callback: hand the extracted file descriptors (and TLS state) to the
 * user's extract function, free the request, then wake watch() so it can
 * clean up the now fd-less connection.
 */
static void _wrap_on_extract(conmgr_callback_args_t conmgr_args, void *arg)
{
	extract_fd_t *req = arg;

	xassert(req->magic == MAGIC_EXTRACT_FD);

	log_flag(CONMGR, "%s: calling %s() input_fd=%d output_fd=%d arg=0x%"PRIxPTR,
		 __func__, req->func_name, req->input_fd,
		 req->output_fd, (uintptr_t) req->func_arg);

	req->func(conmgr_args, req->input_fd, req->output_fd, req->tls_conn,
		  req->func_arg);

	_free_extract(&req);

	/* wake up watch() to cleanup connection */
	slurm_mutex_lock(&mgr.mutex);
	EVENT_SIGNAL(&mgr.watch_sleep);
	slurm_mutex_unlock(&mgr.mutex);
}
/*
 * Complete a queued extraction for con.
 *
 * Takes con->extract, marks con as read EOF with no read/write capability,
 * extracts TLS state when con->tls is set, and moves con's input/output file
 * descriptors into the extract request, leaving -1 in con. Then queues
 * _wrap_on_extract() as FIFO work that is not tied to the connection, so
 * watch() can clean up con. If tls_extract() fails, the request is freed and
 * the file descriptors stay in con.
 *
 * caller must hold mgr.mutex lock
 */
extern void extract_con_fd(conmgr_fd_t *con)
{
	extract_fd_t *extract = NULL;

	SWAP(extract, con->extract);
	xassert(extract);
	xassert(extract->magic == MAGIC_EXTRACT_FD);

	/* Polling should already be disabled */
	xassert((con->polling_input_fd == PCTL_TYPE_NONE) ||
		(con->polling_input_fd == PCTL_TYPE_UNSUPPORTED));
	xassert((con->polling_output_fd == PCTL_TYPE_NONE) ||
		(con->polling_output_fd == PCTL_TYPE_UNSUPPORTED));

	/* can't extract safely when work running or not connected */
	xassert(!con_flag(con, FLAG_WORK_ACTIVE));
	xassert(con_flag(con, FLAG_IS_CONNECTED));
	xassert(!(con_flag(con, FLAG_TLS_SERVER) ||
		  con_flag(con, FLAG_TLS_CLIENT)) ||
		con_flag(con, FLAG_IS_TLS_CONNECTED));
	xassert(!con_flag(con, FLAG_WAIT_ON_FINGERPRINT));
	xassert(!con_flag(con, FLAG_TLS_WAIT_ON_CLOSE));
	xassert(!con_flag(con, FLAG_WAIT_ON_FINISH));

	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
		char *flags = con_flags_string(con->flags);
		log_flag(CONMGR, "%s: extracting input_fd=%d output_fd=%d func=%s() flags=%s",
			 __func__, con->input_fd, con->output_fd,
			 extract->func_name, flags);
		xfree(flags);
	}

	/* clear all polling states */
	con_set_flag(con, FLAG_READ_EOF);
	con_unset_flag(con, FLAG_CAN_READ);
	con_unset_flag(con, FLAG_CAN_WRITE);
	con_unset_flag(con, FLAG_ON_DATA_TRIED);

	/* assert input/outputs are empty */
	/*
	 * NOTE(review): this checks con->out, not con->tls_out, and
	 * list_is_empty(con->out) is asserted again two lines below. It looks
	 * like it was meant to check that tls_out is empty — confirm tls_out's
	 * type and intent.
	 */
	xassert(!con->tls_out || list_is_empty(con->out));
	xassert(!con->tls_in || !get_buf_offset(con->tls_in));
	xassert(list_is_empty(con->out));
	xassert(!get_buf_offset(con->in));

	/* Extract TLS state (or fail) */
	if (con->tls && tls_extract(con, extract)) {
		/* fds are left in con; only the request is dropped */
		_free_extract(&extract);
		return;
	}

	/*
	 * take the file descriptors, replacing the file descriptors in
	 * con with invalid values initialized in conmgr_queue_extract_con_fd().
	 */
	SWAP(extract->input_fd, con->input_fd);
	SWAP(extract->output_fd, con->output_fd);

	/*
	 * Queue up work but not against the connection as we want watch() to
	 * cleanup the connection.
	 */
	add_work_fifo(true, _wrap_on_extract, extract);
}
/*
 * Clear FLAG_QUIESCE on con (if set) and wake the watch thread.
 * Caller holds mgr.mutex.
 * RET SLURM_SUCCESS always
 */
static int _unquiesce_fd(conmgr_fd_t *con)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (con_flag(con, FLAG_QUIESCE)) {
		con_unset_flag(con, FLAG_QUIESCE);
		EVENT_SIGNAL(&mgr.watch_sleep);

		if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
			char *flag_str = con_flags_string(con->flags);

			log_flag(CONMGR, "%s: unquiesced connection flags=%s",
				 __func__, flag_str);
			xfree(flag_str);
		}
	}

	return SLURM_SUCCESS;
}
/*
 * Locked wrapper around _unquiesce_fd().
 * RET SLURM_SUCCESS or EINVAL when con is NULL
 */
extern int conmgr_unquiesce_fd(conmgr_fd_t *con)
{
	int rc = EINVAL;

	if (con) {
		slurm_mutex_lock(&mgr.mutex);
		rc = _unquiesce_fd(con);
		slurm_mutex_unlock(&mgr.mutex);
	}

	return rc;
}
/*
 * Set FLAG_QUIESCE on con (if not already set), disable all polling for it
 * and wake the watch thread. Caller holds mgr.mutex.
 * RET SLURM_SUCCESS always
 */
static int _quiesce_fd(conmgr_fd_t *con)
{
	xassert(con->magic == MAGIC_CON_MGR_FD);

	if (!con_flag(con, FLAG_QUIESCE)) {
		con_set_flag(con, FLAG_QUIESCE);
		con_set_polling(con, PCTL_TYPE_NONE, __func__);
		EVENT_SIGNAL(&mgr.watch_sleep);

		if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) {
			char *flag_str = con_flags_string(con->flags);

			log_flag(CONMGR, "%s: quiesced connection flags=%s",
				 __func__, flag_str);
			xfree(flag_str);
		}
	}

	return SLURM_SUCCESS;
}
/*
 * Locked wrapper around _quiesce_fd().
 * RET SLURM_SUCCESS or EINVAL when con is NULL
 */
extern int conmgr_quiesce_fd(conmgr_fd_t *con)
{
	int rc = EINVAL;

	if (con) {
		slurm_mutex_lock(&mgr.mutex);
		rc = _quiesce_fd(con);
		slurm_mutex_unlock(&mgr.mutex);
	}

	return rc;
}
/*
 * Report whether con still has a valid output file descriptor (read under
 * mgr.mutex).
 */
extern bool conmgr_fd_is_output_open(conmgr_fd_t *con)
{
	bool output_open = false;

	xassert(con->magic == MAGIC_CON_MGR_FD);

	slurm_mutex_lock(&mgr.mutex);
	output_open = (con->output_fd >= 0);
	slurm_mutex_unlock(&mgr.mutex);

	return output_open;
}
/*
 * Allocate a new reference to con and increment con->refs.
 *
 * Callers in this file hold mgr.mutex around this call.
 * RET new reference; release with fd_free_ref()
 */
extern conmgr_fd_ref_t *fd_new_ref(conmgr_fd_t *con)
{
	conmgr_fd_ref_t *ref;

	xassert(con->magic == MAGIC_CON_MGR_FD);
	/*
	 * Check the counter before incrementing it: asserting after refs++
	 * cannot catch the overflow (assuming refs is a signed int, where
	 * overflow is undefined behavior).
	 */
	xassert(con->refs >= 0);
	xassert(con->refs < INT_MAX);

	ref = xmalloc(sizeof(*ref));
	*ref = (conmgr_fd_ref_t) {
		.magic = MAGIC_CON_MGR_FD_REF,
		.con = con,
	};

	con->refs++;

	return ref;
}
/*
 * Locked wrapper around fd_new_ref(). Aborts when con is NULL.
 */
extern conmgr_fd_ref_t *conmgr_fd_new_ref(conmgr_fd_t *con)
{
	conmgr_fd_ref_t *new_ref;

	if (!con)
		fatal_abort("con must not be null");

	slurm_mutex_lock(&mgr.mutex);
	new_ref = fd_new_ref(con);
	slurm_mutex_unlock(&mgr.mutex);

	return new_ref;
}
/*
 * Create an additional reference to the connection behind an existing
 * reference.
 *
 * IN con - existing connection reference (not a conmgr_fd_t); must not be
 *	NULL
 * RET new reference; release with conmgr_fd_free_ref()
 */
extern conmgr_fd_ref_t *conmgr_con_link(conmgr_fd_ref_t *con)
{
	conmgr_fd_ref_t *ref = NULL;

	/* Match conmgr_fd_new_ref(): fail loudly instead of dereferencing NULL */
	if (!con)
		fatal_abort("ref must not be null");

	slurm_mutex_lock(&mgr.mutex);
	xassert(con->magic == MAGIC_CON_MGR_FD_REF);
	ref = fd_new_ref(con->con);
	slurm_mutex_unlock(&mgr.mutex);

	return ref;
}
/*
 * Release a reference: decrement the connection's ref count and free the
 * reference, clearing *ref_ptr through xfree().
 *
 * Callers in this file hold mgr.mutex around this call.
 */
extern void fd_free_ref(conmgr_fd_ref_t **ref_ptr)
{
	conmgr_fd_ref_t *ref = *ref_ptr;
	conmgr_fd_t *con;

	/* Validate the reference before following it to the connection */
	xassert(ref->magic == MAGIC_CON_MGR_FD_REF);
	con = ref->con;
	xassert(con->magic == MAGIC_CON_MGR_FD);

	/*
	 * Releasing a reference requires at least one to be held. Checking
	 * before the decrement catches double releases and underflow.
	 */
	xassert(con->refs > 0);
	con->refs--;

	ref->magic = ~MAGIC_CON_MGR_FD_REF;
	xfree((*ref_ptr));
}
/*
 * Locked wrapper around fd_free_ref().
 *
 * Aborts when ref_ptr is NULL. Does nothing when *ref_ptr is NULL
 * (already released).
 */
extern void conmgr_fd_free_ref(conmgr_fd_ref_t **ref_ptr)
{
	if (!ref_ptr)
		fatal_abort("ref_ptr must not be null");

	if (*ref_ptr) {
		slurm_mutex_lock(&mgr.mutex);
		fd_free_ref(ref_ptr);
		slurm_mutex_unlock(&mgr.mutex);
	}
}
/*
 * Return the connection behind ref, or NULL when ref is NULL.
 */
extern conmgr_fd_t *fd_get_ref(conmgr_fd_ref_t *ref)
{
	conmgr_fd_t *target = NULL;

	if (ref) {
		xassert(ref->magic == MAGIC_CON_MGR_FD_REF);
		target = ref->con;
		xassert(target);
	}

	return target;
}
/*
 * Return the connection behind ref, or NULL when ref is NULL.
 *
 * Debug builds additionally validate the connection and its reference count
 * under mgr.mutex.
 */
extern conmgr_fd_t *conmgr_fd_get_ref(conmgr_fd_ref_t *ref)
{
	conmgr_fd_t *target = fd_get_ref(ref);

#ifndef NDEBUG
	/* The lock is only needed for the assertions below */
	if (target) {
		slurm_mutex_lock(&mgr.mutex);
		xassert(target->magic == MAGIC_CON_MGR_FD);
		xassert(target->refs < INT_MAX);
		xassert(target->refs > 0);
		slurm_mutex_unlock(&mgr.mutex);
	}
#endif

	return target;
}
/*
 * Report whether the referenced connection is a TLS client or server (read
 * under mgr.mutex).
 */
extern bool conmgr_fd_is_tls(conmgr_fd_ref_t *ref)
{
	conmgr_fd_t *con = fd_get_ref(ref);
	bool is_tls;

	xassert(con);

	slurm_mutex_lock(&mgr.mutex);
	xassert(con->magic == MAGIC_CON_MGR_FD);
	is_tls = con_flag(con, FLAG_TLS_CLIENT) ||
		 con_flag(con, FLAG_TLS_SERVER);
	slurm_mutex_unlock(&mgr.mutex);

	return is_tls;
}