QueryCursor.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. /*!
  2. * Module dependencies.
  3. */
  4. 'use strict';
  5. const Readable = require('stream').Readable;
  6. const eachAsync = require('../helpers/cursor/eachAsync');
  7. const helpers = require('../queryhelpers');
  8. const util = require('util');
  9. const utils = require('../utils');
  /**
   * A QueryCursor is a concurrency primitive for processing query results
   * one document at a time. A QueryCursor fulfills the Node.js streams3 API,
   * in addition to several other mechanisms for loading documents from MongoDB
   * one at a time.
   *
   * QueryCursors execute the model's pre find hooks, but **not** the model's
   * post find hooks.
   *
   * Unless you're an advanced user, do **not** instantiate this class directly.
   * Use [`Query#cursor()`](/docs/api.html#query_Query-cursor) instead.
   *
   * @param {Query} query
   * @param {Object} [options] query options passed to `.find()`. Treated as `{}` when omitted. An `options.transform` function is appended after the query's own transforms.
   * @inherits Readable
   * @event `cursor`: Emitted when the cursor is created
   * @event `error`: Emitted when an error occurred
   * @event `data`: Emitted when the stream is flowing and the next doc is ready
   * @event `end`: Emitted when the stream is exhausted
   * @api public
   */
  function QueryCursor(query, options) {
    Readable.call(this, { objectMode: true });

    // Direct callers may omit `options`; fall back to an empty object so the
    // `transform` lookup below cannot throw a TypeError.
    const findOptions = options || {};
    const model = query.model;

    this.cursor = null;
    this.query = query;
    this._mongooseOptions = {};
    this._transforms = [];
    this.model = model;

    // NOTE(review): whatever arguments the hook runner passes to this callback
    // (presumably a pre-hook error) are ignored here -- confirm that is intended.
    model.hooks.execPre('find', query, () => {
      // Transforms registered on the query come first, then any transform
      // supplied through the options.
      this._transforms = this._transforms.concat(query._transforms.slice());
      if (findOptions.transform) {
        this._transforms.push(findOptions.transform);
      }

      model.collection.find(query._conditions, findOptions, (err, cursor) => {
        if (this._error) {
          // The cursor was marked as errored before the driver answered. Release
          // the driver cursor if one was actually handed over; when `find`
          // itself failed there may be none, and closing it unguarded would
          // throw and hide the real error.
          if (cursor) {
            cursor.close(function() {});
          }
          this.listeners('error').length > 0 && this.emit('error', this._error);
        }
        if (err) {
          return this.emit('error', err);
        }
        // Still publish the cursor after a marked error so that callers waiting
        // on the `cursor` event are released and can observe `_error`.
        this.cursor = cursor;
        this.emit('cursor', cursor);
      });
    });
  }

  util.inherits(QueryCursor, Readable);
  /*!
   * Readable#_read implementation. Fetches one document through `_next()` and
   * pushes it onto the stream. When no document is left, ends the stream,
   * closes the underlying cursor and emits `close` on a later tick. Errors
   * from either step are re-emitted as `error` events.
   */
  QueryCursor.prototype._read = function() {
    const stream = this;

    _next(stream, (error, doc) => {
      if (error) {
        return stream.emit('error', error);
      }

      if (doc) {
        stream.push(doc);
        return;
      }

      // Exhausted: signal end-of-stream first, then release the cursor.
      stream.push(null);
      stream.cursor.close((closeError) => {
        if (closeError) {
          return stream.emit('error', closeError);
        }
        // Deferred with a zero-delay timer, so `close` is emitted on a later tick.
        setTimeout(() => {
          stream.emit('close');
        }, 0);
      });
    });
  };
  /**
   * Registers a transform function. Each document handed out through the
   * streams interface or `.next()` is passed through every registered
   * transform, in registration order, before it reaches the caller.
   *
   * ####Example
   *
   *     // Map documents returned by `data` events
   *     Thing.
   *       find({ name: /^hello/ }).
   *       cursor().
   *       map(function (doc) {
   *         doc.foo = "bar";
   *         return doc;
   *       })
   *       on('data', function(doc) { console.log(doc.foo); });
   *
   *     // Or map documents returned by `.next()`
   *     var cursor = Thing.find({ name: /^hello/ }).
   *       cursor().
   *       map(function (doc) {
   *         doc.foo = "bar";
   *         return doc;
   *       });
   *     cursor.next(function(error, doc) {
   *       console.log(doc.foo);
   *     });
   *
   * @param {Function} fn receives the document and returns its replacement
   * @return {QueryCursor} this cursor, for chaining
   * @api public
   * @method map
   */
  QueryCursor.prototype.map = function(fn) {
    const cursor = this;
    cursor._transforms.push(fn);
    return cursor;
  };
  /*!
   * Records `error` on this cursor as `_error` and returns the cursor so the
   * call can be chained.
   */
  QueryCursor.prototype._markError = function(error) {
    return Object.assign(this, { _error: error });
  };
  /**
   * Closes the underlying driver cursor. On success emits `close` and then
   * reports completion; on failure reports the error and, if any `error`
   * listeners are attached, also emits it as an `error` event.
   *
   * NOTE(review): earlier docs stated that subsequent `next()` calls will
   * error after closing; that depends on the driver cursor -- confirm.
   *
   * @param {Function} callback
   * @return {Promise}
   * @api public
   * @method close
   * @emits close
   * @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
   */
  QueryCursor.prototype.close = function(callback) {
    const closeUnderlying = (done) => {
      this.cursor.close((error) => {
        if (!error) {
          this.emit('close');
          done(null);
          return;
        }
        done(error);
        // Only emit when someone listens, so an unhandled `error` event is avoided.
        return this.listeners('error').length > 0 && this.emit('error', error);
      });
    };

    return utils.promiseOrCallback(callback, closeUnderlying, this.model.events);
  };
  /**
   * Fetches the next document from this cursor. Yields `null` once the
   * cursor has no documents left.
   *
   * @param {Function} callback invoked as `callback(error)` or `callback(null, doc)`
   * @return {Promise}
   * @api public
   * @method next
   */
  QueryCursor.prototype.next = function(callback) {
    const fetchOne = (done) => {
      _next(this, (error, doc) => {
        if (!error) {
          done(null, doc);
          return;
        }
        return done(error);
      });
    };

    return utils.promiseOrCallback(callback, fetchOne, this.model.events);
  };
  /**
   * Runs `fn` for every document in the cursor by delegating to the shared
   * `eachAsync` helper, which is given a function that pulls the next
   * document from this cursor. Per the helper's contract, if `fn` returns a
   * promise the iteration waits for it before moving on, and the returned
   * promise resolves when iteration is done.
   *
   * The options argument may be omitted, in which case the second argument is
   * treated as the callback.
   *
   * @param {Function} fn
   * @param {Object} [options]
   * @param {Number} [options.parallel] the number of promises to execute in parallel. Defaults to 1.
   * @param {Function} [callback] executed when all docs have been processed
   * @return {Promise}
   * @api public
   * @method eachAsync
   */
  QueryCursor.prototype.eachAsync = function(fn, opts, callback) {
    const optsIsCallback = typeof opts === 'function';
    const done = optsIsCallback ? opts : callback;
    const settings = optsIsCallback ? {} : (opts || {});
    const fetchNext = (cb) => _next(this, cb);

    return eachAsync(fetchNext, fn, settings, done);
  };
  /**
   * Adds a [cursor flag](http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#addCursorFlag).
   * Useful for setting the `noCursorTimeout` and `tailable` flags. The flag is
   * applied as soon as the underlying cursor is available.
   *
   * @param {String} flag
   * @param {Boolean} value
   * @return {QueryCursor} this
   * @api public
   * @method addCursorFlag
   */
  QueryCursor.prototype.addCursorFlag = function(flag, value) {
    _waitForCursor(this, () => {
      this.cursor.addCursorFlag(flag, value);
    });

    return this;
  };
  /*!
   * Sets the `transformNull` option, which controls whether registered
   * transforms are also applied to a `null` document. Calling it with no
   * arguments enables the option; otherwise the given value is stored as-is.
   * Returns this cursor for chaining.
   */
  QueryCursor.prototype.transformNull = function(val) {
    const calledWithoutArgs = arguments.length === 0;
    this._mongooseOptions.transformNull = calledWithoutArgs ? true : val;
    return this;
  };
  /*!
   * Pulls the next raw document from the underlying cursor and prepares it
   * for the caller: populates it when the query asks for population, wraps it
   * in a model instance unless the query is lean, and finally runs it through
   * the registered transforms.
   *
   * If the cursor has been marked as errored, `cb` receives that error on the
   * next tick. If the underlying cursor does not exist yet, the call is
   * retried once the `cursor` event fires.
   */
  function _next(ctx, cb) {
    const hasTransforms = ctx._transforms.length > 0;

    // With transforms registered, route results through them; errors and
    // (unless `transformNull` is set) `null` documents bypass the transforms.
    const callback = !hasTransforms ? cb : function(err, doc) {
      const bypass = err || (doc === null && !ctx._mongooseOptions.transformNull);
      if (bypass) {
        return cb(err, doc);
      }
      let mapped = doc;
      ctx._transforms.forEach((fn) => {
        mapped = fn.call(ctx, mapped);
      });
      cb(err, mapped);
    };

    if (ctx._error) {
      return process.nextTick(() => {
        callback(ctx._error);
      });
    }

    if (!ctx.cursor) {
      // Retry with the original `cb`; the retry rebuilds the transform wrapper.
      ctx.once('cursor', () => {
        _next(ctx, cb);
      });
      return;
    }

    return ctx.cursor.next((error, doc) => {
      if (error) {
        return callback(error);
      }
      if (!doc) {
        return callback(null, null);
      }

      const opts = ctx.query._mongooseOptions;
      // Lean queries hand back the raw document; otherwise build a model instance.
      const finish = (rawDoc, populated) => {
        return opts.lean ?
          callback(null, rawDoc) :
          _create(ctx, rawDoc, populated, callback);
      };

      if (!opts.populate) {
        return finish(doc, null);
      }

      const pop = helpers.preparePopulationOptionsMQ(ctx.query,
        ctx.query._mongooseOptions);
      pop.__noPromise = true;
      ctx.query.model.populate(doc, pop, (err, populatedDoc) => {
        if (err) {
          return callback(err);
        }
        return finish(populatedDoc, pop);
      });
    });
  }
  /*!
   * Invokes `cb` (with no arguments) once `ctx.cursor` is available:
   * immediately if it already exists, otherwise on the next `cursor` event.
   */
  function _waitForCursor(ctx, cb) {
    if (!ctx.cursor) {
      ctx.once('cursor', () => {
        cb();
      });
      return;
    }

    return cb();
  }
  /*!
   * Turns a raw document into a model instance: creates the instance via
   * `helpers.createModel()` and initializes it with `doc`. When
   * `populatedIds` is truthy it is passed to `init()` as `{ populated }`.
   * Calls `cb(err)` if initialization fails, `cb(null, instance)` otherwise.
   */
  function _create(ctx, doc, populatedIds, cb) {
    const instance = helpers.createModel(ctx.query.model, doc, ctx.query._fields);

    let initOptions;
    if (populatedIds) {
      initOptions = { populated: populatedIds };
    }

    instance.init(doc, initOptions, (err) => {
      if (!err) {
        cb(null, instance);
        return;
      }
      return cb(err);
    });
  }
  296. module.exports = QueryCursor;