Multiprocessing Queue.get() hangs
Solution 1
Comments suggest you're trying to run this on Windows. As I said in a comment, if you're running this on Windows it can't work: Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main() and pass it as an argument to the worker function.

Here's a fleshed-out version of what you need to do to make it portable. I removed all the database stuff because it's irrelevant to the problems you've described so far, and I also removed the daemon fiddling, because that's usually just a lazy way to avoid shutting things down cleanly, and as often as not it will come back to bite you later:
def process_append_queue(append_queue):
    while True:
        x = append_queue.get()
        if x is None:
            break
        print("processed %d" % x)
    print("worker done")

def main():
    import multiprocessing as mp
    append_queue = mp.Queue(10)
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,))
    append_queue_process.start()
    for i in range(100):
        append_queue.put(i)
    append_queue.put(None)  # tell worker we're done
    append_queue_process.join()

if __name__ == "__main__":
    main()
The output is the "obvious" stuff:
processed 0
processed 1
processed 2
processed 3
processed 4
...
processed 96
processed 97
processed 98
processed 99
worker done
Note: because Windows doesn't (can't) fork(), it's impossible for worker processes to inherit any Python object on Windows. Each process runs the entire program from its start. That's why your original program couldn't work: each process created its own Queue, wholly unrelated to the Queue in the other process. In the approach shown above, only the main process creates a Queue, and the main process passes it (as an argument) to the worker process.
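To see that effect in isolation, here is a minimal sketch (the module-level list and the worker below are purely illustrative, not part of the code above). Under spawn, the only start method on Windows, the child re-imports the module and rebuilds every module-level object from scratch:

import multiprocessing as mp

# Module-level state.  Under spawn, the child process re-imports this module
# and builds its own, independent copy of this list.
shared_list = []

def worker():
    shared_list.append("from child")
    print("child sees: %r" % (shared_list,))   # under spawn: ['from child'] only

def main():
    shared_list.append("from parent")
    p = mp.Process(target=worker)
    p.start()
    p.join()
    print("parent sees: %r" % (shared_list,))  # ['from parent'] only

if __name__ == "__main__":
    main()

On a fork()-based system the child would inherit the parent's copy at the moment of the fork, which is why code like this can appear to work on Linux and still misbehave on Windows.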
Solution 2
Queue.Queue is thread-safe, but doesn't work across processes. This is quite easy to fix, though. Instead of:
from multiprocessing import Process
from Queue import Queue
You want:
from multiprocessing import Process, Queue
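One caveat if you keep the rest of the question's original code unchanged: it calls append_queue.task_done() and append_queue.join(), and a plain multiprocessing.Queue has neither method. multiprocessing.JoinableQueue does. A minimal sketch of that pattern (the worker body here is a placeholder, not the question's real work):

from multiprocessing import Process, JoinableQueue

def worker(q):
    while True:
        item = q.get()
        print("processing %r" % (item,))  # placeholder for real work
        q.task_done()                     # mark this item as finished

def main():
    q = JoinableQueue()
    p = Process(target=worker, args=(q,))
    p.daemon = True   # let the worker die when main() returns
    p.start()
    for item in range(5):
        q.put(item)
    q.join()          # blocks until every put() has a matching task_done()

if __name__ == "__main__":
    main()

Alternatively, drop task_done()/join() entirely and use a sentinel (put(None)) plus Process.join(), as in Solution 1.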
skyguy126
Updated on July 19, 2022

Comments
- skyguy126 almost 2 years: I'm trying to implement basic multiprocessing and I've run into an issue. The Python script is attached below.

import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue(10)
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        item = append_queue.get()
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
        append_queue.task_done()
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)

    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    append_queue.join()
    #append_queue_process.join()
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()
The AnalyzeFrequency class analyzes the frequencies of words in a file and get_results() returns a sorted list of said words and frequencies. The list is very large, perhaps 10000 items. This list is then passed to the add_to_append_queue method, which adds it to a queue. process_append_queue takes the items one by one and adds the frequencies to a "database". This operation takes a bit longer than the actual analysis in main(), so I am trying to use a separate process for this method. When I try to do this with the threading module, everything works perfectly fine, no errors. When I try to use Process, the script hangs at item = append_queue.get(). Could someone please explain what is happening here, and perhaps direct me toward a fix?
All answers appreciated!
UPDATE
The pickle error was my fault, it was just a typo. Now I am using the Queue class within multiprocessing but the append_queue.get() method still hangs.

NEW CODE
import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue()
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        database.append(append_queue.get())
        print("Appended to database in %.4f seconds" % database.append_time)
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)

    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    #append_queue.join()
    #append_queue_process.join()
    print str(append_queue.qsize())
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()
UPDATE 2
This is the database code:
class FrequencyStore:

    def __init__(self):
        self.sorter = Sorter()
        self.db = {}
        self.load_time = -1
        self.save_time = -1
        self.append_time = -1
        self.sort_time = -1

    def load_db(self):
        start_time = time.time()
        try:
            file = open("results.txt", 'r')
        except:
            raise IOError
        self.db = {}
        for line in file:
            word, count = line.strip("\n").split("=")
            self.db[word] = int(count)
        file.close()
        self.load_time = time.time() - start_time

    def save_db(self):
        start_time = time.time()
        _db = []
        for key in self.db:
            _db.append([key, self.db[key]])
        _db = self.sort(_db)
        try:
            file = open("results.txt", 'w')
        except:
            raise IOError
        file.truncate(0)
        for x in _db:
            file.write(x[0] + "=" + str(x[1]) + "\n")
        file.close()
        self.save_time = time.time() - start_time

    def create_sorted_db(self):
        _temp_db = []
        for key in self.db:
            _temp_db.append([key, self.db[key]])
        _temp_db = self.sort(_temp_db)
        _temp_db.reverse()
        return _temp_db

    def get_db(self):
        return self.db

    def sort(self, _list):
        start_time = time.time()
        _list = self.sorter.mergesort(_list)
        _list.reverse()
        self.sort_time = time.time() - start_time
        return _list

    def append(self, _list):
        start_time = time.time()
        for x in _list:
            if x[0] not in self.db:
                self.db[x[0]] = x[1]
            else:
                self.db[x[0]] += x[1]
        self.append_time = time.time() - start_time
- Tim Peters almost 8 years: A Queue.Queue does not work across processes. So the first change is to use a multiprocessing.Queue instead.
- James Scholes almost 8 years: I'm a blind screen reader user and can't read a traceback in an image, sorry. It also doesn't help people who Google for the text in your traceback at a later date. Preferably, update the question, post the traceback in a comment, or create a new one.
- James Scholes almost 8 years: Your database isn't process-safe. You're creating the database instance in the main Python process, then telling Python to do something with it in a completely different one. So the call to database.append is hanging, not the call to Queue.get. The whole point of using a queue is to avoid this exact problem.
- skyguy126 almost 8 years: I just tested the Queue.get and the database.append methods with a print statement after each. The Queue.get statement never proceeds, it just blocks. I also updated the original question with the code from the database.
- skyguy126 almost 8 years: Just as I suspected. When I run this on Linux, it works with no problems whatsoever. Maybe it is due to something in Windows?
- James Scholes almost 8 years: Impossible to say without a runnable sample.
- Tim Peters almost 8 years: If you're running this on Windows, it can't work - Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main(), and pass it as an argument to the worker function.
- skyguy126 almost 8 years: Thanks for that info, it is very helpful!
- skyguy126 almost 8 years: The code works! One more thing: if you take a look at my FrequencyAnalysis.py you will see the FrequencyStore class. If I call the append method from another process, will it not update the instance variable in that class? I am passing the object as a parameter like you did with the queue.
- Tim Peters almost 8 years: That would be a different question entirely, so make the test case as small as possible and post a different question. In general, you shouldn't expect any mutation of any object to be visible across processes. The memory is not shared. A multiprocessing.Queue works because it's implemented from the ground up to make mutations visible across processes - and that doesn't happen by magic (it happens by under-the-covers interprocess pipes communicating info about mutations among processes, guarded by interprocess semaphores protecting against simultaneous mutations).
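As a concrete illustration of that last point (the names below are only for illustration, not from the question's code): mutating an ordinary object inside a worker changes only that worker's copy, whereas a multiprocessing.Manager proxy does forward mutations back through the manager process:

from multiprocessing import Process, Manager

def worker(plain_dict, managed_dict):
    plain_dict["count"] = 1     # changes only this process's own copy
    managed_dict["count"] = 1   # proxied back to the manager, visible to the parent

def main():
    manager = Manager()
    plain_dict = {}
    managed_dict = manager.dict()
    p = Process(target=worker, args=(plain_dict, managed_dict))
    p.start()
    p.join()
    print("plain:   %r" % (plain_dict,))          # still {}
    print("managed: %r" % (dict(managed_dict),))  # {'count': 1}

if __name__ == "__main__":
    main()

Proxies carry their own overhead, though; for a workload like this one, letting the single worker process own the database outright (the pattern in Solution 1) avoids sharing state at all.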