traxxx/src/utils/bulk-insert.js

42 lines
1.2 KiB
JavaScript
Executable File

'use strict';
const knex = require('../knex');
const chunk = require('./chunk');
const logger = require('../logger')(__filename);
async function bulkUpsert(table, items, conflict, update = true, chunkSize) {
if (items.length === 0) {
return [];
}
const updated = (conflict === false && ':query ON CONFLICT DO NOTHING RETURNING *;')
|| (conflict && update && `
:query ON CONFLICT (${conflict})
DO UPDATE SET ${(update === true
? Object.keys(items[0]) // derive updating props
: update)
.reduce((acc, prop, index) => `${acc}${index > 0 ? ',' : ''}\n${prop} = ${conflict.includes(prop) ? table : 'EXCLUDED'}.${prop}`, '')}
RETURNING *;
`);
return knex.transaction(async (transaction) => {
const chunked = chunk(items, chunkSize);
const queries = chunked
.map((chunkItems) => knex.raw(updated || ':query RETURNING *;', {
query: knex(table).insert(chunkItems),
}).transacting(transaction));
try {
const responses = await Promise.all(queries);
return responses.flat().map((response) => response.rows).flat();
} catch (error) {
logger.error(`Failed bulk insert: ${error.message} (${error.detail})`);
throw error;
}
});
}
module.exports = bulkUpsert;