R-posts.com

Implementing Parallel Processing in R

Interested in publishing a one-time post on R-bloggers.com? Press here to learn how.
If something takes less time if done through parallel processing, why not do it and save time? Modern laptops and PCs today have multi core processors with sufficient amount of memory available and one can use it to generate outputs quickly. Parallelizing your codes has its own numerous advantages. Instead of waiting several minutes or hours while a task completes, one can replace the code, obtain output within seconds or minutes and make it efficient at the same time. Code efficiency is one of the most sought abilities in the industry today and not many people are able to use it. Once you learn, how to parallelize your code, you will only regret that why didn’t you learn it sooner.

Parallelizing your codes in R is simple and there are various methods and packages. Let’s look at some of the functions available in R

lapply() and sapply() functions
lapply() function is used to apply a specified function to the vector or list input. The output of this function is always a list.
lapply(1:5, function(x) x^2) 
#input is 1,2,3,4,5 and output is square of the input
[[1]]
[1] 1

[[2]]
[1] 4

[[3]]
[1] 9

[[4]]
[1] 16

[[5]]
[1] 25

We can also generate multiple outputs, say x^2 and x^3

lapply(1:5, function(x) c(x^2,x^3)) 
#The output should be square and cube of input

[[1]]
[1] 1 1

[[2]]
[1] 4 8

[[3]]
[1]  9 27

[[4]]
[1] 16 64

[[5]]
[1]  25 125

This output is very large but sometimes list is useful. 
An alternative is to use the sapply() function which generates a vector, matrix or array output.

sapply(1:5, function(x) x^2) #This output is a vector
[1]  1  4  9 16 25
sapply(1:5, function(x) c(x^2,x^3)) #This outputs a matrix
     [,1] [,2] [,3] [,4] [,5]
[1,]    1    4    9   16   25
[2,]    1    8   27   64  125

sapply() also provides two additional parameters: simplify and USE.NAMES. If both of them are kept false, the output generated is the same as lappy()
sapply(1:5, function(x) x^2, simplify = FALSE, USE.NAMES = FALSE) 
#Output is same as for lapply()
[[1]]
[1] 1 1

[[2]]
[1] 4 8

[[3]]
[1]  9 27

[[4]]
[1] 16 64

[[5]]
[1]  25 125

The lapply() and sapply() functions are very fast in calculation. This means that the values are calculated independently of each other. However, it is not parallel in execution. There are various packages in R which allow parallelization.

“parallel” Package

The parallel package in R can perform tasks in parallel by providing the ability to allocate cores to R. The working involves finding the number of cores in the system and allocating all of them or a subset to make a cluster. We can then use the parallel version of various functions and run them by passing the cluster as an additional argument. A word of caution: it is important to close the cluster at the end of execution step so that core memory is released.
#Include the parallel library. If the next line does not work, run install.packages(“parallel”) first
library(parallel)
 
# Use the detectCores() function to find the number of cores in system
no_cores <- detectCores()
 
# Setup cluster
clust <- makeCluster(no_cores) #This line will take time

#The parallel version of lapply() is parLapply() and needs an additional cluster argument.
parLapply(clust,1:5, function(x) c(x^2,x^3))
[[1]]
[1] 1 1

[[2]]
[1] 4 8

[[3]]
[1]  9 27

[[4]]
[1] 16 64

[[5]]
[1]  25 125

stopCluster(clust)

If we want a similar output but with sapply(), we use the parSapply() function

#Include the parallel library. If the next line does not work, run install.packages(“parallel”) first
library(parallel)

# Use the detectCores() function to find the number of cores in system
no_cores <- detectCores()

# Setup cluster
clust <- makeCluster(no_cores) #This line will take time

#Setting a base variable 
base <- 4
#Note that this line is required so that all cores in cluster have this variable available
clusterExport(clust, "base")

#Using the parSapply() function
parSapply(clust, 1:5, function(exponent) base^exponent)
[1]    4   16   64  256 1024

stopCluster(clust)
Notice the clusterExport() function here above? This is a special command needed to change the variable scope in parallel execution. Normally, variables such as ‘base’ have a scope which does not allow them to be accessible at all cores. We need to use the clusterExport() function and send the variable to all the assigned cores in the cluster. This is why we pass both the cluster variable as well as the variable we need to export. Changing the base variable after export will have no effect as all the cores will not see that change. Just like there is a scope for variables, libraries also need to be exported to all cores in order to be accessible. If I need to run a task which requires importing libraries, I use the
clusterEvalQ() function
clusterEvalQ(clust,library(randomForest)) 

“foreach” Package

Having basic programming knowledge makes you aware of for loops and the for each package is based on this methodology, making it easy to use. The foreach package also need doParallel package to make the process parallel using the registerDoParallel() function. The starting code looks like this
library(foreach)
library(doParallel)

registerDoParallel(makeCluster(no_cores))
Once this is done, I can execute commands using the foreach() function and do them in parallel using the %dopar% command. The foreach() function includes a parameter .combine which is used to specify the kind of output needed. Using .combine=c gives a vector output while .combine=rbind creates a matrix. If a list output is needed similar to lapply(), we can set .combine=list. We can also obtain dataframe using .combine=data.frame
#Vector output
foreach(exponent = 1:5, .combine = c)  %dopar%  base^exponent

[1]   3   9  27  81 243

#Matrix output
foreach(exponent = 1:5, .combine = rbind)  %dopar%  base^exponent

         [,1]
result.1    3
result.2    9
result.3   27
result.4   81
result.5  243


#List output
foreach(exponent = 1:5, .combine = list, .multicombine=TRUE)  %dopar%  base^exponent

[[1]]
[1] 3

[[2]]
[1] 9

[[3]]
[1] 27

[[4]]
[1] 81

[[5]]
[1] 243

#Data Frame output
foreach(exponent = 1:5, .combine = data.frame)  %dopar%  base^exponent
  result.1 result.2 result.3 result.4 result.5
1        2        4        8       16       32

stopImplicitCluster()

Variations and Controlling Memory Usage

There are a number of different ways to do the same tasks which I did in the code above. For instance, the registerDoParallel() function allows creation of implicit clusters and we don’t need to use the makeCluster() function inside.
#This also works
registerDoParallel(no_cores)
Using implicit cluster means that I don’t have a ‘clust’ variable. To close the cluster, I need a different function known as stopImplicitCluster() at the end of our parallelization task.
stopImplicitCluster()
The foreach() and doParallel() functions have the local variables available at all cores by default. Hence, we don’t need any clusterExport function. However, variables which are not defined locally (such as those part of a parent function) and libraries need to be exported to all cores. For this, we have .export parameter and .packages parameter in foreach() function.
#using .export parameter
registerDoParallel(no_cores)

base <- 2 #Declaring this variable outside the scope of foreach() function

sample_func <- function (exponent) {
  #Using the .export function here to include the base variable
  foreach(exponent = 1:5, .combine = c,.export = "base")  %dopar%  base^exponent
}
sample_func()
[1]  2  4  8 16 32
stopImplicitCluster()

#using .packages parameter
library(dplyr)
registerDoParallel(no_cores)
foreach(i = 1:5, .combine=c, .packages="dplyr") %dopar% {
  iris[i, ] %>% select(-Species) %>% sum
}
[1] 10.2  9.5  9.4  9.4 10.2
stopImplicitCluster()


With parallel processing comes efficient memory usage or your system may crash. The first thing that comes to mind is the ability to use same address(using FORK) versus creating different memory locations(using PSOCK). By default, the makeCluster() function creates addresses of type PSOCK. However, we can change the setting to FORK by passing the type function
clust<-makeCluster(no_cores, type="FORK")
Unless required, it is very useful to use the FORK type cluster to save memory and running time. The makeCluster() function also has an outfile parameter to specify an output file for debugging purpose.
registerDoParallel(makeCluster(no_cores, outfile="debug_file.txt"))
foreach(x=list(1:5, "a"))  %dopar%  print(x)
[[1]]
[1] 1 2 3 4 5

[[2]]
[1] "a"
Contents of the debug_file.txt
starting worker pid=6696 on localhost:11363 at 17:34:36.808
starting worker pid=13168 on localhost:11363 at 17:34:37.421
starting worker pid=11536 on localhost:11363 at 17:34:38.047
starting worker pid=9572 on localhost:11363 at 17:34:38.675
starting worker pid=10612 on localhost:11363 at 17:34:43.467
starting worker pid=8864 on localhost:11363 at 17:34:44.078
starting worker pid=5144 on localhost:11363 at 17:34:44.683
starting worker pid=12012 on localhost:11363 at 17:34:45.286
[1] 1 2a" 3 4 5
We notice that a is printed after 2 in my output due to race. Using a different debug file for each node in a cluster is a better debugging option.
registerDoParallel(makeCluster(no_cores, outfile="debug_file.txt"))
foreach(x=list(1,2,3,4,5, "a"))  %dopar%  cat(dput(x), file = paste0("debug_file_", x, ".txt"))

In this case, I create 6 files containing output for each element in the list and a debug_file. Another way to debug code is using the trycatch() functions.

registerDoParallel(makeCluster(no_cores))
foreach(x=list(1, 2, "a"))  %dopar%  
{
  tryCatch({
    c(1/x) #Should give an error when x is “a”
  }, error = function(e) return(paste0("Error occurred for '", x, "'", 
                                       " The error is '", e, "'")))
}

[[1]]
[1] 1

[[2]]
[1] 0.5

[[3]]
[1] "Error occurred for 'a' The error is 'Error in 1/x: non-numeric argument to binary operator\n'"

Debugging helps find out the reasons for errors but what if the error is related to running out of memory. For this, I can use rm() and gc() functions in R. The rm() function is used to remove a variable from the environment. If you’re sure that you no longer need this variable, it is better to free up memory using the rm() function.
base=4 #Create a variable base whose value is 4
base_copy=base #Make a copy of the variable 
rm(base) #I can now remove the base variable and free up memory
To clean up the entire environment, use rm(list=ls()). It removes all the variables but does not remove libraries.
rm(list=ls())
The gc() function is the garbage collector for R and is automatically implemented. However, in a parallel environment, the gc() function is useful to return the memory regularly.

Summary

Parallel programming may seem a complex process at first but the amount of time saved after executing tasks in parallel makes it worth the try. Functions such as lapply() and sapply() are great alternatives to time consuming looping functions while parallel, foreach and doParallel packages are great starting points to running tasks in parallel. These parallel processes are based on functions and are also modular. However, with great power comes a risk of code crashes. Hence it is necessary to be careful and be aware of ways to control memory usage and error handling. It is not necessary to parallelize every piece of code that you write. You can always write sequential code and decide to parallelize the parts which take significant amounts of time. This will help in further reducing out of memory instances and writing robust and fast codes. The use of parallel programing method is growing and many packages now have parallel implementations available. With this article. one can dive deep into the world of parallel programming and make full use of the vast memory and processing power to generate output quickly. The full code for this article is as follows.
lapply(1:5, function(x) x^2) #input is 1,2,3,4,5 and output is square of the input

lapply(1:5, function(x) c(x^2,x^3)) #The output should be square and cube of input

sapply(1:5, function(x) x^2) #This output is a vector

sapply(1:5, function(x) c(x^2,x^3)) #This outputs a matrix

sapply(1:5, function(x) x^2, simplify = FALSE, USE.NAMES = FALSE) #Output is same as for lapply()

#Include the parallel library. If the next line does not work, run install.packages(“parallel”) first
library(parallel)

# Use the detectCores() function to find the number of cores in system
no_cores <- detectCores()

# Setup cluster
clust <- makeCluster(no_cores) #This line will take time

#The parallel version of lapply() is parLapply() and needs an additional cluster argument.
parLapply(clust,1:5, function(x) c(x^2,x^3))
stopCluster(clust)

#Include the parallel library. If the next line does not work, run install.packages(“parallel”) first
library(parallel)

# Use the detectCores() function to find the number of cores in system
no_cores <- detectCores()

# Setup cluster
clust <- makeCluster(no_cores) #This line will take time

#Setting a base variable 
base <- 4
#Note that this line is required so that all cores in cluster have this variable available
clusterExport(clust, "base")

#Using the parSapply() function
parSapply(clust, 1:5, function(exponent) base^exponent)
stopCluster(clust)

clusterEvalQ(clust,library(randomForest))

library(foreach)
library(doParallel)

registerDoParallel(makeCluster(no_cores))

#Vector output
foreach(exponent = 1:5, .combine = c)  %dopar%  base^exponent

#Matrix output
foreach(exponent = 1:5, .combine = rbind)  %dopar%  base^exponent

#List output
foreach(exponent = 1:5, .combine = list, .multicombine=TRUE)  %dopar%  base^exponent

#Data Frame output
foreach(exponent = 1:5, .combine = data.frame)  %dopar%  base^exponent


#This also works
registerDoParallel(no_cores)

stopImplicitCluster()

#using .export parameter
registerDoParallel(no_cores)

base <- 2 #Declaring this variable outside the scope of foreach() function

sample_func <- function (exponent) {
  #Using the .export function here to include the base variable
  foreach(exponent = 1:5, .combine = c,.export = "base")  %dopar%  base^exponent
}
sample_func()
stopImplicitCluster()

#using .packages parameter
library(dplyr)
registerDoParallel(no_cores)
foreach(i = 1:5, .combine=c, .packages="dplyr") %dopar% {
  iris[i, ] %>% select(-Species) %>% sum
}
stopImplicitCluster()

clust<-makeCluster(no_cores, type="FORK")

registerDoParallel(makeCluster(no_cores, outfile="debug_file.txt"))
foreach(x=list(1:5, "a"))  %dopar%  print(x)

registerDoParallel(makeCluster(no_cores, outfile="debug_file.txt"))
foreach(x=list(1,2,3,4,5, "a"))  %dopar%  cat(dput(x), file = paste0("debug_file_", x, ".txt"))

registerDoParallel(makeCluster(no_cores))
foreach(x=list(1, 2, "a"))  %dopar%  
{
  tryCatch({
    c(1/x) #Should give an error when x is “a”
  }, error = function(e) return(paste0("Error occurred for '", x, "'", 
                                       " The error is '", e, "'")))
}

base=4 #Create a variable base whose value is 4
base_copy=base #Make a copy of the variable 
rm(base) #I can now remove the base variable and free up memory

rm(list=ls())


Bio: Chaitanya Sagar is the Founder and CEO of Perceptive Analytics. Perceptive Analytics has been chosen as one of the top 10 analytics companies to watch out for by Analytics India Magazine. It works on Marketing Analytics for e-commerce, Retail and Pharma companies.
Exit mobile version