Storing pHash. Fixed RedGIFs user agent.

This commit is contained in:
ThePendulum 2023-03-12 22:39:00 +01:00
parent b811ebe2ef
commit 8a978cb803
15 changed files with 852 additions and 94 deletions

768
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -48,6 +48,7 @@
"node-fetch": "^2.1.2",
"object.omit": "^3.0.0",
"object.pick": "^1.3.0",
"sharp-phash": "^2.1.0",
"snoowrap": "^1.20.0",
"template-format": "^1.2.4",
"unprint": "^0.8.1",

View File

@ -50,6 +50,8 @@ async function fetchPredata(hosts) {
if (methods[host?.method]?.fetchPredata) {
const data = await methods[host.method].fetchPredata();
logger.info(`Fetched predata for ${host.method}`);
return {
...acc,
[host.method]: data,
@ -108,8 +110,6 @@ async function getDirectContent(links, ep) {
const predata = await fetchPredata(hosts.map(({ host }) => host));
console.log('app predata', predata);
return Promise.map(hosts, async ({ link, host }) => {
const info = await getInfo(host, { reddit, link, predata });

View File

@ -2,9 +2,11 @@
const crypto = require('crypto');
const hashPost = post => crypto
.createHash('md5')
.update(post.id + post.subreddit_id + post.created_utc + post.title)
.digest('hex');
/**
 * Build a stable fingerprint for a Reddit post.
 *
 * The fingerprint is the hex MD5 digest of the post's id, subreddit id,
 * creation timestamp and title joined without a separator. It is used as an
 * identifier, not for anything security-sensitive.
 *
 * @param {object} post - Post exposing `id`, `subreddit_id`, `created_utc` and `title`.
 * @returns {string} 32-character lowercase hex digest.
 */
function hashPost(post) {
    const {
        id,
        subreddit_id: subredditId,
        created_utc: createdUtc,
        title,
    } = post;

    const fingerprint = id + subredditId + createdUtc + title;
    const md5 = crypto.createHash('md5');

    md5.update(fingerprint);

    return md5.digest('hex');
}
module.exports = hashPost;

View File

@ -1,12 +1,12 @@
'use strict';
const config = require('config');
const { isAfter, isBefore, isEqual } = require('date-fns');
const omit = require('object.omit');
const dissectLink = require('../dissectLink');
const hashPost = require('./hashPost');
const { isAfter, isBefore, isEqual } = require('date-fns');
const logger = require('../logger')(__filename);
function report(curatedPosts, indexed, user, args) {
@ -58,7 +58,7 @@ function curatePost(acc, post, user, index, indexed, ignoreIds, processed, args)
datetime: new Date(post.created_utc * 1000),
subreddit: post.subreddit.display_name,
score: post.score,
preview: post.preview ? post.preview.images.map(image => image.source) : null,
preview: post.preview ? post.preview.images.map((image) => image.source) : null,
host,
direct: post.direct,
comments: post.comments,
@ -66,7 +66,7 @@ function curatePost(acc, post, user, index, indexed, ignoreIds, processed, args)
};
if (indexed.entries.length) {
const indexedPost = indexed.entries.find(entry => entry.id === post.id);
const indexedPost = indexed.entries.find((entry) => entry.id === post.id);
if (indexedPost && !args.redownload) {
curatedPost.previewFallback = indexedPost.preview;
@ -96,7 +96,7 @@ function curatePost(acc, post, user, index, indexed, ignoreIds, processed, args)
return acc;
}
const ignoring = args.ignore ? args.ignore.find(prop => post[prop]) : null;
const ignoring = args.ignore ? args.ignore.find((prop) => post[prop]) : null;
if (ignoring) {
logger.verbose(`Ignoring ${ignoring} post '${post.title}' (${permalink})`);
@ -139,7 +139,7 @@ function curatePost(acc, post, user, index, indexed, ignoreIds, processed, args)
const curatePosts = (userPosts, ignoreIdsArray, args) => Object.values(userPosts).reduce((accPosts, user) => {
const processed = new Set();
const ignoreIds = new Set(ignoreIdsArray.map(postId => String(postId).toLowerCase()));
const ignoreIds = new Set(ignoreIdsArray.map((postId) => String(postId).toLowerCase()));
const indexedByDate = user.indexed.original.sort((entryA, entryB) => new Date(entryA.date) - new Date(entryB.date));
const indexed = {
@ -162,7 +162,7 @@ const curatePosts = (userPosts, ignoreIdsArray, args) => Object.values(userPosts
report(curatedPosts, indexed, user, args);
const indexedOriginal = user.indexed.original.filter(entry => !curatedPosts.indexedUpdated.find(post => post.id === entry.id));
const indexedOriginal = user.indexed.original.filter((entry) => !curatedPosts.indexedUpdated.find((post) => post.id === entry.id));
return {
...accPosts,

View File

@ -58,7 +58,10 @@ async function getBuffers(item, context) {
}
const sources = item.mux ? [item.url].concat(item.mux) : [item.url];
const buffers = await Promise.map(sources, (source) => fetchItem(source, 0, context));
const buffers = await Promise.map(sources, (source) => fetchItem(source, 0, {
item,
...context,
}));
if (buffers.filter((buffer) => buffer).length > 0) {
return buffers;
@ -105,31 +108,37 @@ function getFilepath(item, content, host, post, user) {
async function fetchSaveUserContent(user, ep, args) {
const profilePaths = await saveProfileDetails(user, args);
const hashes = new Set(user.indexed.original.map((item) => item.hash));
const hashes = new Set(user.indexed.original.flatMap((item) => [item.hash, item.phash]).filter(Boolean));
const posts = await Promise.map(user.posts, async (post) => {
if (!post.content) {
return null;
}
const hash = await Promise.reduce(post.content.items, async (accItems, originalItem, index) => {
const items = await Promise.map(post.content.items, async (originalItem, index) => {
const item = { ...originalItem, index };
const buffers = await getBuffers(item, { post, host: post.host, headers: post.content.headers || item.headers });
// no buffers, ignore item
if (!buffers || buffers.length === 0) {
return accItems;
return null;
}
item.hash = buffers[0].hash;
item.phash = buffers[0].phash;
// prevent duplicates
if (config.fetch.avoidDuplicates && hashes.has(buffers[0].hash)) {
if (config.fetch.avoidDuplicates && hashes.has(buffers[0].hash) && !item.album) {
logger.verbose(`Ignoring duplicate file '${post.url}' (${post.permalink})`);
return buffers[0].hash;
return {
...item,
ignored: true,
};
}
const filepath = getFilepath(item, post.content, post.host, post, user);
const sourcePaths = await save(filepath, buffers, item, post);
const sourcePaths = await save(filepath, buffers.map(({ buffer }) => buffer), item, post);
hashes.add(buffers[0].hash);
@ -139,12 +148,14 @@ async function fetchSaveUserContent(user, ep, args) {
await addMeta(filepath, item, post, user, ep);
return buffers[0].hash;
}, []);
return item;
});
return {
...post,
hash,
hash: items[0]?.hash,
phash: items[0]?.phash,
content: items.filter(Boolean),
};
}, {
concurrency: config.fetch.concurrency,
@ -154,7 +165,7 @@ async function fetchSaveUserContent(user, ep, args) {
}
async function fetchSaveDirectContent(content, host, ep) {
return Promise.reduce(content.items, async (accItems, originalItem, index) => {
return Promise.map(content.items, async (originalItem, index) => {
logger.info(`Fetching and saving '${host.url}'`);
const item = { ...originalItem, index };
@ -162,7 +173,7 @@ async function fetchSaveDirectContent(content, host, ep) {
// no buffers, ignore item
if (!buffers || buffers.length === 0) {
return accItems;
return;
}
const filepath = getFilepath(item, content, host, null, null);
@ -173,9 +184,7 @@ async function fetchSaveDirectContent(content, host, ep) {
}
await addMeta(filepath, item, null, null, ep);
return sourcePaths;
}, []);
}, { concurrency: 5 });
}
module.exports = {

View File

@ -18,8 +18,6 @@ async function attachContentInfo(users, { reddit, predata }) {
return accPosts;
}
console.log('attach predata', predata[post.host.method]);
try {
return [
...accPosts,
@ -60,7 +58,10 @@ async function attachContentInfo(users, { reddit, predata }) {
async function getInfo(host, { reddit, url, predata }) {
if (host === null) {
try {
const info = await methods.tube(host, null, reddit);
const info = await methods.tube(host, null, {
reddit,
predata: predata.tube,
});
return info;
} catch (error) {
@ -70,7 +71,10 @@ async function getInfo(host, { reddit, url, predata }) {
}
}
return (methods[host.method].fetchInfo || methods[host.method])(host, null, { reddit, predata });
return (methods[host.method].fetchInfo || methods[host.method])(host, null, {
reddit,
predata: predata[host.method],
});
}
module.exports = {

View File

@ -3,23 +3,26 @@
const config = require('config');
const bhttp = require('bhttp');
const blake2 = require('blake2');
const phash = require('sharp-phash');
// const phashDistance = require('sharp-phash/distance');
const logger = require('../logger')(__filename);
const limiter = require('../limiter').items;
async function fetchItem(url, attempt, { post, host, headers }) {
async function fetchItem(url, attempt, context) {
async function retry(error) {
logger.warn(`Failed to fetch '${url}', ${attempt < config.fetch.retries ? 'retrying' : 'giving up'}: ${error.message} (${post ? post.permalink : 'no post'})`);
logger.warn(`Failed to fetch '${url}', ${attempt < config.fetch.retries ? 'retrying' : 'giving up'}: ${error.message} (${context.post ? context.post.permalink : 'no post'})`);
if (attempt < config.fetch.retries) {
return fetchItem(url, attempt + 1, { post, host, headers });
return fetchItem(url, attempt + 1, context);
}
return null;
}
try {
const res = await limiter.schedule(async () => bhttp.get(url, { headers }));
// throw new Error('Failed it!');
const res = await limiter.schedule(async () => bhttp.get(url, { headers: context.headers }));
if (res.statusCode !== 200) {
throw new Error(`Response not OK for ${url} (${res.statusCode}): ${res.body.toString()}`);
@ -29,13 +32,21 @@ async function fetchItem(url, attempt, { post, host, headers }) {
throw new Error(`Unexpected response for ${url} (${res.statusCode}): ${res.body}`);
}
logger.debug(`Fetched '${host ? host.url : url}' (${post ? post.permalink : 'no post'})`);
logger.debug(`Fetched '${context.host?.url || url}' (${context.post?.permalink || 'no post'})`);
const hash = blake2.createHash('blake2b', { digestLength: 24 });
hash.update(res.body);
const contentHash = hash.digest('hex');
return Object.assign(res.body, { hash: contentHash });
const phashResult = context.item?.type?.includes('image/')
? await phash(res.body)
: null;
return {
buffer: res.body,
hash: contentHash,
phash: phashResult,
};
} catch (error) {
return retry(error);
}

View File

@ -31,7 +31,7 @@ async function fetchPredata() {
return data;
}
async function imgurImageApi(host, post, { predata } = {}) {
async function imgurImageApi(host, post, { predata }) {
if (predata.remaining === 10) { // keep a buffer
throw new Error(`Reached Imgur API rate limit with source '${host.url}'`);
}

View File

@ -21,7 +21,6 @@ async function fetchPredata() {
address: data.addr,
agent: data.agent,
token: data.token,
userAgent,
};
}
@ -66,7 +65,7 @@ async function redgifsApi(host, post, { predata }) {
const res = await fetch(`https://api.redgifs.com/v2/gifs/${host.id.toLowerCase()}`, {
headers: {
authorization: `Bearer ${predata.token}`,
'user-agent': predata.userAgent,
'user-agent': predata.agent,
},
});
@ -99,7 +98,7 @@ async function redgifsApi(host, post, { predata }) {
original: data.gif,
}],
headers: {
'user-agent': predata.userAgent,
'user-agent': predata.agent,
},
};

View File

@ -5,23 +5,24 @@ const fs = require('fs-extra');
const logger = require('../logger')(__filename);
/**
 * Combine several source streams into one file without re-encoding, then
 * delete the source files.
 *
 * @param {string} target - Path the muxed file is saved to.
 * @param {string[]} sources - Paths of the input streams; removed after a successful mux.
 * @returns {Promise<void>} Resolves once the target is saved and the sources are removed.
 * @throws Rejects with the error emitted by ffmpeg when muxing fails; the
 *   source files are left in place in that case.
 */
async function mux(target, sources) {
    await new Promise((resolve, reject) => {
        sources.reduce((acc, source) => acc.input(source), ffmpeg())
            .videoCodec('copy')
            .audioCodec('copy')
            .on('start', () => {
                logger.verbose(`Muxing ${sources.length} streams to '${target}'`);
            })
            .on('end', (stdout) => {
                logger.verbose(`Muxed and saved '${target}'`);

                resolve(stdout);
            })
            // Pass reject as the handler itself: `() => reject` merely returned
            // the function, so a failed mux left this promise pending forever.
            .on('error', reject)
            .save(target);
    });

    await Promise.all(sources.map((source) => fs.remove(source)));

    logger.verbose(`Cleaned up temporary files for '${target}'`);
}
module.exports = mux;

View File

@ -40,7 +40,7 @@ async function saveProfileImage(user, args) {
try {
const { protocol, hostname, pathname } = new URL(image);
const stream = await fetchItem(`${protocol}//${hostname}${pathname}`, 0, { permalink: `https://reddit.com/user/${user.name}` });
const { buffer: stream } = await fetchItem(`${protocol}//${hostname}${pathname}`, 0, { permalink: `https://reddit.com/user/${user.name}` });
const targets = await save(filepath, stream);
return targets[0];

View File

@ -8,6 +8,18 @@ const interpolate = require('../interpolate');
const save = require('./save');
const logger = require('../logger')(__filename);
/**
 * Merge freshly processed posts with the entries already in the index.
 *
 * New posts come first and take precedence: any indexed entry whose `id`
 * matches a new post is dropped, so an updated post replaces its stale entry
 * instead of appearing twice.
 *
 * @param {object[]} newPosts - Entries produced by the current run.
 * @param {object[]} indexedPosts - Entries read from the existing index.
 * @returns {object[]} New posts followed by the indexed posts that were not re-fetched.
 */
function curatePosts(newPosts, indexedPosts) {
    const freshIds = new Set();

    newPosts.forEach((post) => {
        freshIds.add(post.id);
    });

    const retainedPosts = indexedPosts.filter(({ id }) => !freshIds.has(id));

    return newPosts.concat(retainedPosts);
}
async function writeToIndex(posts, profilePaths, user, args) {
const filepath = interpolate(config.library.index.file, null, null, null, null, user, false);
const now = new Date();
@ -24,6 +36,8 @@ async function writeToIndex(posts, profilePaths, user, args) {
score: post.score,
title: post.title,
hash: post.hash,
phash: post.phash,
content: post.content,
};
if (post.previewFallback) {
@ -34,11 +48,12 @@ async function writeToIndex(posts, profilePaths, user, args) {
});
const data = {
updated: new Date(),
profile: {
image: profilePaths.image,
description: profilePaths.description,
},
posts: newAndUpdatedEntries.concat(user.indexed.original),
posts: curatePosts(newAndUpdatedEntries, user.indexed.original),
};
if (!data.profile.image && !data.profile.description && !data.posts.length) {
@ -46,7 +61,7 @@ async function writeToIndex(posts, profilePaths, user, args) {
}
try {
const yamlIndex = yaml.safeDump(data);
const yamlIndex = yaml.safeDump(data, { skipInvalid: true });
const saved = await save(filepath, Buffer.from(yamlIndex, 'utf8'));
logger.info(`Saved index with ${posts.length} new posts for ${user.name}`);

View File

@ -2,8 +2,8 @@
const Promise = require('bluebird');
const getIndex = require('./getIndex.js');
const curateUser = require('../curate/user.js');
const getIndex = require('./getIndex');
const curateUser = require('../curate/user');
const logger = require('../logger')(__filename);
const limiter = require('../limiter').reddit;
@ -23,7 +23,7 @@ async function getUser(username, reddit) {
}
}
const getPostsWrap = reddit => function getPosts(postIds, userPosts = {}) {
const getPostsWrap = (reddit) => function getPosts(postIds, userPosts = {}) {
return Promise.reduce(postIds, (accUserPosts, postId) => Promise.resolve().then(async () => {
const post = await limiter.schedule(async () => reddit.getSubmission(postId).fetch());

View File

@ -2,9 +2,9 @@
const Promise = require('bluebird');
const getIndex = require('./getIndex.js');
const getArchivePostIds = require('../archives/getArchivePostIds.js');
const curateUser = require('../curate/user.js');
const getIndex = require('./getIndex');
const getArchivePostIds = require('../archives/getArchivePostIds');
const curateUser = require('../curate/user');
const logger = require('../logger')(__filename);
const limiter = require('../limiter').reddit;
@ -46,9 +46,9 @@ async function getPosts(username, reddit, args) {
}
async function getArchivedPosts(username, posts, reddit) {
const postIds = await getArchivePostIds(username, posts.map(post => post.id));
const postIds = await getArchivePostIds(username, posts.map((post) => post.id));
return Promise.all(postIds.map(postId => limiter.schedule(async () => reddit.getSubmission(postId).fetch())));
return Promise.all(postIds.map((postId) => limiter.schedule(async () => reddit.getSubmission(postId).fetch())));
}
function getUserPostsWrap(reddit, args) {