'use strict';

const config = require('config');
const util = require('util');
const Promise = require('bluebird');
const fs = require('fs');
const fsPromises = require('fs').promises;
const path = require('path');
const stream = require('stream');
const { nanoid } = require('nanoid/non-secure');
const mime = require('mime');
// const fileType = require('file-type');
const ffmpeg = require('fluent-ffmpeg');
const sharp = require('sharp');
const blake2 = require('blake2');
const taskQueue = require('promise-task-queue');
const { Upload } = require('@aws-sdk/lib-storage');
const { S3Client, DeleteObjectsCommand } = require('@aws-sdk/client-s3');

const logger = require('./logger')(__filename);
const argv = require('./argv');
const knex = require('./knex');
const http = require('./utils/http');
const bulkInsert = require('./utils/bulk-insert');
const chunk = require('./utils/chunk');
const { get } = require('./utils/qu');

// const pipeline = util.promisify(stream.pipeline);
const streamQueue = taskQueue();

const s3 = new S3Client({
	region: 'eu-central-1',
	endpoint: 'https://s3.eu-central-1.wasabisys.com',
	credentials: {
		accessKeyId: config.s3.accessKey,
		secretAccessKey: config.s3.secretKey,
	},
});
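/*
 * sampleMedias() caps a media set at `limit` entries while keeping the extras
 * available as fallbacks. The set is split into `limit` roughly equal chunks,
 * and each chunk is merged into a single media whose sources are grouped by
 * fallback priority. Worked example with limit = 2: five medias become chunks
 * [A, B, C] and [D, E]; the first yields a media that tries A's sources first,
 * then B's, then C's, and with preferLast the second chunk is tried as [E, D].
 */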
function sampleMedias(medias, limit = argv.mediaLimit, preferLast = true) {
	// limit media sets, use extras as fallbacks
	if (medias.length <= limit) {
		return medias;
	}

	const chunkSize = Math.floor(medias.length / limit);
	const rest = medias.length - (limit * chunkSize);

	const chunks = Array.from(
		{ length: limit },
		(value, index) => {
			const start = (chunkSize * index) + Math.min(index, rest);

			return medias.slice(
				start,
				start + chunkSize + (index < rest ? 1 : 0),
			);
		},
	);

	// flip last chunk so the very last image (often the best cumshot) is tried first
	const lastPreferredChunks = preferLast
		? chunks.slice(0, -1).concat([chunks[chunks.length - 1].reverse()])
		: chunks;

	const groupedMedias = lastPreferredChunks.map((mediaChunk) => {
		// merge chunked medias into a single media with grouped fallback priorities,
		// so the first sources of each media are preferred over all second sources, etc.
		const sources = mediaChunk
			.reduce((accSources, media) => {
				media.sources.forEach((source, index) => {
					if (!accSources[index]) {
						accSources.push([source]);
						return;
					}

					accSources[index].push(source);
				});

				return accSources;
			}, [])
			.flat();

		return {
			id: mediaChunk[0].id,
			role: mediaChunk[0].role,
			sources,
		};
	});

	return groupedMedias;
}

function itemsByKey(items, key) {
	return items.reduce((acc, item) => ({ ...acc, [item[key]]: item }), {});
}

function isValidUrl(url) {
	try {
		const urlObject = new URL(url);

		return !!urlObject;
	} catch (error) {
		return false;
	}
}

function toBaseSource(rawSource) {
	if (rawSource && (rawSource.src || (rawSource.extract && rawSource.url) || rawSource.stream)) {
		const baseSource = {};

		if (rawSource.src) {
			if (!isValidUrl(rawSource.src)) {
				return null;
			}

			baseSource.src = rawSource.src;
		}

		if (rawSource.quality) baseSource.quality = rawSource.quality;
		if (rawSource.type) baseSource.type = rawSource.type;

		if (rawSource.url) {
			if (!isValidUrl(rawSource.url)) {
				return null;
			}

			baseSource.url = rawSource.url;
		}

		if (rawSource.extract) baseSource.extract = rawSource.extract;
		if (rawSource.expectType) baseSource.expectType = rawSource.expectType;

		if (rawSource.stream) {
			baseSource.src = rawSource.stream;
			baseSource.stream = rawSource.stream;
		}

		// reject source if response mimetype does not match specified type
		if (rawSource.verifyType) baseSource.verifyType = rawSource.verifyType;

		if (rawSource.referer) baseSource.referer = rawSource.referer;
		if (rawSource.host) baseSource.host = rawSource.host;
		if (rawSource.attempts) baseSource.attempts = rawSource.attempts;
		if (rawSource.interval) baseSource.interval = rawSource.interval;
		if (rawSource.concurrency) baseSource.concurrency = rawSource.concurrency;
		if (rawSource.vr) baseSource.vr = rawSource.vr;
		if (rawSource.credit !== undefined) baseSource.credit = rawSource.credit;
		if (rawSource.comment) baseSource.comment = rawSource.comment;
		if (rawSource.group) baseSource.group = rawSource.group;
		if (rawSource.process) baseSource.process = rawSource.process;

		return baseSource;
	}

	if (typeof rawSource === 'string') {
		if (!isValidUrl(rawSource)) {
			return null;
		}

		// HLS (.m3u8) and DASH (.mpd) manifest URLs are fetched as streams
		if (new URL(rawSource).pathname.match(/\.(m3u8|mpd)$/)) {
			return {
				src: rawSource,
				stream: rawSource,
			};
		}

		return {
			src: rawSource,
		};
	}

	if (typeof rawSource === 'function') {
		return {
			defer: rawSource,
		};
	}

	return null;
}

function baseSourceToBaseMedia(baseSource, role, metadata) {
	if (Array.isArray(baseSource)) {
		if (baseSource.length > 0) {
			return {
				...metadata,
				id: nanoid(),
				role,
				sources: baseSource,
			};
		}

		return null;
	}

	if (baseSource) {
		return {
			...metadata,
			id: nanoid(),
			role,
			sources: [baseSource],
		};
	}

	return null;
}

function sortBaseTrailersByQuality(sources, role) {
	if (role === 'trailers') {
		const sortedSources = sources.sort((sourceA, sourceB) => {
			if (config.media.trailerQuality.includes(sourceB.quality)
				&& config.media.trailerQuality.indexOf(sourceA.quality) > config.media.trailerQuality.indexOf(sourceB.quality)) {
				return 1;
			}

			if (config.media.trailerQuality.includes(sourceA.quality)
				&& config.media.trailerQuality.indexOf(sourceA.quality) < config.media.trailerQuality.indexOf(sourceB.quality)) {
				return -1;
			}

			return 0;
		});

		return sortedSources;
	}

	return sources;
}

function fallbackMediaToBaseMedia(rawMedia, role, metadata) {
	const baseSources = rawMedia
		.map((source) => toBaseSource(source))
		.filter(Boolean);

	const sortedBaseSources = sortBaseTrailersByQuality(baseSources, role);

	return baseSourceToBaseMedia(sortedBaseSources, role, metadata);
}
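/*
 * toBaseMedias() accepts the raw media values scrapers produce: a URL string,
 * a source object ({ src }, { url, extract } or { stream }), a deferred
 * function, or an array of any of these as fallbacks. Invalid entries are
 * dropped, and the result is sampled down to argv.mediaLimit.
 */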
function toBaseMedias(rawMedias, role, metadata) {
	if (!rawMedias || rawMedias.length === 0) {
		return [];
	}

	const baseMedias = rawMedias.map((rawMedia) => {
		if (!rawMedia) {
			return null;
		}

		if (Array.isArray(rawMedia)) {
			// fallback sources provided
			return fallbackMediaToBaseMedia(rawMedia, role, metadata);
		}

		const baseSource = toBaseSource(rawMedia);

		return baseSourceToBaseMedia(baseSource, role, metadata);
	}).filter(Boolean);

	const sampledBaseMedias = sampleMedias(baseMedias);

	return sampledBaseMedias;
}

async function findSourceDuplicates(baseMedias) {
	const sourceUrls = baseMedias
		.map((baseMedia) => baseMedia.sources.map((source) => source.src))
		.flat()
		.filter(Boolean);

	// extract sources are identified by their page URL, stored as source_page
	const extractUrls = baseMedias
		.map((baseMedia) => baseMedia.sources.map((source) => source.extract && source.url))
		.flat()
		.filter(Boolean);

	const [existingSourceMedia, existingExtractMedia] = await Promise.all([
		// may try to check thousands of URLs at once, don't pass all of them to a single query
		chunk(sourceUrls).reduce(async (chain, sourceUrlsChunk) => {
			const accUrls = await chain;
			const existingUrls = await knex('media').whereIn('source', sourceUrlsChunk);

			return [...accUrls, ...existingUrls];
		}, []),
		chunk(extractUrls).reduce(async (chain, extractUrlsChunk) => {
			const accUrls = await chain;
			const existingUrls = await knex('media').whereIn('source_page', extractUrlsChunk);

			return [...accUrls, ...existingUrls];
		}, []),
	]);

	const existingSourceMediaByUrl = itemsByKey(existingSourceMedia, 'source');
	const existingExtractMediaByUrl = itemsByKey(existingExtractMedia, 'source_page');

	return {
		existingSourceMediaByUrl,
		existingExtractMediaByUrl,
	};
}

async function findHashDuplicates(medias) {
	const hashes = medias.map((media) => media.meta?.hash || media.entry?.hash).filter(Boolean);

	const existingHashMediaEntries = await chunk(hashes, 2).reduce(async (chain, hashesChunk) => {
		const accHashes = await chain;
		const existingHashes = await knex('media').whereIn('hash', hashesChunk);

		return [...accHashes, ...existingHashes];
	}, []);

	const existingHashMediaEntriesByHash = itemsByKey(existingHashMediaEntries, 'hash');

	const uniqueHashMedias = medias.filter((media) => !media.entry && !existingHashMediaEntriesByHash[media.meta?.hash]);

	const { selfDuplicateMedias, selfUniqueMediasByHash } = uniqueHashMedias.reduce((acc, media) => {
		if (!media.meta?.hash) {
			return acc;
		}

		if (acc.selfUniqueMediasByHash[media.meta.hash]) {
			// same file was fetched earlier in this batch, refer to that copy
			acc.selfDuplicateMedias.push({
				...media,
				use: acc.selfUniqueMediasByHash[media.meta.hash].id,
			});

			return acc;
		}

		acc.selfUniqueMediasByHash[media.meta.hash] = media;

		return acc;
	}, {
		selfDuplicateMedias: [],
		selfUniqueMediasByHash: {},
	});

	const selfUniqueHashMedias = Object.values(selfUniqueMediasByHash);

	const existingHashMedias = medias
		.filter((media) => existingHashMediaEntriesByHash[media.entry?.hash || media.meta?.hash])
		.map((media) => ({
			...media,
			entry: existingHashMediaEntriesByHash[media.entry?.hash || media.meta?.hash],
		}))
		.concat(selfDuplicateMedias);

	return { uniqueHashMedias: selfUniqueHashMedias, existingHashMedias };
}
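/*
 * Deduplication happens in two passes: findSourceDuplicates() matches source
 * and extract page URLs against the media table before anything is downloaded,
 * and findHashDuplicates() matches BLAKE2 file hashes afterwards, both against
 * the database and within the batch itself.
 */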
async function extractSource(baseSource, { existingExtractMediaByUrl }) {
	if (typeof baseSource.defer === 'function') {
		const src = await baseSource.defer();

		return {
			...baseSource,
			...toBaseSource(src),
		};
	}

	if (typeof baseSource.extract !== 'function' || !baseSource.url) {
		return baseSource;
	}

	const existingExtractMedia = existingExtractMediaByUrl[baseSource.url];

	if (existingExtractMedia) {
		// media entry found by extract URL
		return {
			...baseSource,
			entry: existingExtractMedia,
		};
	}

	const res = await get(baseSource.url);

	if (res.ok) {
		const src = await baseSource.extract(res.item);

		return {
			...baseSource,
			...toBaseSource(src),
		};
	}

	throw new Error(`Could not extract source from ${baseSource.url}: ${res.status}`);
}

async function storeS3Object(filepath, media) {
	const fullFilepath = path.join(config.media.path, filepath);
	const file = fs.createReadStream(fullFilepath);

	const status = await new Upload({
		client: s3,
		params: {
			Bucket: config.s3.bucket,
			Body: file,
			Key: filepath,
			ContentType: media.meta.mimetype,
		},
	}).done();

	await fsPromises.unlink(fullFilepath);

	logger.silly(`Uploaded '${media.id}' from ${media.src} to S3 bucket '${status.Bucket}' at ${status.Location}`);

	return status;
}

async function writeImage(image, media, info, filepath, isProcessed) {
	if (isProcessed && info.pages) {
		// convert animated image to WebP and write to permanent location
		await image
			.webp()
			.toFile(path.join(config.media.path, filepath));

		return;
	}

	await image
		.resize({
			height: config.media.maxSize,
			withoutEnlargement: true,
		})
		.jpeg({ quality: config.media.quality })
		.toFile(path.join(config.media.path, filepath));
}

async function writeThumbnail(image, thumbpath) {
	return image
		.jpeg({ quality: config.media.thumbnailQuality })
		.resize({
			height: config.media.thumbnailSize,
			withoutEnlargement: true,
		})
		.rotate()
		.toFile(path.join(config.media.path, thumbpath));
}

async function writeLazy(image, lazypath) {
	return image
		.resize({
			height: config.media.lazySize,
			withoutEnlargement: true,
		})
		.jpeg({ quality: config.media.lazyQuality })
		.rotate()
		.toFile(path.join(config.media.path, lazypath));
}
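/*
 * storeImageFile() writes three variants of every image: the full-size file,
 * a thumbnail, and a lazy low-resolution placeholder. Local storage shards
 * files into hash-based subdirectories, with the first four hash characters
 * becoming directories and being stripped from the filename, e.g.
 * photos/ab/cd/<rest of hash>.jpeg; S3 storage uses flat per-role prefixes,
 * e.g. photos/<full hash>.jpeg.
 */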
async function storeImageFile(media, hashDir, hashSubDir, filename, filedir, filepath, options) {
	logger.silly(`Storing permanent media files for ${media.id} from ${media.src} at ${filepath}`);
	logger.debug(`Memory usage at image storage: ${process.memoryUsage.rss() / 1000000} MB (${media.src})`);

	try {
		const thumbdir = config.s3.enabled ? path.join(media.role, 'thumbs') : path.join(media.role, 'thumbs', hashDir, hashSubDir);
		const thumbpath = path.join(thumbdir, filename);

		const lazydir = config.s3.enabled ? path.join(media.role, 'lazy') : path.join(media.role, 'lazy', hashDir, hashSubDir);
		const lazypath = path.join(lazydir, filename);

		await Promise.all([
			fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
			fsPromises.mkdir(path.join(config.media.path, thumbdir), { recursive: true }),
			fsPromises.mkdir(path.join(config.media.path, lazydir), { recursive: true }),
		]);

		const image = sharp(media.file.path, { pages: ['trailer', 'teaser'].includes(media.role) ? -1 : 1 }); // don't store posters as animation

		const isProcessed = media.meta.subtype !== 'jpeg' || media.process;

		const [info, stats] = await Promise.all([
			image.metadata(),
			options?.stats && image.stats(),
		]);

		if (media.process) {
			Object.entries(media.process).forEach(([operation, processOptions]) => {
				if (image[operation]) {
					image[operation](...(Array.isArray(processOptions) ? processOptions : [processOptions]));
					return;
				}

				if (operation === 'crop') {
					image.extract(...(Array.isArray(processOptions) ? processOptions : [processOptions]));
					return;
				}

				logger.warn(`Unknown image operation on ${media.id} (${media.src}): ${operation}`);
			});
		}

		await writeImage(image, media, info, filepath, isProcessed);

		await Promise.all([
			writeThumbnail(image, thumbpath, info),
			writeLazy(image, lazypath, info),
		]);

		/*
		if (isProcessed) {
			// file already stored, remove temporary file
			await fsPromises.unlink(media.file.path);
		} else {
			// image not processed, simply move temporary file to final location
			await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));
		}
		*/

		await fsPromises.unlink(media.file.path);

		if (config.s3.enabled) {
			await Promise.all([
				storeS3Object(filepath, media),
				storeS3Object(thumbpath, media),
				storeS3Object(lazypath, media),
			]);
		}

		logger.silly(`Stored thumbnail, lazy and permanent media file for ${media.id} from ${media.src} at ${filepath}`);

		return {
			...media,
			file: {
				path: filepath,
				thumbnail: thumbpath,
				lazy: lazypath,
			},
			meta: {
				...media.meta,
				// 6 or 8 implies image is sideways, and size is not inherently adjusted for orientation
				width: info.orientation === 6 || info.orientation === 8 ? info.height : info.width,
				height: info.orientation === 6 || info.orientation === 8 ? info.width : info.height,
				entropy: stats?.entropy || null,
				sharpness: stats?.sharpness || null,
			},
		};
	} catch (error) {
		logger.error(`Failed to store ${media.id} from ${media.src} at ${filepath}: ${error.message}`);

		await fsPromises.unlink(media.file.path);

		return null;
	}
}
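/*
 * storeFile() routes images to storeImageFile() and moves other media (video)
 * into place directly. Role constraints are enforced here: posters, photos,
 * caps and covers must be images; teasers and trailers must be videos.
 */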
async function storeFile(media, options) {
	try {
		const hashDir = media.meta.hash.slice(0, 2);
		const hashSubDir = media.meta.hash.slice(2, 4);
		const hashFilename = config.s3.enabled ? media.meta.hash : media.meta.hash.slice(4);

		const filename = media.quality
			? `${hashFilename}_${media.quality}.${media.meta.extension}`
			: `${hashFilename}.${media.meta.extension}`;

		const filedir = config.s3.enabled ? media.role : path.join(media.role, hashDir, hashSubDir);
		const filepath = path.join(filedir, filename);

		if (argv.forceMedia) {
			try {
				// remove old file in case rename() does not overwrite (possibly on NFS setups)
				await fsPromises.unlink(path.join(config.media.path, filepath));
			} catch (error) {
				// file probably didn't exist
			}
		}

		if (media.meta.type === 'image') {
			return storeImageFile(media, hashDir, hashSubDir, filename, filedir, filepath, options);
		}

		if (['posters', 'photos', 'caps', 'covers'].includes(media.role)) {
			throw new Error(`Media for '${media.role}' must be an image, but '${media.meta.mimetype}' was detected`);
		}

		if (['teasers', 'trailers'].includes(media.role) && media.meta.type !== 'video') {
			throw new Error(`Media for '${media.role}' must be a video, but '${media.meta.mimetype}' was detected in ${media.src}`);
		}

		const [stat] = await Promise.all([
			fsPromises.stat(media.file.path),
			fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
		]);

		// move temp file to permanent location
		await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));

		if (config.s3.enabled) {
			// upload the file to S3 storage, will remove original
			await storeS3Object(filepath, media);
		}

		logger.silly(`Stored permanent media file for ${media.id} from ${media.src} at ${filepath}`);

		return {
			...media,
			file: {
				path: filepath,
			},
			meta: {
				...media.meta,
				size: stat.size,
			},
		};
	} catch (error) {
		logger.warn(`Failed to store ${media.src}: ${error.message}`);

		try {
			await fsPromises.unlink(media.file.path);
		} catch (unlinkError) {
			logger.warn(`Failed to unlink ${media.file.path} from ${media.src}: ${unlinkError.message}`);
		}

		return null;
	}
}
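/*
 * fetchHttpSource() streams the response straight to disk while hashing it on
 * the fly. A source may carry an expectType map to correct mistyped server
 * responses, e.g. { 'binary/octet-stream': 'video/mp4' } (hypothetical
 * mapping); otherwise the mimetype falls back to a lookup on the URL's file
 * extension.
 */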
async function fetchHttpSource(source, tempFileTarget, hashStream) {
	const res = await http.get(source.src, {
		headers: {
			...(source.referer && { referer: source.referer }),
			...(source.host && { host: source.host }),
		},
		stream: true, // sources are fetched in parallel, don't gobble up memory
		transforms: [hashStream],
		destination: tempFileTarget,
		...(source.interval && { interval: source.interval }),
		...(source.concurrency && { concurrency: source.concurrency }),
	});

	if (!res.ok) {
		throw new Error(`Response ${res.status} not OK`);
	}

	return {
		mimetype: (source.expectType
			? source.expectType[res.headers['content-type']]
			: res.headers['content-type'])
			|| mime.getType(new URL(source.src).pathname),
	};
}

streamQueue.define('fetchStreamSource', async ({ source, tempFileTarget, hashStream }) => {
	const meta = { mimetype: 'video/mp4' };

	const video = ffmpeg(source.stream)
		.format('mp4')
		.outputOptions(['-movflags frag_keyframe+empty_moov'])
		.on('start', (cmd) => logger.verbose(`Fetching stream from ${source.stream} with "${cmd}"`))
		.on('error', (error) => logger.error(`Failed to fetch stream from ${source.stream}: ${error.message}`))
		.pipe();

	// await pipeline(video, hashStream, tempFileTarget);
	await stream.promises.pipeline(video, hashStream, tempFileTarget);

	logger.verbose(`Finished fetching stream from ${source.stream}`);

	return meta;
}, {
	concurrency: config.media.streamConcurrency,
});

async function fetchSource(source, baseMedia) {
	const maxAttempts = source.attempts || argv.mediaAttempts;

	logger.silly(`Fetching media from ${source.src}`);
	logger.debug(`Memory usage before media fetch: ${process.memoryUsage.rss() / 1000000} MB (${source.src})`);

	if (source.stream && !config.media.fetchStreams) {
		throw new Error(`Stream fetching disabled, ignoring ${source.stream}`);
	}

	async function attempt(attempts = 1) {
		const hasher = new blake2.Hash('blake2b', { digestLength: 24 });
		let hasherReady = true;

		hasher.setEncoding('hex');

		try {
			const tempFilePath = path.join(config.media.path, 'temp', `${baseMedia.id}`);
			const tempFileTarget = fs.createWriteStream(tempFilePath);
			const hashStream = new stream.PassThrough();

			let size = 0;

			hashStream.on('data', (streamChunk) => {
				size += streamChunk.length;

				if (hasherReady) {
					hasher.write(streamChunk);
				}
			});

			const { mimetype } = source.stream
				? await streamQueue.push('fetchStreamSource', { source, tempFileTarget, hashStream })
				: await fetchHttpSource(source, tempFileTarget, hashStream);

			hasher.end();

			const hash = hasher.read();
			const [type, subtype] = mimetype.split('/');
			const extension = mime.getExtension(mimetype);

			if (source.verifyType && source.verifyType !== type) {
				throw Object.assign(new Error(`Type '${type}' does not match type '${source.verifyType}' specified by source`), { code: 'VERIFY_TYPE' });
			}

			return {
				...source,
				file: {
					path: tempFilePath,
				},
				meta: {
					hash,
					mimetype,
					extension,
					type,
					subtype,
					size,
				},
			};
		} catch (error) {
			hasherReady = false;
			hasher.end();

			if (error.code !== 'VERIFY_TYPE') {
				logger.warn(`Failed attempt ${attempts}/${maxAttempts} to fetch ${source.src}: ${error.message}`);

				if (attempts < maxAttempts) {
					await Promise.delay(1000);

					return attempt(attempts + 1);
				}
			}

			throw new Error(`Failed to fetch ${source.src}: ${error.message}`);
		}
	}

	return attempt(1);
}

async function trySource(baseSource, existingMedias, baseMedia) {
	// catch error and try the next source
	const extractedSource = await extractSource(baseSource, existingMedias);
	const existingSourceMedia = existingMedias.existingSourceMediaByUrl[extractedSource.src];

	if (!argv.forceMedia && extractedSource.entry) {
		logger.silly(`Media page URL already in database, not extracting ${baseSource.url}`);

		// media entry found during extraction, don't fetch
		return extractedSource;
	}

	if (!argv.forceMedia && existingSourceMedia) {
		logger.silly(`Media source URL already in database, skipping ${baseSource.src}`);

		// media entry found by source URL, don't fetch
		return {
			...baseSource,
			entry: existingSourceMedia,
		};
	}

	const source = await fetchSource(extractedSource, baseMedia);

	return {
		...source,
		entry: extractedSource.entry || existingSourceMedia,
	};
}
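/*
 * fetchMedia() walks a media's fallback sources as a promise chain seeded with
 * a rejection: each .catch() tries the next source, so the first source to
 * resolve wins and later ones are never fetched.
 */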
async function fetchMedia(baseMedia, existingMedias) {
	try {
		const source = await baseMedia.sources.reduce(
			// try each source until success
			(result, baseSource, baseSourceIndex) => result.catch(async (error) => {
				if (error.message) {
					logger.warn(error.message);
				}

				return trySource(baseSource, existingMedias, baseMedia, baseSourceIndex);
			}),
			Promise.reject(new Error()),
		);

		return {
			...baseMedia,
			...source,
		};
	} catch (error) {
		logger.warn(error.message);

		return baseMedia;
	}
}

function curateMediaEntry(media, index) {
	if (media.entry) {
		return media;
	}

	const curatedMediaEntry = {
		id: media.id,
		path: media.file.path,
		thumbnail: media.file.thumbnail,
		lazy: media.file.lazy,
		is_s3: config.s3.enabled,
		index,
		mime: media.meta.mimetype,
		hash: media.meta.hash,
		size: media.meta.size,
		width: media.meta.width,
		height: media.meta.height,
		is_vr: media.vr,
		entropy: media.meta.entropy,
		sharpness: media.meta.sharpness,
		source: media.src,
		source_page: media.url,
		scraper: media.scraper,
		credit: media.credit,
		comment: media.comment,
	};

	return {
		...media,
		newEntry: true,
		entry: curatedMediaEntry,
	};
}

async function storeMedias(baseMedias, options) {
	await fsPromises.mkdir(path.join(config.media.path, 'temp'), { recursive: true });

	const { existingSourceMediaByUrl, existingExtractMediaByUrl } = await findSourceDuplicates(baseMedias);

	const fetchedMedias = await Promise.map(
		baseMedias,
		async (baseMedia) => fetchMedia(baseMedia, { existingSourceMediaByUrl, existingExtractMediaByUrl }),
		// { concurrency: 100 }, // don't overload disk (or network, although this has its own throttling)
		{ concurrency: 10 }, // don't overload disk (or network, although this has its own throttling)
	);

	const { uniqueHashMedias, existingHashMedias } = await findHashDuplicates(fetchedMedias);

	const savedMedias = await Promise.map(
		uniqueHashMedias,
		async (baseMedia) => storeFile(baseMedia, options),
		{ concurrency: 100 }, // don't overload disk
	);

	if (argv.forceMedia) {
		// overwrite files in case image processing was changed
		await Promise.map(
			existingHashMedias,
			async (baseMedia) => storeFile(baseMedia, options),
			{ concurrency: 100 }, // don't overload disk
		);
	}

	const newMediaWithEntries = savedMedias.filter(Boolean).map((media, index) => curateMediaEntry(media, index));
	const newMediaEntries = newMediaWithEntries.filter((media) => media.newEntry).map((media) => media.entry);

	try {
		await bulkInsert('media', newMediaEntries, false);

		return [...newMediaWithEntries, ...existingHashMedias];
	} catch (error) {
		throw Object.assign(error, { entries: newMediaEntries });
	}
}
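/*
 * associateReleaseMedia() collects all media for a batch of releases, stores
 * them one role at a time (posters before covers, photos, caps, teasers and
 * trailers), and links the stored entries to their releases through the
 * `${type}s_${role}` join tables, e.g. releases_posters.
 */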
async function associateReleaseMedia(releases, type = 'release') {
	if (!argv.media) {
		return;
	}

	const baseMediasByReleaseId = releases.reduce((acc, release) => ({
		...acc,
		[release.id]: [
			...(argv.images && argv.poster ? toBaseMedias([release.poster], 'posters') : []),
			...(argv.images && argv.covers ? toBaseMedias(release.covers, 'covers') : []),
			...(argv.images && argv.photos ? toBaseMedias(release.photos, 'photos') : []),
			...(argv.images && argv.caps ? toBaseMedias(release.caps, 'caps') : []),
			...(argv.videos && argv.trailer ? toBaseMedias([release.trailer], 'trailers') : []),
			...(argv.videos && argv.teaser ? toBaseMedias([release.teaser], 'teasers') : []),
		],
	}), {});

	const baseMediasByRole = Object.values(baseMediasByReleaseId)
		.flat()
		.filter(Boolean)
		.reduce((acc, baseMedia) => {
			if (!acc[baseMedia.role]) acc[baseMedia.role] = [];

			acc[baseMedia.role].push(baseMedia);

			return acc;
		}, {});

	await Promise.reduce(['posters', 'covers', 'photos', 'caps', 'teasers', 'trailers'], async (chain, role) => {
		// stage by role so posters are prioritized over photos and videos
		await chain;

		const baseMedias = baseMediasByRole[role];

		if (!baseMedias) {
			return;
		}

		try {
			const storedMedias = await storeMedias(baseMedias);
			const storedMediasById = itemsByKey(storedMedias, 'id');

			const associations = Object
				.entries(baseMediasByReleaseId)
				.reduce((acc, [releaseId, releaseBaseMedias]) => {
					releaseBaseMedias.forEach((baseMedia) => {
						const media = storedMediasById[baseMedia.id];
						const mediaId = (storedMediasById[media?.use] && media?.use) || media?.entry?.id;

						if (mediaId) {
							acc.push({
								[`${type}_id`]: releaseId,
								media_id: mediaId,
							});
						}
					});

					return acc;
				}, [])
				.filter(Boolean);

			if (associations.length > 0) {
				await bulkInsert(`${type}s_${role}`, associations, false);
			}
		} catch (error) {
			if (error.entries) {
				logger.error(util.inspect(error.entries, { depth: null, colors: true }));
			}

			logger.error(`Failed to store ${type} ${role}: ${error.message} (${error.detail || 'no detail'})`);
		}
	}, Promise.resolve());
}

async function associateAvatars(profiles) {
	if (!argv.media) {
		return profiles;
	}

	const profilesWithBaseMedias = profiles.map((profile) => (profile.avatar
		? {
			...profile,
			avatarBaseMedia: toBaseMedias([profile.avatar], 'avatars', {
				credit: profile.credit || profile.entity?.name || null,
				scraper: profile.scraper || null,
			})[0],
		}
		: profile
	));

	const baseMedias = profilesWithBaseMedias.map((profile) => profile.avatarBaseMedia).filter(Boolean);

	const storedMedias = await storeMedias(baseMedias, { stats: true });
	const storedMediasById = itemsByKey(storedMedias, 'id');

	const profilesWithAvatarIds = profilesWithBaseMedias.map((profile) => {
		const media = storedMediasById[profile.avatarBaseMedia?.id];

		if (media) {
			return {
				...profile,
				avatarMediaId: media.use || media.entry.id,
			};
		}

		return profile;
	});

	return profilesWithAvatarIds;
}

async function deleteS3Objects(media) {
	const objects = media
		.map((item) => [
			{ Key: item.path },
			{ Key: item.thumbnail },
			{ Key: item.lazy },
		])
		.flat()
		.filter((item) => item.Key);

	// the bare S3Client only supports send(), so dispatch a command
	const status = await s3.send(new DeleteObjectsCommand({
		Bucket: config.s3.bucket,
		Delete: {
			Objects: objects,
			Quiet: false,
		},
	}));

	logger.info(`Removed ${status.Deleted.length} media files from S3 bucket '${config.s3.bucket}', ${status.Errors.length} errors`);

	return status;
}
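/*
 * flushOrphanedMedia() deletes media rows that no release, movie, actor,
 * chapter or tag references anymore, removes the files from disk or S3 when
 * argv.flushMediaFiles is set, and clears the temporary download directory.
 */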
async function flushOrphanedMedia() {
	const orphanedMedia = await knex('media')
		.where('is_sfw', false)
		.whereNotExists(
			knex
				.from(
					knex('tags_posters')
						.select('media_id')
						.unionAll(
							knex('tags_photos').select('media_id'),
							knex('releases_posters').select('media_id'),
							knex('releases_photos').select('media_id'),
							knex('releases_caps').select('media_id'),
							knex('releases_covers').select('media_id'),
							knex('releases_trailers').select('media_id'),
							knex('releases_teasers').select('media_id'),
							knex('movies_covers').select('media_id'),
							knex('movies_trailers').select('media_id'),
							knex('actors').select(knex.raw('avatar_media_id as media_id')),
							knex('actors_profiles').select(knex.raw('avatar_media_id as media_id')),
							knex('actors_photos').select('media_id'),
							knex('chapters_photos').select('media_id'),
							knex('chapters_posters').select('media_id'),
						)
						.as('associations'),
				)
				.whereRaw('associations.media_id = media.id'),
		)
		.returning(['media.id', 'media.is_s3', 'media.path', 'media.thumbnail', 'media.lazy'])
		.delete();

	if (argv.flushMediaFiles) {
		await Promise.all(orphanedMedia.filter((media) => !media.is_s3).map((media) => Promise.all([
			media.path && fsPromises.unlink(path.join(config.media.path, media.path)).catch(() => { /* probably file not found */ }),
			media.thumbnail && fsPromises.unlink(path.join(config.media.path, media.thumbnail)).catch(() => { /* probably file not found */ }),
			media.lazy && fsPromises.unlink(path.join(config.media.path, media.lazy)).catch(() => { /* probably file not found */ }),
		])));

		if (config.s3.enabled) {
			await deleteS3Objects(orphanedMedia.filter((media) => media.is_s3));
		}

		logger.info(`Removed ${orphanedMedia.length} media files from database and storage`);
	} else {
		logger.info(`Removed ${orphanedMedia.length} media files from database, but not from storage`);
	}

	try {
		await fsPromises.rm(path.join(config.media.path, 'temp'), { recursive: true });
		logger.info('Cleared temporary media directory');
	} catch (error) {
		logger.warn(`Failed to clear temporary media directory: ${error.message}`);
	}
}

module.exports = {
	associateAvatars,
	associateReleaseMedia,
	flushOrphanedMedia,
};