Associating actors without network.

2020-05-13 02:56:20 +02:00
parent 5a82e769c7
commit 6040a3f41f
53 changed files with 245 additions and 533 deletions
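In short, and as an illustrative sketch rather than a schema reference (column names are taken from the diff below; the sample rows are invented): instead of disambiguating single-name actors by baking the network slug into the actor slug, actors are now stored under their plain slug, separated by a nullable network_id, and tagged with the batch_id of the ingest run that created them.

// before: single-name actors were stored with a network-suffixed slug
// { name: 'Angel', slug: 'angel-vixen', network_id: 1 }

// after: new actors are stored network-agnostic, tagged with the ingest batch;
// existing network-bound rows are still matched on the (slug, network_id) pair
// { name: 'Angel', slug: 'angel', network_id: null, batch_id: 1234 }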

View File

@@ -1,6 +1,6 @@
'use strict';
const logger = require('./logger')(__filename);
// const logger = require('./logger')(__filename);
const knex = require('./knex');
const slugify = require('./utils/slugify');
const capitalize = require('./utils/capitalize');
@@ -13,9 +13,7 @@ function toBaseActors(actorsOrNames, release) {
const baseActor = {
name,
slug,
hasSingleName: name.split(/\s+/).length === 1,
network: release.site.network,
slugWithNetworkSlug: `${slug}-${release.site.network.slug}`,
};
if (actorOrName.name) {
@@ -29,39 +27,38 @@ function toBaseActors(actorsOrNames, release) {
});
}
function curateActorEntry(baseActor) {
if (baseActor.hasSingleName) {
logger.warn(`Assigning single name actor '${baseActor.name}' to network '${baseActor.network.name}'`);
// attach network ID to allow separating actors with the same name
return {
name: baseActor.name,
slug: baseActor.slugWithNetworkSlug,
network_id: baseActor.network.id,
};
}
function curateActorEntry(baseActor, batchId) {
return {
name: baseActor.name,
slug: baseActor.slug,
network_id: null,
batch_id: batchId,
};
}
function curateActorEntries(baseActors) {
return baseActors.map(baseActor => curateActorEntry(baseActor));
function curateActorEntries(baseActors, batchId) {
return baseActors.map(baseActor => curateActorEntry(baseActor, batchId));
}
async function getOrCreateActors(baseActors) {
async function getOrCreateActors(baseActors, batchId) {
const existingActors = await knex('actors')
.select('id', 'name', 'slug', 'network_id')
.whereIn('slug', baseActors.map(baseActor => baseActor.slug))
.whereNull('network_id')
.orWhereIn(['slug', 'network_id'], baseActors.map(baseActor => [baseActor.slugWithNetworkSlug, baseActor.network.id]));
.orWhereIn(['slug', 'network_id'], baseActors.map(baseActor => [baseActor.slug, baseActor.network.id]));
const existingActorSlugs = new Set(existingActors.map(actor => actor.slug));
const uniqueBaseActors = baseActors.filter(baseActor => !existingActorSlugs.has(baseActor.slug) && !existingActorSlugs.has(baseActor.slugWithNetworkSlug));
// const existingActorSlugs = new Set(existingActors.map(actor => actor.slug));
const existingActorSlugs = existingActors.reduce((acc, actor) => ({
...acc,
[actor.network_id]: {
...acc[actor.network_id],
[actor.slug]: true,
},
}), {});
const curatedActorEntries = curateActorEntries(uniqueBaseActors);
const uniqueBaseActors = baseActors.filter(baseActor => !existingActorSlugs[baseActor.network.id]?.[baseActor.slug] && !existingActorSlugs.null?.[baseActor.slug]);
const curatedActorEntries = curateActorEntries(uniqueBaseActors, batchId);
const newActors = await knex('actors').insert(curatedActorEntries, ['id', 'name', 'slug', 'network_id']);
if (Array.isArray(newActors)) {
@@ -71,7 +68,7 @@ async function getOrCreateActors(baseActors) {
return existingActors;
}
async function associateActors(releases) {
async function associateActors(releases, batchId) {
const baseActorsByReleaseId = releases.reduce((acc, release) => {
if (release.actors) {
acc[release.id] = toBaseActors(release.actors, release);
@@ -86,17 +83,32 @@ async function associateActors(releases) {
return;
}
const baseActorsBySlug = baseActors.reduce((acc, baseActor) => ({ ...acc, [baseActor.slug]: baseActor }), {});
const uniqueBaseActors = Object.values(baseActorsBySlug);
// const baseActorsBySlug = baseActors.reduce((acc, baseActor) => ({ ...acc, [baseActor.slug]: baseActor }), {});
const baseActorsBySlugAndNetworkId = baseActors.reduce((acc, baseActor) => ({
...acc,
[baseActor.slug]: {
...acc[baseActor.slug],
[baseActor.network.id]: baseActor,
},
}), {});
const actors = await getOrCreateActors(uniqueBaseActors);
const actorIdsBySlug = actors.reduce((acc, actor) => ({ ...acc, [actor.slug]: actor.id }), {});
const uniqueBaseActors = Object.values(baseActorsBySlugAndNetworkId).map(baseActorsByNetworkId => Object.values(baseActorsByNetworkId)).flat();
const actors = await getOrCreateActors(uniqueBaseActors, batchId);
// const actorIdsBySlug = actors.reduce((acc, actor) => ({ ...acc, [actor.slug]: actor.id }), {});
const actorIdsBySlugAndNetworkId = actors.reduce((acc, actor) => ({
...acc,
[actor.network_id]: {
...acc[actor.network_id],
[actor.slug]: actor.id,
},
}), {});
const releaseActorAssociations = Object.entries(baseActorsByReleaseId)
.map(([releaseId, releaseActors]) => releaseActors
.map(releaseActor => ({
release_id: releaseId,
actor_id: actorIdsBySlug[releaseActor.slug] || actorIdsBySlug[releaseActor.slugWithNetworkSlug],
actor_id: actorIdsBySlugAndNetworkId[releaseActor.network.id]?.[releaseActor.slug] || actorIdsBySlugAndNetworkId.null?.[releaseActor.slug], // optional chaining guards against no network-agnostic actors existing
})))
.flat();
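
A minimal sketch of the two-level index the new code builds and queries (identifiers match the diff; the sample data is invented):

const actorIdsBySlugAndNetworkId = {
  1: { angel: 10 }, // actors bound to network 1
  null: { stella: 11 }, // network-agnostic actors end up under the 'null' key
};

// resolution order used above: network-specific match first, then the global fallback
const actorId = actorIdsBySlugAndNetworkId[1]?.angel || actorIdsBySlugAndNetworkId.null?.angel; // 10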

View File

@@ -1,445 +0,0 @@
'use strict';
const config = require('config');
const Promise = require('bluebird');
// const bhttp = require('bhttp');
const mime = require('mime');
const fs = require('fs-extra');
const sharp = require('sharp');
const path = require('path');
const blake2 = require('blake2');
const argv = require('./argv');
const logger = require('./logger')(__filename);
const knex = require('./knex');
const { get } = require('./utils/http');
const { ex } = require('./utils/q');
const chunk = require('./utils/chunk');
function getHash(buffer) {
const hash = blake2.createHash('blake2b', { digestLength: 24 });
hash.update(buffer);
return hash.digest('hex');
}
async function getMeta(buffer, withHash = false) {
try {
const { entropy } = await sharp(buffer).stats();
const { width, height, size } = await sharp(buffer).metadata();
const hash = withHash && getHash(buffer);
return {
width,
height,
size,
entropy,
hash,
};
} catch (error) {
logger.warn(`Failed to retrieve image entropy, using 7.5: ${error.message}`);
return { entropy: 7.5 }; // callers destructure the result, so wrap the fallback in an object
}
}
async function createThumbnail(buffer, height = config.media.thumbnailSize) {
try {
const thumbnail = sharp(buffer)
.resize({
height,
withoutEnlargement: true,
})
.jpeg({
quality: config.media.thumbnailQuality,
})
.toBuffer();
return thumbnail;
} catch (error) {
logger.error(`Failed to create thumbnail: ${error.message}`);
}
return null;
}
function groupFallbacksByPriority(chunks) {
/*
Chunks naturally give priority to all of the first item's fallbacks, generally lower quality images.
This function ensures every item's first source is tried before every item's second source, and so on. Example:
IN:  [[1, 2, 3], 10, [1, 2, 3, 4, 5], [1, 2, 3]]
OUT: [1, 10, 1, 1, 2, 2, 2, 3, 3, 3, 4, 5] (each chunk's fallbacks are regrouped by priority, then flattened)
*/
return chunks.map(group => group.reduce((acc, item) => {
if (Array.isArray(item)) {
// place provided fallbacks at same index (priority) in parent array
item.forEach((fallback, fallbackIndex) => {
if (!acc[fallbackIndex]) {
acc[fallbackIndex] = [];
}
acc[fallbackIndex].push(fallback);
});
return acc;
}
// no fallbacks provided, first priority
if (!acc[0]) {
acc[0] = [];
}
acc[0].push(item);
return acc;
}, []).flat());
}
function pluckItems(items, specifiedLimit, asFallbacks = true) {
const limit = specifiedLimit || argv.mediaLimit;
if (!items || items.length <= limit) return items;
if (asFallbacks) {
const chunks = chunk(items, Math.ceil(items.length / limit));
const fallbacks = groupFallbacksByPriority(chunks);
return fallbacks;
}
const plucked = [1]
.concat(
Array.from({ length: limit - 1 }, (value, index) => Math.round((index + 1) * (items.length / (limit - 1)))),
);
return Array.from(new Set(plucked)).map(itemIndex => items[itemIndex - 1]); // remove duplicates, may happen when photo total and photo limit are close
}
function pickQuality(items) {
const itemsByQuality = items.reduce((acc, item) => ({ ...acc, [item.quality]: item }), {});
const item = config.media.videoQuality.reduce((acc, quality) => acc || itemsByQuality[quality], null);
return item || items[0];
}
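// [editor's sketch, not part of the original file] assuming config.media.videoQuality = [1080, 720],
// pickQuality([{ quality: 480 }, { quality: 720 }]) returns the 720 item;
// if no configured quality is available, the first item is used as a fallback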
async function extractItem(source) {
// const res = await bhttp.get(source.src);
const res = await get(source.src);
if (res.statusCode === 200) {
const { qu } = ex(res.body.toString());
return source.extract(qu);
}
return null;
}
async function fetchSource(source, domain, role) {
logger.silly(`Fetching ${domain} ${role} from ${source.src || source}`);
// const res = await bhttp.get(source.src || source);
const res = await get(source.src || source, {
headers: {
...(source.referer && { referer: source.referer }),
...(source.host && { host: source.host }),
},
});
if (res.statusCode === 200) {
const { pathname } = new URL(source.src || source);
const mimetype = mime.getType(pathname);
const extension = mime.getExtension(mimetype);
const hash = getHash(res.body);
const { entropy, size, width, height } = /image/.test(mimetype) ? await getMeta(res.body) : {};
logger.silly(`Fetched media item from ${source.src || source}`);
return {
file: res.body,
mimetype,
extension,
hash,
entropy: entropy || null,
size: size || null,
width: width || null,
height: height || null,
quality: source.quality || null,
source: source.src || source,
scraper: source.scraper,
copyright: source.copyright,
};
}
throw new Error(`Response ${res.statusCode} not OK`);
}
async function fetchItem(source, index, existingItemsBySource, domain, role, attempt = 1, originalSource = null, sourceIndex = 0) {
try {
if (!source) {
throw new Error(`Empty ${domain} ${role} source in ${originalSource}`);
}
if (Array.isArray(source)) {
if (source.every(sourceX => sourceX.quality)) {
// various video qualities provided
const selectedSource = pickQuality(source);
return fetchItem(selectedSource, index, existingItemsBySource, domain, role, attempt, originalSource);
}
// fallbacks provided
return source.reduce((outcome, sourceX, sourceIndexX) => outcome.catch(
async () => fetchItem(sourceX, index, existingItemsBySource, domain, role, attempt, source, sourceIndexX),
), Promise.reject(new Error()));
}
if (source.src && source.extract) {
// source links to page containing a (presumably) tokenized photo
const itemSource = await extractItem(source);
return fetchItem(itemSource, index, existingItemsBySource, domain, role, attempt, source, sourceIndex);
}
if (existingItemsBySource[source]) {
return null;
}
return await fetchSource(source, domain, role, originalSource);
} catch (error) {
logger.warn(`Failed attempt ${attempt}/3 to fetch ${domain} ${role} ${index + 1} (${source.src || source}): ${error}`);
if (source && attempt < 3) {
// only retry if source is provided at all
await Promise.delay(5000);
return fetchItem(source, index, existingItemsBySource, domain, role, attempt + 1, originalSource, sourceIndex);
}
if (originalSource && sourceIndex < originalSource.length - 1) {
throw error; // gets caught to try next source
}
return null;
}
}
async function fetchItems(itemSources, existingItemsBySource, domain, role) {
return Promise.map(itemSources, async (source, index) => fetchItem(source, index, existingItemsBySource, domain, role)).filter(Boolean);
}
async function saveItems(items, domain, role) {
return Promise.map(items, async (item) => {
try {
const dir = item.hash.slice(0, 2);
const subdir = item.hash.slice(2, 4);
const filename = item.quality
? `${item.hash.slice(4)}_${item.quality}.${item.extension}`
: `${item.hash.slice(4)}.${item.extension}`;
const filedir = path.join(`${role}s`, dir, subdir);
const filepath = path.join(filedir, filename);
await fs.mkdir(path.join(config.media.path, filedir), { recursive: true });
await fs.writeFile(path.join(config.media.path, filepath), item.file);
if (/image/.test(item.mimetype)) {
const thumbnail = await createThumbnail(item.file);
const thumbdir = path.join(`${role}s`, 'thumbs', dir, subdir);
const thumbpath = path.join(thumbdir, filename);
await fs.mkdir(path.join(config.media.path, thumbdir), { recursive: true });
await fs.writeFile(path.join(config.media.path, thumbpath), thumbnail);
logger.verbose(`Saved ${domain} ${role} with thumbnail to ${filepath}`);
return {
thumbnail,
filepath,
thumbpath,
mimetype: item.mimetype,
extension: item.extension,
hash: item.hash,
size: item.size,
width: item.width,
height: item.height,
quality: item.quality,
entropy: item.entropy,
scraper: item.scraper,
copyright: item.copyright,
source: item.source,
};
}
logger.verbose(`Saved ${domain} ${role} to ${filepath}`);
return {
filepath,
mimetype: item.mimetype,
extension: item.extension,
hash: item.hash,
size: item.size,
width: item.width,
height: item.height,
quality: item.quality,
entropy: item.entropy,
scraper: item.scraper,
copyright: item.copyright,
source: item.source,
};
} catch (error) {
logger.error(`Failed to store ${domain} ${role} from ${item.source}: ${error.message}`);
return null;
}
});
}
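// [editor's sketch, not part of the original file] files are sharded by hash prefix;
// assuming config.media.path = '/media', hash 'ab12cd34...', role 'photo', quality 1080:
//   /media/photos/ab/12/cd34..._1080.jpeg
//   /media/photos/thumbs/ab/12/cd34..._1080.jpeg (thumbnail written to the same shard)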
function curateItemEntries(items) {
return items.filter(Boolean).map((item, index) => ({
path: item.filepath,
thumbnail: item.thumbpath,
mime: item.mimetype,
hash: item.hash,
size: item.size,
width: item.width,
height: item.height,
quality: item.quality,
entropy: item.entropy,
source: item.source,
scraper: item.scraper,
copyright: item.copyright,
index,
}));
}
function groupItems(items) {
return items.reduce((acc, item) => ({
source: { ...acc.source, [item.source]: item },
hash: { ...acc.hash, [item.hash]: item },
}), {
source: {},
hash: {},
});
}
async function storeMedia(sources, domain, role, { entropyFilter = 2.5 } = {}) {
const presentSources = sources.filter(source => typeof source === 'string' || Array.isArray(source) || (source && source.src));
if (presentSources.length === 0) {
return {};
}
// split up source list to prevent excessive RAM usage
const itemChunksBySource = await Promise.all(chunk(presentSources, 50).map(async (sourceChunk, index) => {
try {
// find source duplicates that don't need to be re-downloaded or re-saved
const existingSourceItems = await knex('media').whereIn('source', sourceChunk.flat().map(source => source.src || source));
const { source: existingSourceItemsBySource, hash: existingSourceItemsByHash } = groupItems(existingSourceItems);
// download media items from new sources
const fetchedItems = await fetchItems(sourceChunk, existingSourceItemsBySource, domain, role);
const { hash: fetchedItemsByHash } = groupItems(fetchedItems);
// find hash duplicates that don't need to be re-saved
const uniqueFetchedItems = Object.values(fetchedItemsByHash).filter(item => !entropyFilter || item.entropy === null || item.entropy >= entropyFilter);
const existingHashItems = await knex('media').whereIn('hash', uniqueFetchedItems.map(item => item.hash));
const { hash: existingHashItemsByHash } = groupItems(existingHashItems);
// save new items to disk
const newItems = uniqueFetchedItems.filter(item => !existingHashItemsByHash[item.hash]);
const savedItems = await saveItems(newItems, domain, role);
// store new items in database
const curatedItemEntries = curateItemEntries(savedItems);
const storedItems = await knex('media').insert(curatedItemEntries).returning('*');
const { hash: storedItemsByHash } = groupItems(Array.isArray(storedItems) ? storedItems : []);
// accumulate existing and new items by source to be mapped onto releases
const itemsByHash = { ...existingSourceItemsByHash, ...existingHashItemsByHash, ...storedItemsByHash };
const itemsBySource = {
...existingSourceItemsBySource,
...fetchedItems.reduce((acc, item) => ({ ...acc, [item.source]: itemsByHash[item.hash] }), {}),
};
logger.info(`Stored batch ${index + 1} with ${fetchedItems.length} new ${domain} ${role}s`);
return itemsBySource;
} catch (error) {
logger.error(`Failed to store ${domain} ${role} batch ${index + 1}: ${error.message}`);
return null;
}
}));
return itemChunksBySource.reduce((acc, itemChunk) => ({ ...acc, ...itemChunk }), {});
}
function extractPrimaryItem(associations, targetId, role, primaryRole, primaryItemsByTargetId) {
if (!primaryRole) {
return { [role]: associations, [primaryRole]: null };
}
if (primaryItemsByTargetId[targetId]) {
const remainingAssociations = associations.filter(association => association.media_id !== primaryItemsByTargetId[targetId].media_id);
return { [role]: remainingAssociations, [primaryRole]: null };
}
return {
[role]: associations.slice(1),
[primaryRole]: associations.slice(0, 1)[0],
};
}
function associateTargetMedia(targetId, sources, mediaBySource, domain, role, primaryRole, primaryItemsByTargetId) {
if (!sources) return { [role]: null, [primaryRole]: null };
const mediaIds = sources
.map((source) => {
if (!source) return null;
if (Array.isArray(source)) {
const availableSource = source.find(fallbackSource => mediaBySource[fallbackSource.src || fallbackSource]);
return mediaBySource[availableSource?.src || availableSource]; // fallback entries may themselves be { src } objects
}
return mediaBySource[source.src || source];
})
.filter(Boolean)
// .sort((mediaItemA, mediaItemB) => mediaItemB.height - mediaItemA.height) // prefer high res images for primary item
.map(mediaItem => mediaItem.id);
const uniqueMediaIds = Array.from(new Set(mediaIds));
const associations = uniqueMediaIds.map(mediaId => ({ [`${domain}_id`]: targetId, media_id: mediaId }));
logger.silly(`Associating ${associations.length} ${role}s to ${domain} ${targetId}`);
return extractPrimaryItem(associations, targetId, role, primaryRole, primaryItemsByTargetId);
}
async function associateMedia(sourcesByTargetId, mediaBySource, domain, role, primaryRole) {
const primaryItems = primaryRole ? await knex(`${domain}s_${primaryRole}s`).whereIn(`${domain}_id`, Object.keys(sourcesByTargetId)) : [];
const primaryItemsByTargetId = primaryItems.reduce((acc, item) => ({ ...acc, [item[`${domain}_id`]]: item }), {});
const associationsPerTarget = await Promise.map(Object.entries(sourcesByTargetId), ([targetId, sources]) => associateTargetMedia(targetId, sources, mediaBySource, domain, role, primaryRole, primaryItemsByTargetId));
const associations = associationsPerTarget.map(association => association[role]).flat().filter(Boolean);
const primaryAssociations = associationsPerTarget.map(association => association[primaryRole]).filter(Boolean);
logger.info(`Associated ${associations.length} ${role}s to ${domain}s`);
if (primaryRole) logger.info(`Associated ${primaryAssociations.length} extracted ${primaryRole}s to ${domain}s`);
return Promise.all([
(associations.length > 0 && knex.raw(`${knex(`${domain}s_${role}s`).insert(associations).toString()} ON CONFLICT DO NOTHING`)),
(primaryAssociations.length > 0 && knex.raw(`${knex(`${domain}s_${primaryRole}s`).insert(primaryAssociations).toString()} ON CONFLICT DO NOTHING`)),
]);
}
module.exports = {
associateMedia,
createThumbnail,
getHash,
getMeta,
pluckItems,
storeMedia,
};
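
For orientation, a hedged usage sketch of the deleted module's two main entry points (signatures as they appear above; the URL, release ID, and roles are invented, and the calls assume an async context):

const { storeMedia, associateMedia } = require('./media');

// two photo sources for release 123; the second entry is a fallback list
const sourcesByReleaseId = {
  123: ['https://example.com/a.jpg', ['https://example.com/b-hq.jpg', 'https://example.com/b.jpg']],
};

const mediaBySource = await storeMedia(Object.values(sourcesByReleaseId).flat(), 'release', 'photo');
await associateMedia(sourcesByReleaseId, mediaBySource, 'release', 'photo', 'poster');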

View File

@@ -114,7 +114,7 @@ module.exports = {
perfectgonzo,
pervcity,
pimpxxx: cherrypimps,
ornpros: whalemember,
pornpros: whalemember,
private: privateNetwork,
puretaboo,
realitykings,

View File

@@ -214,11 +214,13 @@ async function storeReleases(releases) {
const releasesWithId = attachReleaseIds([].concat(uniqueReleases, duplicateReleases), [].concat(storedReleaseEntries, duplicateReleaseEntries));
await Promise.all([
associateActors(releasesWithId),
associateActors(releasesWithId, batchId),
associateReleaseTags(releasesWithId),
associateReleaseMedia(releasesWithId),
]);
// media is more error-prone, associate separately
await associateReleaseMedia(releasesWithId);
logger.info(`Stored ${storedReleaseEntries.length} releases`);
await updateReleasesSearch(releasesWithId.map(release => release.id));
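
The ordering change here: actor and tag associations still run concurrently, while the error-prone media association is now awaited on its own, presumably so a media failure cannot reject the combined Promise.all. A reduced sketch of the resulting control flow (identifiers from the diff):

await Promise.all([
  associateActors(releasesWithId, batchId),
  associateReleaseTags(releasesWithId),
]);

// media is more error-prone, associate separately
await associateReleaseMedia(releasesWithId);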