change_stream.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const isResumableError = require('./error').isResumableError;
  4. const MongoError = require('mongodb-core').MongoError;
  5. var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];
  6. const CHANGE_DOMAIN_TYPES = {
  7. COLLECTION: Symbol('Collection'),
  8. DATABASE: Symbol('Database'),
  9. CLUSTER: Symbol('Cluster')
  10. };
  11. /**
  12. * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
  13. * @class ChangeStream
  14. * @since 3.0.0
  15. * @param {(MongoClient|Db|Collection)} changeDomain The domain against which to create the change stream
  16. * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
  17. * @param {object} [options] Optional settings
  18. * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  19. * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
  20. * @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
  21. * @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
  22. * @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
  23. * @param {ReadPreference} [options.readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
  24. * @fires ChangeStream#close
  25. * @fires ChangeStream#change
  26. * @fires ChangeStream#end
  27. * @fires ChangeStream#error
  28. * @return {ChangeStream} a ChangeStream instance.
  29. */
  30. class ChangeStream extends EventEmitter {
  31. constructor(changeDomain, pipeline, options) {
  32. super();
  33. const Collection = require('./collection');
  34. const Db = require('./db');
  35. const MongoClient = require('./mongo_client');
  36. this.pipeline = pipeline || [];
  37. this.options = options || {};
  38. this.cursorNamespace = undefined;
  39. this.namespace = {};
  40. if (changeDomain instanceof Collection) {
  41. this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
  42. this.topology = changeDomain.s.db.serverConfig;
  43. this.namespace = {
  44. collection: changeDomain.collectionName,
  45. database: changeDomain.s.db.databaseName
  46. };
  47. this.cursorNamespace = `${this.namespace.database}.${this.namespace.collection}`;
  48. } else if (changeDomain instanceof Db) {
  49. this.type = CHANGE_DOMAIN_TYPES.DATABASE;
  50. this.namespace = { collection: '', database: changeDomain.databaseName };
  51. this.cursorNamespace = this.namespace.database;
  52. this.topology = changeDomain.serverConfig;
  53. } else if (changeDomain instanceof MongoClient) {
  54. this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
  55. this.namespace = { collection: '', database: 'admin' };
  56. this.cursorNamespace = this.namespace.database;
  57. this.topology = changeDomain.topology;
  58. } else {
  59. throw new TypeError(
  60. 'changeDomain provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
  61. );
  62. }
  63. this.promiseLibrary = changeDomain.s.promiseLibrary;
  64. if (!this.options.readPreference && changeDomain.s.readPreference) {
  65. this.options.readPreference = changeDomain.s.readPreference;
  66. }
  67. // We need to get the operationTime as early as possible
  68. const isMaster = this.topology.lastIsMaster();
  69. if (!isMaster) {
  70. throw new MongoError('Topology does not have an ismaster yet.');
  71. }
  72. this.operationTime = isMaster.operationTime;
  73. // Create contained Change Stream cursor
  74. this.cursor = createChangeStreamCursor(this);
  75. // Listen for any `change` listeners being added to ChangeStream
  76. this.on('newListener', eventName => {
  77. if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
  78. this.cursor.on('data', change =>
  79. processNewChange({ changeStream: this, change, eventEmitter: true })
  80. );
  81. }
  82. });
  83. // Listen for all `change` listeners being removed from ChangeStream
  84. this.on('removeListener', eventName => {
  85. if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
  86. this.cursor.removeAllListeners('data');
  87. }
  88. });
  89. }
  90. /**
  91. * Check if there is any document still available in the Change Stream
  92. * @function ChangeStream.prototype.hasNext
  93. * @param {ChangeStream~resultCallback} [callback] The result callback.
  94. * @throws {MongoError}
  95. * @return {Promise} returns Promise if no callback passed
  96. */
  97. hasNext(callback) {
  98. return this.cursor.hasNext(callback);
  99. }
  100. /**
  101. * Get the next available document from the Change Stream, returns null if no more documents are available.
  102. * @function ChangeStream.prototype.next
  103. * @param {ChangeStream~resultCallback} [callback] The result callback.
  104. * @throws {MongoError}
  105. * @return {Promise} returns Promise if no callback passed
  106. */
  107. next(callback) {
  108. var self = this;
  109. if (this.isClosed()) {
  110. if (callback) return callback(new Error('Change Stream is not open.'), null);
  111. return self.promiseLibrary.reject(new Error('Change Stream is not open.'));
  112. }
  113. return this.cursor
  114. .next()
  115. .then(change => processNewChange({ changeStream: self, change, callback }))
  116. .catch(error => processNewChange({ changeStream: self, error, callback }));
  117. }
  118. /**
  119. * Is the cursor closed
  120. * @method ChangeStream.prototype.isClosed
  121. * @return {boolean}
  122. */
  123. isClosed() {
  124. if (this.cursor) {
  125. return this.cursor.isClosed();
  126. }
  127. return true;
  128. }
  129. /**
  130. * Close the Change Stream
  131. * @method ChangeStream.prototype.close
  132. * @param {ChangeStream~resultCallback} [callback] The result callback.
  133. * @return {Promise} returns Promise if no callback passed
  134. */
  135. close(callback) {
  136. if (!this.cursor) {
  137. if (callback) return callback();
  138. return this.promiseLibrary.resolve();
  139. }
  140. // Tidy up the existing cursor
  141. var cursor = this.cursor;
  142. delete this.cursor;
  143. return cursor.close(callback);
  144. }
  145. /**
  146. * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
  147. * @method
  148. * @param {Writable} destination The destination for writing data
  149. * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
  150. * @return {null}
  151. */
  152. pipe(destination, options) {
  153. if (!this.pipeDestinations) {
  154. this.pipeDestinations = [];
  155. }
  156. this.pipeDestinations.push(destination);
  157. return this.cursor.pipe(destination, options);
  158. }
  159. /**
  160. * This method will remove the hooks set up for a previous pipe() call.
  161. * @param {Writable} [destination] The destination for writing data
  162. * @return {null}
  163. */
  164. unpipe(destination) {
  165. if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) {
  166. this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1);
  167. }
  168. return this.cursor.unpipe(destination);
  169. }
  170. /**
  171. * Return a modified Readable stream including a possible transform method.
  172. * @method
  173. * @param {object} [options] Optional settings.
  174. * @param {function} [options.transform] A transformation method applied to each document emitted by the stream.
  175. * @return {Cursor}
  176. */
  177. stream(options) {
  178. this.streamOptions = options;
  179. return this.cursor.stream(options);
  180. }
  181. /**
  182. * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
  183. * @return {null}
  184. */
  185. pause() {
  186. return this.cursor.pause();
  187. }
  188. /**
  189. * This method will cause the readable stream to resume emitting data events.
  190. * @return {null}
  191. */
  192. resume() {
  193. return this.cursor.resume();
  194. }
  195. }
  196. // Create a new change stream cursor based on self's configuration
  197. var createChangeStreamCursor = function(self) {
  198. if (self.resumeToken) {
  199. self.options.resumeAfter = self.resumeToken;
  200. }
  201. var changeStreamCursor = buildChangeStreamAggregationCommand(self);
  202. /**
  203. * Fired for each new matching change in the specified namespace. Attaching a `change`
  204. * event listener to a Change Stream will switch the stream into flowing mode. Data will
  205. * then be passed as soon as it is available.
  206. *
  207. * @event ChangeStream#change
  208. * @type {object}
  209. */
  210. if (self.listenerCount('change') > 0) {
  211. changeStreamCursor.on('data', function(change) {
  212. processNewChange({ changeStream: self, change, eventEmitter: true });
  213. });
  214. }
  215. /**
  216. * Change stream close event
  217. *
  218. * @event ChangeStream#close
  219. * @type {null}
  220. */
  221. changeStreamCursor.on('close', function() {
  222. self.emit('close');
  223. });
  224. /**
  225. * Change stream end event
  226. *
  227. * @event ChangeStream#end
  228. * @type {null}
  229. */
  230. changeStreamCursor.on('end', function() {
  231. self.emit('end');
  232. });
  233. /**
  234. * Fired when the stream encounters an error.
  235. *
  236. * @event ChangeStream#error
  237. * @type {Error}
  238. */
  239. changeStreamCursor.on('error', function(error) {
  240. processNewChange({ changeStream: self, error, eventEmitter: true });
  241. });
  242. if (self.pipeDestinations) {
  243. const cursorStream = changeStreamCursor.stream(self.streamOptions);
  244. for (let pipeDestination in self.pipeDestinations) {
  245. cursorStream.pipe(pipeDestination);
  246. }
  247. }
  248. return changeStreamCursor;
  249. };
  250. function getResumeToken(self) {
  251. return self.resumeToken || self.options.resumeAfter;
  252. }
  253. function getStartAtOperationTime(self) {
  254. const isMaster = self.topology.lastIsMaster() || {};
  255. return (
  256. isMaster.maxWireVersion && isMaster.maxWireVersion >= 7 && self.options.startAtOperationTime
  257. );
  258. }
  259. var buildChangeStreamAggregationCommand = function(self) {
  260. const topology = self.topology;
  261. const namespace = self.namespace;
  262. const pipeline = self.pipeline;
  263. const options = self.options;
  264. const cursorNamespace = self.cursorNamespace;
  265. var changeStreamStageOptions = {
  266. fullDocument: options.fullDocument || 'default'
  267. };
  268. const resumeToken = getResumeToken(self);
  269. const startAtOperationTime = getStartAtOperationTime(self);
  270. if (resumeToken) {
  271. changeStreamStageOptions.resumeAfter = resumeToken;
  272. }
  273. if (startAtOperationTime) {
  274. changeStreamStageOptions.startAtOperationTime = startAtOperationTime;
  275. }
  276. // Map cursor options
  277. var cursorOptions = {};
  278. cursorOptionNames.forEach(function(optionName) {
  279. if (options[optionName]) {
  280. cursorOptions[optionName] = options[optionName];
  281. }
  282. });
  283. if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
  284. changeStreamStageOptions.allChangesForCluster = true;
  285. }
  286. var changeStreamPipeline = [{ $changeStream: changeStreamStageOptions }];
  287. changeStreamPipeline = changeStreamPipeline.concat(pipeline);
  288. var command = {
  289. aggregate: self.type === CHANGE_DOMAIN_TYPES.COLLECTION ? namespace.collection : 1,
  290. pipeline: changeStreamPipeline,
  291. readConcern: { level: 'majority' },
  292. cursor: {
  293. batchSize: options.batchSize || 1
  294. }
  295. };
  296. // Create and return the cursor
  297. return topology.cursor(cursorNamespace, command, cursorOptions);
  298. };
  299. // This method performs a basic server selection loop, satisfying the requirements of
  300. // ChangeStream resumability until the new SDAM layer can be used.
  301. const SELECTION_TIMEOUT = 30000;
  302. function waitForTopologyConnected(topology, options, callback) {
  303. setTimeout(() => {
  304. if (options && options.start == null) options.start = process.hrtime();
  305. const start = options.start || process.hrtime();
  306. const timeout = options.timeout || SELECTION_TIMEOUT;
  307. const readPreference = options.readPreference;
  308. if (topology.isConnected({ readPreference })) return callback(null, null);
  309. const hrElapsed = process.hrtime(start);
  310. const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6;
  311. if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection'));
  312. waitForTopologyConnected(topology, options, callback);
  313. }, 3000); // this is an arbitrary wait time to allow SDAM to transition
  314. }
  315. // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
  316. function processNewChange(args) {
  317. const changeStream = args.changeStream;
  318. const error = args.error;
  319. const change = args.change;
  320. const callback = args.callback;
  321. const eventEmitter = args.eventEmitter || false;
  322. // If the changeStream is closed, then it should not process a change.
  323. if (changeStream.isClosed()) {
  324. // We do not error in the eventEmitter case.
  325. if (eventEmitter) {
  326. return;
  327. }
  328. const error = new MongoError('ChangeStream is closed');
  329. return typeof callback === 'function'
  330. ? callback(error, null)
  331. : changeStream.promiseLibrary.reject(error);
  332. }
  333. const topology = changeStream.topology;
  334. const options = changeStream.cursor.options;
  335. if (error) {
  336. if (isResumableError(error) && !changeStream.attemptingResume) {
  337. changeStream.attemptingResume = true;
  338. if (!(getResumeToken(changeStream) || getStartAtOperationTime(changeStream))) {
  339. const startAtOperationTime = changeStream.cursor.cursorState.operationTime;
  340. changeStream.options = Object.assign({ startAtOperationTime }, changeStream.options);
  341. }
  342. // stop listening to all events from old cursor
  343. ['data', 'close', 'end', 'error'].forEach(event =>
  344. changeStream.cursor.removeAllListeners(event)
  345. );
  346. // close internal cursor, ignore errors
  347. changeStream.cursor.close();
  348. // attempt recreating the cursor
  349. if (eventEmitter) {
  350. waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
  351. if (err) return changeStream.emit('error', err);
  352. changeStream.cursor = createChangeStreamCursor(changeStream);
  353. });
  354. return;
  355. }
  356. if (callback) {
  357. waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
  358. if (err) return callback(err, null);
  359. changeStream.cursor = createChangeStreamCursor(changeStream);
  360. changeStream.next(callback);
  361. });
  362. return;
  363. }
  364. return new Promise((resolve, reject) => {
  365. waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
  366. if (err) return reject(err);
  367. resolve();
  368. });
  369. })
  370. .then(() => (changeStream.cursor = createChangeStreamCursor(changeStream)))
  371. .then(() => changeStream.next());
  372. }
  373. if (eventEmitter) return changeStream.emit('error', error);
  374. if (typeof callback === 'function') return callback(error, null);
  375. return changeStream.promiseLibrary.reject(error);
  376. }
  377. changeStream.attemptingResume = false;
  378. // Cache the resume token if it is present. If it is not present return an error.
  379. if (!change || !change._id) {
  380. var noResumeTokenError = new Error(
  381. 'A change stream document has been received that lacks a resume token (_id).'
  382. );
  383. if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
  384. if (typeof callback === 'function') return callback(noResumeTokenError, null);
  385. return changeStream.promiseLibrary.reject(noResumeTokenError);
  386. }
  387. changeStream.resumeToken = change._id;
  388. // Return the change
  389. if (eventEmitter) return changeStream.emit('change', change);
  390. if (typeof callback === 'function') return callback(error, change);
  391. return changeStream.promiseLibrary.resolve(change);
  392. }
  393. /**
  394. * The callback format for results
  395. * @callback ChangeStream~resultCallback
  396. * @param {MongoError} error An error instance representing the error during the execution.
  397. * @param {(object|null)} result The result object if the command was executed successfully.
  398. */
  399. module.exports = ChangeStream;