123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- // ██╗ ██████╗ ██╗███╗ ██╗
- // ██║██╔═══██╗██║████╗ ██║
- // ██║██║ ██║██║██╔██╗ ██║
- // ██ ██║██║ ██║██║██║╚██╗██║
- // ╚█████╔╝╚██████╔╝██║██║ ╚████║
- // ╚════╝ ╚═════╝ ╚═╝╚═╝ ╚═══╝
- //
- module.exports = require('machine').build({
- friendlyName: 'Join',
- description: 'Support native joins on the database.',
- inputs: {
- datastore: {
- description: 'The datastore to use for connections.',
- extendedDescription: 'Datastores represent the config and manager required to obtain an active database connection.',
- required: true,
- readOnly: true,
- example: '==='
- },
- models: {
- description: 'An object containing all of the model definitions that have been registered.',
- required: true,
- example: '==='
- },
- query: {
- description: 'A normalized Waterline Stage Three Query.',
- required: true,
- example: '==='
- }
- },
- exits: {
- success: {
- description: 'The query was run successfully.',
- outputType: 'ref'
- },
- badConnection: {
- friendlyName: 'Bad connection',
- description: 'A connection either could not be obtained or there was an error using the connection.'
- }
- },
- fn: function join(inputs, exits) {
- var _ = require('@sailshq/lodash');
- var async = require('async');
- var WLUtils = require('waterline-utils');
- var Helpers = require('./private');
- var meta = _.has(inputs.query, 'meta') ? inputs.query.meta : {};
- // Set a flag if a leased connection from outside the adapter was used or not.
- var leased = _.has(meta, 'leasedConnection');
- // ╔═╗╦╔╗╔╔╦╗ ┌┬┐┌─┐┌┐ ┬ ┌─┐ ┌─┐┬─┐┬┌┬┐┌─┐┬─┐┬ ┬ ┬┌─┌─┐┬ ┬
- // ╠╣ ║║║║ ║║ │ ├─┤├┴┐│ ├┤ ├─┘├┬┘││││├─┤├┬┘└┬┘ ├┴┐├┤ └┬┘
- // ╚ ╩╝╚╝═╩╝ ┴ ┴ ┴└─┘┴─┘└─┘ ┴ ┴└─┴┴ ┴┴ ┴┴└─ ┴ ┴ ┴└─┘ ┴
- // Find the model definition
- var model = inputs.models[inputs.query.using];
- if (!model) {
- return exits.invalidDatastore();
- }
- // Grab the primary key attribute for the main table name
- var primaryKeyAttr = model.primaryKey;
- var primaryKeyColumnName = model.definition[primaryKeyAttr].columnName || primaryKeyAttr;
- // Build a fake ORM and process the records.
- var orm = {
- collections: inputs.models
- };
- // ╔╗ ╦ ╦╦╦ ╔╦╗ ┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┐┌┌┬┐┌─┐
- // ╠╩╗║ ║║║ ║║ └─┐ │ ├─┤ │ ├┤ │││├┤ │││ │ └─┐
- // ╚═╝╚═╝╩╩═╝═╩╝ └─┘ ┴ ┴ ┴ ┴ └─┘┴ ┴└─┘┘└┘ ┴ └─┘
- // Attempt to build up the statements necessary for the query.
- var statements;
- try {
- statements = WLUtils.joins.convertJoinCriteria({
- query: inputs.query,
- getPk: function getPk(tableName) {
- var model = inputs.models[tableName];
- if (!model) {
- throw new Error('Invalid parent table name used when caching query results. Perhaps the join criteria is invalid?');
- }
- var pkAttrName = model.primaryKey;
- var pkColumnName = model.definition[pkAttrName].columnName || pkAttrName;
- return pkColumnName;
- }
- });
- } catch (e) {
- return exits.error(e);
- }
- // ╔═╗╔═╗╔╗╔╦ ╦╔═╗╦═╗╔╦╗ ┌─┐┌─┐┬─┐┌─┐┌┐┌┌┬┐
- // ║ ║ ║║║║╚╗╔╝║╣ ╠╦╝ ║ ├─┘├─┤├┬┘├┤ │││ │
- // ╚═╝╚═╝╝╚╝ ╚╝ ╚═╝╩╚═ ╩ ┴ ┴ ┴┴└─└─┘┘└┘ ┴
- // ┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┐┌┌┬┐
- // └─┐ │ ├─┤ │ ├┤ │││├┤ │││ │
- // └─┘ ┴ ┴ ┴ ┴ └─┘┴ ┴└─┘┘└┘ ┴
- // Convert the parent statement into a native query. If the query can be run
- // in a single query then this will be the only query that runs.
- var compiledQuery;
- try {
- compiledQuery = Helpers.query.compileStatement(statements.parentStatement);
- } catch (e) {
- return exits.error(e);
- }
- // ╔═╗╔═╗╔═╗╦ ╦╔╗╔ ┌─┐┌─┐┌┐┌┌┐┌┌─┐┌─┐┌┬┐┬┌─┐┌┐┌
- // ╚═╗╠═╝╠═╣║║║║║║ │ │ │││││││├┤ │ │ ││ ││││
- // ╚═╝╩ ╩ ╩╚╩╝╝╚╝ └─┘└─┘┘└┘┘└┘└─┘└─┘ ┴ ┴└─┘┘└┘
- // ┌─┐┬─┐ ┬ ┬┌─┐┌─┐ ┬ ┌─┐┌─┐┌─┐┌─┐┌┬┐ ┌─┐┌─┐┌┐┌┌┐┌┌─┐┌─┐┌┬┐┬┌─┐┌┐┌
- // │ │├┬┘ │ │└─┐├┤ │ ├┤ ├─┤└─┐├┤ ││ │ │ │││││││├┤ │ │ ││ ││││
- // └─┘┴└─ └─┘└─┘└─┘ ┴─┘└─┘┴ ┴└─┘└─┘─┴┘ └─┘└─┘┘└┘┘└┘└─┘└─┘ ┴ ┴└─┘┘└┘
- // Spawn a new connection for running queries on.
- Helpers.connection.spawnOrLeaseConnection(inputs.datastore, meta, function spawnCb(err, connection) {
- if (err) {
- return exits.error(err);
- }
- // ╦═╗╦ ╦╔╗╔ ┌┬┐┬ ┬┌─┐ ┌┐┌┌─┐┌┬┐┬┬ ┬┌─┐ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
- // ╠╦╝║ ║║║║ │ ├─┤├┤ │││├─┤ │ │└┐┌┘├┤ │─┼┐│ │├┤ ├┬┘└┬┘
- // ╩╚═╚═╝╝╚╝ ┴ ┴ ┴└─┘ ┘└┘┴ ┴ ┴ ┴ └┘ └─┘ └─┘└└─┘└─┘┴└─ ┴
- Helpers.query.runNativeQuery(connection, compiledQuery.nativeQuery, compiledQuery.valuesToEscape, compiledQuery.meta, function parentQueryCb(err, parentResults) {
- if (err) {
- // Release the connection on error
- Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
- return exits.error(err);
- });
- return;
- }
- // If there weren't any joins being performed or no parent records were
- // returned, release the connection and return the results.
- if (!_.has(inputs.query, 'joins') || !parentResults.length) {
- Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb(err) {
- if (err) {
- return exits.error(err);
- }
- return exits.success(parentResults);
- });
- return;
- }
- // ╔═╗╦╔╗╔╔╦╗ ┌─┐┬ ┬┬┬ ┌┬┐┬─┐┌─┐┌┐┌ ┬─┐┌─┐┌─┐┌─┐┬─┐┌┬┐┌─┐
- // ╠╣ ║║║║ ║║ │ ├─┤││ ││├┬┘├┤ │││ ├┬┘├┤ │ │ │├┬┘ ││└─┐
- // ╚ ╩╝╚╝═╩╝ └─┘┴ ┴┴┴─┘─┴┘┴└─└─┘┘└┘ ┴└─└─┘└─┘└─┘┴└──┴┘└─┘
- // If there was a join that was either performed or still needs to be
- // performed, look into the results for any children records that may
- // have been joined and splt them out from the parent.
- var sortedResults;
- try {
- sortedResults = WLUtils.joins.detectChildrenRecords(primaryKeyColumnName, parentResults);
- } catch (e) {
- // Release the connection if there was an error.
- Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
- return exits.error(e);
- });
- return;
- }
- // ╦╔╗╔╦╔╦╗╦╔═╗╦ ╦╔═╗╔═╗ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬ ┌─┐┌─┐┌─┐┬ ┬┌─┐
- // ║║║║║ ║ ║╠═╣║ ║╔═╝║╣ │─┼┐│ │├┤ ├┬┘└┬┘ │ ├─┤│ ├─┤├┤
- // ╩╝╚╝╩ ╩ ╩╩ ╩╩═╝╩╚═╝╚═╝ └─┘└└─┘└─┘┴└─ ┴ └─┘┴ ┴└─┘┴ ┴└─┘
- var queryCache;
- try {
- queryCache = Helpers.query.initializeQueryCache({
- instructions: statements.instructions,
- models: inputs.models,
- sortedResults: sortedResults
- });
- } catch (e) {
- // Release the connection if there was an error.
- Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
- return exits.error(e);
- });
- return;
- }
- // ╔═╗╔╦╗╔═╗╦═╗╔═╗ ┌─┐┌─┐┬─┐┌─┐┌┐┌┌┬┐┌─┐
- // ╚═╗ ║ ║ ║╠╦╝║╣ ├─┘├─┤├┬┘├┤ │││ │ └─┐
- // ╚═╝ ╩ ╚═╝╩╚═╚═╝ ┴ ┴ ┴┴└─└─┘┘└┘ ┴ └─┘
- try {
- queryCache.setParents(sortedResults.parents);
- } catch (e) {
- // Release the connection if there was an error.
- Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
- return exits.error(e);
- });
- return;
- }
- // ╔═╗╦ ╦╔═╗╔═╗╦╔═ ┌─┐┌─┐┬─┐ ┌─┐┬ ┬┬┬ ┌┬┐┬─┐┌─┐┌┐┌
- // ║ ╠═╣║╣ ║ ╠╩╗ ├┤ │ │├┬┘ │ ├─┤││ ││├┬┘├┤ │││
- // ╚═╝╩ ╩╚═╝╚═╝╩ ╩ └ └─┘┴└─ └─┘┴ ┴┴┴─┘─┴┘┴└─└─┘┘└┘
- // ┌─┐ ┬ ┬┌─┐┬─┐┬┌─┐┌─┐
- // │─┼┐│ │├┤ ├┬┘│├┤ └─┐
- // └─┘└└─┘└─┘┴└─┴└─┘└─┘
- // Now that all the parents are found, check if there are any child
- // statements that need to be processed. If not, close the connection and
- // return the combined results.
- if (!statements.childStatements || !statements.childStatements.length) {
- Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb(err) {
- if (err) {
- return exits.error(err);
- }
- // Combine records in the cache to form nested results
- var combinedResults;
- try {
- combinedResults = queryCache.combineRecords();
- } catch (e) {
- return exits.error(e);
- }
- // Process each record to normalize output
- try {
- Helpers.query.processEachRecord({
- records: combinedResults,
- identity: model.identity,
- orm: orm
- });
- } catch (e) {
- return exits.error(e);
- }
- // Return the combined results
- exits.success(combinedResults);
- });
- return;
- }
- // ╔═╗╔═╗╦ ╦ ╔═╗╔═╗╔╦╗ ┌─┐┌─┐┬─┐┌─┐┌┐┌┌┬┐
- // ║ ║ ║║ ║ ║╣ ║ ║ ├─┘├─┤├┬┘├┤ │││ │
- // ╚═╝╚═╝╩═╝╩═╝╚═╝╚═╝ ╩ ┴ ┴ ┴┴└─└─┘┘└┘ ┴
- // ┬─┐┌─┐┌─┐┌─┐┬─┐┌┬┐┌─┐
- // ├┬┘├┤ │ │ │├┬┘ ││└─┐
- // ┴└─└─┘└─┘└─┘┴└──┴┘└─┘
- // There is more work to be done now. Go through the parent records and
- // build up an array of the primary keys.
- var parentKeys = _.map(queryCache.getParents(), function pluckPk(record) {
- return record[primaryKeyColumnName];
- });
- // ╔═╗╦═╗╔═╗╔═╗╔═╗╔═╗╔═╗ ┌─┐┬ ┬┬┬ ┌┬┐ ┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┐┌┌┬┐┌─┐
- // ╠═╝╠╦╝║ ║║ ║╣ ╚═╗╚═╗ │ ├─┤││ ││ └─┐ │ ├─┤ │ ├┤ │││├┤ │││ │ └─┐
- // ╩ ╩╚═╚═╝╚═╝╚═╝╚═╝╚═╝ └─┘┴ ┴┴┴─┘─┴┘ └─┘ ┴ ┴ ┴ ┴ └─┘┴ ┴└─┘┘└┘ ┴ └─┘
- // For each child statement, figure out how to turn the statement into
- // a native query and then run it. Add the results to the query cache.
- async.each(statements.childStatements, function processChildStatements(template, next) {
- // ╦═╗╔═╗╔╗╔╔╦╗╔═╗╦═╗ ┬┌┐┌ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
- // ╠╦╝║╣ ║║║ ║║║╣ ╠╦╝ ││││ │─┼┐│ │├┤ ├┬┘└┬┘
- // ╩╚═╚═╝╝╚╝═╩╝╚═╝╩╚═ ┴┘└┘ └─┘└└─┘└─┘┴└─ ┴
- // ┌┬┐┌─┐┌┬┐┌─┐┬ ┌─┐┌┬┐┌─┐
- // │ ├┤ │││├─┘│ ├─┤ │ ├┤
- // ┴ └─┘┴ ┴┴ ┴─┘┴ ┴ ┴ └─┘
- // If the statement is an IN query, replace the values with the parent
- // keys.
- if (template.queryType === 'in') {
- // Pull the last AND clause out - it's the one we added
- var inClause = _.pullAt(template.statement.where.and, template.statement.where.and.length - 1);
- // Grab the object inside the array that comes back
- inClause = _.first(inClause);
- // Modify the inClause using the actual parent key values
- _.each(inClause, function modifyInClause(val) {
- val.in = parentKeys;
- });
- // Reset the statement
- template.statement.where.and.push(inClause);
- }
- // ╦═╗╔═╗╔╗╔╔╦╗╔═╗╦═╗ ┬ ┬┌┐┌┬┌─┐┌┐┌ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
- // ╠╦╝║╣ ║║║ ║║║╣ ╠╦╝ │ ││││││ ││││ │─┼┐│ │├┤ ├┬┘└┬┘
- // ╩╚═╚═╝╝╚╝═╩╝╚═╝╩╚═ └─┘┘└┘┴└─┘┘└┘ └─┘└└─┘└─┘┴└─ ┴
- // ┌┬┐┌─┐┌┬┐┌─┐┬ ┌─┐┌┬┐┌─┐
- // │ ├┤ │││├─┘│ ├─┤ │ ├┤
- // ┴ └─┘┴ ┴┴ ┴─┘┴ ┴ ┴ └─┘
- // If the statement is a UNION type, loop through each parent key and
- // build up a proper query.
- if (template.queryType === 'union') {
- var unionStatements = [];
- // Build up an array of generated statements
- _.each(parentKeys, function buildUnion(parentPk) {
- var unionStatement = _.merge({}, template.statement);
- // Replace the placeholder `?` values with the primary key of the
- // parent record.
- var andClause = _.pullAt(unionStatement.where.and, unionStatement.where.and.length - 1);
- _.each(_.first(andClause), function replaceValue(val, key) {
- _.first(andClause)[key] = parentPk;
- });
- // Add the UNION statement to the array of other statements
- unionStatement.where.and.push(_.first(andClause));
- unionStatements.push(unionStatement);
- });
- // Replace the final statement with the UNION ALL clause
- if (unionStatements.length) {
- template.statement = { unionAll: unionStatements };
- }
- }
- // If there isn't a statement to be run, then just return
- if (!template.statement) {
- return next();
- }
- // ╔═╗╔═╗╔╦╗╔═╗╦╦ ╔═╗ ┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┬┐┌─┐┌┐┌┌┬┐
- // ║ ║ ║║║║╠═╝║║ ║╣ └─┐ │ ├─┤ │ ├┤ │││├┤ │││ │
- // ╚═╝╚═╝╩ ╩╩ ╩╩═╝╚═╝ └─┘ ┴ ┴ ┴ ┴ └─┘┴ ┴└─┘┘└┘ ┴
- // Attempt to convert the statement into a native query
- var compiledQuery;
- try {
- compiledQuery = Helpers.query.compileStatement(template.statement);
- } catch (e) {
- return next(e);
- }
- // ╦═╗╦ ╦╔╗╔ ┌─┐┬ ┬┬┬ ┌┬┐ ┌─┐ ┬ ┬┌─┐┬─┐┬ ┬
- // ╠╦╝║ ║║║║ │ ├─┤││ ││ │─┼┐│ │├┤ ├┬┘└┬┘
- // ╩╚═╚═╝╝╚╝ └─┘┴ ┴┴┴─┘─┴┘ └─┘└└─┘└─┘┴└─ ┴
- // Run the native query
- Helpers.query.runNativeQuery(connection, compiledQuery.nativeQuery, compiledQuery.valuesToEscape, compiledQuery.meta, function parentQueryCb(err, queryResults) {
- if (err) {
- return next(err);
- }
- // Extend the values in the cache to include the values from the
- // child query.
- queryCache.extend(queryResults, template.instructions);
- return next();
- });
- },
- function asyncEachCb(err) {
- // Always release the connection unless a leased connection from outside
- // the adapter was used.
- Helpers.connection.releaseConnection(connection, leased, function releaseConnectionCb() {
- if (err) {
- return exits.error(err);
- }
- // Combine records in the cache to form nested results
- var combinedResults = queryCache.combineRecords();
- // Process each record to normalize output
- try {
- Helpers.query.processEachRecord({
- records: combinedResults,
- identity: model.identity,
- orm: orm
- });
- } catch (e) {
- return exits.error(e);
- }
- // Return the combined results
- return exits.success(combinedResults);
- }); // </ releaseConnection >
- }); // </ asyncEachCb >
- }); // </ runNativeQuery >
- }); // </ spawnConnection >
- }
- });
|