From c4e0a8e1db16dee8857568db396f26cf84229fc8 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 19 Oct 2017 12:19:43 +0200 Subject: [PATCH] Handle WS CLOSE properly --- bigchaindb/web/websocket_server.py | 34 ++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index bcf14f35..effc153c 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -70,6 +70,15 @@ class Dispatcher: self.subscribers[uuid] = websocket + def unsubscribe(self, uuid): + """Remove a websocket from the list of subscribers. + + Args: + uuid (str): a unique identifier for the websocket. + """ + + del self.subscribers[uuid] + @asyncio.coroutine def publish(self): """Publish new events to the subscribers.""" @@ -115,11 +124,16 @@ def websocket_handler(request): msg = yield from websocket.receive() except RuntimeError as e: logger.debug('Websocket exception: %s', str(e)) - return websocket - - if msg.type == aiohttp.WSMsgType.ERROR: + break + if msg.type == aiohttp.WSMsgType.CLOSED: + logger.debug('Websocket closed') + break + elif msg.type == aiohttp.WSMsgType.ERROR: logger.debug('Websocket exception: %s', websocket.exception()) - return websocket + break + + request.app['dispatcher'].unsubscribe(uuid) + return websocket def init_app(event_source, *, loop=None): @@ -157,3 +171,15 @@ def start(sync_event_source, loop=None): aiohttp.web.run_app(app, host=config['wsserver']['host'], port=config['wsserver']['port']) + + +if __name__ == '__main__': + def meow(queue): + while True: + yield from asyncio.sleep(1) + yield from queue.put('meow') + loop = asyncio.get_event_loop() + event_source = asyncio.Queue(loop=loop) + # loop.create_task(meow(event_source)) + app = init_app(event_source, loop=loop) + aiohttp.web.run_app(app)