12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748 |
- /**
- * Module dependencies.
- */
- var transports = require('./transports/index');
- var Emitter = require('component-emitter');
- var debug = require('debug')('engine.io-client:socket');
- var index = require('indexof');
- var parser = require('engine.io-parser');
- var parseuri = require('parseuri');
- var parseqs = require('parseqs');
- /**
- * Module exports.
- */
- module.exports = Socket;
/**
 * Socket constructor.
 *
 * Normalizes the connection options, copies them onto the instance and
 * then starts connecting right away by calling `this.open()`.
 *
 * NOTE: the options object is modified in place (`hostname`, `port`,
 * `secure` and `query` may be written back onto it).
 *
 * @param {String|Object} uri - server URI, or the options object when no
 *   URI string is given
 * @param {Object} opts - connection options (ignored when `uri` is an object)
 * @api public
 */
function Socket (uri, opts) {
  // allow calling without `new`
  if (!(this instanceof Socket)) return new Socket(uri, opts);

  opts = opts || {};

  // Socket(opts) form
  if (uri && 'object' === typeof uri) {
    opts = uri;
    uri = null;
  }

  if (uri) {
    // a URI string takes precedence over the matching option fields
    uri = parseuri(uri);
    opts.hostname = uri.host;
    opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
    opts.port = uri.port;
    if (uri.query) opts.query = uri.query;
  } else if (opts.host) {
    opts.hostname = parseuri(opts.host).host;
  }

  // without an explicit `secure`, follow the page protocol when there is one
  this.secure = null != opts.secure ? opts.secure
    : (typeof location !== 'undefined' && 'https:' === location.protocol);

  if (opts.hostname && !opts.port) {
    // if no port is specified manually, use the protocol default
    opts.port = this.secure ? '443' : '80';
  }

  this.agent = opts.agent || false;
  this.hostname = opts.hostname ||
    (typeof location !== 'undefined' ? location.hostname : 'localhost');
  this.port = opts.port || (typeof location !== 'undefined' && location.port
    ? location.port
    : (this.secure ? 443 : 80));
  this.query = opts.query || {};
  if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
  this.upgrade = false !== opts.upgrade;
  // exactly one trailing slash
  this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
  this.forceJSONP = !!opts.forceJSONP;
  this.jsonp = false !== opts.jsonp;
  this.forceBase64 = !!opts.forceBase64;
  this.enablesXDR = !!opts.enablesXDR;
  this.withCredentials = false !== opts.withCredentials;
  this.timestampParam = opts.timestampParam || 't';
  this.timestampRequests = opts.timestampRequests;
  // `createTransport` forwards `this.requestTimeout`, so the socket-level
  // option has to be stored here or it would be dropped
  this.requestTimeout = opts.requestTimeout;
  this.transports = opts.transports || ['polling', 'websocket'];
  this.transportOptions = opts.transportOptions || {};
  this.readyState = '';
  this.writeBuffer = [];
  this.prevBufferLen = 0;
  this.policyPort = opts.policyPort || 843;
  this.rememberUpgrade = opts.rememberUpgrade || false;
  this.binaryType = null;
  this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;

  // anything but an explicit `false` enables per-message deflate
  this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;
  if (true === this.perMessageDeflate) this.perMessageDeflate = {};
  if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
    this.perMessageDeflate.threshold = 1024;
  }

  // SSL options for Node.js client
  this.pfx = opts.pfx || undefined;
  this.key = opts.key || undefined;
  this.passphrase = opts.passphrase || undefined;
  this.cert = opts.cert || undefined;
  this.ca = opts.ca || undefined;
  this.ciphers = opts.ciphers || undefined;
  this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
  this.forceNode = !!opts.forceNode;

  // detect ReactNative environment
  this.isReactNative = (typeof navigator !== 'undefined' && typeof navigator.product === 'string' && navigator.product.toLowerCase() === 'reactnative');

  // other options for Node.js or ReactNative client
  if (typeof self === 'undefined' || this.isReactNative) {
    if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
      this.extraHeaders = opts.extraHeaders;
    }

    if (opts.localAddress) {
      this.localAddress = opts.localAddress;
    }
  }

  // set on handshake
  this.id = null;
  this.upgrades = null;
  this.pingInterval = null;
  this.pingTimeout = null;

  // set on heartbeat
  this.pingIntervalTimer = null;
  this.pingTimeoutTimer = null;

  this.open();
}
/**
 * Remembers whether the websocket transport was the one in use the last
 * time a socket opened or upgraded; reset to `false` on errors and at the
 * start of every probe.
 */
Socket.priorWebsocketSuccess = false;

/**
 * Mix in `Emitter`.
 */
Emitter(Socket.prototype);

/**
 * Protocol version (an integer), taken from the parser.
 *
 * @api public
 */
Socket.protocol = parser.protocol;

/**
 * Expose deps for legacy compatibility
 * and standalone browser access.
 */
Socket.Socket = Socket;
Socket.Transport = require('./transport');
// same module instances as the ones required at the top of the file
Socket.transports = transports;
Socket.parser = parser;
/**
 * Creates transport of the given type.
 *
 * The transport receives a copy of the socket's query (extended with the
 * protocol version, the transport name and, once known, the session id)
 * plus the socket-level settings. Any setting present in
 * `this.transportOptions[name]` overrides the socket-level one, including
 * falsy overrides such as `rejectUnauthorized: false`.
 *
 * @param {String} name - transport name
 * @return {Transport}
 * @api private
 */
Socket.prototype.createTransport = function (name) {
  debug('creating transport "%s"', name);

  var self = this;
  var query = clone(this.query);

  // append engine.io protocol identifier
  query.EIO = parser.protocol;

  // transport name
  query.transport = name;

  // per-transport options
  var options = this.transportOptions[name] || {};

  // session id if we already have one
  if (this.id) query.sid = this.id;

  // A per-transport value wins whenever it is set at all. Testing against
  // null/undefined instead of truthiness lets `false`, `0` and `''`
  // override a truthy socket-level default.
  function pick (key) {
    return null != options[key] ? options[key] : self[key];
  }

  var transport = new transports[name]({
    query: query,
    socket: this,
    agent: pick('agent'),
    hostname: pick('hostname'),
    port: pick('port'),
    secure: pick('secure'),
    path: pick('path'),
    forceJSONP: pick('forceJSONP'),
    jsonp: pick('jsonp'),
    forceBase64: pick('forceBase64'),
    enablesXDR: pick('enablesXDR'),
    withCredentials: pick('withCredentials'),
    timestampRequests: pick('timestampRequests'),
    timestampParam: pick('timestampParam'),
    policyPort: pick('policyPort'),
    pfx: pick('pfx'),
    key: pick('key'),
    passphrase: pick('passphrase'),
    cert: pick('cert'),
    ca: pick('ca'),
    ciphers: pick('ciphers'),
    rejectUnauthorized: pick('rejectUnauthorized'),
    perMessageDeflate: pick('perMessageDeflate'),
    extraHeaders: pick('extraHeaders'),
    forceNode: pick('forceNode'),
    localAddress: pick('localAddress'),
    requestTimeout: pick('requestTimeout'),
    // sub-protocols only exist per transport, there is no socket-level value
    protocols: options.protocols || void (0),
    isReactNative: this.isReactNative
  });

  return transport;
};
/**
 * Returns a shallow copy of `obj` holding only its own enumerable
 * properties.
 *
 * `hasOwnProperty` is borrowed from `Object.prototype` so that objects
 * without a prototype, or with a key literally named `hasOwnProperty`
 * (possible for a parsed query string), can be copied without throwing.
 *
 * @param {Object} obj - object to copy
 * @return {Object} new plain object
 * @api private
 */
function clone (obj) {
  var hasOwn = Object.prototype.hasOwnProperty;
  var copy = {};
  for (var key in obj) {
    if (hasOwn.call(obj, key)) {
      copy[key] = obj[key];
    }
  }
  return copy;
}
/**
 * Initializes transport to use and starts probe.
 *
 * Picks `websocket` straight away when `rememberUpgrade` is set, a previous
 * websocket connection succeeded and websocket is among the configured
 * transports; otherwise picks the first configured transport. When the
 * chosen transport cannot be created it is removed from the candidates and
 * the next one is tried. With no candidates left an `error` event is
 * emitted asynchronously.
 *
 * @api private
 */
Socket.prototype.open = function () {
  var name;
  if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') !== -1) {
    name = 'websocket';
  } else if (0 === this.transports.length) {
    // Emit error on next tick so it can be listened to
    var self = this;
    setTimeout(function () {
      self.emit('error', 'No transports available');
    }, 0);
    return;
  } else {
    name = this.transports[0];
  }
  this.readyState = 'opening';

  // Retry with the next transport if the transport is disabled (jsonp: false)
  var transport;
  try {
    transport = this.createTransport(name);
  } catch (e) {
    debug('could not create transport "%s": %s', name, e);
    // Drop the transport that actually failed. It is not necessarily the
    // first entry (see the `rememberUpgrade` shortcut above). A filtered
    // copy is used so an array supplied by the caller is left untouched.
    this.transports = this.transports.filter(function (candidate) {
      return candidate !== name;
    });
    this.open();
    return;
  }

  transport.open();
  this.setTransport(transport);
};
/**
 * Makes `transport` the socket's current transport.
 *
 * All listeners are stripped from the transport it replaces (if any), then
 * the new transport's `drain`, `packet`, `error` and `close` events are
 * routed to the matching socket handlers.
 *
 * @param {Transport} transport
 * @api private
 */
Socket.prototype.setTransport = function (transport) {
  debug('setting transport %s', transport.name);

  var socket = this;
  var previous = this.transport;

  if (previous) {
    debug('clearing existing transport %s', previous.name);
    previous.removeAllListeners();
  }

  // swap in the new transport
  this.transport = transport;

  // forward its events to the socket
  transport.on('drain', function () {
    socket.onDrain();
  });
  transport.on('packet', function (packet) {
    socket.onPacket(packet);
  });
  transport.on('error', function (e) {
    socket.onError(e);
  });
  transport.on('close', function () {
    socket.onClose('transport close');
  });
};
/**
 * Probes a transport.
 *
 * Opens a second transport next to the current one and sends it a
 * `ping`/`probe` packet. On a matching `pong` the current transport is
 * paused, the socket switches to the probed transport and an `upgrade`
 * packet is sent. A failure is reported through an `upgradeError` event and
 * leaves the current transport in place.
 *
 * @param {String} name - transport name
 * @api private
 */
Socket.prototype.probe = function (name) {
  debug('probing transport "%s"', name);

  var socket = this;
  var candidate = this.createTransport(name, { probe: 1 });
  // once set, every callback still pending for this probe is ignored
  var aborted = false;

  Socket.priorWebsocketSuccess = false;

  // Remove all listeners on the transport and on the socket
  function detach () {
    candidate.removeListener('open', handleProbeOpen);
    candidate.removeListener('error', handleProbeError);
    candidate.removeListener('close', handleProbeTransportClose);
    socket.removeListener('close', handleSocketClose);
    socket.removeListener('upgrading', handleOtherUpgrade);
  }

  // Give up on this probe and close its transport
  function abort () {
    if (aborted) return;

    // Any callback called by transport should be ignored since now
    aborted = true;

    detach();

    candidate.close();
    candidate = null;
  }

  // Handle any error that happens while probing
  function handleProbeError (err) {
    var error = new Error('probe error: ' + err);
    error.transport = candidate.name;

    abort();

    debug('probe transport "%s" failed because of error: %s', name, err);

    socket.emit('upgradeError', error);
  }

  function handleProbeTransportClose () {
    handleProbeError('transport closed');
  }

  // When the socket is closed while we're probing
  function handleSocketClose () {
    handleProbeError('socket closed');
  }

  // When the socket is upgraded while we're probing
  function handleOtherUpgrade (to) {
    if (candidate && to.name !== candidate.name) {
      debug('"%s" works - aborting "%s"', to.name, candidate.name);
      abort();
    }
  }

  // The probed transport answered the ping
  function handleProbePacket (msg) {
    if (aborted) return;

    var isProbePong = 'pong' === msg.type && 'probe' === msg.data;
    if (!isProbePong) {
      debug('probe transport "%s" failed', name);
      var err = new Error('probe error');
      err.transport = candidate.name;
      socket.emit('upgradeError', err);
      return;
    }

    debug('probe transport "%s" pong', name);
    socket.upgrading = true;
    socket.emit('upgrading', candidate);
    if (!candidate) return;
    Socket.priorWebsocketSuccess = 'websocket' === candidate.name;

    debug('pausing current transport "%s"', socket.transport.name);
    socket.transport.pause(function () {
      if (aborted) return;
      if ('closed' === socket.readyState) return;
      debug('changing transport and sending upgrade packet');

      detach();

      socket.setTransport(candidate);
      candidate.send([{ type: 'upgrade' }]);
      socket.emit('upgrade', candidate);
      candidate = null;
      socket.upgrading = false;
      socket.flush();
    });
  }

  // `this` is whatever the emitter binds for the transport's `open` event
  function handleProbeOpen () {
    if (socket.onlyBinaryUpgrades) {
      var losesBinary = !this.supportsBinary && socket.transport.supportsBinary;
      if (!aborted) aborted = losesBinary;
    }
    if (aborted) return;

    debug('probe transport "%s" opened', name);
    candidate.send([{ type: 'ping', data: 'probe' }]);
    candidate.once('packet', handleProbePacket);
  }

  candidate.once('open', handleProbeOpen);
  candidate.once('error', handleProbeError);
  candidate.once('close', handleProbeTransportClose);

  this.once('close', handleSocketClose);
  this.once('upgrading', handleOtherUpgrade);

  candidate.open();
};
/**
 * Called when connection is deemed open.
 *
 * Marks the socket open, emits `open`, flushes buffered packets and then
 * probes every transport listed in `this.upgrades`, provided upgrades are
 * enabled and the current transport can be paused.
 *
 * @api public
 */
Socket.prototype.onOpen = function () {
  debug('socket open');
  this.readyState = 'open';
  Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
  this.emit('open');
  this.flush();

  // we check for `readyState` in case an `open`
  // listener already closed the socket
  var shouldProbe = 'open' === this.readyState && this.upgrade && this.transport.pause;
  if (!shouldProbe) return;

  debug('starting upgrade probes');
  var total = this.upgrades.length;
  for (var n = 0; n < total; n++) {
    this.probe(this.upgrades[n]);
  }
};
/**
 * Handles a packet.
 *
 * Packets are only processed while the socket is `opening`, `open` or
 * `closing`. Every processed packet is re-emitted as `packet` and counts as
 * a `heartbeat` before it is dispatched on its type.
 *
 * @param {Object} packet - packet with `type` and `data`
 * @api private
 */
Socket.prototype.onPacket = function (packet) {
  var accepting = 'opening' === this.readyState || 'open' === this.readyState ||
    'closing' === this.readyState;

  if (!accepting) {
    debug('packet received with socket readyState "%s"', this.readyState);
    return;
  }

  debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

  this.emit('packet', packet);

  // Socket is live - any packet counts
  this.emit('heartbeat');

  var type = packet.type;
  if ('open' === type) {
    this.onHandshake(JSON.parse(packet.data));
  } else if ('pong' === type) {
    this.setPing();
    this.emit('pong');
  } else if ('error' === type) {
    var err = new Error('server error');
    err.code = packet.data;
    this.onError(err);
  } else if ('message' === type) {
    this.emit('data', packet.data);
    this.emit('message', packet.data);
  }
};
/**
 * Called upon handshake completion.
 *
 * Stores the session id, the usable upgrades and the ping settings sent by
 * the server, opens the socket and starts the ping cycle.
 *
 * @param {Object} data - handshake obj
 * @api private
 */
Socket.prototype.onHandshake = function (data) {
  this.emit('handshake', data);

  var sid = data.sid;
  this.id = sid;
  this.transport.query.sid = sid;
  this.upgrades = this.filterUpgrades(data.upgrades);
  this.pingInterval = data.pingInterval;
  this.pingTimeout = data.pingTimeout;

  this.onOpen();

  // In case open handler closes socket
  if ('closed' !== this.readyState) {
    this.setPing();

    // Prolong liveness of socket on heartbeat; removing first keeps a
    // repeated handshake from registering the handler twice
    this.removeListener('heartbeat', this.onHeartbeat);
    this.on('heartbeat', this.onHeartbeat);
  }
};
/**
 * Resets ping timeout.
 *
 * Restarts the timer that closes the socket with reason `ping timeout`.
 *
 * @param {Number} timeout - delay in ms; when falsy,
 *   `pingInterval + pingTimeout` is used
 * @api private
 */
Socket.prototype.onHeartbeat = function (timeout) {
  var socket = this;
  var delay = timeout || (socket.pingInterval + socket.pingTimeout);

  clearTimeout(socket.pingTimeoutTimer);
  socket.pingTimeoutTimer = setTimeout(function () {
    if ('closed' !== socket.readyState) {
      socket.onClose('ping timeout');
    }
  }, delay);
};
/**
 * Pings server every `this.pingInterval` and expects response
 * within `this.pingTimeout` or closes connection.
 *
 * Only one ping is scheduled per call; the `pong` handling in `onPacket`
 * calls this again.
 *
 * @api private
 */
Socket.prototype.setPing = function () {
  var socket = this;

  function pingNow () {
    debug('writing ping packet - expecting pong within %sms', socket.pingTimeout);
    socket.ping();
    socket.onHeartbeat(socket.pingTimeout);
  }

  clearTimeout(this.pingIntervalTimer);
  this.pingIntervalTimer = setTimeout(pingNow, this.pingInterval);
};
/**
 * Sends a ping packet and emits `ping` from the packet's callback.
 *
 * @api private
 */
Socket.prototype.ping = function () {
  var socket = this;

  function announce () {
    socket.emit('ping');
  }

  this.sendPacket('ping', announce);
};
/**
 * Called on `drain` event
 *
 * Drops the packets handed to the transport by the last flush, then either
 * flushes what was queued in the meantime or emits `drain`.
 *
 * @api private
 */
Socket.prototype.onDrain = function () {
  var pending = this.writeBuffer;
  pending.splice(0, this.prevBufferLen);

  // setting prevBufferLen = 0 is very important
  // for example, when upgrading, upgrade packet is sent over,
  // and a nonzero prevBufferLen could cause problems on `drain`
  this.prevBufferLen = 0;

  if (0 !== pending.length) {
    this.flush();
    return;
  }
  this.emit('drain');
};
/**
 * Flush write buffers.
 *
 * Does nothing when the socket is closed, the transport is not writable,
 * an upgrade is in progress or the buffer is empty.
 *
 * @api private
 */
Socket.prototype.flush = function () {
  if ('closed' === this.readyState) return;
  if (!this.transport.writable) return;
  if (this.upgrading) return;
  if (!this.writeBuffer.length) return;

  debug('flushing %d packets in socket', this.writeBuffer.length);
  this.transport.send(this.writeBuffer);

  // keep track of current length of writeBuffer
  // splice writeBuffer and callbackBuffer on `drain`
  this.prevBufferLen = this.writeBuffer.length;
  this.emit('flush');
};
/**
 * Sends a message. Available as both `send` and `write`.
 *
 * @param {String} msg - message.
 * @param {Object} options.
 * @param {Function} fn - callback function.
 * @return {Socket} for chaining.
 * @api public
 */
Socket.prototype.send = function (msg, options, fn) {
  this.sendPacket('message', msg, options, fn);
  return this;
};
Socket.prototype.write = Socket.prototype.send;
/**
 * Sends a packet.
 *
 * `data` and `options` may both be omitted, in which case the callback can
 * be passed in their place. Nothing is queued while the socket is `closing`
 * or `closed`. The caller's `options` object is never modified; the packet
 * carries its own copy with `compress` defaulted to `true`.
 *
 * @param {String} type - packet type.
 * @param {String} data.
 * @param {Object} options.
 * @param {Function} fn - callback function, run on the next `flush` event.
 * @api private
 */
Socket.prototype.sendPacket = function (type, data, options, fn) {
  if ('function' === typeof data) {
    fn = data;
    data = undefined;
  }

  if ('function' === typeof options) {
    fn = options;
    options = null;
  }

  if ('closing' === this.readyState || 'closed' === this.readyState) {
    return;
  }

  // Copy the options instead of writing `compress` onto the caller's
  // object, which may be shared between calls or frozen. Inherited
  // enumerable properties are copied too, so everything that was readable
  // through `options` stays readable on the packet.
  var packetOptions = {};
  for (var key in options) {
    packetOptions[key] = options[key];
  }
  packetOptions.compress = false !== packetOptions.compress;

  var packet = {
    type: type,
    data: data,
    options: packetOptions
  };
  this.emit('packetCreate', packet);
  this.writeBuffer.push(packet);
  if (fn) this.once('flush', fn);
  this.flush();
};
/**
 * Closes the connection.
 *
 * Only acts while the socket is `opening` or `open`. The actual close is
 * postponed until buffered packets have drained and until an upgrade that
 * is in progress has finished or failed.
 *
 * @return {Socket} for chaining.
 * @api private
 */
Socket.prototype.close = function () {
  var socket = this;

  function forceClose () {
    socket.onClose('forced close');
    debug('socket closing - telling transport to close');
    socket.transport.close();
  }

  function onUpgradeSettled () {
    socket.removeListener('upgrade', onUpgradeSettled);
    socket.removeListener('upgradeError', onUpgradeSettled);
    forceClose();
  }

  function deferUntilUpgraded () {
    // wait for upgrade to finish since we can't send packets while pausing a transport
    socket.once('upgrade', onUpgradeSettled);
    socket.once('upgradeError', onUpgradeSettled);
  }

  if ('opening' !== this.readyState && 'open' !== this.readyState) {
    return this;
  }

  this.readyState = 'closing';

  if (this.writeBuffer.length) {
    // `this` inside the listener is whatever the emitter binds
    this.once('drain', function () {
      if (this.upgrading) {
        deferUntilUpgraded();
      } else {
        forceClose();
      }
    });
  } else if (this.upgrading) {
    deferUntilUpgraded();
  } else {
    forceClose();
  }

  return this;
};
/**
 * Called upon transport error
 *
 * Forgets any earlier websocket success, re-emits the error on the socket
 * and closes it with reason `transport error`.
 *
 * @param {Error} err
 * @api private
 */
Socket.prototype.onError = function (err) {
  debug('socket error %j', err);

  // reset before listeners run
  Socket.priorWebsocketSuccess = false;

  this.emit('error', err);
  this.onClose('transport error', err);
};
/**
 * Called upon transport close.
 *
 * Does nothing unless the socket is `opening`, `open` or `closing`.
 * Otherwise it stops the ping timers, shuts the transport down, marks the
 * socket `closed`, emits `close` and finally empties the write buffer.
 *
 * @param {String} reason
 * @param {*} desc - passed on to `close` listeners
 * @api private
 */
Socket.prototype.onClose = function (reason, desc) {
  var state = this.readyState;
  if ('opening' !== state && 'open' !== state && 'closing' !== state) {
    return;
  }

  debug('socket close with reason: "%s"', reason);
  var transport = this.transport;

  // clear timers
  clearTimeout(this.pingIntervalTimer);
  clearTimeout(this.pingTimeoutTimer);

  // stop event from firing again for transport
  transport.removeAllListeners('close');

  // ensure transport won't stay open
  transport.close();

  // ignore further transport communication
  transport.removeAllListeners();

  // set ready state
  this.readyState = 'closed';

  // clear session id
  this.id = null;

  // emit close event
  this.emit('close', reason, desc);

  // clean buffers after, so users can still
  // grab the buffers on `close` event
  this.writeBuffer = [];
  this.prevBufferLen = 0;
};
/**
 * Filters upgrades, returning only those matching client transports.
 *
 * A missing upgrade list is treated as empty, so a handshake without an
 * `upgrades` field does not throw.
 *
 * @param {Array} upgrades - server upgrades
 * @return {Array} upgrades also present in `this.transports`, in server order
 * @api private
 *
 */
Socket.prototype.filterUpgrades = function (upgrades) {
  var offered = upgrades || [];
  var filteredUpgrades = [];
  for (var i = 0, j = offered.length; i < j; i++) {
    if (~index(this.transports, offered[i])) filteredUpgrades.push(offered[i]);
  }
  return filteredUpgrades;
};
|