eachAsync.js 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. var PromiseProvider = require('../../promise_provider');
  6. var async = require('async');
  7. /**
  8. * Execute `fn` for every document in the cursor. If `fn` returns a promise,
  9. * will wait for the promise to resolve before iterating on to the next one.
  10. * Returns a promise that resolves when done.
  11. *
  12. * @param {Function} next the thunk to call to get the next document
  13. * @param {Function} fn
  14. * @param {Object} options
  15. * @param {Function} [callback] executed when all docs have been processed
  16. * @return {Promise}
  17. * @api public
  18. * @method eachAsync
  19. */
  20. module.exports = function eachAsync(next, fn, options, callback) {
  21. var Promise = PromiseProvider.get();
  22. var parallel = options.parallel || 1;
  23. var handleNextResult = function(doc, callback) {
  24. var promise = fn(doc);
  25. if (promise && typeof promise.then === 'function') {
  26. promise.then(
  27. function() { callback(null); },
  28. function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
  29. } else {
  30. callback(null);
  31. }
  32. };
  33. var iterate = function(callback) {
  34. var drained = false;
  35. var nextQueue = async.queue(function(task, cb) {
  36. if (drained) return cb();
  37. next(function(err, doc) {
  38. if (err) return cb(err);
  39. if (!doc) drained = true;
  40. cb(null, doc);
  41. });
  42. }, 1);
  43. var getAndRun = function(cb) {
  44. nextQueue.push({}, function(err, doc) {
  45. if (err) return cb(err);
  46. if (!doc) return cb();
  47. handleNextResult(doc, function(err) {
  48. if (err) return cb(err);
  49. // Make sure to clear the stack re: gh-4697
  50. setTimeout(function() {
  51. getAndRun(cb);
  52. }, 0);
  53. });
  54. });
  55. };
  56. async.times(parallel, function(n, cb) {
  57. getAndRun(cb);
  58. }, callback);
  59. };
  60. return new Promise.ES6(function(resolve, reject) {
  61. iterate(function(error) {
  62. if (error) {
  63. callback && callback(error);
  64. return reject(error);
  65. }
  66. callback && callback(null);
  67. return resolve();
  68. });
  69. });
  70. };