blob: e1152aa42f6d9795cacde33adbcf93d02982faca [file] [log] [blame]
/*****************************************************************************\
* poll.c - Definitions for poll() handlers
*****************************************************************************
* Copyright (C) SchedMD LLC.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include "slurm/slurm.h"
#include "slurm/slurm_errno.h"
#include "src/common/fd.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/read_config.h"
#include "src/common/timers.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/conmgr/events.h"
#include "src/conmgr/polling.h"
/*
 * Size event count for 1 input and 1 output per connection and interrupt pipe
 * fd. Add 35% buffer of extra events to account for non-listener creating
 * connections. Allocated once to avoid calling xrecalloc() every time poll() is
 * called.
 *
 * The argument is parenthesized so that an expression argument such as
 * (a + b) is doubled as a whole. The result is a double; it is truncated when
 * assigned to an int.
 */
#define MAX_POLL_EVENTS(max_connections) \
	((((max_connections) * 2) + 1) * 1.35)
/*
 * Increase poll events by amount when not enough slots available.
 * Evaluates to twice the slot count given in old.
 */
#define POLL_EVENTS_INCREASE(old) ((old) * 2)
/*
 * String used as the interrupt pipe's name in logging to match the style of
 * the other fds
 */
#define INTERRUPT_CON_NAME "interrupt"
/*
 * Need an arbitrarily sized buffer to ensure the pipe has been cleared of all
 * bytes in a single read() even though there should only ever be 1 byte.
 */
#define FLUSH_BUFFER_BYTES 100
/*
 * Flags to be used for each type of fd.
 *
 * T() builds one table row from a type and the poll() event mask to request
 * for that type. Both arguments are also stringified so that the lookup
 * helpers in this file can return them as strings.
 */
#define T(type, events) { type, XSTRINGIFY(type), events, XSTRINGIFY(events) }
static const struct {
/* polling type this row describes */
pollctl_fd_type_t type;
/* stringified name of type */
const char *type_string;
/* poll() events mask requested for fds of this type */
uint32_t events;
/* stringified form of the events mask expression */
const char *events_string;
} fd_types[] = {
/* types without any events to poll for */
T(PCTL_TYPE_INVALID, 0),
T(PCTL_TYPE_UNSUPPORTED, 0),
T(PCTL_TYPE_NONE, 0),
/* every polled type requests POLLHUP and POLLERR */
T(PCTL_TYPE_CONNECTED, (POLLHUP | POLLERR)),
T(PCTL_TYPE_READ_ONLY, (POLLIN | POLLHUP | POLLERR)),
T(PCTL_TYPE_READ_WRITE, (POLLIN | POLLOUT | POLLHUP | POLLERR)),
T(PCTL_TYPE_WRITE_ONLY, (POLLOUT | POLLHUP | POLLERR)),
T(PCTL_TYPE_LISTEN, (POLLIN | POLLHUP | POLLERR)),
T(PCTL_TYPE_INVALID_MAX, 0),
};
#undef T
/*
 * Table of known poll() event flags and their names.
 * Used by _poll_events_to_string() to render an events mask for logging.
 * T() pairs each flag with its stringified form.
 */
#define T(flag) { flag, XSTRINGIFY(flag) }
static const struct {
/* poll() event bit(s) */
uint32_t flag;
/* stringified form of flag */
const char *string;
} poll_events[] = {
T(POLLIN), T(POLLPRI), T(POLLOUT), T(POLLERR),
T(POLLHUP), T(POLLNVAL),
/* remaining flags are only listed when _XOPEN_SOURCE is defined */
#ifdef _XOPEN_SOURCE
#ifdef __linux__
T(POLLRDHUP),
#endif /* __linux__ */
T(POLLRDNORM), T(POLLRDBAND), T(POLLWRNORM), T(POLLWRBAND),
#endif /* _XOPEN_SOURCE */
};
#undef T
/*
 * Default (not yet initialized) state for pctl.
 *
 * Both ends of the interrupt pipe start as -1, which is never a valid file
 * descriptor. This keeps the default state from referring to an unrelated
 * open fd (1 is stdout) and lets the ">= 0" checks in _check_pctl_magic()
 * detect a pipe that was never created. All members not listed are zeroed.
 */
#define PCTL_INITIALIZER \
	{ \
		.mutex = PTHREAD_MUTEX_INITIALIZER, \
		.poll_return = EVENT_INITIALIZER("POLL_RETURN"), \
		.interrupt_return = EVENT_INITIALIZER("INTERRUPT_RETURN"), \
		.interrupt = { \
			.send = -1, \
			.receive = -1, \
		}, \
	}
/*
 * State of the poll() based polling implementation.
 * There is a single file scoped instance (pctl) that starts out as
 * PCTL_INITIALIZER. The functions in this file lock pctl.mutex before
 * touching the members, except where noted in the function itself.
 */
static struct pctl_s {
/* lock held by the functions in this file while accessing pctl */
pthread_mutex_t mutex;
/* Is currently initialized */
bool initialized;
/* event to wait on pollctl_for_each_event() to return */
event_signal_t poll_return;
/* event to wait on pollctl_interrupt() to return */
event_signal_t interrupt_return;
/* True if actively polling() */
bool polling;
/* array of pollfd structs */
struct pollfd *events;
/* number of elements in events array (and in the fds array) */
int events_count;
/* array of file descriptors and polling type to poll() */
struct {
/* polling type, PCTL_TYPE_NONE when the slot is unused */
pollctl_fd_type_t type;
/* registered file descriptor, -1 when the slot is unused */
int fd;
} *fds;
/* number of file descriptors currently registered */
int fd_count;
struct {
/* pipe() used to break out of poll() */
/* write end of the pipe */
int send;
/* read end of the pipe, registered for polling by _init() */
int receive;
/* number of times interrupt requested */
int requested;
/* if a thread currently trying to send byte */
bool sending;
} interrupt;
} pctl = PCTL_INITIALIZER;
/* Forward declarations of helpers that are called before they are defined */
static int _link_fd(int fd, pollctl_fd_type_t type, const char *con_name,
const char *caller);
static int _unlink_fd(int fd, const char *con_name, const char *caller);
static void _interrupt(const char *caller);
/*
 * Look up the name of a polling type.
 * IN type - polling type to find in fd_types[]
 * RET stringified name of type. Calls fatal_abort() when type has no entry
 *	in fd_types[].
 */
static const char *_type_to_string(pollctl_fd_type_t type)
{
	int idx = 0;

	while (idx < ARRAY_SIZE(fd_types)) {
		if (type == fd_types[idx].type)
			return fd_types[idx].type_string;

		idx++;
	}

	fatal_abort("should never execute");
}
/*
 * Render a poll() events mask as a '|' separated list of flag names.
 * IN events - mask of poll() event flags
 * RET newly allocated string (caller must xfree()). "0" is returned when
 *	no bits are set. Bits that match no entry in poll_events[] are
 *	appended as a single hex value.
 */
static char *_poll_events_to_string(uint32_t events)
{
	char *out = NULL, *pos = NULL;
	uint32_t known = 0;

	if (events == 0)
		return xstrdup_printf("0");

	for (int i = 0; i < ARRAY_SIZE(poll_events); i++) {
		const uint32_t flag = poll_events[i].flag;

		/* skip flags that are not fully set in events */
		if ((events & flag) != flag)
			continue;

		xstrfmtcatat(out, &pos, "%s%s", (out ? "|" : ""),
			     poll_events[i].string);
		known |= flag;
	}

	/* known only ever holds bits taken from events */
	if (events != known)
		xstrfmtcatat(out, &pos, "%s0x%08" PRIx32, (out ? "|" : ""),
			     (events ^ known));

	return out;
}
/*
 * Look up the poll() events mask requested for a polling type.
 * IN type - polling type to find in fd_types[]
 * RET events mask for type. Calls fatal_abort() when type has no entry in
 *	fd_types[].
 */
static uint32_t _fd_type_to_events(pollctl_fd_type_t type)
{
	int idx = 0;

	while (idx < ARRAY_SIZE(fd_types)) {
		if (type == fd_types[idx].type)
			return fd_types[idx].events;

		idx++;
	}

	fatal_abort("should never happen");
}
/*
 * Look up the name of a polling type for logging.
 * IN type - polling type to find in fd_types[]
 * RET stringified name of type. Calls fatal_abort() when type has no entry
 *	in fd_types[].
 */
static const char *_fd_type_to_type_string(pollctl_fd_type_t type)
{
	int idx = 0;

	while (idx < ARRAY_SIZE(fd_types)) {
		if (type == fd_types[idx].type)
			return fd_types[idx].type_string;

		idx++;
	}

	fatal_abort("should never happen");
}
/*
 * Look up the stringified events mask of a polling type for logging.
 * IN type - polling type to find in fd_types[]
 * RET stringified events mask of type. Calls fatal_abort() when type has no
 *	entry in fd_types[].
 */
static const char *_fd_type_to_events_string(pollctl_fd_type_t type)
{
	int idx = 0;

	while (idx < ARRAY_SIZE(fd_types)) {
		if (type == fd_types[idx].type)
			return fd_types[idx].events_string;

		idx++;
	}

	fatal_abort("should never happen");
}
/*
 * Sanity check the state of pctl.
 * Asserts that pctl is initialized, that both ends of the interrupt pipe are
 * distinct non-negative fds and that the counters are not negative.
 * The checks are compiled out when NDEBUG is defined.
 * The callers in this file hold pctl.mutex when calling.
 */
static void _check_pctl_magic(void)
{
#ifndef NDEBUG
/* check state and file descriptors are sane */
xassert(pctl.initialized);
xassert(pctl.interrupt.send >= 0);
xassert(pctl.interrupt.receive >= 0);
xassert(pctl.interrupt.send != pctl.interrupt.receive);
xassert(pctl.fd_count >= 0);
xassert(pctl.interrupt.requested >= 0);
#endif /* !NDEBUG */
}
static void _atfork_child(void)
{
/*
* Force pctl to return to default state before it was initialized at
* forking as all of the prior state is completely unusable.
*/
pctl = (struct pctl_s) PCTL_INITIALIZER;
}
/*
 * Mark every slot of pctl.events and pctl.fds as unused.
 * Slots that currently hold the interrupt pipe's receive fd are left as is.
 * Only called by _init(), which holds pctl.mutex.
 */
static void _init_events(void)
{
	const int intr_fd = pctl.interrupt.receive;

	for (int slot = 0; slot < pctl.events_count; slot++) {
		struct pollfd *ev = &pctl.events[slot];

		if (ev->fd != intr_fd) {
			ev->fd = -1;
			ev->events = 0;
			ev->revents = 0;
		}

		if (pctl.fds[slot].fd == intr_fd)
			continue;

		pctl.fds[slot].fd = -1;
		pctl.fds[slot].type = PCTL_TYPE_NONE;
	}
}
/*
 * Initialize polling state.
 * Does nothing (beyond logging) when already initialized.
 * IN max_connections - connection count used to size the events and fds
 *	arrays via MAX_POLL_EVENTS()
 */
static void _init(const int max_connections)
{
	int pipe_fds[2] = { -1, -1 };
	int atfork_rc;

	slurm_mutex_lock(&pctl.mutex);

	if (pctl.initialized) {
		log_flag(CONMGR, "%s: Skipping. Already initialized", __func__);
		slurm_mutex_unlock(&pctl.mutex);
		return;
	}

	pctl.events_count = MAX_POLL_EVENTS(max_connections);

	/* reset pctl in any forked child */
	atfork_rc = pthread_atfork(NULL, NULL, _atfork_child);
	if (atfork_rc)
		fatal_abort("%s: pthread_atfork() failed: %s", __func__,
			    slurm_strerror(atfork_rc));

	/* pipe used to break out of poll() */
	if (pipe(pipe_fds))
		fatal("%s: unable to open unnamed pipe: %m", __func__);

	/* read end: non-blocking */
	fd_set_nonblocking(pipe_fds[0]);
	fd_set_close_on_exec(pipe_fds[0]);
	pctl.interrupt.receive = pipe_fds[0];

	/* write end: blocking */
	fd_set_blocking(pipe_fds[1]);
	fd_set_close_on_exec(pipe_fds[1]);
	pctl.interrupt.send = pipe_fds[1];

	pctl.events = xcalloc(pctl.events_count, sizeof(*pctl.events));
	pctl.fds = xcalloc(pctl.events_count, sizeof(*pctl.fds));
	_init_events();

	/* _check_pctl_magic() requires initialized to already be set */
	pctl.initialized = true;
	_check_pctl_magic();

	/* always watch the read end of the interrupt pipe */
	if (_link_fd(pctl.interrupt.receive, PCTL_TYPE_READ_ONLY,
		     INTERRUPT_CON_NAME, __func__))
		fatal_abort("unable to monitor interrupt");

	slurm_mutex_unlock(&pctl.mutex);
}
/*
 * Shut down polling.
 * Returns immediately when pctl is not initialized. Otherwise waits for any
 * in-flight interrupt send and any active poll to finish. The pipe, arrays
 * and events are only released (and initialized cleared) when built with
 * MEMORY_LEAK_DEBUG.
 */
static void _fini(void)
{
	slurm_mutex_lock(&pctl.mutex);

	/*
	 * Must be checked before _check_pctl_magic() as that asserts on
	 * pctl.initialized, which would make this early return unreachable
	 * in builds with assertions enabled.
	 */
	if (!pctl.initialized) {
		slurm_mutex_unlock(&pctl.mutex);
		return;
	}

	_check_pctl_magic();

	/* wait for pending interrupt send to complete */
	while (pctl.interrupt.sending)
		EVENT_WAIT(&pctl.interrupt_return, &pctl.mutex);

	/* wait for active poll to be fully processed */
	while (pctl.polling)
		EVENT_WAIT(&pctl.poll_return, &pctl.mutex);

#ifdef MEMORY_LEAK_DEBUG
	(void) _unlink_fd(pctl.interrupt.receive, INTERRUPT_CON_NAME, __func__);

	fd_close(&pctl.interrupt.receive);
	fd_close(&pctl.interrupt.send);

	xfree(pctl.events);
	xfree(pctl.fds);

	EVENT_FREE_MEMBERS(&pctl.poll_return);
	EVENT_FREE_MEMBERS(&pctl.interrupt_return);

	pctl.initialized = false;
#endif /* MEMORY_LEAK_DEBUG */

	slurm_mutex_unlock(&pctl.mutex);

	/*
	 * lock is never destroyed
	 * slurm_mutex_destroy(&pctl.mutex);
	 */
}
/*
 * Register a file descriptor for polling.
 * caller must hold pctl.mutex lock
 *
 * Uses the first unused slot (fd == -1) in pctl.fds. When every slot is in
 * use, pctl.events and pctl.fds are grown and the first newly added slot is
 * used.
 *
 * IN fd - file descriptor to register (must not already be registered)
 * IN type - polling type to register fd with
 * IN con_name - connection name for logging
 * IN caller - name of calling function for logging
 * RET SLURM_SUCCESS
 */
static int _link_fd(int fd, pollctl_fd_type_t type, const char *con_name,
		    const char *caller)
{
	int slot = -1;

	for (int i = 0; i < pctl.events_count; i++) {
		xassert(pctl.fds[i].fd != fd);

		if (pctl.fds[i].fd == -1) {
			slot = i;
			break;
		}
	}

	if (slot < 0) {
		/* No empty slots found -> need to increase events_count */
		const int old_count = pctl.events_count;
		/* doubling a count of 0 would never add a slot */
		const int new_count =
			((old_count > 0) ? POLL_EVENTS_INCREASE(old_count) : 1);

		log_flag(CONMGR, "%s->%s: [POLL] Increasing max events: %d -> %d",
			 caller, __func__, old_count, new_count);

		/*
		 * NOTE(review): _poll() keeps using the pctl.events pointer it
		 * saved while not holding the mutex. If growing moves the
		 * array while pctl.polling is set, that pointer is stale -
		 * confirm callers cannot get here during an active poll().
		 */
		xrecalloc(pctl.events, new_count, sizeof(*pctl.events));
		xrecalloc(pctl.fds, new_count, sizeof(*pctl.fds));
		pctl.events_count = new_count;

		/*
		 * The added slots must be explicitly marked unused: a slot is
		 * only considered free when fd == -1, and xrecalloc() does not
		 * set that value.
		 */
		for (int i = old_count; i < new_count; i++) {
			pctl.events[i].fd = -1;
			pctl.events[i].events = 0;
			pctl.events[i].revents = 0;
			pctl.fds[i].fd = -1;
			pctl.fds[i].type = PCTL_TYPE_NONE;
		}

		slot = old_count;
	}

	log_flag(CONMGR, "%s->%s: [POLL:%s] registered fd[%s]:%d for %s events",
		 caller, __func__, con_name,
		 _fd_type_to_type_string(type), fd,
		 _fd_type_to_events_string(type));

	pctl.fds[slot].fd = fd;
	pctl.fds[slot].type = type;
	pctl.fd_count++;

	return SLURM_SUCCESS;
}
/*
 * Register a file descriptor for polling and then request an interrupt.
 * Takes pctl.mutex for the registration and releases it before calling
 * _interrupt().
 * IN fd - file descriptor to register
 * IN type - polling type to register fd with
 * IN con_name - connection name for logging
 * IN caller - name of calling function for logging
 * RET return code of _link_fd()
 */
static int _lock_link_fd(int fd, pollctl_fd_type_t type, const char *con_name,
			 const char *caller)
{
	int link_rc = SLURM_SUCCESS;

	slurm_mutex_lock(&pctl.mutex);
	_check_pctl_magic();
	link_rc = _link_fd(fd, type, con_name, caller);
	slurm_mutex_unlock(&pctl.mutex);

	/* _interrupt() takes the lock itself */
	_interrupt(caller);

	return link_rc;
}
/*
 * Change the polling type of an already registered file descriptor and then
 * request an interrupt.
 * IN fd - registered file descriptor
 * IN type - new polling type for fd
 * IN con_name - connection name for logging
 * IN caller - name of calling function for logging
 * RET SLURM_SUCCESS. Calls fatal_abort() when fd is not registered.
 */
static int _relink_fd(int fd, pollctl_fd_type_t type, const char *con_name,
		      const char *caller)
{
	int slot = 0;

	slurm_mutex_lock(&pctl.mutex);
	_check_pctl_magic();

	/* find the slot holding fd */
	while ((slot < pctl.events_count) && (pctl.fds[slot].fd != fd))
		slot++;

	if (slot >= pctl.events_count)
		fatal_abort("should never happen");

	log_flag(CONMGR, "%s->%s: [POLL:%s] Modified fd[%s]:%d for %s events",
		 caller, __func__, con_name,
		 _fd_type_to_type_string(type), fd,
		 _fd_type_to_events_string(type));

	pctl.fds[slot].type = type;

	slurm_mutex_unlock(&pctl.mutex);

	/* _interrupt() takes the lock itself */
	_interrupt(caller);

	return SLURM_SUCCESS;
}
/*
 * Remove a registered file descriptor from polling.
 * caller must hold pctl.mutex
 * IN fd - registered file descriptor
 * IN con_name - connection name for logging
 * IN caller - name of calling function for logging
 * RET SLURM_SUCCESS. Calls fatal_abort() when fd is not registered.
 */
static int _unlink_fd(int fd, const char *con_name, const char *caller)
{
	int slot = 0;

	_check_pctl_magic();

	/* find the slot holding fd */
	while ((slot < pctl.events_count) && (pctl.fds[slot].fd != fd))
		slot++;

	if (slot >= pctl.events_count)
		fatal_abort("should never happen");

	log_flag(CONMGR, "%s->%s: [POLL:%s] deregistered fd:%d events", caller,
		 __func__, con_name, fd);

	/* mark slot as unused */
	pctl.fds[slot].type = PCTL_TYPE_NONE;
	pctl.fds[slot].fd = -1;
	pctl.fd_count--;

	return SLURM_SUCCESS;
}
static int _lock_unlink_fd(int fd, const char *con_name, const char *caller)
{
int rc;
slurm_mutex_lock(&pctl.mutex);
_check_pctl_magic();
rc = _unlink_fd(fd, con_name, caller);
slurm_mutex_unlock(&pctl.mutex);
_interrupt(caller);
return rc;
}
/*
 * Drain the interrupt pipe and reset the count of requested interrupts.
 * Must be called without holding pctl.mutex as the lock is taken here.
 * IN intr_fd - read end of the interrupt pipe
 * IN events - poll() events reported for intr_fd (currently unused)
 * IN caller - name of calling function for logging
 */
static void _flush_interrupt(int intr_fd, uint32_t events, const char *caller)
{
	ssize_t event_read = -1;
	char buf[FLUSH_BUFFER_BYTES]; /* buffer for event_read */

	/*
	 * clear trash from the interrupt pipe
	 *
	 * The assignment is done on its own so that event_read holds the
	 * number of bytes read and not the result of the "< 0" comparison.
	 */
	event_read = read(intr_fd, buf, sizeof(buf));

	/* having nothing to read or getting interrupted is not a failure */
	if ((event_read < 0) && (errno != EWOULDBLOCK) && (errno != EAGAIN) &&
	    (errno != EINTR))
		fatal_abort("this should never happen read(%d)=%m", intr_fd);

	/* only 1 byte should ever get written to pipe at a time */
	xassert(event_read <= 1);

	slurm_mutex_lock(&pctl.mutex);

	log_flag(CONMGR, "%s->%s: [POLL:%s] read %zd bytes representing %d pending requests while sending=%c",
		 caller, __func__, INTERRUPT_CON_NAME, event_read,
		 pctl.interrupt.requested,
		 (pctl.interrupt.sending ? 'T' : 'F'));

	/* reset counter */
	pctl.interrupt.requested = 0;

	slurm_mutex_unlock(&pctl.mutex);
}
/*
 * Wait in poll() for events on all registered file descriptors.
 *
 * Sets pctl.polling, which stays set on return (including when poll() is
 * skipped) until _for_each_event() clears it. poll() itself is called with
 * an infinite timeout and without holding pctl.mutex.
 *
 * IN caller - name of calling function for logging
 * RET SLURM_SUCCESS (also when poll() was skipped or interrupted by a
 *	signal). Calls fatal_abort() when poll() fails with any other error.
 */
static int _poll(const char *caller)
{
	int nfds = -1, rc = SLURM_SUCCESS, events_count = 0;
	int fd_count = 0;
	struct pollfd *events = NULL;

	slurm_mutex_lock(&pctl.mutex);
	_check_pctl_magic();

	/*
	 * Using pctl.polling as way to avoid touching pctl.events while not
	 * holding the mutex so poll can be done without the lock.
	 */
	xassert(!pctl.polling);
	pctl.polling = true;

	events = pctl.events;
	events_count = pctl.events_count;
	fd_count = pctl.fd_count;

	if (!events_count || (fd_count <= 1)) {
		/*
		 * No point in running poll() when only file descriptor is the
		 * interrupt pipe or none at all
		 */
		slurm_mutex_unlock(&pctl.mutex);
		log_flag(CONMGR, "%s->%s: [POLL] skipping poll() with %d/%d file descriptors",
			 caller, __func__, fd_count, events_count);
		return SLURM_SUCCESS;
	}

	log_flag(CONMGR, "%s->%s: [POLL] BEGIN: poll() with %d file descriptors",
		 caller, __func__, pctl.fd_count);

	/* pack every registered fd into the front of the events array */
	for (int i = 0, t = 0; i < pctl.events_count; i++) {
		if ((pctl.fds[i].fd < 0))
			continue;

		xassert(pctl.fds[i].type > PCTL_TYPE_INVALID);
		xassert(pctl.fds[i].type < PCTL_TYPE_INVALID_MAX);
		xassert(pctl.fds[i].type != PCTL_TYPE_NONE);
		xassert(pctl.fds[i].type != PCTL_TYPE_UNSUPPORTED);
		xassert(t < fd_count);

		pctl.events[t].fd = pctl.fds[i].fd;
		pctl.events[t].events = _fd_type_to_events(pctl.fds[i].type);
		pctl.events[t].revents = 0;
		t++;
	}

	slurm_mutex_unlock(&pctl.mutex);

	xassert(fd_count > 0);

	/* save errno right away as later calls may change it */
	if ((nfds = poll(events, fd_count, -1)) < 0)
		rc = errno;

	slurm_mutex_lock(&pctl.mutex);

	xassert(rc || (nfds <= pctl.events_count));

	log_flag(CONMGR, "%s->%s: [POLL] END: poll() with events for %d/%d file descriptors",
		 caller, __func__, nfds, pctl.fd_count);

	if (nfds > 0) {
		/* wait for pollctl_for_each_event() to do anything */
	} else if (!nfds) {
		log_flag(CONMGR, "%s->%s: [POLL] END: poll() reported 0 events for %d file descriptors",
			 caller, __func__, pctl.fd_count);
	} else if (rc == EINTR) {
		/* Treat EINTR as no events detected */
		rc = SLURM_SUCCESS;
		log_flag(CONMGR, "%s->%s: [POLL] END: poll() interrupted by signal",
			 caller, __func__);
	} else {
		/*
		 * Report the errno saved from poll() instead of using %m as
		 * errno may have been changed by the locking and logging
		 * done since poll() returned.
		 */
		fatal_abort("%s->%s: [POLL] END: poll() failed: %s", caller,
			    __func__, slurm_strerror(rc));
	}

	/* pctl.polling is set to false by pollctl_for_each_event() */
	xassert(pctl.polling);

	slurm_mutex_unlock(&pctl.mutex);

	return rc;
}
/*
 * Call func for every file descriptor that has events pending from _poll().
 *
 * Events on the interrupt pipe are handled internally by flushing the pipe
 * and are never passed to func. Clears pctl.polling and signals
 * pctl.poll_return once done.
 *
 * IN func - callback called with (fd, revents, arg) without pctl.mutex held
 * IN arg - arbitrary pointer handed to func
 * IN func_name - name of func for logging
 * IN caller - name of calling function for logging
 * RET SLURM_SUCCESS or the first non-zero return of func, which also stops
 *	the remaining events from being processed
 */
static int _for_each_event(pollctl_event_func_t func, void *arg,
			   const char *func_name, const char *caller)
{
	int rc = SLURM_SUCCESS, intr_fd = -1, fd_count = 0;
	struct pollfd *events = NULL;
	event_signal_t *poll_return = NULL;

	slurm_mutex_lock(&pctl.mutex);
	_check_pctl_magic();

	xassert(pctl.polling);

	events = pctl.events;
	intr_fd = pctl.interrupt.receive;
	/*
	 * Snapshot the count while holding the lock instead of reading
	 * pctl.fd_count unlocked on every iteration: other threads (and func
	 * itself) may register or remove fds while the events are processed.
	 */
	fd_count = pctl.fd_count;

	slurm_mutex_unlock(&pctl.mutex);

	for (int i = 0; !rc && (i < fd_count); ++i) {
		char *events_str = NULL;

		if (!events[i].revents)
			continue;

		if (events[i].fd == intr_fd) {
			_flush_interrupt(intr_fd, events[i].revents, caller);
			continue;
		}

		/* only build the string when it is going to be logged */
		if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR)
			events_str = _poll_events_to_string(events[i].revents);

		log_flag(CONMGR, "%s->%s: [POLL] BEGIN: calling %s(fd:%d, (%s), 0x%" PRIxPTR ")",
			 caller, __func__, func_name, events[i].fd, events_str,
			 (uintptr_t) arg);

		rc = func(events[i].fd, events[i].revents, arg);

		log_flag(CONMGR, "%s->%s: [POLL] END: called %s(fd:%d, (%s), 0x%" PRIxPTR ")=%s",
			 caller, __func__, func_name, events[i].fd, events_str,
			 (uintptr_t) arg, slurm_strerror(rc));

		xfree(events_str);
	}

	slurm_mutex_lock(&pctl.mutex);

	/* release anything waiting for the poll to be fully processed */
	xassert(pctl.polling);
	pctl.polling = false;

	poll_return = &pctl.poll_return;
	EVENT_BROADCAST(poll_return);

	slurm_mutex_unlock(&pctl.mutex);

	return rc;
}
/*
 * send 1 byte without lock
 * IN fd - write end of the interrupt pipe
 * IN caller - name of calling function for logging
 * RET SLURM_SUCCESS or errno when safe_write() jumps to rwfail
 */
static int _intr_send_byte(int fd, const char *caller)
{
	DEF_TIMERS;
	char wake_byte[] = "1";

	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR)
		START_TIMER;

	/* send 1 byte of trash to wake up poll() */
	safe_write(fd, wake_byte, 1);

	/* timing is only reported when CONMGR logging is active */
	if (!(slurm_conf.debug_flags & DEBUG_FLAG_CONMGR))
		return SLURM_SUCCESS;

	END_TIMER3(NULL, 0);
	log_flag(CONMGR, "%s->%s: [POLL] interrupt byte sent in %s",
		 caller, __func__, TIME_STR);

	return SLURM_SUCCESS;

rwfail:
	return errno;
}
/*
 * Request that an active poll() be woken up.
 *
 * Nothing is sent when not currently polling. While polling, only the first
 * request since the pipe was last flushed writes a byte to the interrupt
 * pipe; later requests are only counted. The byte is written without
 * holding pctl.mutex.
 *
 * IN caller - name of calling function for logging
 */
static void _interrupt(const char *caller)
{
	event_signal_t *interrupt_return = NULL;
	int rc, fd = -1;

	slurm_mutex_lock(&pctl.mutex);
	_check_pctl_magic();

	if (!pctl.polling) {
		log_flag(CONMGR, "%s->%s: [POLL] skipping sending interrupt when not actively poll()ing",
			 caller, __func__);
	} else {
		pctl.interrupt.requested++;

		/* Check for duplicate requests. */
		if (pctl.interrupt.requested == 1) {
			fd = pctl.interrupt.send;
			xassert(!pctl.interrupt.sending);
			pctl.interrupt.sending = true;
			interrupt_return = &pctl.interrupt_return;

			log_flag(CONMGR, "%s->%s: [POLL] sending interrupt requests=%d",
				 caller, __func__, pctl.interrupt.requested);
		} else {
			log_flag(CONMGR, "%s->%s: [POLL] skipping sending another interrupt requests=%d sending=%c",
				 caller, __func__, pctl.interrupt.requested,
				 (pctl.interrupt.sending ? 'T' : 'F'));
		}
	}

	slurm_mutex_unlock(&pctl.mutex);

	/* fd is only set when this call is the one to send the byte */
	if (fd < 0)
		return;

	/*
	 * Log the error returned by _intr_send_byte() rather than errno, which
	 * is not guaranteed to still hold the failure by the time it is read.
	 */
	if ((rc = _intr_send_byte(fd, caller))) {
		error("%s->%s: [POLL] write(%d) failed: %s", caller, __func__,
		      fd, slurm_strerror(rc));
	}

	slurm_mutex_lock(&pctl.mutex);

	log_flag(CONMGR, "%s->%s: [POLL] interrupt sent requests=%d polling=%c",
		 caller, __func__, pctl.interrupt.requested,
		 (pctl.polling ? 'T' : 'F'));

	/* release anything (such as _fini()) waiting on the send to finish */
	xassert(fd == pctl.interrupt.send);
	xassert(pctl.interrupt.sending);
	pctl.interrupt.sending = false;

	EVENT_BROADCAST(interrupt_return);

	slurm_mutex_unlock(&pctl.mutex);
}
/*
 * IN events - poll() events reported for a file descriptor
 * RET true if POLLIN or POLLHUP is set in events
 */
static bool _events_can_read(pollctl_events_t events)
{
	return ((events & (POLLIN | POLLHUP)) != 0);
}
/*
 * IN events - poll() events reported for a file descriptor
 * RET true if POLLOUT or POLLHUP is set in events
 * NOTE(review): a hangup alone is reported as writable - confirm that is
 * what callers expect.
 */
static bool _events_can_write(pollctl_events_t events)
{
	return ((events & (POLLOUT | POLLHUP)) != 0);
}
/*
 * IN events - poll() events reported for a file descriptor
 * RET true if POLLERR is set in events
 */
static bool _events_has_error(pollctl_events_t events)
{
	return ((events & POLLERR) != 0);
}
/*
 * IN events - poll() events reported for a file descriptor
 * RET true if POLLHUP is set in events
 */
static bool _events_has_hangup(pollctl_events_t events)
{
	return ((events & POLLHUP) != 0);
}
/*
 * Function table of the poll() based polling implementation.
 * This is the only symbol of this file with external linkage; every member
 * points at one of the static functions defined above.
 */
const poll_funcs_t poll_funcs = {
.mode = POLL_MODE_POLL,
.init = _init,
.fini = _fini,
.type_to_string = _type_to_string,
/* the link/unlink entry points are the variants that take pctl.mutex */
.link_fd = _lock_link_fd,
.relink_fd = _relink_fd,
.unlink_fd = _lock_unlink_fd,
.poll = _poll,
.for_each_event = _for_each_event,
.interrupt = _interrupt,
.events_can_read = _events_can_read,
.events_can_write = _events_can_write,
.events_has_error = _events_has_error,
.events_has_hangup = _events_has_hangup,
};