Added hash comparison to duplicate avoidance.

This commit is contained in:
2019-11-05 04:52:17 +01:00
parent 648013c77b
commit 7de8f94caa
7 changed files with 305 additions and 35 deletions

View File

@@ -2,16 +2,16 @@
const config = require('config');
const Promise = require('bluebird');
const yaml = require('js-yaml');
const saveProfileDetails = require('../save/profileDetails.js');
const fetchItem = require('./item.js');
const interpolate = require('../interpolate.js');
const save = require('../save/save.js');
const textToStream = require('../save/textToStream.js');
// const textToStream = require('../save/textToStream.js');
const saveMeta = require('../save/meta.js');
const mux = require('../save/mux.js');
const writeToIndex = require('../save/writeToIndex.js');
const yaml = require('js-yaml');
function curateComments(comments) {
return comments.map((comment) => {
@@ -48,16 +48,19 @@ function selfPostToText(item, post) {
return yaml.safeDump(curatedPost);
}
async function getStreams(item, post) {
async function getBuffers(item, post) {
if (item.self) {
return [textToStream(selfPostToText(item, post))];
return [{
...Buffer.from(selfPostToText(item, post), 'utf8'),
hash: post.hash,
}];
}
const sources = item.mux ? [item.url].concat(item.mux) : [item.url];
const streams = await Promise.map(sources, source => fetchItem(source, 0, post));
const buffers = await Promise.map(sources, source => fetchItem(source, 0, post));
if (streams.filter(stream => stream).length > 0) {
return streams;
if (buffers.filter(buffer => buffer).length > 0) {
return buffers;
}
return null;
@@ -101,19 +104,32 @@ function getFilepath(item, content, host, post, user) {
async function fetchSaveUserContent(user, ep, args) {
const profilePaths = await saveProfileDetails(user, args);
const hashes = new Set(user.indexed.original.map(item => item.hash));
const posts = await Promise.map(user.posts, async (post) => {
await Promise.reduce(post.content.items, async (accItems, originalItem, index) => {
const hash = await Promise.reduce(post.content.items, async (accItems, originalItem, index) => {
const item = { ...originalItem, index };
const streams = await getStreams(item, post);
const buffers = await getBuffers(item, post);
// no streams, ignore item
if (!streams || streams.length === 0) {
// no buffers, ignore item
if (!buffers || buffers.length === 0) {
return accItems;
}
// prevent duplicates
if (config.fetch.avoidDuplicates && hashes.has(buffers[0].hash)) {
console.log(
'\x1b[33m%s\x1b[0m',
`Ignoring duplicate file '${post.url}' (${post.permalink})`,
);
return buffers[0].hash;
}
const filepath = getFilepath(item, post.content, post.host, post, user);
const sourcePaths = await save(filepath, streams, item, post);
const sourcePaths = await save(filepath, buffers, item, post);
hashes.add(buffers[0].hash);
if (item.mux) {
await mux(filepath, sourcePaths, item);
@@ -121,10 +137,13 @@ async function fetchSaveUserContent(user, ep, args) {
await addMeta(filepath, item, post, user, ep);
return sourcePaths;
return buffers[0].hash;
}, []);
return post;
return {
...post,
hash,
};
}, {
concurrency: config.fetch.concurrency,
});
@@ -135,15 +154,15 @@ async function fetchSaveUserContent(user, ep, args) {
async function fetchSaveDirectContent(content, host, ep) {
return Promise.reduce(content.items, async (accItems, originalItem, index) => {
const item = { ...originalItem, index };
const streams = await getStreams(item, null);
const buffers = await getBuffers(item, null);
// no streams, ignore item
if (!streams || streams.length === 0) {
// no buffers, ignore item
if (!buffers || buffers.length === 0) {
return accItems;
}
const filepath = getFilepath(item, content, host, null, null);
const sourcePaths = await save(filepath, streams, item, null);
const sourcePaths = await save(filepath, buffers, item, null);
if (item.mux) {
await mux(filepath, sourcePaths, item);

View File

@@ -1,7 +1,8 @@
'use strict';
const config = require('config');
const fetch = require('node-fetch');
const bhttp = require('bhttp');
const blake2 = require('blake2');
async function fetchItem(url, attempt, post) {
async function retry(error) {
@@ -17,16 +18,21 @@ async function fetchItem(url, attempt, post) {
}
try {
const res = await fetch(url);
const res = await bhttp.get(url);
if (!res.ok) {
if (!res.statusCode === 200) {
throw new Error(`Response not OK for '${url}', HTTP code '${res.status}'`);
}
console.log(`Fetched '${url}' (${post ? post.permalink : 'no post'})`);
return res.body;
const hash = blake2.createHash('blake2b', { digestLength: 24 });
hash.update(res.body);
const contentHash = hash.digest('hex');
return Object.assign(res.body, { hash: contentHash });
} catch (error) {
console.log(error);
return retry(error);
}
}

View File

@@ -6,7 +6,7 @@ const UrlPattern = require('url-pattern');
const interpolate = require('../interpolate.js');
const fetchItem = require('../fetch/item.js');
const textToStream = require('./textToStream.js');
// const textToStream = require('./textToStream.js');
const save = require('./save.js');
async function saveProfileImage(user, args) {
@@ -38,7 +38,8 @@ async function saveProfileImage(user, args) {
);
try {
const stream = await fetchItem(image, 0, { permalink: `https://reddit.com/user/${user.name}` });
const { protocol, hostname, pathname } = new URL(image);
const stream = await fetchItem(`${protocol}//${hostname}${pathname}`, 0, { permalink: `https://reddit.com/user/${user.name}` });
const targets = await save(filepath, stream);
return targets[0];
@@ -62,7 +63,7 @@ async function saveProfileDescription(user, args) {
if (config.library.profile.description && !user.fallback && !user.deleted) {
if (user.profile && user.profile.description) {
const filepath = interpolate(config.library.profile.description, null, null, null, null, user);
const stream = textToStream(user.profile.description);
const stream = Buffer.from(user.profile.description, 'utf8');
try {
const targets = await save(filepath, stream);

View File

@@ -26,7 +26,20 @@ function getPathElements(requestedFilepath) {
};
}
function pipeStreamToFile(target, stream, item) {
/**
 * Write a single buffer to disk at the given target path and log the outcome.
 *
 * @param {string} target - Absolute or relative file path to write to.
 * @param {Buffer} buffer - File contents to persist.
 * @param {?Object} item - Source item; when `item.mux` is set the file is
 *   only an intermediate artifact awaiting muxing, so the log message differs.
 * @returns {Promise<string>} The target path that was written.
 */
async function writeBufferToFile(target, buffer, item) {
  await fs.writeFile(target, buffer);
  // Muxable items are temporary; everything else is a final save (logged green).
  const awaitingMux = item && item.mux;
  if (!awaitingMux) {
    console.log('\x1b[32m%s\x1b[0m', `Saved '${target}'`);
  } else {
    console.log(`Temporarily saved '${target}', queued for muxing`);
  }
  return target;
}
/*
async function pipeStreamToFile(target, stream, item) {
const file = fs.createWriteStream(target);
return new Promise((resolve, reject) => {
@@ -57,5 +70,19 @@ async function save(requestedFilepath, streamOrStreams, item) {
return pipeStreamToFile(target, stream, item);
}));
}
*/
/**
 * Persist one or more buffers under the requested filepath.
 *
 * When multiple buffers are supplied, each file name gets a numeric suffix
 * (`name-0.ext`, `name-1.ext`, …) before the extension; a single buffer keeps
 * the plain name. The containing directory is created if it does not exist.
 *
 * @param {string} requestedFilepath - Desired path, decomposed via getPathElements.
 * @param {Buffer|Buffer[]} bufferOrBuffers - One buffer or an array of buffers.
 * @param {?Object} item - Source item, forwarded to writeBufferToFile for logging.
 * @returns {Promise<string[]>} Paths of all files written.
 */
async function save(requestedFilepath, bufferOrBuffers, item) {
  const pathElements = getPathElements(requestedFilepath);
  // Accept a single buffer as well as an array of buffers.
  const buffers = Array.isArray(bufferOrBuffers) ? bufferOrBuffers : [bufferOrBuffers];
  await fs.ensureDir(pathElements.dir);
  const writes = buffers.map((buffer, index) => {
    const suffix = buffers.length > 1 ? `-${index}` : '';
    const filename = `${pathElements.name}${suffix}${pathElements.ext}`;
    const target = path.join(pathElements.root, pathElements.dir, filename);
    return writeBufferToFile(target, buffer, item);
  });
  return Promise.all(writes);
}
module.exports = save;

View File

@@ -4,7 +4,7 @@ const config = require('config');
const yaml = require('js-yaml');
const interpolate = require('../interpolate');
const textToStream = require('./textToStream');
// const textToStream = require('./textToStream');
const save = require('./save');
async function writeToIndex(posts, profilePaths, user, args) {
@@ -22,6 +22,7 @@ async function writeToIndex(posts, profilePaths, user, args) {
indexed: now,
score: post.score,
title: post.title,
hash: post.hash,
};
if (post.previewFallback) {
@@ -43,7 +44,8 @@ async function writeToIndex(posts, profilePaths, user, args) {
return false;
}
return save(filepath, textToStream(yaml.safeDump(data)));
// return save(filepath, textToStream(yaml.safeDump(data)));
return save(filepath, Buffer.from(yaml.safeDump(data), 'utf8'));
}
module.exports = writeToIndex;