cursor_ops.js 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. 'use strict';
  2. const buildCountCommand = require('./collection_ops').buildCountCommand;
  3. const formattedOrderClause = require('../utils').formattedOrderClause;
  4. const handleCallback = require('../utils').handleCallback;
  5. const MongoError = require('mongodb-core').MongoError;
  6. const push = Array.prototype.push;
  7. let cursor;
  8. function loadCursor() {
  9. if (!cursor) {
  10. cursor = require('../cursor');
  11. }
  12. return cursor;
  13. }
  14. /**
  15. * Get the count of documents for this cursor.
  16. *
  17. * @method
  18. * @param {Cursor} cursor The Cursor instance on which to count.
  19. * @param {boolean} [applySkipLimit=true] Specifies whether the count command apply limit and skip settings should be applied on the cursor or in the provided options.
  20. * @param {object} [options] Optional settings. See Cursor.prototype.count for a list of options.
  21. * @param {Cursor~countResultCallback} [callback] The result callback.
  22. */
  23. function count(cursor, applySkipLimit, opts, callback) {
  24. if (applySkipLimit) {
  25. if (typeof cursor.cursorSkip() === 'number') opts.skip = cursor.cursorSkip();
  26. if (typeof cursor.cursorLimit() === 'number') opts.limit = cursor.cursorLimit();
  27. }
  28. // Ensure we have the right read preference inheritance
  29. if (opts.readPreference) {
  30. cursor.setReadPreference(opts.readPreference);
  31. }
  32. if (
  33. typeof opts.maxTimeMS !== 'number' &&
  34. cursor.s.cmd &&
  35. typeof cursor.s.cmd.maxTimeMS === 'number'
  36. ) {
  37. opts.maxTimeMS = cursor.s.cmd.maxTimeMS;
  38. }
  39. let options = {};
  40. options.skip = opts.skip;
  41. options.limit = opts.limit;
  42. options.hint = opts.hint;
  43. options.maxTimeMS = opts.maxTimeMS;
  44. // Command
  45. const delimiter = cursor.s.ns.indexOf('.');
  46. options.collectionName = cursor.s.ns.substr(delimiter + 1);
  47. let command;
  48. try {
  49. command = buildCountCommand(cursor, cursor.s.cmd.query, options);
  50. } catch (err) {
  51. return callback(err);
  52. }
  53. // Set cursor server to the same as the topology
  54. cursor.server = cursor.topology.s.coreTopology;
  55. // Execute the command
  56. cursor.s.topology.command(
  57. `${cursor.s.ns.substr(0, delimiter)}.$cmd`,
  58. command,
  59. cursor.s.options,
  60. (err, result) => {
  61. callback(err, result ? result.result.n : null);
  62. }
  63. );
  64. }
  65. /**
  66. * Iterates over all the documents for this cursor. See Cursor.prototype.each for more information.
  67. *
  68. * @method
  69. * @deprecated
  70. * @param {Cursor} cursor The Cursor instance on which to run.
  71. * @param {Cursor~resultCallback} callback The result callback.
  72. */
  73. function each(cursor, callback) {
  74. let Cursor = loadCursor();
  75. if (!callback) throw MongoError.create({ message: 'callback is mandatory', driver: true });
  76. if (cursor.isNotified()) return;
  77. if (cursor.s.state === Cursor.CLOSED || cursor.isDead()) {
  78. return handleCallback(
  79. callback,
  80. MongoError.create({ message: 'Cursor is closed', driver: true })
  81. );
  82. }
  83. if (cursor.s.state === Cursor.INIT) cursor.s.state = Cursor.OPEN;
  84. // Define function to avoid global scope escape
  85. let fn = null;
  86. // Trampoline all the entries
  87. if (cursor.bufferedCount() > 0) {
  88. while ((fn = loop(cursor, callback))) fn(cursor, callback);
  89. each(cursor, callback);
  90. } else {
  91. cursor.next((err, item) => {
  92. if (err) return handleCallback(callback, err);
  93. if (item == null) {
  94. return cursor.close({ skipKillCursors: true }, () => handleCallback(callback, null, null));
  95. }
  96. if (handleCallback(callback, null, item) === false) return;
  97. each(cursor, callback);
  98. });
  99. }
  100. }
  101. /**
  102. * Check if there is any document still available in the cursor.
  103. *
  104. * @method
  105. * @param {Cursor} cursor The Cursor instance on which to run.
  106. * @param {Cursor~resultCallback} [callback] The result callback.
  107. */
  108. function hasNext(cursor, callback) {
  109. let Cursor = loadCursor();
  110. if (cursor.s.currentDoc) {
  111. return callback(null, true);
  112. }
  113. if (cursor.isNotified()) {
  114. return callback(null, false);
  115. }
  116. nextObject(cursor, (err, doc) => {
  117. if (err) return callback(err, null);
  118. if (cursor.s.state === Cursor.CLOSED || cursor.isDead()) return callback(null, false);
  119. if (!doc) return callback(null, false);
  120. cursor.s.currentDoc = doc;
  121. callback(null, true);
  122. });
  123. }
  124. // Trampoline emptying the number of retrieved items
  125. // without incurring a nextTick operation
  126. function loop(cursor, callback) {
  127. // No more items we are done
  128. if (cursor.bufferedCount() === 0) return;
  129. // Get the next document
  130. cursor._next(callback);
  131. // Loop
  132. return loop;
  133. }
  134. /**
  135. * Get the next available document from the cursor. Returns null if no more documents are available.
  136. *
  137. * @method
  138. * @param {Cursor} cursor The Cursor instance from which to get the next document.
  139. * @param {Cursor~resultCallback} [callback] The result callback.
  140. */
  141. function next(cursor, callback) {
  142. // Return the currentDoc if someone called hasNext first
  143. if (cursor.s.currentDoc) {
  144. const doc = cursor.s.currentDoc;
  145. cursor.s.currentDoc = null;
  146. return callback(null, doc);
  147. }
  148. // Return the next object
  149. nextObject(cursor, callback);
  150. }
  151. // Get the next available document from the cursor, returns null if no more documents are available.
  152. function nextObject(cursor, callback) {
  153. let Cursor = loadCursor();
  154. if (cursor.s.state === Cursor.CLOSED || (cursor.isDead && cursor.isDead()))
  155. return handleCallback(
  156. callback,
  157. MongoError.create({ message: 'Cursor is closed', driver: true })
  158. );
  159. if (cursor.s.state === Cursor.INIT && cursor.s.cmd.sort) {
  160. try {
  161. cursor.s.cmd.sort = formattedOrderClause(cursor.s.cmd.sort);
  162. } catch (err) {
  163. return handleCallback(callback, err);
  164. }
  165. }
  166. // Get the next object
  167. cursor._next((err, doc) => {
  168. cursor.s.state = Cursor.OPEN;
  169. if (err) return handleCallback(callback, err);
  170. handleCallback(callback, null, doc);
  171. });
  172. }
  173. /**
  174. * Returns an array of documents. See Cursor.prototype.toArray for more information.
  175. *
  176. * @method
  177. * @param {Cursor} cursor The Cursor instance from which to get the next document.
  178. * @param {Cursor~toArrayResultCallback} [callback] The result callback.
  179. */
  180. function toArray(cursor, callback) {
  181. let Cursor = loadCursor();
  182. const items = [];
  183. // Reset cursor
  184. cursor.rewind();
  185. cursor.s.state = Cursor.INIT;
  186. // Fetch all the documents
  187. const fetchDocs = () => {
  188. cursor._next((err, doc) => {
  189. if (err) {
  190. return cursor._endSession
  191. ? cursor._endSession(() => handleCallback(callback, err))
  192. : handleCallback(callback, err);
  193. }
  194. if (doc == null) {
  195. return cursor.close({ skipKillCursors: true }, () => handleCallback(callback, null, items));
  196. }
  197. // Add doc to items
  198. items.push(doc);
  199. // Get all buffered objects
  200. if (cursor.bufferedCount() > 0) {
  201. let docs = cursor.readBufferedDocuments(cursor.bufferedCount());
  202. // Transform the doc if transform method added
  203. if (cursor.s.transforms && typeof cursor.s.transforms.doc === 'function') {
  204. docs = docs.map(cursor.s.transforms.doc);
  205. }
  206. push.apply(items, docs);
  207. }
  208. // Attempt a fetch
  209. fetchDocs();
  210. });
  211. };
  212. fetchDocs();
  213. }
  214. module.exports = { count, each, hasNext, next, toArray };