// ordered.js (6.1 KB)
  1. 'use strict';
  2. const common = require('./common');
  3. const BulkOperationBase = common.BulkOperationBase;
  4. const utils = require('../utils');
  5. const toError = utils.toError;
  6. const handleCallback = utils.handleCallback;
  7. const BulkWriteResult = common.BulkWriteResult;
  8. const Batch = common.Batch;
  9. const mergeBatchResults = common.mergeBatchResults;
  10. const executeOperation = utils.executeOperation;
  11. const MongoWriteConcernError = require('mongodb-core').MongoWriteConcernError;
  12. const handleMongoWriteConcernError = require('./common').handleMongoWriteConcernError;
  13. const bson = common.bson;
  14. const isPromiseLike = require('../utils').isPromiseLike;
  /**
   * Add a single operation to the bulk's internal list of operations.
   *
   * The operation is appended to the current batch. A fresh batch is started first
   * when appending would reach the per-batch operation count limit
   * (`maxWriteBatchSize`), reach the per-batch byte limit (`maxBatchSizeBytes`), or
   * when the operation's type differs from the current batch's type. Each operation
   * is accounted as `maxKeySize + bsonSize` bytes.
   *
   * Invalid input is rejected before any state is modified, and an empty batch is
   * never queued for execution.
   *
   * @param {OrderedBulkOperation} bulkOperation
   * @param {number} docType number indicating the document type
   * @param {object} document
   * @throws the error built by `toError` when `document` is an Array, or when its
   *   BSON size is greater than or equal to `maxBatchSizeBytes`
   * @return {OrderedBulkOperation}
   */
  function addToOperationsList(bulkOperation, docType, document) {
    // Validate first so that a rejected call leaves the bulk operation untouched
    // (no stray insertedIds entry, no batch created or queued).
    if (Array.isArray(document)) {
      throw toError('operation passed in cannot be an Array');
    }

    const state = bulkOperation.s;

    // Get the bsonSize
    const bsonSize = bson.calculateObjectSize(document, {
      checkKeys: false
    });

    // Throw error if the doc is bigger than the max BSON size
    if (bsonSize >= state.maxBatchSizeBytes) {
      throw toError('document is larger than the maximum size ' + state.maxBatchSizeBytes);
    }

    // Create a new batch object if we don't have a current one
    if (state.currentBatch == null) {
      state.currentBatch = new Batch(docType, state.currentIndex);
    }

    const maxKeySize = state.maxKeySize;

    const reachesOperationLimit = state.currentBatchSize + 1 >= state.maxWriteBatchSize;
    const reachesByteLimit =
      state.currentBatchSizeBytes + maxKeySize + bsonSize >= state.maxBatchSizeBytes;
    const changesBatchType = state.currentBatch.batchType !== docType;

    // Check if we need to create a new batch
    if (reachesOperationLimit || reachesByteLimit || changesBatchType) {
      // Save the batch to the execution stack, but only if it actually holds
      // operations: the first operation on its own can trip a limit, and queueing
      // the still-empty batch would later send a write command with no operations.
      if (state.currentBatchSize > 0) {
        state.batches.push(state.currentBatch);
      }

      // Create a new batch
      state.currentBatch = new Batch(docType, state.currentIndex);

      // Reset the current size trackers
      state.currentBatchSize = 0;
      state.currentBatchSizeBytes = 0;
    }

    // Remember which _id was inserted at which position of the bulk
    if (docType === common.INSERT) {
      state.bulkResult.insertedIds.push({
        index: state.currentIndex,
        _id: document._id
      });
    }

    state.currentBatch.originalIndexes.push(state.currentIndex);
    state.currentBatch.operations.push(document);
    state.currentBatchSize += 1;
    state.currentBatchSizeBytes += maxKeySize + bsonSize;
    state.currentIndex += 1;

    // Return bulkOperation
    return bulkOperation;
  }
  /**
   * Create a new OrderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
   * @class
   * @property {number} length Get the number of operations in the bulk.
   * @return {OrderedBulkOperation} an OrderedBulkOperation instance.
   */
  class OrderedBulkOperation extends BulkOperationBase {
    /**
     * Hands the base class this module's `addToOperationsList` implementation and
     * passes `true` as its fourth constructor argument.
     *
     * @param {object} topology forwarded unchanged to BulkOperationBase
     * @param {object} collection forwarded unchanged to BulkOperationBase
     * @param {object} [options] Optional settings; copied, never modified
     */
    constructor(topology, collection, options) {
      // Build a fresh object instead of assigning onto `options`, so the internal
      // `addToOperationsList` function does not leak onto the caller's object.
      const baseOptions = Object.assign({}, options, { addToOperationsList });

      super(topology, collection, baseOptions, true);
    }

    /**
     * The callback format for results
     * @callback OrderedBulkOperation~resultCallback
     * @param {MongoError} error An error instance representing the error during the execution.
     * @param {BulkWriteResult} result The bulk write result.
     */

    /**
     * Execute the ordered bulk operation
     *
     * All three arguments are handed to the inherited `bulkExecute`. If it returns a
     * promise-like value, that value is returned as-is; otherwise the `options` and
     * `callback` it resolved are used to run the queued batches through
     * `executeOperation`.
     *
     * @method
     * @param {object} [_writeConcern] forwarded unchanged to `bulkExecute`
     * @param {object} [options] Optional settings.
     * @param {(number|string)} [options.w] The write concern.
     * @param {number} [options.wtimeout] The write concern timeout.
     * @param {boolean} [options.j=false] Specify a journal write concern.
     * @param {boolean} [options.fsync=false] Specify a file sync write concern.
     * @param {OrderedBulkOperation~resultCallback} [callback] The result callback
     * @throws {MongoError}
     * @return {Promise} returns Promise if no callback passed
     */
    execute(_writeConcern, options, callback) {
      const prepared = this.bulkExecute(_writeConcern, options, callback);
      if (isPromiseLike(prepared)) {
        return prepared;
      }

      return executeOperation(this.s.topology, executeCommands, [
        this,
        prepared.options,
        prepared.callback
      ]);
    }
  }
  /**
   * Send the queued batches one after another, in order.
   *
   * Takes the first pending batch, hands it to `bulkOperation.finalOptionsHandler`
   * together with a result handler, and re-invokes itself from that handler until
   * no batches are left or a stop condition is met. When the queue is empty the
   * callback receives a BulkWriteResult built from the accumulated bulk result.
   *
   * @param {OrderedBulkOperation} bulkOperation
   * @param {object} options
   * @param {function} callback
   */
  function executeCommands(bulkOperation, options, callback) {
    const pending = bulkOperation.s.batches;

    // Nothing left to send: report what has been accumulated so far
    if (pending.length === 0) {
      const finalResult = new BulkWriteResult(bulkOperation.s.bulkResult);
      return handleCallback(callback, null, finalResult);
    }

    // Ordered execution: always take the oldest queued batch
    const batch = pending.shift();

    const resultHandler = (err, result) => {
      const isWriteConcernError = err instanceof MongoWriteConcernError;

      // An error carrying `driver` or `message` that is not a write concern error
      // is not a bulk op error: terminate and hand it straight to the caller
      if (err && (err.driver || err.message) && !isWriteConcernError) {
        return handleCallback(callback, err);
      }

      // Any error that got this far is flagged as a failed reply
      if (err) {
        err.ok = 0;
      }

      const bulkResult = bulkOperation.s.bulkResult;

      if (isWriteConcernError) {
        return handleMongoWriteConcernError(batch, bulkResult, true, err, callback);
      }

      // The result wrapper is created before the batch outcome is merged in
      const writeResult = new BulkWriteResult(bulkResult);
      const mergeOutcome = mergeBatchResults(true, batch, bulkResult, err, result);
      if (mergeOutcome != null) {
        return handleCallback(callback, null, writeResult);
      }

      // handleWriteError returning truthy means execution must not continue
      if (bulkOperation.handleWriteError(callback, writeResult)) {
        return;
      }

      // Execute the next command in line
      executeCommands(bulkOperation, options, callback);
    };

    bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
  }
  /**
   * Factory that builds an ordered bulk operation: constructs and returns a new
   * OrderedBulkOperation from the given arguments, which are passed through as-is.
   * @ignore
   */
  function initializeOrderedBulkOp(topology, collection, options) {
    const orderedBulk = new OrderedBulkOperation(topology, collection, options);
    return orderedBulk;
  }
  151. initializeOrderedBulkOp.OrderedBulkOperation = OrderedBulkOperation;
  152. module.exports = initializeOrderedBulkOp;
  153. module.exports.Bulk = OrderedBulkOperation;