From 0cbf144ddf364813d2651793494ad71ab0d58553 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Tue, 28 Mar 2017 16:05:44 +0200 Subject: [PATCH 01/27] Initial implementation of an event_handler --- bigchaindb/events.py | 33 +++++++++++++++++++ bigchaindb/pipelines/election.py | 24 +++++++++++--- .../pipelines/events_consumer_example.py | 14 ++++++++ bigchaindb/processes.py | 16 ++++++++- 4 files changed, 81 insertions(+), 6 deletions(-) create mode 100644 bigchaindb/events.py create mode 100644 bigchaindb/pipelines/events_consumer_example.py diff --git a/bigchaindb/events.py b/bigchaindb/events.py new file mode 100644 index 00000000..a061ad50 --- /dev/null +++ b/bigchaindb/events.py @@ -0,0 +1,33 @@ +from enum import Enum +from multiprocessing import Queue + + +class EventTypes(Enum): + BLOCK_VALID = 1 + BLOCK_INVALID = 2 + + +class Event(object): + + def __init__(self, event_type, event_data): + self.type = event_type + self.data = event_data + + +class EventHandler(object): + + def __init__(self, events_queue): + self.events_queue = events_queue + + def put_event(self, event, timeout=None): + # TODO: handle timeouts + self.events_queue.put(event, timeout=None) + + def get_event(self, timeout=None): + # TODO: handle timeouts + return self.events_queue.get(timeout=None) + + +def setup_events_queue(): + # TODO: set bounds to the queue + return Queue() diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index a5818b3e..b17f5722 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -13,6 +13,7 @@ from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block from bigchaindb import Bigchain +from bigchaindb.events import EventHandler, Event, EventTypes logger = logging.getLogger(__name__) @@ -22,8 +23,9 @@ logger_results = logging.getLogger('pipeline.election.results') class Election: """Election class.""" - def __init__(self): + def __init__(self, events_queue): self.bigchain = Bigchain() + self.event_handler = EventHandler(events_queue) def check_for_quorum(self, next_vote): """ @@ -42,6 +44,7 @@ class Election: next_block = self.bigchain.get_block(block_id) result = self.bigchain.block_election(next_block) + self.handle_block_events(result, block_id) if result['status'] == self.bigchain.BLOCK_INVALID: return Block.from_dict(next_block) @@ -67,9 +70,20 @@ class Election: self.bigchain.write_transaction(tx) return invalid_block + def handle_block_events(self, result, block_id): + if result['status'] == self.bigchain.BLOCK_UNDECIDED: + return + elif result['status'] == self.bigchain.BLOCK_INVALID: + event_type = EventTypes.BLOCK_INVALID + elif result['status'] == self.bigchain.BLOCK_VALID: + event_type = EventTypes.BLOCK_VALID -def create_pipeline(): - election = Election() + event = Event(event_type, {'block_id': block_id}) + self.event_handler.put_event(event) + + +def create_pipeline(events_queue): + election = Election(events_queue) election_pipeline = Pipeline([ Node(election.check_for_quorum), @@ -84,8 +98,8 @@ def get_changefeed(): return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT) -def start(): - pipeline = create_pipeline() +def start(events_queue): + pipeline = create_pipeline(events_queue) pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/bigchaindb/pipelines/events_consumer_example.py b/bigchaindb/pipelines/events_consumer_example.py new file mode 100644 index 00000000..7e833c82 --- /dev/null +++ b/bigchaindb/pipelines/events_consumer_example.py @@ -0,0 +1,14 @@ +import multiprocessing as mp + +from bigchaindb.events import EventHandler + + +def consume_events(events_queue): + event_handler = EventHandler(events_queue) + while True: + event = event_handler.get_event() + print('Event type: {} Event data: {}'.format(event.type, event.data)) + + +def events_consumer(events_queue): + return mp.Process(target=consume_events, args=(events_queue,)) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 01d7a55a..687422ca 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -3,6 +3,8 @@ import multiprocessing as mp import bigchaindb from bigchaindb.pipelines import vote, block, election, stale +from bigchaindb.pipelines.events_consumer_example import events_consumer +from bigchaindb.events import setup_events_queue from bigchaindb.web import server @@ -25,6 +27,13 @@ BANNER = """ def start(): logger.info('Initializing BigchainDB...') + # Create the events queue + # The events queue needs to be initialized once and shared between + # processes. This seems the best way to do it + # At this point only the election processs and the event consumer require + # this queue. + events_queue = setup_events_queue() + # start the processes logger.info('Starting block') block.start() @@ -36,12 +45,17 @@ def start(): stale.start() logger.info('Starting election') - election.start() + election.start(events_queue) # start the web api app_server = server.create_server(bigchaindb.config['server']) p_webapi = mp.Process(name='webapi', target=app_server.run) p_webapi.start() + # start the example events consumer + logger.info('Starting the events consumer example') + p_events_consumer = events_consumer(events_queue) + p_events_consumer.start() + # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) From 5d39b42b7a8f32389a28b456ca1cd855a8f47b31 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 30 Mar 2017 17:27:03 +0200 Subject: [PATCH 02/27] Add dependencies and first test --- bigchaindb/web/websocket_server.py | 56 ++++++++++++++++++++++++++++++ setup.py | 1 + tests/web/test_websocket_server.py | 15 ++++++++ 3 files changed, 72 insertions(+) create mode 100644 bigchaindb/web/websocket_server.py create mode 100644 tests/web/test_websocket_server.py diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py new file mode 100644 index 00000000..7a5b3d77 --- /dev/null +++ b/bigchaindb/web/websocket_server.py @@ -0,0 +1,56 @@ +"""WebSocket server for the BigchainDB Event Stream API.""" + +import asyncio +from uuid import uuid4 + +from aiohttp import web + + +class PoisonPill: + pass + + +POISON_PILL = PoisonPill() + + +class Dispatcher: + + def __init__(self, event_source): + self.event_source = event_source + self.subscribers = {} + + def subscribe(self, uuid, ws): + self.subscribers[uuid] = ws + + @asyncio.coroutine + def publish(self): + while True: + event = yield from self.event_source.get() + if event == POISON_PILL: + return + for uuid, ws in self.subscribers.items(): + ws.send_str(event) + + +@asyncio.coroutine +def websocket_handler(request): + ws = web.WebSocketResponse() + yield from ws.prepare(request) + uuid = uuid4() + request.app['dispatcher'].subscribe(uuid, ws) + while True: + # Consume input buffer + yield from ws.receive() + return ws + + +def init_app(event_source, loop=None): + dispatcher = Dispatcher(event_source) + + # Schedule the dispatcher + loop.create_task(dispatcher.publish()) + + app = web.Application(loop=loop) + app['dispatcher'] = dispatcher + app.router.add_get('/', websocket_handler) + return app diff --git a/setup.py b/setup.py index c05b554a..ee8871d4 100644 --- a/setup.py +++ b/setup.py @@ -54,6 +54,7 @@ tests_require = [ 'pytest-mock', 'pytest-xdist', 'pytest-flask', + 'pytest-aiohttp', 'tox', ] + docs_require diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py new file mode 100644 index 00000000..fb6d555b --- /dev/null +++ b/tests/web/test_websocket_server.py @@ -0,0 +1,15 @@ +import asyncio + + +@asyncio.coroutine +def test_websocket(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL + + event_source = asyncio.Queue(loop=loop) + app = init_app(event_source, loop=loop) + client = yield from test_client(app) + ws = yield from client.ws_connect('/') + yield from event_source.put('antani') + yield from event_source.put(POISON_PILL) + result = yield from ws.receive() + assert result.data == 'antani' From 83397de397179d219938dc63280a61f5f8b56f58 Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 31 Mar 2017 15:56:29 +0200 Subject: [PATCH 03/27] Add more tests and utils --- bigchaindb/web/websocket_server.py | 61 +++++++++++++++++++++++---- tests/web/test_websocket_server.py | 67 ++++++++++++++++++++++++++++-- 2 files changed, 118 insertions(+), 10 deletions(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index 7a5b3d77..9d8f5ef9 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -1,29 +1,48 @@ """WebSocket server for the BigchainDB Event Stream API.""" import asyncio +import logging from uuid import uuid4 +import aiohttp from aiohttp import web -class PoisonPill: - pass - - -POISON_PILL = PoisonPill() +logger = logging.getLogger(__name__) +POISON_PILL = 'POISON_PILL' class Dispatcher: + """Dispatch events to websockets. + + This class implements a simple publish/subscribe pattern. + """ def __init__(self, event_source): + """Create a new instance. + + Args: + event_source: a source of events. Elements in the queue + should be strings. + """ + self.event_source = event_source self.subscribers = {} def subscribe(self, uuid, ws): + """Add a websocket to the list of subscribers. + + Args: + uuid (str): a unique identifier for the websocket. + ws: the websocket to publish information. + """ + self.subscribers[uuid] = ws @asyncio.coroutine def publish(self): + """Publish new events to the subscribers.""" + while True: event = yield from self.event_source.get() if event == POISON_PILL: @@ -34,17 +53,29 @@ class Dispatcher: @asyncio.coroutine def websocket_handler(request): + """Handle a new socket connection.""" + + logger.debug('New websocket connection.') ws = web.WebSocketResponse() yield from ws.prepare(request) uuid = uuid4() request.app['dispatcher'].subscribe(uuid, ws) + while True: # Consume input buffer - yield from ws.receive() - return ws + msg = yield from ws.receive() + if msg.type == aiohttp.WSMsgType.ERROR: + logger.debug('Websocket exception: {}'.format(ws.exception())) + return def init_app(event_source, loop=None): + """Init the application server. + + Return: + An aiohttp application. + """ + dispatcher = Dispatcher(event_source) # Schedule the dispatcher @@ -54,3 +85,19 @@ def init_app(event_source, loop=None): app['dispatcher'] = dispatcher app.router.add_get('/', websocket_handler) return app + + +@asyncio.coroutine +def constant_event_source(event_source): + while True: + yield from asyncio.sleep(1) + yield from event_source.put('meow') + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + event_source = asyncio.Queue() + + loop.create_task(constant_event_source(event_source)) + app = init_app(event_source, loop=loop) + aiohttp.web.run_app(app, port=9985) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index fb6d555b..382a20f0 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,6 +1,41 @@ +import pytest import asyncio +class MockWebSocket: + def __init__(self): + self.received = [] + + def send_str(self, s): + self.received.append(s) + + +@asyncio.coroutine +@pytest.mark.skipif(reason='This test raises a RuntimeError, dunno how to solve it now.') +def test_dispatcher(loop): + from bigchaindb.web.websocket_server import Dispatcher, POISON_PILL + + ws0 = MockWebSocket() + ws1 = MockWebSocket() + + event_source = asyncio.Queue(loop=loop) + dispatcher = Dispatcher(event_source) + + dispatcher.subscribe(0, ws0) + dispatcher.subscribe(1, ws1) + + yield from event_source.put('hack') + yield from event_source.put('the') + + yield from event_source.put('planet!') + yield from event_source.put(POISON_PILL) + + loop.run_until_complete(dispatcher.publish()) + + assert ws0.received == ['hack', 'the', 'planet!'] + assert ws1.received == ['planet!'] + + @asyncio.coroutine def test_websocket(test_client, loop): from bigchaindb.web.websocket_server import init_app, POISON_PILL @@ -9,7 +44,33 @@ def test_websocket(test_client, loop): app = init_app(event_source, loop=loop) client = yield from test_client(app) ws = yield from client.ws_connect('/') - yield from event_source.put('antani') - yield from event_source.put(POISON_PILL) + + yield from event_source.put('hack') + yield from event_source.put('the') + yield from event_source.put('planet!') + result = yield from ws.receive() - assert result.data == 'antani' + assert result.data == 'hack' + + result = yield from ws.receive() + assert result.data == 'the' + + result = yield from ws.receive() + assert result.data == 'planet!' + + yield from event_source.put(POISON_PILL) + + +@asyncio.coroutine +@pytest.mark.skipif(reason="Still don't understand how to trigger custom errors.") +def test_websocket_error(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL + + event_source = asyncio.Queue(loop=loop) + app = init_app(event_source, loop=loop) + client = yield from test_client(app) + ws = yield from client.ws_connect('/') + + yield from ws.close() + + yield from event_source.put(POISON_PILL) From 96daa986994413cefdfc6f933091840233a53ff5 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Wed, 29 Mar 2017 12:09:14 +0200 Subject: [PATCH 04/27] Adverstise Event stream api in api info endpoint. Updated tests. --- bigchaindb/web/views/base.py | 5 +++++ bigchaindb/web/views/info.py | 9 ++++++--- tests/web/test_info.py | 1 + 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 171a3bb6..5a0ec97b 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -21,3 +21,8 @@ def make_error(status_code, message=None): def base_url(): return '%s://%s/' % (request.environ['wsgi.url_scheme'], request.environ['HTTP_HOST']) + + +def base_ws_uri(): + """Base websocket uri.""" + return '%s://%s/' % ('ws', request.environ['HTTP_HOST']) diff --git a/bigchaindb/web/views/info.py b/bigchaindb/web/views/info.py index 04a15749..b35c6378 100644 --- a/bigchaindb/web/views/info.py +++ b/bigchaindb/web/views/info.py @@ -4,7 +4,7 @@ import flask from flask_restful import Resource import bigchaindb -from bigchaindb.web.views.base import base_url +from bigchaindb.web.views.base import base_url, base_ws_uri from bigchaindb import version @@ -30,16 +30,19 @@ class RootIndex(Resource): class ApiV1Index(Resource): def get(self): api_root = base_url() + 'api/v1/' + websocket_root = base_ws_uri() + 'api/v1/' docs_url = [ 'https://docs.bigchaindb.com/projects/server/en/v', version.__version__, '/drivers-clients/http-client-server-api.html', ] - return { + return flask.jsonify({ '_links': { 'docs': ''.join(docs_url), 'self': api_root, 'statuses': api_root + 'statuses/', 'transactions': api_root + 'transactions/', + # TODO: The version should probably not be hardcoded + 'streams_v1': websocket_root + 'streams/', }, - } + }) diff --git a/tests/web/test_info.py b/tests/web/test_info.py index c55f467f..93e14cbd 100644 --- a/tests/web/test_info.py +++ b/tests/web/test_info.py @@ -31,5 +31,6 @@ def test_api_v1_endpoint(client): 'self': 'http://localhost/api/v1/', 'statuses': 'http://localhost/api/v1/statuses/', 'transactions': 'http://localhost/api/v1/transactions/', + 'streams_v1': 'ws://localhost/api/v1/streams/', } } From 83a7cffc3fe88c2ffeda8a15a19d72c9f010309d Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Mon, 3 Apr 2017 14:29:31 +0200 Subject: [PATCH 05/27] fix tests --- bigchaindb/pipelines/election.py | 31 +++++++++++++++++-------------- bigchaindb/processes.py | 2 +- tests/test_processes.py | 10 ++++++---- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index b17f5722..8f3116cc 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -23,9 +23,11 @@ logger_results = logging.getLogger('pipeline.election.results') class Election: """Election class.""" - def __init__(self, events_queue): + def __init__(self, events_queue=None): self.bigchain = Bigchain() - self.event_handler = EventHandler(events_queue) + self.event_handler = None + if events_queue: + self.event_handler = EventHandler(events_queue) def check_for_quorum(self, next_vote): """ @@ -71,19 +73,20 @@ class Election: return invalid_block def handle_block_events(self, result, block_id): - if result['status'] == self.bigchain.BLOCK_UNDECIDED: - return - elif result['status'] == self.bigchain.BLOCK_INVALID: - event_type = EventTypes.BLOCK_INVALID - elif result['status'] == self.bigchain.BLOCK_VALID: - event_type = EventTypes.BLOCK_VALID + if self.event_handler: + if result['status'] == self.bigchain.BLOCK_UNDECIDED: + return + elif result['status'] == self.bigchain.BLOCK_INVALID: + event_type = EventTypes.BLOCK_INVALID + elif result['status'] == self.bigchain.BLOCK_VALID: + event_type = EventTypes.BLOCK_VALID - event = Event(event_type, {'block_id': block_id}) - self.event_handler.put_event(event) + event = Event(event_type, {'block_id': block_id}) + self.event_handler.put_event(event) -def create_pipeline(events_queue): - election = Election(events_queue) +def create_pipeline(events_queue=None): + election = Election(events_queue=events_queue) election_pipeline = Pipeline([ Node(election.check_for_quorum), @@ -98,8 +101,8 @@ def get_changefeed(): return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT) -def start(events_queue): - pipeline = create_pipeline(events_queue) +def start(events_queue=None): + pipeline = create_pipeline(events_queue=events_queue) pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 687422ca..5194c05a 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -45,7 +45,7 @@ def start(): stale.start() logger.info('Starting election') - election.start(events_queue) + election.start(events_queue=events_queue) # start the web api app_server = server.create_server(bigchaindb.config['server']) diff --git a/tests/test_processes.py b/tests/test_processes.py index bd69d52c..32d784bb 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -1,6 +1,6 @@ from unittest.mock import patch -from multiprocessing import Process +from multiprocessing import Process, Queue from bigchaindb.pipelines import vote, block, election, stale @@ -9,14 +9,16 @@ from bigchaindb.pipelines import vote, block, election, stale @patch.object(block, 'start') @patch.object(vote, 'start') @patch.object(Process, 'start') -def test_processes_start(mock_vote, mock_block, mock_election, mock_stale, - mock_process): +def test_processes_start(mock_process, mock_vote, mock_block, mock_election, + mock_stale): from bigchaindb import processes processes.start() mock_vote.assert_called_with() mock_block.assert_called_with() - mock_election.assert_called_with() mock_stale.assert_called_with() mock_process.assert_called_with() + assert mock_election.call_count == 1 + # the events queue is declared inside processes.start() + assert type(mock_election.call_args[1]['events_queue']) == type(Queue()) From 730b7482f6c8df79d30b8de48152770d9596cfdf Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Mon, 3 Apr 2017 14:31:38 +0200 Subject: [PATCH 06/27] cleanup code --- bigchaindb/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigchaindb/events.py b/bigchaindb/events.py index a061ad50..bc448ce3 100644 --- a/bigchaindb/events.py +++ b/bigchaindb/events.py @@ -7,14 +7,14 @@ class EventTypes(Enum): BLOCK_INVALID = 2 -class Event(object): +class Event: def __init__(self, event_type, event_data): self.type = event_type self.data = event_data -class EventHandler(object): +class EventHandler: def __init__(self, events_queue): self.events_queue = events_queue From bcc2e1f781f5c082df091e75ba4aa8ebad7fe20f Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Mon, 3 Apr 2017 14:48:50 +0200 Subject: [PATCH 07/27] fixed pep8 issue --- tests/test_processes.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_processes.py b/tests/test_processes.py index 32d784bb..7f8ffcd9 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -19,6 +19,5 @@ def test_processes_start(mock_process, mock_vote, mock_block, mock_election, mock_block.assert_called_with() mock_stale.assert_called_with() mock_process.assert_called_with() - assert mock_election.call_count == 1 # the events queue is declared inside processes.start() - assert type(mock_election.call_args[1]['events_queue']) == type(Queue()) + assert mock_election.call_count == 1 From a92c091eeb11b4fd1cc1c20684c10e0e1576392f Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Mon, 3 Apr 2017 14:55:21 +0200 Subject: [PATCH 08/27] fix pep8 issue --- tests/test_processes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_processes.py b/tests/test_processes.py index 7f8ffcd9..00716010 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -1,6 +1,6 @@ from unittest.mock import patch -from multiprocessing import Process, Queue +from multiprocessing import Process from bigchaindb.pipelines import vote, block, election, stale From 64a033b17a49e68b685a386a2ef258290b6c612a Mon Sep 17 00:00:00 2001 From: vrde Date: Mon, 3 Apr 2017 11:48:48 +0200 Subject: [PATCH 09/27] Code cleanup, rename some vars --- bigchaindb/web/websocket_server.py | 60 +++++++++++++++++++----------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index 9d8f5ef9..6915d54a 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -29,15 +29,15 @@ class Dispatcher: self.event_source = event_source self.subscribers = {} - def subscribe(self, uuid, ws): + def subscribe(self, uuid, websocket): """Add a websocket to the list of subscribers. Args: uuid (str): a unique identifier for the websocket. - ws: the websocket to publish information. + websocket: the websocket to publish information. """ - self.subscribers[uuid] = ws + self.subscribers[uuid] = websocket @asyncio.coroutine def publish(self): @@ -47,8 +47,8 @@ class Dispatcher: event = yield from self.event_source.get() if event == POISON_PILL: return - for uuid, ws in self.subscribers.items(): - ws.send_str(event) + for uuid, websocket in self.subscribers.items(): + websocket.send_str(event) @asyncio.coroutine @@ -56,20 +56,20 @@ def websocket_handler(request): """Handle a new socket connection.""" logger.debug('New websocket connection.') - ws = web.WebSocketResponse() - yield from ws.prepare(request) + websocket = web.WebSocketResponse() + yield from websocket.prepare(request) uuid = uuid4() - request.app['dispatcher'].subscribe(uuid, ws) + request.app['dispatcher'].subscribe(uuid, websocket) while True: # Consume input buffer - msg = yield from ws.receive() + msg = yield from websocket.receive() if msg.type == aiohttp.WSMsgType.ERROR: - logger.debug('Websocket exception: {}'.format(ws.exception())) + logger.debug('Websocket exception: %s', websocket.exception()) return -def init_app(event_source, loop=None): +def init_app(event_source, *, loop=None): """Init the application server. Return: @@ -87,17 +87,33 @@ def init_app(event_source, loop=None): return app -@asyncio.coroutine -def constant_event_source(event_source): - while True: - yield from asyncio.sleep(1) - yield from event_source.put('meow') +def start(event_source, *, loop=None): + """Create and start the WebSocket server.""" + + if not loop: + loop = asyncio.get_event_loop() + + app = init_app(event_source, loop=loop) + aiohttp.web.run_app(app, port=9985) + + +def test_websocket_server(): + """Set up a server and output a message every second. + Used for testing purposes.""" + + @asyncio.coroutine + def constant_event_source(event_source): + """Put a message in ``event_source`` every second.""" + + while True: + yield from asyncio.sleep(1) + yield from event_source.put('meow') + + loop = asyncio.get_event_loop() + event_source = asyncio.Queue() + loop.create_task(constant_event_source(event_source)) + start(event_source, loop=loop) if __name__ == '__main__': - loop = asyncio.get_event_loop() - event_source = asyncio.Queue() - - loop.create_task(constant_event_source(event_source)) - app = init_app(event_source, loop=loop) - aiohttp.web.run_app(app, port=9985) + test_websocket_server() From f23faaa65fa1bdbe487266432faea9e4331cecae Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 09:16:22 +0200 Subject: [PATCH 10/27] Add WebSocket server --- bigchaindb/pipelines/election.py | 2 +- bigchaindb/processes.py | 12 +- bigchaindb/web/views/base.py | 2 +- bigchaindb/web/websocket_server.py | 103 ++++++++++++---- setup.py | 1 + tests/web/test_info.py | 2 +- tests/web/test_websocket_server.py | 183 +++++++++++++++++++++++++---- 7 files changed, 246 insertions(+), 59 deletions(-) diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 8f3116cc..fc7cb077 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -81,7 +81,7 @@ class Election: elif result['status'] == self.bigchain.BLOCK_VALID: event_type = EventTypes.BLOCK_VALID - event = Event(event_type, {'block_id': block_id}) + event = Event(event_type, self.bigchain.get_block(block_id)) self.event_handler.put_event(event) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 5194c05a..205cdd3c 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -3,9 +3,8 @@ import multiprocessing as mp import bigchaindb from bigchaindb.pipelines import vote, block, election, stale -from bigchaindb.pipelines.events_consumer_example import events_consumer from bigchaindb.events import setup_events_queue -from bigchaindb.web import server +from bigchaindb.web import server, websocket_server logger = logging.getLogger(__name__) @@ -52,10 +51,11 @@ def start(): p_webapi = mp.Process(name='webapi', target=app_server.run) p_webapi.start() - # start the example events consumer - logger.info('Starting the events consumer example') - p_events_consumer = events_consumer(events_queue) - p_events_consumer.start() + logger.info('WebSocket server started') + p_websocket_server = mp.Process(name='ws', + target=websocket_server.start, + args=(events_queue,)) + p_websocket_server.start() # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 5a0ec97b..5ab409b0 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -25,4 +25,4 @@ def base_url(): def base_ws_uri(): """Base websocket uri.""" - return '%s://%s/' % ('ws', request.environ['HTTP_HOST']) + return 'ws://localhost:9985/' diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index 6915d54a..dc320754 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -1,15 +1,64 @@ """WebSocket server for the BigchainDB Event Stream API.""" +# NOTE +# +# This module contains some functions and utilities that might belong to other +# modules. For now, I prefer to keep everything in this module. Why? Because +# those functions are needed only here. +# +# When we will extend this part of the project and we find that we need those +# functionalities elsewhere, we can start creating new modules and organizing +# things in a better way. + + +import json import asyncio import logging +import threading from uuid import uuid4 import aiohttp from aiohttp import web +from bigchaindb.events import EventTypes + logger = logging.getLogger(__name__) POISON_PILL = 'POISON_PILL' +EVENTS_ENDPOINT = '/api/v1/streams/' + + +def _put_into_capped_queue(queue, value): + """Put a new item in a capped queue. + + If the queue reached its limit, get the first element + ready and put the new one. Note that the first element + will be lost (that's the purpose of a capped queue). + + Args: + queue: a queue + value: the value to put + """ + while True: + try: + queue.put_nowait(value) + return + except asyncio.QueueFull: + queue.get_nowait() + + +def _multiprocessing_to_asyncio(in_queue, out_queue, loop): + """Bridge between a synchronous multiprocessing queue + and an asynchronous asyncio queue. + + Args: + in_queue (multiprocessing.Queue): input queue + out_queue (asyncio.Queue): output queue + """ + + while True: + value = in_queue.get() + loop.call_soon_threadsafe(_put_into_capped_queue, out_queue, value) class Dispatcher: @@ -45,10 +94,27 @@ class Dispatcher: while True: event = yield from self.event_source.get() + str_buffer = [] + if event == POISON_PILL: return - for uuid, websocket in self.subscribers.items(): - websocket.send_str(event) + + if isinstance(event, str): + str_buffer.append(event) + + elif event.type == EventTypes.BLOCK_VALID: + block = event.data + + for tx in block['block']['transactions']: + asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id'] + data = {'blockid': block['id'], + 'assetid': asset_id, + 'txid': tx['id']} + str_buffer.append(json.dumps(data)) + + for _, websocket in self.subscribers.items(): + for str_item in str_buffer: + websocket.send_str(str_item) @asyncio.coroutine @@ -83,37 +149,22 @@ def init_app(event_source, *, loop=None): app = web.Application(loop=loop) app['dispatcher'] = dispatcher - app.router.add_get('/', websocket_handler) + app.router.add_get(EVENTS_ENDPOINT, websocket_handler) return app -def start(event_source, *, loop=None): +def start(sync_event_source, loop=None): """Create and start the WebSocket server.""" if not loop: loop = asyncio.get_event_loop() + event_source = asyncio.Queue(maxsize=1024, loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_event_source, event_source, loop), + daemon=True) + bridge.start() + app = init_app(event_source, loop=loop) aiohttp.web.run_app(app, port=9985) - - -def test_websocket_server(): - """Set up a server and output a message every second. - Used for testing purposes.""" - - @asyncio.coroutine - def constant_event_source(event_source): - """Put a message in ``event_source`` every second.""" - - while True: - yield from asyncio.sleep(1) - yield from event_source.put('meow') - - loop = asyncio.get_event_loop() - event_source = asyncio.Queue() - loop.create_task(constant_event_source(event_source)) - start(event_source, loop=loop) - - -if __name__ == '__main__': - test_websocket_server() diff --git a/setup.py b/setup.py index ee8871d4..45d6f04f 100644 --- a/setup.py +++ b/setup.py @@ -77,6 +77,7 @@ install_requires = [ 'multipipes~=0.1.0', 'jsonschema~=2.5.1', 'pyyaml~=3.12', + 'aiohttp~=2.0', ] setup( diff --git a/tests/web/test_info.py b/tests/web/test_info.py index 93e14cbd..4dc60168 100644 --- a/tests/web/test_info.py +++ b/tests/web/test_info.py @@ -31,6 +31,6 @@ def test_api_v1_endpoint(client): 'self': 'http://localhost/api/v1/', 'statuses': 'http://localhost/api/v1/statuses/', 'transactions': 'http://localhost/api/v1/transactions/', - 'streams_v1': 'ws://localhost/api/v1/streams/', + 'streams_v1': 'ws://localhost:9985/api/v1/streams/', } } diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 382a20f0..b205fb25 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,6 +1,23 @@ +import json +import random + import pytest import asyncio +from bigchaindb.models import Transaction + + +def create_block(b, total=1): + transactions = [ + Transaction.create( + [b.me], + [([b.me], 1)], + metadata={'msg': random.random()}, + ).sign([b.me_private]) + for _ in range(total) + ] + return b.create_block(transactions) + class MockWebSocket: def __init__(self): @@ -11,39 +28,100 @@ class MockWebSocket: @asyncio.coroutine -@pytest.mark.skipif(reason='This test raises a RuntimeError, dunno how to solve it now.') -def test_dispatcher(loop): - from bigchaindb.web.websocket_server import Dispatcher, POISON_PILL +def test_bridge_sync_async_queue(loop): + import queue + import threading + from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio - ws0 = MockWebSocket() - ws1 = MockWebSocket() + sync_queue = queue.Queue() + async_queue = asyncio.Queue(loop=loop) - event_source = asyncio.Queue(loop=loop) - dispatcher = Dispatcher(event_source) + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_queue, async_queue, loop), + daemon=True) + bridge.start() - dispatcher.subscribe(0, ws0) - dispatcher.subscribe(1, ws1) + sync_queue.put('fahren') + sync_queue.put('auf') + sync_queue.put('der') + sync_queue.put('Autobahn') - yield from event_source.put('hack') - yield from event_source.put('the') + result = yield from async_queue.get() + assert result == 'fahren' - yield from event_source.put('planet!') - yield from event_source.put(POISON_PILL) + result = yield from async_queue.get() + assert result == 'auf' - loop.run_until_complete(dispatcher.publish()) + result = yield from async_queue.get() + assert result == 'der' - assert ws0.received == ['hack', 'the', 'planet!'] - assert ws1.received == ['planet!'] + result = yield from async_queue.get() + assert result == 'Autobahn' + + assert async_queue.qsize() == 0 @asyncio.coroutine -def test_websocket(test_client, loop): - from bigchaindb.web.websocket_server import init_app, POISON_PILL +def test_put_into_capped_queue(loop): + from bigchaindb.web.websocket_server import _put_into_capped_queue + q = asyncio.Queue(maxsize=2, loop=loop) + + _put_into_capped_queue(q, 'Friday') + assert q._queue[0] == 'Friday' + + _put_into_capped_queue(q, "I'm") + assert q._queue[0] == 'Friday' + assert q._queue[1] == "I'm" + + _put_into_capped_queue(q, 'in') + assert q._queue[0] == "I'm" + assert q._queue[1] == 'in' + + _put_into_capped_queue(q, 'love') + assert q._queue[0] == 'in' + assert q._queue[1] == 'love' + + +@asyncio.coroutine +def test_capped_queue(loop): + import queue + import threading + import time + from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio + + sync_queue = queue.Queue() + async_queue = asyncio.Queue(maxsize=2, loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_queue, async_queue, loop), + daemon=True) + bridge.start() + + sync_queue.put('we') + sync_queue.put('are') + sync_queue.put('the') + sync_queue.put('robots') + + # Wait until the thread processes all the items + time.sleep(1) + + result = yield from async_queue.get() + assert result == 'the' + + result = yield from async_queue.get() + assert result == 'robots' + + assert async_queue.qsize() == 0 + + +@asyncio.coroutine +def test_websocket_string_event(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT event_source = asyncio.Queue(loop=loop) app = init_app(event_source, loop=loop) client = yield from test_client(app) - ws = yield from client.ws_connect('/') + ws = yield from client.ws_connect(EVENTS_ENDPOINT) yield from event_source.put('hack') yield from event_source.put('the') @@ -62,15 +140,72 @@ def test_websocket(test_client, loop): @asyncio.coroutine -@pytest.mark.skipif(reason="Still don't understand how to trigger custom errors.") -def test_websocket_error(test_client, loop): - from bigchaindb.web.websocket_server import init_app, POISON_PILL +def test_websocket_block_event(b, test_client, loop): + from bigchaindb import events + from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT event_source = asyncio.Queue(loop=loop) app = init_app(event_source, loop=loop) client = yield from test_client(app) - ws = yield from client.ws_connect('/') + ws = yield from client.ws_connect(EVENTS_ENDPOINT) + block = create_block(b, 10).to_dict() + block_event = events.Event(events.EventTypes.BLOCK_VALID, block) - yield from ws.close() + yield from event_source.put(block_event) + + for tx in block['block']['transactions']: + result = yield from ws.receive() + json_result = json.loads(result.data) + assert json_result['txid'] == tx['id'] + # Since the transactions are all CREATEs, asset id == transaction id + assert json_result['assetid'] == tx['id'] + assert json_result['blockid'] == block['id'] yield from event_source.put(POISON_PILL) + + +@pytest.mark.skip('Processes are not stopping properly, and the whole test suite would hang') +@pytest.mark.genesis +def test_integration_from_webapi_to_websocket(monkeypatch, client, loop): + # XXX: I think that the `pytest-aiohttp` plugin is sparkling too much + # magic in the `asyncio` module: running this test without monkey-patching + # `asycio.get_event_loop` (and without the `loop` fixture) raises a: + # RuntimeError: There is no current event loop in thread 'MainThread'. + # + # That's pretty weird because this test doesn't use the pytest-aiohttp + # plugin explicitely. + monkeypatch.setattr('asyncio.get_event_loop', lambda: loop) + + import json + import random + import aiohttp + + from bigchaindb.common import crypto + from bigchaindb import processes + from bigchaindb.models import Transaction + + # Start BigchainDB + processes.start() + + loop = asyncio.get_event_loop() + + import time + time.sleep(1) + + ws_url = client.get('http://localhost:9984/api/v1/').json['_links']['streams_v1'] + + # Connect to the WebSocket endpoint + session = aiohttp.ClientSession() + ws = loop.run_until_complete(session.ws_connect(ws_url)) + + # Create a keypair and generate a new asset + user_priv, user_pub = crypto.generate_key_pair() + asset = {'random': random.random()} + tx = Transaction.create([user_pub], [([user_pub], 1)], asset=asset) + tx = tx.sign([user_priv]) + # Post the transaction to the BigchainDB Web API + client.post('/api/v1/transactions/', data=json.dumps(tx.to_dict())) + + result = loop.run_until_complete(ws.receive()) + json_result = json.loads(result.data) + assert json_result['txid'] == tx.id From d260e16f117a2bfc75a0fc7f03b325e802978ba2 Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 10:51:00 +0200 Subject: [PATCH 11/27] Add configuration for websocket server --- bigchaindb/__init__.py | 4 ++++ bigchaindb/commands/bigchaindb.py | 4 ++++ bigchaindb/web/views/base.py | 5 ++++- bigchaindb/web/websocket_server.py | 5 ++++- 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index 4c555e47..98e6b27b 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -59,6 +59,10 @@ config = { 'workers': None, # if none, the value will be cpu_count * 2 + 1 'threads': None, # if none, the value will be cpu_count * 2 + 1 }, + 'wsserver': { + 'host': os.environ.get('BIGCHAINDB_WSSERVER_HOST') or 'localhost', + 'port': int(os.environ.get('BIGCHAINDB_WSSERVER_PORT', 9985)), + }, 'database': _database_map[ os.environ.get('BIGCHAINDB_DATABASE_BACKEND', 'rethinkdb') ], diff --git a/bigchaindb/commands/bigchaindb.py b/bigchaindb/commands/bigchaindb.py index d4e37daa..a46019da 100644 --- a/bigchaindb/commands/bigchaindb.py +++ b/bigchaindb/commands/bigchaindb.py @@ -96,6 +96,10 @@ def run_configure(args, skip_if_exists=False): val = conf['server'][key] conf['server'][key] = input_on_stderr('API Server {}? (default `{}`): '.format(key, val), val) + for key in ('host', 'port'): + val = conf['wsserver'][key] + conf['wsserver'][key] = input_on_stderr('WebSocket Server {}? (default `{}`): '.format(key, val), val) + for key in database_keys: val = conf['database'][key] conf['database'][key] = input_on_stderr('Database {}? (default `{}`): '.format(key, val), val) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 5ab409b0..7b12c5bb 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -5,6 +5,9 @@ import logging from flask import jsonify, request +from bigchaindb import config + + logger = logging.getLogger(__name__) @@ -25,4 +28,4 @@ def base_url(): def base_ws_uri(): """Base websocket uri.""" - return 'ws://localhost:9985/' + return 'ws://{host}:{port}/'.format(**config['wsserver']) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index dc320754..dad06b94 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -20,6 +20,7 @@ from uuid import uuid4 import aiohttp from aiohttp import web +from bigchaindb import config from bigchaindb.events import EventTypes @@ -167,4 +168,6 @@ def start(sync_event_source, loop=None): bridge.start() app = init_app(event_source, loop=loop) - aiohttp.web.run_app(app, port=9985) + aiohttp.web.run_app(app, + host=config['wsserver']['host'], + port=config['wsserver']['port']) From be763022ad7c448cdee4629e9e5f4565d35bd7ce Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 14:07:05 +0200 Subject: [PATCH 12/27] Update documentation (tnx @ttmc) --- docs/server/source/drivers-clients/index.rst | 1 - docs/server/source/index.rst | 1 + .../{drivers-clients => }/websocket-event-stream-api.rst | 3 --- 3 files changed, 1 insertion(+), 4 deletions(-) rename docs/server/source/{drivers-clients => }/websocket-event-stream-api.rst (97%) diff --git a/docs/server/source/drivers-clients/index.rst b/docs/server/source/drivers-clients/index.rst index 704832c0..18894f60 100644 --- a/docs/server/source/drivers-clients/index.rst +++ b/docs/server/source/drivers-clients/index.rst @@ -15,7 +15,6 @@ community projects listed below. :maxdepth: 1 http-client-server-api - websocket-event-stream-api The Python Driver Transaction CLI diff --git a/docs/server/source/index.rst b/docs/server/source/index.rst index 6ac4b9f5..7a458934 100644 --- a/docs/server/source/index.rst +++ b/docs/server/source/index.rst @@ -11,6 +11,7 @@ BigchainDB Server Documentation nodes/index dev-and-test/index server-reference/index + websocket-event-stream-api drivers-clients/index clusters-feds/index data-models/index diff --git a/docs/server/source/drivers-clients/websocket-event-stream-api.rst b/docs/server/source/websocket-event-stream-api.rst similarity index 97% rename from docs/server/source/drivers-clients/websocket-event-stream-api.rst rename to docs/server/source/websocket-event-stream-api.rst index 22effbc1..88efb7bb 100644 --- a/docs/server/source/drivers-clients/websocket-event-stream-api.rst +++ b/docs/server/source/websocket-event-stream-api.rst @@ -1,9 +1,6 @@ The WebSocket Event Stream API ============================== -.. important:: - This is currently scheduled to be implemented in BigchainDB Server 0.10. - BigchainDB provides real-time event streams over the WebSocket protocol with the Event Stream API. From aeb8827e30bd313eee756b88318c6c5f69654d19 Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 14:07:24 +0200 Subject: [PATCH 13/27] Use try..except..else --- bigchaindb/web/websocket_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index dad06b94..a725f9ee 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -43,9 +43,10 @@ def _put_into_capped_queue(queue, value): while True: try: queue.put_nowait(value) - return except asyncio.QueueFull: queue.get_nowait() + else: + return def _multiprocessing_to_asyncio(in_queue, out_queue, loop): From be3f62dd108f021ce64d8623ba3dad3aefbd9cd3 Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 14:57:11 +0200 Subject: [PATCH 14/27] Update endpoints and docs --- bigchaindb/web/views/base.py | 2 +- bigchaindb/web/views/info.py | 5 +++-- bigchaindb/web/websocket_server.py | 2 +- docs/server/source/websocket-event-stream-api.rst | 7 ++++++- tests/test_config_utils.py | 8 ++++++++ tests/web/test_info.py | 2 +- 6 files changed, 20 insertions(+), 6 deletions(-) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 7b12c5bb..0c226d7d 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -28,4 +28,4 @@ def base_url(): def base_ws_uri(): """Base websocket uri.""" - return 'ws://{host}:{port}/'.format(**config['wsserver']) + return 'ws://{host}:{port}'.format(**config['wsserver']) diff --git a/bigchaindb/web/views/info.py b/bigchaindb/web/views/info.py index b35c6378..9b084ac5 100644 --- a/bigchaindb/web/views/info.py +++ b/bigchaindb/web/views/info.py @@ -6,6 +6,7 @@ from flask_restful import Resource import bigchaindb from bigchaindb.web.views.base import base_url, base_ws_uri from bigchaindb import version +from bigchaindb.web.websocket_server import EVENTS_ENDPOINT class RootIndex(Resource): @@ -30,7 +31,7 @@ class RootIndex(Resource): class ApiV1Index(Resource): def get(self): api_root = base_url() + 'api/v1/' - websocket_root = base_ws_uri() + 'api/v1/' + websocket_root = base_ws_uri() + EVENTS_ENDPOINT docs_url = [ 'https://docs.bigchaindb.com/projects/server/en/v', version.__version__, @@ -43,6 +44,6 @@ class ApiV1Index(Resource): 'statuses': api_root + 'statuses/', 'transactions': api_root + 'transactions/', # TODO: The version should probably not be hardcoded - 'streams_v1': websocket_root + 'streams/', + 'streams_v1': websocket_root, }, }) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index a725f9ee..ae7d6da2 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -26,7 +26,7 @@ from bigchaindb.events import EventTypes logger = logging.getLogger(__name__) POISON_PILL = 'POISON_PILL' -EVENTS_ENDPOINT = '/api/v1/streams/' +EVENTS_ENDPOINT = '/api/v1/streams/valid_tx' def _put_into_capped_queue(queue, value): diff --git a/docs/server/source/websocket-event-stream-api.rst b/docs/server/source/websocket-event-stream-api.rst index 88efb7bb..1dedc45f 100644 --- a/docs/server/source/websocket-event-stream-api.rst +++ b/docs/server/source/websocket-event-stream-api.rst @@ -1,6 +1,11 @@ The WebSocket Event Stream API ============================== +.. important:: + The WebSocket Event Stream runs on a different port than the Web API. The + default port for the Web API is `9984`, while the one for the Event Stream + is `9985`. + BigchainDB provides real-time event streams over the WebSocket protocol with the Event Stream API. @@ -25,7 +30,7 @@ response contains a ``streams_`` property in ``_links``:: { "_links": { - "streams_v1": "ws://example.com:9984/api/v1/streams/" + "streams_v1": "ws://example.com:9985/api/v1/streams/" } } diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index 04c70325..7ee74432 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -144,6 +144,8 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): DATABASE_PORT = 4242 DATABASE_BACKEND = request.config.getoption('--database-backend') SERVER_BIND = '1.2.3.4:56' + WSSERVER_HOST = '1.2.3.4' + WSSERVER_PORT = 57 KEYRING = 'pubkey_0:pubkey_1:pubkey_2' file_config = { @@ -157,6 +159,8 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): 'BIGCHAINDB_DATABASE_PORT': str(DATABASE_PORT), 'BIGCHAINDB_DATABASE_BACKEND': DATABASE_BACKEND, 'BIGCHAINDB_SERVER_BIND': SERVER_BIND, + 'BIGCHAINDB_WSSERVER_HOST': WSSERVER_HOST, + 'BIGCHAINDB_WSSERVER_PORT': WSSERVER_PORT, 'BIGCHAINDB_KEYRING': KEYRING}) import bigchaindb @@ -198,6 +202,10 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): 'workers': None, 'threads': None, }, + 'wsserver': { + 'host': WSSERVER_HOST, + 'port': WSSERVER_PORT, + }, 'database': database, 'keypair': { 'public': None, diff --git a/tests/web/test_info.py b/tests/web/test_info.py index 4dc60168..eeb80f78 100644 --- a/tests/web/test_info.py +++ b/tests/web/test_info.py @@ -31,6 +31,6 @@ def test_api_v1_endpoint(client): 'self': 'http://localhost/api/v1/', 'statuses': 'http://localhost/api/v1/statuses/', 'transactions': 'http://localhost/api/v1/transactions/', - 'streams_v1': 'ws://localhost:9985/api/v1/streams/', + 'streams_v1': 'ws://localhost:9985/api/v1/streams/valid_tx', } } From da29bbc605caeb2f0ea9ab1ef712176b73c0ecee Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Fri, 7 Apr 2017 15:02:49 +0200 Subject: [PATCH 15/27] added tests for the events --- .../pipelines/events_consumer_example.py | 14 ----------- tests/pipelines/test_election.py | 24 +++++++++++++++++++ tests/test_events.py | 21 ++++++++++++++++ 3 files changed, 45 insertions(+), 14 deletions(-) delete mode 100644 bigchaindb/pipelines/events_consumer_example.py create mode 100644 tests/test_events.py diff --git a/bigchaindb/pipelines/events_consumer_example.py b/bigchaindb/pipelines/events_consumer_example.py deleted file mode 100644 index 7e833c82..00000000 --- a/bigchaindb/pipelines/events_consumer_example.py +++ /dev/null @@ -1,14 +0,0 @@ -import multiprocessing as mp - -from bigchaindb.events import EventHandler - - -def consume_events(events_queue): - event_handler = EventHandler(events_queue) - while True: - event = event_handler.get_event() - print('Event type: {} Event data: {}'.format(event.type, event.data)) - - -def events_consumer(events_queue): - return mp.Process(target=consume_events, args=(events_queue,)) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index 3127dcaf..c3254601 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -199,3 +199,27 @@ def test_full_pipeline(b, user_pk): tx_from_block = set([tx.id for tx in invalid_block.transactions]) tx_from_backlog = set([tx['id'] for tx in list(query.get_stale_transactions(b.connection, 0))]) assert tx_from_block == tx_from_backlog + + +def test_handle_block_events(): + from bigchaindb.events import setup_events_queue, EventTypes + + events_queue = setup_events_queue() + e = election.Election(events_queue=events_queue) + block_id = 'a' * 64 + + assert events_queue.qsize() == 0 + + # no event should be emited in case a block is undecided + e.handle_block_events({'status': Bigchain.BLOCK_UNDECIDED}, block_id) + assert events_queue.qsize() == 0 + + # put an invalid block event in the queue + e.handle_block_events({'status': Bigchain.BLOCK_INVALID}, block_id) + event = e.event_handler.get_event() + assert event.type == EventTypes.BLOCK_INVALID + + # put an valid block event in the queue + e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id) + event = e.event_handler.get_event() + assert event.type == EventTypes.BLOCK_VALID diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 00000000..22369b51 --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,21 @@ +def tests_event_handler(): + from bigchaindb.events import (EventTypes, Event, EventHandler, + setup_events_queue) + + # create and event + event_data = {'msg': 'some data'} + event = Event(EventTypes.BLOCK_VALID, event_data) + # create the events queue + events_queue = setup_events_queue() + + # create event handler + event_handler = EventHandler(events_queue) + + # push and event to the queue + event_handler.put_event(event) + + # get the event from the queue + event_from_queue = event_handler.get_event() + + assert event_from_queue.type == event.type + assert event_from_queue.data == event.data From a673d9c6efcc2d37b72f545b2170ead2995762d3 Mon Sep 17 00:00:00 2001 From: vrde Date: Tue, 11 Apr 2017 16:34:50 +0200 Subject: [PATCH 16/27] Add more code coverage --- tests/web/test_websocket_server.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index b205fb25..ee0cfc6e 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,5 +1,6 @@ import json import random +from unittest.mock import patch import pytest import asyncio @@ -114,6 +115,19 @@ def test_capped_queue(loop): assert async_queue.qsize() == 0 +@patch('threading.Thread.start') +@patch('aiohttp.web.run_app') +@patch('bigchaindb.web.websocket_server.init_app') +@patch('asyncio.get_event_loop', return_value='event-loop') +@patch('asyncio.Queue', return_value='event-queue') +def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock, init_app_mock, run_app_mock, thread_start_mock): + from bigchaindb.web.websocket_server import start + + start(None) + + init_app_mock.assert_called_with('event-queue', loop='event-loop') + + @asyncio.coroutine def test_websocket_string_event(test_client, loop): from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT From 79997848cd469fe75c237c1f97312c34f5f1c2f5 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Tue, 11 Apr 2017 17:21:25 +0200 Subject: [PATCH 17/27] Refine test for the election pipeline process test that the process is started with the events_queue kwargs --- tests/test_processes.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/test_processes.py b/tests/test_processes.py index 00716010..e6503541 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -9,8 +9,9 @@ from bigchaindb.pipelines import vote, block, election, stale @patch.object(block, 'start') @patch.object(vote, 'start') @patch.object(Process, 'start') -def test_processes_start(mock_process, mock_vote, mock_block, mock_election, - mock_stale): +@patch('bigchaindb.events.setup_events_queue', spec_set=True, autospec=True) +def test_processes_start(mock_setup_events_queue, mock_process, mock_vote, + mock_block, mock_election, mock_stale): from bigchaindb import processes processes.start() @@ -19,5 +20,5 @@ def test_processes_start(mock_process, mock_vote, mock_block, mock_election, mock_block.assert_called_with() mock_stale.assert_called_with() mock_process.assert_called_with() - # the events queue is declared inside processes.start() - assert mock_election.call_count == 1 + mock_election.assert_called_once_with( + events_queue=mock_setup_events_queue.return_value) From e0e997755e8666dc495a1e0c15c831437baf7731 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Tue, 11 Apr 2017 18:31:56 +0200 Subject: [PATCH 18/27] Re-order imports (pep8) --- tests/web/test_websocket_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index ee0cfc6e..55564ec2 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,9 +1,9 @@ +import asyncio import json import random from unittest.mock import patch import pytest -import asyncio from bigchaindb.models import Transaction From 98e52e047e866027d0210c7d1d6749414afc4e35 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Tue, 11 Apr 2017 18:32:21 +0200 Subject: [PATCH 19/27] Make utility test function into a fixture --- tests/web/test_websocket_server.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 55564ec2..403b037d 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -8,7 +8,9 @@ import pytest from bigchaindb.models import Transaction -def create_block(b, total=1): +@pytest.fixture +def _block(b, request): + total = getattr(request, 'param', 1) transactions = [ Transaction.create( [b.me], @@ -154,7 +156,8 @@ def test_websocket_string_event(test_client, loop): @asyncio.coroutine -def test_websocket_block_event(b, test_client, loop): +@pytest.mark.parametrize('_block', (10,), indirect=('_block',), ids=('block',)) +def test_websocket_block_event(b, _block, test_client, loop): from bigchaindb import events from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT @@ -162,7 +165,7 @@ def test_websocket_block_event(b, test_client, loop): app = init_app(event_source, loop=loop) client = yield from test_client(app) ws = yield from client.ws_connect(EVENTS_ENDPOINT) - block = create_block(b, 10).to_dict() + block = _block.to_dict() block_event = events.Event(events.EventTypes.BLOCK_VALID, block) yield from event_source.put(block_event) From 75dd645ec9dfdc2b39918bce11c2a31c215b2b75 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 13:47:58 +0200 Subject: [PATCH 20/27] Import stdlib pkgs at the top of the test module --- tests/web/test_websocket_server.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 403b037d..3b3f2e39 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,6 +1,9 @@ import asyncio import json +import queue import random +import threading +import time from unittest.mock import patch import pytest @@ -32,8 +35,6 @@ class MockWebSocket: @asyncio.coroutine def test_bridge_sync_async_queue(loop): - import queue - import threading from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio sync_queue = queue.Queue() @@ -87,9 +88,6 @@ def test_put_into_capped_queue(loop): @asyncio.coroutine def test_capped_queue(loop): - import queue - import threading - import time from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio sync_queue = queue.Queue() From e614834a0360ce8e474fb5334972710a99e2821d Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 13:49:10 +0200 Subject: [PATCH 21/27] Import Transaction class within fixture --- tests/web/test_websocket_server.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 3b3f2e39..4323685a 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -8,11 +8,10 @@ from unittest.mock import patch import pytest -from bigchaindb.models import Transaction - @pytest.fixture def _block(b, request): + from bigchaindb.models import Transaction total = getattr(request, 'param', 1) transactions = [ Transaction.create( From 0347fbccf49d95c3add28ce9339d58025aad439d Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 13:50:09 +0200 Subject: [PATCH 22/27] Add a few more checks to the test --- tests/web/test_websocket_server.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 4323685a..13015dbb 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -114,17 +114,30 @@ def test_capped_queue(loop): assert async_queue.qsize() == 0 -@patch('threading.Thread.start') +@patch('threading.Thread') @patch('aiohttp.web.run_app') @patch('bigchaindb.web.websocket_server.init_app') @patch('asyncio.get_event_loop', return_value='event-loop') @patch('asyncio.Queue', return_value='event-queue') -def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock, init_app_mock, run_app_mock, thread_start_mock): - from bigchaindb.web.websocket_server import start +def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock, + init_app_mock, run_app_mock, + thread_mock): + from bigchaindb import config + from bigchaindb.web.websocket_server import start, _multiprocessing_to_asyncio start(None) - + thread_mock.assert_called_once_with( + target=_multiprocessing_to_asyncio, + args=(None, queue_mock.return_value, get_event_loop_mock.return_value), + daemon=True, + ) + thread_mock.return_value.start.assert_called_once_with() init_app_mock.assert_called_with('event-queue', loop='event-loop') + run_app_mock.assert_called_once_with( + init_app_mock.return_value, + host=config['wsserver']['host'], + port=config['wsserver']['port'], + ) @asyncio.coroutine From 2bedc9b059a3ff7d25e574978f927c977d05f4c1 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 12 Apr 2017 14:39:15 +0200 Subject: [PATCH 23/27] Fix typos --- tests/pipelines/test_election.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index c3254601..f0dd232d 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -210,7 +210,7 @@ def test_handle_block_events(): assert events_queue.qsize() == 0 - # no event should be emited in case a block is undecided + # no event should be emitted in case a block is undecided e.handle_block_events({'status': Bigchain.BLOCK_UNDECIDED}, block_id) assert events_queue.qsize() == 0 @@ -219,7 +219,7 @@ def test_handle_block_events(): event = e.event_handler.get_event() assert event.type == EventTypes.BLOCK_INVALID - # put an valid block event in the queue + # put a valid block event in the queue e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id) event = e.event_handler.get_event() assert event.type == EventTypes.BLOCK_VALID From 4c9adededd558a4c4d29965e11c60b86da4bdafe Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 12 Apr 2017 15:54:11 +0200 Subject: [PATCH 24/27] Remove TODO --- bigchaindb/web/views/info.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bigchaindb/web/views/info.py b/bigchaindb/web/views/info.py index 9b084ac5..51b59643 100644 --- a/bigchaindb/web/views/info.py +++ b/bigchaindb/web/views/info.py @@ -43,7 +43,6 @@ class ApiV1Index(Resource): 'self': api_root, 'statuses': api_root + 'statuses/', 'transactions': api_root + 'transactions/', - # TODO: The version should probably not be hardcoded 'streams_v1': websocket_root, }, }) From a7ed28e539a1ff605a5f5954efeabb078eb9bf26 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 16:12:41 +0200 Subject: [PATCH 25/27] Test command helper _run_init --- tests/commands/test_commands.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/commands/test_commands.py b/tests/commands/test_commands.py index 6fb424d6..fa3ecf42 100644 --- a/tests/commands/test_commands.py +++ b/tests/commands/test_commands.py @@ -130,6 +130,22 @@ def test_bigchain_run_init_when_db_exists(mock_db_init_with_existing_db): run_init(args) +def test__run_init(mocker): + from bigchaindb.commands.bigchaindb import _run_init + bigchain_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.bigchaindb.Bigchain') + init_db_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.schema.init_database', + autospec=True, + spec_set=True, + ) + _run_init() + bigchain_mock.assert_called_once_with() + init_db_mock.assert_called_once_with( + connection=bigchain_mock.return_value.connection) + bigchain_mock.return_value.create_genesis_block.assert_called_once_with() + + @patch('bigchaindb.backend.schema.drop_database') def test_drop_db_when_assumed_yes(mock_db_drop): from bigchaindb.commands.bigchaindb import run_drop From 303e12ee280befb2cbe0bc707c5b62f7ef896066 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 16:38:18 +0200 Subject: [PATCH 26/27] Test command run_init when db already exists --- tests/commands/test_commands.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/commands/test_commands.py b/tests/commands/test_commands.py index fa3ecf42..087e1afe 100644 --- a/tests/commands/test_commands.py +++ b/tests/commands/test_commands.py @@ -124,10 +124,23 @@ def test_bigchain_export_my_pubkey_when_pubkey_not_set(monkeypatch): "This node's public key wasn't set anywhere so it can't be exported" -def test_bigchain_run_init_when_db_exists(mock_db_init_with_existing_db): +def test_bigchain_run_init_when_db_exists(mocker, capsys): from bigchaindb.commands.bigchaindb import run_init + from bigchaindb.common.exceptions import DatabaseAlreadyExists + init_db_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.schema.init_database', + autospec=True, + spec_set=True, + ) + init_db_mock.side_effect = DatabaseAlreadyExists args = Namespace(config=None) run_init(args) + output_message = capsys.readouterr()[1] + print(output_message) + assert output_message == ( + 'The database already exists.\n' + 'If you wish to re-initialize it, first drop it.\n' + ) def test__run_init(mocker): From 414d915033c9e37476a37449a3899abc6a69ba7d Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 13 Apr 2017 08:54:34 +0200 Subject: [PATCH 27/27] Snakecaseify keys --- bigchaindb/web/websocket_server.py | 6 +++--- docs/server/source/websocket-event-stream-api.rst | 6 +++--- tests/web/test_websocket_server.py | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index ae7d6da2..5507f504 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -109,9 +109,9 @@ class Dispatcher: for tx in block['block']['transactions']: asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id'] - data = {'blockid': block['id'], - 'assetid': asset_id, - 'txid': tx['id']} + data = {'block_id': block['id'], + 'asset_id': asset_id, + 'tx_id': tx['id']} str_buffer.append(json.dumps(data)) for _, websocket in self.subscribers.items(): diff --git a/docs/server/source/websocket-event-stream-api.rst b/docs/server/source/websocket-event-stream-api.rst index 1dedc45f..3ce86553 100644 --- a/docs/server/source/websocket-event-stream-api.rst +++ b/docs/server/source/websocket-event-stream-api.rst @@ -82,9 +82,9 @@ the transaction's ID, associated asset ID, and containing block's ID. Example message:: { - "txid": "", - "assetid": "", - "blockid": "" + "tx_id": "", + "asset_id": "", + "block_id": "" } diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 13015dbb..6484ef4e 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -183,10 +183,10 @@ def test_websocket_block_event(b, _block, test_client, loop): for tx in block['block']['transactions']: result = yield from ws.receive() json_result = json.loads(result.data) - assert json_result['txid'] == tx['id'] + assert json_result['tx_id'] == tx['id'] # Since the transactions are all CREATEs, asset id == transaction id - assert json_result['assetid'] == tx['id'] - assert json_result['blockid'] == block['id'] + assert json_result['asset_id'] == tx['id'] + assert json_result['block_id'] == block['id'] yield from event_source.put(POISON_PILL) @@ -235,4 +235,4 @@ def test_integration_from_webapi_to_websocket(monkeypatch, client, loop): result = loop.run_until_complete(ws.receive()) json_result = json.loads(result.data) - assert json_result['txid'] == tx.id + assert json_result['tx_id'] == tx.id