create-each.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. // ██████╗██████╗ ███████╗ █████╗ ████████╗███████╗ ███████╗ █████╗ ██████╗██╗ ██╗
  2. // ██╔════╝██╔══██╗██╔════╝██╔══██╗╚══██╔══╝██╔════╝ ██╔════╝██╔══██╗██╔════╝██║ ██║
  3. // ██║ ██████╔╝█████╗ ███████║ ██║ █████╗ █████╗ ███████║██║ ███████║
  4. // ██║ ██╔══██╗██╔══╝ ██╔══██║ ██║ ██╔══╝ ██╔══╝ ██╔══██║██║ ██╔══██║
  5. // ╚██████╗██║ ██║███████╗██║ ██║ ██║ ███████╗ ███████╗██║ ██║╚██████╗██║ ██║
  6. // ╚═════╝╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═╝ ╚══════╝ ╚══════╝╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝
  7. //
  8. // Run creates in order and return the records. This is needed because MySQL
  9. // lacks the ability to return multiple insert id's from a bulk insert.
  10. //
  11. // So when a createEach call from Waterline is made with the `fetch: true` flag
  12. // turned on, the records must be inserted one by one in order to return the
  13. // correct primary keys.
  14. var _ = require('@sailshq/lodash');
  15. var async = require('async');
  16. var compileStatement = require('./compile-statement');
  17. var runQuery = require('./run-query');
  18. module.exports = function createEach(options, cb) {
  19. // ╦ ╦╔═╗╦ ╦╔╦╗╔═╗╔╦╗╔═╗ ┌─┐┌─┐┌┬┐┬┌─┐┌┐┌┌─┐
  20. // ╚╗╔╝╠═╣║ ║ ║║╠═╣ ║ ║╣ │ │├─┘ │ ││ ││││└─┐
  21. // ╚╝ ╩ ╩╩═╝╩═╩╝╩ ╩ ╩ ╚═╝ └─┘┴ ┴ ┴└─┘┘└┘└─┘
  22. if (_.isUndefined(options) || !_.isPlainObject(options)) {
  23. throw new Error('Invalid options argument. Options must contain: connection, statement, fetch, and primaryKey.');
  24. }
  25. if (!_.has(options, 'connection') || !_.isObject(options.connection)) {
  26. throw new Error('Invalid option used in options argument. Missing or invalid connection.');
  27. }
  28. if (!_.has(options, 'statement') || !_.isPlainObject(options.statement)) {
  29. throw new Error('Invalid option used in options argument. Missing or invalid statement.');
  30. }
  31. if (!_.has(options, 'fetch') || !_.isBoolean(options.fetch)) {
  32. throw new Error('Invalid option used in options argument. Missing or invalid fetch flag.');
  33. }
  34. if (!_.has(options, 'primaryKey') || !_.isString(options.primaryKey)) {
  35. throw new Error('Invalid option used in options argument. Missing or invalid primaryKey flag.');
  36. }
  37. // ███╗ ██╗ ██████╗ ███╗ ██╗ ███████╗███████╗████████╗ ██████╗██╗ ██╗
  38. // ████╗ ██║██╔═══██╗████╗ ██║ ██╔════╝██╔════╝╚══██╔══╝██╔════╝██║ ██║
  39. // ██╔██╗ ██║██║ ██║██╔██╗ ██║█████╗█████╗ █████╗ ██║ ██║ ███████║
  40. // ██║╚██╗██║██║ ██║██║╚██╗██║╚════╝██╔══╝ ██╔══╝ ██║ ██║ ██╔══██║
  41. // ██║ ╚████║╚██████╔╝██║ ╚████║ ██║ ███████╗ ██║ ╚██████╗██║ ██║
  42. // ╚═╝ ╚═══╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═╝ ╚══════╝ ╚═╝ ╚═════╝╚═╝ ╚═╝
  43. //
  44. // ██████╗██████╗ ███████╗ █████╗ ████████╗███████╗
  45. // ██╔════╝██╔══██╗██╔════╝██╔══██╗╚══██╔══╝██╔════╝
  46. // ██║ ██████╔╝█████╗ ███████║ ██║ █████╗
  47. // ██║ ██╔══██╗██╔══╝ ██╔══██║ ██║ ██╔══╝
  48. // ╚██████╗██║ ██║███████╗██║ ██║ ██║ ███████╗
  49. // ╚═════╝╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═╝ ╚══════╝
  50. //
  51. // If the fetch flag was used, then the statement will need to be broken up
  52. // into a series of async queries. Otherwise just run a bulk insert.
  53. if (!options.fetch) {
  54. // ╔═╗╔═╗╔╦╗╔═╗╦╦ ╔═╗ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  55. // ║ ║ ║║║║╠═╝║║ ║╣ │─┼┐│ │├┤ ├┬┘└┬┘
  56. // ╚═╝╚═╝╩ ╩╩ ╩╩═╝╚═╝ └─┘└└─┘└─┘┴└─ ┴
  57. // Compile the statement into a native query.
  58. var compiledQuery;
  59. try {
  60. compiledQuery = compileStatement(options.statement, options.meta);
  61. } catch (e) {
  62. // If the statement could not be compiled, return an error.
  63. return cb(e);
  64. }
  65. // ╦═╗╦ ╦╔╗╔ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  66. // ╠╦╝║ ║║║║ │─┼┐│ │├┤ ├┬┘└┬┘
  67. // ╩╚═╚═╝╝╚╝ └─┘└└─┘└─┘┴└─ ┴
  68. // Run the initial query (bulk insert)
  69. runQuery({
  70. connection: options.connection,
  71. nativeQuery: compiledQuery.nativeQuery,
  72. valuesToEscape: compiledQuery.valuesToEscape,
  73. meta: compiledQuery.meta,
  74. disconnectOnError: false,
  75. queryType: 'insert'
  76. },
  77. function runQueryCb(err, report) {
  78. if (err) {
  79. return cb(err);
  80. }
  81. return cb(undefined, report.result);
  82. });
  83. // Return early
  84. return;
  85. }
  86. // ███████╗███████╗████████╗ ██████╗██╗ ██╗ ██████╗██████╗ ███████╗ █████╗ ████████╗███████╗
  87. // ██╔════╝██╔════╝╚══██╔══╝██╔════╝██║ ██║ ██╔════╝██╔══██╗██╔════╝██╔══██╗╚══██╔══╝██╔════╝
  88. // █████╗ █████╗ ██║ ██║ ███████║ ██║ ██████╔╝█████╗ ███████║ ██║ █████╗
  89. // ██╔══╝ ██╔══╝ ██║ ██║ ██╔══██║ ██║ ██╔══██╗██╔══╝ ██╔══██║ ██║ ██╔══╝
  90. // ██║ ███████╗ ██║ ╚██████╗██║ ██║ ╚██████╗██║ ██║███████╗██║ ██║ ██║ ███████╗
  91. // ╚═╝ ╚══════╝ ╚═╝ ╚═════╝╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═╝ ╚══════╝
  92. //
  93. // Break apart the statement's insert records and create a single create query
  94. // for each one. Collect the result of the insertId's to be returned.
  95. var newRecords = options.statement.insert;
  96. var insertIds = [];
  97. // Be sure to run these in series so that the insert order is maintained.
  98. async.eachSeries(newRecords, function runCreateQuery(record, nextRecord) {
  99. // Build up a statement to use.
  100. var statement = {
  101. insert: record,
  102. into: options.statement.into
  103. };
  104. // ╔═╗╔═╗╔╦╗╔═╗╦╦ ╔═╗ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  105. // ║ ║ ║║║║╠═╝║║ ║╣ │─┼┐│ │├┤ ├┬┘└┬┘
  106. // ╚═╝╚═╝╩ ╩╩ ╩╩═╝╚═╝ └─┘└└─┘└─┘┴└─ ┴
  107. // Compile the statement into a native query.
  108. var compiledQuery;
  109. try {
  110. compiledQuery = compileStatement(statement);
  111. } catch (e) {
  112. // If the statement could not be compiled, return an error.
  113. return nextRecord(e);
  114. }
  115. var insertOptions = {
  116. connection: options.connection,
  117. nativeQuery: compiledQuery.nativeQuery,
  118. valuesToEscape: compiledQuery.valuesToEscape,
  119. meta: compiledQuery.meta,
  120. disconnectOnError: false,
  121. queryType: 'insert'
  122. };
  123. // Determine if a custom primary key value was used. If so pass it down so that
  124. // the report can be used correctly. MySQL doesn't return these values.
  125. if (statement.insert[options.primaryKey]) {
  126. insertOptions.customPrimaryKey = statement.insert[options.primaryKey];
  127. }
  128. // ╦═╗╦ ╦╔╗╔ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  129. // ╠╦╝║ ║║║║ │─┼┐│ │├┤ ├┬┘└┬┘
  130. // ╩╚═╚═╝╝╚╝ └─┘└└─┘└─┘┴└─ ┴
  131. // Run the initial query (bulk insert)
  132. runQuery(insertOptions, function runQueryCb(err, report) {
  133. if (err) {
  134. return nextRecord(err);
  135. }
  136. // Add the insert id to the array
  137. insertIds.push(report.result.inserted);
  138. return nextRecord(undefined, report.result);
  139. });
  140. },
  141. function fetchCreateCb(err) {
  142. if (err) {
  143. return cb(err);
  144. }
  145. // ╔═╗╔═╗╦═╗╔═╗╔═╗╦═╗╔╦╗ ┌┬┐┬ ┬┌─┐ ┌─┐┌─┐┌┬┐┌─┐┬ ┬
  146. // ╠═╝║╣ ╠╦╝╠╣ ║ ║╠╦╝║║║ │ ├─┤├┤ ├┤ ├┤ │ │ ├─┤
  147. // ╩ ╚═╝╩╚═╚ ╚═╝╩╚═╩ ╩ ┴ ┴ ┴└─┘ └ └─┘ ┴ └─┘┴ ┴
  148. var fetchStatement = {
  149. select: '*',
  150. from: options.statement.into,
  151. where: {},
  152. orderBy: [{}]
  153. };
  154. // Sort the records by primary key
  155. fetchStatement.orderBy[0][options.primaryKey] = 'ASC';
  156. // Build up the WHERE clause for the statement to get the newly inserted
  157. // records.
  158. fetchStatement.where[options.primaryKey] = { 'in': insertIds };
  159. // ╔═╗╔═╗╔╦╗╔═╗╦╦ ╔═╗ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  160. // ║ ║ ║║║║╠═╝║║ ║╣ │─┼┐│ │├┤ ├┬┘└┬┘
  161. // ╚═╝╚═╝╩ ╩╩ ╩╩═╝╚═╝ └─┘└└─┘└─┘┴└─ ┴
  162. // Compile the statement into a native query.
  163. var compiledQuery;
  164. try {
  165. compiledQuery = compileStatement(fetchStatement);
  166. } catch (err) {
  167. // If the statement could not be compiled, return an error.
  168. return cb(err);
  169. }
  170. // ╦═╗╦ ╦╔╗╔ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  171. // ╠╦╝║ ║║║║ │─┼┐│ │├┤ ├┬┘└┬┘
  172. // ╩╚═╚═╝╝╚╝ └─┘└└─┘└─┘┴└─ ┴
  173. // Run the fetch query.
  174. runQuery({
  175. connection: options.connection,
  176. nativeQuery: compiledQuery.nativeQuery,
  177. valuesToEscape: compiledQuery.valuesToEscape,
  178. meta: compiledQuery.meta,
  179. disconnectOnError: false,
  180. queryType: 'select'
  181. }, function runQueryCb(err, report) {
  182. if (err) {
  183. return cb(err);
  184. }
  185. return cb(undefined, report.result);
  186. });
  187. });
  188. };