How to use PostgreSQL in a multi-threaded Python program


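Are you creating a connection for each thread? If you have multiple threads, each one needs its own connection (or you need a pool with locking around the connections). Otherwise the threads interfere with each other and you get all sorts of odd errors, for example one thread trying to fetch results that another thread has already consumed.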
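One way to give each thread its own connection is `threading.local`. The following is only a minimal sketch: `PerThreadConnections`, `FakeConnection` and `demo` are made-up names for illustration, and the fake connection stands in for a real one so that the example runs without a database. In the real program the factory would be something like `lambda: psycopg2.connect(...)` with your own settings.

```python
import threading


class PerThreadConnections:
    """Hands each thread its own connection, created lazily on first use."""

    def __init__(self, connect):
        # `connect` is any zero-argument callable returning a new connection.
        # Assumption: in the real program this would wrap psycopg2.connect.
        self._connect = connect
        self._local = threading.local()

    def get(self):
        # threading.local stores a separate `conn` attribute for every thread.
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = self._connect()
            self._local.conn = conn
        return conn


class FakeConnection:
    """Hypothetical stand-in for a database connection."""


def demo(num_threads=4):
    connections = PerThreadConnections(FakeConnection)
    seen = {}  # thread name -> (first lookup, second lookup)

    def worker():
        first = connections.get()
        second = connections.get()
        seen[threading.current_thread().name] = (first, second)

    threads = [threading.Thread(target=worker, name=f"worker-{i}")
               for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen
```

Within one thread, repeated calls to `get()` return the same connection; different threads never see each other's connection.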

Multiprocessing does not show the problem for the same reason: each process creates its own connection.
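For the pool alternative, the important part is that a connection is only ever held by one thread at a time. Here is a rough sketch built on `queue.Queue`, which does the locking for you. `SimplePool`, `FakeConnection` and `demo` are hypothetical names, and the fake connection only counts overlapping use so the example runs without a database. psycopg2 also ships `psycopg2.pool.ThreadedConnectionPool` (with `getconn()` and `putconn()`), which is probably what you would use in practice.

```python
import queue
import threading
import time
from contextlib import contextmanager


class SimplePool:
    """Fixed-size pool; queue.Queue provides the locking."""

    def __init__(self, connect, size):
        # `connect` is any zero-argument callable returning a new connection.
        self.connections = [connect() for _ in range(size)]
        self._idle = queue.Queue()
        for conn in self.connections:
            self._idle.put(conn)

    @contextmanager
    def connection(self):
        conn = self._idle.get()  # blocks until some connection is free
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always hand it back, even after an error


class FakeConnection:
    """Hypothetical stand-in that records whether two threads ever overlap."""

    def __init__(self):
        self.busy = False
        self.uses = 0
        self.overlaps = 0

    def query(self):
        if self.busy:
            self.overlaps += 1
        self.busy = True
        time.sleep(0.001)  # pretend to wait for the database
        self.uses += 1
        self.busy = False


def demo(num_threads=8, pool_size=2, queries_per_thread=5):
    pool = SimplePool(FakeConnection, pool_size)

    def worker():
        for _ in range(queries_per_thread):
            with pool.connection() as conn:
                conn.query()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    uses = sum(c.uses for c in pool.connections)
    overlaps = sum(c.overlaps for c in pool.connections)
    return uses, overlaps
```

Eight threads share two connections here, and because every use goes through `pool.connection()`, no connection is ever used by two threads at once.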

Author by

Foad Tahmasebi

Updated on June 27, 2022

Comments

  • Foad Tahmasebi almost 2 years

    I am using psycopg2 (2.6) to connect to a PostgreSQL database in a multi-threaded Python program.

    When the queue size in the program increases, SELECT queries fail with the error "no results to fetch", but inserting records into the database works fine.

    example code:

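    import threading
    from queue import Empty  # Python 3; the module is named Queue in Python 2

    class Decoders(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue

        def run(self):
            self.decode()

        def decode(self):
            queue = self.queue
            # Each thread builds its own Database, and so its own connection.
            db = Database()
            while True:
                try:
                    # get_nowait() avoids the race in "qsize() > 0, then get()",
                    # where another thread can empty the queue in between.
                    temp = queue.get_nowait()
                except Empty:
                    break
                # calling db methods, just an example
                db.select_records()
                db.insert_record(temp)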
    

    and:

    Decoders(queue).start()
    Decoders(queue).start()
    

    note: I don't have this problem with multiprocessing.

    Edit:

    When I start only one thread, the program doesn't have any problem.

    database class:

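    from psycopg2 import connect

    # Placeholder only: the actual SELECT statement was not shown.
    SIMPLE_SELECT = "SELECT 1"

    class Database:
        def __init__(self):
            # conf_* are the program's own connection settings.
            self.db = connect(host=conf_hostname,
                              database=conf_dbname,
                              user=conf_dbuser,
                              password=conf_dbpass,
                              port=conf_dbport)
            self.db.autocommit = True
            # One cursor per Database instance; do not share it between threads.
            self.cursor = self.db.cursor()

        def select_records(self):
            self.cursor.execute(SIMPLE_SELECT)
            return self.cursor.fetchall()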
    
    
        def insert_record(self, temp):
            # insert query