Expose websocket events

The Peer "class" now extends EventEmitter. Listening to any websocket
events (e.g., "message", "close", "open", etc.) will not only subscribe
to the current websocket, but all future websockets. This provides a
much needed abstraction, since reconnection replaces the socket, which
would typically destroy your listeners.
This commit is contained in:
Jesse Gibson 2016-11-17 14:07:26 -07:00
parent 2317a54d45
commit e8f8047cb6

View File

@ -1,4 +1,9 @@
/* eslint-disable no-underscore-dangle */
'use strict';
var WebSocket = require('ws');
var Emitter = require('events');
var util = require('util');
/**
* Calculates backoff instances.
@ -38,8 +43,23 @@ Backoff.prototype.reset = function () {
};
/**
* Create a websocket client and handle reconnect backoff logic.
* @param {String} url - A preformatted url (starts with ws://)
* Schedules the next connection, according to the backoff.
* @param {Peer} peer - A peer instance.
* @return {Timeout} - The timeout value from `setTimeout`.
*/
function scheduleReconnect (peer) {
var backoff = peer.backoff;
var time = backoff.time;
backoff.next();
var reconnect = peer.connect.bind(peer);
return setTimeout(reconnect, time);
}
/**
* Handles reconnections and defers messages until the socket is ready.
* @param {String} url - The address to connect to.
* @param {Object} [options] - Override how the socket is managed.
* @param {Object} options.backoff - Backoff options (see the constructor).
* @class
@ -49,14 +69,38 @@ function Peer (url, options) {
return new Peer(url, options);
}
// Extend EventEmitter.
Emitter.call(this);
this.setMaxListeners(Infinity);
this.options = options || {};
// Messages sent while offline.
this.offline = [];
// Messages sent before the socket is ready.
this.deferredMsgs = [];
this.url = Peer.formatURL(url);
this.backoff = new Backoff(this.options.backoff);
this.retry(url);
// Set up the websocket.
this.connect();
var peer = this;
var reconnect = scheduleReconnect.bind(null, peer);
// Handle reconnection.
this.on('close', reconnect);
this.on('error', function (error) {
if (error.code === 'ECONNREFUSED') {
reconnect();
}
});
// Send deferred messages.
this.on('open', function () {
peer.drainQueue();
peer.backoff.reset();
});
}
/**
@ -70,92 +114,47 @@ Peer.formatURL = function (url) {
return url.replace('http', 'ws');
};
util.inherits(Peer, Emitter);
var API = Peer.prototype;
/**
* Attempts a websocket connection.
* @param {String} url - The websocket URL.
* @return {WebSocket} - The new websocket instance.
*/
API.retry = function () {
API.connect = function () {
var url = this.url;
// Open a new websocket.
var socket = new WebSocket(url);
// Re-use the previous listeners.
socket._events = this._events;
this.socket = socket;
this.retryOnDisconnect(socket);
this.sendOnConnection();
return socket;
};
/**
* Sends the messages that couldn't be sent before once
* the connection is open.
* Sends all the messages in the deferred queue.
* @return {Peer} - The context.
*/
API.sendOnConnection = function () {
API.drainQueue = function () {
var peer = this;
var queue = this.offline;
var socket = this.socket;
// Wait for the socket to connect.
socket.once('open', function () {
queue.forEach(function (msg) {
socket.send(msg);
});
peer.offline = [];
this.deferredMsgs.forEach(function (msg) {
peer.send(msg);
});
// Reset the queue.
this.deferredMsgs = [];
return this;
};
/**
* Schedules the next retry, according to the backoff.
* @param {Peer} peer - A peer instance.
* @return {Timeout} - The timeout value from `setTimeout`.
*/
function schedule (peer) {
var backoff = peer.backoff;
var time = backoff.time;
backoff.next();
return setTimeout(function () {
var socket = peer.retry();
// Successfully reconnected? Reset the backoff.
socket.once('open', backoff.reset.bind(backoff));
}, time);
}
/**
* Attaches handlers to the socket, attempting reconnection
* when it's closed.
* @param {WebSocket} socket - The websocket instance to bind to.
* @return {WebSocket} - The same websocket.
*/
API.retryOnDisconnect = function (socket) {
var peer = this;
// Listen for socket close events.
socket.once('close', function () {
schedule(peer);
});
socket.on('error', function (error) {
if (error.code === 'ECONNREFUSED') {
schedule(peer);
}
});
return socket;
};
/**
* Send data through the socket, or add it to a queue
* of offline requests if it's not ready yet.
* of deferred messages if it's not ready yet.
* @param {String} msg - The data to send.
* @return {Peer} - The context.
*/
@ -164,10 +163,11 @@ API.send = function (msg) {
var state = socket.readyState;
var ready = socket.OPEN;
// Make sure the socket is ready.
if (state === ready) {
socket.send(msg);
} else {
this.offline.push(msg);
this.deferredMsgs.push(msg);
}
return this;