blob: f42b901eadebf665001299bc73ca4e729414f4f4 [file] [log] [blame]
/*****************************************************************************\
* threadpool.c - Thread Pool
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "config.h"

#if HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif

#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>

#include "src/common/events.h"
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/probes.h"
#include "src/common/read_config.h"
#include "src/common/slurm_time.h"
#include "src/common/threadpool.h"
#include "src/common/timers.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
/*
* From man prctl:
* If the length of the string, including the terminating null byte,
* exceeds 16 bytes, the string is silently truncated.
*/
#define PRCTL_BUF_BYTES 17
/* buffer length for formatted timespec/ctime strings used in debug logging */
#define CTIME_STR_LEN 72
/*
* Avoid compiler warnings for id not fitting in PRCTL_BUF_BYTES by using %hu
* and uint16_t instead of %d. This risks the index rolling over on longer lived
* daemons.
*/
#define THREAD_NAME_FMT "worker[%hu]"
/*
 * Initialize *attr with Slurm's standard thread attributes: fatal() if
 * pthread_attr_init() fails, PTHREAD_SCOPE_SYSTEM (1:1 threads) when the
 * platform offers a choice, and a fixed STACK_SIZE stack.
 * NOTE(review): nothing in the visible portion of this file invokes this
 * macro; _new_thread() sets up equivalent attributes inline — confirm the
 * macro is still required before relying on it.
 */
#ifdef PTHREAD_SCOPE_SYSTEM
#define slurm_attr_init(attr) \
	do { \
		int err = pthread_attr_init(attr); \
		if (err) { \
			errno = err; \
			fatal("pthread_attr_init: %m"); \
		} \
		/* we want 1:1 threads if there is a choice */ \
		err = pthread_attr_setscope(attr, PTHREAD_SCOPE_SYSTEM); \
		if (err) { \
			errno = err; \
			error("pthread_attr_setscope: %m"); \
		} \
		err = pthread_attr_setstacksize(attr, STACK_SIZE); \
		if (err) { \
			errno = err; \
			error("pthread_attr_setstacksize: %m"); \
		} \
	} while (0)
#else
/* No selectable contention scope on this platform: skip setscope() */
#define slurm_attr_init(attr) \
	do { \
		int err = pthread_attr_init(attr); \
		if (err) { \
			errno = err; \
			fatal("pthread_attr_init: %m"); \
		} \
		err = pthread_attr_setstacksize(attr, STACK_SIZE); \
		if (err) { \
			errno = err; \
			error("pthread_attr_setstacksize: %m"); \
		} \
	} while (0)
#endif
#define THREAD_MAGIC 0xA434F4D2

/* Tracks a single unit of work (and the worker running it) in the pool */
typedef struct {
	int magic; /* THREAD_MAGIC */
	/* pthread_self() from thread */
	pthread_t id;
	/* true if caller will never join this work */
	bool detached;
	/* thread waiting for assignment (0 when no requester is waiting) */
	pthread_t requester;
	/* function to run and stringified name of it (for logging) */
	threadpool_func_t func;
	/* timestamp when work was requested (feeds latency histograms) */
	timespec_t requested;
	/* xstrdup()ed name for logging/prctl; freed by _thread_free() */
	const char *thread_name;
	const char *func_name;
	/* opaque argument handed to func() */
	void *arg;
	/* return from func() */
	void *ret;
} thread_t;
#define LOG_THREAD_MAGIC 0xabbb1119

/* Argument bundle for _log_thread() when dumping thread state to a probe */
typedef struct {
	int magic; /* LOG_THREAD_MAGIC */
	const char *type; /* zombie/pending */
	probe_log_t *log; /* destination probe log (not owned) */
} log_thread_t;
/*
 * Global threadpool singleton.
 * All mutable state is protected by .mutex unless noted otherwise.
 */
static struct {
	/* true if enabled, false to ignore everything else */
	bool enabled;
	/* true if shutdown requested */
	bool shutdown;
	/* list_t * of thread_t * awaiting an idle worker */
	list_t *pending;
	/* list_t * of thread_t * finished but not yet joined */
	list_t *zombies;
	/* Number of running threads */
	int running;
	/* Number of idle threads */
	int idle;
	/* counter of the threads run */
	uint64_t total_run;
	/* counter of the threads created */
	uint64_t total_created;
	/* peak thread count encountered */
	uint64_t peak_count;
	pthread_mutex_t mutex;
	struct {
		event_signal_t assign;
		event_signal_t assigned;
		event_signal_t assigned_ack;
		event_signal_t end;
		event_signal_t zombie;
		event_signal_t join;
	} events;
	struct {
		/* histogram of the latency from request to run */
		latency_histogram_t request;
		/* histogram of the time to run func() in threads */
		latency_histogram_t run;
		/* histogram of the latency to join threads */
		latency_histogram_t join;
	} histograms;
	struct {
		int preallocate;
		int preserve;
	} config;
} threadpool = {
	.enabled = false,
	.mutex = PTHREAD_MUTEX_INITIALIZER,
	.events = {
		.assign = EVENT_INITIALIZER("THREADPOOL-ASSIGN-THREAD"),
		.assigned = EVENT_INITIALIZER("THREADPOOL-ASSIGNED-THREAD"),
		.assigned_ack = EVENT_INITIALIZER("THREADPOOL-ASSIGNED-ACK-THREAD"),
		.end = EVENT_INITIALIZER("THREADPOOL-END-THREAD"),
		.zombie = EVENT_INITIALIZER("THREADPOOL-ZOMBIE-THREAD"),
		/*
		 * FIX: events.join was the only event left without an
		 * EVENT_INITIALIZER even though it is broadcast in
		 * _threadpool_join() and waited on in _threadpool_zombie().
		 */
		.join = EVENT_INITIALIZER("THREADPOOL-JOIN-THREAD"),
	},
	.histograms = {
		.request = LATENCY_HISTOGRAM_INITIALIZER,
		.run = LATENCY_HISTOGRAM_INITIALIZER,
		.join = LATENCY_HISTOGRAM_INITIALIZER,
	},
	.config = {
		.preallocate = THREADPOOL_DEFAULT_PREALLOCATE,
		.preserve = THREADPOOL_DEFAULT_PRESERVE,
	},
};
/*
 * Define slurm-specific aliases for use by plugins, see slurm_xlator.h.
 * Each alias exports the same symbol under a slurm_-prefixed name.
 */
strong_alias(threadpool_create, slurm_threadpool_create);
strong_alias(threadpool_join, slurm_threadpool_join);
strong_alias(threadpool_init, slurm_threadpool_init);
strong_alias(threadpool_fini, slurm_threadpool_fini);
/*
 * Directly pthread_join() a non-pool thread and log the outcome.
 * Returns 0 on success or the pthread_join() error code.
 */
static int _join(const pthread_t id, const char *caller)
{
	void *ret = NULL;
	int rc = pthread_join(id, &ret);

	if (rc) {
		error("%s->%s: pthread_join(id=0x%"PRIx64") failed: %s",
		      caller, __func__, (uint64_t) id, slurm_strerror(rc));
		return rc;
	}

	if (ret == PTHREAD_CANCELED)
		log_flag(THREAD, "%s->%s: pthread id=0x%"PRIx64" was cancelled",
			 caller, __func__, (uint64_t) id);
	else
		log_flag(THREAD, "%s->%s: pthread id=0x%"PRIx64" returned: 0x%"PRIxPTR,
			 caller, __func__, (uint64_t) id, (uintptr_t) ret);

	return rc;
}
/* ListFindF: return 1 when the thread_t in x carries the pthread id in key */
static int _match_thread_id(void *x, void *key)
{
	thread_t *t = x;
	const pthread_t target = (pthread_t) key;

	xassert(t->magic == THREAD_MAGIC);
	xassert(t->id > 0);
	xassert(target > 0);

	if (t->id == target)
		return 1;

	return 0;
}
/*
 * Join a threadpool-managed thread by pulling it off the zombies list.
 * Blocks until the target thread finishes func() and parks itself as a
 * zombie, or until no threads remain running (then the id can never appear).
 * Returns SLURM_SUCCESS, or ESRCH if the thread was never found.
 */
static int _threadpool_join(const pthread_t id, const char *caller)
{
	thread_t *thread = NULL;
	const timespec_t start_ts = timespec_now();
	int rc = EINVAL;

	slurm_mutex_lock(&threadpool.mutex);

	do {
		/* Target already finished and parked as a zombie? */
		if ((thread = list_remove_first(threadpool.zombies,
						_match_thread_id, (void *) id)))
			break;

		/*
		 * With zero running threads the target can never become a
		 * zombie: give up and report ESRCH below.
		 */
		if (!threadpool.running)
			break;

		log_flag(THREAD, "%s->%s: waiting for thread id=0x%"PRIx64" with %d running threads",
			 caller, __func__, (uint64_t) id,
			 threadpool.running);

		/* Wait for any worker to announce it became a zombie */
		EVENT_WAIT(&threadpool.events.zombie, &threadpool.mutex);
	} while (true);

	if (thread) {
		log_flag(THREAD, "%s->%s: joined pthread id=0x%"PRIx64" returned: 0x%"PRIxPTR,
			 caller, __func__, (uint64_t) thread->id,
			 (uintptr_t) thread->ret);

		xassert(!thread->detached);

		/*
		 * Setting detached releases the worker blocked in
		 * _threadpool_zombie(), which will then free the thread_t and
		 * go back to picking up work.
		 */
		thread->detached = true;
		EVENT_BROADCAST(&threadpool.events.join);
		HISTOGRAM_ADD_DURATION(&threadpool.histograms.join, start_ts);
		rc = SLURM_SUCCESS;
	} else {
		log_flag(THREAD, "%s->%s: pthread id=0x%"PRIx64" not found",
			 caller, __func__, (uint64_t) id);
		rc = ESRCH;
	}

	slurm_mutex_unlock(&threadpool.mutex);
	return rc;
}
/*
 * Public join entry point: dispatches to the pool-aware join when the
 * threadpool is enabled, otherwise falls back to a plain pthread_join().
 * A zero id is ignored and treated as success.
 */
extern int threadpool_join(const pthread_t id, const char *caller)
{
	if (!id) {
		log_flag(THREAD, "%s->%s: Ignoring invalid pthread id=0x0",
			 caller, __func__);
		return SLURM_SUCCESS;
	}

	if (!threadpool.enabled)
		return _join(id, caller);

	return _threadpool_join(id, caller);
}
/*
 * Lazily generate a per-thread default name ("worker[N]") cached in
 * thread-local storage. N is a process-wide counter guarded by a local mutex;
 * see THREAD_NAME_FMT for the uint16_t rollover caveat.
 */
static const char *_thread_default_name(void)
{
	static __thread char name[PRCTL_BUF_BYTES] = { 0 };
	static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
	static int thread_count = 0;
	uint16_t my_index;

	/* Name already generated for this thread */
	if (name[0])
		return name;

	slurm_mutex_lock(&mutex);
	my_index = thread_count++;
	slurm_mutex_unlock(&mutex);

	(void) snprintf(name, sizeof(name), THREAD_NAME_FMT, my_index);
	return name;
}
/*
 * Resolve the display name for a thread: the explicit thread_name when set,
 * otherwise the generated per-thread default. thread may be NULL.
 */
static const char *_thread_name(thread_t *thread)
{
	const char *name = thread ? thread->thread_name : NULL;

	return name ? name : _thread_default_name();
}
/*
 * Apply the thread's display name to the kernel task via prctl(PR_SET_NAME).
 * No-op on platforms without <sys/prctl.h>.
 */
static void _set_thread_name(thread_t *thread)
{
#if HAVE_SYS_PRCTL_H
	const char *name = _thread_name(thread);
	const int rc = prctl(PR_SET_NAME, name, NULL, NULL, NULL);

	if (rc)
		error("%s: cannot set process name to %s %m", __func__, name);
#endif
}
/*
 * Release a thread_t and its owned thread_name string.
 * Safe to call with NULL. Poisons the magic to catch use-after-free.
 */
static void _thread_free(thread_t *t)
{
	if (!t)
		return;

	xassert(t->magic == THREAD_MAGIC);
	t->magic = ~THREAD_MAGIC;

	xfree(t->thread_name);
	xfree(t);
}
/*
 * Execute thread->func(thread->arg) on the calling thread, storing the result
 * in thread->ret. Records request->start and run-time latencies (pool mode
 * only) and emits BEGIN/END debug logging when DEBUG_FLAG_THREAD is set.
 */
static void _run(thread_t *thread)
{
	const timespec_t start = timespec_now();

	xassert(thread->magic == THREAD_MAGIC);

	/* Apply the requested (or default) name to this kernel thread */
	_set_thread_name(thread);

	if (slurm_conf.debug_flags & DEBUG_FLAG_THREAD) {
		char ts[CTIME_STR_LEN] = "UNKNOWN";
		/* Time spent queued between the request and actually running */
		timespec_diff_ns_t diff =
			timespec_diff_ns(start, thread->requested);

		(void) timespec_ctime(diff.diff, false, ts, sizeof(ts));

		log_flag(THREAD, "%s: [%s@0x%"PRIx64"] BEGIN: %s thread calling %s(0x%"PRIxPTR") after %s",
			 __func__, _thread_name(thread), (uint64_t) thread->id,
			 (thread->detached ? "detached" : "attached"),
			 thread->func_name, (uintptr_t) thread->arg, ts);
	}

	if (threadpool.enabled)
		HISTOGRAM_ADD_DURATION(&threadpool.histograms.request,
				       thread->requested);

	/* The actual unit of work */
	thread->ret = thread->func(thread->arg);

	if (threadpool.enabled)
		HISTOGRAM_ADD_DURATION(&threadpool.histograms.run, start);

	if (slurm_conf.debug_flags & DEBUG_FLAG_THREAD) {
		char ts[CTIME_STR_LEN] = "UNKNOWN";
		/* Wall clock time spent inside func() */
		timespec_diff_ns_t diff =
			timespec_diff_ns(timespec_now(), start);

		(void) timespec_ctime(diff.diff, false, ts, sizeof(ts));

		log_flag(THREAD, "%s: [%s@0x%"PRIx64"] END: %s thread called %s(0x%"PRIxPTR")=0x%"PRIxPTR" for %s",
			 __func__, _thread_name(thread), (uint64_t) thread->id,
			 (thread->detached ? "detached" : "attached"),
			 thread->func_name,
			 (uintptr_t) thread->arg, (uintptr_t) thread->ret, ts);
	}
}
/* caller must hold threadpool.mutex lock */
static void _threadpool_wait_ack(thread_t *thread)
{
timespec_t start_ts = { 0, 0 };
pthread_t requester = thread->requester;
if (slurm_conf.debug_flags & DEBUG_FLAG_THREAD) {
start_ts = timespec_now();
log_flag(THREAD, "%s: [%s@0x%"PRIx64"] BEGIN: waiting for requester 0x%"PRIx64" to acknowledge assignment",
__func__, _thread_name(thread), (uint64_t) thread->id,
(uint64_t) requester);
}
while (thread->requester)
EVENT_WAIT(&threadpool.events.assigned_ack, &threadpool.mutex);
if ((slurm_conf.debug_flags & DEBUG_FLAG_THREAD) && start_ts.tv_sec) {
char ts[CTIME_STR_LEN] = "UNKNOWN";
timespec_diff_ns_t diff =
timespec_diff_ns(timespec_now(), start_ts);
(void) timespec_ctime(diff.diff, false, ts, sizeof(ts));
log_flag(THREAD, "%s: [%s@0x%"PRIx64"] END: acknowledged by requester 0x%"PRIx64" after %s",
__func__, _thread_name(thread), (uint64_t) thread->id,
(uint64_t) requester, ts);
}
}
/* caller must hold threadpool.mutex lock */
static void _threadpool_zombie(thread_t *thread)
{
timespec_t start_ts = { 0, 0 };
if (slurm_conf.debug_flags & DEBUG_FLAG_THREAD) {
start_ts = timespec_now();
log_flag(THREAD, "%s: [%s@0x%"PRIx64"] BEGIN: waiting to be joined",
__func__, _thread_name(thread), (uint64_t) thread->id);
}
list_append(threadpool.zombies, thread);
while (!thread->detached) {
EVENT_BROADCAST(&threadpool.events.zombie);
EVENT_WAIT(&threadpool.events.join, &threadpool.mutex);
}
if ((slurm_conf.debug_flags & DEBUG_FLAG_THREAD) && start_ts.tv_sec) {
char ts[CTIME_STR_LEN] = "UNKNOWN";
timespec_diff_ns_t diff =
timespec_diff_ns(timespec_now(), start_ts);
(void) timespec_ctime(diff.diff, false, ts, sizeof(ts));
log_flag(THREAD, "%s: [%s@0x%"PRIx64"] END: joined after waiting %s",
__func__, _thread_name(thread),
(uint64_t) thread->id, ts);
}
/* join thread should have removed ptr from zombie list */
xassert(!list_delete_ptr(threadpool.zombies, thread));
}
/* caller must hold threadpool.mutex lock */
static void _threadpool_prerun(thread_t *thread)
{
xassert(thread->magic == THREAD_MAGIC);
threadpool.total_run++;
xassert(!thread->id);
thread->id = pthread_self();
threadpool.idle--;
threadpool.running++;
if ((threadpool.idle + threadpool.running) > threadpool.peak_count)
threadpool.peak_count = (threadpool.idle + threadpool.running);
xassert(threadpool.idle >= 0);
xassert(threadpool.running > 0);
EVENT_BROADCAST(&threadpool.events.assigned);
}
/* caller must hold threadpool.mutex lock */
static void _threadpool_postrun(thread_t *thread)
{
threadpool.running--;
threadpool.idle++;
xassert(threadpool.idle > 0);
xassert(threadpool.running >= 0);
if (thread->requester)
_threadpool_wait_ack(thread);
if (!thread->detached)
_threadpool_zombie(thread);
xassert(thread->magic == THREAD_MAGIC);
xassert(thread->detached);
xassert(!thread->requester);
}
/*
 * Entry point for every pthread created by this module.
 *
 * Pool disabled: run the single thread_t in arg, free it, and return func()'s
 * result so pthread_join() callers can retrieve it.
 *
 * Pool enabled: become a pool worker. Run the initial thread_t (NULL for
 * preallocated workers), then loop popping pending work until shutdown or
 * until idle workers exceed config.preserve.
 */
static void *_thread(void *arg)
{
	thread_t *thread = arg;

	if (!threadpool.enabled) {
		void *ret = NULL;

		xassert(thread->magic == THREAD_MAGIC);

		_run(thread);
		/* Capture the result before freeing the thread_t holding it */
		ret = thread->ret;
		_thread_free(thread);
		return ret;
	}

	slurm_mutex_lock(&threadpool.mutex);

	xassert(!thread || (thread->magic == THREAD_MAGIC));

	threadpool.idle++;

#ifndef NDEBUG
	/* Debug-build sanity check only; not enforced at runtime */
	if ((threadpool.running + threadpool.idle) > THREADPOOL_MAX_THREADS)
		warning("%s: threadpool is over capacity %d/%d",
			__func__, (threadpool.running + threadpool.idle),
			THREADPOOL_MAX_THREADS);
#endif

	do {
		if (thread) {
			_threadpool_prerun(thread);
			slurm_mutex_unlock(&threadpool.mutex);

			/* Run func() without holding the pool lock */
			_run(thread);

			slurm_mutex_lock(&threadpool.mutex);
			_threadpool_postrun(thread);
			slurm_mutex_unlock(&threadpool.mutex);

			/* Free outside the lock to reduce contention */
			_thread_free(thread);
			thread = NULL;
			slurm_mutex_lock(&threadpool.mutex);
		}

		/* Grab the next pending work request (if any) */
		if ((thread = list_pop(threadpool.pending))) {
			xassert(thread->magic == THREAD_MAGIC);
			xassert(!thread->id);
			continue;
		}

		if (threadpool.shutdown) {
			log_flag(THREAD, "%s: [0x%"PRIx64"] exiting due to shutdown",
				 __func__, (uint64_t) pthread_self());
			break;
		}

		/* Shed surplus idle workers above the preserve threshold */
		if (threadpool.idle > threadpool.config.preserve) {
			log_flag(THREAD, "%s: [0x%"PRIx64"] exiting due to %d/%d idle threads",
				 __func__, (uint64_t) pthread_self(),
				 threadpool.idle, threadpool.config.preserve);
			break;
		}

		log_flag(THREAD, "%s: [0x%"PRIx64"] waiting for pending thread work with %d/%d idle threads",
			 __func__, (uint64_t) pthread_self(), threadpool.idle,
			 threadpool.config.preserve);

		xassert(list_is_empty(threadpool.pending));
		xassert(threadpool.idle > 0);

		/* Sleep until _assign() queues work (or spurious wakeup) */
		EVENT_WAIT(&threadpool.events.assign, &threadpool.mutex);
	} while (true);

	threadpool.idle--;
	/* Announce this worker's exit */
	EVENT_BROADCAST(&threadpool.events.end);
	slurm_mutex_unlock(&threadpool.mutex);
	return NULL;
}
/*
 * Destroy a pthread attribute object, aborting the process on failure
 * (pthread_attr_destroy() should never fail on a valid attr).
 */
static void _free_attr(pthread_attr_t *attr)
{
	int rc;

	if ((rc = pthread_attr_destroy(attr)))
		fatal("%s: pthread_attr_destroy failed: %s",
		      __func__, slurm_strerror(rc));
}
/*
 * Create a new pthread running _thread().
 *
 * thread may be NULL only when the threadpool is enabled (preallocated pool
 * worker). Ownership of thread transfers to _thread() on success; on
 * pthread_create() failure the thread_t is freed here.
 *
 * If id_ptr is non-NULL it receives the new pthread id (0 on failure).
 * Returns 0 on success or a pthread error code.
 */
static int _new_thread(thread_t *thread, pthread_t *id_ptr, const char *caller)
{
	pthread_t id = 0;
	pthread_attr_t attr;
	int rc = EINVAL;

	xassert((!thread && threadpool.enabled) ||
		(thread->magic == THREAD_MAGIC));

	if ((rc = pthread_attr_init(&attr)))
		fatal("%s->%s: pthread_attr_init() failed: %s",
		      caller, __func__, slurm_strerror(rc));

#ifdef PTHREAD_SCOPE_SYSTEM
	/* we want 1:1 threads if there is a choice */
	if ((rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
		fatal("%s->%s: pthread_attr_setscope(PTHREAD_SCOPE_SYSTEM) failed: %s",
		      caller, __func__, slurm_strerror(rc));
#endif

	if ((rc = pthread_attr_setstacksize(&attr, STACK_SIZE)))
		fatal("%s->%s: pthread_attr_setstacksize(%u) failed: %s",
		      caller, __func__, STACK_SIZE, slurm_strerror(rc));

	if (id_ptr)
		*id_ptr = 0;

	/*
	 * All threadpool threads are always detached.
	 * (Short-circuit: thread is only dereferenced when the pool is
	 * disabled, in which case thread is guaranteed non-NULL.)
	 */
	if ((threadpool.enabled || thread->detached) &&
	    (rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)))
		fatal("%s->%s: pthread_attr_setdetachstate failed: %s",
		      caller, __func__, slurm_strerror(rc));

	/* Pass ownership of thread to _thread() on success */
	rc = pthread_create(&id, &attr, _thread, thread);
	if (rc) {
		error("%s->%s: pthread_create() failed: %s",
		      caller, __func__, slurm_strerror(rc));
		_thread_free(thread);
	} else {
		xassert(threadpool.enabled || thread->func_name);
		log_flag(THREAD, "%s->%s: pthread_create() created new %spthread id=0x%"PRIx64" for %s%s",
			 caller, __func__,
			 (threadpool.enabled ? "" :
			  (thread->detached ? "detached " : "attached ")),
			 (uint64_t) id,
			 (threadpool.enabled ? "threadpool" :
			  thread->func_name), (threadpool.enabled ? "" : "()"));

		if (threadpool.enabled) {
			slurm_mutex_lock(&threadpool.mutex);
			threadpool.total_created++;
			slurm_mutex_unlock(&threadpool.mutex);
		}
	}

	_free_attr(&attr);

	if (id_ptr)
		*id_ptr = id;

	return rc;
}
/*
 * True if there is at least 1 thread ready to run.
 * Caller must hold threadpool.mutex lock.
 */
static bool _thread_available(void)
{
	const int queued = list_count(threadpool.pending);
	const int zombie_count = list_count(threadpool.zombies);

	/*
	 * Zombie threads are counted as idle but cannot pick up new work, so
	 * only (idle - zombies) workers are usable. There must be more usable
	 * workers than already-pending requests for one more request to be
	 * picked up immediately.
	 */
	return ((threadpool.idle - zombie_count) > queued);
}
/*
 * Try to hand thread off to an idle pool worker.
 *
 * Returns false (without queuing) when no idle worker is guaranteed
 * available, in which case the caller must spawn a dedicated pthread instead.
 * When id_ptr is non-NULL, blocks until a worker has claimed the work and
 * populated thread->id so it can be copied out before returning.
 */
static bool _assign(thread_t *thread, pthread_t *id_ptr, const char *caller)
{
	slurm_mutex_lock(&threadpool.mutex);

	xassert(thread->magic == THREAD_MAGIC);
	xassert(!thread->requester);

	if (!_thread_available()) {
		log_flag(THREAD, "%s->%s: zero available idle threads for %s()",
			 caller, __func__, thread->func_name);
		slurm_mutex_unlock(&threadpool.mutex);
		return false;
	}

	if (id_ptr) {
		/*
		 * Only assign requester to have the thread signal upon
		 * assignment but to skip waiting otherwise
		 */
		thread->requester = pthread_self();
	}

	list_append(threadpool.pending, thread);

	if (!id_ptr) {
		/*
		 * No need to wait for assignment from pending list after waking
		 * up an idle thread to accept the work
		 */
		EVENT_SIGNAL(&threadpool.events.assign);
		slurm_mutex_unlock(&threadpool.mutex);
		return true;
	}

	/*
	 * Need to wait for thread assignment if the thread ID pointer needs to
	 * be populated
	 */
	while (!thread->id) {
		xassert(thread->magic == THREAD_MAGIC);
		xassert(thread->requester == pthread_self());
		xassert(threadpool.idle > 0);

		/* Re-signal each pass to guard against missed wakeups */
		EVENT_SIGNAL(&threadpool.events.assign);
		EVENT_WAIT(&threadpool.events.assigned, &threadpool.mutex);
	}

	xassert(thread->magic == THREAD_MAGIC);
	xassert(thread->requester == pthread_self());
	xassert(thread->id);

	/* Clearing requester releases the worker in _threadpool_wait_ack() */
	thread->requester = 0;
	*id_ptr = thread->id;

	log_flag(THREAD, "%s->%s: assigned pthread id=0x%"PRIx64" for %s()",
		 caller, __func__, (uint64_t) thread->id,
		 thread->func_name);

	EVENT_BROADCAST(&threadpool.events.assigned_ack);

	/* thread should have removed ptr from pending list */
	xassert(!list_delete_ptr(threadpool.pending, thread));

	slurm_mutex_unlock(&threadpool.mutex);
	return true;
}
/*
 * Public entry point to run func(arg) on a thread.
 *
 * Allocates a thread_t describing the work, then tries to hand it to an idle
 * pool worker via _assign(); when the pool is disabled or has no available
 * worker, falls back to spawning a dedicated pthread via _new_thread().
 * Ownership of the thread_t always transfers to the worker/_new_thread().
 *
 * If id_ptr is non-NULL it receives the pthread id running the work.
 * Returns SLURM_SUCCESS or a pthread error code from _new_thread().
 */
extern int threadpool_create(threadpool_func_t func, const char *func_name,
			     void *arg, const bool detached,
			     const char *thread_name, pthread_t *id_ptr,
			     const char *caller)
{
	thread_t *thread = xmalloc(sizeof(*thread));

#ifndef NDEBUG
	/*
	 * FIX: warn at (PRCTL_BUF_BYTES - 1): prctl() truncates once
	 * strlen() + 1 > 16 bytes, so the old ">= PRCTL_BUF_BYTES" (17)
	 * check silently missed names of exactly 16 characters.
	 * FIX: use PRIu64 for the uint64_t casts; "%zu" expects size_t and
	 * is undefined behavior with uint64_t arguments on ILP32 platforms.
	 */
	if (thread_name && (strlen(thread_name) >= (PRCTL_BUF_BYTES - 1)))
		warning("%s: Thread name truncated[%"PRIu64"/%"PRIu64"]: %s",
			caller, (uint64_t) strlen(thread_name),
			(uint64_t) PRCTL_BUF_BYTES, thread_name);
#endif

	*thread = (thread_t) {
		.magic = THREAD_MAGIC,
		.detached = detached,
		.thread_name = xstrdup(thread_name),
		.func = func,
		.func_name = func_name,
		.arg = arg,
		.requested = timespec_now(),
	};

	/* Prefer an idle pool worker; fall back to a dedicated pthread */
	if (threadpool.enabled && _assign(thread, id_ptr, caller))
		return SLURM_SUCCESS;

	return _new_thread(thread, id_ptr, caller);
}
/*
 * Apply default_count and the comma-separated params string to
 * threadpool.config. Recognized tokens (matched by prefix):
 * THREADPOOL_PARAM with value "enabled"/"disabled" (anything else is fatal),
 * THREADPOOL_PARAM_PREALLOCATE=<n>, THREADPOOL_PARAM_PRESERVE=<n> (both
 * capped at THREADPOOL_MAX_THREADS). Unknown tokens are logged and ignored.
 * Must run before workers start: no locking is performed here.
 */
static void _parse_params(const int default_count, const char *params)
{
	char *tmp_str = NULL, *tok = NULL, *saveptr = NULL;

	if (default_count > 0)
		threadpool.config.preallocate = default_count;

	if (!params)
		return;

	/* strtok_r() mutates its input: work on a private copy */
	tmp_str = xstrdup(params);
	tok = strtok_r(tmp_str, ",", &saveptr);
	while (tok) {
		if (!xstrncasecmp(tok, THREADPOOL_PARAM,
				  strlen(THREADPOOL_PARAM))) {
			const char *value = (tok + strlen(THREADPOOL_PARAM));

			if (!xstrcasecmp(value, "enabled"))
				; /* ignore as enabled by default */
			else if (!xstrcasecmp(value, "disabled"))
				threadpool.shutdown = true;
			else
				fatal("Invalid parameter %s", tok);

			log_flag(THREAD, "%s: threadpool is %s",
				 __func__, (threadpool.shutdown ? "disabled" :
					    "enabled"));
		} else if (
			!xstrncasecmp(tok, THREADPOOL_PARAM_PREALLOCATE,
				      strlen(THREADPOOL_PARAM_PREALLOCATE))) {
			const unsigned long count = slurm_atoul(
				tok + strlen(THREADPOOL_PARAM_PREALLOCATE));

			if (count > THREADPOOL_MAX_THREADS)
				fatal("%s: invalid parameter %s",
				      __func__, tok);

			threadpool.config.preallocate = count;
			log_flag(THREAD, "%s: preallocate %lu threads",
				 __func__, count);
		} else if (!xstrncasecmp(tok, THREADPOOL_PARAM_PRESERVE,
					 strlen(THREADPOOL_PARAM_PRESERVE))) {
			const unsigned long count =
				slurm_atoul(tok +
					    strlen(THREADPOOL_PARAM_PRESERVE));

			if (count > THREADPOOL_MAX_THREADS)
				fatal("%s: invalid parameter %s",
				      __func__, tok);

			threadpool.config.preserve = count;
			log_flag(THREAD, "%s: preserve %lu threads",
				 __func__, count);
		} else {
			log_flag(THREAD, "%s: threadpool ignoring parameter %s",
				 __func__, tok);
		}

		tok = strtok_r(NULL, ",", &saveptr);
	}
	xfree(tmp_str);
}
/* ListForF: dump one thread_t's state into the probe log carried in arg */
static int _log_thread(void *x, void *arg)
{
	thread_t *thread = x;
	log_thread_t *args = arg;

	xassert(args->magic == LOG_THREAD_MAGIC);
	xassert(thread->magic == THREAD_MAGIC);

	probe_log(args->log, "thread[%s@0x%"PRIx64"]: func=%s(0x%"PRIxPTR") type=%s detached=%c requester=0x%"PRIx64" ret=0x%"PRIxPTR,
		  thread->thread_name, (uint64_t) thread->id,
		  thread->func_name, (uintptr_t) thread->arg, args->type,
		  BOOL_CHARIFY(thread->detached), (uint64_t) thread->requester,
		  (uintptr_t) thread->ret);

	return 0;
}
/*
 * Dump full threadpool config, counters, per-thread state, and latency
 * histograms into the probe log.
 * Caller must hold threadpool.mutex lock.
 */
static void _probe_verbose(probe_log_t *log)
{
	log_thread_t logt = {
		.magic = LOG_THREAD_MAGIC,
		.log = log,
	};
	/* Scratch buffer reused for each rendered histogram line */
	char histogram[LATENCY_METRIC_HISTOGRAM_STR_LEN] = { 0 };

	probe_log(log, "config: preallocate:%d preserve:%d",
		  threadpool.config.preallocate, threadpool.config.preserve);
	probe_log(
		log,
		"state: shutdown:%c pending:%d zombies:%d running:%d idle:%d total_run:%" PRIu64 " total_created:%" PRIu64 " peak_count:%" PRIu64,
		BOOL_CHARIFY(threadpool.shutdown),
		list_count(threadpool.pending), list_count(threadpool.zombies),
		threadpool.running, threadpool.idle, threadpool.total_run,
		threadpool.total_created, threadpool.peak_count);

	/* One log line per queued request and per unjoined (zombie) thread */
	logt.type = "pending";
	(void) list_for_each_ro(threadpool.pending, _log_thread, &logt);
	logt.type = "zombie";
	(void) list_for_each_ro(threadpool.zombies, _log_thread, &logt);

	/* Bucket-label header row, then one row per latency metric */
	(void) latency_histogram_print_labels(histogram, sizeof(histogram));
	probe_log(log, "histogram: %s", histogram);
	(void) latency_histogram_print(&threadpool.histograms.request,
				       histogram, sizeof(histogram));
	probe_log(log, "request histogram: %s", histogram);
	(void) latency_histogram_print(&threadpool.histograms.run, histogram,
				       sizeof(histogram));
	probe_log(log, "run histogram: %s", histogram);
	(void) latency_histogram_print(&threadpool.histograms.join, histogram,
				       sizeof(histogram));
	probe_log(log, "join histogram: %s", histogram);
}
/*
 * Probe callback reporting threadpool health: UNKNOWN when never enabled,
 * ONLINE when shutting down, BUSY when over THREADPOOL_MAX_THREADS, READY
 * otherwise. Also dumps verbose state when a probe log is supplied.
 */
static probe_status_t _probe(probe_log_t *log, void *arg)
{
	probe_status_t status = PROBE_RC_UNKNOWN;

	slurm_mutex_lock(&threadpool.mutex);

	if (log)
		_probe_verbose(log);

	/* Checks are ordered by precedence: enabled, shutdown, capacity */
	if (!threadpool.enabled) {
		status = PROBE_RC_UNKNOWN;
	} else if (threadpool.shutdown) {
		status = PROBE_RC_ONLINE;
	} else if ((threadpool.running + threadpool.idle) >
		   THREADPOOL_MAX_THREADS) {
		status = PROBE_RC_BUSY;
	} else {
		status = PROBE_RC_READY;
	}

	slurm_mutex_unlock(&threadpool.mutex);
	return status;
}
/*
 * Initialize and enable the threadpool singleton.
 * default_count overrides config.preallocate when > 0; params may further
 * tune or disable the pool (see _parse_params()). No-op when already enabled
 * or disabled via params. Spawns config.preallocate idle workers.
 */
extern void threadpool_init(const int default_count, const char *params)
{
	int preallocate = -1;

	_parse_params(default_count, params);

	slurm_mutex_lock(&threadpool.mutex);

	/* Already initialized, or disabled via params before first init */
	if (threadpool.enabled || threadpool.shutdown) {
		slurm_mutex_unlock(&threadpool.mutex);
		return;
	}

	xassert(!threadpool.shutdown);
	threadpool.enabled = true;

	xassert(!threadpool.pending);
	threadpool.pending = list_create((ListDelF) _thread_free);
	xassert(!threadpool.zombies);
	threadpool.zombies = list_create((ListDelF) _thread_free);

	/* Snapshot config under lock for use after the lock is dropped */
	preallocate = threadpool.config.preallocate;

	slurm_mutex_unlock(&threadpool.mutex);

	probe_register("threadpool", _probe, NULL);

	xassert(preallocate >= 0);

	/* NULL thread_t => worker starts idle waiting for work */
	for (int i = 0; i < preallocate; i++)
		_new_thread(NULL, NULL, __func__);

	log_flag(THREAD, "%s: started with %d threads preallocated",
		 __func__, preallocate);
}
/*
 * Request threadpool shutdown: idle workers will exit as they wake up.
 * No-op when the pool was never enabled.
 */
extern void threadpool_fini(void)
{
	slurm_mutex_lock(&threadpool.mutex);

	if (threadpool.enabled) {
		/*
		 * Never change threadpool.enabled to false to avoid race
		 * conditions of checking if threadpool was ever enabled
		 */
		threadpool.shutdown = true;
	}

	slurm_mutex_unlock(&threadpool.mutex);
}