castBulkWrite.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. 'use strict';
  2. const applyTimestampsToChildren = require('../update/applyTimestampsToChildren');
  3. const applyTimestampsToUpdate = require('../update/applyTimestampsToUpdate');
  4. const cast = require('../../cast');
  5. const castUpdate = require('../query/castUpdate');
  6. const setDefaultsOnInsert = require('../setDefaultsOnInsert');
  7. /*!
  8. * Given a model and a bulkWrite op, return a thunk that handles casting and
  9. * validating the individual op.
  10. */
  11. module.exports = function castBulkWrite(model, op) {
  12. const now = model.base.now();
  13. if (op['insertOne']) {
  14. return (callback) => {
  15. const doc = new model(op['insertOne']['document']);
  16. if (model.schema.options.timestamps != null) {
  17. doc.initializeTimestamps();
  18. }
  19. op['insertOne']['document'] = doc;
  20. op['insertOne']['document'].validate({ __noPromise: true }, function(error) {
  21. if (error) {
  22. return callback(error, null);
  23. }
  24. callback(null);
  25. });
  26. };
  27. } else if (op['updateOne']) {
  28. op = op['updateOne'];
  29. return (callback) => {
  30. try {
  31. op['filter'] = cast(model.schema, op['filter']);
  32. op['update'] = castUpdate(model.schema, op['update'], {
  33. strict: model.schema.options.strict,
  34. overwrite: false
  35. });
  36. if (op.setDefaultsOnInsert) {
  37. setDefaultsOnInsert(op['filter'], model.schema, op['update'], {
  38. setDefaultsOnInsert: true,
  39. upsert: op.upsert
  40. });
  41. }
  42. if (model.schema.$timestamps != null) {
  43. const createdAt = model.schema.$timestamps.createdAt;
  44. const updatedAt = model.schema.$timestamps.updatedAt;
  45. applyTimestampsToUpdate(now, createdAt, updatedAt, op['update'], {});
  46. }
  47. applyTimestampsToChildren(now, op['update'], model.schema);
  48. } catch (error) {
  49. return callback(error, null);
  50. }
  51. callback(null);
  52. };
  53. } else if (op['updateMany']) {
  54. op = op['updateMany'];
  55. return (callback) => {
  56. try {
  57. op['filter'] = cast(model.schema, op['filter']);
  58. op['update'] = castUpdate(model.schema, op['update'], {
  59. strict: model.schema.options.strict,
  60. overwrite: false
  61. });
  62. if (op.setDefaultsOnInsert) {
  63. setDefaultsOnInsert(op['filter'], model.schema, op['update'], {
  64. setDefaultsOnInsert: true,
  65. upsert: op.upsert
  66. });
  67. }
  68. if (model.schema.$timestamps != null) {
  69. const createdAt = model.schema.$timestamps.createdAt;
  70. const updatedAt = model.schema.$timestamps.updatedAt;
  71. applyTimestampsToUpdate(now, createdAt, updatedAt, op['update'], {});
  72. }
  73. applyTimestampsToChildren(now, op['update'], model.schema);
  74. } catch (error) {
  75. return callback(error, null);
  76. }
  77. callback(null);
  78. };
  79. } else if (op['replaceOne']) {
  80. return (callback) => {
  81. try {
  82. op['replaceOne']['filter'] = cast(model.schema,
  83. op['replaceOne']['filter']);
  84. } catch (error) {
  85. return callback(error, null);
  86. }
  87. // set `skipId`, otherwise we get "_id field cannot be changed"
  88. const doc = new model(op['replaceOne']['replacement'], null, true);
  89. if (model.schema.options.timestamps != null) {
  90. doc.initializeTimestamps();
  91. }
  92. op['replaceOne']['replacement'] = doc;
  93. op['replaceOne']['replacement'].validate({ __noPromise: true }, function(error) {
  94. if (error) {
  95. return callback(error, null);
  96. }
  97. callback(null);
  98. });
  99. };
  100. } else if (op['deleteOne']) {
  101. return (callback) => {
  102. try {
  103. op['deleteOne']['filter'] = cast(model.schema,
  104. op['deleteOne']['filter']);
  105. } catch (error) {
  106. return callback(error, null);
  107. }
  108. callback(null);
  109. };
  110. } else if (op['deleteMany']) {
  111. return (callback) => {
  112. try {
  113. op['deleteMany']['filter'] = cast(model.schema,
  114. op['deleteMany']['filter']);
  115. } catch (error) {
  116. return callback(error, null);
  117. }
  118. callback(null);
  119. };
  120. } else {
  121. return (callback) => {
  122. callback(new Error('Invalid op passed to `bulkWrite()`'), null);
  123. };
  124. }
  125. };