Python threading Queue producer-consumer (thread-safe)
Solution 1
I recommend you read about the producer-consumer problem. Your producers are the fetch threads. Your consumer is the save function. If I understand correctly, you want the consumer to save the fetched result as soon as it's available. For this to work, the producer and consumer must be able to communicate in some thread-safe way (e.g. through a queue).
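As a minimal sketch of that hand-off (names here are illustrative; this version uses a None sentinel to signal completion, whereas the answer below uses an Event flag, which scales better to multiple consumers):

```python
import queue
import threading

q = queue.Queue()  # thread-safe FIFO: producer and consumer need no extra locking

def producer():
    for i in range(3):
        q.put(i)       # hand each result to the consumer as soon as it is ready
    q.put(None)        # sentinel: tells the single consumer we are done

def consumer(out):
    while True:
        item = q.get() # blocks until an item is available
        if item is None:
            break
        out.append(item)

results = []
threads = [threading.Thread(target=producer),
           threading.Thread(target=consumer, args=(results,))]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # [0, 1, 2]
```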
Basically, you need another queue. It would replace proxy_array. Your save function will look something like this:

while True:
    try:
        data = fetch_data_from_output_queue()
        save_to_database(data)
    except queue.Empty:
        if stop_flag.is_set():
            # All done
            break
        time.sleep(1)
This save function will need to run in its own thread. stop_flag is an Event that gets set after you join your fetch threads.
From a high level, your application will look like this:
input_queue = initialize_input_queue()
output_queue = initialize_output_queue()
stop_flag = Event()
create_and_start_save_thread(output_queue) # read from output queue, save to DB
create_and_start_fetch_threads(input_queue, output_queue) # get sites to crawl from input queue, push crawled results to output_queue
join_fetch_threads() # this will block until the fetch threads have gone through everything in the input_queue
stop_flag.set() # this will inform the save thread that we are done
join_save_thread() # wait for all the saving to complete
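A runnable sketch of that outline, where fetch and save_to_database are hypothetical stand-ins for the real network and database work:

```python
import queue
import threading

def fetch(site):
    # hypothetical stand-in for the real network fetch
    return 'data-for-%d' % site

def save_to_database(data, db):
    # hypothetical stand-in for the real database write
    db.append(data)

def fetch_worker(input_queue, output_queue):
    while True:
        try:
            site = input_queue.get(block=False)
        except queue.Empty:
            return  # input exhausted: this fetch thread is done
        output_queue.put(fetch(site))

def save_worker(output_queue, stop_flag, db):
    while True:
        try:
            data = output_queue.get(timeout=0.1)
        except queue.Empty:
            if stop_flag.is_set():
                return  # fetchers joined and queue drained: all done
            continue
        save_to_database(data, db)

input_queue = queue.Queue()
for site in range(10):
    input_queue.put(site)
output_queue = queue.Queue()
stop_flag = threading.Event()
db = []

saver = threading.Thread(target=save_worker, args=(output_queue, stop_flag, db))
saver.start()
fetchers = [threading.Thread(target=fetch_worker, args=(input_queue, output_queue))
            for _ in range(2)]
for t in fetchers:
    t.start()
for t in fetchers:
    t.join()       # blocks until the fetch threads have emptied the input queue
stop_flag.set()    # informs the save thread that no more data is coming
saver.join()       # wait for all the saving to complete
print(len(db))  # 10
```

Because the save thread checks stop_flag only after the output queue is empty, results are saved as they arrive, and nothing is lost at shutdown.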
Solution 2
I had to come up with something similar, a producer-consumer: the producer generates an 'id' and the consumer consumes that id to fetch a URL and process it. Here is my skeleton code which does that:
import queue
import random
import sys
import threading
import time

data_queue = queue.Queue()
lock = threading.Lock()  # serializes printing from multiple threads

def gcd(a, b):
    while b != 0:
        a, b = b, a % b
    return a  # when the loop exits b is 0; the gcd is in a

def consumer(idnum):
    while True:
        try:
            data = data_queue.get(block=False)
        except queue.Empty:
            time.sleep(0.1)  # queue momentarily empty; avoid a tight busy loop
            continue
        with lock:
            print('\t consumer %d: computed gcd(%d, %d) = %d'
                  % (idnum, data[0], data[1], gcd(data[0], data[1])))
        time.sleep(1)
        data_queue.task_done()

def producer(idnum, count):
    for i in range(count):
        a, b = random.randint(1, sys.maxsize), random.randint(1, sys.maxsize)
        with lock:
            print('\t producer %d: generated (%d, %d)' % (idnum, a, b))
        data_queue.put((a, b))
        time.sleep(0.5)

if __name__ == '__main__':
    num_producers = 1
    num_consumers = 2
    num_integer_pairs = 10

    # consumers are daemon threads, so they exit with the main thread
    for i in range(num_consumers):
        t = threading.Thread(target=consumer, args=(i,))
        t.daemon = True
        t.start()

    threads = []
    for ii in range(num_producers):
        thread = threading.Thread(target=producer, args=(ii, num_integer_pairs))
        threads.append(thread)
        thread.start()

    # wait for the producer threads to finish
    for thread in threads:
        thread.join()
    print('done with producer threads')

    # wait till all the jobs in the queue are done
    data_queue.join()

    with lock:
        print('all consumer threads finished')
    with lock:
        print('main thread exited')
Comments
- Mithril (almost 2 years ago):
I am using threading and Queue to fetch URLs and store them to a database.
I just want one thread to do the storing job,
so I wrote code as below:

import threading
import time
import Queue

site_count = 10
fetch_thread_count = 2
site_queue = Queue.Queue()
proxy_array = []


class FetchThread(threading.Thread):
    def __init__(self, site_queue, proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array

    def run(self):
        while True:
            index = self.site_queue.get()
            self.get_proxy_one_website(index)
            self.site_queue.task_done()

    def get_proxy_one_website(self, index):
        print '{0} fetched site :{1}\n'.format(self.name, index)
        self.proxy_array.append(index)


def save():
    while True:
        if site_queue.qsize() > 0:
            if len(proxy_array) > 10:
                print 'save :{0} to database\n'.format(proxy_array.pop())
            else:
                time.sleep(1)
        elif len(proxy_array) > 0:
            print 'save :{0} to database\n'.format(proxy_array.pop())
        elif len(proxy_array) == 0:
            print 'break'
            break
        else:
            print 'continue'
            continue


def start_crawl():
    global site_count, fetch_thread_count, site_queue, proxy_array
    print 'init'
    for i in range(fetch_thread_count):
        ft = FetchThread(site_queue, proxy_array)
        ft.setDaemon(True)
        ft.start()
    print 'put site_queue'
    for i in range(site_count):
        site_queue.put(i)
    save()
    print 'start site_queue join'
    site_queue.join()
    print 'finish'


start_crawl()
executed output:

init
put site_queue
Thread-1 fetched site :0
Thread-2 fetched site :1
Thread-1 fetched site :2
Thread-2 fetched site :3
Thread-1 fetched site :4
Thread-2 fetched site :5
Thread-1 fetched site :6
Thread-2 fetched site :7
Thread-1 fetched site :8
Thread-2 fetched site :9
save :9 to database
save :8 to database
save :7 to database
save :6 to database
save :5 to database
save :4 to database
save :3 to database
save :2 to database
save :1 to database
save :0 to database
break
start site_queue join
finish
[Finished in 1.2s]
Why does the save() function run after site_queue.join(), which is written after save()?
I also substituted save() with a thread function, but that doesn't work either.
Does it mean I must change proxy_array=[] to proxy_queue=Queue.Queue() before I can use threading to store the data?
I just want one thread to do this, and no other thread would get data from proxy_array, so why should I join it? Using a Queue seems very weird.
Is there any better solution?

UPDATE:
I don't want to wait until all the FetchThreads complete their work. I want to save data while fetching; it would be much faster. I want the result to be something like below (because I use array.pop(), save 0 may appear very late; this is just an example for easy understanding):

Thread-2 fetched site :1
Thread-1 fetched site :2
save :0 to database
Thread-2 fetched site :3
Thread-1 fetched site :4
save :2 to database
save :3 to database
Thread-2 fetched site :5
Thread-1 fetched site :6
save :4 to database
.......
UPDATE2, for someone who has the same questions as below:

question:
As I said in the context above, no other thread would get data from proxy_array.
I just can not imagine why it would break thread-safety?

answer:
The producer-consumer problem in misha's answer; I understood it after reading carefully.

question:
One more question: can the program's main thread play the consumer alongside the FetchThreads (in other words, without creating a StoreThread)?
This is what I cannot figure out; I will update after finding the answer.
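For what it's worth, the main thread can indeed play the consumer, provided it knows when to stop. A sketch under the assumption that the total number of sites is known up front (the fetch results here are faked as strings):

```python
import queue
import threading

SITE_COUNT = 10
site_queue = queue.Queue()
proxy_queue = queue.Queue()  # thread-safe replacement for proxy_array

def fetch_worker():
    while True:
        try:
            site = site_queue.get(block=False)
        except queue.Empty:
            return
        proxy_queue.put('proxy-%d' % site)  # hypothetical fetch result

for site in range(SITE_COUNT):
    site_queue.put(site)

fetchers = [threading.Thread(target=fetch_worker) for _ in range(2)]
for t in fetchers:
    t.start()

# The main thread itself plays the consumer: it blocks on proxy_queue.get()
# and saves each result as it arrives, until all expected items are seen.
saved = []
for _ in range(SITE_COUNT):
    saved.append(proxy_queue.get())

for t in fetchers:
    t.join()
print(len(saved))  # 10
```

With an unknown item count you would need a stop signal instead (an Event or sentinel, as in the answers above), since the main thread cannot otherwise tell "queue momentarily empty" from "all done".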