Multiprocessing Queue.get() hangs

Solution 1

Comments suggest you're trying to run this on Windows. As I said in a comment,

If you're running this on Windows, it can't work - Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main(), and pass it as an argument to the worker function.

Here's a fleshed-out version of what you need to do to make it portable. I removed all the database stuff because it's irrelevant to the problems you've described so far. I also removed the daemon fiddling, because that's usually just a lazy way to avoid shutting things down cleanly, and as often as not it will come back to bite you later:

import multiprocessing as mp

def process_append_queue(append_queue):
    # Runs in the worker process; the queue arrives as an argument.
    while True:
        x = append_queue.get()
        if x is None:  # sentinel: no more work
            break
        print("processed %d" % x)
    print("worker done")

def main():
    # Only the main process creates the Queue.
    append_queue = mp.Queue(10)
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,))
    append_queue_process.start()
    for i in range(100):
        append_queue.put(i)
    append_queue.put(None)  # tell worker we're done
    append_queue_process.join()

if __name__ == "__main__":
    main()

The output is the "obvious" stuff:

processed 0
processed 1
processed 2
processed 3
processed 4
...
processed 96
processed 97
processed 98
processed 99
worker done

Note: because Windows doesn't (can't) fork(), it's impossible for worker processes to inherit any Python object on Windows. Each process runs the entire program from its start. That's why your original program couldn't work: each process created its own Queue, wholly unrelated to the Queue in the other process. In the approach shown above, only the main process creates a Queue, and the main process passes it (as an argument) to the worker process.
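
The re-import behaviour described in the note can be observed on any platform by requesting the "spawn" start method, which is the one Windows uses. The following is a minimal sketch, not part of the original answer, assuming Python 3.4 or later (for get_context); the names IMPORTED_IN_PID, report and module_state_in_child are made up for illustration.

```python
import multiprocessing as mp
import os

# Evaluated once per *import* of this module. Under "spawn" the child
# re-imports the module, so it computes its own value here.
IMPORTED_IN_PID = os.getpid()


def report(result_queue):
    # Runs in the child: send back the module-level value the child sees.
    result_queue.put(IMPORTED_IN_PID)


def module_state_in_child(start_method):
    # Start one child with the given start method and return its view
    # of IMPORTED_IN_PID. The queue is passed explicitly as an argument.
    ctx = mp.get_context(start_method)
    result_queue = ctx.Queue()
    child = ctx.Process(target=report, args=(result_queue,))
    child.start()
    value = result_queue.get(timeout=30)
    child.join()
    return value


if __name__ == "__main__":
    child_value = module_state_in_child("spawn")
    print("parent imported module in pid", IMPORTED_IN_PID)
    print("child imported module in pid ", child_value)
    print("same module state?", child_value == IMPORTED_IN_PID)
```

Run as a script, the last line should report False under "spawn": the child's module-level value came from the child's own import of the file. With "fork" (where available) the child inherits the parent's value instead, which is why the original program appeared to work on Linux.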

Solution 2

Queue.Queue (queue.Queue in Python 3) is thread-safe, but doesn't work across processes. This is quite easy to fix, though. Instead of:

from multiprocessing import Process
from Queue import Queue

You want:

from multiprocessing import Process, Queue
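
One caveat when swapping the import: the first version of the question calls append_queue.task_done() and append_queue.join(), and a plain multiprocessing.Queue does not provide those methods. multiprocessing.JoinableQueue does. Below is a minimal sketch of that pattern in Python 3, with the queues passed to the worker as arguments; the names consume, work_queue and result_queue are illustrative and do not come from the question.

```python
import multiprocessing as mp


def consume(work_queue, result_queue):
    """Double each item from work_queue until a None sentinel arrives."""
    while True:
        item = work_queue.get()
        try:
            if item is None:
                break
            result_queue.put(item * 2)
        finally:
            # Acknowledge every get(), sentinel included, so join() can return.
            work_queue.task_done()


def main():
    work_queue = mp.JoinableQueue()
    result_queue = mp.Queue()
    worker = mp.Process(target=consume, args=(work_queue, result_queue))
    worker.start()
    for i in range(5):
        work_queue.put(i)
    work_queue.join()  # returns once task_done() was called for all 5 items
    results = [result_queue.get(timeout=30) for _ in range(5)]
    work_queue.put(None)  # sentinel: let the worker leave its loop
    worker.join()
    print(results)


if __name__ == "__main__":
    main()
```

The worker only relies on get(), put() and task_done(), so the same function also works with a thread and a queue.Queue, which is a convenient way to test it in a single process.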

Author: skyguy126

Updated on July 19, 2022

Comments

  • skyguy126
    skyguy126 almost 2 years

    I'm trying to implement basic multiprocessing and I've run into an issue. The python script is attached below.

    import time, sys, random, threading
    from multiprocessing import Process
    from Queue import Queue
    from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency
    
    append_queue = Queue(10)
    database = FrequencyStore()
    
    def add_to_append_queue(_list):
        append_queue.put(_list)
    
    def process_append_queue():
        while True:
            item = append_queue.get()
            database.append(item)
            print("Appended to database in %.4f seconds" % database.append_time)
            append_queue.task_done()
        return
    
    def main():
        database.load_db()
        print("Database loaded in %.4f seconds" % database.load_time)
        append_queue_process = Process(target=process_append_queue)
        append_queue_process.daemon = True
        append_queue_process.start()
        #t = threading.Thread(target=process_append_queue)
        #t.daemon = True
        #t.start()
    
        while True:
            path = raw_input("file: ")
            if path == "exit":
                break
            a = AnalyzeFrequency(path)
            a.analyze()
            print("Analyzed file in %.4f seconds" % a._time)
            add_to_append_queue(a.get_results())
    
        append_queue.join()
        #append_queue_process.join()
        database.save_db()
        print("Database saved in %.4f seconds" % database.save_time)
        sys.exit(0)
    
    if __name__=="__main__":
        main()
    

    The AnalyzeFrequency class analyzes the frequencies of words in a file, and get_results() returns a sorted list of those words and their frequencies. The list is very large, perhaps 10,000 items.

    This list is then passed to the add_to_append_queue method, which adds it to a queue. process_append_queue takes the items one by one and adds the frequencies to a "database". This operation takes a bit longer than the actual analysis in main(), so I am trying to use a separate process for it. When I do this with the threading module, everything works fine, with no errors. When I use Process instead, the script hangs at item = append_queue.get().

    Could someone please explain what is happening here, and perhaps direct me toward a fix?

    All answers appreciated!

    UPDATE

    The pickle error was my fault; it was just a typo. I am now using the Queue class from multiprocessing, but the append_queue.get() call still hangs. New code:

    import time, sys, random
    from multiprocessing import Process, Queue
    from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency
    
    append_queue = Queue()
    database = FrequencyStore()
    
    def add_to_append_queue(_list):
        append_queue.put(_list)
    
    def process_append_queue():
        while True:
            database.append(append_queue.get())
            print("Appended to database in %.4f seconds" % database.append_time)
        return
    
    def main():
        database.load_db()
        print("Database loaded in %.4f seconds" % database.load_time)
        append_queue_process = Process(target=process_append_queue)
        append_queue_process.daemon = True
        append_queue_process.start()
        #t = threading.Thread(target=process_append_queue)
        #t.daemon = True
        #t.start()
    
        while True:
            path = raw_input("file: ")
            if path == "exit":
                break
            a = AnalyzeFrequency(path)
            a.analyze()
            print("Analyzed file in %.4f seconds" % a._time)
            add_to_append_queue(a.get_results())
    
        #append_queue.join()
        #append_queue_process.join()
        print str(append_queue.qsize())
        database.save_db()
        print("Database saved in %.4f seconds" % database.save_time)
        sys.exit(0)
    
    if __name__=="__main__":
        main()
    

    UPDATE 2

    This is the database code:

    class FrequencyStore:
    
        def __init__(self):
            self.sorter = Sorter()
            self.db = {}
            self.load_time = -1
            self.save_time = -1
            self.append_time = -1
            self.sort_time = -1
    
        def load_db(self):
            start_time = time.time()
    
            try:
                file = open("results.txt", 'r')
            except:
                raise IOError
    
            self.db = {}
            for line in file:
                word, count = line.strip("\n").split("=")
                self.db[word] = int(count)
            file.close()
    
            self.load_time = time.time() - start_time
    
        def save_db(self):
            start_time = time.time()
    
            _db = []
            for key in self.db:
                _db.append([key, self.db[key]])
            _db = self.sort(_db)
    
            try:
                file = open("results.txt", 'w')
            except:
                raise IOError
    
            file.truncate(0)
            for x in _db:
                file.write(x[0] + "=" + str(x[1]) + "\n")
            file.close()
    
            self.save_time = time.time() - start_time
    
        def create_sorted_db(self):
            _temp_db = []
            for key in self.db:
                _temp_db.append([key, self.db[key]])
            _temp_db = self.sort(_temp_db)
            _temp_db.reverse()
            return _temp_db
    
        def get_db(self):
            return self.db
    
        def sort(self, _list):
            start_time = time.time()
    
            _list = self.sorter.mergesort(_list)
            _list.reverse()
    
            self.sort_time = time.time() - start_time
            return _list
    
        def append(self, _list):
            start_time = time.time()
    
            for x in _list:
                if x[0] not in self.db:
                    self.db[x[0]] = x[1]
                else:
                    self.db[x[0]] += x[1]
    
            self.append_time = time.time() - start_time
    
  • Tim Peters
    Tim Peters almost 8 years
    A Queue.Queue does not work across processes. So first change is to use a multiprocessing.Queue instead.
  • James Scholes
    James Scholes almost 8 years
    I'm a blind screen reader user and can't read a traceback in an image, sorry. It also doesn't help people who Google for the text in your traceback at a later date. Preferably, update the question, post the traceback in a comment, or create a new one.
  • James Scholes
    James Scholes almost 8 years
    Your database isn't process-safe. You're creating the database instance in the main Python process, then telling Python to do something with it in a completely different one. So the call to database.append is hanging, not the call to Queue.get. The whole point of using a queue is to avoid this exact problem.
  • skyguy126
    skyguy126 almost 8 years
    I just tested the Queue.get and the database.append methods with a print statement after each. The Queue.get statement never proceeds, it just blocks. I also updated the original question with the code from the database.
  • skyguy126
    skyguy126 almost 8 years
    Just as I suspected. When I run this on Linux, it works with no problems whatsoever. Maybe it is due to something in Windows?
  • James Scholes
    James Scholes almost 8 years
    Impossible to say without a runnable sample.
  • Tim Peters
    Tim Peters almost 8 years
    If you're running this on Windows, it can't work - Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main(), and pass it as an argument to the worker function.
  • skyguy126
    skyguy126 almost 8 years
    Thanks for that info, it is very helpful!
  • skyguy126
    skyguy126 almost 8 years
    The code works! One more thing: if you take a look at my FrequencyAnalysis.py, you will see the FrequencyStore class. If I call the append method from another process, will it not update the instance variable in that class? I am passing the object as a parameter, like you did with the queue.
  • Tim Peters
    Tim Peters almost 8 years
    That would be a different question entirely, so make the test case as small as possible and post a different question. In general, you shouldn't expect any mutation of any object to be visible across processes. The memory is not shared. A multiprocessing.Queue works because it's implemented from the ground up to make mutations visible across processes - and that doesn't happen by magic (it happens by under-the-covers interprocess pipes communicating info about mutations among processes, guarded by interprocess semaphores protecting against simultaneous mutations).
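
The last comment's point, that a mutation made in one process is not visible in another, can be illustrated with a small sketch. It is an illustration only, not code from the thread: the names mutate_and_report, counts and result_queue are hypothetical, and a plain dict stands in for the FrequencyStore object. The worker changes its own copy of the dict, and the only way the parent sees the change is by receiving it through a multiprocessing.Queue.

```python
import multiprocessing as mp


def mutate_and_report(counts, result_queue):
    # In a worker process this changes the worker's own copy only.
    counts["spam"] = counts.get("spam", 0) + 1
    # Ship the updated data back explicitly.
    result_queue.put(counts)


def main():
    counts = {"spam": 1}
    result_queue = mp.Queue()
    child = mp.Process(target=mutate_and_report, args=(counts, result_queue))
    child.start()
    from_child = result_queue.get(timeout=30)
    child.join()
    print("parent's dict:", counts)      # still {'spam': 1}
    print("child's copy: ", from_child)  # {'spam': 2}


if __name__ == "__main__":
    main()
```

For the database in the question, this suggests one of two designs: let the worker own the store and do the saving itself, or have the worker send its results back so that the main process applies them to its own store before calling save_db().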