Using batch insert module for media, calculating chunk size based on item size.
This commit is contained in:
@@ -23,7 +23,7 @@ const logger = require('./logger')(__filename);
|
|||||||
const argv = require('./argv');
|
const argv = require('./argv');
|
||||||
const knex = require('./knex');
|
const knex = require('./knex');
|
||||||
const http = require('./utils/http');
|
const http = require('./utils/http');
|
||||||
const bulkInsert = require('./utils/bulk-insert');
|
const batchInsert = require('./utils/batch-insert');
|
||||||
const chunk = require('./utils/chunk');
|
const chunk = require('./utils/chunk');
|
||||||
const { get } = require('./utils/qu');
|
const { get } = require('./utils/qu');
|
||||||
const { fetchEntityReleaseIds } = require('./entity-releases');
|
const { fetchEntityReleaseIds } = require('./entity-releases');
|
||||||
@@ -924,7 +924,8 @@ async function storeMedias(baseMedias, options) {
|
|||||||
const newMediaEntries = newMediaWithEntries.filter((media) => media.newEntry).map((media) => media.entry);
|
const newMediaEntries = newMediaWithEntries.filter((media) => media.newEntry).map((media) => media.entry);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await bulkInsert('media', newMediaEntries, false);
|
console.log('NEW MEDIA ENTRIES', newMediaEntries);
|
||||||
|
await batchInsert('media', newMediaEntries, { confict: false });
|
||||||
|
|
||||||
return [...newMediaWithEntries, ...existingHashMedias];
|
return [...newMediaWithEntries, ...existingHashMedias];
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -993,11 +994,11 @@ async function associateReleaseMedia(releases, type = 'release') {
|
|||||||
.filter(Boolean);
|
.filter(Boolean);
|
||||||
|
|
||||||
if (associations.length > 0) {
|
if (associations.length > 0) {
|
||||||
await bulkInsert(`${type}s_${role}`, associations, false);
|
await batchInsert(`${type}s_${role}`, associations, { conflict: false });
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error.entries) {
|
if (error.entries) {
|
||||||
logger.error(util.inspect(error.entries, null, null, { color: true }));
|
logger.error(util.inspect(error.entries.slice(0, 2), null, null, { color: true }), `${Math.min(error.entries.length, 2)} of ${error.length}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.error(`Failed to store ${type} ${role}: ${error.message} (${error.detail || 'no detail'})`);
|
logger.error(`Failed to store ${type} ${role}: ${error.message} (${error.detail || 'no detail'})`);
|
||||||
|
|||||||
@@ -4,11 +4,12 @@ const knex = require('../knex');
|
|||||||
const chunk = require('./chunk');
|
const chunk = require('./chunk');
|
||||||
const logger = require('../logger')(__filename);
|
const logger = require('../logger')(__filename);
|
||||||
|
|
||||||
|
const chunkTarget = 50_000; // PostgreSQL allows 65,535 binding parameters, allow for a bit of margin
|
||||||
|
|
||||||
// improved version of bulkInsert
|
// improved version of bulkInsert
|
||||||
async function batchInsert(table, items, {
|
async function batchInsert(table, items, {
|
||||||
conflict = true,
|
conflict = true,
|
||||||
update = false,
|
update = false,
|
||||||
chunkSize = 1000,
|
|
||||||
concurrent = false,
|
concurrent = false,
|
||||||
transaction,
|
transaction,
|
||||||
commit = false,
|
commit = false,
|
||||||
@@ -17,6 +18,10 @@ async function batchInsert(table, items, {
|
|||||||
throw new Error('No table specified for batch insert');
|
throw new Error('No table specified for batch insert');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (conflict && update) {
|
||||||
|
throw new Error('Batch insert conflict must specify columns, or update must be disabled');
|
||||||
|
}
|
||||||
|
|
||||||
if (!Array.isArray(items)) {
|
if (!Array.isArray(items)) {
|
||||||
throw new Error('Batch insert items are not an array');
|
throw new Error('Batch insert items are not an array');
|
||||||
}
|
}
|
||||||
@@ -25,8 +30,20 @@ async function batchInsert(table, items, {
|
|||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
const chunks = chunk(items, chunkSize);
|
// PostgreSQL's bindings limit applies to individual values, so item size needs to be taken into account
|
||||||
|
const itemSize = items.reduce((acc, item) => Math.max(acc, Object.keys(item).length), 0);
|
||||||
|
|
||||||
|
if (itemSize === 0) {
|
||||||
|
throw new Error('Batch insert items are empty');
|
||||||
|
}
|
||||||
|
|
||||||
|
const chunks = chunk(items, Math.floor(chunkTarget / itemSize));
|
||||||
const conflicts = [].concat(conflict).filter((column) => typeof column === 'string'); // conflict might be 'true'
|
const conflicts = [].concat(conflict).filter((column) => typeof column === 'string'); // conflict might be 'true'
|
||||||
|
|
||||||
|
if (conflicts.length > 0 && !update) {
|
||||||
|
throw new Error('Batch insert conflict columns must be specified together with update');
|
||||||
|
}
|
||||||
|
|
||||||
const trx = transaction || await knex.transaction();
|
const trx = transaction || await knex.transaction();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -49,12 +66,6 @@ async function batchInsert(table, items, {
|
|||||||
.onConflict(conflicts)
|
.onConflict(conflicts)
|
||||||
.merge();
|
.merge();
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new Error('Batch insert conflict columns must be specified together with update');
|
|
||||||
}
|
|
||||||
|
|
||||||
if (conflict && update) {
|
|
||||||
throw new Error('Batch insert conflict must specify columns, or update must be disabled');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// error on any conflict
|
// error on any conflict
|
||||||
|
|||||||
Reference in New Issue
Block a user