deno.land / x / mongoose@6.7.5 / lib / helpers / cursor / eachAsync.js

نووسراو ببینە
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
'use strict';
/*! * Module dependencies. */
const EachAsyncMultiError = require('../../error/eachAsyncMultiError');const immediate = require('../immediate');const promiseOrCallback = require('../promiseOrCallback');
/** * Execute `fn` for every document in the cursor. If `fn` returns a promise, * will wait for the promise to resolve before iterating on to the next one. * Returns a promise that resolves when done. * * @param {Function} next the thunk to call to get the next document * @param {Function} fn * @param {Object} options * @param {Number} [options.batchSize=null] if set, Mongoose will call `fn` with an array of at most `batchSize` documents, instead of a single document * @param {Number} [options.parallel=1] maximum number of `fn` calls that Mongoose will run in parallel * @param {AbortSignal} [options.signal] allow cancelling this eachAsync(). Once the abort signal is fired, `eachAsync()` will immediately fulfill the returned promise (or call the callback) and not fetch any more documents. * @param {Function} [callback] executed when all docs have been processed * @return {Promise} * @api public * @method eachAsync */
module.exports = function eachAsync(next, fn, options, callback) { const parallel = options.parallel || 1; const batchSize = options.batchSize; const signal = options.signal; const continueOnError = options.continueOnError; const aggregatedErrors = []; const enqueue = asyncQueue();
let aborted = false;
return promiseOrCallback(callback, cb => { if (signal != null) { if (signal.aborted) { return cb(null); }
signal.addEventListener('abort', () => { aborted = true; return cb(null); }, { once: true }); }
if (batchSize != null) { if (typeof batchSize !== 'number') { throw new TypeError('batchSize must be a number'); } else if (!Number.isInteger(batchSize)) { throw new TypeError('batchSize must be an integer'); } else if (batchSize < 1) { throw new TypeError('batchSize must be at least 1'); } }
iterate(cb); });
function iterate(finalCallback) { let handleResultsInProgress = 0; let currentDocumentIndex = 0;
let error = null; for (let i = 0; i < parallel; ++i) { enqueue(createFetch()); }
function createFetch() { let documentsBatch = []; let drained = false;
return fetch;
function fetch(done) { if (drained || aborted) { return done(); } else if (error) { return done(); }
next(function(err, doc) { if (error != null) { return done(); } if (err != null) { if (err.name === 'MongoCursorExhaustedError') { // We may end up calling `next()` multiple times on an exhausted // cursor, which leads to an error. In case cursor is exhausted, // just treat it as if the cursor returned no document, which is // how a cursor indicates it is exhausted. doc = null; } else if (continueOnError) { aggregatedErrors.push(err); } else { error = err; finalCallback(err); return done(); } } if (doc == null) { drained = true; if (handleResultsInProgress <= 0) { const finalErr = continueOnError ? createEachAsyncMultiError(aggregatedErrors) : error;
finalCallback(finalErr); } else if (batchSize && documentsBatch.length) { handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack); } return done(); }
++handleResultsInProgress;
// Kick off the subsequent `next()` before handling the result, but // make sure we know that we still have a result to handle re: #8422 immediate(() => done());
if (batchSize) { documentsBatch.push(doc); }
// If the current documents size is less than the provided batch size don't process the documents yet if (batchSize && documentsBatch.length !== batchSize) { immediate(() => enqueue(fetch)); return; }
const docsToProcess = batchSize ? documentsBatch : doc;
function handleNextResultCallBack(err) { if (batchSize) { handleResultsInProgress -= documentsBatch.length; documentsBatch = []; } else { --handleResultsInProgress; } if (err != null) { if (continueOnError) { aggregatedErrors.push(err); } else { error = err; return finalCallback(err); } } if ((drained || aborted) && handleResultsInProgress <= 0) { const finalErr = continueOnError ? createEachAsyncMultiError(aggregatedErrors) : error; return finalCallback(finalErr); }
immediate(() => enqueue(fetch)); }
handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack); }); } } }
function handleNextResult(doc, i, callback) { let maybePromise; try { maybePromise = fn(doc, i); } catch (err) { return callback(err); } if (maybePromise && typeof maybePromise.then === 'function') { maybePromise.then( function() { callback(null); }, function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); }); } else { callback(null); } }};
// `next()` can only execute one at a time, so make sure we always execute// `next()` in series, while still allowing multiple `fn()` instances to run// in parallel.function asyncQueue() { const _queue = []; let inProgress = null; let id = 0;
return function enqueue(fn) { if ( inProgress === null && _queue.length === 0 ) { inProgress = id++; return fn(_step); } _queue.push(fn); };
function _step() { if (_queue.length !== 0) { inProgress = id++; const fn = _queue.shift(); fn(_step); } else { inProgress = null; } }}
function createEachAsyncMultiError(aggregatedErrors) { if (aggregatedErrors.length === 0) { return null; }
return new EachAsyncMultiError(aggregatedErrors);}
mongoose

Version Info

Tagged at
a year ago