From e8f8047cb6043519e8f530cb77fffc604dcf80d9 Mon Sep 17 00:00:00 2001 From: Jesse Gibson Date: Thu, 17 Nov 2016 14:07:26 -0700 Subject: [PATCH] Expose websocket events The Peer "class" now extends EventEmitter. Listening to any websocket events (e.g., "message", "close", "open", etc.) will not only subscribe to the current websocket, but all future websockets. This provides a much needed abstraction, since reconnection replaces the socket, which would typically destroy your listeners. --- lib/wsp/Peer.js | 132 ++++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js index 05f510b3..aafd3513 100644 --- a/lib/wsp/Peer.js +++ b/lib/wsp/Peer.js @@ -1,4 +1,9 @@ +/* eslint-disable no-underscore-dangle */ +'use strict'; + var WebSocket = require('ws'); +var Emitter = require('events'); +var util = require('util'); /** * Calculates backoff instances. @@ -38,8 +43,23 @@ Backoff.prototype.reset = function () { }; /** - * Create a websocket client and handle reconnect backoff logic. - * @param {String} url - A preformatted url (starts with ws://) + * Schedules the next connection, according to the backoff. + * @param {Peer} peer - A peer instance. + * @return {Timeout} - The timeout value from `setTimeout`. + */ +function scheduleReconnect (peer) { + var backoff = peer.backoff; + var time = backoff.time; + backoff.next(); + + var reconnect = peer.connect.bind(peer); + + return setTimeout(reconnect, time); +} + +/** + * Handles reconnections and defers messages until the socket is ready. + * @param {String} url - The address to connect to. * @param {Object} [options] - Override how the socket is managed. * @param {Object} options.backoff - Backoff options (see the constructor). * @class @@ -49,14 +69,38 @@ function Peer (url, options) { return new Peer(url, options); } + // Extend EventEmitter. + Emitter.call(this); + this.setMaxListeners(Infinity); + this.options = options || {}; - // Messages sent while offline. - this.offline = []; + // Messages sent before the socket is ready. + this.deferredMsgs = []; this.url = Peer.formatURL(url); this.backoff = new Backoff(this.options.backoff); - this.retry(url); + + // Set up the websocket. + this.connect(); + + var peer = this; + var reconnect = scheduleReconnect.bind(null, peer); + + // Handle reconnection. + this.on('close', reconnect); + this.on('error', function (error) { + if (error.code === 'ECONNREFUSED') { + reconnect(); + } + }); + + // Send deferred messages. + this.on('open', function () { + peer.drainQueue(); + peer.backoff.reset(); + }); + } /** @@ -70,92 +114,47 @@ Peer.formatURL = function (url) { return url.replace('http', 'ws'); }; +util.inherits(Peer, Emitter); var API = Peer.prototype; /** * Attempts a websocket connection. - * @param {String} url - The websocket URL. * @return {WebSocket} - The new websocket instance. */ -API.retry = function () { +API.connect = function () { var url = this.url; + // Open a new websocket. var socket = new WebSocket(url); + + // Re-use the previous listeners. + socket._events = this._events; + this.socket = socket; - this.retryOnDisconnect(socket); - - this.sendOnConnection(); - return socket; }; /** - * Sends the messages that couldn't be sent before once - * the connection is open. + * Sends all the messages in the deferred queue. * @return {Peer} - The context. */ -API.sendOnConnection = function () { +API.drainQueue = function () { var peer = this; - var queue = this.offline; - var socket = this.socket; - // Wait for the socket to connect. - socket.once('open', function () { - queue.forEach(function (msg) { - socket.send(msg); - }); - - peer.offline = []; + this.deferredMsgs.forEach(function (msg) { + peer.send(msg); }); + // Reset the queue. + this.deferredMsgs = []; + return this; }; -/** - * Schedules the next retry, according to the backoff. - * @param {Peer} peer - A peer instance. - * @return {Timeout} - The timeout value from `setTimeout`. - */ -function schedule (peer) { - var backoff = peer.backoff; - var time = backoff.time; - backoff.next(); - - return setTimeout(function () { - var socket = peer.retry(); - - // Successfully reconnected? Reset the backoff. - socket.once('open', backoff.reset.bind(backoff)); - }, time); -} - -/** - * Attaches handlers to the socket, attempting reconnection - * when it's closed. - * @param {WebSocket} socket - The websocket instance to bind to. - * @return {WebSocket} - The same websocket. - */ -API.retryOnDisconnect = function (socket) { - var peer = this; - - // Listen for socket close events. - socket.once('close', function () { - schedule(peer); - }); - - socket.on('error', function (error) { - if (error.code === 'ECONNREFUSED') { - schedule(peer); - } - }); - - return socket; -}; - /** * Send data through the socket, or add it to a queue - * of offline requests if it's not ready yet. + * of deferred messages if it's not ready yet. * @param {String} msg - The data to send. * @return {Peer} - The context. */ @@ -164,10 +163,11 @@ API.send = function (msg) { var state = socket.readyState; var ready = socket.OPEN; + // Make sure the socket is ready. if (state === ready) { socket.send(msg); } else { - this.offline.push(msg); + this.deferredMsgs.push(msg); } return this;