'use strict'; /*! * Module dependencies. */ var PromiseProvider = require('../../promise_provider'); var async = require('async'); /** * Execute `fn` for every document in the cursor. If `fn` returns a promise, * will wait for the promise to resolve before iterating on to the next one. * Returns a promise that resolves when done. * * @param {Function} next the thunk to call to get the next document * @param {Function} fn * @param {Object} options * @param {Function} [callback] executed when all docs have been processed * @return {Promise} * @api public * @method eachAsync */ module.exports = function eachAsync(next, fn, options, callback) { var Promise = PromiseProvider.get(); var parallel = options.parallel || 1; var handleNextResult = function(doc, callback) { var promise = fn(doc); if (promise && typeof promise.then === 'function') { promise.then( function() { callback(null); }, function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); }); } else { callback(null); } }; var iterate = function(callback) { var drained = false; var nextQueue = async.queue(function(task, cb) { if (drained) return cb(); next(function(err, doc) { if (err) return cb(err); if (!doc) drained = true; cb(null, doc); }); }, 1); var getAndRun = function(cb) { nextQueue.push({}, function(err, doc) { if (err) return cb(err); if (!doc) return cb(); handleNextResult(doc, function(err) { if (err) return cb(err); // Make sure to clear the stack re: gh-4697 setTimeout(function() { getAndRun(cb); }, 0); }); }); }; async.times(parallel, function(n, cb) { getAndRun(cb); }, callback); }; return new Promise.ES6(function(resolve, reject) { iterate(function(error) { if (error) { callback && callback(error); return reject(error); } callback && callback(null); return resolve(); }); }); };