join.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. // ██╗ ██████╗ ██╗███╗ ██╗
  2. // ██║██╔═══██╗██║████╗ ██║
  3. // ██║██║ ██║██║██╔██╗ ██║
  4. // ██ ██║██║ ██║██║██║╚██╗██║
  5. // ╚█████╔╝╚██████╔╝██║██║ ╚████║
  6. // ╚════╝ ╚═════╝ ╚═╝╚═╝ ╚═══╝
  7. //
  8. module.exports = require('machine').build({
  9. friendlyName: 'Join',
  10. description: 'Support native joins on the database.',
  11. inputs: {
  12. datastore: {
  13. description: 'The datastore to use for connections.',
  14. extendedDescription: 'Datastores represent the config and manager required to obtain an active database connection.',
  15. required: true,
  16. readOnly: true,
  17. example: '==='
  18. },
  19. models: {
  20. description: 'An object containing all of the model definitions that have been registered.',
  21. required: true,
  22. example: '==='
  23. },
  24. query: {
  25. description: 'A normalized Waterline Stage Three Query.',
  26. required: true,
  27. example: '==='
  28. }
  29. },
  30. exits: {
  31. success: {
  32. description: 'The query was run successfully.',
  33. outputType: 'ref'
  34. },
  35. badConnection: {
  36. friendlyName: 'Bad connection',
  37. description: 'A connection either could not be obtained or there was an error using the connection.'
  38. }
  39. },
  40. fn: function join(inputs, exits) {
  41. var _ = require('@sailshq/lodash');
  42. var async = require('async');
  43. var WLUtils = require('waterline-utils');
  44. var Helpers = require('./private');
  45. var meta = _.has(inputs.query, 'meta') ? inputs.query.meta : {};
  46. // Set a flag if a leased connection from outside the adapter was used or not.
  47. var leased = _.has(meta, 'leasedConnection');
  48. // ╔═╗╦╔╗╔╔╦╗ ┌┬┐┌─┐┌┐ ┬ ┌─┐ ┌─┐┬─┐┬┌┬┐┌─┐┬─┐┬ ┬ ┬┌─┌─┐┬ ┬
  49. // ╠╣ ║║║║ ║║ │ ├─┤├┴┐│ ├┤ ├─┘├┬┘││││├─┤├┬┘└┬┘ ├┴┐├┤ └┬┘
  50. // ╚ ╩╝╚╝═╩╝ ┴ ┴ ┴└─┘┴─┘└─┘ ┴ ┴└─┴┴ ┴┴ ┴┴└─ ┴ ┴ ┴└─┘ ┴
  51. // Find the model definition
  52. var model = inputs.models[inputs.query.using];
  53. if (!model) {
  54. return exits.invalidDatastore();
  55. }
  56. // Grab the primary key attribute for the main table name
  57. var primaryKeyAttr = model.primaryKey;
  58. var primaryKeyColumnName = model.definition[primaryKeyAttr].columnName || primaryKeyAttr;
  59. // Build a fake ORM and process the records.
  60. var orm = {
  61. collections: inputs.models
  62. };
  63. // ╔╗ ╦ ╦╦╦ ╔╦╗ ┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┐┌┌┬┐┌─┐
  64. // ╠╩╗║ ║║║ ║║ └─┐ │ ├─┤ │ ├┤ │││├┤ │││ │ └─┐
  65. // ╚═╝╚═╝╩╩═╝═╩╝ └─┘ ┴ ┴ ┴ ┴ └─┘┴ ┴└─┘┘└┘ ┴ └─┘
  66. // Attempt to build up the statements necessary for the query.
  67. var statements;
  68. try {
  69. statements = WLUtils.joins.convertJoinCriteria({
  70. query: inputs.query,
  71. getPk: function getPk(tableName) {
  72. var model = inputs.models[tableName];
  73. if (!model) {
  74. throw new Error('Invalid parent table name used when caching query results. Perhaps the join criteria is invalid?');
  75. }
  76. var pkAttrName = model.primaryKey;
  77. var pkColumnName = model.definition[pkAttrName].columnName || pkAttrName;
  78. return pkColumnName;
  79. }
  80. });
  81. } catch (e) {
  82. return exits.error(e);
  83. }
  84. // ╔═╗╔═╗╔╗╔╦ ╦╔═╗╦═╗╔╦╗ ┌─┐┌─┐┬─┐┌─┐┌┐┌┌┬┐
  85. // ║ ║ ║║║║╚╗╔╝║╣ ╠╦╝ ║ ├─┘├─┤├┬┘├┤ │││ │
  86. // ╚═╝╚═╝╝╚╝ ╚╝ ╚═╝╩╚═ ╩ ┴ ┴ ┴┴└─└─┘┘└┘ ┴
  87. // ┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┐┌┌┬┐
  88. // └─┐ │ ├─┤ │ ├┤ │││├┤ │││ │
  89. // └─┘ ┴ ┴ ┴ ┴ └─┘┴ ┴└─┘┘└┘ ┴
  90. // Convert the parent statement into a native query. If the query can be run
  91. // in a single query then this will be the only query that runs.
  92. var compiledQuery;
  93. try {
  94. compiledQuery = Helpers.query.compileStatement(statements.parentStatement);
  95. } catch (e) {
  96. return exits.error(e);
  97. }
  98. // ╔═╗╔═╗╔═╗╦ ╦╔╗╔ ┌─┐┌─┐┌┐┌┌┐┌┌─┐┌─┐┌┬┐┬┌─┐┌┐┌
  99. // ╚═╗╠═╝╠═╣║║║║║║ │ │ │││││││├┤ │ │ ││ ││││
  100. // ╚═╝╩ ╩ ╩╚╩╝╝╚╝ └─┘└─┘┘└┘┘└┘└─┘└─┘ ┴ ┴└─┘┘└┘
  101. // ┌─┐┬─┐ ┬ ┬┌─┐┌─┐ ┬ ┌─┐┌─┐┌─┐┌─┐┌┬┐ ┌─┐┌─┐┌┐┌┌┐┌┌─┐┌─┐┌┬┐┬┌─┐┌┐┌
  102. // │ │├┬┘ │ │└─┐├┤ │ ├┤ ├─┤└─┐├┤ ││ │ │ │││││││├┤ │ │ ││ ││││
  103. // └─┘┴└─ └─┘└─┘└─┘ ┴─┘└─┘┴ ┴└─┘└─┘─┴┘ └─┘└─┘┘└┘┘└┘└─┘└─┘ ┴ ┴└─┘┘└┘
  104. // Spawn a new connection for running queries on.
  105. Helpers.connection.spawnOrLeaseConnection(inputs.datastore, meta, function spawnCb(err, connection) {
  106. if (err) {
  107. return exits.error(err);
  108. }
  109. // ╦═╗╦ ╦╔╗╔ ┌┬┐┬ ┬┌─┐ ┌┐┌┌─┐┌┬┐┬┬ ┬┌─┐ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  110. // ╠╦╝║ ║║║║ │ ├─┤├┤ │││├─┤ │ │└┐┌┘├┤ │─┼┐│ │├┤ ├┬┘└┬┘
  111. // ╩╚═╚═╝╝╚╝ ┴ ┴ ┴└─┘ ┘└┘┴ ┴ ┴ ┴ └┘ └─┘ └─┘└└─┘└─┘┴└─ ┴
  112. Helpers.query.runNativeQuery(connection, compiledQuery.nativeQuery, compiledQuery.valuesToEscape, compiledQuery.meta, function parentQueryCb(err, parentResults) {
  113. if (err) {
  114. // Release the connection on error
  115. Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
  116. return exits.error(err);
  117. });
  118. return;
  119. }
  120. // If there weren't any joins being performed or no parent records were
  121. // returned, release the connection and return the results.
  122. if (!_.has(inputs.query, 'joins') || !parentResults.length) {
  123. Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb(err) {
  124. if (err) {
  125. return exits.error(err);
  126. }
  127. return exits.success(parentResults);
  128. });
  129. return;
  130. }
  131. // ╔═╗╦╔╗╔╔╦╗ ┌─┐┬ ┬┬┬ ┌┬┐┬─┐┌─┐┌┐┌ ┬─┐┌─┐┌─┐┌─┐┬─┐┌┬┐┌─┐
  132. // ╠╣ ║║║║ ║║ │ ├─┤││ ││├┬┘├┤ │││ ├┬┘├┤ │ │ │├┬┘ ││└─┐
  133. // ╚ ╩╝╚╝═╩╝ └─┘┴ ┴┴┴─┘─┴┘┴└─└─┘┘└┘ ┴└─└─┘└─┘└─┘┴└──┴┘└─┘
  134. // If there was a join that was either performed or still needs to be
  135. // performed, look into the results for any children records that may
  136. // have been joined and splt them out from the parent.
  137. var sortedResults;
  138. try {
  139. sortedResults = WLUtils.joins.detectChildrenRecords(primaryKeyColumnName, parentResults);
  140. } catch (e) {
  141. // Release the connection if there was an error.
  142. Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
  143. return exits.error(e);
  144. });
  145. return;
  146. }
  147. // ╦╔╗╔╦╔╦╗╦╔═╗╦ ╦╔═╗╔═╗ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬ ┌─┐┌─┐┌─┐┬ ┬┌─┐
  148. // ║║║║║ ║ ║╠═╣║ ║╔═╝║╣ │─┼┐│ │├┤ ├┬┘└┬┘ │ ├─┤│ ├─┤├┤
  149. // ╩╝╚╝╩ ╩ ╩╩ ╩╩═╝╩╚═╝╚═╝ └─┘└└─┘└─┘┴└─ ┴ └─┘┴ ┴└─┘┴ ┴└─┘
  150. var queryCache;
  151. try {
  152. queryCache = Helpers.query.initializeQueryCache({
  153. instructions: statements.instructions,
  154. models: inputs.models,
  155. sortedResults: sortedResults
  156. });
  157. } catch (e) {
  158. // Release the connection if there was an error.
  159. Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
  160. return exits.error(e);
  161. });
  162. return;
  163. }
  164. // ╔═╗╔╦╗╔═╗╦═╗╔═╗ ┌─┐┌─┐┬─┐┌─┐┌┐┌┌┬┐┌─┐
  165. // ╚═╗ ║ ║ ║╠╦╝║╣ ├─┘├─┤├┬┘├┤ │││ │ └─┐
  166. // ╚═╝ ╩ ╚═╝╩╚═╚═╝ ┴ ┴ ┴┴└─└─┘┘└┘ ┴ └─┘
  167. try {
  168. queryCache.setParents(sortedResults.parents);
  169. } catch (e) {
  170. // Release the connection if there was an error.
  171. Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
  172. return exits.error(e);
  173. });
  174. return;
  175. }
  176. // ╔═╗╦ ╦╔═╗╔═╗╦╔═ ┌─┐┌─┐┬─┐ ┌─┐┬ ┬┬┬ ┌┬┐┬─┐┌─┐┌┐┌
  177. // ║ ╠═╣║╣ ║ ╠╩╗ ├┤ │ │├┬┘ │ ├─┤││ ││├┬┘├┤ │││
  178. // ╚═╝╩ ╩╚═╝╚═╝╩ ╩ └ └─┘┴└─ └─┘┴ ┴┴┴─┘─┴┘┴└─└─┘┘└┘
  179. // ┌─┐ ┬ ┬┌─┐┬─┐┬┌─┐┌─┐
  180. // │─┼┐│ │├┤ ├┬┘│├┤ └─┐
  181. // └─┘└└─┘└─┘┴└─┴└─┘└─┘
  182. // Now that all the parents are found, check if there are any child
  183. // statements that need to be processed. If not, close the connection and
  184. // return the combined results.
  185. if (!statements.childStatements || !statements.childStatements.length) {
  186. Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb(err) {
  187. if (err) {
  188. return exits.error(err);
  189. }
  190. // Combine records in the cache to form nested results
  191. var combinedResults;
  192. try {
  193. combinedResults = queryCache.combineRecords();
  194. } catch (e) {
  195. return exits.error(e);
  196. }
  197. // Process each record to normalize output
  198. try {
  199. Helpers.query.processEachRecord({
  200. records: combinedResults,
  201. identity: model.identity,
  202. orm: orm
  203. });
  204. } catch (e) {
  205. return exits.error(e);
  206. }
  207. // Return the combined results
  208. exits.success(combinedResults);
  209. });
  210. return;
  211. }
  212. // ╔═╗╔═╗╦ ╦ ╔═╗╔═╗╔╦╗ ┌─┐┌─┐┬─┐┌─┐┌┐┌┌┬┐
  213. // ║ ║ ║║ ║ ║╣ ║ ║ ├─┘├─┤├┬┘├┤ │││ │
  214. // ╚═╝╚═╝╩═╝╩═╝╚═╝╚═╝ ╩ ┴ ┴ ┴┴└─└─┘┘└┘ ┴
  215. // ┬─┐┌─┐┌─┐┌─┐┬─┐┌┬┐┌─┐
  216. // ├┬┘├┤ │ │ │├┬┘ ││└─┐
  217. // ┴└─└─┘└─┘└─┘┴└──┴┘└─┘
  218. // There is more work to be done now. Go through the parent records and
  219. // build up an array of the primary keys.
  220. var parentKeys = _.map(queryCache.getParents(), function pluckPk(record) {
  221. return record[primaryKeyColumnName];
  222. });
  223. // ╔═╗╦═╗╔═╗╔═╗╔═╗╔═╗╔═╗ ┌─┐┬ ┬┬┬ ┌┬┐ ┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┐┌┌┬┐┌─┐
  224. // ╠═╝╠╦╝║ ║║ ║╣ ╚═╗╚═╗ │ ├─┤││ ││ └─┐ │ ├─┤ │ ├┤ │││├┤ │││ │ └─┐
  225. // ╩ ╩╚═╚═╝╚═╝╚═╝╚═╝╚═╝ └─┘┴ ┴┴┴─┘─┴┘ └─┘ ┴ ┴ ┴ ┴ └─┘┴ ┴└─┘┘└┘ ┴ └─┘
  226. // For each child statement, figure out how to turn the statement into
  227. // a native query and then run it. Add the results to the query cache.
  228. async.each(statements.childStatements, function processChildStatements(template, next) {
  229. // ╦═╗╔═╗╔╗╔╔╦╗╔═╗╦═╗ ┬┌┐┌ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  230. // ╠╦╝║╣ ║║║ ║║║╣ ╠╦╝ ││││ │─┼┐│ │├┤ ├┬┘└┬┘
  231. // ╩╚═╚═╝╝╚╝═╩╝╚═╝╩╚═ ┴┘└┘ └─┘└└─┘└─┘┴└─ ┴
  232. // ┌┬┐┌─┐┌┬┐┌─┐┬ ┌─┐┌┬┐┌─┐
  233. // │ ├┤ │││├─┘│ ├─┤ │ ├┤
  234. // ┴ └─┘┴ ┴┴ ┴─┘┴ ┴ ┴ └─┘
  235. // If the statement is an IN query, replace the values with the parent
  236. // keys.
  237. if (template.queryType === 'in') {
  238. // Pull the last AND clause out - it's the one we added
  239. var inClause = _.pullAt(template.statement.where.and, template.statement.where.and.length - 1);
  240. // Grab the object inside the array that comes back
  241. inClause = _.first(inClause);
  242. // Modify the inClause using the actual parent key values
  243. _.each(inClause, function modifyInClause(val) {
  244. val.in = parentKeys;
  245. });
  246. // Reset the statement
  247. template.statement.where.and.push(inClause);
  248. }
  249. // ╦═╗╔═╗╔╗╔╔╦╗╔═╗╦═╗ ┬ ┬┌┐┌┬┌─┐┌┐┌ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  250. // ╠╦╝║╣ ║║║ ║║║╣ ╠╦╝ │ ││││││ ││││ │─┼┐│ │├┤ ├┬┘└┬┘
  251. // ╩╚═╚═╝╝╚╝═╩╝╚═╝╩╚═ └─┘┘└┘┴└─┘┘└┘ └─┘└└─┘└─┘┴└─ ┴
  252. // ┌┬┐┌─┐┌┬┐┌─┐┬ ┌─┐┌┬┐┌─┐
  253. // │ ├┤ │││├─┘│ ├─┤ │ ├┤
  254. // ┴ └─┘┴ ┴┴ ┴─┘┴ ┴ ┴ └─┘
  255. // If the statement is a UNION type, loop through each parent key and
  256. // build up a proper query.
  257. if (template.queryType === 'union') {
  258. var unionStatements = [];
  259. // Build up an array of generated statements
  260. _.each(parentKeys, function buildUnion(parentPk) {
  261. var unionStatement = _.merge({}, template.statement);
  262. // Replace the placeholder `?` values with the primary key of the
  263. // parent record.
  264. var andClause = _.pullAt(unionStatement.where.and, unionStatement.where.and.length - 1);
  265. _.each(_.first(andClause), function replaceValue(val, key) {
  266. _.first(andClause)[key] = parentPk;
  267. });
  268. // Add the UNION statement to the array of other statements
  269. unionStatement.where.and.push(_.first(andClause));
  270. unionStatements.push(unionStatement);
  271. });
  272. // Replace the final statement with the UNION ALL clause
  273. if (unionStatements.length) {
  274. template.statement = { unionAll: unionStatements };
  275. }
  276. }
  277. // If there isn't a statement to be run, then just return
  278. if (!template.statement) {
  279. return next();
  280. }
  281. // ╔═╗╔═╗╔╦╗╔═╗╦╦ ╔═╗ ┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┐┌┌┬┐
  282. // ║ ║ ║║║║╠═╝║║ ║╣ └─┐ │ ├─┤ │ ├┤ │││├┤ │││ │
  283. // ╚═╝╚═╝╩ ╩╩ ╩╩═╝╚═╝ └─┘ ┴ ┴ ┴ ┴ └─┘┴ ┴└─┘┘└┘ ┴
  284. // Attempt to convert the statement into a native query
  285. var compiledQuery;
  286. try {
  287. compiledQuery = Helpers.query.compileStatement(template.statement);
  288. } catch (e) {
  289. return next(e);
  290. }
  291. // ╦═╗╦ ╦╔╗╔ ┌─┐┬ ┬┬┬ ┌┬┐ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
  292. // ╠╦╝║ ║║║║ │ ├─┤││ ││ │─┼┐│ │├┤ ├┬┘└┬┘
  293. // ╩╚═╚═╝╝╚╝ └─┘┴ ┴┴┴─┘─┴┘ └─┘└└─┘└─┘┴└─ ┴
  294. // Run the native query
  295. Helpers.query.runNativeQuery(connection, compiledQuery.nativeQuery, compiledQuery.valuesToEscape, compiledQuery.meta, function parentQueryCb(err, queryResults) {
  296. if (err) {
  297. return next(err);
  298. }
  299. // Extend the values in the cache to include the values from the
  300. // child query.
  301. queryCache.extend(queryResults, template.instructions);
  302. return next();
  303. });
  304. },
  305. function asyncEachCb(err) {
  306. // Always release the connection unless a leased connection from outside
  307. // the adapter was used.
  308. Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
  309. if (err) {
  310. return exits.error(err);
  311. }
  312. // Combine records in the cache to form nested results
  313. var combinedResults = queryCache.combineRecords();
  314. // Process each record to normalize output
  315. try {
  316. Helpers.query.processEachRecord({
  317. records: combinedResults,
  318. identity: model.identity,
  319. orm: orm
  320. });
  321. } catch (e) {
  322. return exits.error(e);
  323. }
  324. // Return the combined results
  325. return exits.success(combinedResults);
  326. }); // </ releaseConnection >
  327. }); // </ asyncEachCb >
  328. }); // </ runNativeQuery >
  329. }); // </ spawnConnection >
  330. }
  331. });