diff --git a/src/media.js b/src/media.js index 91dde530..98dc4cc3 100755 --- a/src/media.js +++ b/src/media.js @@ -23,7 +23,7 @@ const logger = require('./logger')(__filename); const argv = require('./argv'); const knex = require('./knex'); const http = require('./utils/http'); -const bulkInsert = require('./utils/bulk-insert'); +const batchInsert = require('./utils/batch-insert'); const chunk = require('./utils/chunk'); const { get } = require('./utils/qu'); const { fetchEntityReleaseIds } = require('./entity-releases'); @@ -924,7 +924,7 @@ async function storeMedias(baseMedias, options) { const newMediaEntries = newMediaWithEntries.filter((media) => media.newEntry).map((media) => media.entry); try { - await bulkInsert('media', newMediaEntries, false); + await batchInsert('media', newMediaEntries, { conflict: false }); return [...newMediaWithEntries, ...existingHashMedias]; } catch (error) { @@ -993,11 +994,11 @@ async function associateReleaseMedia(releases, type = 'release') { .filter(Boolean); if (associations.length > 0) { - await bulkInsert(`${type}s_${role}`, associations, false); + await batchInsert(`${type}s_${role}`, associations, { conflict: false }); } } catch (error) { if (error.entries) { - logger.error(util.inspect(error.entries, null, null, { color: true })); + logger.error(util.inspect(error.entries.slice(0, 2), null, null, { color: true }), `${Math.min(error.entries.length, 2)} of ${error.entries.length}`); } logger.error(`Failed to store ${type} ${role}: ${error.message} (${error.detail || 'no detail'})`); diff --git a/src/utils/batch-insert.js b/src/utils/batch-insert.js index ff5e975c..2b8dddf0 100755 --- a/src/utils/batch-insert.js +++ b/src/utils/batch-insert.js @@ -4,11 +4,12 @@ const knex = require('../knex'); const chunk = require('./chunk'); const logger = require('../logger')(__filename); +const chunkTarget = 50_000; // PostgreSQL allows 65,535 binding parameters, allow for a bit of margin + // 
improved version of bulkInsert async function batchInsert(table, items, { conflict = true, update = false, - chunkSize = 1000, concurrent = false, transaction, commit = false, @@ -17,6 +18,10 @@ throw new Error('No table specified for batch insert'); } + if (conflict === true && update) { + throw new Error('Batch insert conflict must specify columns, or update must be disabled'); + } + if (!Array.isArray(items)) { throw new Error('Batch insert items are not an array'); } @@ -25,8 +30,20 @@ return []; } - const chunks = chunk(items, chunkSize); + // PostgreSQL's bindings limit applies to individual values, so item size needs to be taken into account + const itemSize = items.reduce((acc, item) => Math.max(acc, Object.keys(item).length), 0); + + if (itemSize === 0) { + throw new Error('Batch insert items are empty'); + } + + const chunks = chunk(items, Math.floor(chunkTarget / itemSize)); const conflicts = [].concat(conflict).filter((column) => typeof column === 'string'); // conflict might be 'true' + + if (conflicts.length > 0 && !update) { + throw new Error('Batch insert conflict columns must be specified together with update'); + } + const trx = transaction || await knex.transaction(); try { @@ -49,12 +66,6 @@ .onConflict(conflicts) .merge(); } - - throw new Error('Batch insert conflict columns must be specified together with update'); - } - - if (conflict && update) { - throw new Error('Batch insert conflict must specify columns, or update must be disabled'); } // error on any conflict