1062 lines
29 KiB
JavaScript
Executable File
1062 lines
29 KiB
JavaScript
Executable File
'use strict';
|
|
|
|
const config = require('config');
|
|
const util = require('util');
|
|
const Promise = require('bluebird');
|
|
const fs = require('fs');
|
|
const fsPromises = require('fs').promises;
|
|
const path = require('path');
|
|
const stream = require('stream');
|
|
const { nanoid } = require('nanoid/non-secure');
|
|
const mime = require('mime');
|
|
// const fileType = require('file-type');
|
|
const ffmpeg = require('fluent-ffmpeg');
|
|
const sharp = require('sharp');
|
|
const blake2 = require('blake2');
|
|
const taskQueue = require('promise-task-queue');
|
|
const AWS = require('aws-sdk');
|
|
|
|
const logger = require('./logger')(__filename);
|
|
const argv = require('./argv');
|
|
const knex = require('./knex');
|
|
const http = require('./utils/http');
|
|
const bulkInsert = require('./utils/bulk-insert');
|
|
const chunk = require('./utils/chunk');
|
|
const { get } = require('./utils/qu');
|
|
|
|
// const pipeline = util.promisify(stream.pipeline);
|
|
// Task queue used for stream downloads; the 'fetchStreamSource' task is registered on it further down.
const streamQueue = taskQueue();

// S3-compatible object storage reached through the Wasabi eu-central-1 endpoint.
const endpoint = new AWS.Endpoint('s3.eu-central-1.wasabisys.com');

// Shared S3 client, authenticated with the keys from the `s3` section of the config.
const s3 = new AWS.S3({
  endpoint,
  credentials: {
    accessKeyId: config.s3.accessKey,
    secretAccessKey: config.s3.secretKey,
  },
});
|
|
|
|
/**
 * Reduce a list of medias to at most `limit` entries without discarding any source.
 *
 * The list is cut into `limit` contiguous chunks of near-equal size. Every chunk is merged
 * into a single media holding the sources of all medias in that chunk, so the surplus
 * medias act as fallbacks for the first one.
 *
 * @param {Array<{id: *, role: *, sources: Array}>} medias - Base medias to sample from.
 * @param {number} [limit=argv.mediaLimit] - Maximum number of medias to return.
 * @param {boolean} [preferLast=true] - Reverse the medias of the final chunk, so the very
 *   last media of the list is tried first.
 * @returns {Array} `medias` itself when it already fits the limit, otherwise one merged
 *   media per chunk, carrying the `id` and `role` of the first media in that chunk.
 */
function sampleMedias(medias, limit = argv.mediaLimit, preferLast = true) {
  // limit media sets, use extras as fallbacks
  if (medias.length <= limit) {
    return medias;
  }

  const chunkSize = Math.floor(medias.length / limit);
  const rest = medias.length - (limit * chunkSize);

  // the first `rest` chunks take one extra media, so the remainder is spread evenly
  const chunks = Array.from({ length: limit }, (value, index) => {
    const start = (chunkSize * index) + Math.min(index, rest);

    return medias.slice(start, start + chunkSize + (index < rest ? 1 : 0));
  });

  // Flip the medias inside the last chunk so the very last media is tried first.
  // Reversing the one-element list of chunks, as was done before, changed nothing.
  const lastPreferredChunks = preferLast && chunks.length > 0
    ? [...chunks.slice(0, -1), [...chunks[chunks.length - 1]].reverse()]
    : chunks;

  return lastPreferredChunks.map((mediaChunk) => {
    // merge chunked medias into a single media with grouped fallback priorities,
    // so the first sources of each media are preferred over all second sources, etc.
    const sourcesByPriority = [];

    mediaChunk.forEach((media) => {
      media.sources.forEach((source, index) => {
        if (!sourcesByPriority[index]) {
          sourcesByPriority.push([source]);
          return;
        }

        sourcesByPriority[index].push(source);
      });
    });

    return {
      id: mediaChunk[0].id,
      role: mediaChunk[0].role,
      sources: sourcesByPriority.flat(),
    };
  });
}
|
|
|
|
/**
 * Index a list of objects by the value of one of their properties.
 *
 * Built in a single pass; the previous spread-in-reduce copied the accumulator for every
 * item, which is quadratic for the multi-thousand row lists this is used on.
 *
 * @param {Array<Object>} items - Objects to index.
 * @param {string} key - Property whose value becomes the lookup key.
 * @returns {Object} Map of key value to item; when keys collide the last item wins.
 */
function itemsByKey(items, key) {
  return Object.fromEntries(items.map((item) => [item[key], item]));
}
|
|
|
|
/**
 * Tell whether a value can be parsed as an absolute URL.
 *
 * @param {*} url - Candidate URL.
 * @returns {boolean} True when the `URL` constructor accepts the value, false when it throws.
 */
function isValidUrl(url) {
  try {
    // the constructor throws on anything it cannot parse, so reaching the return means success
    return Boolean(new URL(url));
  } catch (error) {
    return false;
  }
}
|
|
|
|
/**
 * Normalize a raw source, as supplied by a scraper, into a base source object.
 *
 * Accepted shapes:
 * - an object with `src`, `stream`, or both `extract` and `url`: known properties are copied
 * - a URL string: becomes `{ src }`, plus `stream` when the path ends in `.m3u8` or `.mpd`
 * - a function: becomes `{ defer }`, to be resolved later
 *
 * @param {Object|string|Function} rawSource - Source description.
 * @returns {Object|null} The base source, or null when the input is not usable or its
 *   `src`/`url` is not a valid URL.
 */
function toBaseSource(rawSource) {
  if (rawSource && (rawSource.src || (rawSource.extract && rawSource.url) || rawSource.stream)) {
    const baseSource = {};

    // copy the listed properties that hold a truthy value
    const copyIfSet = (...keys) => keys.forEach((key) => {
      if (rawSource[key]) baseSource[key] = rawSource[key];
    });

    if (rawSource.src) {
      if (!isValidUrl(rawSource.src)) {
        return null;
      }

      baseSource.src = rawSource.src;
    }

    copyIfSet('quality', 'type');

    if (rawSource.url) {
      if (!isValidUrl(rawSource.url)) {
        return null;
      }

      baseSource.url = rawSource.url;
    }

    copyIfSet('extract', 'expectType');

    if (rawSource.stream) {
      // a stream takes the place of any plain src
      baseSource.src = rawSource.stream;
      baseSource.stream = rawSource.stream;
    }

    // verifyType: reject source if response mimetype does not match specified type
    copyIfSet('verifyType', 'referer', 'host', 'attempts', 'interval', 'concurrency', 'vr');

    // credit may deliberately be null or empty, only skip it when absent
    if (rawSource.credit !== undefined) baseSource.credit = rawSource.credit;

    copyIfSet('comment', 'group', 'process');

    return baseSource;
  }

  if (typeof rawSource === 'string') {
    if (!isValidUrl(rawSource)) {
      return null;
    }

    // Group the alternation so both extensions need the dot and the end anchor.
    // The previous pattern matched '.m3u8' anywhere and any path merely ending in 'mpd'.
    if (/\.(m3u8|mpd)$/.test(new URL(rawSource).pathname)) {
      return {
        src: rawSource,
        stream: rawSource,
      };
    }

    return {
      src: rawSource,
    };
  }

  if (typeof rawSource === 'function') {
    return {
      defer: rawSource,
    };
  }

  return null;
}
|
|
|
|
/**
 * Wrap one base source, or a list of fallback base sources, into a base media.
 *
 * @param {Object|Array<Object>|null} baseSource - Single source or list of sources.
 * @param {string} role - Media role stored on the result.
 * @param {Object} [metadata] - Extra properties spread into the result; `id`, `role` and
 *   `sources` always take precedence over it.
 * @returns {Object|null} A media with a freshly generated id, or null when there is no
 *   source (falsy value or empty list).
 */
function baseSourceToBaseMedia(baseSource, role, metadata) {
  let sources = null;

  if (Array.isArray(baseSource)) {
    // a list is used as is, unless it has nothing in it
    if (baseSource.length > 0) {
      sources = baseSource;
    }
  } else if (baseSource) {
    sources = [baseSource];
  }

  if (!sources) {
    return null;
  }

  return {
    ...metadata,
    id: nanoid(),
    role,
    sources,
  };
}
|
|
|
|
/**
 * Order trailer sources by the quality preference list in `config.media.trailerQuality`.
 *
 * Sources whose quality is on the list come first, in list order. Sources with an unlisted
 * or missing quality follow, keeping their relative order. The previous comparator returned
 * 0 for any pair involving an unlisted quality, which is not a consistent ordering and
 * could leave listed qualities unsorted around such a source.
 *
 * @param {Array<Object>} sources - Base sources; sorted in place when `role` is 'trailers'.
 * @param {string} role - Media role; every role other than 'trailers' is left untouched.
 * @returns {Array<Object>} The same `sources` array.
 */
function sortBaseTrailersByQuality(sources, role) {
  if (role !== 'trailers') {
    return sources;
  }

  const preferredQualities = config.media.trailerQuality;

  // unlisted qualities share the rank just past the end of the preference list
  const rank = (source) => {
    const index = preferredQualities.indexOf(source.quality);

    return index === -1 ? preferredQualities.length : index;
  };

  return sources.sort((sourceA, sourceB) => rank(sourceA) - rank(sourceB));
}
|
|
|
|
/**
 * Turn a list of alternative raw sources for one media into a single base media.
 *
 * @param {Array} rawMedia - Raw fallback sources, in order of preference.
 * @param {string} role - Media role; trailers are additionally sorted by quality.
 * @param {Object} [metadata] - Extra properties for the resulting media.
 * @returns {Object|null} The base media, or null when no source survived normalization.
 */
function fallbackMediaToBaseMedia(rawMedia, role, metadata) {
  const usableSources = [];

  rawMedia.forEach((rawSource) => {
    const baseSource = toBaseSource(rawSource);

    // unusable sources come back as null and are dropped
    if (baseSource) {
      usableSources.push(baseSource);
    }
  });

  return baseSourceToBaseMedia(
    sortBaseTrailersByQuality(usableSources, role),
    role,
    metadata,
  );
}
|
|
|
|
/**
 * Convert the raw medias of one role into sampled base medias.
 *
 * @param {Array} rawMedias - Raw medias; an entry that is itself an array is treated as a
 *   list of fallback sources for a single media.
 * @param {string} role - Media role assigned to every result.
 * @param {Object} [metadata] - Extra properties for every resulting media.
 * @returns {Array<Object>} Base medias, passed through `sampleMedias` with its default limit.
 */
function toBaseMedias(rawMedias, role, metadata) {
  if (!rawMedias || rawMedias.length === 0) {
    return [];
  }

  const baseMedias = [];

  rawMedias.forEach((rawMedia) => {
    if (!rawMedia) {
      return;
    }

    const baseMedia = Array.isArray(rawMedia)
      ? fallbackMediaToBaseMedia(rawMedia, role, metadata) // fallback sources provided
      : baseSourceToBaseMedia(toBaseSource(rawMedia), role, metadata);

    if (baseMedia) {
      baseMedias.push(baseMedia);
    }
  });

  return sampleMedias(baseMedias);
}
|
|
|
|
/**
 * Look up media entries that were already stored for any of the given source URLs or
 * source page URLs.
 *
 * @param {Array<Object>} baseMedias - Base medias whose sources are checked.
 * @returns {Promise<{existingSourceMediaByUrl: Object, existingExtractMediaByUrl: Object}>}
 *   Entries from the `media` table, keyed by their `source` and `source_page` column.
 */
async function findSourceDuplicates(baseMedias) {
  // collect one property from every source of every media, skipping empty values
  const collectFromSources = (property) => baseMedias
    .flatMap((baseMedia) => baseMedia.sources.map((source) => source[property]))
    .filter(Boolean);

  // may try to check thousands of URLs at once, don't pass all of them to a single query
  const findEntries = async (column, urls) => {
    let entries = [];

    for (const urlsChunk of chunk(urls)) {
      const existingEntries = await knex('media').whereIn(column, urlsChunk);

      entries = [...entries, ...existingEntries];
    }

    return entries;
  };

  // NOTE: the `extract` property is what gets matched against the `source_page` column
  const [existingSourceMedia, existingExtractMedia] = await Promise.all([
    findEntries('source', collectFromSources('src')),
    findEntries('source_page', collectFromSources('extract')),
  ]);

  return {
    existingSourceMediaByUrl: itemsByKey(existingSourceMedia, 'source'),
    existingExtractMediaByUrl: itemsByKey(existingExtractMedia, 'source_page'),
  };
}
|
|
|
|
/**
 * Split fetched medias into those that still need to be stored and those whose content
 * is already known, either from the database or from an earlier media in the same batch.
 *
 * @param {Array<Object>} medias - Fetched medias, carrying `meta.hash` and/or `entry`.
 * @returns {Promise<{uniqueHashMedias: Array<Object>, existingHashMedias: Array<Object>}>}
 *   `uniqueHashMedias` holds the first media for every new hash. `existingHashMedias` holds
 *   medias matched to a database entry (set as `entry`), followed by in-batch duplicates,
 *   which point at the first media with the same hash through `use`.
 */
async function findHashDuplicates(medias) {
  const hashes = medias
    .map((media) => media.meta?.hash || media.entry?.hash)
    .filter(Boolean);

  // NOTE(review): the second argument is presumably the chunk size; confirm 2 is intended
  let existingHashMediaEntries = [];

  for (const hashesChunk of chunk(hashes, 2)) {
    const existingHashes = await knex('media').whereIn('hash', hashesChunk);

    existingHashMediaEntries = [...existingHashMediaEntries, ...existingHashes];
  }

  const existingHashMediaEntriesByHash = itemsByKey(existingHashMediaEntries, 'hash');

  const selfDuplicateMedias = [];
  const selfUniqueMediasByHash = {};

  medias.forEach((media) => {
    const hash = media.meta?.hash;

    // already tied to an entry, or the hash is in the database: not a new media
    if (media.entry || existingHashMediaEntriesByHash[hash]) {
      return;
    }

    // nothing was fetched for this media
    if (!hash) {
      return;
    }

    const firstMedia = selfUniqueMediasByHash[hash];

    if (firstMedia) {
      selfDuplicateMedias.push({
        ...media,
        use: firstMedia.id,
      });

      return;
    }

    selfUniqueMediasByHash[hash] = media;
  });

  const matchedMedias = [];

  medias.forEach((media) => {
    const entry = existingHashMediaEntriesByHash[media.entry?.hash || media.meta?.hash];

    if (entry) {
      matchedMedias.push({ ...media, entry });
    }
  });

  return {
    uniqueHashMedias: Object.values(selfUniqueMediasByHash),
    existingHashMedias: matchedMedias.concat(selfDuplicateMedias),
  };
}
|
|
|
|
/**
 * Resolve a base source that does not carry its final URL yet.
 *
 * - `defer`: the function is called and its result normalized and merged into the source.
 * - `extract` with `url`: the page at `url` is fetched and handed to `extract`, unless a
 *   media entry for that page URL is already known, in which case it is attached as `entry`.
 * - anything else is returned untouched.
 *
 * @param {Object} baseSource - Source to resolve.
 * @param {Object} existingMedias - Lookup maps of known media.
 * @param {Object} existingMedias.existingExtractMediaByUrl - Entries keyed by page URL.
 * @returns {Promise<Object>} The resolved source.
 * @throws {Error} When the page at `url` could not be fetched.
 */
async function extractSource(baseSource, { existingExtractMediaByUrl }) {
  // spreading the null of an unusable result adds nothing
  const withExtracted = (src) => ({
    ...baseSource,
    ...toBaseSource(src),
  });

  if (typeof baseSource.defer === 'function') {
    return withExtracted(await baseSource.defer());
  }

  const canExtract = typeof baseSource.extract === 'function' && Boolean(baseSource.url);

  if (!canExtract) {
    return baseSource;
  }

  const entry = existingExtractMediaByUrl[baseSource.url];

  if (entry) {
    // media entry found by extract URL
    return {
      ...baseSource,
      entry,
    };
  }

  const res = await get(baseSource.url);

  if (!res.ok) {
    throw new Error(`Could not extract source from ${baseSource.url}: ${res.status}`);
  }

  return withExtracted(await baseSource.extract(res.item));
}
|
|
|
|
/**
 * Upload a stored media file to the configured S3 bucket and remove the local copy.
 *
 * @param {string} filepath - Path relative to `config.media.path`; also used as object key.
 * @param {Object} media - Media the file belongs to; `meta.mimetype` sets the content type.
 * @returns {Promise<Object>} The upload result reported by the S3 client.
 */
async function storeS3Object(filepath, media) {
  const fullFilepath = path.join(config.media.path, filepath);

  const upload = s3.upload({
    Bucket: config.s3.bucket,
    Body: fs.createReadStream(fullFilepath),
    Key: filepath,
    ContentType: media.meta.mimetype,
  });

  const status = await upload.promise();

  // the local file is only removed once the upload went through
  await fsPromises.unlink(fullFilepath);

  logger.silly(`Uploaded '${media.id}' from ${media.src} to S3 bucket '${status.Bucket}' at ${status.Location}`);

  return status;
}
|
|
|
|
/**
 * Write the permanent version of an image.
 *
 * @param {Object} image - sharp instance for the source image.
 * @param {Object} media - Media being stored (not used here).
 * @param {Object} info - Metadata reported by sharp; `pages` is set for multi-page images.
 * @param {string} filepath - Target path relative to `config.media.path`.
 * @param {boolean} isProcessed - Whether the image goes through processing.
 * @returns {Promise<void>}
 */
async function writeImage(image, media, info, filepath, isProcessed) {
  const target = path.join(config.media.path, filepath);
  const isAnimated = isProcessed && info.pages;

  if (isAnimated) {
    // convert animated image to WebP and write to permanent location
    await image.webp().toFile(target);
    return;
  }

  const resizeOptions = {
    height: config.media.maxSize,
    withoutEnlargement: true,
  };

  await image
    .resize(resizeOptions)
    .jpeg({ quality: config.media.quality })
    .toFile(target);
}
|
|
|
|
/**
 * Write the JPEG thumbnail of an image.
 *
 * @param {Object} image - sharp instance for the source image.
 * @param {string} thumbpath - Target path relative to `config.media.path`.
 * @returns {Promise<Object>} Result of sharp's `toFile`.
 */
async function writeThumbnail(image, thumbpath) {
  const target = path.join(config.media.path, thumbpath);

  const thumbnail = image
    .jpeg({ quality: config.media.thumbnailQuality })
    .resize({
      height: config.media.thumbnailSize,
      withoutEnlargement: true,
    })
    .rotate();

  return thumbnail.toFile(target);
}
|
|
|
|
/**
 * Write the small JPEG used for lazy loading.
 *
 * @param {Object} image - sharp instance for the source image.
 * @param {string} lazypath - Target path relative to `config.media.path`.
 * @returns {Promise<Object>} Result of sharp's `toFile`.
 */
async function writeLazy(image, lazypath) {
  const target = path.join(config.media.path, lazypath);

  const lazy = image
    .resize({
      height: config.media.lazySize,
      withoutEnlargement: true,
    })
    .jpeg({ quality: config.media.lazyQuality })
    .rotate();

  return lazy.toFile(target);
}
|
|
|
|
/**
 * Store the permanent file, thumbnail and lazy-load version of a fetched image, then
 * remove the temporary download. With S3 enabled the three files are uploaded afterwards.
 *
 * @param {Object} media - Fetched media; reads `file.path`, `meta`, `role`, `process`.
 * @param {string} hashDir - First hash segment, used as directory for local storage.
 * @param {string} hashSubDir - Second hash segment, used as subdirectory for local storage.
 * @param {string} filename - File name shared by the permanent, thumbnail and lazy file.
 * @param {string} filedir - Directory of the permanent file, relative to `config.media.path`.
 * @param {string} filepath - Path of the permanent file, relative to `config.media.path`.
 * @param {Object} [options] - `stats: true` additionally collects entropy and sharpness.
 * @returns {Promise<Object|null>} The media with its final file paths and image metadata,
 *   or null when storing failed; failures are logged, not thrown.
 */
async function storeImageFile(media, hashDir, hashSubDir, filename, filedir, filepath, options) {
  logger.silly(`Storing permanent media files for ${media.id} from ${media.src} at ${filepath}`);
  logger.debug(`Memory usage at image storage: ${process.memoryUsage.rss() / 1000000} MB (${media.src})`);

  try {
    // S3 keys stay flat per role, local storage is spread over hash directories
    const thumbdir = config.s3.enabled ? path.join(media.role, 'thumbs') : path.join(media.role, 'thumbs', hashDir, hashSubDir);
    const thumbpath = path.join(thumbdir, filename);

    const lazydir = config.s3.enabled ? path.join(media.role, 'lazy') : path.join(media.role, 'lazy', hashDir, hashSubDir);
    const lazypath = path.join(lazydir, filename);

    await Promise.all([
      fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
      fsPromises.mkdir(path.join(config.media.path, thumbdir), { recursive: true }),
      fsPromises.mkdir(path.join(config.media.path, lazydir), { recursive: true }),
    ]);

    // don't store posters as animation
    // NOTE(review): roles elsewhere in this file are plural ('trailers', 'teasers'), so this check may never match — confirm
    const image = sharp(media.file.path, { pages: ['trailer', 'teaser'].includes(media.role) ? -1 : 1 });
    const isProcessed = media.meta.subtype !== 'jpeg' || media.process;

    const [info, stats] = await Promise.all([
      image.metadata(),
      options?.stats && image.stats(),
    ]);

    if (media.process) {
      // every key names a sharp operation, the value holds its argument(s)
      Object.entries(media.process).forEach(([operation, processOptions]) => {
        const operationArgs = Array.isArray(processOptions) ? processOptions : [processOptions];

        if (image[operation]) {
          image[operation](...operationArgs);
          return;
        }

        if (operation === 'crop') {
          // 'crop' is accepted as an alias for sharp's extract
          image.extract(...operationArgs);
          return;
        }

        logger.warn(`Unknown image operation on ${media.id} (${media.src}): ${operation}`);
      });
    }

    await writeImage(image, media, info, filepath, isProcessed);

    await Promise.all([
      writeThumbnail(image, thumbpath, info),
      writeLazy(image, lazypath, info),
    ]);

    // all versions are written, the temporary download is no longer needed
    await fsPromises.unlink(media.file.path);

    if (config.s3.enabled) {
      await Promise.all([
        storeS3Object(filepath, media),
        storeS3Object(thumbpath, media),
        storeS3Object(lazypath, media),
      ]);
    }

    logger.silly(`Stored thumbnail, lazy and permanent media file for ${media.id} from ${media.src} at ${filepath}`);

    return {
      ...media,
      file: {
        path: filepath,
        thumbnail: thumbpath,
        lazy: lazypath,
      },
      meta: {
        ...media.meta,
        // 6 or 8 implies image is sideways, and size is not inherently adjusted for orientation
        width: info.orientation === 6 || info.orientation === 8 ? info.height : info.width,
        height: info.orientation === 6 || info.orientation === 8 ? info.width : info.height,
        entropy: stats?.entropy || null,
        sharpness: stats?.sharpness || null,
      },
    };
  } catch (error) {
    logger.error(`Failed to store ${media.id} from ${media.src} at ${filepath}: ${error.message}`);

    try {
      await fsPromises.unlink(media.file.path);
    } catch (unlinkError) {
      // The temporary file is already gone when the failure happened after its removal,
      // e.g. during the S3 upload. That must not turn this handled failure into a rejection.
      if (unlinkError.code !== 'ENOENT') {
        logger.warn(`Failed to unlink ${media.file.path} from ${media.src}: ${unlinkError.message}`);
      }
    }

    return null;
  }
}
|
|
|
|
/**
 * Move a fetched media file from its temporary location to permanent storage.
 *
 * Images are handed to `storeImageFile`. Other files are moved as is and, with S3 enabled,
 * uploaded afterwards. The target path is derived from the content hash.
 *
 * @param {Object} media - Fetched media; reads `meta`, `file.path`, `role`, `quality`.
 * @param {Object} [options] - Passed on to `storeImageFile`.
 * @returns {Promise<Object|null>} The media with its permanent file path(s), or null when
 *   storing failed; failures are logged, not thrown.
 */
async function storeFile(media, options) {
  try {
    const hashDir = media.meta.hash.slice(0, 2);
    const hashSubDir = media.meta.hash.slice(2, 4);
    // S3 keys hold the full hash, local files drop the part already used for the directories
    const hashFilename = config.s3.enabled ? media.meta.hash : media.meta.hash.slice(4);

    const filename = media.quality
      ? `${hashFilename}_${media.quality}.${media.meta.extension}`
      : `${hashFilename}.${media.meta.extension}`;

    const filedir = config.s3.enabled ? media.role : path.join(media.role, hashDir, hashSubDir);
    const filepath = path.join(filedir, filename);

    if (argv.forceMedia) {
      try {
        // remove old file in case rename() does not overwrite (possibly on NFS setups)
        await fsPromises.unlink(path.join(config.media.path, filepath));
      } catch (error) {
        // a missing file is the expected case, anything else is worth knowing about
        if (error.code !== 'ENOENT') {
          logger.warn(`Failed to remove existing ${filepath} before overwriting: ${error.message}`);
        }
      }
    }

    if (media.meta.type === 'image') {
      // awaited so a rejection lands in the catch below instead of escaping to the caller
      return await storeImageFile(media, hashDir, hashSubDir, filename, filedir, filepath, options);
    }

    if (['posters', 'photos', 'caps', 'covers'].includes(media.role)) {
      throw new Error(`Media for '${media.role}' must be an image, but '${media.meta.mimetype}' was detected`);
    }

    if (['teasers', 'trailers'].includes(media.role) && media.meta.type !== 'video') {
      throw new Error(`Media for '${media.role}' must be a video, but '${media.meta.mimetype}' was detected in ${media.src}`);
    }

    const [stat] = await Promise.all([
      fsPromises.stat(media.file.path),
      fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
    ]);

    // move temp file to permanent location
    await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));

    if (config.s3.enabled) {
      // upload the file to S3 storage, will remove original
      await storeS3Object(filepath, media);
    }

    logger.silly(`Stored permanent media file for ${media.id} from ${media.src} at ${filepath}`);

    return {
      ...media,
      file: {
        path: filepath,
      },
      meta: {
        ...media.meta,
        size: stat.size,
      },
    };
  } catch (error) {
    logger.warn(`Failed to store ${media.src}: ${error.message}`);

    // a media that was never fetched has no temporary file to clean up
    const tempFilePath = media.file?.path;

    if (tempFilePath) {
      try {
        await fsPromises.unlink(tempFilePath);
      } catch (unlinkError) {
        logger.warn(`Failed to unlink ${tempFilePath} from ${media.src}: ${unlinkError.message}`);
      }
    }

    return null;
  }
}
|
|
|
|
/**
 * Download a source over HTTP, streaming the response through the hash stream into the
 * temporary file.
 *
 * @param {Object} source - Source to fetch; `referer`, `host`, `interval`, `concurrency`
 *   and `expectType` are honoured when set.
 * @param {Object} tempFileTarget - Write stream of the temporary file.
 * @param {Object} hashStream - Transform stream the response is piped through.
 * @returns {Promise<{mimetype: string}>} Mimetype from the response header (translated
 *   through `expectType` when given), falling back to the one implied by the URL path.
 * @throws {Error} When the response is not OK.
 */
async function fetchHttpSource(source, tempFileTarget, hashStream) {
  const headers = {};

  if (source.referer) headers.referer = source.referer;
  if (source.host) headers.host = source.host;

  const requestOptions = {
    headers,
    stream: true, // sources are fetched in parallel, don't gobble up memory
    transforms: [hashStream],
    destination: tempFileTarget,
  };

  if (source.interval) requestOptions.interval = source.interval;
  if (source.concurrency) requestOptions.concurrency = source.concurrency;

  const res = await http.get(source.src, requestOptions);

  if (!res.ok) {
    throw new Error(`Response ${res.status} not OK`);
  }

  const contentType = res.headers['content-type'];
  const responseType = source.expectType
    ? source.expectType[contentType]
    : contentType;

  return {
    mimetype: responseType || mime.getType(new URL(source.src).pathname),
  };
}
|
|
|
|
// Queue task that downloads a stream source through ffmpeg as fragmented MP4, piping the
// output through the hash stream into the temporary file. The number of simultaneous
// downloads is capped by config.media.streamConcurrency.
streamQueue.define('fetchStreamSource', async ({ source, tempFileTarget, hashStream }) => {
  const command = ffmpeg(source.stream)
    .format('mp4')
    .outputOptions(['-movflags frag_keyframe+empty_moov']);

  command.on('start', (cmd) => logger.verbose(`Fetching stream from ${source.stream} with "${cmd}"`));
  command.on('error', (error) => logger.error(`Failed to fetch stream from ${source.stream}: ${error.message}`));

  // without a target, pipe() hands back the output stream of the command
  const video = command.pipe();

  await stream.promises.pipeline(video, hashStream, tempFileTarget);

  logger.verbose(`Finished fetching stream from ${source.stream}`);

  return { mimetype: 'video/mp4' };
}, {
  concurrency: config.media.streamConcurrency,
});
|
|
|
|
/**
 * Download a source into a temporary file, computing its hash and size on the fly.
 *
 * The file is written to `<config.media.path>/temp/<baseMedia.id>`. A failed download is
 * retried after a one second pause, up to `source.attempts` or `argv.mediaAttempts`
 * attempts; a type verification failure is never retried.
 *
 * @param {Object} source - Extracted source with `src`, or `stream` for stream sources.
 * @param {Object} baseMedia - Media the source belongs to; its id names the temporary file.
 * @returns {Promise<Object>} The source extended with `file.path` and `meta` (hash,
 *   mimetype, extension, type, subtype, size).
 * @throws {Error} When stream fetching is disabled for a stream source, or when the
 *   source could not be fetched.
 */
async function fetchSource(source, baseMedia) {
  const maxAttempts = source.attempts || argv.mediaAttempts;

  logger.silly(`Fetching media from ${source.src}`);
  logger.debug(`Memory usage before media fetch: ${process.memoryUsage.rss() / 1000000} MB (${source.src})`);

  if (source.stream && !config.media.fetchStreams) {
    throw new Error(`Stream fetching disabled, ignoring ${source.stream}`);
  }

  // every pass of the loop is one attempt, with its own hasher and temporary file stream
  for (let attempts = 1; ; attempts += 1) {
    const hasher = new blake2.Hash('blake2b', { digestLength: 24 });
    let hasherReady = true;

    hasher.setEncoding('hex');

    try {
      const tempFilePath = path.join(config.media.path, 'temp', `${baseMedia.id}`);
      const tempFileTarget = fs.createWriteStream(tempFilePath);
      const hashStream = new stream.PassThrough();
      let size = 0;

      hashStream.on('data', (streamChunk) => {
        size += streamChunk.length;

        // data may still trickle in after a failure has closed the hasher
        if (hasherReady) {
          hasher.write(streamChunk);
        }
      });

      const { mimetype } = source.stream
        ? await streamQueue.push('fetchStreamSource', { source, tempFileTarget, hashStream })
        : await fetchHttpSource(source, tempFileTarget, hashStream);

      hasher.end();

      const hash = hasher.read();
      const [type, subtype] = mimetype.split('/');
      const extension = mime.getExtension(mimetype);

      if (source.verifyType && source.verifyType !== type) {
        throw Object.assign(new Error(`Type '${type}' does not match type '${source.verifyType}' specified by source`), { code: 'VERIFY_TYPE' });
      }

      return {
        ...source,
        file: {
          path: tempFilePath,
        },
        meta: {
          hash,
          mimetype,
          extension,
          type,
          subtype,
          size,
        },
      };
    } catch (error) {
      hasherReady = false;
      hasher.end();

      // a type mismatch will not go away by trying again
      const isRetryable = error.code !== 'VERIFY_TYPE';

      if (isRetryable) {
        logger.warn(`Failed attempt ${attempts}/${maxAttempts} to fetch ${source.src}: ${error.message}`);
      }

      if (isRetryable && attempts < maxAttempts) {
        await Promise.delay(1000);
        continue;
      }

      throw new Error(`Failed to fetch ${source.src}: ${error.message}`);
    }
  }
}
|
|
|
|
/**
 * Resolve and fetch a single source of a media, reusing a known media entry when possible.
 *
 * Unless `argv.forceMedia` is set, nothing is downloaded when an entry was found for the
 * page URL during extraction or for the source URL. Errors are not handled here; the
 * caller moves on to the next source when this rejects.
 *
 * @param {Object} baseSource - Source to try.
 * @param {Object} existingMedias - Lookup maps of known media, by source and by page URL.
 * @param {Object} baseMedia - Media the source belongs to.
 * @returns {Promise<Object>} The source, fetched or tied to an existing `entry`.
 */
async function trySource(baseSource, existingMedias, baseMedia) {
  const extractedSource = await extractSource(baseSource, existingMedias);
  const existingSourceMedia = existingMedias.existingSourceMediaByUrl[extractedSource.src];
  const mayReuseEntry = !argv.forceMedia;

  if (mayReuseEntry && extractedSource.entry) {
    logger.silly(`Media page URL already in database, not extracting ${baseSource.url}`);

    // media entry found during extraction, don't fetch
    return extractedSource;
  }

  if (mayReuseEntry && existingSourceMedia) {
    logger.silly(`Media source URL already in database, skipping ${baseSource.src}`);

    // media entry found by source URL, don't fetch
    return {
      ...baseSource,
      entry: existingSourceMedia,
    };
  }

  const fetchedSource = await fetchSource(extractedSource, baseMedia);

  return {
    ...fetchedSource,
    entry: extractedSource.entry || existingSourceMedia,
  };
}
|
|
|
|
/**
 * Fetch a media by trying its sources in order until one succeeds.
 *
 * @param {Object} baseMedia - Media with its `sources` in order of preference.
 * @param {Object} existingMedias - Lookup maps of known media, passed on to `trySource`.
 * @returns {Promise<Object>} The media merged with the first source that worked, or the
 *   unchanged media when every source failed; failures are logged as warnings.
 */
async function fetchMedia(baseMedia, existingMedias) {
  // one link of the chain: only runs when everything before it has failed
  const tryNextSource = (previousAttempt, baseSource, baseSourceIndex) => previousAttempt.catch(async (error) => {
    if (error.message) {
      logger.warn(error.message);
    }

    return trySource(baseSource, existingMedias, baseMedia, baseSourceIndex);
  });

  try {
    // the chain starts out rejected with a blank error, so the first source is tried right away
    const source = await baseMedia.sources.reduce(tryNextSource, Promise.reject(new Error()));

    return {
      ...baseMedia,
      ...source,
    };
  } catch (error) {
    logger.warn(error.message);

    return baseMedia;
  }
}
|
|
|
|
/**
 * Build the `media` table row for a stored media.
 *
 * @param {Object} media - Stored media with `file` and `meta`.
 * @param {number} index - Position of the media, stored in the `index` column.
 * @returns {Object} The media itself when it already has an `entry`, otherwise a copy
 *   with the new row as `entry` and `newEntry` set to true.
 */
function curateMediaEntry(media, index) {
  if (media.entry) {
    return media;
  }

  const { file, meta } = media;

  const entry = {
    id: media.id,
    path: file.path,
    thumbnail: file.thumbnail,
    lazy: file.lazy,
    is_s3: config.s3.enabled,
    index,
    mime: meta.mimetype,
    hash: meta.hash,
    size: meta.size,
    width: meta.width,
    height: meta.height,
    is_vr: media.vr,
    entropy: meta.entropy,
    sharpness: meta.sharpness,
    source: media.src,
    source_page: media.url,
    scraper: media.scraper,
    credit: media.credit,
    comment: media.comment,
  };

  return {
    ...media,
    newEntry: true,
    entry,
  };
}
|
|
|
|
/**
 * Fetch and store a batch of base medias, and insert the new ones into the `media` table.
 *
 * @param {Array<Object>} baseMedias - Medias to fetch and store.
 * @param {Object} [options] - Passed on to `storeFile`.
 * @returns {Promise<Array<Object>>} Newly stored medias followed by the medias that
 *   matched existing content, each carrying the information needed to associate it.
 * @throws {Error} The insert error, with the rows that failed to insert as `entries`.
 */
async function storeMedias(baseMedias, options) {
  await fsPromises.mkdir(path.join(config.media.path, 'temp'), { recursive: true });

  const { existingSourceMediaByUrl, existingExtractMediaByUrl } = await findSourceDuplicates(baseMedias);
  const existingMedias = { existingSourceMediaByUrl, existingExtractMediaByUrl };

  const fetchedMedias = await Promise.map(
    baseMedias,
    (baseMedia) => fetchMedia(baseMedia, existingMedias),
    { concurrency: 10 }, // don't overload disk (or network, although this has its own throttling)
  );

  const { uniqueHashMedias, existingHashMedias } = await findHashDuplicates(fetchedMedias);

  const storeWithOptions = (baseMedia) => storeFile(baseMedia, options);
  const storeConcurrency = { concurrency: 100 }; // don't overload disk

  const savedMedias = await Promise.map(uniqueHashMedias, storeWithOptions, storeConcurrency);

  if (argv.forceMedia) {
    // overwrite files in case image processing was changed
    await Promise.map(existingHashMedias, storeWithOptions, storeConcurrency);
  }

  // medias that failed to store came back as null
  const newMediaWithEntries = savedMedias
    .filter(Boolean)
    .map((media, index) => curateMediaEntry(media, index));

  const newMediaEntries = newMediaWithEntries
    .filter((media) => media.newEntry)
    .map((media) => media.entry);

  try {
    await bulkInsert('media', newMediaEntries, false);
  } catch (error) {
    throw Object.assign(error, { entries: newMediaEntries });
  }

  return [...newMediaWithEntries, ...existingHashMedias];
}
|
|
|
|
/**
 * Store the medias of a list of releases and link them to their release.
 *
 * Roles are handled one after another (posters, covers, photos, caps, teasers, trailers),
 * each with its own association table named `<type>s_<role>`. A failure in one role is
 * logged and does not stop the remaining roles. Which roles are handled is controlled by
 * the `argv` media flags.
 *
 * @param {Array<Object>} releases - Releases with an `id` and raw media properties.
 * @param {string} [type='release'] - Entity type, used for the table and column names.
 * @returns {Promise<void>}
 */
async function associateReleaseMedia(releases, type = 'release') {
  if (!argv.media) {
    return;
  }

  // built in one pass; spreading the accumulator per release copies it over and over
  const baseMediasByReleaseId = Object.fromEntries(releases.map((release) => [release.id, [
    ...(argv.images && argv.poster ? toBaseMedias([release.poster], 'posters') : []),
    ...(argv.images && argv.covers ? toBaseMedias(release.covers, 'covers') : []),
    ...(argv.images && argv.photos ? toBaseMedias(release.photos, 'photos') : []),
    ...(argv.images && argv.caps ? toBaseMedias(release.caps, 'caps') : []),
    ...(argv.videos && argv.trailer ? toBaseMedias([release.trailer], 'trailers') : []),
    ...(argv.videos && argv.teaser ? toBaseMedias([release.teaser], 'teasers') : []),
  ]]));

  const baseMediasByRole = {};

  Object.values(baseMediasByReleaseId).flat().filter(Boolean).forEach((baseMedia) => {
    if (!baseMediasByRole[baseMedia.role]) baseMediasByRole[baseMedia.role] = [];

    baseMediasByRole[baseMedia.role].push(baseMedia);
  });

  // stage by role so posters are prioritized over photos and videos
  for (const role of ['posters', 'covers', 'photos', 'caps', 'teasers', 'trailers']) {
    const baseMedias = baseMediasByRole[role];

    if (!baseMedias) {
      continue;
    }

    try {
      const storedMedias = await storeMedias(baseMedias);
      const storedMediasById = itemsByKey(storedMedias, 'id');

      const associations = [];

      Object.entries(baseMediasByReleaseId).forEach(([releaseId, releaseBaseMedias]) => {
        releaseBaseMedias.forEach((baseMedia) => {
          // medias of other roles are not in this batch and resolve to nothing
          const media = storedMediasById[baseMedia.id];
          // a duplicate within the batch points at another media, which must have been stored itself
          const mediaId = (storedMediasById[media?.use] && media?.use) || media?.entry?.id;

          if (mediaId) {
            associations.push({
              [`${type}_id`]: releaseId,
              media_id: mediaId,
            });
          }
        });
      });

      if (associations.length > 0) {
        await bulkInsert(`${type}s_${role}`, associations, false);
      }
    } catch (error) {
      if (error.entries) {
        // options object instead of the legacy positional form, where `{ color: true }`
        // only enabled colors by being truthy
        logger.error(util.inspect(error.entries, { depth: null, colors: true }));
      }

      logger.error(`Failed to store ${type} ${role}: ${error.message} (${error.detail || 'no detail'})`);
    }
  }
}
|
|
|
|
/**
 * Store the avatars of a list of profiles and attach the resulting media ids.
 *
 * @param {Array<Object>} profiles - Profiles, optionally carrying a raw `avatar`.
 * @returns {Promise<Array<Object>>} The given profiles when media is disabled. Otherwise
 *   copies that carry `avatarBaseMedia` when an avatar was supplied, plus `avatarMediaId`
 *   when that avatar resolved to a stored media.
 */
async function associateAvatars(profiles) {
  if (!argv.media) {
    return profiles;
  }

  const profilesWithBaseMedias = profiles.map((profile) => (profile.avatar
    ? {
      ...profile,
      avatarBaseMedia: toBaseMedias([profile.avatar], 'avatars', {
        credit: profile.credit || profile.entity?.name || null,
        scraper: profile.scraper || null,
      })[0],
    }
    : profile
  ));

  const baseMedias = profilesWithBaseMedias.map((profile) => profile.avatarBaseMedia).filter(Boolean);

  const storedMedias = await storeMedias(baseMedias, { stats: true });
  const storedMediasById = itemsByKey(storedMedias, 'id');

  return profilesWithBaseMedias.map((profile) => {
    const media = storedMediasById[profile.avatarBaseMedia?.id];

    // A duplicate within the batch points at another media through `use`. Only follow it
    // when that media was stored, the same guard associateReleaseMedia applies.
    const avatarMediaId = (storedMediasById[media?.use] && media?.use) || media?.entry?.id;

    if (!avatarMediaId) {
      return profile;
    }

    return {
      ...profile,
      avatarMediaId,
    };
  });
}
|
|
|
|
/**
 * Remove the files of the given media entries from the configured S3 bucket.
 *
 * Keys are sent in batches of at most 1000, the maximum a single DeleteObjects request
 * accepts. No request is made when there is nothing to delete, as S3 rejects an empty list.
 *
 * @param {Array<{path: ?string, thumbnail: ?string, lazy: ?string}>} media - Media entries.
 * @returns {Promise<{Deleted: Array, Errors: Array}>} Deleted objects and errors, collected
 *   over all batches.
 */
async function deleteS3Objects(media) {
  const maxKeysPerRequest = 1000;

  const objects = media
    .flatMap((item) => [item.path, item.thumbnail, item.lazy])
    .filter(Boolean)
    .map((Key) => ({ Key }));

  const status = { Deleted: [], Errors: [] };

  for (let offset = 0; offset < objects.length; offset += maxKeysPerRequest) {
    const batchStatus = await s3.deleteObjects({
      Bucket: config.s3.bucket,
      Delete: {
        Objects: objects.slice(offset, offset + maxKeysPerRequest),
        Quiet: false,
      },
    }).promise();

    status.Deleted = status.Deleted.concat(batchStatus.Deleted || []);
    status.Errors = status.Errors.concat(batchStatus.Errors || []);
  }

  logger.info(`Removed ${status.Deleted.length} media files from S3 bucket '${config.s3.bucket}', ${status.Errors.length} errors`);

  return status;
}
|
|
|
|
/**
 * Delete media entries that are not referenced by any association table, and clear the
 * temporary media directory.
 *
 * Only entries with `is_sfw` set to false are considered. The files themselves are only
 * removed when `argv.flushMediaFiles` is set: local files from disk, and S3 objects when
 * S3 is enabled.
 *
 * @returns {Promise<void>}
 */
async function flushOrphanedMedia() {
  const orphanedMedia = await knex('media')
    .where('is_sfw', false)
    .whereNotExists(
      knex
        .from(
          knex('tags_posters')
            .select('media_id')
            .unionAll(
              knex('tags_photos').select('media_id'),
              knex('releases_posters').select('media_id'),
              knex('releases_photos').select('media_id'),
              knex('releases_caps').select('media_id'),
              knex('releases_covers').select('media_id'),
              knex('releases_trailers').select('media_id'),
              knex('releases_teasers').select('media_id'),
              knex('movies_covers').select('media_id'),
              knex('movies_trailers').select('media_id'),
              knex('actors').select(knex.raw('avatar_media_id as media_id')),
              knex('actors_profiles').select(knex.raw('avatar_media_id as media_id')),
              knex('actors_photos').select('media_id'),
              knex('chapters_photos').select('media_id'),
              knex('chapters_posters').select('media_id'),
            )
            .as('associations'),
        )
        .whereRaw('associations.media_id = media.id'),
    )
    .returning(['media.id', 'media.is_s3', 'media.path', 'media.thumbnail', 'media.lazy'])
    .delete();

  if (argv.flushMediaFiles) {
    // remove one local file if the entry has it, ignoring failures (probably file not found)
    const unlinkQuietly = (file) => file && fsPromises
      .unlink(path.join(config.media.path, file))
      .catch(() => { /* probably file not found */ });

    const localMedia = orphanedMedia.filter((media) => !media.is_s3);

    await Promise.all(localMedia.map((media) => Promise.all([
      unlinkQuietly(media.path),
      unlinkQuietly(media.thumbnail),
      unlinkQuietly(media.lazy),
    ])));

    if (config.s3.enabled) {
      await deleteS3Objects(orphanedMedia.filter((media) => media.is_s3));
    }

    logger.info(`Removed ${orphanedMedia.length} media files from database and storage`);
  } else {
    logger.info(`Removed ${orphanedMedia.length} media files from database, but not from storage`);
  }

  const tempDirectory = path.join(config.media.path, 'temp');

  try {
    await fsPromises.rm(tempDirectory, { recursive: true });
    logger.info('Cleared temporary media directory');
  } catch (error) {
    logger.warn(`Failed to clear temporary media directory: ${error.message}`);
  }
}
|
|
|
|
module.exports = {
|
|
associateAvatars,
|
|
associateReleaseMedia,
|
|
flushOrphanedMedia,
|
|
};
|