// deno.land/x/mongoose@6.7.5/lib/helpers/cursor/eachAsync.js
'use strict';
/*!
 * Module dependencies.
 */
const EachAsyncMultiError = require('../../error/eachAsyncMultiError');const immediate = require('../immediate');const promiseOrCallback = require('../promiseOrCallback');
/**
 * Execute `fn` for every document in the cursor. If `fn` returns a promise,
 * will wait for the promise to resolve before iterating on to the next one.
 * Returns a promise that resolves when done.
 *
 * @param {Function} next the thunk to call to get the next document
 * @param {Function} fn
 * @param {Object} options
 * @param {Number} [options.batchSize=null] if set, Mongoose will call `fn` with an array of at most `batchSize` documents, instead of a single document
 * @param {Number} [options.parallel=1] maximum number of `fn` calls that Mongoose will run in parallel
 * @param {AbortSignal} [options.signal] allow cancelling this eachAsync(). Once the abort signal is fired, `eachAsync()` will immediately fulfill the returned promise (or call the callback) and not fetch any more documents.
 * @param {Function} [callback] executed when all docs have been processed
 * @return {Promise}
 * @api public
 * @method eachAsync
 */
module.exports = function eachAsync(next, fn, options, callback) { const parallel = options.parallel || 1; const batchSize = options.batchSize; const signal = options.signal; const continueOnError = options.continueOnError; const aggregatedErrors = []; const enqueue = asyncQueue();
let aborted = false;
return promiseOrCallback(callback, cb => { if (signal != null) { if (signal.aborted) { return cb(null); }
signal.addEventListener('abort', () => { aborted = true; return cb(null); }, { once: true }); }
if (batchSize != null) { if (typeof batchSize !== 'number') { throw new TypeError('batchSize must be a number'); } else if (!Number.isInteger(batchSize)) { throw new TypeError('batchSize must be an integer'); } else if (batchSize < 1) { throw new TypeError('batchSize must be at least 1'); } }
iterate(cb); });
function iterate(finalCallback) { let handleResultsInProgress = 0; let currentDocumentIndex = 0;
let error = null; for (let i = 0; i < parallel; ++i) { enqueue(createFetch()); }
function createFetch() { let documentsBatch = []; let drained = false;
return fetch;
function fetch(done) { if (drained || aborted) { return done(); } else if (error) { return done(); }
next(function(err, doc) { if (error != null) { return done(); } if (err != null) { if (err.name === 'MongoCursorExhaustedError') { // We may end up calling `next()` multiple times on an exhausted // cursor, which leads to an error. In case cursor is exhausted, // just treat it as if the cursor returned no document, which is // how a cursor indicates it is exhausted. doc = null; } else if (continueOnError) { aggregatedErrors.push(err); } else { error = err; finalCallback(err); return done(); } } if (doc == null) { drained = true; if (handleResultsInProgress <= 0) { const finalErr = continueOnError ? createEachAsyncMultiError(aggregatedErrors) : error;
finalCallback(finalErr); } else if (batchSize && documentsBatch.length) { handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack); } return done(); }
++handleResultsInProgress;
// Kick off the subsequent `next()` before handling the result, but // make sure we know that we still have a result to handle re: #8422 immediate(() => done());
if (batchSize) { documentsBatch.push(doc); }
// If the current documents size is less than the provided batch size don't process the documents yet if (batchSize && documentsBatch.length !== batchSize) { immediate(() => enqueue(fetch)); return; }
const docsToProcess = batchSize ? documentsBatch : doc;
function handleNextResultCallBack(err) { if (batchSize) { handleResultsInProgress -= documentsBatch.length; documentsBatch = []; } else { --handleResultsInProgress; } if (err != null) { if (continueOnError) { aggregatedErrors.push(err); } else { error = err; return finalCallback(err); } } if ((drained || aborted) && handleResultsInProgress <= 0) { const finalErr = continueOnError ? createEachAsyncMultiError(aggregatedErrors) : error; return finalCallback(finalErr); }
immediate(() => enqueue(fetch)); }
handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack); }); } } }
function handleNextResult(doc, i, callback) { let maybePromise; try { maybePromise = fn(doc, i); } catch (err) { return callback(err); } if (maybePromise && typeof maybePromise.then === 'function') { maybePromise.then( function() { callback(null); }, function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); }); } else { callback(null); } }};
// The underlying cursor only tolerates one `next()` call at a time, so every
// fetch goes through this queue: tasks execute strictly in series, while the
// `fn()` handlers they spawn are still free to overlap.
function asyncQueue() {
  const waiting = [];
  let busy = false;

  function runNext() {
    const task = waiting.shift();
    if (task === undefined) {
      busy = false;
      return;
    }
    busy = true;
    task(runNext);
  }

  return function enqueue(task) {
    if (!busy && waiting.length === 0) {
      busy = true;
      return task(runNext);
    }
    waiting.push(task);
  };
}
// Collapse the errors gathered under `continueOnError` into one aggregate
// error object, or into `null` when iteration finished without any error.
function createEachAsyncMultiError(aggregatedErrors) {
  return aggregatedErrors.length === 0 ?
    null :
    new EachAsyncMultiError(aggregatedErrors);
}