123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- 'use strict';
- const EventEmitter = require('events');
- const MongoError = require('../error').MongoError;
- const Pool = require('../connection/pool');
- const relayEvents = require('../utils').relayEvents;
- const calculateDurationInMs = require('../utils').calculateDurationInMs;
- const Query = require('../connection/commands').Query;
- const TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support');
- const ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support');
- const BSON = require('../connection/utils').retrieveBSON();
- const createClientInfo = require('../topologies/shared').createClientInfo;
- const Logger = require('../connection/logger');
- const ServerDescription = require('./server_description').ServerDescription;
- const ReadPreference = require('../topologies/read_preference');
- const monitorServer = require('./monitoring').monitorServer;
- /**
- *
- * @fires Server#serverHeartbeatStarted
- * @fires Server#serverHeartbeatSucceeded
- * @fires Server#serverHeartbeatFailed
- */
- class Server extends EventEmitter {
- /**
- * Create a server
- *
- * @param {ServerDescription} description
- * @param {Object} options
- */
- constructor(description, options) {
- super();
- this.s = {
- // the server description
- description,
- // a saved copy of the incoming options
- options,
- // the server logger
- logger: Logger('Server', options),
- // the bson parser
- bson: options.bson || new BSON(),
- // client metadata for the initial handshake
- clientInfo: createClientInfo(options),
- // state variable to determine if there is an active server check in progress
- monitoring: false,
- // the connection pool
- pool: null
- };
- }
- get description() {
- return this.s.description;
- }
- get name() {
- return this.s.description.address;
- }
- /**
- * Initiate server connect
- *
- * @param {Array} [options.auth] Array of auth options to apply on connect
- */
- connect(options) {
- options = options || {};
- // do not allow connect to be called on anything that's not disconnected
- if (this.s.pool && !this.s.pool.isDisconnected() && !this.s.pool.isDestroyed()) {
- throw new MongoError(`Server instance in invalid state ${this.s.pool.state}`);
- }
- // create a pool
- this.s.pool = new Pool(this, Object.assign(this.s.options, options, { bson: this.s.bson }));
- // Set up listeners
- this.s.pool.on('connect', connectEventHandler(this));
- this.s.pool.on('close', closeEventHandler(this));
- // this.s.pool.on('error', errorEventHandler(this));
- // this.s.pool.on('timeout', timeoutEventHandler(this));
- // this.s.pool.on('parseError', errorEventHandler(this));
- // this.s.pool.on('reconnect', reconnectEventHandler(this));
- // this.s.pool.on('reconnectFailed', errorEventHandler(this));
- // relay all command monitoring events
- relayEvents(this.s.pool, this, ['commandStarted', 'commandSucceeded', 'commandFailed']);
- // If auth settings have been provided, use them
- if (options.auth) {
- this.s.pool.connect.apply(this.s.pool, options.auth);
- return;
- }
- this.s.pool.connect();
- }
- /**
- * Destroy the server connection
- *
- * @param {Boolean} [options.emitClose=false] Emit close event on destroy
- * @param {Boolean} [options.emitDestroy=false] Emit destroy event on destroy
- * @param {Boolean} [options.force=false] Force destroy the pool
- */
- destroy(callback) {
- if (typeof callback === 'function') {
- callback(null, null);
- }
- }
- /**
- * Immediately schedule monitoring of this server. If there already an attempt being made
- * this will be a no-op.
- */
- monitor() {
- if (this.s.monitoring) return;
- if (this.s.monitorId) clearTimeout(this.s.monitorId);
- monitorServer(this);
- }
- /**
- * Execute a command
- *
- * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
- * @param {object} cmd The command hash
- * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
- * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
- * @param {Boolean} [options.checkKeys=false] Specify if the bson parser should validate keys.
- * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
- * @param {Boolean} [options.fullResult=false] Return the full envelope instead of just the result document.
- * @param {ClientSession} [options.session=null] Session to use for the operation
- * @param {opResultCallback} callback A callback function
- */
- command(ns, cmd, options, callback) {
- if (typeof options === 'function') {
- (callback = options), (options = {}), (options = options || {});
- }
- const error = basicReadValidations(this, options);
- if (error) {
- return callback(error, null);
- }
- // Clone the options
- options = Object.assign({}, options, { wireProtocolCommand: false });
- // Debug log
- if (this.s.logger.isDebug()) {
- this.s.logger.debug(
- `executing command [${JSON.stringify({ ns, cmd, options })}] against ${this.name}`
- );
- }
- // Check if we have collation support
- if (this.description.maxWireVersion < 5 && cmd.collation) {
- callback(new MongoError(`server ${this.name} does not support collation`));
- return;
- }
- // Create the query object
- const query = this.s.wireProtocolHandler.command(this, ns, cmd, {}, options);
- // Set slave OK of the query
- query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false;
- // write options
- const writeOptions = {
- raw: typeof options.raw === 'boolean' ? options.raw : false,
- promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
- promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
- promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
- command: true,
- monitoring: typeof options.monitoring === 'boolean' ? options.monitoring : false,
- fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false,
- requestId: query.requestId,
- socketTimeout: typeof options.socketTimeout === 'number' ? options.socketTimeout : null,
- session: options.session || null
- };
- // write the operation to the pool
- this.s.pool.write(query, writeOptions, callback);
- }
- /**
- * Insert one or more documents
- * @method
- * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
- * @param {array} ops An array of documents to insert
- * @param {boolean} [options.ordered=true] Execute in order or out of order
- * @param {object} [options.writeConcern={}] Write concern for the operation
- * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
- * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
- * @param {ClientSession} [options.session=null] Session to use for the operation
- * @param {opResultCallback} callback A callback function
- */
- insert(ns, ops, options, callback) {
- executeWriteOperation({ server: this, op: 'insert', ns, ops }, options, callback);
- }
- /**
- * Perform one or more update operations
- * @method
- * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
- * @param {array} ops An array of updates
- * @param {boolean} [options.ordered=true] Execute in order or out of order
- * @param {object} [options.writeConcern={}] Write concern for the operation
- * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
- * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
- * @param {ClientSession} [options.session=null] Session to use for the operation
- * @param {opResultCallback} callback A callback function
- */
- update(ns, ops, options, callback) {
- executeWriteOperation({ server: this, op: 'update', ns, ops }, options, callback);
- }
- /**
- * Perform one or more remove operations
- * @method
- * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
- * @param {array} ops An array of removes
- * @param {boolean} [options.ordered=true] Execute in order or out of order
- * @param {object} [options.writeConcern={}] Write concern for the operation
- * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
- * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
- * @param {ClientSession} [options.session=null] Session to use for the operation
- * @param {opResultCallback} callback A callback function
- */
- remove(ns, ops, options, callback) {
- executeWriteOperation({ server: this, op: 'remove', ns, ops }, options, callback);
- }
- }
- function basicWriteValidations(server) {
- if (!server.s.pool) {
- return new MongoError('server instance is not connected');
- }
- if (server.s.pool.isDestroyed()) {
- return new MongoError('server instance pool was destroyed');
- }
- return null;
- }
- function basicReadValidations(server, options) {
- const error = basicWriteValidations(server, options);
- if (error) {
- return error;
- }
- if (options.readPreference && !(options.readPreference instanceof ReadPreference)) {
- return new MongoError('readPreference must be an instance of ReadPreference');
- }
- }
- function executeWriteOperation(args, options, callback) {
- if (typeof options === 'function') (callback = options), (options = {});
- options = options || {};
- // TODO: once we drop Node 4, use destructuring either here or in arguments.
- const server = args.server;
- const op = args.op;
- const ns = args.ns;
- const ops = Array.isArray(args.ops) ? args.ops : [args.ops];
- const error = basicWriteValidations(server, options);
- if (error) {
- callback(error, null);
- return;
- }
- // Check if we have collation support
- if (server.description.maxWireVersion < 5 && options.collation) {
- callback(new MongoError(`server ${this.name} does not support collation`));
- return;
- }
- // Execute write
- return server.s.wireProtocolHandler[op](server.s.pool, ns, server.s.bson, ops, options, callback);
- }
- function saslSupportedMechs(options) {
- if (!options) {
- return {};
- }
- const authArray = options.auth || [];
- const authMechanism = authArray[0] || options.authMechanism;
- const authSource = authArray[1] || options.authSource || options.dbName || 'admin';
- const user = authArray[2] || options.user;
- if (typeof authMechanism === 'string' && authMechanism.toUpperCase() !== 'DEFAULT') {
- return {};
- }
- if (!user) {
- return {};
- }
- return { saslSupportedMechs: `${authSource}.${user}` };
- }
- function extractIsMasterError(err, result) {
- if (err) return err;
- if (result && result.result && result.result.ok === 0) {
- return new MongoError(result.result);
- }
- }
- function executeServerHandshake(server, callback) {
- // construct an `ismaster` query
- const compressors =
- server.s.options.compression && server.s.options.compression.compressors
- ? server.s.options.compression.compressors
- : [];
- const queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
- const query = new Query(
- server.s.bson,
- 'admin.$cmd',
- Object.assign(
- { ismaster: true, client: server.s.clientInfo, compression: compressors },
- saslSupportedMechs(server.s.options)
- ),
- queryOptions
- );
- // execute the query
- server.s.pool.write(
- query,
- { socketTimeout: server.s.options.connectionTimeout || 2000 },
- callback
- );
- }
- function configureWireProtocolHandler(ismaster) {
- // 3.2 wire protocol handler
- if (ismaster.maxWireVersion >= 4) {
- return new ThreeTwoWireProtocolSupport();
- }
- // default to 2.6 wire protocol handler
- return new TwoSixWireProtocolSupport();
- }
- function connectEventHandler(server) {
- return function() {
- // log information of received information if in info mode
- // if (server.s.logger.isInfo()) {
- // var object = err instanceof MongoError ? JSON.stringify(err) : {};
- // server.s.logger.info(`server ${server.name} fired event ${event} out with message ${object}`);
- // }
- // begin initial server handshake
- const start = process.hrtime();
- executeServerHandshake(server, (err, response) => {
- // Set initial lastIsMasterMS - is this needed?
- server.s.lastIsMasterMS = calculateDurationInMs(start);
- const serverError = extractIsMasterError(err, response);
- if (serverError) {
- server.emit('error', serverError);
- return;
- }
- // extract the ismaster from the server response
- const isMaster = response.result;
- // compression negotation
- if (isMaster && isMaster.compression) {
- const localCompressionInfo = server.s.options.compression;
- const localCompressors = localCompressionInfo.compressors;
- for (var i = 0; i < localCompressors.length; i++) {
- if (isMaster.compression.indexOf(localCompressors[i]) > -1) {
- server.s.pool.options.agreedCompressor = localCompressors[i];
- break;
- }
- }
- if (localCompressionInfo.zlibCompressionLevel) {
- server.s.pool.options.zlibCompressionLevel = localCompressionInfo.zlibCompressionLevel;
- }
- }
- // configure the wire protocol handler
- server.s.wireProtocolHandler = configureWireProtocolHandler(isMaster);
- // log the connection event if requested
- if (server.s.logger.isInfo()) {
- server.s.logger.info(
- `server ${server.name} connected with ismaster [${JSON.stringify(isMaster)}]`
- );
- }
- // emit an event indicating that our description has changed
- server.emit(
- 'descriptionReceived',
- new ServerDescription(server.description.address, isMaster)
- );
- // emit a connect event
- server.emit('connect', isMaster);
- });
- };
- }
- function closeEventHandler(server) {
- return function() {
- server.emit('close');
- };
- }
- module.exports = Server;
|