| % File src/library/parallel/vignettes/parallel.Rnw |
| % Part of the R package, https://www.R-project.org |
| % Copyright 2011-2017 R Core Team |
| % Distributed under GPL 2 or later |
| |
| \documentclass[a4paper]{article} |
| |
| \usepackage{Rd, parskip, amsmath, enumerate} |
| \usepackage[round]{natbib} |
| \usepackage{hyperref} |
| \usepackage{color} |
| \definecolor{Blue}{rgb}{0,0,0.8} |
| \hypersetup{% |
| colorlinks,% |
| plainpages=true,% |
| linkcolor=black,% |
| citecolor=black,% |
| urlcolor=Blue,% |
| pdfstartview={XYZ null null 1},% 100% |
| pdfview={XYZ null null null},% |
| pdfpagemode=UseNone,% for no outline |
| pdfauthor={R Core},% |
| pdftitle={Package `parallel'}% Could also have pdfusetitle |
| } |
| |
| %\VignetteIndexEntry{Package 'parallel'} |
| %\VignettePackage{parallel} |
| |
| |
| \title{Package `parallel'} |
| \author{R-core} |
| |
| |
| \begin{document} |
| |
| \maketitle |
| |
| \section{Introduction} |
| |
| Package \pkg{parallel} was first included in \R{} 2.14.0. It builds |
| on the work done for CRAN packages \CRANpkg{multicore} \citep{multicore} |
| and \CRANpkg{snow} \citep{snow} and provides drop-in replacements for most |
| of the functionality of those packages, with integrated handling of |
| random-number generation. |
| |
Parallelism can be exploited in computation at many different levels:
this package is principally concerned with `coarse-grained
parallelization'.  At the lowest level, modern CPUs can do several
| basic operations simultaneously (e.g.{} integer and floating-point |
| arithmetic), and several implementations of external BLAS libraries |
| use multiple threads to do parts of basic vector/matrix operations in |
| parallel. Several contributed \R{} packages use multiple threads at C |
| level \emph{via} OpenMP or pthreads. |
| |
| This package handles running much larger chunks of computations in |
| parallel. A typical example is to evaluate the same \R{} function on |
| many different sets of data: often simulated data as in bootstrap |
| computations (or with `data' being the random-number stream). The |
| crucial point is that these chunks of computation are unrelated and do |
| not need to communicate in any way. It is often the case that the |
| chunks take approximately the same length of time. The basic |
| computational model is |
| \begin{enumerate}[(a)] |
| \item Start up $M$ `worker' processes, and do any initialization |
| needed on the workers. |
| \item Send any data required for each task to the workers. |
| \item Split the task into $M$ roughly equally-sized chunks, and send |
| the chunks (including the \R{} code needed) to the workers. |
| \item Wait for all the workers to complete their tasks, and ask them |
| for their results. |
| \item Repeat steps (b--d) for any further tasks. |
| \item Shut down the worker processes. |
| \end{enumerate} |
| Amongst the initializations which may be needed are to load packages |
| and initialize the random-number stream. |
| |
| There are implementations of this model in the functions |
| \code{mclapply} and \code{parLapply} as near-drop-in replacements for |
| \code{lapply}. |
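As a minimal sketch (assuming two cores are free), either function can
replace a serial \code{lapply} call:
<<eval=FALSE>>=
library(parallel)
## serial version
res1 <- lapply(1:4, function(i) i^2)
## forked version (not on Windows): same result, using up to 2 cores
res2 <- mclapply(1:4, function(i) i^2, mc.cores = 2)
## cluster version: works on all platforms
cl <- makeCluster(2)
res3 <- parLapply(cl, 1:4, function(i) i^2)
stopCluster(cl)
@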
| |
| A slightly different model is to split the task into $M_1 > M$ chunks, |
| send the first $M$ chunks to the workers, then repeatedly wait for any |
| worker to complete and send it the next remaining task: see the |
| section on `load balancing'. |
| |
| In principle the workers could be implemented by threads\footnote{only |
| `in principle' since the \R{} interpreter is not thread-safe.} or |
| lightweight processes, but in the current implementation they are full |
| processes. They can be created in one of three ways: |
| \begin{enumerate} |
| \item \emph{Via} \code{system("Rscript")} or similar to launch a new |
| process on the current machine or a similar machine with an identical |
| \R{} installation. This then needs a way to communicate between |
| master and worker processes, which is usually done \emph{via} |
| sockets. |
| |
| This should be available on all \R{} platforms, although it is |
| conceivable that zealous security measures could block the |
| inter-process communication \emph{via} sockets. Users of Windows |
| and macOS may expect pop-up dialog boxes from the firewall asking |
| if an \R{} process should accept incoming connections. |
| |
| Following \CRANpkg{snow}, a pool of worker processes listening |
| \emph{via} sockets for commands from the master is called a |
| `cluster' of nodes. |
| |
| \item \emph{Via} forking. \emph{Fork} is a |
| concept\footnote{\url{https://en.wikipedia.org/wiki/Fork_(operating_system)}} |
| from POSIX operating systems, and should be available on all \R{} |
| platforms except Windows. This creates a new \R{} process by taking |
| a complete copy of the master process, including the workspace and |
state of the random-number stream.  However, the copy will (in any
reasonable OS) share memory pages with the master until modified, so
forking is very fast.
| |
| The use of forking was pioneered by package \CRANpkg{multicore}. |
| |
| Note that as it does share the complete process, it also shares any |
| GUI elements, for example an \R{} console and on-screen devices. |
| This can cause havoc.\footnote{Some precautions are taken on macOS: |
| for example the event loops for \command{R.app} and the |
| \code{quartz} device are inhibited in the child. This information |
| is available at C level in the \code{Rboolean} variable |
| \code{R\_isForkedChild}.} |
| |
| There needs to be a way to communicate between master and worker. |
| Once again there are several possibilities since master and workers |
| share memory. In \CRANpkg{multicore} the initial fork sends an \R{} |
| expression to be evaluated to the worker, and the master process |
| opens a pipe for reading that is used by the worker to return the |
| results. Both that and creating a cluster of nodes communicating |
| \emph{via} sockets are supported in package \pkg{parallel}. |
| |
| \item Using OS-level facilities to set up a means to send tasks to |
| other members of a group of machines. There are several ways to do |
| that, and for example package \CRANpkg{snow} can make use of MPI |
| (`message passing interface') using \R{} package \CRANpkg{Rmpi}. |
| Communication overheads can dominate computation times in this |
| approach, so it is most often used on tightly-coupled networks of |
| computers with high-speed interconnects. |
| |
| CRAN packages following this approach include \CRANpkg{GridR} (using |
| Condor or Globus) and \CRANpkg{Rsge} (using SGE, currently called |
| `Oracle Grid Engine'). |
| |
  This approach will not be considered further in this vignette, but
  those parts of \pkg{parallel} which provide \CRANpkg{snow}-like
  functions will accept \CRANpkg{snow} clusters including MPI clusters.
| \end{enumerate} |
| |
| The landscape of parallel computing has changed with the advent of |
| shared-memory computers with multiple (and often many) CPU cores. |
Until the late 2000s parallel computing was mainly done on clusters
| of large numbers of single- or dual-CPU computers: nowadays even |
| laptops have two or four cores, and servers with 8, 32 or more cores |
| are commonplace. It is such hardware that package \pkg{parallel} is |
| designed to exploit. It can also be used with several computers |
| running the same version of \R{} connected by (reasonable-speed) |
| ethernet: the computers need not be running the same OS. |
| |
| Note that all these methods of communication use |
| \code{serialize}/\code{unserialize} to send \R{} objects between |
| processes. This has limits (typically hundreds of millions of |
| elements) which a well-designed parallelized algorithm should not |
| approach. |
| |
| \section{Numbers of CPUs/cores} |
| |
| In setting up parallel computations it can be helpful to have some |
| idea of the number of CPUs or cores available, but this is a rather |
| slippery concept. Nowadays almost all physical CPUs contain two or |
| more cores that run more-or-less independently (they may share parts |
| of the cache memory, and they do share access to RAM). However, on |
| some processors these cores may themselves be able to run multiple |
| tasks simultaneously, and some OSes (e.g.{} Windows) have the concept |
| of \emph{logical} CPUs which may exceed the number of cores. |
| |
| Note that all a program can possibly determine is the total number of |
| CPUs and/or cores available. This is not necessarily the same as the |
| number of CPUs available \emph{to the current user} which may well be |
| restricted by system policies on multi-user systems. Nor does it give |
| much idea of a reasonable number of CPUs to use for the current task: |
| the user may be running many \R{} processes simultaneously, and those |
| processes may themselves be using multiple threads through a |
| multi-threaded BLAS, compiled code using OpenMP or other low-level |
| forms of parallelism. We have even seen instances of |
| \CRANpkg{multicore}'s \code{mclapply} being called |
| recursively,\footnote{\code{parallel::mclapply} detects this and runs |
| nested calls serially.} generating $2n + n^2$ processes on a machine |
| estimated to have $n = 16$ cores. |
| |
| But in so far as it is a useful guideline, function |
| \code{detectCores()} tries to determine the number of CPU cores in the |
| machine on which \R{} is running: it has ways to do so on all known |
| current \R{} platforms. What exactly it measures is OS-specific: we |
| try where possible to report the number of logical cores available. |
| |
| On Windows the default is to report the number of logical CPUs. On |
| modern hardware (e.g.{} Intel \emph{Core i7}) the latter may not be |
| unreasonable as hyper-threading does give a significant extra |
| throughput. What \code{detectCores(logical = FALSE)} reports is |
| OS-version-dependent: on recent versions of Windows it reports the |
| number of physical cores but on older versions it might report the |
| number of physical CPU packages. |
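As an illustration (the counts reported are of course machine- and
OS-dependent):
<<eval=FALSE>>=
library(parallel)
detectCores()                 # logical CPUs (the default)
detectCores(logical = FALSE)  # physical cores, where the OS reports them
@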
| |
| \section{Analogues of apply functions} |
| |
| By far the most common direct applications of packages \CRANpkg{multicore} |
| and \CRANpkg{snow} have been to provide parallelized replacements of |
| \code{lapply}, \code{sapply}, \code{apply} and related functions. |
| |
| As analogues of \code{lapply} there are |
| \begin{verbatim} |
| parLapply(cl, x, FUN, ...) |
| mclapply(X, FUN, ..., mc.cores) |
| \end{verbatim} |
| where \code{mclapply} is not available\footnote{except as a stub |
| which simply calls \code{lapply}.} on Windows and has further |
| arguments discussed on its help page. They differ slightly in |
| philosophy: \code{mclapply} sets up a pool of \code{mc.cores} workers |
| just for this computation, whereas \code{parLapply} uses a less |
| ephemeral pool specified by the object \code{cl} created by a call to |
| \code{makeCluster} (which \emph{inter alia} specifies the size of the |
| pool). So the workflow is |
| \begin{verbatim} |
| cl <- makeCluster(<size of pool>) |
| # one or more parLapply calls |
| stopCluster(cl) |
| \end{verbatim} |
| % we could arrange to stop a cluster when cl is gc-ed |
| |
For matrices there is the rarely used \code{parApply} function, and
the more commonly used \code{parRapply} and \code{parCapply}, parallel
row and column \code{apply} functions for a matrix.
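A minimal sketch of these matrix functions:
<<eval=FALSE>>=
cl <- makeCluster(2)
m <- matrix(1:6, nrow = 2)
parRapply(cl, m, sum)   # parallel row sums: 9 12
parCapply(cl, m, sum)   # parallel column sums: 3 7 11
stopCluster(cl)
@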
| |
| \section{SNOW Clusters} |
| |
| The package contains a slightly revised copy of much of \CRANpkg{snow}, |
| and the functions it contains can also be used with clusters created |
| by \CRANpkg{snow} (provided the package is on the search path). |
| |
| Two functions are provided to create SNOW clusters, |
| \code{makePSOCKcluster} (a streamlined version of |
| \code{snow::makeSOCKcluster}) and (except on Windows) |
| \code{makeForkCluster}. They differ only in the way they spawn worker |
| processes: \code{makePSOCKcluster} uses \code{Rscript} to launch |
| further copies of \R{} (on the same host or optionally elsewhere) |
| whereas \code{makeForkCluster} forks the workers on the current host |
| (which thus inherit the environment of the current session). |
| |
| These functions would normally be called \emph{via} \code{makeCluster}. |
| |
Both \code{stdout()} and \code{stderr()} of the workers are
redirected: by default they are discarded, but they can be logged
using the \code{outfile} option.  Note that the previous sentence refers to
| the \emph{connections} of those names, not the C-level file handles. |
| Thus properly written \R{} packages using \code{Rprintf} will have |
| their output redirected, but not direct C-level output. |
| |
| A default cluster can be registered by a call to |
| \code{setDefaultCluster()}: this is then used whenever one of the |
| higher-level functions such as \code{parApply} is called without an |
| explicit cluster. A little care is needed when repeatedly re-using a |
| pool of workers, as their workspaces will accumulate objects from past |
| usage, and packages may get added to the search path. |
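For example, the higher-level functions use a registered default
cluster when called with \code{cl = NULL} (a sketch):
<<eval=FALSE>>=
cl <- makeCluster(2)
setDefaultCluster(cl)
parSapply(NULL, 1:4, function(i) i + 1)  # uses the default cluster
stopCluster(cl)
@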
| |
| If clusters are to be created on a host other than the current machine |
(\samp{localhost}), \code{makeCluster} may need to be given more
| information in the shape of extra arguments. |
| \begin{itemize} |
| \item If the worker machines are not set up in exactly the same way |
| as the master (for example if they are of a different |
| architecture), use \code{homogeneous = FALSE} and perhaps set |
| \code{rscript} to the full path to \command{Rscript} on the workers. |
| \item The worker machines need to know how to communicate with the |
| master: normally this can be done using the hostname found by |
| \code{Sys.info()}, but on private networks this need not be the |
| case and \code{master} may need to be supplied as a name or IP |
| address, e.g. \code{master = "192.168.1.111"}. |
| \item By default \command{ssh} is used to launch R on the workers. |
| If it is known by some other name, use e.g.{} |
  \code{rshcmd = "plink.exe"} for a Windows box using PuTTY.
| SSH should be set |
| up to use silent authentication: setups which require a password |
| to be supplied may or may not work. |
\item Socket communication is done over a randomly chosen port
| in the range \code{11000:11999}: site policies |
| might require some other port to be used, in which case set |
| argument \code{port} or environment variable \env{R\_PARALLEL\_PORT}. |
| \end{itemize} |
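Putting these together, a cluster on two remote machines might be
created along the following lines (the hostnames and address here are
of course hypothetical):
<<eval=FALSE>>=
cl <- makeCluster(c("node1", "node2"), master = "192.168.1.111",
                  port = 11500, rshcmd = "ssh")
## ... parLapply calls ...
stopCluster(cl)
@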
| |
| \section{Forking} |
| |
Except on Windows, the package contains a copy of \CRANpkg{multicore}:
there, a few names have the added prefix \code{mc}, e.g.{}
\code{mccollect} and \code{mcparallel}.  (Package \CRANpkg{multicore}
| used these names, but also the versions without the prefix which are |
| too easily masked: e.g.{} package \CRANpkg{lattice} used to have a |
| function \code{parallel}.) |
| |
| The low-level functions from \CRANpkg{multicore} are provided but not |
| exported from the namespace. |
| |
| There are high-level functions \code{mclapply} and \code{pvec}: unlike |
| the versions in \CRANpkg{multicore} these default to 2 cores, but this can |
| be controlled by setting \code{options("mc.cores")}, and that takes |
| its default from environment variable \code{MC\_CORES} when the |
| package is loaded. (Setting this to \code{1} inhibits parallel |
| operation: there are stub versions of these functions on Windows which |
| force \code{mc.cores = 1}.) |
| |
| Functions \code{mcmapply} and \code{mcMap} provide analogues of |
| \code{mapply} and \code{Map}. |
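The \code{mcparallel}/\code{mccollect} pair can also be used directly
to fork jobs and later collect their results (a sketch; not available
on Windows):
<<eval=FALSE>>=
job1 <- mcparallel(sum(rnorm(1e6)))
job2 <- mcparallel(max(rnorm(1e6)))
mccollect(list(job1, job2))  # a list of the two results
@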
| |
| Note the earlier comments about using forking in a GUI environment. |
| |
| The parent and forked \R{} processes share the per-session temporary |
| directory \code{tempdir()}, which can be a problem as a lot of code |
| has assumed it is private to the \R{} process. Further, prior to \R{} |
2.14.1 it was possible for \code{tempfile} in two processes to select
| the same filename in that temporary directory, and do it sufficiently |
| simultaneously that neither saw it as being in use. |
| |
| The forked workers share file handles with the master: this means that |
| any output from the worker should go to the same place as |
| \file{stdout} and \file{stderr} of the master process. (This does not |
| work reliably on all OSes: problems have also been noted when forking |
| a session that is processing batch input from \file{stdin}.) Setting |
| argument \code{mc.silent = TRUE} silences \file{stdout} for the child: |
| \file{stderr} is not affected. |
| |
| Sharing file handles also impacts graphics devices as forked workers |
| inherit all open graphics devices of the parent: they should not |
| attempt to make use of them. |
| |
| |
| \section{Random-number generation} |
| |
| Some care is needed with parallel computation using (pseudo-)random |
| numbers: the processes/threads which run separate parts of the |
| computation need to run independent (and preferably reproducible) |
| random-number streams. One way to avoid any difficulties is (where |
| possible) to do all the randomization in the master process: this is |
| done where possible in package \CRANpkg{boot} (version 1.3-1 and later). |
| |
| When an \R{} process is started up it takes the random-number seed |
| from the object \code{.Random.seed} in a saved workspace or constructs |
| one from the clock time and process ID when random-number generation |
| is first used (see the help on \code{RNG}). Thus worker processes |
| might get the same seed because a workspace containing |
| \code{.Random.seed} was restored or the random number generator has |
| been used before forking: otherwise these get a non-reproducible seed |
| (but with very high probability a different seed for each worker). |
| |
| The alternative is to set separate seeds for each worker process in |
| some reproducible way from the seed in the master process. This is |
| generally plenty safe enough, but there have been worries that the |
| random-number streams in the workers might somehow get into step. One |
| approach is to take the seeds a long way apart in the random-number |
| stream: note that random numbers taken a long (fixed) distance apart |
| in a single stream are not necessarily (and often are not) as |
| independent as those taken a short distance apart. Yet another idea |
| (as used by e.g.{} \pkg{JAGS}) is to use different random-number |
| generators for each separate run/process. |
| |
| Package \pkg{parallel} contains an implementation of the ideas of |
\citet{lecuyer.2002}: this uses a single RNG and makes \emph{streams}
| with seeds $2^{127}$ steps apart in the random number stream (which |
| has period approximately $2^{191}$). This is based on the generator |
| of \citet{lecuyer.1999}; the reason for choosing that |
| generator\footnote{apart from the commonality of authors!} is that it |
| has a fairly long period with a small seed (6 integers), and unlike |
| \R{}'s default \code{"Mersenne-Twister"} RNG, it is simple to advance |
| the seed by a fixed number of steps. The generator is the combination |
| of two: |
| \begin{eqnarray*} |
| x_n &=& 1403580 \times x_{n-2} - 810728 \times x_{n-3} \mod{(2^{32} - 209)}\\ |
| y_n &=& 527612 \times y_{n-1} - 1370589 \times y_{n-3} \mod{(2^{32} - 22853)}\\ |
| z_n &=& (x_n - y_n) \mod{4294967087}\\ |
| u_n &=& z_n/4294967088\ \mbox{unless $z_n = 0$} |
| \end{eqnarray*} |
| The `seed' then consists of $(x_{n-3}, x_{n-2}, x_{n-1}, y_{n-3}, y_{n-2}, y_{n-1})$, |
| and the recursion for each of $x_n$ and $y_n$ can have |
| pre-computed coefficients for $k$ steps ahead. For $k = 2^{127}$, the |
| seed is advanced by $k$ steps by \R{} call |
| \code{.Random.seed <- nextRNGStream(.Random.seed)}. |
| |
| % use \verb to get consistent quote mapping. |
| The \citet{lecuyer.1999} generator is available \emph{via} |
| \verb|RNGkind("L'Ecuyer-CMRG")|. Thus using the ideas of |
| \citet{lecuyer.2002} is as simple as |
| <<Ecuyer-ex, eval=FALSE>>= |
| RNGkind("L'Ecuyer-CMRG") |
| set.seed(2002) # something |
| M <- 16 ## start M workers |
| s <- .Random.seed |
| for (i in 1:M) { |
| s <- nextRNGStream(s) |
| # send s to worker i as .Random.seed |
| } |
| @ |
and this is implemented for SNOW clusters in function
| \code{clusterSetRNGStream}, and as part of \code{mcparallel} and |
| \code{mclapply} (by default). |
| |
| Apart from \emph{streams} ($2^{127}$ steps apart), there is the concept of |
| \emph{sub-streams} starting from seeds $2^{76}$ steps apart. Function |
| \code{nextRNGSubStream} advances to the next substream. |
| |
| A direct \R{} interface to the (clunkier) original C implementation is |
| available in CRAN package \CRANpkg{rlecuyer} \citep{rlecuyer}. That works |
with named streams, each of which has three 6-element seeds
| associated with them. This can easily be emulated in \R{} by storing |
| \code{.Random.seed} at suitable times. There is another interface |
| using S4 classes in package \CRANpkg{rstream} \citep{rstream}. |
| |
| \section{Load balancing} |
| |
| The introduction mentioned a different strategy which dynamically |
| allocates tasks to workers: this is sometimes known as `load |
| balancing' and is implemented in \code{mclapply(mc.preschedule = |
| FALSE)}, \code{clusterApplyLB} and wrappers. |
| |
| Load balancing is potentially advantageous when the tasks take quite |
| dissimilar amounts of computation time, or where the nodes are of |
| disparate capabilities. But some \emph{caveats} are in order: |
| \begin{enumerate}[(a)] |
| \item Random number streams are allocated to nodes, so if the tasks |
| involve random numbers they are likely to be non-repeatable (as the |
| allocation of tasks to nodes depends on the workloads of the nodes). |
| It would however take only slightly more work to allocate a stream to |
| each task. |
\item More care is needed in allocating the tasks.  If 1000 tasks need
  to be allocated to 10 nodes, the standard approach sends chunks of
  100 tasks to each of the nodes.  The load-balancing approach sends
| tasks one at a time to a node, and the communication overhead may be |
| high. So it makes sense to have substantially more tasks than |
| nodes, but not by a factor of 100 (and maybe not by 10). |
| \end{enumerate} |
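The difference can be seen with artificial tasks of very unequal
duration (a sketch; timings will vary):
<<eval=FALSE>>=
cl <- makeCluster(2)
lens <- c(4, 1, 1, 1, 1, 1)   # task 'lengths' in seconds
## static allocation: one worker gets the long task plus its share
system.time(clusterApply(cl, lens, Sys.sleep))
## dynamic allocation: short tasks fill in around the long one
system.time(clusterApplyLB(cl, lens, Sys.sleep))
stopCluster(cl)
@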
| |
| |
| \section{Setting the CPU Affinity with mclapply} |
| %%\author{Helena Kotthaus, Andreas Lang \\ LS12 Department of Computer Science, TU Dortmund} |
| |
| The parameter \code{affinity.list} of the \code{mclapply} function |
| can be used to run elements of the input vector \code{X} of \code{mclapply} on |
| specific CPUs. |
\code{affinity.list} is a vector (atomic or list) containing the CPU
affinity mask for each element of \code{X}: it describes on which CPUs
(cores or hyperthread units) a given item is allowed to run; see
\code{? mcaffinity}.
| This can be helpful, if the elements of \code{X} (parallel jobs) have a |
| high variance of completion time or if the hardware architecture |
| is heterogeneous. It also enables the development of scheduling |
| strategies for optimizing the overall runtime of independent parallel jobs. |
If \code{affinity.list} is set, the \code{mc.cores} parameter is
replaced with the number of CPU ids used in the affinity masks.
| To use this parameter prescheduling has to be deactivated |
| (\code{mc.preschedule = FALSE}). |
| For each value of the input vector \code{X} a separate job is forked. |
The master process forks at most one child process per selected CPU
at a time, to ensure that the number of forked jobs does not exceed
the number of selected CPUs.  As soon as a child process has finished,
the next available job for that CPU is forked.
| |
The following code example demonstrates how the execution time can be
reduced by assigning the elements of \code{X} to specific CPUs.  (This
does not apply on Windows, where \code{mc.cores} must remain 1.)
| |
| %% setting eval=TRUE would cost ca. 1.5 minutes every time this is built |
| %% also, .Platform$OS.type == "Windows" cannot use mc.cores = 2 |
| <<affinity-ex, eval=FALSE>>= |
| ## Exemplary variance filter executed on three different matrices in parallel. |
| ## Can be used in gene expression analysis as a prefilter |
| ## for the number of covariates. |
| |
| library(parallel) |
| n <- 300 # observations |
| p <- 20000 # covariates |
| |
| ## Different sized matrices as filter inputs |
| ## Matrix A and B form smaller work loads |
| ## while matrix C forms a bigger workload (2*p) |
| library(stats) |
| A <- matrix(replicate( p, rnorm(n, sd = runif(1, 0.1, 10))), n, p) |
| B <- matrix(replicate( p, rnorm(n, sd = runif(1, 0.1, 10))), n, p) |
| C <- matrix(replicate(2*p, rnorm(n, sd = runif(1, 0.1, 10))), n, 2*p) |
| |
| varFilter <- function (X, nSim = 20) { |
| for (i in 1:nSim) { |
| train <- sample(nrow(X), 2 / 3 * nrow(X)) |
| colVars <- apply(X[train, ], 2, var) |
| keep <- names(head(sort(colVars, decreasing = TRUE), 100)) |
| # myAlgorithm(X[, keep]) |
| } |
| } |
| |
| ## Runtime comparison ----------------------------------- |
| |
| ## mclapply with affinity.list |
| ## CPU mapping: A and B run on CPU 1 while C runs on CPU 2: |
| affinity <- c(1,1,2) |
| system.time( |
| mclapply(X = list(A,B,C), FUN = varFilter, |
| mc.preschedule = FALSE, affinity.list = affinity)) |
| ## user system elapsed |
| ## 34.909 0.873 36.720 |
| |
| |
| ## mclapply without affinity.list |
| system.time( |
| mclapply(X = list(A,B,C), FUN = varFilter, mc.cores = 2, |
| mc.preschedule = FALSE) ) |
| ## user system elapsed |
| ## 72.893 1.588 55.982 |
| |
| |
| ## mclapply with prescheduling |
| system.time( |
| mclapply(X = list(A,B,C), FUN = varFilter, mc.cores = 2, |
| mc.preschedule = TRUE) ) |
| ## user system elapsed |
| ## 53.455 1.326 53.399 |
| @ |
| |
| Instead of using \code{affinity.list} for runtime optimization, |
| it can also be used to simply restrict the computation |
| to specific CPUs as in the following example. |
| |
| <<eval = FALSE>>= |
| ## Restricts all elements of X to run on CPU 1 and 2. |
| X <- list(1, 2, 3) |
| affinity.list <- list(c(1,2), c(1,2), c(1,2)) |
| mclapply(X = X, FUN = function (i) i*i, |
| mc.preschedule = FALSE, affinity.list = affinity.list) |
| @ |
| |
| |
| \section{Portability considerations} |
| |
| People wanting to provide parallel facilities in their code need to |
| decide how hard they want to try to be portable and efficient: no |
| approach works optimally on all platforms. |
| |
| Using \code{mclapply} is usually the simplest approach, but will run |
| serial versions of the code on Windows. This may suffice where |
| parallel computation is only required for use on a single multi-core |
| Unix-alike server---for \code{mclapply} can only run on a single |
| shared-memory system. There is fallback to serial use when needed, by |
| setting \code{mc.cores = 1}. |
| |
| Using \code{parLapply} will work everywhere that socket communication |
| works, and can be used, for example, to harness all the CPU cores in a |
| lab of machines that are not otherwise in use. But socket |
| communication may be blocked even when using a single machine and is |
| quite likely to be blocked between machines in a lab. There is not |
| currently any fallback to serial use, nor could there easily be (as |
| the workers start with a different \R{} environment from the one |
| current on the master). |
| |
| An example of providing access to both approaches as well as serial |
| code is package \CRANpkg{boot}, version \code{1.3-3} or later. |
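A sketch of how such a dispatch might look (the wrapper name and exact
arguments here are hypothetical; the real interface in \CRANpkg{boot}
is more elaborate):
<<eval=FALSE>>=
## hypothetical wrapper offering serial, fork and snow back-ends
myApply <- function(X, FUN, parallel = c("no", "multicore", "snow"),
                    ncpus = 1, cl = NULL)
{
    parallel <- match.arg(parallel)
    if (parallel == "multicore" && .Platform$OS.type != "windows")
        mclapply(X, FUN, mc.cores = ncpus)
    else if (parallel == "snow") {
        if (is.null(cl)) {        # make a temporary cluster
            cl <- makeCluster(ncpus)
            on.exit(stopCluster(cl))
        }
        parLapply(cl, X, FUN)
    } else lapply(X, FUN)         # serial fallback
}
@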
| |
| \section{Extended examples} |
| |
| \SweaveOpts{eval=FALSE} |
| <<hide=TRUE>>= |
| library(parallel) |
| @ |
| |
| Probably the most common use of coarse-grained parallelization in |
| statistics is to do multiple simulation runs, for example to do large |
| numbers of bootstrap replicates or several runs of an MCMC simulation. |
| We show an example of each. |
| |
| Note that some of the examples will only work serially on Windows and |
| some actually are computer-intensive. |
| |
| \subsection{Bootstrapping} |
| |
| Package \CRANpkg{boot} \citep{boot} is support software for the monograph |
| by \citet{Davison.Hinkley.97}. Bootstrapping is often used as an |
| example of easy parallelization, and some methods of producing |
| confidence intervals require many thousands of bootstrap samples. |
| As from version \code{1.3-1} the package itself has parallel support |
| within its main functions, but we illustrate how to use the original |
| (serial) functions in parallel computations. |
| |
| We consider two examples using the \code{cd4} dataset from package |
| \CRANpkg{boot} where the interest is in the correlation between before and |
| after measurements. The first is a straight simulation, often called |
| a \emph{parametric bootstrap}. The non-parallel form is |
| <<>>= |
| library(boot) |
| cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v) |
| cd4.mle <- list(m = colMeans(cd4), v = var(cd4)) |
| cd4.boot <- boot(cd4, corr, R = 999, sim = "parametric", |
| ran.gen = cd4.rg, mle = cd4.mle) |
| boot.ci(cd4.boot, type = c("norm", "basic", "perc"), |
| conf = 0.9, h = atanh, hinv = tanh) |
| @ |
| |
| To do this with \code{mclapply} we need to break this into separate |
| runs, and we will illustrate two runs of 500 simulations each: |
| <<>>= |
| cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v) |
| cd4.mle <- list(m = colMeans(cd4), v = var(cd4)) |
| run1 <- function(...) boot(cd4, corr, R = 500, sim = "parametric", |
| ran.gen = cd4.rg, mle = cd4.mle) |
| mc <- 2 # set as appropriate for your hardware |
| ## To make this reproducible: |
| set.seed(123, "L'Ecuyer") |
| cd4.boot <- do.call(c, mclapply(seq_len(mc), run1) ) |
| boot.ci(cd4.boot, type = c("norm", "basic", "perc"), |
| conf = 0.9, h = atanh, hinv = tanh) |
| @ |
| There are many ways to program things like this: often the neatest is |
| to encapsulate the computation in a function, so this is the parallel |
| form of |
| <<eval=FALSE>>= |
| do.call(c, lapply(seq_len(mc), run1)) |
| @ |
| |
| To run this with \code{parLapply} we could take a similar approach by |
| <<>>= |
| run1 <- function(...) { |
| library(boot) |
| cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v) |
| cd4.mle <- list(m = colMeans(cd4), v = var(cd4)) |
| boot(cd4, corr, R = 500, sim = "parametric", |
| ran.gen = cd4.rg, mle = cd4.mle) |
| } |
| cl <- makeCluster(mc) |
| ## make this reproducible |
| clusterSetRNGStream(cl, 123) |
| library(boot) # needed for c() method on master |
| cd4.boot <- do.call(c, parLapply(cl, seq_len(mc), run1) ) |
| boot.ci(cd4.boot, type = c("norm", "basic", "perc"), |
| conf = 0.9, h = atanh, hinv = tanh) |
| stopCluster(cl) |
| @ |
| Note that whereas with \code{mclapply} all the packages and objects |
| we use are automatically available on the workers, this is not in |
| general\footnote{it is with clusters set up with |
| \code{makeForkCluster}.} the case with the \code{parLapply} |
| approach. There is often a delicate choice of where to do the |
| computations: for example we could compute \code{cd4.mle} on the |
| workers (as above) or on the master and send the value to the |
| workers. We illustrate the latter by the following code |
| <<>>= |
| cl <- makeCluster(mc) |
| cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v) |
| cd4.mle <- list(m = colMeans(cd4), v = var(cd4)) |
| clusterExport(cl, c("cd4.rg", "cd4.mle")) |
| junk <- clusterEvalQ(cl, library(boot)) # discard result |
| clusterSetRNGStream(cl, 123) |
| res <- clusterEvalQ(cl, boot(cd4, corr, R = 500, |
| sim = "parametric", ran.gen = cd4.rg, mle = cd4.mle)) |
| library(boot) # needed for c() method on master |
| cd4.boot <- do.call(c, res) |
| boot.ci(cd4.boot, type = c("norm", "basic", "perc"), |
| conf = 0.9, h = atanh, hinv = tanh) |
| stopCluster(cl) |
| @ |
| |
| Running the double bootstrap on the same problem is far more |
| computer-intensive. The standard version is |
| <<fig=TRUE>>= |
| R <- 999; M <- 999 ## we would like at least 999 each |
| cd4.nest <- boot(cd4, nested.corr, R=R, stype="w", t0=corr(cd4), M=M) |
| ## nested.corr is a function in package boot |
| op <- par(pty = "s", xaxs = "i", yaxs = "i") |
| qqplot((1:R)/(R+1), cd4.nest$t[, 2], pch = ".", asp = 1, |
| xlab = "nominal", ylab = "estimated") |
| abline(a = 0, b = 1, col = "grey") |
| abline(h = 0.05, col = "grey") |
| abline(h = 0.95, col = "grey") |
| par(op) |
| |
| nominal <- (1:R)/(R+1) |
| actual <- cd4.nest$t[, 2] |
| 100*nominal[c(sum(actual <= 0.05), sum(actual < 0.95))] |
| @ |
| which took about 55 secs on one core of an 8-core Linux server. |
| |
| Using \code{mclapply} we could use |
| <<eval=FALSE>>= |
| mc <- 9 |
| R <- 999; M <- 999; RR <- floor(R/mc) |
run2 <- function(...)
    boot(cd4, nested.corr, R=RR, stype="w", t0=corr(cd4), M=M)
cd4.nest <- do.call(c, mclapply(seq_len(mc), run2, mc.cores = mc))
| nominal <- (1:R)/(R+1) |
| actual <- cd4.nest$t[, 2] |
| 100*nominal[c(sum(actual <= 0.05), sum(actual < 0.95))] |
| @ |
| which ran in 11 secs (elapsed) using all of that server. |
| |
| \subsection{MCMC runs} |
| |
| \citet{Ripley.88} discusses the maximum-likelihood estimation of the |
| Strauss process, which is done by solving a moment equation |
| \[ |
| E_c T = t |
| \] |
where $T$ is the number of $r$-close pairs ($r = 3.5$ here) and $t$ is
the observed value, $30$ in the following example.  A serial approach
to the initial exploration might be
| <<>>= |
| library(spatial) |
| towns <- ppinit("towns.dat") |
| tget <- function(x, r=3.5) sum(dist(cbind(x$x, x$y)) < r) |
| t0 <- tget(towns) |
| R <- 1000 |
| c <- seq(0, 1, 0.1) |
| ## res[1] = 0 |
| res <- c(0, sapply(c[-1], function(c) |
| mean(replicate(R, tget(Strauss(69, c=c, r=3.5)))))) |
| plot(c, res, type="l", ylab="E t") |
| abline(h=t0, col="grey") |
| @ |
which takes about 20 seconds today but took many hours when first done
in 1985.  A parallel version might be
| <<>>= |
| run3 <- function(c) { |
| library(spatial) |
| towns <- ppinit("towns.dat") # has side effects |
| mean(replicate(R, tget(Strauss(69, c=c, r=3.5)))) |
| } |
| cl <- makeCluster(10, methods = FALSE) |
| clusterExport(cl, c("R", "towns", "tget")) |
| res <- c(0, parSapply(cl, c[-1], run3)) # 10 tasks |
| stopCluster(cl) |
| @ |
| which took about 4.5 secs, plus 2 secs to set up the cluster. Using a |
| fork cluster (not on Windows) makes the startup much faster and setup easier: |
| <<eval=FALSE>>= |
| cl <- makeForkCluster(10) # fork after the variables have been set up |
| run4 <- function(c) mean(replicate(R, tget(Strauss(69, c=c, r=3.5)))) |
| res <- c(0, parSapply(cl, c[-1], run4)) |
| stopCluster(cl) |
| @ |
| As one might expect, the \code{mclapply} version is slightly simpler: |
| <<eval=FALSE>>= |
| run4 <- function(c) mean(replicate(R, tget(Strauss(69, c=c, r=3.5)))) |
| res <- c(0, unlist(mclapply(c[-1], run4, mc.cores = 10))) |
| @ |
If you do not have as many as 10 cores, you might want to consider
load-balancing in a task like this, as the time taken per simulation
varies with \code{c}.  This can be done using
| \code{mclapply(mc.preschedule = FALSE)} or \code{parSapplyLB}. The |
| disadvantage is that the results would not be reproducible (which does |
| not matter here). |
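As a sketch, reusing \code{run4} from above (and, for the second form,
a cluster \code{cl} that is still running), the load-balanced calls
would look like
<<eval=FALSE>>=
## dynamic scheduling: each worker is given a new task as it finishes
res <- c(0, unlist(mclapply(c[-1], run4, mc.cores = 4,
                            mc.preschedule = FALSE)))
## or, on a cluster
res <- c(0, parSapplyLB(cl, c[-1], run4))
@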
| |
| \subsection{Package installation} |
| |
| With over 4000 \R{} packages available, it is often helpful to do a |
| comprehensive install in parallel. We provide facilities to do so in |
| \code{install.packages} \emph{via} a parallel \command{make}, but that |
| approach may not be suitable.\footnote{A parallel \command{make} might |
| not be available, and we have seen a couple of instances of package |
| installation not working correctly when run from \command{make}.} |
| Some of the tasks take many times longer than others, and we do not |
| know in advance which these are. |
| |
| We illustrate an approach using package \pkg{parallel} which is used |
| on part of the CRAN check farm. Suppose that there is a function |
| \code{do\_one(pkg)} which installs a single package and then returns. |
| Then the task is to run \code{do\_one} on as many of the \code{M} |
| workers as possible whilst ensuring that all of the direct and |
| indirect dependencies of \code{pkg} are installed before \code{pkg} |
| itself. As the installation of a single package can block several |
| others, we do need to allow the number of installs running |
| simultaneously to vary: the following code achieves that, but needs to |
| use low-level functions to do so. |
| |
| <<eval=FALSE>>= |
| pkgs <- "<names of packages to be installed>" |
| M <- 20 # number of parallel installs |
| M <- min(M, length(pkgs)) |
| library(parallel) |
| unlink("install_log") |
| cl <- makeCluster(M, outfile = "install_log") |
| clusterExport(cl, c("tars", "fakes", "gcc")) # variables needed by do_one |
| |
## `available' is assumed to have been set up via a call to
## available.packages() for repositories containing all the packages
## involved and all their dependencies.
| DL <- utils:::.make_dependency_list(pkgs, available, recursive = TRUE) |
| DL <- lapply(DL, function(x) x[x %in% pkgs]) |
| lens <- sapply(DL, length) |
| ready <- names(DL[lens == 0L]) |
| done <- character() # packages already installed |
| n <- length(ready) |
| submit <- function(node, pkg) |
| parallel:::sendCall(cl[[node]], do_one, list(pkg), tag = pkg) |
| for (i in 1:min(n, M)) submit(i, ready[i]) |
| DL <- DL[!names(DL) %in% ready[1:min(n, M)]] |
| av <- if(n < M) (n+1L):M else integer() # available workers |
| while(length(done) < length(pkgs)) { |
| d <- parallel:::recvOneResult(cl) |
| av <- c(av, d$node) |
| done <- c(done, d$tag) |
| OK <- unlist(lapply(DL, function(x) all(x %in% done) )) |
| if (!any(OK)) next |
| p <- names(DL)[OK] |
| m <- min(length(p), length(av)) # >= 1 |
| for (i in 1:m) submit(av[i], p[i]) |
| av <- av[-(1:m)] |
| DL <- DL[!names(DL) %in% p[1:m]] |
| } |
| @ |
| |
| \subsection{Passing \code{...}} |
| |
| The semantics of \code{...} do not fit well with parallel operation, |
| since lazy evaluation may be delayed until the tasks have been sent to |
| the workers. This is no problem in the forking approach, as the |
| information needed for lazy evaluation will be present in the forked workers. |
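For instance, in this fork-based sketch (so not for Windows) the
promises in \code{...} can still be forced in the children:
<<eval=FALSE>>=
g <- function(...) mclapply(1:2, function(i) sum(..., i))
g(1, 2)  # returns list(4, 5): the promises are found in the forked children
@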
| |
| For \CRANpkg{snow}-like clusters the trick is to ensure that \code{...} |
| has any promises forced whilst the information is still available. |
| This is how \CRANpkg{boot} does it: |
| <<eval=FALSE>>= |
| fn <- function(r) statistic(data, i[r, ], ...) |
| RR <- sum(R) |
| res <- if (ncpus > 1L && (have_mc || have_snow)) { |
| if (have_mc) { |
| parallel::mclapply(seq_len(RR), fn, mc.cores = ncpus) |
| } else if (have_snow) { |
| list(...) # evaluate any promises |
| if (is.null(cl)) { |
| cl <- parallel::makePSOCKcluster(rep("localhost", ncpus)) |
| if(RNGkind()[1L] == "L'Ecuyer-CMRG") |
| parallel::clusterSetRNGStream(cl) |
| res <- parallel::parLapply(cl, seq_len(RR), fn) |
| parallel::stopCluster(cl) |
| res |
| } else parallel::parLapply(cl, seq_len(RR), fn) |
| } |
| } else lapply(seq_len(RR), fn) |
| @ |
| Note that \code{...} is an argument to \code{boot}, and so after |
| <<eval=FALSE>>= |
| list(...) # evaluate any promises |
| @ |
| it refers to objects within the evaluation frame of \code{boot} and |
| hence the environment of \code{fn} which will therefore be sent to the |
| workers along with \code{fn}. |
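A minimal sketch of the idiom, with a hypothetical wrapper
\code{par\_stat} standing in for \code{boot}:
<<eval=FALSE>>=
par_stat <- function(cl, data, statistic, ...) {
    list(...)  # force the promises while their values can still be found
    fn <- function(r) statistic(data, r, ...)
    ## fn (and hence its environment, containing the forced ...) is
    ## serialized and sent to the workers
    parallel::parLapply(cl, seq_len(100), fn)
}
@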
| |
| \section{Differences from earlier versions} |
| |
The support of parallel RNG differs from that in \CRANpkg{snow};
\CRANpkg{multicore} had no such support.
| |
| \subsection{Differences from multicore} |
| |
| \CRANpkg{multicore} had quite elaborate code to inhibit the Aqua event |
| loop for \command{R.app} and the event loop for the \code{quartz} |
| graphics device. This has been replaced by recording a flag in the |
| \R{} executable for a child process. |
| |
Functions \code{fork} and \code{kill} have \code{mc} prefixes and are
not exported.  This avoids confusion with other packages (such as
package \CRANpkg{fork}); note also that \code{mckill} is not as general as
\code{tools::pskill}.
| |
| Aliased functions \code{collect} and \code{parallel} are no longer provided. |
| |
| \subsection{Differences from snow} |
| |
| \CRANpkg{snow} set a timeout that exceeded the maximum value required by |
| POSIX, and did not provide a means to set the timeout on the workers. |
| This resulted in process interlocks on Solaris. |
| |
| \code{makeCluster} creates MPI or NWS clusters by calling \CRANpkg{snow}. |
| |
\code{makePSOCKcluster} has been streamlined, as package \pkg{parallel} is in
a known location on all systems, and \command{Rscript} is nowadays
always available.  Logging of workers is set up to append to the log
file, so multiple processes can be logged.
| |
| \code{parSapply} has been brought into line with \code{sapply}. |
| |
| \code{clusterMap()} gains \code{SIMPLIFY} and \code{USE.NAMES} |
| arguments to make it a parallel version of \code{mapply} and \code{Map}. |
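For example, assuming a running cluster \code{cl}:
<<eval=FALSE>>=
## a parallel version of mapply(function(x, y) x + y, 1:3, 4:6)
clusterMap(cl, function(x, y) x + y, 1:3, 4:6, SIMPLIFY = TRUE)  # c(5, 7, 9)
@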
| |
| The timing interface has not been copied. |
| |
| \bibliographystyle{jss} |
| \bibliography{parallel} |
| |
| \end{document} |
| |
| %%% Local Variables: |
| %%% mode: latex |
| %%% TeX-master: t |
| %%% End: |