How to close Threads in Python?

13,904

Solution 1

I do not see why you need a Queue in the first place.
After all, in your design every thread processes just one task.
You should be able to pass that task to the thread on creation.
This way you do not need a Queue and you get rid of the while loop:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx

Then you can get rid of the while loop inside the run method:

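def run(self):
    res = dns.resolver.Resolver()
    res.lifetime = 1.5
    res.timeout = 0.5

    try:
        answers = res.query(self.mx, 'AAAA')
        ip_mx = str(answers[0])
    except Exception:
        ip_mx = "N/A"

    # lock, cursor and db are the module-level objects from the question
    with lock:
        # use the id stored on this thread, not a global id_domain
        sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(self.id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
        try:
            cursor.execute(sql)
            db.commit()
        except Exception:
            db.rollback()

        print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)
    # no loop: run() returns here, so the thread finishes and can be joined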

Create one thread for each task:

threads = []
for mx in answers:
    # the constructor takes (id_domain, mx); pass the MX host name
    t = MX_getAAAA_thread(id_domain, mx.exchange)
    t.daemon = True
    threads.append(t)
    t.start()

and join them:

for thread in threads:
    thread.join()
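For comparison, here is a minimal Python 3 sketch of the same one-thread-per-task idea. `fake_lookup_aaaa` is a hypothetical stand-in for the DNS query and the `results` list stands in for the database insert, so the sketch runs without dnspython or a database.

```python
import threading

results = []
results_lock = threading.Lock()

def fake_lookup_aaaa(mx):
    # Hypothetical stand-in for res.query(mx, 'AAAA'); no network access.
    return "N/A" if mx.startswith("legacy") else "2001:db8::1"

class MXLookupThread(threading.Thread):
    def __init__(self, id_domain, mx):
        super().__init__(daemon=True)
        self.id_domain = id_domain
        self.mx = mx

    def run(self):
        # No loop: the thread does its single task and then run() returns.
        ip_mx = fake_lookup_aaaa(self.mx)
        with results_lock:
            results.append((self.id_domain, self.mx, ip_mx))

threads = []
for mx in ["mx1.example.com", "legacy.example.com"]:
    t = MXLookupThread(42, mx)
    threads.append(t)
    t.start()

for t in threads:
    t.join()  # returns because every run() finishes

print(sorted(results))
```

Because `run()` simply returns, `join()` cannot hang here; the cost is one thread creation per task, which the comments below discuss.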

Solution 2

When my thread involves an infinite loop like this, I often change the loop condition to something I can control from the outside. For example:

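def __init__(self):
    threading.Thread.__init__(self)
    self.keepRunning = True  # set before start(), so run() cannot undo an early stop request

def run(self):
    while self.keepRunning:
        pass  # do stuff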

That way, I can set the keepRunning property to False from the outside to gracefully terminate the thread the next time it checks the loop condition (provided nothing inside the loop, such as a Queue.get() without a timeout, blocks forever).
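A minimal Python 3 sketch of the same stop-flag idea, using `threading.Event` instead of a plain attribute (a swap, not what this answer used). `Event.wait()` lets the loop pause between iterations yet wake up as soon as the flag is set. `Worker`, `stop()` and `iterations` are illustrative names.

```python
import threading
import time

class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        # Created before start(), so a stop request can never be overwritten by run().
        self.stop_event = threading.Event()
        self.iterations = 0

    def run(self):
        # The loop condition is controlled from outside the thread.
        while not self.stop_event.is_set():
            self.iterations += 1  # "do stuff"
            # Sleep briefly, but wake immediately if stop() is called.
            self.stop_event.wait(0.01)

    def stop(self):
        self.stop_event.set()

w = Worker()
w.start()
time.sleep(0.05)
w.stop()
w.join()  # returns once the loop sees the flag and run() exits
print(w.is_alive())
```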

By the way, since you seem to spawn exactly one thread for each item you put into the queue, your threads do not need to loop at all. Still, I would argue that you should always enforce a maximum number of threads created this way (e.g. for i in range(min(len(answers), MAX_THREAD_COUNT)):).

Alternative

In your case, instead of terminating the threads in each for-loop iteration, you could just reuse them. From what I gather from your thread's source, the only thing that ties a thread to a particular iteration is the id_domain property you set on its creation. However, you could pass that through the queue as well, so the threads become completely independent and reusable.

This could look like this:

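qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
    t = MX_getAAAA_thread(qMX)
    t.daemon = True
    threads.append(t)
    t.start()

# enumerateIdDomains() is hypothetical: it stands for whatever yields (id_domain, domain) rows from your database
for id_domain, domain in enumerateIdDomains():
    answers = resolver.query(domain, 'MX')
    for mx in answers:
        qMX.put((id_domain, mx.exchange)) # insert a tuple

qMX.join()  # returns once task_done() has been called for every item

for thread in threads:
    thread.keepRunning = False
for thread in threads:
    thread.join()

Of course, you would need to change your thread a bit then:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.keepRunning = True

    def run(self):
        while self.keepRunning:
            try:
                # time out regularly so the loop can re-check keepRunning
                id_domain, mx = self.queue.get(timeout=1)
            except Queue.Empty:
                continue
            try:
                pass  # do stuff (the AAAA lookup and the INSERT)
            finally:
                self.queue.task_done()  # required for qMX.join() to return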
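Note that a worker blocked in a `queue.get()` call without a timeout never re-checks its loop condition, and `Queue.join()` only returns once every item has been marked with `task_done()`. A minimal Python 3 sketch of the reusable pool that handles both, using one `None` sentinel per worker as the stop signal (a substitute for `keepRunning`, not the answer's exact method). `lookup_aaaa`, `MAX_THREAD_COUNT = 4` and the domain rows are hypothetical placeholders.

```python
import queue
import threading

MAX_THREAD_COUNT = 4  # assumed cap; tune for your workload

def lookup_aaaa(mx):
    # Hypothetical stand-in for the AAAA query.
    return "2001:db8::25" if mx.endswith(".net") else "N/A"

def worker(tasks, results, lock):
    while True:
        item = tasks.get()
        try:
            if item is None:  # sentinel: this worker should stop
                return
            id_domain, mx = item
            ip_mx = lookup_aaaa(mx)
            with lock:
                results.append((id_domain, mx, ip_mx))
        finally:
            tasks.task_done()  # runs even on return, so tasks.join() can finish

tasks = queue.Queue()
results = []
lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(tasks, results, lock))
           for _ in range(MAX_THREAD_COUNT)]
for t in threads:
    t.start()

# Hypothetical (id_domain, [mx, ...]) rows; the same threads serve every domain.
domains = [(1, ["mx.a.net", "mx2.a.org"]), (2, ["mx.b.net"])]
for id_domain, mx_list in domains:
    for mx in mx_list:
        tasks.put((id_domain, mx))

tasks.join()          # every real task has been processed
for _ in threads:
    tasks.put(None)   # one sentinel per worker
for t in threads:
    t.join()          # each worker has returned from its loop
```

Because the workers outlive a single domain, the thread count stays fixed at `MAX_THREAD_COUNT` no matter how many domains are processed.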

Solution 3

Joining the threads will do the trick, but in your case the joins block indefinitely because your threads never leave their run loop. The run method has to return before a thread can be joined.
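A minimal Python 3 sketch of the difference, with hypothetical names. The first worker has the question's shape (`while True` around a blocking `get()`) and is still alive after `join(timeout=...)`; the second uses a `get()` timeout, as Frank Osterfeld's comment suggests, so it regularly gets back to a stop flag and returns.

```python
import queue
import threading

def stuck_worker(q):
    # Question's shape: once the queue is empty, get() blocks forever.
    while True:
        q.get()
        q.task_done()

def stoppable_worker(q, stop):
    while not stop.is_set():
        try:
            q.get(timeout=0.05)  # raises queue.Empty instead of blocking forever
        except queue.Empty:
            continue             # go back and re-check the stop flag
        q.task_done()

q1, q2 = queue.Queue(), queue.Queue()
stop = threading.Event()
t1 = threading.Thread(target=stuck_worker, args=(q1,), daemon=True)
t2 = threading.Thread(target=stoppable_worker, args=(q2, stop))
t1.start()
t2.start()

for q in (q1, q2):
    q.put("mx.example.com")
    q.join()          # both queues drain normally

stop.set()
t1.join(timeout=0.2)  # gives up: t1 is parked in q1.get()
t2.join()             # returns: t2 saw the flag and left its loop
print(t1.is_alive(), t2.is_alive())  # True False
```

`t1` is a daemon thread, so it does not keep the interpreter alive at exit; that is also why the question's script terminates even though its threads never finish.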

Author: user1610458

Updated on June 04, 2022

Comments

  • user1610458
    user1610458 almost 2 years

    I have an issue with too many unfinished threads. I think the queue's .join() just closes the queue, not the threads using it.

    In my script I need to check 280k domains, and for each domain get the list of its MX records and obtain each server's IPv6 address, if it has one.

    I used threads, and thanks to them the script is many times faster. But there is a problem: although I call join() on the queue, the number of alive threads keeps growing until an error occurs saying that no new thread can be created (an OS limit?).

    How can I terminate/close/stop/reset the threads after each iteration of the for loop, when I retrieve a new domain from the database?

    Thread class definition:

    class MX_getAAAA_thread(threading.Thread):
        def __init__(self,queue,id_domain):
            threading.Thread.__init__(self)
            self.queue = queue
            self.id_domain = id_domain
    
    
        def run(self):
            while True:
                self.mx = self.queue.get()
    
                res = dns.resolver.Resolver()
                res.lifetime = 1.5
                res.timeout = 0.5
    
                try:
                    answers = res.query(self.mx,'AAAA')
                    ip_mx = str(answers[0])
                except:
                    ip_mx = "N/A"
    
                lock.acquire()
    
                sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
                try:
                    cursor.execute(sql)
                    db.commit()
                except:
                    db.rollback()
    
                print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)
    
                lock.release()
                self.queue.task_done()
    

    Thread class in use (the main for loop is not shown here; this is only part of its body):

    try:
        answers = resolver.query(domain, 'MX')
    
        qMX = Queue.Queue()
        for i in range(len(answers)):
            t = MX_getAAAA_thread(qMX,id_domain)
            t.setDaemon(True)
            threads.append(t)
            t.start()
    
        for mx in answers:
            qMX.put(mx.exchange)
    
        qMX.join()
    
    except NoAnswer as e:
        print "MX - Error: No Answer"
    except Timeout as etime:
        print "MX - Error: dns.exception.Timeout"
    
    print "end of script"
    

    I tried to:

    for thread in threads:
        thread.join()
    

    after the queue was done, but thread.join() never stops waiting, even though there is nothing left for the threads to do once queue.join() has returned.

  • Frank Osterfeld
    Frank Osterfeld over 11 years
    Also, Queue.get without timeout will block forever as the queue stays empty.
  • user1610458
    user1610458 over 11 years
    How do you recommend exiting the run loops? Maybe I don't fully understand how the run method works... If I'm not mistaken, every example I have seen had a while True: loop. Is that why the threads never exit the run loop?
  • Loren Abrams
    Loren Abrams over 11 years
    I typically use a passive approach in this scenario, i.e. setting a stoppage flag on MX_getAAAA_thread and checking its value at different points in your loop. If it's set, break out of the loop or return from the run method. -- Answering your question: yes, the thread will run indefinitely until the code being executed completes (or the thread is killed).
  • Loren Abrams
    Loren Abrams over 11 years
    Not sure I like this solution much better. Why fan out so many threads? It's still terribly inefficient.
  • tzelleke
    tzelleke over 11 years
    @LorenAbrams, I agree -- but that's how the OP was doing it. This answer solves his immediate problem of never-ending threads --- Certainly it would be more efficient to create some limited pool of threads and send them tasks through a queue
  • user1610458
    user1610458 over 11 years
    It works without the queue... thank you very much ;) Until now I had been working on setting the stoppageFlag as Loren suggested, but I could not make it work.
  • tzelleke
    tzelleke over 11 years
    @user1610458 you should best go with the answer from poke -- his alternative approach -- IMHO that's how to do it
  • user1610458
    user1610458 over 11 years
    My whole project is about getting IPv6 addresses. The input is 280 000 domains, and for every domain I need: ■ the AAAA record for "www." + domain ■ the glue record for "@ns.sk-nic.sk www." + domain ■ for every NS record, whether it has an AAAA record assigned ■ for every MX record, whether it has an AAAA record assigned. So I ended up with a script that starts threads for every domain selected from the DB. I think it could be done more efficiently, but I don't know how. It could be done by loading domains in parallel and, for each domain, querying its records in parallel, but that is too complex for my beginner skills :)
  • user1610458
    user1610458 over 11 years
    "Certainly it would be more efficient to create some limited pool of threads and send them tasks through a queue" It is necessary to create limited pool of threads when I dont know how many threads I will need? Its good idea to do it, but when Im sure, that for one domain I will never need more then 50 for example. I think it is better to create as much as it is requested.
  • poke
    poke over 11 years
    @user1610458 It is generally not a good idea to create a separate thread for every action. Thread creation and destruction will come with an overhead that will definitely hurt your performance if you do it like that. Also note that a processor cannot efficiently handle an arbitrary amount of threads at the same time, and context switches are quite expensive, so it is best to keep the thread amount at a minimum. Also, Python is not good at multithreading because of its global interpreter lock, so you definitely want to avoid them if possible.
  • poke
    poke over 11 years
    @user1610458 That being said, you rely on network I/O, so of course it makes sense to use threads to keep your resources busy. And here you should change your understanding of your requirements; if one domain requires you to make 50 queries, this does not mean you need 50 threads. It just means that you have 50 independent tasks that need to be performed and can be off-loaded to other threads. Choosing the right number is difficult, but you definitely should choose a maximum number.
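To illustrate poke's point that 50 queries do not require 50 threads, here is a minimal Python 3 sketch using `concurrent.futures.ThreadPoolExecutor` from the standard library (a swap for the hand-written pools in the answers). `lookup` is a hypothetical stand-in for a single DNS query, and `max_workers=8` is an arbitrary cap.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def lookup(task):
    # Hypothetical stand-in for one DNS query (www AAAA, an NS, an MX, ...).
    domain, record = task
    return domain, record, threading.current_thread().name

# One domain can produce many independent tasks.
tasks = [("example.sk", "MX#%d" % i) for i in range(50)]

with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lookup, tasks))  # results come back in task order

workers_used = {name for _, _, name in results}
print(len(results), len(workers_used) <= 8)  # 50 True
```

Leaving the `with` block calls `shutdown(wait=True)`, which joins the worker threads, so nothing is left running; the same pool could be fed tasks from many domains before it is shut down.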