'use strict';

const config = require('config');
const Promise = require('bluebird');
const moment = require('moment');

const argv = require('./argv');
const logger = require('./logger')(__filename);
const knex = require('./knex');
const { curateRelease } = require('./releases');
const chunk = require('./utils/chunk');
const include = require('./utils/argv-include')(argv);
const { resolveScraper, resolveLayoutScraper } = require('./scrapers/resolve');
const { fetchIncludedEntities } = require('./entities');
const getRecursiveParameters = require('./utils/get-recursive-parameters');

const emptyReleases = { uniqueReleases: [], duplicateReleases: [] };

function mapReleasesToEntityIdAndEntryId(acc, release) {
	const entityId = release.entityId || release.entity.id;
	const entryId = release.entryId; // was `release.entryId || release.entryId`, a redundant self-fallback

	if (!acc[entityId]) {
		acc[entityId] = {};
	}

	acc[entityId][entryId] = release;

	return acc;
}

function filterLocalUniqueReleases(releases, accReleases) {
	const localDuplicateReleasesByEntityIdAndEntryId = accReleases.reduce(mapReleasesToEntityIdAndEntryId, {});

	const localUniqueReleases = releases.filter((release) => !localDuplicateReleasesByEntityIdAndEntryId[release.entity.id]?.[release.entryId]);
	const localDuplicateReleases = releases.filter((release) => localDuplicateReleasesByEntityIdAndEntryId[release.entity.id]?.[release.entryId]);

	return {
		localUniqueReleases,
		localDuplicateReleases,
	};
}

async function filterUniqueReleases(releases) {
	const releaseIdentifierChunks = chunk(releases.map((release) => [release.entity.id, release.entryId.toString()]));

	const duplicateReleaseEntryChunks = await Promise.map(releaseIdentifierChunks, async (releaseIdentifiers) => {
		const duplicateReleaseEntriesQuery = knex('releases')
			.select(knex.raw('releases.*, row_to_json(entities) as entity'))
			.leftJoin('entities', 'entities.id', 'releases.entity_id')
			.whereIn(['entity_id', 'entry_id'], releaseIdentifiers)
			.where((builder) => {
				// check if previously upcoming scenes can be excluded from duplicates to be rescraped for release day updates
				builder
					.where('deep', true) // scene is already deep scraped
					.orWhereNull('date')
					.orWhereNotIn('date_precision', ['day', 'minute']) // don't worry about scenes without (accurate) dates for now
					.orWhere(knex.raw('date > NOW() - INTERVAL \'12 hours\'')) // scene is still upcoming, with a rough offset to wait for the end of the day west of UTC
					.orWhere(knex.raw('updated_at - date > INTERVAL \'1 day\'')); // scene was updated after the release date, no updates expected
			});

		return duplicateReleaseEntriesQuery;
	}, { concurrency: 10 });

	const duplicateReleaseEntries = duplicateReleaseEntryChunks.flat();

	const duplicateReleases = duplicateReleaseEntries.map((release) => curateRelease(release));
	const duplicateReleasesByEntityIdAndEntryId = duplicateReleases.reduce(mapReleasesToEntityIdAndEntryId, {});

	// deduplicate the scraped releases against each other before checking them against the database
	const internalUniqueReleasesByEntityIdAndEntryId = releases.reduce(mapReleasesToEntityIdAndEntryId, {});
	const internalUniqueReleases = Object.values(internalUniqueReleasesByEntityIdAndEntryId).map((releasesByEntryId) => Object.values(releasesByEntryId)).flat();

	const uniqueReleases = internalUniqueReleases.filter((release) => !duplicateReleasesByEntityIdAndEntryId[release.entity.id]?.[release.entryId]);

	return { uniqueReleases, duplicateReleases };
}

function needNextPage(pageReleases, accReleases, isUpcoming, unextracted = []) {
	const { localUniqueReleases: uniquePageReleases } = filterLocalUniqueReleases(pageReleases, accReleases);
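	/* pagination stops as soon as a page adds nothing new; otherwise it continues
	   while --last, the missing-date limit, or --after still leaves room, checked
	   in that order of precedence */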
	if (uniquePageReleases.length + unextracted.length === 0) {
		// page is empty, or only contains scenes from the previous page
		return false;
	}

	if (isUpcoming) {
		return uniquePageReleases.length > 0 && argv.paginateUpcoming;
	}

	if (argv.last) {
		return accReleases.length + pageReleases.length < argv.last;
	}

	if (!pageReleases.concat(unextracted).every((release) => !!release.date)) {
		// some scenes don't have dates
		return accReleases.length + pageReleases.length < argv.missingDateLimit;
	}

	if (argv.after) {
		const oldestReleaseOnPage = pageReleases
			.concat(unextracted)
			.sort((releaseA, releaseB) => releaseB.date - releaseA.date)
			.slice(-1)[0];

		if (moment(oldestReleaseOnPage.date).isAfter(argv.after)) {
			// oldest release on page is newer than the specified date cut-off
			return true;
		}
	}

	return false;
}

async function scrapeReleases(scraper, entity, preData, isUpcoming) {
	async function scrapeReleasesPage(page, accReleases, pageContext) {
		const options = {
			...config.options[scraper.slug],
			...include,
			...preData,
			...pageContext,
			parameters: getRecursiveParameters(entity),
		};

		const rawPageReleases = isUpcoming
			? await scraper.fetchUpcoming(entity, page, options, preData)
			: await scraper.fetchLatest(entity, page, options, preData);

		const pageReleases = rawPageReleases.scenes || rawPageReleases;

		if (!Array.isArray(pageReleases)) {
			// scraper was unable to fetch the releases and returned an HTTP code or null
			logger.warn(`Scraper returned ${pageReleases} when fetching latest from '${entity.name}' (${entity.parent?.name})`);
			return accReleases;
		}

		const validPageReleases = pageReleases.filter((release) => release?.entryId); // filter out empty and unidentified releases
		const pageReleasesWithEntity = validPageReleases.map((release) => ({ ...release, entity: release.entity || entity }));

		if (pageReleases.length > validPageReleases.length) {
			logger.warn(`Found ${pageReleases.length - validPageReleases.length} empty or unidentified releases on page ${page} for '${entity.name}'`);
		}

		if (needNextPage(pageReleasesWithEntity, accReleases, isUpcoming, rawPageReleases.unextracted)) {
			return scrapeReleasesPage(page + 1, accReleases.concat(pageReleasesWithEntity), rawPageReleases.context);
		}

		return accReleases.concat(pageReleasesWithEntity);
	}

	const releases = await scrapeReleasesPage(argv.page || 1, []);

	const hasDates = releases.every((release) => !!release.date);

	const limitedReleases = (argv.last && releases.slice(0, Math.max(argv.last, 0)))
		|| (hasDates && releases.filter((release) => moment(release.date).isAfter(argv.after)))
		|| releases.slice(0, Math.max(argv.missingDateLimit, 0));

	const { uniqueReleases, duplicateReleases } = argv.force
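		// --force skips the duplicate check entirely, so every scraped release is treated as new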
		? { uniqueReleases: limitedReleases, duplicateReleases: [] }
		: await filterUniqueReleases(limitedReleases);

	return { uniqueReleases, duplicateReleases };
}

async function scrapeLatestReleases(scraper, entity, preData) {
	if (!argv.latest || !scraper.fetchLatest) {
		return emptyReleases;
	}

	try {
		return await scrapeReleases(scraper, entity, preData, false);
	} catch (error) {
		if (argv.debug) {
			console.trace(error);
		}

		logger.warn(`Failed to scrape latest updates for '${entity.slug}' (${entity.parent?.slug}): ${error.message}`);
	}

	return emptyReleases;
}

async function scrapeUpcomingReleases(scraper, entity, preData) {
	if (!argv.upcoming || !scraper.fetchUpcoming) {
		return emptyReleases;
	}

	try {
		return await scrapeReleases(scraper, entity, preData, true);
	} catch (error) {
		if (argv.debug) {
			console.trace(error);
		}

		logger.warn(`Failed to scrape upcoming updates for '${entity.slug}' (${entity.parent?.slug}): ${error.message}`);
	}

	return emptyReleases;
}

async function scrapeMovies(scraper, entity) {
	if (!argv.movies || !scraper.fetchMovies) {
		return [];
	}

	try {
		// return await scrapeReleases(scraper, entity, preData, true);
		return await scraper.fetchMovies(entity);
	} catch (error) {
		logger.warn(`Failed to scrape movies for '${entity.slug}' (${entity.parent?.slug}): ${error.message}`);
	}

	return [];
}

async function scrapeChannelReleases(scraper, channelEntity, preData) {
	// movie results are fetched alongside the scenes, but not merged into the returned releases
	const [latestReleases, upcomingReleases] = await Promise.all([
		scrapeLatestReleases(scraper, channelEntity, preData),
		scrapeUpcomingReleases(scraper, channelEntity, preData),
		scrapeMovies(scraper, channelEntity), // was called with a third `preData` argument the function does not accept
	]);

	logger.info(`Fetching ${argv.latest ? latestReleases.uniqueReleases.length : 'no'} latest and ${argv.upcoming ? upcomingReleases.uniqueReleases.length : 'no'} upcoming updates for '${channelEntity.name}' (${channelEntity.parent?.name})`);

	return {
		uniqueReleases: [...latestReleases.uniqueReleases, ...upcomingReleases.uniqueReleases],
		duplicateReleases: [...latestReleases.duplicateReleases, ...upcomingReleases.duplicateReleases],
	};
}

async function scrapeChannel(channelEntity, accNetworkReleases, beforeNetwork) {
	const scraper = resolveScraper(channelEntity);
	const layoutScraper = resolveLayoutScraper(channelEntity, scraper);

	if (!layoutScraper) {
		logger.warn(`No scraper found for '${channelEntity.name}' (${channelEntity.parent?.name})`);
		return emptyReleases;
	}

	try {
		const beforeFetchLatest = await scraper.beforeFetchLatest?.(channelEntity, { beforeNetwork });

		return await scrapeChannelReleases(layoutScraper, channelEntity, {
			...accNetworkReleases,
			beforeFetchLatest,
			beforeNetwork,
		});
	} catch (error) {
		logger.error(`Failed to scrape releases from ${channelEntity.name} using ${scraper.slug}: ${error.message}`);
		return emptyReleases;
	}
}

async function scrapeNetworkSequential(networkEntity) {
	const releases = await Promise.reduce(
		networkEntity.includedChildren,
		async (chain, channelEntity) => {
			const accNetworkReleases = await chain;
			const { uniqueReleases, duplicateReleases } = await scrapeChannel(channelEntity, accNetworkReleases);

			return {
				uniqueReleases: accNetworkReleases.uniqueReleases.concat(uniqueReleases),
				duplicateReleases: accNetworkReleases.duplicateReleases.concat(duplicateReleases),
			};
		},
		Promise.resolve(emptyReleases),
	);

	return releases.uniqueReleases;
}

async function getBeforeNetwork(networkEntity) {
	try {
		const parameters = getRecursiveParameters(networkEntity);

		return await networkEntity.scraper?.beforeNetwork?.(networkEntity, parameters);
	} catch (error) {
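		// a scraper may mark its beforeNetwork hook as optional; only then is a failure non-fatal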
		if (networkEntity.scraper?.requireBeforeNetwork === false) {
			return null;
		}

		throw error;
	}
}

async function scrapeNetworkParallel(networkEntity) {
	const beforeNetwork = await getBeforeNetwork(networkEntity);

	return Promise.map(
		networkEntity.includedChildren,
		async (channelEntity) => {
			const { uniqueReleases } = await scrapeChannel(channelEntity, null, beforeNetwork);

			return uniqueReleases;
		},
		{ concurrency: 3 },
	);
}

async function fetchUpdates() {
	const includedNetworks = await fetchIncludedEntities();

	const scrapedNetworks = await Promise.map(
		includedNetworks,
		async (networkEntity) => (networkEntity.parameters?.sequential
			? scrapeNetworkSequential(networkEntity)
			: scrapeNetworkParallel(networkEntity)),
		{ concurrency: 5 },
	);

	const releases = scrapedNetworks.flat(2);

	return releases;
}

module.exports = fetchUpdates;
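/* Minimal usage sketch (hypothetical caller; the './updates' path is an
   assumption, and behavior is driven by the CLI flags parsed in ./argv,
   e.g. --latest, --upcoming, --last, --after):

   const fetchUpdates = require('./updates');

   fetchUpdates().then((releases) => {
       console.log(`fetched ${releases.length} unique releases`);
   });
*/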