| /*****************************************************************************\ | 
 |  *  poll.c - Definitions for poll() handlers | 
 |  ***************************************************************************** | 
 |  *  Copyright (C) SchedMD LLC. | 
 |  * | 
 |  *  This file is part of Slurm, a resource management program. | 
 |  *  For details, see <https://slurm.schedmd.com/>. | 
 |  *  Please also read the included file: DISCLAIMER. | 
 |  * | 
 |  *  Slurm is free software; you can redistribute it and/or modify it under | 
 |  *  the terms of the GNU General Public License as published by the Free | 
 |  *  Software Foundation; either version 2 of the License, or (at your option) | 
 |  *  any later version. | 
 |  * | 
 |  *  In addition, as a special exception, the copyright holders give permission | 
 |  *  to link the code of portions of this program with the OpenSSL library under | 
 |  *  certain conditions as described in each individual source file, and | 
 |  *  distribute linked combinations including the two. You must obey the GNU | 
 |  *  General Public License in all respects for all of the code used other than | 
 |  *  OpenSSL. If you modify file(s) with this exception, you may extend this | 
 |  *  exception to your version of the file(s), but you are not obligated to do | 
 |  *  so. If you do not wish to do so, delete this exception statement from your | 
 |  *  version.  If you delete this exception statement from all source files in | 
 |  *  the program, then also delete it here. | 
 |  * | 
 |  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY | 
 |  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 
 |  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more | 
 |  *  details. | 
 |  * | 
 |  *  You should have received a copy of the GNU General Public License along | 
 |  *  with Slurm; if not, write to the Free Software Foundation, Inc., | 
 |  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA. | 
 | \*****************************************************************************/ | 
 |  | 
 | #include <fcntl.h> | 
 | #include <poll.h> | 
 | #include <pthread.h> | 
 | #include <stddef.h> | 
 | #include <stdint.h> | 
 | #include <stdlib.h> | 
 |  | 
 | #include "slurm/slurm.h" | 
 | #include "slurm/slurm_errno.h" | 
 |  | 
 | #include "src/common/fd.h" | 
 | #include "src/common/log.h" | 
 | #include "src/common/macros.h" | 
 | #include "src/common/read_config.h" | 
 | #include "src/common/timers.h" | 
 | #include "src/common/xassert.h" | 
 | #include "src/common/xmalloc.h" | 
 | #include "src/common/xstring.h" | 
 |  | 
 | #include "src/conmgr/events.h" | 
 | #include "src/conmgr/polling.h" | 
 |  | 
 | /* | 
 |  * Size event count for 1 input and 1 output per connection and interrupt pipe | 
 |  * fd. Add 35% buffer of extra events to account for non-listener creating | 
 |  * connections. Allocated once to avoid calling xrecalloc() every time poll() is | 
 |  * called. | 
 |  */ | 
 | #define MAX_POLL_EVENTS(max_connections) (((max_connections * 2) + 1) * 1.35) | 
 | /* Increase poll events by amount when not enough slots available */ | 
 | #define POLL_EVENTS_INCREASE(old) ((old) * 2) | 
 |  | 
 | /* string used for interrupt name in logging to match style of others fds */ | 
 | #define INTERRUPT_CON_NAME "interrupt" | 
 |  | 
 | /* | 
 |  * Need an arbitrary sized of bytes to ensure the pipe has been cleared of all | 
 |  * bytes in a single read() even though there should only ever be 1 byte. | 
 |  */ | 
 | #define FLUSH_BUFFER_BYTES 100 | 
 |  | 
 | /* Flags to be used for each type of fd */ | 
 | #define T(type, events) { type, XSTRINGIFY(type), events, XSTRINGIFY(events) } | 
 |  | 
 | static const struct { | 
 | 	pollctl_fd_type_t type; | 
 | 	const char *type_string; | 
 | 	uint32_t events; | 
 | 	const char *events_string; | 
 | } fd_types[] = { | 
 | 	T(PCTL_TYPE_INVALID, 0), | 
 | 	T(PCTL_TYPE_UNSUPPORTED, 0), | 
 | 	T(PCTL_TYPE_NONE, 0), | 
 | 	T(PCTL_TYPE_CONNECTED, (POLLHUP | POLLERR)), | 
 | 	T(PCTL_TYPE_READ_ONLY, (POLLIN | POLLHUP | POLLERR)), | 
 | 	T(PCTL_TYPE_READ_WRITE, (POLLIN | POLLOUT | POLLHUP | POLLERR)), | 
 | 	T(PCTL_TYPE_WRITE_ONLY, (POLLOUT | POLLHUP | POLLERR)), | 
 | 	T(PCTL_TYPE_LISTEN, (POLLIN | POLLHUP | POLLERR)), | 
 | 	T(PCTL_TYPE_INVALID_MAX, 0), | 
 | }; | 
 |  | 
 | #undef T | 
 |  | 
 | #define T(flag) { flag, XSTRINGIFY(flag) } | 
 |  | 
 | static const struct { | 
 | 	uint32_t flag; | 
 | 	const char *string; | 
 | } poll_events[] = { | 
 | 	T(POLLIN),     T(POLLPRI),    T(POLLOUT),    T(POLLERR), | 
 | 	T(POLLHUP),    T(POLLNVAL), | 
 | #ifdef _XOPEN_SOURCE | 
 | #ifdef __linux__ | 
 | 	T(POLLRDHUP), | 
 | #endif /* __linux__ */ | 
 | 	T(POLLRDNORM), T(POLLRDBAND), T(POLLWRNORM), T(POLLWRBAND), | 
 | #endif /* _XOPEN_SOURCE */ | 
 | }; | 
 | #undef T | 
 |  | 
 | #define PCTL_INITIALIZER \ | 
 | 	{ \ | 
 | 	.mutex = PTHREAD_MUTEX_INITIALIZER, \ | 
 | 	.poll_return = EVENT_INITIALIZER("POLL_RETURN"), \ | 
 | 	.interrupt_return = EVENT_INITIALIZER("INTERRUPT_RETURN"), \ | 
 | 	.interrupt =  { \ | 
 | 		.send = -1, \ | 
 | 		.receive = 1, \ | 
 | 	}, \ | 
 | } | 
 |  | 
 | static struct pctl_s { | 
 | 	pthread_mutex_t mutex; | 
 |  | 
 | 	/* Is currently initialized */ | 
 | 	bool initialized; | 
 |  | 
 | 	/* event to wait on pollctl_for_each_event() to return */ | 
 | 	event_signal_t poll_return; | 
 | 	/* event to wait on pollctl_interrupt() to return */ | 
 | 	event_signal_t interrupt_return; | 
 |  | 
 | 	/* True if actively polling() */ | 
 | 	bool polling; | 
 |  | 
 | 	/* array of pollfd structs */ | 
 | 	struct pollfd *events; | 
 | 	/* number of elements in events array */ | 
 | 	int events_count; | 
 |  | 
 | 	/* array of file descriptors and polling type to poll() */ | 
 | 	struct { | 
 | 		pollctl_fd_type_t type; | 
 | 		int fd; | 
 | 	} *fds; | 
 |  | 
 | 	/* number of file descriptors currently registered */ | 
 | 	int fd_count; | 
 |  | 
 | 	struct { | 
 | 		/* pipe() used to break out of poll() */ | 
 | 		int send; | 
 | 		int receive; | 
 |  | 
 | 		/* number of times interrupt requested */ | 
 | 		int requested; | 
 |  | 
 | 		/* if a thread currently trying to send byte */ | 
 | 		bool sending; | 
 | 	} interrupt; | 
 | } pctl = PCTL_INITIALIZER; | 
 |  | 
/*
 * Forward declarations: _init() and _fini() call _link_fd()/_unlink_fd(), and
 * the locking wrappers call _interrupt(), before their definitions below.
 */
static int _link_fd(int fd, pollctl_fd_type_t type, const char *con_name,
		    const char *caller);
static int _unlink_fd(int fd, const char *con_name, const char *caller);
static void _interrupt(const char *caller);
 |  | 
 | static const char *_type_to_string(pollctl_fd_type_t type) | 
 | { | 
 | 	for (int i = 0; i < ARRAY_SIZE(fd_types); i++) | 
 | 		if (fd_types[i].type == type) | 
 | 			return fd_types[i].type_string; | 
 |  | 
 | 	fatal_abort("should never execute"); | 
 | } | 
 |  | 
 | static char *_poll_events_to_string(uint32_t events) | 
 | { | 
 | 	char *str = NULL, *at = NULL; | 
 | 	uint32_t matched = 0; | 
 |  | 
 | 	if (!events) | 
 | 		return xstrdup_printf("0"); | 
 |  | 
 | 	for (int i = 0; i < ARRAY_SIZE(poll_events); i++) { | 
 | 		if ((poll_events[i].flag & events) == poll_events[i].flag) { | 
 | 			xstrfmtcatat(str, &at, "%s%s", (str ? "|" : ""), | 
 | 				     poll_events[i].string); | 
 | 			matched |= poll_events[i].flag; | 
 | 		} | 
 | 	} | 
 |  | 
 | 	if (events ^ matched) | 
 | 		xstrfmtcatat(str, &at, "%s0x%08" PRIx32, (str ? "|" : ""), | 
 | 			     (events ^ matched)); | 
 |  | 
 | 	return str; | 
 | } | 
 |  | 
 | static uint32_t _fd_type_to_events(pollctl_fd_type_t type) | 
 | { | 
 | 	for (int i = 0; i < ARRAY_SIZE(fd_types); i++) | 
 | 		if (fd_types[i].type == type) | 
 | 			return fd_types[i].events; | 
 |  | 
 | 	fatal_abort("should never happen"); | 
 | } | 
 |  | 
 | static const char *_fd_type_to_type_string(pollctl_fd_type_t type) | 
 | { | 
 | 	for (int i = 0; i < ARRAY_SIZE(fd_types); i++) | 
 | 		if (fd_types[i].type == type) | 
 | 			return fd_types[i].type_string; | 
 |  | 
 | 	fatal_abort("should never happen"); | 
 | } | 
 |  | 
 | static const char *_fd_type_to_events_string(pollctl_fd_type_t type) | 
 | { | 
 | 	for (int i = 0; i < ARRAY_SIZE(fd_types); i++) | 
 | 		if (fd_types[i].type == type) | 
 | 			return fd_types[i].events_string; | 
 |  | 
 | 	fatal_abort("should never happen"); | 
 | } | 
 |  | 
 | static void _check_pctl_magic(void) | 
 | { | 
 | #ifndef NDEBUG | 
 | 	/* check file descriptors are not sane */ | 
 | 	xassert(pctl.initialized); | 
 | 	xassert(pctl.interrupt.send >= 0); | 
 | 	xassert(pctl.interrupt.receive >= 0); | 
 | 	xassert(pctl.interrupt.send != pctl.interrupt.receive); | 
 | 	xassert(pctl.fd_count >= 0); | 
 | 	xassert(pctl.interrupt.requested >= 0); | 
 | #endif /* !NDEBUG */ | 
 | } | 
 |  | 
 | static void _atfork_child(void) | 
 | { | 
 | 	/* | 
 | 	 * Force pctl to return to default state before it was initialized at | 
 | 	 * forking as all of the prior state is completely unusable. | 
 | 	 */ | 
 | 	pctl = (struct pctl_s) PCTL_INITIALIZER; | 
 | } | 
 |  | 
 | static void _init_events(void) | 
 | { | 
 | 	for (int i = 0; i < pctl.events_count; i++) { | 
 | 		if (pctl.events[i].fd != pctl.interrupt.receive) { | 
 | 			pctl.events[i].fd = -1; | 
 | 			pctl.events[i].events = 0; | 
 | 			pctl.events[i].revents = 0; | 
 | 		} | 
 |  | 
 | 		if (pctl.fds[i].fd != pctl.interrupt.receive) { | 
 | 			pctl.fds[i].fd = -1; | 
 | 			pctl.fds[i].type = PCTL_TYPE_NONE; | 
 | 		} | 
 | 	} | 
 | } | 
 |  | 
 | static void _init(const int max_connections) | 
 | { | 
 | 	int rc; | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 |  | 
 | 	if (pctl.initialized) { | 
 | 		log_flag(CONMGR, "%s: Skipping. Already initialized", __func__); | 
 | 		slurm_mutex_unlock(&pctl.mutex); | 
 | 		return; | 
 | 	} | 
 |  | 
 | 	pctl.events_count = MAX_POLL_EVENTS(max_connections); | 
 |  | 
 | 	if ((rc = pthread_atfork(NULL, NULL, _atfork_child))) | 
 | 		fatal_abort("%s: pthread_atfork() failed: %s", __func__, | 
 | 			    slurm_strerror(rc)); | 
 |  | 
 | 	{ | 
 | 		int fd[2] = { -1, -1 }; | 
 | 		if (pipe(fd)) | 
 | 			fatal("%s: unable to open unnamed pipe: %m", __func__); | 
 |  | 
 | 		fd_set_nonblocking(fd[0]); | 
 | 		fd_set_close_on_exec(fd[0]); | 
 | 		pctl.interrupt.receive = fd[0]; | 
 |  | 
 | 		fd_set_blocking(fd[1]); | 
 | 		fd_set_close_on_exec(fd[1]); | 
 | 		pctl.interrupt.send = fd[1]; | 
 | 	} | 
 |  | 
 | 	pctl.events = xcalloc(pctl.events_count, sizeof(*pctl.events)); | 
 | 	pctl.fds = xcalloc(pctl.events_count, sizeof(*pctl.fds)); | 
 | 	_init_events(); | 
 | 	pctl.initialized = true; | 
 |  | 
 | 	_check_pctl_magic(); | 
 |  | 
 | 	if (_link_fd(pctl.interrupt.receive, PCTL_TYPE_READ_ONLY, | 
 | 		     INTERRUPT_CON_NAME, __func__)) | 
 | 		fatal_abort("unable to monitor interrupt"); | 
 |  | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 | } | 
 |  | 
 | static void _fini(void) | 
 | { | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 | 	_check_pctl_magic(); | 
 |  | 
 | 	if (!pctl.initialized) { | 
 | 		slurm_mutex_unlock(&pctl.mutex); | 
 | 		return; | 
 | 	} | 
 |  | 
 | 	while (pctl.interrupt.sending) | 
 | 		EVENT_WAIT(&pctl.interrupt_return, &pctl.mutex); | 
 |  | 
 | 	while (pctl.polling) | 
 | 		EVENT_WAIT(&pctl.poll_return, &pctl.mutex); | 
 |  | 
 | #ifdef MEMORY_LEAK_DEBUG | 
 | 	(void) _unlink_fd(pctl.interrupt.receive, INTERRUPT_CON_NAME, __func__); | 
 |  | 
 | 	fd_close(&pctl.interrupt.receive); | 
 | 	fd_close(&pctl.interrupt.send); | 
 |  | 
 | 	xfree(pctl.events); | 
 | 	xfree(pctl.fds); | 
 | 	EVENT_FREE_MEMBERS(&pctl.poll_return); | 
 | 	EVENT_FREE_MEMBERS(&pctl.interrupt_return); | 
 |  | 
 | 	pctl.initialized = false; | 
 | #endif /* MEMORY_LEAK_DEBUG */ | 
 |  | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 |  | 
 | 	/* | 
 | 	 * lock is never destroyed | 
 | 	 * slurm_mutex_destroy(&pctl.mutex); | 
 | 	 */ | 
 | } | 
 |  | 
 | /* caller must hold pctl.mutex lock */ | 
 | static int _link_fd(int fd, pollctl_fd_type_t type, const char *con_name, | 
 | 		    const char *caller) | 
 | { | 
 | 	for (int i = 0; i < pctl.events_count; i++) { | 
 | 		xassert(pctl.fds[i].fd != fd); | 
 |  | 
 | 		if (pctl.fds[i].fd != -1) | 
 | 			continue; | 
 |  | 
 | 		log_flag(CONMGR, "%s->%s: [POLL:%s] registered fd[%s]:%d for %s events", | 
 | 			 caller, __func__, con_name, | 
 | 			 _fd_type_to_type_string(type), fd, | 
 | 			 _fd_type_to_events_string(type)); | 
 |  | 
 | 		pctl.fds[i].fd = fd; | 
 | 		pctl.fds[i].type = type; | 
 | 		pctl.fd_count++; | 
 | 		return SLURM_SUCCESS; | 
 | 	} | 
 |  | 
 | 	/* No empty slots found -> need to increase events_count */ | 
 |  | 
 | 	log_flag(CONMGR, "%s->%s: [POLL] Increasing max events: %d -> %d", | 
 | 		 caller, __func__, pctl.events_count, | 
 | 		 POLL_EVENTS_INCREASE(pctl.events_count)); | 
 |  | 
 | 	pctl.events_count = POLL_EVENTS_INCREASE(pctl.events_count); | 
 | 	xrecalloc(pctl.events, pctl.events_count, sizeof(*pctl.events)); | 
 | 	xrecalloc(pctl.fds, pctl.events_count, sizeof(*pctl.fds)); | 
 |  | 
 | 	/* Run again as there will be enough slots available */ | 
 | 	return _link_fd(fd, type, con_name, caller); | 
 | } | 
 |  | 
 | static int _lock_link_fd(int fd, pollctl_fd_type_t type, const char *con_name, | 
 | 			 const char *caller) | 
 | { | 
 | 	int rc; | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 | 	_check_pctl_magic(); | 
 | 	rc = _link_fd(fd, type, con_name, caller); | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 | 	_interrupt(caller); | 
 |  | 
 | 	return rc; | 
 | } | 
 |  | 
 | static int _relink_fd(int fd, pollctl_fd_type_t type, const char *con_name, | 
 | 		      const char *caller) | 
 | { | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 | 	_check_pctl_magic(); | 
 |  | 
 | 	for (int i = 0; i < pctl.events_count; i++) { | 
 | 		if (pctl.fds[i].fd != fd) | 
 | 			continue; | 
 |  | 
 | 		log_flag(CONMGR, "%s->%s: [POLL:%s] Modified fd[%s]:%d for %s events", | 
 | 			 caller, __func__, con_name, | 
 | 			 _fd_type_to_type_string(type), fd, | 
 | 			 _fd_type_to_events_string(type)); | 
 |  | 
 | 		pctl.fds[i].type = type; | 
 | 		slurm_mutex_unlock(&pctl.mutex); | 
 | 		_interrupt(caller); | 
 | 		return SLURM_SUCCESS; | 
 | 	} | 
 |  | 
 | 	fatal_abort("should never happen"); | 
 | } | 
 |  | 
 | /* caller must hold pctl.mutex */ | 
 | static int _unlink_fd(int fd, const char *con_name, const char *caller) | 
 | { | 
 | 	int i = -1; | 
 |  | 
 | 	_check_pctl_magic(); | 
 |  | 
 | 	for (i = 0; i < pctl.events_count; i++) | 
 | 		if (pctl.fds[i].fd == fd) | 
 | 			break; | 
 |  | 
 | 	if (i >= pctl.events_count) | 
 | 		fatal_abort("should never happen"); | 
 |  | 
 | 	log_flag(CONMGR, "%s->%s: [POLL:%s] deregistered fd:%d events", caller, | 
 | 		 __func__, con_name, fd); | 
 |  | 
 | 	pctl.fds[i].fd = -1; | 
 | 	pctl.fds[i].type = PCTL_TYPE_NONE; | 
 | 	pctl.fd_count--; | 
 |  | 
 | 	return SLURM_SUCCESS; | 
 | } | 
 |  | 
 | static int _lock_unlink_fd(int fd, const char *con_name, const char *caller) | 
 | { | 
 | 	int rc; | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 | 	_check_pctl_magic(); | 
 | 	rc = _unlink_fd(fd, con_name, caller); | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 | 	_interrupt(caller); | 
 |  | 
 | 	return rc; | 
 | } | 
 |  | 
 | static void _flush_interrupt(int intr_fd, uint32_t events, const char *caller) | 
 | { | 
 | 	ssize_t event_read = -1; | 
 | 	char buf[FLUSH_BUFFER_BYTES]; /* buffer for event_read */ | 
 |  | 
 | 	/* clear trash from the interrupt pipe */ | 
 |  | 
 | 	if ((event_read = read(intr_fd, buf, sizeof(buf)) < 0) && | 
 | 	    (errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) | 
 | 		fatal_abort("this should never happen read(%d)=%m", intr_fd); | 
 |  | 
 | 	/* only 1 byte should ever get written to pipe at a time */ | 
 | 	xassert(event_read <= 1); | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 |  | 
 | 	log_flag(CONMGR, "%s->%s: [POLL:%s] read %zd bytes representing %d pending requests while sending=%c", | 
 | 		 caller, __func__, INTERRUPT_CON_NAME, event_read, | 
 | 		 pctl.interrupt.requested, | 
 | 		 (pctl.interrupt.sending ? 'T' : 'F')); | 
 |  | 
 | 	/* reset counter */ | 
 | 	pctl.interrupt.requested = 0; | 
 |  | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 | } | 
 |  | 
 | static int _poll(const char *caller) | 
 | { | 
 | 	int nfds = -1, rc = SLURM_SUCCESS, events_count = 0; | 
 | 	int fd_count = 0; | 
 | 	struct pollfd *events = NULL; | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 | 	_check_pctl_magic(); | 
 |  | 
 | 	/* | 
 | 	 * Using pctl.polling as way to avoid touching pctl.events while not | 
 | 	 * holding the mutex so poll can be done without the lock. | 
 | 	 */ | 
 | 	xassert(!pctl.polling); | 
 | 	pctl.polling = true; | 
 | 	events = pctl.events; | 
 | 	events_count = pctl.events_count; | 
 | 	fd_count = pctl.fd_count; | 
 |  | 
 | 	if (!events_count || (fd_count <= 1)) { | 
 | 		/* | 
 | 		 * No point in running poll() when only file descriptor is the | 
 | 		 * interrupt pipe or none at all | 
 | 		 */ | 
 | 		slurm_mutex_unlock(&pctl.mutex); | 
 |  | 
 | 		log_flag(CONMGR, "%s->%s: [POLL] skipping poll() with %d/%d file descriptors", | 
 | 			 caller, __func__, fd_count, events_count); | 
 | 		return SLURM_SUCCESS; | 
 | 	} | 
 |  | 
 | 	log_flag(CONMGR, "%s->%s: [POLL] BEGIN: poll() with %d file descriptors", | 
 | 		 caller, __func__, pctl.fd_count); | 
 |  | 
 | 	for (int i = 0, t = 0; i < pctl.events_count; i++) { | 
 | 		if ((pctl.fds[i].fd < 0)) | 
 | 			continue; | 
 |  | 
 | 		xassert(pctl.fds[i].type > PCTL_TYPE_INVALID); | 
 | 		xassert(pctl.fds[i].type < PCTL_TYPE_INVALID_MAX); | 
 | 		xassert(pctl.fds[i].type != PCTL_TYPE_NONE); | 
 | 		xassert(pctl.fds[i].type != PCTL_TYPE_UNSUPPORTED); | 
 | 		xassert(t < fd_count); | 
 |  | 
 | 		pctl.events[t].fd = pctl.fds[i].fd; | 
 | 		pctl.events[t].events = _fd_type_to_events(pctl.fds[i].type); | 
 | 		pctl.events[t].revents = 0; | 
 | 		t++; | 
 | 	} | 
 |  | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 |  | 
 | 	xassert(fd_count > 0); | 
 |  | 
 | 	if ((nfds = poll(events, fd_count, -1)) < 0) | 
 | 		rc = errno; | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 |  | 
 | 	xassert(rc || (nfds <= pctl.events_count)); | 
 |  | 
 | 	log_flag(CONMGR, "%s->%s: [POLL] END: poll() with events for %d/%d file descriptors", | 
 | 		 caller, __func__, nfds, pctl.fd_count); | 
 |  | 
 | 	if (nfds > 0) { | 
 | 		/* wait for pollctl_for_each_event() to do anything */ | 
 | 	} else if (!nfds) { | 
 | 		log_flag(CONMGR, "%s->%s: [POLL] END: poll() reported 0 events for %d file descriptors", | 
 | 			 caller, __func__, pctl.fd_count); | 
 | 	} else if (rc == EINTR) { | 
 | 		/* Treat EINTR as no events detected */ | 
 | 		rc = SLURM_SUCCESS; | 
 |  | 
 | 		log_flag(CONMGR, "%s->%s: [POLL] END: poll() interrupted by signal", | 
 | 			 caller, __func__); | 
 | 	} else { | 
 | 		fatal_abort("%s->%s: [POLL] END: poll() failed: %m", caller, | 
 | 			    __func__); | 
 | 	} | 
 |  | 
 | 	/* pctl.polling is set to false by pollctl_for_each_event() */ | 
 | 	xassert(pctl.polling); | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 |  | 
 | 	return rc; | 
 | } | 
 |  | 
 | static int _for_each_event(pollctl_event_func_t func, void *arg, | 
 | 			   const char *func_name, const char *caller) | 
 | { | 
 | 	int rc = SLURM_SUCCESS, intr_fd = -1; | 
 | 	struct pollfd *events = NULL; | 
 | 	event_signal_t *poll_return = NULL; | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 | 	_check_pctl_magic(); | 
 |  | 
 | 	xassert(pctl.polling); | 
 |  | 
 | 	events = pctl.events; | 
 | 	intr_fd = pctl.interrupt.receive; | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 |  | 
 | 	for (int i = 0; !rc && (i < pctl.fd_count); ++i) { | 
 | 		char *events_str = NULL; | 
 |  | 
 | 		if (!events[i].revents) | 
 | 			continue; | 
 |  | 
 | 		if (events[i].fd == intr_fd) { | 
 | 			_flush_interrupt(intr_fd, events[i].revents, caller); | 
 | 			continue; | 
 | 		} | 
 |  | 
 | 		if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) | 
 | 			events_str = _poll_events_to_string(events[i].revents); | 
 |  | 
 | 		log_flag(CONMGR, "%s->%s: [POLL] BEGIN: calling %s(fd:%d, (%s), 0x%" PRIxPTR ")", | 
 | 			 caller, __func__, func_name, events[i].fd, events_str, | 
 | 			 (uintptr_t) arg); | 
 |  | 
 | 		rc = func(events[i].fd, events[i].revents, arg); | 
 |  | 
 | 		log_flag(CONMGR, "%s->%s: [POLL] END: called %s(fd:%d, (%s), 0x%" PRIxPTR ")=%s", | 
 | 			 caller, __func__, func_name, events[i].fd, events_str, | 
 | 			 (uintptr_t) arg, slurm_strerror(rc)); | 
 |  | 
 | 		xfree(events_str); | 
 | 	} | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 |  | 
 | 	xassert(pctl.polling); | 
 | 	pctl.polling = false; | 
 | 	poll_return = &pctl.poll_return; | 
 |  | 
 | 	EVENT_BROADCAST(poll_return); | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 |  | 
 | 	return rc; | 
 | } | 
 |  | 
 | /* send 1 byte without lock */ | 
 | static int _intr_send_byte(int fd, const char *caller) | 
 | { | 
 | 	DEF_TIMERS; | 
 | 	char buf[] = "1"; | 
 |  | 
 | 	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) | 
 | 		START_TIMER; | 
 |  | 
 | 	/* send 1 byte of trash to wake up poll() */ | 
 | 	safe_write(fd, buf, 1); | 
 |  | 
 | 	if (slurm_conf.debug_flags & DEBUG_FLAG_CONMGR) { | 
 | 		END_TIMER3(NULL, 0); | 
 | 		log_flag(CONMGR, "%s->%s: [POLL] interrupt byte sent in %s", | 
 | 			 caller, __func__, TIME_STR); | 
 | 	} | 
 |  | 
 | 	return SLURM_SUCCESS; | 
 | rwfail: | 
 | 	return errno; | 
 | } | 
 |  | 
 | static void _interrupt(const char *caller) | 
 | { | 
 | 	event_signal_t *interrupt_return = NULL; | 
 | 	int rc, fd = -1; | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 | 	_check_pctl_magic(); | 
 |  | 
 | 	if (!pctl.polling) { | 
 | 		log_flag(CONMGR, "%s->%s: [POLL] skipping sending interrupt when not actively poll()ing", | 
 | 			 caller, __func__); | 
 | 	} else { | 
 | 		pctl.interrupt.requested++; | 
 |  | 
 | 		/* Check for duplicate requests. */ | 
 | 		if (pctl.interrupt.requested == 1) { | 
 | 			fd = pctl.interrupt.send; | 
 | 			xassert(!pctl.interrupt.sending); | 
 | 			pctl.interrupt.sending = true; | 
 | 			interrupt_return = &pctl.interrupt_return; | 
 |  | 
 | 			log_flag(CONMGR, "%s->%s: [POLL] sending interrupt requests=%d", | 
 | 				 caller, __func__, pctl.interrupt.requested); | 
 | 		} else { | 
 | 			log_flag(CONMGR, "%s->%s: [POLL] skipping sending another interrupt requests=%d sending=%c", | 
 | 				 caller, __func__, pctl.interrupt.requested, | 
 | 				 (pctl.interrupt.sending ? 'T' : 'F')); | 
 | 		} | 
 | 	} | 
 |  | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 |  | 
 | 	if (fd < 0) | 
 | 		return; | 
 |  | 
 | 	if ((rc = _intr_send_byte(fd, caller))) { | 
 | 		error("%s->%s: [POLL] write(%d) failed: %s", caller, __func__, | 
 | 		      fd, slurm_strerror(errno)); | 
 | 	} | 
 |  | 
 | 	slurm_mutex_lock(&pctl.mutex); | 
 | 	log_flag(CONMGR, "%s->%s: [POLL] interrupt sent requests=%d polling=%c", | 
 | 		 caller, __func__, pctl.interrupt.requested, | 
 | 		 (pctl.polling ? 'T' : 'F')); | 
 |  | 
 | 	xassert(fd == pctl.interrupt.send); | 
 | 	xassert(pctl.interrupt.sending); | 
 | 	pctl.interrupt.sending = false; | 
 |  | 
 | 	EVENT_BROADCAST(interrupt_return); | 
 | 	slurm_mutex_unlock(&pctl.mutex); | 
 | } | 
 |  | 
 | static bool _events_can_read(pollctl_events_t events) | 
 | { | 
 | 	return (events & (POLLIN | POLLHUP)); | 
 | } | 
 |  | 
 | static bool _events_can_write(pollctl_events_t events) | 
 | { | 
 | 	return (events & (POLLOUT | POLLHUP)); | 
 | } | 
 |  | 
 | static bool _events_has_error(pollctl_events_t events) | 
 | { | 
 | 	return (events & POLLERR); | 
 | } | 
 |  | 
 | static bool _events_has_hangup(pollctl_events_t events) | 
 | { | 
 | 	return (events & POLLHUP); | 
 | } | 
 |  | 
 | const poll_funcs_t poll_funcs = { | 
 | 	.mode = POLL_MODE_POLL, | 
 | 	.init = _init, | 
 | 	.fini = _fini, | 
 | 	.type_to_string = _type_to_string, | 
 | 	.link_fd = _lock_link_fd, | 
 | 	.relink_fd = _relink_fd, | 
 | 	.unlink_fd = _lock_unlink_fd, | 
 | 	.poll = _poll, | 
 | 	.for_each_event = _for_each_event, | 
 | 	.interrupt = _interrupt, | 
 | 	.events_can_read = _events_can_read, | 
 | 	.events_can_write = _events_can_write, | 
 | 	.events_has_error = _events_has_error, | 
 | 	.events_has_hangup = _events_has_hangup, | 
 | }; |