Added chunking to media duplicate queries to avoid exceeding query parameter limits. Added DP Diva to Perv City (coming soon).
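The problem the chunking solves: a call like knex('media').whereIn('source', sourceUrls) binds one query parameter per URL, and database drivers cap bound parameters per statement (PostgreSQL's wire protocol allows at most 65535, for instance), so an unchunked lookup over tens of thousands of URLs can fail outright. A minimal sketch of the chunked pattern; the helper name and the 1000 chunk size are illustrative, not part of this commit:

    // Sketch: run one whereIn per slice so no single statement exceeds
    // the driver's bound-parameter limit; results are concatenated.
    async function whereInChunked(db, table, column, values, size = 1000) {
      const rows = [];

      for (let i = 0; i < values.length; i += size) {
        rows.push(...await db(table).whereIn(column, values.slice(i, i + size)));
      }

      return rows;
    }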
src/app.js (17 lines changed)
@@ -85,23 +85,6 @@ async function startMemorySample(snapshotTriggers = []) {
   }, config.memorySampling.sampleDuration);
 }
 
-async function startMemorySample() {
-  await inspector.heap.enable();
-  await inspector.heap.startSampling();
-
-  // monitorMemory();
-
-  logger.info(`Start heap sampling, memory usage: ${process.memoryUsage.rss() / 1000000} MB`);
-
-  setTimeout(async () => {
-    await stopMemorySample();
-
-    if (!done) {
-      await startMemorySample();
-    }
-  }, 30000);
-}
-
 async function init() {
   try {
     if (argv.server) {
src/media.js (44 lines changed)
@@ -21,6 +21,7 @@ const argv = require('./argv');
 const knex = require('./knex');
 const http = require('./utils/http');
 const bulkInsert = require('./utils/bulk-insert');
+const chunk = require('./utils/chunk');
 const { get } = require('./utils/qu');
 
 const pipeline = util.promisify(stream.pipeline);
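The chunk helper imported above lives in src/utils/chunk, whose implementation this diff does not show. A plausible sketch, assuming it takes an array and an optional slice size — the diff calls both chunk(sourceUrls) and chunk(hashes, 2), so some default must exist; 1000 here is a guess:

    // assumed shape of src/utils/chunk, not taken from the repo:
    // split an array into consecutive slices of at most `size` items
    function chunk(items, size = 1000) {
      const chunks = [];

      for (let i = 0; i < items.length; i += size) {
        chunks.push(items.slice(i, i + size));
      }

      return chunks;
    }

    module.exports = chunk;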
@@ -63,10 +64,10 @@ function sampleMedias(medias, limit = argv.mediaLimit, preferLast = true) {
     ? chunks.slice(0, -1).concat(chunks.slice(-1).reverse())
     : chunks;
 
-  const groupedMedias = lastPreferredChunks.map((chunk) => {
+  const groupedMedias = lastPreferredChunks.map((mediaChunk) => {
     // merge chunked medias into a single media with grouped fallback priorities,
     // so the first sources of each media are preferred over all second sources, etc.
-    const sources = chunk
+    const sources = mediaChunk
       .reduce((accSources, media) => {
         media.sources.forEach((source, index) => {
           if (!accSources[index]) {
@@ -82,8 +83,8 @@ function sampleMedias(medias, limit = argv.mediaLimit, preferLast = true) {
       .flat();
 
     return {
-      id: chunk[0].id,
-      role: chunk[0].role,
+      id: mediaChunk[0].id,
+      role: mediaChunk[0].role,
       sources,
     };
   });
@@ -235,22 +236,41 @@ async function findSourceDuplicates(baseMedias) {
     .filter(Boolean);
 
   const [existingSourceMedia, existingExtractMedia] = await Promise.all([
-    knex('media').whereIn('source', sourceUrls),
-    knex('media').whereIn('source_page', extractUrls),
+    // we may try to check thousands of URLs at once, don't pass all of them to a single query
+    chunk(sourceUrls).reduce(async (chain, sourceUrlsChunk) => {
+      const accUrls = await chain;
+      const existingUrls = await knex('media').whereIn('source', sourceUrlsChunk);
+
+      return [...accUrls, ...existingUrls];
+    }, []),
+    chunk(extractUrls).reduce(async (chain, extractUrlsChunk) => {
+      const accUrls = await chain;
+      const existingUrls = await knex('media').whereIn('source_page', extractUrlsChunk);
+
+      return [...accUrls, ...existingUrls];
+    }, []),
   ]);
 
   const existingSourceMediaByUrl = itemsByKey(existingSourceMedia, 'source');
   const existingExtractMediaByUrl = itemsByKey(existingExtractMedia, 'source_page');
 
-  return { existingSourceMediaByUrl, existingExtractMediaByUrl };
+  return {
+    existingSourceMediaByUrl,
+    existingExtractMediaByUrl,
+  };
 }
 
 async function findHashDuplicates(medias) {
   const hashes = medias.map((media) => media.meta?.hash || media.entry?.hash).filter(Boolean);
 
-  const existingHashMediaEntries = await knex('media').whereIn('hash', hashes);
-  const existingHashMediaEntriesByHash = itemsByKey(existingHashMediaEntries, 'hash');
+  const existingHashMediaEntries = await chunk(hashes, 2).reduce(async (chain, hashesChunk) => {
+    const accHashes = await chain;
+    const existingHashes = await knex('media').whereIn('hash', hashesChunk);
+
+    return [...accHashes, ...existingHashes];
+  }, []);
+
+  const existingHashMediaEntriesByHash = itemsByKey(existingHashMediaEntries, 'hash');
   const uniqueHashMedias = medias.filter((media) => !media.entry && !existingHashMediaEntriesByHash[media.meta?.hash]);
 
   const { selfDuplicateMedias, selfUniqueMediasByHash } = uniqueHashMedias.reduce((acc, media) => {
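The reduce calls above use a common idiom for sequential async work: the accumulator is a promise (the plain [] initial value awaits to itself), so each step waits for the previous chunk's query before issuing its own, keeping a single query in flight at a time. A self-contained sketch of the idiom with a stubbed lookup, separate from the commit's code:

    // stub standing in for a chunked database lookup
    const fetchChunk = async (values) => values.map((value) => ({ value }));

    // chains one fetch per chunk; `await chain` resolves the accumulator
    async function fetchAllSequentially(chunks) {
      return chunks.reduce(async (chain, valuesChunk) => {
        const acc = await chain;
        const results = await fetchChunk(valuesChunk);

        return [...acc, ...results];
      }, []);
    }

Compared with a Promise.all over all chunks, this trades throughput for a bounded number of concurrent queries and steadier memory use.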
@@ -600,11 +620,11 @@ async function fetchSource(source, baseMedia) {
   const hashStream = new stream.PassThrough();
   let size = 0;
 
-  hashStream.on('data', (chunk) => {
-    size += chunk.length;
+  hashStream.on('data', (streamChunk) => {
+    size += streamChunk.length;
 
     if (hasherReady) {
-      hasher.write(chunk);
+      hasher.write(streamChunk);
     }
   });