Switched to tabs. Adding missing actor entries when scraping actors, with batch ID.
This commit is contained in:
@@ -11,155 +11,164 @@ const { curateSite } = require('./sites');
|
||||
const { associateReleaseMedia } = require('./media');
|
||||
|
||||
/**
 * Build the `releases` table row for a scraped release.
 *
 * @param {object} release - Scraped release; `site`, `network` and `studio` are read as objects carrying an `id`.
 * @param {*} batchId - ID of the current batch, always written to `updated_batch_id`.
 * @param {object} [existingRelease] - Already stored counterpart, if any; only checked for presence.
 * @returns {object} Row using the snake_case column names of the `releases` table.
 */
function curateReleaseEntry(release, batchId, existingRelease) {
	// fall back to the joined actor names when the release has no title
	const slug = slugify(release.title || release.actors?.join('-') || null, '-', {
		encode: true,
		limit: config.titleSlugLength,
	});

	const curatedRelease = {
		title: release.title,
		entry_id: release.entryId || null,
		site_id: release.site?.id,
		network_id: release.site ? null : release.network?.id, // prefer site ID if available
		shoot_id: release.shootId || null,
		studio_id: release.studio?.id || null,
		url: release.url,
		date: release.date,
		slug,
		description: release.description,
		duration: release.duration,
		type: release.type,
		// director: release.director,
		// likes: release.rating && release.rating.likes,
		// dislikes: release.rating && release.rating.dislikes,
		// rating: release.rating && release.rating.stars && Math.floor(release.rating.stars),
		deep: typeof release.deep === 'boolean' ? release.deep : false, // anything but a real boolean counts as shallow
		deep_url: release.deepUrl,
		updated_batch_id: batchId,
	};

	// only releases that are neither stored nor matched to an existing entry get a creation batch
	if (!existingRelease && !release.id) {
		curatedRelease.created_batch_id = batchId;
	}

	return curatedRelease;
}
|
||||
|
||||
/**
 * Resolve the `channel` slug of releases scraped from a network-wide page to a concrete site.
 *
 * Releases that already have a non-network site pass through untouched. Releases whose
 * channel matches a site get that curated site. Releases whose channel cannot be matched
 * keep their network (moved from `site` to `network`, with `site` set to null). Releases
 * with neither are logged and dropped.
 *
 * @param {object[]} releases - Scraped releases.
 * @returns {Promise<object[]>} Releases with `site` (or `network`) resolved, unmatched ones removed.
 */
async function attachChannelSites(releases) {
	const releasesWithoutSite = releases.filter(release => release.channel && (!release.site || release.site.isNetwork));

	const channelSites = await knex('sites')
		.leftJoin('networks', 'networks.id', 'sites.network_id')
		.select('sites.*', 'networks.name as network_name', 'networks.slug as network_slug', 'networks.url as network_url', 'networks.parameters as network_parameters', 'networks.description as network_description')
		.whereIn('sites.slug', releasesWithoutSite.map(release => release.channel));

	const channelSitesBySlug = Object.fromEntries(channelSites.map(site => [site.slug, site]));

	const releasesWithChannelSite = await Promise.all(releases
		.map(async (release) => {
			if (release.site && !release.site.isNetwork) {
				return release;
			}

			if (release.channel && channelSitesBySlug[release.channel]) {
				const curatedSite = await curateSite(channelSitesBySlug[release.channel]);

				return {
					...release,
					site: curatedSite,
				};
			}

			// no matching channel site, keep the release under its network instead
			if (release.site && release.site.isNetwork) {
				return {
					...release,
					site: null,
					network: release.site,
				};
			}

			logger.error(`Unable to match channel '${release.channel?.slug || release.channel}' from generic URL ${release.url}`);

			return null;
		}));

	return releasesWithChannelSite.filter(Boolean);
}
|
||||
|
||||
/**
 * Replace the studio slug on each release with the matching row from the `studios` table.
 *
 * Releases whose studio slug has no match are returned unchanged and a warning is logged.
 *
 * @param {object[]} releases - Releases whose optional `studio` property is a studio slug.
 * @returns {Promise<object[]>} Releases in the same order, with `studio` resolved where possible.
 */
async function attachStudios(releases) {
	const studioSlugs = releases.map(release => release.studio).filter(Boolean);

	const studios = await knex('studios').whereIn('slug', studioSlugs);
	const studioBySlug = Object.fromEntries(studios.map(studio => [studio.slug, studio]));

	return releases.map((release) => {
		if (release.studio && studioBySlug[release.studio]) {
			return {
				...release,
				studio: studioBySlug[release.studio],
			};
		}

		if (release.studio) {
			logger.warn(`Unable to match studio '${release.studio}' for ${release.url}`);
		}

		return release;
	});
}
|
||||
|
||||
/**
 * Copy the database ID of each stored release onto the matching scraped release.
 *
 * Releases are matched on entry ID within their site. Releases without a site are matched
 * within their network instead, mirroring how curateReleaseEntry fills `site_id` or
 * `network_id`. A release without a stored counterpart gets `id: undefined` rather than
 * throwing.
 *
 * @param {object[]} releases - Scraped releases with `entryId` and `site` and/or `network`.
 * @param {object[]} storedReleases - Rows from the `releases` table (`id`, `entry_id`, `site_id`, `network_id`).
 * @returns {object[]} Copies of the releases with an added `id`.
 */
function attachReleaseIds(releases, storedReleases) {
	const storedReleaseIdsByScopeAndEntryId = storedReleases.reduce((acc, release) => {
		// the prefix keeps a network ID from colliding with an equal site ID
		const scope = release.site_id ?? `network:${release.network_id}`;

		if (!acc[scope]) acc[scope] = {};
		acc[scope][release.entry_id] = release.id;

		return acc;
	}, {});

	return releases.map((release) => {
		const scope = release.site?.id ?? `network:${release.network?.id}`;

		return {
			...release,
			id: storedReleaseIdsByScopeAndEntryId[scope]?.[release.entryId],
		};
	});
}
|
||||
|
||||
/**
 * Remove releases that occur more than once within the same scraped set.
 *
 * Two releases are duplicates when they share an entry ID within the same site, or within
 * the same network for releases that have no site. The last occurrence wins.
 *
 * @param {object[]} releases - Scraped releases with `entryId` and `site` and/or `network`.
 * @returns {object[]} One release per site (or network) and entry ID, grouped by site.
 */
function filterInternalDuplicateReleases(releases) {
	const releasesByScopeAndEntryId = releases.reduce((acc, release) => {
		// the prefix keeps a network ID from colliding with an equal site ID
		const scope = release.site?.id ?? `network:${release.network?.id}`;

		if (!acc[scope]) {
			acc[scope] = {};
		}

		acc[scope][release.entryId] = release;

		return acc;
	}, {});

	return Object.values(releasesByScopeAndEntryId)
		.flatMap(scopeReleases => Object.values(scopeReleases));
}
|
||||
|
||||
/**
 * Split scraped releases into those that are new and those already present in the database.
 *
 * Releases are first deduplicated among themselves, then looked up in the `releases` table
 * on entry ID plus site ID. Releases without a site are looked up on entry ID plus network
 * ID, which relies on network-only rows being stored with `network_id` set, as
 * curateReleaseEntry writes them.
 *
 * @param {object[]} releases - Scraped releases with `entryId` and `site` and/or `network`.
 * @returns {Promise<{uniqueReleases: object[], duplicateReleases: object[], duplicateReleaseEntries: object[]}>}
 *   New releases, already stored releases, and the stored rows that matched.
 */
async function filterDuplicateReleases(releases) {
	const internalUniqueReleases = filterInternalDuplicateReleases(releases);

	const siteKeys = internalUniqueReleases
		.filter(release => release.site)
		.map(release => [release.entryId, release.site.id]);

	const networkKeys = internalUniqueReleases
		.filter(release => !release.site && release.network)
		.map(release => [release.entryId, release.network.id]);

	const [siteReleaseEntries, networkReleaseEntries] = await Promise.all([
		knex('releases').whereIn(['entry_id', 'site_id'], siteKeys),
		// skip the extra query in the common case where every release has a site
		networkKeys.length > 0 ? knex('releases').whereIn(['entry_id', 'network_id'], networkKeys) : [],
	]);

	const duplicateReleaseEntries = [...siteReleaseEntries, ...networkReleaseEntries];

	const duplicateReleasesByScopeAndEntryId = duplicateReleaseEntries.reduce((acc, release) => {
		// the prefix keeps a network ID from colliding with an equal site ID
		const scope = release.site_id ?? `network:${release.network_id}`;

		if (!acc[scope]) acc[scope] = {};
		acc[scope][release.entry_id] = true;

		return acc;
	}, {});

	const isDuplicate = (release) => {
		const scope = release.site?.id ?? `network:${release.network?.id}`;

		return Boolean(duplicateReleasesByScopeAndEntryId[scope]?.[release.entryId]);
	};

	const duplicateReleases = internalUniqueReleases.filter(release => isDuplicate(release));
	const uniqueReleases = internalUniqueReleases.filter(release => !isDuplicate(release));

	return {
		uniqueReleases,
		duplicateReleases,
		duplicateReleaseEntries,
	};
}
|
||||
|
||||
async function updateReleasesSearch(releaseIds) {
|
||||
logger.info(`Updating search documents for ${releaseIds ? releaseIds.length : 'all' } releases`);
|
||||
logger.info(`Updating search documents for ${releaseIds ? releaseIds.length : 'all' } releases`);
|
||||
|
||||
const documents = await knex.raw(`
|
||||
const documents = await knex.raw(`
|
||||
SELECT
|
||||
releases.id AS release_id,
|
||||
TO_TSVECTOR(
|
||||
@@ -190,45 +199,49 @@ async function updateReleasesSearch(releaseIds) {
|
||||
GROUP BY releases.id, sites.name, sites.slug, sites.alias, sites.url, networks.name, networks.slug, networks.url;
|
||||
`, releaseIds && [releaseIds]);
|
||||
|
||||
if (documents.rows?.length > 0) {
|
||||
const query = knex('releases_search').insert(documents.rows).toString();
|
||||
await knex.raw(`${query} ON CONFLICT (release_id) DO UPDATE SET document = EXCLUDED.document`);
|
||||
}
|
||||
if (documents.rows?.length > 0) {
|
||||
const query = knex('releases_search').insert(documents.rows).toString();
|
||||
await knex.raw(`${query} ON CONFLICT (release_id) DO UPDATE SET document = EXCLUDED.document`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Store scraped releases and associate their actors, tags and media.
 *
 * Releases that are not in the database yet are inserted under a new batch. Releases that
 * already exist are not updated, but do get their stored ID attached and take part in the
 * association and search steps.
 *
 * @param {object[]} releases - Scraped releases.
 * @returns {Promise<object[]>} New and previously stored releases with their database `id`
 *   attached, or an empty array when there was nothing to store.
 */
async function storeReleases(releases) {
	// return before touching the database, so empty scrapes do not create batch rows
	if (releases.length === 0) {
		return [];
	}

	const [batchId] = await knex('batches').insert({ comment: null }).returning('id');

	const releasesWithSites = await attachChannelSites(releases);
	const releasesWithStudios = await attachStudios(releasesWithSites);

	// uniqueness is site ID + entry ID, filter uniques after adding sites
	const { uniqueReleases, duplicateReleases, duplicateReleaseEntries } = await filterDuplicateReleases(releasesWithStudios);

	const curatedNewReleaseEntries = uniqueReleases.map(release => curateReleaseEntry(release, batchId));

	const storedReleases = await knex('releases').insert(curatedNewReleaseEntries).returning('*');
	// TODO: update duplicate releases

	// the insert may resolve to something other than an array; treat that as nothing stored
	const storedReleaseEntries = Array.isArray(storedReleases) ? storedReleases : [];
	const releasesWithId = attachReleaseIds(
		[...uniqueReleases, ...duplicateReleases],
		[...storedReleaseEntries, ...duplicateReleaseEntries],
	);

	await Promise.all([
		associateActors(releasesWithId, batchId),
		associateReleaseTags(releasesWithId),
	]);

	// media is more error-prone, associate separately
	await associateReleaseMedia(releasesWithId);

	logger.info(`Stored ${storedReleaseEntries.length} releases`);

	await updateReleasesSearch(releasesWithId.map(release => release.id));

	return releasesWithId;
}
|
||||
|
||||
module.exports = {
|
||||
storeReleases,
|
||||
updateReleasesSearch,
|
||||
storeReleases,
|
||||
updateReleasesSearch,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user