| /*****************************************************************************\ |
| * watch.c - definitions for watch loop in connection manager |
| ***************************************************************************** |
| * Copyright (C) SchedMD LLC. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #define _GNU_SOURCE |
| #include "config.h" |
| |
| #if HAVE_SYS_PRCTL_H |
| #include <sys/prctl.h> |
| #endif |
| |
| #include <limits.h> |
| #include <stdint.h> |
| #include <sys/un.h> |
| #include <sys/socket.h> |
| |
| #include "slurm/slurm.h" |
| #include "slurm/slurm_errno.h" |
| |
| #include "src/common/fd.h" |
| #include "src/common/list.h" |
| #include "src/common/macros.h" |
| #include "src/common/net.h" |
| #include "src/common/read_config.h" |
| #include "src/common/slurm_time.h" |
| #include "src/common/timers.h" |
| #include "src/common/xmalloc.h" |
| |
| #include "src/conmgr/conmgr.h" |
| #include "src/conmgr/delayed.h" |
| #include "src/conmgr/events.h" |
| #include "src/conmgr/mgr.h" |
| #include "src/conmgr/polling.h" |
| #include "src/conmgr/signals.h" |
| #include "src/conmgr/tls.h" |
| |
| /* Default watch() sleep to only 5 minutes */ |
| #define WATCH_DEFAULT_SLEEP \ |
| (timespec_t) \ |
| { \ |
| .tv_sec = 300, \ |
| } |
| #define CTIME_STR_LEN 72 |
| |
| typedef struct { |
| #define MAGIC_HANDLE_CONNECTION 0xaaaffb03 |
| int magic; /* MAGIC_HANDLE_CONNECTION */ |
| /* output of timespec_now() in _inspect_connections() */ |
| timespec_t time; |
| } handle_connection_args_t; |
| |
| static void _listen_accept(conmgr_callback_args_t conmgr_args, void *arg); |
| |
| static void _set_time(handle_connection_args_t *args) |
| { |
| if (!args->time.tv_sec) |
| args->time = timespec_now(); |
| } |
| |
| static bool _handle_time_limit(handle_connection_args_t *args, |
| const struct timespec timestamp, |
| const struct timespec limit, const char *what, |
| const char *name, const char *caller) |
| { |
| const struct timespec deadline = timespec_add(timestamp, limit); |
| bool after, change_max_sleep = false; |
| |
| _set_time(args); |
| |
| if (!(after = timespec_is_after(args->time, deadline))) |
| change_max_sleep = |
| (!mgr.watch_max_sleep.tv_sec || |
| timespec_is_after(mgr.watch_max_sleep, deadline)); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char str[CTIME_STR_LEN]; |
| |
| timespec_ctime(limit, false, str, sizeof(str)); |
| |
| log_flag(CONMGR, "%s->%s: %s%s%s%s %s timeout %s %s", |
| caller, __func__, (name ? "[" : ""), name, |
| (name ? "] " : ""), |
| (change_max_sleep ? "updating watch() sleep" : |
| "evaluating"), |
| what, (after ? "triggered" : "ETA"), str); |
| } |
| |
| if (after) |
| return true; |
| |
| if (change_max_sleep) { |
| mgr.watch_max_sleep = deadline; |
| |
| /* Always wake up watch() as if deadline changed */ |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| } |
| |
| return false; |
| } |
| |
| static void _reset_watch_max_sleep(void) |
| { |
| const timespec_t now = timespec_now(); |
| |
| /* skip if the timeout hasn't elapsed yet */ |
| if (timespec_is_after(mgr.watch_max_sleep, now)) |
| return; |
| |
| /* timeout triggered and needs reset */ |
| mgr.watch_max_sleep = timespec_add(now, WATCH_DEFAULT_SLEEP); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char str[CTIME_STR_LEN]; |
| |
| timespec_ctime(mgr.watch_max_sleep, true, str, sizeof(str)); |
| |
| log_flag(CONMGR, "%s: reset watch() sleep to %s", |
| __func__, str); |
| } |
| } |
| |
| static void _on_finish_wrapper(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| |
| if (con_flag(con, FLAG_IS_LISTEN)) { |
| if (con->events->on_listen_finish) |
| con->events->on_listen_finish(con, arg); |
| } else if (con->events->on_finish) { |
| con->events->on_finish(con, arg); |
| } |
| |
| slurm_mutex_lock(&mgr.mutex); |
| con_unset_flag(con, FLAG_WAIT_ON_FINISH); |
| /* on_finish must free arg */ |
| con->arg = NULL; |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
/*
 * Work callback that moves pending con->write_complete_work into con->work
 * once the outgoing data appears to have left the connection.
 *
 * mgr.mutex is taken here and released around the fd syscalls:
 *  - no write complete work pending: nothing to do
 *  - output polling supported, output_fd open and FLAG_CAN_WRITE not set:
 *    return and wait for FLAG_CAN_WRITE
 *  - output_fd open and FLAG_CAN_QUERY_OUTPUT_BUFFER set: ask how many bytes
 *    are still queued on output_fd:
 *      - query failed: unset FLAG_CAN_QUERY_OUTPUT_BUFFER (stop querying)
 *        and transfer the work now
 *      - bytes remain: disable Nagle on sockets without FLAG_TCP_NODELAY and
 *        requeue this callback after mgr.conf_delay_write_complete
 *      - buffer empty: re-enable Nagle (same condition) and transfer
 *  - otherwise: transfer the work now
 * After a transfer, watch() is woken via mgr.watch_sleep.
 *
 * arg is unused.
 */
static void _on_write_complete_work(conmgr_callback_args_t conmgr_args,
				    void *arg)
{
	conmgr_fd_t *con = conmgr_args.con;

	slurm_mutex_lock(&mgr.mutex);

	if (list_is_empty(con->write_complete_work)) {
		slurm_mutex_unlock(&mgr.mutex);

		log_flag(CONMGR, "%s: [%s] skipping with 0 write complete work pending",
			 __func__, con->name);
		return;
	}

	if ((con->polling_output_fd != PCTL_TYPE_UNSUPPORTED) &&
	    ((con->output_fd >= 0) && !con_flag(con, FLAG_CAN_WRITE))) {
		slurm_mutex_unlock(&mgr.mutex);

		/*
		 * if FLAG_CAN_WRITE is not set, then kernel is telling us that
		 * the outgoing buffer hasn't been flushed yet
		 */
		log_flag(CONMGR, "%s: [%s] waiting for FLAG_CAN_WRITE",
			 __func__, con->name);
		return;
	}

	if ((con->output_fd >= 0) &&
	    con_flag(con, FLAG_CAN_QUERY_OUTPUT_BUFFER)) {
		int rc = EINVAL;
		int bytes = -1;
		/* Snapshot the fd since mgr.mutex is released for the query */
		int output_fd = con->output_fd;

		slurm_mutex_unlock(&mgr.mutex);

		if (output_fd >= 0)
			rc = fd_get_buffered_output_bytes(output_fd, &bytes,
							  con->name);

		slurm_mutex_lock(&mgr.mutex);

		if (rc) {
			/* Query unsupported/failed: never try it again */
			log_flag(CONMGR, "%s: [%s] unable to query output_fd[%d] outgoing buffer remaining: %s. Queuing pending %u write complete work",
				 __func__, con->name, output_fd,
				 slurm_strerror(rc),
				 list_count(con->write_complete_work));

			con_unset_flag(con, FLAG_CAN_QUERY_OUTPUT_BUFFER);
		} else if (bytes > 0) {
			log_flag(CONMGR, "%s: [%s] output_fd[%d] has %d bytes in outgoing buffer remaining. Retrying in %us",
				 __func__, con->name, output_fd, bytes,
				 mgr.conf_delay_write_complete);

			/* Turn off Nagle while we wait for buffer to flush */
			if (con_flag(con, FLAG_IS_SOCKET) &&
			    !con_flag(con, FLAG_TCP_NODELAY)) {
				slurm_mutex_unlock(&mgr.mutex);
				(void) net_set_nodelay(output_fd, true,
						       con->name);
				slurm_mutex_lock(&mgr.mutex);
			}

			/* Poll again later instead of transferring the work */
			add_work_con_delayed_fifo(true, con,
						  _on_write_complete_work, NULL,
						  mgr.conf_delay_write_complete,
						  0);
			slurm_mutex_unlock(&mgr.mutex);
			return;
		} else {
			xassert(!bytes);

			/* Turn back on Nagle every time in case it got set */
			if (con_flag(con, FLAG_IS_SOCKET) &&
			    !con_flag(con, FLAG_TCP_NODELAY)) {
				slurm_mutex_unlock(&mgr.mutex);
				(void) net_set_nodelay(output_fd, false,
						       con->name);
				slurm_mutex_lock(&mgr.mutex);
			}

			log_flag(CONMGR, "%s: [%s] output_fd[%d] has 0 bytes in outgoing buffer remaining. Queuing pending %u write complete work",
				 __func__, con->name, output_fd,
				 list_count(con->write_complete_work));
		}
	} else {
		log_flag(CONMGR, "%s: [%s] queuing pending %u write complete work",
			 __func__, con->name,
			 list_count(con->write_complete_work));
	}

	/* Release the pending work to run as normal connection work */
	list_transfer(con->work, con->write_complete_work);

	EVENT_SIGNAL(&mgr.watch_sleep);
	slurm_mutex_unlock(&mgr.mutex);
}
| |
| static void _update_mss(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| |
| if (con_flag(con, FLAG_IS_SOCKET) && (con->output_fd != -1)) |
| con->mss = fd_get_maxmss(con->output_fd, con->name); |
| } |
| |
| static void _close_output_fd(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| int output_fd = (uint64_t) arg; |
| int rc = SLURM_SUCCESS; |
| |
| xassert(output_fd >= 0); |
| xassert(output_fd < NO_VAL64); |
| |
| log_flag(CONMGR, "%s: [%s] closing connection output_fd=%d", |
| __func__, con->name, output_fd); |
| |
| /* |
| * From man 2 close: |
| * > A careful programmer who wants to know about I/O errors may precede |
| * > close() with a call to fsync(2) |
| * |
| * Avoid fsync() on pipe()s and chr devices per man page: |
| * > fd is bound to a special file (e.g., a pipe, FIFO, or socket) which |
| * > does not support synchronization. |
| */ |
| if (!con_flag(con, FLAG_IS_SOCKET) && !con_flag(con, FLAG_IS_FIFO) && |
| !con_flag(con, FLAG_IS_CHR) && (output_fd >= 0)) { |
| do { |
| if (fsync(output_fd)) { |
| rc = errno; |
| log_flag(CONMGR, "%s: [%s] unable to fsync(fd:%d): %s", |
| __func__, con->name, output_fd, |
| slurm_strerror(rc)); |
| |
| if (rc == EBADF) |
| output_fd = -1; |
| } |
| } while (rc == EINTR); |
| } |
| |
| if ((output_fd >= 0) && close(output_fd)) { |
| rc = errno; |
| log_flag(CONMGR, "%s: [%s] unable to close output fd:%d: %s", |
| __func__, con->name, output_fd, |
| slurm_strerror(rc)); |
| } |
| } |
| |
| static void _on_close_output_fd(conmgr_fd_t *con) |
| { |
| if (con->output_fd < 0) { |
| log_flag(CONMGR, "%s: [%s] skipping as output_fd already closed", |
| __func__, con->name); |
| return; |
| } |
| |
| con_set_polling(con, PCTL_TYPE_NONE, __func__); |
| |
| if (con->out) |
| list_flush(con->out); |
| |
| if (con->output_fd >= 0) |
| add_work_con_fifo(true, con, _close_output_fd, |
| ((void *) (uint64_t) con->output_fd)); |
| |
| con->output_fd = -1; |
| } |
| |
| extern void close_con_output(bool locked, conmgr_fd_t *con) |
| { |
| if (!locked) |
| slurm_mutex_lock(&mgr.mutex); |
| |
| _on_close_output_fd(con); |
| |
| if (!locked) |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| static void _wrap_on_connect_timeout(conmgr_callback_args_t conmgr_args, |
| void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| int rc; |
| |
| if (con->events->on_connect_timeout) |
| rc = con->events->on_connect_timeout(con, con->new_arg); |
| else |
| rc = SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT; |
| |
| if (rc) { |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char str[CTIME_STR_LEN]; |
| |
| timespec_ctime(mgr.conf_connect_timeout, false, str, |
| sizeof(str)); |
| |
| log_flag(CONMGR, "%s: [%s] closing due to connect %s timeout failed: %s", |
| __func__, con->name, str, slurm_strerror(rc)); |
| } |
| close_con(false, con); |
| } else { |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char str[CTIME_STR_LEN]; |
| |
| timespec_ctime(mgr.conf_connect_timeout, false, str, |
| sizeof(str)); |
| |
| log_flag(CONMGR, "%s: [%s] connect %s timeout resetting", |
| __func__, con->name, str); |
| } |
| |
| slurm_mutex_lock(&mgr.mutex); |
| con->last_read = timespec_now(); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| } |
| |
| static void _on_connect_timeout(handle_connection_args_t *args, |
| conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(args->magic == MAGIC_HANDLE_CONNECTION); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char time_str[CTIME_STR_LEN], total_str[CTIME_STR_LEN]; |
| |
| _set_time(args); |
| |
| timespec_ctime(timespec_diff_ns(con->last_read, |
| args->time).diff, false, |
| time_str, sizeof(time_str)); |
| timespec_ctime(mgr.conf_connect_timeout, false, total_str, |
| sizeof(total_str)); |
| |
| log_flag(CONMGR, "%s: [%s] connect timed out at %s/%s", |
| __func__, con->name, time_str, total_str); |
| } |
| |
| add_work_con_fifo(true, con, _wrap_on_connect_timeout, NULL); |
| } |
| |
| static void _wrap_on_write_timeout(conmgr_callback_args_t conmgr_args, |
| void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| int rc; |
| |
| if (con->events->on_write_timeout) |
| rc = con->events->on_write_timeout(con, con->arg); |
| else |
| rc = SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT; |
| |
| if (rc) { |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char str[CTIME_STR_LEN]; |
| |
| timespec_ctime(mgr.conf_write_timeout, false, str, |
| sizeof(str)); |
| |
| log_flag(CONMGR, "%s: [%s] closing due to write %s timeout failed: %s", |
| __func__, con->name, str, slurm_strerror(rc)); |
| } |
| |
| slurm_mutex_lock(&mgr.mutex); |
| |
| /* Close read and write file descriptors */ |
| close_con(true, con); |
| _on_close_output_fd(con); |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| } else { |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char str[CTIME_STR_LEN]; |
| |
| timespec_ctime(mgr.conf_write_timeout, false, str, |
| sizeof(str)); |
| |
| log_flag(CONMGR, "%s: [%s] write %s timeout resetting", |
| __func__, con->name, str); |
| } |
| |
| slurm_mutex_lock(&mgr.mutex); |
| con->last_write = timespec_now(); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| } |
| |
| static void _on_write_timeout(handle_connection_args_t *args, conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(args->magic == MAGIC_HANDLE_CONNECTION); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char time_str[CTIME_STR_LEN], total_str[CTIME_STR_LEN]; |
| |
| _set_time(args); |
| |
| timespec_ctime(timespec_diff_ns(con->last_write, |
| args->time).diff, false, |
| time_str, sizeof(time_str)); |
| timespec_ctime(mgr.conf_write_timeout, false, total_str, |
| sizeof(total_str)); |
| |
| log_flag(CONMGR, "%s: [%s] write timed out at %s/%s", |
| __func__, con->name, time_str, total_str); |
| } |
| |
| add_work_con_fifo(true, con, _wrap_on_write_timeout, NULL); |
| } |
| |
| static void _wrap_on_read_timeout(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| int rc; |
| |
| if (con->events->on_read_timeout) |
| rc = con->events->on_read_timeout(con, con->arg); |
| else |
| rc = SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT; |
| |
| if (rc) { |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char str[CTIME_STR_LEN]; |
| |
| timespec_ctime(mgr.conf_read_timeout, false, str, |
| sizeof(str)); |
| |
| log_flag(CONMGR, "%s: [%s] closing due to read %s timeout failed: %s", |
| __func__, con->name, str, slurm_strerror(rc)); |
| } |
| |
| close_con(false, con); |
| } else { |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char str[CTIME_STR_LEN]; |
| |
| timespec_ctime(mgr.conf_read_timeout, false, str, |
| sizeof(str)); |
| |
| log_flag(CONMGR, "%s: [%s] read %s timeout resetting", |
| __func__, con->name, str); |
| } |
| |
| slurm_mutex_lock(&mgr.mutex); |
| con->last_read = timespec_now(); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| } |
| |
| static void _on_read_timeout(handle_connection_args_t *args, conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(args->magic == MAGIC_HANDLE_CONNECTION); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char time_str[CTIME_STR_LEN], total_str[CTIME_STR_LEN]; |
| |
| _set_time(args); |
| |
| timespec_ctime(timespec_diff_ns(con->last_read, args->time).diff, |
| false, time_str, sizeof(time_str)); |
| timespec_ctime(mgr.conf_read_timeout, false, total_str, |
| sizeof(total_str)); |
| |
| log_flag(CONMGR, "%s: [%s] read timed out at %s/%s", |
| __func__, con->name, time_str, total_str); |
| } |
| |
| add_work_con_fifo(true, con, _wrap_on_read_timeout, NULL); |
| } |
| |
| /* Caller must hold mgr->mutex lock */ |
| static bool _is_accept_deferred(void) |
| { |
| return (list_count(mgr.connections) >= mgr.max_connections); |
| } |
| |
| /* caller must hold mgr->mutex lock */ |
| extern void queue_on_connection(conmgr_fd_t *con) |
| { |
| /* disable polling until on_connect() is done */ |
| con_set_polling(con, PCTL_TYPE_CONNECTED, __func__); |
| |
| add_work_con_fifo(true, con, wrap_on_connection, con); |
| |
| log_flag(CONMGR, "%s: [%s] Fully connected. Queuing on_connect() callback.", |
| __func__, con->name); |
| } |
| |
| static int _handle_connection_wait_write(conmgr_fd_t *con, |
| handle_connection_args_t *args, |
| list_t *out) |
| { |
| /* |
| * Only monitor for when connection is ready for writes |
| * as there is no point reading until the write is |
| * complete since it will be ignored. |
| */ |
| con_set_polling(con, PCTL_TYPE_WRITE_ONLY, __func__); |
| |
| if (con_flag(con, FLAG_WATCH_WRITE_TIMEOUT) && |
| _handle_time_limit(args, con->last_write, mgr.conf_write_timeout, |
| "write", con->name, __func__)) { |
| _on_write_timeout(args, con); |
| return 0; |
| } |
| |
| /* must wait until poll allows write of this socket */ |
| log_flag(CONMGR, "%s: [%s] waiting for %u writes", |
| __func__, con->name, list_count(out)); |
| |
| return 0; |
| } |
| |
| static int _handle_connection_write(conmgr_fd_t *con, |
| handle_connection_args_t *args) |
| { |
| if (!con_flag(con, FLAG_CAN_WRITE) && |
| (con->polling_output_fd != PCTL_TYPE_UNSUPPORTED)) |
| return _handle_connection_wait_write(con, args, con->out); |
| |
| log_flag(CONMGR, "%s: [%s] %u pending writes", |
| __func__, con->name, list_count(con->out)); |
| add_work_con_fifo(true, con, handle_write, con); |
| return 0; |
| } |
| |
| extern void _handle_fingerprint(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| int match = EINVAL; |
| bool bail = false; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| |
| xassert(con_flag(con, FLAG_IS_CONNECTED)); |
| xassert(!con_flag(con, FLAG_IS_TLS_CONNECTED)); |
| |
| if (con_flag(con, FLAG_READ_EOF) || con_flag(con, FLAG_CAN_READ)) |
| bail = true; |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| if (bail) { |
| log_flag(CONMGR, "%s: [%s] skipping fingerprint match", |
| __func__, con->name); |
| return; |
| } |
| |
| match = con->events->on_fingerprint(con, get_buf_data(con->in), |
| get_buf_offset(con->in), con->arg); |
| |
| if (match == SLURM_SUCCESS) { |
| log_flag(CONMGR, "%s: [%s] fingerprint match completed", |
| __func__, con->name); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| con_unset_flag(con, FLAG_WAIT_ON_FINGERPRINT); |
| con_unset_flag(con, FLAG_ON_DATA_TRIED); |
| |
| if (con->events->on_connection && |
| !con_flag(con, FLAG_TLS_SERVER)) |
| queue_on_connection(con); |
| slurm_mutex_unlock(&mgr.mutex); |
| } else if (match == EWOULDBLOCK) { |
| log_flag(CONMGR, "%s: [%s] waiting for more bytes for fingerprint", |
| __func__, con->name); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| con_set_flag(con, FLAG_ON_DATA_TRIED); |
| slurm_mutex_unlock(&mgr.mutex); |
| } else { |
| log_flag(CONMGR, "%s: [%s] fingerprint failed: %s", |
| __func__, con->name, slurm_strerror(match)); |
| |
| close_con(false, con); |
| } |
| } |
| |
| /* |
| * handle connection states and apply actions required. |
| * mgr mutex must be locked. |
| * |
| * RET 1 to remove or 0 to remain in list |
| */ |
| static int _handle_connection(conmgr_fd_t *con, handle_connection_args_t *args) |
| { |
| int count; |
| const bool is_tls = (con_flag(con, FLAG_TLS_SERVER) || |
| con_flag(con, FLAG_TLS_CLIENT)); |
| handle_connection_args_t local_args = { |
| .magic = MAGIC_HANDLE_CONNECTION, |
| }; |
| |
| if (!args) { |
| /* |
| * Always have args pointer populated to avoid breaking timeout |
| * logic |
| */ |
| args = &local_args; |
| } |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(args->magic == MAGIC_HANDLE_CONNECTION); |
| |
| /* connection may have a running thread, do nothing */ |
| if (con_flag(con, FLAG_WORK_ACTIVE)) { |
| log_flag(CONMGR, "%s: [%s] connection has work to do", |
| __func__, con->name); |
| return 0; |
| } |
| |
| if (((con->input_fd < 0) && (con->output_fd < 0)) || |
| con_flag(con, FLAG_TLS_WAIT_ON_CLOSE)) { |
| xassert(con_flag(con, FLAG_READ_EOF)); |
| /* connection already closed */ |
| } else if (con_flag(con, FLAG_IS_CONNECTED)) { |
| /* continue on to follow other checks */ |
| } else if (!con_flag(con, FLAG_IS_SOCKET) || |
| con_flag(con, FLAG_CAN_READ) || |
| con_flag(con, FLAG_CAN_WRITE) || |
| con_flag(con, FLAG_IS_LISTEN)) { |
| /* |
| * Only sockets need special handling to know when they are |
| * connected. Enqueue on_connect callback if defined. |
| */ |
| con_set_flag(con, FLAG_IS_CONNECTED); |
| |
| if (con_flag(con, FLAG_WATCH_READ_TIMEOUT)) { |
| _set_time(args); |
| con->last_read = args->time; |
| } |
| |
| if (con_flag(con, FLAG_IS_SOCKET) && (con->output_fd != -1)) { |
| /* Query outbound MSS now kernel should know the answer */ |
| add_work_con_fifo(true, con, _update_mss, NULL); |
| } |
| |
| if (con_flag(con, FLAG_IS_LISTEN)) { |
| if (con->events->on_listen_connect) { |
| /* disable polling until on_listen_connect() */ |
| con_set_polling(con, PCTL_TYPE_CONNECTED, |
| __func__); |
| |
| add_work_con_fifo(true, con, wrap_on_connection, |
| con); |
| |
| log_flag(CONMGR, "%s: [%s] Fully connected. Queuing on_listen_connect() callback.", |
| __func__, con->name); |
| return 0; |
| } else { |
| /* follow normal checks */ |
| } |
| } else if (con->events->on_connection && !is_tls && |
| !con_flag(con, FLAG_WAIT_ON_FINGERPRINT)) { |
| queue_on_connection(con); |
| return 0; |
| } else { |
| /* |
| * Only watch for incoming data as there can't be any |
| * outgoing data yet |
| */ |
| xassert(list_is_empty(con->out)); |
| |
| /* |
| * Continue on to follow other checks as nothing special |
| * needs to be done |
| */ |
| } |
| } else { |
| xassert(!con_flag(con, FLAG_CAN_READ) && |
| !con_flag(con, FLAG_CAN_WRITE)); |
| |
| /* |
| * Need to wait for connection to establish or fail. |
| * |
| * From man 2 connect: |
| * |
| * It is possible to select(2) or poll(2) for completion by |
| * selecting the socket for writing. After select(2) indicates |
| * writability, use getsockopt(2) to read the SO_ERROR option at |
| * level SOL_SOCKET to determine whether connect() completed |
| * success‐ fully (SO_ERROR is zero) or unsuccessfully |
| */ |
| con_set_polling(con, PCTL_TYPE_READ_WRITE, __func__); |
| |
| if (con_flag(con, FLAG_WATCH_CONNECT_TIMEOUT) && |
| _handle_time_limit(args, con->last_read, |
| mgr.conf_connect_timeout, "connect", |
| con->name, __func__)) { |
| _on_connect_timeout(args, con); |
| return 0; |
| } |
| |
| log_flag(CONMGR, "%s: [%s] waiting for connection to establish", |
| __func__, con->name); |
| return 0; |
| } |
| |
| /* always do work first once connected */ |
| if ((count = list_count(con->work))) { |
| work_t *work = list_pop(con->work); |
| |
| log_flag(CONMGR, "%s: [%s] queuing pending work: %u total", |
| __func__, con->name, count); |
| |
| work->status = CONMGR_WORK_STATUS_RUN; |
| /* unset by _wrap_con_work() */ |
| xassert(!con_flag(con, FLAG_WORK_ACTIVE)); |
| con_set_flag(con, FLAG_WORK_ACTIVE); |
| |
| handle_work(true, work); |
| return 0; |
| } |
| |
| /* |
| * Skip all monitoring when FLAG_QUIESCE set but only if there is |
| * at least 1 file descriptor to avoid stopping a closed connection. |
| */ |
| if (con_flag(con, FLAG_QUIESCE) && ((con->input_fd >= 0) || |
| (con->output_fd >= 0))) { |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: connection is quiesced flags=%s", |
| __func__, flags); |
| xfree(flags); |
| } |
| con_set_polling(con, PCTL_TYPE_NONE, __func__); |
| return 0; |
| } |
| |
| if (con_flag(con, FLAG_TLS_WAIT_ON_CLOSE)) { |
| log_flag(CONMGR, "%s: [%s] waiting on delayed close of TLS connection", |
| __func__, con->name); |
| return 0; |
| } |
| |
| if (con->extract && !con_flag(con, FLAG_WAIT_ON_FINGERPRINT) && |
| (!is_tls || (con_flag(con, FLAG_IS_TLS_CONNECTED) && |
| !con_flag(con, FLAG_WAIT_ON_FINISH))) && |
| !con_flag(con, FLAG_QUIESCE) && list_is_empty(con->out) && |
| (!con->tls_out || list_is_empty(con->tls_out)) && |
| (!con->tls_in || !get_buf_offset(con->tls_in)) && |
| !con_flag(con, FLAG_READ_EOF) && |
| !con_flag(con, FLAG_WAIT_ON_FINISH) && !get_buf_offset(con->in)) { |
| /* |
| * extraction of file descriptors requested |
| * but only after starting TLS if needed |
| */ |
| extract_con_fd(con); |
| return 0; |
| } |
| |
| /* handle out going data */ |
| if (!con_flag(con, FLAG_IS_LISTEN) && (con->output_fd >= 0) && |
| con->tls_out && !list_is_empty(con->tls_out)) { |
| if (con_flag(con, FLAG_CAN_WRITE) || |
| (con->polling_output_fd == PCTL_TYPE_UNSUPPORTED)) { |
| log_flag(CONMGR, "%s: [%s] %u pending TLS writes", |
| __func__, con->name, list_count(con->tls_out)); |
| add_work_con_fifo(true, con, tls_handle_write, con); |
| return 0; |
| } else { |
| return _handle_connection_wait_write(con, args, |
| con->tls_out); |
| } |
| } |
| |
| if (!con_flag(con, FLAG_IS_LISTEN) && (con->output_fd >= 0) && |
| !list_is_empty(con->out)) { |
| if (con->tls) { |
| if (con_flag(con, FLAG_IS_TLS_CONNECTED)) { |
| log_flag(CONMGR, "%s: [%s] %u pending writes to encrypt", |
| __func__, con->name, |
| list_count(con->out)); |
| add_work_con_fifo(true, con, tls_handle_encrypt, |
| con); |
| return 0; |
| } else { |
| log_flag(CONMGR, "%s: [%s] deferring %u pending writes to encrypt until TLS connected", |
| __func__, con->name, |
| list_count(con->out)); |
| } |
| } else { |
| return _handle_connection_write(con, args); |
| } |
| } |
| |
| if (!con_flag(con, FLAG_IS_LISTEN) && |
| (count = list_count(con->write_complete_work))) { |
| bool queue_work = false; |
| |
| xassert(!con_flag(con, FLAG_WAIT_ON_FINGERPRINT)); |
| xassert(!is_tls || con_flag(con, FLAG_IS_TLS_CONNECTED)); |
| |
| if (con->output_fd < 0) { |
| /* output_fd is already closed so no more write()s */ |
| queue_work = true; |
| } else if (con->polling_output_fd == PCTL_TYPE_UNSUPPORTED) { |
| /* output_fd can't be polled for CAN_WRITE */ |
| queue_work = true; |
| } else if ((con->polling_output_fd == PCTL_TYPE_NONE) && |
| con_flag(con, FLAG_CAN_WRITE)) { |
| /* poll() already marked connection as CAN_WRITE */ |
| queue_work = true; |
| } |
| |
| if (queue_work) { |
| log_flag(CONMGR, "%s: [%s] waiting for %u write_complete work", |
| __func__, con->name, count); |
| add_work_con_fifo(true, con, _on_write_complete_work, |
| NULL); |
| } else { |
| log_flag(CONMGR, "%s: [%s] waiting for FLAG_CAN_WRITE to queue %u write_complete work", |
| __func__, con->name, count); |
| |
| /* |
| * Always unset FLAG_CAN_WRITE if we are not queuing up |
| * _on_write_complete_work() as we want to trigger on |
| * the next edge activation of FLAG_CAN_WRITE to avoid |
| * wasting time calling ioctl(TIOCOUTQ) when nothing has |
| * changed |
| */ |
| con_unset_flag(con, FLAG_CAN_WRITE); |
| |
| /* |
| * Existing polling either did not set FLAG_CAN_WRITE or |
| * the polling type was not monitoring for |
| * FLAG_CAN_WRITE. output_fd is still valid and we need |
| * to change the polling to monitor outbound buffer |
| * (indirectly) to queue up _on_write_complete_work() on |
| * when FLAG_CAN_WRITE is set. |
| */ |
| con_set_polling(con, PCTL_TYPE_READ_WRITE, __func__); |
| } |
| |
| return 0; |
| } |
| |
| /* check if there is new connection waiting */ |
| if (con_flag(con, FLAG_IS_LISTEN) && !con_flag(con, FLAG_READ_EOF) && |
| con_flag(con, FLAG_CAN_READ)) { |
| /* disable polling until _listen_accept() completes */ |
| con_set_polling(con, PCTL_TYPE_CONNECTED, __func__); |
| con_unset_flag(con, FLAG_CAN_READ); |
| |
| if (_is_accept_deferred()) { |
| warning("%s: [%s] Deferring incoming connection due to %d/%d connections", |
| __func__, con->name, |
| list_count(mgr.connections), |
| mgr.max_connections); |
| } else { |
| log_flag(CONMGR, "%s: [%s] listener has incoming connection", |
| __func__, con->name); |
| add_work_con_fifo(true, con, _listen_accept, con); |
| } |
| return 0; |
| } |
| |
| /* read as much data as possible before processing */ |
| if (!con_flag(con, FLAG_IS_LISTEN) && !con_flag(con, FLAG_READ_EOF) && |
| (con_flag(con, FLAG_CAN_READ) || |
| (con->polling_input_fd == PCTL_TYPE_UNSUPPORTED))) { |
| /* |
| * reset if data has already been tried if about to read |
| * data |
| */ |
| con_unset_flag(con, FLAG_ON_DATA_TRIED); |
| |
| if (con->tls_in) { |
| log_flag(CONMGR, "%s: [%s] queuing TLS read", |
| __func__, con->name); |
| add_work_con_fifo(true, con, tls_handle_read, con); |
| } else { |
| log_flag(CONMGR, "%s: [%s] queuing read", __func__, con->name); |
| add_work_con_fifo(true, con, handle_read, con); |
| } |
| return 0; |
| } |
| |
| if (is_tls && !con_flag(con, FLAG_IS_LISTEN) && |
| !con_flag(con, FLAG_ON_DATA_TRIED) && |
| !con_flag(con, FLAG_TLS_WAIT_ON_CLOSE) |
| && con_flag(con, FLAG_IS_TLS_CONNECTED) && con->tls_in && |
| get_buf_offset(con->tls_in)) { |
| log_flag(CONMGR, "%s: [%s] queuing TLS decrypt of %u bytes", |
| __func__, con->name, get_buf_offset(con->tls_in)); |
| add_work_con_fifo(true, con, tls_handle_decrypt, con); |
| return 0; |
| } |
| |
| if (!con_flag(con, FLAG_IS_LISTEN) && is_tls && |
| !con_flag(con, FLAG_IS_TLS_CONNECTED) && |
| !con_flag(con, FLAG_ON_DATA_TRIED) && |
| !con_flag(con, FLAG_READ_EOF)) { |
| xassert(!con_flag(con, FLAG_WAIT_ON_FINGERPRINT)); |
| |
| /* |
| * TLS handshake must happen attempting to process any of the |
| * incoming data but unwrapped data must flow both directions |
| * until negotiations are complete |
| */ |
| log_flag(CONMGR, "%s: [%s] queuing up TLS handshake", |
| __func__, con->name); |
| add_work_con_fifo(true, con, tls_create, NULL); |
| return 0; |
| } |
| |
| /* handle already read data */ |
| if (!con_flag(con, FLAG_IS_LISTEN) && get_buf_offset(con->in) && |
| !con_flag(con, FLAG_ON_DATA_TRIED)) { |
| xassert(!is_tls || con_flag(con, FLAG_IS_TLS_CONNECTED)); |
| |
| if (con_flag(con, FLAG_WAIT_ON_FINGERPRINT)) { |
| log_flag(CONMGR, "%s: [%s] checking for fingerprint in %u bytes", |
| __func__, con->name, get_buf_offset(con->in)); |
| add_work_con_fifo(true, con, _handle_fingerprint, con); |
| } else { |
| log_flag(CONMGR, "%s: [%s] need to process %u bytes", |
| __func__, con->name, get_buf_offset(con->in)); |
| add_work_con_fifo(true, con, wrap_on_data, con); |
| } |
| return 0; |
| } |
| |
| if (!con_flag(con, FLAG_READ_EOF)) { |
| xassert(con->input_fd != -1); |
| |
| /* must wait until poll allows read from this socket */ |
| if (con_flag(con, FLAG_IS_LISTEN)) { |
| if (_is_accept_deferred()) { |
| warning("%s: [%s] Deferring polling for new connections due to %d/%d connections", |
| __func__, con->name, |
| list_count(mgr.connections), |
| mgr.max_connections); |
| |
| con_set_polling(con, PCTL_TYPE_CONNECTED, |
| __func__); |
| } else { |
| con_set_polling(con, PCTL_TYPE_LISTEN, |
| __func__); |
| log_flag(CONMGR, "%s: [%s] waiting for new connection", |
| __func__, con->name); |
| } |
| } else { |
| con_set_polling(con, PCTL_TYPE_READ_ONLY, __func__); |
| |
| if (con_flag(con, CON_FLAG_WATCH_READ_TIMEOUT) && |
| list_is_empty(con->write_complete_work) && |
| _handle_time_limit(args, con->last_read, |
| mgr.conf_read_timeout, "read", |
| con->name, __func__)) { |
| _on_read_timeout(args, con); |
| return 0; |
| } |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: [%s] waiting for events: pending_read=%u pending_writes=%u pending_tls_read=%d pending_tls_writes=%d work=%d write_complete_work=%d flags=%s", |
| __func__, con->name, |
| get_buf_offset(con->in), |
| list_count(con->out), |
| (con->tls_in ? |
| get_buf_offset(con->tls_in) : -1), |
| (con->tls_out ? |
| list_count(con->tls_out) : -1), |
| list_count(con->work), |
| list_count(con->write_complete_work), |
| flags); |
| xfree(flags); |
| } |
| } |
| return 0; |
| } |
| |
| /* |
| * Close out the incoming to avoid any new work coming into the |
| * connection. |
| */ |
| if (con->input_fd != -1) { |
| log_flag(CONMGR, "%s: [%s] queuing close of incoming on connection input_fd=%d", |
| __func__, con->name, con->input_fd); |
| xassert(con_flag(con, FLAG_READ_EOF)); |
| add_work_con_fifo(true, con, work_close_con, NULL); |
| return 0; |
| } |
| |
| if (con_flag(con, FLAG_WAIT_ON_FINISH)) { |
| log_flag(CONMGR, "%s: [%s] waiting for %s", |
| __func__, con->name, |
| (con_flag(con, FLAG_IS_LISTEN) ? "on_finish()" : |
| "on_listen_finish()")); |
| return 0; |
| } |
| |
| if (con->arg) { |
| log_flag(CONMGR, "%s: [%s] queuing up %s", |
| __func__, con->name, |
| (con_flag(con, FLAG_IS_LISTEN) ? "on_finish()" : |
| "on_listen_finish()")); |
| |
| con_set_flag(con, FLAG_WAIT_ON_FINISH); |
| |
| /* notify caller of closing */ |
| add_work_con_fifo(true, con, _on_finish_wrapper, con->arg); |
| return 0; |
| } |
| |
| if (!list_is_empty(con->work) || !list_is_empty(con->write_complete_work)) { |
| log_flag(CONMGR, "%s: [%s] outstanding work for connection output_fd=%d work=%u write_complete_work=%u", |
| __func__, con->name, con->output_fd, |
| list_count(con->work), |
| list_count(con->write_complete_work)); |
| |
| /* |
| * Must finish all outstanding work before deletion. |
| * Work must have been added by on_finish() |
| */ |
| return 0; |
| } |
| |
| xassert(con->refs < INT_MAX); |
| xassert(con->refs >= 0); |
| if (con->refs > 0) { |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: [%s] waiting on outstanding references:%d flags=%s", |
| __func__, con->name, con->refs, flags); |
| xfree(flags); |
| } |
| |
| return 0; |
| } |
| |
| if (!con_flag(con, FLAG_IS_LISTEN) && is_tls && con->tls) { |
| log_flag(CONMGR, "%s: [%s] waiting to close TLS connection", |
| __func__, con->name); |
| add_work_con_fifo(true, con, tls_close, NULL); |
| return 0; |
| } |
| |
| /* |
| * This connection has no more pending work or possible IO: |
| * Remove the connection and close everything. |
| */ |
| |
| if (con->output_fd != -1) { |
| log_flag(CONMGR, "%s: [%s] waiting to close output_fd=%d", |
| __func__, con->name, con->output_fd); |
| _on_close_output_fd(con); |
| return 0; |
| } |
| |
| log_flag(CONMGR, "%s: [%s] closed connection", __func__, con->name); |
| |
| /* mark this connection for cleanup */ |
| return 1; |
| } |
| |
| extern void handle_connection(bool locked, conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (!locked) |
| slurm_mutex_lock(&mgr.mutex); |
| |
| (void) _handle_connection(con, NULL); |
| |
| if (!locked) |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| static int _list_transfer_handle_connection(void *x, void *arg) |
| { |
| conmgr_fd_t *con = x; |
| handle_connection_args_t *args = arg; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(args->magic == MAGIC_HANDLE_CONNECTION); |
| |
| return _handle_connection(con, args); |
| } |
| |
| /* RET true to run again */ |
| static bool _attempt_accept(conmgr_fd_t *con) |
| { |
| slurm_addr_t addr = {0}; |
| socklen_t addrlen = sizeof(addr); |
| int input_fd = -1, fd = -1, rc = EINVAL; |
| const char *unix_path = NULL; |
| conmgr_con_type_t type = CON_TYPE_INVALID; |
| con_flags_t flags = FLAG_NONE; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| |
| if ((input_fd = con->input_fd) < 0) { |
| slurm_mutex_unlock(&mgr.mutex); |
| log_flag(CONMGR, "%s: [%s] skipping accept on closed connection", |
| __func__, con->name); |
| return false; |
| } else if (con_flag(con, FLAG_QUIESCE)) { |
| slurm_mutex_unlock(&mgr.mutex); |
| log_flag(CONMGR, "%s: [%s] skipping accept on quiesced connection", |
| __func__, con->name); |
| return false; |
| } else if (_is_accept_deferred()) { |
| warning("%s: [%s] Deferring to attempt to accept new incoming connection due to %d/%d connections", |
| __func__, con->name, list_count(mgr.connections), |
| mgr.max_connections); |
| slurm_mutex_unlock(&mgr.mutex); |
| return false; |
| } else |
| log_flag(CONMGR, "%s: [%s] attempting to accept new connection", |
| __func__, con->name); |
| |
| type = con->type; |
| flags = con->flags; |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| /* try to get the new file descriptor and retry on errors */ |
| if ((fd = accept4(input_fd, (struct sockaddr *) &addr, &addrlen, |
| SOCK_CLOEXEC)) < 0) { |
| if (errno == EINTR) { |
| log_flag(CONMGR, "%s: [%s] interrupt on accept(). Retrying.", |
| __func__, con->name); |
| return true; |
| } |
| if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { |
| log_flag(CONMGR, "%s: [%s] retry: %m", |
| __func__, con->name); |
| return false; |
| } |
| |
| error("%s: [%s] Error on accept socket: %m", |
| __func__, con->name); |
| |
| if ((errno == EMFILE) || (errno == ENFILE) || |
| (errno == ENOBUFS) || (errno == ENOMEM)) { |
| error("%s: [%s] retry on error: %m", |
| __func__, con->name); |
| return false; |
| } |
| |
| /* socket is likely dead: fail out */ |
| close_con(false, con); |
| return true; |
| } |
| |
| if (addrlen <= 0) |
| fatal("%s: empty address returned from accept()", |
| __func__); |
| if (addrlen > sizeof(addr)) |
| fatal("%s: unexpected large address returned from accept(): %u bytes", |
| __func__, addrlen); |
| |
| if (addr.ss_family == AF_UNIX) { |
| struct sockaddr_un *usock = (struct sockaddr_un *) &addr; |
| |
| xassert(usock->sun_family == AF_UNIX); |
| |
| if (!usock->sun_path[0]) { |
| /* |
| * Attempt to use parent socket's path. |
| * Need to lock to access con->address safely. |
| */ |
| slurm_mutex_lock(&mgr.mutex); |
| |
| if (con->address.ss_family == AF_UNIX) { |
| struct sockaddr_un *psock = |
| (struct sockaddr_un *) &con->address; |
| |
| if (psock->sun_path[0]) |
| (void) memcpy(&usock->sun_path, |
| &psock->sun_path, |
| sizeof(usock->sun_path)); |
| } |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| /* address may not be populated by kernel */ |
| if (usock->sun_path[0]) |
| unix_path = usock->sun_path; |
| } |
| |
| /* hand over FD for normal processing */ |
| if ((rc = add_connection(type, con, fd, fd, con->events, |
| (conmgr_con_flags_t) flags, &addr, addrlen, |
| false, unix_path, NULL, con->new_arg))) { |
| log_flag(CONMGR, "%s: [fd:%d] unable to a register new connection: %s", |
| __func__, fd, slurm_strerror(rc)); |
| return true; |
| } |
| |
| log_flag(CONMGR, "%s: [%s->fd:%d] registered newly accepted connection", |
| __func__, con->name, fd); |
| return true; |
| } |
| |
| /* |
| * listen socket is ready to accept |
| */ |
| static void _listen_accept(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) { |
| log_flag(CONMGR, "%s: [%s] skipping accept during shutdown", |
| __func__, con->name); |
| return; |
| } |
| |
| while (_attempt_accept(con)) |
| ; |
| } |
| |
| /* |
| * Inspect all connection states and apply actions required |
| */ |
| |
| static void _inspect_connections(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| bool send_signal = false; |
| handle_connection_args_t args = { |
| .magic = MAGIC_HANDLE_CONNECTION, |
| }; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| xassert(mgr.inspecting); |
| |
| _set_time(&args); |
| |
| /* |
| * Always check mgr.connections list first to avoid |
| * _is_accept_deferred() returning a different answer which could result |
| * in listeners not being set to PCTL_TYPE_LISTEN after enough |
| * connections were closed to fall below the max connection count. |
| */ |
| |
| if (list_transfer_match(mgr.connections, mgr.complete_conns, |
| _list_transfer_handle_connection, &args)) |
| send_signal = true; |
| if (list_transfer_match(mgr.listen_conns, mgr.complete_conns, |
| _list_transfer_handle_connection, &args)) |
| send_signal = true; |
| |
| mgr.inspecting = false; |
| |
| if (send_signal) |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| /* caller (or thread) must hold mgr.mutex lock */ |
| static int _handle_poll_event(int fd, pollctl_events_t events, void *arg) |
| { |
| conmgr_fd_t *con = NULL; |
| con_flags_t old_flags; |
| |
| xassert(fd >= 0); |
| |
| if (!(con = con_find_by_fd(fd))) { |
| /* close_con() was called during poll() was running */ |
| log_flag(CONMGR, "%s: Ignoring events for unknown fd:%d", |
| __func__, fd); |
| return SLURM_SUCCESS; |
| } |
| |
| /* record prior flags to know if something changed */ |
| old_flags = con->flags; |
| |
| con_unset_flag(con, FLAG_CAN_READ); |
| con_unset_flag(con, FLAG_CAN_WRITE); |
| |
| if (pollctl_events_has_error(events)) { |
| con_close_on_poll_error(con, fd); |
| /* connection errored but not handling of the connection */ |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Avoid poll()ing the connection until we handle the flags via |
| * _handle_connection() to avoid the kernel thinking we successfully |
| * received the new edge triggered events. |
| */ |
| con_set_polling(con, PCTL_TYPE_NONE, __func__); |
| |
| if (con_flag(con, FLAG_IS_LISTEN)) { |
| /* Special handling for listening sockets */ |
| if (pollctl_events_has_hangup(events)) { |
| log_flag(CONMGR, "%s: [%s] listener HANGUP", |
| __func__, con->name); |
| con_set_flag(con, FLAG_READ_EOF); |
| } else if (pollctl_events_can_read(events)) { |
| con_set_flag(con, FLAG_CAN_READ); |
| } else { |
| fatal_abort("should never happen"); |
| } |
| |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: [%s] listener fd=%u flags=%s", |
| __func__, con->name, fd, flags); |
| xfree(flags); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| if (fd == con->input_fd) { |
| con_assign_flag(con, FLAG_CAN_READ, |
| pollctl_events_can_read(events)); |
| |
| /* Avoid setting FLAG_READ_EOF if FLAG_CAN_READ */ |
| if (!con_flag(con, FLAG_CAN_READ) && |
| !con_flag(con, FLAG_READ_EOF)) |
| con_assign_flag(con, FLAG_READ_EOF, |
| pollctl_events_has_hangup(events)); |
| } |
| if (fd == con->output_fd) |
| con_assign_flag(con, FLAG_CAN_WRITE, |
| pollctl_events_can_write(events)); |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { |
| char *flags = con_flags_string(con->flags); |
| log_flag(CONMGR, "%s: [%s] fd=%d flags=%s", |
| __func__, con->name, fd, flags); |
| xfree(flags); |
| } |
| |
| /* Attempt to changed connection state immediately */ |
| if ((con->flags & FLAGS_MASK_STATE) != (old_flags & FLAGS_MASK_STATE)) |
| (void) _handle_connection(con, NULL); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* caller must hold mgr.mutex lock */ |
| static bool _is_poll_interrupt(void) |
| { |
| return (mgr.shutdown_requested || |
| (mgr.waiting_on_work && (mgr.workers.active == 1))); |
| } |
| |
| /* Poll all connections */ |
| static void _poll_connections(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| int rc; |
| |
| xassert(!conmgr_args.con); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| xassert(mgr.poll_active); |
| |
| if (_is_poll_interrupt()) { |
| log_flag(CONMGR, "%s: skipping poll()", __func__); |
| goto done; |
| } else if (list_is_empty(mgr.connections) && |
| list_is_empty(mgr.listen_conns)) { |
| log_flag(CONMGR, "%s: skipping poll() with 0 connections", |
| __func__); |
| goto done; |
| } |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| if ((rc = pollctl_poll(__func__))) |
| fatal_abort("%s: should never fail: pollctl_poll()=%s", |
| __func__, slurm_strerror(rc)); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| |
| if ((rc = pollctl_for_each_event(_handle_poll_event, NULL, |
| XSTRINGIFY(_handle_poll_event), |
| __func__))) |
| fatal_abort("%s: should never fail: pollctl_for_each_event()=%s", |
| __func__, slurm_strerror(rc)); |
| |
| done: |
| xassert(mgr.poll_active); |
| mgr.poll_active = false; |
| |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| log_flag(CONMGR, "%s: poll done", __func__); |
| } |
| |
| extern void wait_for_watch(void) |
| { |
| slurm_mutex_lock(&mgr.mutex); |
| while (mgr.watch_thread) |
| EVENT_WAIT(&mgr.watch_return, &mgr.mutex); |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| static void _connection_fd_delete(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = arg; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(!con->tls); |
| |
| log_flag(CONMGR, "%s: [%s] free connection input_fd=%d output_fd=%d", |
| __func__, con->name, con->input_fd, con->output_fd); |
| |
| FREE_NULL_BUFFER(con->in); |
| FREE_NULL_BUFFER(con->tls_in); |
| FREE_NULL_LIST(con->out); |
| FREE_NULL_LIST(con->tls_out); |
| FREE_NULL_LIST(con->work); |
| FREE_NULL_LIST(con->write_complete_work); |
| xfree(con->name); |
| xassert(!con->refs); |
| |
| con->magic = ~MAGIC_CON_MGR_FD; |
| xfree(con); |
| } |
| |
| static void _handle_complete_conns(void) |
| { |
| conmgr_fd_t *con; |
| |
| /* |
| * Memory cleanup of connections can be done entirely |
| * independently as there should be nothing left in |
| * conmgr that references the connection. |
| */ |
| |
| while ((con = list_pop(mgr.complete_conns))) { |
| /* |
| * Not adding work against connection since this is just |
| * to delete the connection and cleanup and it should |
| * not queue into the connection work queue itself |
| */ |
| add_work_fifo(true, _connection_fd_delete, con); |
| } |
| } |
| |
| static bool _handle_events(void) |
| { |
| /* grab counts once */ |
| int count = list_count(mgr.connections) + list_count(mgr.listen_conns); |
| |
| log_flag(CONMGR, "%s: connections=%u listen_conns=%u complete_conns=%u", |
| __func__, list_count(mgr.connections), |
| list_count(mgr.listen_conns), list_count(mgr.complete_conns)); |
| |
| if (!list_is_empty(mgr.complete_conns)) |
| _handle_complete_conns(); |
| |
| if (!count) |
| return false; |
| |
| if (!mgr.inspecting) { |
| mgr.inspecting = true; |
| add_work_fifo(true, _inspect_connections, NULL); |
| } |
| |
| /* start poll thread if needed */ |
| if (!mgr.poll_active) { |
| /* request a listen thread to run */ |
| log_flag(CONMGR, "%s: queuing up poll", __func__); |
| mgr.poll_active = true; |
| |
| add_work_fifo(true, _poll_connections, NULL); |
| } else |
| log_flag(CONMGR, "%s: poll active already", __func__); |
| |
| return true; |
| } |
| |
| static int _foreach_is_waiter(void *x, void *arg) |
| { |
| int *waiters_ptr = arg; |
| conmgr_fd_t *con = x; |
| bool skip = false; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(*waiters_ptr >= 0); |
| |
| if (is_signal_connection(con)) { |
| skip = true; |
| } else if (con_flag(con, FLAG_WORK_ACTIVE)) { |
| /* never skip when connection has active work */ |
| } else if (con_flag(con, FLAG_IS_LISTEN)) { |
| /* |
| * listeners don't matter if they are not running |
| * _listen_accept() as work |
| */ |
| skip = true; |
| } else if (con_flag(con, FLAG_QUIESCE)) { |
| /* |
| * Individually quiesced will not do anything and need to be |
| * skipped or the global quiesce will never happen |
| */ |
| skip = true; |
| } |
| |
| if (!skip) |
| (*waiters_ptr)++; |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* Get count of connections that quiesce is waiting to complete */ |
| static int _get_quiesced_waiter_count(void) |
| { |
| int waiters = 0; |
| |
| (void) list_for_each_ro(mgr.connections, _foreach_is_waiter, &waiters); |
| (void) list_for_each_ro(mgr.listen_conns, _foreach_is_waiter, &waiters); |
| |
| return waiters; |
| } |
| |
| /* NOTE: must hold mgr.mutex */ |
| static int _close_con_for_each(void *x, void *arg) |
| { |
| conmgr_fd_t *con = x; |
| close_con(true, con); |
| return 1; |
| } |
| |
| static void _on_quiesce_timeout(void) |
| { |
| log_flag(CONMGR, "%s: Quiesce timed out. Closing all connections.", |
| __func__); |
| |
| /* Close all connections but not listeners */ |
| list_for_each(mgr.connections, _close_con_for_each, NULL); |
| } |
| |
| static void _quiesce_max_sleep(void) |
| { |
| handle_connection_args_t args = { |
| .magic = MAGIC_HANDLE_CONNECTION, |
| }; |
| |
| _set_time(&args); |
| |
| if (_handle_time_limit(&args, mgr.quiesce.start, |
| mgr.quiesce.conf_timeout, "quiesce", NULL, |
| __func__)) |
| _on_quiesce_timeout(); |
| } |
| |
| static bool _watch_loop(void) |
| { |
| if (mgr.shutdown_requested) { |
| signal_mgr_stop(); |
| cancel_delayed_work(false); |
| close_all_connections(); |
| } |
| |
| if (mgr.quiesce.requested) { |
| int waiters; |
| |
| /* |
| * Limit amount of time watch() will sleep to ensure that the |
| * quiesce timeout is enforced |
| */ |
| _quiesce_max_sleep(); |
| |
| /* Cancel any delayed connection work to avoid waiting on it */ |
| cancel_delayed_work(true); |
| |
| if (signal_mgr_has_incoming()) { |
| /* |
| * Must wait for any outstanding incoming signals to be |
| * processed or a pending signal may be deferred until |
| * after quiesce that may never come |
| */ |
| log_flag(CONMGR, "%s: quiesced state deferred due to pending incoming POSIX signal(s)", |
| __func__); |
| } else if ((waiters = _get_quiesced_waiter_count())) { |
| log_flag(CONMGR, "%s: quiesced state deferred to process connections:%d/%d", |
| __func__, waiters, |
| (list_count(mgr.connections) + |
| list_count(mgr.listen_conns))); |
| } else if (mgr.workers.active) { |
| log_flag(CONMGR, "%s: quiesced state waiting on workers:%d/%d", |
| __func__, mgr.workers.active, |
| mgr.workers.total); |
| mgr.waiting_on_work = true; |
| return true; |
| } else { |
| log_flag(CONMGR, "%s: BEGIN: quiesced state", __func__); |
| mgr.quiesce.active = true; |
| |
| EVENT_BROADCAST(&mgr.quiesce.on_start_quiesced); |
| |
| while (mgr.quiesce.active) |
| EVENT_WAIT(&mgr.quiesce.on_stop_quiesced, |
| &mgr.mutex); |
| |
| log_flag(CONMGR, "%s: END: quiesced state", __func__); |
| |
| /* |
| * All the worker threads may be waiting for a |
| * worker_sleep event and not an on_start_quiesced |
| * event. Wake them all up right now if there is any |
| * pending work queued to avoid workers remaining |
| * sleeping until add_work() is called enough times to |
| * wake them all up independent of the size of the |
| * mgr.work queue. |
| */ |
| if (!list_is_empty(mgr.work)) |
| EVENT_BROADCAST(&mgr.worker_sleep); |
| } |
| } |
| |
| if (_handle_events()) |
| return true; |
| |
| /* |
| * Avoid watch() ending if there are any other active workers or |
| * any queued work. |
| */ |
| |
| if (mgr.workers.active || !list_is_empty(mgr.work) || |
| !list_is_empty(mgr.delayed_work)) { |
| /* Need to wait for all work/workers to complete */ |
| log_flag(CONMGR, "%s: waiting on workers:%d work:%d delayed_work:%d", |
| __func__, mgr.workers.active, |
| list_count(mgr.delayed_work), list_count(mgr.work)); |
| mgr.waiting_on_work = true; |
| return true; |
| } |
| |
| log_flag(CONMGR, "%s: cleaning up", __func__); |
| |
| xassert(!mgr.poll_active); |
| return false; |
| } |
| |
| extern void *watch(void *arg) |
| { |
| slurm_mutex_lock(&mgr.mutex); |
| |
| xassert(mgr.watch_thread == pthread_self()); |
| |
| if (mgr.shutdown_requested) { |
| slurm_mutex_unlock(&mgr.mutex); |
| return NULL; |
| } |
| |
| add_work_fifo(true, signal_mgr_start, NULL); |
| |
| while (_watch_loop()) { |
| char timeout_str[CTIME_STR_LEN]; |
| |
| if (mgr.poll_active && _is_poll_interrupt()) { |
| /* |
| * poll() hasn't returned yet but we want to |
| * shutdown. Send interrupt before sleeping or |
| * watch() may end up sleeping forever. |
| */ |
| pollctl_interrupt(__func__); |
| } |
| |
| if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) |
| timespec_ctime(mgr.watch_max_sleep, true, timeout_str, |
| sizeof(timeout_str)); |
| |
| log_flag(CONMGR, "%s: waiting for new events: workers:%d/%d work:%d delayed_work:%d connections:%d listeners:%d complete:%d polling:%c inspecting:%c shutdown_requested:%c quiesce_requested:%c waiting_on_work:%c timeout:%s", |
| __func__, mgr.workers.active, |
| mgr.workers.total, list_count(mgr.work), |
| list_count(mgr.delayed_work), |
| list_count(mgr.connections), |
| list_count(mgr.listen_conns), |
| list_count(mgr.complete_conns), |
| BOOL_CHARIFY(mgr.poll_active), |
| BOOL_CHARIFY(mgr.inspecting), |
| BOOL_CHARIFY(mgr.shutdown_requested), |
| BOOL_CHARIFY(mgr.quiesce.requested), |
| BOOL_CHARIFY(mgr.waiting_on_work), |
| timeout_str); |
| |
| EVENT_WAIT_TIMED(&mgr.watch_sleep, mgr.watch_max_sleep, |
| &mgr.mutex); |
| mgr.waiting_on_work = false; |
| _reset_watch_max_sleep(); |
| } |
| |
| log_flag(CONMGR, "%s: returning shutdown_requested=%c connections=%u listen_conns=%u", |
| __func__, BOOL_CHARIFY(mgr.shutdown_requested), |
| list_count(mgr.connections), list_count(mgr.listen_conns)); |
| |
| xassert(mgr.watch_thread == pthread_self()); |
| mgr.watch_thread = 0; |
| |
| EVENT_BROADCAST(&mgr.watch_return); |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return NULL; |
| } |
| |
/*
 * pthread entry point for the watch thread.
 * Names the thread "watch" when prctl() is available, then runs watch().
 * IN arg - unused
 * RET value returned by watch()
 */
extern void *watch_thread(void *arg)
{
#if HAVE_SYS_PRCTL_H
	static char title[] = "watch";
	const int prctl_rc = prctl(PR_SET_NAME, title, NULL, NULL, NULL);

	if (prctl_rc)
		error("%s: cannot set process name to %s %m",
		      __func__, title);
#endif

	return watch(NULL);
}