From 5d39b42b7a8f32389a28b456ca1cd855a8f47b31 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 30 Mar 2017 17:27:03 +0200 Subject: [PATCH] Add dependencies and first test --- bigchaindb/web/websocket_server.py | 56 ++++++++++++++++++++++++++++++ setup.py | 1 + tests/web/test_websocket_server.py | 15 ++++++++ 3 files changed, 72 insertions(+) create mode 100644 bigchaindb/web/websocket_server.py create mode 100644 tests/web/test_websocket_server.py diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py new file mode 100644 index 00000000..7a5b3d77 --- /dev/null +++ b/bigchaindb/web/websocket_server.py @@ -0,0 +1,56 @@ +"""WebSocket server for the BigchainDB Event Stream API.""" + +import asyncio +from uuid import uuid4 + +from aiohttp import web + + +class PoisonPill: + pass + + +POISON_PILL = PoisonPill() + + +class Dispatcher: + + def __init__(self, event_source): + self.event_source = event_source + self.subscribers = {} + + def subscribe(self, uuid, ws): + self.subscribers[uuid] = ws + + @asyncio.coroutine + def publish(self): + while True: + event = yield from self.event_source.get() + if event == POISON_PILL: + return + for uuid, ws in self.subscribers.items(): + ws.send_str(event) + + +@asyncio.coroutine +def websocket_handler(request): + ws = web.WebSocketResponse() + yield from ws.prepare(request) + uuid = uuid4() + request.app['dispatcher'].subscribe(uuid, ws) + while True: + # Consume input buffer + yield from ws.receive() + return ws + + +def init_app(event_source, loop=None): + dispatcher = Dispatcher(event_source) + + # Schedule the dispatcher + loop.create_task(dispatcher.publish()) + + app = web.Application(loop=loop) + app['dispatcher'] = dispatcher + app.router.add_get('/', websocket_handler) + return app diff --git a/setup.py b/setup.py index c05b554a..ee8871d4 100644 --- a/setup.py +++ b/setup.py @@ -54,6 +54,7 @@ tests_require = [ 'pytest-mock', 'pytest-xdist', 'pytest-flask', + 'pytest-aiohttp', 'tox', ] + docs_require diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py new file mode 100644 index 00000000..fb6d555b --- /dev/null +++ b/tests/web/test_websocket_server.py @@ -0,0 +1,15 @@ +import asyncio + + +@asyncio.coroutine +def test_websocket(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL + + event_source = asyncio.Queue(loop=loop) + app = init_app(event_source, loop=loop) + client = yield from test_client(app) + ws = yield from client.ws_connect('/') + yield from event_source.put('antani') + yield from event_source.put(POISON_PILL) + result = yield from ws.receive() + assert result.data == 'antani'