Python threading Queue producer-consumer with thread-safe communication


Solution 1

I recommend you read about the producer-consumer problem. Your producers are the fetch threads. Your consumer is the save function. If I understand correctly, you want the consumer to save each fetched result as soon as it's available. For this to work, the producer and consumer must be able to communicate in some thread-safe way, e.g. through a queue.

Basically, you need another queue. It would replace proxy_array. Your save function will look something like this:

while True:
    try:
        data = fetch_data_from_output_queue()
        save_to_database(data)
    except EmptyQueue:
        if stop_flag.is_set():
            # Producers are finished and the queue is drained -- all done
            break
        time.sleep(1)
        continue

This save function will need to run in its own thread. stop_flag is an Event that gets set after you join your fetch threads.

From a high level, your application will look like this:

input_queue = initialize_input_queue()
output_queue = initialize_output_queue()

stop_flag = Event()
create_and_start_save_thread(output_queue) # read from output queue, save to DB
create_and_start_fetch_threads(input_queue, output_queue) # get sites to crawl from input queue, push crawled results to output_queue
join_fetch_threads() # this will block until the fetch threads have gone through everything in the input_queue
stop_flag.set() # this will inform the save thread that we are done
join_save_thread() # wait for all the saving to complete
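
To make this concrete, here is a minimal runnable sketch of that layout (Python 2, matching the rest of this thread; fetch_site and the save print are stand-ins for your real crawling and database code):

import threading
import time
import Queue

NUM_FETCH_THREADS = 2

def fetch_site(site):
    # stand-in for the real crawling work
    return 'proxy data from site {0}'.format(site)

def fetch_worker(input_queue, output_queue):
    while True:
        site = input_queue.get()
        output_queue.put(fetch_site(site))
        input_queue.task_done()

def save_worker(output_queue, stop_flag):
    while True:
        try:
            data = output_queue.get(block=False)
        except Queue.Empty:
            if stop_flag.is_set():
                break  # producers are done and the queue is drained
            time.sleep(1)
            continue
        print 'save :{0} to database'.format(data)  # stand-in for the DB write

input_queue = Queue.Queue()
output_queue = Queue.Queue()
stop_flag = threading.Event()

saver = threading.Thread(target=save_worker, args=(output_queue, stop_flag))
saver.start()

for _ in range(NUM_FETCH_THREADS):
    t = threading.Thread(target=fetch_worker, args=(input_queue, output_queue))
    t.daemon = True  # fetch workers loop forever; let them die with the program
    t.start()

for site in range(10):
    input_queue.put(site)

input_queue.join()  # block until every site has been fetched
stop_flag.set()     # tell the save thread no more data is coming
saver.join()        # wait for all the saving to complete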

Solution 2

I had to come up with something similar, a producer-consumer. The producer generates an 'id' and the consumer consumes that id, fetching a URL and processing it. Here is my skeleton code which does that:


    import Queue
    import random
    import threading
    import time
    import sys

    data_queue = Queue.Queue()
    lock = threading.Lock()

    def gcd(a, b):
        while b != 0:
            a, b = b, a % b

        return a  # a holds the gcd once b reaches 0

    def consumer(idnum):
        while True:
            try:
                data = data_queue.get(block=False)
            except Queue.Empty:
                # nothing to consume yet; back off and retry
                time.sleep(1)
                continue
            with lock:
                print('\t consumer %d: computed gcd(%d, %d) = %d' % (idnum, data[0], data[1], gcd(data[0], data[1])))

            time.sleep(1)
            data_queue.task_done()  # only after a successful get, or join() will miscount

    def producer(idnum, count):
        for i in range(count):
            a,b = random.randint(1, sys.maxint), random.randint(1, sys.maxint)
            with lock:
                print('\t producer %d: generated (%d, %d)'% (idnum, a, b))
            data_queue.put((a,b))
            time.sleep(0.5)

    if __name__ == '__main__':
        num_producers = 1
        num_consumers = 2
        num_integer_pairs = 10

        for i in range(num_consumers):
            t = threading.Thread(target=consumer, args=(i,))
            t.daemon = True
            t.start()

        threads = []
        for ii in range(num_producers):
            thread = threading.Thread(target=producer, args=(ii, num_integer_pairs))
            threads.append(thread)
            thread.start()

        # wait for the producers threads to finish
        for thread in threads:
            thread.join()
        print 'done with producer threads'

        # wait till all the jobs are done in the queue
        data_queue.join()

        with lock:
            print 'all queued jobs finished'  # the daemon consumer threads themselves never exit

        with lock:
            print 'main thread exited'
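
A side note in case you are on Python 3: the skeleton above is Python 2. The same structure works after a few renames; here is a small self-contained illustration of just those changes (everything else carries over unchanged):

    import queue    # the Queue module was renamed to queue in Python 3
    import random
    import sys

    data_queue = queue.Queue()
    # sys.maxint is gone in Python 3; use sys.maxsize instead
    a, b = random.randint(1, sys.maxsize), random.randint(1, sys.maxsize)
    data_queue.put((a, b))
    try:
        print(data_queue.get(block=False))  # print is a function in Python 3
    except queue.Empty as e:  # 'except X, e' is Python 2 only syntax
        print('Exception', e)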

Comments

  • Mithril, almost 2 years ago

    I am using threading and Queue to fetch urls and store data to a database.
    I just want one thread to do the storing job,
    so I wrote the code below:

    import threading
    import time
    
    import Queue
    
    site_count = 10
    
    fetch_thread_count = 2
    
    site_queue = Queue.Queue()
    proxy_array=[]        
    
    
    class FetchThread(threading.Thread):
        def __init__(self,site_queue,proxy_array):
            threading.Thread.__init__(self)
            self.site_queue = site_queue
            self.proxy_array = proxy_array
        def run(self):
            while True:
                index = self.site_queue.get()
                self.get_proxy_one_website(index)
                self.site_queue.task_done()
        def get_proxy_one_website(self,index):
            print '{0} fetched site :{1}\n'.format(self.name,index)
            self.proxy_array.append(index)
    
    
    def save():
        while True:
            if site_queue.qsize() > 0:
                if len(proxy_array) > 10:
                    print 'save :{0}  to database\n'.format(proxy_array.pop())
    
                else:
                    time.sleep(1)
            elif len(proxy_array) > 0:
                print 'save :{0} to database\n'.format(proxy_array.pop())
    
            elif len(proxy_array) == 0:
                print 'break'
                break
            else:
                print 'continue'
                continue
    
    def start_crawl():
        global site_count,fetch_thread_count,site_queue,proxy_array
        print 'init'
        for i in range(fetch_thread_count):
            ft = FetchThread(site_queue,proxy_array)
            ft.setDaemon(True)
            ft.start()
    
        print 'put site_queue'
        for i in range(site_count):
            site_queue.put(i)
    
        save()
    
        print 'start site_queue join'
        site_queue.join()
        print 'finish'
    
    start_crawl()
    

    executed output:

    init
    put site_queue
    Thread-1 fetched site :0
    
    Thread-2 fetched site :1
    
    Thread-1 fetched site :2
    
    Thread-2 fetched site :3
    
    Thread-1 fetched site :4
    
    Thread-2 fetched site :5
    
    Thread-1 fetched site :6
    
    Thread-2 fetched site :7
    
    Thread-1 fetched site :8
    
    Thread-2 fetched site :9
    
    save :9 to database
    
    save :8 to database
    
    save :7 to database
    
    save :6 to database
    
    save :5 to database
    
    save :4 to database
    
    save :3 to database
    
    save :2 to database
    
    save :1 to database
    
    save :0 to database
    
    break
    start site_queue join
    finish
    [Finished in 1.2s]
    

    Why does the save() function run after all the fetching is done, even though it is called before site_queue.join()?
    I have also tried running save() in its own thread, but that doesn't work either.
    Does it mean I must change proxy_array=[] to proxy_queue=Queue.Queue(), so that I can use threading to store the data?
    I just want one thread to do this, and no other thread would get data from proxy_array, so why should I join it? Using a Queue seems very weird.
    Is there any better solution?

    UPDATE:
    I don't want to wait until all the FetchThreads complete their work. I want to save data while fetching; it would be much faster. I want the result to be something like below (because I use array.pop(), "save 0" may appear very late; this is just an example for easy understanding):

    Thread-2 fetched site :1
    
    Thread-1 fetched site :2
    
    save :0 to database
    
    Thread-2 fetched site :3
    
    Thread-1 fetched site :4
    
    save :2 to database
    
    save :3 to database
    
    
    Thread-2 fetched site :5
    
    Thread-1 fetched site :6
    
    save :4 to database
    .......
    

    UPDATE2, for anyone who has the same questions as below:

    question:
    As I said in the context above, no other thread would get data from proxy_array.
    I just cannot imagine why it would break thread-safety?

    answer:
    The producer-consumer problem in misha's answer; I understood it after reading carefully.


    question:
    And one more question: can the program's main thread play the consumer role alongside the FetchThreads (in other words, without needing to create a StoreThread)?

    This is what I cannot figure out; I will update after I find the answer.