blob: 2869ee72e40f4e498831c6c0631935607cef4f92 [file] [log] [blame]
/*
* R : A Computer Language for Statistical Data Analysis
* (C) Copyright 2008-2011 Simon Urbanek
* Copyright 2011-2019 R Core Team.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, a copy is available at
* https://www.R-project.org/Licenses/
fork.c
interface to system-level tools for spawning copies of the current
process and IPC
Derived from multicore version 0.1-8 by Simon Urbanek
*/
/* #define MC_DEBUG */
#ifdef HAVE_CONFIG_H
# include <config.h> /* for affinity function checks and sigaction */
#endif
#define NO_NLS
#include <Defn.h> /* for R_isForkedChild */
#include "parallel.h"
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
# include <unistd.h>
#endif
#include <sys/select.h>
#include <sys/wait.h>
#include <signal.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <Rinterface.h> /* for R_Interactive */
#include <R_ext/eventloop.h> /* for R_SelectEx */
/* NOTE: the logging is not safe to use in signal handler because printf is
not async-signal-safe */
#ifndef FILE_LOG
/* use printf instead of Rprintf for debugging to avoid forked console interactions */
#define Dprintf printf
#else
/* logging into a file */
#include <stdarg.h>
/* Debug logger used when FILE_LOG is defined: appends one printf-style
   message to the file "mc_debug.txt", prefixed with the pid of the calling
   process. The file is opened and closed on every call; when it cannot be
   opened the message is silently dropped. */
void Dprintf(char *format, ...) {
    FILE *logfile = fopen("mc_debug.txt", "a");
    va_list ap;
    va_start(ap, format);
    if (logfile != NULL) {
        fprintf(logfile, "%d> ", getpid());
        vfprintf(logfile, format, ap);
        fclose(logfile);
    }
    va_end(ap);
}
#endif
/* A child is created in mc_fork as detached (sEstranged=TRUE, has sifd
and pfd set to -1) or attached (sifd and pfd connected to pipes). The list
of children also includes cleanup marks.
A detached child is not visible to R user code (children(), mccollect(),
readChild(), selectChildren(), rmChild(), etc). Upon receiving sigchld,
a detached child is waited-for from the signal handler and waitedfor is
set. The child is eventually removed from the list by compact_children.
User R code must never do anything with a detached child to avoid race
conditions/unpredictable behavior due to PID reuse. A detached child
can never become attached.
An attached child is visible to R user code and always has file descriptors
sifd and pfd open and >= 0. It becomes detached via readChild() when it
returns an integer (signalling to user that the child is finishing or has
failed). An attached child is never waited for in the signal handler as
user R code is allowed to invoke operations on the child, such as kill - if
an attached child could have been waited for, PID reuse may lead to a
wrong process being killed. Instead, the child will be waited for when
detached. Detaching a child atomically involves a non-blocking waitpid to
check whether it already terminated, and closing the pfd and sifd. An
attached child can also be terminated and detached by user code via rmChild.
Note that if an attached child crashes (and sigchld is received though not
acted upon), its file descriptors will be closed, and hence selectChildren
in select will be notified of EOF on pfd and eventually readChild will
detach the child.
A cleanup mark is a child entry that is detached, waited-for, and has a
pid of -1. Hence, it is easy to skip in most code accessing the list.
The mark is created by mc_prepare_cleanup. mc_cleanup removes all children
up to and including the cleanup mark (terminates and detaches the children
and optionally also sends them kill signals and actively waits for them
to terminate).
When R is forked by mc_fork, the children list is cleaned in the child,
freeing the entries and closing the file descriptors, so that the entries
belonging to the parent do not end up in the child. However, R could
also be forked by some other function outside of parallel. To be robust
against that, parallel always checks whether the child still belongs to
it. We could use getppid for that, but it would not work for cleanup
marks, hence there is also ppid field in child entries.
*/
/* One entry of the children list: either a forked child process or a
   cleanup mark (pid == -1 with detached and waitedfor both set). */
typedef struct child_info {
pid_t pid; /* child's pid; -1 for a cleanup mark */
int pfd, sifd; /* master's ends of pipes: pfd is read to receive data from
the child, sifd is written to feed the child's stdin;
both are -1 when the entry has no pipes */
int detached; /* run with mcfork(estranged=TRUE) or manually removed */
int waitedfor; /* the child has been reaped */
pid_t ppid; /* parent's pid when the child/mark is created; compared with
getpid() to recognize entries created by another process */
struct child_info *next; /* next entry in the list, NULL at the end */
} child_info_t;
/* Process-wide state of this file. New entries are pushed at the head of
   children; the list is also walked by parent_sig_handler. */
static child_info_t *children; /* in master, linked list of details of children */
static int master_fd = -1; /* in child, write end of data pipe */
static int is_master = 1; /* 0 in child */
/* Non-blocking attempt to reap one child: when waitpid reports that the
   child has exited or was killed by a signal, mark the entry as waitedfor.
   Preconditions: the child is detached and has not been waited for yet.
   Not re-entrant - call it with sigchld blocked, or from a signal handler
   established without SA_NODEFER. */
static void wait_for_child_ci(child_info_t *ci) {
    int status;
    pid_t reaped = waitpid(ci->pid, &status, WNOHANG);
    if (reaped != ci->pid)
        return; /* still running, or waitpid failed */
    if (!WIFEXITED(status) && !WIFSIGNALED(status))
        return; /* state change other than termination */
    ci->waitedfor = 1;
#ifdef MC_DEBUG
    /* FIXME: printing not safe from signal handler */
    if (WIFSIGNALED(status) && !WIFEXITED(status))
        Dprintf("child %d terminated by signal %d\n", ci->pid,
                WTERMSIG(status));
    else
        Dprintf("child %d terminated with exit status %d\n", ci->pid,
                WEXITSTATUS(status));
#endif
}
/* Add SIGCHLD to the signal mask of the calling thread and store the
   previous mask in *oldset, so that it can later be passed to
   restore_sigchld. */
static void block_sigchld(sigset_t *oldset)
{
    sigset_t chld_only;

    sigemptyset(&chld_only);
    sigaddset(&chld_only, SIGCHLD);
    sigprocmask(SIG_BLOCK, &chld_only, oldset);
}
/* Reinstall the signal mask previously saved by block_sigchld. */
static void restore_sigchld(sigset_t *oldset)
{
    const sigset_t *saved_mask = oldset;

    sigprocmask(SIG_SETMASK, saved_mask, NULL);
}
/* Close the master's ends of both pipes of a child entry (if open) and
   reset them to -1, the value used for "no descriptor".
   Must not be called from a signal handler. */
static void close_fds_child_ci(child_info_t *ci)
{
    /* note the check and close is not atomic */
    /* 0 is a valid descriptor (pipe() may return it when stdin has been
       closed), so test against the -1 sentinel rather than for > 0 */
    if (ci->pfd >= 0) { close(ci->pfd); ci->pfd = -1; }
    if (ci->sifd >= 0) { close(ci->sifd); ci->sifd = -1; }
}
/* Close the pipes of an attached child, send it signal sig, mark it as
   detached and reap it right away if it has already terminated.
   Must only be called on attached child.
   A failure of kill() is reported as an R warning, but only after the
   entry is consistent and the signal mask restored: a warning can be
   turned into an error (non-local exit), which must not leave SIGCHLD
   blocked or an attached entry with closed descriptors behind. */
static void kill_and_detach_child_ci(child_info_t *ci, int sig)
{
    /* need to atomically wait and set detached */
    sigset_t ss;
    int kill_failed = 0, kill_errno = 0;

    block_sigchld(&ss);
    close_fds_child_ci(ci);
    if (kill(ci->pid, sig) == -1) {
        kill_failed = 1;
        kill_errno = errno; /* later calls may overwrite errno */
    }
    ci->detached = 1;
    /* check if the child has exited already, as sigchld may already have
       been received */
    wait_for_child_ci(ci);
#ifdef MC_DEBUG
    Dprintf("detached child %d (signal %d)\n", ci->pid, sig);
#endif
    restore_sigchld(&ss);
    if (kill_failed)
        warning(_("unable to terminate child process: %s"),
                strerror(kill_errno));
}
/* Detach an attached child, notifying it with SIGUSR1 (the signal the
   child's handler uses as permission to exit).
   Must only be called on attached child. */
static void terminate_and_detach_child_ci(child_info_t *ci)
{
    const int detach_signal = SIGUSR1;

    kill_and_detach_child_ci(ci, detach_signal);
}
/* Send signal sig to a detached child unless it has already been reaped
   (its pid could have been reused by then).
   Must only be called on a detached child.
   A failure of kill() is reported as an R warning after the signal mask
   has been restored, because a warning can be turned into an error
   (non-local exit), which must not leave SIGCHLD blocked. */
static void kill_detached_child_ci(child_info_t *ci, int sig)
{
    /* need to atomically check waitedfor and kill */
    sigset_t ss;
    int kill_failed = 0, kill_errno = 0;

    block_sigchld(&ss);
    if (!ci->waitedfor) {
        if (kill(ci->pid, sig) == -1) {
            kill_failed = 1;
            kill_errno = errno; /* later calls may overwrite errno */
#ifdef MC_DEBUG
            Dprintf("failed to kill detached child %d (signal %d)\n", ci->pid, sig);
#endif
        }
#ifdef MC_DEBUG
        else
            Dprintf("killed detached child %d (signal %d)\n", ci->pid, sig);
#endif
    }
    restore_sigchld(&ss);
    if (kill_failed)
        warning(_("unable to terminate child: %s"), strerror(kill_errno));
}
/* Find the attached child with the given pid that was created by this
   process, then terminate and detach it.
   Returns 1 when such a child was found, 0 otherwise. */
static int rm_child(int pid)
{
#ifdef MC_DEBUG
    Dprintf("removing child %d\n", pid);
#endif
    const pid_t self = getpid();
    for (child_info_t *cur = children; cur != NULL; cur = cur->next) {
        /* detached children are not visible to R code */
        if (cur->detached || cur->pid != pid || cur->ppid != self)
            continue;
        terminate_and_detach_child_ci(cur);
        return 1;
    }
#ifdef MC_DEBUG
    Dprintf("WARNING: child %d was to be removed but it doesn't exist\n", pid);
#endif
    return 0;
}
/* Unlink and free every list entry that is either a reaped child
   (waitedfor with pid >= 0, which leaves cleanup marks with pid -1 alone)
   or was created by a different process (ppid differs from getpid()).
   Pipes of entries belonging to another process are closed first; those of
   waited-for children have been closed when detaching. */
static void compact_children() {
    const pid_t self = getpid();
    child_info_t **link = &children; /* slot pointing at the current entry */
    sigset_t saved;

    /* prevent interference with signal handler accessing the list */
    block_sigchld(&saved);
    while (*link != NULL) {
        child_info_t *cur = *link;
        int foreign = (cur->ppid != self);
        int reaped = (cur->waitedfor && cur->pid >= 0);

        if (!foreign && !reaped) {
            link = &cur->next; /* keep this entry */
            continue;
        }
        if (foreign) {
            close_fds_child_ci(cur);
#ifdef MC_DEBUG
            Dprintf("removing child %d from the list as it is not ours\n", cur->pid);
#endif
        }
#ifdef MC_DEBUG
        else
            Dprintf("removing waited-for child %d from the list\n", cur->pid);
#endif
        *link = cur->next; /* unlink; link now designates the successor */
        free(cur);
    }
    restore_sigchld(&saved);
}
/* insert a cleanup mark into children */
SEXP mc_prepare_cleanup()
{
child_info_t *ci;
compact_children();
ci = (child_info_t*) malloc(sizeof(child_info_t));
if (!ci) error(_("memory allocation error"));
ci->waitedfor = 1;
ci->detached = 1;
ci->pid = -1; /* a cleanup mark */
ci->pfd = -1;
ci->sifd = -1; /* set fds to -1 to simplify close */
ci->ppid = getpid();
ci->next = children;
children = ci;
return R_NilValue;
}
/* Terminate and detach all children up to the first cleanup mark. Compact
children.
sKill - like mc.cleanup from mclapply;
TRUE - send SIGTERM to all children,
FALSE - do not kill children (but detach unless sDetach = FALSE)
num>0 - send signal of this number
sDetach - TRUE - detach children (with a signal specified via sKill, or
with SIGUSR1 when sKill = FALSE)
sShutdown - shut down parallel (when unloading the package or exiting R)
TRUE - act on all children, not just up to the first mark,
wait until they all terminate, unregister signal handler
FALSE - none of the above
Typical use:
sKill = TRUE, sDetach = TRUE, sShutdown = FALSE
(mclapply, pvec)
sKill = FALSE, sDetach = FALSE, sShutdown = FALSE
(mccollect - only compact children)
sKill = tools:SIGKILL, sDetach = TRUE, sShutdown = TRUE
(clean_pids - finalizer on a private object in the package namespace)
*/
static void restore_sig_handler();
/* Entry point for cleaning up children; always returns NULL.
   Raises an R error when sKill is neither TRUE/FALSE nor an integer >= 1,
   or when sDetach or sShutdown is NA. */
SEXP mc_cleanup(SEXP sKill, SEXP sDetach, SEXP sShutdown)
{
/* Decode sKill into sig: -1 = invalid, 0 = do not send a signal,
   otherwise the number of the signal to send. */
int sig = -1; /* signal from sKill/mc.cleanup */
if (isLogical(sKill)) {
int lkill = asLogical(sKill);
if (lkill == TRUE) sig = SIGTERM;
else if (lkill == FALSE) sig = 0;
/* a logical NA leaves sig at -1 and is rejected below */
} else {
int ikill = asInteger(sKill);
if (ikill >= 1 && ikill != NA_INTEGER)
sig = ikill;
}
if (sig == -1)
error(_("invalid '%s' argument"), "mc.cleanup");
int detach = asLogical(sDetach);
if (detach == NA_LOGICAL)
error(_("invalid '%s' argument"), "detach");
int shutdown = asLogical(sShutdown);
if (shutdown == NA_LOGICAL)
error(_("invalid '%s' argument"), "shutdown");
compact_children(); /* also removes children that are not ours */
child_info_t *ci = children;
int nattached = 0; /* number of children detached by this call */
/* Walk from the head of the list (most recently added entries first) up
   to the first cleanup mark, or over the whole list when shutting down. */
while(ci) {
if (ci->detached && ci->waitedfor && ci->pid == -1) {
/* cleanup mark */
/* set pid to a non-negative number so that the mark will be removed
from the list by compact_children; as it is waitedfor, it will
not be sent any signal */
ci->pid = INT_MAX;
if (!shutdown) break;
}
if (ci->detached && sig)
/* only kills if not waited for */
kill_detached_child_ci(ci, sig);
if (!ci->detached && detach) {
/* With sKill == FALSE (mclapply mc.cleanup=FALSE), send
SIGUSR1 to just detach the child. Detaching also closes the file
descriptors which contributes to termination probably even more,
as it is not likely that the child will be finished and just
waiting for SIGUSR1. */
kill_and_detach_child_ci(ci, sig ? sig : SIGUSR1);
nattached++;
}
ci = ci->next;
}
/* now all children in the range are detached (provided detach is TRUE) */
if (nattached > 0) {
#ifdef MC_DEBUG
Dprintf(" %d children detached during cleanup\n", nattached);
#endif
/* up to one second; presumably cut short by the SIGCHLD of a
   terminating child */
sleep(1); /* wait a tiny bit for at least the first child */
}
compact_children(); /* also removes the cleanup mark(s) */
if (shutdown) {
/* Poll until every remaining entry has been reaped and compacted
   away, giving up after about 10 seconds. The original SIGCHLD
   handler is reinstalled on both exits. */
double before = currentTime();
while(children) {
sleep(1); /* can be interrupted by SIGCHLD */
compact_children();
if (!children)
break;
#ifdef MC_DEBUG
ci = children;
while(ci) {
Dprintf(" still existing child: %d (waitedfor=%d detached=%d)\n",
ci->pid, ci->waitedfor, ci->detached);
ci = ci->next;
}
#endif
double now = currentTime();
if (now - before > 10) { /* give up after 10 seconds */
REprintf(_("Error while shutting down parallel: unable to terminate some child processes\n"));
restore_sig_handler();
return R_NilValue;
}
}
restore_sig_handler();
#ifdef MC_DEBUG
Dprintf("process %d parallel shutdown ok\n", getpid());
#endif
}
return R_NilValue;
}
#ifndef STDIN_FILENO
#define STDIN_FILENO 0
#endif
#ifndef STDOUT_FILENO
#define STDOUT_FILENO 1
#endif
#ifndef STDERR_FILENO
#define STDERR_FILENO 2
#endif
/* FIXME: child_exit_status is always -1 */
/* FIXME: do we still need SIGUSR1 to signal children they can exit? */
/* Both variables are written and read by child_sig_handler while
   child_can_exit is also polled by normal code waiting for the signal, so
   they are declared volatile sig_atomic_t - the only object type that the
   C standard allows to be shared with a signal handler this way. */
static volatile sig_atomic_t child_can_exit = 0, child_exit_status = -1;

/* Signal handler installed in attached children for SIGUSR1, by which the
   master tells the child that it may terminate. Records the permission in
   child_can_exit and, when an exit status has been set (>= 0), exits
   immediately with it. Any other signal is ignored. */
static void child_sig_handler(int sig)
{
    if (sig == SIGUSR1) {
#ifdef MC_DEBUG
        Dprintf("child process %d got SIGUSR1; child_exit_status=%d\n",
                getpid(), (int) child_exit_status);
#endif
        child_can_exit = 1;
        if (child_exit_status >= 0)
            _exit(child_exit_status);
    }
}
/* SIGCHLD disposition that was in effect before setup_sig_handler installed
   parent_sig_handler; reinstalled by restore_sig_handler */
static struct sigaction old_sig_handler;
/* non-zero while parent_sig_handler is installed as the SIGCHLD handler */
static int parent_handler_set = 0;
/* SIGCHLD handler of the master: tries to reap (non-blocking) every
   detached child that has not been waited for yet. Attached children are
   deliberately left alone.
   The siginfo version of the sigchld handler gives the PID of the exiting
   process, but because of merging of signals one would have to poll through
   all detached children, anyway, so it won't really help.
   errno is saved and restored because waitpid may modify it and the handler
   can interrupt code that is about to inspect errno of its own failed
   call. */
static void parent_sig_handler(int sig)
{
    int saved_errno = errno;
    child_info_t *ci = children;
    while(ci) {
        if (ci->detached && !ci->waitedfor)
            wait_for_child_ci(ci);
        ci = ci->next;
    }
    /* TODO: chain to old sig handler */
    errno = saved_errno;
}
/* Install parent_sig_handler for SIGCHLD (with SA_RESTART and an empty
   handler mask) unless it is installed already; the previous disposition
   is saved in old_sig_handler. */
static void setup_sig_handler()
{
    if (parent_handler_set)
        return; /* already installed */
    parent_handler_set = 1;

    struct sigaction sa;
    sa.sa_handler = parent_sig_handler;
    sa.sa_flags = SA_RESTART;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGCHLD, &sa, &old_sig_handler);
}
/* Undo setup_sig_handler: reinstall the SIGCHLD disposition saved in
   old_sig_handler. Does nothing when our handler is not installed. */
static void restore_sig_handler()
{
    if (!parent_handler_set)
        return;
    parent_handler_set = 0;
    sigaction(SIGCHLD, &old_sig_handler, NULL);
}
/* Fork the current R process.
   sEstranged - a positive integer creates a detached ("estranged") child
                without any pipes to the master; otherwise an attached
                child with a data pipe (child -> master) and a pipe feeding
                the child's stdin (master -> child) is created
   Returns an integer vector (pid, fd of data pipe, fd of child-stdin pipe):
     in the master: pid of the child, read end of the data pipe, write end
                    of the child-stdin pipe (fds are NA when estranged)
     in the child:  0, write end of the data pipe (NA when estranged), NA
   Raises an R error when a pipe cannot be created, when fork fails or when
   the child entry cannot be allocated; the SIGCHLD mask is restored before
   any such error is raised. */
SEXP mc_fork(SEXP sEstranged)
{
    int pipefd[2]; /* data pipe: [0] read end (master), [1] write end (child) */
    int sipfd[2];  /* child-stdin pipe: [0] read end (child), [1] write end (master) */
    pid_t pid;
    int estranged = (asInteger(sEstranged) > 0);
    SEXP res = allocVector(INTSXP, 3);
    int *res_i = INTEGER(res);
    if (!estranged) {
        if (pipe(pipefd)) error(_("unable to create a pipe"));
        if (pipe(sipfd)) {
            close(pipefd[0]); close(pipefd[1]);
            error(_("unable to create a pipe"));
        }
#ifdef MC_DEBUG
        Dprintf("parent[%d] created pipes: comm (%d->%d), sir (%d->%d)\n",
                getpid(), pipefd[1], pipefd[0], sipfd[1], sipfd[0]);
#endif
    }
    /* make sure we get SIGCHLD to clean up the child process */
    setup_sig_handler();
    sigset_t ss;
    /* Block sigchld to make sure the child does not exist before registered
       in children list. Also make sure that the parent signal handler is not
       used in the child. */
    block_sigchld(&ss);
    fflush(stdout); // or children may output pending text
    pid = fork();
    if (pid == -1) {
        /* remember why fork failed: close() below may overwrite errno */
        int fork_errno = errno;
        if (!estranged) {
            close(pipefd[0]); close(pipefd[1]);
            close(sipfd[0]); close(sipfd[1]);
        }
        /* error() does not return, so unblock SIGCHLD first */
        restore_sigchld(&ss);
        error(_("unable to fork, possible reason: %s"), strerror(fork_errno));
    }
    res_i[0] = (int) pid;
    if (pid == 0) { /* child */
        R_isForkedChild = 1;
        /* free children entries inherited from parent */
        while(children) {
            close_fds_child_ci(children);
            child_info_t *ci = children->next;
            free(children);
            children = ci;
        }
        restore_sigchld(&ss);
        restore_sig_handler(); /* enable the previous signal handler */
        if (estranged)
            res_i[1] = res_i[2] = NA_INTEGER;
        else {
            close(pipefd[0]); /* close read end */
            master_fd = res_i[1] = pipefd[1];
            res_i[2] = NA_INTEGER;
            /* re-map stdin */
            dup2(sipfd[0], STDIN_FILENO);
            close(sipfd[0]);
            /* NOTE(review): sipfd[1], the write end of the child's own
               stdin pipe, stays open in the child - confirm whether that
               is intended */
        }
        is_master = 0;
        /* master uses USR1 to signal that the child process can terminate */
        child_exit_status = -1;
        if (estranged)
            child_can_exit = 1;
        else {
            child_can_exit = 0;
            signal(SIGUSR1, child_sig_handler);
        }
#ifdef MC_DEBUG
        Dprintf("child process %d started\n", getpid());
#endif
    } else { /* master process */
        child_info_t *ci;
        ci = (child_info_t*) malloc(sizeof(child_info_t));
        if (!ci) {
            /* error() does not return, so unblock SIGCHLD first */
            restore_sigchld(&ss);
            error(_("memory allocation error"));
        }
        ci->pid = pid;
        ci->ppid = getpid();
        ci->waitedfor = 0;
        if (estranged) {
            ci->detached = 1;
            res_i[1] = res_i[2] = NA_INTEGER;
            ci->pfd = -1;
            ci->sifd = -1;
        } else {
            ci->detached = 0;
            close(pipefd[1]); /* close write end of the data pipe */
            close(sipfd[0]); /* close read end of the child-stdin pipe */
            res_i[1] = pipefd[0];
            res_i[2] = sipfd[1];
            /* register the new child and its pipes */
            ci->pfd = pipefd[0];
            ci->sifd = sipfd[1];
        }
#ifdef MC_DEBUG
        Dprintf("parent registers new child %d\n", pid);
#endif
        ci->next = children;
        children = ci;
        restore_sigchld(&ss);
    }
    return res; /* (pid, fd of data pipe, fd of child-stdin pipe) */
}
/* Silence standard output of the current process.
   toNULL - when TRUE, stdout is redirected to /dev/null; otherwise (or
            when /dev/null cannot be opened) stdout is closed
   Returns NULL. */
SEXP mc_close_stdout(SEXP toNULL)
{
    int redirected = 0;

    if (asLogical(toNULL) == 1) {
        int nullfd = open("/dev/null", O_WRONLY);
        if (nullfd != -1) {
            dup2(nullfd, STDOUT_FILENO);
            close(nullfd);
            redirected = 1;
        }
    }
    if (!redirected)
        close(STDOUT_FILENO);
    return R_NilValue;
}
/* not used */
/* Silence standard error of the current process.
   toNULL - when TRUE, stderr is redirected to /dev/null; otherwise (or
            when /dev/null cannot be opened) stderr is closed
   Returns NULL. */
SEXP mc_close_stderr(SEXP toNULL)
{
    int redirected = 0;

    if (asLogical(toNULL) == 1) {
        int nullfd = open("/dev/null", O_WRONLY);
        if (nullfd != -1) {
            dup2(nullfd, STDERR_FILENO);
            close(nullfd);
            redirected = 1;
        }
    }
    if (!redirected)
        close(STDERR_FILENO);
    return R_NilValue;
}
/* not used */
/* Close every file descriptor listed in the integer vector sFDS, in
   order; results of the individual close calls are not checked.
   Returns TRUE; raises an R error when sFDS is not an integer vector. */
SEXP mc_close_fds(SEXP sFDS)
{
    if (TYPEOF(sFDS) != INTSXP) error("descriptors must be integers");

    const int count = LENGTH(sFDS);
    const int *fdv = INTEGER(sFDS);
    for (int k = 0; k < count; k++)
        close(fdv[k]);
    return ScalarLogical(1);
}
/* Read exactly nbyte bytes from fildes into buf, restarting reads that
   were interrupted by a signal (EINTR). While we register SIGCHLD with
   SA_RESTART, some other package might register a signal without it.
   Returns the number of bytes read, which is less than nbyte only when
   end-of-file was reached first, or -1 on a read error other than EINTR. */
static ssize_t readrep(int fildes, void *buf, size_t nbyte)
{
    char *dst = (char *) buf;
    size_t done = 0;
    ssize_t got;

    while ((got = read(fildes, dst + done, nbyte - done)) != 0) {
        if (got == -1) {
            if (errno != EINTR)
                return -1;
            continue; /* interrupted: try again */
        }
        done += got;
        if (done == nbyte)
            break;
    }
    /* got == 0 (EOF) or everything requested has been read */
    return done;
}
/* Write nbyte bytes from buf to fildes, restarting writes that were
   interrupted by a signal (EINTR) and continuing after partial writes.
   While we register SIGCHLD with SA_RESTART, some other package might
   register a signal without it.
   Returns the number of bytes written, which is less than nbyte only when
   write() returned 0, or -1 on a write error other than EINTR. */
static ssize_t writerep(int fildes, const void *buf, size_t nbyte)
{
    const char *src = (const char *) buf;
    size_t done = 0;
    ssize_t put;

    /* write() returning 0: possibly sending EOF on some systems via
       nbyte == 0, or an old indication that non-blocking write of not even
       a single more byte is possible - stop and report what was written */
    while ((put = write(fildes, src + done, nbyte - done)) != 0) {
        if (put == -1) {
            if (errno != EINTR)
                return -1;
            continue; /* interrupted: try again */
        }
        done += put;
        if (done == nbyte)
            break;
    }
    return done;
}
/* Send a raw vector from a child to the master over the data pipe: first
   the length as an R_xlen_t, then the bytes.
   This format is read by read_child_ci (only).
   Prior to R 3.4.0 len was unsigned int and the format did not
   allow long vectors.
   Returns TRUE. Raises an R error when called in the master, when there is
   no pipe to the master, when what is not a raw vector, or when writing
   fails (the pipe is then closed). */
SEXP mc_send_master(SEXP what)
{
    if (is_master)
        error(_("only children can send data to the master process"));
    if (master_fd == -1)
        error(_("there is no pipe to the master process"));
    if (TYPEOF(what) != RAWSXP)
        error(_("content to send must be RAW, use serialize() if needed"));

    R_xlen_t len = XLENGTH(what);
    unsigned char *payload = RAW(what);
#ifdef MC_DEBUG
    Dprintf("child %d: send_master (%lld bytes)\n", getpid(), (long long)len);
#endif

    /* header first, then the payload until all of it is out */
    int failed = (writerep(master_fd, &len, sizeof(len)) != sizeof(len));
    R_xlen_t sent = 0;
    while (!failed && sent < len) {
        ssize_t n = writerep(master_fd, payload + sent, len - sent);
        if (n < 1)
            failed = 1;
        else
            sent += n;
    }
    if (failed) {
        close(master_fd);
        master_fd = -1;
        error(_("write error, closing pipe to the master"));
    }
    return ScalarLogical(1);
}
/* Write a raw vector to the stdin pipe of an attached child.
   sPid - pid of the child
   what - raw vector with the bytes to send
   Returns TRUE. Raises an R error when called in a child, when what is not
   a raw vector, when no attached child of this process has that pid, or
   when writing fails. */
SEXP mc_send_child_stdin(SEXP sPid, SEXP what)
{
    int pid = asInteger(sPid);
    if (!is_master)
        error(_("only the master process can send data to a child process"));
    if (TYPEOF(what) != RAWSXP) error("what must be a raw vector");

    /* look up the attached child created by this very process */
    const pid_t self = getpid();
    child_info_t *target = NULL;
    for (child_info_t *cur = children; cur != NULL; cur = cur->next)
        if (!cur->detached && cur->pid == pid && cur->ppid == self) {
            target = cur;
            break;
        }
    if (target == NULL || target->sifd < 0)
        error(_("child %d does not exist"), pid);

    const R_xlen_t len = XLENGTH(what);
    unsigned char *payload = RAW(what);
    const int fd = target->sifd;
    R_xlen_t sent = 0;
    while (sent < len) {
        ssize_t n = writerep(fd, payload + sent, len - sent);
        if (n < 1) error(_("write error"));
        sent += n;
    }
    return ScalarLogical(1);
}
/* Portable stand-in for FD_COPY (which only some systems have): clears
   *dst and then sets in it every descriptor below nfds that is set in
   *src. Descriptors >= nfds are not copied. */
void fdcopy(fd_set *dst, fd_set *src, int nfds)
{
    int fd = 0;

    FD_ZERO(dst);
    while (fd < nfds) {
        if (FD_ISSET(fd, src))
            FD_SET(fd, dst);
        fd++;
    }
}
/* Wait until at least one attached child of this process has data (or
   EOF) available on its data pipe.
   sTimeout - a single real: 0 polls, > 0 waits at most that many seconds,
              < 0 waits without a time limit (in steps, so that R events
              are still processed); anything else is treated as 0
   sWhich   - non-empty integer vector with pids of the children to wait
              for; anything else selects all attached children
   Returns NULL when there is no child to wait for, TRUE on timeout (or
   when select was interrupted by a signal), FALSE with a warning on a
   select error, otherwise an integer vector with pids of ready children.
   Raises an R error when a descriptor does not fit into an fd_set. */
SEXP mc_select_children(SEXP sTimeout, SEXP sWhich)
{
    int maxfd = -1, sr;
    unsigned int wlen = 0, wcount = 0;
    SEXP res;
    int *res_i, *which = 0;
    child_info_t *ci = children;
    fd_set fs;
    double timeout = 0;
    pid_t ppid = getpid();
    if (isReal(sTimeout) && LENGTH(sTimeout) == 1)
        timeout = asReal(sTimeout);
    if (TYPEOF(sWhich) == INTSXP && LENGTH(sWhich)) {
        which = INTEGER(sWhich);
        wlen = LENGTH(sWhich);
    }
    FD_ZERO(&fs);
    while (ci) {
        if (!ci->detached && ci->ppid == ppid) {
            /* attached children have ci->pfd > 0 */
            if (which) { /* check for the FD only if it's on the list */
                unsigned int k = 0;
                while (k < wlen) {
                    if (which[k++] == ci->pid) {
                        /* an fd_set holds descriptors 0..FD_SETSIZE-1
                           only, so FD_SETSIZE itself is already too
                           large */
                        if (ci->pfd >= FD_SETSIZE)
                            error("file descriptor is too large for select()");
                        FD_SET(ci->pfd, &fs);
                        if (ci->pfd > maxfd) maxfd = ci->pfd;
#ifdef MC_DEBUG
                        Dprintf("select_children: added child %d (%d)\n", ci->pid, ci->pfd);
#endif
                        wcount++;
                        break;
                    }
                }
            } else {
                /* not sure if this should be allowed */
                if (ci->pfd >= FD_SETSIZE)
                    error("file descriptor is too large for select()");
                FD_SET(ci->pfd, &fs);
                if (ci->pfd > maxfd) maxfd = ci->pfd;
#ifdef MC_DEBUG
                Dprintf("select_children: added child %d (%d)\n", ci->pid, ci->pfd);
#endif
            }
        }
        ci = ci->next;
    }
    if (which && wcount < wlen) {
        /* children specified multiple times or not found */
        for(unsigned int k = 0; k < wlen; k++) {
            ci = children;
            int found = 0;
            while(ci) {
                if (!ci->detached && ci->ppid == ppid &&
                    ci->pid == which[k] && FD_ISSET(ci->pfd, &fs)) {
                    found = 1;
                    break;
                }
                ci = ci->next;
            }
            if (!found) {
#ifdef MC_DEBUG
                Dprintf("select_children: cannot wait for child %d (idx %d) as it does not exist\n",
                        which[k], k);
#endif
                /* FIXME: probably should be an error */
                warning(_("cannot wait for child %d as it does not exist"), which[k]);
            }
        }
    }
#ifdef MC_DEBUG
    Dprintf("select_children: maxfd=%d, wlen=%d, wcount=%d, timeout=%f\n", maxfd, wlen, wcount, timeout);
#endif
    if (maxfd == -1)
        return R_NilValue; /* NULL signifies no children to tend to */
    if (timeout == 0) {
        /* polling */
        struct timeval tv;
        tv.tv_sec = 0;
        tv.tv_usec = 0;
        sr = R_SelectEx(maxfd + 1, &fs, 0, 0, &tv, NULL);
    } else {
        double before = currentTime();
        double remains = timeout;
        struct timeval tv;
        /* select overwrites the set, keep a copy for the next round */
        fd_set savefs;
        fdcopy(&savefs, &fs, maxfd + 1);
        for(;;) {
            R_ProcessEvents();
            /* re-set tv as it may get updated by select */
            if (R_wait_usec > 0) {
                tv.tv_sec = 0;
                tv.tv_usec = R_wait_usec;
                /* FIXME: ?Rg_wait_usec */
            } else if (timeout > 0) {
                tv.tv_sec = (int) remains;
                tv.tv_usec = (int) ((remains - ((double) tv.tv_sec)) * 1e6);
            } else {
                /* Note: I'm not sure we really should allow this .. */
                tv.tv_sec = 1; /* still allow to process events */
                tv.tv_usec = 0;
            }
            sr = R_SelectEx(maxfd + 1, &fs, 0, 0, &tv, NULL);
            if (sr > 0 || (sr < 0 && errno != EINTR))
                break;
            if (timeout > 0) {
                remains = timeout - (currentTime() - before);
                if (remains <= 0)
                    /* sr == 0 (timed out) or sr<0 && errno==EINTR */
                    break;
            }
            fdcopy(&fs, &savefs, maxfd + 1);
        }
    }
#ifdef MC_DEBUG
    Dprintf(" sr = %d\n", sr);
#endif
    if (sr < 0) {
        if (errno == EINTR) /* treat as timeout */
            return ScalarLogical(TRUE);
        warning(_("error '%s' in select"), strerror(errno));
        return ScalarLogical(FALSE); /* FALSE on select error */
    }
    if (sr < 1) return ScalarLogical(TRUE); /* TRUE on timeout */
    ci = children;
#ifdef MC_DEBUG
    Dprintf(" - read select %d children: ", sr);
#endif
    res = allocVector(INTSXP, sr);
    res_i = INTEGER(res);
    while (ci) { /* fill the array */
        if (!ci->detached && ci->ppid == ppid && FD_ISSET(ci->pfd, &fs)) {
            (res_i++)[0] = ci->pid;
#ifdef MC_DEBUG
            Dprintf("%d ", ci->pid);
#endif
        }
        ci = ci->next;
    }
#ifdef MC_DEBUG
    Dprintf("\n");
#endif
    return res;
}
/* Read one message from the data pipe of an attached child. A message is
   an R_xlen_t length followed by that many bytes.
   Returns a raw vector with the payload and the child's pid in attribute
   "pid". When the length cannot be read completely, is 0 (child is
   exiting), or the payload is cut short, the child is terminated and
   detached and its pid is returned as an integer instead.
   Returns NULL for a detached child (not visible to R code). */
static SEXP read_child_ci(child_info_t *ci)
{
    if (ci->detached)
        /* should not happen */
        return R_NilValue;

    const int fd = ci->pfd;
    const int pid = ci->pid;
    R_xlen_t len;
    ssize_t n = readrep(fd, &len, sizeof(len));
#ifdef MC_DEBUG
    Dprintf("read_child_ci(%d) - read length returned %lld\n", pid, (long long)n);
#endif
    if (n != sizeof(len) || len == 0) {
        /* child is exiting (n==0), or error */
        terminate_and_detach_child_ci(ci);
        return ScalarInteger(pid);
    }

    SEXP rv = allocVector(RAWSXP, len);
    unsigned char *rvb = RAW(rv);
    for (R_xlen_t i = 0; i < len; i += n) {
        n = readrep(fd, rvb + i, len - i);
#ifdef MC_DEBUG
        Dprintf("read_child_ci(%d) - read %lld at %lld returned %lld\n",
                ci->pid, (long long)len-i, (long long)i, (long long)n);
#endif
        if (n < 1) { /* error */
            terminate_and_detach_child_ci(ci);
            return ScalarInteger(pid);
        }
    }

    /* tag the payload with the pid of its sender */
    PROTECT(rv);
    SEXP pa = PROTECT(ScalarInteger(ci->pid));
    setAttrib(rv, install("pid"), pa);
    UNPROTECT(2); /* pa, rv */
    return rv;
}
/* Read one message from the attached child with the given pid.
   Returns the result of read_child_ci for that child, or NULL when this
   process has no attached child with that pid (any more). */
SEXP mc_read_child(SEXP sPid)
{
    const int pid = asInteger(sPid);
    const pid_t self = getpid();

    for (child_info_t *cur = children; cur != NULL; cur = cur->next)
        if (!cur->detached && cur->pid == pid && cur->ppid == self)
            return read_child_ci(cur);

#ifdef MC_DEBUG
    Dprintf("read_child(%d) - pid is not in the list of children\n", pid);
#endif
    return R_NilValue; /* if the child doesn't exist anymore, returns NULL */
}
/* not used */
/* Wait for any attached child of this process to have data available and
   read one message from the first such child found in the list.
   sTimeout - a single real: negative blocks without a time limit,
              otherwise the maximal time to wait in seconds; anything else
              means polling (zero timeout)
   Returns NULL when there is no attached child with a data pipe, FALSE
   with a warning on a select error, TRUE on timeout, otherwise the result
   of read_child_ci.
   Raises an R error when a descriptor does not fit into an fd_set. */
SEXP mc_read_children(SEXP sTimeout)
{
    int maxfd = 0, sr;
    child_info_t *ci = children;
    fd_set fs;
    struct timeval tv = { 0, 0 }, *tvp = &tv;
    if (isReal(sTimeout) && LENGTH(sTimeout) == 1) {
        double tov = asReal(sTimeout);
        if (tov < 0.0) tvp = 0; /* Note: I'm not sure we really should allow this .. */
        else {
            tv.tv_sec = (int) tov;
            tv.tv_usec = (int) ((tov - ((double) tv.tv_sec)) * 1000000.0);
        }
    }
    {
        /* NOTE(review): this reaps any terminated child of the process,
           including attached ones, which the comment at the top of this
           file says are only waited for when being detached - confirm
           before putting this function back into use */
        int wstat;
        while (waitpid(-1, &wstat, WNOHANG) > 0) ; /* check for zombies */
    }
    FD_ZERO(&fs);
    pid_t ppid = getpid();
    while (ci) {
        if (!ci->detached && ci->ppid == ppid && ci->pfd > 0) {
            /* an fd_set holds descriptors 0..FD_SETSIZE-1 only; FD_SET
               with anything larger would write out of bounds */
            if (ci->pfd >= FD_SETSIZE)
                error("file descriptor is too large for select()");
            FD_SET(ci->pfd, &fs);
            if (ci->pfd > maxfd) maxfd = ci->pfd;
        }
        ci = ci -> next;
    }
#ifdef MC_DEBUG
    Dprintf("read_children: maxfd=%d, timeout=%d:%d\n", maxfd, (int)tv.tv_sec, (int)tv.tv_usec);
#endif
    if (maxfd == 0) return R_NilValue; /* NULL signifies no children to tend to */
    sr = R_SelectEx(maxfd+1, &fs, 0, 0, tvp, NULL);
#ifdef MC_DEBUG
    Dprintf("sr = %d\n", sr);
#endif
    if (sr < 0) {
        warning(_("error '%s' in select"), strerror(errno));
        return ScalarLogical(0); /* FALSE on select error */
    }
    if (sr < 1) return ScalarLogical(1); /* TRUE on timeout */
    ci = children;
    while (ci) {
        if (!ci->detached && ci->ppid == ppid) {
            if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) break;
        }
        ci = ci -> next;
    }
#ifdef MC_DEBUG
    Dprintf("set ci=%p (%d, %d)\n", (void*) ci, ci?ci->pid:0, ci?ci->pfd:0);
#endif
    /* this should never occur really - select signalled a read handle
       but none of the handles is set - let's treat it as a timeout */
    if (!ci) return ScalarLogical(1);
    else return read_child_ci(ci);
}
/* R-level wrapper of rm_child: terminate and detach the attached child
   with pid sPid. Returns TRUE when such a child existed, FALSE otherwise. */
SEXP mc_rm_child(SEXP sPid)
{
    return ScalarLogical(rm_child(asInteger(sPid)));
}
/* Returns an integer vector with the pids of all attached children that
   were created by this process, in list order (possibly of length 0). */
SEXP mc_children()
{
    const pid_t self = getpid();
    unsigned int count = 0;
    child_info_t *cur;

    /* first pass: count, so that the result can be allocated */
    for (cur = children; cur != NULL; cur = cur->next)
        if (!cur->detached && cur->ppid == self)
            count++;

    SEXP res = allocVector(INTSXP, count);
    /* the list of attached children cannot be changed by signal handlers */
    if (count > 0) {
        int *pids = INTEGER(res);
        unsigned int k = 0;
        /* second pass: fill in the pids */
        for (cur = children; cur != NULL; cur = cur->next)
            if (!cur->detached && cur->ppid == self)
                pids[k++] = cur->pid;
    }
    return res;
}
/* not used */
/* Returns an integer vector with one file descriptor per attached child
   created by this process, in list order.
   sFdi - 0 selects the data pipe (pfd), any other value the child-stdin
          pipe (sifd) */
SEXP mc_fds(SEXP sFdi)
{
    const int want_data_pipe = (asInteger(sFdi) == 0);
    const pid_t self = getpid();
    unsigned int count = 0;
    child_info_t *cur;

    for (cur = children; cur != NULL; cur = cur->next)
        if (!cur->detached && cur->ppid == self)
            count++;

    SEXP res = allocVector(INTSXP, count);
    if (count > 0) {
        int *fds = INTEGER(res);
        unsigned int k = 0;
        for (cur = children; cur != NULL; cur = cur->next) {
            if (cur->detached || cur->ppid != self)
                continue;
            if (want_data_pipe)
                fds[k++] = cur->pfd;
            else
                fds[k++] = cur->sifd;
        }
    }
    return res;
}
/* not used */
/* Returns the descriptor of the write end of the data pipe to the master
   as an integer, -1 when there is none. */
SEXP mc_master_fd()
{
    const int fd = master_fd;

    return ScalarInteger(fd);
}
/* Returns TRUE in a child created by mc_fork, FALSE in the master. */
SEXP mc_is_child()
{
    if (is_master)
        return ScalarLogical(FALSE);
    return ScalarLogical(TRUE);
}
/* not used */
/* Send signal sSig to the process with pid sPid via kill().
   Returns TRUE; raises an R error when kill() fails. */
SEXP mc_kill(SEXP sPid, SEXP sSig)
{
    const int pid = asInteger(sPid);
    const int sig = asInteger(sSig);
    const int rc = kill((pid_t) pid, sig);

    if (rc != 0)
        error(_("'mckill' failed"));
    return ScalarLogical(1);
}
extern int R_ignore_SIGPIPE; /* defined in src/main/main.c on unix */

/* Terminate a child process with exit status sRes; does not return.
   When a pipe to the master is open, a zero length is written to it to
   signify that the child is leaving and the pipe is closed. The child
   then waits until it is allowed to exit (child_can_exit, set right away
   for estranged children and by SIGUSR1 otherwise) and calls _exit.
   Raises an R error when called in the master, or when writing to the
   master fails for a reason other than EPIPE. */
SEXP NORET mc_exit(SEXP sRes)
{
    int res = asInteger(sRes);
#ifdef MC_DEBUG
    Dprintf("child %d: 'mcexit(%d)' called, master_fd=%d\n", getpid(),
            res, master_fd);
#endif
    if (is_master) error(_("'mcexit' can only be used in a child process"));
    if (master_fd != -1) { /* send 0 to signify that we're leaving */
        size_t len = 0;
        /* If rmChild() was called, the master may already have closed its end
           of the pipe, so the write may fail with EPIPE; disable the default
           R SIGPIPE handler to avoid a runtime error in that case, and ignore
           EPIPE. This may hide also real errors, but they should be detected
           via other means (results not delivered). Alternatively, we could
           rewrite mccollect(wait=FALSE) to wait for jobs that delivered
           results to also finish, and avoid using rmChild(). */
        R_ignore_SIGPIPE = 1;
        /* assign result for Fedora security settings */
        ssize_t n = writerep(master_fd, &len, sizeof(len));
        /* remember the reason of a failed write now: close() below may
           overwrite errno */
        int write_errno = errno;
        /* make sure the pipe is closed before we enter any waiting */
        close(master_fd);
        R_ignore_SIGPIPE = 0;
        master_fd = -1;
        if (n < 0 && write_errno != EPIPE)
            error(_("write error, closing pipe to the master"));
    }
    if (!child_can_exit) {
#ifdef MC_DEBUG
        Dprintf("child %d is waiting for permission to exit\n", getpid());
#endif
        while (!child_can_exit) sleep(1);
        /* SIGUSR1 will terminate the sleep */
    }
#ifdef MC_DEBUG
    Dprintf("child %d: exiting with exit status %d\n", getpid(), res);
#endif
    _exit(res);
    error(_("'mcexit' failed"));
}
/* Query or set R_Interactive.
   sWhat - NA only queries; any other value is assigned to R_Interactive
   Returns the (possibly updated) value of R_Interactive as a logical. */
SEXP mc_interactive(SEXP sWhat) {
    const int requested = asInteger(sWhat);

    if (requested == NA_INTEGER)
        return ScalarLogical(R_Interactive); /* query only */
    R_Interactive = requested;
    return ScalarLogical(R_Interactive);
}
/*-- mcaffinity --
FIXME: we may want to move this outside fork.c in case Windows can do that */
#ifdef HAVE_SCHED_SETAFFINITY
#ifdef HAVE_SCHED_H
#include <sched.h>
#endif
#if defined(CPU_ZERO) && defined(CPU_COUNT) && defined(CPU_SETSIZE) && defined(CPU_SET) && defined(CPU_SET_S) && defined(CPU_ISSET)
#define WORKING_MC_AFFINITY
#endif
#endif
#ifdef WORKING_MC_AFFINITY
/* Set and/or query the CPU affinity mask of the calling process.
   req - NULL only queries; an integer or real vector of CPU numbers sets
         the mask to exactly those CPUs first
   req is one-based, cpu_set is zero-based.
   Returns an integer vector with the (one-based) CPUs in the resulting
   mask, truncated at CPU_SETSIZE, or NULL when the mask cannot be
   retrieved after an attempt to set it.
   Raises an R error for an invalid specification (wrong type or a CPU
   number < 1), when the requested set is too large for this system, when
   a dynamic CPU set cannot be allocated, or when a pure query fails. */
SEXP mc_affinity(SEXP req) {
    if (req != R_NilValue && TYPEOF(req) != INTSXP && TYPEOF(req) != REALSXP)
        error(_("invalid CPU affinity specification"));
    if (TYPEOF(req) == REALSXP)
        req = coerceVector(req, INTSXP);
    if (TYPEOF(req) == INTSXP) {
        int max_cpu = 0, i, n = LENGTH(req), *v = INTEGER(req);
        for (i = 0; i < n; i++) {
            if (v[i] > max_cpu)
                max_cpu = v[i];
            if (v[i] < 1)
                error(_("invalid CPU affinity specification"));
        }
        /* These are both one-based */
        if (max_cpu <= CPU_SETSIZE) { /* can use static set */
            cpu_set_t cs;
            CPU_ZERO(&cs);
            for (i = 0; i < n; i++)
                CPU_SET(v[i] - 1, &cs);
            sched_setaffinity(0, sizeof(cpu_set_t), &cs);
        } else {
#ifndef CPU_ALLOC
            error(_("requested CPU set is too large for this system"));
#else
            size_t css = CPU_ALLOC_SIZE(max_cpu);
            cpu_set_t *cs = CPU_ALLOC(max_cpu);
            if (!cs)
                error(_("memory allocation error"));
            CPU_ZERO_S(css, cs);
            for (i = 0; i < n; i++)
                CPU_SET_S(v[i] - 1, css, cs);
            sched_setaffinity(0, css, cs);
            /* the kernel has copied the mask; release the dynamic set */
            CPU_FREE(cs);
#endif
        }
    }
    {
        /* FIXME: in theory we may want to use *_S versions as well,
           but that would require some knowledge about the number of
           available CPUs and comparing that to CPU_SETSIZE, so for now
           we just use static cpu_set -- the mask will be still set
           correctly, just the returned set will be truncated at
           CPU_SETSIZE */
        cpu_set_t cs;
        CPU_ZERO(&cs);
        if (sched_getaffinity(0, sizeof(cs), &cs)) {
            /* a failed pure query is an error; after an attempt to set
               the mask just report that the result is unknown */
            if (req == R_NilValue)
                error(_("retrieving CPU affinity set failed"));
            return R_NilValue;
        } else {
            SEXP res = allocVector(INTSXP, CPU_COUNT(&cs));
            int i, *v = INTEGER(res);
            for (i = 0; i < CPU_SETSIZE; i++)
                if (CPU_ISSET(i, &cs))
                    *(v++) = i + 1;
            return res;
        }
    }
}
#else /* ! WORKING_MC_AFFINITY */
/* Fallback used when no working CPU affinity API has been detected: the
   request is ignored and NULL is returned. */
SEXP mc_affinity(SEXP req) {
    (void) req; /* intentionally unused */
    return R_NilValue;
}
#endif /* WORKING_MC_AFFINITY */