123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505 |
- var Crypto = require('crypto');
- var Events = require('events');
- var Net = require('net');
- var tls = require('tls');
- var ConnectionConfig = require('./ConnectionConfig');
- var Protocol = require('./protocol/Protocol');
- var SqlString = require('./protocol/SqlString');
- var Query = require('./protocol/sequences/Query');
- var Util = require('util');
module.exports = Connection;
Util.inherits(Connection, Events.EventEmitter);

/**
 * Connection constructor.
 *
 * Initialises the EventEmitter base, stores the supplied config and socket,
 * and creates the Protocol instance that is handed this connection.
 *
 * @param {Object} options
 * @param {Object} options.config  Stored as `this.config` and passed to Protocol.
 * @param {Object} options.socket  Stored as `this._socket`; connect() assigns a
 *                                 freshly created socket on its first call.
 */
function Connection(options) {
  Events.EventEmitter.call(this);

  var config = options.config;

  this.config         = config;
  this._socket        = options.socket;
  this._protocol      = new Protocol({config: config, connection: this});
  this._connectCalled = false;  // flipped to true by the first connect()
  this.state          = 'disconnected';
  this.threadId       = null;   // filled in by the handshake handler
}
/**
 * Build a Query from the flexible (sql, values, callback) argument forms.
 *
 * Accepted shapes, as handled below:
 *   - a Query instance: returned untouched
 *   - (callback)
 *   - (optionsObject [, values] [, callback]) or (optionsObject, callback)
 *   - (sqlString [, values] [, callback]) or (sqlString, callback)
 *
 * @param {Query|Function|Object|string} sql
 * @param {*|Function} [values]
 * @param {Function} [callback]
 * @returns {Query}
 */
Connection.createQuery = function createQuery(sql, values, callback) {
  if (sql instanceof Query) {
    return sql;
  }

  var queryOptions = {};
  var done         = wrapCallbackInDomain(null, callback);
  var sqlType      = typeof sql;
  var valuesIsFn   = typeof values === 'function';

  // (callback) form: no SQL and no values at all
  if (sqlType === 'function') {
    return new Query(queryOptions, wrapCallbackInDomain(null, sql));
  }

  // (options, ...) form: shallow-copy every enumerable property
  if (sqlType === 'object') {
    for (var key in sql) {
      queryOptions[key] = sql[key];
    }

    if (valuesIsFn) {
      done = wrapCallbackInDomain(null, values);
    } else if (values !== undefined) {
      queryOptions.values = values;
    }

    return new Query(queryOptions, done);
  }

  // (sql, ...) form
  queryOptions.sql = sql;
  if (valuesIsFn) {
    done = wrapCallbackInDomain(null, values);
    queryOptions.values = undefined;
  } else {
    queryOptions.values = values;
  }

  // NOTE(review): wrapCallbackInDomain, as defined in this file, hands
  // non-function arguments back unchanged, so `done` is undefined only when
  // `callback` is undefined too. This guard therefore looks unreachable;
  // confirm the intended validation before relying on it.
  if (done === undefined && callback !== undefined) {
    throw new TypeError('argument callback must be a function when provided');
  }

  return new Query(queryOptions, done);
};
/**
 * Open the socket (first call only), wire socket and protocol together, and
 * start the protocol handshake.
 *
 * Subsequent calls skip the wiring and only invoke the handshake again.
 *
 * @param {Object|Function} [options]  Passed to the protocol handshake, or the
 *                                     callback when used as the only argument.
 * @param {Function} [callback]
 */
Connection.prototype.connect = function connect(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options  = {};
  }

  if (!this._connectCalled) {
    this._connectCalled = true;

    var self     = this;
    var config   = this.config;
    var protocol = this._protocol;
    var socket;

    // A configured socketPath selects a UNIX domain socket, otherwise TCP.
    if (config.socketPath) {
      socket = Net.createConnection(config.socketPath);
    } else {
      socket = Net.createConnection(config.port, config.host);
    }
    this._socket = socket;

    // Attach the socket to this connection's domain
    if (Events.usingDomains) {
      socket.domain = this.domain;
    }

    // protocol -> socket and socket -> protocol data flow
    protocol.on('data', function(chunk) {
      self._socket.write(chunk);
    });
    socket.on('data', wrapToDomain(self, function (chunk) {
      self._protocol.write(chunk);
    }));

    // propagate "end" in both directions
    protocol.on('end', function() {
      self._socket.end();
    });
    socket.on('end', wrapToDomain(self, function () {
      self._protocol.end();
    }));

    socket.on('error', this._handleNetworkError.bind(this));
    socket.on('connect', this._handleProtocolConnect.bind(this));

    // protocol event -> connection handler; registered in this exact order
    var protocolHandlers = [
      ['handshake',      '_handleProtocolHandshake'],
      ['unhandledError', '_handleProtocolError'],
      ['drain',          '_handleProtocolDrain'],
      ['end',            '_handleProtocolEnd'],
      ['enqueue',        '_handleProtocolEnqueue']
    ];
    for (var i = 0; i < protocolHandlers.length; i++) {
      protocol.on(protocolHandlers[i][0], this[protocolHandlers[i][1]].bind(this));
    }

    if (config.connectTimeout) {
      var onTimeout = this._handleConnectTimeout.bind(this);

      socket.setTimeout(config.connectTimeout, onTimeout);
      socket.once('connect', function() {
        // `this` is the socket here: clear the timeout once connected
        this.setTimeout(0, onTimeout);
      });
    }
  }

  this._protocol.handshake(options, wrapCallbackInDomain(this, callback));
};
/**
 * Ask the protocol to change the authenticated user.
 *
 * Any of user / password / database that is missing (or falsy) in `options`
 * falls back to the value in `this.config`. When `options.charset` is given
 * it is translated via ConnectionConfig.getCharsetNumber, otherwise
 * `this.config.charsetNumber` is used.
 *
 * @param {Object|Function} [options]  user, password, database, charset,
 *                                     timeout; or the callback when it is the
 *                                     only argument.
 * @param {Function} [callback]
 * @returns {*} Whatever `this._protocol.changeUser` returns.
 */
Connection.prototype.changeUser = function changeUser(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options  = {};
  }

  // Tolerate a missing/null options argument (as beginTransaction, commit and
  // rollback do) instead of throwing a TypeError on `options.charset` below.
  options = options || {};

  this._implyConnect();

  var charsetNumber = (options.charset)
    ? ConnectionConfig.getCharsetNumber(options.charset)
    : this.config.charsetNumber;

  return this._protocol.changeUser({
    user          : options.user || this.config.user,
    password      : options.password || this.config.password,
    database      : options.database || this.config.database,
    timeout       : options.timeout,
    charsetNumber : charsetNumber,
    currentConfig : this.config
  }, wrapCallbackInDomain(this, callback));
};
/**
 * Run `START TRANSACTION` through this.query().
 *
 * The caller's options object is copied rather than modified, so the same
 * object can be reused (or frozen) by the caller.
 *
 * @param {Object|Function} [options]  Extra query options, or the callback
 *                                     when it is the only argument.
 * @param {Function} [callback]
 * @returns {*} Whatever `this.query` returns.
 */
Connection.prototype.beginTransaction = function beginTransaction(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options  = {};
  }

  // Copy enumerable properties instead of writing sql/values onto the
  // caller's object; for-in over null/undefined simply copies nothing.
  var queryOptions = {};
  for (var prop in options) {
    queryOptions[prop] = options[prop];
  }

  queryOptions.sql    = 'START TRANSACTION';
  queryOptions.values = null;

  return this.query(queryOptions, callback);
};
/**
 * Run `COMMIT` through this.query().
 *
 * The caller's options object is copied rather than modified, so the same
 * object can be reused (or frozen) by the caller.
 *
 * @param {Object|Function} [options]  Extra query options, or the callback
 *                                     when it is the only argument.
 * @param {Function} [callback]
 * @returns {*} Whatever `this.query` returns.
 */
Connection.prototype.commit = function commit(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options  = {};
  }

  // Copy enumerable properties instead of writing sql/values onto the
  // caller's object; for-in over null/undefined simply copies nothing.
  var queryOptions = {};
  for (var prop in options) {
    queryOptions[prop] = options[prop];
  }

  queryOptions.sql    = 'COMMIT';
  queryOptions.values = null;

  return this.query(queryOptions, callback);
};
/**
 * Run `ROLLBACK` through this.query().
 *
 * The caller's options object is copied rather than modified, so the same
 * object can be reused (or frozen) by the caller.
 *
 * @param {Object|Function} [options]  Extra query options, or the callback
 *                                     when it is the only argument.
 * @param {Function} [callback]
 * @returns {*} Whatever `this.query` returns.
 */
Connection.prototype.rollback = function rollback(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options  = {};
  }

  // Copy enumerable properties instead of writing sql/values onto the
  // caller's object; for-in over null/undefined simply copies nothing.
  var queryOptions = {};
  for (var prop in options) {
    queryOptions[prop] = options[prop];
  }

  queryOptions.sql    = 'ROLLBACK';
  queryOptions.values = null;

  return this.query(queryOptions, callback);
};
/**
 * Create a Query via Connection.createQuery, bind it to this connection and
 * enqueue it on the protocol (connecting implicitly if needed).
 *
 * @param {Query|Function|Object|string} sql
 * @param {*|Function} [values]
 * @param {Function} [cb]
 * @returns {*} Whatever `this._protocol._enqueue` returns.
 */
Connection.prototype.query = function query(sql, values, cb) {
  var sequence = Connection.createQuery(sql, values, cb);

  sequence._connection = this;

  // Apply the connection-level typeCast unless the caller passed an options
  // object that carries its own `typeCast` key.
  var callerSetTypeCast = typeof sql === 'object' && 'typeCast' in sql;
  if (!callerSetTypeCast) {
    sequence.typeCast = this.config.typeCast;
  }

  if (sequence.sql) {
    sequence.sql = this.format(sequence.sql, sequence.values);
  }

  if (sequence._callback) {
    sequence._callback = wrapCallbackInDomain(this, sequence._callback);
  }

  this._implyConnect();

  return this._protocol._enqueue(sequence);
};
/**
 * Send a ping through the protocol, connecting implicitly if needed.
 *
 * @param {Object|Function} [options]  Passed to `this._protocol.ping`, or the
 *                                     callback when it is the only argument.
 * @param {Function} [callback]
 */
Connection.prototype.ping = function ping(options, callback) {
  var opts = options;
  var done = callback;

  if (!done && typeof opts === 'function') {
    done = opts;
    opts = {};
  }

  this._implyConnect();
  this._protocol.ping(opts, wrapCallbackInDomain(this, done));
};
/**
 * Request statistics through `this._protocol.stats`, connecting implicitly
 * if needed.
 *
 * @param {Object|Function} [options]  Passed to `this._protocol.stats`, or the
 *                                     callback when it is the only argument.
 * @param {Function} [callback]
 */
Connection.prototype.statistics = function statistics(options, callback) {
  var opts = options;
  var done = callback;

  if (!done && typeof opts === 'function') {
    done = opts;
    opts = {};
  }

  this._implyConnect();
  this._protocol.stats(opts, wrapCallbackInDomain(this, done));
};
/**
 * Ask the protocol to quit, connecting implicitly if needed.
 *
 * The options handed to the protocol are a new object whose prototype is the
 * caller's options, so the default timeout never lands on the caller's object.
 *
 * @param {Object|Function} [options]  May carry `timeout`; or the callback
 *                                     when it is the only argument.
 * @param {Function} [callback]
 */
Connection.prototype.end = function end(options, callback) {
  var callbackOnly = !callback && typeof options === 'function';
  var done         = callbackOnly ? options : callback;
  var source       = callbackOnly ? null : options;

  // derived options object; reads fall through to `source`
  var quitOptions = Object.create(source || null);

  if (quitOptions.timeout === undefined) {
    // default timeout of 30 seconds
    quitOptions.timeout = 30000;
  }

  this._implyConnect();
  this._protocol.quit(quitOptions, wrapCallbackInDomain(this, done));
};
/**
 * Mark the connection as disconnected and destroy both the socket and the
 * protocol.
 */
Connection.prototype.destroy = function() {
  this.state = 'disconnected';

  // connect() is what assigns this._socket, so make sure it has run before
  // the socket is looked up below.
  this._implyConnect();

  var socket   = this._socket;
  var protocol = this._protocol;

  socket.destroy();
  protocol.destroy();
};
/**
 * Pause the socket first, then the protocol.
 */
Connection.prototype.pause = function() {
  var socket   = this._socket;
  var protocol = this._protocol;

  socket.pause();
  protocol.pause();
};
/**
 * Resume the socket first, then the protocol.
 */
Connection.prototype.resume = function() {
  var socket   = this._socket;
  var protocol = this._protocol;

  socket.resume();
  protocol.resume();
};
/**
 * Escape a value with SqlString.escape, passing `false` as the second
 * argument and this connection's configured timezone as the third.
 *
 * @param {*} value
 * @returns {*} Result of SqlString.escape.
 */
Connection.prototype.escape = function(value) {
  var timezone = this.config.timezone;

  return SqlString.escape(value, false, timezone);
};
/**
 * Escape an identifier with SqlString.escapeId, passing `false` as the
 * second argument.
 *
 * @param {*} value
 * @returns {*} Result of SqlString.escapeId.
 */
Connection.prototype.escapeId = function escapeId(value) {
  var escaped = SqlString.escapeId(value, false);

  return escaped;
};
/**
 * Format an SQL string with its values.
 *
 * A function configured as `config.queryFormat` takes precedence and is
 * called with this connection as `this`; otherwise SqlString.format is used
 * with the configured `stringifyObjects` and `timezone`.
 *
 * @param {string} sql
 * @param {*} values
 * @returns {*} The formatter's result.
 */
Connection.prototype.format = function(sql, values) {
  var config       = this.config;
  var customFormat = config.queryFormat;

  if (typeof customFormat !== 'function') {
    return SqlString.format(sql, values, config.stringifyObjects, config.timezone);
  }

  return customFormat.call(this, sql, values, config.timezone);
};
/**
 * _startTLS(onSecure): upgrade the already-wired socket to TLS.
 *
 * Two implementations are provided; which one is installed depends on
 * whether `tls.TLSSocket` exists. In both, `onSecure` is called with an error
 * (or a falsy value) once the secure channel is established or fails to be
 * established; errors raised after that point go to _handleNetworkError.
 */
if (tls.TLSSocket) {
  // 0.11+ environment
  Connection.prototype._startTLS = function _startTLS(onSecure) {
    var self      = this;
    var sslConfig = this.config.ssl;

    var context = tls.createSecureContext({
      ca         : sslConfig.ca,
      cert       : sslConfig.cert,
      ciphers    : sslConfig.ciphers,
      key        : sslConfig.key,
      passphrase : sslConfig.passphrase
    });

    // "unpipe": drop the plaintext data listeners installed by connect()
    this._socket.removeAllListeners('data');
    this._protocol.removeAllListeners('data');

    // socket <-> encrypted
    var verifyPeer  = sslConfig.rejectUnauthorized;
    var established = false;
    var tlsSocket   = new tls.TLSSocket(this._socket, {
      rejectUnauthorized : verifyPeer,
      requestCert        : true,
      secureContext      : context,
      isServer           : false
    });

    // before "secure" fires, errors are reported through onSecure; afterwards
    // they are treated as network errors
    tlsSocket.on('_tlsError', function(err) {
      if (!established) {
        onSecure(err);
        return;
      }
      self._handleNetworkError(err);
    });

    // cleartext <-> protocol
    tlsSocket.pipe(this._protocol);
    this._protocol.on('data', function(chunk) {
      tlsSocket.write(chunk);
    });

    tlsSocket.on('secure', function() {
      established = true;

      if (verifyPeer) {
        onSecure(this.ssl.verifyError());
      } else {
        onSecure(null);
      }
    });

    // start TLS communications
    tlsSocket._start();
  };
} else {
  // pre-0.11 environment
  Connection.prototype._startTLS = function _startTLS(onSecure) {
    // before TLS:
    //  _socket <-> _protocol
    // after:
    //  _socket <-> securePair.encrypted <-> securePair.cleartext <-> _protocol

    var self      = this;
    var sslConfig = this.config.ssl;

    var credentials = Crypto.createCredentials({
      ca         : sslConfig.ca,
      cert       : sslConfig.cert,
      ciphers    : sslConfig.ciphers,
      key        : sslConfig.key,
      passphrase : sslConfig.passphrase
    });

    var verifyPeer  = sslConfig.rejectUnauthorized;
    var established = false;
    var pair        = tls.createSecurePair(credentials, false, true, verifyPeer);

    // before "secure" fires, errors are reported through onSecure; afterwards
    // they are treated as network errors
    pair.on('error', function(err) {
      if (!established) {
        onSecure(err);
        return;
      }
      self._handleNetworkError(err);
    });

    // "unpipe": drop the plaintext data listeners installed by connect()
    this._socket.removeAllListeners('data');
    this._protocol.removeAllListeners('data');

    // socket <-> encrypted
    pair.encrypted.pipe(this._socket);
    this._socket.on('data', function(chunk) {
      pair.encrypted.write(chunk);
    });

    // cleartext <-> protocol
    pair.cleartext.pipe(this._protocol);
    this._protocol.on('data', function(chunk) {
      pair.cleartext.write(chunk);
    });

    // secure established
    pair.on('secure', function() {
      established = true;

      if (!verifyPeer) {
        onSecure();
        return;
      }

      var verifyResult = this.ssl.verifyError();

      if (typeof verifyResult !== 'string') {
        onSecure(verifyResult);
        return;
      }

      // node.js 0.6 support: a string result is wrapped in an Error whose
      // message and code are both that string
      var wrapped = new Error(verifyResult);
      wrapped.code = verifyResult;
      onSecure(wrapped);
    });

    // node.js 0.8 bug: surface a pending ssl error before each cycle
    pair._cycle = pair.cycle;
    pair.cycle  = function cycle() {
      if (this.ssl && this.ssl.error) {
        this.error();
      }

      return this._cycle.apply(this, arguments);
    };
  };
}
/**
 * Connect-timeout handler (registered on the socket by connect()).
 *
 * Clears the socket timeout, destroys the socket when one exists, and
 * reports a synthetic `connect ETIMEDOUT` error as a network error.
 */
Connection.prototype._handleConnectTimeout = function() {
  var socket = this._socket;

  if (socket) {
    socket.setTimeout(0);
    socket.destroy();
  }

  var timeoutError = new Error('connect ETIMEDOUT');

  // NOTE(review): the property is spelled `errorno` (not `errno`); kept
  // exactly as-is because consumers may read it under this name.
  timeoutError.errorno = 'ETIMEDOUT';
  timeoutError.code    = 'ETIMEDOUT';
  timeoutError.syscall = 'connect';

  this._handleNetworkError(timeoutError);
};
/**
 * Forward a network-level error to the protocol. Used for socket 'error'
 * events, TLS errors after the secure channel is up, and connect timeouts.
 *
 * @param {Error} err
 */
Connection.prototype._handleNetworkError = function(err) {
  var protocol = this._protocol;

  protocol.handleNetworkError(err);
};
/**
 * Handler for the protocol's 'unhandledError' event: records the
 * 'protocol_error' state and re-emits the error on the connection.
 *
 * @param {Error} err
 */
Connection.prototype._handleProtocolError = function(err) {
  this.state = 'protocol_error';

  this.emit('error', err);
};
/**
 * Handler for the protocol's 'drain' event: re-emits 'drain' on the
 * connection.
 */
Connection.prototype._handleProtocolDrain = function() {
  this.emit('drain');
};
/**
 * Handler for the socket's 'connect' event: records the 'connected' state
 * and emits 'connect' on the connection.
 */
Connection.prototype._handleProtocolConnect = function() {
  this.state = 'connected';

  this.emit('connect');
};
/**
 * Handler for the protocol's 'handshake' event: records the 'authenticated'
 * state and remembers the thread id carried by the packet.
 *
 * @param {Object} packet  Its `threadId` property is copied to `this.threadId`.
 */
Connection.prototype._handleProtocolHandshake = function _handleProtocolHandshake(packet) {
  this.state    = 'authenticated';
  this.threadId = packet.threadId;
};
/**
 * Handler for the protocol's 'end' event: records the 'disconnected' state
 * and emits 'end' on the connection, passing the argument along.
 *
 * @param {Error} [err]
 */
Connection.prototype._handleProtocolEnd = function(err) {
  this.state = 'disconnected';

  this.emit('end', err);
};
/**
 * Handler for the protocol's 'enqueue' event: re-emits 'enqueue' on the
 * connection with the same sequence.
 *
 * @param {Object} sequence
 */
Connection.prototype._handleProtocolEnqueue = function _handleProtocolEnqueue(sequence) {
  this.emit('enqueue', sequence);
};
/**
 * Call connect() with no arguments unless connect() has already been called.
 */
Connection.prototype._implyConnect = function() {
  if (this._connectCalled) {
    return;
  }

  this.connect();
};
/**
 * Wrap `fn` so that it runs outside of every currently active domain.
 *
 * The wrapper exits each active domain (innermost first) until
 * `process.domain` is unset, invokes `fn` with the wrapper's own `this` and
 * arguments, and then re-enters the exited domains so the caller's domain
 * stack is restored even when `fn` throws.
 *
 * @param {Function} fn
 * @returns {Function} Wrapper returning whatever `fn` returns.
 */
function unwrapFromDomain(fn) {
  return function () {
    var domains = [];
    var ret;

    // The loop relies on exit() changing process.domain to the next active
    // domain (or unsetting it).
    while (process.domain) {
      // unshift, not shift: shift() ignores its argument and would leave the
      // list empty, so nothing would ever be re-entered below. Prepending
      // puts the outermost domain first, which is the order to re-enter in.
      domains.unshift(process.domain);
      process.domain.exit();
    }

    try {
      ret = fn.apply(this, arguments);
    } finally {
      for (var i = 0; i < domains.length; i++) {
        domains[i].enter();
      }
    }

    return ret;
  };
}
/**
 * Decide how a callback should be bound to a domain.
 *
 *   - not a function, or already carrying a `domain` property: returned as-is
 *   - a domain is active (`process.domain`): bound to that domain
 *   - otherwise, with an emitter `ee`: run inside the emitter's domain via
 *     wrapToDomain, after leaving any domain active at call time
 *   - otherwise: returned as-is
 *
 * @param {Object|null} ee  Emitter whose domain is used as a fallback.
 * @param {*} fn
 * @returns {*} The (possibly wrapped) callback.
 */
function wrapCallbackInDomain(ee, fn) {
  var isCallable = typeof fn === 'function';

  if (!isCallable || fn.domain) {
    return fn;
  }

  var activeDomain = process.domain;

  if (activeDomain) {
    return activeDomain.bind(fn);
  }

  if (!ee) {
    return fn;
  }

  return unwrapFromDomain(wrapToDomain(ee, fn));
}
/**
 * Wrap `fn` so that it runs between `ee.domain.enter()` and
 * `ee.domain.exit()` when domains are in use and the emitter has one;
 * otherwise `fn` is simply invoked.
 *
 * The wrapper forwards its own `this` and arguments and discards the return
 * value of `fn`.
 *
 * @param {Object} ee  Object whose `domain` property is consulted per call.
 * @param {Function} fn
 * @returns {Function}
 */
function wrapToDomain(ee, fn) {
  return function () {
    var useDomain = Events.usingDomains && ee.domain;

    if (!useDomain) {
      fn.apply(this, arguments);
      return;
    }

    ee.domain.enter();
    fn.apply(this, arguments);
    ee.domain.exit();
  };
}
|