Added batch insert util to replace bulk insert. Fixed circular dependencies.

DebaucheryLibrarian
2026-02-25 01:09:49 +01:00
parent 9f37f54634
commit e77ced44c7
6 changed files with 260 additions and 87 deletions
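
In short, call sites move from bulkInsert's positional boolean conflict flag to batchInsert's named options object; the before/after pair below is lifted from the src/tags.js hunk further down.

// before: a bare positional boolean controls conflict handling
await bulkInsert(`${type}s_tags`, tagAssociations, false);

// after: named options make the conflict behaviour explicit
await batchInsert(`${type}s_tags`, tagAssociations, { conflict: false });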

src/entities.js

@@ -8,6 +8,7 @@ const argv = require('./argv');
const knex = require('./knex');
const { deleteScenes, deleteMovies, deleteSeries } = require('./releases');
const { resolveScraper, resolveLayoutScraper } = require('./scrapers/resolve');
const { fetchEntityReleaseIds } = require('./entity-releases');
const getRecursiveParameters = require('./utils/get-recursive-parameters');
function getRecursiveParent(entity) {
@@ -373,86 +374,6 @@ async function searchEntities(query, type, limit) {
return curateEntities(entities);
}
async function fetchEntityReleaseIds(networkSlugs = [], channelSlugs = []) {
const entityQuery = knex
.withRecursive('selected_entities', knex.raw(`
SELECT entities.*
FROM entities
WHERE
entities.slug = ANY(:networkSlugs)
AND entities.type = 'network'
OR (entities.slug = ANY(:channelSlugs)
AND entities.type = 'channel')
UNION ALL
SELECT entities.*
FROM entities
INNER JOIN selected_entities ON selected_entities.id = entities.parent_id
`, {
networkSlugs,
channelSlugs,
}));
const sceneIds = await entityQuery
.clone()
.select('releases.id')
.distinct('releases.id')
.from('selected_entities')
.leftJoin('releases', 'releases.entity_id', 'selected_entities.id')
.whereNotNull('releases.id')
.modify((builder) => {
if (argv.flushAfter) {
builder.where('effective_date', '>=', argv.flushAfter);
}
if (argv.flushBefore) {
builder.where('effective_date', '<=', argv.flushBefore);
}
})
.pluck('releases.id');
const movieIds = await entityQuery
.clone()
.select('movies.id')
.distinct('movies.id')
.from('selected_entities')
.leftJoin('movies', 'movies.entity_id', 'selected_entities.id')
.whereNotNull('movies.id')
.modify((builder) => {
if (argv.flushAfter) {
builder.where('effective_date', '>=', argv.flushAfter);
}
if (argv.flushBefore) {
builder.where('effective_date', '<=', argv.flushBefore);
}
})
.pluck('movies.id');
const serieIds = await entityQuery
.clone()
.select('series.id')
.distinct('series.id')
.from('selected_entities')
.leftJoin('series', 'series.entity_id', 'selected_entities.id')
.whereNotNull('series.id')
.modify((builder) => {
if (argv.flushAfter) {
builder.where('date', '>=', argv.flushAfter);
}
if (argv.flushBefore) {
builder.where('date', '<=', argv.flushBefore);
}
})
.pluck('series.id');
return {
sceneIds,
movieIds,
serieIds,
};
}
async function flushEntities(networkSlugs = [], channelSlugs = [], flushOrphanedMedia) {
const { sceneIds, movieIds, serieIds } = await fetchEntityReleaseIds(networkSlugs, channelSlugs);
const entitySlugs = networkSlugs.concat(channelSlugs).join(', ');

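The 80 lines removed above reappear unchanged as the new src/entity-releases.js below. Extracting fetchEntityReleaseIds into a leaf module that only requires knex and argv is what breaks the require cycle the commit message mentions; the sketch below infers the old cycle from the requires visible in this diff.

// before (inferred): media.js required entities.js for fetchEntityReleaseIds,
// while entities.js required releases.js and other modules leading back to media.js
//
//   media.js --> entities.js --> releases.js --> ... --> media.js
//
// after: both sides depend on a leaf module with no further project dependencies
//
//   media.js --> entity-releases.js <-- entities.js
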
src/entity-releases.js Normal file

@@ -0,0 +1,88 @@
'use strict';
const knex = require('./knex');
const argv = require('./argv');
async function fetchEntityReleaseIds(networkSlugs = [], channelSlugs = []) {
const entityQuery = knex
.withRecursive('selected_entities', knex.raw(`
SELECT entities.*
FROM entities
WHERE
entities.slug = ANY(:networkSlugs)
AND entities.type = 'network'
OR (entities.slug = ANY(:channelSlugs)
AND entities.type = 'channel')
UNION ALL
SELECT entities.*
FROM entities
INNER JOIN selected_entities ON selected_entities.id = entities.parent_id
`, {
networkSlugs,
channelSlugs,
}));
const sceneIds = await entityQuery
.clone()
.select('releases.id')
.distinct('releases.id')
.from('selected_entities')
.leftJoin('releases', 'releases.entity_id', 'selected_entities.id')
.whereNotNull('releases.id')
.modify((builder) => {
if (argv.flushAfter) {
builder.where('effective_date', '>=', argv.flushAfter);
}
if (argv.flushBefore) {
builder.where('effective_date', '<=', argv.flushBefore);
}
})
.pluck('releases.id');
const movieIds = await entityQuery
.clone()
.select('movies.id')
.distinct('movies.id')
.from('selected_entities')
.leftJoin('movies', 'movies.entity_id', 'selected_entities.id')
.whereNotNull('movies.id')
.modify((builder) => {
if (argv.flushAfter) {
builder.where('effective_date', '>=', argv.flushAfter);
}
if (argv.flushBefore) {
builder.where('effective_date', '<=', argv.flushBefore);
}
})
.pluck('movies.id');
const serieIds = await entityQuery
.clone()
.select('series.id')
.distinct('series.id')
.from('selected_entities')
.leftJoin('series', 'series.entity_id', 'selected_entities.id')
.whereNotNull('series.id')
.modify((builder) => {
if (argv.flushAfter) {
builder.where('date', '>=', argv.flushAfter);
}
if (argv.flushBefore) {
builder.where('date', '<=', argv.flushBefore);
}
})
.pluck('series.id');
return {
sceneIds,
movieIds,
serieIds,
};
}
module.exports = {
fetchEntityReleaseIds,
};
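
A minimal usage sketch of the new module; the network slug is hypothetical, and the result shape follows the return value above.

'use strict';

const { fetchEntityReleaseIds } = require('./entity-releases');

async function example() {
	// the recursive CTE is seeded with this network, then follows parent_id
	// downward to every descendant channel before collecting release IDs
	const { sceneIds, movieIds, serieIds } = await fetchEntityReleaseIds(['examplenetwork'], []);

	console.log(`${sceneIds.length} scenes, ${movieIds.length} movies, ${serieIds.length} series`);
}

example();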

src/media.js

@@ -26,7 +26,7 @@ const http = require('./utils/http');
const bulkInsert = require('./utils/bulk-insert');
const chunk = require('./utils/chunk');
const { get } = require('./utils/qu');
const { fetchEntityReleaseIds } = require('./entities');
const { fetchEntityReleaseIds } = require('./entity-releases');
// const pipeline = util.promisify(stream.pipeline);
const streamQueue = taskQueue();

src/tags.js

@@ -6,7 +6,7 @@ const knex = require('./knex');
const { fetchEntityReleaseIds } = require('./entities');
const slugify = require('./utils/slugify');
const bulkInsert = require('./utils/bulk-insert');
const batchInsert = require('./utils/batch-insert');
function curateTagMedia(media) {
if (!media) {
@@ -161,7 +161,7 @@ async function associateReleaseTags(releases, type = 'release') {
const tagAssociations = buildReleaseTagAssociations(releases, tagIdsBySlug, entityTagIdsByEntityId, type);
await bulkInsert(`${type}s_tags`, tagAssociations, false);
await batchInsert(`${type}s_tags`, tagAssociations, { conflict: false });
}
async function fetchTag(tagId) {
@@ -200,15 +200,19 @@ async function reassociateTagEntries(tagEntries, rematch) {
})).filter((tagEntry) => tagEntry.tag_id);
if (updatedTagEntries.length > 0) {
// TODO: prevent wiping tags if insert fails
await knex('releases_tags')
const trx = await knex.transaction();
await trx('releases_tags')
.whereIn('id', updatedTagEntries.map((tagEntry) => tagEntry.id))
.delete();
await bulkInsert('releases_tags', updatedTagEntries.map((tagEntry) => ({
await batchInsert('releases_tags', updatedTagEntries.map((tagEntry) => ({
...tagEntry,
id: undefined,
})), true);
})), {
conflict: false,
transaction: trx,
commit: true, // commit delete and insert together; on failure, rollback restores the old tags
});
}
logger.info(`Updated ${updatedTagEntries.length} tags in ${new Set(updatedTagEntries.map((tagEntry) => tagEntry.release_id)).size} scenes`);
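
For reference, handing batchInsert an external transaction with commit enabled is equivalent to this manual sketch (ids and updatedEntries are hypothetical placeholders); a failed insert rolls back the preceding delete, which is what the TODO above asks for.

const trx = await knex.transaction();

try {
	await trx('releases_tags').whereIn('id', ids).delete();
	await trx('releases_tags').insert(updatedEntries).onConflict().ignore();
	await trx.commit(); // both statements persist together
} catch (error) {
	await trx.rollback(); // the delete is undone along with the failed insert
	throw error;
}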

src/tools/batch-test.js Normal file

@@ -0,0 +1,64 @@
'use strict';
const knex = require('../knex');
const batchInsert = require('../utils/batch-insert');
async function createTestTable() {
const tableExists = await knex.schema.hasTable('batch_test');
if (tableExists) {
// await knex('batch_test').delete();
return;
}
await knex.schema.createTable('batch_test', (table) => {
table.increments('id');
table.string('name')
.unique();
table.integer('age');
table.text('location');
table.datetime('created_at')
.notNullable()
.defaultTo(knex.fn.now());
});
}
async function init() {
await createTestTable();
const transaction = await knex.transaction();
const entries = await batchInsert('batch_test', [
{
name: 'John',
age: 18,
location: 'Home',
},
{
name: 'Jack',
age: 38,
location: 'Work',
},
{
name: 'James',
age: 35,
location: 'Club',
},
], {
conflict: 'name',
update: true,
transaction,
commit: false,
});
await transaction.commit();
console.log('ENTRIES', entries);
// await knex.schema.dropTable('batch_test');
await knex.destroy();
}
init();
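
Because the table is kept between runs (the delete is commented out), running the tool a second time exercises the upsert path: name is unique, so a hypothetical follow-up insert like the one below merges into the existing row instead of throwing.

// 'John' already exists, so the unique 'name' column conflicts and
// update: true merges the new age and location into his existing row
await batchInsert('batch_test', [
	{ name: 'John', age: 19, location: 'Away' },
], {
	conflict: 'name',
	update: true,
});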

src/utils/batch-insert.js Executable file

@@ -0,0 +1,96 @@
'use strict';
const knex = require('../knex');
const chunk = require('./chunk');
const logger = require('../logger')(__filename);
// improved version of bulkInsert: chunked inserts with configurable conflict handling and optional transaction support
async function batchInsert(table, items, {
conflict = true,
update = false,
chunkSize = 1000,
concurrent = false,
transaction,
commit = false,
} = {}) {
if (!table) {
throw new Error('No table specified for batch insert');
}
if (!Array.isArray(items)) {
throw new Error('Batch insert items are not an array');
}
if (items.length === 0) {
return [];
}
const chunks = chunk(items, chunkSize);
const conflicts = [].concat(conflict).filter((column) => typeof column === 'string'); // keep only column names; conflict may be boolean true
const trx = transaction || await knex.transaction();
try {
const queries = chunks.map((chunkItems) => {
const query = trx(table)
.insert(chunkItems)
.returning('*');
if (conflicts.length > 0) {
if (Array.isArray(update)) {
// update only the specified columns
return query
.onConflict(conflicts)
.merge(update);
}
if (update) {
// update all
return query
.onConflict(conflicts)
.merge();
}
throw new Error('Batch insert conflict columns must be specified together with update');
}
if (conflict && update) {
throw new Error('Batch insert conflict must specify columns, or update must be disabled');
}
// error on any conflict
if (conflict) {
return query;
}
// ignore duplicates, keep old entries as-is
return query
.onConflict()
.ignore();
});
const results = concurrent
? (await Promise.all(queries)).flat() // flatten per-chunk results to match the sequential path
: await queries.reduce(async (chain, query) => {
const acc = await chain;
const result = await query;
return acc.concat(result);
}, Promise.resolve([]));
if (!transaction || commit) {
await trx.commit();
}
return results;
} catch (error) {
if (!transaction || commit) {
await trx.rollback();
}
logger.error(`Failed batch insert: ${error.message} (${error.detail})`);
throw error;
}
}
module.exports = batchInsert;
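
To summarize the conflict/update matrix above, a sketch with hypothetical table and column names:

// default: plain insert, error on any conflict
await batchInsert('releases', rows);

// ignore conflicting rows, keep existing entries as-is (ON CONFLICT DO NOTHING)
await batchInsert('releases', rows, { conflict: false });

// upsert: on a 'slug' conflict, overwrite all columns with the new values
await batchInsert('releases', rows, { conflict: 'slug', update: true });

// upsert, but only overwrite the listed columns
await batchInsert('releases', rows, { conflict: ['slug'], update: ['title', 'date'] });

// throws: conflict columns require the update option
// await batchInsert('releases', rows, { conflict: 'slug' });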