Postgresql ON CONFLICT in sqlalchemy

20,756

Solution 1

You don't need this, use exists condition to prevent inserting duplicates.

for example:

INSERT INTO table (unique_name) 
SELECT 'some_string'
WHERE NOT EXISTS(SELECT 1 FROM table WHERE unique_name = 'some_string')

you can also

INSERT INTO table (unique_name)
VALUES('some_string')
ON CONFLICT (unique_name) DO NOTHING

but if you need insert or update in single query, then this is example for you:

INSERT INTO distributors (did, dname)
VALUES (5, 'Gizmo Transglobal'), (6, 'Associated Computing, Inc')
ON CONFLICT (did) DO UPDATE SET dname = EXCLUDED.dname;

This is an example from PostgreSQL Documentation.

Solution 2

Using Postgres 9.6.1, sqlachemy 1.1.4, and psycopg2 2.6.2:

  1. Convert your data structure to a dictionary. From Pandas it is

    import pandas
    from sqlalchemy import MetaData
    from sqlalchemy.dialects.postgresql import insert
    import psycopg2
    
    # The dictionary should include all the values including index values
    insrt_vals = df.to_dict(orient='records')
    
  2. Connect to database through sqlalchemy . Instead try psycog2 driver underneath and the native COPY function, which bypasses all the postgres indexing.

    csv_data = os.path.realpath('test.csv')
    con = psycopg2.connect(database = 'db01', user = 'postgres')
    cur = con.cursor()
    cur.execute("\copy stamm_data from '%s' DELIMITER ';' csv header" % csv_data)
    con.commit()
    
  3. Execute

    results = engine.execute(do_nothing_stmt)
    # Get number of rows inserted
    rowcount = results.rowcount
    

Warning:

This method does not work with NaTs out of the box.

Everything together

tst_df = pd.DataFrame({'colA':['a','b','c','a','z', 'q'],
              'colB': pd.date_range(end=datetime.datetime.now() , periods=6),
              'colC' : ['a1','b2','c3','a4','z5', 'q6']})


insrt_vals = tst_df.to_dict(orient='records')
engine =      sqlalchemy.create_engine("postgresql://user:password@localhost/postgres")
connect = engine.connect()
meta = MetaData(bind=engine)
meta.reflect(bind=engine)
table = meta.tables['tstbl']
insrt_stmnt = insert(table).values(insrt_vals)

do_nothing_stmt  = insrt_stmnt.on_conflict_do_nothing(index_elements=['colA','colB'])
results = engine.execute(do_nothing_stmt)

Instead of step 2 and 3 , using psycog2 driver with the copy command in postgres is faster for larger files (approaching a gig) because it sets all the table indexing off.

csv_data = os.path.realpath('test.csv')

Solution 3

This works with Postgresql 9.5:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def prefix_inserts(insert, compiler, **kw):
    return compiler.visit_insert(insert, **kw) + " ON CONFLICT DO NOTHING"

I use it for bulk_insert_mappings. It does however not make ON CONFLICT DO NOTHING optional

Solution 4

This works with Postgresql 10.5 and Sqlalchemy 1.3.6:

from sqlalchemy.dialects.postgresql import insert


table_info = {
'tableTime': '',
'deploymentID': '',
'tableData': ''
}
insert_table = insert(Table).values(table_info)
insert_table_sql = insert_table.on_conflict_do_nothing(
  index_elements=['tableTime', 'deploymentID']
)
db.session.execute(insert_table_sql)
db.session.commit()

Solution 5

This is an extension of Niklas' answer.

Basically, uses a thread local state and a context manager to make appending the ON CONFLICT DO NOTHING optional. It's still a big hack though.

It also only hooks the postgres specific statement, and doesn't construct the sql query manually from text.

import threading
from contextlib import contextmanager

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
from sqlalchemy.dialects.postgresql.dml import OnConflictDoNothing

state = threading.local()

@contextmanager
def on_conflict_do_nothing():
  state.active = True
  yield
  del state.active

@compiles(Insert, 'postgresql')
def prefix_inserts(insert, compiler, **kw):
  if getattr(state, "active", False):
    insert._post_values_clause = OnConflictDoNothing()
  return compiler.visit_insert(insert, **kw)
Share:
20,756
puredevotion
Author by

puredevotion

student in business informatics, entrepreneur on the side.

Updated on July 09, 2022

Comments

  • puredevotion
    puredevotion almost 2 years

    I've read quite a few resources (ao. 1, 2) but I'm unable to get Postgresql's ON CONFLICT IGNORE behaviour working in sqlalchemy.

    I've used this accepted answer as a basis, but it gives

    SAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append'
    

    I've tried adding the postgresql dialect to the @compile clause, renaming my object, but it doesn't work. I also tried to use the str(insert())+ " ON CONFILCT IGNORE" without results. (not surprising btw)

    How can I get the On CONFLICT IGNORE to get added to my inserts? I like the proposed solution, as I can see myself not wanting the IGNORE behaviour on each INSERT

    ps. using python 2.7 (don't mind upgrading to 3.4/3.5), latest sqlalchemy (1.x)