Python: How to use Value and Array in Multiprocessing pool
I never knew "the reason" for this, but multiprocessing (mp) uses different pickler/unpickler mechanisms for functions passed to most Pool methods. As a consequence, objects created by things like mp.Value, mp.Array, mp.Lock, ..., can't be passed as arguments to such methods, although they can be passed as arguments to mp.Process and to the optional initializer function of mp.Pool(). Because of the latter, this works:
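import multiprocessing as mp

def init(aa, vv):
    # Runs once in each worker process: keep the shared objects in globals.
    global a, v
    a = aa
    v = vv

def worker(i):
    a[i] = v.value * i

if __name__ == "__main__":
    N = 10
    a = mp.Array('i', [0]*N)
    v = mp.Value('i', 3)
    # The shared objects travel through initargs, not through map().
    p = mp.Pool(initializer=init, initargs=(a, v))
    p.map(worker, range(N))
    print(a[:])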
and that prints
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
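The same initializer pattern should carry over to the other objects listed above. Here is a minimal sketch, not part of the original answer, that shares an mp.Value together with an mp.Lock. The names init, add and run_counter and the pool size of 4 are made up for illustration.

```python
import multiprocessing as mp

def init(shared_counter, shared_lock):
    # Runs once in each worker process: keep the shared objects in globals.
    global counter, lock
    counter = shared_counter
    lock = shared_lock

def add(n):
    # "+=" on a shared Value is a read-modify-write, so guard it with the lock.
    with lock:
        counter.value += n

def run_counter(numbers):
    counter = mp.Value('i', 0)
    lock = mp.Lock()
    # Both objects reach the workers through initargs, never through map().
    with mp.Pool(processes=4, initializer=init, initargs=(counter, lock)) as pool:
        pool.map(add, numbers)
    return counter.value

if __name__ == "__main__":
    print(run_counter(range(1, 101)))  # sum of 1..100, i.e. 5050
```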
That's the only way I know of to get this to work across platforms.
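For contrast, here is a small sketch of what goes wrong when a shared object is handed straight to a Pool method as a task argument. In the CPython versions I have looked at, the attempt to pickle the Value surfaces in the caller as a RuntimeError saying that such objects should only be shared through inheritance, but treat the exact exception and wording as an assumption. The function names are hypothetical.

```python
import multiprocessing as mp

def read_value(v):
    return v.value

def try_direct_pass():
    v = mp.Value('i', 3)
    with mp.Pool(processes=2) as pool:
        try:
            # The Value must be pickled to reach a worker, and it refuses.
            return pool.apply(read_value, (v,))
        except RuntimeError as exc:
            return "RuntimeError: {}".format(exc)

if __name__ == "__main__":
    print(try_direct_pass())
```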
On Linux-y platforms (where mp creates new processes via fork()), you can instead create your mp.Array and mp.Value (etc.) objects as module globals any time before you do mp.Pool(). Processes created by fork() inherit whatever is in the module global address space at the time mp.Pool() executes.

But that doesn't work at all on platforms (read "Windows") that don't support fork().
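The fork-only variant described above might look like the sketch below. Requesting the "fork" start method explicitly through mp.get_context() is my addition, since the default start method differs between platforms and Python versions; on a platform without fork() the get_context call raises ValueError. run_with_fork is a made-up name.

```python
import multiprocessing as mp

# Assumption: ask for fork explicitly instead of relying on the default.
ctx = mp.get_context("fork")

N = 10
# Module globals created before the pool exists; forked workers inherit them.
a = ctx.Array('i', [0] * N)
v = ctx.Value('i', 3)

def worker(i):
    a[i] = v.value * i

def run_with_fork():
    with ctx.Pool(processes=4) as pool:
        pool.map(worker, range(N))
    return a[:]

if __name__ == "__main__":
    print(run_with_fork())  # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
```

No initializer is needed here because nothing has to be sent to the workers: they already hold a and v from the moment they are forked.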
Comments
-
Merlin almost 2 years
For multiprocessing with Process, I can use Value, Array by setting the args param. With multiprocessing with Pool, how can I use Value, Array? There is nothing in the docs on how to do this.

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

I am trying to use Value, Array within the code snippet below.

import multiprocessing
from multiprocessing import Value, Array

def do_calc(data):
    # access num or
    # work to update arr
    newdata = data * 2
    return newdata

def start_process():
    print('Starting', multiprocessing.current_process().name)

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    inputs = list(range(10))
    print('Input :', inputs)
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, initializer=start_process)
    pool_outputs = pool.map(do_calc, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks
    print('Pool :', pool_outputs)
-
Tim Peters over 7 years
If your original question was answered, you should accept the answer, and open new issues for new questions. Otherwise posts become an incomprehensible mess of endless, unrelated comments.