Chunked duplicate check to prevent postgres stack depth errors.

This commit is contained in:
DebaucheryLibrarian 2022-01-10 02:17:17 +01:00
parent 43a0bc8a2c
commit 506971b44b
3 changed files with 33 additions and 22 deletions

View File

@ -232,7 +232,7 @@ function actors(release) {
} }
async function fetchLatest(entity, page, options) { async function fetchLatest(entity, page, options) {
return Promise.all(Array.from({ length: 100 }, async (value, index) => { return Promise.all(Array.from({ length: 10000 }, async (value, index) => {
const release = {}; const release = {};
release.entryId = nanoid(); release.entryId = nanoid();
@ -262,7 +262,7 @@ async function fetchLatest(entity, page, options) {
.select('name') .select('name')
.where('priority', '>', 7) .where('priority', '>', 7)
.orderByRaw('random()') .orderByRaw('random()')
.limit(faker.random.number({ min: 2, max: 15 })) .limit(faker.datatype.number({ min: 2, max: 15 }))
.pluck('name'); .pluck('name');
release.actors = [...actors(release), null]; // include empty actor to ensure proper handling release.actors = [...actors(release), null]; // include empty actor to ensure proper handling

View File

@ -1,6 +1,7 @@
'use strict'; 'use strict';
const config = require('config'); const config = require('config');
const Promise = require('bluebird');
const argv = require('./argv'); const argv = require('./argv');
const logger = require('./logger')(__filename); const logger = require('./logger')(__filename);
@ -8,6 +9,7 @@ const knex = require('./knex');
const slugify = require('./utils/slugify'); const slugify = require('./utils/slugify');
const bulkInsert = require('./utils/bulk-insert'); const bulkInsert = require('./utils/bulk-insert');
const resolvePlace = require('./utils/resolve-place'); const resolvePlace = require('./utils/resolve-place');
const chunk = require('./utils/chunk');
const { formatDate } = require('./utils/qu'); const { formatDate } = require('./utils/qu');
const { associateActors, associateDirectors, scrapeActors, toBaseActors } = require('./actors'); const { associateActors, associateDirectors, scrapeActors, toBaseActors } = require('./actors');
const { associateReleaseTags } = require('./tags'); const { associateReleaseTags } = require('./tags');
@ -192,13 +194,16 @@ function filterInternalDuplicateReleases(releases) {
async function filterDuplicateReleases(releases) { async function filterDuplicateReleases(releases) {
const internalUniqueReleases = filterInternalDuplicateReleases(releases); const internalUniqueReleases = filterInternalDuplicateReleases(releases);
const internalUniqueReleaseChunks = chunk(internalUniqueReleases);
const duplicateReleaseEntries = await knex('releases') const duplicateReleaseEntryChunks = await Promise.map(internalUniqueReleaseChunks, async (internalUniqueReleasesChunk) => knex('releases')
.whereIn(['entry_id', 'entity_id'], internalUniqueReleases.map((release) => [release.entryId, release.entity.id])) .whereIn(['entry_id', 'entity_id'], internalUniqueReleasesChunk.map((release) => [release.entryId, release.entity.id]))
.orWhereIn(['entry_id', 'entity_id'], internalUniqueReleases .orWhereIn(['entry_id', 'entity_id'], internalUniqueReleasesChunk
// scene IDs shared across network, mark as duplicate so scene can be updated with channel if only available on release day (i.e. Perv City) // scene IDs shared across network, mark as duplicate so scene can be updated with channel if only available on release day (i.e. Perv City)
.filter((release) => release.entity.parent?.parameters?.networkEntryIds) .filter((release) => release.entity.parent?.parameters?.networkEntryIds)
.map((release) => [release.entryId, release.entity.parent.id])); .map((release) => [release.entryId, release.entity.parent.id])), { concurrency: 10 });
const duplicateReleaseEntries = duplicateReleaseEntryChunks.flat();
const duplicateReleasesByEntityIdAndEntryId = duplicateReleaseEntries.reduce((acc, release) => { const duplicateReleasesByEntityIdAndEntryId = duplicateReleaseEntries.reduce((acc, release) => {
if (!acc[release.entity_id]) acc[release.entity_id] = {}; if (!acc[release.entity_id]) acc[release.entity_id] = {};

View File

@ -8,6 +8,7 @@ const argv = require('./argv');
const logger = require('./logger')(__filename); const logger = require('./logger')(__filename);
const knex = require('./knex'); const knex = require('./knex');
const { curateRelease } = require('./releases'); const { curateRelease } = require('./releases');
const chunk = require('./utils/chunk');
const include = require('./utils/argv-include')(argv); const include = require('./utils/argv-include')(argv);
const { resolveScraper, resolveLayoutScraper } = require('./scrapers/resolve'); const { resolveScraper, resolveLayoutScraper } = require('./scrapers/resolve');
const { fetchIncludedEntities } = require('./entities'); const { fetchIncludedEntities } = require('./entities');
@ -38,9 +39,9 @@ function filterLocalUniqueReleases(releases, accReleases) {
} }
async function filterUniqueReleases(releases) { async function filterUniqueReleases(releases) {
const releaseIdentifiers = releases const releaseIdentifierChunks = chunk(releases.map((release) => [release.entity.id, release.entryId.toString()]));
.map((release) => [release.entity.id, release.entryId.toString()]);
const duplicateReleaseEntryChunks = await Promise.map(releaseIdentifierChunks, async (releaseIdentifiers) => {
const duplicateReleaseEntriesQuery = knex('releases') const duplicateReleaseEntriesQuery = knex('releases')
.select(knex.raw('releases.*, row_to_json(entities) as entity')) .select(knex.raw('releases.*, row_to_json(entities) as entity'))
.leftJoin('entities', 'entities.id', 'releases.entity_id') .leftJoin('entities', 'entities.id', 'releases.entity_id')
@ -55,7 +56,12 @@ async function filterUniqueReleases(releases) {
.orWhere(knex.raw('updated_at - date > INTERVAL \'1 day\'')); // scene was updated after the release date, no updates expected .orWhere(knex.raw('updated_at - date > INTERVAL \'1 day\'')); // scene was updated after the release date, no updates expected
}); });
const duplicateReleaseEntries = await duplicateReleaseEntriesQuery; return duplicateReleaseEntriesQuery;
}, { concurrency: 10 });
const duplicateReleaseEntries = duplicateReleaseEntryChunks.flat();
console.log(duplicateReleaseEntries);
const duplicateReleases = duplicateReleaseEntries.map((release) => curateRelease(release)); const duplicateReleases = duplicateReleaseEntries.map((release) => curateRelease(release));
const duplicateReleasesByEntityIdAndEntryId = duplicateReleases.reduce(mapReleasesToEntityIdAndEntryId, {}); const duplicateReleasesByEntityIdAndEntryId = duplicateReleases.reduce(mapReleasesToEntityIdAndEntryId, {});