|  | /*****************************************************************************\ | 
|  | *  epoll.c - Definitions for epoll_*() handlers | 
|  | ***************************************************************************** | 
|  | *  Copyright (C) SchedMD LLC. | 
|  | * | 
|  | *  This file is part of Slurm, a resource management program. | 
|  | *  For details, see <https://slurm.schedmd.com/>. | 
|  | *  Please also read the included file: DISCLAIMER. | 
|  | * | 
|  | *  Slurm is free software; you can redistribute it and/or modify it under | 
|  | *  the terms of the GNU General Public License as published by the Free | 
|  | *  Software Foundation; either version 2 of the License, or (at your option) | 
|  | *  any later version. | 
|  | * | 
|  | *  In addition, as a special exception, the copyright holders give permission | 
|  | *  to link the code of portions of this program with the OpenSSL library under | 
|  | *  certain conditions as described in each individual source file, and | 
|  | *  distribute linked combinations including the two. You must obey the GNU | 
|  | *  General Public License in all respects for all of the code used other than | 
|  | *  OpenSSL. If you modify file(s) with this exception, you may extend this | 
|  | *  exception to your version of the file(s), but you are not obligated to do | 
|  | *  so. If you do not wish to do so, delete this exception statement from your | 
|  | *  version.  If you delete this exception statement from all source files in | 
|  | *  the program, then also delete it here. | 
|  | * | 
|  | *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY | 
|  | *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 
|  | *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more | 
|  | *  details. | 
|  | * | 
|  | *  You should have received a copy of the GNU General Public License along | 
|  | *  with Slurm; if not, write to the Free Software Foundation, Inc., | 
|  | *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA. | 
|  | \*****************************************************************************/ | 
|  |  | 
|  | #include <fcntl.h> | 
|  | #include <pthread.h> | 
|  | #include <stdint.h> | 
|  | #include <stdlib.h> | 
|  | #include <sys/epoll.h> | 
|  |  | 
|  | #include "slurm/slurm.h" | 
|  | #include "slurm/slurm_errno.h" | 
|  |  | 
|  | #include "src/common/fd.h" | 
|  | #include "src/common/log.h" | 
|  | #include "src/common/macros.h" | 
|  | #include "src/common/read_config.h" | 
|  | #include "src/common/timers.h" | 
|  | #include "src/common/xassert.h" | 
|  | #include "src/common/xmalloc.h" | 
|  | #include "src/common/xstring.h" | 
|  |  | 
|  | #include "src/conmgr/polling.h" | 
|  | #include "src/conmgr/events.h" | 
|  |  | 
/*
 * Size event count for 1 input and 1 output per connection plus the
 * interrupt pipe fd. The events array is allocated once with this count to
 * avoid calling xrecalloc() every time poll() is called.
 *
 * The argument is parenthesized so that expressions such as
 * MAX_POLL_EVENTS(a + b) expand to the intended value.
 */
#define MAX_POLL_EVENTS(max_connections) (((max_connections) * 2) + 1)

/* String used as the interrupt name in logging to match the style of other fds */
#define INTERRUPT_CON_NAME "interrupt"

/*
 * Arbitrarily sized buffer (in bytes) to ensure the pipe has been cleared of
 * all bytes in a single read() even though there should only ever be 1 byte.
 */
#define FLUSH_BUFFER_BYTES 100
|  |  | 
|  | /* Flags to be used for each type of fd */ | 
|  | #define T(type, events) { type, XSTRINGIFY(type), events, XSTRINGIFY(events) } | 
|  | static const struct { | 
|  | pollctl_fd_type_t type; | 
|  | const char *type_string; | 
|  | uint32_t events; | 
|  | const char *events_string; | 
|  | } fd_types[] = { | 
|  | T(PCTL_TYPE_INVALID, 0), | 
|  | T(PCTL_TYPE_UNSUPPORTED, 0), | 
|  | T(PCTL_TYPE_NONE, 0), | 
|  | T(PCTL_TYPE_CONNECTED, (EPOLLHUP | EPOLLERR | EPOLLET)), | 
|  | T(PCTL_TYPE_READ_ONLY, (EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | | 
|  | EPOLLET)), | 
|  | T(PCTL_TYPE_READ_WRITE, | 
|  | (EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLET)), | 
|  | T(PCTL_TYPE_WRITE_ONLY, (EPOLLOUT | EPOLLHUP | EPOLLERR | EPOLLET)), | 
|  | T(PCTL_TYPE_LISTEN, (EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLET)), | 
|  | T(PCTL_TYPE_INVALID_MAX, 0), | 
|  | }; | 
|  | #undef T | 
|  |  | 
|  | #define T(flag) { flag, XSTRINGIFY(flag) } | 
|  | static const struct { | 
|  | uint32_t flag; | 
|  | const char *string; | 
|  | } epoll_events[] = { | 
|  | T(EPOLLIN), | 
|  | T(EPOLLOUT), | 
|  | T(EPOLLPRI), | 
|  | T(EPOLLERR), | 
|  | T(EPOLLHUP), | 
|  | T(EPOLLRDHUP), | 
|  | T(EPOLLET), | 
|  | T(EPOLLONESHOT), | 
|  | T(EPOLLWAKEUP), | 
|  | #ifdef EPOLLEXCLUSIVE | 
|  | T(EPOLLEXCLUSIVE), | 
|  | #endif | 
|  | }; | 
|  | #undef T | 
|  |  | 
/*
 * Default (uninitialized) state of the poller.
 *
 * Every file descriptor starts as -1 so that an unset descriptor can never be
 * mistaken for a real one. In particular the receive side of the interrupt
 * pipe must not default to 1, which is stdout.
 */
#define PCTL_INITIALIZER \
	{ \
		.mutex = PTHREAD_MUTEX_INITIALIZER, \
		.poll_return = EVENT_INITIALIZER("POLL_RETURN"), \
		.interrupt_return = EVENT_INITIALIZER("INTERRUPT_RETURN"), \
		.epoll = -1, \
		.interrupt = { \
			.send = -1, \
			.receive = -1, \
		}, \
	}
|  |  | 
|  | static struct pctl_s { | 
|  | pthread_mutex_t mutex; | 
|  |  | 
|  | /* Is currently initialized */ | 
|  | bool initialized; | 
|  |  | 
|  | /* event to wait on pollctl_for_each_event() to return */ | 
|  | event_signal_t poll_return; | 
|  | /* event to wait on pollctl_interrupt() to return */ | 
|  | event_signal_t interrupt_return; | 
|  |  | 
|  | /* True if actively polling() */ | 
|  | bool polling; | 
|  | /* file descriptor for epoll */ | 
|  | int epoll; | 
|  | /* array holding results of epoll */ | 
|  | struct epoll_event *events; | 
|  | /* number of elements in events array */ | 
|  | int events_count; | 
|  | /* | 
|  | * Number of elements triggred in last epoll_wait(). | 
|  | * Only set when polling=true. | 
|  | */ | 
|  | int events_triggered; | 
|  | /* number of file descriptors currently registered */ | 
|  | int fd_count; | 
|  |  | 
|  | struct { | 
|  | /* pipe() used to break out of epoll() */ | 
|  | int send; | 
|  | int receive; | 
|  |  | 
|  | /* number of times interrupt requested */ | 
|  | int requested; | 
|  |  | 
|  | /* if a thread currently trying to send byte */ | 
|  | bool sending; | 
|  | } interrupt; | 
|  | } pctl = PCTL_INITIALIZER; | 
|  |  | 
|  | static int _link_fd(int fd, pollctl_fd_type_t type, const char *con_name, | 
|  | const char *caller); | 
|  | static int _unlink_fd(int fd, const char *con_name, const char *caller); | 
|  |  | 
|  | static const char *_type_to_string(pollctl_fd_type_t type) | 
|  | { | 
|  | for (int i = 0; i < ARRAY_SIZE(fd_types); i++) | 
|  | if (fd_types[i].type == type) | 
|  | return fd_types[i].type_string; | 
|  |  | 
|  | fatal_abort("should never execute"); | 
|  | } | 
|  |  | 
|  | static char *_epoll_events_to_string(uint32_t events) | 
|  | { | 
|  | char *str = NULL, *at = NULL; | 
|  | uint32_t matched = 0; | 
|  |  | 
|  | if (!events) | 
|  | return xstrdup_printf("0"); | 
|  |  | 
|  | for (int i = 0; i < ARRAY_SIZE(epoll_events); i++) { | 
|  | if ((epoll_events[i].flag & events) == epoll_events[i].flag) { | 
|  | xstrfmtcatat(str, &at, "%s%s", (str ? "|" : ""), | 
|  | epoll_events[i].string); | 
|  | matched |= epoll_events[i].flag; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (events ^ matched) | 
|  | xstrfmtcatat(str, &at, "%s0x%08"PRIx32, (str ? "|" : ""), | 
|  | (events ^ matched)); | 
|  |  | 
|  | return str; | 
|  | } | 
|  |  | 
|  | static uint32_t _fd_type_to_events(pollctl_fd_type_t type) | 
|  | { | 
|  | for (int i = 0; i < ARRAY_SIZE(fd_types); i++) | 
|  | if (fd_types[i].type == type) | 
|  | return fd_types[i].events; | 
|  |  | 
|  | fatal_abort("should never happen"); | 
|  | } | 
|  |  | 
|  | static const char *_fd_type_to_type_string(pollctl_fd_type_t type) | 
|  | { | 
|  | for (int i = 0; i < ARRAY_SIZE(fd_types); i++) | 
|  | if (fd_types[i].type == type) | 
|  | return fd_types[i].type_string; | 
|  |  | 
|  | fatal_abort("should never happen"); | 
|  | } | 
|  |  | 
|  | static const char *_fd_type_to_events_string(pollctl_fd_type_t type) | 
|  | { | 
|  | for (int i = 0; i < ARRAY_SIZE(fd_types); i++) | 
|  | if (fd_types[i].type == type) | 
|  | return fd_types[i].events_string; | 
|  |  | 
|  | fatal_abort("should never happen"); | 
|  | } | 
|  |  | 
|  | static void _check_pctl_magic(void) | 
|  | { | 
|  | #ifndef NDEBUG | 
|  | /* check file descriptors are not sane */ | 
|  | xassert(pctl.initialized); | 
|  | xassert(pctl.epoll >= 0); | 
|  | xassert(pctl.interrupt.send >= 0); | 
|  | xassert(pctl.interrupt.receive >= 0); | 
|  | xassert(pctl.epoll != pctl.interrupt.send); | 
|  | xassert(pctl.epoll != pctl.interrupt.receive); | 
|  | xassert(pctl.interrupt.send != pctl.interrupt.receive); | 
|  | xassert(pctl.fd_count >= 0); | 
|  |  | 
|  | xassert(pctl.interrupt.requested >= 0); | 
|  | #endif /* !NDEBUG */ | 
|  | } | 
|  |  | 
|  | static void _atfork_child(void) | 
|  | { | 
|  | /* | 
|  | * Force pctl to return to default state before it was initialized at | 
|  | * forking as all of the prior state is completely unusable. | 
|  | */ | 
|  | pctl = (struct pctl_s) PCTL_INITIALIZER; | 
|  | } | 
|  |  | 
|  | static void _init(const int max_connections) | 
|  | { | 
|  | int rc; | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  |  | 
|  | if (pctl.initialized) { | 
|  | log_flag(CONMGR, "%s: Skipping. Already initialized", __func__); | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  | return; | 
|  | } | 
|  |  | 
|  | pctl.events_count = MAX_POLL_EVENTS(max_connections); | 
|  |  | 
|  | if ((rc = pthread_atfork(NULL, NULL, _atfork_child))) | 
|  | fatal_abort("%s: pthread_atfork() failed: %s", | 
|  | __func__, slurm_strerror(rc)); | 
|  |  | 
|  | { | 
|  | int fd[2] = { -1, -1 }; | 
|  | if (pipe(fd)) | 
|  | fatal("%s: unable to open unnamed pipe: %m", __func__); | 
|  |  | 
|  | fd_set_nonblocking(fd[0]); | 
|  | fd_set_close_on_exec(fd[0]); | 
|  | pctl.interrupt.receive = fd[0]; | 
|  |  | 
|  | fd_set_blocking(fd[1]); | 
|  | fd_set_close_on_exec(fd[1]); | 
|  | pctl.interrupt.send = fd[1]; | 
|  | } | 
|  |  | 
|  | if ((pctl.epoll = epoll_create1(EPOLL_CLOEXEC)) < 0) | 
|  | fatal_abort("%s: epoll_create1(FD_CLOEXEC) failed which should never happen: %m", | 
|  | __func__); | 
|  |  | 
|  | pctl.events = xcalloc(pctl.events_count, sizeof(*pctl.events)); | 
|  | pctl.initialized = true; | 
|  |  | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | if (_link_fd(pctl.interrupt.receive, PCTL_TYPE_READ_ONLY, | 
|  | INTERRUPT_CON_NAME, __func__)) | 
|  | fatal_abort("unable to monitor interrupt"); | 
|  |  | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  | } | 
|  |  | 
|  | static void _fini(void) | 
|  | { | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | if (!pctl.initialized) { | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  | return; | 
|  | } | 
|  |  | 
|  | while (pctl.interrupt.sending) | 
|  | EVENT_WAIT(&pctl.interrupt_return, &pctl.mutex); | 
|  |  | 
|  | while (pctl.polling) | 
|  | EVENT_WAIT(&pctl.poll_return, &pctl.mutex); | 
|  |  | 
|  | #ifdef MEMORY_LEAK_DEBUG | 
|  | (void) _unlink_fd(pctl.interrupt.receive, INTERRUPT_CON_NAME, __func__); | 
|  |  | 
|  | fd_close(&pctl.interrupt.receive); | 
|  | fd_close(&pctl.interrupt.send); | 
|  | fd_close(&pctl.epoll); | 
|  |  | 
|  | xfree(pctl.events); | 
|  | EVENT_FREE_MEMBERS(&pctl.poll_return); | 
|  | EVENT_FREE_MEMBERS(&pctl.interrupt_return); | 
|  |  | 
|  | pctl.initialized = false; | 
|  | #endif /* MEMORY_LEAK_DEBUG */ | 
|  |  | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | /* | 
|  | * lock is never destroyed | 
|  | * slurm_mutex_destroy(&pctl.mutex); | 
|  | */ | 
|  | } | 
|  |  | 
|  | /* caller must hold pctl.mutex lock */ | 
|  | static int _link_fd(int fd, pollctl_fd_type_t type, const char *con_name, | 
|  | const char *caller) | 
|  | { | 
|  | struct epoll_event ev = { | 
|  | .events = _fd_type_to_events(type), | 
|  | .data.fd = fd, | 
|  | }; | 
|  |  | 
|  | if (epoll_ctl(pctl.epoll, EPOLL_CTL_ADD, ev.data.fd, &ev)) { | 
|  | int rc = errno; | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL:%s] epoll_ctl(EPOLL_CTL_ADD, %d, %s) failed: %s", | 
|  | caller, __func__, con_name, ev.data.fd, | 
|  | _fd_type_to_events_string(type), slurm_strerror(rc)); | 
|  |  | 
|  | return rc; | 
|  | } else if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL:%s] registered fd[%s]:%d for %s events", | 
|  | caller, __func__, con_name, | 
|  | _fd_type_to_type_string(type), fd, | 
|  | _fd_type_to_events_string(type)); | 
|  |  | 
|  | pctl.fd_count++; | 
|  | return SLURM_SUCCESS; | 
|  | } | 
|  |  | 
|  | static int _lock_link_fd(int fd, pollctl_fd_type_t type, const char *con_name, | 
|  | const char *caller) | 
|  | { | 
|  | int rc; | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  | _check_pctl_magic(); | 
|  | rc = _link_fd(fd, type, con_name, caller); | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | return rc; | 
|  | } | 
|  |  | 
|  | static int _relink_fd(int fd, pollctl_fd_type_t type, const char *con_name, | 
|  | const char *caller) | 
|  | { | 
|  | struct epoll_event ev = { | 
|  | .events = _fd_type_to_events(type), | 
|  | .data.fd = fd, | 
|  | }; | 
|  | int rc = SLURM_SUCCESS; | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | if (epoll_ctl(pctl.epoll, EPOLL_CTL_MOD, ev.data.fd, &ev)) { | 
|  | rc = errno; | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL:%s] epoll_ctl(EPOLL_CTL_MOD, %d, %s) failed: %m", | 
|  | caller, __func__, con_name, ev.data.fd, | 
|  | _fd_type_to_events_string(type)); | 
|  | } else if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL:%s] Modified fd[%s]:%d for %s events", | 
|  | caller, __func__, con_name, | 
|  | _fd_type_to_type_string(type), fd, | 
|  | _fd_type_to_events_string(type)); | 
|  |  | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | return rc; | 
|  | } | 
|  |  | 
|  | /* caller must hold pctl.mutex */ | 
|  | static int _unlink_fd(int fd, const char *con_name, const char *caller) | 
|  | { | 
|  | int rc = SLURM_SUCCESS; | 
|  |  | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | if (epoll_ctl(pctl.epoll, EPOLL_CTL_DEL, fd, NULL)) { | 
|  | rc = errno; | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL:%s] epoll_ctl(EPOLL_CTL_DEL, %d) failed: %m", | 
|  | caller, __func__, con_name, fd); | 
|  | } else if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL:%s] deregistered fd:%d events", | 
|  | caller, __func__, con_name, fd); | 
|  |  | 
|  | pctl.fd_count--; | 
|  |  | 
|  | return rc; | 
|  | } | 
|  |  | 
|  | static int _lock_unlink_fd(int fd, const char *con_name, const char *caller) | 
|  | { | 
|  | int rc; | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | rc = _unlink_fd(fd, con_name, caller); | 
|  |  | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | return rc; | 
|  | } | 
|  |  | 
|  | static void _flush_interrupt(int intr_fd, uint32_t events, const char *caller) | 
|  | { | 
|  | ssize_t event_read = -1; | 
|  | char buf[FLUSH_BUFFER_BYTES]; /* buffer for event_read */ | 
|  |  | 
|  | /* clear trash from the interrupt pipe */ | 
|  |  | 
|  | if ((event_read = read(intr_fd, buf, sizeof(buf)) < 0) && | 
|  | (errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) | 
|  | fatal_abort("this should never happen read(%d)=%m", intr_fd); | 
|  |  | 
|  | /* only 1 byte should ever get written to pipe at a time */ | 
|  | xassert(event_read <= 1); | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL:%s] read %zd bytes representing %d pending requests while sending=%c", | 
|  | caller, __func__, INTERRUPT_CON_NAME, event_read, | 
|  | pctl.interrupt.requested, | 
|  | (pctl.interrupt.sending ? 'T' : 'F')); | 
|  |  | 
|  | /* reset counter */ | 
|  | pctl.interrupt.requested = 0; | 
|  |  | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  | } | 
|  |  | 
|  | static int _poll(const char *caller) | 
|  | { | 
|  | int nfds = -1, rc = SLURM_SUCCESS, events_count = 0, epoll = -1; | 
|  | int fd_count = 0; | 
|  | struct epoll_event *events = NULL; | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | /* | 
|  | * Using pctl.polling as way to avoid touching pctl.events while not | 
|  | * holding the mutex so poll can be done without the lock. | 
|  | */ | 
|  | xassert(!pctl.polling); | 
|  | xassert(!pctl.events_triggered); | 
|  | pctl.polling = true; | 
|  | events_count = pctl.events_count; | 
|  | epoll = pctl.epoll; | 
|  | fd_count = pctl.fd_count; | 
|  | events = pctl.events; | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] BEGIN: epoll_wait() with %d file descriptors", | 
|  | caller, __func__, pctl.fd_count); | 
|  |  | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | xassert(events_count > 0); | 
|  |  | 
|  | if (fd_count <= 1) { | 
|  | /* | 
|  | * No point in running poll() when only file descriptor is the | 
|  | * interrupt pipe | 
|  | */ | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] skipping epoll_wait() with %d file descriptors", | 
|  | caller, __func__, fd_count); | 
|  | nfds = 0; | 
|  | } else if ((nfds = epoll_wait(epoll, events, events_count, -1)) < 0) { | 
|  | rc = errno; | 
|  | } | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  |  | 
|  | xassert(nfds <= pctl.events_count); | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] END: epoll_wait() with events for %d/%d file descriptors", | 
|  | caller, __func__, nfds, pctl.fd_count); | 
|  |  | 
|  | if (nfds > 0) { | 
|  | /* wait for pollctl_for_each_event() to do anything */ | 
|  | pctl.events_triggered = nfds; | 
|  | } else if (!nfds) { | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] END: epoll_wait() reported 0 events for %d file descriptors", | 
|  | caller, __func__, pctl.fd_count); | 
|  | } else if (rc == EINTR) { | 
|  | /* Treat EINTR as no events detected */ | 
|  | nfds = 0; | 
|  | rc = SLURM_SUCCESS; | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] END: epoll_wait() interrupted by signal", | 
|  | caller, __func__); | 
|  | } else { | 
|  | fatal_abort("%s->%s: [EPOLL] END: epoll_wait() failed: %m", | 
|  | caller, __func__); | 
|  | } | 
|  |  | 
|  | /* pctl.polling is set to false by pollctl_for_each_event() */ | 
|  | xassert(pctl.polling); | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | return rc; | 
|  | } | 
|  |  | 
|  | static int _for_each_event(pollctl_event_func_t func, void *arg, | 
|  | const char *func_name, const char *caller) | 
|  | { | 
|  | int nfds = -1, rc = SLURM_SUCCESS, intr_fd = -1; | 
|  | struct epoll_event *events = NULL; | 
|  | event_signal_t *poll_return = NULL; | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | xassert(pctl.polling); | 
|  |  | 
|  | events = pctl.events; | 
|  | nfds = pctl.events_triggered; | 
|  | intr_fd = pctl.interrupt.receive; | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | for (int i = 0; !rc && (i < nfds); ++i) { | 
|  | char *events_str = NULL; | 
|  |  | 
|  | if (events[i].data.fd == intr_fd) { | 
|  | _flush_interrupt(intr_fd, events[i].events, caller); | 
|  | continue; | 
|  | } | 
|  |  | 
|  | if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) | 
|  | events_str = _epoll_events_to_string(events[i].events); | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] BEGIN: calling %s(fd:%d, (%s), 0x%"PRIxPTR")", | 
|  | caller, __func__, func_name, events[i].data.fd, | 
|  | events_str, (uintptr_t) arg); | 
|  |  | 
|  | rc = func(events[i].data.fd, events[i].events, arg); | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] END: called %s(fd:%d, (%s), 0x%"PRIxPTR")=%s", | 
|  | caller, __func__, func_name, events[i].data.fd, | 
|  | events_str, (uintptr_t) arg, slurm_strerror(rc)); | 
|  |  | 
|  | xfree(events_str); | 
|  | } | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  |  | 
|  | xassert(pctl.polling); | 
|  | pctl.polling = false; | 
|  | pctl.events_triggered = 0; | 
|  | poll_return = &pctl.poll_return; | 
|  |  | 
|  | EVENT_BROADCAST(poll_return); | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | return rc; | 
|  | } | 
|  |  | 
|  | /* send 1 byte without lock */ | 
|  | static int _intr_send_byte(int fd, const char *caller) | 
|  | { | 
|  | DEF_TIMERS; | 
|  | char buf[] = "1"; | 
|  |  | 
|  | if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) | 
|  | START_TIMER; | 
|  |  | 
|  | /* send 1 byte of trash to wake up poll() */ | 
|  | safe_write(fd, buf, 1); | 
|  |  | 
|  | if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { | 
|  | END_TIMER3(NULL, 0); | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] interrupt byte sent in %s", | 
|  | caller, __func__, TIME_STR); | 
|  | } | 
|  |  | 
|  | return SLURM_SUCCESS; | 
|  | rwfail: | 
|  | return errno; | 
|  | } | 
|  |  | 
|  | static void _interrupt(const char *caller) | 
|  | { | 
|  | event_signal_t *interrupt_return = NULL; | 
|  | int rc, fd = -1; | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | if (!pctl.polling) { | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] skipping sending interrupt when not actively poll()ing", | 
|  | caller, __func__); | 
|  | } else { | 
|  | pctl.interrupt.requested++; | 
|  |  | 
|  | /* Check for duplicate requests. */ | 
|  | if (pctl.interrupt.requested == 1) { | 
|  | fd = pctl.interrupt.send; | 
|  | xassert(!pctl.interrupt.sending); | 
|  | pctl.interrupt.sending = true; | 
|  | interrupt_return = &pctl.interrupt_return; | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] sending interrupt requests=%d", | 
|  | caller, __func__, | 
|  | pctl.interrupt.requested); | 
|  | } else { | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] skipping sending another interrupt requests=%d sending=%c", | 
|  | caller, __func__, | 
|  | pctl.interrupt.requested, | 
|  | (pctl.interrupt.sending ? 'T' : 'F')); | 
|  | } | 
|  | } | 
|  |  | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  |  | 
|  | if (fd < 0) | 
|  | return; | 
|  |  | 
|  | if ((rc = _intr_send_byte(fd, caller))) { | 
|  | error("%s->%s: [EPOLL] write(%d) failed: %s", | 
|  | caller, __func__, fd, slurm_strerror(errno)); | 
|  | } | 
|  |  | 
|  | slurm_mutex_lock(&pctl.mutex); | 
|  | _check_pctl_magic(); | 
|  |  | 
|  | log_flag(CONMGR, "%s->%s: [EPOLL] interrupt sent requests=%d polling=%c", | 
|  | caller, __func__, pctl.interrupt.requested, | 
|  | (pctl.polling ? 'T' : 'F')); | 
|  |  | 
|  | xassert(fd == pctl.interrupt.send); | 
|  | xassert(pctl.interrupt.sending); | 
|  | pctl.interrupt.sending = false; | 
|  |  | 
|  | EVENT_BROADCAST(interrupt_return); | 
|  | slurm_mutex_unlock(&pctl.mutex); | 
|  | } | 
|  |  | 
|  | static bool _events_can_read(pollctl_events_t events) | 
|  | { | 
|  | /* | 
|  | * Allow read()/write() to catch EPOLLRDHUP AND EPOLLHUP as there may | 
|  | * still be more bytes the fd's buffers and we don't want to close() the | 
|  | * connection yet either to drop those buffers on the floor. | 
|  | */ | 
|  | return (events & (EPOLLIN | EPOLLRDHUP | EPOLLHUP)); | 
|  | } | 
|  |  | 
|  | static bool _events_can_write(pollctl_events_t events) | 
|  | { | 
|  | return (events & (EPOLLOUT | EPOLLRDHUP | EPOLLHUP)); | 
|  | } | 
|  |  | 
|  | static bool _events_has_error(pollctl_events_t events) | 
|  | { | 
|  | return (events & EPOLLERR); | 
|  | } | 
|  |  | 
|  | static bool _events_has_hangup(pollctl_events_t events) | 
|  | { | 
|  | return (events & (EPOLLRDHUP | EPOLLHUP)); | 
|  | } | 
|  |  | 
|  | const poll_funcs_t epoll_funcs = { | 
|  | .mode = POLL_MODE_EPOLL, | 
|  | .init = _init, | 
|  | .fini = _fini, | 
|  | .type_to_string = _type_to_string, | 
|  | .link_fd = _lock_link_fd, | 
|  | .relink_fd = _relink_fd, | 
|  | .unlink_fd = _lock_unlink_fd, | 
|  | .poll = _poll, | 
|  | .for_each_event = _for_each_event, | 
|  | .interrupt = _interrupt, | 
|  | .events_can_read = _events_can_read, | 
|  | .events_can_write = _events_can_write, | 
|  | .events_has_error = _events_has_error, | 
|  | .events_has_hangup = _events_has_hangup, | 
|  | }; |