eachAsync.js 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. const async = require('async');
  6. const utils = require('../../utils');
  7. /**
  8. * Execute `fn` for every document in the cursor. If `fn` returns a promise,
  9. * will wait for the promise to resolve before iterating on to the next one.
  10. * Returns a promise that resolves when done.
  11. *
  12. * @param {Function} next the thunk to call to get the next document
  13. * @param {Function} fn
  14. * @param {Object} options
  15. * @param {Function} [callback] executed when all docs have been processed
  16. * @return {Promise}
  17. * @api public
  18. * @method eachAsync
  19. */
  20. module.exports = function eachAsync(next, fn, options, callback) {
  21. const parallel = options.parallel || 1;
  22. const handleNextResult = function(doc, callback) {
  23. const promise = fn(doc);
  24. if (promise && typeof promise.then === 'function') {
  25. promise.then(
  26. function() { callback(null); },
  27. function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
  28. } else {
  29. callback(null);
  30. }
  31. };
  32. const iterate = function(callback) {
  33. let drained = false;
  34. const nextQueue = async.queue(function(task, cb) {
  35. if (drained) return cb();
  36. next(function(err, doc) {
  37. if (err) return cb(err);
  38. if (!doc) drained = true;
  39. cb(null, doc);
  40. });
  41. }, 1);
  42. const getAndRun = function(cb) {
  43. nextQueue.push({}, function(err, doc) {
  44. if (err) return cb(err);
  45. if (!doc) return cb();
  46. handleNextResult(doc, function(err) {
  47. if (err) return cb(err);
  48. // Make sure to clear the stack re: gh-4697
  49. setTimeout(function() {
  50. getAndRun(cb);
  51. }, 0);
  52. });
  53. });
  54. };
  55. async.times(parallel, function(n, cb) {
  56. getAndRun(cb);
  57. }, callback);
  58. };
  59. return utils.promiseOrCallback(callback, cb => {
  60. iterate(cb);
  61. });
  62. };