123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- 'use strict';
- var _classCallCheck2 = require('babel-runtime/helpers/classCallCheck');
- var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
- var _possibleConstructorReturn2 = require('babel-runtime/helpers/possibleConstructorReturn');
- var _possibleConstructorReturn3 = _interopRequireDefault(_possibleConstructorReturn2);
- var _inherits2 = require('babel-runtime/helpers/inherits');
- var _inherits3 = _interopRequireDefault(_inherits2);
- function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
- // Oracledb Client
- // -------
- var _ = require('lodash');
- var inherits = require('inherits');
- var QueryCompiler = require('./query/compiler');
- var ColumnCompiler = require('./schema/columncompiler');
- var BlobHelper = require('./utils').BlobHelper;
- var ReturningHelper = require('./utils').ReturningHelper;
- var Promise = require('bluebird');
- var stream = require('stream');
- var helpers = require('../../helpers');
- var Transaction = require('./transaction');
- var Client_Oracle = require('../oracle');
- var Oracle_Formatter = require('../oracle/formatter');
- var Buffer = require('safe-buffer').Buffer;
- function Client_Oracledb() {
- Client_Oracle.apply(this, arguments);
- // Node.js only have 4 background threads by default, oracledb needs one by connection
- if (this.driver) {
- process.env.UV_THREADPOOL_SIZE = process.env.UV_THREADPOOL_SIZE || 1;
- process.env.UV_THREADPOOL_SIZE += this.driver.poolMax;
- }
- }
- inherits(Client_Oracledb, Client_Oracle);
- Client_Oracledb.prototype.driverName = 'oracledb';
- Client_Oracledb.prototype._driver = function () {
- var oracledb = require('oracledb');
- return oracledb;
- };
- Client_Oracledb.prototype.queryCompiler = function () {
- return new (Function.prototype.bind.apply(QueryCompiler, [null].concat([this], Array.prototype.slice.call(arguments))))();
- };
- Client_Oracledb.prototype.columnCompiler = function () {
- return new (Function.prototype.bind.apply(ColumnCompiler, [null].concat([this], Array.prototype.slice.call(arguments))))();
- };
- Client_Oracledb.prototype.formatter = function () {
- return new Oracledb_Formatter(this);
- };
- Client_Oracledb.prototype.transaction = function () {
- return new (Function.prototype.bind.apply(Transaction, [null].concat([this], Array.prototype.slice.call(arguments))))();
- };
- Client_Oracledb.prototype.prepBindings = function (bindings) {
- var _this = this;
- return _.map(bindings, function (value) {
- if (value instanceof BlobHelper && _this.driver) {
- return { type: _this.driver.BLOB, dir: _this.driver.BIND_OUT };
- // Returning helper always use ROWID as string
- } else if (value instanceof ReturningHelper && _this.driver) {
- return { type: _this.driver.STRING, dir: _this.driver.BIND_OUT };
- } else if (typeof value === 'boolean') {
- return value ? 1 : 0;
- }
- return value;
- });
- };
- // Get a raw connection, called by the `pool` whenever a new
- // connection needs to be added to the pool.
- Client_Oracledb.prototype.acquireRawConnection = function () {
- var client = this;
- var asyncConnection = new Promise(function (resolver, rejecter) {
- // If external authentication dont have to worry about username/password and
- // if not need to set the username and password
- var oracleDbConfig = client.connectionSettings.externalAuth ? { externalAuth: client.connectionSettings.externalAuth } : {
- user: client.connectionSettings.user,
- password: client.connectionSettings.password
- };
- // In the case of external authentication connection string will be given
- oracleDbConfig.connectString = client.connectionSettings.connectString || client.connectionSettings.host + '/' + client.connectionSettings.database;
- if (client.connectionSettings.prefetchRowCount) {
- oracleDbConfig.prefetchRows = client.connectionSettings.prefetchRowCount;
- }
- if (!_.isUndefined(client.connectionSettings.stmtCacheSize)) {
- oracleDbConfig.stmtCacheSize = client.connectionSettings.stmtCacheSize;
- }
- client.driver.getConnection(oracleDbConfig, function (err, connection) {
- if (err) {
- return rejecter(err);
- }
- connection.commitAsync = function () {
- var _this2 = this;
- return new Promise(function (commitResolve, commitReject) {
- if (connection.isTransaction) {
- return commitResolve();
- }
- _this2.commit(function (err) {
- if (err) {
- return commitReject(err);
- }
- commitResolve();
- });
- });
- };
- connection.rollbackAsync = function () {
- var _this3 = this;
- return new Promise(function (rollbackResolve, rollbackReject) {
- _this3.rollback(function (err) {
- if (err) {
- return rollbackReject(err);
- }
- rollbackResolve();
- });
- });
- };
- var fetchAsync = function fetchAsync(sql, bindParams, options, cb) {
- options = options || {};
- options.outFormat = client.driver.OBJECT;
- if (options.resultSet) {
- connection.execute(sql, bindParams || [], options, function (err, result) {
- if (err) {
- return cb(err);
- }
- var fetchResult = { rows: [], resultSet: result.resultSet };
- var numRows = 100;
- var fetchRowsFromRS = function fetchRowsFromRS(connection, resultSet, numRows) {
- resultSet.getRows(numRows, function (err, rows) {
- if (err) {
- resultSet.close(function () {
- return cb(err);
- });
- } else if (rows.length === 0) {
- return cb(null, fetchResult);
- } else if (rows.length > 0) {
- if (rows.length === numRows) {
- fetchResult.rows = fetchResult.rows.concat(rows);
- fetchRowsFromRS(connection, resultSet, numRows);
- } else {
- fetchResult.rows = fetchResult.rows.concat(rows);
- return cb(null, fetchResult);
- }
- }
- });
- };
- fetchRowsFromRS(connection, result.resultSet, numRows);
- });
- } else {
- connection.execute(sql, bindParams || [], options, cb);
- }
- };
- connection.executeAsync = function (sql, bindParams, options) {
- // Read all lob
- return new Promise(function (resultResolve, resultReject) {
- fetchAsync(sql, bindParams, options, function (err, results) {
- if (err) {
- return resultReject(err);
- }
- // Collect LOBs to read
- var lobs = [];
- if (results.rows) {
- if (Array.isArray(results.rows)) {
- for (var i = 0; i < results.rows.length; i++) {
- // Iterate through the rows
- var row = results.rows[i];
- for (var column in row) {
- if (row[column] instanceof stream.Readable) {
- lobs.push({ index: i, key: column, stream: row[column] });
- }
- }
- }
- }
- }
- Promise.each(lobs, function (lob) {
- return new Promise(function (lobResolve, lobReject) {
- readStream(lob.stream, function (err, d) {
- if (err) {
- if (results.resultSet) {
- results.resultSet.close(function () {
- return lobReject(err);
- });
- }
- return lobReject(err);
- }
- results.rows[lob.index][lob.key] = d;
- lobResolve();
- });
- });
- }).then(function () {
- if (results.resultSet) {
- results.resultSet.close(function (err) {
- if (err) {
- return resultReject(err);
- }
- return resultResolve(results);
- });
- }
- resultResolve(results);
- }, function (err) {
- resultReject(err);
- });
- });
- });
- };
- resolver(connection);
- });
- });
- return asyncConnection;
- };
- // Used to explicitly close a connection, called internally by the pool
- // when a connection times out or the pool is shutdown.
- Client_Oracledb.prototype.destroyRawConnection = function (connection) {
- connection.release();
- };
- // Runs the query on the specified connection, providing the bindings
- // and any other necessary prep work.
- Client_Oracledb.prototype._query = function (connection, obj) {
- // Convert ? params into positional bindings (:1)
- obj.sql = this.positionBindings(obj.sql);
- obj.bindings = this.prepBindings(obj.bindings) || [];
- return new Promise(function (resolver, rejecter) {
- if (!obj.sql) {
- return rejecter(new Error('The query is empty'));
- }
- var options = { autoCommit: false };
- if (obj.method === 'select') {
- options.resultSet = true;
- }
- connection.executeAsync(obj.sql, obj.bindings, options).then(function (response) {
- // Flatten outBinds
- var outBinds = _.flatten(response.outBinds);
- obj.response = response.rows || [];
- obj.rowsAffected = response.rows ? response.rows.rowsAffected : response.rowsAffected;
- if (obj.method === 'update') {
- (function () {
- var modifiedRowsCount = obj.rowsAffected.length || obj.rowsAffected;
- var updatedObjOutBinding = [];
- var updatedOutBinds = [];
- var updateOutBinds = function updateOutBinds(i) {
- return function (value, index) {
- var OutBindsOffset = index * modifiedRowsCount;
- updatedOutBinds.push(outBinds[i + OutBindsOffset]);
- };
- };
- for (var i = 0; i < modifiedRowsCount; i++) {
- updatedObjOutBinding.push(obj.outBinding[0]);
- _.each(obj.outBinding[0], updateOutBinds(i));
- }
- outBinds = updatedOutBinds;
- obj.outBinding = updatedObjOutBinding;
- })();
- }
- if (!obj.returning && outBinds.length === 0) {
- return connection.commitAsync().then(function () {
- resolver(obj);
- });
- }
- var rowIds = [];
- var offset = 0;
- Promise.each(obj.outBinding, function (ret, line) {
- offset = offset + (obj.outBinding[line - 1] ? obj.outBinding[line - 1].length : 0);
- return Promise.each(ret, function (out, index) {
- return new Promise(function (bindResolver, bindRejecter) {
- if (out instanceof BlobHelper) {
- var blob = outBinds[index + offset];
- if (out.returning) {
- obj.response[line] = obj.response[line] || {};
- obj.response[line][out.columnName] = out.value;
- }
- blob.on('error', function (err) {
- bindRejecter(err);
- });
- blob.on('finish', function () {
- bindResolver();
- });
- blob.write(out.value);
- blob.end();
- } else if (obj.outBinding[line][index] === 'ROWID') {
- rowIds.push(outBinds[index + offset]);
- bindResolver();
- } else {
- obj.response[line] = obj.response[line] || {};
- obj.response[line][out] = outBinds[index + offset];
- bindResolver();
- }
- });
- });
- }).then(function () {
- return connection.commitAsync();
- }).then(function () {
- if (obj.returningSql) {
- return connection.executeAsync(obj.returningSql(), rowIds, { resultSet: true }).then(function (response) {
- obj.response = response.rows;
- return obj;
- }, rejecter);
- }
- return obj;
- }, rejecter).then(function (obj) {
- resolver(obj);
- });
- }, rejecter);
- });
- };
- // Handle clob
- function readStream(stream, cb) {
- var oracledb = require('oracledb');
- var data = '';
- if (stream.iLob.type === oracledb.CLOB) {
- stream.setEncoding('utf-8');
- } else {
- data = Buffer.alloc(0);
- }
- stream.on('error', function (err) {
- cb(err);
- });
- stream.on('data', function (chunk) {
- if (stream.iLob.type === oracledb.CLOB) {
- data += chunk;
- } else {
- data = Buffer.concat([data, chunk]);
- }
- });
- stream.on('end', function () {
- cb(null, data);
- });
- }
- // Process the response as returned from the query.
- Client_Oracledb.prototype.processResponse = function (obj, runner) {
- var response = obj.response;
- var method = obj.method;
- if (obj.output) {
- return obj.output.call(runner, response);
- }
- switch (method) {
- case 'select':
- case 'pluck':
- case 'first':
- response = helpers.skim(response);
- if (obj.method === 'pluck') {
- response = _.map(response, obj.pluck);
- }
- return obj.method === 'first' ? response[0] : response;
- case 'insert':
- case 'del':
- case 'update':
- case 'counter':
- if (obj.returning && !_.isEmpty(obj.returning)) {
- if (obj.returning.length === 1 && obj.returning[0] !== '*') {
- return _.flatten(_.map(response, _.values));
- }
- return response;
- } else if (!_.isUndefined(obj.rowsAffected)) {
- return obj.rowsAffected;
- } else {
- return 1;
- }
- default:
- return response;
- }
- };
- var Oracledb_Formatter = function (_Oracle_Formatter) {
- (0, _inherits3.default)(Oracledb_Formatter, _Oracle_Formatter);
- function Oracledb_Formatter() {
- (0, _classCallCheck3.default)(this, Oracledb_Formatter);
- return (0, _possibleConstructorReturn3.default)(this, _Oracle_Formatter.apply(this, arguments));
- }
- // Checks whether a value is a function... if it is, we compile it
- // otherwise we check whether it's a raw
- Oracledb_Formatter.prototype.parameter = function parameter(value) {
- if (typeof value === 'function') {
- return this.outputQuery(this.compileCallback(value), true);
- } else if (value instanceof BlobHelper) {
- return 'EMPTY_BLOB()';
- }
- return this.unwrapRaw(value, true) || '?';
- };
- return Oracledb_Formatter;
- }(Oracle_Formatter);
- module.exports = Client_Oracledb;
|