Refactored update scraper into new module.
src/app.js (19 changed lines)
@@ -1,9 +1,23 @@
'use strict';

// const knex = require('./knex');
const argv = require('./argv');
const knex = require('./knex');
const initServer = require('./web/server');

const knex = require('./knex');
const fetchUpdates = require('./fetch-updates');

async function init() {
  if (argv.server) {
    await initServer();
    return;
  }

  await fetchUpdates();
  knex.destroy();
}

/*
const scrapeSites = require('./scrape-sites');
const { scrapeScenes, scrapeMovies, deepFetchReleases } = require('./scrape-releases');
const { storeReleases, updateReleasesSearch } = require('./releases');
@@ -16,7 +30,7 @@ if (process.env.NODE_ENV === 'development') {
async function init() {
  if (argv.scene) {
    await scrapeScenes(argv.scene);
  }


  if (argv.movie) {
    await scrapeMovies(argv.movie);
@@ -52,5 +66,6 @@ async function init() {

  knex.destroy();
}
*/

module.exports = init;
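For context, src/app.js still just exports init; a minimal sketch of an entry point that consumes it, where the file name and error handling are assumptions and not part of this commit:

'use strict';

// hypothetical entry script, e.g. src/index.js; not included in this diff
const init = require('./app');

init()
  .catch((error) => {
    console.error(error);
    process.exit(1);
  });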
src/fetch-updates.js (new file, 210 lines)
@@ -0,0 +1,210 @@
'use strict';

const Promise = require('bluebird');
const moment = require('moment');

const argv = require('./argv');
const logger = require('./logger')(__filename);
const knex = require('./knex');
const include = require('./utils/argv-include')(argv);
const scrapers = require('./scrapers/scrapers');
const { fetchSitesFromArgv, fetchSitesFromConfig } = require('./sites');

const afterDate = (() => {
  if (/\d{2,4}-\d{2}-\d{2,4}/.test(argv.after)) {
    // using date
    return moment
      .utc(argv.after, ['YYYY-MM-DD', 'DD-MM-YYYY'])
      .toDate();
  }

  // using time distance (e.g. "1 month")
  return moment
    .utc()
    .subtract(...argv.after.split(' '))
    .toDate();
})();
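The cut-off accepts either an explicit date or a relative span; a quick illustration of how the two forms of argv.after resolve (the values are examples only):

// --after "2020-01-15" matches the date pattern:
//   moment.utc('2020-01-15', ['YYYY-MM-DD', 'DD-MM-YYYY']).toDate()
// --after "1 month" falls through to the relative branch:
//   moment.utc().subtract('1', 'month').toDate()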
async function extractUniqueReleases(latestReleases, accReleases) {
  const latestReleaseEntryIds = latestReleases.map(release => release.entryId);
  const duplicateReleases = await knex('releases')
    .whereIn('entry_id', latestReleaseEntryIds);

  // add entry IDs of accumulated releases to prevent an infinite loop
  // when one page contains the same release as the previous
  const duplicateReleaseEntryIds = new Set(duplicateReleases
    .map(release => String(release.entry_id))
    .concat(accReleases.map(release => String(release.entryId))));

  const uniqueReleases = latestReleases
    .filter(release => !duplicateReleaseEntryIds.has(String(release.entryId)));

  return uniqueReleases;
}

function getNextPage(uniqueReleases, pageAccReleases, oldestReleaseOnPage) {
  if (uniqueReleases.length === 0) {
    return false;
  }

  if (argv.last && pageAccReleases.length < argv.last) {
    // request for last N releases not yet satisfied
    return true;
  }

  if (oldestReleaseOnPage && moment(oldestReleaseOnPage.date).isAfter(afterDate)) {
    // oldest release on page is newer than the specified date cut-off
    return true;
  }

  // dates missing, and limit for scenes without dates not yet reached
  return pageAccReleases.length <= argv.nullDateLimit;
}
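Spelled out, the paging decision above reads as follows (same names as in the function, comment form only):

// 1. page yielded no unique releases                    -> stop paginating
// 2. argv.last set and fewer than argv.last accumulated -> fetch next page
// 3. oldest dated release still newer than afterDate    -> fetch next page
// 4. otherwise keep fetching until more than argv.nullDateLimit releases accumulate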
async function scrapeLatestReleases(scraper, site, preData) {
  if (!scraper.fetchLatest) {
    return [];
  }

  const scrapePage = async (page = 1, accReleases = []) => {
    const latestReleases = await scraper.fetchLatest(site, page, preData, include);

    if (!Array.isArray(latestReleases)) {
      // scraper is unable to fetch the releases and returned an HTTP code or null
      logger.warn(`Scraper returned ${latestReleases} when fetching latest from '${site.name}' (${site.network.name})`);
      return accReleases;
    }

    if (latestReleases.length === 0) {
      // scraper successfully requested releases, but found none
      return accReleases;
    }

    const latestReleasesWithSite = latestReleases.map(release => ({ ...release, site: release.site || site })); // attach the site the release is assigned to when stored
    const oldestReleaseOnPage = latestReleases.sort((releaseA, releaseB) => releaseB.date - releaseA.date).slice(-1)[0];

    const uniqueReleases = await extractUniqueReleases(latestReleasesWithSite, accReleases);
    const pageAccReleases = accReleases.concat(uniqueReleases);

    logger.verbose(`Scraped '${site.name}' (${site.network.name}) page ${page}, found ${uniqueReleases.length} unique releases`);

    if (getNextPage(uniqueReleases, pageAccReleases, oldestReleaseOnPage)) {
      return scrapePage(page + 1, accReleases.concat(uniqueReleases));
    }

    if (argv.last) {
      return pageAccReleases.slice(0, argv.last);
    }

    if (oldestReleaseOnPage) {
      const recentReleases = uniqueReleases
        .filter(release => moment(release.date).isAfter(afterDate));

      return accReleases.concat(recentReleases);
    }

    return pageAccReleases.slice(0, argv.nullDateLimit);
  };

  return scrapePage(1, []);
}
async function scrapeUpcomingReleases(scraper, site, preData) {
  if (!scraper.fetchUpcoming) {
    return [];
  }

  try {
    const upcomingReleases = await scraper.fetchUpcoming(site, 1, preData, include);

    if (upcomingReleases) {
      return upcomingReleases.map(release => ({
        ...release,
        site,
        upcoming: true,
      }));
    }
  } catch (error) {
    logger.warn(`Failed to scrape upcoming releases for '${site.slug}' (${site.network.slug})`);
  }

  return [];
}

async function scrapeSiteReleases(scraper, site, preData) {
  const [latestReleases, upcomingReleases] = await Promise.all([
    argv.latest
      ? scrapeLatestReleases(scraper, site, preData)
      : [],
    argv.upcoming
      ? scrapeUpcomingReleases(scraper, site, preData)
      : [],
  ]);

  return [...latestReleases, ...upcomingReleases];
}

async function scrapeSite(site, accSiteReleases) {
  const scraper = scrapers.releases[site.slug]
    || scrapers.releases[site.network.slug]
    || scrapers.releases[site.network.parent?.slug];

  if (!scraper) {
    logger.warn(`No scraper found for '${site.name}' (${site.network.name})`);
    return [];
  }

  try {
    const beforeFetchLatest = await scraper.beforeFetchLatest?.(site);

    const siteReleases = await scrapeSiteReleases(scraper, site, {
      accSiteReleases,
      beforeFetchLatest,
    });

    return siteReleases.map(release => ({ ...release, site }));
  } catch (error) {
    logger.error(`Failed to scrape releases from ${site.name} using ${scraper.slug}: ${error.message}`);

    return [];
  }
}
async function scrapeNetworkSequential(network) {
  return Promise.reduce(
    network.sites,
    async (chain, site) => {
      const accSiteReleases = await chain;
      const siteReleases = await scrapeSite(site, accSiteReleases);

      return accSiteReleases.concat(siteReleases);
    },
    Promise.resolve([]),
  );
}

async function scrapeNetworkParallel(network) {
  return Promise.map(
    network.sites,
    async site => scrapeSite(site),
    { concurrency: 3 },
  );
}

async function fetchUpdates() {
  const includedNetworks = argv.sites || argv.networks
    ? await fetchSitesFromArgv()
    : await fetchSitesFromConfig();

  const scrapedNetworks = await Promise.map(
    includedNetworks,
    async network => (network.parameters?.sequential
      ? scrapeNetworkSequential(network)
      : scrapeNetworkParallel(network)),
    { concurrency: 5 },
  );

  return scrapedNetworks;
}

module.exports = fetchUpdates;
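A note on how the new module is driven: it reads all of its options (latest, upcoming, after, last, nullDateLimit, sites, networks) from ./argv rather than from function arguments, so it is normally triggered through src/app.js and command-line flags. A minimal programmatic sketch, illustrative only, with the relative path assuming a caller inside src/:

'use strict';

const fetchUpdates = require('./fetch-updates');

// resolves to one entry per scraped network
fetchUpdates()
  .then(networks => console.log(`Scraped ${networks.length} networks`))
  .catch(error => console.error(error.message));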
src/sites.js
@@ -113,7 +113,6 @@ async function fetchSitesFromArgv() {
  )
    .whereIn('sites.slug', argv.sites || [])
    .orWhereIn('networks.slug', argv.networks || [])
    .where('sites.scrape', true)
    .leftJoin('networks', 'sites.network_id', 'networks.id');

  const curatedSites = await curateSites(rawSites, true);
@@ -132,7 +131,6 @@ async function fetchSitesFromConfig() {
      'networks.name as network_name', 'networks.slug as network_slug', 'networks.url as network_url', 'networks.description as network_description', 'networks.parameters as network_parameters',
    )
    .leftJoin('networks', 'sites.network_id', 'networks.id')
    .where('sites.scrape', true)
    .where((builder) => {
      if (config.include) {
        builder