diff --git a/src/entities.js b/src/entities.js index 788510d8..0f44824e 100755 --- a/src/entities.js +++ b/src/entities.js @@ -8,6 +8,7 @@ const argv = require('./argv'); const knex = require('./knex'); const { deleteScenes, deleteMovies, deleteSeries } = require('./releases'); const { resolveScraper, resolveLayoutScraper } = require('./scrapers/resolve'); +const { fetchEntityReleaseIds } = require('./entity-releases'); const getRecursiveParameters = require('./utils/get-recursive-parameters'); function getRecursiveParent(entity) { @@ -373,86 +374,6 @@ async function searchEntities(query, type, limit) { return curateEntities(entities); } -async function fetchEntityReleaseIds(networkSlugs = [], channelSlugs = []) { - const entityQuery = knex - .withRecursive('selected_entities', knex.raw(` - SELECT entities.* - FROM entities - WHERE - entities.slug = ANY(:networkSlugs) - AND entities.type = 'network' - OR (entities.slug = ANY(:channelSlugs) - AND entities.type = 'channel') - UNION ALL - SELECT entities.* - FROM entities - INNER JOIN selected_entities ON selected_entities.id = entities.parent_id - `, { - networkSlugs, - channelSlugs, - })); - - const sceneIds = await entityQuery - .clone() - .select('releases.id') - .distinct('releases.id') - .from('selected_entities') - .leftJoin('releases', 'releases.entity_id', 'selected_entities.id') - .whereNotNull('releases.id') - .modify((builder) => { - if (argv.flushAfter) { - builder.where('effective_date', '>=', argv.flushAfter); - } - - if (argv.flushBefore) { - builder.where('effective_date', '<=', argv.flushBefore); - } - }) - .pluck('releases.id'); - - const movieIds = await entityQuery - .clone() - .select('movies.id') - .distinct('movies.id') - .from('selected_entities') - .leftJoin('movies', 'movies.entity_id', 'selected_entities.id') - .whereNotNull('movies.id') - .modify((builder) => { - if (argv.flushAfter) { - builder.where('effective_date', '>=', argv.flushAfter); - } - - if (argv.flushBefore) { - 
builder.where('effective_date', '<=', argv.flushBefore); - } - }) - .pluck('movies.id'); - - const serieIds = await entityQuery - .clone() - .select('series.id') - .distinct('series.id') - .from('selected_entities') - .leftJoin('series', 'series.entity_id', 'selected_entities.id') - .whereNotNull('series.id') - .modify((builder) => { - if (argv.flushAfter) { - builder.where('date', '>=', argv.flushAfter); - } - - if (argv.flushBefore) { - builder.where('date', '<=', argv.flushBefore); - } - }) - .pluck('series.id'); - - return { - sceneIds, - movieIds, - serieIds, - }; -} - async function flushEntities(networkSlugs = [], channelSlugs = [], flushOrphanedMedia) { const { sceneIds, movieIds, serieIds } = await fetchEntityReleaseIds(networkSlugs, channelSlugs); const entitySlugs = networkSlugs.concat(channelSlugs).join(', '); diff --git a/src/entity-releases.js b/src/entity-releases.js new file mode 100644 index 00000000..324d1da6 --- /dev/null +++ b/src/entity-releases.js @@ -0,0 +1,88 @@ +'use strict'; + +const knex = require('./knex'); +const argv = require('./argv'); + +async function fetchEntityReleaseIds(networkSlugs = [], channelSlugs = []) { + const entityQuery = knex + .withRecursive('selected_entities', knex.raw(` + SELECT entities.* + FROM entities + WHERE + entities.slug = ANY(:networkSlugs) + AND entities.type = 'network' + OR (entities.slug = ANY(:channelSlugs) + AND entities.type = 'channel') + UNION ALL + SELECT entities.* + FROM entities + INNER JOIN selected_entities ON selected_entities.id = entities.parent_id + `, { + networkSlugs, + channelSlugs, + })); + + const sceneIds = await entityQuery + .clone() + .select('releases.id') + .distinct('releases.id') + .from('selected_entities') + .leftJoin('releases', 'releases.entity_id', 'selected_entities.id') + .whereNotNull('releases.id') + .modify((builder) => { + if (argv.flushAfter) { + builder.where('effective_date', '>=', argv.flushAfter); + } + + if (argv.flushBefore) { + 
builder.where('effective_date', '<=', argv.flushBefore); + } + }) + .pluck('releases.id'); + + const movieIds = await entityQuery + .clone() + .select('movies.id') + .distinct('movies.id') + .from('selected_entities') + .leftJoin('movies', 'movies.entity_id', 'selected_entities.id') + .whereNotNull('movies.id') + .modify((builder) => { + if (argv.flushAfter) { + builder.where('effective_date', '>=', argv.flushAfter); + } + + if (argv.flushBefore) { + builder.where('effective_date', '<=', argv.flushBefore); + } + }) + .pluck('movies.id'); + + const serieIds = await entityQuery + .clone() + .select('series.id') + .distinct('series.id') + .from('selected_entities') + .leftJoin('series', 'series.entity_id', 'selected_entities.id') + .whereNotNull('series.id') + .modify((builder) => { + if (argv.flushAfter) { + builder.where('date', '>=', argv.flushAfter); + } + + if (argv.flushBefore) { + builder.where('date', '<=', argv.flushBefore); + } + }) + .pluck('series.id'); + + return { + sceneIds, + movieIds, + serieIds, + }; +} + +module.exports = { + fetchEntityReleaseIds, +}; diff --git a/src/media.js b/src/media.js index 79d3399c..47d5f7cf 100755 --- a/src/media.js +++ b/src/media.js @@ -26,7 +26,7 @@ const http = require('./utils/http'); const bulkInsert = require('./utils/bulk-insert'); const chunk = require('./utils/chunk'); const { get } = require('./utils/qu'); -const { fetchEntityReleaseIds } = require('./entities'); +const { fetchEntityReleaseIds } = require('./entity-releases'); // const pipeline = util.promisify(stream.pipeline); const streamQueue = taskQueue(); diff --git a/src/tags.js b/src/tags.js index b6e4b26b..0323c485 100755 --- a/src/tags.js +++ b/src/tags.js @@ -6,7 +6,7 @@ const knex = require('./knex'); const { fetchEntityReleaseIds } = require('./entities'); const slugify = require('./utils/slugify'); -const bulkInsert = require('./utils/bulk-insert'); +const batchInsert = require('./utils/batch-insert'); function curateTagMedia(media) { if (!media) { 
// Ensures the throwaway 'batch_test' table exists, creating it on first run
// and leaving any existing table (and its rows) untouched.
async function createTestTable() {
	const exists = await knex.schema.hasTable('batch_test');

	if (exists) {
		return;
	}

	await knex.schema.createTable('batch_test', (table) => {
		table.increments('id');

		table.string('name')
			.unique();

		table.integer('age');
		table.text('location');

		table.datetime('created_at')
			.notNullable()
			.defaultTo(knex.fn.now());
	});
}
'use strict';

const knex = require('../knex');
const chunk = require('./chunk');
const logger = require('../logger')(__filename);

/**
 * Improved version of bulkInsert: inserts rows into a table in chunks,
 * inside a provided or self-managed transaction.
 *
 * @param {string} table - Target table name.
 * @param {Array<Object>} items - Rows to insert.
 * @param {Object} [options]
 * @param {boolean|string|string[]} [options.conflict=true] - true: error on any
 *   conflict; false: ignore conflicting rows, keeping old entries as-is;
 *   column name(s): conflict target for an upsert (requires options.update).
 * @param {boolean|string[]} [options.update=false] - true: merge all columns on
 *   conflict; array: merge only the listed columns. Only valid with conflict columns.
 * @param {number} [options.chunkSize=1000] - Rows per INSERT statement.
 * @param {boolean} [options.concurrent=false] - Execute chunk inserts in parallel.
 * @param {Object} [options.transaction] - Existing knex transaction to use; when
 *   given, it is only committed/rolled back here if options.commit is true.
 * @param {boolean} [options.commit=false] - Commit (or roll back on failure) a
 *   provided transaction as well.
 * @returns {Promise<Array<Object>>} Flat array of inserted/updated rows.
 * @throws {Error} On missing table, non-array items, invalid conflict/update
 *   combinations, or any failed insert (after rollback where applicable).
 */
async function batchInsert(table, items, {
	conflict = true,
	update = false,
	chunkSize = 1000,
	concurrent = false,
	transaction,
	commit = false,
} = {}) {
	if (!table) {
		throw new Error('No table specified for batch insert');
	}

	if (!Array.isArray(items)) {
		throw new Error('Batch insert items are not an array');
	}

	if (items.length === 0) {
		return [];
	}

	const conflicts = [].concat(conflict).filter((column) => typeof column === 'string'); // conflict might be 'true'

	// validate the option combination up front, before a transaction is opened,
	// so an invalid call does not open and roll back a transaction needlessly
	if (conflicts.length === 0 && conflict && update) {
		throw new Error('Batch insert conflict must specify columns, or update must be disabled');
	}

	const chunks = chunk(items, chunkSize);
	const trx = transaction || await knex.transaction();

	try {
		const queries = chunks.map((chunkItems) => {
			const query = trx(table)
				.insert(chunkItems)
				.returning('*');

			if (conflicts.length > 0) {
				if (Array.isArray(update)) {
					// update only the specified columns on conflict
					return query
						.onConflict(conflicts)
						.merge(update);
				}

				if (update) {
					// update all columns on conflict
					return query
						.onConflict(conflicts)
						.merge();
				}

				throw new Error('Batch insert conflict columns must be specified together with update');
			}

			// error on any conflict
			if (conflict) {
				return query;
			}

			// ignore duplicates, keep old entries as-is
			return query
				.onConflict()
				.ignore();
		});

		// flatten in both modes so callers always receive a single flat array of
		// rows; Promise.all would otherwise yield one array per chunk
		const results = concurrent
			? (await Promise.all(queries)).flat()
			: await queries.reduce(async (chain, query) => {
				const acc = await chain;
				const result = await query;

				return acc.concat(result);
			}, Promise.resolve([]));

		if (!transaction || commit) {
			await trx.commit();
		}

		return results;
	} catch (error) {
		if (!transaction || commit) {
			await trx.rollback();
		}

		// error.detail is Postgres-specific; omit it when absent instead of logging '(undefined)'
		logger.error(`Failed batch insert: ${error.message}${error.detail ? ` (${error.detail})` : ''}`);

		throw error;
	}
}

module.exports = batchInsert;