Thursday, May 21, 2020

R Parallel Writing to Files

Here I use the foreach and doParallel libraries to demonstrate how to write to a file in parallel.

library(stringr)
library(flock)
library(foreach)
library(doParallel)
# Start one worker per core; worker console output goes to a.out
cl <- makeCluster(detectCores(), outfile = "a.out")
registerDoParallel(cl)
# Path of the lock file shared by all workers
lock0 <- tempfile()
foreach (
  i = 1:10000,
  .combine = cbind,
  .packages = c('stringr', 'flock'),
  .export = ls(globalenv())
) %dopar% {
  locked0 <- flock::lock(lock0)                   # acquire the lock
  write(i, file = "outfile.txt", append = TRUE)
  flock::unlock(locked0)                          # release the lock
}
stopCluster(cl)

The statement makeCluster(detectCores(), outfile = "a.out") creates a cluster that uses all available cores, and the workers' console output is redirected to the file a.out.

The statement registerDoParallel(cl) registers the cluster as the foreach parallel backend.
Note that in the foreach statement we have .packages = c('stringr', 'flock') and .export = ls(globalenv()). The former loads the specified packages on each worker, and the latter exports all variables defined in the global environment to the workers. Without these, the code inside the foreach loop may not see the packages or variables of the main session.
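
These arguments are needed because each worker is a separate R process with its own global environment. The sketch below illustrates this with the parallel package directly rather than with foreach; the variable name my_threshold is a hypothetical example, not part of the code above.

```r
library(parallel)

cl <- makeCluster(2)

my_threshold <- 42  # defined only in the main session (hypothetical variable)

# Before exporting, the workers do not know about `my_threshold`
seen_before <- unlist(clusterEvalQ(cl, exists("my_threshold")))

# clusterExport copies the named variables into each worker's global environment
clusterExport(cl, "my_threshold", envir = environment())
seen_after <- unlist(clusterEvalQ(cl, exists("my_threshold")))

stopCluster(cl)

print(seen_before)
print(seen_after)
```

The .export argument of foreach plays a similar role to clusterExport here: it names the variables to copy to the workers.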

To avoid a data race when multiple processes write to the same file, we use the flock library as a mutex and wrap the write operation in flock::lock and flock::unlock.

Using a mutex can make processing really slow, because the workers must take turns writing. Another approach is to have each process write to its own file. You can include the process ID in the file name. For example,

write(i,file=paste(c("outfile",Sys.getpid(),".txt"), collapse =""),append=TRUE)
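
With one file per process, the pieces usually need to be combined after the workers finish. Here is a minimal sketch using base R only. The scratch directory, the file name pattern and the merged file name are assumptions chosen to match the paste() call above, and two small files are written by hand to stand in for worker output so the snippet runs on its own.

```r
# Work in a scratch directory so the example does not touch real output
out_dir <- tempfile("parts")
dir.create(out_dir)

# Stand-ins for files that two workers (hypothetical pids 101 and 202) would write
writeLines(c("1", "3"), file.path(out_dir, "outfile101.txt"))
writeLines(c("2", "4"), file.path(out_dir, "outfile202.txt"))

# Find every per-process file: "outfile" + digits + ".txt"
parts <- list.files(out_dir, pattern = "^outfile[0-9]+\\.txt$", full.names = TRUE)

# Read all pieces and write them into a single merged file
merged <- unlist(lapply(parts, readLines))
writeLines(merged, file.path(out_dir, "merged.txt"))
```

The merged lines follow the order of the files, not the order of the loop index, so sort them afterwards if the order matters.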

One thing to note: if your parallel processing uses database connections, the above code will fail, since the worker processes cannot use connections opened in the main session. Instead, use clusterEvalQ to open the connections on each worker when you build the cluster, as in the code below.

library(RODBC)   # use the ODBC library
library(DBI)
library(odbc)
odbcCloseAll()   # close any RODBC connections left open in the main session
library(foreach)
library(doParallel)
cl <- makeCluster(detectCores(), outfile = "a.out")
# Run once on every worker: load the packages and open that worker's own connections
clusterEvalQ(cl, {
   library(odbc)
   library(RODBC)
   library(DBI)
   dbname1 <- "test"  # change this when changing servers!
   channel1 <- RODBC::odbcConnect(dbname1)
   con1 <- DBI::dbConnect(odbc(), dbname1)
})
registerDoParallel(cl)
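
This works because objects created inside clusterEvalQ stay in each worker's global environment, where later tasks can find them. The sketch below shows the same pattern with a plain value standing in for the connection, since no real database is assumed here; the name worker_tag is hypothetical.

```r
library(parallel)

cl <- makeCluster(2)

# Run once per worker: create an object in that worker's global environment
invisible(clusterEvalQ(cl, {
  worker_tag <- paste0("worker-", Sys.getpid())  # stand-in for con1 / channel1
  NULL  # return nothing, so no worker-local object is shipped back
}))

# Later tasks run on the same workers and can look the object up by name
tags <- unlist(parLapply(cl, 1:4, function(i) {
  get("worker_tag", envir = globalenv())
}))

stopCluster(cl)
```

Each task sees the value created on whichever worker ran it, so at most two distinct tags appear for the four tasks.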
