// traxxx/src/media.js
'use strict';

const config = require('config');
const util = require('util');
const Promise = require('bluebird');
const fs = require('fs');
const fsPromises = require('fs').promises;
const path = require('path');
const stream = require('stream');
const nanoid = require('nanoid/non-secure');
const mime = require('mime');
// const fileType = require('file-type');
const ffmpeg = require('fluent-ffmpeg');
const sharp = require('sharp');
const blake2 = require('blake2');
const taskQueue = require('promise-task-queue');

const logger = require('./logger')(__filename);
const argv = require('./argv');
const knex = require('./knex');
const http = require('./utils/http');
const bulkInsert = require('./utils/bulk-insert');
const { get } = require('./utils/qu');

const pipeline = util.promisify(stream.pipeline);
const streamQueue = taskQueue();
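
/*
 * sampleMedias() below spreads an oversized media set across `limit` chunks and
 * keeps the extras as fallbacks. An illustrative walkthrough (not from the
 * original source): 7 medias with a limit of 3 give chunkSize = floor(7 / 3) = 2
 * and rest = 7 - (3 * 2) = 1, producing chunks [0, 1, 2], [3, 4] and [5, 6];
 * with preferLast enabled, the last chunk is reversed so media 6 is tried
 * before media 5.
 */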

function sampleMedias(medias, limit = config.media.limit, preferLast = true) {
  // limit media sets, use extras as fallbacks
  if (medias.length <= limit) {
    return medias;
  }

  const chunkSize = Math.floor(medias.length / limit);
  const rest = medias.length - (limit * chunkSize);

  const chunks = Array.from(
    { length: limit },
    (value, index) => {
      const start = (chunkSize * index) + Math.min(index, rest);

      return medias.slice(
        start,
        start + chunkSize + (index < rest ? 1 : 0),
      );
    },
  );

  // flip last chunk so the very last image (often the best cumshot) is tried first
  const lastPreferredChunks = preferLast
    ? chunks.slice(0, -1).concat(chunks.slice(-1).reverse())
    : chunks;

  const groupedMedias = lastPreferredChunks.map((chunk) => {
    // merge chunked medias into a single media with grouped fallback priorities,
    // so the first source of each media is preferred over all second sources, etc.
    const sources = chunk
      .reduce((accSources, media) => {
        media.sources.forEach((source, index) => {
          if (!accSources[index]) {
            accSources.push([source]);
            return;
          }

          accSources[index].push(source);
        });

        return accSources;
      }, [])
      .flat();

    return {
      id: chunk[0].id,
      role: chunk[0].role,
      sources,
    };
  });

  return groupedMedias;
}

function itemsByKey(items, key) {
  return items.reduce((acc, item) => ({ ...acc, [item[key]]: item }), {});
}
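
/*
 * toBaseSource() below normalizes the raw source formats scrapers may emit.
 * Illustrative shapes (hypothetical values, not taken from any scraper):
 *
 *   'https://cdn.example.com/image.jpg'
 *   { src: 'https://cdn.example.com/image.jpg', referer: 'https://example.com' }
 *   { url: 'https://example.com/gallery/1', extract: item => pickSrc(item) } // pickSrc is hypothetical
 *   { stream: 'https://example.com/trailer/master.m3u8', quality: 1080 }
 */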

function toBaseSource(rawSource) {
  if (rawSource && (rawSource.src || (rawSource.extract && rawSource.url) || rawSource.stream)) {
    const baseSource = {};

    if (rawSource.src) baseSource.src = rawSource.src;
    if (rawSource.quality) baseSource.quality = rawSource.quality;
    if (rawSource.type) baseSource.type = rawSource.type;

    if (rawSource.url) baseSource.url = rawSource.url;
    if (rawSource.extract) baseSource.extract = rawSource.extract;

    if (rawSource.stream) {
      baseSource.src = rawSource.stream;
      baseSource.stream = rawSource.stream;
    }

    // reject source if response mimetype does not match specified type
    if (rawSource.verifyType) baseSource.verifyType = rawSource.verifyType;

    if (rawSource.referer) baseSource.referer = rawSource.referer;
    if (rawSource.host) baseSource.host = rawSource.host;

    if (rawSource.attempts) baseSource.attempts = rawSource.attempts;
    if (rawSource.interval) baseSource.interval = rawSource.interval;
    if (rawSource.concurrency) baseSource.concurrency = rawSource.concurrency;

    if (rawSource.credit !== undefined) baseSource.credit = rawSource.credit;
    if (rawSource.comment) baseSource.comment = rawSource.comment;
    if (rawSource.group) baseSource.group = rawSource.group;
    if (rawSource.process) baseSource.process = rawSource.process;

    return baseSource;
  }

  if (typeof rawSource === 'string') {
    return {
      src: rawSource,
    };
  }

  return null;
}

function baseSourceToBaseMedia(baseSource, role, metadata) {
  if (Array.isArray(baseSource)) {
    if (baseSource.length > 0) {
      return {
        ...metadata,
        id: nanoid(),
        role,
        sources: baseSource,
      };
    }

    return null;
  }

  if (baseSource) {
    return {
      ...metadata,
      id: nanoid(),
      role,
      sources: [baseSource],
    };
  }

  return null;
}

function fallbackMediaToBaseMedia(rawMedia, role, metadata) {
  const baseSources = rawMedia
    .map(source => toBaseSource(source))
    .filter(Boolean);

  return baseSourceToBaseMedia(baseSources, role, metadata);
}
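
/*
 * In toBaseMedias() below, a nested array denotes fallback sources: each entry
 * is tried in order until one fetch succeeds. Illustrative call (hypothetical
 * URLs):
 *
 *   toBaseMedias([['https://a.example/hi.jpg', 'https://a.example/lo.jpg']], 'photos');
 *
 * yields a single media whose second source is only fetched if the first fails.
 */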

function toBaseMedias(rawMedias, role, metadata) {
  if (!rawMedias || rawMedias.length === 0) {
    return [];
  }

  const baseMedias = rawMedias.map((rawMedia) => {
    if (!rawMedia) {
      return null;
    }

    if (Array.isArray(rawMedia)) {
      // fallback sources provided
      return fallbackMediaToBaseMedia(rawMedia, role, metadata);
    }

    const baseSource = toBaseSource(rawMedia);

    return baseSourceToBaseMedia(baseSource, role, metadata);
  }).filter(Boolean);

  const sampledBaseMedias = sampleMedias(baseMedias);

  return sampledBaseMedias;
}

async function findSourceDuplicates(baseMedias) {
  const sourceUrls = baseMedias
    .map(baseMedia => baseMedia.sources.map(source => source.src))
    .flat()
    .filter(Boolean);

  const extractUrls = baseMedias
    .map(baseMedia => baseMedia.sources.map(source => source.url))
    .flat()
    .filter(Boolean);

  const [existingSourceMedia, existingExtractMedia] = await Promise.all([
    knex('media').whereIn('source', sourceUrls),
    knex('media').whereIn('source_page', extractUrls),
  ]);

  const existingSourceMediaByUrl = itemsByKey(existingSourceMedia, 'source');
  const existingExtractMediaByUrl = itemsByKey(existingExtractMedia, 'source_page');

  return [existingSourceMediaByUrl, existingExtractMediaByUrl];
}
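
/*
 * The function below splits fetched medias into unique and duplicate entries,
 * deduplicating both against the database and within the batch itself: when two
 * medias in one batch hash identically, the second is kept only as a pointer
 * (`use`) to the first, so a single file and database row serve both
 * associations.
 */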

async function findHashDuplicates(medias) {
  const hashes = medias.map(media => media.meta?.hash || media.entry?.hash).filter(Boolean);

  const existingHashMediaEntries = await knex('media').whereIn('hash', hashes);
  const existingHashMediaEntriesByHash = itemsByKey(existingHashMediaEntries, 'hash');

  const uniqueHashMedias = medias.filter(media => !media.entry && !existingHashMediaEntriesByHash[media.meta?.hash]);

  const { selfDuplicateMedias, selfUniqueMediasByHash } = uniqueHashMedias.reduce((acc, media) => {
    if (!media.meta?.hash) {
      return acc;
    }

    if (acc.selfUniqueMediasByHash[media.meta.hash]) {
      acc.selfDuplicateMedias.push({
        ...media,
        use: acc.selfUniqueMediasByHash[media.meta.hash].id,
      });

      return acc;
    }

    acc.selfUniqueMediasByHash[media.meta.hash] = media;

    return acc;
  }, {
    selfDuplicateMedias: [],
    selfUniqueMediasByHash: {},
  });

  const selfUniqueHashMedias = Object.values(selfUniqueMediasByHash);

  const existingHashMedias = medias
    .filter(media => existingHashMediaEntriesByHash[media.entry?.hash || media.meta?.hash])
    .map(media => ({
      ...media,
      entry: existingHashMediaEntriesByHash[media.entry?.hash || media.meta?.hash],
    }))
    .concat(selfDuplicateMedias);

  return [selfUniqueHashMedias, existingHashMedias];
}
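
/*
 * extractSource() below resolves deferred sources: when a scraper supplies a
 * page URL plus an extract() callback instead of a direct file URL, the page is
 * fetched with qu's get() and extract() is applied to the resulting item to
 * obtain the actual media URL. The exact item shape is defined by ./utils/qu.
 */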

async function extractSource(baseSource, { existingExtractMediaByUrl }) {
  if (typeof baseSource.extract !== 'function' || !baseSource.url) {
    return baseSource;
  }

  const existingExtractMedia = existingExtractMediaByUrl[baseSource.url];

  if (existingExtractMedia) {
    // media entry found by extract URL
    return {
      ...baseSource,
      entry: existingExtractMedia,
    };
  }

  const res = await get(baseSource.url);

  if (res.ok) {
    const src = await baseSource.extract(res.item);

    return {
      ...baseSource,
      src,
    };
  }

  throw new Error(`Could not extract source from ${baseSource.url}: ${res.status}`);
}

async function storeImageFile(media, hashDir, hashSubDir, filename, filedir, filepath, options) {
  logger.silly(`Storing permanent media files for ${media.id} from ${media.src} at ${filepath}`);

  try {
    const thumbdir = path.join(media.role, 'thumbs', hashDir, hashSubDir);
    const thumbpath = path.join(thumbdir, filename);

    const lazydir = path.join(media.role, 'lazy', hashDir, hashSubDir);
    const lazypath = path.join(lazydir, filename);

    await Promise.all([
      fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
      fsPromises.mkdir(path.join(config.media.path, thumbdir), { recursive: true }),
      fsPromises.mkdir(path.join(config.media.path, lazydir), { recursive: true }),
    ]);

    const image = sharp(media.file.path);
    const isProcessed = media.meta.subtype !== 'jpeg' || media.process;

    const [info, stats] = await Promise.all([
      image.metadata(),
      options?.stats && image.stats(),
    ]);

    if (media.process) {
      Object.entries(media.process).forEach(([operation, processOptions]) => {
        if (image[operation]) {
          image[operation](...(Array.isArray(processOptions) ? processOptions : [processOptions]));
          return;
        }

        if (operation === 'crop') {
          image.extract(...(Array.isArray(processOptions) ? processOptions : [processOptions]));
          return;
        }

        logger.warn(`Unknown image operation on ${media.id} (${media.src}): ${operation}`);
      });
    }

    if (isProcessed) {
      // convert to JPEG and write to permanent location
      await image
        .jpeg()
        .toFile(path.join(config.media.path, filepath));
    }

    // generate thumbnail and lazy
    await Promise.all([
      image
        .resize({
          height: config.media.thumbnailSize,
          withoutEnlargement: true,
        })
        .jpeg({ quality: config.media.thumbnailQuality })
        .toFile(path.join(config.media.path, thumbpath)),
      image
        .resize({
          height: config.media.lazySize,
          withoutEnlargement: true,
        })
        .jpeg({ quality: config.media.lazyQuality })
        .toFile(path.join(config.media.path, lazypath)),
    ]);

    if (isProcessed) {
      // remove temp file
      await fsPromises.unlink(media.file.path);
    } else {
      // move temp file to permanent location
      await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));
    }

    logger.silly(`Stored thumbnail, lazy and permanent media file for ${media.id} from ${media.src} at ${filepath}`);

    return {
      ...media,
      file: {
        path: filepath,
        thumbnail: thumbpath,
        lazy: lazypath,
      },
      meta: {
        ...media.meta,
        width: info.width,
        height: info.height,
        entropy: stats?.entropy || null,
        sharpness: stats?.sharpness || null,
      },
    };
  } catch (error) {
    logger.error(`Failed to store ${media.id} from ${media.src} at ${filepath}: ${error.message}`);
    await fsPromises.unlink(media.file.path);

    return null;
  }
}
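
/*
 * storeFile() below shards files by content hash to keep directories small. A
 * made-up example: role 'photos' with hash '0a1b2c...' and extension 'jpeg'
 * maps to photos/0a/1b/2c....jpeg, while storeImageFile() above writes the
 * matching previews to photos/thumbs/0a/1b/ and photos/lazy/0a/1b/.
 */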

async function storeFile(media, options) {
  try {
    const hashDir = media.meta.hash.slice(0, 2);
    const hashSubDir = media.meta.hash.slice(2, 4);
    const hashFilename = media.meta.hash.slice(4);

    const filename = media.quality
      ? `${hashFilename}_${media.quality}.${media.meta.extension}`
      : `${hashFilename}.${media.meta.extension}`;

    const filedir = path.join(media.role, hashDir, hashSubDir);
    const filepath = path.join(filedir, filename);

    if (argv.force) {
      try {
        // remove old file in case rename() does not overwrite (possibly on NFS setups)
        await fsPromises.unlink(path.join(config.media.path, filepath));
      } catch (error) {
        // file probably didn't exist
      }
    }

    if (media.meta.type === 'image') {
      return storeImageFile(media, hashDir, hashSubDir, filename, filedir, filepath, options);
    }

    const [stat] = await Promise.all([
      fsPromises.stat(media.file.path),
      fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
    ]);

    // move temp file to permanent location
    await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));

    logger.silly(`Stored permanent media file for ${media.id} from ${media.src} at ${filepath}`);

    return {
      ...media,
      file: {
        path: filepath,
      },
      meta: {
        ...media.meta,
        size: stat.size,
      },
    };
  } catch (error) {
    logger.warn(`Failed to store ${media.src}: ${error.message}`);
    await fsPromises.unlink(media.file.path);

    return null;
  }
}
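
/*
 * Downloads below are streamed straight to a temp file, with the content hash
 * computed by a pass-through transform along the way, so large photo sets and
 * trailers are never buffered in memory even with many parallel fetches.
 */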

async function fetchHttpSource(source, tempFileTarget, hashStream) {
  const res = await http.get(source.src, {
    headers: {
      ...(source.referer && { referer: source.referer }),
      ...(source.host && { host: source.host }),
    },
    stream: true, // sources are fetched in parallel, don't gobble up memory
    transforms: [hashStream],
    destination: tempFileTarget,
    ...(source.interval && { interval: source.interval }),
    ...(source.concurrency && { concurrency: source.concurrency }),
  });

  if (!res.ok) {
    throw new Error(`Response ${res.status} not OK`);
  }

  return {
    mimetype: res.headers['content-type'] || mime.getType(new URL(source.src).pathname),
  };
}
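
/*
 * Stream sources (e.g. HLS playlists) go through ffmpeg rather than a plain
 * HTTP fetch. The '-movflags frag_keyframe+empty_moov' option produces a
 * fragmented MP4, which is what allows ffmpeg to write MP4 to a non-seekable
 * pipe; the task queue caps concurrent remuxes at config.media.streamConcurrency.
 */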

streamQueue.define('fetchStreamSource', async ({ source, tempFileTarget, hashStream }) => {
  const meta = { mimetype: 'video/mp4' };

  const video = ffmpeg(source.stream)
    .format('mp4')
    .outputOptions(['-movflags frag_keyframe+empty_moov'])
    .on('start', cmd => logger.verbose(`Fetching stream from ${source.stream} with "${cmd}"`))
    .on('error', error => logger.error(`Failed to fetch stream from ${source.stream}: ${error.message}`))
    .pipe();

  await pipeline(video, hashStream, tempFileTarget);

  logger.verbose(`Finished fetching stream from ${source.stream}`);

  return meta;
}, {
  concurrency: config.media.streamConcurrency,
});
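
/*
 * fetchSource() below hashes media as it downloads: a PassThrough stream feeds
 * every chunk to a BLAKE2b hasher (24-byte digest, hex-encoded to 48 characters)
 * while the bytes continue on to the temp file, avoiding a second read pass.
 * Failed attempts are retried after a one-second delay, except for
 * type-verification failures, which fail immediately.
 */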

async function fetchSource(source, baseMedia) {
  const maxAttempts = source.attempts || 3;

  logger.silly(`Fetching media from ${source.src}`);

  // retry until the source fetches successfully or maxAttempts is reached
  async function attempt(attempts = 1) {
    const hasher = new blake2.Hash('blake2b', { digestLength: 24 });
    hasher.setEncoding('hex');

    try {
      const tempFilePath = path.join(config.media.path, 'temp', `${baseMedia.id}`);
      const tempFileTarget = fs.createWriteStream(tempFilePath);
      const hashStream = new stream.PassThrough();

      let size = 0;

      hashStream.on('data', (chunk) => {
        size += chunk.length;
        hasher.write(chunk);
      });

      const { mimetype } = source.stream
        ? await streamQueue.push('fetchStreamSource', { source, tempFileTarget, hashStream })
        : await fetchHttpSource(source, tempFileTarget, hashStream);

      hasher.end();

      const hash = hasher.read();
      const [type, subtype] = mimetype.split('/');
      const extension = mime.getExtension(mimetype);

      if (source.verifyType && source.verifyType !== type) {
        throw Object.assign(new Error(`Type '${type}' does not match type '${source.verifyType}' specified by source`), { code: 'VERIFY_TYPE' });
      }

      return {
        ...source,
        file: {
          path: tempFilePath,
        },
        meta: {
          hash,
          mimetype,
          extension,
          type,
          subtype,
          size,
        },
      };
    } catch (error) {
      hasher.end();

      if (error.code !== 'VERIFY_TYPE') {
        logger.warn(`Failed attempt ${attempts}/${maxAttempts} to fetch ${source.src}: ${error.message}`);

        if (attempts < maxAttempts) {
          await Promise.delay(1000);
          return attempt(attempts + 1);
        }
      }

      throw new Error(`Failed to fetch ${source.src}: ${error.message}`);
    }
  }

  return attempt(1);
}

async function trySource(baseSource, existingMedias, baseMedia) {
  // catch error and try the next source
  const extractedSource = await extractSource(baseSource, existingMedias);
  const existingSourceMedia = existingMedias.existingSourceMediaByUrl[extractedSource.src];

  if (!argv.force && extractedSource.entry) {
    logger.silly(`Media page URL already in database, not extracting ${baseSource.url}`);

    // media entry found during extraction, don't fetch
    return extractedSource;
  }

  if (!argv.force && existingSourceMedia) {
    logger.silly(`Media source URL already in database, skipping ${baseSource.src}`);

    // media entry found by source URL, don't fetch
    return {
      ...baseSource,
      entry: existingSourceMedia,
    };
  }

  return fetchSource(extractedSource, baseMedia);
}
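
/*
 * fetchMedia() below chains its sources with a rejected-promise reduce: the
 * accumulator starts out rejected, and each source only runs inside the
 * previous step's catch(). A minimal standalone sketch of the same technique
 * (tryFetch is hypothetical):
 *
 *   const result = await sources.reduce(
 *     (chain, source) => chain.catch(() => tryFetch(source)),
 *     Promise.reject(new Error('no sources')),
 *   );
 */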

async function fetchMedia(baseMedia, existingMedias) {
  try {
    const source = await baseMedia.sources.reduce(
      // try each source until success
      (result, baseSource, baseSourceIndex) => result.catch(async (error) => {
        if (error.message) {
          logger.warn(error.message);
        }

        return trySource(baseSource, existingMedias, baseMedia, baseSourceIndex);
      }),
      Promise.reject(new Error()),
    );

    return {
      ...baseMedia,
      ...source,
    };
  } catch (error) {
    logger.warn(error.message);

    return baseMedia;
  }
}

function curateMediaEntry(media, index) {
  if (media.entry) {
    return media;
  }

  const curatedMediaEntry = {
    id: media.id,
    path: media.file.path,
    thumbnail: media.file.thumbnail,
    lazy: media.file.lazy,
    index,
    mime: media.meta.mimetype,
    hash: media.meta.hash,
    size: media.meta.size,
    width: media.meta.width,
    height: media.meta.height,
    entropy: media.meta.entropy,
    sharpness: media.meta.sharpness,
    source: media.src,
    source_page: media.url,
    scraper: media.scraper,
    credit: media.credit,
    comment: media.comment,
  };

  return {
    ...media,
    newEntry: true,
    entry: curatedMediaEntry,
  };
}
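
/*
 * storeMedias() below is the pipeline tying the steps above together: dedupe by
 * source URL, fetch whatever remains, dedupe again by content hash, write the
 * unique files to permanent storage, then bulk-insert the new database rows.
 */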

async function storeMedias(baseMedias, options) {
  await fsPromises.mkdir(path.join(config.media.path, 'temp'), { recursive: true });

  const [existingSourceMediaByUrl, existingExtractMediaByUrl] = await findSourceDuplicates(baseMedias);

  const fetchedMedias = await Promise.map(
    baseMedias,
    async baseMedia => fetchMedia(baseMedia, { existingSourceMediaByUrl, existingExtractMediaByUrl }),
    { concurrency: 100 }, // don't overload disk (or network, although this has its own throttling)
  );

  const [uniqueHashMedias, existingHashMedias] = await findHashDuplicates(fetchedMedias);

  const savedMedias = await Promise.map(
    uniqueHashMedias,
    async baseMedia => storeFile(baseMedia, options),
    { concurrency: 100 }, // don't overload disk
  );

  if (argv.force) {
    // overwrite files in case image processing was changed
    await Promise.map(
      existingHashMedias,
      async baseMedia => storeFile(baseMedia, options),
      { concurrency: 100 }, // don't overload disk
    );
  }

  const newMediaWithEntries = savedMedias.filter(Boolean).map((media, index) => curateMediaEntry(media, index));
  const newMediaEntries = newMediaWithEntries.filter(media => media.newEntry).map(media => media.entry);

  await bulkInsert('media', newMediaEntries);

  return [...newMediaWithEntries, ...existingHashMedias];
}
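
/*
 * Associations land in join tables named `${type}s_${role}`, e.g.
 * releases_posters or movies_covers, matching the tables queried by
 * flushOrphanedMedia() at the bottom of this file. Roles are staged in a fixed
 * order so posters are fetched before covers, photos, teasers and trailers.
 */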

async function associateReleaseMedia(releases, type = 'release') {
  if (!argv.media) {
    return;
  }

  const baseMediasByReleaseId = releases.reduce((acc, release) => ({
    ...acc,
    [release.id]: [
      ...(argv.images && argv.poster ? toBaseMedias([release.poster], 'posters') : []),
      ...(argv.images && argv.covers ? toBaseMedias(release.covers, 'covers') : []),
      ...(argv.images && argv.photos ? toBaseMedias(release.photos, 'photos') : []),
      ...(argv.videos && argv.trailer ? toBaseMedias([release.trailer], 'trailers') : []),
      ...(argv.videos && argv.teaser ? toBaseMedias([release.teaser], 'teasers') : []),
    ],
  }), {});

  const baseMediasByRole = Object.values(baseMediasByReleaseId)
    .flat()
    .filter(Boolean)
    .reduce((acc, baseMedia) => {
      if (!acc[baseMedia.role]) acc[baseMedia.role] = [];
      acc[baseMedia.role].push(baseMedia);

      return acc;
    }, {});

  await Promise.reduce(['posters', 'covers', 'photos', 'teasers', 'trailers'], async (chain, role) => {
    // stage by role so posters are prioritized over photos and videos
    await chain;

    const baseMedias = baseMediasByRole[role];

    if (!baseMedias) {
      return;
    }

    const storedMedias = await storeMedias(baseMedias);
    const storedMediasById = itemsByKey(storedMedias, 'id');

    const associations = Object
      .entries(baseMediasByReleaseId)
      .reduce((acc, [releaseId, releaseBaseMedias]) => {
        releaseBaseMedias.forEach((baseMedia) => {
          const media = storedMediasById[baseMedia.id];

          if (media) {
            acc.push({
              [`${type}_id`]: releaseId,
              media_id: media.use || media.entry.id,
            });
          }
        });

        return acc;
      }, [])
      .filter(Boolean);

    if (associations.length > 0) {
      await bulkInsert(`${type}s_${role}`, associations, false);
    }
  }, Promise.resolve());
}

async function associateAvatars(profiles) {
  if (!argv.media) {
    return profiles;
  }

  const profilesWithBaseMedias = profiles.map(profile => (profile.avatar
    ? {
      ...profile,
      avatarBaseMedia: toBaseMedias([profile.avatar], 'avatars', {
        credit: profile.credit || profile.entity?.name || null,
        scraper: profile.scraper || null,
      })[0],
    }
    : profile
  ));

  const baseMedias = profilesWithBaseMedias.map(profile => profile.avatarBaseMedia).filter(Boolean);

  const storedMedias = await storeMedias(baseMedias, { stats: true });
  const storedMediasById = itemsByKey(storedMedias, 'id');

  const profilesWithAvatarIds = profilesWithBaseMedias.map((profile) => {
    const media = storedMediasById[profile.avatarBaseMedia?.id];

    if (media) {
      return {
        ...profile,
        avatarMediaId: media.use || media.entry.id,
      };
    }

    return profile;
  });

  return profilesWithAvatarIds;
}
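
/*
 * The cleanup below deletes every non-SFW media row with no remaining reference
 * in the union of association tables, and uses the returned paths to unlink the
 * corresponding full-size, thumbnail and lazy files from storage.
 */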

async function flushOrphanedMedia() {
  const orphanedMedia = await knex('media')
    .where('is_sfw', false)
    .whereNotExists(
      knex
        .from(
          knex('tags_posters')
            .select('media_id')
            .unionAll(
              knex('tags_photos').select('media_id'),
              knex('releases_posters').select('media_id'),
              knex('releases_photos').select('media_id'),
              knex('releases_covers').select('media_id'),
              knex('releases_trailers').select('media_id'),
              knex('releases_teasers').select('media_id'),
              knex('movies_covers').select('media_id'),
              knex('movies_trailers').select('media_id'),
              knex('actors').select(knex.raw('avatar_media_id as media_id')),
              knex('actors_profiles').select(knex.raw('avatar_media_id as media_id')),
              knex('actors_photos').select('media_id'),
              knex('clips_photos').select('media_id'),
              knex('clips_posters').select('media_id'),
            )
            .as('associations'),
        )
        .whereRaw('associations.media_id = media.id'),
    )
    .returning(['media.path', 'media.thumbnail', 'media.lazy'])
    .delete();

  await Promise.all(orphanedMedia.map(media => Promise.all([
    media.path && fsPromises.unlink(path.join(config.media.path, media.path)).catch(() => { /* probably file not found */ }),
    media.thumbnail && fsPromises.unlink(path.join(config.media.path, media.thumbnail)).catch(() => { /* probably file not found */ }),
    media.lazy && fsPromises.unlink(path.join(config.media.path, media.lazy)).catch(() => { /* probably file not found */ }),
  ])));

  logger.info(`Removed ${orphanedMedia.length} media files from database and storage`);

  await fsPromises.rmdir(path.join(config.media.path, 'temp'), { recursive: true });

  logger.info('Cleared temporary media directory');
}

module.exports = {
  associateAvatars,
  associateReleaseMedia,
  flushOrphanedMedia,
};