import config from 'config';
import { format } from 'date-fns';
import { CronJob } from 'cron';

import initLogger from './logger.js';
import { knexOwner as knex } from './knex.js';
import { searchApi, indexApi } from './manticore.js';
import chunk from '../utils/chunk.js';
import filterTitle from '../utils/filter-title.js';

const logger = initLogger();

/**
 * Convert a Date to whole seconds since the Unix epoch, the timestamp unit written to every Manticore document below.
 *
 * @param {Date} date
 * @returns {number}
 */
function toUnixTime(date) {
	return Math.round(date.getTime() / 1000);
}

/**
 * Serialize bulk operations into the newline-delimited JSON body expected by `indexApi.bulk`.
 *
 * @param {object[]} operations - Objects such as `{ replace: {...} }` or `{ delete: {...} }`.
 * @returns {string}
 */
function toNdjson(operations) {
	return operations.map((operation) => JSON.stringify(operation)).join('\n');
}

/**
 * Split text into word tokens (`[\w']+`).
 * Returns an empty array instead of `null` when nothing matches (e.g. names written only in non-Latin script),
 * so callers can safely `flatMap` the result and call string methods on every token.
 *
 * @param {string|null|undefined} text
 * @returns {string[]}
 */
function toWordTokens(text) {
	return text?.match(/[\w']+/g) ?? [];
}

/**
 * Mirror stash entries of one domain from Postgres into the `${domain}s_stashed` Manticore index,
 * then delete index documents for the same items whose stash row no longer exists.
 *
 * @param {string} [domain='scene'] - Used to derive the `stashes_${domain}s` table, the `${domain}_id` column
 *   and the `${domain}s_stashed` index.
 * @param {Array<number|string>} [ids] - Only sync stashes of these item IDs; every stash when omitted.
 * @returns {Promise<void>}
 */
export async function syncStashes(domain = 'scene', ids) {
	const stashes = await knex(`stashes_${domain}s`)
		.select(
			`stashes_${domain}s.id as stashed_id`,
			`stashes_${domain}s.${domain}_id`,
			'stashes.id as stash_id',
			'stashes.user_id as user_id',
			`stashes_${domain}s.created_at as created_at`,
		)
		.modify((builder) => {
			if (ids) {
				builder.whereRaw(`stashes_${domain}s.${domain}_id = ANY(?)`, [ids]);
			}
		})
		.leftJoin('stashes', 'stashes.id', `stashes_${domain}s.stash_id`);

	// index sequentially in chunks of 1000 to keep bulk requests small
	await chunk(stashes, 1000).reduce(async (chain, stashChunk, index) => {
		await chain;

		const stashDocs = stashChunk.map((stash) => ({
			replace: {
				index: `${domain}s_stashed`,
				id: stash.stashed_id,
				doc: {
					[`${domain}_id`]: stash[`${domain}_id`],
					stash_id: stash.stash_id,
					user_id: stash.user_id,
					created_at: toUnixTime(stash.created_at),
				},
			},
		}));

		await indexApi.bulk(toNdjson(stashDocs));

		logger.verbose(`Seeded ${index * 1000 + stashChunk.length}/${stashes.length} ${domain} stashes`);
	}, Promise.resolve());

	// purge orphaned docs: index documents for these items that no longer have a matching stash row
	const itemIds = ids ?? [...new Set(stashes.map((stash) => stash[`${domain}_id`]))];

	if (itemIds.length === 0) {
		return;
	}

	// NOTE(review): compared against Manticore `_id` below; assumes both are the same JS type (e.g. both numbers) — confirm
	const validStashedIds = new Set(stashes.map((stash) => stash.stashed_id));

	await chunk(itemIds, 1000).reduce(async (chain, itemIdChunk) => {
		await chain;

		const searchResponse = await searchApi.search({
			index: `${domain}s_stashed`,
			query: {
				in: {
					[`${domain}_id`]: itemIdChunk,
				},
			},
			limit: 1000,
		});

		const docs = searchResponse?.hits?.hits ?? [];
		const totalHits = searchResponse?.hits?.total;

		// items can have several stashes, so 1000 items may match more than the 1000 documents returned
		if (totalHits > docs.length) {
			logger.warn(`Orphan check for ${domain} stashes only inspected ${docs.length}/${totalHits} documents, remaining orphans were not purged`);
		}

		const orphanedIds = docs
			.map((hit) => hit._id)
			.filter((manticoreId) => !validStashedIds.has(manticoreId));

		if (orphanedIds.length === 0) {
			return;
		}

		const deleteDocs = orphanedIds.map((orphanId) => ({
			delete: {
				index: `${domain}s_stashed`,
				id: orphanId,
			},
		}));

		await indexApi.bulk(toNdjson(deleteDocs));

		logger.verbose(`Purged ${orphanedIds.length} orphaned ${domain} stash documents`);
	}, Promise.resolve());
}

/**
 * Rebuild the Manticore `scenes` documents from Postgres and sync scene stashes alongside.
 * Requested IDs that no longer exist in `releases` are deleted from the index.
 *
 * @param {Array<number|string>} [sceneIds] - Scenes to sync; every release when omitted.
 * @returns {Promise<object[]|null>} Concatenated `items` of every bulk response, or null when there was nothing to index.
 */
export async function syncManticoreScenes(sceneIds) {
	logger.info(`Updating Manticore search documents for ${sceneIds ? sceneIds.length : 'all' } scenes`);

	const scenes = await knex.raw(`
		SELECT
			releases.id AS id,
			releases.title,
			releases.created_at,
			releases.date,
			releases.shoot_id,
			scenes_meta.stashed,
			entities.id as channel_id,
			entities.slug as channel_slug,
			entities.name as channel_name,
			parents.id as network_id,
			parents.slug as network_slug,
			parents.name as network_name,
			studios.id as studio_id,
			studios.slug as studio_slug,
			studios.name as studio_name,
			grandparents.id as parent_network_id,
			COALESCE(JSON_AGG(DISTINCT (actors.id, actors.name)) FILTER (WHERE actors.id IS NOT NULL), '[]') as actors,
			COALESCE(JSON_AGG(DISTINCT (tags.id, tags.name, tags.priority, tags_aliases.name)) FILTER (WHERE tags.id IS NOT NULL), '[]') as tags,
			COALESCE(JSON_AGG(DISTINCT (movies.id, movies.title)) FILTER (WHERE movies.id IS NOT NULL), '[]') as movies,
			COALESCE(JSON_AGG(DISTINCT (series.id, series.title)) FILTER (WHERE series.id IS NOT NULL), '[]') as series,
			studios.showcased IS NOT false
				AND (entities.showcased IS NOT false OR COALESCE(studios.showcased, false) = true)
				AND (parents.showcased IS NOT false OR COALESCE(entities.showcased, false) = true OR COALESCE(studios.showcased, false) = true)
				AND (releases_summaries.batch_showcased IS NOT false)
			AS showcased,
			row_number() OVER (PARTITION BY releases.entry_id, parents.id ORDER BY releases.effective_date DESC) as dupe_index
		FROM releases
		LEFT JOIN releases_summaries ON releases_summaries.release_id = releases.id
		LEFT JOIN scenes_meta ON scenes_meta.scene_id = releases.id
		LEFT JOIN entities ON releases.entity_id = entities.id
		LEFT JOIN entities AS parents ON parents.id = entities.parent_id
		LEFT JOIN entities AS grandparents ON grandparents.id = parents.parent_id
		LEFT JOIN entities AS studios ON studios.id = releases.studio_id
		LEFT JOIN releases_actors AS local_actors ON local_actors.release_id = releases.id
		LEFT JOIN releases_directors AS local_directors ON local_directors.release_id = releases.id
		LEFT JOIN releases_tags AS local_tags ON local_tags.release_id = releases.id
		LEFT JOIN actors ON local_actors.actor_id = actors.id
		LEFT JOIN actors AS directors ON local_directors.director_id = directors.id
		LEFT JOIN tags ON local_tags.tag_id = tags.id
		LEFT JOIN tags as tags_aliases ON local_tags.tag_id = tags_aliases.alias_for AND tags_aliases.secondary = true
		LEFT JOIN movies_scenes ON movies_scenes.scene_id = releases.id
		LEFT JOIN movies ON movies.id = movies_scenes.movie_id
		LEFT JOIN series_scenes ON series_scenes.scene_id = releases.id
		LEFT JOIN series ON series.id = series_scenes.serie_id
		${sceneIds ? 'WHERE releases.id = ANY(?)' : ''}
		GROUP BY
			releases.id, releases.title, releases.created_at, releases.date, releases.shoot_id,
			scenes_meta.stashed,
			releases_summaries.batch_showcased,
			entities.id, entities.name, entities.slug, entities.alias, entities.showcased,
			parents.id, parents.name, parents.slug, parents.alias,
			grandparents.id,
			studios.id, studios.name, studios.slug,
			parents.showcased, studios.showcased
	`, sceneIds && [sceneIds]);

	const scenesById = Object.fromEntries(scenes.rows.map((scene) => [scene.id, scene]));

	const docs = (sceneIds || Object.keys(scenesById)).map((sceneId) => {
		const scene = scenesById[sceneId];

		// requested scene no longer exists, remove it from the index
		if (!scene) {
			return {
				delete: {
					index: 'scenes',
					id: sceneId,
				},
			};
		}

		// the aggregated JSON rows use Postgres' anonymous record keys: f1 = id, f2 = name/title, f3 = priority, f4 = alias name
		const flatActors = scene.actors.flatMap((actor) => actor.f2.split(' '));
		const flatTags = scene.tags.filter((tag) => tag.f3 > 6).flatMap((tag) => [tag.f2].concat(tag.f4)).filter(Boolean); // only make top tags searchable to minimize cluttered results
		const filteredTitle = filterTitle(scene.title, [...flatActors, ...flatTags]);

		return {
			replace: {
				index: 'scenes',
				id: scene.id,
				doc: {
					title: scene.title || undefined,
					title_filtered: filteredTitle || undefined,
					date: scene.date ? toUnixTime(scene.date) : undefined,
					created_at: toUnixTime(scene.created_at),
					effective_date: toUnixTime(scene.date || scene.created_at),
					is_showcased: scene.showcased,
					shoot_id: scene.shoot_id || undefined,
					channel_id: scene.channel_id,
					channel_slug: scene.channel_slug,
					channel_name: scene.channel_name,
					network_id: scene.network_id || undefined,
					network_slug: scene.network_slug || undefined,
					network_name: scene.network_name || undefined,
					studio_id: scene.studio_id || undefined,
					studio_slug: scene.studio_slug || undefined,
					studio_name: scene.studio_name || undefined,
					entity_ids: [scene.channel_id, scene.network_id, scene.parent_network_id, scene.studio_id].filter(Boolean), // manticore does not support OR, this allows IN
					actor_ids: scene.actors.map((actor) => actor.f1),
					actors: scene.actors.map((actor) => actor.f2).join(),
					tag_ids: scene.tags.map((tag) => tag.f1),
					tags: flatTags.join(' '), // only make top tags searchable to minimize cluttered results
					movie_ids: scene.movies.map((movie) => movie.f1),
					movies: scene.movies.map((movie) => movie.f2).join(' '),
					serie_ids: scene.series.map((serie) => serie.f1),
					series: scene.series.map((serie) => serie.f2).join(' '),
					meta: scene.date ? format(scene.date, 'y yy M MMM MMMM d') : undefined,
					stashed: scene.stashed || 0,
					dupe_index: scene.dupe_index || 0,
				},
			},
		};
	});

	if (docs.length === 0) {
		return null;
	}

	const [manticoreResult] = await Promise.all([
		// send scene documents sequentially in chunks of 10000, collecting every response item
		chunk(docs, 10000).reduce(async (chain, docsChunk, index, array) => {
			const acc = await chain;
			const data = await indexApi.bulk(toNdjson(docsChunk));

			logger.verbose(`Seeded ${index + 1}/${array.length}, errors: ${data.errors} ${data.error}`);

			return acc.concat(data.items);
		}, Promise.resolve([])),
		syncStashes('scene', sceneIds),
	]);

	return manticoreResult;
}

/**
 * Refresh the `scenes_meta` materialized view, then re-index the given scenes in Manticore.
 *
 * @param {Array<number|string>} [releaseIds] - Scenes to sync; all when omitted.
 * @returns {Promise<void>}
 */
export async function syncScenes(releaseIds) {
	await knex.raw('REFRESH MATERIALIZED VIEW scenes_meta;');
	await syncManticoreScenes(releaseIds);
}

/**
 * Rebuild the Manticore `movies` documents from Postgres.
 * Actors and tags are aggregated from the movie's scenes; tags attached to the movie itself are merged in.
 * Requested IDs that no longer exist in `movies` are deleted from the index.
 *
 * @param {Array<number|string>} [movieIds] - Movies to sync; every movie when omitted.
 * @returns {Promise<object|null>} The `indexApi.bulk` response, or null when there was nothing to index.
 */
export async function syncManticoreMovies(movieIds) {
	logger.info(`Updating Manticore search documents for ${movieIds ? movieIds.length : 'all' } movies`);

	// stashed_scenes and stashed_total must be selected (not only grouped) for the document fields below to be populated
	const movies = await knex.raw(`
		SELECT
			movies.id AS id,
			movies.title,
			movies.created_at,
			movies.date,
			movies_meta.stashed,
			movies_meta.stashed_scenes,
			movies_meta.stashed_total,
			entities.id as channel_id,
			entities.slug as channel_slug,
			entities.name as channel_name,
			parents.id as network_id,
			parents.slug as network_slug,
			parents.name as network_name,
			movies_covers IS NOT NULL as has_cover,
			COALESCE(JSON_AGG(DISTINCT (actors.id, actors.name)) FILTER (WHERE actors.id IS NOT NULL), '[]') as actors,
			COALESCE(JSON_AGG(DISTINCT (tags.id, tags.name, tags.priority, tags_aliases.name)) FILTER (WHERE tags.id IS NOT NULL), '[]') as tags,
			COALESCE(JSON_AGG(DISTINCT (movie_tags.id, movie_tags.name, movie_tags.priority, movie_tags_aliases.name)) FILTER (WHERE movie_tags.id IS NOT NULL), '[]') as movie_tags,
			row_number() OVER (PARTITION BY movies.entry_id, parents.id ORDER BY movies.effective_date DESC) as dupe_index
		FROM movies
		LEFT JOIN movies_meta ON movies_meta.movie_id = movies.id
		LEFT JOIN movies_scenes ON movies_scenes.movie_id = movies.id
		LEFT JOIN movies_tags ON movies_tags.movie_id = movies.id
		LEFT JOIN entities ON movies.entity_id = entities.id
		LEFT JOIN entities AS parents ON parents.id = entities.parent_id
		LEFT JOIN releases_actors AS local_actors ON local_actors.release_id = movies_scenes.scene_id
		LEFT JOIN releases_directors AS local_directors ON local_directors.release_id = movies_scenes.scene_id
		LEFT JOIN releases_tags AS local_tags ON local_tags.release_id = movies_scenes.scene_id
		LEFT JOIN actors ON local_actors.actor_id = actors.id
		LEFT JOIN actors AS directors ON local_directors.director_id = directors.id
		LEFT JOIN tags ON local_tags.tag_id = tags.id
		LEFT JOIN tags as tags_aliases ON local_tags.tag_id = tags_aliases.alias_for AND tags_aliases.secondary = true
		LEFT JOIN tags as movie_tags ON movies_tags.tag_id = movie_tags.id
		LEFT JOIN tags as movie_tags_aliases ON movies_tags.tag_id = movie_tags_aliases.alias_for AND movie_tags_aliases.secondary = true
		LEFT JOIN movies_covers ON movies_covers.movie_id = movies.id
		${movieIds ? 'WHERE movies.id = ANY(?)' : ''}
		GROUP BY
			movies.id, movies.title, movies.created_at, movies.date,
			movies_meta.stashed, movies_meta.stashed_scenes, movies_meta.stashed_total,
			entities.id, entities.name, entities.slug, entities.alias,
			parents.id, parents.name, parents.slug, parents.alias,
			movies_covers.*
	`, movieIds && [movieIds]);

	const moviesById = Object.fromEntries(movies.rows.map((movie) => [movie.id, movie]));

	const docs = (movieIds || Object.keys(moviesById)).map((movieId) => {
		const movie = moviesById[movieId];

		// requested movie no longer exists, remove it from the index
		if (!movie) {
			return {
				delete: {
					index: 'movies',
					id: movieId,
				},
			};
		}

		// merge scene tags and movie tags, deduplicated by tag ID (f1 = id, f2 = name, f3 = priority, f4 = alias name)
		const combinedTags = Object.values(Object.fromEntries(movie.tags.concat(movie.movie_tags).map((tag) => [tag.f1, {
			id: tag.f1,
			name: tag.f2,
			priority: tag.f3,
			alias: tag.f4,
		}])));

		const flatActors = movie.actors.flatMap((actor) => toWordTokens(actor.f2)); // match word characters to filter out brackets etc.
		const flatTags = combinedTags
			.filter((tag) => tag.priority > 6)
			.flatMap((tag) => toWordTokens(tag.alias ? `${tag.name} ${tag.alias}` : tag.name)); // only make top tags searchable to minimize cluttered results

		// strip actor names and top tags from the title so the filtered title only contains the remaining words
		const filteredTitle = movie.title && [...flatActors, ...flatTags]
			.reduce((accTitle, tag) => accTitle.replace(new RegExp(tag.replace(/[^\w\s]+/g, ''), 'gi'), ''), movie.title)
			.trim()
			.replace(/\s{2,}/g, ' ');

		return {
			replace: {
				index: 'movies',
				id: movie.id,
				doc: {
					title: movie.title || undefined,
					title_filtered: filteredTitle || undefined,
					date: movie.date ? toUnixTime(movie.date) : undefined,
					created_at: toUnixTime(movie.created_at),
					effective_date: toUnixTime(movie.date || movie.created_at),
					channel_id: movie.channel_id,
					channel_slug: movie.channel_slug,
					channel_name: movie.channel_name,
					network_id: movie.network_id || undefined,
					network_slug: movie.network_slug || undefined,
					network_name: movie.network_name || undefined,
					entity_ids: [movie.channel_id, movie.network_id].filter(Boolean), // manticore does not support OR, this allows IN
					actor_ids: movie.actors.map((actor) => actor.f1),
					actors: movie.actors.map((actor) => actor.f2).join(),
					tag_ids: combinedTags.map((tag) => tag.id),
					tags: flatTags.join(' '),
					has_cover: movie.has_cover,
					meta: movie.date ? format(movie.date, 'y yy M MMM MMMM d') : undefined,
					stashed: movie.stashed || 0,
					stashed_scenes: movie.stashed_scenes || 0,
					stashed_total: movie.stashed_total || 0,
					dupe_index: movie.dupe_index || 0,
				},
			},
		};
	});

	if (docs.length === 0) {
		return null;
	}

	return indexApi.bulk(toNdjson(docs));
}

/**
 * Refresh the `movies_meta` materialized view, then re-index the given movies in Manticore.
 *
 * @param {Array<number|string>} [movieIds] - Movies to sync; all when omitted.
 * @returns {Promise<void>}
 */
export async function syncMovies(movieIds) {
	await knex.raw('REFRESH MATERIALIZED VIEW movies_meta;');
	await syncManticoreMovies(movieIds);
}

/**
 * Rebuild the Manticore `actors` documents from Postgres.
 * Requested IDs that no longer exist in `actors` are deleted from the index.
 *
 * @param {Array<number|string>} [actorIds] - Actors to sync; every actor when omitted.
 * @returns {Promise<object|null>} The `indexApi.bulk` response, or null when there was nothing to index.
 */
export async function syncManticoreActors(actorIds) {
	logger.info(`Updating Manticore search documents for ${actorIds ? actorIds.length : 'all' } actors`);

	// manually select date of birth, otherwise it is retrieved in local timezone but interpreted as UTC...
	const actors = await knex.raw(`
		SELECT
			actors.*,
			actors_meta.*,
			date_of_birth AT TIME ZONE 'Europe/Amsterdam' AT TIME ZONE 'UTC' as dob
		FROM actors
		LEFT JOIN actors_meta ON actors_meta.actor_id = actors.id
		${actorIds ? 'WHERE actors.id = ANY(?)' : ''}
	`, actorIds && [actorIds]);

	const actorsById = Object.fromEntries(actors.rows.map((actor) => [actor.id, actor]));

	const docs = (actorIds || Object.keys(actorsById)).map((actorId) => {
		const actor = actorsById[actorId];

		// requested actor no longer exists, remove it from the index
		if (!actor) {
			return {
				delete: {
					index: 'actors',
					id: actorId,
				},
			};
		}

		return {
			replace: {
				index: 'actors',
				id: actor.id,
				doc: {
					entity_id: actor.entity_id,
					name: actor.name,
					slug: actor.slug,
					gender: actor.gender || undefined,
					date_of_birth: actor.dob ? toUnixTime(actor.dob) : undefined,
					has_avatar: !!actor.avatar_media_id,
					country: actor.birth_country_alpha2 || undefined,
					height: actor.height || undefined,
					mass: actor.weight || undefined, // weight is a reserved keyword in manticore
					cup: actor.cup || undefined,
					natural_boobs: actor.natural_boobs === null ? 0 : Number(actor.natural_boobs) + 1, // manticore bool does not seem to support null, and we need three states for natural_boobs: yes, no and unknown
					penis_length: actor.penis_length || undefined,
					penis_girth: actor.penis_girth || undefined,
					stashed: actor.stashed || 0,
					scenes: actor.scenes || 0,
				},
			},
		};
	});

	if (docs.length === 0) {
		return null;
	}

	return indexApi.bulk(toNdjson(docs));
}

/**
 * Refresh the `actors_meta` materialized view, then re-index the given actors in Manticore.
 *
 * @param {Array<number|string>} [actorIds] - Actors to sync; all when omitted.
 * @returns {Promise<void>}
 */
export async function syncActors(actorIds) {
	await knex.raw('REFRESH MATERIALIZED VIEW actors_meta;');
	await syncManticoreActors(actorIds);
}

/**
 * Collect the unique item IDs queued for one domain across all sync tasks.
 *
 * @param {Array<{ domain: string, item_ids: Array<number|string> }>} tasks - Rows of the `sync` table.
 * @param {string} domain - 'scene', 'movie' or 'actor'.
 * @returns {Array<number|string>}
 */
function getQueueItemIds(tasks, domain) {
	return Array.from(new Set(tasks.filter((task) => task.domain === domain).flatMap((task) => task.item_ids)));
}

/**
 * Process every row currently in the `sync` table: re-index the queued scenes, movies and actors,
 * then delete exactly the rows that were read (rows queued in the meantime are left for the next run).
 *
 * @returns {Promise<void>}
 */
export async function syncQueue() {
	const tasks = await knex('sync');

	const sceneIds = getQueueItemIds(tasks, 'scene');
	const movieIds = getQueueItemIds(tasks, 'movie');
	const actorIds = getQueueItemIds(tasks, 'actor');

	await Promise.all([
		syncScenes(sceneIds),
		syncMovies(movieIds),
		syncActors(actorIds),
	]);

	await knex('sync')
		.whereIn('id', tasks.map((task) => task.id))
		.delete();

	logger[tasks.length > 0 ? 'info' : 'verbose'](`Processed ${tasks.length} sync items`);
}

// NOTE(review): the cron package appears to honor runOnInit independently of start, i.e. one sync runs on import even when sync is disabled — confirm this is intended
CronJob.from({
	cronTime: config.sync.crontab,
	async onTick() {
		try {
			await syncQueue();
		} catch (error) {
			// handle here instead of leaving a rejected promise unhandled, which terminates the process by default on Node 15+
			logger.error(`Failed to process sync queue: ${error.message}`);
		}
	},
	start: config.sync.enabled,
	runOnInit: true,
});