Using new HTTP module with a dynamic rate limiter.

This commit is contained in:
DebaucheryLibrarian
2020-11-22 04:07:09 +01:00
parent 5d0fe44130
commit b9b777c621
27 changed files with 358 additions and 175 deletions

146
src/utils/http-legacy.js Normal file
View File

@@ -0,0 +1,146 @@
'use strict';
const util = require('util');
const stream = require('stream');
const config = require('config');
const tunnel = require('tunnel');
const bhttp = require('@thependulum/bhttp');
const taskQueue = require('promise-task-queue');
const pipeline = util.promisify(stream.pipeline);
const logger = require('../logger')(__filename);
const defaultHeaders = {
'user-agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1',
};
const defaultOptions = {
responseTimeout: 30000,
};
const proxyAgent = tunnel.httpsOverHttp({
proxy: {
host: config.proxy.host,
port: config.proxy.port,
},
});
function useProxy(url) {
if (!config.proxy.enable) {
return false;
}
const { hostname } = new URL(url);
return config.proxy.hostnames.includes(hostname);
}
const queue = taskQueue();
const defaultQueueMethod = '20p';
async function handler({
url,
method = 'GET',
body,
headers = {},
options = {},
}) {
if (body) {
logger.silly(`${method.toUpperCase()} ${url} with ${JSON.stringify(body)} ${options.queueMethod || defaultQueueMethod}`);
} else {
logger.silly(`${method.toUpperCase()} ${url} ${options.queueMethod || defaultQueueMethod}`);
}
const reqOptions = {
headers: {
...(options?.defaultHeaders !== false && defaultHeaders),
...headers,
},
...defaultOptions,
...options,
...(options?.timeout && { responseTimeout: options?.timeout }),
};
if (useProxy(url)) {
reqOptions.agent = proxyAgent;
}
const res = ['POST', 'PUT', 'PATCH'].includes(method.toUpperCase())
? await (options.useSession || bhttp)[method.toLowerCase()](url, body, reqOptions)
: await (options.useSession || bhttp)[method.toLowerCase()](url, reqOptions);
if (options?.stream && options?.destination) {
await pipeline(res, ...(options?.transforms || []), options?.destination);
}
const html = Buffer.isBuffer(res.body) ? res.body.toString() : null;
const json = Buffer.isBuffer(res.body) ? null : res.body;
return {
...res,
originalRes: res,
html,
json,
pipe: res.pipe,
ok: res.statusCode >= 200 && res.statusCode <= 299,
code: res.statusCode,
status: res.statusCode,
};
}
queue.on('concurrencyReached:http', () => {
logger.silly('Queueing requests');
});
queue.define('20p', handler, {
concurrency: 20,
});
queue.define('1s', handler, {
interval: 1,
});
queue.define('5s', handler, {
interval: 5,
});
async function get(url, headers, options) {
return queue.push(options?.queueMethod || defaultQueueMethod, {
method: 'GET',
url,
headers,
options,
});
}
async function head(url, headers, options) {
return queue.push(options?.queueMethod || defaultQueueMethod, {
method: 'HEAD',
url,
headers,
options,
});
}
async function post(url, body, headers, options) {
return queue.push(options?.queueMethod || defaultQueueMethod, {
method: 'POST',
url,
body,
headers,
options,
});
}
function session(headers, options) {
return bhttp.session({
headers,
options,
});
}
module.exports = {
get,
post,
head,
session,
};

View File

@@ -1,21 +1,23 @@
'use strict';
const config = require('config');
const bhttp = require('bhttp');
const util = require('util');
const stream = require('stream');
const config = require('config');
const tunnel = require('tunnel');
const bhttp = require('@thependulum/bhttp');
const taskQueue = require('promise-task-queue');
const Bottleneck = require('bottleneck');
const { JSDOM } = require('jsdom');
const pipeline = util.promisify(stream.pipeline);
const logger = require('../logger')(__filename);
const defaultHeaders = {
'user-agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1',
};
const pipeline = util.promisify(stream.pipeline);
const limiters = {};
const defaultOptions = {
responseTimeout: 30000,
encodeJSON: true,
headers: {
'user-agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1',
},
};
const proxyAgent = tunnel.httpsOverHttp({
@@ -34,113 +36,114 @@ function useProxy(url) {
return config.proxy.hostnames.includes(hostname);
}
const queue = taskQueue();
const defaultQueueMethod = '20p';
function getLimiter(limit = {}) {
const interval = limit.interval === undefined ? config.limits.default.interval : limit.interval;
const concurrency = limit.concurrency === undefined ? config.limits.default.concurrency : limit.concurrency;
async function handler({
url,
method = 'GET',
body,
headers = {},
options = {},
}) {
if (body) {
logger.silly(`${method.toUpperCase()} ${url} with ${JSON.stringify(body)} ${options.queueMethod || defaultQueueMethod}`);
} else {
logger.silly(`${method.toUpperCase()} ${url} ${options.queueMethod || defaultQueueMethod}`);
if (!limiters[interval]?.[concurrency]) {
limiters[interval] = limiters[interval] || {};
limiters[interval][concurrency] = new Bottleneck({
minTime: interval,
maxConcurrent: concurrency,
});
}
const reqOptions = {
headers: {
...(options?.defaultHeaders !== false && defaultHeaders),
...headers,
},
return limiters[interval][concurrency];
}
async function request(method = 'get', url, body, requestOptions = {}) {
const http = requestOptions.session || bhttp;
const options = {
...defaultOptions,
...options,
...(options?.timeout && { responseTimeout: options?.timeout }),
...requestOptions,
responseTimeout: requestOptions.responseTimeout || requestOptions.timeout || 60000,
stream: !!requestOptions.destination,
interval: requestOptions.interval || config.limits.default.interval,
concurrency: requestOptions.concurrency || config.limits.default.concurrency,
session: null,
};
if (useProxy(url)) {
reqOptions.agent = proxyAgent;
options.agent = proxyAgent;
}
const res = ['POST', 'PUT', 'PATCH'].includes(method.toUpperCase())
? await (options.useSession || bhttp)[method.toLowerCase()](url, body, reqOptions)
: await (options.useSession || bhttp)[method.toLowerCase()](url, reqOptions);
logger.debug(`GET (${options.interval}ms/${options.concurrency}p) ${url}`);
if (options?.stream && options?.destination) {
await pipeline(res, ...(options?.transforms || []), options?.destination);
const res = await (body
? http[method](url, body, options)
: http[method](url, options));
const resIsOk = res.statusCode >= 200 && res.statusCode <= 299;
if (options.destination) {
// res.on('progress', (bytes, totalBytes) => logger.silly(`Downloaded ${Math.round((bytes / totalBytes) * 100)}% of ${url}`));
await pipeline(res, ...(options.transforms || []), options.destination);
}
const html = Buffer.isBuffer(res.body) ? res.body.toString() : null;
const json = Buffer.isBuffer(res.body) ? null : res.body;
if (Buffer.isBuffer(res.body)) {
const html = res.body.toString();
const window = new JSDOM(html).window;
return {
...res,
body: html,
html,
status: res.statusCode,
document: window.document,
window,
ok: resIsOk,
};
}
return {
...res,
originalRes: res,
html,
json,
pipe: res.pipe,
ok: res.statusCode >= 200 && res.statusCode <= 299,
code: res.statusCode,
body: res.body,
status: res.statusCode,
ok: res.statusCode >= 200 && res.statusCode <= 299,
};
}
queue.on('concurrencyReached:http', () => {
logger.silly('Queueing requests');
});
queue.define('20p', handler, {
concurrency: 20,
});
queue.define('1s', handler, {
interval: 1,
});
queue.define('5s', handler, {
interval: 5,
});
async function get(url, headers, options) {
return queue.push(options?.queueMethod || defaultQueueMethod, {
method: 'GET',
url,
headers,
options,
});
async function scheduleRequest(method = 'get', url, body, options) {
return getLimiter(options || {}).schedule(() => request(method, url, body, options));
}
async function head(url, headers, options) {
return queue.push(options?.queueMethod || defaultQueueMethod, {
method: 'HEAD',
url,
headers,
options,
});
async function get(url, options) {
return scheduleRequest('get', url, null, options);
}
async function post(url, body, headers, options) {
return queue.push(options?.queueMethod || defaultQueueMethod, {
method: 'POST',
url,
body,
headers,
options,
});
async function post(url, body, options) {
return scheduleRequest('post', url, body, options);
}
function session(headers, options) {
return bhttp.session({
headers,
options,
});
async function put(url, body, options) {
return scheduleRequest('put', url, body, options);
}
async function patch(url, body, options) {
return scheduleRequest('patch', url, body, options);
}
async function del(url, options) {
return scheduleRequest('delete', url, null, options);
}
async function head(url, options) {
return scheduleRequest('head', url, null, options);
}
function getSession(options) {
return bhttp.session(options);
}
module.exports = {
get,
post,
head,
session,
post,
delete: del,
put,
patch,
session: getSession,
};

View File

@@ -457,8 +457,8 @@ function extractAll(htmlValue, selector) {
async function request(method = 'get', urlValue, body, selector, headers, options, queryAll = false) {
const res = await (method === 'post'
? http.post(urlValue, body, headers, options)
: http[method](urlValue, headers, options));
? http.post(urlValue, body, { ...options, headers })
: http[method](urlValue, { ...options, headers }));
if (res.ok) {
const item = queryAll
@@ -494,7 +494,7 @@ async function post(urlValue, body, selector, headers, options) {
}
async function getAll(urlValue, selector, headers, options) {
return request('get,', urlValue, selector, headers, options, true);
return request('get', urlValue, null, selector, headers, options, true);
}
async function postAll(urlValue, body, selector, headers, options) {