12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- 'use strict';
- /*!
- * Module dependencies.
- */
- var PromiseProvider = require('../../promise_provider');
- var async = require('async');
- /**
- * Execute `fn` for every document in the cursor. If `fn` returns a promise,
- * will wait for the promise to resolve before iterating on to the next one.
- * Returns a promise that resolves when done.
- *
- * @param {Function} next the thunk to call to get the next document
- * @param {Function} fn
- * @param {Object} options
- * @param {Function} [callback] executed when all docs have been processed
- * @return {Promise}
- * @api public
- * @method eachAsync
- */
- module.exports = function eachAsync(next, fn, options, callback) {
- var Promise = PromiseProvider.get();
- var parallel = options.parallel || 1;
- var handleNextResult = function(doc, callback) {
- var promise = fn(doc);
- if (promise && typeof promise.then === 'function') {
- promise.then(
- function() { callback(null); },
- function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
- } else {
- callback(null);
- }
- };
- var iterate = function(callback) {
- var drained = false;
- var nextQueue = async.queue(function(task, cb) {
- if (drained) return cb();
- next(function(err, doc) {
- if (err) return cb(err);
- if (!doc) drained = true;
- cb(null, doc);
- });
- }, 1);
- var getAndRun = function(cb) {
- nextQueue.push({}, function(err, doc) {
- if (err) return cb(err);
- if (!doc) return cb();
- handleNextResult(doc, function(err) {
- if (err) return cb(err);
- // Make sure to clear the stack re: gh-4697
- setTimeout(function() {
- getAndRun(cb);
- }, 0);
- });
- });
- };
- async.times(parallel, function(n, cb) {
- getAndRun(cb);
- }, callback);
- };
- return new Promise.ES6(function(resolve, reject) {
- iterate(function(error) {
- if (error) {
- callback && callback(error);
- return reject(error);
- }
- callback && callback(null);
- return resolve();
- });
- });
- };
|