mongo_client_ops.js 18 KB


  1. 'use strict';
  2. const authenticate = require('../authenticate');
  3. const deprecate = require('util').deprecate;
  4. const Logger = require('mongodb-core').Logger;
  5. const MongoError = require('mongodb-core').MongoError;
  6. const Mongos = require('../topologies/mongos');
  7. const parse = require('mongodb-core').parseConnectionString;
  8. const ReadPreference = require('mongodb-core').ReadPreference;
  9. const ReplSet = require('../topologies/replset');
  10. const Server = require('../topologies/server');
  11. const ServerSessionPool = require('mongodb-core').Sessions.ServerSessionPool;
  12. let client;
  13. function loadClient() {
  14. if (!client) {
  15. client = require('../mongo_client');
  16. }
  17. return client;
  18. }
  19. const monitoringEvents = [
  20. 'timeout',
  21. 'close',
  22. 'serverOpening',
  23. 'serverDescriptionChanged',
  24. 'serverHeartbeatStarted',
  25. 'serverHeartbeatSucceeded',
  26. 'serverHeartbeatFailed',
  27. 'serverClosed',
  28. 'topologyOpening',
  29. 'topologyClosed',
  30. 'topologyDescriptionChanged',
  31. 'commandStarted',
  32. 'commandSucceeded',
  33. 'commandFailed',
  34. 'joined',
  35. 'left',
  36. 'ping',
  37. 'ha',
  38. 'all',
  39. 'fullsetup',
  40. 'open'
  41. ];
  42. const ignoreOptionNames = ['native_parser'];
  43. const legacyOptionNames = ['server', 'replset', 'replSet', 'mongos', 'db'];
  44. const legacyParse = deprecate(
  45. require('../url_parser'),
  46. 'current URL string parser is deprecated, and will be removed in a future version. ' +
  47. 'To use the new parser, pass option { useNewUrlParser: true } to MongoClient.connect.'
  48. );
  49. const validOptionNames = [
  50. 'poolSize',
  51. 'ssl',
  52. 'sslValidate',
  53. 'sslCA',
  54. 'sslCert',
  55. 'sslKey',
  56. 'sslPass',
  57. 'sslCRL',
  58. 'autoReconnect',
  59. 'noDelay',
  60. 'keepAlive',
  61. 'keepAliveInitialDelay',
  62. 'connectTimeoutMS',
  63. 'family',
  64. 'socketTimeoutMS',
  65. 'reconnectTries',
  66. 'reconnectInterval',
  67. 'ha',
  68. 'haInterval',
  69. 'replicaSet',
  70. 'secondaryAcceptableLatencyMS',
  71. 'acceptableLatencyMS',
  72. 'connectWithNoPrimary',
  73. 'authSource',
  74. 'w',
  75. 'wtimeout',
  76. 'j',
  77. 'forceServerObjectId',
  78. 'serializeFunctions',
  79. 'ignoreUndefined',
  80. 'raw',
  81. 'bufferMaxEntries',
  82. 'readPreference',
  83. 'pkFactory',
  84. 'promiseLibrary',
  85. 'readConcern',
  86. 'maxStalenessSeconds',
  87. 'loggerLevel',
  88. 'logger',
  89. 'promoteValues',
  90. 'promoteBuffers',
  91. 'promoteLongs',
  92. 'domainsEnabled',
  93. 'checkServerIdentity',
  94. 'validateOptions',
  95. 'appname',
  96. 'auth',
  97. 'user',
  98. 'password',
  99. 'authMechanism',
  100. 'compression',
  101. 'fsync',
  102. 'readPreferenceTags',
  103. 'numberOfRetries',
  104. 'auto_reconnect',
  105. 'minSize',
  106. 'monitorCommands',
  107. 'retryWrites',
  108. 'useNewUrlParser'
  109. ];
  110. function addListeners(mongoClient, topology) {
  111. topology.on('authenticated', createListener(mongoClient, 'authenticated'));
  112. topology.on('error', createListener(mongoClient, 'error'));
  113. topology.on('timeout', createListener(mongoClient, 'timeout'));
  114. topology.on('close', createListener(mongoClient, 'close'));
  115. topology.on('parseError', createListener(mongoClient, 'parseError'));
  116. topology.once('open', createListener(mongoClient, 'open'));
  117. topology.once('fullsetup', createListener(mongoClient, 'fullsetup'));
  118. topology.once('all', createListener(mongoClient, 'all'));
  119. topology.on('reconnect', createListener(mongoClient, 'reconnect'));
  120. }
  121. function assignTopology(client, topology) {
  122. client.topology = topology;
  123. topology.s.sessionPool = new ServerSessionPool(topology.s.coreTopology);
  124. }
  125. // Clear out all events
  126. function clearAllEvents(topology) {
  127. monitoringEvents.forEach(event => topology.removeAllListeners(event));
  128. }
  129. // Collect all events in order from SDAM
  130. function collectEvents(mongoClient, topology) {
  131. let MongoClient = loadClient();
  132. const collectedEvents = [];
  133. if (mongoClient instanceof MongoClient) {
  134. monitoringEvents.forEach(event => {
  135. topology.on(event, (object1, object2) => {
  136. if (event === 'open') {
  137. collectedEvents.push({ event: event, object1: mongoClient });
  138. } else {
  139. collectedEvents.push({ event: event, object1: object1, object2: object2 });
  140. }
  141. });
  142. });
  143. }
  144. return collectedEvents;
  145. }
  146. /**
  147. * Connect to MongoDB using a url as documented at
  148. *
  149. * docs.mongodb.org/manual/reference/connection-string/
  150. *
  151. * Note that for replicasets the replicaSet query parameter is required in the 2.0 driver
  152. *
  153. * @method
  154. * @param {MongoClient} mongoClient The MongoClient instance with which to connect.
  155. * @param {string} url The connection URI string
  156. * @param {object} [options] Optional settings. See MongoClient.prototype.connect for a list of options.
  157. * @param {MongoClient~connectCallback} [callback] The command result callback
  158. */
  159. function connect(mongoClient, url, options, callback) {
  160. options = Object.assign({}, options);
  161. // If callback is null throw an exception
  162. if (callback == null) {
  163. throw new Error('no callback function provided');
  164. }
  165. // Get a logger for MongoClient
  166. const logger = Logger('MongoClient', options);
  167. // Did we pass in a Server/ReplSet/Mongos
  168. if (url instanceof Server || url instanceof ReplSet || url instanceof Mongos) {
  169. return connectWithUrl(mongoClient, url, options, connectCallback);
  170. }
  171. const parseFn = options.useNewUrlParser ? parse : legacyParse;
  172. const transform = options.useNewUrlParser ? transformUrlOptions : legacyTransformUrlOptions;
  173. parseFn(url, options, (err, _object) => {
  174. // Do not attempt to connect if parsing error
  175. if (err) return callback(err);
  176. // Flatten
  177. const object = transform(_object);
  178. // Parse the string
  179. const _finalOptions = createUnifiedOptions(object, options);
  180. // Check if we have connection and socket timeout set
  181. if (_finalOptions.socketTimeoutMS == null) _finalOptions.socketTimeoutMS = 360000;
  182. if (_finalOptions.connectTimeoutMS == null) _finalOptions.connectTimeoutMS = 30000;
  183. if (_finalOptions.db_options && _finalOptions.db_options.auth) {
  184. delete _finalOptions.db_options.auth;
  185. }
  186. // Store the merged options object
  187. mongoClient.s.options = _finalOptions;
  188. // Failure modes
  189. if (object.servers.length === 0) {
  190. return callback(new Error('connection string must contain at least one seed host'));
  191. }
  192. // Do we have a replicaset then skip discovery and go straight to connectivity
  193. if (_finalOptions.replicaSet || _finalOptions.rs_name) {
  194. return createTopology(
  195. mongoClient,
  196. 'replicaset',
  197. _finalOptions,
  198. connectHandler(mongoClient, _finalOptions, connectCallback)
  199. );
  200. } else if (object.servers.length > 1) {
  201. return createTopology(
  202. mongoClient,
  203. 'mongos',
  204. _finalOptions,
  205. connectHandler(mongoClient, _finalOptions, connectCallback)
  206. );
  207. } else {
  208. return createServer(
  209. mongoClient,
  210. _finalOptions,
  211. connectHandler(mongoClient, _finalOptions, connectCallback)
  212. );
  213. }
  214. });
  215. function connectCallback(err, topology) {
  216. const warningMessage = `seed list contains no mongos proxies, replicaset connections requires the parameter replicaSet to be supplied in the URI or options object, mongodb://server:port/db?replicaSet=name`;
  217. if (err && err.message === 'no mongos proxies found in seed list') {
  218. if (logger.isWarn()) {
  219. logger.warn(warningMessage);
  220. }
  221. // Return a more specific error message for MongoClient.connect
  222. return callback(new MongoError(warningMessage));
  223. }
  224. // Return the error and db instance
  225. callback(err, topology);
  226. }
  227. }
  228. function connectHandler(client, options, callback) {
  229. return (err, topology) => {
  230. if (err) {
  231. return handleConnectCallback(err, topology, callback);
  232. }
  233. // No authentication just reconnect
  234. if (!options.auth) {
  235. return handleConnectCallback(err, topology, callback);
  236. }
  237. // Authenticate
  238. authenticate(client, options.user, options.password, options, (err, success) => {
  239. if (success) {
  240. handleConnectCallback(null, topology, callback);
  241. } else {
  242. if (topology) topology.close();
  243. const authError = err ? err : new Error('Could not authenticate user ' + options.auth[0]);
  244. handleConnectCallback(authError, topology, callback);
  245. }
  246. });
  247. };
  248. }
  249. /**
  250. * Connect to MongoDB using a url as documented at
  251. *
  252. * docs.mongodb.org/manual/reference/connection-string/
  253. *
  254. * Note that for replicasets the replicaSet query parameter is required in the 2.0 driver
  255. *
  256. * @method
  257. * @param {MongoClient} mongoClient The MongoClient instance with which to connect.
  258. * @param {MongoClient~connectCallback} [callback] The command result callback
  259. */
  260. function connectOp(mongoClient, err, callback) {
  261. // Did we have a validation error
  262. if (err) return callback(err);
  263. // Fallback to callback based connect
  264. connect(mongoClient, mongoClient.s.url, mongoClient.s.options, err => {
  265. if (err) return callback(err);
  266. callback(null, mongoClient);
  267. });
  268. }
  269. function connectWithUrl(mongoClient, url, options, connectCallback) {
  270. // Set the topology
  271. assignTopology(mongoClient, url);
  272. // Add listeners
  273. addListeners(mongoClient, url);
  274. // Propagate the events to the client
  275. relayEvents(mongoClient, url);
  276. let finalOptions = Object.assign({}, options);
  277. // If we have a readPreference passed in by the db options, convert it from a string
  278. if (typeof options.readPreference === 'string' || typeof options.read_preference === 'string') {
  279. finalOptions.readPreference = new ReadPreference(
  280. options.readPreference || options.read_preference
  281. );
  282. }
  283. // Connect
  284. return url.connect(
  285. finalOptions,
  286. connectHandler(mongoClient, finalOptions, (err, topology) => {
  287. if (err) return connectCallback(err, topology);
  288. if (finalOptions.user || finalOptions.password || finalOptions.authMechanism) {
  289. return authenticate(
  290. mongoClient,
  291. finalOptions.user,
  292. finalOptions.password,
  293. finalOptions,
  294. err => {
  295. if (err) return connectCallback(err, topology);
  296. connectCallback(err, topology);
  297. }
  298. );
  299. }
  300. connectCallback(err, topology);
  301. })
  302. );
  303. }
  304. function createListener(mongoClient, event) {
  305. const eventSet = new Set(['all', 'fullsetup', 'open', 'reconnect']);
  306. return (v1, v2) => {
  307. if (eventSet.has(event)) {
  308. return mongoClient.emit(event, mongoClient);
  309. }
  310. mongoClient.emit(event, v1, v2);
  311. };
  312. }
  313. function createServer(mongoClient, options, callback) {
  314. // Pass in the promise library
  315. options.promiseLibrary = mongoClient.s.promiseLibrary;
  316. // Set default options
  317. const servers = translateOptions(options);
  318. const server = servers[0];
  319. // Propagate the events to the client
  320. const collectedEvents = collectEvents(mongoClient, server);
  321. // Connect to topology
  322. server.connect(options, (err, topology) => {
  323. if (err) {
  324. server.close(true);
  325. return callback(err);
  326. }
  327. // Clear out all the collected event listeners
  328. clearAllEvents(server);
  329. // Relay all the events
  330. relayEvents(mongoClient, server);
  331. // Add listeners
  332. addListeners(mongoClient, server);
  333. // Check if we are really speaking to a mongos
  334. const ismaster = topology.lastIsMaster();
  335. // Set the topology
  336. assignTopology(mongoClient, topology);
  337. // Do we actually have a mongos
  338. if (ismaster && ismaster.msg === 'isdbgrid') {
  339. // Destroy the current connection
  340. topology.close();
  341. // Create mongos connection instead
  342. return createTopology(mongoClient, 'mongos', options, callback);
  343. }
  344. // Fire all the events
  345. replayEvents(mongoClient, collectedEvents);
  346. // Otherwise callback
  347. callback(err, topology);
  348. });
  349. }
  350. function createTopology(mongoClient, topologyType, options, callback) {
  351. // Pass in the promise library
  352. options.promiseLibrary = mongoClient.s.promiseLibrary;
  353. const translationOptions = {};
  354. if (topologyType === 'unified') translationOptions.createServers = false;
  355. // Set default options
  356. const servers = translateOptions(options, translationOptions);
  357. // Create the topology
  358. let topology;
  359. if (topologyType === 'mongos') {
  360. topology = new Mongos(servers, options);
  361. } else if (topologyType === 'replicaset') {
  362. topology = new ReplSet(servers, options);
  363. }
  364. // Add listeners
  365. addListeners(mongoClient, topology);
  366. // Propagate the events to the client
  367. relayEvents(mongoClient, topology);
  368. // Open the connection
  369. topology.connect(options, (err, newTopology) => {
  370. if (err) {
  371. topology.close(true);
  372. return callback(err);
  373. }
  374. assignTopology(mongoClient, newTopology);
  375. callback(null, newTopology);
  376. });
  377. }
  378. function createUnifiedOptions(finalOptions, options) {
  379. const childOptions = [
  380. 'mongos',
  381. 'server',
  382. 'db',
  383. 'replset',
  384. 'db_options',
  385. 'server_options',
  386. 'rs_options',
  387. 'mongos_options'
  388. ];
  389. const noMerge = ['readconcern', 'compression'];
  390. for (const name in options) {
  391. if (noMerge.indexOf(name.toLowerCase()) !== -1) {
  392. finalOptions[name] = options[name];
  393. } else if (childOptions.indexOf(name.toLowerCase()) !== -1) {
  394. finalOptions = mergeOptions(finalOptions, options[name], false);
  395. } else {
  396. if (
  397. options[name] &&
  398. typeof options[name] === 'object' &&
  399. !Buffer.isBuffer(options[name]) &&
  400. !Array.isArray(options[name])
  401. ) {
  402. finalOptions = mergeOptions(finalOptions, options[name], true);
  403. } else {
  404. finalOptions[name] = options[name];
  405. }
  406. }
  407. }
  408. return finalOptions;
  409. }
  410. function handleConnectCallback(err, topology, callback) {
  411. return process.nextTick(() => {
  412. try {
  413. callback(err, topology);
  414. } catch (err) {
  415. if (topology) topology.close();
  416. throw err;
  417. }
  418. });
  419. }
  420. function legacyTransformUrlOptions(object) {
  421. return mergeOptions(createUnifiedOptions({}, object), object, false);
  422. }
  423. /**
  424. * Logout user from server, fire off on all connections and remove all auth info.
  425. *
  426. * @method
  427. * @param {MongoClient} mongoClient The MongoClient instance on which to logout.
  428. * @param {object} [options] Optional settings. See MongoClient.prototype.logout for a list of options.
  429. * @param {Db~resultCallback} [callback] The command result callback
  430. */
  431. function logout(mongoClient, dbName, callback) {
  432. mongoClient.topology.logout(dbName, err => {
  433. if (err) return callback(err);
  434. callback(null, true);
  435. });
  436. }
  437. function mergeOptions(target, source, flatten) {
  438. for (const name in source) {
  439. if (source[name] && typeof source[name] === 'object' && flatten) {
  440. target = mergeOptions(target, source[name], flatten);
  441. } else {
  442. target[name] = source[name];
  443. }
  444. }
  445. return target;
  446. }
  447. function relayEvents(mongoClient, topology) {
  448. const serverOrCommandEvents = [
  449. 'serverOpening',
  450. 'serverDescriptionChanged',
  451. 'serverHeartbeatStarted',
  452. 'serverHeartbeatSucceeded',
  453. 'serverHeartbeatFailed',
  454. 'serverClosed',
  455. 'topologyOpening',
  456. 'topologyClosed',
  457. 'topologyDescriptionChanged',
  458. 'commandStarted',
  459. 'commandSucceeded',
  460. 'commandFailed',
  461. 'joined',
  462. 'left',
  463. 'ping',
  464. 'ha'
  465. ];
  466. serverOrCommandEvents.forEach(event => {
  467. topology.on(event, (object1, object2) => {
  468. mongoClient.emit(event, object1, object2);
  469. });
  470. });
  471. }
  472. //
  473. // Replay any events due to single server connection switching to Mongos
  474. //
  475. function replayEvents(mongoClient, events) {
  476. for (let i = 0; i < events.length; i++) {
  477. mongoClient.emit(events[i].event, events[i].object1, events[i].object2);
  478. }
  479. }
  480. const LEGACY_OPTIONS_MAP = validOptionNames.reduce((obj, name) => {
  481. obj[name.toLowerCase()] = name;
  482. return obj;
  483. }, {});
  484. function transformUrlOptions(_object) {
  485. let object = Object.assign({ servers: _object.hosts }, _object.options);
  486. for (let name in object) {
  487. const camelCaseName = LEGACY_OPTIONS_MAP[name];
  488. if (camelCaseName) {
  489. object[camelCaseName] = object[name];
  490. }
  491. }
  492. if (_object.auth) {
  493. const auth = _object.auth;
  494. for (let i in auth) {
  495. if (auth[i]) {
  496. object[i] = auth[i];
  497. }
  498. }
  499. if (auth.username) {
  500. object.auth = auth;
  501. object.user = auth.username;
  502. }
  503. if (auth.db) {
  504. object.authSource = object.authSource || auth.db;
  505. }
  506. }
  507. if (_object.defaultDatabase) {
  508. object.dbName = _object.defaultDatabase;
  509. }
  510. if (object.maxpoolsize) {
  511. object.poolSize = object.maxpoolsize;
  512. }
  513. if (object.readconcernlevel) {
  514. object.readConcern = { level: object.readconcernlevel };
  515. }
  516. if (object.wtimeoutms) {
  517. object.wtimeout = object.wtimeoutms;
  518. }
  519. return object;
  520. }
  521. function translateOptions(options, translationOptions) {
  522. translationOptions = Object.assign({}, { createServers: true }, translationOptions);
  523. // If we have a readPreference passed in by the db options
  524. if (typeof options.readPreference === 'string' || typeof options.read_preference === 'string') {
  525. options.readPreference = new ReadPreference(options.readPreference || options.read_preference);
  526. }
  527. // Do we have readPreference tags, add them
  528. if (options.readPreference && (options.readPreferenceTags || options.read_preference_tags)) {
  529. options.readPreference.tags = options.readPreferenceTags || options.read_preference_tags;
  530. }
  531. // Do we have maxStalenessSeconds
  532. if (options.maxStalenessSeconds) {
  533. options.readPreference.maxStalenessSeconds = options.maxStalenessSeconds;
  534. }
  535. // Set the socket and connection timeouts
  536. if (options.socketTimeoutMS == null) options.socketTimeoutMS = 360000;
  537. if (options.connectTimeoutMS == null) options.connectTimeoutMS = 30000;
  538. if (!translationOptions.createServers) {
  539. return;
  540. }
  541. // Create server instances
  542. return options.servers.map(serverObj => {
  543. return serverObj.domain_socket
  544. ? new Server(serverObj.domain_socket, 27017, options)
  545. : new Server(serverObj.host, serverObj.port, options);
  546. });
  547. }
  548. // Validate options object
  549. function validOptions(options) {
  550. const _validOptions = validOptionNames.concat(legacyOptionNames);
  551. for (const name in options) {
  552. if (ignoreOptionNames.indexOf(name) !== -1) {
  553. continue;
  554. }
  555. if (_validOptions.indexOf(name) === -1 && options.validateOptions) {
  556. return new MongoError(`option ${name} is not supported`);
  557. } else if (_validOptions.indexOf(name) === -1) {
  558. console.warn(`the options [${name}] is not supported`);
  559. }
  560. if (legacyOptionNames.indexOf(name) !== -1) {
  561. console.warn(
  562. `the server/replset/mongos/db options are deprecated, ` +
  563. `all their options are supported at the top level of the options object [${validOptionNames}]`
  564. );
  565. }
  566. }
  567. }
  568. module.exports = { connectOp, logout, validOptions };