Hashing media in transform in an attempt to improve reliability. Added seed file for user abilities.
This commit is contained in:
51
src/media.js
51
src/media.js
@@ -28,7 +28,6 @@ const chunk = require('./utils/chunk');
|
||||
const { get } = require('./utils/qu');
|
||||
const { fetchEntityReleaseIds } = require('./entity-releases');
|
||||
|
||||
// const pipeline = util.promisify(stream.pipeline);
|
||||
const streamQueue = taskQueue();
|
||||
|
||||
const s3 = new S3Client({
|
||||
@@ -516,16 +515,6 @@ async function storeImageFile(media, hashDir, hashSubDir, filename, filedir, fil
|
||||
writeLazy(image, lazypath, info),
|
||||
]);
|
||||
|
||||
/*
|
||||
if (isProcessed) {
|
||||
// file already stored, remove temporary file
|
||||
await fsPromises.unlink(media.file.path);
|
||||
} else {
|
||||
// image not processed, simply move temporary file to final location
|
||||
await fsPromises.rename(media.file.path, path.join(config.media.path, filepath));
|
||||
}
|
||||
*/
|
||||
|
||||
await fsPromises.unlink(media.file.path);
|
||||
|
||||
if (config.s3.enabled) {
|
||||
@@ -731,31 +720,31 @@ async function fetchSource(source, baseMedia) {
|
||||
}
|
||||
|
||||
async function attempt(attempts = 1) {
|
||||
const hasher = new blake2.Hash('blake2b', { digestLength: 24 });
|
||||
let hasherReady = true;
|
||||
hasher.setEncoding('hex');
|
||||
const tempFilePath = path.join(config.media.path, 'temp', `${baseMedia.id}`);
|
||||
let tempFileTarget;
|
||||
|
||||
const hasher = blake2.createHash('blake2b', { digestLength: 24 });
|
||||
|
||||
let size = 0;
|
||||
|
||||
const hashStream = new stream.Transform({
|
||||
transform(streamChunk, _encoding, callback) {
|
||||
size += streamChunk.length;
|
||||
hasher.update(streamChunk);
|
||||
this.push(streamChunk);
|
||||
callback();
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
const tempFilePath = path.join(config.media.path, 'temp', `${baseMedia.id}`);
|
||||
const tempFileTarget = fs.createWriteStream(tempFilePath);
|
||||
const hashStream = new stream.PassThrough();
|
||||
let size = 0;
|
||||
|
||||
hashStream.on('data', (streamChunk) => {
|
||||
size += streamChunk.length;
|
||||
|
||||
if (hasherReady) {
|
||||
hasher.write(streamChunk);
|
||||
}
|
||||
});
|
||||
tempFileTarget = fs.createWriteStream(tempFilePath);
|
||||
|
||||
const { mimetype } = source.stream
|
||||
? await streamQueue.push('fetchStreamSource', { source, tempFileTarget, hashStream })
|
||||
: await fetchHttpSource(source, tempFileTarget, hashStream);
|
||||
|
||||
hasher.end();
|
||||
const hash = hasher.digest('hex');
|
||||
|
||||
const hash = hasher.read();
|
||||
const [type, subtype] = mimetype.split('/');
|
||||
const extension = mime.getExtension(mimetype);
|
||||
|
||||
@@ -778,8 +767,10 @@ async function fetchSource(source, baseMedia) {
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
hasherReady = false;
|
||||
hasher.end();
|
||||
// hasherReady = false;
|
||||
// hasher.end();
|
||||
hashStream.destroy();
|
||||
tempFileTarget?.destroy();
|
||||
|
||||
if (error.code !== 'VERIFY_TYPE') {
|
||||
logger.warn(`Failed attempt ${attempts}/${maxAttempts} to fetch ${source.src}: ${error.message}`);
|
||||
|
||||
Reference in New Issue
Block a user