diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index 4c555e47..98e6b27b 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -59,6 +59,10 @@ config = { 'workers': None, # if none, the value will be cpu_count * 2 + 1 'threads': None, # if none, the value will be cpu_count * 2 + 1 }, + 'wsserver': { + 'host': os.environ.get('BIGCHAINDB_WSSERVER_HOST') or 'localhost', + 'port': int(os.environ.get('BIGCHAINDB_WSSERVER_PORT', 9985)), + }, 'database': _database_map[ os.environ.get('BIGCHAINDB_DATABASE_BACKEND', 'rethinkdb') ], diff --git a/bigchaindb/commands/bigchaindb.py b/bigchaindb/commands/bigchaindb.py index d4e37daa..a46019da 100644 --- a/bigchaindb/commands/bigchaindb.py +++ b/bigchaindb/commands/bigchaindb.py @@ -96,6 +96,10 @@ def run_configure(args, skip_if_exists=False): val = conf['server'][key] conf['server'][key] = input_on_stderr('API Server {}? (default `{}`): '.format(key, val), val) + for key in ('host', 'port'): + val = conf['wsserver'][key] + conf['wsserver'][key] = input_on_stderr('WebSocket Server {}? (default `{}`): '.format(key, val), val) + for key in database_keys: val = conf['database'][key] conf['database'][key] = input_on_stderr('Database {}? (default `{}`): '.format(key, val), val) diff --git a/bigchaindb/events.py b/bigchaindb/events.py new file mode 100644 index 00000000..bc448ce3 --- /dev/null +++ b/bigchaindb/events.py @@ -0,0 +1,33 @@ +from enum import Enum +from multiprocessing import Queue + + +class EventTypes(Enum): + BLOCK_VALID = 1 + BLOCK_INVALID = 2 + + +class Event: + + def __init__(self, event_type, event_data): + self.type = event_type + self.data = event_data + + +class EventHandler: + + def __init__(self, events_queue): + self.events_queue = events_queue + + def put_event(self, event, timeout=None): + # TODO: handle timeouts + self.events_queue.put(event, timeout=None) + + def get_event(self, timeout=None): + # TODO: handle timeouts + return self.events_queue.get(timeout=None) + + +def setup_events_queue(): + # TODO: set bounds to the queue + return Queue() diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index a5818b3e..fc7cb077 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -13,6 +13,7 @@ from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block from bigchaindb import Bigchain +from bigchaindb.events import EventHandler, Event, EventTypes logger = logging.getLogger(__name__) @@ -22,8 +23,11 @@ logger_results = logging.getLogger('pipeline.election.results') class Election: """Election class.""" - def __init__(self): + def __init__(self, events_queue=None): self.bigchain = Bigchain() + self.event_handler = None + if events_queue: + self.event_handler = EventHandler(events_queue) def check_for_quorum(self, next_vote): """ @@ -42,6 +46,7 @@ class Election: next_block = self.bigchain.get_block(block_id) result = self.bigchain.block_election(next_block) + self.handle_block_events(result, block_id) if result['status'] == self.bigchain.BLOCK_INVALID: return Block.from_dict(next_block) @@ -67,9 +72,21 @@ class Election: self.bigchain.write_transaction(tx) return invalid_block + def handle_block_events(self, result, block_id): + if self.event_handler: + if result['status'] == self.bigchain.BLOCK_UNDECIDED: + return + elif result['status'] == self.bigchain.BLOCK_INVALID: + event_type = EventTypes.BLOCK_INVALID + elif result['status'] == self.bigchain.BLOCK_VALID: + event_type = EventTypes.BLOCK_VALID -def create_pipeline(): - election = Election() + event = Event(event_type, self.bigchain.get_block(block_id)) + self.event_handler.put_event(event) + + +def create_pipeline(events_queue=None): + election = Election(events_queue=events_queue) election_pipeline = Pipeline([ Node(election.check_for_quorum), @@ -84,8 +101,8 @@ def get_changefeed(): return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT) -def start(): - pipeline = create_pipeline() +def start(events_queue=None): + pipeline = create_pipeline(events_queue=events_queue) pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 01d7a55a..205cdd3c 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -3,7 +3,8 @@ import multiprocessing as mp import bigchaindb from bigchaindb.pipelines import vote, block, election, stale -from bigchaindb.web import server +from bigchaindb.events import setup_events_queue +from bigchaindb.web import server, websocket_server logger = logging.getLogger(__name__) @@ -25,6 +26,13 @@ BANNER = """ def start(): logger.info('Initializing BigchainDB...') + # Create the events queue + # The events queue needs to be initialized once and shared between + # processes. This seems the best way to do it + # At this point only the election processs and the event consumer require + # this queue. + events_queue = setup_events_queue() + # start the processes logger.info('Starting block') block.start() @@ -36,12 +44,18 @@ def start(): stale.start() logger.info('Starting election') - election.start() + election.start(events_queue=events_queue) # start the web api app_server = server.create_server(bigchaindb.config['server']) p_webapi = mp.Process(name='webapi', target=app_server.run) p_webapi.start() + logger.info('WebSocket server started') + p_websocket_server = mp.Process(name='ws', + target=websocket_server.start, + args=(events_queue,)) + p_websocket_server.start() + # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 171a3bb6..0c226d7d 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -5,6 +5,9 @@ import logging from flask import jsonify, request +from bigchaindb import config + + logger = logging.getLogger(__name__) @@ -21,3 +24,8 @@ def make_error(status_code, message=None): def base_url(): return '%s://%s/' % (request.environ['wsgi.url_scheme'], request.environ['HTTP_HOST']) + + +def base_ws_uri(): + """Base websocket uri.""" + return 'ws://{host}:{port}'.format(**config['wsserver']) diff --git a/bigchaindb/web/views/info.py b/bigchaindb/web/views/info.py index 04a15749..51b59643 100644 --- a/bigchaindb/web/views/info.py +++ b/bigchaindb/web/views/info.py @@ -4,8 +4,9 @@ import flask from flask_restful import Resource import bigchaindb -from bigchaindb.web.views.base import base_url +from bigchaindb.web.views.base import base_url, base_ws_uri from bigchaindb import version +from bigchaindb.web.websocket_server import EVENTS_ENDPOINT class RootIndex(Resource): @@ -30,16 +31,18 @@ class RootIndex(Resource): class ApiV1Index(Resource): def get(self): api_root = base_url() + 'api/v1/' + websocket_root = base_ws_uri() + EVENTS_ENDPOINT docs_url = [ 'https://docs.bigchaindb.com/projects/server/en/v', version.__version__, '/drivers-clients/http-client-server-api.html', ] - return { + return flask.jsonify({ '_links': { 'docs': ''.join(docs_url), 'self': api_root, 'statuses': api_root + 'statuses/', 'transactions': api_root + 'transactions/', + 'streams_v1': websocket_root, }, - } + }) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py new file mode 100644 index 00000000..5507f504 --- /dev/null +++ b/bigchaindb/web/websocket_server.py @@ -0,0 +1,174 @@ +"""WebSocket server for the BigchainDB Event Stream API.""" + +# NOTE +# +# This module contains some functions and utilities that might belong to other +# modules. For now, I prefer to keep everything in this module. Why? Because +# those functions are needed only here. +# +# When we will extend this part of the project and we find that we need those +# functionalities elsewhere, we can start creating new modules and organizing +# things in a better way. + + +import json +import asyncio +import logging +import threading +from uuid import uuid4 + +import aiohttp +from aiohttp import web + +from bigchaindb import config +from bigchaindb.events import EventTypes + + +logger = logging.getLogger(__name__) +POISON_PILL = 'POISON_PILL' +EVENTS_ENDPOINT = '/api/v1/streams/valid_tx' + + +def _put_into_capped_queue(queue, value): + """Put a new item in a capped queue. + + If the queue reached its limit, get the first element + ready and put the new one. Note that the first element + will be lost (that's the purpose of a capped queue). + + Args: + queue: a queue + value: the value to put + """ + while True: + try: + queue.put_nowait(value) + except asyncio.QueueFull: + queue.get_nowait() + else: + return + + +def _multiprocessing_to_asyncio(in_queue, out_queue, loop): + """Bridge between a synchronous multiprocessing queue + and an asynchronous asyncio queue. + + Args: + in_queue (multiprocessing.Queue): input queue + out_queue (asyncio.Queue): output queue + """ + + while True: + value = in_queue.get() + loop.call_soon_threadsafe(_put_into_capped_queue, out_queue, value) + + +class Dispatcher: + """Dispatch events to websockets. + + This class implements a simple publish/subscribe pattern. + """ + + def __init__(self, event_source): + """Create a new instance. + + Args: + event_source: a source of events. Elements in the queue + should be strings. + """ + + self.event_source = event_source + self.subscribers = {} + + def subscribe(self, uuid, websocket): + """Add a websocket to the list of subscribers. + + Args: + uuid (str): a unique identifier for the websocket. + websocket: the websocket to publish information. + """ + + self.subscribers[uuid] = websocket + + @asyncio.coroutine + def publish(self): + """Publish new events to the subscribers.""" + + while True: + event = yield from self.event_source.get() + str_buffer = [] + + if event == POISON_PILL: + return + + if isinstance(event, str): + str_buffer.append(event) + + elif event.type == EventTypes.BLOCK_VALID: + block = event.data + + for tx in block['block']['transactions']: + asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id'] + data = {'block_id': block['id'], + 'asset_id': asset_id, + 'tx_id': tx['id']} + str_buffer.append(json.dumps(data)) + + for _, websocket in self.subscribers.items(): + for str_item in str_buffer: + websocket.send_str(str_item) + + +@asyncio.coroutine +def websocket_handler(request): + """Handle a new socket connection.""" + + logger.debug('New websocket connection.') + websocket = web.WebSocketResponse() + yield from websocket.prepare(request) + uuid = uuid4() + request.app['dispatcher'].subscribe(uuid, websocket) + + while True: + # Consume input buffer + msg = yield from websocket.receive() + if msg.type == aiohttp.WSMsgType.ERROR: + logger.debug('Websocket exception: %s', websocket.exception()) + return + + +def init_app(event_source, *, loop=None): + """Init the application server. + + Return: + An aiohttp application. + """ + + dispatcher = Dispatcher(event_source) + + # Schedule the dispatcher + loop.create_task(dispatcher.publish()) + + app = web.Application(loop=loop) + app['dispatcher'] = dispatcher + app.router.add_get(EVENTS_ENDPOINT, websocket_handler) + return app + + +def start(sync_event_source, loop=None): + """Create and start the WebSocket server.""" + + if not loop: + loop = asyncio.get_event_loop() + + event_source = asyncio.Queue(maxsize=1024, loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_event_source, event_source, loop), + daemon=True) + bridge.start() + + app = init_app(event_source, loop=loop) + aiohttp.web.run_app(app, + host=config['wsserver']['host'], + port=config['wsserver']['port']) diff --git a/docs/server/source/drivers-clients/index.rst b/docs/server/source/drivers-clients/index.rst index 39a1cbdc..c2dcf00e 100644 --- a/docs/server/source/drivers-clients/index.rst +++ b/docs/server/source/drivers-clients/index.rst @@ -15,7 +15,6 @@ community projects listed below. :maxdepth: 1 http-client-server-api - websocket-event-stream-api The Python Driver Transaction CLI diff --git a/docs/server/source/index.rst b/docs/server/source/index.rst index 3de3cb5c..7c6df83f 100644 --- a/docs/server/source/index.rst +++ b/docs/server/source/index.rst @@ -11,6 +11,7 @@ BigchainDB Server Documentation production-nodes/index dev-and-test/index server-reference/index + websocket-event-stream-api drivers-clients/index clusters-feds/index data-models/index diff --git a/docs/server/source/drivers-clients/websocket-event-stream-api.rst b/docs/server/source/websocket-event-stream-api.rst similarity index 90% rename from docs/server/source/drivers-clients/websocket-event-stream-api.rst rename to docs/server/source/websocket-event-stream-api.rst index 22effbc1..3ce86553 100644 --- a/docs/server/source/drivers-clients/websocket-event-stream-api.rst +++ b/docs/server/source/websocket-event-stream-api.rst @@ -2,7 +2,9 @@ The WebSocket Event Stream API ============================== .. important:: - This is currently scheduled to be implemented in BigchainDB Server 0.10. + The WebSocket Event Stream runs on a different port than the Web API. The + default port for the Web API is `9984`, while the one for the Event Stream + is `9985`. BigchainDB provides real-time event streams over the WebSocket protocol with the Event Stream API. @@ -28,7 +30,7 @@ response contains a ``streams_`` property in ``_links``:: { "_links": { - "streams_v1": "ws://example.com:9984/api/v1/streams/" + "streams_v1": "ws://example.com:9985/api/v1/streams/" } } @@ -80,9 +82,9 @@ the transaction's ID, associated asset ID, and containing block's ID. Example message:: { - "txid": "", - "assetid": "", - "blockid": "" + "tx_id": "", + "asset_id": "", + "block_id": "" } diff --git a/setup.py b/setup.py index c05b554a..45d6f04f 100644 --- a/setup.py +++ b/setup.py @@ -54,6 +54,7 @@ tests_require = [ 'pytest-mock', 'pytest-xdist', 'pytest-flask', + 'pytest-aiohttp', 'tox', ] + docs_require @@ -76,6 +77,7 @@ install_requires = [ 'multipipes~=0.1.0', 'jsonschema~=2.5.1', 'pyyaml~=3.12', + 'aiohttp~=2.0', ] setup( diff --git a/tests/commands/test_commands.py b/tests/commands/test_commands.py index 6fb424d6..087e1afe 100644 --- a/tests/commands/test_commands.py +++ b/tests/commands/test_commands.py @@ -124,10 +124,39 @@ def test_bigchain_export_my_pubkey_when_pubkey_not_set(monkeypatch): "This node's public key wasn't set anywhere so it can't be exported" -def test_bigchain_run_init_when_db_exists(mock_db_init_with_existing_db): +def test_bigchain_run_init_when_db_exists(mocker, capsys): from bigchaindb.commands.bigchaindb import run_init + from bigchaindb.common.exceptions import DatabaseAlreadyExists + init_db_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.schema.init_database', + autospec=True, + spec_set=True, + ) + init_db_mock.side_effect = DatabaseAlreadyExists args = Namespace(config=None) run_init(args) + output_message = capsys.readouterr()[1] + print(output_message) + assert output_message == ( + 'The database already exists.\n' + 'If you wish to re-initialize it, first drop it.\n' + ) + + +def test__run_init(mocker): + from bigchaindb.commands.bigchaindb import _run_init + bigchain_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.bigchaindb.Bigchain') + init_db_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.schema.init_database', + autospec=True, + spec_set=True, + ) + _run_init() + bigchain_mock.assert_called_once_with() + init_db_mock.assert_called_once_with( + connection=bigchain_mock.return_value.connection) + bigchain_mock.return_value.create_genesis_block.assert_called_once_with() @patch('bigchaindb.backend.schema.drop_database') diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index 3127dcaf..f0dd232d 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -199,3 +199,27 @@ def test_full_pipeline(b, user_pk): tx_from_block = set([tx.id for tx in invalid_block.transactions]) tx_from_backlog = set([tx['id'] for tx in list(query.get_stale_transactions(b.connection, 0))]) assert tx_from_block == tx_from_backlog + + +def test_handle_block_events(): + from bigchaindb.events import setup_events_queue, EventTypes + + events_queue = setup_events_queue() + e = election.Election(events_queue=events_queue) + block_id = 'a' * 64 + + assert events_queue.qsize() == 0 + + # no event should be emitted in case a block is undecided + e.handle_block_events({'status': Bigchain.BLOCK_UNDECIDED}, block_id) + assert events_queue.qsize() == 0 + + # put an invalid block event in the queue + e.handle_block_events({'status': Bigchain.BLOCK_INVALID}, block_id) + event = e.event_handler.get_event() + assert event.type == EventTypes.BLOCK_INVALID + + # put a valid block event in the queue + e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id) + event = e.event_handler.get_event() + assert event.type == EventTypes.BLOCK_VALID diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index 04c70325..7ee74432 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -144,6 +144,8 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): DATABASE_PORT = 4242 DATABASE_BACKEND = request.config.getoption('--database-backend') SERVER_BIND = '1.2.3.4:56' + WSSERVER_HOST = '1.2.3.4' + WSSERVER_PORT = 57 KEYRING = 'pubkey_0:pubkey_1:pubkey_2' file_config = { @@ -157,6 +159,8 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): 'BIGCHAINDB_DATABASE_PORT': str(DATABASE_PORT), 'BIGCHAINDB_DATABASE_BACKEND': DATABASE_BACKEND, 'BIGCHAINDB_SERVER_BIND': SERVER_BIND, + 'BIGCHAINDB_WSSERVER_HOST': WSSERVER_HOST, + 'BIGCHAINDB_WSSERVER_PORT': WSSERVER_PORT, 'BIGCHAINDB_KEYRING': KEYRING}) import bigchaindb @@ -198,6 +202,10 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): 'workers': None, 'threads': None, }, + 'wsserver': { + 'host': WSSERVER_HOST, + 'port': WSSERVER_PORT, + }, 'database': database, 'keypair': { 'public': None, diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 00000000..22369b51 --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,21 @@ +def tests_event_handler(): + from bigchaindb.events import (EventTypes, Event, EventHandler, + setup_events_queue) + + # create and event + event_data = {'msg': 'some data'} + event = Event(EventTypes.BLOCK_VALID, event_data) + # create the events queue + events_queue = setup_events_queue() + + # create event handler + event_handler = EventHandler(events_queue) + + # push and event to the queue + event_handler.put_event(event) + + # get the event from the queue + event_from_queue = event_handler.get_event() + + assert event_from_queue.type == event.type + assert event_from_queue.data == event.data diff --git a/tests/test_processes.py b/tests/test_processes.py index bd69d52c..e6503541 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -9,14 +9,16 @@ from bigchaindb.pipelines import vote, block, election, stale @patch.object(block, 'start') @patch.object(vote, 'start') @patch.object(Process, 'start') -def test_processes_start(mock_vote, mock_block, mock_election, mock_stale, - mock_process): +@patch('bigchaindb.events.setup_events_queue', spec_set=True, autospec=True) +def test_processes_start(mock_setup_events_queue, mock_process, mock_vote, + mock_block, mock_election, mock_stale): from bigchaindb import processes processes.start() mock_vote.assert_called_with() mock_block.assert_called_with() - mock_election.assert_called_with() mock_stale.assert_called_with() mock_process.assert_called_with() + mock_election.assert_called_once_with( + events_queue=mock_setup_events_queue.return_value) diff --git a/tests/web/test_info.py b/tests/web/test_info.py index c55f467f..eeb80f78 100644 --- a/tests/web/test_info.py +++ b/tests/web/test_info.py @@ -31,5 +31,6 @@ def test_api_v1_endpoint(client): 'self': 'http://localhost/api/v1/', 'statuses': 'http://localhost/api/v1/statuses/', 'transactions': 'http://localhost/api/v1/transactions/', + 'streams_v1': 'ws://localhost:9985/api/v1/streams/valid_tx', } } diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py new file mode 100644 index 00000000..6484ef4e --- /dev/null +++ b/tests/web/test_websocket_server.py @@ -0,0 +1,238 @@ +import asyncio +import json +import queue +import random +import threading +import time +from unittest.mock import patch + +import pytest + + +@pytest.fixture +def _block(b, request): + from bigchaindb.models import Transaction + total = getattr(request, 'param', 1) + transactions = [ + Transaction.create( + [b.me], + [([b.me], 1)], + metadata={'msg': random.random()}, + ).sign([b.me_private]) + for _ in range(total) + ] + return b.create_block(transactions) + + +class MockWebSocket: + def __init__(self): + self.received = [] + + def send_str(self, s): + self.received.append(s) + + +@asyncio.coroutine +def test_bridge_sync_async_queue(loop): + from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio + + sync_queue = queue.Queue() + async_queue = asyncio.Queue(loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_queue, async_queue, loop), + daemon=True) + bridge.start() + + sync_queue.put('fahren') + sync_queue.put('auf') + sync_queue.put('der') + sync_queue.put('Autobahn') + + result = yield from async_queue.get() + assert result == 'fahren' + + result = yield from async_queue.get() + assert result == 'auf' + + result = yield from async_queue.get() + assert result == 'der' + + result = yield from async_queue.get() + assert result == 'Autobahn' + + assert async_queue.qsize() == 0 + + +@asyncio.coroutine +def test_put_into_capped_queue(loop): + from bigchaindb.web.websocket_server import _put_into_capped_queue + q = asyncio.Queue(maxsize=2, loop=loop) + + _put_into_capped_queue(q, 'Friday') + assert q._queue[0] == 'Friday' + + _put_into_capped_queue(q, "I'm") + assert q._queue[0] == 'Friday' + assert q._queue[1] == "I'm" + + _put_into_capped_queue(q, 'in') + assert q._queue[0] == "I'm" + assert q._queue[1] == 'in' + + _put_into_capped_queue(q, 'love') + assert q._queue[0] == 'in' + assert q._queue[1] == 'love' + + +@asyncio.coroutine +def test_capped_queue(loop): + from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio + + sync_queue = queue.Queue() + async_queue = asyncio.Queue(maxsize=2, loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_queue, async_queue, loop), + daemon=True) + bridge.start() + + sync_queue.put('we') + sync_queue.put('are') + sync_queue.put('the') + sync_queue.put('robots') + + # Wait until the thread processes all the items + time.sleep(1) + + result = yield from async_queue.get() + assert result == 'the' + + result = yield from async_queue.get() + assert result == 'robots' + + assert async_queue.qsize() == 0 + + +@patch('threading.Thread') +@patch('aiohttp.web.run_app') +@patch('bigchaindb.web.websocket_server.init_app') +@patch('asyncio.get_event_loop', return_value='event-loop') +@patch('asyncio.Queue', return_value='event-queue') +def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock, + init_app_mock, run_app_mock, + thread_mock): + from bigchaindb import config + from bigchaindb.web.websocket_server import start, _multiprocessing_to_asyncio + + start(None) + thread_mock.assert_called_once_with( + target=_multiprocessing_to_asyncio, + args=(None, queue_mock.return_value, get_event_loop_mock.return_value), + daemon=True, + ) + thread_mock.return_value.start.assert_called_once_with() + init_app_mock.assert_called_with('event-queue', loop='event-loop') + run_app_mock.assert_called_once_with( + init_app_mock.return_value, + host=config['wsserver']['host'], + port=config['wsserver']['port'], + ) + + +@asyncio.coroutine +def test_websocket_string_event(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT + + event_source = asyncio.Queue(loop=loop) + app = init_app(event_source, loop=loop) + client = yield from test_client(app) + ws = yield from client.ws_connect(EVENTS_ENDPOINT) + + yield from event_source.put('hack') + yield from event_source.put('the') + yield from event_source.put('planet!') + + result = yield from ws.receive() + assert result.data == 'hack' + + result = yield from ws.receive() + assert result.data == 'the' + + result = yield from ws.receive() + assert result.data == 'planet!' + + yield from event_source.put(POISON_PILL) + + +@asyncio.coroutine +@pytest.mark.parametrize('_block', (10,), indirect=('_block',), ids=('block',)) +def test_websocket_block_event(b, _block, test_client, loop): + from bigchaindb import events + from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT + + event_source = asyncio.Queue(loop=loop) + app = init_app(event_source, loop=loop) + client = yield from test_client(app) + ws = yield from client.ws_connect(EVENTS_ENDPOINT) + block = _block.to_dict() + block_event = events.Event(events.EventTypes.BLOCK_VALID, block) + + yield from event_source.put(block_event) + + for tx in block['block']['transactions']: + result = yield from ws.receive() + json_result = json.loads(result.data) + assert json_result['tx_id'] == tx['id'] + # Since the transactions are all CREATEs, asset id == transaction id + assert json_result['asset_id'] == tx['id'] + assert json_result['block_id'] == block['id'] + + yield from event_source.put(POISON_PILL) + + +@pytest.mark.skip('Processes are not stopping properly, and the whole test suite would hang') +@pytest.mark.genesis +def test_integration_from_webapi_to_websocket(monkeypatch, client, loop): + # XXX: I think that the `pytest-aiohttp` plugin is sparkling too much + # magic in the `asyncio` module: running this test without monkey-patching + # `asycio.get_event_loop` (and without the `loop` fixture) raises a: + # RuntimeError: There is no current event loop in thread 'MainThread'. + # + # That's pretty weird because this test doesn't use the pytest-aiohttp + # plugin explicitely. + monkeypatch.setattr('asyncio.get_event_loop', lambda: loop) + + import json + import random + import aiohttp + + from bigchaindb.common import crypto + from bigchaindb import processes + from bigchaindb.models import Transaction + + # Start BigchainDB + processes.start() + + loop = asyncio.get_event_loop() + + import time + time.sleep(1) + + ws_url = client.get('http://localhost:9984/api/v1/').json['_links']['streams_v1'] + + # Connect to the WebSocket endpoint + session = aiohttp.ClientSession() + ws = loop.run_until_complete(session.ws_connect(ws_url)) + + # Create a keypair and generate a new asset + user_priv, user_pub = crypto.generate_key_pair() + asset = {'random': random.random()} + tx = Transaction.create([user_pub], [([user_pub], 1)], asset=asset) + tx = tx.sign([user_priv]) + # Post the transaction to the BigchainDB Web API + client.post('/api/v1/transactions/', data=json.dumps(tx.to_dict())) + + result = loop.run_until_complete(ws.receive()) + json_result = json.loads(result.data) + assert json_result['tx_id'] == tx.id