'use strict'; const config = require('config'); const Promise = require('bluebird'); // const bhttp = require('bhttp'); const mime = require('mime'); const fs = require('fs-extra'); const sharp = require('sharp'); const path = require('path'); const blake2 = require('blake2'); const logger = require('./logger')(__filename); const knex = require('./knex'); const { get } = require('./utils/http'); const { ex } = require('./utils/q'); const chunk = require('./utils/chunk'); function getHash(buffer) { const hash = blake2.createHash('blake2b', { digestLength: 24 }); hash.update(buffer); return hash.digest('hex'); } async function createThumbnail(buffer) { try { const thumbnail = sharp(buffer) .resize({ height: config.media.thumbnailSize, withoutEnlargement: true, }) .jpeg({ quality: config.media.thumbnailQuality, }) .toBuffer(); return thumbnail; } catch (error) { logger.error(`Failed to create thumbnail: ${error.message}`); } return null; } function pluckItems(items, specifiedLimit) { const limit = specifiedLimit || config.media.limit; if (!items || items.length <= limit) return items; const plucked = [1] .concat( Array.from({ length: limit - 1 }, (value, index) => Math.round((index + 1) * (items.length / (limit - 1)))), ); return Array.from(new Set(plucked)).map(itemIndex => items[itemIndex - 1]); // remove duplicates, may happen when photo total and photo limit are close } function pickQuality(items) { const itemsByQuality = items.reduce((acc, item) => ({ ...acc, [item.quality]: item }), {}); const item = config.media.videoQuality.reduce((acc, quality) => acc || itemsByQuality[quality], null); return item || items[0]; } async function getEntropy(buffer) { try { const { entropy } = await sharp(buffer).stats(); return entropy; } catch (error) { logger.warn(`Failed to retrieve image entropy, using 7.5: ${error.message}`); return 7.5; } } async function extractItem(source) { // const res = await bhttp.get(source.src); const res = await get(source.src); if (res.statusCode === 200) { const { q } = ex(res.body.toString()); return source.extract(q); } return null; } async function fetchItem(source, index, existingItemsBySource, domain, role, attempt = 1, originalSource = null) { try { if (Array.isArray(source)) { if (source.every(sourceX => !!sourceX.quality)) { // various video qualities provided const selectedSource = pickQuality(source); return fetchItem(selectedSource, index, existingItemsBySource, domain, role, attempt, originalSource); } // fallbacks provided return source.reduce( (outcome, sourceX) => outcome.catch(async () => fetchItem(sourceX, index, existingItemsBySource, domain, role, attempt, originalSource)), Promise.reject(new Error()), ); } if (source.src && source.extract) { // source links to page containing a (presumably) tokenized photo const itemSource = await extractItem(source); return fetchItem(itemSource, index, existingItemsBySource, domain, role, attempt, source); } if (existingItemsBySource[source]) { return null; } logger.verbose(`Fetching media item from ${source.src || source}`); // const res = await bhttp.get(source.src || source); const res = await get(source.src || source); if (res.statusCode === 200) { const { pathname } = new URL(source.src || source); const mimetype = mime.getType(pathname); const extension = mime.getExtension(mimetype); const hash = getHash(res.body); const entropy = /image/.test(mimetype) ? await getEntropy(res.body) : null; logger.verbose(`Fetched media item from ${source.src || source}`); return { file: res.body, mimetype, extension, hash, entropy, quality: source.quality || null, source: originalSource?.src || originalSource || source.src || source, }; } throw new Error(`Response ${res.statusCode} not OK`); } catch (error) { logger.warn(`Failed attempt ${attempt}/3 to fetch ${domain} ${role} ${index + 1} (${source.src || source}): ${error}`); if (attempt < 3) { await Promise.delay(5000); return fetchItem(source, index, existingItemsBySource, domain, role, attempt + 1); } return null; } } async function fetchItems(itemSources, existingItemsBySource, domain, role) { return Promise.map(itemSources, async (source, index) => fetchItem(source, index, existingItemsBySource, domain, role)).filter(Boolean); } async function saveItems(items, domain, role) { return Promise.map(items, async (item) => { try { const dir = item.hash.slice(0, 2); const subdir = item.hash.slice(2, 4); const filename = item.quality ? `${item.hash.slice(4)}_${item.quality}.${item.extension}` : `${item.hash.slice(4)}.${item.extension}`; const filedir = path.join(`${role}s`, dir, subdir); const filepath = path.join(filedir, filename); await fs.mkdir(path.join(config.media.path, filedir), { recursive: true }); await fs.writeFile(path.join(config.media.path, filepath), item.file); if (/image/.test(item.mimetype)) { const thumbnail = await createThumbnail(item.file); const thumbdir = path.join(`${role}s`, 'thumbs', dir, subdir); const thumbpath = path.join(thumbdir, filename); await fs.mkdir(path.join(config.media.path, thumbdir), { recursive: true }); await fs.writeFile(path.join(config.media.path, thumbpath), thumbnail); logger.verbose(`Saved ${domain} ${role} with thumbnail to ${filepath}`); return { thumbnail, filepath, thumbpath, mimetype: item.mimetype, extension: item.extension, hash: item.hash, entropy: item.entropy, quality: item.quality, source: item.source, }; } logger.verbose(`Saved ${domain} ${role} to ${filepath}`); return { ...item, filepath, }; } catch (error) { logger.error(`Failed to store ${domain} ${role} from ${item.source}: ${error.message}`); return null; } }); } function curateItemEntries(items) { return items.filter(Boolean).map((item, index) => ({ path: item.filepath, thumbnail: item.thumbpath, mime: item.mimetype, hash: item.hash, source: item.source, entropy: item.entropy, index, })); } function groupItems(items) { return items.reduce((acc, item) => ({ source: { ...acc.source, [item.source]: item }, hash: { ...acc.hash, [item.hash]: item }, }), { source: {}, hash: {}, }); } async function storeMedia(sources, domain, role) { const presentSources = sources.filter(Boolean); if (presentSources.length === 0) { return {}; } // split up source list to prevent excessive RAM usage const itemChunksBySource = await Promise.all(chunk(presentSources, 50).map(async (sourceChunk, index) => { try { // find source duplicates that don't need to be re-downloaded or re-saved const existingSourceItems = await knex('media').whereIn('source', sourceChunk.flat().map(source => source.src || source)); const { source: existingSourceItemsBySource, hash: existingSourceItemsByHash } = groupItems(existingSourceItems); // download media items from new sources const fetchedItems = await fetchItems(sourceChunk, existingSourceItemsBySource, domain, role); const { hash: fetchedItemsByHash } = groupItems(fetchedItems); // find hash duplicates that don't need to be re-saved const uniqueFetchedItems = Object.values(fetchedItemsByHash); const existingHashItems = await knex('media').whereIn('hash', uniqueFetchedItems.map(item => item.hash)); const { hash: existingHashItemsByHash } = groupItems(existingHashItems); // save new items to disk const newItems = uniqueFetchedItems.filter(item => !existingHashItemsByHash[item.hash]); const savedItems = await saveItems(newItems, domain, role); // store new items in database const curatedItemEntries = curateItemEntries(savedItems); const storedItems = await knex('media').insert(curatedItemEntries).returning('*'); const { hash: storedItemsByHash } = groupItems(Array.isArray(storedItems) ? storedItems : []); // accumulate existing and new items by source to be mapped onto releases const itemsByHash = { ...existingSourceItemsByHash, ...existingHashItemsByHash, ...storedItemsByHash }; const itemsBySource = { ...existingSourceItemsBySource, ...fetchedItems.reduce((acc, item) => ({ ...acc, [item.source]: itemsByHash[item.hash] }), {}), }; logger.info(`Stored batch ${index + 1} with ${fetchedItems.length} of new ${domain} ${role}s`); return itemsBySource; } catch (error) { logger.error(`Failed to store ${domain} ${role} batch ${index + 1}: ${error.message}`); return null; } })); return itemChunksBySource.reduce((acc, itemChunk) => ({ ...acc, ...itemChunk }), {}); } function extractPrimaryItem(associations, targetId, role, primaryRole, primaryItemsByTargetId) { if (!primaryRole) { return { [role]: associations, [primaryRole]: null }; } if (primaryItemsByTargetId[targetId]) { const remainingAssociations = associations.filter(association => association.media_id !== primaryItemsByTargetId[targetId].media_id); return { [role]: remainingAssociations, [primaryRole]: null }; } return { [role]: associations.slice(1), [primaryRole]: associations.slice(0, 1)[0], }; } function associateTargetMedia(targetId, sources, mediaBySource, domain, role, primaryRole, primaryItemsByTargetId) { if (!sources) return { [role]: null, [primaryRole]: null }; const associations = sources .filter(Boolean) .map((source) => { const mediaItem = Array.isArray(source) ? source.reduce((acc, sourceX) => acc || mediaBySource[sourceX.src || sourceX], null) : mediaBySource[source.src || source]; return mediaItem && { [`${domain}_id`]: targetId, media_id: mediaItem.id }; }) .filter(Boolean); logger.silly(`Associating ${associations.length} ${role}s to ${domain} ${targetId}`); return extractPrimaryItem(associations, targetId, role, primaryRole, primaryItemsByTargetId); } async function associateMedia(sourcesByTargetId, mediaBySource, domain, role, primaryRole) { const primaryItems = primaryRole ? await knex(`${domain}s_${primaryRole}s`).whereIn(`${domain}_id`, Object.keys(sourcesByTargetId)) : []; const primaryItemsByTargetId = primaryItems.reduce((acc, item) => ({ ...acc, [item[`${domain}_id`]]: item }), {}); const associationsPerTarget = await Promise.map(Object.entries(sourcesByTargetId), ([targetId, sources]) => associateTargetMedia(targetId, sources, mediaBySource, domain, role, primaryRole, primaryItemsByTargetId)); const associations = associationsPerTarget.map(association => association[role]).flat().filter(Boolean); const primaryAssociations = associationsPerTarget.map(association => association[primaryRole]).filter(Boolean); logger.info(`Associated ${associations.length} ${role}s to ${domain}s`); if (primaryRole) logger.info(`Associated ${primaryAssociations.length} extracted ${primaryRole}s to ${domain}s`); return Promise.all([ (associations.length > 0 && knex.raw(`${knex(`${domain}s_${role}s`).insert(associations).toString()} ON CONFLICT DO NOTHING`)), (primaryAssociations.length > 0 && knex.raw(`${knex(`${domain}s_${primaryRole}s`).insert(primaryAssociations).toString()} ON CONFLICT DO NOTHING`)), ]); } module.exports = { pluckItems, storeMedia, associateMedia, };