From 2ac879d27600fe56a6e4c345096e2517d8fa75d7 Mon Sep 17 00:00:00 2001 From: Niels Simenon Date: Sun, 26 Apr 2020 03:51:59 +0200 Subject: [PATCH] Fixed Vixen scraper, using new token URL for trailers. --- src/media.js | 192 +++++++++++++++++++++++------------------- src/scrapers/vixen.js | 38 ++++++--- src/utils/buffer.js | 118 ++++++++++++++++++++++++++ src/utils/http.js | 7 +- src/utils/stream.js | 55 ++++++++++++ 5 files changed, 313 insertions(+), 97 deletions(-) create mode 100644 src/utils/buffer.js create mode 100644 src/utils/stream.js diff --git a/src/media.js b/src/media.js index 9cfa8379..64fe9efd 100644 --- a/src/media.js +++ b/src/media.js @@ -2,9 +2,12 @@ const config = require('config'); const Promise = require('bluebird'); -const fs = require('fs').promises; +const util = require('util'); +const fs = require('fs'); +const fsPromises = require('fs').promises; const path = require('path'); -const bhttp = require('bhttp'); +const stream = require('stream'); +const { once } = require('events'); const nanoid = require('nanoid/non-secure'); const mime = require('mime'); // const fileType = require('file-type'); @@ -17,63 +20,17 @@ const knex = require('./knex'); const http = require('./utils/http'); const { get } = require('./utils/qu'); -function getHash(buffer) { - const hash = blake2.createHash('blake2b', { digestLength: 24 }); - hash.update(buffer); +const PassThrough = stream.PassThrough; +const pipeline = util.promisify(stream.pipeline); - return hash.digest('hex'); +function getMemoryUsage() { + return process.memoryUsage().rss / (10 ** 6); } -async function getEntropy(buffer, source) { - try { - const { entropy } = await sharp(buffer).stats(); - - return entropy; - } catch (error) { - logger.warn(`Failed to retrieve image entropy, using 7.5 for ${source.src}: ${error.message}`); - - return 7.5; - } -} - -async function getMeta(buffer, source) { - try { - const { width, height, size } = await sharp(buffer).metadata(); - - return { - width, - height, - size, - }; - } catch (error) { - logger.warn(`Failed to retrieve image metadata from ${source.src}: ${error.message}`); - - return {}; - } -} - -async function getThumbnail(buffer, height = config.media.thumbnailSize) { - try { - const thumbnail = sharp(buffer) - .resize({ - height, - withoutEnlargement: true, - }) - .jpeg({ - quality: config.media.thumbnailQuality, - }) - .toBuffer(); - - return thumbnail; - } catch (error) { - logger.error(`Failed to create thumbnail: ${error.message}`); - } - - return null; -} +let peakMemoryUsage = getMemoryUsage(); function sampleMedias(medias, limit = config.media.limit, preferLast = true) { - // limit media sets, use extrax as fallbacks + // limit media sets, use extras as fallbacks if (medias.length <= limit) { return medias; } @@ -311,7 +268,7 @@ async function extractSource(baseSource, { existingExtractMediaByUrl }) { throw new Error(`Could not extract source from ${baseSource.url}: ${res.status}`); } -async function saveMedia(media) { +async function storeFile(media) { const hashDir = media.meta.hash.slice(0, 2); const hashSubDir = media.meta.hash.slice(2, 4); const hashFilename = media.meta.hash.slice(4); @@ -324,83 +281,137 @@ async function saveMedia(media) { const filepath = path.join(filedir, filename); if (media.meta.type === 'image') { - const thumbnail = await getThumbnail(media.file.buffer); - const thumbdir = path.join(media.role, 'thumbs', hashDir, hashSubDir); const thumbpath = path.join(thumbdir, filename); await Promise.all([ - fs.mkdir(path.join(config.media.path, 
filedir), { recursive: true }), - fs.mkdir(path.join(config.media.path, thumbdir), { recursive: true }), + fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }), + fsPromises.mkdir(path.join(config.media.path, thumbdir), { recursive: true }), ]); await Promise.all([ - fs.writeFile(path.join(config.media.path, filepath), media.file.buffer), - fs.writeFile(path.join(config.media.path, thumbpath), thumbnail), + fsPromises.rename(media.file.path, path.join(config.media.path, filepath)), + fsPromises.rename(media.file.thumbnail, path.join(config.media.path, thumbpath)), ]); return { ...media, file: { - // buffer is no longer needed, discard to free up memory path: filepath, thumbnail: thumbpath, }, }; } - await fs.mkdir(path.join(config.media.path, filedir), { recursive: true }); - await fs.writeFile(path.join(config.media.path, filepath), media.file.buffer); + await fsPromises.mkdir(path.join(config.media.path, filedir), { recursive: true }); + await fsPromises.rename(media.file.path, path.join(config.media.path, filepath)); + + logger.silly(`Stored media file at permanent location ${filepath}`); return { ...media, file: { - // buffer is no longer needed, discard to free up memory path: filepath, }, }; } -async function fetchSource(source) { +async function fetchSource(source, baseMedia) { logger.silly(`Fetching media from ${source.src}`); // attempts async function attempt(attempts = 1) { try { - const { pathname } = new URL(source.src); - const mimetype = mime.getType(pathname); - const extension = mime.getExtension(mimetype); - const type = mimetype?.split('/')[0] || 'image'; - - const res = await bhttp.get(source.src, { - headers: { - ...(source.referer && { referer: source.referer }), - ...(source.host && { host: source.host }), - }, - stream: true, + const res = await http.get(source.src, { + ...(source.referer && { referer: source.referer }), + ...(source.host && { host: source.host }), + }, { + stream: true, // sources are fetched in parallel, don't gobble up memory }); if (!res.ok) { throw new Error(`Response ${res.status} not OK`); } - const hash = getHash(res.body); - const entropy = type === 'image' ? await getEntropy(res.body) : null; - const { size, width, height } = type === 'image' ? await getMeta(res.body) : {}; + const { pathname } = new URL(source.src); + const mimetype = mime.getType(pathname); + const extension = mime.getExtension(mimetype); + const type = mimetype?.split('/')[0] || 'image'; - logger.silly(`Fetched media from ${source.src}`); + const hasher = new blake2.Hash('blake2b'); + hasher.setEncoding('hex'); + + const hashStream = new PassThrough(); + const metaStream = type === 'image' ? sharp() : new PassThrough(); + + const tempFilePath = path.join(config.media.path, 'temp', `${baseMedia.id}.${extension}`); + const tempThumbPath = path.join(config.media.path, 'temp', `${baseMedia.id}_thumb.${extension}`); + + const tempFileTarget = fs.createWriteStream(tempFilePath); + const tempThumbTarget = fs.createWriteStream(tempThumbPath); + + hashStream.on('data', chunk => hasher.write(chunk)); + + if (type === 'image') { + metaStream + .clone() + .resize({ + height: config.media.thumbnailSize, + withoutEnlargement: true, + }) + .jpeg({ quality: config.media.thumbnailQuality }) + .pipe(tempThumbTarget) + .on('error', error => logger.error(error)); + } + + // pipeline destroys streams + const infoPromise = type === 'image' ? once(metaStream, 'info') : Promise.resolve([{}]); + const metaPromise = type === 'image' ? 
metaStream.stats() : Promise.resolve(); + + await pipeline( + res.originalRes, + metaStream, + hashStream, + tempFileTarget, + ); + + /* + res.originalRes + .pipe(metaStream) + .pipe(hashStream) + .pipe(tempFileTarget); + */ + + logger.silly(`Temporarily saved media from ${source.src}`); + + const [stats, info] = await Promise.all([ + metaPromise, + infoPromise, + ]); + + logger.silly(`Ended pipeline for ${source.src}`); + + hasher.end(); + + const hash = hasher.read(); + const [{ size, width, height }] = info; + + peakMemoryUsage = Math.max(getMemoryUsage(), peakMemoryUsage); + + logger.silly(`Retrieved metadata from ${source.src}`); return { ...source, file: { - buffer: res.body, + path: tempFilePath, + thumbnail: tempThumbPath, }, meta: { mimetype, extension, type, hash, - entropy, + entropy: stats?.entropy, size, width, height, @@ -421,7 +432,7 @@ async function fetchSource(source) { return attempt(1); } -async function trySource(baseSource, existingMedias) { +async function trySource(baseSource, existingMedias, baseMedia) { // catch error and try the next source const extractedSource = await extractSource(baseSource, existingMedias); const existingSourceMedia = existingMedias.existingSourceMediaByUrl[extractedSource.src]; @@ -443,7 +454,7 @@ async function trySource(baseSource, existingMedias) { }; } - return fetchSource(extractedSource); + return fetchSource(extractedSource, baseMedia); } async function fetchMedia(baseMedia, existingMedias) { @@ -462,10 +473,17 @@ async function fetchMedia(baseMedia, existingMedias) { }; } + return storeFile({ + ...baseMedia, + ...source, + }); + + /* return saveMedia({ ...baseMedia, ...source, }); + */ } catch (error) { logger.warn(error.message); @@ -504,6 +522,8 @@ function curateMediaEntry(media, index) { } async function storeMedias(baseMedias) { + await fsPromises.mkdir(path.join(config.media.path, 'temp'), { recursive: true }); + const [existingSourceMediaByUrl, existingExtractMediaByUrl] = await findSourceDuplicates(baseMedias); const savedMedias = await Promise.map( @@ -581,6 +601,8 @@ async function associateReleaseMedia(releases) { await knex.raw(`${knex(`releases_${role}`).insert(associations)} ON CONFLICT DO NOTHING`); }, Promise.resolve()); + + logger.debug(`Peak media fetching memory usage: ${peakMemoryUsage.toFixed(2)} MB`); } module.exports = { diff --git a/src/scrapers/vixen.js b/src/scrapers/vixen.js index e36c3f64..33a08fa9 100644 --- a/src/scrapers/vixen.js +++ b/src/scrapers/vixen.js @@ -90,6 +90,32 @@ function scrapeUpcoming(scene, site) { return [release]; } +async function getTrailer(scene, site, url) { + const qualities = [360, 480, 720, 1080, 2160]; + + const tokenRes = await post(`${site.url}/api/__record_tknreq`, { + file: scene.previewVideoUrl1080P, + sizes: qualities.join('+'), + type: 'trailer', + }, { referer: url }); + + if (!tokenRes.ok) { + return null; + } + + const trailerUrl = `${site.url}/api${tokenRes.body.data.url}`; + const trailersRes = await post(trailerUrl, null, { referer: url }); + + if (trailersRes.ok) { + return qualities.map(quality => (trailersRes.body[quality] ? 
{ + src: trailersRes.body[quality].token, + quality, + } : null)).filter(Boolean); + } + + return null; +} + async function scrapeScene(data, url, site, baseRelease) { const scene = data.video; @@ -116,16 +142,8 @@ async function scrapeScene(data, url, site, baseRelease) { release.teaser = getTeaserFallbacks(scene.previews.poster); - const qualities = [360, 480, 720, 1080, 2160]; - const trailersUrl = `${site.url}/api/__tkn/${scene.previewVideoUrl1080P}/trailer/${qualities.join('+')}`; - const trailersRes = await post(trailersUrl, null, { headers: { referer: url } }); - - if (trailersRes.code === 200) { - release.trailer = qualities.map(quality => (trailersRes.body[quality] ? { - src: trailersRes.body[quality].token, - quality, - } : null)).filter(Boolean); - } + const trailer = await getTrailer(scene, site, url); + if (trailer) release.trailer = trailer; return release; } diff --git a/src/utils/buffer.js b/src/utils/buffer.js new file mode 100644 index 00000000..bdaa84cd --- /dev/null +++ b/src/utils/buffer.js @@ -0,0 +1,118 @@ +'use strict'; + +const bhttp = require('bhttp'); +const Promise = require('bluebird'); +const fsPromises = require('fs').promises; +const fs = require('fs'); +const { PassThrough } = require('stream'); +const blake2 = require('blake2'); +const argv = require('yargs').argv; + +const file = 'https://speed.hetzner.de/100MB.bin'; +// const file = 'https://speed.hetzner.de/1GB.bin'; +// const file = 'https://speed.hetzner.de/10GB.bin'; + +function getMemoryUsage() { + return process.memoryUsage().rss / (10 ** 6); +} + +const stats = { + peakMemoryUsage: getMemoryUsage(), + done: false, + downloads: {}, +}; + +function render() { + const downloads = Object.entries(stats.downloads); + + process.stdout.clearScreenDown(); + + process.stdout.write(`peak memory: ${stats.peakMemoryUsage.toFixed(2)} MB\n`); + + downloads.forEach(([download, progress]) => { + process.stdout.write(`${download}: ${progress}${typeof progress === 'string' ? 
'' : '%'}\n`); + }); + + process.stdout.moveCursor(0, -(downloads.length + 1)); + process.stdout.cursorTo(0); + + if (downloads.length === 0 || !downloads.every(([_label, download]) => typeof download === 'string')) { + setTimeout(() => render(), 1000); + return; + } + + process.stdout.moveCursor(0, downloads.length + 1); +} + +function setProgress(label, completedBytes, totalBytes, hash) { + const memory = getMemoryUsage(); + + stats.peakMemoryUsage = Math.max(memory, stats.peakMemoryUsage); + stats.downloads[label] = hash || Math.round((completedBytes / totalBytes) * 100); +} + +async function buffered(label) { + const hash = new blake2.Hash('blake2b'); + + const imageRes = await bhttp.get(file, { + onDownloadProgress(completedBytes, totalBytes) { + setProgress(label, completedBytes, totalBytes); + }, + }); + + hash.update(imageRes.body); + setProgress(label, null, null, hash.digest('hex')); + + await fsPromises.writeFile(`/mnt/stor/Pictures/traxxx/temp/buffered-${label}.bin`, imageRes.body); +} + +async function streamed(label) { + const hash = new blake2.Hash('blake2b'); + hash.setEncoding('hex'); + + const hashStream = new PassThrough(); + const targetStream = fs.createWriteStream(`/mnt/stor/Pictures/traxxx/temp/streamed-${label}.bin`); + + const imageRes = await bhttp.get(file, { + stream: true, + }); + + const stream = imageRes + .pipe(hashStream) + .pipe(targetStream); + + imageRes.on('progress', (completedBytes, totalBytes) => { + setProgress(label, completedBytes, totalBytes); + }); + + hashStream.on('data', (chunk) => { + hash.write(chunk); + }); + + stream.on('finish', () => { + hash.end(); + setProgress(label, null, null, hash.read()); + }); +} + +async function init() { + const n = argv.n || 1; + + if (argv._.includes('stream')) { + console.log('using streams'); + render(); + + await Promise.map(Array.from({ length: n }), async (value, index) => streamed(index + 1)); + + return; + } + + if (argv._.includes('buffer')) { + console.log('using buffers'); + render(); + + await Promise.map(Array.from({ length: n }), async (value, index) => buffered(index + 1)); + } +} + +init(); diff --git a/src/utils/http.js b/src/utils/http.js index 71acd3f9..b49e71d6 100644 --- a/src/utils/http.js +++ b/src/utils/http.js @@ -45,7 +45,7 @@ queue.define('http', async ({ options = {}, }) => { if (body) { - logger.silly(`${method.toUpperCase()} ${url} with ${body}`); + logger.silly(`${method.toUpperCase()} ${url} with ${JSON.stringify(body)}`); } else { logger.silly(`${method.toUpperCase()} ${url}`); } @@ -73,8 +73,10 @@ queue.define('http', async ({ return { ...res, + originalRes: res, html, json, + pipe: res.pipe, ok: res.statusCode >= 200 && res.statusCode <= 299, code: res.statusCode, status: res.statusCode, @@ -85,7 +87,7 @@ queue.define('http', async ({ async function get(url, headers, options) { return queue.push('http', { - method: 'get', + method: 'GET', url, headers, options, @@ -94,6 +96,7 @@ async function get(url, headers, options) { async function post(url, body, headers, options) { return queue.push('http', { + method: 'POST', url, body, headers, diff --git a/src/utils/stream.js b/src/utils/stream.js new file mode 100644 index 00000000..e51c820a --- /dev/null +++ b/src/utils/stream.js @@ -0,0 +1,55 @@ +'use strict'; + +const config = require('config'); +const { PassThrough } = require('stream'); +const fs = require('fs'); +const path = require('path'); +const bhttp = require('bhttp'); +const blake2 = require('blake2'); +const sharp = require('sharp'); + +const url = 
'https://thumbs.julesjordan.com/trial/content//upload/dl03/julesjordan/oil_overload_16_scene2//photos/alina_lopez_jules_jordan_com_77.jpg'; + +async function init() { + const hash = new blake2.Hash('blake2b'); + hash.setEncoding('hex'); + + const res = await bhttp.get(url, { + stream: true, + }); + + const metaStream = sharp(); + const hashStream = new PassThrough(); + const target = fs.createWriteStream(path.join(config.media.path, 'temp', 'alina.jpg')); + const thumbTarget = fs.createWriteStream(path.join(config.media.path, 'temp', 'alina_thumb.jpg')); + + hashStream.on('data', (chunk) => { + hash.write(chunk); + }); + + metaStream.clone() + .resize(320) + .pipe(thumbTarget); + + const stream = res + .pipe(metaStream) + .pipe(hashStream) + .pipe(target); + + stream.on('finish', () => { + hash.end(); + const digest = hash.read(); + + console.log('stream', digest); + }); + + metaStream.on('info', (info) => { + console.log('info', info); + }); + + const stats = await metaStream.stats(); + + console.log('stats', stats); +} + +init();
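
Below the diff, a minimal standalone sketch of the stream-pipeline hashing pattern that media.js adopts in this patch: download, hash, and write to disk in one pass instead of buffering whole files in memory. It assumes the same bhttp and blake2 dependencies the patch already uses; the sample URL and the /tmp output path are placeholders, not values from the patch.

'use strict';

const util = require('util');
const fs = require('fs');
const stream = require('stream');
const bhttp = require('bhttp');
const blake2 = require('blake2');

const pipeline = util.promisify(stream.pipeline);

async function fetchToFile(url, filepath) {
  // hash the bytes as they pass through, without keeping the file in memory
  const hasher = new blake2.Hash('blake2b');
  hasher.setEncoding('hex');

  const hashStream = new stream.PassThrough();
  hashStream.on('data', chunk => hasher.write(chunk));

  // with stream: true, bhttp resolves with a readable response stream
  const res = await bhttp.get(url, { stream: true });

  // pipeline wires the streams together and destroys them on completion or error
  await pipeline(res, hashStream, fs.createWriteStream(filepath));

  hasher.end();

  return hasher.read(); // hex digest of the downloaded file
}

fetchToFile('https://example.com/sample.jpg', '/tmp/sample.jpg')
  .then(hash => console.log(hash))
  .catch(error => console.error(error.message));

fetchSource in the patched media.js follows the same shape, adding a sharp() stage between the response and the hash stream to extract image metadata and write a thumbnail from the same pass.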