socket.js 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746
  1. /**
  2. * Module dependencies.
  3. */
  4. var transports = require('./transports/index');
  5. var Emitter = require('component-emitter');
  6. var debug = require('debug')('engine.io-client:socket');
  7. var index = require('indexof');
  8. var parser = require('engine.io-parser');
  9. var parseuri = require('parseuri');
  10. var parseqs = require('parseqs');
/**
 * Module exports.
 *
 * The `Socket` constructor is the sole export of this module.
 */
module.exports = Socket;
  15. /**
  16. * Socket constructor.
  17. *
  18. * @param {String|Object} uri or options
  19. * @param {Object} options
  20. * @api public
  21. */
  22. function Socket (uri, opts) {
  23. if (!(this instanceof Socket)) return new Socket(uri, opts);
  24. opts = opts || {};
  25. if (uri && 'object' === typeof uri) {
  26. opts = uri;
  27. uri = null;
  28. }
  29. if (uri) {
  30. uri = parseuri(uri);
  31. opts.hostname = uri.host;
  32. opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
  33. opts.port = uri.port;
  34. if (uri.query) opts.query = uri.query;
  35. } else if (opts.host) {
  36. opts.hostname = parseuri(opts.host).host;
  37. }
  38. this.secure = null != opts.secure ? opts.secure
  39. : (typeof location !== 'undefined' && 'https:' === location.protocol);
  40. if (opts.hostname && !opts.port) {
  41. // if no port is specified manually, use the protocol default
  42. opts.port = this.secure ? '443' : '80';
  43. }
  44. this.agent = opts.agent || false;
  45. this.hostname = opts.hostname ||
  46. (typeof location !== 'undefined' ? location.hostname : 'localhost');
  47. this.port = opts.port || (typeof location !== 'undefined' && location.port
  48. ? location.port
  49. : (this.secure ? 443 : 80));
  50. this.query = opts.query || {};
  51. if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
  52. this.upgrade = false !== opts.upgrade;
  53. this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
  54. this.forceJSONP = !!opts.forceJSONP;
  55. this.jsonp = false !== opts.jsonp;
  56. this.forceBase64 = !!opts.forceBase64;
  57. this.enablesXDR = !!opts.enablesXDR;
  58. this.timestampParam = opts.timestampParam || 't';
  59. this.timestampRequests = opts.timestampRequests;
  60. this.transports = opts.transports || ['polling', 'websocket'];
  61. this.transportOptions = opts.transportOptions || {};
  62. this.readyState = '';
  63. this.writeBuffer = [];
  64. this.prevBufferLen = 0;
  65. this.policyPort = opts.policyPort || 843;
  66. this.rememberUpgrade = opts.rememberUpgrade || false;
  67. this.binaryType = null;
  68. this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;
  69. this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;
  70. if (true === this.perMessageDeflate) this.perMessageDeflate = {};
  71. if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
  72. this.perMessageDeflate.threshold = 1024;
  73. }
  74. // SSL options for Node.js client
  75. this.pfx = opts.pfx || null;
  76. this.key = opts.key || null;
  77. this.passphrase = opts.passphrase || null;
  78. this.cert = opts.cert || null;
  79. this.ca = opts.ca || null;
  80. this.ciphers = opts.ciphers || null;
  81. this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
  82. this.forceNode = !!opts.forceNode;
  83. // detect ReactNative environment
  84. this.isReactNative = (typeof navigator !== 'undefined' && typeof navigator.product === 'string' && navigator.product.toLowerCase() === 'reactnative');
  85. // other options for Node.js or ReactNative client
  86. if (typeof self === 'undefined' || this.isReactNative) {
  87. if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
  88. this.extraHeaders = opts.extraHeaders;
  89. }
  90. if (opts.localAddress) {
  91. this.localAddress = opts.localAddress;
  92. }
  93. }
  94. // set on handshake
  95. this.id = null;
  96. this.upgrades = null;
  97. this.pingInterval = null;
  98. this.pingTimeout = null;
  99. // set on heartbeat
  100. this.pingIntervalTimer = null;
  101. this.pingTimeoutTimer = null;
  102. this.open();
  103. }
  104. Socket.priorWebsocketSuccess = false;
  105. /**
  106. * Mix in `Emitter`.
  107. */
  108. Emitter(Socket.prototype);
  109. /**
  110. * Protocol version.
  111. *
  112. * @api public
  113. */
  114. Socket.protocol = parser.protocol; // this is an int
  115. /**
  116. * Expose deps for legacy compatibility
  117. * and standalone browser access.
  118. */
  119. Socket.Socket = Socket;
  120. Socket.Transport = require('./transport');
  121. Socket.transports = require('./transports/index');
  122. Socket.parser = require('engine.io-parser');
  123. /**
  124. * Creates transport of the given type.
  125. *
  126. * @param {String} transport name
  127. * @return {Transport}
  128. * @api private
  129. */
  130. Socket.prototype.createTransport = function (name) {
  131. debug('creating transport "%s"', name);
  132. var query = clone(this.query);
  133. // append engine.io protocol identifier
  134. query.EIO = parser.protocol;
  135. // transport name
  136. query.transport = name;
  137. // per-transport options
  138. var options = this.transportOptions[name] || {};
  139. // session id if we already have one
  140. if (this.id) query.sid = this.id;
  141. var transport = new transports[name]({
  142. query: query,
  143. socket: this,
  144. agent: options.agent || this.agent,
  145. hostname: options.hostname || this.hostname,
  146. port: options.port || this.port,
  147. secure: options.secure || this.secure,
  148. path: options.path || this.path,
  149. forceJSONP: options.forceJSONP || this.forceJSONP,
  150. jsonp: options.jsonp || this.jsonp,
  151. forceBase64: options.forceBase64 || this.forceBase64,
  152. enablesXDR: options.enablesXDR || this.enablesXDR,
  153. timestampRequests: options.timestampRequests || this.timestampRequests,
  154. timestampParam: options.timestampParam || this.timestampParam,
  155. policyPort: options.policyPort || this.policyPort,
  156. pfx: options.pfx || this.pfx,
  157. key: options.key || this.key,
  158. passphrase: options.passphrase || this.passphrase,
  159. cert: options.cert || this.cert,
  160. ca: options.ca || this.ca,
  161. ciphers: options.ciphers || this.ciphers,
  162. rejectUnauthorized: options.rejectUnauthorized || this.rejectUnauthorized,
  163. perMessageDeflate: options.perMessageDeflate || this.perMessageDeflate,
  164. extraHeaders: options.extraHeaders || this.extraHeaders,
  165. forceNode: options.forceNode || this.forceNode,
  166. localAddress: options.localAddress || this.localAddress,
  167. requestTimeout: options.requestTimeout || this.requestTimeout,
  168. protocols: options.protocols || void (0),
  169. isReactNative: this.isReactNative
  170. });
  171. return transport;
  172. };
  173. function clone (obj) {
  174. var o = {};
  175. for (var i in obj) {
  176. if (obj.hasOwnProperty(i)) {
  177. o[i] = obj[i];
  178. }
  179. }
  180. return o;
  181. }
  182. /**
  183. * Initializes transport to use and starts probe.
  184. *
  185. * @api private
  186. */
  187. Socket.prototype.open = function () {
  188. var transport;
  189. if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') !== -1) {
  190. transport = 'websocket';
  191. } else if (0 === this.transports.length) {
  192. // Emit error on next tick so it can be listened to
  193. var self = this;
  194. setTimeout(function () {
  195. self.emit('error', 'No transports available');
  196. }, 0);
  197. return;
  198. } else {
  199. transport = this.transports[0];
  200. }
  201. this.readyState = 'opening';
  202. // Retry with the next transport if the transport is disabled (jsonp: false)
  203. try {
  204. transport = this.createTransport(transport);
  205. } catch (e) {
  206. this.transports.shift();
  207. this.open();
  208. return;
  209. }
  210. transport.open();
  211. this.setTransport(transport);
  212. };
  213. /**
  214. * Sets the current transport. Disables the existing one (if any).
  215. *
  216. * @api private
  217. */
  218. Socket.prototype.setTransport = function (transport) {
  219. debug('setting transport %s', transport.name);
  220. var self = this;
  221. if (this.transport) {
  222. debug('clearing existing transport %s', this.transport.name);
  223. this.transport.removeAllListeners();
  224. }
  225. // set up transport
  226. this.transport = transport;
  227. // set up transport listeners
  228. transport
  229. .on('drain', function () {
  230. self.onDrain();
  231. })
  232. .on('packet', function (packet) {
  233. self.onPacket(packet);
  234. })
  235. .on('error', function (e) {
  236. self.onError(e);
  237. })
  238. .on('close', function () {
  239. self.onClose('transport close');
  240. });
  241. };
  242. /**
  243. * Probes a transport.
  244. *
  245. * @param {String} transport name
  246. * @api private
  247. */
  248. Socket.prototype.probe = function (name) {
  249. debug('probing transport "%s"', name);
  250. var transport = this.createTransport(name, { probe: 1 });
  251. var failed = false;
  252. var self = this;
  253. Socket.priorWebsocketSuccess = false;
  254. function onTransportOpen () {
  255. if (self.onlyBinaryUpgrades) {
  256. var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary;
  257. failed = failed || upgradeLosesBinary;
  258. }
  259. if (failed) return;
  260. debug('probe transport "%s" opened', name);
  261. transport.send([{ type: 'ping', data: 'probe' }]);
  262. transport.once('packet', function (msg) {
  263. if (failed) return;
  264. if ('pong' === msg.type && 'probe' === msg.data) {
  265. debug('probe transport "%s" pong', name);
  266. self.upgrading = true;
  267. self.emit('upgrading', transport);
  268. if (!transport) return;
  269. Socket.priorWebsocketSuccess = 'websocket' === transport.name;
  270. debug('pausing current transport "%s"', self.transport.name);
  271. self.transport.pause(function () {
  272. if (failed) return;
  273. if ('closed' === self.readyState) return;
  274. debug('changing transport and sending upgrade packet');
  275. cleanup();
  276. self.setTransport(transport);
  277. transport.send([{ type: 'upgrade' }]);
  278. self.emit('upgrade', transport);
  279. transport = null;
  280. self.upgrading = false;
  281. self.flush();
  282. });
  283. } else {
  284. debug('probe transport "%s" failed', name);
  285. var err = new Error('probe error');
  286. err.transport = transport.name;
  287. self.emit('upgradeError', err);
  288. }
  289. });
  290. }
  291. function freezeTransport () {
  292. if (failed) return;
  293. // Any callback called by transport should be ignored since now
  294. failed = true;
  295. cleanup();
  296. transport.close();
  297. transport = null;
  298. }
  299. // Handle any error that happens while probing
  300. function onerror (err) {
  301. var error = new Error('probe error: ' + err);
  302. error.transport = transport.name;
  303. freezeTransport();
  304. debug('probe transport "%s" failed because of error: %s', name, err);
  305. self.emit('upgradeError', error);
  306. }
  307. function onTransportClose () {
  308. onerror('transport closed');
  309. }
  310. // When the socket is closed while we're probing
  311. function onclose () {
  312. onerror('socket closed');
  313. }
  314. // When the socket is upgraded while we're probing
  315. function onupgrade (to) {
  316. if (transport && to.name !== transport.name) {
  317. debug('"%s" works - aborting "%s"', to.name, transport.name);
  318. freezeTransport();
  319. }
  320. }
  321. // Remove all listeners on the transport and on self
  322. function cleanup () {
  323. transport.removeListener('open', onTransportOpen);
  324. transport.removeListener('error', onerror);
  325. transport.removeListener('close', onTransportClose);
  326. self.removeListener('close', onclose);
  327. self.removeListener('upgrading', onupgrade);
  328. }
  329. transport.once('open', onTransportOpen);
  330. transport.once('error', onerror);
  331. transport.once('close', onTransportClose);
  332. this.once('close', onclose);
  333. this.once('upgrading', onupgrade);
  334. transport.open();
  335. };
  336. /**
  337. * Called when connection is deemed open.
  338. *
  339. * @api public
  340. */
  341. Socket.prototype.onOpen = function () {
  342. debug('socket open');
  343. this.readyState = 'open';
  344. Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
  345. this.emit('open');
  346. this.flush();
  347. // we check for `readyState` in case an `open`
  348. // listener already closed the socket
  349. if ('open' === this.readyState && this.upgrade && this.transport.pause) {
  350. debug('starting upgrade probes');
  351. for (var i = 0, l = this.upgrades.length; i < l; i++) {
  352. this.probe(this.upgrades[i]);
  353. }
  354. }
  355. };
  356. /**
  357. * Handles a packet.
  358. *
  359. * @api private
  360. */
  361. Socket.prototype.onPacket = function (packet) {
  362. if ('opening' === this.readyState || 'open' === this.readyState ||
  363. 'closing' === this.readyState) {
  364. debug('socket receive: type "%s", data "%s"', packet.type, packet.data);
  365. this.emit('packet', packet);
  366. // Socket is live - any packet counts
  367. this.emit('heartbeat');
  368. switch (packet.type) {
  369. case 'open':
  370. this.onHandshake(JSON.parse(packet.data));
  371. break;
  372. case 'pong':
  373. this.setPing();
  374. this.emit('pong');
  375. break;
  376. case 'error':
  377. var err = new Error('server error');
  378. err.code = packet.data;
  379. this.onError(err);
  380. break;
  381. case 'message':
  382. this.emit('data', packet.data);
  383. this.emit('message', packet.data);
  384. break;
  385. }
  386. } else {
  387. debug('packet received with socket readyState "%s"', this.readyState);
  388. }
  389. };
  390. /**
  391. * Called upon handshake completion.
  392. *
  393. * @param {Object} handshake obj
  394. * @api private
  395. */
  396. Socket.prototype.onHandshake = function (data) {
  397. this.emit('handshake', data);
  398. this.id = data.sid;
  399. this.transport.query.sid = data.sid;
  400. this.upgrades = this.filterUpgrades(data.upgrades);
  401. this.pingInterval = data.pingInterval;
  402. this.pingTimeout = data.pingTimeout;
  403. this.onOpen();
  404. // In case open handler closes socket
  405. if ('closed' === this.readyState) return;
  406. this.setPing();
  407. // Prolong liveness of socket on heartbeat
  408. this.removeListener('heartbeat', this.onHeartbeat);
  409. this.on('heartbeat', this.onHeartbeat);
  410. };
  411. /**
  412. * Resets ping timeout.
  413. *
  414. * @api private
  415. */
  416. Socket.prototype.onHeartbeat = function (timeout) {
  417. clearTimeout(this.pingTimeoutTimer);
  418. var self = this;
  419. self.pingTimeoutTimer = setTimeout(function () {
  420. if ('closed' === self.readyState) return;
  421. self.onClose('ping timeout');
  422. }, timeout || (self.pingInterval + self.pingTimeout));
  423. };
  424. /**
  425. * Pings server every `this.pingInterval` and expects response
  426. * within `this.pingTimeout` or closes connection.
  427. *
  428. * @api private
  429. */
  430. Socket.prototype.setPing = function () {
  431. var self = this;
  432. clearTimeout(self.pingIntervalTimer);
  433. self.pingIntervalTimer = setTimeout(function () {
  434. debug('writing ping packet - expecting pong within %sms', self.pingTimeout);
  435. self.ping();
  436. self.onHeartbeat(self.pingTimeout);
  437. }, self.pingInterval);
  438. };
  439. /**
  440. * Sends a ping packet.
  441. *
  442. * @api private
  443. */
  444. Socket.prototype.ping = function () {
  445. var self = this;
  446. this.sendPacket('ping', function () {
  447. self.emit('ping');
  448. });
  449. };
  450. /**
  451. * Called on `drain` event
  452. *
  453. * @api private
  454. */
  455. Socket.prototype.onDrain = function () {
  456. this.writeBuffer.splice(0, this.prevBufferLen);
  457. // setting prevBufferLen = 0 is very important
  458. // for example, when upgrading, upgrade packet is sent over,
  459. // and a nonzero prevBufferLen could cause problems on `drain`
  460. this.prevBufferLen = 0;
  461. if (0 === this.writeBuffer.length) {
  462. this.emit('drain');
  463. } else {
  464. this.flush();
  465. }
  466. };
  467. /**
  468. * Flush write buffers.
  469. *
  470. * @api private
  471. */
  472. Socket.prototype.flush = function () {
  473. if ('closed' !== this.readyState && this.transport.writable &&
  474. !this.upgrading && this.writeBuffer.length) {
  475. debug('flushing %d packets in socket', this.writeBuffer.length);
  476. this.transport.send(this.writeBuffer);
  477. // keep track of current length of writeBuffer
  478. // splice writeBuffer and callbackBuffer on `drain`
  479. this.prevBufferLen = this.writeBuffer.length;
  480. this.emit('flush');
  481. }
  482. };
  483. /**
  484. * Sends a message.
  485. *
  486. * @param {String} message.
  487. * @param {Function} callback function.
  488. * @param {Object} options.
  489. * @return {Socket} for chaining.
  490. * @api public
  491. */
  492. Socket.prototype.write =
  493. Socket.prototype.send = function (msg, options, fn) {
  494. this.sendPacket('message', msg, options, fn);
  495. return this;
  496. };
  497. /**
  498. * Sends a packet.
  499. *
  500. * @param {String} packet type.
  501. * @param {String} data.
  502. * @param {Object} options.
  503. * @param {Function} callback function.
  504. * @api private
  505. */
  506. Socket.prototype.sendPacket = function (type, data, options, fn) {
  507. if ('function' === typeof data) {
  508. fn = data;
  509. data = undefined;
  510. }
  511. if ('function' === typeof options) {
  512. fn = options;
  513. options = null;
  514. }
  515. if ('closing' === this.readyState || 'closed' === this.readyState) {
  516. return;
  517. }
  518. options = options || {};
  519. options.compress = false !== options.compress;
  520. var packet = {
  521. type: type,
  522. data: data,
  523. options: options
  524. };
  525. this.emit('packetCreate', packet);
  526. this.writeBuffer.push(packet);
  527. if (fn) this.once('flush', fn);
  528. this.flush();
  529. };
  530. /**
  531. * Closes the connection.
  532. *
  533. * @api private
  534. */
  535. Socket.prototype.close = function () {
  536. if ('opening' === this.readyState || 'open' === this.readyState) {
  537. this.readyState = 'closing';
  538. var self = this;
  539. if (this.writeBuffer.length) {
  540. this.once('drain', function () {
  541. if (this.upgrading) {
  542. waitForUpgrade();
  543. } else {
  544. close();
  545. }
  546. });
  547. } else if (this.upgrading) {
  548. waitForUpgrade();
  549. } else {
  550. close();
  551. }
  552. }
  553. function close () {
  554. self.onClose('forced close');
  555. debug('socket closing - telling transport to close');
  556. self.transport.close();
  557. }
  558. function cleanupAndClose () {
  559. self.removeListener('upgrade', cleanupAndClose);
  560. self.removeListener('upgradeError', cleanupAndClose);
  561. close();
  562. }
  563. function waitForUpgrade () {
  564. // wait for upgrade to finish since we can't send packets while pausing a transport
  565. self.once('upgrade', cleanupAndClose);
  566. self.once('upgradeError', cleanupAndClose);
  567. }
  568. return this;
  569. };
  570. /**
  571. * Called upon transport error
  572. *
  573. * @api private
  574. */
  575. Socket.prototype.onError = function (err) {
  576. debug('socket error %j', err);
  577. Socket.priorWebsocketSuccess = false;
  578. this.emit('error', err);
  579. this.onClose('transport error', err);
  580. };
  581. /**
  582. * Called upon transport close.
  583. *
  584. * @api private
  585. */
  586. Socket.prototype.onClose = function (reason, desc) {
  587. if ('opening' === this.readyState || 'open' === this.readyState || 'closing' === this.readyState) {
  588. debug('socket close with reason: "%s"', reason);
  589. var self = this;
  590. // clear timers
  591. clearTimeout(this.pingIntervalTimer);
  592. clearTimeout(this.pingTimeoutTimer);
  593. // stop event from firing again for transport
  594. this.transport.removeAllListeners('close');
  595. // ensure transport won't stay open
  596. this.transport.close();
  597. // ignore further transport communication
  598. this.transport.removeAllListeners();
  599. // set ready state
  600. this.readyState = 'closed';
  601. // clear session id
  602. this.id = null;
  603. // emit close event
  604. this.emit('close', reason, desc);
  605. // clean buffers after, so users can still
  606. // grab the buffers on `close` event
  607. self.writeBuffer = [];
  608. self.prevBufferLen = 0;
  609. }
  610. };
  611. /**
  612. * Filters upgrades, returning only those matching client transports.
  613. *
  614. * @param {Array} server upgrades
  615. * @api private
  616. *
  617. */
  618. Socket.prototype.filterUpgrades = function (upgrades) {
  619. var filteredUpgrades = [];
  620. for (var i = 0, j = upgrades.length; i < j; i++) {
  621. if (~index(this.transports, upgrades[i])) filteredUpgrades.push(upgrades[i]);
  622. }
  623. return filteredUpgrades;
  624. };