How do I batch SQL statements with package database/sql

Solution 1

Since the db.Exec function is variadic, one option (which really does make only a single network round trip) is to construct the statement yourself, explode the arguments, and pass them in.

Sample code:

// BulkInsert builds one multi-row INSERT and sends it in a single round trip.
// It assumes a package-level db *sql.DB, a hypothetical my_sample_table, and
// MySQL-style ? placeholders (see the comments below for PostgreSQL).
func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, "(?, ?, ?)")
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s",
        strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}

In a simple test I ran, this solution was about 4 times faster at inserting 10,000 rows than the Begin/Prepare/Commit approach presented in the other answer, though the actual improvement will depend a lot on your individual setup, network latencies, etc.
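
For reference, here's a minimal sketch of that Begin/Prepare/Commit baseline (not the other answer verbatim; it assumes the same hypothetical db, my_sample_table and ExampleRowStruct as above):

// Baseline for comparison: one prepared INSERT executed per row inside a
// single transaction.
func BulkInsertPrepared(unsavedRows []*ExampleRowStruct) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    stmt, err := tx.Prepare("INSERT INTO my_sample_table (column1, column2, column3) VALUES (?, ?, ?)")
    if err != nil {
        tx.Rollback()
        return err
    }
    defer stmt.Close()
    for _, post := range unsavedRows {
        if _, err := stmt.Exec(post.Column1, post.Column2, post.Column3); err != nil {
            tx.Rollback()
            return err
        }
    }
    return tx.Commit()
}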

Solution 2

If you’re using PostgreSQL, then pq supports bulk imports via the COPY protocol.
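
A minimal sketch of what that looks like, assuming the same hypothetical table and struct as in Solution 1 (pq.CopyIn generates a COPY ... FROM STDIN statement; an Exec with no arguments flushes the buffered data):

import (
    "database/sql"

    "github.com/lib/pq"
)

// copyRows streams rows into my_sample_table using PostgreSQL's COPY protocol.
func copyRows(db *sql.DB, rows []*ExampleRowStruct) error {
    txn, err := db.Begin()
    if err != nil {
        return err
    }
    stmt, err := txn.Prepare(pq.CopyIn("my_sample_table", "column1", "column2", "column3"))
    if err != nil {
        txn.Rollback()
        return err
    }
    for _, r := range rows {
        if _, err := stmt.Exec(r.Column1, r.Column2, r.Column3); err != nil {
            txn.Rollback()
            return err
        }
    }
    // Flush the buffered COPY data, then close the statement.
    if _, err := stmt.Exec(); err != nil {
        txn.Rollback()
        return err
    }
    if err := stmt.Close(); err != nil {
        txn.Rollback()
        return err
    }
    return txn.Commit()
}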

Solution 3

Adapting Andrew's solution for PostgreSQL, which doesn't support the ? placeholder, the following works:

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
    for i, post := range unsavedRows {
        // PostgreSQL placeholders are numbered: ($1, $2, $3), ($4, $5, $6), ...
        valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", i*3+1, i*3+2, i*3+3))
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s",
        strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}
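
One practical caveat from the comments below: drivers cap the number of placeholders per statement (MySQL at 65,535; PostgreSQL's wire protocol has a similar 16-bit limit), so very large batches need to be split. A minimal chunking wrapper, assuming the BulkInsert above:

// bulkInsertChunked splits rows into batches so each generated statement
// stays under the driver's placeholder limit (3 placeholders per row here).
func bulkInsertChunked(rows []*ExampleRowStruct, maxPlaceholders int) error {
    chunkSize := maxPlaceholders / 3
    if chunkSize < 1 {
        chunkSize = 1
    }
    for start := 0; start < len(rows); start += chunkSize {
        end := start + chunkSize
        if end > len(rows) {
            end = len(rows)
        }
        if err := BulkInsert(rows[start:end]); err != nil {
            return err
        }
    }
    return nil
}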

Solution 4

Expanding on Avi Flax's answer, I needed an ON CONFLICT DO UPDATE clause in my INSERT.

The solution to this is to COPY to a temporary table (set to delete at the end of the transaction) then INSERT from the temporary table to the permanent table.

Here's the code I settled on:

func (fdata *FDataStore) saveToDBBulk(items map[fdataKey][]byte) (err error) {
    tx, err := fdata.db.Begin()
    if err != nil {
        return errors.Wrap(err, "begin transaction")
    }
    txOK := false
    defer func() {
        if !txOK {
            tx.Rollback()
        }
    }()

    // The ON COMMIT DROP clause at the end makes sure that the table
    // is cleaned up at the end of the transaction, while the "for{..}
    // state machine" goroutine in charge of delayed saving ensures this
    // function is not running twice at any given time.
    _, err = tx.Exec(sqlFDataMakeTempTable)
    // CREATE TEMPORARY TABLE fstore_data_load
    // (map text NOT NULL, key text NOT NULL, data json)
    // ON COMMIT DROP
    if err != nil {
        return errors.Wrap(err, "create temporary table")
    }

    stmt, err := tx.Prepare(pq.CopyIn(_sqlFDataTempTableName, "map", "key", "data"))
    if err != nil {
        return errors.Wrap(err, "prepare COPY statement")
    }
    for key, val := range items {
        _, err = stmt.Exec(string(key.Map), string(key.Key), string(val))
        if err != nil {
            return errors.Wrap(err, "loading COPY data")
        }
    }

    _, err = stmt.Exec()
    if err != nil {
        return errors.Wrap(err, "flush COPY data")
    }
    err = stmt.Close()
    if err != nil {
        return errors.Wrap(err, "close COPY stmt")
    }

    _, err = tx.Exec(sqlFDataSetFromTemp)
    // INSERT INTO fstore_data (map, key, data)
    // SELECT map, key, data FROM fstore_data_load
    // ON CONFLICT (map, key) DO UPDATE SET data = EXCLUDED.data
    // (ON CONFLICT DO UPDATE requires a conflict target; a unique
    // constraint on (map, key) is assumed here.)
    if err != nil {
        return errors.Wrap(err, "move from temporary to real table")
    }

    err = tx.Commit()
    if err != nil {
        return errors.Wrap(err, "commit transaction")
    }
    txOK = true
    return nil
}

Solution 5

In case anyone is using pgx (often considered the best PostgreSQL driver for Go), see this solution: https://github.com/jackc/pgx/issues/764#issuecomment-685249471
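
For a rough idea of what pgx offers here (see the link for that author's actual solution), a minimal sketch against the pgx v4 API using its COPY support, with the table and column names carried over from the earlier hypothetical examples:

import (
    "context"

    "github.com/jackc/pgx/v4"
)

// bulkCopy streams rows into the table using PostgreSQL's COPY protocol.
func bulkCopy(ctx context.Context, conn *pgx.Conn, rows [][]interface{}) (int64, error) {
    return conn.CopyFrom(
        ctx,
        pgx.Identifier{"my_sample_table"},
        []string{"column1", "column2", "column3"},
        pgx.CopyFromRows(rows),
    )
}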

Comments

  • barnardh almost 2 years

    How do I batch SQL statements with Go's database/sql package?

    In Java I would do it like this:

    // Create a prepared statement
    String sql = "INSERT INTO my_table VALUES(?)";
    PreparedStatement pstmt = connection.prepareStatement(sql);
    
    // Insert 10 rows of data
    for (int i=0; i<10; i++) {
        pstmt.setString(1, ""+i);
        pstmt.addBatch();
    }
    
    // Execute the batch
    int [] updateCounts = pstmt.executeBatch();
    

    How would I achieve the same in Go?

    • Stephen Weinberg over 11 years
      I do not believe such a thing exists in the sql package.
  • nathj07 over 9 years
    Hmm, that's an interesting approach. I kinda like it; would you be able to post your test for this to show the benchmarking you did? I'm interested in this sort of solution as a way of reducing the number of transactions that are started.
  • TrippyD about 9 years
    To be fair, you should use a prepared statement and a transaction then try it again, instead of begin/prepare/commit each time. That would be a more accurate comparison.
  • lucapette over 7 years
    Just learned github.com/go-sql-driver/mysql has an option called multiStatements that works pretty nicely (see the DSN sketch after these comments).
  • Xeoncross about 7 years
    Worth noting that this is X number of separate queries - it isn't a transaction, so any (or multiple) of these repeats could fail while the rest go through. It also doesn't protect against SQL injection.
  • Andrew C about 7 years
    @Xeoncross the way I wrote it here, you're correct that they're different transactions. But I believe you can just as easily do db.Begin(); <code from above but with tx.Exec>; db.Commit() to run it all as one transaction. Also, this is safe against SQL injection because it uses the ? placeholders.
  • j boschiero over 6 years
    I used this answer to build something and discovered that, as of this writing, there appears to be a limit of 2^16-1 (65,535) for placeholders in MySQL. I ended up running multiple inserts inside a loop (roughly 10,000 rows inserted at a time) just to be on the safe side.
  • Jona Rodrigues over 6 years
    Nice snippet! Isn't this open to SQL injection?
  • Nathan Hyland over 6 years
    I did the same as @jboschiero. The only issue I'm having now is: if the batch is 100 lines (an arbitrary number) and line 50 is a duplicate, I need to know that line 50 failed, but still insert lines 1-49 without rolling back (which is what happens with this method).
  • MasterCarl over 6 years
    I don't see the risk: the string interpolation only uses the incrementing i parameter (valueStrings will be ($1, $2, $3),($4, $5, $6),...). The ExampleRowStructs are passed only to db.Exec, where the database driver takes care of substituting the placeholders.
  • mh-cbon about 5 years
    Using sqlx you can rebind all the placeholders for a given dialect: github.com/jmoiron/sqlx/blob/master/bind.go#L44. I guess it is battle-tested.
  • amwill04 over 4 years
    Just in case someone comes across this answer and doesn't realise (like me): the (?, ?, ?) syntax is MySQL-specific and needs to be changed to ($1, $2, $3), ..., ($n, $n+1, $n+2) for PostgreSQL.
  • user3067170 over 3 years
    @AndrewC I have tried the same in MySQL, but the statement comes out as INSERT INTO (a,b,c) VALUES (?,?,?),(?,?,?) - it's not replacing the ?. Any suggestions?
  • Avishay28 about 2 years
    Is there a way to support nullable fields in this approach? I.e. one (or both) of the values post.Column2, post.Column3 can be nil.
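
A minimal sketch of the multiStatements option lucapette mentions above: it is enabled through the go-sql-driver/mysql DSN (the credentials, address, and database name here are placeholders).

import (
    "database/sql"

    _ "github.com/go-sql-driver/mysql"
)

func openMulti() (*sql.DB, error) {
    // multiStatements=true lets a single Exec call send several
    // semicolon-separated statements to the server.
    return sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/mydb?multiStatements=true")
}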