How to Interrupt/Stop/End a hanging multi-threaded python program

12,263

If you want to force all the threads to exit when the process exits, you can set the "daemon" flag of the thread to True before the thread is created.

http://docs.python.org/2/library/threading.html#threading.Thread.daemon

Share:
12,263
NRS
Author by

NRS

Updated on June 04, 2022

Comments

  • NRS
    NRS almost 2 years

    I have a python program that implements threads like this:

       class Mythread(threading.Thread):
            def __init__(self, name, q):
                threading.Thread.__init__(self)
                self.name = name
                self.q = q
    
            def run(self):
                print "Starting %s..." % (self.name)
                while True:
                    ## Get data from queue
                    data = self.q.get()
                    ## do_some_processing with data ###
                    process_data(data)
                    ## Mark Queue item as done
                    self.q.task_done()
                print "Exiting %s..." % (self.name)
    
    
        def call_threaded_program():
            ##Setup the threads. Define threads,queue,locks 
            threads = []    
            q = Queue.Queue()
            thread_count = n #some number
            data_list = [] #some data list containing data
    
            ##Create Threads
            for thread_id in range(1, thread_count+1):
                thread_name = "Thread-" + str(thread_id)
                thread = Mythread(thread_name,q)
                thread.daemon = True
                thread.start()
    
            ##Fill data in Queue
            for data_item in data_list:
                q.put(data_item)
    
            try:
                ##Wait for queue to be exhausted and then exit main program
                q.join()
            except (KeyboardInterrupt, SystemExit) as e:
                print "Interrupt Issued. Exiting Program with error state: %s"%(str(e))
                exit(1)
    

    The call_threaded_program() is called from a different program.

    I have the code working under normal circumstances. However if an error/exception occurs in one of the threads, then the program is stuck (as the queue join is infinitely blocking). The only way I am able to quit this program is to close the terminal itself.

    What is the best way to terminate this program when a thread bails out? Is there a clean (actually I would take any way) way of doing this? I know this question has been asked numerous times, but I am still unable to find a convincing answer. I would really appreciate any help.

    EDIT: I tried removing the join on the queue and used a global exit flag as suggested in Is there any way to kill a Thread in Python? However, Now the behavior is so strange, I can't comprehend what is going on.

        import threading
        import Queue
        import time
    
        exit_flag = False
    
        class Mythread (threading.Thread):
            def __init__(self,name,q):
                threading.Thread.__init__(self)
                self.name = name
                self.q = q
    
            def run(self):
                try:    
                    # Start Thread
                    print "Starting %s...."%(self.name)
                    # Do Some Processing
                    while not exit_flag:
                        data = self.q.get()
                        print "%s processing %s"%(self.name,str(data))
                        self.q.task_done()
                    # Exit thread
                    print "Exiting %s..."%(self.name)
                except Exception as e:
                    print "Exiting %s due to Error: %s"%(self.name,str(e))
    
    
        def main():
            global exit_flag
    
            ##Setup the threads. Define threads,queue,locks 
            threads = []    
            q = Queue.Queue()
            thread_count = 20
            data_list = range(1,50)
    
            ##Create Threads
            for thread_id in range(1,thread_count+1):
                thread_name = "Thread-" + str(thread_id)
                thread = Mythread(thread_name,q)
                thread.daemon = True
                threads.append(thread)
                thread.start()
    
            ##Fill data in Queue
            for data_item in data_list:
              q.put(data_item)
    
    
            try:
              ##Wait for queue to be exhausted and then exit main program
              while not q.empty():
                pass
    
              # Stop the threads
              exit_flag = True
    
              # Wait for threads to finish
              print "Waiting for threads to finish..."
              while threading.activeCount() > 1:
                print "Active Threads:",threading.activeCount()
                time.sleep(1)
                pass
    
              print "Finished Successfully"
            except (KeyboardInterrupt, SystemExit) as e:
              print "Interrupt Issued. Exiting Program with error state: %s"%(str(e))
    
    
         if __name__ == '__main__':
             main()
    

    The program's output is as below:

        #Threads get started correctly
        #The output also is getting processed but then towards the end, All i see are
        Active Threads: 16
        Active Threads: 16
        Active Threads: 16...
    

    The program then just hangs or keeps on printing the active threads. However since the exit flag is set to True, the thread's run method is not being exercised. So I have no clue as to how these threads are kept up or what is happening.

    EDIT: I found the problem. In the above code, thread's get method were blocking and hence unable to quit. Using a get method with a timeout instead did the trick. I have the code for just the run method that I modified below

        def run(self):
                try:
                       #Start Thread
                       printing "Starting %s..."%(self.name)
                       #Do Some processing
                       while not exit_flag:
                              try:
                                    data = self.q.get(True,self.timeout)
                                    print "%s processing %s"%(self.name,str(data))
                                    self.q.task_done()
                              except:
                                     print "Queue Empty or Timeout Occurred. Try Again for %s"%(self.name)
    
    
                        # Exit thread
                        print "Exiting %s..."%(self.name)
                 except Exception as e:
                    print "Exiting %s due to Error: %s"%(self.name,str(e))
    
  • NRS
    NRS almost 11 years
    Hi, I tried wrapping the thread to catch the exception and to stop the threads using a global exit flag. But the behavior now is so strange, it is beyond my comprehension. Please see edit above
  • NRS
    NRS almost 11 years
    I had already set the "daemon" flag for the thread. Please see above Code. But that was not the issue. The issue was that, the main process would not exit because it was waiting for the queue to be exhausted. However the queue would never exit because the threads died (and hence there is no one to consume the items in the queue). So, that caused the program to be stuck. But I think I figured the problem out (Please see edits)