Refactored media fetch to use undici for HTTP/2 support.
This commit is contained in:
@@ -3,9 +3,11 @@
|
||||
const config = require('config');
|
||||
const Promise = require('bluebird');
|
||||
const bhttp = require('bhttp');
|
||||
const undici = require('undici');
|
||||
const fs = require('fs').promises;
|
||||
const util = require('util');
|
||||
const stream = require('stream');
|
||||
// const util = require('util');
|
||||
// const stream = require('stream');
|
||||
const { pipeline } = require('stream/promises');
|
||||
const tunnel = require('tunnel');
|
||||
const Bottleneck = require('bottleneck');
|
||||
const { JSDOM, toughCookie } = require('jsdom');
|
||||
@@ -18,7 +20,7 @@ const logger = require('../logger')(__filename);
|
||||
const virtualConsole = require('./virtual-console')(__filename);
|
||||
const argv = require('../argv');
|
||||
|
||||
const pipeline = util.promisify(stream.pipeline);
|
||||
// const pipeline = util.promisify(stream.pipeline);
|
||||
|
||||
const limiters = {
|
||||
bypass: new Bottleneck({
|
||||
@@ -47,13 +49,6 @@ const defaultOptions = {
|
||||
},
|
||||
};
|
||||
|
||||
const proxyAgent = tunnel.httpsOverHttp({
|
||||
proxy: {
|
||||
host: config.proxy.host,
|
||||
port: config.proxy.port,
|
||||
},
|
||||
});
|
||||
|
||||
function useProxy(url) {
|
||||
if (!config.proxy.enable) {
|
||||
return false;
|
||||
@@ -326,87 +321,103 @@ async function bypassCloudflareRequest(url, method, body, cloudflareBypass, opti
|
||||
};
|
||||
}
|
||||
|
||||
async function request(method = 'get', url, body, requestOptions = {}, limiter) {
|
||||
const http = requestOptions.session || bhttp;
|
||||
const defaultAgent = new undici.Agent({
|
||||
allowH2: true,
|
||||
connect: {
|
||||
rejectUnauthorized: false,
|
||||
},
|
||||
});
|
||||
|
||||
const options = {
|
||||
...requestOptions,
|
||||
session: null,
|
||||
};
|
||||
const proxyAgent = tunnel.httpsOverHttp({
|
||||
proxy: {
|
||||
host: config.proxy.host,
|
||||
port: config.proxy.port,
|
||||
},
|
||||
});
|
||||
|
||||
async function request(method = 'get', url, body, requestOptions = {}, limiter, redirects = 0) {
|
||||
const withProxy = useProxy(url);
|
||||
const withBrowserBypass = useBrowserBypass(url, options);
|
||||
const withCloudflareBypass = useCloudflareBypass(url, options);
|
||||
const withBrowserBypass = useBrowserBypass(url, requestOptions);
|
||||
const withCloudflareBypass = useCloudflareBypass(url, requestOptions);
|
||||
|
||||
if (withProxy) {
|
||||
options.agent = proxyAgent;
|
||||
}
|
||||
|
||||
logger.debug(`${method.toUpperCase()} (${limiter._store.storeOptions.minTime}ms/${limiter._store.storeOptions.maxConcurrent}p${withProxy ? ' proxy' : ''}${withBrowserBypass || withCloudflareBypass ? ' bypass' : ''}) ${url}`);
|
||||
logger.debug(`${redirects > 0 ? 'REDIRECT ' : ''}${method.toUpperCase()} (${limiter._store.storeOptions.minTime}ms/${limiter._store.storeOptions.maxConcurrent}p${withProxy ? ' proxy' : ''}${withBrowserBypass || withCloudflareBypass ? ' bypass' : ''}) ${url}`);
|
||||
|
||||
if (withBrowserBypass) {
|
||||
if (method !== 'get') {
|
||||
throw new Error('Browser bypass only supports GET');
|
||||
}
|
||||
|
||||
return bypassBrowserRequest(url, options);
|
||||
if (method !== 'get') throw new Error('Browser bypass only supports GET');
|
||||
return bypassBrowserRequest(url, requestOptions);
|
||||
}
|
||||
|
||||
if (withCloudflareBypass) {
|
||||
return bypassCloudflareRequest(url, method, body, withCloudflareBypass, options);
|
||||
return bypassCloudflareRequest(url, method, body, withCloudflareBypass, requestOptions);
|
||||
}
|
||||
|
||||
const res = await (body
|
||||
? http[method](url, body, options)
|
||||
: http[method](url, options));
|
||||
const headers = {
|
||||
...requestOptions.headers,
|
||||
};
|
||||
|
||||
const res = await undici.request(url, {
|
||||
method: method.toUpperCase(),
|
||||
headers,
|
||||
body: body ?? null,
|
||||
dispatcher: withProxy
|
||||
? proxyAgent
|
||||
: defaultAgent,
|
||||
maxRedirections: 0, // handle manually
|
||||
});
|
||||
|
||||
if (res.headers.location && redirects < 3) {
|
||||
// Drain the body to free the socket before redirecting
|
||||
await res.body.dump();
|
||||
const nextUrl = new URL(res.headers.location, url).href;
|
||||
return request(method, nextUrl, body, requestOptions, limiter, redirects + 1);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
async function finalizeResult(res, options) {
|
||||
async function finalizeResult(res, url, options) {
|
||||
if (options.destination) {
|
||||
// res.on('progress', (bytes, totalBytes) => logger.silly(`Downloaded ${Math.round((bytes / totalBytes) * 100)}% of ${url}`));
|
||||
|
||||
await pipeline(res, ...(options.transforms || []), options.destination);
|
||||
}
|
||||
|
||||
if (Buffer.isBuffer(res.body)) {
|
||||
const html = res.body.toString();
|
||||
const window = options?.parse ? new JSDOM(html, { virtualConsole, ...options.extract }).window : null;
|
||||
const pathname = new URL(res.request.url).pathname.replace(/\//g, '_');
|
||||
|
||||
// allow window.close to be called after scraping is done, only for deep scrapes where the URL is known outside the scraper
|
||||
if (window && /fetchScene|fetchMovie/.test(new Error().stack)) {
|
||||
windows.set(pathname, window);
|
||||
}
|
||||
|
||||
if (argv.saveHtml) {
|
||||
await fs.writeFile(`./html/${pathname}.html`, html);
|
||||
}
|
||||
await pipeline(
|
||||
res.body,
|
||||
...(options.transforms || []),
|
||||
options.destination,
|
||||
);
|
||||
|
||||
return {
|
||||
...res,
|
||||
body: html,
|
||||
html,
|
||||
status: res.statusCode,
|
||||
statusCode: res.statusCode,
|
||||
headers: res.headers,
|
||||
document: window?.document || null,
|
||||
window,
|
||||
status: res.statusCode,
|
||||
ok: res.statusCode >= 200 && res.statusCode <= 299,
|
||||
};
|
||||
}
|
||||
|
||||
const buffer = await res.body.arrayBuffer();
|
||||
const html = Buffer.from(buffer).toString();
|
||||
const window = options?.parse ? new JSDOM(html, { virtualConsole, ...options.extract }).window : null;
|
||||
const pathname = new URL(url).pathname.replace(/\//g, '_');
|
||||
|
||||
if (window && /fetchScene|fetchMovie/.test(new Error().stack)) {
|
||||
windows.set(pathname, window);
|
||||
}
|
||||
|
||||
if (argv.saveHtml) {
|
||||
await fs.writeFile(`./html/${pathname}.html`, html);
|
||||
}
|
||||
|
||||
return {
|
||||
...res,
|
||||
body: res.body,
|
||||
statusCode: res.statusCode,
|
||||
status: res.statusCode,
|
||||
headers: res.headers,
|
||||
body: html,
|
||||
html,
|
||||
document: window?.document || null,
|
||||
window,
|
||||
ok: res.statusCode >= 200 && res.statusCode <= 299,
|
||||
};
|
||||
}
|
||||
|
||||
function getTimeout(options, url) {
|
||||
return new Promise((resolve, reject, onCancel) => {
|
||||
return new Promise((_resolve, reject, onCancel) => {
|
||||
const timeout = setTimeout(() => {
|
||||
logger.debug(`Canceled timed out request to ${url}`);
|
||||
reject(new Error(`URL ${url} timed out`));
|
||||
@@ -441,7 +452,7 @@ async function scheduleRequest(method = 'get', url, body, requestOptions = {}) {
|
||||
|
||||
timeout.cancel();
|
||||
|
||||
const curatedResult = await finalizeResult(result, options);
|
||||
const curatedResult = await finalizeResult(result, url, options);
|
||||
|
||||
logger.silly(`Response ${curatedResult.status} for ${method.toUpperCase()} ${url}`);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user