From f23faaa65fa1bdbe487266432faea9e4331cecae Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 09:16:22 +0200 Subject: [PATCH] Add WebSocket server --- bigchaindb/pipelines/election.py | 2 +- bigchaindb/processes.py | 12 +- bigchaindb/web/views/base.py | 2 +- bigchaindb/web/websocket_server.py | 103 ++++++++++++---- setup.py | 1 + tests/web/test_info.py | 2 +- tests/web/test_websocket_server.py | 183 +++++++++++++++++++++++++---- 7 files changed, 246 insertions(+), 59 deletions(-) diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 8f3116cc..fc7cb077 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -81,7 +81,7 @@ class Election: elif result['status'] == self.bigchain.BLOCK_VALID: event_type = EventTypes.BLOCK_VALID - event = Event(event_type, {'block_id': block_id}) + event = Event(event_type, self.bigchain.get_block(block_id)) self.event_handler.put_event(event) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 5194c05a..205cdd3c 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -3,9 +3,8 @@ import multiprocessing as mp import bigchaindb from bigchaindb.pipelines import vote, block, election, stale -from bigchaindb.pipelines.events_consumer_example import events_consumer from bigchaindb.events import setup_events_queue -from bigchaindb.web import server +from bigchaindb.web import server, websocket_server logger = logging.getLogger(__name__) @@ -52,10 +51,11 @@ def start(): p_webapi = mp.Process(name='webapi', target=app_server.run) p_webapi.start() - # start the example events consumer - logger.info('Starting the events consumer example') - p_events_consumer = events_consumer(events_queue) - p_events_consumer.start() + logger.info('WebSocket server started') + p_websocket_server = mp.Process(name='ws', + target=websocket_server.start, + args=(events_queue,)) + p_websocket_server.start() # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 5a0ec97b..5ab409b0 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -25,4 +25,4 @@ def base_url(): def base_ws_uri(): """Base websocket uri.""" - return '%s://%s/' % ('ws', request.environ['HTTP_HOST']) + return 'ws://localhost:9985/' diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index 6915d54a..dc320754 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -1,15 +1,64 @@ """WebSocket server for the BigchainDB Event Stream API.""" +# NOTE +# +# This module contains some functions and utilities that might belong to other +# modules. For now, I prefer to keep everything in this module. Why? Because +# those functions are needed only here. +# +# When we will extend this part of the project and we find that we need those +# functionalities elsewhere, we can start creating new modules and organizing +# things in a better way. + + +import json import asyncio import logging +import threading from uuid import uuid4 import aiohttp from aiohttp import web +from bigchaindb.events import EventTypes + logger = logging.getLogger(__name__) POISON_PILL = 'POISON_PILL' +EVENTS_ENDPOINT = '/api/v1/streams/' + + +def _put_into_capped_queue(queue, value): + """Put a new item in a capped queue. + + If the queue reached its limit, get the first element + ready and put the new one. Note that the first element + will be lost (that's the purpose of a capped queue). + + Args: + queue: a queue + value: the value to put + """ + while True: + try: + queue.put_nowait(value) + return + except asyncio.QueueFull: + queue.get_nowait() + + +def _multiprocessing_to_asyncio(in_queue, out_queue, loop): + """Bridge between a synchronous multiprocessing queue + and an asynchronous asyncio queue. + + Args: + in_queue (multiprocessing.Queue): input queue + out_queue (asyncio.Queue): output queue + """ + + while True: + value = in_queue.get() + loop.call_soon_threadsafe(_put_into_capped_queue, out_queue, value) class Dispatcher: @@ -45,10 +94,27 @@ class Dispatcher: while True: event = yield from self.event_source.get() + str_buffer = [] + if event == POISON_PILL: return - for uuid, websocket in self.subscribers.items(): - websocket.send_str(event) + + if isinstance(event, str): + str_buffer.append(event) + + elif event.type == EventTypes.BLOCK_VALID: + block = event.data + + for tx in block['block']['transactions']: + asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id'] + data = {'blockid': block['id'], + 'assetid': asset_id, + 'txid': tx['id']} + str_buffer.append(json.dumps(data)) + + for _, websocket in self.subscribers.items(): + for str_item in str_buffer: + websocket.send_str(str_item) @asyncio.coroutine @@ -83,37 +149,22 @@ def init_app(event_source, *, loop=None): app = web.Application(loop=loop) app['dispatcher'] = dispatcher - app.router.add_get('/', websocket_handler) + app.router.add_get(EVENTS_ENDPOINT, websocket_handler) return app -def start(event_source, *, loop=None): +def start(sync_event_source, loop=None): """Create and start the WebSocket server.""" if not loop: loop = asyncio.get_event_loop() + event_source = asyncio.Queue(maxsize=1024, loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_event_source, event_source, loop), + daemon=True) + bridge.start() + app = init_app(event_source, loop=loop) aiohttp.web.run_app(app, port=9985) - - -def test_websocket_server(): - """Set up a server and output a message every second. - Used for testing purposes.""" - - @asyncio.coroutine - def constant_event_source(event_source): - """Put a message in ``event_source`` every second.""" - - while True: - yield from asyncio.sleep(1) - yield from event_source.put('meow') - - loop = asyncio.get_event_loop() - event_source = asyncio.Queue() - loop.create_task(constant_event_source(event_source)) - start(event_source, loop=loop) - - -if __name__ == '__main__': - test_websocket_server() diff --git a/setup.py b/setup.py index ee8871d4..45d6f04f 100644 --- a/setup.py +++ b/setup.py @@ -77,6 +77,7 @@ install_requires = [ 'multipipes~=0.1.0', 'jsonschema~=2.5.1', 'pyyaml~=3.12', + 'aiohttp~=2.0', ] setup( diff --git a/tests/web/test_info.py b/tests/web/test_info.py index 93e14cbd..4dc60168 100644 --- a/tests/web/test_info.py +++ b/tests/web/test_info.py @@ -31,6 +31,6 @@ def test_api_v1_endpoint(client): 'self': 'http://localhost/api/v1/', 'statuses': 'http://localhost/api/v1/statuses/', 'transactions': 'http://localhost/api/v1/transactions/', - 'streams_v1': 'ws://localhost/api/v1/streams/', + 'streams_v1': 'ws://localhost:9985/api/v1/streams/', } } diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 382a20f0..b205fb25 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,6 +1,23 @@ +import json +import random + import pytest import asyncio +from bigchaindb.models import Transaction + + +def create_block(b, total=1): + transactions = [ + Transaction.create( + [b.me], + [([b.me], 1)], + metadata={'msg': random.random()}, + ).sign([b.me_private]) + for _ in range(total) + ] + return b.create_block(transactions) + class MockWebSocket: def __init__(self): @@ -11,39 +28,100 @@ class MockWebSocket: @asyncio.coroutine -@pytest.mark.skipif(reason='This test raises a RuntimeError, dunno how to solve it now.') -def test_dispatcher(loop): - from bigchaindb.web.websocket_server import Dispatcher, POISON_PILL +def test_bridge_sync_async_queue(loop): + import queue + import threading + from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio - ws0 = MockWebSocket() - ws1 = MockWebSocket() + sync_queue = queue.Queue() + async_queue = asyncio.Queue(loop=loop) - event_source = asyncio.Queue(loop=loop) - dispatcher = Dispatcher(event_source) + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_queue, async_queue, loop), + daemon=True) + bridge.start() - dispatcher.subscribe(0, ws0) - dispatcher.subscribe(1, ws1) + sync_queue.put('fahren') + sync_queue.put('auf') + sync_queue.put('der') + sync_queue.put('Autobahn') - yield from event_source.put('hack') - yield from event_source.put('the') + result = yield from async_queue.get() + assert result == 'fahren' - yield from event_source.put('planet!') - yield from event_source.put(POISON_PILL) + result = yield from async_queue.get() + assert result == 'auf' - loop.run_until_complete(dispatcher.publish()) + result = yield from async_queue.get() + assert result == 'der' - assert ws0.received == ['hack', 'the', 'planet!'] - assert ws1.received == ['planet!'] + result = yield from async_queue.get() + assert result == 'Autobahn' + + assert async_queue.qsize() == 0 @asyncio.coroutine -def test_websocket(test_client, loop): - from bigchaindb.web.websocket_server import init_app, POISON_PILL +def test_put_into_capped_queue(loop): + from bigchaindb.web.websocket_server import _put_into_capped_queue + q = asyncio.Queue(maxsize=2, loop=loop) + + _put_into_capped_queue(q, 'Friday') + assert q._queue[0] == 'Friday' + + _put_into_capped_queue(q, "I'm") + assert q._queue[0] == 'Friday' + assert q._queue[1] == "I'm" + + _put_into_capped_queue(q, 'in') + assert q._queue[0] == "I'm" + assert q._queue[1] == 'in' + + _put_into_capped_queue(q, 'love') + assert q._queue[0] == 'in' + assert q._queue[1] == 'love' + + +@asyncio.coroutine +def test_capped_queue(loop): + import queue + import threading + import time + from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio + + sync_queue = queue.Queue() + async_queue = asyncio.Queue(maxsize=2, loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_queue, async_queue, loop), + daemon=True) + bridge.start() + + sync_queue.put('we') + sync_queue.put('are') + sync_queue.put('the') + sync_queue.put('robots') + + # Wait until the thread processes all the items + time.sleep(1) + + result = yield from async_queue.get() + assert result == 'the' + + result = yield from async_queue.get() + assert result == 'robots' + + assert async_queue.qsize() == 0 + + +@asyncio.coroutine +def test_websocket_string_event(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT event_source = asyncio.Queue(loop=loop) app = init_app(event_source, loop=loop) client = yield from test_client(app) - ws = yield from client.ws_connect('/') + ws = yield from client.ws_connect(EVENTS_ENDPOINT) yield from event_source.put('hack') yield from event_source.put('the') @@ -62,15 +140,72 @@ def test_websocket(test_client, loop): @asyncio.coroutine -@pytest.mark.skipif(reason="Still don't understand how to trigger custom errors.") -def test_websocket_error(test_client, loop): - from bigchaindb.web.websocket_server import init_app, POISON_PILL +def test_websocket_block_event(b, test_client, loop): + from bigchaindb import events + from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT event_source = asyncio.Queue(loop=loop) app = init_app(event_source, loop=loop) client = yield from test_client(app) - ws = yield from client.ws_connect('/') + ws = yield from client.ws_connect(EVENTS_ENDPOINT) + block = create_block(b, 10).to_dict() + block_event = events.Event(events.EventTypes.BLOCK_VALID, block) - yield from ws.close() + yield from event_source.put(block_event) + + for tx in block['block']['transactions']: + result = yield from ws.receive() + json_result = json.loads(result.data) + assert json_result['txid'] == tx['id'] + # Since the transactions are all CREATEs, asset id == transaction id + assert json_result['assetid'] == tx['id'] + assert json_result['blockid'] == block['id'] yield from event_source.put(POISON_PILL) + + +@pytest.mark.skip('Processes are not stopping properly, and the whole test suite would hang') +@pytest.mark.genesis +def test_integration_from_webapi_to_websocket(monkeypatch, client, loop): + # XXX: I think that the `pytest-aiohttp` plugin is sparkling too much + # magic in the `asyncio` module: running this test without monkey-patching + # `asycio.get_event_loop` (and without the `loop` fixture) raises a: + # RuntimeError: There is no current event loop in thread 'MainThread'. + # + # That's pretty weird because this test doesn't use the pytest-aiohttp + # plugin explicitely. + monkeypatch.setattr('asyncio.get_event_loop', lambda: loop) + + import json + import random + import aiohttp + + from bigchaindb.common import crypto + from bigchaindb import processes + from bigchaindb.models import Transaction + + # Start BigchainDB + processes.start() + + loop = asyncio.get_event_loop() + + import time + time.sleep(1) + + ws_url = client.get('http://localhost:9984/api/v1/').json['_links']['streams_v1'] + + # Connect to the WebSocket endpoint + session = aiohttp.ClientSession() + ws = loop.run_until_complete(session.ws_connect(ws_url)) + + # Create a keypair and generate a new asset + user_priv, user_pub = crypto.generate_key_pair() + asset = {'random': random.random()} + tx = Transaction.create([user_pub], [([user_pub], 1)], asset=asset) + tx = tx.sign([user_priv]) + # Post the transaction to the BigchainDB Web API + client.post('/api/v1/transactions/', data=json.dumps(tx.to_dict())) + + result = loop.run_until_complete(ws.receive()) + json_result = json.loads(result.data) + assert json_result['txid'] == tx.id