'use strict';

const knex = require('../knex');
const chunk = require('./chunk');
const logger = require('../logger')(__filename);

// PostgreSQL allows 65,535 binding parameters per query; stay below that with a bit of margin
const chunkTarget = 50_000;

/**
 * Insert many rows into a table, split over multiple queries so that no single
 * query exceeds the binding parameter budget. Improved version of bulkInsert.
 *
 * All chunk queries run inside one transaction: either the one passed in, or a
 * new one that is created here and committed (or rolled back on failure).
 *
 * @param {string} table - Name of the table to insert into.
 * @param {Object[]} items - Rows to insert; an empty array resolves to `[]` without touching the database.
 * @param {Object} [options]
 * @param {boolean|string|string[]} [options.conflict=true] - `true`: plain insert, any conflict is an
 *   error. `false`: conflicting rows are ignored and existing rows kept as-is. A column name or a list
 *   of column names: conflict target for an upsert, which requires `update` to be set.
 * @param {boolean|string[]} [options.update=false] - Only meaningful with conflict columns. `true`
 *   merges all inserted columns on conflict, an array merges only the listed columns.
 * @param {boolean} [options.concurrent=false] - Start all chunk queries at once through `Promise.all`
 *   instead of awaiting them one after another.
 * @param {Object} [options.transaction] - Existing knex transaction to run the inserts in.
 * @param {boolean} [options.commit=false] - When a transaction is passed in, also commit it on success
 *   and roll it back on failure. A transaction created here is always committed or rolled back.
 * @returns {Promise<Object[]>} The rows returned by `.returning('*')` for all chunks, as one flat array.
 * @throws {Error} On invalid arguments, or when any of the insert queries fails (the original error is rethrown).
 */
async function batchInsert(table, items, {
  conflict = true,
  update = false,
  concurrent = false,
  transaction,
  commit = false,
} = {}) {
  if (!table) {
    throw new Error('No table specified for batch insert');
  }

  // conflict may be a boolean, a single column name or a list of column names; keep only the column names
  const conflicts = [].concat(conflict).filter((column) => typeof column === 'string');

  // updating on conflict needs a conflict target; only reject when no columns were given
  if (conflict && conflicts.length === 0 && update) {
    throw new Error('Batch insert conflict must specify columns, or update must be disabled');
  }

  if (!Array.isArray(items)) {
    throw new Error('Batch insert items are not an array');
  }

  if (items.length === 0) {
    return [];
  }

  // PostgreSQL's bindings limit applies to individual values, so item size needs to be taken into account
  const itemSize = items.reduce((acc, item) => Math.max(acc, Object.keys(item).length), 0);

  if (itemSize === 0) {
    throw new Error('Batch insert items are empty');
  }

  // never ask for chunks of size 0, even for absurdly wide items
  const chunkSize = Math.max(1, Math.floor(chunkTarget / itemSize));
  const chunks = chunk(items, chunkSize);

  if (conflicts.length > 0 && !update) {
    throw new Error('Batch insert conflict columns must be specified together with update');
  }

  // we are responsible for finishing the transaction if we created it, or if the caller asked us to
  const finishTransaction = !transaction || commit;
  const trx = transaction || await knex.transaction();

  try {
    const queries = chunks.map((chunkItems) => {
      const query = trx(table)
        .insert(chunkItems)
        .returning('*');

      if (conflicts.length > 0) {
        // update is guaranteed to be set here: either a list of columns to merge, or merge everything
        return Array.isArray(update)
          ? query.onConflict(conflicts).merge(update)
          : query.onConflict(conflicts).merge();
      }

      // error on any conflict
      if (conflict) {
        return query;
      }

      // ignore duplicates, keep old entries as-is
      return query
        .onConflict()
        .ignore();
    });

    let chunkResults;

    if (concurrent) {
      chunkResults = await Promise.all(queries);
    } else {
      chunkResults = [];

      for (const query of queries) {
        // deliberately sequential: each chunk is only started once the previous one has finished
        // eslint-disable-next-line no-await-in-loop
        chunkResults.push(await query);
      }
    }

    // both modes return one flat list of rows rather than one list per chunk
    const results = chunkResults.flat();

    if (finishTransaction) {
      await trx.commit();
    }

    return results;
  } catch (error) {
    if (finishTransaction) {
      try {
        await trx.rollback();
      } catch (rollbackError) {
        // don't let a failed rollback hide the error that caused it
        logger.error(`Failed to roll back batch insert: ${rollbackError.message}`);
      }
    }

    logger.error(`Failed batch insert: ${error.message} (${error.detail})`);

    throw error;
  }
}

module.exports = batchInsert;