How to close Threads in Python?
Solution 1
I do not see why you need a Queue
in the first place.
After all in your design every thread just processes one task.
You should be able to pass that task to the thread on creation.
This way you do not need a Queue
and you get rid of the while
-loop:
class MX_getAAAA_thread(threading.Thread):
def __init__(self, id_domain, mx):
threading.Thread.__init__(self)
self.id_domain = id_domain
self.mx = mx
Then you can rid of the while
-loop inside the run
-method:
def run(self):
res = dns.resolver.Resolver()
res.lifetime = 1.5
res.timeout = 0.5
try:
answers = res.query(self.mx,'AAAA')
ip_mx = str(answers[0])
except:
ip_mx = "N/A"
with lock:
sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)
Create one thread for each task
for mx in answers:
t = MX_getAAAA_thread(qMX, id_domain, mx)
t.setDaemon(True)
threads.append(t)
t.start()
and join them
for thread in threads:
thread.join()
Solution 2
What I often do when my thread involves an infinite loop like this, is to change the condition to something I can control from the outside. For example like this:
def run(self):
self.keepRunning = True
while self.keepRunning:
# do stuff
That way, I can change the keepRunning
property from the outside and set it to false to gracefully terminate the thread the next time it checks the loop condition.
Btw. as you seem to spawn exactly one thread for each item you put into the queue, you don’t even need to have the threads loop at all, although I would argue that you should always enforce a maximum limit of threads that can be created in this way (i.e. for i in range(min(len(answers), MAX_THREAD_COUNT)):
)
Alternative
In your case, instead of terminating the threads in each for-loop iteration, you could just reuse the threads. From what I gather from your thread’s source, all that makes a thread unique to an iteration is the id_domain
property you set on its creation. You could however just provide that as well with your queue, so the threads are completely independent and you can reuse them.
This could look like this:
qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
t = MX_getAAAA_thread(qMX)
t.daemon = True
threads.append(t)
t.start()
for id_domain in enumerateIdDomains():
answers = resolver.query(id_domain, 'MX')
for mx in answers:
qMX.put((id_domain, mx.exchange)) # insert a tuple
qMX.join()
for thread in threads:
thread.keepRunning = False
Of course, you would need to change your thread a bit then:
class MX_getAAAA_thread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
self.keepRunning = True
while self.keepRunning:
id_domain, mx = self.queue.get()
# do stuff
Solution 3
Joining the threads will do the trick, but the joins in your case are blocking indefinitely because your threads aren't ever exiting your run loop. You need to exit the run method so that the threads can be joined.
user1610458
Updated on June 04, 2022Comments
-
user1610458 almost 2 years
I have some issue with too many Threads unfinished. I think that queue command .join() just close queue and not the threads using it.
In my script I need to check 280k domains and for each domain get list of his MX records and obtain an IPv6 address of servers if it has it.
I used threads and thanks for them the script its many times faster. But there is a problem, although there is join() for the queue, number of alive threads is growing till an error occur that informs that cant create any new thread (limitation of OS?).
How can I terminate/close/stop/reset threads after each For loop when I am retrieving new domain from database?
Thread Class definition...
class MX_getAAAA_thread(threading.Thread): def __init__(self,queue,id_domain): threading.Thread.__init__(self) self.queue = queue self.id_domain = id_domain def run(self): while True: self.mx = self.queue.get() res = dns.resolver.Resolver() res.lifetime = 1.5 res.timeout = 0.5 try: answers = res.query(self.mx,'AAAA') ip_mx = str(answers[0]) except: ip_mx = "N/A" lock.acquire() sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')" try: cursor.execute(sql) db.commit() except: db.rollback() print "MX" , '>>' , ip_mx, ' :: ', str(self.mx) lock.release() self.queue.task_done()
Thread class in use... (The main For-loop is not here, this is just part of his body)
try: answers = resolver.query(domain, 'MX') qMX = Queue.Queue() for i in range(len(answers)): t = MX_getAAAA_thread(qMX,id_domain) t.setDaemon(True) threads.append(t) t.start() for mx in answers: qMX.put(mx.exchange) qMX.join() except NoAnswer as e: print "MX - Error: No Answer" except Timeout as etime: print "MX - Error: dns.exception.Timeout" print "end of script"
I tried to:
for thread in threads: thread.join()
after the queue was done, but thread.join() never stops waiting, despite fact that there is no need to wait, because when queue.join() executes there is nothing to do for threads.
-
Frank Osterfeld over 11 yearsAlso, Queue.get without timeout will block forever as the queue stays empty.
-
user1610458 over 11 yearsHow do you recommend exit the run loops? Maybe I dont fully understand how its working in run method... If Im not mistaken, in every example i seen, there was while true: loop. That is the issue why threads never exits run loop?
-
Loren Abrams over 11 yearsI typically use a passive approach in this scenario. Ie, setting a stoppage flag on MX_getAAAA_thread and checking its value at different points in your loop. If it's set, break out of the loop or return from the run method. -- Answering your question: yes, the thread will run indefinitely until the code being executed completes (or the thread is killed)
-
Loren Abrams over 11 yearsNot sure I like this solution much better. Why fan out so many threads? It's still terribly inefficient.
-
tzelleke over 11 years@LorenAbrams, I agree -- but that's how the OP was doing it. This answer solves his immediate problem of never-ending threads --- Certainly it would be more efficient to create some limited pool of threads and send them tasks through a queue
-
user1610458 over 11 yearsit works without queue... thank you very much ;) but I was working till now on setting the stoppageFlag as suggested by Loren but I can not make it work.
-
tzelleke over 11 years@user1610458 you should best go with the answer from poke -- his alternative approach -- IMHO that's how to do it
-
user1610458 over 11 yearsMy whole project is to get IPv6 addresses. Input is 280 000 domains. I need from every domain: ■ AAAA record for "www." + domain ■ Glue record for "@ns.sk-nic.sk www." + domain ■ for every NS record, ask if it has AAAA record assigned ■ for every MX record, ask if it has AAAA record assigned So I ended with script that use threads after every domain selected from DB I think it could be done more effectively, but i dont know how. It could be done by paralell loading of domains and each one domain paralell answering for records. but it is too much complex for my beginner skills :)
-
user1610458 over 11 years"Certainly it would be more efficient to create some limited pool of threads and send them tasks through a queue" It is necessary to create limited pool of threads when I dont know how many threads I will need? Its good idea to do it, but when Im sure, that for one domain I will never need more then 50 for example. I think it is better to create as much as it is requested.
-
poke over 11 years@user1610458 It is generally not a good idea to create a separate thread for every action. Thread creation and destruction will come with an overhead that will definitely hurt your performance if you do it like that. Also note that a processor cannot efficiently handle an arbitrary amount of threads at the same time, and context switches are quite expensive, so it is best to keep the thread amount at a minimum. Also, Python is not good at multithreading because of its global interpreter lock, so you definitely want to avoid them if possible.
-
poke over 11 years@user1610458 That being said, you rely on network I/O, so of course it makes sense to use threads to keep your resources busy. And here you should change your understanding of your requirements; if one domain requires you to make 50 queries, this does not mean you need 50 threads. It just means that you have 50 independent tasks that need to be performed and can be off-loaded to other threads. Choosing the right number is difficult, but you definitely should choose a maximum number.