flowchart LR
start((Start)) --> one["`Create<br>a cluster`"]
one --> two
subgraph two[Prepare the session]
direction TB
copy[Copy objects]~~~eval[Evaluate<br>expressions]
eval~~~seed[Set seed]
end
two --> three[Do your<br>call]
three --> four[Stop the<br>cluster]
The parallel R package
This content was originally published in the book “Applied HPC with R” by George G. Vega Yon, Ph.D. You can find the book at https://book-hpc.ggvy.cl
Although R was not built for parallel computing, multiple ways of parallelizing your R code exist. One of these is the parallel package. This R package, shipped with base R, provides various functions to parallelize R code using embarrassingly parallel computing, i.e., a divide-and-conquer-type strategy. The basic idea is to start multiple R sessions (usually called child processes), connect the main session with those, and send them instructions. This section goes over a common workflow to work with R’s parallel.
Parallel workflow
(Usually) We do the following:
Create a
PSOCK/FORK(or other) cluster usingmakePSOCKCluster/makeForkCluster(ormakeCluster). How many child processes will depend on how many threads your computer has. A rule of thumb is to useparallel::detectCores() - 1cores (so you leave one free for the rest of your computer).Copy/prepare each R session (if you are using a
PSOCKcluster):Copy objects with
clusterExport. This would be all the objects that you need in the child sessions.Pass expressions with
clusterEvalQ. This would include loading R packages and other code into the other sessions.Set a seed (if you are doing something that involves randomness)
Do your call:
parApply,parLapply, etc.Stop the cluster with
clusterStop
As we mention later, step 2 will depend on the type of cluster you are using. If you are using a Socket connection (PSOCK cluster), then the spawned R sessions will be completely, fresh (no data or R packages pre-loaded); whereas using a Fork connection (FORK cluster) will copy the current R session, including all objects and loaded packages.
Types of clusters: PSOCK
Can be created with
makePSOCKClusterCreates brand new R Sessions (so nothing is inherited from the master), e.g.
# This creates a cluster with 4 R sessions cl <- makePSOCKCluster(4)Child sessions are connected to the master session via Socket connections
Can be created outside the current computer, i.e., across multiple computers!
Types of clusters: Fork
Fork Cluster
makeForkCluster:Uses OS Forking,
Copies the current R session locally (so everything is inherited from the master up to that point).
Data is only duplicated if altered (need to double check when this happens!)
Not available on Windows.
Other types are available via the function makeCluster from the snow R package (Simple Network of Workstations). These include MPI (Message Passing Interface) clusters and Slurm (Socket) clusters.
A template program
The following code chunk shows a template for using the parallel package in R. You can copy this and comment the bits that you don’t need:
library(parallel)
# 1. CREATING A CLUSTER ----------------
nnodes <- 4L # Could be less or more!
cl <- makePSOCKcluster(nnodes)
# 2. PREPARING THE CLUSTER -------------
# Mostly if using PSOCK
clusterEvalQ(cl, {
library(...) # Loading the necesary packages
source(...) # Source additional scripts
})
# Always if you are doing random numbers
clusterSetRNGStream(cl, 123)
# 3. DO YOUR CALL ----------------------
ans <- parLapply(
cl,
... long list to iterate ...,
function(x) {
...
},
... further arguments ...
)
# 4. STOP THE CLUSTER
stopCluster(cl)Generally, the ... long list to iterate ... will be a vector or another list that contains either data (e.g., individual datasets), a sequence of numbers (e.g., from 1 to 1000), a list of file paths (if you were processing files individually), or directly a short sequence with numbers from 1 to the number of nodes (least common application).
When calling parLapply or parSapply (the parallel versions of lapply and sapply respectively), the function call will automatically split the iterations across nodes using the splitIndices function. Here is an example of what happens under the hood:
# Distributing 9 iterations across two cores
(n_iterations <- parallel::splitIndices(nx = 9, ncl = 2))[[1]]
[1] 1 2 3 4
[[2]]
[1] 5 6 7 8 9
Which means that the first R session will get 4 jobs, wereas the second R session will get 5 jobs. This way, each spawned R session (child session) gets to do a similiar number of iterations.
Example: Running a linear regression across multiple columns
In genomics, it is common to analyze genomic data at the gene level comparing expression levels against some phenotype/disease. A simple analysis consists of running a linear regression across multiple columns (genes) of a data frame. The following code-block generates some artificial data we can use for this example:
set.seed(331)
n_genes <- 10000
n_obs <- 1000
# A random matrix of omics
X_genes <- rnorm(n_obs * n_genes) |>
matrix(nrow = n_obs)
# A random phenotype (completely unrelated for this example)
Y <- rnorm(n_obs) |> cbind()We will wrap the analysis into a function so we can do benchmarking. We will use the lapply function to iterate over the columns of X_genes
ols_serial <- function(X, Y) {
lapply(
X = seq_len(n_genes),
FUN = \(i) {lm.fit(X[, i, drop = FALSE], Y) |> coef()}
) |> do.call(what = rbind)
}
# Calling the function and looking at the first few rows
ols_serial(X_genes, Y) |> head() x1
[1,] 0.029403088
[2,] 0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,] 0.066971469
Like we did in the efficient programming section, instead of using lm() or glm(), we can use lm.fit() for better performance. The lm.fit() function does less than the lm() function by skipping computing residuals and other overhead, making it faster for large datasets.
Using parallel computing (and following the template we presented earlier), this could be done in the following way with the parallel package:
library(parallel)
ols_parallel <- function(X, Y, ncores) {
# 1. CREATING A CLUSTER ----------------
cl <- makePSOCKcluster(ncores)
# This will be called when exiting the function
on.exit(stopCluster(cl))
# 2. PREPARING THE CLUSTER -------------
# We copy the data over
clusterExport(cl, c("X", "Y"), envir = environment())
# 3. DO YOUR CALL ----------------------
parLapply(
cl,
seq_len(n_genes),
function(i) {
lm.fit(X[, i, drop = FALSE], Y) |> coef()
}
) |> do.call(what = rbind)
}
# Checking it works
ols_parallel(X_genes, Y, ncores = 4L) |> head() x1
[1,] 0.029403088
[2,] 0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,] 0.066971469
Just like the return(), on.exit() can only be used within a function call. We could have used stopCluster(cl) at the end as we do in our template example, but the benefit of using on.exit() is that it will be called automatically when the function exits, even if an error occurs. This helps to ensure that the cluster is always stopped properly.
Now that we have the function implemented, we can go ahead and (1) compare results and (2) measure performance.
library(microbenchmark)
microbenchmark(
serial = ols_serial(X_genes, Y),
parallel = ols_parallel(X_genes, Y, ncores = 4L),
times = 10L,
check = "identical"
)Warning in microbenchmark(serial = ols_serial(X_genes, Y), parallel =
ols_parallel(X_genes, : less accurate nanosecond times to avoid potential
integer overflows
Unit: milliseconds
expr min lq mean median uq max neval
serial 370.0829 387.6258 416.4014 412.2651 443.811 477.3256 10
parallel 1302.7103 1358.1617 1408.5180 1414.3330 1437.407 1591.6240 10
From the comparison, we can see that the parallel version is significantly slower than the serial version. Two things to note here are (a) the task we are running is already fast (about 0.3 seconds on average for the serial run) and (b) there is an overhead cost associated with creating, preparing, and stopping the cluster. As we mentioned earlier, parallel optimizations only make sense if your code is already talking a significant amount of time, making the overhead cost associated with the setup relatively small. The following implementation of the function should make it significantly faster:
ols_parallel2 <- function(cl) {
# 1. CREATING A CLUSTER ----------------
# 2. PREPARING THE CLUSTER -------------
# No need anymore as we are handling the core outside
# 3. DO YOUR CALL ----------------------
parLapply(
cl,
seq_len(n_genes),
function(i) {
lm.fit(X_genes[, i, drop = FALSE], Y) |> coef()
}
) |> do.call(what = rbind)
}
# Checking it works
cl <- makePSOCKcluster(4)
clusterExport(cl, c("X_genes", "Y"))
ols_parallel2(cl) |> head() x1
[1,] 0.029403088
[2,] 0.008907854
[3,] -0.027246099
[4,] -0.031280262
[5,] -0.001309752
[6,] 0.066971469
# 4. STOP THE CLUSTER
stopCluster(cl)The main differences from the previous version of the function are:
We are creating the cluster outside of the function and passing it as an argument.
We are exporting the
X_genesandYvariables to the cluster only once, which should also reduce overhead significantly.Because of the previous step, we are now calling
X_genesdirectly in the main function.The cluster is stopped outside of the function call (since the function no longer manages the cluster object).
Let’s measure the performance to see how much faster the parallel version is.
library(microbenchmark)
# We need to prepare the cluster before hand
cl <- makePSOCKcluster(4)
clusterExport(cl, c("X_genes", "Y"))
microbenchmark(
serial = ols_serial(X_genes, Y),
parallel = ols_parallel2(cl),
times = 10L,
check = "identical"
)Unit: milliseconds
expr min lq mean median uq max neval
serial 357.84542 386.2114 409.8269 395.0364 404.9950 578.7929 10
parallel 98.20041 100.9171 115.2487 110.9535 133.6062 136.9356 10
# We need to stop the cluster
stopCluster(cl)Now, the parallel version is significantly faster than the serial version. Just using the parallel package (or any other package that can be used for parallel computing) does not guarantee improved performance.
More examples
The following three examples are a simple application of the package in which we are explicitly running as many replications as threads the cluster has. Generally, the number of replicates will be a function of the data.
Ex 1: Parallel RNG with makePSOCKCluster
Using more threads than cores available on your computer is never a good idea. As a rule of thumb, clusters should be created using parallel::detectCores() - 1 cores (so you leave one free for the rest of your computer.)
# 1. CREATING A CLUSTER
library(parallel)
nnodes <- 4L
cl <- makePSOCKcluster(nnodes)
# 2. PREPARING THE CLUSTER
clusterSetRNGStream(cl, 123) # Equivalent to `set.seed(123)`
# 3. DO YOUR CALL
ans <- parSapply(cl, 1:nnodes, function(x) runif(1e3))
(ans0 <- var(ans)) [,1] [,2] [,3] [,4]
[1,] 0.0861888293 -0.0001633431 5.939143e-04 -3.672845e-04
[2,] -0.0001633431 0.0853841838 2.390790e-03 -1.462154e-04
[3,] 0.0005939143 0.0023907904 8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06 8.467722e-02
Making sure it is reproducible
# I want to get the same!
clusterSetRNGStream(cl, 123)
ans1 <- var(parSapply(cl, 1:nnodes, function(x) runif(1e3)))
# 4. STOP THE CLUSTER
stopCluster(cl)
all.equal(ans0, ans1) # All equal![1] TRUE
Ex 2: Parallel RNG with makeForkCluster
In the case of makeForkCluster
# 1. CREATING A CLUSTER
library(parallel)
# The fork cluster will copy the -nsims- object
nsims <- 1e3
nnodes <- 4L
cl <- makeForkCluster(nnodes)
# 2. PREPARING THE CLUSTER
clusterSetRNGStream(cl, 123)
# 3. DO YOUR CALL
ans <- do.call(cbind, parLapply(cl, 1:nnodes, function(x) {
runif(nsims) # Look! we use the nsims object!
# This would have fail in makePSOCKCluster
# if we didn't copy -nsims- first.
}))
(ans0 <- var(ans)) [,1] [,2] [,3] [,4]
[1,] 0.0861888293 -0.0001633431 5.939143e-04 -3.672845e-04
[2,] -0.0001633431 0.0853841838 2.390790e-03 -1.462154e-04
[3,] 0.0005939143 0.0023907904 8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06 8.467722e-02
Again, we want to make sure this is reproducible
# Same sequence with same seed
clusterSetRNGStream(cl, 123)
ans1 <- var(do.call(cbind, parLapply(cl, 1:nnodes, function(x) runif(nsims))))
ans0 - ans1 # A matrix of zeros [,1] [,2] [,3] [,4]
[1,] 0 0 0 0
[2,] 0 0 0 0
[3,] 0 0 0 0
[4,] 0 0 0 0
# 4. STOP THE CLUSTER
stopCluster(cl)Ex 3: Parallel RNG with mclapply (Forking on the fly)
In the case of mclapply, the forking (cluster creation) is done on the fly!
# 1. CREATING A CLUSTER
library(parallel)
# The fork cluster will copy the -nsims- object
nsims <- 1e3
nnodes <- 4L
# cl <- makeForkCluster(nnodes) # mclapply does it on the fly
# 2. PREPARING THE CLUSTER
set.seed(123)
# 3. DO YOUR CALL
ans <- do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims)))
(ans0 <- var(ans)) [,1] [,2] [,3] [,4]
[1,] 0.084918209 -0.0031421153 0.0018527358 -0.0010697658
[2,] -0.003142115 0.0854694877 -0.0005553091 0.0052735608
[3,] 0.001852736 -0.0005553091 0.0817167841 -0.0006622197
[4,] -0.001069766 0.0052735608 -0.0006622197 0.0851134570
Once more, we want to make sure this is reproducible
# Same sequence with same seed
set.seed(123)
ans1 <- var(do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims))))
ans0 - ans1 # A matrix of zeros [,1] [,2] [,3] [,4]
[1,] -0.0020320384 0.0015893915 6.164758e-04 -2.478574e-03
[2,] 0.0015893915 0.0007625405 9.410214e-04 5.849171e-03
[3,] 0.0006164758 0.0009410214 -1.148884e-03 4.598675e-05
[4,] -0.0024785736 0.0058491714 4.598675e-05 5.082166e-03
# 4. STOP THE CLUSTER
# stopCluster(cl) no need of doing this anymoreExercise: Overhead costs
Compare the timing of taking the sum of 100 numbers when parallelized versus not. For the unparallized (serialized) version, use the following:
set.seed(123)
x <- runif(n=100)
serial_sum <- function(x){
x_sum <- sum(x)
return(x_sum)
}For the parallized version, follow this outline
library(parallel)set.seed(123)
x <- runif(n=100)
parallel_sum <- function(){
# Set number of cores to use
# make cluster and export to the cluster the x variable
# Use "split function to divide x up into as many chunks as the number of cores
# Calculate partial sums doing something like:
partial_sums <- parallel::parSapply(cl, x_split, sum)
# Stop the cluster
# Add and return the partial sums
}Compare the timing of the two approaches:
microbenchmark::microbenchmark(
serial = serial_sum(x),
parallel = parallel_sum(x),
times = 10,
unit = "relative"
)