diff --git a/lib/wsp/Peer.js b/lib/wsp/Peer.js index 05f510b3..aafd3513 100644 --- a/lib/wsp/Peer.js +++ b/lib/wsp/Peer.js @@ -1,4 +1,9 @@ +/* eslint-disable no-underscore-dangle */ +'use strict'; + var WebSocket = require('ws'); +var Emitter = require('events'); +var util = require('util'); /** * Calculates backoff instances. @@ -38,8 +43,23 @@ Backoff.prototype.reset = function () { }; /** - * Create a websocket client and handle reconnect backoff logic. - * @param {String} url - A preformatted url (starts with ws://) + * Schedules the next connection, according to the backoff. + * @param {Peer} peer - A peer instance. + * @return {Timeout} - The timeout value from `setTimeout`. + */ +function scheduleReconnect (peer) { + var backoff = peer.backoff; + var time = backoff.time; + backoff.next(); + + var reconnect = peer.connect.bind(peer); + + return setTimeout(reconnect, time); +} + +/** + * Handles reconnections and defers messages until the socket is ready. + * @param {String} url - The address to connect to. * @param {Object} [options] - Override how the socket is managed. * @param {Object} options.backoff - Backoff options (see the constructor). * @class @@ -49,14 +69,38 @@ function Peer (url, options) { return new Peer(url, options); } + // Extend EventEmitter. + Emitter.call(this); + this.setMaxListeners(Infinity); + this.options = options || {}; - // Messages sent while offline. - this.offline = []; + // Messages sent before the socket is ready. + this.deferredMsgs = []; this.url = Peer.formatURL(url); this.backoff = new Backoff(this.options.backoff); - this.retry(url); + + // Set up the websocket. + this.connect(); + + var peer = this; + var reconnect = scheduleReconnect.bind(null, peer); + + // Handle reconnection. + this.on('close', reconnect); + this.on('error', function (error) { + if (error.code === 'ECONNREFUSED') { + reconnect(); + } + }); + + // Send deferred messages. + this.on('open', function () { + peer.drainQueue(); + peer.backoff.reset(); + }); + } /** @@ -70,92 +114,47 @@ Peer.formatURL = function (url) { return url.replace('http', 'ws'); }; +util.inherits(Peer, Emitter); var API = Peer.prototype; /** * Attempts a websocket connection. - * @param {String} url - The websocket URL. * @return {WebSocket} - The new websocket instance. */ -API.retry = function () { +API.connect = function () { var url = this.url; + // Open a new websocket. var socket = new WebSocket(url); + + // Re-use the previous listeners. + socket._events = this._events; + this.socket = socket; - this.retryOnDisconnect(socket); - - this.sendOnConnection(); - return socket; }; /** - * Sends the messages that couldn't be sent before once - * the connection is open. + * Sends all the messages in the deferred queue. * @return {Peer} - The context. */ -API.sendOnConnection = function () { +API.drainQueue = function () { var peer = this; - var queue = this.offline; - var socket = this.socket; - // Wait for the socket to connect. - socket.once('open', function () { - queue.forEach(function (msg) { - socket.send(msg); - }); - - peer.offline = []; + this.deferredMsgs.forEach(function (msg) { + peer.send(msg); }); + // Reset the queue. + this.deferredMsgs = []; + return this; }; -/** - * Schedules the next retry, according to the backoff. - * @param {Peer} peer - A peer instance. - * @return {Timeout} - The timeout value from `setTimeout`. - */ -function schedule (peer) { - var backoff = peer.backoff; - var time = backoff.time; - backoff.next(); - - return setTimeout(function () { - var socket = peer.retry(); - - // Successfully reconnected? Reset the backoff. - socket.once('open', backoff.reset.bind(backoff)); - }, time); -} - -/** - * Attaches handlers to the socket, attempting reconnection - * when it's closed. - * @param {WebSocket} socket - The websocket instance to bind to. - * @return {WebSocket} - The same websocket. - */ -API.retryOnDisconnect = function (socket) { - var peer = this; - - // Listen for socket close events. - socket.once('close', function () { - schedule(peer); - }); - - socket.on('error', function (error) { - if (error.code === 'ECONNREFUSED') { - schedule(peer); - } - }); - - return socket; -}; - /** * Send data through the socket, or add it to a queue - * of offline requests if it's not ready yet. + * of deferred messages if it's not ready yet. * @param {String} msg - The data to send. * @return {Peer} - The context. */ @@ -164,10 +163,11 @@ API.send = function (msg) { var state = socket.readyState; var ready = socket.OPEN; + // Make sure the socket is ready. if (state === ready) { socket.send(msg); } else { - this.offline.push(msg); + this.deferredMsgs.push(msg); } return this;