// socket.js
  /**
   * Module dependencies.
   */

  var transports = require('./transports/index');
  var Emitter = require('component-emitter');
  var debug = require('debug')('engine.io-client:socket');
  var index = require('indexof');
  var parser = require('engine.io-parser');
  var parseuri = require('parseuri');
  var parseqs = require('parseqs');

  /**
   * Module exports: the `Socket` constructor is the only export.
   */

  module.exports = Socket;
  /**
   * Socket constructor.
   *
   * Normalises the connection settings, stores them on the instance and then
   * starts connecting right away by calling `this.open()`. May be called with
   * or without `new`.
   *
   * Note: the options object that ends up being used (either `opts`, or `uri`
   * when an object is passed first) is written to while the hostname, port,
   * secure flag and query are being resolved.
   *
   * @param {String|Object} uri or options
   * @param {Object} opts options; replaced by `uri` when `uri` is an object
   * @api public
   */

  function Socket (uri, opts) {
    if (!(this instanceof Socket)) return new Socket(uri, opts);

    opts = opts || {};

    // `Socket(options)` form: the first argument is the options object
    if (uri && 'object' === typeof uri) {
      opts = uri;
      uri = null;
    }

    if (uri) {
      // a uri string overrides hostname / secure / port (and query, if any)
      uri = parseuri(uri);
      opts.hostname = uri.host;
      opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
      opts.port = uri.port;
      if (uri.query) opts.query = uri.query;
    } else if (opts.host) {
      opts.hostname = parseuri(opts.host).host;
    }

    // explicit option first, otherwise follow the page protocol when there is one
    this.secure = null != opts.secure ? opts.secure
      : (typeof location !== 'undefined' && 'https:' === location.protocol);

    if (opts.hostname && !opts.port) {
      // if no port is specified manually, use the protocol default
      opts.port = this.secure ? '443' : '80';
    }

    this.agent = opts.agent || false;
    this.hostname = opts.hostname ||
      (typeof location !== 'undefined' ? location.hostname : 'localhost');
    this.port = opts.port || (typeof location !== 'undefined' && location.port
      ? location.port
      : (this.secure ? 443 : 80));
    this.query = opts.query || {};
    if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
    this.upgrade = false !== opts.upgrade;
    // drop one trailing slash (if present), then always end with exactly one
    this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
    this.forceJSONP = !!opts.forceJSONP;
    this.jsonp = false !== opts.jsonp;
    this.forceBase64 = !!opts.forceBase64;
    this.enablesXDR = !!opts.enablesXDR;
    this.withCredentials = false !== opts.withCredentials;
    this.timestampParam = opts.timestampParam || 't';
    this.timestampRequests = opts.timestampRequests;
    // `createTransport` falls back to `this.requestTimeout`; without this
    // assignment a socket-level `requestTimeout` option was silently dropped
    this.requestTimeout = opts.requestTimeout;
    this.transports = opts.transports || ['polling', 'websocket'];
    this.transportOptions = opts.transportOptions || {};
    this.readyState = '';
    this.writeBuffer = [];
    this.prevBufferLen = 0;
    this.policyPort = opts.policyPort || 843;
    this.rememberUpgrade = opts.rememberUpgrade || false;
    this.binaryType = null;
    this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;

    // `false` disables compression; `true` or nothing yields an options object
    this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;
    if (true === this.perMessageDeflate) this.perMessageDeflate = {};
    if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
      this.perMessageDeflate.threshold = 1024;
    }

    // SSL options for Node.js client
    this.pfx = opts.pfx || undefined;
    this.key = opts.key || undefined;
    this.passphrase = opts.passphrase || undefined;
    this.cert = opts.cert || undefined;
    this.ca = opts.ca || undefined;
    this.ciphers = opts.ciphers || undefined;
    this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
    this.forceNode = !!opts.forceNode;

    // detect ReactNative environment
    this.isReactNative = (typeof navigator !== 'undefined' && typeof navigator.product === 'string' && navigator.product.toLowerCase() === 'reactnative');

    // other options for Node.js or ReactNative client
    if (typeof self === 'undefined' || this.isReactNative) {
      if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
        this.extraHeaders = opts.extraHeaders;
      }

      if (opts.localAddress) {
        this.localAddress = opts.localAddress;
      }
    }

    // set on handshake
    this.id = null;
    this.upgrades = null;
    this.pingInterval = null;
    this.pingTimeout = null;

    // set on heartbeat
    this.pingIntervalTimer = null;
    this.pingTimeoutTimer = null;

    this.open();
  }
  /**
   * Shared across all sockets: records whether the last opened connection
   * used the websocket transport. `open` consults it when `rememberUpgrade`
   * is set; `probe` and `onError` reset it.
   */

  Socket.priorWebsocketSuccess = false;

  /**
   * Give every socket the `Emitter` API.
   */

  Emitter(Socket.prototype);

  /**
   * Protocol version, taken from the parser.
   *
   * @api public
   */

  Socket.protocol = parser.protocol; // this is an int

  /**
   * Re-export the constructor and its collaborators for legacy
   * compatibility and standalone browser access. `transports` and `parser`
   * are the module objects already required at the top of the file.
   */

  Socket.Socket = Socket;
  Socket.Transport = require('./transport');
  Socket.transports = transports;
  Socket.parser = parser;
  /**
   * Creates transport of the given type.
   *
   * The transport receives a copy of the socket query extended with the
   * protocol version (`EIO`), the transport name and, once known, the
   * session id. Settings found in `this.transportOptions[name]` take
   * precedence over the socket-level ones.
   *
   * On/off switches are resolved with a null check rather than `||`, so an
   * explicit per-transport `false` (for instance `rejectUnauthorized: false`
   * or `withCredentials: false`) is honoured even when the socket-level value
   * is `true`. All other settings keep the original `||` fallback.
   *
   * Throws when `name` is not a known transport or when the transport
   * constructor itself throws; `open` relies on that to try the next one.
   *
   * @param {String} transport name
   * @return {Transport}
   * @api private
   */

  Socket.prototype.createTransport = function (name) {
    debug('creating transport "%s"', name);

    var self = this;
    var query = clone(this.query);

    // append engine.io protocol identifier
    query.EIO = parser.protocol;

    // transport name
    query.transport = name;

    // per-transport options
    var options = this.transportOptions[name] || {};

    // session id if we already have one
    if (this.id) query.sid = this.id;

    // value setting: any falsy override falls back to the socket-level value
    function setting (key) {
      return options[key] || self[key];
    }

    // switch setting: only a missing (null/undefined) override falls back
    function flag (key) {
      return null != options[key] ? options[key] : self[key];
    }

    var transport = new transports[name]({
      query: query,
      socket: this,
      agent: setting('agent'),
      hostname: setting('hostname'),
      port: setting('port'),
      secure: flag('secure'),
      path: setting('path'),
      forceJSONP: flag('forceJSONP'),
      jsonp: flag('jsonp'),
      forceBase64: flag('forceBase64'),
      enablesXDR: flag('enablesXDR'),
      withCredentials: flag('withCredentials'),
      timestampRequests: flag('timestampRequests'),
      timestampParam: setting('timestampParam'),
      policyPort: setting('policyPort'),
      pfx: setting('pfx'),
      key: setting('key'),
      passphrase: setting('passphrase'),
      cert: setting('cert'),
      ca: setting('ca'),
      ciphers: setting('ciphers'),
      rejectUnauthorized: flag('rejectUnauthorized'),
      perMessageDeflate: flag('perMessageDeflate'),
      extraHeaders: setting('extraHeaders'),
      forceNode: flag('forceNode'),
      localAddress: setting('localAddress'),
      requestTimeout: setting('requestTimeout'),
      // sub-protocols can only be configured per transport
      protocols: options.protocols || void (0),
      isReactNative: this.isReactNative
    });

    return transport;
  };
  /**
   * Returns a shallow copy holding only the own enumerable properties of
   * `obj`. Inherited properties are skipped; `null`/`undefined` yield `{}`.
   *
   * The ownership check goes through `Object.prototype.hasOwnProperty` so it
   * also works for prototype-less objects and for objects that carry a key
   * named `hasOwnProperty` (query objects are user supplied).
   *
   * @param {Object} obj
   * @return {Object} new plain object
   * @api private
   */

  function clone (obj) {
    var owns = Object.prototype.hasOwnProperty;
    var o = {};
    for (var i in obj) {
      if (owns.call(obj, i)) {
        o[i] = obj[i];
      }
    }
    return o;
  }
  /**
   * Initializes transport to use and starts probe.
   *
   * Picks `websocket` straight away when `rememberUpgrade` is set, a previous
   * connection succeeded over websocket and websocket is among the configured
   * transports; otherwise picks the first configured transport. With no
   * transports left an `error` event is emitted asynchronously.
   *
   * When the chosen transport cannot be created, that transport (not merely
   * the head of the list) is removed from `this.transports` and the selection
   * is retried. Removing the head unconditionally used to discard `polling`
   * when a remembered `websocket` failed, so polling was never attempted.
   *
   * @api private
   */

  Socket.prototype.open = function () {
    var name;

    // `index` is the same lookup helper `filterUpgrades` uses
    if (this.rememberUpgrade && Socket.priorWebsocketSuccess && index(this.transports, 'websocket') !== -1) {
      name = 'websocket';
    } else if (0 === this.transports.length) {
      // Emit error on next tick so it can be listened to
      var self = this;
      setTimeout(function () {
        self.emit('error', 'No transports available');
      }, 0);
      return;
    } else {
      name = this.transports[0];
    }

    this.readyState = 'opening';

    // Retry with the next transport if the transport is disabled (jsonp: false)
    var transport;
    try {
      transport = this.createTransport(name);
    } catch (e) {
      debug('could not create transport "%s": %s', name, e);
      // `name` was taken from the list above, so the lookup always finds it
      this.transports.splice(index(this.transports, name), 1);
      this.open();
      return;
    }

    transport.open();
    this.setTransport(transport);
  };
  /**
   * Makes `transport` the active transport.
   *
   * All listeners are stripped from the previously active transport (if
   * there is one) and the new transport's `drain`, `packet`, `error` and
   * `close` events are routed to the matching socket handlers.
   *
   * @param {Transport} transport
   * @api private
   */

  Socket.prototype.setTransport = function (transport) {
    var socket = this;

    debug('setting transport %s', transport.name);

    var previous = socket.transport;
    if (previous) {
      debug('clearing existing transport %s', previous.name);
      previous.removeAllListeners();
    }

    socket.transport = transport;

    transport.on('drain', function () {
      socket.onDrain();
    });
    transport.on('packet', function (packet) {
      socket.onPacket(packet);
    });
    transport.on('error', function (e) {
      socket.onError(e);
    });
    transport.on('close', function () {
      socket.onClose('transport close');
    });
  };
  /**
   * Probes a transport.
   *
   * Opens a second transport next to the active one and sends it a
   * `ping`/`probe` packet. When the matching `pong` comes back the active
   * transport is paused, the socket switches to the probed transport, an
   * `upgrade` packet is sent and `upgrade` is emitted. A wrong reply, a
   * transport error/close, the socket closing, or another transport winning
   * the upgrade aborts the probe; failures are reported as `upgradeError`.
   *
   * @param {String} transport name
   * @api private
   */

  Socket.prototype.probe = function (name) {
    debug('probing transport "%s"', name);

    var socket = this;
    var candidate = socket.createTransport(name, { probe: 1 });
    // once set, every late callback coming from the candidate is ignored
    var aborted = false;

    Socket.priorWebsocketSuccess = false;

    // Runs once the active transport has been paused: perform the switch
    function completeUpgrade () {
      if (aborted) return;
      if ('closed' === socket.readyState) return;

      debug('changing transport and sending upgrade packet');
      detachListeners();

      socket.setTransport(candidate);
      candidate.send([{ type: 'upgrade' }]);
      socket.emit('upgrade', candidate);
      candidate = null;
      socket.upgrading = false;
      socket.flush();
    }

    // First packet received from the candidate after the probe ping
    function onProbeReply (msg) {
      if (aborted) return;

      if ('pong' !== msg.type || 'probe' !== msg.data) {
        debug('probe transport "%s" failed', name);
        var err = new Error('probe error');
        err.transport = candidate.name;
        socket.emit('upgradeError', err);
        return;
      }

      debug('probe transport "%s" pong', name);
      socket.upgrading = true;
      socket.emit('upgrading', candidate);
      // the candidate may have been released while `upgrading` was emitted
      if (!candidate) return;
      Socket.priorWebsocketSuccess = 'websocket' === candidate.name;

      debug('pausing current transport "%s"', socket.transport.name);
      socket.transport.pause(completeUpgrade);
    }

    // Candidate opened: `this` is the candidate transport here
    function onCandidateOpen () {
      if (socket.onlyBinaryUpgrades) {
        var losesBinary = !this.supportsBinary && socket.transport.supportsBinary;
        aborted = aborted || losesBinary;
      }
      if (aborted) return;

      debug('probe transport "%s" opened', name);
      candidate.send([{ type: 'ping', data: 'probe' }]);
      candidate.once('packet', onProbeReply);
    }

    // Abandon the candidate for good
    function abandonCandidate () {
      if (aborted) return;

      // Any callback called by transport should be ignored since now
      aborted = true;

      detachListeners();

      candidate.close();
      candidate = null;
    }

    // Handle any error that happens while probing
    function onProbeError (err) {
      var error = new Error('probe error: ' + err);
      error.transport = candidate.name;

      abandonCandidate();

      debug('probe transport "%s" failed because of error: %s', name, err);

      socket.emit('upgradeError', error);
    }

    function onCandidateClose () {
      onProbeError('transport closed');
    }

    // When the socket is closed while we're probing
    function onSocketClose () {
      onProbeError('socket closed');
    }

    // When the socket is upgraded while we're probing
    function onOtherUpgrade (to) {
      if (candidate && to.name !== candidate.name) {
        debug('"%s" works - aborting "%s"', to.name, candidate.name);
        abandonCandidate();
      }
    }

    // Remove all listeners on the transport and on the socket
    function detachListeners () {
      candidate.removeListener('open', onCandidateOpen);
      candidate.removeListener('error', onProbeError);
      candidate.removeListener('close', onCandidateClose);
      socket.removeListener('close', onSocketClose);
      socket.removeListener('upgrading', onOtherUpgrade);
    }

    candidate.once('open', onCandidateOpen);
    candidate.once('error', onProbeError);
    candidate.once('close', onCandidateClose);

    socket.once('close', onSocketClose);
    socket.once('upgrading', onOtherUpgrade);

    candidate.open();
  };
  /**
   * Called when connection is deemed open.
   *
   * Marks the socket `open`, records whether websocket was used, emits
   * `open`, flushes buffered packets and then probes every transport listed
   * in `this.upgrades`, provided upgrading is enabled and the active
   * transport can be paused.
   *
   * @api public
   */

  Socket.prototype.onOpen = function () {
    debug('socket open');
    this.readyState = 'open';
    Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
    this.emit('open');
    this.flush();

    // `readyState` is re-checked because an `open` listener
    // may already have closed the socket
    var mayProbe = 'open' === this.readyState && this.upgrade && this.transport.pause;
    if (!mayProbe) return;

    debug('starting upgrade probes');
    var count = this.upgrades.length;
    for (var n = 0; n < count; n++) {
      this.probe(this.upgrades[n]);
    }
  };
  /**
   * Handles a packet.
   *
   * Packets are only processed while the socket is `opening`, `open` or
   * `closing`. Each processed packet is re-emitted as `packet` and counts as
   * a `heartbeat` before it is dispatched on its type (`open`, `pong`,
   * `error`, `message`); other types are ignored.
   *
   * @param {Object} packet with `type` and `data`
   * @api private
   */

  Socket.prototype.onPacket = function (packet) {
    var state = this.readyState;
    if ('opening' !== state && 'open' !== state && 'closing' !== state) {
      debug('packet received with socket readyState "%s"', this.readyState);
      return;
    }

    debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

    this.emit('packet', packet);

    // Socket is live - any packet counts
    this.emit('heartbeat');

    var type = packet.type;
    if ('open' === type) {
      // handshake payload arrives JSON-encoded
      this.onHandshake(JSON.parse(packet.data));
    } else if ('pong' === type) {
      this.setPing();
      this.emit('pong');
    } else if ('error' === type) {
      var err = new Error('server error');
      err.code = packet.data;
      this.onError(err);
    } else if ('message' === type) {
      this.emit('data', packet.data);
      this.emit('message', packet.data);
    }
  };
  /**
   * Called upon handshake completion.
   *
   * Emits `handshake`, stores the session id, the usable upgrades and the
   * ping settings, opens the socket and, unless an `open` listener closed it
   * again, starts the ping cycle and (re)registers the heartbeat listener.
   *
   * @param {Object} handshake obj (`sid`, `upgrades`, `pingInterval`, `pingTimeout`)
   * @api private
   */

  Socket.prototype.onHandshake = function (data) {
    this.emit('handshake', data);

    var sid = data.sid;
    this.id = sid;
    this.transport.query.sid = sid;

    this.upgrades = this.filterUpgrades(data.upgrades);
    this.pingInterval = data.pingInterval;
    this.pingTimeout = data.pingTimeout;

    this.onOpen();

    // In case open handler closes socket
    if ('closed' === this.readyState) {
      return;
    }

    this.setPing();

    // Prolong liveness of socket on heartbeat; removing first keeps the
    // listener from being registered more than once
    this.removeListener('heartbeat', this.onHeartbeat);
    this.on('heartbeat', this.onHeartbeat);
  };
  /**
   * Resets ping timeout.
   *
   * Cancels the pending timeout and arms a new one that closes the socket
   * with reason `ping timeout` unless it is already closed by then.
   *
   * @param {Number} timeout delay in ms; when falsy,
   *   `pingInterval + pingTimeout` is used
   * @api private
   */

  Socket.prototype.onHeartbeat = function (timeout) {
    var socket = this;
    var delay = timeout || (socket.pingInterval + socket.pingTimeout);

    clearTimeout(socket.pingTimeoutTimer);
    socket.pingTimeoutTimer = setTimeout(function onPingTimeout () {
      if ('closed' !== socket.readyState) {
        socket.onClose('ping timeout');
      }
    }, delay);
  };
  /**
   * Schedules the next ping: after `this.pingInterval` ms a ping is sent and
   * the ping timeout is re-armed with `this.pingTimeout`. A previously
   * scheduled ping is cancelled first.
   *
   * @api private
   */

  Socket.prototype.setPing = function () {
    var socket = this;

    function pingNow () {
      debug('writing ping packet - expecting pong within %sms', socket.pingTimeout);
      socket.ping();
      socket.onHeartbeat(socket.pingTimeout);
    }

    clearTimeout(socket.pingIntervalTimer);
    socket.pingIntervalTimer = setTimeout(pingNow, socket.pingInterval);
  };
  /**
   * Sends a ping packet and emits `ping` once that packet has been flushed.
   *
   * @api private
   */

  Socket.prototype.ping = function () {
    var socket = this;

    function announcePing () {
      socket.emit('ping');
    }

    socket.sendPacket('ping', announcePing);
  };
  /**
   * Called on `drain` event
   *
   * Drops the packets covered by the last flush from the write buffer, then
   * either flushes what is left or emits `drain` when nothing is left.
   *
   * @api private
   */

  Socket.prototype.onDrain = function () {
    this.writeBuffer.splice(0, this.prevBufferLen);

    // resetting prevBufferLen is essential: during an upgrade the upgrade
    // packet is sent directly, and a stale nonzero count would make the
    // next `drain` remove packets that were never flushed
    this.prevBufferLen = 0;

    if (this.writeBuffer.length > 0) {
      this.flush();
      return;
    }

    this.emit('drain');
  };
  /**
   * Flush write buffers.
   *
   * Hands the whole write buffer to the transport, unless the socket is
   * closed, the transport is not writable, an upgrade is in progress or the
   * buffer is empty. Emits `flush` afterwards.
   *
   * @api private
   */

  Socket.prototype.flush = function () {
    if ('closed' === this.readyState) return;
    if (!this.transport.writable) return;
    if (this.upgrading) return;
    if (!this.writeBuffer.length) return;

    debug('flushing %d packets in socket', this.writeBuffer.length);
    this.transport.send(this.writeBuffer);

    // remember how many packets were handed over;
    // `onDrain` removes exactly that many from the buffer
    this.prevBufferLen = this.writeBuffer.length;
    this.emit('flush');
  };
  /**
   * Sends a message. Also available as `write`.
   *
   * @param {String} msg message.
   * @param {Object} options (may be omitted, passing the callback instead).
   * @param {Function} fn callback function.
   * @return {Socket} for chaining.
   * @api public
   */

  Socket.prototype.send = function (msg, options, fn) {
    this.sendPacket('message', msg, options, fn);
    return this;
  };

  Socket.prototype.write = Socket.prototype.send;
  /**
   * Sends a packet.
   *
   * `data` and `options` are both optional: a function found in either
   * position is taken as the callback. Nothing is sent while the socket is
   * `closing` or `closed`. `options.compress` defaults to `true` and is
   * written back onto the options object. Emits `packetCreate` before the
   * packet is buffered; the callback runs on the next `flush` event.
   *
   * @param {String} packet type.
   * @param {String} data.
   * @param {Object} options.
   * @param {Function} callback function.
   * @api private
   */

  Socket.prototype.sendPacket = function (type, data, options, fn) {
    // sendPacket(type, fn)
    if ('function' === typeof data) {
      fn = data;
      data = undefined;
    }

    // sendPacket(type, data, fn)
    if ('function' === typeof options) {
      fn = options;
      options = null;
    }

    var state = this.readyState;
    if ('closing' === state || 'closed' === state) {
      return;
    }

    options = options || {};
    options.compress = false !== options.compress;

    var packet = { type: type, data: data, options: options };

    this.emit('packetCreate', packet);
    this.writeBuffer.push(packet);
    if (fn) {
      this.once('flush', fn);
    }
    this.flush();
  };
  /**
   * Closes the connection.
   *
   * Only acts while the socket is `opening` or `open`. The socket moves to
   * `closing` and the actual close is deferred until buffered packets have
   * drained and any upgrade in progress has finished or failed.
   *
   * @return {Socket} for chaining.
   * @api private
   */

  Socket.prototype.close = function () {
    var socket = this;

    function shutdown () {
      socket.onClose('forced close');
      debug('socket closing - telling transport to close');
      socket.transport.close();
    }

    function shutdownAfterUpgrade () {
      socket.removeListener('upgrade', shutdownAfterUpgrade);
      socket.removeListener('upgradeError', shutdownAfterUpgrade);
      shutdown();
    }

    function deferUntilUpgraded () {
      // wait for upgrade to finish since we can't send packets while pausing a transport
      socket.once('upgrade', shutdownAfterUpgrade);
      socket.once('upgradeError', shutdownAfterUpgrade);
    }

    function shutdownOrDefer () {
      if (socket.upgrading) {
        deferUntilUpgraded();
      } else {
        shutdown();
      }
    }

    if ('opening' !== socket.readyState && 'open' !== socket.readyState) {
      return socket;
    }

    socket.readyState = 'closing';

    if (socket.writeBuffer.length) {
      // let pending packets go out first
      socket.once('drain', shutdownOrDefer);
    } else {
      shutdownOrDefer();
    }

    return socket;
  };
  /**
   * Called upon transport error
   *
   * Forgets any earlier websocket success, emits `error` with the given
   * error and closes the socket with reason `transport error`.
   *
   * @param {Error} err
   * @api private
   */

  Socket.prototype.onError = function (err) {
    debug('socket error %j', err);

    Socket.priorWebsocketSuccess = false;

    this.emit('error', err);
    this.onClose('transport error', err);
  };
  /**
   * Called upon transport close.
   *
   * Does nothing unless the socket is `opening`, `open` or `closing`.
   * Otherwise stops the ping timers, shuts the transport down and detaches
   * from it, marks the socket `closed`, clears the session id, emits `close`
   * and finally resets the write buffer.
   *
   * @param {String} reason
   * @param {*} desc optional detail passed along with the `close` event
   * @api private
   */

  Socket.prototype.onClose = function (reason, desc) {
    var state = this.readyState;
    if ('opening' !== state && 'open' !== state && 'closing' !== state) {
      return;
    }

    debug('socket close with reason: "%s"', reason);

    // clear timers
    clearTimeout(this.pingIntervalTimer);
    clearTimeout(this.pingTimeoutTimer);

    var transport = this.transport;

    // stop event from firing again for transport
    transport.removeAllListeners('close');

    // ensure transport won't stay open
    transport.close();

    // ignore further transport communication
    transport.removeAllListeners();

    // set ready state
    this.readyState = 'closed';

    // clear session id
    this.id = null;

    // emit close event
    this.emit('close', reason, desc);

    // buffers are reset only after the event, so `close`
    // listeners can still inspect what was left unsent
    this.writeBuffer = [];
    this.prevBufferLen = 0;
  };
  /**
   * Filters upgrades, returning only those matching client transports.
   *
   * @param {Array} server upgrades
   * @return {Array} the entries of `upgrades` also present in
   *   `this.transports`, in their original order
   * @api private
   */

  Socket.prototype.filterUpgrades = function (upgrades) {
    var supported = [];
    var total = upgrades.length;

    for (var k = 0; k < total; k++) {
      var candidate = upgrades[k];
      if (index(this.transports, candidate) !== -1) {
        supported.push(candidate);
      }
    }

    return supported;
  };