blob: e055fcf1b0feedfd710fdb54e70d929b37026e21 [file] [log] [blame]
# File src/library/parallel/R/unix/mclapply.R
# Part of the R package, https://www.R-project.org
#
# Copyright (C) 1995-2018 The R Core Team
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# A copy of the GNU General Public License is available at
# https://www.R-project.org/Licenses/
### Derived from multicore version 0.1-6 by Simon Urbanek
mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE, mc.allow.recursive = TRUE,
affinity.list = NULL)
{
cores <- as.integer(mc.cores)
if((is.na(cores) || cores < 1L) && is.null(affinity.list))
stop("'mc.cores' must be >= 1")
.check_ncores(cores)
if (isChild() && !isTRUE(mc.allow.recursive))
return(lapply(X = X, FUN = FUN, ...))
## Follow lapply
if(!is.vector(X) || is.object(X)) X <- as.list(X)
if(!is.null(affinity.list) && length(affinity.list) < length(X))
stop("affinity.list and X must have the same length")
if(mc.set.seed) mc.reset.stream()
if(length(X) < 2) {
old.aff <- mcaffinity()
mcaffinity(affinity.list[[1]])
res <- lapply(X = X, FUN = FUN, ...)
mcaffinity(old.aff)
return(res)
}
if (length(X) < cores) cores <- length(X)
if (cores < 2L && is.null(affinity.list))
return(lapply(X = X, FUN = FUN, ...))
jobs <- list()
## all processes created from now on will be terminated by cleanup
prepareCleanup()
on.exit(cleanup(mc.cleanup))
if (!mc.preschedule) { # sequential (non-scheduled)
FUN <- match.fun(FUN)
if (length(X) <= cores && is.null(affinity.list)) { # we can use one-shot parallel
jobs <- lapply(seq_along(X),
function(i) mcparallel(FUN(X[[i]], ...),
name = names(X)[i],
mc.set.seed = mc.set.seed,
silent = mc.silent))
res <- mccollect(jobs)
if (length(res) == length(X)) names(res) <- names(X)
has.errors <- sum(sapply(res, inherits, "try-error"))
} else { # more complicated, we have to wait for jobs selectively
sx <- seq_along(X)
res <- vector("list", length(sx))
names(res) <- names(X)
fin <- rep(FALSE, length(X)) # values finished
if (!is.null(affinity.list)) {
## build matrix for job mapping with affinity.list
## entry i,j is true if item i is allowed to run on core j
cores <- max(unlist(x = affinity.list, recursive = TRUE))
d0 <- logical(cores)
cpu.map <- lapply(sx, function (i){
data <- d0
data[as.vector(affinity.list[[i]])] <- TRUE
data
})
ava <- do.call(rbind, cpu.map)
} else {
## build matrix for job mapping without affinity.list
## all entries true
ava <- matrix(TRUE, nrow = length(X), ncol = cores)
}
jobid <- integer(cores)
## choose first job for each core to start
for (i in 1:cores) {
jobid[i] <- match(TRUE, ava[,i]) # = which(ava[, i])[1]
ava[jobid[i],] <- FALSE
}
## remove unused cores from matrix
if(anyNA(jobid)) {
unused <- which(is.na(jobid))
jobid <- jobid[-unused]
ava <- ava[, -unused, drop = FALSE]
}
jobs <- lapply(jobid,
function(i) mcparallel(FUN(X[[i]], ...),
mc.set.seed = mc.set.seed,
silent = mc.silent,
mc.affinity = affinity.list[[i]]))
jobsp <- processID(jobs)
has.errors <- 0L
delivered.result <- 0L
while (!all(fin)) {
s <- selectChildren(jobs[!is.na(jobsp)], -1)
if (is.null(s)) break # no children -> no hope (should not happen)
if (is.integer(s))
for (ch in s) {
ji <- match(TRUE, jobsp == ch)
ci <- jobid[ji]
r <- readChild(ch)
if (is.raw(r)) {
child.res <- unserialize(r)
if (inherits(child.res, "try-error"))
has.errors <- has.errors + 1L
## we can't just assign it since a NULL
## assignment would remove it from the list
if (!is.null(child.res)) res[[ci]] <- child.res
delivered.result <- delivered.result + 1L
} else {
fin[ci] <- TRUE
## the job has finished, so we must not run
## select on its fds again
jobsp[ji] <- jobid[ji] <- NA
if (any(ava)) { # still something to do,
## look for first job which is allowed to
## run on the now idling core and spawn it
nexti <- which.max(ava[, ji])
if(!is.na(nexti)) {
jobid[ji] <- nexti
jobs[[ji]] <- mcparallel(FUN(X[[nexti]], ...),
mc.set.seed = mc.set.seed,
silent = mc.silent,
mc.affinity = affinity.list[[nexti]])
jobsp[ji] <- processID(jobs[[ji]])
ava[nexti,] <- FALSE
}
}
}
}
}
nores <- length(X) - delivered.result
if (nores > 0)
warning(sprintf(ngettext(nores,
"%d parallel function call did not deliver a result",
"%d parallel function calls did not deliver results"),
nores),
domain = NA)
}
if (has.errors)
warning(gettextf("%d function calls resulted in an error",
has.errors), domain = NA)
return(res)
}
## mc.preschedule = TRUE from here on.
if(!is.null(affinity.list))
warning("'mc.preschedule' must be false if 'affinity.list' is used")
sindex <- lapply(seq_len(cores),
function(i) seq(i, length(X), by = cores))
schedule <- lapply(seq_len(cores),
function(i) X[seq(i, length(X), by = cores)])
ch <- list()
res <- vector("list", length(X))
names(res) <- names(X)
cp <- rep(0L, cores)
fin <- rep(FALSE, cores)
dr <- rep(FALSE, cores)
inner.do <- function(core) {
S <- schedule[[core]]
f <- mcfork()
if (isTRUE(mc.set.seed)) mc.advance.stream()
if (inherits(f, "masterProcess")) { # this is the child process
on.exit(mcexit(1L, structure("fatal error in wrapper code", class="try-error")))
if (isTRUE(mc.set.seed)) mc.set.stream()
if (isTRUE(mc.silent)) closeStdout(TRUE)
sendMaster(try(lapply(X = S, FUN = FUN, ...), silent = TRUE))
mcexit(0L)
}
jobs[[core]] <<- ch[[core]] <<- f
cp[core] <<- processID(f)
NULL
}
job.res <- lapply(seq_len(cores), inner.do)
ac <- cp[cp > 0]
has.errors <- integer(0)
while (!all(fin)) {
s <- selectChildren(ac[!fin], -1)
if (is.null(s)) break # no children -> no hope we get anything (should not happen)
if (is.integer(s))
for (ch in s) {
a <- readChild(ch)
if (is.integer(a)) {
core <- which(cp == a)
fin[core] <- TRUE
} else if (is.raw(a)) {
core <- which(cp == attr(a, "pid"))
job.res[[core]] <- ijr <- unserialize(a)
if (inherits(ijr, "try-error"))
has.errors <- c(has.errors, core)
dr[core] <- TRUE
} else if (is.null(a)) {
# the child no longer exists (should not happen)
core <- which(cp == ch)
fin[core] <- TRUE
}
}
}
for (i in seq_len(cores)) {
this <- job.res[[i]]
if (inherits(this, "try-error")) { ## length-1 result
for (j in sindex[[i]]) res[[j]] <- this
} else
## we can't just assign it since a NULL
## assignment would remove it from the list
if (!is.null(this)) res[sindex[[i]]] <- this
}
nores <- cores - sum(dr)
if (nores > 0)
warning(sprintf(ngettext(nores,
"scheduled core %s did not deliver a result, all values of the job will be affected",
"scheduled cores %s did not deliver results, all values of the jobs will be affected"),
paste(which(dr == FALSE), collapse = ", ")),
domain = NA)
if (length(has.errors)) {
if (length(has.errors) == cores)
warning("all scheduled cores encountered errors in user code")
else
warning(sprintf(ngettext(has.errors,
"scheduled core %s encountered error in user code, all values of the job will be affected",
"scheduled cores %s encountered errors in user code, all values of the jobs will be affected"),
paste(has.errors, collapse = ", ")),
domain = NA)
}
res
}