Drastically improved memory performance of the media module using streams and temp files.

This commit is contained in:
2020-05-07 01:53:07 +02:00
parent 79c7847f1c
commit af9d8f7858
18 changed files with 248 additions and 134 deletions

View File

@@ -2,12 +2,10 @@
const config = require('config');
const Promise = require('bluebird');
const util = require('util');
const fs = require('fs');
const fsPromises = require('fs').promises;
const path = require('path');
const stream = require('stream');
const { once } = require('events');
const { PassThrough } = require('stream');
const nanoid = require('nanoid/non-secure');
const mime = require('mime');
// const fileType = require('file-type');
@@ -20,9 +18,6 @@ const knex = require('./knex');
const http = require('./utils/http');
const { get } = require('./utils/qu');
const PassThrough = stream.PassThrough;
const pipeline = util.promisify(stream.pipeline);
// Current resident set size (RSS) of this process, in megabytes.
function getMemoryUsage() {
  const { rss } = process.memoryUsage();
  return rss / 1e6;
}
@@ -268,142 +263,154 @@ async function extractSource(baseSource, { existingExtractMediaByUrl }) {
throw new Error(`Could not extract source from ${baseSource.url}: ${res.status}`);
}
// NOTE(review): this span is diff-view residue — it interleaves the PRE-commit
// storeFile with the POST-commit storeImageFile helper (note the duplicate
// declarations of filename/filedir/filepath/thumbdir and the duplicated mkdir
// block below). As shown it is not valid JavaScript; reconcile against the
// actual commit before editing.
async function storeFile(media) {
// shard the permanent path by the first two hash-character pairs
const hashDir = media.meta.hash.slice(0, 2);
const hashSubDir = media.meta.hash.slice(2, 4);
const hashFilename = media.meta.hash.slice(4);
// post-commit helper: moves an image (and its pre-generated thumbnail) from the
// temp location to permanent storage; paths are precomputed by the new storeFile
async function storeImageFile(media, hashDir, hashSubDir, filename, filedir, filepath) {
const thumbdir = path.join(media.role, 'thumbs', hashDir, hashSubDir);
const thumbpath = path.join(thumbdir, filename);
// quality-suffixed filename, e.g. <hash>_720.<ext>
const filename = media.quality
? `${hashFilename}_${media.quality}.${media.meta.extension}`
: `${hashFilename}.${media.meta.extension}`;
const image = sharp(media.file.path);
const filedir = path.join(media.role, hashDir, hashSubDir);
const filepath = path.join(filedir, filename);
// probe image dimensions and file size in parallel
const [info, stat] = await Promise.all([
image.metadata(),
fsPromises.stat(media.file.path),
]);
if (media.meta.type === 'image') {
const thumbdir = path.join(media.role, 'thumbs', hashDir, hashSubDir);
const thumbpath = path.join(thumbdir, filename);
await Promise.all([
fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
fsPromises.mkdir(path.join(config.media.path, thumbdir), { recursive: true }),
]);
// NOTE(review): duplicated mkdir block — the same lines from both diff sides
await Promise.all([
fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
fsPromises.mkdir(path.join(config.media.path, thumbdir), { recursive: true }),
]);
// generate thumbnail
await sharp(media.file.path)
.resize({
height: config.media.thumbnailSize,
withoutEnlargement: true,
})
.jpeg({ quality: config.media.thumbnailQuality })
.toFile(path.join(config.media.path, thumbpath));
// post-commit path: the thumbnail already exists as a temp file, so both
// file and thumbnail are just renamed into place
await Promise.all([
fsPromises.rename(media.file.path, path.join(config.media.path, filepath)),
fsPromises.rename(media.file.thumbnail, path.join(config.media.path, thumbpath)),
]);
if (media.meta.subtype === 'jpeg') {
// move temp file to permanent location
await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));
} else {
// convert to JPEG and write to permanent location
await sharp(media.file.path)
.jpeg()
.toFile(path.join(config.media.path, filepath));
return {
...media,
file: {
path: filepath,
thumbnail: thumbpath,
},
};
// remove temp file
await fsPromises.unlink(media.file.path);
}
// pre-commit non-image path: ensure directory, move temp file into place
await fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true });
await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));
logger.silly(`Stored media file at permanent location ${filepath}`);
const memoryUsage = getMemoryUsage();
peakMemoryUsage = Math.max(memoryUsage, peakMemoryUsage);
logger.silly(`Stored thumbnail and media from ${media.src}, memory usage ${memoryUsage.toFixed(2)} MB`);
// return the media object with permanent paths and probed dimensions/size
return {
...media,
file: {
path: filepath,
thumbnail: thumbpath,
},
meta: {
...media.meta,
width: info.width,
height: info.height,
size: stat.size,
},
};
}
/**
 * Move a fetched temp file to its permanent, hash-sharded location.
 * Image media is delegated to storeImageFile (thumbnail + dimension probing);
 * everything else is stat'ed and renamed into place.
 * Returns the media object with permanent paths, or null on failure.
 */
async function storeFile(media) {
  try {
    const { hash } = media.meta;

    // shard directories from the first two hash-character pairs
    const hashDir = hash.slice(0, 2);
    const hashSubDir = hash.slice(2, 4);
    const baseName = hash.slice(4);

    // optional quality suffix, e.g. <hash>_720.<ext>
    const suffix = media.quality ? `_${media.quality}` : '';
    const filename = `${baseName}${suffix}.${media.meta.extension}`;
    const filedir = path.join(media.role, hashDir, hashSubDir);
    const filepath = path.join(filedir, filename);

    if (media.meta.type === 'image') {
      return storeImageFile(media, hashDir, hashSubDir, filename, filedir, filepath);
    }

    // non-image media: read the size and ensure the target dir in parallel
    const [stat] = await Promise.all([
      fsPromises.stat(media.file.path),
      fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }),
    ]);

    await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));
    logger.silly(`Stored media file at permanent location ${filepath}`);

    return {
      ...media,
      file: {
        path: filepath,
      },
      meta: {
        ...media.meta,
        size: stat.size,
      },
    };
  } catch (error) {
    // storage failures are non-fatal for the batch; caller filters out nulls
    logger.warn(`Failed to store ${media.src}: ${error.message}`);
    return null;
  }
}
// NOTE(review): this span is diff-view residue — pre- and post-commit bodies of
// fetchSource are interleaved without +/- markers (hasher, hashStream,
// tempFilePath, pathname, mimetype, extension and hash are each declared twice,
// and a raw `@@` hunk header appears before the retry logic). Not valid
// JavaScript as shown; reconcile against the actual commit before editing.
async function fetchSource(source, baseMedia) {
logger.silly(`Fetching media from ${source.src}`);
// attempts
async function attempt(attempts = 1) {
try {
// pre-commit setup: temp path without extension, hash computed via PassThrough tap
const tempFilePath = path.join(config.media.path, 'temp', `${baseMedia.id}`);
const hasher = new blake2.Hash('blake2b');
hasher.setEncoding('hex');
const tempFileTarget = fs.createWriteStream(tempFilePath);
const hashStream = new PassThrough();
hashStream.on('data', chunk => hasher.write(chunk));
const res = await http.get(source.src, {
...(source.referer && { referer: source.referer }),
...(source.host && { host: source.host }),
}, {
stream: true, // sources are fetched in parallel, don't gobble up memory
transforms: [hashStream],
destination: tempFileTarget,
});
hasher.end();
const hash = hasher.read();
const { pathname } = new URL(source.src);
const mimetype = res.headers['content-type'] || mime.getType(pathname);
const [type, subtype] = mimetype.split('/');
const extension = mime.getExtension(mimetype);
if (!res.ok) {
throw new Error(`Response ${res.status} not OK`);
}
// post-commit setup: mimetype derived first, temp paths carry the extension,
// and a sharp metaStream doubles as thumbnailer for images
const { pathname } = new URL(source.src);
const mimetype = res.headers['content-type'] || mime.getType(pathname);
const extension = mime.getExtension(mimetype);
const type = mimetype?.split('/')[0] || 'image';
const hasher = new blake2.Hash('blake2b');
hasher.setEncoding('hex');
const hashStream = new PassThrough();
const metaStream = type === 'image'
? sharp()
: new PassThrough();
const tempFilePath = path.join(config.media.path, 'temp', `${baseMedia.id}.${extension}`);
const tempThumbPath = path.join(config.media.path, 'temp', `${baseMedia.id}_thumb.${extension}`);
const tempFileTarget = fs.createWriteStream(tempFilePath);
const tempThumbTarget = fs.createWriteStream(tempThumbPath);
hashStream.on('data', chunk => hasher.write(chunk));
if (type === 'image') {
// generate thumbnail
metaStream
.clone()
.resize({
height: config.media.thumbnailSize,
withoutEnlargement: true,
})
.jpeg({ quality: config.media.thumbnailQuality })
.pipe(tempThumbTarget)
.on('error', error => logger.error(error));
}
// pipeline destroys streams, so attach info event first
const infoPromise = type === 'image' ? once(metaStream, 'info') : Promise.resolve([{}]);
const metaPromise = type === 'image' ? metaStream.stats() : Promise.resolve();
await pipeline(
res.originalRes,
metaStream,
hashStream,
tempFileTarget,
);
const [stats, info] = await Promise.all([metaPromise, infoPromise]);
hasher.end();
const hash = hasher.read();
const [{ size, width, height }] = info;
peakMemoryUsage = Math.max(getMemoryUsage(), peakMemoryUsage);
logger.silly(`Fetched media from ${source.src}, memory usage ${peakMemoryUsage.toFixed(2)} MB`);
return {
...source,
file: {
path: tempFilePath,
thumbnail: tempThumbPath,
},
meta: {
hash,
mimetype,
extension,
type,
hash,
entropy: stats?.entropy,
size,
width,
height,
subtype,
},
};
} catch (error) {
// NOTE(review): raw diff hunk header left in the page scrape, not code
@@ -412,20 +419,14 @@ async function fetchSource(source, baseMedia) {
if (attempts < 3) {
await Promise.delay(1000);
// pre-commit retry raced each attempt against a 120s timeout; the
// post-commit version retries without the race
return Promise.race([
attempt(attempts + 1),
Promise.delay(120 * 1000).then(() => { throw new Error(`Media fetch attempt ${attempts}/3 timed out, aborting ${source.src}`); }),
]);
return attempt(attempts + 1);
}
throw new Error(`Failed to fetch ${source.src}: ${error.message}`);
}
}
return Promise.race([
attempt(1),
Promise.delay(120 * 1000).then(() => { throw new Error(`Media fetch timed out, aborting ${source.src}`); }),
]);
return attempt(1);
}
async function trySource(baseSource, existingMedias, baseMedia) {
@@ -467,25 +468,14 @@ async function fetchMedia(baseMedia, existingMedias) {
Promise.reject(new Error()),
);
if (source.entry) {
// don't save media, already in database
return {
...baseMedia,
...source,
};
}
const memoryUsage = getMemoryUsage();
peakMemoryUsage = Math.max(memoryUsage, peakMemoryUsage);
logger.silly(`Fetched media from ${source.src}, memory usage ${memoryUsage.toFixed(2)} MB`);
return storeFile({
return {
...baseMedia,
...source,
});
/*
return saveMedia({
...baseMedia,
...source,
});
*/
};
} catch (error) {
logger.warn(error.message);
@@ -528,14 +518,19 @@ async function storeMedias(baseMedias) {
const [existingSourceMediaByUrl, existingExtractMediaByUrl] = await findSourceDuplicates(baseMedias);
const savedMedias = await Promise.map(
const fetchedMedias = await Promise.map(
baseMedias,
async baseMedia => fetchMedia(baseMedia, { existingSourceMediaByUrl, existingExtractMediaByUrl }),
);
const [uniqueHashMedias, existingHashMedias] = await findHashDuplicates(savedMedias);
const [uniqueHashMedias, existingHashMedias] = await findHashDuplicates(fetchedMedias);
const newMediaWithEntries = uniqueHashMedias.map((media, index) => curateMediaEntry(media, index));
const savedMedias = await Promise.map(
uniqueHashMedias,
async baseMedia => storeFile(baseMedia),
);
const newMediaWithEntries = savedMedias.map((media, index) => curateMediaEntry(media, index));
const newMediaEntries = newMediaWithEntries.filter(media => media.newEntry).map(media => media.entry);
await knex('media').insert(newMediaEntries);

View File

@@ -53,7 +53,7 @@ async function filterUniqueReleases(latestReleases, accReleases) {
}
function needNextPage(uniqueReleases, pageAccReleases) {
if (uniqueReleases === 0) {
if (uniqueReleases.length === 0) {
return false;
}

View File

@@ -1,10 +1,13 @@
'use strict';
const util = require('util');
const stream = require('stream');
const config = require('config');
const tunnel = require('tunnel');
const bhttp = require('bhttp');
const taskQueue = require('promise-task-queue');
const pipeline = util.promisify(stream.pipeline);
const logger = require('../logger')(__filename);
const defaultHeaders = {
@@ -68,6 +71,10 @@ queue.define('http', async ({
? await bhttp[method.toLowerCase()](url, body, reqOptions)
: await bhttp[method.toLowerCase()](url, reqOptions);
if (options.stream && options.destination) {
await pipeline(res, ...(options.transforms || []), options.destination);
}
const html = Buffer.isBuffer(res.body) ? res.body.toString() : null;
const json = Buffer.isBuffer(res.body) ? null : res.body;

110
src/utils/media.js Normal file
View File

@@ -0,0 +1,110 @@
'use strict';
const config = require('config');
const fs = require('fs');
const fsPromises = require('fs').promises;
const Promise = require('bluebird');
const blake2 = require('blake2');
const sharp = require('sharp');
const nanoid = require('nanoid');
const { PassThrough } = require('stream');
const http = require('./http');
// Resident set size of the current process, converted to megabytes.
function getMemoryUsage() {
  return process.memoryUsage().rss / 1000000;
}
let peakMemoryUsage = getMemoryUsage();
/**
 * Stream a remote image to a temp file while hashing it on the fly.
 * Returns { id, path, hash } on success; on failure the temp file is
 * removed (best-effort) and the original error is rethrown.
 */
async function fetchSource(link) {
  const id = nanoid();

  // hash the bytes as they stream through, without buffering the file
  const hasher = new blake2.Hash('blake2b');
  hasher.setEncoding('hex');

  const tempFilePath = `/home/niels/Pictures/thumbs/temp/${id}.jpeg`;
  const tempFileStream = fs.createWriteStream(tempFilePath);

  const hashStream = new PassThrough();
  hashStream.on('data', chunk => hasher.write(chunk));

  try {
    const res = await http.get(link, null, {
      stream: true,
      transforms: [hashStream],
      destination: tempFileStream,
      timeout: 5000,
    });

    if (!res.ok) {
      throw new Error(res.status);
    }

    hasher.end();
    const hash = hasher.read();

    const memoryUsage = getMemoryUsage();
    peakMemoryUsage = Math.max(memoryUsage, peakMemoryUsage);

    console.log(`Stored ${tempFilePath}, memory usage: ${memoryUsage.toFixed(2)} MB`);

    return {
      id,
      path: tempFilePath,
      hash,
    };
  } catch (error) {
    // Best-effort cleanup: swallow unlink failures (e.g. the file was never
    // created) so they cannot shadow the original fetch error being rethrown.
    await fsPromises.unlink(tempFilePath).catch(() => {});
    throw error;
  }
}
/**
 * Benchmark driver: reads a newline-separated list of photo URLs, fetches
 * each to a temp file, then writes a full-size JPEG and a thumbnail per
 * file while tracking memory usage and total wall time.
 */
async function init() {
  const linksFile = await fsPromises.readFile('/home/niels/Pictures/photos', 'utf8');
  const links = linksFile.split('\n').filter(Boolean);

  await fsPromises.mkdir('/home/niels/Pictures/thumbs/temp', { recursive: true });

  console.time('thumbs');

  const files = await Promise.map(links, async (link) => {
    try {
      return await fetchSource(link);
    } catch (error) {
      // a single failed download shouldn't abort the batch
      console.log(`Failed to fetch ${link}: ${error.message}`);
      return null;
    }
  });

  await Promise.map(files.filter(Boolean), async (file) => {
    const image = sharp(file.path).jpeg();

    const [{ width, height }, { size }] = await Promise.all([
      image.metadata(),
      fsPromises.stat(file.path),
    ]);

    await Promise.all([
      image
        .toFile(`/home/niels/Pictures/thumbs/${file.hash}.jpeg`),
      // BUG FIX: clone the pipeline for the thumbnail. resize() mutates the
      // instance it is called on, so sharing one sharp instance between both
      // outputs could resize the "full-size" file as well; sharp documents
      // clone() as the way to produce multiple outputs from one input.
      image
        .clone()
        .resize({
          height: config.media.thumbnailSize,
          withoutEnlargement: true,
        })
        .toFile(`/home/niels/Pictures/thumbs/${file.hash}_thumb.jpeg`),
    ]);

    const memoryUsage = getMemoryUsage();
    peakMemoryUsage = Math.max(memoryUsage, peakMemoryUsage);

    console.log(`Resized ${file.id} (${width}, ${height}, ${size}), memory usage: ${memoryUsage.toFixed(2)} MB`);
  }, { concurrency: 10 });

  console.log(`Peak memory usage: ${peakMemoryUsage.toFixed(2)} MB`);
  console.timeEnd('thumbs');
}
init();