blob: a493658067c518520bae24dcbfb79de64eab2ca5 [file] [log] [blame]
/*****************************************************************************\
* workers.c - definitions for worker thread handlers
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "slurm/slurm_errno.h"
#include "src/common/events.h"
#include "src/common/macros.h"
#include "src/common/probes.h"
#include "src/common/read_config.h"
#include "src/common/slurm_time.h"
#include "src/common/threadpool.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xsched.h"
#include "src/conmgr/conmgr.h"
#include "src/conmgr/mgr.h"
/* Limit automatically set default thread count */
#define THREAD_AUTO_MAX 32

/* Threads to create per kernel reported CPU */
#define CPU_THREAD_MULTIPLIER 2

/* Multiplier on detected CPU count for the suggested thread count upper bound */
#define CPU_THREAD_HIGH 2

/* Divisor on detected CPU count for the suggested thread count lower bound */
#define CPU_THREAD_LOW 2

/*
 * Amount of time to sleep while polling for all threads to have started up
 * during shutdown
 */
#define SHUTDOWN_WAIT_STARTUP_THREADS_SLEEP_NS 10
static void *_worker(void *arg);
/* Sanity check global worker pool state (compiles to no-op without asserts) */
static void _check_magic_workers(void)
{
	xassert(mgr.workers.workers);
	xassert(mgr.workers.active >= 0);
}
/* Sanity check a single worker's magic and id (no-op without asserts) */
static void _check_magic_worker(worker_t *worker)
{
	xassert(worker);
	xassert(worker->magic == MAGIC_WORKER);
	xassert(worker->id > 0);
}
/* List match function: find a worker entry by pointer identity */
static int _find_worker(void *x, void *arg)
{
	return (x == arg) ? 1 : 0;
}
/* List destructor for worker_t entries: poison magic then release memory */
static void _worker_free(void *x)
{
	worker_t *w = x;

	if (!w)
		return;

	_check_magic_worker(w);
	log_flag(CONMGR, "%s: [%u] free worker", __func__, w->id);

	/* poison magic to catch any use-after-free */
	w->magic = ~MAGIC_WORKER;
	xfree(w);
}
/*
 * Remove a worker from the pool's list and update the total count.
 * Caller must own mgr.mutex lock.
 */
static void _worker_delete(void *x)
{
	worker_t *w = x;

	if (!w)
		return;

	/* removal triggers the list destructor, which calls _worker_free() */
	list_delete_first(mgr.workers.workers, _find_worker, w);
	mgr.workers.total--;
}
/*
 * Query the kernel for the number of CPUs assigned to this process.
 * RET CPU count, or 0 on any failure
 */
static int _detect_cpu_count(void)
{
	cpu_set_t cpus = { { 0 } };
	int count = 0;
	int rc = slurm_getaffinity(getpid(), sizeof(cpus), &cpus);

	if (rc) {
		error("%s: Unable to query assigned CPU mask: %s",
		      __func__, slurm_strerror(rc));
		return 0;
	}

	count = task_cpuset_get_assigned_count(sizeof(cpus), &cpus);
	if (count < 0)
		return 0;

	log_flag(CONMGR, "%s: detected %d CPUs available from kernel",
		 __func__, count);

	return count;
}
/*
 * Probe handler reporting worker pool health.
 * RET PROBE_RC_DOWN when the pool is not initialized, PROBE_RC_ONLINE while
 *	shutting down, PROBE_RC_READY otherwise
 */
static probe_status_t _probe(probe_log_t *log)
{
	probe_status_t status = PROBE_RC_UNKNOWN;

	slurm_mutex_lock(&mgr.mutex);

	/*
	 * The PROBE_RC_DOWN branch below shows this probe may run while
	 * mgr.workers.workers is NULL (before init/after fini); avoid handing
	 * a NULL list to list_count() and report 0 threads instead.
	 */
	probe_log(log, "workers: threads:%d/%d active:%d/%d shutdown_requested:%c",
		  (mgr.workers.workers ? list_count(mgr.workers.workers) : 0),
		  mgr.workers.threads,
		  mgr.workers.active, mgr.workers.total,
		  BOOL_CHARIFY(mgr.workers.shutdown_requested));

	if (!mgr.workers.workers)
		status = PROBE_RC_DOWN;
	else if (mgr.workers.shutdown_requested)
		status = PROBE_RC_ONLINE;
	else
		status = PROBE_RC_READY;

	slurm_mutex_unlock(&mgr.mutex);

	return status;
}
/*
 * Allocate and spawn a single worker thread.
 * IN i - zero based worker index; the worker's id becomes (i + 1)
 */
static void _create_worker(const int i)
{
	const int id = (i + 1);
	char title[PRCTL_BUF_BYTES] = "INVALID";
	worker_t *worker = xmalloc(sizeof(*worker));
	int rc;

	worker->magic = MAGIC_WORKER;
	worker->id = id;

	/*
	 * Avoid compiler warnings for id not fitting in PRCTL_BUF_BYTES by
	 * using %hu instead of %d
	 */
	xassert(id < UINT16_MAX);
	(void) snprintf(title, sizeof(title), "worker[%hu]",
			(unsigned short) id);

	rc = threadpool_create(_worker, XSTRINGIFY(_worker), worker, true,
			       title, &worker->tid, __func__);
	if (rc)
		fatal("%s: unable to create new thread: %s",
		      __func__, slurm_strerror(rc));

	_check_magic_worker(worker);
	list_append(mgr.workers.workers, worker);
}
/*
 * Initialize the worker pool and spawn all worker threads.
 * IN count - explicitly requested thread count (0 = resolve automatically
 *	from conf_threads, default_count, or detected CPUs, in that order)
 * IN default_count - fallback count when no explicit/configured value given
 */
extern void workers_init(int count, int default_count)
{
	const int cpus = _detect_cpu_count();
	const int auto_max = (cpus * CPU_THREAD_MULTIPLIER);
	const int auto_count = MIN(THREAD_AUTO_MAX, auto_max);
	const int suggest_high = MIN(CONMGR_THREAD_COUNT_MAX,
				     (cpus * CPU_THREAD_HIGH));
	const int min_def = MIN(THREAD_AUTO_MAX,
				MAX(CONMGR_THREAD_COUNT_MIN, default_count));
	const int suggest_low = MIN((cpus / CPU_THREAD_LOW), min_def);

	/* configured value wins when no explicit count was requested */
	if (!count && (mgr.workers.conf_threads > 0)) {
		count = mgr.workers.conf_threads;
		log_flag(CONMGR, "%s: Setting thread count to %s%d threads",
			 __func__, CONMGR_PARAM_THREADS,
			 mgr.workers.conf_threads);
	}

	if (!count && (default_count > 0)) {
		count = default_count;
		log_flag(CONMGR, "%s: Setting thread count to default %d threads",
			 __func__, default_count);
	} else if (!count) {
		count = auto_count;
		log_flag(CONMGR, "%s: Setting thread count to %d/%d for %d available CPUs",
			 __func__, auto_count, auto_max, cpus);
	} else if ((count > suggest_high) || (count < suggest_low)) {
		/* explicit/configured count outside the suggested window */
		warning("%s%d is configured outside of the suggested range of [%d, %d] for %d CPUs. Performance will be negatively impacted, potentially causing difficult to debug hangs. Please keep within the suggested range or use the automatically detected thread count of %d threads.",
			CONMGR_PARAM_THREADS, count, suggest_low,
			suggest_high, cpus, auto_count);
	}

	/* clamp to the hard limits regardless of how count was chosen */
	if (count < CONMGR_THREAD_COUNT_MIN) {
		error("%s: %s%d too low, increasing to %d",
		      __func__, CONMGR_PARAM_THREADS, count,
		      CONMGR_THREAD_COUNT_MIN);
		count = CONMGR_THREAD_COUNT_MIN;
	} else if (count > CONMGR_THREAD_COUNT_MAX) {
		error("%s: %s%d too high, decreasing to %d",
		      __func__, CONMGR_PARAM_THREADS, count,
		      CONMGR_THREAD_COUNT_MAX);
		count = CONMGR_THREAD_COUNT_MAX;
	}

	log_flag(CONMGR, "%s: Initializing with %d workers", __func__, count);

	xassert(!mgr.workers.workers);
	mgr.workers.workers = list_create(_worker_free);
	mgr.workers.threads = count;
	_check_magic_workers();

	for (int i = 0; i < count; i++)
		_create_worker(i);

	probe_register("conmgr->workers", _probe);
}
/*
 * Release worker pool state after shutdown.
 * All worker threads must have already exited (asserted below); frees the
 * worker list and resets the configured thread count.
 */
extern void workers_fini(void)
{
	/* all workers should have already exited by now */
	xassert(mgr.workers.shutdown_requested);
	xassert(!mgr.workers.active);
	xassert(!mgr.workers.total);

	FREE_NULL_LIST(mgr.workers.workers);
	mgr.workers.threads = 0;
}
/*
 * Worker thread main loop: repeatedly pop work from mgr.work and run it,
 * sleeping on mgr.worker_sleep when the queue is empty. Exits once the queue
 * is empty and mgr.workers.shutdown_requested is set, then removes itself
 * from the pool and signals mgr.worker_return.
 * IN arg - worker_t * for this thread (freed via _worker_delete() on exit)
 * RET NULL always
 */
static void *_worker(void *arg)
{
	worker_t *worker = arg;

	slurm_mutex_lock(&mgr.mutex);
	_check_magic_worker(worker);
	xassert(worker->tid == pthread_self());
	/* count this thread as started so shutdown can wait for stragglers */
	mgr.workers.total++;

	/*
	 * mgr.mutex should be locked at the beginning of this loop. It should
	 * also be locked when exiting the loop. It may be unlocked and
	 * relocked during the loop.
	 */
	while (true) {
		work_t *work = NULL;

		/* stay idle while the manager is quiesced */
		while (mgr.quiesce.active)
			EVENT_WAIT(&mgr.quiesce.on_stop_quiesced, &mgr.mutex);

		work = list_pop(mgr.work);

		/* wait for work if nothing to do */
		if (!work) {
			/* only exit once the queue has been fully drained */
			if (mgr.workers.shutdown_requested)
				break;

			log_flag(CONMGR, "%s: [%u] waiting for work. Current active workers %u/%u",
				 __func__, worker->id, mgr.workers.active,
				 mgr.workers.total);

			EVENT_WAIT(&mgr.worker_sleep, &mgr.mutex);
			continue;
		}

		xassert(work->magic == MAGIC_WORK);

		/*
		 * Work is still dispatched during shutdown but flagged as
		 * cancelled so the callback can react accordingly
		 */
		if (mgr.shutdown_requested) {
			log_flag(CONMGR, "%s: [%u->%s] setting work status as cancelled after shutdown requested",
				 __func__, worker->id,
				 work->callback.func_name);
			work->status = CONMGR_WORK_STATUS_CANCELLED;
		}

		/* got work, run it! */
		mgr.workers.active++;

		log_flag(CONMGR, "%s: [%u] %s() running active_workers=%u/%u queue=%u",
			 __func__, worker->id, work->callback.func_name,
			 mgr.workers.active, mgr.workers.total,
			 list_count(mgr.work));

		/* Unlock mutex before running work */
		slurm_mutex_unlock(&mgr.mutex);

		/* run work via wrap_work() which will xfree(work) */
		wrap_work(work);
		work = NULL;

		/* Lock mutex after running work */
		slurm_mutex_lock(&mgr.mutex);

		mgr.workers.active--;

		log_flag(CONMGR, "%s: [%u] finished active_workers=%u/%u queue=%u",
			 __func__, worker->id, mgr.workers.active,
			 mgr.workers.total, list_count(mgr.work));

		/* wake up watch for all ending work on shutdown */
		if (mgr.shutdown_requested || mgr.waiting_on_work)
			EVENT_SIGNAL(&mgr.watch_sleep);
	}

	log_flag(CONMGR, "%s: [%u] shutting down",
		 __func__, worker->id);

	/* removes this thread from the pool and frees the worker_t */
	_worker_delete(worker);

	/* wake workers_shutdown() so it can re-check mgr.workers.total */
	EVENT_SIGNAL(&mgr.worker_return);
	slurm_mutex_unlock(&mgr.mutex);

	return NULL;
}
/*
 * Request all worker threads exit and block until the last one is gone.
 * Caller must hold mgr.mutex on entry (this function unlocks and relocks it
 * while waiting); the lock is still held on return.
 */
extern void workers_shutdown(void)
{
	/*
	 * Wait until all threads have started up fully to avoid a thread
	 * starting after shutdown and hanging forever
	 */
	while (mgr.workers.threads &&
	       (mgr.workers.threads != mgr.workers.total)) {
		EVENT_BROADCAST(&mgr.worker_sleep);
		/* drop lock so the late-starting threads can register */
		slurm_mutex_unlock(&mgr.mutex);
		(void) slurm_nanosleep(0,
				       SHUTDOWN_WAIT_STARTUP_THREADS_SLEEP_NS);
		slurm_mutex_lock(&mgr.mutex);
	}

	mgr.workers.shutdown_requested = true;

	do {
		log_flag(CONMGR, "%s: waiting for work=%u workers=%u/%u",
			 __func__, list_count(mgr.work), mgr.workers.active,
			 mgr.workers.total);

		if (mgr.workers.total > 0) {
			/* wake every sleeping worker so it sees the flag */
			EVENT_BROADCAST(&mgr.worker_sleep);
			/* each exiting worker signals worker_return */
			EVENT_WAIT(&mgr.worker_return, &mgr.mutex);
		}
	} while (mgr.workers.total);
}