Updated dependencies. Added periodic memory logger.
This commit is contained in:
38
src/media.js
38
src/media.js
@@ -190,7 +190,7 @@ function sortBaseTrailersByQuality(sources, role) {
|
||||
|
||||
function fallbackMediaToBaseMedia(rawMedia, role, metadata) {
|
||||
const baseSources = rawMedia
|
||||
.map(source => toBaseSource(source))
|
||||
.map((source) => toBaseSource(source))
|
||||
.filter(Boolean);
|
||||
|
||||
const sortedBaseSources = sortBaseTrailersByQuality(baseSources, role);
|
||||
@@ -225,12 +225,12 @@ function toBaseMedias(rawMedias, role, metadata) {
|
||||
|
||||
async function findSourceDuplicates(baseMedias) {
|
||||
const sourceUrls = baseMedias
|
||||
.map(baseMedia => baseMedia.sources.map(source => source.src))
|
||||
.map((baseMedia) => baseMedia.sources.map((source) => source.src))
|
||||
.flat()
|
||||
.filter(Boolean);
|
||||
|
||||
const extractUrls = baseMedias
|
||||
.map(baseMedia => baseMedia.sources.map(source => source.url))
|
||||
.map((baseMedia) => baseMedia.sources.map((source) => source.url))
|
||||
.flat()
|
||||
.filter(Boolean);
|
||||
|
||||
@@ -246,12 +246,12 @@ async function findSourceDuplicates(baseMedias) {
|
||||
}
|
||||
|
||||
async function findHashDuplicates(medias) {
|
||||
const hashes = medias.map(media => media.meta?.hash || media.entry?.hash).filter(Boolean);
|
||||
const hashes = medias.map((media) => media.meta?.hash || media.entry?.hash).filter(Boolean);
|
||||
|
||||
const existingHashMediaEntries = await knex('media').whereIn('hash', hashes);
|
||||
const existingHashMediaEntriesByHash = itemsByKey(existingHashMediaEntries, 'hash');
|
||||
|
||||
const uniqueHashMedias = medias.filter(media => !media.entry && !existingHashMediaEntriesByHash[media.meta?.hash]);
|
||||
const uniqueHashMedias = medias.filter((media) => !media.entry && !existingHashMediaEntriesByHash[media.meta?.hash]);
|
||||
|
||||
const { selfDuplicateMedias, selfUniqueMediasByHash } = uniqueHashMedias.reduce((acc, media) => {
|
||||
if (!media.meta?.hash) {
|
||||
@@ -278,8 +278,8 @@ async function findHashDuplicates(medias) {
|
||||
const selfUniqueHashMedias = Object.values(selfUniqueMediasByHash);
|
||||
|
||||
const existingHashMedias = medias
|
||||
.filter(media => existingHashMediaEntriesByHash[media.entry?.hash || media.meta?.hash])
|
||||
.map(media => ({
|
||||
.filter((media) => existingHashMediaEntriesByHash[media.entry?.hash || media.meta?.hash])
|
||||
.map((media) => ({
|
||||
...media,
|
||||
entry: existingHashMediaEntriesByHash[media.entry?.hash || media.meta?.hash],
|
||||
}))
|
||||
@@ -563,8 +563,8 @@ streamQueue.define('fetchStreamSource', async ({ source, tempFileTarget, hashStr
|
||||
const video = ffmpeg(source.stream)
|
||||
.format('mp4')
|
||||
.outputOptions(['-movflags frag_keyframe+empty_moov'])
|
||||
.on('start', cmd => logger.verbose(`Fetching stream from ${source.stream} with "${cmd}"`))
|
||||
.on('error', error => logger.error(`Failed to fetch stream from ${source.stream}: ${error.message}`))
|
||||
.on('start', (cmd) => logger.verbose(`Fetching stream from ${source.stream} with "${cmd}"`))
|
||||
.on('error', (error) => logger.error(`Failed to fetch stream from ${source.stream}: ${error.message}`))
|
||||
.pipe();
|
||||
|
||||
await pipeline(video, hashStream, tempFileTarget);
|
||||
@@ -745,7 +745,7 @@ async function storeMedias(baseMedias, options) {
|
||||
|
||||
const fetchedMedias = await Promise.map(
|
||||
baseMedias,
|
||||
async baseMedia => fetchMedia(baseMedia, { existingSourceMediaByUrl, existingExtractMediaByUrl }),
|
||||
async (baseMedia) => fetchMedia(baseMedia, { existingSourceMediaByUrl, existingExtractMediaByUrl }),
|
||||
{ concurrency: 100 }, // don't overload disk (or network, although this has its own throttling)
|
||||
);
|
||||
|
||||
@@ -753,7 +753,7 @@ async function storeMedias(baseMedias, options) {
|
||||
|
||||
const savedMedias = await Promise.map(
|
||||
uniqueHashMedias,
|
||||
async baseMedia => storeFile(baseMedia, options),
|
||||
async (baseMedia) => storeFile(baseMedia, options),
|
||||
{ concurrency: 100 }, // don't overload disk
|
||||
);
|
||||
|
||||
@@ -761,13 +761,13 @@ async function storeMedias(baseMedias, options) {
|
||||
// overwrite files in case image processing was changed
|
||||
await Promise.map(
|
||||
existingHashMedias,
|
||||
async baseMedia => storeFile(baseMedia, options),
|
||||
async (baseMedia) => storeFile(baseMedia, options),
|
||||
{ concurrency: 100 }, // don't overload disk
|
||||
);
|
||||
}
|
||||
|
||||
const newMediaWithEntries = savedMedias.filter(Boolean).map((media, index) => curateMediaEntry(media, index));
|
||||
const newMediaEntries = newMediaWithEntries.filter(media => media.newEntry).map(media => media.entry);
|
||||
const newMediaEntries = newMediaWithEntries.filter((media) => media.newEntry).map((media) => media.entry);
|
||||
|
||||
try {
|
||||
await bulkInsert('media', newMediaEntries, false);
|
||||
@@ -851,7 +851,7 @@ async function associateAvatars(profiles) {
|
||||
return profiles;
|
||||
}
|
||||
|
||||
const profilesWithBaseMedias = profiles.map(profile => (profile.avatar
|
||||
const profilesWithBaseMedias = profiles.map((profile) => (profile.avatar
|
||||
? {
|
||||
...profile,
|
||||
avatarBaseMedia: toBaseMedias([profile.avatar], 'avatars', {
|
||||
@@ -862,7 +862,7 @@ async function associateAvatars(profiles) {
|
||||
: profile
|
||||
));
|
||||
|
||||
const baseMedias = profilesWithBaseMedias.map(profile => profile.avatarBaseMedia).filter(Boolean);
|
||||
const baseMedias = profilesWithBaseMedias.map((profile) => profile.avatarBaseMedia).filter(Boolean);
|
||||
|
||||
const storedMedias = await storeMedias(baseMedias, { stats: true });
|
||||
const storedMediasById = itemsByKey(storedMedias, 'id');
|
||||
@@ -885,13 +885,13 @@ async function associateAvatars(profiles) {
|
||||
|
||||
async function deleteS3Objects(media) {
|
||||
const objects = media
|
||||
.map(item => [
|
||||
.map((item) => [
|
||||
{ Key: item.path },
|
||||
{ Key: item.thumbnail },
|
||||
{ Key: item.lazy },
|
||||
])
|
||||
.flat()
|
||||
.filter(item => item.Key);
|
||||
.filter((item) => item.Key);
|
||||
|
||||
const status = await s3.deleteObjects({
|
||||
Bucket: config.s3.bucket,
|
||||
@@ -936,7 +936,7 @@ async function flushOrphanedMedia() {
|
||||
.returning(['media.id', 'media.is_s3', 'media.path', 'media.thumbnail', 'media.lazy'])
|
||||
.delete();
|
||||
|
||||
await Promise.all(orphanedMedia.filter(media => !media.is_s3).map(media => Promise.all([
|
||||
await Promise.all(orphanedMedia.filter((media) => !media.is_s3).map((media) => Promise.all([
|
||||
media.path && fsPromises.unlink(path.join(config.media.path, media.path)).catch(() => { /* probably file not found */ }),
|
||||
media.thumbnail && fsPromises.unlink(path.join(config.media.path, media.thumbnail)).catch(() => { /* probably file not found */ }),
|
||||
media.lazy && fsPromises.unlink(path.join(config.media.path, media.lazy)).catch(() => { /* probably file not found */ }),
|
||||
@@ -945,7 +945,7 @@ async function flushOrphanedMedia() {
|
||||
logger.info(`Removed ${orphanedMedia.length} media files from database and storage`);
|
||||
|
||||
if (config.s3.enabled) {
|
||||
await deleteS3Objects(orphanedMedia.filter(media => media.is_s3));
|
||||
await deleteS3Objects(orphanedMedia.filter((media) => media.is_s3));
|
||||
}
|
||||
|
||||
await fsPromises.rmdir(path.join(config.media.path, 'temp'), { recursive: true });
|
||||
|
||||
Reference in New Issue
Block a user