| /*****************************************************************************\ |
| * con.c - definitions for connection handlers in connection manager |
| ***************************************************************************** |
| * Copyright (C) SchedMD LLC. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #define _GNU_SOURCE |
| #include <limits.h> |
| #include <stdbool.h> |
| #include <sys/stat.h> |
| #include <sys/socket.h> |
| #include <sys/types.h> |
| #include <sys/un.h> |
| #include <unistd.h> |
| |
| #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) |
| #include <sys/param.h> |
| #include <sys/ucred.h> |
| #endif |
| |
| #if defined(__linux__) |
| #include <sys/sysmacros.h> |
| #endif /* __linux__ */ |
| |
| #include "slurm/slurm.h" |
| #include "slurm/slurm_errno.h" |
| |
| #include "src/common/fd.h" |
| #include "src/common/http.h" |
| #include "src/common/log.h" |
| #include "src/common/macros.h" |
| #include "src/common/net.h" |
| #include "src/common/pack.h" |
| #include "src/common/read_config.h" |
| #include "src/common/slurm_protocol_api.h" |
| #include "src/common/slurm_protocol_socket.h" |
| #include "src/common/slurm_time.h" |
| #include "src/common/util-net.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| |
| #include "src/conmgr/conmgr.h" |
| #include "src/conmgr/delayed.h" |
| #include "src/conmgr/mgr.h" |
| #include "src/conmgr/polling.h" |
| #include "src/conmgr/tls.h" |
| |
| #include "src/interfaces/tls.h" |
| #include "src/interfaces/url_parser.h" |
| |
| #define T(type) { type, XSTRINGIFY(type) } |
| static const struct { |
| conmgr_con_type_t type; |
| const char *string; |
| } con_types[] = { |
| T(CON_TYPE_NONE), |
| T(CON_TYPE_RAW), |
| T(CON_TYPE_RPC), |
| }; |
| #undef T |
| |
| #define T(flag) { flag, XSTRINGIFY(flag) } |
| static const struct { |
| con_flags_t flag; |
| const char *string; |
| } con_flags[] = { |
| T(FLAG_NONE), |
| T(FLAG_ON_DATA_TRIED), |
| T(FLAG_IS_SOCKET), |
| T(FLAG_IS_LISTEN), |
| T(FLAG_WAIT_ON_FINISH), |
| T(FLAG_CAN_WRITE), |
| T(FLAG_CAN_READ), |
| T(FLAG_READ_EOF), |
| T(FLAG_IS_CONNECTED), |
| T(FLAG_WORK_ACTIVE), |
| T(FLAG_RPC_KEEP_BUFFER), |
| T(FLAG_QUIESCE), |
| T(FLAG_CAN_QUERY_OUTPUT_BUFFER), |
| T(FLAG_IS_FIFO), |
| T(FLAG_IS_CHR), |
| T(FLAG_TCP_NODELAY), |
| T(FLAG_WATCH_WRITE_TIMEOUT), |
| T(FLAG_WATCH_READ_TIMEOUT), |
| T(FLAG_WATCH_CONNECT_TIMEOUT), |
| T(FLAG_TLS_SERVER), |
| T(FLAG_TLS_CLIENT), |
| T(FLAG_IS_TLS_CONNECTED), |
| T(FLAG_WAIT_ON_FINGERPRINT), |
| T(FLAG_TLS_WAIT_ON_CLOSE), |
| T(FLAG_RPC_RECV_FORWARD), |
| }; |
| #undef T |
| |
| typedef struct { |
| const conmgr_events_t *events; |
| void *arg; |
| conmgr_con_type_t type; |
| int rc; |
| conmgr_con_flags_t flags; |
| } socket_listen_init_t; |
| |
| typedef struct { |
| #define MAGIC_RECEIVE_FD 0xeba8bae0 |
| int magic; /* MAGIC_RECEIVE_FD */ |
| conmgr_con_type_t type; |
| const conmgr_events_t *events; |
| void *arg; |
| } receive_fd_args_t; |
| |
| typedef struct { |
| #define MAGIC_SEND_FD 0xfbf8e2e0 |
| int magic; /* MAGIC_SEND_FD */ |
| int fd; /* fd to send over con */ |
| } send_fd_args_t; |
| |
| static void _validate_pctl_type(pollctl_fd_type_t type) |
| { |
| xassert(type > PCTL_TYPE_INVALID); |
| xassert(type < PCTL_TYPE_INVALID_MAX); |
| } |
| |
| extern const char *conmgr_con_type_string(conmgr_con_type_t type) |
| { |
| for (int i = 0; i < ARRAY_SIZE(con_types); i++) |
| if (con_types[i].type == type) |
| return con_types[i].string; |
| |
| fatal_abort("invalid type"); |
| } |
| static const char *_con_flag_string(con_flags_t flag) |
| { |
| for (int i = 0; i < ARRAY_SIZE(con_flags); i++) |
| if (con_flags[i].flag == flag) |
| return con_flags[i].string; |
| |
| fatal_abort("invalid type"); |
| } |
| |
| extern char *con_flags_string(const con_flags_t flags) |
| { |
| char *str = NULL, *at = NULL; |
| uint32_t matched = 0; |
| |
| if (flags == FLAG_NONE) |
| return xstrdup(_con_flag_string(FLAG_NONE)); |
| |
| /* skip FLAG_NONE */ |
| for (int i = 1; i < ARRAY_SIZE(con_flags); i++) { |
| if ((con_flags[i].flag & flags) == con_flags[i].flag) { |
| xstrfmtcatat(str, &at, "%s%s", (str ? "|" : ""), |
| con_flags[i].string); |
| matched |= con_flags[i].flag; |
| } |
| } |
| |
| if (flags ^ matched) |
| xstrfmtcatat(str, &at, "%s0x%08"PRIx32, (str ? "|" : ""), |
| (flags ^ matched)); |
| |
| return str; |
| } |
| |
| /* |
| * Close all connections (for_each) |
| * NOTE: must hold mgr.mutex |
| */ |
| static int _close_con_for_each(void *x, void *arg) |
| { |
| conmgr_fd_t *con = x; |
| close_con(true, con); |
| return 1; |
| } |
| |
| /* mgr.mutex must be locked when calling this function */ |
| extern void close_all_connections(void) |
| { |
| /* close all connections */ |
| list_for_each(mgr.connections, _close_con_for_each, NULL); |
| list_for_each(mgr.listen_conns, _close_con_for_each, NULL); |
| } |
| |
| extern void work_close_con(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| close_con(false, conmgr_args.con); |
| } |
| |
| /* |
| * Stop reading from connection but write out the remaining buffer and finish |
| * any queued work |
| */ |
| extern void close_con(bool locked, conmgr_fd_t *con) |
| { |
| int input_fd = -1; |
| bool is_same_fd, is_socket, is_listen; |
| |
| if (!locked) |
| slurm_mutex_lock(&mgr.mutex); |
| |
| if (con->input_fd < 0) { |
| xassert(con_flag(con, FLAG_READ_EOF) || |
| con_flag(con, FLAG_IS_LISTEN)); |
| xassert(!con_flag(con, FLAG_CAN_READ) || |
| con_flag(con, FLAG_IS_LISTEN)); |
| |
| if (!locked) |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| log_flag(CONMGR, "%s: [%s] ignoring duplicate close request", |
| __func__, con->name); |
| return; |
| } |
| |
| log_flag(CONMGR, "%s: [%s] closing input", __func__, con->name); |
| |
| /* |
| * Stop polling read/write to input fd to allow handle_connection() to |
| * select what needs to be monitored |
| */ |
| con_set_polling(con, PCTL_TYPE_NONE, __func__); |
| |
| /* mark it as EOF even if it hasn't */ |
| con_set_flag(con, FLAG_READ_EOF); |
| con_unset_flag(con, FLAG_CAN_READ); |
| |
| /* drop any unprocessed input buffer */ |
| if (con->in) |
| set_buf_offset(con->in, 0); |
| if (con->tls_in) |
| set_buf_offset(con->tls_in, 0); |
| |
| is_same_fd = (con->input_fd == con->output_fd); |
| is_socket = con_flag(con, FLAG_IS_SOCKET); |
| is_listen = con_flag(con, FLAG_IS_LISTEN); |
| |
| input_fd = con->input_fd; |
| con->input_fd = -1; |
| |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| |
| if (!locked) |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| /* unlink listener sockets to avoid leaving ghost socket */ |
| if (is_listen && (con->address.ss_family == AF_LOCAL)) { |
| struct sockaddr_un *un = (struct sockaddr_un *) &con->address; |
| |
| if (unlink(un->sun_path)) |
| error("%s: [%s] unable to unlink %s: %m", |
| __func__, con->name, un->sun_path); |
| else |
| log_flag(CONMGR, "%s: [%s] unlinked %s", |
| __func__, con->name, un->sun_path); |
| } |
| |
| if (is_listen || !is_same_fd) { |
| fd_close(&input_fd); |
| } else if (is_socket && shutdown(input_fd, SHUT_RD)) { |
| /* shutdown input on sockets */ |
| log_flag(CONMGR, "%s: [%s] unable to shutdown incoming socket half: %m", |
| __func__, con->name); |
| } |
| } |
| |
| static char *_resolve_tty_name(int fd) |
| { |
| char buf[PATH_MAX] = {0}; |
| |
| if (ttyname_r(fd, buf, (sizeof(buf) - 1))) { |
| log_flag(CONMGR, "%s: unable to resolve tty at fd:%d: %m", |
| __func__, fd); |
| return NULL; |
| } |
| |
| return xstrdup(buf); |
| } |
| |
| static char *_resolve_fd(int fd, struct stat *stat_ptr) |
| { |
| char *name = NULL; |
| |
| if (S_ISSOCK(stat_ptr->st_mode)) { |
| slurm_addr_t addr = {0}; |
| |
| if (!slurm_get_stream_addr(fd, &addr) && |
| (addr.ss_family != AF_UNSPEC) && |
| (name = sockaddr_to_string(&addr, sizeof(addr)))) |
| return name; |
| } |
| |
| if ((name = fd_resolve_path(fd))) |
| return name; |
| |
| if (S_ISFIFO(stat_ptr->st_mode)) |
| return xstrdup_printf("pipe"); |
| |
| if (S_ISCHR(stat_ptr->st_mode)) { |
| if (isatty(fd) && (name = _resolve_tty_name(fd))) |
| return name; |
| |
| #if defined(__linux__) |
| return xstrdup_printf("device:%u.%u", major(stat_ptr->st_dev), |
| minor(stat_ptr->st_dev)); |
| #else /* !__linux__ */ |
| return xstrdup_printf("device:0x%"PRIx64, stat_ptr->st_dev); |
| #endif /* !__linux__ */ |
| } |
| |
| #if defined(__linux__) |
| if (S_ISBLK(stat_ptr->st_mode)) |
| return xstrdup_printf("block:%u.%u", major(stat_ptr->st_dev), |
| minor(stat_ptr->st_dev)); |
| #endif /* __linux__ */ |
| |
| return NULL; |
| } |
| |
| /* set connection name if one was not resolved already */ |
| static void _set_connection_name(conmgr_fd_t *con, struct stat *in_stat, |
| struct stat *out_stat) |
| { |
| xassert(con); |
| xassert(!con->name); |
| |
| char *in_str = NULL, *out_str = NULL; |
| const bool has_in = (con->input_fd >= 0); |
| const bool has_out = (con->output_fd >= 0); |
| bool is_same = (con->input_fd == con->output_fd); |
| |
| if (!has_in && !has_out) { |
| con->name = xstrdup("INVALID"); |
| return; |
| } |
| |
| /* grab socket peer if possible */ |
| if (con_flag(con, FLAG_IS_SOCKET) && has_out) |
| out_str = fd_resolve_peer(con->output_fd); |
| |
| if (has_out && !out_str) |
| out_str = _resolve_fd(con->output_fd, out_stat); |
| if (has_in) |
| in_str = _resolve_fd(con->input_fd, in_stat); |
| |
| /* avoid "->" syntax if same on both sides */ |
| if (in_str && out_str && !xstrcmp(in_str, out_str)) { |
| is_same = true; |
| xfree(out_str); |
| } |
| |
| if (is_same) { |
| xstrfmtcat(con->name, "%s(fd:%d)", in_str, |
| con->input_fd); |
| } else if (has_in && has_out) { |
| xstrfmtcat(con->name, "%s(fd:%d)->%s(fd:%d)", in_str, |
| con->input_fd, out_str, con->output_fd); |
| } else if (has_in && !has_out) { |
| xstrfmtcat(con->name, "%s(fd:%d)->()", in_str, con->input_fd); |
| } else if (!has_in && has_out) { |
| xstrfmtcat(con->name, "()->%s(fd:%d)", out_str, con->output_fd); |
| } else { |
| xassert(false); |
| } |
| |
| xfree(out_str); |
| xfree(in_str); |
| } |
| |
| static void _check_con_type(conmgr_fd_t *con, conmgr_con_type_t type) |
| { |
| #ifndef NDEBUG |
| if (type == CON_TYPE_RAW) { |
| /* must have on_data() defined */ |
| xassert(con->events->on_data); |
| } else if (type == CON_TYPE_RPC) { |
| /* must have on_msg() defined */ |
| xassert(con->events->on_msg); |
| } else { |
| fatal_abort("invalid type"); |
| } |
| #endif /* !NDEBUG */ |
| } |
| |
| extern int fd_change_mode(conmgr_fd_t *con, conmgr_con_type_t type) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| _check_con_type(con, type); |
| |
| if (con->type == type) { |
| log_flag(CONMGR, "%s: [%s] ignoring unchanged type: %s", |
| __func__, con->name, conmgr_con_type_string(type)); |
| return SLURM_SUCCESS; |
| } |
| |
| log_flag(CONMGR, "%s: [%s] changing type: %s->%s pending_reads=%u pending_writes=%u", |
| __func__, con->name, conmgr_con_type_string(con->type), |
| conmgr_con_type_string(type), |
| con->in ? get_buf_offset(con->in) : 0, |
| list_count(con->out)); |
| |
| /* Always set TCP_NODELAY for Slurm RPC connections */ |
| if (con->type == CON_TYPE_RPC) |
| con_set_flag(con, FLAG_TCP_NODELAY); |
| |
| con->type = type; |
| |
| if (con_flag(con, FLAG_IS_SOCKET) && con_flag(con, FLAG_TCP_NODELAY) && |
| (con->output_fd >= 0)) { |
| int rc; |
| |
| if ((rc = net_set_nodelay(con->output_fd, true, NULL))) { |
| log_flag(CONMGR, "%s: [%s] unable to set TCP_NODELAY: %s", |
| __func__, con->name, slurm_strerror(rc)); |
| return rc; |
| } |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern int conmgr_fd_change_mode(conmgr_fd_t *con, conmgr_con_type_t type) |
| { |
| int rc; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| rc = fd_change_mode(con, type); |
| |
| /* wake up watch() to send along any pending data */ |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return rc; |
| } |
| |
| extern int add_connection(conmgr_con_type_t type, |
| conmgr_fd_t *source, int input_fd, |
| int output_fd, |
| const conmgr_events_t *events, |
| conmgr_con_flags_t flags, |
| const slurm_addr_t *addr, |
| socklen_t addrlen, bool is_listen, |
| const char *unix_socket_path, void *tls_conn, |
| void *arg) |
| { |
| struct stat in_stat = { 0 }; |
| struct stat out_stat = { 0 }; |
| conmgr_fd_t *con = NULL; |
| bool set_keep_alive, is_socket, is_fifo, is_chr; |
| const bool has_in = (input_fd >= 0); |
| const bool has_out = (output_fd >= 0); |
| const bool is_same = (input_fd == output_fd); |
| const size_t unix_socket_path_len = |
| (unix_socket_path ? (strlen(unix_socket_path) + 1): 0); |
| static const size_t unix_socket_path_max = |
| sizeof(((struct sockaddr_un *) NULL)->sun_path); |
| |
| if (unix_socket_path_len && |
| (unix_socket_path_len > unix_socket_path_max)) { |
| log_flag(CONMGR, "%s: Unix domain socket path too long %zu/%zu: %s", |
| __func__, unix_socket_path_len, unix_socket_path_max, |
| unix_socket_path); |
| return ENAMETOOLONG; |
| } |
| |
| /* verify FD is valid and still open */ |
| if (has_in && fstat(input_fd, &in_stat)) { |
| log_flag(CONMGR, "%s: invalid fd:%d: %m", __func__, input_fd); |
| return SLURM_COMMUNICATIONS_INVALID_INCOMING_FD; |
| } |
| if (has_out && fstat(output_fd, &out_stat)) { |
| log_flag(CONMGR, "%s: invalid fd:%d: %m", __func__, output_fd); |
| return SLURM_COMMUNICATIONS_INVALID_OUTGOING_FD; |
| } |
| |
| if (!has_in && !has_out) { |
| log_flag(CONMGR, "%s: refusing connection without input or output fd", |
| __func__); |
| return SLURM_COMMUNICATIONS_INVALID_FD; |
| } |
| |
| is_socket = (has_in && S_ISSOCK(in_stat.st_mode)) || |
| (has_out && S_ISSOCK(out_stat.st_mode)); |
| is_fifo = (has_in && S_ISFIFO(in_stat.st_mode)) || |
| (has_out && S_ISFIFO(out_stat.st_mode)); |
| is_chr = (has_in && S_ISCHR(in_stat.st_mode)) || |
| (has_out && S_ISCHR(out_stat.st_mode)); |
| set_keep_alive = !unix_socket_path && is_socket && !is_listen; |
| |
| /* all connections are non-blocking */ |
| if (has_in) { |
| if (set_keep_alive) |
| net_set_keep_alive(input_fd); |
| fd_set_nonblocking(input_fd); |
| } |
| if (!is_same && has_out) { |
| fd_set_nonblocking(output_fd); |
| |
| if (set_keep_alive) |
| net_set_keep_alive(output_fd); |
| } |
| |
| con = xmalloc(sizeof(*con)); |
| *con = (conmgr_fd_t){ |
| .magic = MAGIC_CON_MGR_FD, |
| .address = { |
| .ss_family = AF_UNSPEC |
| }, |
| .input_fd = input_fd, |
| .output_fd = output_fd, |
| .events = events, |
| .mss = NO_VAL, |
| .work = list_create(NULL), |
| .write_complete_work = list_create(NULL), |
| .new_arg = arg, |
| .type = CON_TYPE_NONE, |
| .polling_input_fd = PCTL_TYPE_NONE, |
| .polling_output_fd = PCTL_TYPE_NONE, |
| /* Set flags not related to connection state tracking */ |
| .flags = (flags & ~FLAGS_MASK_STATE), |
| }; |
| |
| /* save if connection is a socket type to avoid calling fstat() again */ |
| con_assign_flag(con, FLAG_IS_SOCKET, is_socket); |
| con_assign_flag(con, FLAG_IS_LISTEN, is_listen); |
| con_assign_flag(con, FLAG_READ_EOF, !has_in); |
| con_assign_flag(con, FLAG_IS_FIFO, is_fifo); |
| con_assign_flag(con, FLAG_IS_CHR, is_chr); |
| |
| /* |
| * Check for TLS fingerprint if connection is not already flagged as a |
| * TLS connection and the fingerprint callback is present. |
| */ |
| con_assign_flag(con, FLAG_WAIT_ON_FINGERPRINT, |
| (events->on_fingerprint && |
| !con_flag(con, FLAG_TLS_CLIENT) && |
| !con_flag(con, FLAG_TLS_SERVER))); |
| |
| if (!is_listen) { |
| con->in = create_buf(xmalloc(BUFFER_START_SIZE), |
| BUFFER_START_SIZE); |
| con->out = list_create((ListDelF) free_buf); |
| } |
| |
| /* listen on unix socket */ |
| |
| if (!unix_socket_path && source && |
| (source->address.ss_family == AF_LOCAL)) { |
| struct sockaddr_un *un = (struct sockaddr_un *) &source->address; |
| unix_socket_path = un->sun_path; |
| } |
| |
| if (unix_socket_path) { |
| struct sockaddr_un *un = (struct sockaddr_un *) &con->address; |
| |
| xassert(unix_socket_path_len <= unix_socket_path_max); |
| xassert(con_flag(con, FLAG_IS_SOCKET)); |
| xassert(addr->ss_family == AF_LOCAL); |
| |
| con->address.ss_family = AF_LOCAL; |
| strlcpy(un->sun_path, unix_socket_path, unix_socket_path_len); |
| } else if (is_socket && (addrlen > 0) && addr) { |
| memcpy(&con->address, addr, addrlen); |
| } |
| |
| if (is_socket && (con->address.ss_family == AF_UNSPEC)) { |
| int rc = SLURM_SUCCESS; |
| int fd = (has_out ? output_fd : input_fd); |
| |
| if (!is_listen && |
| (rc = slurm_get_peer_addr(fd, &con->address))) { |
| log_flag(CONMGR, "%s: [fd:%d] Unable to resolve remote host: %s", |
| __func__, fd, slurm_strerror(rc)); |
| } else if (slurm_get_stream_addr(fd, &con->address)) { |
| log_flag(CONMGR, "%s: [fd:%d] Unable to resolve bind()ed IP: %m", |
| __func__, fd); |
| } |
| } |
| |
| if (has_out) { |
| int bytes = -1; |
| |
| if (!fd_get_buffered_output_bytes(con->output_fd, &bytes, |
| con->name)) { |
| xassert(bytes >= 0); |
| con_set_flag(con, FLAG_CAN_QUERY_OUTPUT_BUFFER); |
| } |
| } |
| |
| _set_connection_name(con, &in_stat, &out_stat); |
| |
| fd_change_mode(con, type); |
| |
| if (con_flag(con, FLAG_WATCH_CONNECT_TIMEOUT)) |
| con->last_read = timespec_now(); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| |
| log_flag(CONMGR, "%s: [%s] new connection input_fd=%u output_fd=%u flags=%s", |
| __func__, con->name, input_fd, output_fd, flags); |
| |
| xfree(flags); |
| } |
| |
| if (tls_conn) |
| tls_adopt(con, tls_conn); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| if (is_listen) { |
| xassert(con->output_fd <= 0); |
| list_append(mgr.listen_conns, con); |
| } else { |
| list_append(mgr.connections, con); |
| } |
| |
| /* Attempt to handle connection state immediately */ |
| handle_connection(true, con); |
| |
| /* |
| * interrupt poll () and wake up watch() to eventually re-examine new |
| * connection. |
| */ |
| pollctl_interrupt(__func__); |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern void wrap_on_connection(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| |
| if (con_flag(con, FLAG_IS_LISTEN)) { |
| log_flag(CONMGR, "%s: [%s] BEGIN func=0x%"PRIxPTR, |
| __func__, con->name, |
| (uintptr_t) con->events->on_listen_connect); |
| |
| arg = con->events->on_listen_connect(con, con->new_arg); |
| |
| log_flag(CONMGR, "%s: [%s] END func=0x%"PRIxPTR" arg=0x%"PRIxPTR, |
| __func__, con->name, |
| (uintptr_t) con->events->on_listen_connect, |
| (uintptr_t) arg); |
| } else { |
| log_flag(CONMGR, "%s: [%s] BEGIN func=0x%"PRIxPTR, |
| __func__, con->name, |
| (uintptr_t) con->events->on_connection); |
| |
| arg = con->events->on_connection(con, con->new_arg); |
| |
| log_flag(CONMGR, "%s: [%s] END func=0x%"PRIxPTR" arg=0x%"PRIxPTR, |
| __func__, con->name, |
| (uintptr_t) con->events->on_connection, |
| (uintptr_t) arg); |
| } |
| |
| if (!arg) { |
| error("%s: [%s] closing connection due to NULL return from on_connection", |
| __func__, con->name); |
| close_con(false, con); |
| return; |
| } |
| |
| slurm_mutex_lock(&mgr.mutex); |
| con->arg = arg; |
| |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| extern int conmgr_process_fd(conmgr_con_type_t type, int input_fd, |
| int output_fd, const conmgr_events_t *events, |
| conmgr_con_flags_t flags, |
| const slurm_addr_t *addr, socklen_t addrlen, |
| void *tls_conn, void *arg) |
| { |
| return add_connection(type, NULL, input_fd, output_fd, events, |
| flags, addr, addrlen, false, NULL, tls_conn, arg); |
| } |
| |
| extern int conmgr_process_fd_listen(int fd, conmgr_con_type_t type, |
| const conmgr_events_t *events, |
| conmgr_con_flags_t flags, void *arg) |
| { |
| return add_connection(type, NULL, fd, -1, events, flags, NULL, |
| 0, true, NULL, NULL, arg); |
| } |
| |
| static void _receive_fd(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| receive_fd_args_t *args = arg; |
| conmgr_fd_t *src = conmgr_args.con; |
| int fd = -1; |
| |
| xassert(args->magic == MAGIC_RECEIVE_FD); |
| xassert(src->magic == MAGIC_CON_MGR_FD); |
| |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) { |
| log_flag(CONMGR, "%s: [%s] Canceled receive new file descriptor", |
| __func__, src->name); |
| } else if (con_flag(src, FLAG_READ_EOF)) { |
| log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on SHUT_RD input_fd=%d", |
| __func__, src->name, src->input_fd); |
| } else if (src->input_fd < 0) { |
| log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on invalid input_fd=%d", |
| __func__, src->name, src->input_fd); |
| } else if ((fd = receive_fd_over_socket(src->input_fd)) < 0) { |
| log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on input_fd=%d", |
| __func__, src->name, src->input_fd); |
| /* |
| * Close source as receive_fd_over_socket() failed and |
| * connection is now in an unknown state |
| */ |
| close_con(false, src); |
| } else if (add_connection(args->type, NULL, fd, fd, args->events, |
| CON_FLAG_NONE, NULL, 0, false, NULL, |
| NULL, args->arg) != SLURM_SUCCESS) { |
| /* |
| * Error already logged by add_connection() and there is no |
| * reason to assume that failing is due to the state of src. |
| */ |
| } |
| |
| args->magic = ~MAGIC_RECEIVE_FD; |
| xfree(args); |
| } |
| |
| extern int conmgr_queue_receive_fd(conmgr_fd_t *src, conmgr_con_type_t type, |
| const conmgr_events_t *events, void *arg) |
| { |
| int rc = SLURM_ERROR; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| |
| xassert(src->magic == MAGIC_CON_MGR_FD); |
| xassert(type > CON_TYPE_NONE); |
| xassert(type < CON_TYPE_MAX); |
| |
| /* Reject obviously invalid states immediately */ |
| |
| if (!con_flag(src, FLAG_IS_SOCKET)) { |
| log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on non-socket", |
| __func__, src->name); |
| rc = EAFNOSUPPORT; |
| } else if (con_flag(src, FLAG_READ_EOF)) { |
| log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on SHUT_RD input_fd=%d", |
| __func__, src->name, src->input_fd); |
| rc = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR; |
| } else if (src->input_fd < 0) { |
| log_flag(CONMGR, "%s: [%s] Unable to receive new file descriptor on invalid input_fd=%d", |
| __func__, src->name, src->input_fd); |
| rc = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR; |
| } else { |
| receive_fd_args_t *args = xmalloc_nz(sizeof(*args)); |
| *args = (receive_fd_args_t) { |
| .magic = MAGIC_RECEIVE_FD, |
| .type = type, |
| .events = events, |
| .arg = arg, |
| }; |
| add_work_con_fifo(true, src, _receive_fd, args); |
| rc = SLURM_SUCCESS; |
| } |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return rc; |
| } |
| |
| static void _send_fd(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| send_fd_args_t *args = arg; |
| conmgr_fd_t *con = conmgr_args.con; |
| int fd = args->fd; |
| |
| xassert(args->magic == MAGIC_SEND_FD); |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) { |
| log_flag(CONMGR, "%s: [%s] Canceled sending file descriptor %d.", |
| __func__, con->name, fd); |
| } else if (con->output_fd < 0) { |
| log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over invalid output_fd=%d", |
| __func__, con->name, fd, con->output_fd); |
| } else { |
| send_fd_over_socket(con->output_fd, fd); |
| log_flag(CONMGR, "%s: [%s] Sent file descriptor %d over output_fd=%d", |
| __func__, con->name, fd, con->output_fd); |
| } |
| |
| /* always close the file descriptor in this process to avoid leaking */ |
| fd_close(&fd); |
| |
| args->magic = ~MAGIC_SEND_FD; |
| xfree(args); |
| } |
| |
| extern int conmgr_queue_send_fd(conmgr_fd_t *con, int fd) |
| { |
| int rc = SLURM_ERROR; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (fd < 0) { |
| log_flag(CONMGR, "%s: [%s] Unable to send invalid file descriptor %d", |
| __func__, con->name, fd); |
| rc = EINVAL; |
| } else if (!con_flag(con, FLAG_IS_SOCKET)) { |
| log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over non-socket", |
| __func__, con->name, fd); |
| rc = EAFNOSUPPORT; |
| } else if (con->output_fd < 0) { |
| log_flag(CONMGR, "%s: [%s] Unable to send file descriptor %d over invalid output_fd=%d", |
| __func__, con->name, fd, con->output_fd); |
| rc = SLURM_COMMUNICATIONS_MISSING_SOCKET_ERROR; |
| } else { |
| send_fd_args_t *args = xmalloc_nz(sizeof(*args)); |
| *args = (send_fd_args_t) { |
| .magic = MAGIC_SEND_FD, |
| .fd = fd, |
| }; |
| add_work_con_fifo(true, con, _send_fd, args); |
| rc = SLURM_SUCCESS; |
| } |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return rc; |
| } |
| |
| static void _deferred_close_fd(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| if (con_flag(con, FLAG_WORK_ACTIVE)) { |
| slurm_mutex_unlock(&mgr.mutex); |
| conmgr_queue_close_fd(con); |
| } else { |
| close_con(true, con); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| } |
| |
| /* Caller must hold mgr.mutex lock */ |
| static void _close_fd(conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (!con_flag(con, FLAG_WORK_ACTIVE)) { |
| /* |
| * Defer request to close connection until connection is no |
| * longer actively doing work as closing connection would change |
| * several variables guaranteed to not change while work is |
| * active. |
| */ |
| add_work_con_fifo(true, con, _deferred_close_fd, con); |
| } else { |
| close_con(true, con); |
| } |
| } |
| |
| extern void conmgr_queue_close_fd(conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| _close_fd(con); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| extern void conmgr_con_queue_close_free(conmgr_fd_ref_t **ref_ptr) |
| { |
| conmgr_fd_ref_t *ref = NULL; |
| |
| xassert(ref_ptr); |
| |
| /* skip if already released */ |
| if (!(ref = *ref_ptr)) |
| return; |
| |
| xassert(ref->magic == MAGIC_CON_MGR_FD_REF); |
| xassert(ref->con->magic == MAGIC_CON_MGR_FD); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| _close_fd(ref->con); |
| fd_free_ref(ref_ptr); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| static int _match_socket_address(void *x, void *key) |
| { |
| conmgr_fd_t *con = x; |
| const slurm_addr_t *addr1 = key; |
| const slurm_addr_t *addr2 = &con->address; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (addr1->ss_family != addr2->ss_family) |
| return 0; |
| |
| switch (addr1->ss_family) { |
| case AF_INET: |
| { |
| const struct sockaddr_in *a1 = |
| (const struct sockaddr_in *) addr1; |
| const struct sockaddr_in *a2 = |
| (const struct sockaddr_in *) addr2; |
| |
| if (a1->sin_port != a2->sin_port) |
| return 0; |
| |
| return !memcmp(&a1->sin_addr.s_addr, |
| &a2->sin_addr.s_addr, |
| sizeof(a2->sin_addr.s_addr)); |
| } |
| case AF_INET6: |
| { |
| const struct sockaddr_in6 *a1 = |
| (const struct sockaddr_in6 *) addr1; |
| const struct sockaddr_in6 *a2 = |
| (const struct sockaddr_in6 *) addr2; |
| |
| if (a1->sin6_port != a2->sin6_port) |
| return 0; |
| if (a1->sin6_scope_id != a2->sin6_scope_id) |
| return 0; |
| return !memcmp(&a1->sin6_addr.s6_addr, |
| &a2->sin6_addr.s6_addr, |
| sizeof(a2->sin6_addr.s6_addr)); |
| } |
| case AF_UNIX: |
| { |
| const struct sockaddr_un *a1 = |
| (const struct sockaddr_un *) addr1; |
| const struct sockaddr_un *a2 = |
| (const struct sockaddr_un *) addr2; |
| |
| return !xstrcmp(a1->sun_path, a2->sun_path); |
| } |
| default: |
| { |
| fatal_abort("Unexpected ss family type %u", |
| (uint32_t) addr1->ss_family); |
| } |
| } |
| /* Unreachable */ |
| fatal_abort("This should never happen"); |
| } |
| |
| static bool _is_listening(const slurm_addr_t *addr, socklen_t addrlen) |
| { |
| /* use address to ensure memory size is correct */ |
| slurm_addr_t address = {0}; |
| |
| memcpy(&address, addr, addrlen); |
| |
| if (list_find_first_ro(mgr.listen_conns, _match_socket_address, |
| &address)) |
| return true; |
| |
| return false; |
| } |
| |
| static int _add_unix_listener(conmgr_con_type_t type, conmgr_con_flags_t flags, |
| const char *listen_on, const char *unixsock, |
| const conmgr_events_t *events, void *arg) |
| { |
| slurm_addr_t addr = { 0 }; |
| int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); |
| int rc = EINVAL; |
| |
| if (fd < 0) |
| fatal("%s: socket() failed: %m", __func__); |
| |
| addr = sockaddr_from_unix_path(unixsock); |
| |
| if (addr.ss_family != AF_UNIX) |
| fatal("%s: [%s] Invalid Unix socket path: %s", |
| __func__, listen_on, unixsock); |
| |
| log_flag(CONMGR, "%s: [%pA] attempting to bind() and listen() UNIX socket", |
| __func__, &addr); |
| |
| if (unlink(unixsock) && (errno != ENOENT)) |
| error("Error unlink(%s): %m", unixsock); |
| |
| /* bind() will EINVAL if socklen=sizeof(addr) */ |
| if ((rc = bind(fd, (const struct sockaddr *) &addr, |
| sizeof(struct sockaddr_un)))) |
| fatal("%s: [%s] Unable to bind UNIX socket: %m", |
| __func__, listen_on); |
| |
| fd_set_oob(fd, 0); |
| |
| rc = listen(fd, SLURM_DEFAULT_LISTEN_BACKLOG); |
| if (rc < 0) |
| fatal("%s: [%s] unable to listen(): %m", |
| __func__, listen_on); |
| |
| return add_connection(type, NULL, fd, -1, events, flags, &addr, |
| sizeof(addr), true, unixsock, NULL, arg); |
| } |
| |
| static int _add_socket_listener(conmgr_con_type_t type, |
| conmgr_con_flags_t flags, const char *listen_on, |
| url_t *url, const conmgr_events_t *events, |
| void *arg) |
| { |
| int rc = SLURM_SUCCESS; |
| struct addrinfo *addrlist = NULL; |
| |
| /* resolve out the host and port if provided */ |
| if (!(addrlist = xgetaddrinfo(url->host, url->port))) |
| fatal("%s: Unable to listen on %s:%s(%s): %m", |
| __func__, url->host, url->port, listen_on); |
| |
| /* |
| * Create a socket for every address returned |
| * ipv6 clone of net_stream_listen_ports() |
| */ |
| for (struct addrinfo *addr = addrlist; !rc && addr != NULL; |
| addr = addr->ai_next) { |
| /* clone the address since it will be freed at |
| * end of this loop |
| */ |
| int fd; |
| int one = 1; |
| |
| if (_is_listening((const slurm_addr_t *) addr->ai_addr, |
| addr->ai_addrlen)) { |
| verbose("%s: ignoring duplicate listen request for %pA", |
| __func__, (const slurm_addr_t *) addr->ai_addr); |
| continue; |
| } |
| |
| fd = socket(addr->ai_family, addr->ai_socktype | SOCK_CLOEXEC, |
| addr->ai_protocol); |
| if (fd < 0) |
| fatal("%s: [%s] Unable to create socket: %m", |
| __func__, addrinfo_to_string(addr)); |
| |
| /* |
| * activate socket reuse to avoid annoying timing issues |
| * with daemon restarts |
| */ |
| if (setsockopt(fd, addr->ai_socktype, SO_REUSEADDR, |
| &one, sizeof(one))) |
| fatal("%s: [%s] setsockopt(SO_REUSEADDR) failed: %m", |
| __func__, addrinfo_to_string(addr)); |
| |
| if (bind(fd, addr->ai_addr, addr->ai_addrlen) != 0) |
| fatal("%s: [%s] Unable to bind socket: %m", |
| __func__, addrinfo_to_string(addr)); |
| |
| fd_set_oob(fd, 0); |
| |
| rc = listen(fd, SLURM_DEFAULT_LISTEN_BACKLOG); |
| if (rc < 0) |
| fatal("%s: [%s] unable to listen(): %m", |
| __func__, addrinfo_to_string(addr)); |
| |
| rc = add_connection(type, NULL, fd, -1, events, flags, |
| (const slurm_addr_t *) addr->ai_addr, |
| addr->ai_addrlen, true, NULL, NULL, arg); |
| } |
| |
| freeaddrinfo(addrlist); |
| return rc; |
| } |
| |
| extern int conmgr_create_listen_socket(conmgr_con_type_t type, |
| conmgr_con_flags_t flags, |
| const char *listen_on, |
| const conmgr_events_t *events, void *arg) |
| { |
| int rc = SLURM_SUCCESS; |
| url_t url = URL_INITIALIZER; |
| buf_t buffer = { |
| .magic = BUF_MAGIC, |
| .head = (void *) listen_on, |
| .processed = strlen(listen_on), |
| .size = strlen(listen_on), |
| .shadow = true, |
| }; |
| |
| if ((rc = url_parser_g_parse(__func__, &buffer, &url))) |
| fatal("%s: Unable to parse %s: %s", |
| __func__, listen_on, slurm_strerror(rc)); |
| |
| switch (url.scheme) { |
| case URL_SCHEME_UNIX: |
| rc = _add_unix_listener(type, flags, listen_on, url.path, |
| events, arg); |
| break; |
| case URL_SCHEME_HTTPS: |
| flags |= CON_FLAG_TLS_SERVER; |
| /* fall through */ |
| case URL_SCHEME_HTTP: |
| case URL_SCHEME_INVALID: |
| rc = _add_socket_listener(type, flags, listen_on, &url, events, |
| arg); |
| break; |
| case URL_SCHEME_INVALID_MAX: |
| fatal_abort("should never happen"); |
| } |
| |
| url_free_members(&url); |
| return rc; |
| } |
| |
| static int _setup_listen_socket(void *x, void *arg) |
| { |
| const char *hostport = (const char *)x; |
| socket_listen_init_t *init = arg; |
| |
| init->rc = conmgr_create_listen_socket(init->type, init->flags, |
| hostport, init->events, |
| init->arg); |
| |
| return (init->rc ? SLURM_ERROR : SLURM_SUCCESS); |
| } |
| |
| extern int conmgr_create_listen_sockets(conmgr_con_type_t type, |
| conmgr_con_flags_t flags, |
| list_t *hostports, |
| const conmgr_events_t *events, |
| void *arg) |
| { |
| socket_listen_init_t init = { |
| .events = events, |
| .arg = arg, |
| .type = type, |
| .flags = flags, |
| }; |
| |
| (void) list_for_each(hostports, _setup_listen_socket, &init); |
| return init.rc; |
| } |
| |
| extern int conmgr_create_connect_socket(conmgr_con_type_t type, |
| conmgr_con_flags_t flags, |
| slurm_addr_t *addr, socklen_t addrlen, |
| const conmgr_events_t *events, |
| void *arg) |
| { |
| int fd = -1, rc = SLURM_ERROR; |
| //socklen_t bindlen = 0; |
| |
| if (addr->ss_family == AF_UNIX) { |
| fd = socket(addr->ss_family, (SOCK_STREAM | SOCK_CLOEXEC), 0); |
| //bindlen = sizeof(struct sockaddr_un); |
| } else if ((addr->ss_family == AF_INET) || |
| (addr->ss_family == AF_INET6)) { |
| fd = socket(addr->ss_family, (SOCK_STREAM | SOCK_CLOEXEC), |
| IPPROTO_TCP); |
| //bindlen = addrlen; |
| } else { |
| return EAFNOSUPPORT; |
| } |
| |
| if (fd < 0) { |
| rc = errno; |
| log_flag(NET, "%s: [%pA] socket() failed: %s", |
| __func__, addr, slurm_strerror(rc)); |
| return rc; |
| } |
| |
| /* Set socket as non-blocking to avoid connect() blocking */ |
| fd_set_nonblocking(fd); |
| |
| log_flag(CONMGR, "%s: [%pA(fd:%d)] attempting to connect() new socket", |
| __func__, addr, fd); |
| |
| again: |
| if ((rc = connect(fd, (const struct sockaddr *) addr, addrlen))) { |
| rc = errno; |
| |
| if (rc == EINTR) { |
| bool shutdown; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| xassert(mgr.initialized); |
| shutdown = mgr.shutdown_requested; |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| if (shutdown) { |
| log_flag(CONMGR, "%s: [%pA(fd:%d)] connect() interrupted during shutdown. Closing connection.", |
| __func__, addr, fd); |
| fd_close(&fd); |
| return SLURM_SUCCESS; |
| } |
| |
| log_flag(CONMGR, "%s: [%pA(fd:%d)] connect() interrupted. Retrying.", |
| __func__, addr, fd); |
| goto again; |
| } |
| |
| if ((rc != EINPROGRESS) && (rc != EAGAIN) && |
| (rc != EWOULDBLOCK)) { |
| log_flag(NET, "%s: [%pA(fd:%d)] connect() failed: %s", |
| __func__, addr, fd, slurm_strerror(rc)); |
| fd_close(&fd); |
| return rc; |
| } |
| |
| /* delayed connect() completion is expected */ |
| } |
| |
| return add_connection(type, NULL, fd, fd, events, flags, addr, addrlen, |
| false, NULL, NULL, arg); |
| } |
| |
| extern int conmgr_get_fd_auth_creds(conmgr_fd_t *con, |
| uid_t *cred_uid, gid_t *cred_gid, |
| pid_t *cred_pid) |
| { |
| int fd, rc = ESLURM_NOT_SUPPORTED; |
| |
| xassert(cred_uid); |
| xassert(cred_gid); |
| xassert(cred_pid); |
| |
| if (!con || !cred_uid || !cred_gid || !cred_pid) |
| return EINVAL; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (((fd = con->input_fd) == -1) && ((fd = con->output_fd) == -1)) |
| return SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR; |
| |
| #if !defined(__APPLE__) && !defined(__FreeBSD__) && !defined(__NetBSD__) |
| struct ucred cred = { 0 }; |
| socklen_t len = sizeof(cred); |
| if (!getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cred, &len)) { |
| *cred_uid = cred.uid; |
| *cred_gid = cred.gid; |
| *cred_pid = cred.pid; |
| return SLURM_SUCCESS; |
| } else { |
| rc = errno; |
| } |
| #else |
| struct xucred cred = { 0 }; |
| socklen_t len = sizeof(cred); |
| if (!getsockopt(fd, 0, LOCAL_PEERCRED, &cred, &len)) { |
| *cred_uid = cred.cr_uid; |
| *cred_gid = cred.cr_groups[0]; |
| *cred_pid = cred.cr_pid; |
| return SLURM_SUCCESS; |
| } else { |
| rc = errno; |
| } |
| #endif |
| |
| return rc; |
| } |
| |
| extern const char *conmgr_fd_get_name(const conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con->name && con->name[0]); |
| return con->name; |
| } |
| |
| extern const char *conmgr_con_get_name(conmgr_fd_ref_t *ref) |
| { |
| xassert(ref->magic == MAGIC_CON_MGR_FD_REF); |
| xassert(ref->con->magic == MAGIC_CON_MGR_FD); |
| |
| return conmgr_fd_get_name(ref->con); |
| } |
| |
| extern conmgr_fd_status_t conmgr_fd_get_status(conmgr_fd_t *con) |
| { |
| conmgr_fd_status_t status = { |
| .is_socket = con_flag(con, FLAG_IS_SOCKET), |
| .unix_socket = NULL, |
| .is_listen = con_flag(con, FLAG_IS_LISTEN), |
| .read_eof = con_flag(con, FLAG_READ_EOF), |
| .is_connected = con_flag(con, FLAG_IS_CONNECTED), |
| }; |
| |
| if (con->address.ss_family == AF_LOCAL) { |
| struct sockaddr_un *un = (struct sockaddr_un *) &con->address; |
| status.unix_socket = un->sun_path; |
| } |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con_flag(con, FLAG_WORK_ACTIVE)); |
| return status; |
| } |
| |
| /* |
| * Find by matching fd to connection |
| */ |
| static int _find_by_fd(void *x, void *key) |
| { |
| conmgr_fd_t *con = x; |
| int fd = *(int *)key; |
| return (con->input_fd == fd) || (con->output_fd == fd); |
| } |
| |
| extern conmgr_fd_t *con_find_by_fd(int fd) |
| { |
| conmgr_fd_t *con; |
| |
| if ((con = list_find_first(mgr.connections, _find_by_fd, &fd))) |
| return con; |
| |
| if ((con = list_find_first(mgr.listen_conns, _find_by_fd, &fd))) |
| return con; |
| |
| /* mgr.complete_conns don't have input_fd or output_fd */ |
| |
| return NULL; |
| } |
| |
| extern void con_close_on_poll_error(conmgr_fd_t *con, int fd) |
| { |
| if (con_flag(con, FLAG_IS_SOCKET)) { |
| /* Ask kernel for socket error */ |
| int rc = SLURM_ERROR, err = SLURM_ERROR; |
| |
| if ((rc = fd_get_socket_error(fd, &err))) |
| error("%s: [%s] error while getting socket error: %s", |
| __func__, con->name, slurm_strerror(rc)); |
| else if (err) |
| error("%s: [%s] socket error encountered while polling: %s", |
| __func__, con->name, slurm_strerror(err)); |
| } |
| |
| /* |
| * Socket must not continue to be considered valid to avoid a |
| * infinite calls to poll() which will immediately fail. Close |
| * the relevant file descriptor and remove from connection. |
| */ |
| close_con(true, con); |
| } |
| |
| static int _set_fd_polling(int fd, pollctl_fd_type_t *old, |
| pollctl_fd_type_t new, const char *con_name, |
| const char *caller) |
| { |
| if (*old == PCTL_TYPE_UNSUPPORTED) |
| return SLURM_SUCCESS; |
| |
| if (*old == new) |
| return SLURM_SUCCESS; |
| |
| if (new == PCTL_TYPE_NONE) { |
| int rc = SLURM_SUCCESS; |
| |
| if ((*old != PCTL_TYPE_NONE) && |
| !(rc = pollctl_unlink_fd(fd, con_name, caller))) |
| *old = new; |
| |
| return rc; |
| } |
| |
| if (*old != PCTL_TYPE_NONE) { |
| int rc; |
| |
| if (!(rc = pollctl_relink_fd(fd, new, con_name, caller))) |
| *old = new; |
| |
| return rc; |
| } else { |
| int rc = pollctl_link_fd(fd, new, con_name, caller); |
| |
| if (!rc) |
| *old = new; |
| |
| return rc; |
| } |
| } |
| |
| static void _log_set_polling(conmgr_fd_t *con, bool has_in, bool has_out, |
| pollctl_fd_type_t type, pollctl_fd_type_t in_type, |
| pollctl_fd_type_t out_type, const char *caller) |
| { |
| char *log = NULL, *at = NULL; |
| const char *op = "maintain"; |
| |
| if (!(slurm_conf.debug_flags & DEBUG_FLAG_CONMGR)) |
| return; |
| |
| if (has_in) { |
| const char *old, *new; |
| |
| old = pollctl_type_to_string(con->polling_input_fd); |
| new = pollctl_type_to_string(in_type); |
| |
| xstrfmtcatat(log, &at, " in[%d]:%s", con->input_fd, old); |
| |
| if (in_type != con->polling_input_fd) { |
| xstrfmtcatat(log, &at, "->%s", new); |
| op = "changing"; |
| } |
| } |
| |
| if (has_out) { |
| const char *old, *new; |
| |
| old = pollctl_type_to_string(con->polling_output_fd); |
| new = pollctl_type_to_string(out_type); |
| |
| xstrfmtcatat(log, &at, " out[%d]:%s", con->output_fd, old); |
| |
| if (out_type != con->polling_output_fd) { |
| xstrfmtcatat(log, &at, "->%s", new); |
| op = "changing"; |
| } |
| } |
| |
| log_flag(CONMGR, "%s->%s: [%s] %s polling:%s%s", |
| caller, XSTRINGIFY(con_set_polling), con->name, op, |
| pollctl_type_to_string(type), (log ? log : "")); |
| |
| xfree(log); |
| } |
| |
| static void _on_change_polling_fail(conmgr_fd_t *con, int rc, |
| pollctl_fd_type_t old_type, |
| pollctl_fd_type_t new_type, |
| const char *fd_name, const int fd, |
| pollctl_fd_type_t *dst, const char *caller) |
| { |
| error("%s->%s: [%s] closing connection after change polling %s->%s for %s=%d failed: %s", |
| caller, __func__, con->name, pollctl_type_to_string(old_type), |
| pollctl_type_to_string(new_type), fd_name, fd, |
| slurm_strerror(rc)); |
| |
| if (rc == EBADF) { |
| /* Remove defunct FD immediately */ |
| if (con->input_fd == fd) { |
| con->input_fd = -1; |
| con->polling_input_fd = PCTL_TYPE_UNSUPPORTED; |
| con_unset_flag(con, FLAG_CAN_READ); |
| con_set_flag(con, FLAG_READ_EOF); |
| } |
| |
| if (con->output_fd == fd) { |
| con->output_fd = -1; |
| con->polling_output_fd = PCTL_TYPE_UNSUPPORTED; |
| con_unset_flag(con, FLAG_CAN_WRITE); |
| con_unset_flag(con, FLAG_CAN_QUERY_OUTPUT_BUFFER); |
| } |
| } else { |
| /* Attempt graceful closing of connection */ |
| *dst = PCTL_TYPE_UNSUPPORTED; |
| } |
| |
| close_con(true, con); |
| close_con_output(true, con); |
| } |
| |
| static void _on_change_polling_rc(conmgr_fd_t *con, int rc, |
| pollctl_fd_type_t old_type, |
| pollctl_fd_type_t new_type, bool input, |
| const char *caller) |
| { |
| const char *fd_name = (input ? "input_fd" : "output_fd"); |
| const int fd = (input ? con->input_fd : con->output_fd); |
| pollctl_fd_type_t *dst = |
| (input ? &con->polling_input_fd : &con->polling_output_fd); |
| |
| switch (rc) { |
| case EEXIST: |
| /* |
| * poll() is already monitoring this file descriptor but conmgr |
| * didn't ask for this poll()ing. conmgr has no idea what mode |
| * poll() is already monitoring, so instead change to relink to |
| * to set the correct mode. |
| */ |
| |
| log_flag(CONMGR, "%s->%s: [%s] forcing changed polling %s->%s for %s=%d", |
| caller, __func__, con->name, |
| pollctl_type_to_string(old_type), |
| pollctl_type_to_string(new_type), fd_name, fd); |
| |
| if ((rc = pollctl_relink_fd(fd, new_type, con->name, __func__))) |
| _on_change_polling_fail(con, rc, old_type, new_type, |
| fd_name, fd, dst, caller); |
| else |
| *dst = new_type; |
| |
| break; |
| case ENOENT: |
| log_flag(CONMGR, "%s->%s: [%s] ignoring request to change polling %s->%s for %s=%d", |
| caller, __func__, con->name, |
| pollctl_type_to_string(old_type), |
| pollctl_type_to_string(new_type), fd_name, fd); |
| |
| *dst = PCTL_TYPE_NONE; |
| break; |
| case EPERM: |
| log_flag(CONMGR, "%s->%s: [%s] polling %s->%s for %s=%d not supported by kernel", |
| caller, __func__, con->name, |
| pollctl_type_to_string(old_type), |
| pollctl_type_to_string(new_type), fd_name, fd); |
| |
| *dst = PCTL_TYPE_UNSUPPORTED; |
| break; |
| default: |
| _on_change_polling_fail(con, rc, old_type, new_type, fd_name, |
| fd, dst, caller); |
| break; |
| } |
| } |
| |
| extern void con_set_polling(conmgr_fd_t *con, pollctl_fd_type_t type, |
| const char *caller) |
| { |
| int has_in, has_out, in, out, is_same; |
| int rc_in = SLURM_SUCCESS, rc_out = SLURM_SUCCESS; |
| pollctl_fd_type_t in_type = PCTL_TYPE_NONE, out_type = PCTL_TYPE_NONE; |
| |
| _validate_pctl_type(type); |
| _validate_pctl_type(con->polling_input_fd); |
| _validate_pctl_type(con->polling_output_fd); |
| |
| in = con->input_fd; |
| has_in = (in >= 0); |
| out = con->output_fd; |
| has_out = (out >= 0); |
| is_same = (con->input_fd == con->output_fd); |
| |
| if (!has_in && !has_out) { |
| xassert(con->polling_input_fd == PCTL_TYPE_NONE); |
| xassert(con->polling_output_fd == PCTL_TYPE_NONE); |
| xassert(type == PCTL_TYPE_NONE); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: skipping connection flags=%s", |
| __func__, flags); |
| xfree(flags); |
| } |
| |
| return; |
| } |
| |
| /* |
| * Map type to type per in/out. The in/out types are initialized to |
| * PCTL_TYPE_NONE above. |
| */ |
| switch (type) { |
| case PCTL_TYPE_UNSUPPORTED: |
| fatal_abort("should never happen"); |
| case PCTL_TYPE_NONE: |
| break; |
| case PCTL_TYPE_CONNECTED: |
| if (is_same) { |
| in_type = PCTL_TYPE_CONNECTED; |
| } else { |
| in_type = PCTL_TYPE_CONNECTED; |
| out_type = PCTL_TYPE_CONNECTED; |
| } |
| break; |
| case PCTL_TYPE_READ_ONLY: |
| in_type = PCTL_TYPE_READ_ONLY; |
| break; |
| case PCTL_TYPE_READ_WRITE: |
| if (is_same) { |
| in_type = PCTL_TYPE_READ_WRITE; |
| } else { |
| in_type = PCTL_TYPE_READ_ONLY; |
| out_type = PCTL_TYPE_WRITE_ONLY; |
| } |
| break; |
| case PCTL_TYPE_WRITE_ONLY: |
| if (is_same) { |
| in_type = PCTL_TYPE_WRITE_ONLY; |
| } else { |
| out_type = PCTL_TYPE_WRITE_ONLY; |
| } |
| break; |
| case PCTL_TYPE_LISTEN: |
| xassert(con_flag(con, FLAG_IS_LISTEN)); |
| in_type = PCTL_TYPE_LISTEN; |
| break; |
| case PCTL_TYPE_INVALID: |
| case PCTL_TYPE_INVALID_MAX: |
| fatal_abort("should never execute"); |
| } |
| |
| if (con->polling_output_fd == PCTL_TYPE_UNSUPPORTED) |
| out_type = PCTL_TYPE_UNSUPPORTED; |
| if (con->polling_input_fd == PCTL_TYPE_UNSUPPORTED) |
| in_type = PCTL_TYPE_UNSUPPORTED; |
| |
| _log_set_polling(con, has_in, has_out, type, in_type, out_type, caller); |
| |
| if (is_same) { |
| /* same never link output_fd */ |
| xassert((con->polling_output_fd == PCTL_TYPE_NONE) || |
| (con->polling_output_fd == PCTL_TYPE_UNSUPPORTED)); |
| |
| rc_in = _set_fd_polling(in, &con->polling_input_fd, in_type, |
| con->name, caller); |
| } else { |
| if (has_in) |
| rc_in = _set_fd_polling(in, &con->polling_input_fd, |
| in_type, con->name, caller); |
| if (has_out) |
| rc_out = _set_fd_polling(out, &con->polling_output_fd, |
| out_type, con->name, caller); |
| } |
| |
| if (rc_in) |
| _on_change_polling_rc(con, rc_in, con->polling_input_fd, |
| in_type, true, caller); |
| |
| if (rc_out) |
| _on_change_polling_rc(con, rc_out, con->polling_output_fd, |
| out_type, false, caller); |
| } |
| |
| extern int conmgr_queue_extract_con_fd(conmgr_fd_t *con, |
| conmgr_extract_fd_func_t func, |
| const char *func_name, |
| void *func_arg) |
| { |
| int rc = SLURM_ERROR; |
| |
| xassert(con); |
| xassert(func); |
| if (!con || !func) |
| return EINVAL; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (con->extract) { |
| rc = EEXIST; |
| } else { |
| extract_fd_t *extract = xmalloc_nz(sizeof(*extract)); |
| *extract = (extract_fd_t) { |
| .magic = MAGIC_EXTRACT_FD, |
| .func = func, |
| .func_name = func_name, |
| .func_arg = func_arg, |
| .input_fd = -1, |
| .output_fd = -1, |
| }; |
| |
| xassert(!con->extract); |
| con->extract = extract; |
| |
| /* Disable all polling for this connection */ |
| con_set_polling(con, PCTL_TYPE_NONE, __func__); |
| |
| /* wake up watch to finish extraction */ |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| |
| rc = SLURM_SUCCESS; |
| } |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return rc; |
| } |
| |
| static void _free_extract(extract_fd_t **extract_ptr) |
| { |
| extract_fd_t *extract = NULL; |
| SWAP(*extract_ptr, extract); |
| |
| xassert(extract->magic == MAGIC_EXTRACT_FD); |
| extract->magic = ~MAGIC_EXTRACT_FD; |
| xfree(extract); |
| } |
| |
| static void _wrap_on_extract(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| extract_fd_t *extract = arg; |
| xassert(extract->magic == MAGIC_EXTRACT_FD); |
| |
| log_flag(CONMGR, "%s: calling %s() input_fd=%d output_fd=%d arg=0x%"PRIxPTR, |
| __func__, extract->func_name, extract->input_fd, |
| extract->output_fd, (uintptr_t) extract->func_arg); |
| |
| extract->func(conmgr_args, extract->input_fd, extract->output_fd, |
| extract->tls_conn, extract->func_arg); |
| |
| _free_extract(&extract); |
| |
| /* wake up watch() to cleanup connection */ |
| slurm_mutex_lock(&mgr.mutex); |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| /* caller must hold mgr.mutex lock */ |
| extern void extract_con_fd(conmgr_fd_t *con) |
| { |
| extract_fd_t *extract = NULL; |
| |
| SWAP(extract, con->extract); |
| xassert(extract); |
| xassert(extract->magic == MAGIC_EXTRACT_FD); |
| |
| /* Polling should already be disabled */ |
| xassert((con->polling_input_fd == PCTL_TYPE_NONE) || |
| (con->polling_input_fd == PCTL_TYPE_UNSUPPORTED)); |
| xassert((con->polling_output_fd == PCTL_TYPE_NONE) || |
| (con->polling_output_fd == PCTL_TYPE_UNSUPPORTED)); |
| |
| /* can't extract safely when work running or not connected */ |
| xassert(!con_flag(con, FLAG_WORK_ACTIVE)); |
| xassert(con_flag(con, FLAG_IS_CONNECTED)); |
| xassert(!(con_flag(con, FLAG_TLS_SERVER) || |
| con_flag(con, FLAG_TLS_CLIENT)) || |
| con_flag(con, FLAG_IS_TLS_CONNECTED)); |
| xassert(!con_flag(con, FLAG_WAIT_ON_FINGERPRINT)); |
| xassert(!con_flag(con, FLAG_TLS_WAIT_ON_CLOSE)); |
| xassert(!con_flag(con, FLAG_WAIT_ON_FINISH)); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: extracting input_fd=%d output_fd=%d func=%s() flags=%s", |
| __func__, con->input_fd, con->output_fd, |
| extract->func_name, flags); |
| xfree(flags); |
| } |
| |
| /* clear all polling states */ |
| con_set_flag(con, FLAG_READ_EOF); |
| con_unset_flag(con, FLAG_CAN_READ); |
| con_unset_flag(con, FLAG_CAN_WRITE); |
| con_unset_flag(con, FLAG_ON_DATA_TRIED); |
| |
| /* assert input/outputs are empty */ |
| xassert(!con->tls_out || list_is_empty(con->out)); |
| xassert(!con->tls_in || !get_buf_offset(con->tls_in)); |
| xassert(list_is_empty(con->out)); |
| xassert(!get_buf_offset(con->in)); |
| |
| /* Extract TLS state (or fail) */ |
| if (con->tls && tls_extract(con, extract)) { |
| _free_extract(&extract); |
| return; |
| } |
| |
| /* |
| * take the file descriptors, replacing the file descriptors in |
| * con with invalid values initialized in conmgr_queue_extract_con_fd(). |
| */ |
| SWAP(extract->input_fd, con->input_fd); |
| SWAP(extract->output_fd, con->output_fd); |
| |
| /* |
| * Queue up work but not against the connection as we want watch() to |
| * cleanup the connection. |
| */ |
| add_work_fifo(true, _wrap_on_extract, extract); |
| } |
| |
| static int _unquiesce_fd(conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (!con_flag(con, FLAG_QUIESCE)) |
| return SLURM_SUCCESS; |
| |
| con_unset_flag(con, FLAG_QUIESCE); |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: unquiesced connection flags=%s", |
| __func__, flags); |
| xfree(flags); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern int conmgr_unquiesce_fd(conmgr_fd_t *con) |
| { |
| int rc; |
| |
| if (!con) |
| return EINVAL; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| rc = _unquiesce_fd(con); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return rc; |
| } |
| |
| static int _quiesce_fd(conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (con_flag(con, FLAG_QUIESCE)) |
| return SLURM_SUCCESS; |
| |
| con_set_flag(con, FLAG_QUIESCE); |
| con_set_polling(con, PCTL_TYPE_NONE, __func__); |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: quiesced connection flags=%s", |
| __func__, flags); |
| xfree(flags); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern int conmgr_quiesce_fd(conmgr_fd_t *con) |
| { |
| int rc; |
| |
| if (!con) |
| return EINVAL; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| rc = _quiesce_fd(con); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return rc; |
| } |
| |
| extern bool conmgr_fd_is_output_open(conmgr_fd_t *con) |
| { |
| bool open; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| open = (con->output_fd >= 0); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return open; |
| } |
| |
| extern conmgr_fd_ref_t *fd_new_ref(conmgr_fd_t *con) |
| { |
| conmgr_fd_ref_t *ref; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| ref = xmalloc(sizeof(*ref)); |
| *ref = (conmgr_fd_ref_t) { |
| .magic = MAGIC_CON_MGR_FD_REF, |
| .con = con, |
| }; |
| |
| con->refs++; |
| xassert(con->refs < INT_MAX); |
| xassert(con->refs > 0); |
| |
| return ref; |
| } |
| |
| extern conmgr_fd_ref_t *conmgr_fd_new_ref(conmgr_fd_t *con) |
| { |
| conmgr_fd_ref_t *ref = NULL; |
| |
| if (!con) |
| fatal_abort("con must not be null"); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| ref = fd_new_ref(con); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return ref; |
| } |
| |
| extern conmgr_fd_ref_t *conmgr_con_link(conmgr_fd_ref_t *con) |
| { |
| conmgr_fd_ref_t *ref = NULL; |
| |
| xassert(con); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| ref = fd_new_ref(con->con); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return ref; |
| } |
| |
| extern void fd_free_ref(conmgr_fd_ref_t **ref_ptr) |
| { |
| conmgr_fd_ref_t *ref = *ref_ptr; |
| conmgr_fd_t *con = ref->con; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(ref->magic == MAGIC_CON_MGR_FD_REF); |
| |
| con->refs--; |
| xassert(con->refs < INT_MAX); |
| xassert(con->refs >= 0); |
| |
| ref->magic = ~MAGIC_CON_MGR_FD_REF; |
| xfree((*ref_ptr)); |
| } |
| |
| extern void conmgr_fd_free_ref(conmgr_fd_ref_t **ref_ptr) |
| { |
| |
| if (!ref_ptr) |
| fatal_abort("ref_ptr must not be null"); |
| |
| /* check if already released */ |
| if (!*ref_ptr) |
| return; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| fd_free_ref(ref_ptr); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| extern conmgr_fd_t *fd_get_ref(conmgr_fd_ref_t *ref) |
| { |
| if (!ref) |
| return NULL; |
| |
| xassert(ref->magic == MAGIC_CON_MGR_FD_REF); |
| xassert(ref->con); |
| |
| return ref->con; |
| } |
| |
| extern conmgr_fd_t *conmgr_fd_get_ref(conmgr_fd_ref_t *ref) |
| { |
| conmgr_fd_t *con = fd_get_ref(ref); |
| |
| #ifndef NDEBUG |
| if (con) { |
| slurm_mutex_lock(&mgr.mutex); |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con->refs < INT_MAX); |
| xassert(con->refs > 0); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| #endif |
| |
| return con; |
| } |
| |
| extern bool conmgr_fd_is_tls(conmgr_fd_ref_t *ref) |
| { |
| conmgr_fd_t *con = fd_get_ref(ref); |
| bool tls = false; |
| |
| xassert(con); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| tls = (con_flag(con, FLAG_TLS_CLIENT) || |
| con_flag(con, FLAG_TLS_SERVER)); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return tls; |
| } |