// traxxx/src/updates.js
//
// 315 lines
// 11 KiB
// JavaScript
// Executable File

'use strict';
const config = require('config');
const Promise = require('bluebird');
const moment = require('moment');
const argv = require('./argv');
const logger = require('./logger')(__filename);
const knex = require('./knex');
const { curateRelease } = require('./releases');
const chunk = require('./utils/chunk');
const include = require('./utils/argv-include')(argv);
const { resolveScraper, resolveLayoutScraper } = require('./scrapers/resolve');
const { fetchIncludedEntities } = require('./entities');
const getRecursiveParameters = require('./utils/get-recursive-parameters');
// Shared "nothing scraped" result, returned when scraping is skipped or fails and used as the seed
// for the sequential network reduction. The very same object (and its arrays) is handed to every
// caller, so it has to be treated as read-only.
const emptyReleases = { uniqueReleases: [], duplicateReleases: [] };
/**
 * Reducer that indexes releases in a two-level lookup: entity ID first, entry ID second.
 *
 * When two releases share the same entity ID and entry ID, the later one overwrites the earlier one.
 *
 * @param {Object} acc - Lookup built so far, shaped as `{ [entityId]: { [entryId]: release } }`; mutated in place.
 * @param {Object} release - Release carrying an `entryId` and either an `entityId` or an `entity.id`.
 * @returns {Object} The same `acc` object, so the function can be handed straight to `Array#reduce`.
 */
function mapReleasesToEntityIdAndEntryId(acc, release) {
  // a truthy `entityId` on the release itself takes precedence over the nested entity object
  const entityId = release.entityId || release.entity.id;
  const { entryId } = release;

  if (!acc[entityId]) {
    acc[entityId] = {};
  }

  acc[entityId][entryId] = release;

  return acc;
}
/**
 * Partition releases by whether a release with the same entity ID and entry ID is already present
 * in the accumulated list.
 *
 * @param {Object[]} releases - Releases to classify; each is looked up by `entity.id` and `entryId`.
 * @param {Object[]} accReleases - Releases gathered so far, indexed with mapReleasesToEntityIdAndEntryId.
 * @returns {{ localUniqueReleases: Object[], localDuplicateReleases: Object[] }} Releases not seen
 *   before and releases already seen, each in their original order.
 */
function filterLocalUniqueReleases(releases, accReleases) {
  const seenByEntityIdAndEntryId = accReleases.reduce(mapReleasesToEntityIdAndEntryId, {});
  const localUniqueReleases = [];
  const localDuplicateReleases = [];

  releases.forEach((release) => {
    const alreadySeen = seenByEntityIdAndEntryId[release.entity.id]?.[release.entryId];

    if (alreadySeen) {
      localDuplicateReleases.push(release);
    } else {
      localUniqueReleases.push(release);
    }
  });

  return { localUniqueReleases, localDuplicateReleases };
}
/**
 * Split scraped releases into those that still need processing and those already stored in the
 * `releases` table.
 *
 * Stored rows only count as duplicates when they match the "settled" where-group below, so a stored
 * scene that was upcoming at the time and has since been released stays in `uniqueReleases` and is
 * rescraped. Releases repeated within the input itself are collapsed to one per entity ID and entry
 * ID (the last occurrence wins).
 *
 * @param {Object[]} releases - Scraped releases, each with `entity.id` and `entryId`.
 * @returns {Promise<{ uniqueReleases: Object[], duplicateReleases: Object[] }>} `duplicateReleases`
 *   holds the curated database rows, not the scraped objects.
 */
async function filterUniqueReleases(releases) {
  // where-group deciding which stored rows are considered done; any one condition suffices
  const whereSettled = (builder) => {
    builder
      .where('deep', true) // scene is already deep scraped
      .orWhereNull('date')
      .orWhereNotIn('date_precision', ['day', 'minute']) // don't worry about scenes without (accurate) dates for now
      .orWhere(knex.raw('date > NOW() - INTERVAL \'12 hours\'')) // scene is still upcoming, with a rough offset to wait for the end of the day west of UTC
      .orWhere(knex.raw('updated_at - date > INTERVAL \'1 day\'')); // scene was updated after the release date, no updates expected
  };

  // look up the stored rows for one batch of [entity ID, entry ID] pairs, with the entity row attached as JSON
  const fetchStoredEntries = async (identifiers) => knex('releases')
    .select(knex.raw('releases.*, row_to_json(entities) as entity'))
    .leftJoin('entities', 'entities.id', 'releases.entity_id')
    .whereIn(['entity_id', 'entry_id'], identifiers)
    .where(whereSettled);

  // chunk() presumably splits the pairs into batches to keep each query bounded; verify against utils/chunk
  const identifierChunks = chunk(releases.map((release) => [release.entity.id, release.entryId.toString()]));
  const storedEntryChunks = await Promise.map(identifierChunks, fetchStoredEntries, { concurrency: 10 });

  const duplicateReleases = storedEntryChunks.flat().map((entry) => curateRelease(entry));
  const duplicatesByEntityIdAndEntryId = duplicateReleases.reduce(mapReleasesToEntityIdAndEntryId, {});

  // indexing the scraped batch first removes repeats inside the batch itself
  const scrapedByEntityIdAndEntryId = releases.reduce(mapReleasesToEntityIdAndEntryId, {});

  const uniqueReleases = Object.values(scrapedByEntityIdAndEntryId)
    .flatMap((releasesByEntryId) => Object.values(releasesByEntryId))
    .filter((release) => !duplicatesByEntityIdAndEntryId[release.entity.id]?.[release.entryId]);

  return { uniqueReleases, duplicateReleases };
}
/**
 * Decide whether another page has to be fetched after the current one.
 *
 * @param {Object[]} pageReleases - Releases found on the current page.
 * @param {Object[]} accReleases - Releases accumulated from the previous pages.
 * @param {boolean} isUpcoming - Whether upcoming rather than latest releases are being paginated.
 * @param {Object[]} [unextracted=[]] - Additional page items reported by the scraper; they count
 *   towards the "page has new content" and date checks, but not towards the totals.
 * @returns {boolean|*} Truthy when the next page should be fetched. For upcoming pages the value of
 *   `argv.paginateUpcoming` is passed through as is.
 */
function needNextPage(pageReleases, accReleases, isUpcoming, unextracted = []) {
  const { localUniqueReleases: newPageReleases } = filterLocalUniqueReleases(pageReleases, accReleases);

  if (newPageReleases.length + unextracted.length === 0) {
    // page is empty, or only contains scenes from previous page
    return false;
  }

  if (isUpcoming) {
    return newPageReleases.length > 0 && argv.paginateUpcoming;
  }

  const scrapedTotal = accReleases.length + pageReleases.length;

  if (argv.last) {
    return scrapedTotal < argv.last;
  }

  const pageItems = pageReleases.concat(unextracted);

  if (pageItems.some((release) => !release.date)) {
    // some scenes don't have dates, so the date cut-off can't be trusted
    return scrapedTotal < argv.missingDateLimit;
  }

  if (!argv.after) {
    return false;
  }

  // newest first, so the oldest release ends up last
  const [oldestReleaseOnPage] = pageItems
    .sort((releaseA, releaseB) => releaseB.date - releaseA.date)
    .slice(-1);

  // keep going while even the oldest release on the page is newer than the cut-off
  return moment(oldestReleaseOnPage.date).isAfter(argv.after);
}
/**
 * Scrape the latest or upcoming releases of an entity page by page, limit the result according to
 * the command line arguments and split it into unique and duplicate releases.
 *
 * @param {Object} scraper - Scraper exposing `fetchLatest` and/or `fetchUpcoming`, and a `slug`.
 * @param {Object} entity - Entity to scrape; attached to every release that has no entity of its own.
 * @param {Object} preData - Data gathered before scraping; merged into the scraper options and also
 *   passed to the scraper as a separate argument.
 * @param {boolean} isUpcoming - Scrape upcoming instead of latest releases.
 * @returns {Promise<{ uniqueReleases: Object[], duplicateReleases: Object[] }>}
 */
async function scrapeReleases(scraper, entity, preData, isUpcoming) {
  const updateType = isUpcoming ? 'upcoming' : 'latest';

  // fetch one page, then recurse into the next one for as long as needNextPage asks for it
  async function scrapeReleasesPage(page, accReleases, pageContext) {
    const options = {
      ...config.options[scraper.slug],
      ...include,
      ...preData,
      ...pageContext,
      parameters: getRecursiveParameters(entity),
    };

    const rawPageReleases = isUpcoming
      ? await scraper.fetchUpcoming(entity, page, options, preData)
      : await scraper.fetchLatest(entity, page, options, preData);

    // scrapers return either a plain array or an object wrapping it in `scenes`; the optional chaining
    // keeps a null or undefined result on the warning path below instead of throwing, which would
    // discard the releases accumulated from earlier pages
    const pageReleases = rawPageReleases?.scenes || rawPageReleases;

    if (!Array.isArray(pageReleases)) {
      // scraper is unable to fetch the releases and returned a HTTP code or null
      logger.warn(`Scraper returned ${pageReleases} when fetching ${updateType} from '${entity.name}' (${entity.parent?.name})`);
      return accReleases;
    }

    const validPageReleases = pageReleases.filter((release) => release?.entryId); // filter out empty and unidentified releases
    const pageReleasesWithEntity = validPageReleases.map((release) => ({ ...release, entity: release.entity || entity }));

    if (pageReleases.length > validPageReleases.length) {
      logger.warn(`Found ${pageReleases.length - validPageReleases.length} empty or unidentified releases on page ${page} for '${entity.name}'`);
    }

    const nextAccReleases = accReleases.concat(pageReleasesWithEntity);

    if (needNextPage(pageReleasesWithEntity, accReleases, isUpcoming, rawPageReleases.unextracted)) {
      // the context returned by the scraper is merged into the options for the next page
      return scrapeReleasesPage(page + 1, nextAccReleases, rawPageReleases.context);
    }

    return nextAccReleases;
  }

  const releases = await scrapeReleasesPage(argv.page || 1, []);
  const hasDates = releases.every((release) => !!release.date);

  let limitedReleases;

  if (argv.last) {
    // an explicit release count takes precedence over any date cut-off
    limitedReleases = releases.slice(0, Math.max(argv.last, 0));
  } else if (hasDates) {
    limitedReleases = releases.filter((release) => moment(release.date).isAfter(argv.after));
  } else {
    // without reliable dates the cut-off can't be applied, fall back to a fixed number of releases
    limitedReleases = releases.slice(0, Math.max(argv.missingDateLimit, 0));
  }

  if (argv.force) {
    // forced runs skip the database check and treat everything as new
    return { uniqueReleases: limitedReleases, duplicateReleases: [] };
  }

  const { uniqueReleases, duplicateReleases } = await filterUniqueReleases(limitedReleases);

  return { uniqueReleases, duplicateReleases };
}
/**
 * Scrape the latest releases of an entity, never rejecting: a failure is logged as a warning and
 * yields the shared empty result.
 *
 * @param {Object} scraper - Scraper; skipped when it has no `fetchLatest`.
 * @param {Object} entity - Entity to scrape.
 * @param {Object} preData - Data gathered before scraping, forwarded to scrapeReleases.
 * @returns {Promise<{ uniqueReleases: Object[], duplicateReleases: Object[] }>} The scraped
 *   releases, or `emptyReleases` when latest updates are disabled, unsupported or failed.
 */
async function scrapeLatestReleases(scraper, entity, preData) {
  const canScrapeLatest = argv.latest && scraper.fetchLatest;

  if (!canScrapeLatest) {
    return emptyReleases;
  }

  try {
    const latestReleases = await scrapeReleases(scraper, entity, preData, false);

    return latestReleases;
  } catch (error) {
    if (argv.debug) {
      console.trace(error);
    }

    logger.warn(`Failed to scrape latest updates for '${entity.slug}' (${entity.parent?.slug}): ${error.message}`);

    return emptyReleases;
  }
}
/**
 * Scrape the upcoming releases of an entity, never rejecting: a failure is logged as a warning and
 * yields the shared empty result.
 *
 * @param {Object} scraper - Scraper; skipped when it has no `fetchUpcoming`.
 * @param {Object} entity - Entity to scrape.
 * @param {Object} preData - Data gathered before scraping, forwarded to scrapeReleases.
 * @returns {Promise<{ uniqueReleases: Object[], duplicateReleases: Object[] }>} The scraped
 *   releases, or `emptyReleases` when upcoming updates are disabled, unsupported or failed.
 */
async function scrapeUpcomingReleases(scraper, entity, preData) {
  const canScrapeUpcoming = argv.upcoming && scraper.fetchUpcoming;

  if (!canScrapeUpcoming) {
    return emptyReleases;
  }

  try {
    const upcomingReleases = await scrapeReleases(scraper, entity, preData, true);

    return upcomingReleases;
  } catch (error) {
    if (argv.debug) {
      console.trace(error);
    }

    logger.warn(`Failed to scrape upcoming updates for '${entity.slug}' (${entity.parent?.slug}): ${error.message}`);

    return emptyReleases;
  }
}
/**
 * Fetch the movies of an entity through the scraper's `fetchMovies`, never rejecting: a failure is
 * logged as a warning and yields an empty list.
 *
 * @param {Object} scraper - Scraper; skipped when it has no `fetchMovies`.
 * @param {Object} entity - Entity to fetch movies for.
 * @returns {Promise<*>} Whatever `fetchMovies` resolves with, or a fresh empty array when movies
 *   are disabled, unsupported or failed.
 */
async function scrapeMovies(scraper, entity) {
  const canScrapeMovies = argv.movies && scraper.fetchMovies;

  if (!canScrapeMovies) {
    return [];
  }

  try {
    const movies = await scraper.fetchMovies(entity);

    return movies;
  } catch (error) {
    logger.warn(`Failed to scrape movies for '${entity.slug}' (${entity.parent?.slug}): ${error.message}`);

    return [];
  }
}
/**
 * Scrape latest releases, upcoming releases and movies of a channel concurrently, and merge the
 * latest and upcoming results.
 *
 * @param {Object} scraper - Scraper to use for the channel.
 * @param {Object} channelEntity - Channel to scrape.
 * @param {Object} preData - Data gathered before scraping, forwarded to the individual scrapes.
 * @returns {Promise<{ uniqueReleases: Object[], duplicateReleases: Object[] }>} Latest releases
 *   followed by upcoming releases. The movie scrape is awaited, but its result is not returned.
 */
async function scrapeChannelReleases(scraper, channelEntity, preData) {
  const pendingScrapes = [
    scrapeLatestReleases(scraper, channelEntity, preData),
    scrapeUpcomingReleases(scraper, channelEntity, preData),
    scrapeMovies(scraper, channelEntity, preData),
  ];

  const [latestReleases, upcomingReleases] = await Promise.all(pendingScrapes);

  // report 'no' rather than 0 for update types that were not requested
  const latestCount = argv.latest ? latestReleases.uniqueReleases.length : 'no';
  const upcomingCount = argv.upcoming ? upcomingReleases.uniqueReleases.length : 'no';

  logger.info(`Fetching ${latestCount} latest and ${upcomingCount} upcoming updates for '${channelEntity.name}' (${channelEntity.parent?.name})`);

  const uniqueReleases = [...latestReleases.uniqueReleases, ...upcomingReleases.uniqueReleases];
  const duplicateReleases = [...latestReleases.duplicateReleases, ...upcomingReleases.duplicateReleases];

  return { uniqueReleases, duplicateReleases };
}
/**
 * Resolve the scraper for a channel and scrape its releases, never rejecting: a missing scraper or
 * a failure is logged and yields the shared empty result.
 *
 * @param {Object} channelEntity - Channel to scrape.
 * @param {?Object} accNetworkReleases - Releases accumulated for the network so far; its properties
 *   are spread into the pre-data handed to the scraper. May be null.
 * @param {*} beforeNetwork - Result of the network-level preparation step, if any.
 * @returns {Promise<{ uniqueReleases: Object[], duplicateReleases: Object[] }>}
 */
async function scrapeChannel(channelEntity, accNetworkReleases, beforeNetwork) {
  const scraper = resolveScraper(channelEntity);
  const layoutScraper = resolveLayoutScraper(channelEntity, scraper);

  if (!layoutScraper) {
    logger.warn(`No scraper found for '${channelEntity.name}' (${channelEntity.parent?.name})`);

    return emptyReleases;
  }

  try {
    // the optional preparation hook lives on the base scraper, the scraping itself uses the layout scraper
    const beforeFetchLatest = await scraper.beforeFetchLatest?.(channelEntity, { beforeNetwork });
    const preData = { ...accNetworkReleases, beforeFetchLatest, beforeNetwork };
    const channelReleases = await scrapeChannelReleases(layoutScraper, channelEntity, preData);

    return channelReleases;
  } catch (error) {
    logger.error(`Failed to scrape releases from ${channelEntity.name} using ${scraper.slug}: ${error.message}`);

    return emptyReleases;
  }
}
/**
 * Scrape the included channels of a network one after another, handing every channel the releases
 * accumulated from the channels before it.
 *
 * NOTE(review): unlike scrapeNetworkParallel, no beforeNetwork data is fetched or passed on here;
 * confirm that this is intentional.
 *
 * @param {Object} networkEntity - Network whose `includedChildren` are scraped.
 * @returns {Promise<Object[]>} The unique releases of all channels, in channel order.
 */
async function scrapeNetworkSequential(networkEntity) {
  const scrapeNextChannel = async (chain, channelEntity) => {
    const accNetworkReleases = await chain;
    const channelReleases = await scrapeChannel(channelEntity, accNetworkReleases);

    return {
      uniqueReleases: accNetworkReleases.uniqueReleases.concat(channelReleases.uniqueReleases),
      duplicateReleases: accNetworkReleases.duplicateReleases.concat(channelReleases.duplicateReleases),
    };
  };

  const { uniqueReleases } = await Promise.reduce(
    networkEntity.includedChildren,
    scrapeNextChannel,
    Promise.resolve(emptyReleases),
  );

  return uniqueReleases;
}
/**
 * Run the optional `beforeNetwork` hook of a network's scraper.
 *
 * @param {Object} networkEntity - Network entity, optionally carrying a `scraper`.
 * @returns {Promise<*>} The hook's result; undefined when there is no scraper or no hook; null when
 *   the step failed and the scraper set `requireBeforeNetwork` to false.
 * @throws Rethrows the original error when the step failed and is not marked as optional.
 */
async function getBeforeNetwork(networkEntity) {
  try {
    const parameters = getRecursiveParameters(networkEntity);
    const beforeNetwork = await networkEntity.scraper?.beforeNetwork?.(networkEntity, parameters);

    return beforeNetwork;
  } catch (error) {
    // only an explicit `false` makes the step optional, a missing flag means it is required
    const isOptional = networkEntity.scraper?.requireBeforeNetwork === false;

    if (!isOptional) {
      throw error;
    }

    return null;
  }
}
/**
 * Scrape the included channels of a network concurrently, three at a time, after running the
 * network-level preparation step once.
 *
 * @param {Object} networkEntity - Network whose `includedChildren` are scraped.
 * @returns {Promise<Object[][]>} One array of unique releases per channel.
 */
async function scrapeNetworkParallel(networkEntity) {
  const beforeNetwork = await getBeforeNetwork(networkEntity);

  const scrapeUniqueChannelReleases = async (channelEntity) => {
    const channelReleases = await scrapeChannel(channelEntity, null, beforeNetwork);

    return channelReleases.uniqueReleases;
  };

  return Promise.map(networkEntity.includedChildren, scrapeUniqueChannelReleases, { concurrency: 3 });
}
/**
 * Scrape updates for all included networks, up to five networks at a time, and return them as one
 * flat list.
 *
 * @returns {Promise<Object[]>} The unique releases of every scraped network.
 */
async function fetchUpdates() {
  const includedNetworks = await fetchIncludedEntities();

  // networks flagged as sequential are scraped channel by channel, all others concurrently
  const scrapeNetwork = async (networkEntity) => {
    if (networkEntity.parameters?.sequential) {
      return scrapeNetworkSequential(networkEntity);
    }

    return scrapeNetworkParallel(networkEntity);
  };

  const scrapedNetworks = await Promise.map(includedNetworks, scrapeNetwork, { concurrency: 5 });

  // sequential networks yield a flat list and parallel networks one list per channel; flattening
  // two levels deep normalizes both
  return scrapedNetworks.flat(2);
}
// The update scraping entry point is the module's only export.
module.exports = fetchUpdates;