Python Multiprocessing concurrency using Manager, Pool and a shared list not working


It's simply because you instantiate your Pool before defining the function get_files:

import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()

files = manager.list()
def get_files(x):
    for root, dirs, names in os.walk(x):  # "dirs"/"names" avoid shadowing the built-ins dir/file
        for name in names:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld)) # Instantiate the pool here

pool.map(get_files, tld)
pool.close()
pool.join()
print len(files)
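The same fix carries over to Python 3 (where `print` is a function and `os.walk("/").next()` becomes `next(os.walk("/"))`). As a side note, the shared Manager list is not strictly needed here: `pool.map` already collects each worker's return value, so each worker can return its own list. A sketch of that variant, using a throwaway demo directory of my own instead of walking `/`:

```python
import os
import tempfile
from multiprocessing import Pool

def get_files(top):
    """Walk one directory tree and return the file paths found."""
    found = []
    for root, dirs, names in os.walk(top):
        for name in names:
            found.append(os.path.join(root, name))
    return found

if __name__ == '__main__':
    # Demo on a throwaway directory rather than "/" to keep the sketch fast.
    demo = tempfile.mkdtemp()
    for name in ("a.txt", "b.txt"):
        open(os.path.join(demo, name), "w").close()
    with Pool(processes=2) as pool:
        per_dir = pool.map(get_files, [demo])    # one result list per directory
    files = [f for sub in per_dir for f in sub]  # flatten into one list
    print(len(files))  # 2
```

Because the results come back through `map` itself, no Manager proxy (and no cross-process synchronization on the list) is involved at all.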

The overall idea is that, at the instant you create the Pool, the worker processes are forked from the main process, and each fork takes a snapshot of the main process's memory at that moment. So any definition made in the main process after the fork does not exist in the subprocesses, which is why the workers cannot find get_files.
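A minimal sketch of the corrected ordering (Python 3 syntax, my own toy function):

```python
import multiprocessing

def worker(x):
    # Defined BEFORE the Pool below exists, so the forked worker
    # processes inherit this function and pool.map can resolve it.
    return x + 1

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(worker, [1, 2, 3]))  # [2, 3, 4]
```

Move the `def worker` below the `Pool(...)` call and you reproduce exactly the `AttributeError: 'module' object has no attribute ...` from the question.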

If you want shared memory, you can use the threading library instead, but it comes with its own issues (cf. the Global Interpreter Lock).
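For this particular job the GIL matters less than usual, because walking directories is I/O-bound. A sketch of the threaded variant (my own code, not the asker's), where threads share a plain list directly and a lock guards the writes:

```python
import os
import tempfile
import threading

files = []               # plain list, shared by all threads in this process
lock = threading.Lock()  # guards concurrent writes to the shared list

def get_files(top):
    local = []
    for root, dirs, names in os.walk(top):
        for name in names:
            local.append(os.path.join(root, name))
    with lock:
        files.extend(local)  # one locked bulk update per directory

# Demo on a throwaway directory so the sketch is self-contained.
demo = tempfile.mkdtemp()
open(os.path.join(demo, "a.txt"), "w").close()

threads = [threading.Thread(target=get_files, args=(demo,))]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(files))  # 1
```

No Manager is needed: threads live in one address space, so the ordinary list is visible to all of them.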

Author: nohup
Writes bad code occasionally.

Updated on June 15, 2022

Comments

  • nohup, almost 2 years ago

    I am learning Python multiprocessing, and I am trying to use this feature to populate a list with all of the files present on the OS. However, the code that I wrote is executing sequentially.

    #!/usr/bin/python
    import os
    import multiprocessing
    tld = [os.path.join("/", f) for f in os.walk("/").next()[1]] # Gets the top-level directory names inside "/"
    manager = multiprocessing.Manager()
    files = manager.list()
    
    
    def get_files(x):
        for root, dir, file in os.walk(x):
            for name in file:
                files.append(os.path.join(root, name))
    
    mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
          for x in range(len(tld))]
    
    for i in mp:
        i.start()
        i.join()
    print len(files)
    

    When I checked the process tree, I can see only a single child process spawned. (man pstree says {} denotes a child thread spawned by the parent.)

    ---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                                   `-python(12750)
    

    What I was looking for was, to spawn a process for each tld directory, populate the shared list files, and that would be around 10-15 processes depending on the number of directories. What am I doing wrong?

    EDIT::

    I used multiprocessing.Pool to create worker processes, and this time the processes are spawned, but it gives errors when I try to use multiprocessing.Pool.map(). I was referring to the following code from the Python docs:

    from multiprocessing import Pool
    def f(x):
        return x*x
    
    if __name__ == '__main__':
        p = Pool(5)
        print(p.map(f, [1, 2, 3])) 
    

    Following that example, I rewrote the code as

    import os
    import multiprocessing
    tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
    manager = multiprocessing.Manager()
    pool = multiprocessing.Pool(processes=len(tld))
    print pool
    files = manager.list()
    def get_files(x):
        for root, dir, file in os.walk(x):
            for name in file:
                files.append(os.path.join(root, name))
    pool.map(get_files, [x for x in tld])
    pool.close()
    pool.join()
    print len(files)
    

    and it is forking multiple processes.

    ---bash(10949)---python(12890)-+-python(12967)
                                   |-python(12968)
                                   |-python(12970)
                                   |-python(12971)
                                   |-python(12972)
                                   ---snip---
    

    But the code is erroring saying

    Process PoolWorker-2:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
        task = get()
      File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
        return recv()
    AttributeError: 'module' object has no attribute 'get_files'

    (several pool workers print interleaved copies of the same traceback)

    What am I doing wrong here, and why does the get_files() function error out?