blob: 398a75629a1f3dd3210fb617acbebbce96e1c77e [file] [log] [blame]
# File src/library/parallel/R/unix/mcparallel.R
# Part of the R package, https://www.R-project.org
#
# Copyright (C) 1995-2019 The R Core Team
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# A copy of the GNU General Public License is available at
# https://www.R-project.org/Licenses/
### Derived from multicore version 0.1-6 by Simon Urbanek
mcparallel <- function(expr, name, mc.set.seed = TRUE, silent = FALSE, mc.affinity = NULL, mc.interactive = FALSE, detached = FALSE)
{
f <- mcfork(detached)
env <- parent.frame()
if (isTRUE(mc.set.seed)) mc.advance.stream()
if (inherits(f, "masterProcess")) {
on.exit(mcexit(1L, structure("fatal error in wrapper code",
class = "try-error")))
if (isTRUE(mc.set.seed)) mc.set.stream()
mc.interactive <- as.logical(mc.interactive)
if (isTRUE(mc.interactive)) mcinteractive(TRUE)
if (isTRUE(!mc.interactive)) mcinteractive(FALSE)
if (!is.null(mc.affinity)) mcaffinity(mc.affinity)
if (isTRUE(silent)) closeStdout(TRUE)
if (detached) {
on.exit(mcexit(1L))
eval(expr, env)
mcexit(0L)
}
sendMaster(try(eval(expr, env), silent = TRUE))
mcexit(0L)
}
if (!missing(name) && !is.null(name)) f$name <- as.character(name)[1L]
class(f) <- c("parallelJob", class(f))
f
}
mccollect <- function(jobs, wait = TRUE, timeout = 0, intermediate = FALSE)
{
if (missing(jobs)) jobs <- children()
if (!length(jobs)) return (NULL)
if (isTRUE(intermediate)) intermediate <- utils::str
pids <- if (inherits(jobs, "process") || is.list(jobs))
processID(jobs) else jobs
if (!length(pids)) return(NULL)
if (!is.numeric(pids)) stop("invalid 'jobs' argument")
pids <- as.integer(pids)
pnames <- as.character(pids)
if (!inherits(jobs, "process") && is.list(jobs))
for(i in seq(jobs))
if (!is.null(jobs[[i]]$name))
pnames[i] <- as.character(jobs[[i]]$name)
if (!wait) {
s <- selectChildren(jobs, timeout)
if (is.logical(s) || !length(s)) return(NULL) ## select error
res <- lapply(s, function(x) NULL)
delivered.result <- 0
for (i in seq_along(s)) {
x <- s[i]
r <- readChild(x)
if (is.raw(r)) {
rmChild(x) ## avoid zombie process without waiting
## unserialize(r) might be null
res[i] <- list(unserialize(r))
delivered.result <- delivered.result + 1L
}
}
names(res) <- pnames[match(s, pids)]
expected.result <- length(s)
} else {
res <- lapply(pids, function(x) NULL)
names(res) <- pnames
fin <- rep(FALSE, length(pids))
delivered.result <- 0
while (!all(fin)) {
s <- selectChildren(pids[!fin], -1)
if (is.integer(s)) {
for (pid in s) {
r <- readChild(pid)
if (is.raw(r)) {
## unserialize(r) might be null
res[which(pid == pids)] <- list(unserialize(r))
delivered.result <- delivered.result + 1L
} else
## child exiting or error
fin[pid == pids] <- TRUE
}
if (is.function(intermediate)) intermediate(res)
} else
## should not happen (select error)
if (all(is.na(match(pids, processID(children()))))) break
}
expected.result <- length(pids)
}
nores <- expected.result - delivered.result
if (nores > 0)
warning(sprintf(ngettext(nores,
"%d parallel job did not deliver a result",
"%d parallel jobs did not deliver results"),
nores),
domain = NA)
cleanup(kill = FALSE, detach = FALSE) # compact children
res
}