Title: | Support for Parallel Computation in R |
---|---|
Description: | Support for parallel computation, including by forking (taken from package multicore), by sockets (taken from package snow) and random-number generation. |
Authors: | R Core Team |
Maintainer: | R Core Team <[email protected]> |
License: | Part of R 4.4.1 |
Version: | 4.4.1 |
Built: | 2024-06-15 17:27:47 UTC |
Source: | base |
Support for parallel computation, including random-number generation.
This package was first included with R 2.14.0 in 2011.
There is support for multiple RNG streams with the ‘"L'Ecuyer-CMRG"’ RNG: see nextRNGStream.
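As a minimal sketch of how separate streams can be derived by hand (the seed value 123 and the variable names are arbitrary choices for illustration), one could switch to the "L'Ecuyer-CMRG" generator and step from one stream to the next:

```r
library(parallel)

# Switch to the generator that supports multiple streams.
RNGkind("L'Ecuyer-CMRG")
set.seed(123)  # arbitrary seed, chosen only for illustration

# With this generator .Random.seed is an integer vector of length 7.
stream1 <- .Random.seed
# Each call to nextRNGStream() returns the seed of the following stream.
stream2 <- nextRNGStream(stream1)
stream3 <- nextRNGStream(stream2)
```

Each of these seeds could then be assigned to a different worker so that the workers draw from non-overlapping streams.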
It contains functionality derived from and pretty much equivalent to that contained in packages multicore (formerly on CRAN, with some low-level functions renamed and not exported) and snow (for socket clusters only, but MPI clusters generated by snow are also supported). There have been many enhancements and bug fixes since 2011.
This package also provides makeForkCluster to create socket clusters by forking (not Windows).
For a complete list of exported functions, use library(help = "parallel").
Brian Ripley, Luke Tierney and Simon Urbanek
Parallel computation involves launching worker processes: functions psnice and pskill in package tools provide means to manage such processes.
These functions provide several ways to parallelize computations using a cluster.
clusterCall(cl = NULL, fun, ...)
clusterApply(cl = NULL, x, fun, ...)
clusterApplyLB(cl = NULL, x, fun, ...)
clusterEvalQ(cl = NULL, expr)
clusterExport(cl = NULL, varlist, envir = .GlobalEnv)
clusterMap(cl = NULL, fun, ..., MoreArgs = NULL, RECYCLE = TRUE,
           SIMPLIFY = FALSE, USE.NAMES = TRUE,
           .scheduling = c("static", "dynamic"))
clusterSplit(cl = NULL, seq)
parLapply(cl = NULL, X, fun, ..., chunk.size = NULL)
parSapply(cl = NULL, X, FUN, ..., simplify = TRUE,
          USE.NAMES = TRUE, chunk.size = NULL)
parApply(cl = NULL, X, MARGIN, FUN, ..., chunk.size = NULL)
parRapply(cl = NULL, x, FUN, ..., chunk.size = NULL)
parCapply(cl = NULL, x, FUN, ..., chunk.size = NULL)
parLapplyLB(cl = NULL, X, fun, ..., chunk.size = NULL)
parSapplyLB(cl = NULL, X, FUN, ..., simplify = TRUE,
            USE.NAMES = TRUE, chunk.size = NULL)
cl | a cluster object, created by this package or by package snow. If NULL, use the registered default cluster. |
fun, FUN | function or character string naming a function. |
expr | expression to evaluate. |
seq | vector to split. |
varlist | character vector of names of objects to export. |
envir | environment from which to export variables. |
x | a vector for clusterApply and clusterApplyLB, a matrix for parRapply and parCapply. |
... | additional arguments to pass to fun or FUN. |
MoreArgs | additional arguments for fun. |
RECYCLE | logical; if true shorter arguments are recycled. |
X | a vector (atomic or list) for parLapply and parSapply, an array for parApply. |
chunk.size | scalar number; number of invocations of fun or FUN in one chunk. |
MARGIN | vector specifying the dimensions to use. |
simplify, USE.NAMES | logical; see sapply. |
SIMPLIFY | logical; see mapply. |
.scheduling | should tasks be statically allocated to nodes or dynamic load-balancing used? |
clusterCall calls a function fun with identical arguments ... on each node.
clusterEvalQ evaluates a literal expression on each cluster node. It is a parallel version of evalq, and is a convenience function invoking clusterCall.
clusterApply calls fun on the first node with arguments x[[1]] and ..., on the second node with x[[2]] and ..., and so on, recycling nodes as needed.
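The recycling of nodes can be illustrated with a small sketch. The cluster size of two and the multiplier argument k are arbitrary choices for illustration, not values taken from the documentation:

```r
library(parallel)

cl <- makeCluster(2)  # two local workers; the size is an arbitrary choice

# Four tasks on two nodes: tasks 1 and 3 go to the first node,
# tasks 2 and 4 to the second. The extra argument k is passed to every call.
res <- clusterApply(cl, 1:4, function(i, k) i * k, k = 10)

stopCluster(cl)
unlist(res)
```

The result is a list in the order of x, regardless of which node handled each element.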
clusterApplyLB is a load balancing version of clusterApply. If the length n of x is not greater than the number of nodes p, then a job is sent to n nodes. Otherwise the first p jobs are placed in order on the p nodes. When the first job completes, the next job is placed on the node that has become free; this continues until all jobs are complete. Using clusterApplyLB can result in better cluster utilization than using clusterApply, but increased communication can reduce performance. Furthermore, the node that executes a particular job is non-deterministic. This means that simulations that assign RNG streams to nodes will not be reproducible.
clusterMap is a multi-argument version of clusterApply, analogous to mapply and Map. If RECYCLE is true shorter arguments are recycled (and either none or all must be of length zero); otherwise, the result length is the length of the shortest argument. Nodes are recycled if the length of the result is greater than the number of nodes. (mapply always uses RECYCLE = TRUE, and has argument SIMPLIFY = TRUE. Map always uses RECYCLE = TRUE.)
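A short sketch of the effect of RECYCLE, assuming a two-worker local cluster; the helper name add is hypothetical and used only for illustration:

```r
library(parallel)

cl <- makeCluster(2)
add <- function(x, y) x + y  # hypothetical helper for illustration

# Default RECYCLE = TRUE: the shorter argument 1:2 is recycled to 1, 2, 1, 2.
recycled <- clusterMap(cl, add, 1:4, 1:2)

# RECYCLE = FALSE: the result has the length of the shortest argument.
truncated <- clusterMap(cl, add, 1:4, 1:2, RECYCLE = FALSE)

stopCluster(cl)
```

Because SIMPLIFY defaults to FALSE here, both results are lists; unlist() or SIMPLIFY = TRUE would be needed to obtain a vector.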
clusterExport assigns the values on the master R process of the variables named in varlist to variables of the same names in the global environment (aka ‘workspace’) of each node. The environment on the master from which variables are exported defaults to the global environment.
clusterSplit splits seq into a consecutive piece for each cluster and returns the result as a list with length equal to the number of nodes. Currently the pieces are chosen to be close to equal in length: the computation is done on the master.
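For instance, a sketch with a two-worker cluster (the cluster size and the input 1:10 are assumptions made for illustration):

```r
library(parallel)

cl <- makeCluster(2)

# Split ten values into one consecutive piece per node.
parts <- clusterSplit(cl, 1:10)

stopCluster(cl)
lengths(parts)
```

Concatenating the pieces in order gives back the original vector, which makes clusterSplit convenient for preparing per-node inputs to clusterApply.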
parLapply, parSapply, and parApply are parallel versions of lapply, sapply and apply. Chunks of computation are statically allocated to nodes using clusterApply. By default, the number of chunks is the same as the number of nodes.
parLapplyLB, parSapplyLB are load-balancing versions, intended for use when applying FUN to different elements of X takes quite variable amounts of time, and either the function is deterministic or reproducible results are not required. Chunks of computation are allocated dynamically to nodes using clusterApplyLB. From R 3.5.0, the default number of chunks is twice the number of nodes. Before R 3.5.0, the (fixed) number of chunks was the same as the number of nodes. As for clusterApplyLB, with load balancing the node that executes a particular job is non-deterministic and simulations that assign RNG streams to nodes will not be reproducible.
parRapply and parCapply are parallel row and column apply functions for a matrix x; they may be slightly more efficient than parApply but do less post-processing of the result.
A chunk size of 0 with static scheduling uses the default (one chunk per node). With dynamic scheduling, chunk size of 0 has the same effect as 1 (one invocation of FUN/fun per chunk).
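The scheduling variants above can be compared in a small sketch. The cluster size, the input 1:8 and the helper name square are arbitrary choices for illustration; for a deterministic function all three calls should return the same list:

```r
library(parallel)

cl <- makeCluster(2)
square <- function(i) i^2  # hypothetical, deterministic helper

# Static scheduling with the default chunking (one chunk per node).
static <- parLapply(cl, 1:8, square)

# Static scheduling with about two invocations of the function per chunk.
chunked <- parLapply(cl, 1:8, square, chunk.size = 2)

# Dynamic load balancing with one invocation per chunk.
dynamic <- parLapplyLB(cl, 1:8, square, chunk.size = 1)

stopCluster(cl)
```

Smaller chunks give the scheduler more freedom to balance uneven workloads, at the cost of more communication with the workers.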
For clusterCall, clusterEvalQ and clusterSplit, a list with one element per node.
For clusterApply and clusterApplyLB, a list the same length as x.
clusterMap follows mapply.
clusterExport returns nothing.
parLapply returns a list the length of X.
parSapply and parApply follow sapply and apply respectively.
parRapply and parCapply always return a vector. If FUN always returns a scalar result this will be of length the number of rows or columns: otherwise it will be the concatenation of the returned values.
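A brief sketch of the two cases for parRapply and parCapply, using a small matrix and a two-worker cluster chosen only for illustration:

```r
library(parallel)

cl <- makeCluster(2)
m <- matrix(1:6, nrow = 2)  # rows are (1, 3, 5) and (2, 4, 6)

# Scalar result per row or column: one value for each.
row_sums <- parRapply(cl, m, sum)
col_sums <- parCapply(cl, m, sum)

# Non-scalar result per row: the values are concatenated into one vector.
row_rng <- parRapply(cl, m, range)

stopCluster(cl)
```

Here row_rng holds the minimum and maximum of the first row followed by those of the second, so any reshaping into a matrix is left to the caller.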
An error is signalled on the master if any of the workers produces an error.
These functions are almost identical to those in package snow.
Two exceptions: parLapply has argument X not x for consistency with lapply, and parSapply has been updated to match sapply.
Luke Tierney and R Core.
Derived from the snow package.
## Use option cl.cores to choose an appropriate cluster size.
cl <- makeCluster(getOption("cl.cores", 2))

clusterApply(cl, 1:2, get("+"), 3)
xx <- 1
clusterExport(cl, "xx")
clusterCall(cl, function(y) xx + y, 2)

## Use clusterMap like an mapply example
clusterMap(cl, function(x, y) seq_len(x) + y,
           c(a = 1, b = 2, c = 3), c(A = 10, B = 0, C = -10))

parSapply(cl, 1:20, get("+"), 3)

## A bootstrapping example, which can be done in many ways:
clusterEvalQ(cl, {
  ## set up each worker. Could also use clusterExport()
  library(boot)
  cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
  cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
  NULL
})
res <- clusterEvalQ(cl, boot(cd4, corr, R = 100,
                             sim = "parametric", ran.gen = cd4.rg, mle = cd4.mle))
library(boot)
cd4.boot <- do.call(c, res)
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)

## or
library(boot)
run1 <- function(...) {
  library(boot)
  cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
  cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
  boot(cd4, corr, R = 500, sim = "parametric",
       ran.gen = cd4.rg, mle = cd4.mle)
}
cl <- makeCluster(mc <- getOption("cl.cores", 2))
## to make this reproducible
clusterSetRNGStream(cl, 123)
cd4.boot <- do.call(c, parLapply(cl, seq_len(mc), run1))
boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
        conf = 0.9, h = atanh, hinv = tanh)
stopCluster(cl)
Attempt to detect the number of CPU cores on the current host.
detectCores(all.tests = FALSE, logical = TRUE)
all.tests | Logical: if true apply all known tests. |
logical | Logical: if possible, use the number of physical CPUs/cores (if FALSE) or logical CPUs (if TRUE). |
This attempts to detect the number of available CPU cores.
It has methods to do so for Linux, macOS, FreeBSD, OpenBSD, Solaris and Windows. detectCores(TRUE) could be tried on other Unix-alike systems.
An integer, NA if the answer is unknown.
Exactly what this represents is OS-dependent: where possible by default it counts logical (e.g., hyperthreaded) CPUs and not physical cores or packages.
Under macOS there is a further distinction between ‘available in the current power management mode’ and ‘could be available this boot’, and this function returns the first.
On Sparc Solaris logical = FALSE returns the number of physical cores and logical = TRUE returns the number of available hardware threads. (Some Sparc CPUs have multiple cores per CPU, others have multiple threads per core and some have both.) For example, the UltraSparc T2 CPU in the former CRAN check server was a single physical CPU with 8 cores, and each core supports 8 hardware threads. So detectCores(logical = FALSE) returns 8, and detectCores(logical = TRUE) returns 64.
Where virtual machines are in use, one would hope that the result for logical = TRUE represents the number of CPUs available (or potentially available) to that particular VM.
This is not suitable for use directly for the mc.cores argument of mclapply nor specifying the number of cores in makeCluster. First because it may return NA, second because it does not give the number of allowed cores, and third because on Sparc Solaris and some Windows boxes it is not reasonable to try to use all the logical CPUs at once.
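One cautious way to turn the detected count into a worker count is sketched below. The helper safe_workers and its cap argument are hypothetical (they are not part of package parallel), and the sketch guards only against NA and over-subscription; it cannot know how many cores the process is actually allowed to use:

```r
library(parallel)

# Hypothetical helper (not part of the parallel package): choose a worker
# count from detectCores() while guarding against NA and capping the result.
safe_workers <- function(cap = 2L) {
  n <- detectCores(logical = FALSE)
  if (is.na(n)) return(1L)                # unknown count: fall back to one worker
  as.integer(max(1L, min(n - 1L, cap)))   # leave one core free, respect the cap
}

workers <- safe_workers()
```

The resulting value could then be passed to makeCluster or used as mc.cores, with the cap raised deliberately on machines known to be dedicated to the job.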
Simon Urbanek and Brian Ripley
detectCores()
detectCores(logical = FALSE)
Creates a set of copies of R running in parallel and communicating over sockets.
makeCluster(spec, type, ...)
makePSOCKcluster(names, ...)
makeForkCluster(nnodes = getOption("mc.cores", 2L), ...)
stopCluster(cl = NULL)
setDefaultCluster(cl = NULL)
getDefaultCluster()
spec | A specification appropriate to the type of cluster. |
names | Either a character vector of host names on which to run the worker copies of R, or a positive integer (in which case that number of copies is run on ‘localhost’). |
nnodes | The number of nodes to be forked. |
type | One of the supported types: see ‘Details’. |
... | Options to be passed to the function spawning the workers. See ‘Details’. |
cl | an object of class "cluster". |
makeCluster creates a cluster of one of the supported types. The default type, "PSOCK", calls makePSOCKcluster. Type "FORK" calls makeForkCluster. Other types are passed to package snow.
makePSOCKcluster is an enhanced version of makeSOCKcluster in package snow. It runs Rscript on the specified host(s) to set up a worker process which listens on a socket for expressions to evaluate, and returns the results (as serialized objects).
makeForkCluster is merely a stub on Windows. On Unix-alike platforms it creates the worker process by forking.
The workers are most often running on the same host as the master, when no options need be set.
Several options are supported (mainly for makePSOCKcluster):
master
The host name of the master, as known to the workers. This may not be the same as it is known to the master, and on private subnets it may be necessary to specify this as a numeric IP address. For example, macOS is likely to detect a machine as ‘somename.local’, a name known only to itself.
port
The port number for the socket connection, default taken from the environment variable R_PARALLEL_PORT, then a randomly chosen port in the range 11000:11999.
timeout
The timeout in seconds for that port. This is the maximum time of zero communication between master and worker before failing. Default is 30 days (and the POSIX standard only requires values up to 31 days to be supported).
setup_timeout
The maximum number of seconds a worker attempts to connect to master before failing. Default is 2 minutes. The waiting time before the next attempt starts at 0.1 seconds and is incremented 50% after each retry.
outfile
Where to direct the stdout and stderr connection output from the workers. "" indicates no redirection (which may only be useful for workers on the local machine). Defaults to ‘/dev/null’ (‘nul:’ on Windows). The other possibility is a file path on the worker's host. Files will be opened in append mode, as all workers log to the same file.
homogeneous
Logical, default true. See ‘Note’.
rscript
See ‘Note’.
rscript_args
Character vector of additional arguments for Rscript such as --no-environ.
renice
A numerical ‘niceness’ to set for the worker processes, e.g. 15 for a low priority. OS-dependent: see psnice for details.
rshcmd
The command to be run on the master to launch a process on another host. Defaults to ssh.
user
The user name to be used when communicating with another host.
manual
Logical. If true the workers will need to be run manually.
methods
Logical. If true (default) the workers will load the methods package: not loading it saves ca 30% of the startup CPU time of the cluster.
useXDR
Logical. If true (default) serialization will use XDR: where large amounts of data are to be transferred and all the nodes are little-endian, communication may be substantially faster if this is set to false.
setup_strategy
Character. If "parallel" (default) workers will be started in parallel during cluster setup when this is possible, which is now for homogeneous "PSOCK" clusters with all workers started automatically (manual = FALSE) on the local machine. Workers will be started sequentially on other clusters, on all clusters with setup_strategy = "sequential" and on R 3.6.0 and older. This option is for expert use only (e.g. debugging) and may be removed in future versions of R.
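As a sketch of how a few of these options might be passed (the particular values are arbitrary choices for illustration, and the defaults are normally appropriate on a single host):

```r
library(parallel)

# Two local workers that skip loading the methods package, give up on
# connecting after 60 seconds, and start Rscript with --no-environ.
cl <- makePSOCKcluster(2L,
                       methods = FALSE,
                       setup_timeout = 60,
                       rscript_args = "--no-environ")

# Each worker is a separate R process with its own process ID.
pids <- unlist(clusterCall(cl, Sys.getpid))

stopCluster(cl)
```

Options are supplied through the ... argument, so the same names could also be given to makeCluster with the default type "PSOCK".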
Function makeForkCluster creates a socket cluster by forking (and hence is not available on Windows). It supports options port, timeout and outfile, and always uses useXDR = FALSE. It is strongly discouraged to use the "FORK" cluster with GUI front-ends or multi-threaded libraries. See mcfork for details.
It is good practice to shut down the workers by calling stopCluster: however the workers will terminate themselves once the socket on which they are listening for commands becomes unavailable, which it should if the master R session is completed (or its process dies).
Function setDefaultCluster registers a cluster as the default one for the current session. Using setDefaultCluster(NULL) removes the registered cluster, as does stopping that cluster.
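A minimal sketch of working with a registered default cluster, assuming two local workers (the size is an arbitrary choice for illustration):

```r
library(parallel)

cl <- makeCluster(2)
setDefaultCluster(cl)

# With cl left at its default of NULL, the registered cluster is used.
squares <- parSapply(X = 1:3, FUN = function(v) v^2)

# Stopping the cluster also removes it as the registered default.
stopCluster(cl)
is.null(getDefaultCluster())
```

Registering a default is convenient in scripts that use a single cluster throughout, since the cl argument can then be omitted from every call.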
For the cluster creators, an object of class c("SOCKcluster", "cluster").
For the default cluster setter and getter, the registered default cluster or NULL if there is no such cluster.
Option homogeneous = TRUE was for years documented as ‘Are all the hosts running identical setups?’, but this was apparently more restrictive than its author intended and not required by the code.
The current interpretation of homogeneous = TRUE is that Rscript can be launched using the same path on each worker. That path is given by the option rscript and defaults to the full path to Rscript on the master. (The workers are not required to be running the same version of R as the master, nor even as each other.)
For homogeneous = FALSE, Rscript on the workers is found on their default shell's path.
For the very common usage of running both master and worker on a single multi-core host, the default settings are the appropriate ones.
A socket connection is used to communicate from the master to each worker so the maximum number of connections (default 128 but some will be in use) may need to be increased when the master process is started.
Luke Tierney and R Core.
Derived from the snow package.
mcaffinity retrieves or sets the CPU affinity mask of the current process, i.e., the set of CPUs the process is allowed to be run on. (CPU here means logical CPU which can be CPU, core or hyperthread unit.)
mcaffinity(affinity = NULL)
affinity | specification of the CPUs to lock this process to (numeric vector) or NULL if no change is requested. |
mcaffinity can be used to obtain (affinity = NULL) or set the CPU affinity mask of the current process. The affinity mask is a list of integer CPU identifiers (starting from 1) that this process is allowed to run on. Not all systems provide user access to the process CPU affinity; where no support is present at all, mcaffinity() will return NULL. Some systems may take into account only the number of CPUs present in the mask.
Typically, it is legal to specify a larger set than the number of logical CPUs (but at most as many as the OS can handle) and the system will return the set actually present.
NULL if CPU affinity is not supported by the system or an integer vector with the set of CPUs in the active affinity mask for this process (this may be different from affinity).
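A small sketch of querying the mask without changing it, assuming a platform on which mcaffinity is available (typically a Unix-alike):

```r
library(parallel)

# Query only: affinity = NULL leaves the mask unchanged.
aff <- mcaffinity()

if (is.null(aff)) {
  message("CPU affinity is not supported on this system")
} else {
  message("This process may run on ", length(aff), " logical CPU(s)")
}
```

Checking for NULL first matters because code that assumes an integer vector would otherwise fail on systems without affinity support.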
Simon Urbanek.
These are low-level support functions for the forking approach.
They are not available on Windows, and not exported from the namespace.
children(select)
readChild(child)
readChildren(timeout = 0)
selectChildren(children = NULL, timeout = 0)
sendChildStdin(child, what)
sendMaster(what, raw.asis = TRUE)
mckill(process, signal = 2L)
select |
if omitted, all active children are returned, otherwise
|
child |
child process (object of the class |
timeout |
timeout (in seconds, fractions supported) to wait for a response before giving up. |
children |
list of child processes or a single child process
object or a vector of process IDs or |
what |
For For |
raw.asis |
logical, if |
process |
process (object of the class |
signal |
integer: signal to send. Values of 2 (SIGINT), 9
(SIGKILL) and 15 (SIGTERM) are pretty much portable, but for maximal
portability use |
children returns currently active children.
readChild reads data (sent by sendMaster) from a given child process.
selectChildren checks children for available data.
readChildren checks all children for available data and reads from the first child that has available data.
sendChildStdin sends a string (or data) to the standard input of one or more children. Note that if the master session was interactive, it will also be echoed on the standard output of the master process (unless disabled). The function is vector-compatible, so you can specify child as a list or a vector of process IDs.
sendMaster sends data from the child to the master process.
mckill sends a signal to a child process: it is equivalent to pskill in package tools.
children returns a (possibly empty) list of objects of class "process", the process ID.
readChild and readChildren return a raw vector with a "pid" attribute if data were available, an integer vector of length one with the process ID if a child terminated or NULL if the child no longer exists (no children at all for readChildren).
selectChildren returns TRUE if the timeout was reached, FALSE if an error occurred (e.g., if the master process was interrupted) or an integer vector of process IDs with children that have data available, or NULL if there are no children.
sendChildStdin returns a vector of TRUE values (one for each member of child) or throws an error.
sendMaster returns TRUE or throws an error.
mckill returns TRUE.
This is a very low-level interface for expert use only: it is not regarded as part of the R API and is subject to change without notice.
sendMaster, readChild and sendChildStdin did not support long vectors prior to R 3.4.0 and so were limited to 2^31 - 1 bytes (and still are on 32-bit platforms).
Simon Urbanek and R Core.
Derived from the multicore package formerly on CRAN.
mcfork, sendMaster, mcparallel
## Not run:
p <- mcparallel(scan(n = 1, quiet = TRUE))
sendChildStdin(p, "17.4\n")
mccollect(p)[[1]]
## End(Not run)
These are low-level functions, not available on Windows, and not exported from the namespace.
mcfork creates a new child process as a copy of the current R process.
mcexit closes the current child process, informing the master process as necessary.
mcfork(estranged = FALSE)
mcexit(exit.code = 0L, send = NULL)
estranged |
logical, if |
exit.code |
process exit code. By convention |
send |
if not |
The mcfork function provides an interface to the fork system call. In addition it sets up a pipe between the master and child process that can be used to send data from the child process to the master (see sendMaster) and child's ‘stdin’ is re-mapped to another pipe held by the master process (see sendChildStdin).
If you are not familiar with the fork system call, do not use this function directly as it leads to very complex inter-process interactions amongst the R processes involved.
In a nutshell fork spawns a copy (child) of the current process, that can work in parallel to the master (parent) process. At the point of forking both processes share exactly the same state including the workspace, global options, loaded packages etc. Forking is relatively cheap in modern operating systems and no real copy of the used memory is created, instead both processes share the same memory and only modified parts are copied. This makes mcfork an ideal tool for parallel processing since there is no need to set up the parallel working environment: data and code are shared automatically from the start.
mcexit is to be run in the child process. It sends send to the master (unless NULL) and then shuts down the child process. The child can also be shut down by sending it the signal SIGUSR1, as is done by the unexported function parallel:::rmChild.
mcfork returns an object of the class "childProcess" to the master and of class "masterProcess" to the child: both the classes inherit from class "process". If estranged is set to TRUE then the child process will be of the class "estrangedProcess" and cannot communicate with the master process nor will it show up on the list of children. These are lists with components pid (the process id of the other process) and a vector fd of the two file descriptor numbers for ends in the current process of the inter-process pipes.
mcexit never returns.
It is strongly discouraged to use mcfork and the higher-level functions which rely on it (e.g., mcparallel, mclapply and pvec) in GUI or embedded environments, because it leads to several processes sharing the same GUI which will likely cause chaos (and possibly crashes). Child processes should never use on-screen graphics devices. Some precautions have been taken to make this usable in R.app on macOS, but users of third-party front-ends should consult their documentation.
This can also apply to other connections (e.g., to an X server) created before forking, and to files opened by e.g. graphics devices.
Note that tcltk counts as a GUI for these purposes since Tcl runs an event loop. That event loop is inhibited in a child process but there could still be problems with Tk graphical connections.
It is strongly discouraged to use mcfork and the higher-level functions in any multi-threaded R process (with additional threads created by a third-party library or package). Such use can lead to deadlocks or crashes, because the child process created by mcfork may not be able to access resources locked in the parent or may see an inconsistent version of global data (mcfork runs system call fork without exec).
If in doubt, it is safer to use a non-FORK cluster (see makeCluster, clusterApply).
This is a very low-level API for expert use only.
Simon Urbanek and R Core.
Derived from the multicore package formerly on CRAN.
## This will work when run as an example, but not when pasted in.
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
    cat("I'm a child! ", Sys.getpid(), "\n")
    parallel:::mcexit(,"I was a child")
}
cat("I'm the master\n")
unserialize(parallel:::readChildren(1.5))
Parallel Versions of lapply and mapply using Forking
mclapply is a parallelized version of lapply: it returns a list of the same length as X, each element of which is the result of applying FUN to the corresponding element of X.
It relies on forking and hence is not available on Windows unless mc.cores = 1.
mcmapply is a parallelized version of mapply, and mcMap corresponds to Map.
mclapply(X, FUN, ...,
         mc.preschedule = TRUE, mc.set.seed = TRUE,
         mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
         mc.cleanup = TRUE, mc.allow.recursive = TRUE, affinity.list = NULL)
mcmapply(FUN, ...,
         MoreArgs = NULL, SIMPLIFY = TRUE, USE.NAMES = TRUE,
         mc.preschedule = TRUE, mc.set.seed = TRUE,
         mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
         mc.cleanup = TRUE, affinity.list = NULL)
mcMap(f, ...)
X |
a vector (atomic or list) or an expressions vector. Other
objects (including classed objects) will be coerced by
|
FUN |
the function to be applied to ( |
f |
the function to be applied in parallel to |
... |
For |
MoreArgs , SIMPLIFY , USE.NAMES
|
see |
mc.preschedule |
if set to |
mc.set.seed |
See |
mc.silent |
if set to |
mc.cores |
The number of cores to use, i.e. at most how many child processes will be run simultaneously. The option is initialized from environment variable MC_CORES if set. Must be at least one, and parallelization requires at least two cores. |
mc.cleanup |
if set to |
mc.allow.recursive |
Unless true, calling |
affinity.list |
a vector (atomic or list) containing the CPU
affinity mask for each element of |
mclapply is a parallelized version of lapply, provided mc.cores > 1: for mc.cores == 1 (and the affinity.list is NULL) it simply calls lapply.
By default (mc.preschedule = TRUE) the input X is split into as many parts as there are cores (currently the values are spread across the cores sequentially, i.e. first value to core 1, second to core 2, ... (core + 1)-th value to core 1 etc.) and then one process is forked to each core and the results are collected.
Without prescheduling, a separate job is forked for each value of X. To ensure that no more than mc.cores jobs are running at once, once that number has been forked the master process waits for a child to complete before the next fork.
Due to the parallel nature of the execution random numbers are not sequential (in the random number sequence) as they would be when using lapply. They are sequential for each forked process, but not all jobs as a whole. See mcparallel or the package's vignette for ways to make the results reproducible with mc.preschedule = TRUE.
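One such approach can be sketched as follows, assuming the "L'Ecuyer-CMRG" generator, the default mc.preschedule = TRUE and mc.set.seed = TRUE, and the same number of cores in both runs. The seed 42 and the helper name draw are arbitrary choices for illustration:

```r
library(parallel)

# Forking is unavailable on Windows, where mc.cores must be 1.
cores <- if (.Platform$OS.type == "windows") 1L else 2L

RNGkind("L'Ecuyer-CMRG")      # generator with multiple streams
draw <- function(i) rnorm(1)  # hypothetical task drawing one random number

set.seed(42)                  # arbitrary seed for illustration
first <- mclapply(1:4, draw, mc.cores = cores)

set.seed(42)                  # same seed, same number of cores
second <- mclapply(1:4, draw, mc.cores = cores)

identical(first, second)
```

Note that RNGkind changes the generator for the rest of the session, and that results obtained with a different value of mc.cores should not be expected to match.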
Note: the number of file descriptors (and processes) is usually limited by the operating system, so you may have trouble using more than 100 cores or so (see ulimit -n or similar in your OS documentation) unless you raise the limit of permissible open file descriptors (fork will fail with error "unable to create a pipe").
Prior to R 3.4.0 and on a 32-bit platform, the serialized result from each forked process is limited to 2^31 - 1 bytes. (Returning very large results via serialization is inefficient and should be avoided.)
affinity.list can be used to run elements of X on specific CPUs. This can be helpful if elements of X have a high variance of completion time or if the hardware architecture is heterogeneous. It also enables the development of scheduling strategies for optimizing the overall runtime of parallel jobs. If affinity.list is set, the mc.cores parameter is replaced with the number of CPU ids used in the affinity masks.
For mclapply
, a list of the same length as X
and named
by X
.
For mcmapply
, a list, vector or array: see
mapply
.
For mcMap
, a list.
Each forked process runs its job inside try(..., silent = TRUE)
so if errors occur they will be stored as class "try-error"
objects in the return value and a warning will be given. Note that
the job will typically involve more than one value of X
and
hence a "try-error"
object will be returned for all the values
involved in the failure, even if not all of them failed. If any forked
process is killed or fails to deliver a result for any reason, values
involved in the failure will be NULL
. To allow detection of such
errors, FUN
should not return NULL
. As of R 4.0, the
return value of mcmapply
is always a list when it needs to contain
"try-error"
objects (SIMPLIFY
is overridden to FALSE
).
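The following sketch shows one way to detect failed jobs in the return value. It assumes a Unix-alike; `risky` is a hypothetical task that fails for a single input, and `mc.preschedule = FALSE` is used so that the failure is confined to that one element.

```r
library(parallel)

# Hypothetical task that fails for one input.
risky <- function(i) if (i == 3) stop("failed on 3") else i * 10

# mc.preschedule = FALSE gives each element its own child, so a failure
# is confined to that element; suppressWarnings() hides the warning
# that mclapply issues when a job produces an error.
res <- suppressWarnings(
  mclapply(1:4, risky, mc.cores = 2, mc.preschedule = FALSE)
)

failed <- vapply(res, inherits, logical(1), what = "try-error")
which(failed)         # positions whose job raised an error
unlist(res[!failed])  # results from the jobs that succeeded
```

A similar check with `is.null` can be used to find values lost because a child was killed, provided `FUN` itself never returns `NULL`.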
It is strongly discouraged to use these functions in GUI or embedded environments, because it leads to several processes sharing the same GUI which will likely cause chaos (and possibly crashes). Child processes should never use on-screen graphics devices.
Some precautions have been taken to make this usable in
R.app
on macOS, but users of third-party front-ends
should consult their documentation.
Note that tcltk counts as a GUI for these purposes since
Tcl
runs an event loop. That event loop
is inhibited in a child process but there could still be problems with
Tk graphical connections.
It is strongly discouraged to use these functions with
multi-threaded libraries or packages (see mcfork
for more
details). If in doubt, it is safer to use a non-FORK cluster (see
makeCluster
, clusterApply
).
Simon Urbanek and R Core.
The affinity.list
feature by Helena Kotthaus and Andreas Lang,
TU Dortmund.
Derived from the multicore package formerly on CRAN.
mcparallel
, pvec
,
parLapply
, clusterMap
.
simplify2array
for results like sapply
.
    simplify2array(mclapply(rep(4, 5), rnorm))
    # use the same random numbers for all values
    set.seed(1)
    simplify2array(mclapply(rep(4, 5), rnorm, mc.preschedule = FALSE,
                            mc.set.seed = FALSE))

    ## Contrast this with the examples for clusterCall
    library(boot)
    cd4.rg <- function(data, mle) MASS::mvrnorm(nrow(data), mle$m, mle$v)
    cd4.mle <- list(m = colMeans(cd4), v = var(cd4))
    mc <- getOption("mc.cores", 2)
    run1 <- function(...) boot(cd4, corr, R = 500, sim = "parametric",
                               ran.gen = cd4.rg, mle = cd4.mle)
    ## To make this reproducible:
    set.seed(123, "L'Ecuyer")
    res <- mclapply(seq_len(mc), run1)
    cd4.boot <- do.call(c, res)
    boot.ci(cd4.boot, type = c("norm", "basic", "perc"),
            conf = 0.9, h = atanh, hinv = tanh)

    ## Usage of the affinity.list parameter
    A <- runif(2500000,0,100)
    B <- runif(2500000,0,100)
    C <- runif(5000000,0,100)
    first <- function(i) head(sort(i), n = 1)

    # Restrict all elements of X to run on CPU 1 and 2
    affL <- list(c(1,2), c(1,2), c(1,2))
    mclapply(list(A, A, A), first, mc.preschedule = FALSE, affinity.list = affL)

    # Completion times are assumed to have a high variance
    # To optimize the overall execution time elements of X are scheduled to suitable CPUs
    # Assuming that the runtime for C is as long as the runtime of A plus B
    # mapping: A to 1 , B to 1, C to 2
    X <- list(A, B, C)
    affL <- c(1, 1, 2)
    mclapply(X, first, mc.preschedule = FALSE, affinity.list = affL)
These functions are based on forking and so are not available on Windows.
mcparallel
starts a parallel R process which evaluates the
given expression.
mccollect
collects results from one or more parallel processes.
    mcparallel(expr, name, mc.set.seed = TRUE, silent = FALSE,
               mc.affinity = NULL, mc.interactive = FALSE,
               detached = FALSE)
    mccollect(jobs, wait = TRUE, timeout = 0, intermediate = FALSE)
expr |
expression to evaluate (do not use any on-screen
devices or GUI elements in this code, see |
name |
an optional name (character vector of length one) that can be associated with the job. |
mc.set.seed |
logical: see section ‘Random numbers’. |
silent |
if set to |
mc.affinity |
either a numeric vector specifying CPUs to restrict
the child process to (1-based) or |
mc.interactive |
logical, if |
detached |
logical, if |
jobs |
list of jobs (or a single job) to collect results
for. Alternatively |
wait |
if set to |
timeout |
timeout (in seconds) to check for job results – applies
only if |
intermediate |
|
mcparallel
evaluates the expr
expression in parallel to
the current R process. Everything is shared read-only (or in fact
copy-on-write) between the parallel process and the current process,
i.e. no side-effects of the expression affect the main process. The
result of the parallel execution can be collected using the
mccollect
function.
mccollect
collects any available results from parallel
jobs (or in fact any child process). If wait
is TRUE
then mccollect
waits for all specified jobs to finish before
returning a list containing the last reported result for each
job. If wait
is FALSE
then mccollect
merely
checks for any results available at the moment and will not wait for
jobs to finish. If jobs
is specified, jobs not listed there
will not be affected or acted upon.
Note: If expr
uses low-level multicore functions such
as sendMaster
a single job can deliver results
multiple times and it is the responsibility of the user to interpret
them correctly. mccollect
will return NULL
for a
terminating job that has sent its results already after which the
job is no longer available.
Jobs are identified by process IDs (even when referred to as job objects),
which are reused by the operating system. Detached jobs created by
mcparallel
can thus never be safely referred to by their process
IDs nor job objects. Non-detached jobs are guaranteed to exist until
collected by mccollect
, even if crashed or terminated by a signal.
Once collected by mccollect
, a job is regarded as detached, and
thus must no longer be referred to by its process ID nor its job object.
With wait = TRUE
, all jobs passed to mccollect
are
collected. With wait = FALSE
, the collected jobs are given as
names of the result vector, and thus in subsequent calls to
mccollect
these jobs must be excluded. Job objects should be used
in preference to process IDs whenever accepted by the API.
The mc.affinity
parameter can be used to try to restrict
the child process to specific CPUs. The availability and the extent of
this feature is system-dependent (e.g., some systems will only
consider the CPU count, others will ignore it completely).
mcparallel
returns an object of the class "parallelJob"
which inherits from "childProcess"
(see the ‘Value’
section of the help for mcfork
). If argument
name
was supplied this will have an additional component
name
.
mccollect
returns any results that are available in a list. The
results will have the same order as the specified jobs. If there are
multiple jobs and a job has a name it will be used to name the
result, otherwise its process ID will be used. If none of the
specified children are still running, it returns NULL
.
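A short sketch of how results are named, assuming a Unix-alike. The job names `"a"` and `"b"` are arbitrary labels chosen for this example.

```r
library(parallel)

# Start two named jobs; each evaluates its expression in a forked child.
a <- mcparallel(1 + 1, name = "a")
b <- mcparallel(2 * 3, name = "b")

# Wait for both and collect the results in the order the jobs were given.
res <- mccollect(list(a, b))

names(res)  # job names are used because both jobs were named
res$a
res$b
```

Had the jobs been created without `name`, the results would be named by process ID instead.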
If mc.set.seed = FALSE
, the child process has the same initial
random number generator (RNG) state as the current R session. If the
RNG has been used (or .Random.seed
was restored from a saved
workspace), the child will start drawing random numbers at the same
point as the current session. If the RNG has not yet been used, the
child will set a seed based on the time and process ID when it first
uses the RNG: this is pretty much guaranteed to give a different
random-number stream from the current session and any other child
process.
The behaviour with mc.set.seed = TRUE
is different only if
RNGkind("L'Ecuyer-CMRG")
has been selected. Then each
time a child is forked it is given the next stream (see
nextRNGStream
). So if you select that generator, set a
seed and call mc.reset.stream
just before the first use
of mcparallel
the results of simulations will be reproducible
provided the same tasks are given to the first, second, ...
forked process.
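The recipe above can be sketched as follows, assuming a Unix-alike. `one_draw` is a hypothetical helper and the seed value is arbitrary; the point is only that the same seed followed by `mc.reset.stream()` should give the first forked child the same stream each time.

```r
library(parallel)

RNGkind("L'Ecuyer-CMRG")

# Hypothetical helper: seed, reset the stream, then fork one child.
one_draw <- function(seed) {
  set.seed(seed)
  mc.reset.stream()
  job <- mcparallel(rnorm(3))   # mc.set.seed = TRUE is the default
  mccollect(job)[[1]]
}

x1 <- one_draw(99)
x2 <- one_draw(99)
identical(x1, x2)               # compare the two runs
```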
Prior to R 3.4.0 and on a 32-bit platform, the serialized
result from each forked process is limited to $2^{31} - 1$ bytes. (Returning very large results via serialization is
inefficient and should be avoided.)
Simon Urbanek and R Core.
Derived from the multicore package formerly on CRAN (but with different handling of the RNG stream).
    p <- mcparallel(1:10)
    q <- mcparallel(1:20)
    # wait for both jobs to finish and collect all results
    res <- mccollect(list(p, q))

    p <- mcparallel(1:10)
    mccollect(p, wait = FALSE, 10) # will retrieve the result (since it's fast)
    mccollect(p, wait = FALSE)     # will signal the job as terminating
    mccollect(p, wait = FALSE)     # there is no longer such a job

    # a naive parallel lapply can be created using mcparallel alone:
    jobs <- lapply(1:10, function(x) mcparallel(rnorm(x), name = x))
    mccollect(jobs)
pvec
parallelizes the execution of a function on vector elements
by splitting the vector and submitting each part to one core. The
function must be a vectorized map, i.e. it takes a vector input and
creates a vector output of exactly the same length as the input which
doesn't depend on the partition of the vector.
It relies on forking and hence is not available on Windows unless
mc.cores = 1
.
    pvec(v, FUN, ..., mc.set.seed = TRUE, mc.silent = FALSE,
         mc.cores = getOption("mc.cores", 2L), mc.cleanup = TRUE)
v |
vector to operate on |
FUN |
function to call on each part of the vector |
... |
any further arguments passed to |
mc.set.seed |
See |
mc.silent |
if set to |
mc.cores |
The number of cores to use, i.e. at most how many child processes will be run simultaneously. Must be at least one, and at least two for parallel operation. The option is initialized from environment variable MC_CORES if set. |
mc.cleanup |
See the description of this argument in
|
pvec
parallelizes FUN(x, ...)
where FUN
is a
function that returns a vector of the same length as
x
. FUN
must also be pure (i.e., without side-effects)
since side-effects are not collected from the parallel processes. The
vector is split into nearly identically sized subvectors on which
FUN
is run. Although it is in principle possible to use
functions that are not necessarily maps, the interpretation would be
case-specific as the splitting is in theory arbitrary (a warning is
given in such cases).
The major difference between pvec
and mclapply
is
that mclapply
will run FUN
on each element separately
whereas pvec
assumes that c(FUN(x[1]), FUN(x[2]))
is
equivalent to FUN(x[1:2])
and thus will split into as many
calls to FUN
as there are cores (or elements, if fewer), each
handling a subset vector. This makes it more efficient than
mclapply
but requires the above assumption on FUN
.
If mc.cores == 1
this evaluates FUN(v, ...)
in the
current process.
The result of the computation – in a successful case it should be of
the same length as v
. If an error occurred or the function was
not a map the result may be shorter or longer, and a warning is given.
Due to the nature of the parallelization, error handling does not
follow the usual rules since errors will be returned as strings and
results from killed child processes will show up simply as
non-existent data. Therefore it is the responsibility of the user to
check the length of the result to make sure it is of the correct size.
pvec
raises a warning if that is the case since it does not
know whether such an outcome is intentional or not.
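The length check described above might look like the following sketch, which assumes a Unix-alike. `sum` is used here only as a convenient example of a function that is not a map.

```r
library(parallel)

v <- 1:10

# A vectorized map: the output has the same length as the input.
ok <- pvec(v, function(x) x^2, mc.cores = 2)
length(ok) == length(v)

# sum() is not a map: each part of v collapses to a single value, so the
# result is shorter than v. suppressWarnings() hides pvec's warning here.
bad <- suppressWarnings(pvec(v, sum, mc.cores = 2))
length(bad) == length(v)
```

Comparing `length(result)` with `length(v)` after every call is a cheap way to catch both non-map functions and results lost from killed children.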
See mcfork
for the inadvisability of using this with
GUI front-ends and multi-threaded libraries.
Simon Urbanek and R Core.
Derived from the multicore package formerly on CRAN.
mcparallel
, mclapply
,
parLapply
, clusterMap
.
    x <- pvec(1:1000, sqrt)
    stopifnot(all(x == sqrt(1:1000)))

    # One use is to convert date strings to unix time in large datasets
    # as that is a relatively slow operation.
    # So let's get some random dates first
    # (A small test only with 2 cores: set options("mc.cores")
    # and increase N for a larger-scale test.)
    N <- 1e5
    dates <- sprintf('%04d-%02d-%02d', as.integer(2000+rnorm(N)),
                     as.integer(runif(N, 1, 12)), as.integer(runif(N, 1, 28)))

    system.time(a <- as.POSIXct(dates))

    # But specifying the format is faster
    system.time(a <- as.POSIXct(dates, format = "%Y-%m-%d"))

    # pvec ought to be faster, but system overhead can be high
    system.time(b <- pvec(dates, as.POSIXct, format = "%Y-%m-%d"))
    stopifnot(all(a == b))

    # using mclapply for this would be much slower because each value
    # will require a separate call to as.POSIXct()
    # as lapply(dates, as.POSIXct) does
    system.time(c <- unlist(mclapply(dates, as.POSIXct, format = "%Y-%m-%d")))
    stopifnot(all(a == c))
This is an R re-implementation of Pierre L'Ecuyer's ‘RngStreams’ multiple streams of pseudo-random numbers.
    nextRNGStream(seed)
    nextRNGSubStream(seed)
    clusterSetRNGStream(cl = NULL, iseed)
    mc.reset.stream()
seed |
An integer vector of length 7 as given by
|
cl |
A cluster from this package or package snow, or (if
|
iseed |
An integer to be supplied to |
The ‘RngStreams’ interface works with (potentially) multiple streams of pseudo-random numbers: this is particularly suitable for working with parallel computations since each task can be assigned a separate RNG stream.
This uses as its underlying generator RNGkind("L'Ecuyer-CMRG")
,
of L'Ecuyer (1999), which has a seed vector of 6 (signed) integers and a
period of around $2^{191}$. Each ‘stream’ is a
subsequence of the period of length $2^{127}$, which is in
turn divided into ‘substreams’ of length $2^{76}$.
The idea of L'Ecuyer et al (2002) is to use a separate stream
for each of the parallel computations (which ensures that the random
numbers generated never get into sync) and the parallel
computations can themselves use substreams if required. The original
interface stores the original seed of the first stream, the original
seed of the current stream and the current seed: this could be
implemented in R, but it is as easy to work by saving the relevant
values of .Random.seed
: see the examples.
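A minimal sketch of working with saved seeds, one per task. `draw_from` is a hypothetical helper introduced for this example, and the seed value is arbitrary.

```r
library(parallel)

RNGkind("L'Ecuyer-CMRG")
set.seed(42)                      # arbitrary seed for this sketch

# One seed per task: the current stream, then each following stream.
s1 <- .Random.seed
s2 <- nextRNGStream(s1)
s3 <- nextRNGStream(s2)

# Hypothetical helper: install a saved seed, then draw from that stream.
draw_from <- function(seed, n) {
  assign(".Random.seed", seed, envir = .GlobalEnv)
  runif(n)
}

u1 <- draw_from(s1, 3)
u2 <- draw_from(s2, 3)
u2_again <- draw_from(s2, 3)

identical(u2, u2_again)   # same saved seed, same numbers
identical(u1, u2)         # different streams
```

Because each task starts from its own saved seed, the numbers a task sees do not depend on the order in which the tasks are run.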
clusterSetRNGStream
selects the "L'Ecuyer-CMRG"
RNG and
then distributes streams to the members of a cluster, optionally
setting the seed of the streams by set.seed(iseed)
(otherwise
they are set from the current seed of the master process: after
selecting the L'Ecuyer generator).
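As a sketch of typical use, assuming a local socket cluster of two workers can be started on the machine (the `iseed` value is arbitrary): setting the streams twice with the same `iseed` should make the workers repeat the same draws.

```r
library(parallel)

cl <- makeCluster(2)                   # socket cluster with two workers

clusterSetRNGStream(cl, iseed = 123)
r1 <- clusterCall(cl, function() rnorm(2))

clusterSetRNGStream(cl, iseed = 123)   # same seed: same streams again
r2 <- clusterCall(cl, function() rnorm(2))

stopCluster(cl)

identical(r1, r2)                 # the two runs agree
identical(r1[[1]], r1[[2]])       # each worker has its own stream
```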
When not on Windows, calling mc.reset.stream()
after setting
the L'Ecuyer random number generator and seed makes runs from
mcparallel(mc.set.seed = TRUE)
reproducible. This is
done internally in mclapply
and pvec
.
(Note that it does not set the seed in the master process, so does not
affect the fallback-to-serial versions of these functions.)
For nextRNGStream
and nextRNGSubStream
,
a value which can be assigned to .Random.seed
.
Interfaces to L'Ecuyer's C code are available in CRAN packages rlecuyer and rstream.
Brian Ripley
L'Ecuyer, P. (1999). Good parameters and implementations for combined multiple recursive random number generators. Operations Research, 47, 159–164. doi:10.1287/opre.47.1.159.
L'Ecuyer, P., Simard, R., Chen, E. J. and Kelton, W. D. (2002). An object-oriented random-number package with many long streams and substreams. Operations Research, 50, 1073–1075. doi:10.1287/opre.50.6.1073.358.
RNG
for fuller details of R's built-in random number
generators.
The vignette for package parallel.
    RNGkind("L'Ecuyer-CMRG")
    set.seed(123)
    (s <- .Random.seed)
    ## do some work involving random numbers.
    nextRNGStream(s)
    nextRNGSubStream(s)
This divides up 1:nx
into ncl
lists of approximately
equal size, as a way to allocate tasks to nodes in a cluster.
It is mainly for internal use, but some package authors have found it useful.
    splitIndices(nx, ncl)
nx |
Number of tasks. |
ncl |
Number of cluster nodes. |
A list of length ncl
, each element being an integer vector.
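A small sketch of how the returned index groups might be used to divide work. The task vector is made up for illustration, and the groups are processed serially here rather than on a cluster.

```r
library(parallel)

tasks <- seq_len(10)
idx <- splitIndices(length(tasks), 3)   # three groups of task indices

# Each index appears in exactly one group.
identical(sort(unlist(idx, use.names = FALSE)), tasks)

# Process the groups one at a time (serially here, for illustration).
group_sums <- vapply(idx, function(i) sum(tasks[i]), integer(1))
sum(group_sums) == sum(tasks)
```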
    splitIndices(20, 3)