| # File src/library/parallel/R/clusterApply.R |
| # Part of the R package, https://www.R-project.org |
| # |
| # Copyright (C) 1995-2016 The R Core Team |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # A copy of the GNU General Public License is available at |
| # https://www.R-project.org/Licenses/ |
| |
| ## Derived from snow 0.3-6 by Luke Tierney |
| |
## Run `n` jobs on the nodes of `cl` in rounds of at most length(cl) jobs.
##
##   cl      cluster specification, resolved through defaultCluster()
##   fun     function handed to sendCall() for every job
##   n       number of jobs
##   argfun  function(i) returning the argument list for job i
##
## Within a round, job `first + k - 1` is sent to node k; afterwards one
## result per used node is collected (in node order) before the next round
## starts.  The value is checkForRemoteErrors() applied to the list of
## results, or invisible NULL when there are no jobs or no nodes.
staticClusterApply <- function(cl = NULL, fun, n, argfun) {
    cl <- defaultCluster(cl)
    nnodes <- length(cl)
    if (!(n > 0L && nnodes)) return(invisible(NULL))

    results <- vector("list", n)
    first <- 1L
    repeat {
        if (first > n) break
        ## the last round may use fewer nodes than are available
        last <- min(n, first + nnodes - 1L)
        used <- seq_len(last - first + 1L)
        for (k in used)
            sendCall(cl[[k]], fun, argfun(first + k - 1L))
        results[first:last] <- lapply(cl[used], recvResult)
        first <- last + 1L
    }
    checkForRemoteErrors(results)
}
| |
## Run `n` jobs on the nodes of `cl`, handing out a new job to whichever
## node returns a result next.
##
##   cl      cluster specification, resolved through defaultCluster()
##   fun     function handed to sendCall() for every job
##   n       number of jobs
##   argfun  function(i) returning the argument list for job i
##
## Every job is sent with `tag = <job index>`; the tag that comes back with
## a result decides the slot of the result list that is filled.  The value
## is checkForRemoteErrors() applied to that list, or invisible NULL when
## there are no jobs or no nodes.
dynamicClusterApply <- function(cl = NULL, fun, n, argfun) {
    cl <- defaultCluster(cl)
    nnodes <- length(cl)
    if (!(n > 0L && nnodes)) return(invisible(NULL))

    dispatch <- function(node, job)
        sendCall(cl[[node]], fun, argfun(job), tag = job)

    ## prime the first min(n, nnodes) nodes with one job each
    initial <- min(n, nnodes)
    for (job in seq_len(initial)) dispatch(job, job)

    results <- vector("list", n)
    for (received in seq_len(n)) {
        reply <- recvOneResult(cl)
        ## reuse the node that just answered for the next outstanding job
        upcoming <- received + initial
        if (upcoming <= n) dispatch(reply$node, upcoming)
        results[reply$tag] <- list(reply$value)
    }
    checkForRemoteErrors(results)
}
| |
| ## exported and documented from here down unless otherwise stated. |
| |
## Call `fun` with the arguments in `...` on every node of `cl`.
##
## The call is first sent to each node in turn via sendCall(); only then
## are the results collected with recvResult(), one per node, and passed
## through checkForRemoteErrors().
clusterCall <- function(cl = NULL, fun, ...)
{
    nodes <- defaultCluster(cl)
    for (k in seq_along(nodes)) {
        sendCall(nodes[[k]], fun, list(...))
    }
    replies <- lapply(nodes, recvResult)
    checkForRemoteErrors(replies)
}
| |
| |
## Evaluate the unevaluated expression `expr` on every node of `cl`.
##
## `expr` is captured with substitute() and shipped through clusterCall()
## as the first argument of eval(); the evaluation environment is passed as
## `.GlobalEnv`.  The environment argument is spelled out as `envir` so the
## call does not depend on partial argument matching in eval().
clusterEvalQ <- function(cl = NULL, expr)
    clusterCall(cl, eval, substitute(expr), envir = .GlobalEnv)
| |
## Copy the variables named in `varlist` from `envir` to every node of `cl`.
##
## Each value is looked up with get() in `envir` and sent with its own
## clusterCall(); the helper below is the function shipped with that call.
clusterExport <- local({
    ## Bind `value` to `name` in .GlobalEnv and return NULL, so the
    ## assigned value is not handed back as the result of the call.
    assignGlobal <- function(name, value) {
        assign(name, value, envir = .GlobalEnv)
        NULL
    }
    function(cl = NULL, varlist, envir = .GlobalEnv) {
        ## do this with only one clusterCall--loop on workers?
        for (varname in varlist)
            clusterCall(cl, assignGlobal, varname,
                        get(varname, envir = envir))
    }
})
| |
## Apply `fun` to each element of `x` via staticClusterApply().
##
## Job i is called with x[[i]] as its first argument, followed by the
## extra arguments supplied in `...`.
clusterApply <- function(cl = NULL, x, fun, ...)
{
    ## **** this closure is sending all of x to all nodes
    argfun <- function(i) {
        item <- list(x[[i]])
        c(item, list(...))
    }
    staticClusterApply(cl, fun, length(x), argfun)
}
| |
## Apply `fun` to each element of `x` via dynamicClusterApply().
##
## Job i is called with x[[i]] as its first argument, followed by the
## extra arguments supplied in `...`.
clusterApplyLB <- function(cl = NULL, x, fun, ...)
{
    ## **** this closure is sending all of x to all nodes
    argfun <- function(i) {
        item <- list(x[[i]])
        c(item, list(...))
    }
    dynamicClusterApply(cl, fun, length(x), argfun)
}
| |
## Multi-argument apply over a cluster, modelled on mapply().
##
##   cl           cluster specification, resolved through defaultCluster()
##   fun          function called once per job
##   ...          vectors/lists to iterate over in parallel
##   MoreArgs     further arguments appended unchanged to every call
##   RECYCLE      if TRUE, shorter inputs are recycled to the longest one
##                (zero-length inputs may not be mixed with others);
##                if FALSE, only min(lengths) jobs are run
##   SIMPLIFY     anything but FALSE passes the result to simplify2array()
##   USE.NAMES    name the result after the first `...` argument
##   .scheduling  "static" -> staticClusterApply(),
##                "dynamic" -> dynamicClusterApply()
##
## Names are only attached when there is a result to name, and they are
## cut to the length of the result: with RECYCLE = FALSE the first
## argument can be longer than the number of jobs actually run.
clusterMap <- function (cl = NULL, fun, ..., MoreArgs = NULL, RECYCLE = TRUE,
                        SIMPLIFY = FALSE, USE.NAMES = TRUE,
                        .scheduling = c("static", "dynamic"))
{
    cl <- defaultCluster(cl)
    args <- list(...)
    if (length(args) == 0) stop("need at least one argument")
    .scheduling <- match.arg(.scheduling)
    n <- lengths(args)
    if (RECYCLE) {
        vlen <- max(n)
        if(vlen && min(n) == 0L)
            stop("zero-length inputs cannot be mixed with those of non-zero length")
        if (!all(n == vlen))
            args <- lapply(args, rep, length.out = vlen)
    }
    else vlen <- min(n)
    ## **** this closure is sending all of ... to all nodes
    argfun <- function(i) c(lapply(args, function(x) x[[i]]), MoreArgs)
    answer <-
        if(.scheduling == "dynamic") dynamicClusterApply(cl, fun, vlen, argfun)
        else staticClusterApply(cl, fun, vlen, argfun)
    ## rest matches mapply(): with a different default for SIMPLIFY
    if (USE.NAMES && length(answer)) {
        first <- args[[1L]]
        names1 <- names(first)
        ## an unnamed character vector names the result by its values
        if (is.null(names1) && is.character(first)) names1 <- first
        if (!is.null(names1))
            names(answer) <- names1[seq_along(answer)]
    }
    if (!isFALSE(SIMPLIFY) && length(answer))
        simplify2array(answer, higher = (SIMPLIFY == "array"))
    else answer
}
| |
| ## splitIndices <- function(nx, ncl) |
| ## { |
| ## i <- seq_len(nx) |
| ## if (ncl == 1L) i |
| ## else structure(split(i, cut(i, ncl)), names = NULL) |
| ## } |
| |
| # The fuzz used by cut() is too small when nx and ncl are both large |
| # and causes some groups to be empty. The definition below avoids that |
| # while minimizing changes from the results produced by the definition |
| # above. |
## Partition the indices 1..nx into `ncl` consecutive groups.
##
## Returns a list of integer vectors: an empty list for ncl == 0, a single
## group holding all indices when ncl == 1 or nx == 1, and otherwise the
## groups produced by cutting 1..nx at ncl + 1 equally spaced break points
## that are widened by a small fuzz at both ends.
splitIndices <- function(nx, ncl) {
    idx <- seq_len(nx)
    if (ncl == 0L) return(list())
    if (ncl == 1L || nx == 1L) return(list(idx))

    fuzz <- min((nx - 1L) / 1000, 0.4 * nx / ncl)
    breaks <- seq(1 - fuzz, nx + fuzz, length.out = ncl + 1L)
    groups <- split(idx, cut(idx, breaks))
    unname(groups)
}
| |
## Split `seq` into consecutive pieces, one per node of `cl`.
##
## The index groups come from splitIndices(length(seq), length(cl)); the
## result is a list holding `seq` subset by each group.
clusterSplit <- function(cl = NULL, seq) {
    cl <- defaultCluster(cl)
    groups <- splitIndices(length(seq), length(cl))
    lapply(groups, function(idx) seq[idx])
}
| |
| #internal |
## Split `x` into `ncl` consecutive pieces (subset with `[`), using the
## index groups from splitIndices(length(x), ncl).
splitList <- function(x, ncl) {
    groups <- splitIndices(length(x), ncl)
    lapply(groups, function(idx) x[idx])
}
| |
| #internal |
## Split `x` into `ncl` blocks of consecutive rows, using the index groups
## from splitIndices(nrow(x), ncl); each block keeps its dimensions.
splitRows <- function(x, ncl) {
    groups <- splitIndices(nrow(x), ncl)
    lapply(groups, function(rows) x[rows, , drop=FALSE])
}
| |
| #internal |
## Split `x` into `ncl` blocks of consecutive columns, using the index
## groups from splitIndices(ncol(x), ncl); each block keeps its dimensions.
splitCols <- function(x, ncl) {
    groups <- splitIndices(ncol(x), ncl)
    lapply(groups, function(cols) x[, cols, drop=FALSE])
}
| |
| #internal |
## Number of chunks for the statically scheduled functions.
##
## Without a usable `chunk.size` (NULL or not positive) this is `nnodes`;
## otherwise it is the number of chunks of `chunk.size` elements needed to
## cover `nelems` elements, but never less than 1.
staticNChunks <- function(nelems, nnodes, chunk.size) {
    unset <- is.null(chunk.size) || chunk.size <= 0
    if (unset) return(nnodes)
    max(1, ceiling(nelems / chunk.size))
}
| |
| #internal |
## Number of chunks for the dynamically scheduled (load balancing)
## functions.
##
##   chunk.size NULL  -> 2 * nnodes
##   chunk.size <= 0  -> nelems (one chunk per element)
##   otherwise        -> chunks of `chunk.size` elements needed to cover
##                       `nelems` elements, but never less than 1
dynamicNChunks <- function(nelems, nnodes, chunk.size) {
    if (is.null(chunk.size)) return(2 * nnodes)
    if (chunk.size <= 0) return(nelems)
    max(1, ceiling(nelems / chunk.size))
}
| |
## Parallel lapply(): X is cut into chunks with splitList(), lapply() is
## run on every chunk through clusterApply(), and the per-chunk lists are
## concatenated with c().
##
## The number of chunks is chosen by staticNChunks() from length(X), the
## number of nodes and `chunk.size`.
parLapply <- function(cl = NULL, X, fun, ..., chunk.size = NULL)
{
    cl <- defaultCluster(cl)
    nchunks <- staticNChunks(length(X), length(cl), chunk.size)
    pieces <- clusterApply(cl = cl, x = splitList(X, nchunks),
                           fun = lapply, FUN = fun, ...)
    ## quote = TRUE: the results are data and must not be evaluated again
    do.call(c, pieces, quote = TRUE)
}
| |
## Load balancing variant of parLapply(): X is cut into chunks with
## splitList(), lapply() is run on every chunk through clusterApplyLB(),
## and the per-chunk lists are concatenated with c().
##
## The number of chunks is chosen by dynamicNChunks() from length(X), the
## number of nodes and `chunk.size`.
parLapplyLB <- function(cl = NULL, X, fun, ..., chunk.size = NULL)
{
    cl <- defaultCluster(cl)
    nchunks <- dynamicNChunks(length(X), length(cl), chunk.size)
    pieces <- clusterApplyLB(cl = cl, x = splitList(X, nchunks),
                             fun = lapply, FUN = fun, ...)
    ## quote = TRUE: the results are data and must not be evaluated again
    do.call(c, pieces, quote = TRUE)
}
| |
## Parallel row apply: the rows of `x` are cut into blocks with
## splitRows(), apply(block, MARGIN = 1L, FUN, ...) is run on every block
## through clusterApply(), and the per-block results are concatenated with
## c().
##
## The number of blocks is chosen by staticNChunks() from nrow(x), the
## number of nodes and `chunk.size`.
parRapply <- function(cl = NULL, x, FUN, ..., chunk.size = NULL)
{
    cl <- defaultCluster(cl)
    nchunks <- staticNChunks(nrow(x), length(cl), chunk.size)
    pieces <- clusterApply(cl = cl, x = splitRows(x, nchunks),
                           fun = apply, MARGIN = 1L, FUN = FUN, ...)
    do.call(c, pieces, quote = TRUE)
}
| |
## Parallel column apply: the columns of `x` are cut into blocks with
## splitCols(), apply(block, MARGIN = 2L, FUN, ...) is run on every block
## through clusterApply(), and the per-block results are concatenated with
## c().
##
## The number of blocks is chosen by staticNChunks() from ncol(x), the
## number of nodes and `chunk.size`.
parCapply <- function(cl = NULL, x, FUN, ..., chunk.size = NULL) {
    cl <- defaultCluster(cl)
    nchunks <- staticNChunks(ncol(x), length(cl), chunk.size)
    pieces <- clusterApply(cl = cl, x = splitCols(x, nchunks),
                           fun = apply, MARGIN = 2L, FUN = FUN, ...)
    do.call(c, pieces, quote = TRUE)
}
| |
| |
## Parallel sapply(): runs parLapply() on as.list(X) and post-processes
## the result the way sapply() does.
##
##   USE.NAMES  if TRUE and X is character, an unnamed result is named by X
##   simplify   anything but FALSE passes a non-empty result to
##              simplify2array(); "array" requests higher-rank arrays
parSapply <-
    function (cl = NULL, X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE,
              chunk.size = NULL)
{
    FUN <- match.fun(FUN) # should this be done on worker?
    res <- parLapply(cl = cl, X = as.list(X), fun = FUN, ...,
                     chunk.size = chunk.size)
    nameByX <- USE.NAMES && is.character(X) && is.null(names(res))
    if (nameByX) names(res) <- X
    if (isFALSE(simplify) || !length(res)) return(res)
    simplify2array(res, higher = (simplify == "array"))
}
| |
## Load balancing variant of parSapply(): runs parLapplyLB() on as.list(X)
## and post-processes the result the way sapply() does.
##
##   USE.NAMES  if TRUE and X is character, an unnamed result is named by X
##   simplify   anything but FALSE passes a non-empty result to
##              simplify2array(); "array" requests higher-rank arrays
parSapplyLB <-
    function (cl = NULL, X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE,
              chunk.size = NULL)
{
    FUN <- match.fun(FUN) # should this be done on worker?
    res <- parLapplyLB(cl = cl, X = as.list(X), fun = FUN, ...,
                       chunk.size = chunk.size)
    nameByX <- USE.NAMES && is.character(X) && is.null(names(res))
    if (nameByX) names(res) <- X
    if (isFALSE(simplify) || !length(res)) return(res)
    simplify2array(res, higher = (simplify == "array"))
}
| |
| |
## Parallel apply(): apply FUN over the margins MARGIN of the array X,
## running the individual calls through parLapply().
##
##   cl          cluster specification, resolved through defaultCluster()
##   X           object with a non-empty dim(); objects with a class are
##               coerced with as.matrix() (two dims) or as.array()
##   MARGIN      dimensions to keep, as indices or as names of the
##               dimnames of X
##   FUN, ...    function applied to every cell of the margin, and further
##               arguments for it
##   chunk.size  passed on to parLapply()
##
## The result is shaped from the individual values the way the code below
## spells out: a (named) vector or list for a single margin with one value
## per cell, an array over the margin for several margins, an array with an
## extra leading dimension when every call returns the same number (> 1)
## of values, and the plain list of results otherwise.
parApply <- function(cl = NULL, X, MARGIN, FUN, ..., chunk.size = NULL)
{
    cl <- defaultCluster(cl) # initial sanity check
    FUN <- match.fun(FUN) # should this be done on worker?

    ## Ensure that X is an array object
    dl <- length(dim(X))
    if(!dl) stop("dim(X) must have a positive length")
    if(is.object(X))
        X <- if(dl == 2L) as.matrix(X) else as.array(X)
    ## now record dim as coercion can change it
    ## (e.g. when a data frame contains a matrix).
    d <- dim(X)
    dn <- dimnames(X)
    ds <- seq_len(dl)

    ## Extract the margins and associated dimnames

    if (is.character(MARGIN)) {
        if(is.null(dnn <- names(dn))) # names(NULL) is NULL
            stop("'X' must have named dimnames")
        MARGIN <- match(MARGIN, dnn)
        if (anyNA(MARGIN))
            stop("not all elements of 'MARGIN' are names of dimensions")
    }
    ## *.call: dimensions handed to each call of FUN;
    ## *.ans:  dimensions kept in the answer
    s.call <- ds[-MARGIN]
    s.ans  <- ds[MARGIN]
    d.call <- d[-MARGIN]
    d.ans  <- d[MARGIN]
    dn.call <- dn[-MARGIN]
    dn.ans  <- dn[MARGIN]
    ## dimnames(X) <- NULL

    ## do the calls

    d2 <- prod(d.ans)
    if(d2 == 0L) {
        ## arrays with some 0 extents: return ``empty result'' trying
        ## to use proper mode and dimension:
        ## The following is still a bit `hackish': use non-empty X
        ## (FUN is called once, locally, on a dummy slice)
        newX <- array(vector(typeof(X), 1L), dim = c(prod(d.call), 1L))
        ans <- FUN(if(length(d.call) < 2L) newX[,1] else
                   array(newX[, 1L], d.call, dn.call), ...)
        return(if(is.null(ans)) ans else if(length(d.ans) < 2L) ans[1L][-1L]
               else array(ans, d.ans, dn.ans))
    }
    ## else
    ## reshape X so that every column of newX is the input of one call
    newX <- aperm(X, c(s.call, s.ans))
    dim(newX) <- c(prod(d.call), d2)
    arglist <- if(length(d.call) < 2L) {# vector
        if (length(dn.call)) dimnames(newX) <- c(dn.call, list(NULL))
        lapply(seq_len(d2), function(i) newX[,i])
    } else
        lapply(seq_len(d2), function(i) array(newX[,i], d.call, dn.call))
    ans <- parLapply(cl = cl, X = arglist, fun = FUN, ...,
                     chunk.size = chunk.size)

    ## answer dims and dimnames

    ans.list <- is.recursive(ans[[1L]])
    l.ans <- length(ans[[1L]])

    ans.names <- names(ans[[1L]])
    ## results of unequal length cannot be combined into an array
    if(!ans.list)
        ans.list <- any(lengths(ans) != l.ans)
    ## only keep the names of the values if all results agree on them
    if(!ans.list && length(ans.names)) {
        all.same <- vapply(ans, function(x) identical(names(x), ans.names), NA)
        if (!all(all.same)) ans.names <- NULL
    }
    len.a <- if(ans.list) d2 else length(ans <- unlist(ans, recursive = FALSE))
    if(length(MARGIN) == 1L && len.a == d2) {
        names(ans) <- if(length(dn.ans[[1L]])) dn.ans[[1L]] # else NULL
        return(ans)
    }
    if(len.a == d2)
        return(array(ans, d.ans, dn.ans))
    if(len.a && len.a %% d2 == 0L) {
        if(is.null(dn.ans)) dn.ans <- vector(mode="list", length(d.ans))
        dn.ans <- c(list(ans.names), dn.ans)
        return(array(ans, c(len.a %/% d2, d.ans),
                     if(!all(vapply(dn.ans, is.null, NA))) dn.ans))
    }
    return(ans)
}
| |