| /*****************************************************************************\ |
| * conmgr.c - definitions for connection manager |
| ***************************************************************************** |
| * Copyright (C) SchedMD LLC. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include <errno.h> |
| #include <limits.h> |
| #include <signal.h> |
| #include <stdlib.h> |
| |
| #include "src/common/log.h" |
| #include "src/common/macros.h" |
| #include "src/common/read_config.h" |
| #include "src/common/xassert.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| |
| #include "src/conmgr/conmgr.h" |
| #include "src/conmgr/delayed.h" |
| #include "src/conmgr/mgr.h" |
| #include "src/conmgr/polling.h" |
| #include "src/conmgr/signals.h" |
| |
| #include "src/interfaces/tls.h" |
| #include "src/interfaces/url_parser.h" |
| |
| #define MAX_CONNECTIONS_DEFAULT 150 |
| |
| conmgr_t mgr = CONMGR_DEFAULT; |
| |
| static sig_atomic_t enabled_init = 0; |
| static bool enabled_status = false; |
| |
| static void _atfork_child(void) |
| { |
| /* Do nothing if conmgr was not running or already cleaned up */ |
| if (!mgr.initialized || !mgr.connections) |
| return; |
| |
| /* |
| * fork() while conmgr was running which means mgr's state is invalid |
| * and must not be used. Set mgr to same state as if conmgr_init() and |
| * conmgr_fini() was already called while being in an error state. |
| */ |
| mgr = CONMGR_DEFAULT; |
| mgr.initialized = true; |
| mgr.shutdown_requested = true; |
| mgr.error = ESHUTDOWN; |
| } |
| |
| static void _at_exit(void) |
| { |
| /* Skip locking mgr.mutex to avoid a deadlock */ |
| mgr.shutdown_requested = true; |
| } |
| |
| extern void conmgr_init(int thread_count, int max_connections) |
| { |
| int rc = EINVAL; |
| |
| (void) url_parser_g_init(); |
| |
| /* The configured value takes the highest precedence */ |
| if (mgr.conf_max_connections > 0) |
| max_connections = mgr.conf_max_connections; |
| else if (max_connections < 1) |
| max_connections = MAX_CONNECTIONS_DEFAULT; |
| xassert(max_connections > 0); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| |
| if (mgr.initialized) { |
| slurm_mutex_unlock(&mgr.mutex); |
| debug5("%s: skipping - already initialized", __func__); |
| return; |
| } |
| |
| enabled_status = true; |
| mgr.shutdown_requested = false; |
| |
| if (mgr.workers.conf_threads > 0) |
| thread_count = mgr.workers.conf_threads; |
| workers_init(thread_count); |
| |
| if ((rc = pthread_atfork(NULL, NULL, _atfork_child))) |
| fatal_abort("%s: pthread_atfork() failed: %s", |
| __func__, slurm_strerror(rc)); |
| |
| add_work(true, NULL, |
| (conmgr_callback_t) { |
| .func = on_signal_alarm, |
| .func_name = XSTRINGIFY(on_signal_alarm), |
| }, |
| (conmgr_work_control_t) { |
| .depend_type = CONMGR_WORK_DEP_SIGNAL, |
| .on_signal_number = SIGALRM, |
| .schedule_type = CONMGR_WORK_SCHED_FIFO, |
| }, |
| 0, __func__); |
| |
| if (!mgr.conf_delay_write_complete) |
| mgr.conf_delay_write_complete = slurm_conf.msg_timeout; |
| if (!mgr.conf_read_timeout.tv_nsec && !mgr.conf_read_timeout.tv_sec) |
| mgr.conf_read_timeout.tv_sec = slurm_conf.msg_timeout; |
| if (!mgr.conf_write_timeout.tv_nsec && !mgr.conf_write_timeout.tv_sec) |
| mgr.conf_write_timeout.tv_sec = slurm_conf.msg_timeout; |
| if (!mgr.conf_connect_timeout.tv_nsec && |
| !mgr.conf_connect_timeout.tv_sec) |
| mgr.conf_connect_timeout.tv_sec = slurm_conf.msg_timeout; |
| if (!mgr.quiesce.conf_timeout.tv_nsec && |
| !mgr.quiesce.conf_timeout.tv_sec) |
| mgr.quiesce.conf_timeout.tv_sec = (2 * slurm_conf.msg_timeout); |
| |
| mgr.max_connections = max_connections; |
| mgr.connections = list_create(NULL); |
| mgr.listen_conns = list_create(NULL); |
| mgr.complete_conns = list_create(NULL); |
| mgr.work = list_create(NULL); |
| init_delayed_work(); |
| |
| pollctl_init(mgr.max_connections); |
| |
| mgr.initialized = true; |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| /* Hook into atexit() in always clean shutdown if exit() called */ |
| (void) atexit(_at_exit); |
| } |
| |
| extern void conmgr_fini(void) |
| { |
| slurm_mutex_lock(&mgr.mutex); |
| |
| if (!mgr.initialized) |
| fatal_abort("%s: shutdown but never initialized", __func__); |
| |
| mgr.shutdown_requested = true; |
| |
| if (mgr.watch_thread) { |
| slurm_mutex_unlock(&mgr.mutex); |
| wait_for_watch(); |
| slurm_mutex_lock(&mgr.mutex); |
| } |
| |
| if (!mgr.connections) { |
| log_flag(CONMGR, "%s: skipping clean up", __func__); |
| slurm_mutex_unlock(&mgr.mutex); |
| return; |
| } |
| |
| log_flag(CONMGR, "%s: connection manager shutting down", __func__); |
| |
| /* stop and cleanup signal manager */ |
| signal_mgr_fini(); |
| |
| /* processing may still be running at this point in a thread */ |
| close_all_connections(); |
| |
| /* tell all timers about being canceled */ |
| cancel_delayed_work(false); |
| |
| /* wait until all workers are done */ |
| workers_shutdown(); |
| |
| /* |
| * At this point, there should be no threads running. |
| * It should be safe to shutdown the mgr. |
| */ |
| FREE_NULL_LIST(mgr.connections); |
| FREE_NULL_LIST(mgr.listen_conns); |
| FREE_NULL_LIST(mgr.complete_conns); |
| |
| free_delayed_work(); |
| |
| workers_fini(); |
| |
| xassert(!mgr.quiesce.requested); |
| xassert(!mgr.quiesce.active); |
| xassert(!mgr.quiesce.start.tv_sec); |
| |
| /* work should have been cleared by workers_fini() */ |
| xassert(list_is_empty(mgr.work)); |
| FREE_NULL_LIST(mgr.work); |
| |
| pollctl_fini(); |
| |
| /* |
| * Do not destroy the mutex or cond so that this function does not |
| * crash when it tries to lock mgr.mutex if called more than once. |
| */ |
| /* slurm_mutex_destroy(&mgr.mutex); */ |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| (void) tls_g_fini(); |
| url_parser_g_fini(); |
| } |
| |
| extern int conmgr_run(bool blocking) |
| { |
| int rc = SLURM_SUCCESS; |
| bool running = false; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| |
| if (mgr.shutdown_requested) { |
| log_flag(CONMGR, "%s: refusing to run when conmgr is shutdown", |
| __func__); |
| |
| rc = mgr.error; |
| slurm_mutex_unlock(&mgr.mutex); |
| return rc; |
| } |
| |
| xassert(!mgr.error || !mgr.exit_on_error); |
| |
| if (mgr.watch_thread) |
| running = true; |
| else if (!blocking) |
| slurm_thread_create(&mgr.watch_thread, watch_thread, NULL); |
| else |
| mgr.watch_thread = pthread_self(); |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| if (blocking) { |
| if (running) |
| wait_for_watch(); |
| else |
| (void) watch(NULL); |
| } |
| |
| slurm_mutex_lock(&mgr.mutex); |
| rc = mgr.error; |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return rc; |
| } |
| |
| extern void conmgr_request_shutdown(void) |
| { |
| log_flag(CONMGR, "%s: shutdown requested", __func__); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| if (mgr.initialized) { |
| mgr.shutdown_requested = true; |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| } |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| extern void conmgr_set_exit_on_error(bool exit_on_error) |
| { |
| slurm_mutex_lock(&mgr.mutex); |
| mgr.exit_on_error = exit_on_error; |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| extern bool conmgr_get_exit_on_error(void) |
| { |
| bool exit_on_error; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| exit_on_error = mgr.exit_on_error; |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return exit_on_error; |
| } |
| |
| extern int conmgr_get_error(void) |
| { |
| int rc; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| rc = mgr.error; |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| return rc; |
| } |
| |
| extern bool conmgr_enabled(void) |
| { |
| if (enabled_init) |
| return enabled_status; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| enabled_status = mgr.initialized; |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| log_flag(CONMGR, "%s: enabled=%c", |
| __func__, BOOL_CHARIFY(enabled_status)); |
| |
| enabled_init = true; |
| return enabled_status; |
| } |
| |
| extern int conmgr_set_params(const char *params) |
| { |
| char *tmp_str = NULL, *tok = NULL, *saveptr = NULL; |
| |
| slurm_mutex_lock(&mgr.mutex); |
| /* |
| * This should be called before conmgr is initialized so that params |
| * are applied on initialization. |
| */ |
| xassert(!mgr.initialized); |
| |
| tmp_str = xstrdup(params); |
| tok = strtok_r(tmp_str, ",", &saveptr); |
| while (tok) { |
| if (!xstrncasecmp(tok, CONMGR_PARAM_THREADS, |
| strlen(CONMGR_PARAM_THREADS))) { |
| const unsigned long count = |
| slurm_atoul(tok + strlen(CONMGR_PARAM_THREADS)); |
| |
| mgr.workers.conf_threads = count; |
| |
| log_flag(CONMGR, "%s: %s set %lu threads", |
| __func__, tok, count); |
| } else if (!xstrncasecmp(tok, CONMGR_PARAM_MAX_CONN, |
| strlen(CONMGR_PARAM_MAX_CONN))) { |
| const unsigned long count = |
| slurm_atoul(tok + strlen(CONMGR_PARAM_MAX_CONN)); |
| |
| if (count < 1) |
| fatal("%s: There must be at least 1 max connection", |
| __func__); |
| |
| mgr.conf_max_connections = count; |
| |
| log_flag(CONMGR, "%s: %s activated with %lu max connections", |
| __func__, tok, count); |
| } else if (!xstrncasecmp(tok, CONMGR_PARAM_QUIESCE_TIMEOUT, |
| strlen(CONMGR_PARAM_QUIESCE_TIMEOUT))) { |
| const unsigned long count = slurm_atoul(tok + |
| strlen(CONMGR_PARAM_QUIESCE_TIMEOUT)); |
| |
| if (count == ULONG_MAX) |
| fatal("%s: Invalid timeout: %m", __func__); |
| |
| mgr.quiesce.conf_timeout.tv_sec = count; |
| log_flag(CONMGR, "%s: %s activated with %lu seconds", |
| __func__, tok, count); |
| } else if (!xstrcasecmp(tok, CONMGR_PARAM_POLL_ONLY)) { |
| log_flag(CONMGR, "%s: %s activated", __func__, tok); |
| pollctl_set_mode(POLL_MODE_POLL); |
| } else if ( |
| !xstrncasecmp(tok, CONMGR_PARAM_WAIT_WRITE_DELAY, |
| strlen(CONMGR_PARAM_WAIT_WRITE_DELAY))) { |
| const unsigned long count = slurm_atoul(tok + |
| strlen(CONMGR_PARAM_WAIT_WRITE_DELAY)); |
| log_flag(CONMGR, "%s: %s activated", __func__, tok); |
| mgr.conf_delay_write_complete = count; |
| } else if (!xstrncasecmp(tok, CONMGR_PARAM_READ_TIMEOUT, |
| strlen(CONMGR_PARAM_READ_TIMEOUT))) { |
| const unsigned long count = slurm_atoul(tok + |
| strlen(CONMGR_PARAM_READ_TIMEOUT)); |
| log_flag(CONMGR, "%s: %s activated", __func__, tok); |
| mgr.conf_read_timeout.tv_sec = count; |
| } else if (!xstrncasecmp(tok, CONMGR_PARAM_WRITE_TIMEOUT, |
| strlen(CONMGR_PARAM_WRITE_TIMEOUT))) { |
| const unsigned long count = slurm_atoul(tok + |
| strlen(CONMGR_PARAM_WRITE_TIMEOUT)); |
| log_flag(CONMGR, "%s: %s activated", __func__, tok); |
| mgr.conf_write_timeout.tv_sec = count; |
| } else if ( |
| !xstrncasecmp(tok, CONMGR_PARAM_CONNECT_TIMEOUT, |
| strlen(CONMGR_PARAM_CONNECT_TIMEOUT))) { |
| const unsigned long count = slurm_atoul(tok + |
| strlen(CONMGR_PARAM_CONNECT_TIMEOUT)); |
| log_flag(CONMGR, "%s: %s activated", __func__, tok); |
| mgr.conf_connect_timeout.tv_sec = count; |
| } else { |
| log_flag(CONMGR, "%s: Ignoring parameter %s", |
| __func__, tok); |
| } |
| |
| tok = strtok_r(NULL, ",", &saveptr); |
| } |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| xfree(tmp_str); |
| return SLURM_SUCCESS; |
| } |
| |
| extern void conmgr_quiesce(const char *caller) |
| { |
| slurm_mutex_lock(&mgr.mutex); |
| |
| log_flag(CONMGR, "%s->%s: quiesce requested", caller, __func__); |
| |
| /* wait until other request has completed */ |
| while (mgr.quiesce.requested) |
| EVENT_WAIT(&mgr.quiesce.on_stop_quiesced, &mgr.mutex); |
| |
| xassert(!mgr.quiesce.active); |
| mgr.quiesce.requested = true; |
| xassert(!mgr.quiesce.start.tv_sec); |
| mgr.quiesce.start = timespec_now(); |
| |
| while (!mgr.quiesce.active) { |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| EVENT_WAIT(&mgr.quiesce.on_start_quiesced, &mgr.mutex); |
| } |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| } |
| |
| extern void conmgr_unquiesce(const char *caller) |
| { |
| slurm_mutex_lock(&mgr.mutex); |
| |
| xassert(mgr.quiesce.requested); |
| xassert(mgr.quiesce.active); |
| xassert(mgr.quiesce.start.tv_sec); |
| |
| mgr.quiesce.requested = false; |
| mgr.quiesce.active = false; |
| mgr.quiesce.start.tv_sec = 0; |
| |
| EVENT_BROADCAST(&mgr.quiesce.on_stop_quiesced); |
| |
| /* |
| * If watch() never gets to an active quiesce then watch() may not be |
| * waiting on on_stop_quiesced event before conmgr_unquiesce() is |
| * called. Then watch() could still be waiting for a watch_sleep event |
| * and not a on_stop_quiesced event which could result it in never |
| * waking up. |
| */ |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| |
| slurm_mutex_unlock(&mgr.mutex); |
| } |