Shared memory in parallel foreach in R
Solution 1
I think the solution to the problem can be seen from the post of Steve Weston, the author of the foreach
package, here. There he states:
The doParallel package will auto-export variables to the workers that are referenced in the foreach loop.
So I think the problem is that in your code your big matrix c
is referenced in the assignment c<-m[1,1]
. Just try xyz <- m[1,1]
instead and see what happens.
Here is an example with a file-backed big.matrix
:
#create a matrix of size 1GB aproximatelly
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double",
separated = FALSE,
backingfile = "example.bin",
descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
t <- attach.big.matrix("example.desc")
for (i in seq_len(30L)) {
for (j in seq_len(m)) {
y <- t[i,j]
}
}
return(0L)
}
## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
invisible(c) ## c is referenced and thus exported to workers
t <- attach.big.matrix("example.desc")
for (i in seq_len(30L)) {
for (j in seq_len(m)) {
y <- t[i,j]
}
}
return(0L)
}
closeAllConnections()
Solution 2
Alternatively, if you are on Linux/Mac and you want a CoW shared memory, use forks. First load all your data into the main thread, and then launch working threads (forks) with general function mcparallel
from the parallel
package.
You can collect their results with mccollect
or with the use of truly shared memory using the Rdsm
library, like this:
library(parallel)
library(bigmemory) #for shared variables
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1]<-1 #Init shared memory with some number
job<-mcparallel({shared[1]<-23}) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23
You can confirm, that the value really gets updated in backgruound, if you delay the write:
fn<-function()
{
Sys.sleep(1) #One second delay
shared[1]<-11
}
job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
aaa[1,1] #Execute after one second
# [1] 11
mccollect() #To destroy all forked processes (and possibly collect their output)
To control for concurency and avoid race conditions use locks:
library(synchronicity) #for locks
m<-boost.mutex() #Lets create a mutex "m"
bad.incr<-function() #This function doesn't protect the shared resource with locks:
{
a<-shared[1]
Sys.sleep(1)
shared[1]<-a+1
}
good.incr<-function()
{
lock(m)
a<-shared[1]
Sys.sleep(1)
shared[1]<-a+1
unlock(m)
}
shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify, that the value didn't get increased 5 times due to race conditions
mccollect() #To clear all threads, not to get the values
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6
#[1] 6
mccollect()
Edit:
I simplified dependencies a bit by exchanging Rdsm::mgrmakevar
into bigmemory::big.matrix
. mgrmakevar
internally calls big.matrix
anyway, and we don't need anything more.
Related videos on Youtube
Stanislav
I have interest in multivariate sparse representations, Data Mining, Machine Learning, Image Processing, Simulation Methods, Pattern Recognition, Probability, Decision Theory, Multivariate Statistics and Bayesian Theory, but I also enjoy coding and learning new programming languages. My goal here is to help and to learn by helping others and by enjoying the fine postings other have created. Best,
Updated on June 12, 2022Comments
-
Stanislav almost 2 years
Problem Description:
I have a big matrix
c
, loaded in RAM memory. My goal is through parallel processing to have read only access to it. However when I create the connections either I usedoSNOW
,doMPI
,big.matrix
, etc the amount to ram used increases dramatically.Is there a way to properly create a shared memory, where all the processes may read from, without creating a local copy of all the data?
Example:
libs<-function(libraries){# Installs missing libraries and then load them for (lib in libraries){ if( !is.element(lib, .packages(all.available = TRUE)) ) { install.packages(lib) } library(lib,character.only = TRUE) } } libra<-list("foreach","parallel","doSNOW","bigmemory") libs(libra) #create a matrix of size 1GB aproximatelly c<-matrix(runif(10000^2),10000,10000) #convert it to bigmatrix x<-as.big.matrix(c) # get a description of the matrix mdesc <- describe(x) # Create the required connections cl <- makeCluster(detectCores ()) registerDoSNOW(cl) out<-foreach(linID = 1:10, .combine=c) %dopar% { #load bigmemory require(bigmemory) # attach the matrix via shared memory?? m <- attach.big.matrix(mdesc) #dummy expression to test data aquisition c<-m[1,1] } closeAllConnections()
RAM: in the image above, you may find that the memory increases a lot until
foreach
ends and it is freed.-
NoBackingDown almost 9 yearsI have exactly the same problem right now and I'm highly interested in a solution. I also observed that copies are made instead of memory being shared.
-
-
Stanislav almost 9 yearsI could not see that
c<-m[1,1]
actually loadsc
, since I expected this would generate a new variable instead of , well reading it. This means that actually the memory is shared and I have been losing my time exploring different options because ofc
. Thank you so much for the help! PS: I do not think that the code bellow invisible is ever executed. -
NoBackingDown almost 9 years@Stanislav I agree that it's a bit unexpected behaviour. If my answer solves your problem, I would be glad if you would consider accepting it.
-
cdeterman almost 9 years@Stanislav This answer is correct, you need to be certain of what you are actually exporting to the workers. It is generally good practice to not have variable names the same inside and outside of loops unless you actually modifying the same object.