Using new bulk insert utility for releases, media and actors.
This commit is contained in:
parent
e996a45bf5
commit
50c5f921f5
|
@ -41,7 +41,6 @@ function initUiActions(_store, _router) {
|
||||||
slug
|
slug
|
||||||
date
|
date
|
||||||
url
|
url
|
||||||
type
|
|
||||||
isNew
|
isNew
|
||||||
entity {
|
entity {
|
||||||
id
|
id
|
||||||
|
|
Binary file not shown.
After Width: | Height: | Size: 525 KiB |
Binary file not shown.
After Width: | Height: | Size: 7.9 KiB |
Binary file not shown.
After Width: | Height: | Size: 36 KiB |
Binary file not shown.
After Width: | Height: | Size: 269 KiB |
Binary file not shown.
After Width: | Height: | Size: 5.6 KiB |
Binary file not shown.
After Width: | Height: | Size: 24 KiB |
|
@ -16,6 +16,7 @@ const scrapers = require('./scrapers/scrapers').actors;
|
||||||
|
|
||||||
const argv = require('./argv');
|
const argv = require('./argv');
|
||||||
const include = require('./utils/argv-include')(argv);
|
const include = require('./utils/argv-include')(argv);
|
||||||
|
const bulkInsert = require('./utils/bulk-insert');
|
||||||
const logger = require('./logger')(__filename);
|
const logger = require('./logger')(__filename);
|
||||||
|
|
||||||
const { toBaseReleases } = require('./deep');
|
const { toBaseReleases } = require('./deep');
|
||||||
|
@ -485,7 +486,7 @@ async function upsertProfiles(profiles) {
|
||||||
const updatingProfileEntries = profiles.filter(profile => profile.update).map(profile => curateProfileEntry(profile));
|
const updatingProfileEntries = profiles.filter(profile => profile.update).map(profile => curateProfileEntry(profile));
|
||||||
|
|
||||||
if (newProfileEntries.length > 0) {
|
if (newProfileEntries.length > 0) {
|
||||||
await knex.batchInsert('actors_profiles', newProfileEntries);
|
await bulkInsert('actors_profiles', newProfileEntries);
|
||||||
|
|
||||||
logger.info(`Saved ${newProfileEntries.length} actor profiles`);
|
logger.info(`Saved ${newProfileEntries.length} actor profiles`);
|
||||||
}
|
}
|
||||||
|
@ -639,9 +640,7 @@ async function scrapeActors(argNames) {
|
||||||
|
|
||||||
// TODO: associate entity when entry ID is provided
|
// TODO: associate entity when entry ID is provided
|
||||||
|
|
||||||
const newActorEntries = batchId && await knex('actors')
|
const newActorEntries = batchId && await bulkInsert('actors', curatedActorEntries);
|
||||||
.insert(curatedActorEntries)
|
|
||||||
.returning(['id', 'name', 'slug', 'entry_id']);
|
|
||||||
|
|
||||||
const actors = existingActorEntries.concat(Array.isArray(newActorEntries) ? newActorEntries : []);
|
const actors = existingActorEntries.concat(Array.isArray(newActorEntries) ? newActorEntries : []);
|
||||||
|
|
||||||
|
@ -697,7 +696,7 @@ async function getOrCreateActors(baseActors, batchId) {
|
||||||
const uniqueBaseActors = baseActors.filter(baseActor => !existingActorSlugs[baseActor.entity.id]?.[baseActor.slug] && !existingActorSlugs.null?.[baseActor.slug]);
|
const uniqueBaseActors = baseActors.filter(baseActor => !existingActorSlugs[baseActor.entity.id]?.[baseActor.slug] && !existingActorSlugs.null?.[baseActor.slug]);
|
||||||
|
|
||||||
const curatedActorEntries = curateActorEntries(uniqueBaseActors, batchId);
|
const curatedActorEntries = curateActorEntries(uniqueBaseActors, batchId);
|
||||||
const newActors = await knex('actors').insert(curatedActorEntries, ['id', 'alias_for', 'name', 'slug', 'entity_id']);
|
const newActors = await bulkInsert('actors', curatedActorEntries);
|
||||||
|
|
||||||
if (Array.isArray(newActors)) {
|
if (Array.isArray(newActors)) {
|
||||||
return newActors.concat(existingActors);
|
return newActors.concat(existingActors);
|
||||||
|
@ -743,7 +742,7 @@ async function associateActors(releases, batchId) {
|
||||||
})))
|
})))
|
||||||
.flat();
|
.flat();
|
||||||
|
|
||||||
await knex.raw(`${knex('releases_actors').insert(releaseActorAssociations).toString()} ON CONFLICT DO NOTHING;`);
|
await bulkInsert('releases_actors', releaseActorAssociations, false);
|
||||||
|
|
||||||
return actors;
|
return actors;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ const logger = require('./logger')(__filename);
|
||||||
const argv = require('./argv');
|
const argv = require('./argv');
|
||||||
const knex = require('./knex');
|
const knex = require('./knex');
|
||||||
const http = require('./utils/http');
|
const http = require('./utils/http');
|
||||||
|
const bulkInsert = require('./utils/bulk-insert');
|
||||||
const { get } = require('./utils/qu');
|
const { get } = require('./utils/qu');
|
||||||
|
|
||||||
const pipeline = util.promisify(stream.pipeline);
|
const pipeline = util.promisify(stream.pipeline);
|
||||||
|
@ -607,7 +608,7 @@ async function storeMedias(baseMedias) {
|
||||||
const newMediaWithEntries = savedMedias.map((media, index) => curateMediaEntry(media, index));
|
const newMediaWithEntries = savedMedias.map((media, index) => curateMediaEntry(media, index));
|
||||||
const newMediaEntries = newMediaWithEntries.filter(media => media.newEntry).map(media => media.entry);
|
const newMediaEntries = newMediaWithEntries.filter(media => media.newEntry).map(media => media.entry);
|
||||||
|
|
||||||
await knex('media').insert(newMediaEntries);
|
await bulkInsert('media', newMediaEntries);
|
||||||
|
|
||||||
return [...newMediaWithEntries, ...existingHashMedias];
|
return [...newMediaWithEntries, ...existingHashMedias];
|
||||||
}
|
}
|
||||||
|
@ -670,7 +671,7 @@ async function associateReleaseMedia(releases, type = 'releases') {
|
||||||
.filter(Boolean);
|
.filter(Boolean);
|
||||||
|
|
||||||
if (associations.length > 0) {
|
if (associations.length > 0) {
|
||||||
await knex.raw(`${knex(`${type}_${role}`).insert(associations)} ON CONFLICT DO NOTHING`);
|
await bulkInsert(`${type}_${role}`, associations, false);
|
||||||
}
|
}
|
||||||
}, Promise.resolve());
|
}, Promise.resolve());
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,8 +224,7 @@ async function updateReleasesSearch(releaseIds) {
|
||||||
`, releaseIds && [releaseIds]);
|
`, releaseIds && [releaseIds]);
|
||||||
|
|
||||||
if (documents.rows?.length > 0) {
|
if (documents.rows?.length > 0) {
|
||||||
const query = knex('releases_search').insert(documents.rows).toString();
|
await bulkInsert('releases_search', documents.rows, ['release_id']);
|
||||||
await knex.raw(`${query} ON CONFLICT (release_id) DO UPDATE SET document = EXCLUDED.document`);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,7 +243,7 @@ async function storeScenes(releases) {
|
||||||
|
|
||||||
const curatedNewReleaseEntries = uniqueReleases.map(release => curateReleaseEntry(release, batchId));
|
const curatedNewReleaseEntries = uniqueReleases.map(release => curateReleaseEntry(release, batchId));
|
||||||
|
|
||||||
const storedReleases = await knex.batchInsert('releases', curatedNewReleaseEntries).returning('*');
|
const storedReleases = await bulkInsert('releases', curatedNewReleaseEntries);
|
||||||
// TODO: update duplicate releases
|
// TODO: update duplicate releases
|
||||||
|
|
||||||
const storedReleaseEntries = Array.isArray(storedReleases) ? storedReleases : [];
|
const storedReleaseEntries = Array.isArray(storedReleases) ? storedReleases : [];
|
||||||
|
@ -297,7 +296,7 @@ async function associateMovieScenes(movies, movieScenes) {
|
||||||
});
|
});
|
||||||
}).flat().filter(Boolean);
|
}).flat().filter(Boolean);
|
||||||
|
|
||||||
await bulkInsert('movies_scenes', associations, ['movie_id', 'scene_id']);
|
await bulkInsert('movies_scenes', associations, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function storeMovies(movies, movieScenes) {
|
async function storeMovies(movies, movieScenes) {
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const knex = require('../knex');
|
||||||
|
const chunk = require('./chunk');
|
||||||
|
|
||||||
|
async function bulkUpsert(table, items, conflict, update = true, chunkSize) {
|
||||||
|
const updated = (conflict === false && ':query ON CONFLICT DO NOTHING RETURNING *;')
|
||||||
|
|| (conflict && update && `
|
||||||
|
:query ON CONFLICT (${conflict})
|
||||||
|
DO UPDATE SET ${(update === true
|
||||||
|
? Object.keys(items[0]) // derive updating props
|
||||||
|
: update)
|
||||||
|
.reduce((acc, prop, index) => `${acc}${index > 0 ? ',' : ''}\n${prop} = ${conflict.includes(prop) ? table : 'EXCLUDED'}.${prop}`, '')}
|
||||||
|
RETURNING *;
|
||||||
|
`);
|
||||||
|
|
||||||
|
return knex.transaction(async (transaction) => {
|
||||||
|
const queries = chunk(items, chunkSize)
|
||||||
|
.map(chunkItems => knex.raw(updated || ':query RETURNING *;', {
|
||||||
|
query: knex(table).insert(chunkItems).transacting(transaction),
|
||||||
|
}));
|
||||||
|
|
||||||
|
const responses = await Promise.all(queries);
|
||||||
|
|
||||||
|
return responses.flat().map(response => response.rows).flat();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = bulkUpsert;
|
Loading…
Reference in New Issue