Apply Operations using Clusters
Description
These functions provide several ways to parallelize computations using
a cluster.
Usage
clusterCall(cl = NULL, fun, ...)
clusterApply(cl = NULL, x, fun, ...)
clusterApplyLB(cl = NULL, x, fun, ...)
clusterEvalQ(cl = NULL, expr)
clusterExport(cl = NULL, varlist, envir = .GlobalEnv)
clusterMap(cl = NULL, fun, ..., MoreArgs = NULL, RECYCLE = TRUE,
           SIMPLIFY = FALSE, USE.NAMES = TRUE,
           .scheduling = c("static", "dynamic"))
clusterSplit(cl = NULL, seq)
parLapply(cl = NULL, X, fun, ..., chunk.size = NULL)
parSapply(cl = NULL, X, FUN, ..., simplify = TRUE,
          USE.NAMES = TRUE, chunk.size = NULL)
parApply(cl = NULL, X, MARGIN, FUN, ..., chunk.size = NULL)
parRapply(cl = NULL, x, FUN, ..., chunk.size = NULL)
parCapply(cl = NULL, x, FUN, ..., chunk.size = NULL)
parLapplyLB(cl = NULL, X, fun, ..., chunk.size = NULL)
parSapplyLB(cl = NULL, X, FUN, ..., simplify = TRUE,
            USE.NAMES = TRUE, chunk.size = NULL)
Arguments
cl
a cluster object, created by this package or by package
snow. If NULL, use the registered default cluster.

fun, FUN

function or character string naming a function.

expr 
expression to evaluate.

seq 
vector to split.

varlist 
character vector of names of objects to export.

envir
environment on the master from which to export variables.

x
a vector for clusterApply and clusterApplyLB, a
matrix for parRapply and parCapply.

...
additional arguments to pass to fun or FUN:
beware of partial matching to earlier arguments.

MoreArgs
additional arguments for fun.

RECYCLE
logical; if true, shorter arguments are recycled.

X
a vector (atomic or list) for parLapply and
parSapply, an array for parApply.

chunk.size 
scalar number; number of invocations of fun or
FUN in one chunk; a chunk is a unit for scheduling.

MARGIN 
vector specifying the dimensions to use.

simplify, USE.NAMES

logical; see sapply.

SIMPLIFY
logical; see mapply.

.scheduling
should tasks be statically allocated to nodes, or should
dynamic load balancing be used?

Details
clusterCall calls a function fun with identical arguments ...
on each node.
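A minimal sketch of clusterCall. It assumes a local two-worker cluster created with makeCluster(2), where the worker count is an arbitrary choice for illustration, and the variable names sums and pids are hypothetical. Every node receives the same function and the same arguments, so every element of the returned list is identical. Each call still runs in a separate worker process:

```r
library(parallel)

# Start two local worker processes (an arbitrary size for illustration).
cl <- makeCluster(2)

# The function and its arguments 1 and 2 are sent unchanged to every node.
sums <- clusterCall(cl, function(a, b) a + b, 1, 2)

# Each node evaluates the call in its own process, so the PIDs differ.
pids <- clusterCall(cl, Sys.getpid)

stopCluster(cl)

print(sums)   # a list with one element (3) per node
```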
clusterEvalQ evaluates a literal expression on each cluster
node. It is a parallel version of evalq, and is a
convenience function invoking clusterCall.
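The following sketch shows that clusterEvalQ sends the expression unevaluated and evaluates it in each worker's global environment, so an assignment made in one call remains visible to later calls on the same workers. It assumes a two-worker local cluster, and the variable name scale_factor is hypothetical:

```r
library(parallel)

cl <- makeCluster(2)

# The expression is not evaluated on the master; each worker evaluates it
# in its own global environment, so the assignment persists there.
invisible(clusterEvalQ(cl, scale_factor <- 10))

# A later call on the same workers can use the variable created above.
scaled <- clusterEvalQ(cl, scale_factor * 2)

# The master's own workspace is not affected.
defined_on_master <- exists("scale_factor")

stopCluster(cl)
```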
clusterApply calls fun on the first node with arguments x[[1]]
and ..., on the second node with x[[2]] and ..., and so on,
recycling nodes as needed.
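A small sketch of the static, round-robin placement, assuming two local workers. With five elements and two nodes, elements 1, 3 and 5 go to one worker and elements 2 and 4 to the other. Recording Sys.getpid() for each element makes this placement visible. The names squares and who are hypothetical:

```r
library(parallel)

cl <- makeCluster(2)

# Five elements on two nodes: x[[1]] goes to node 1, x[[2]] to node 2,
# then the nodes are reused for x[[3]], x[[4]] and x[[5]].
squares <- clusterApply(cl, 1:5, function(i) i^2)

# Record which worker process handled each element.
who <- unlist(clusterApply(cl, 1:5, function(i) Sys.getpid()))

stopCluster(cl)

unlist(squares)   # 1 4 9 16 25, in the order of x
```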
clusterApplyLB is a load balancing version of clusterApply. If the
length n of x is not greater than the number of nodes p, then a job
is sent to n nodes. Otherwise the first p jobs are placed in order
on the p nodes. When the first job completes, the next job is
placed on the node that has become free; this continues until all jobs
are complete. Using clusterApplyLB can result in better
cluster utilization than using clusterApply, but increased
communication can reduce performance. Furthermore, the node that
executes a particular job is nondeterministic. This means that
simulations that assign RNG streams to nodes will not be reproducible.
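This sketch uses deliberately uneven sleep times, chosen arbitrarily for illustration, with a two-worker local cluster. While one node is busy with the long job, the other node can take the short jobs as it becomes free. Whichever node runs each job, the results are returned in the order of x. The code does not assert which node ran which job, because that assignment is nondeterministic:

```r
library(parallel)

cl <- makeCluster(2)

# Task durations in seconds: the first task is much longer than the rest,
# so the other node is free to work through the short tasks meanwhile.
durations <- c(0.5, 0.1, 0.1, 0.1, 0.1)

res <- clusterApplyLB(cl, durations, function(s) {
  Sys.sleep(s)
  c(duration = s, pid = Sys.getpid())
})

stopCluster(cl)

# Results come back in the order of x, whichever node ran each job.
do.call(rbind, res)
```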
clusterMap is a multiargument version of clusterApply,
analogous to mapply and Map. If RECYCLE is true, shorter arguments
are recycled (and either none or all must be of length zero);
otherwise, the result length is the length of the shortest argument.
Nodes are recycled if the length of the result is greater than the
number of nodes. (mapply always uses RECYCLE = TRUE, and its
SIMPLIFY argument defaults to TRUE. Map always uses RECYCLE = TRUE.)
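The sketch below shows the main clusterMap arguments on a two-worker local cluster: the default list result, recycling of a length-1 argument, truncation with RECYCLE = FALSE, and a shared argument passed through MoreArgs. The argument name offset and the result names are hypothetical:

```r
library(parallel)

cl <- makeCluster(2)

# Elementwise over two vectors, like mapply(); SIMPLIFY = FALSE by default,
# so a list is returned.
pair_sums <- clusterMap(cl, function(x, y) x + y, 1:3, 4:6)

# RECYCLE = TRUE (the default): the length-1 argument is recycled.
scaled <- clusterMap(cl, function(x, y) x * y, 1:4, 2, SIMPLIFY = TRUE)

# RECYCLE = FALSE: the result has the length of the shortest argument.
truncated <- clusterMap(cl, function(x, y) x + y, 1:4, 1:2, RECYCLE = FALSE)

# MoreArgs supplies arguments that are the same for every call.
with_offset <- clusterMap(cl, function(x, offset) x + offset, 1:3,
                          MoreArgs = list(offset = 100), SIMPLIFY = TRUE)

stopCluster(cl)
```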
clusterExport assigns the values on the master R process of
the variables named in varlist to variables of the same names
in the global environment (aka ‘workspace’) of each node. The
environment on the master from which variables are exported defaults
to the global environment.
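When the variables to export are local to a function rather than in the master's workspace, the envir argument has to point at that function's environment. This sketch assumes a two-worker local cluster and uses a hypothetical helper, export_local(). clusterEvalQ then confirms that offset now lives in each worker's global environment:

```r
library(parallel)

cl <- makeCluster(2)

# Hypothetical helper: exports one of its own local variables rather than
# a variable from the master's global environment.
export_local <- function(cl) {
  offset <- 100
  # envir = environment() points clusterExport at this function's frame.
  clusterExport(cl, "offset", envir = environment())
}
export_local(cl)

# On each node, offset is now a variable in the worker's global environment.
shifted <- clusterEvalQ(cl, offset + 1)

stopCluster(cl)
```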
clusterSplit splits seq into a consecutive piece for
each cluster node and returns the result as a list with length equal
to the number of nodes. Currently the pieces are chosen to be close
to equal in length: the computation is done on the master.
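A typical use is to split the data once on the master and then hand one piece to each node with clusterApply. This sketch assumes a two-worker local cluster and sums each piece on the node that receives it. The tests check only the documented properties: one piece per node, consecutive pieces, and nearly equal lengths:

```r
library(parallel)

cl <- makeCluster(2)

# Split 1:10 into one consecutive piece per node (two nodes here).
pieces <- clusterSplit(cl, 1:10)

# Send each piece to a node and reduce it there.
partial_sums <- clusterApply(cl, pieces, sum)

stopCluster(cl)

pieces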
parLapply, parSapply, and parApply are parallel
versions of lapply, sapply and apply. Chunks of
computation are statically allocated to nodes using clusterApply.
By default, the number of chunks is the same as the number of nodes.
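Because these functions are parallel versions of lapply, sapply and apply, a quick check is to compare each one with its serial counterpart on the same input. The sketch assumes a two-worker local cluster. The matrix and function are arbitrary choices for illustration:

```r
library(parallel)

cl <- makeCluster(2)

# parLapply / parSapply mirror lapply / sapply.
par_list <- parLapply(cl, 1:6, function(i) i * 10)
par_vec  <- parSapply(cl, 1:6, function(i) i * 10)
seq_vec  <- sapply(1:6, function(i) i * 10)

# parApply mirrors apply: here, the row maxima of a small matrix.
m <- matrix(1:12, nrow = 4)
par_rowmax <- parApply(cl, m, 1, max)
seq_rowmax <- apply(m, 1, max)

stopCluster(cl)
```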
parLapplyLB and parSapplyLB are load-balancing versions,
intended for use when applying FUN to different elements of X
takes quite variable amounts of time, and either the function is
deterministic or reproducible results are not required. Chunks of
computation are allocated dynamically to nodes using
clusterApplyLB. From R 3.5.0, the default number of chunks is
twice the number of nodes. Before R 3.5.0, the (fixed) number of chunks
was the same as the number of nodes. As for clusterApplyLB,
with load balancing the node that executes a particular job is
nondeterministic and simulations that assign RNG streams to nodes
will not be reproducible.
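This sketch uses a deterministic function whose running time grows with its input, so load balancing is a good fit. It assumes a two-worker local cluster and a hypothetical helper, slow_square(). Setting chunk.size = 1 makes each element its own scheduling unit. The function is deterministic, so the result is the same as the serial one even though the node assignment can vary from run to run:

```r
library(parallel)

cl <- makeCluster(2)

# Hypothetical helper with uneven, deterministic cost: larger i sleeps longer.
slow_square <- function(i) {
  Sys.sleep(0.05 * i)
  i^2
}

# chunk.size = 1 sends one element per chunk, so a node that becomes free
# can pick up the next element straight away.
lb <- parSapplyLB(cl, 1:6, slow_square, chunk.size = 1)

stopCluster(cl)

lb
```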
parRapply and parCapply are parallel row and column apply
functions for a matrix x; they may be slightly more efficient than
parApply but do less postprocessing of the result.
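The sketch below contrasts a scalar-valued FUN, which gives one value per row or column, with a vector-valued FUN, whose per-row results are concatenated into a single plain vector. It assumes a two-worker local cluster and a small arbitrary matrix:

```r
library(parallel)

cl <- makeCluster(2)

m <- matrix(1:6, nrow = 3)

# A scalar-valued FUN gives one value per row (or column).
row_sums <- parRapply(cl, m, sum)
col_sums <- parCapply(cl, m, sum)

# A vector-valued FUN gives the concatenation of the per-row results
# (the min and max of each of the 3 rows: 6 values, no dim attribute).
row_ranges <- parRapply(cl, m, range)

stopCluster(cl)
```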
A chunk size of 0 with static scheduling uses the default (one
chunk per node). With dynamic scheduling, a chunk size of 0 has the
same effect as 1 (one invocation of FUN/fun per chunk).
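To see how chunking affects static scheduling, this sketch records the worker process that handles each element. It assumes a two-worker local cluster and a hypothetical helper, who_ran(). By default there is one chunk per node, so both workers are used. A chunk.size as large as the input puts every invocation into a single chunk, which therefore runs on a single node:

```r
library(parallel)

cl <- makeCluster(2)

# Hypothetical helper: report which worker process ran this element.
who_ran <- function(i) Sys.getpid()

# Default static scheduling: one chunk per node, so both workers are used.
default_chunks <- unlist(parLapply(cl, 1:6, who_ran))

# chunk.size = 6 packs all six invocations into a single chunk,
# so only one node does any work.
one_chunk <- unlist(parLapply(cl, 1:6, who_ran, chunk.size = 6))

stopCluster(cl)
```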
Value
For clusterCall, clusterEvalQ and clusterSplit, a
list with one element per node.
For clusterApply and clusterApplyLB, a list the same
length as x.
clusterMap follows mapply.
clusterExport returns nothing.
parLapply returns a list the length of X.
parSapply and parApply follow sapply and
apply respectively.
parRapply and parCapply always return a vector. If
FUN always returns a scalar, the result has length equal to the
number of rows or columns; otherwise it is the concatenation of
the returned values.
An error is signalled on the master if any of the workers produces an
error.
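This sketch shows how a worker error reaches the master. It assumes a two-worker local cluster, in which the second element deliberately calls stop(). The master signals an ordinary R error that tryCatch can catch. The test checks only that the worker's message text appears, not the exact wording the master adds around it:

```r
library(parallel)

cl <- makeCluster(2)

# The second job signals an error on its worker; the master re-signals it.
msg <- tryCatch(
  clusterApply(cl, 1:2, function(i) if (i == 2) stop("boom") else i),
  error = function(e) conditionMessage(e)
)

stopCluster(cl)

msg   # the worker's error text is included in the message
```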
Note
These functions are almost identical to those in package snow, with
two exceptions: parLapply has argument X rather than x for
consistency with lapply, and parSapply has been updated to match
sapply.
Author(s)
Luke Tierney and R Core.
Derived from the snow package.
Examples
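library(parallel)
## Use option cl.cores to choose an appropriate cluster size.
cl <- makeCluster(getOption("cl.cores", 2))
clusterApply(cl, 1:2, get("+"), 3)
xx <- 1
clusterExport(cl, "xx")
## xx is found in each worker's global environment after the export.
clusterCall(cl, function(y) xx + y, 2)
## Use clusterMap like an mapply example.
clusterMap(cl, function(x, y) seq_len(x) + y,
           c(a = 1, b = 2, c = 3), c(A = 10, B = 0, C = 10))
parSapply(cl, 1:20, get("+"), 3)
## A bootstrapping example: first set up each worker
## (this could also be done with clusterExport()).
clusterEvalQ(cl, {
  library(boot)
  cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
  cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
  NULL
})
res <- clusterEvalQ(cl, boot(cd4, corr, R = 100,
                    sim = "parametric", ran.gen = cd4.rg, mle = cd4.mle))
## Combine the per-worker bootstrap results on the master.
library(boot)
cd4.boot <- do.call(c, res)
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)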
## Alternatively, run a complete bootstrap on each worker.
library(boot)
run1 <- function(...) {
  library(boot)
  cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
  cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
  boot(cd4, corr, R = 500, sim = "parametric",
       ran.gen = cd4.rg, mle = cd4.mle)
}
cl <- makeCluster(mc <- getOption("cl.cores", 2))
## Give each worker its own RNG stream to make this reproducible.
clusterSetRNGStream(cl, 123)
cd4.boot <- do.call(c, parLapply(cl, seq_len(mc), run1))
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)