Queueing and batching media HTTP requests for improved reliability.
This commit is contained in:
114
src/media.js
114
src/media.js
@@ -2,7 +2,7 @@
|
||||
|
||||
const config = require('config');
|
||||
const Promise = require('bluebird');
|
||||
const bhttp = require('bhttp');
|
||||
// const bhttp = require('bhttp');
|
||||
const mime = require('mime');
|
||||
const fs = require('fs-extra');
|
||||
const sharp = require('sharp');
|
||||
@@ -11,7 +11,9 @@ const blake2 = require('blake2');
|
||||
|
||||
const logger = require('./logger')(__filename);
|
||||
const knex = require('./knex');
|
||||
const { get } = require('./utils/http');
|
||||
const { ex } = require('./utils/q');
|
||||
const chunk = require('./utils/chunk');
|
||||
|
||||
function getHash(buffer) {
|
||||
const hash = blake2.createHash('blake2b', { digestLength: 24 });
|
||||
@@ -73,7 +75,8 @@ async function getEntropy(buffer) {
|
||||
}
|
||||
|
||||
async function extractItem(source) {
|
||||
const res = await bhttp.get(source.src);
|
||||
// const res = await bhttp.get(source.src);
|
||||
const res = await get(source.src);
|
||||
|
||||
if (res.statusCode === 200) {
|
||||
const { q } = ex(res.body.toString());
|
||||
@@ -114,7 +117,8 @@ async function fetchItem(source, index, existingItemsBySource, domain, role, att
|
||||
|
||||
logger.verbose(`Fetching media item from ${source.src || source}`);
|
||||
|
||||
const res = await bhttp.get(source.src || source);
|
||||
// const res = await bhttp.get(source.src || source);
|
||||
const res = await get(source.src || source);
|
||||
|
||||
if (res.statusCode === 200) {
|
||||
const { pathname } = new URL(source.src || source);
|
||||
@@ -150,9 +154,7 @@ async function fetchItem(source, index, existingItemsBySource, domain, role, att
|
||||
}
|
||||
|
||||
async function fetchItems(itemSources, existingItemsBySource, domain, role) {
|
||||
return Promise.map(itemSources, async (source, index) => fetchItem(source, index, existingItemsBySource, domain, role), {
|
||||
concurrency: 10,
|
||||
}).filter(Boolean);
|
||||
return Promise.map(itemSources, async (source, index) => fetchItem(source, index, existingItemsBySource, domain, role)).filter(Boolean);
|
||||
}
|
||||
|
||||
async function saveItems(items, domain, role) {
|
||||
@@ -182,10 +184,15 @@ async function saveItems(items, domain, role) {
|
||||
logger.verbose(`Saved ${domain} ${role} with thumbnail to ${filepath}`);
|
||||
|
||||
return {
|
||||
...item,
|
||||
thumbnail,
|
||||
filepath,
|
||||
thumbpath,
|
||||
mimetype: item.mimetype,
|
||||
extension: item.extension,
|
||||
hash: item.hash,
|
||||
entropy: item.entropy,
|
||||
quality: item.quality,
|
||||
source: item.source,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -199,8 +206,6 @@ async function saveItems(items, domain, role) {
|
||||
logger.error(`Failed to store ${domain} ${role} from ${item.source}: ${error.message}`);
|
||||
return null;
|
||||
}
|
||||
}, {
|
||||
concurrency: 20,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -227,50 +232,55 @@ function groupItems(items) {
|
||||
}
|
||||
|
||||
async function storeMedia(sources, domain, role) {
|
||||
try {
|
||||
const presentSources = sources.filter(Boolean);
|
||||
const presentSources = sources.filter(Boolean);
|
||||
|
||||
if (presentSources.length === 0) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// find source duplicates that don't need to be re-downloaded or re-saved
|
||||
const existingSourceItems = await knex('media').whereIn('source', presentSources.flat().map(source => source.src || source));
|
||||
const { source: existingSourceItemsBySource, hash: existingSourceItemsByHash } = groupItems(existingSourceItems);
|
||||
|
||||
// download media items from new sources
|
||||
const fetchedItems = await fetchItems(presentSources, existingSourceItemsBySource, domain, role);
|
||||
const { hash: fetchedItemsByHash } = groupItems(fetchedItems);
|
||||
|
||||
// find hash duplicates that don't need to be re-saved
|
||||
const uniqueFetchedItems = Object.values(fetchedItemsByHash);
|
||||
const existingHashItems = await knex('media').whereIn('hash', uniqueFetchedItems.map(item => item.hash));
|
||||
const { hash: existingHashItemsByHash } = groupItems(existingHashItems);
|
||||
|
||||
// save new items to disk
|
||||
const newItems = uniqueFetchedItems.filter(item => !existingHashItemsByHash[item.hash]);
|
||||
const savedItems = await saveItems(newItems, domain, role);
|
||||
|
||||
// store new items in database
|
||||
const curatedItemEntries = curateItemEntries(savedItems);
|
||||
const storedItems = await knex('media').insert(curatedItemEntries).returning('*');
|
||||
const { hash: storedItemsByHash } = groupItems(Array.isArray(storedItems) ? storedItems : []);
|
||||
|
||||
// accumulate existing and new items by source to be mapped onto releases
|
||||
const itemsByHash = { ...existingSourceItemsByHash, ...existingHashItemsByHash, ...storedItemsByHash };
|
||||
const itemsBySource = {
|
||||
...existingSourceItemsBySource,
|
||||
...fetchedItems.reduce((acc, item) => ({ ...acc, [item.source]: itemsByHash[item.hash] }), {}),
|
||||
};
|
||||
|
||||
logger.info(`Stored ${fetchedItems.length} new ${domain} ${role}s`);
|
||||
|
||||
return itemsBySource;
|
||||
} catch (error) {
|
||||
logger.error(`Failed to store ${domain} ${role} batch: ${error.message}`);
|
||||
|
||||
return null;
|
||||
if (presentSources.length === 0) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// split up source list to prevent excessive RAM usage
|
||||
const itemChunksBySource = await Promise.all(chunk(presentSources, 50).map(async (sourceChunk, index) => {
|
||||
try {
|
||||
// find source duplicates that don't need to be re-downloaded or re-saved
|
||||
const existingSourceItems = await knex('media').whereIn('source', sourceChunk.flat().map(source => source.src || source));
|
||||
const { source: existingSourceItemsBySource, hash: existingSourceItemsByHash } = groupItems(existingSourceItems);
|
||||
|
||||
// download media items from new sources
|
||||
const fetchedItems = await fetchItems(sourceChunk, existingSourceItemsBySource, domain, role);
|
||||
const { hash: fetchedItemsByHash } = groupItems(fetchedItems);
|
||||
|
||||
// find hash duplicates that don't need to be re-saved
|
||||
const uniqueFetchedItems = Object.values(fetchedItemsByHash);
|
||||
const existingHashItems = await knex('media').whereIn('hash', uniqueFetchedItems.map(item => item.hash));
|
||||
const { hash: existingHashItemsByHash } = groupItems(existingHashItems);
|
||||
|
||||
// save new items to disk
|
||||
const newItems = uniqueFetchedItems.filter(item => !existingHashItemsByHash[item.hash]);
|
||||
const savedItems = await saveItems(newItems, domain, role);
|
||||
|
||||
// store new items in database
|
||||
const curatedItemEntries = curateItemEntries(savedItems);
|
||||
const storedItems = await knex('media').insert(curatedItemEntries).returning('*');
|
||||
const { hash: storedItemsByHash } = groupItems(Array.isArray(storedItems) ? storedItems : []);
|
||||
|
||||
// accumulate existing and new items by source to be mapped onto releases
|
||||
const itemsByHash = { ...existingSourceItemsByHash, ...existingHashItemsByHash, ...storedItemsByHash };
|
||||
const itemsBySource = {
|
||||
...existingSourceItemsBySource,
|
||||
...fetchedItems.reduce((acc, item) => ({ ...acc, [item.source]: itemsByHash[item.hash] }), {}),
|
||||
};
|
||||
|
||||
logger.info(`Stored batch ${index + 1} with ${fetchedItems.length} of new ${domain} ${role}s`);
|
||||
|
||||
return itemsBySource;
|
||||
} catch (error) {
|
||||
logger.error(`Failed to store ${domain} ${role} batch ${index + 1}: ${error.message}`);
|
||||
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
|
||||
return itemChunksBySource.reduce((acc, itemChunk) => ({ ...acc, ...itemChunk }), {});
|
||||
}
|
||||
|
||||
function extractPrimaryItem(associations, targetId, role, primaryRole, primaryItemsByTargetId) {
|
||||
@@ -319,7 +329,7 @@ async function associateMedia(sourcesByTargetId, mediaBySource, domain, role, pr
|
||||
const primaryAssociations = associationsPerTarget.map(association => association[primaryRole]).filter(Boolean);
|
||||
|
||||
logger.info(`Associated ${associations.length} ${role}s to ${domain}s`);
|
||||
logger.info(`Associated ${primaryAssociations.length} extracted ${primaryRole}s to ${domain}s`);
|
||||
if (primaryRole) logger.info(`Associated ${primaryAssociations.length} extracted ${primaryRole}s to ${domain}s`);
|
||||
|
||||
return Promise.all([
|
||||
(associations.length > 0 && knex.raw(`${knex(`${domain}s_${role}s`).insert(associations).toString()} ON CONFLICT DO NOTHING`)),
|
||||
|
||||
Reference in New Issue
Block a user