From 9fd40682f235d043b12e48f83308a2b4e6de9087 Mon Sep 17 00:00:00 2001 From: Troy McConaghy Date: Fri, 24 Mar 2017 15:38:27 +0100 Subject: [PATCH 01/32] docs re: database.connection_timeout and database.max_tries --- .../source/server-reference/configuration.md | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/docs/server/source/server-reference/configuration.md b/docs/server/source/server-reference/configuration.md index 4cd9e9d4..42f22d4e 100644 --- a/docs/server/source/server-reference/configuration.md +++ b/docs/server/source/server-reference/configuration.md @@ -16,6 +16,8 @@ For convenience, here's a list of all the relevant environment variables (docume `BIGCHAINDB_DATABASE_PORT`
`BIGCHAINDB_DATABASE_NAME`
`BIGCHAINDB_DATABASE_REPLICASET`
+`BIGCHAINDB_DATABASE_CONNECTION_TIMEOUT`
+`BIGCHAINDB_DATABASE_MAX_TRIES`
`BIGCHAINDB_SERVER_BIND`
`BIGCHAINDB_SERVER_WORKERS`
`BIGCHAINDB_SERVER_THREADS`
@@ -85,9 +87,18 @@ Note how the keys in the list are separated by colons. ``` -## database.backend, database.host, database.port, database.name & database.replicaset +## database.* -The database backend to use (`rethinkdb` or `mongodb`) and its hostname, port and name. If the database backend is `mongodb`, then there's a fifth setting: the name of the replica set. If the database backend is `rethinkdb`, you *can* set the name of the replica set, but it won't be used for anything. +The settings with names of the form `database.*` are for the database backend +(currently either RethinkDB or MongoDB). They are: + +* `database.backend` is either `rethinkdb` or `mongodb`. +* `database.host` is the hostname (FQDN) of the backend database. +* `database.port` is self-explanatory. +* `database.name` is a user-chosen name for the database inside RethinkDB or MongoDB, e.g. `bigchain`. +* `database.replicaset` is only relevant if using MongoDB; it's the name of the MongoDB replica set, e.g. `bigchain-rs`. +* `database.connection_timeout` is the maximum number of milliseconds that BigchainDB will wait before giving up on one attempt to connect to the database backend. +* `database.max_tries` is the maximum number of times that BigchainDB will try to establish a connection with the database backend. If 0, then it will try forever. 
**Example using environment variables** ```text @@ -96,6 +107,8 @@ export BIGCHAINDB_DATABASE_HOST=localhost export BIGCHAINDB_DATABASE_PORT=27017 export BIGCHAINDB_DATABASE_NAME=bigchain export BIGCHAINDB_DATABASE_REPLICASET=bigchain-rs +export BIGCHAINDB_DATABASE_CONNECTION_TIMEOUT=5000 +export BIGCHAINDB_DATABASE_MAX_TRIES=3 ``` **Default values** @@ -105,8 +118,10 @@ If (no environment variables were set and there's no local config file), or you "database": { "backend": "rethinkdb", "host": "localhost", + "port": 28015, "name": "bigchain", - "port": 28015 + "connection_timeout": 5000, + "max_tries": 3 } ``` @@ -115,9 +130,11 @@ If you used `bigchaindb -y configure mongodb` to create a default local config f "database": { "backend": "mongodb", "host": "localhost", - "name": "bigchain", "port": 27017, - "replicaset": "bigchain-rs" + "name": "bigchain", + "replicaset": "bigchain-rs", + "connection_timeout": 5000, + "max_tries": 3 } ``` From 0cbf144ddf364813d2651793494ad71ab0d58553 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Tue, 28 Mar 2017 16:05:44 +0200 Subject: [PATCH 02/32] Initial implementation of an event_handler --- bigchaindb/events.py | 33 +++++++++++++++++++ bigchaindb/pipelines/election.py | 24 +++++++++++--- .../pipelines/events_consumer_example.py | 14 ++++++++ bigchaindb/processes.py | 16 ++++++++- 4 files changed, 81 insertions(+), 6 deletions(-) create mode 100644 bigchaindb/events.py create mode 100644 bigchaindb/pipelines/events_consumer_example.py diff --git a/bigchaindb/events.py b/bigchaindb/events.py new file mode 100644 index 00000000..a061ad50 --- /dev/null +++ b/bigchaindb/events.py @@ -0,0 +1,33 @@ +from enum import Enum +from multiprocessing import Queue + + +class EventTypes(Enum): + BLOCK_VALID = 1 + BLOCK_INVALID = 2 + + +class Event(object): + + def __init__(self, event_type, event_data): + self.type = event_type + self.data = event_data + + +class EventHandler(object): + + def __init__(self, events_queue): + 
self.events_queue = events_queue + + def put_event(self, event, timeout=None): + # TODO: handle timeouts + self.events_queue.put(event, timeout=None) + + def get_event(self, timeout=None): + # TODO: handle timeouts + return self.events_queue.get(timeout=None) + + +def setup_events_queue(): + # TODO: set bounds to the queue + return Queue() diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index a5818b3e..b17f5722 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -13,6 +13,7 @@ from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block from bigchaindb import Bigchain +from bigchaindb.events import EventHandler, Event, EventTypes logger = logging.getLogger(__name__) @@ -22,8 +23,9 @@ logger_results = logging.getLogger('pipeline.election.results') class Election: """Election class.""" - def __init__(self): + def __init__(self, events_queue): self.bigchain = Bigchain() + self.event_handler = EventHandler(events_queue) def check_for_quorum(self, next_vote): """ @@ -42,6 +44,7 @@ class Election: next_block = self.bigchain.get_block(block_id) result = self.bigchain.block_election(next_block) + self.handle_block_events(result, block_id) if result['status'] == self.bigchain.BLOCK_INVALID: return Block.from_dict(next_block) @@ -67,9 +70,20 @@ class Election: self.bigchain.write_transaction(tx) return invalid_block + def handle_block_events(self, result, block_id): + if result['status'] == self.bigchain.BLOCK_UNDECIDED: + return + elif result['status'] == self.bigchain.BLOCK_INVALID: + event_type = EventTypes.BLOCK_INVALID + elif result['status'] == self.bigchain.BLOCK_VALID: + event_type = EventTypes.BLOCK_VALID -def create_pipeline(): - election = Election() + event = Event(event_type, {'block_id': block_id}) + self.event_handler.put_event(event) + + +def create_pipeline(events_queue): + election = Election(events_queue) election_pipeline = 
Pipeline([ Node(election.check_for_quorum), @@ -84,8 +98,8 @@ def get_changefeed(): return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT) -def start(): - pipeline = create_pipeline() +def start(events_queue): + pipeline = create_pipeline(events_queue) pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/bigchaindb/pipelines/events_consumer_example.py b/bigchaindb/pipelines/events_consumer_example.py new file mode 100644 index 00000000..7e833c82 --- /dev/null +++ b/bigchaindb/pipelines/events_consumer_example.py @@ -0,0 +1,14 @@ +import multiprocessing as mp + +from bigchaindb.events import EventHandler + + +def consume_events(events_queue): + event_handler = EventHandler(events_queue) + while True: + event = event_handler.get_event() + print('Event type: {} Event data: {}'.format(event.type, event.data)) + + +def events_consumer(events_queue): + return mp.Process(target=consume_events, args=(events_queue,)) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 01d7a55a..687422ca 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -3,6 +3,8 @@ import multiprocessing as mp import bigchaindb from bigchaindb.pipelines import vote, block, election, stale +from bigchaindb.pipelines.events_consumer_example import events_consumer +from bigchaindb.events import setup_events_queue from bigchaindb.web import server @@ -25,6 +27,13 @@ BANNER = """ def start(): logger.info('Initializing BigchainDB...') + # Create the events queue + # The events queue needs to be initialized once and shared between + # processes. This seems the best way to do it + # At this point only the election processs and the event consumer require + # this queue. 
+ events_queue = setup_events_queue() + # start the processes logger.info('Starting block') block.start() @@ -36,12 +45,17 @@ def start(): stale.start() logger.info('Starting election') - election.start() + election.start(events_queue) # start the web api app_server = server.create_server(bigchaindb.config['server']) p_webapi = mp.Process(name='webapi', target=app_server.run) p_webapi.start() + # start the example events consumer + logger.info('Starting the events consumer example') + p_events_consumer = events_consumer(events_queue) + p_events_consumer.start() + # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) From 5d39b42b7a8f32389a28b456ca1cd855a8f47b31 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 30 Mar 2017 17:27:03 +0200 Subject: [PATCH 03/32] Add dependencies and first test --- bigchaindb/web/websocket_server.py | 56 ++++++++++++++++++++++++++++++ setup.py | 1 + tests/web/test_websocket_server.py | 15 ++++++++ 3 files changed, 72 insertions(+) create mode 100644 bigchaindb/web/websocket_server.py create mode 100644 tests/web/test_websocket_server.py diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py new file mode 100644 index 00000000..7a5b3d77 --- /dev/null +++ b/bigchaindb/web/websocket_server.py @@ -0,0 +1,56 @@ +"""WebSocket server for the BigchainDB Event Stream API.""" + +import asyncio +from uuid import uuid4 + +from aiohttp import web + + +class PoisonPill: + pass + + +POISON_PILL = PoisonPill() + + +class Dispatcher: + + def __init__(self, event_source): + self.event_source = event_source + self.subscribers = {} + + def subscribe(self, uuid, ws): + self.subscribers[uuid] = ws + + @asyncio.coroutine + def publish(self): + while True: + event = yield from self.event_source.get() + if event == POISON_PILL: + return + for uuid, ws in self.subscribers.items(): + ws.send_str(event) + + +@asyncio.coroutine +def websocket_handler(request): + ws = web.WebSocketResponse() + yield from 
ws.prepare(request) + uuid = uuid4() + request.app['dispatcher'].subscribe(uuid, ws) + while True: + # Consume input buffer + yield from ws.receive() + return ws + + +def init_app(event_source, loop=None): + dispatcher = Dispatcher(event_source) + + # Schedule the dispatcher + loop.create_task(dispatcher.publish()) + + app = web.Application(loop=loop) + app['dispatcher'] = dispatcher + app.router.add_get('/', websocket_handler) + return app diff --git a/setup.py b/setup.py index c05b554a..ee8871d4 100644 --- a/setup.py +++ b/setup.py @@ -54,6 +54,7 @@ tests_require = [ 'pytest-mock', 'pytest-xdist', 'pytest-flask', + 'pytest-aiohttp', 'tox', ] + docs_require diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py new file mode 100644 index 00000000..fb6d555b --- /dev/null +++ b/tests/web/test_websocket_server.py @@ -0,0 +1,15 @@ +import asyncio + + +@asyncio.coroutine +def test_websocket(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL + + event_source = asyncio.Queue(loop=loop) + app = init_app(event_source, loop=loop) + client = yield from test_client(app) + ws = yield from client.ws_connect('/') + yield from event_source.put('antani') + yield from event_source.put(POISON_PILL) + result = yield from ws.receive() + assert result.data == 'antani' From 83397de397179d219938dc63280a61f5f8b56f58 Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 31 Mar 2017 15:56:29 +0200 Subject: [PATCH 04/32] Add more tests and utils --- bigchaindb/web/websocket_server.py | 61 +++++++++++++++++++++++---- tests/web/test_websocket_server.py | 67 ++++++++++++++++++++++++++++-- 2 files changed, 118 insertions(+), 10 deletions(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index 7a5b3d77..9d8f5ef9 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -1,29 +1,48 @@ """WebSocket server for the BigchainDB Event Stream API.""" import asyncio +import 
logging from uuid import uuid4 +import aiohttp from aiohttp import web -class PoisonPill: - pass - - -POISON_PILL = PoisonPill() +logger = logging.getLogger(__name__) +POISON_PILL = 'POISON_PILL' class Dispatcher: + """Dispatch events to websockets. + + This class implements a simple publish/subscribe pattern. + """ def __init__(self, event_source): + """Create a new instance. + + Args: + event_source: a source of events. Elements in the queue + should be strings. + """ + self.event_source = event_source self.subscribers = {} def subscribe(self, uuid, ws): + """Add a websocket to the list of subscribers. + + Args: + uuid (str): a unique identifier for the websocket. + ws: the websocket to publish information. + """ + self.subscribers[uuid] = ws @asyncio.coroutine def publish(self): + """Publish new events to the subscribers.""" + while True: event = yield from self.event_source.get() if event == POISON_PILL: @@ -34,17 +53,29 @@ class Dispatcher: @asyncio.coroutine def websocket_handler(request): + """Handle a new socket connection.""" + + logger.debug('New websocket connection.') ws = web.WebSocketResponse() yield from ws.prepare(request) uuid = uuid4() request.app['dispatcher'].subscribe(uuid, ws) + while True: # Consume input buffer - yield from ws.receive() - return ws + msg = yield from ws.receive() + if msg.type == aiohttp.WSMsgType.ERROR: + logger.debug('Websocket exception: {}'.format(ws.exception())) + return def init_app(event_source, loop=None): + """Init the application server. + + Return: + An aiohttp application. 
+ """ + dispatcher = Dispatcher(event_source) # Schedule the dispatcher @@ -54,3 +85,19 @@ def init_app(event_source, loop=None): app['dispatcher'] = dispatcher app.router.add_get('/', websocket_handler) return app + + +@asyncio.coroutine +def constant_event_source(event_source): + while True: + yield from asyncio.sleep(1) + yield from event_source.put('meow') + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + event_source = asyncio.Queue() + + loop.create_task(constant_event_source(event_source)) + app = init_app(event_source, loop=loop) + aiohttp.web.run_app(app, port=9985) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index fb6d555b..382a20f0 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,6 +1,41 @@ +import pytest import asyncio +class MockWebSocket: + def __init__(self): + self.received = [] + + def send_str(self, s): + self.received.append(s) + + +@asyncio.coroutine +@pytest.mark.skipif(reason='This test raises a RuntimeError, dunno how to solve it now.') +def test_dispatcher(loop): + from bigchaindb.web.websocket_server import Dispatcher, POISON_PILL + + ws0 = MockWebSocket() + ws1 = MockWebSocket() + + event_source = asyncio.Queue(loop=loop) + dispatcher = Dispatcher(event_source) + + dispatcher.subscribe(0, ws0) + dispatcher.subscribe(1, ws1) + + yield from event_source.put('hack') + yield from event_source.put('the') + + yield from event_source.put('planet!') + yield from event_source.put(POISON_PILL) + + loop.run_until_complete(dispatcher.publish()) + + assert ws0.received == ['hack', 'the', 'planet!'] + assert ws1.received == ['planet!'] + + @asyncio.coroutine def test_websocket(test_client, loop): from bigchaindb.web.websocket_server import init_app, POISON_PILL @@ -9,7 +44,33 @@ def test_websocket(test_client, loop): app = init_app(event_source, loop=loop) client = yield from test_client(app) ws = yield from client.ws_connect('/') - yield from 
event_source.put('antani') - yield from event_source.put(POISON_PILL) + + yield from event_source.put('hack') + yield from event_source.put('the') + yield from event_source.put('planet!') + result = yield from ws.receive() - assert result.data == 'antani' + assert result.data == 'hack' + + result = yield from ws.receive() + assert result.data == 'the' + + result = yield from ws.receive() + assert result.data == 'planet!' + + yield from event_source.put(POISON_PILL) + + +@asyncio.coroutine +@pytest.mark.skipif(reason="Still don't understand how to trigger custom errors.") +def test_websocket_error(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL + + event_source = asyncio.Queue(loop=loop) + app = init_app(event_source, loop=loop) + client = yield from test_client(app) + ws = yield from client.ws_connect('/') + + yield from ws.close() + + yield from event_source.put(POISON_PILL) From 96daa986994413cefdfc6f933091840233a53ff5 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Wed, 29 Mar 2017 12:09:14 +0200 Subject: [PATCH 05/32] Advertise Event stream api in api info endpoint. Updated tests. 
--- bigchaindb/web/views/base.py | 5 +++++ bigchaindb/web/views/info.py | 9 ++++++--- tests/web/test_info.py | 1 + 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 171a3bb6..5a0ec97b 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -21,3 +21,8 @@ def make_error(status_code, message=None): def base_url(): return '%s://%s/' % (request.environ['wsgi.url_scheme'], request.environ['HTTP_HOST']) + + +def base_ws_uri(): + """Base websocket uri.""" + return '%s://%s/' % ('ws', request.environ['HTTP_HOST']) diff --git a/bigchaindb/web/views/info.py b/bigchaindb/web/views/info.py index 04a15749..b35c6378 100644 --- a/bigchaindb/web/views/info.py +++ b/bigchaindb/web/views/info.py @@ -4,7 +4,7 @@ import flask from flask_restful import Resource import bigchaindb -from bigchaindb.web.views.base import base_url +from bigchaindb.web.views.base import base_url, base_ws_uri from bigchaindb import version @@ -30,16 +30,19 @@ class RootIndex(Resource): class ApiV1Index(Resource): def get(self): api_root = base_url() + 'api/v1/' + websocket_root = base_ws_uri() + 'api/v1/' docs_url = [ 'https://docs.bigchaindb.com/projects/server/en/v', version.__version__, '/drivers-clients/http-client-server-api.html', ] - return { + return flask.jsonify({ '_links': { 'docs': ''.join(docs_url), 'self': api_root, 'statuses': api_root + 'statuses/', 'transactions': api_root + 'transactions/', + # TODO: The version should probably not be hardcoded + 'streams_v1': websocket_root + 'streams/', }, - } + }) diff --git a/tests/web/test_info.py b/tests/web/test_info.py index c55f467f..93e14cbd 100644 --- a/tests/web/test_info.py +++ b/tests/web/test_info.py @@ -31,5 +31,6 @@ def test_api_v1_endpoint(client): 'self': 'http://localhost/api/v1/', 'statuses': 'http://localhost/api/v1/statuses/', 'transactions': 'http://localhost/api/v1/transactions/', + 'streams_v1': 'ws://localhost/api/v1/streams/', } 
} From 83a7cffc3fe88c2ffeda8a15a19d72c9f010309d Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Mon, 3 Apr 2017 14:29:31 +0200 Subject: [PATCH 06/32] fix tests --- bigchaindb/pipelines/election.py | 31 +++++++++++++++++-------------- bigchaindb/processes.py | 2 +- tests/test_processes.py | 10 ++++++---- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index b17f5722..8f3116cc 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -23,9 +23,11 @@ logger_results = logging.getLogger('pipeline.election.results') class Election: """Election class.""" - def __init__(self, events_queue): + def __init__(self, events_queue=None): self.bigchain = Bigchain() - self.event_handler = EventHandler(events_queue) + self.event_handler = None + if events_queue: + self.event_handler = EventHandler(events_queue) def check_for_quorum(self, next_vote): """ @@ -71,19 +73,20 @@ class Election: return invalid_block def handle_block_events(self, result, block_id): - if result['status'] == self.bigchain.BLOCK_UNDECIDED: - return - elif result['status'] == self.bigchain.BLOCK_INVALID: - event_type = EventTypes.BLOCK_INVALID - elif result['status'] == self.bigchain.BLOCK_VALID: - event_type = EventTypes.BLOCK_VALID + if self.event_handler: + if result['status'] == self.bigchain.BLOCK_UNDECIDED: + return + elif result['status'] == self.bigchain.BLOCK_INVALID: + event_type = EventTypes.BLOCK_INVALID + elif result['status'] == self.bigchain.BLOCK_VALID: + event_type = EventTypes.BLOCK_VALID - event = Event(event_type, {'block_id': block_id}) - self.event_handler.put_event(event) + event = Event(event_type, {'block_id': block_id}) + self.event_handler.put_event(event) -def create_pipeline(events_queue): - election = Election(events_queue) +def create_pipeline(events_queue=None): + election = Election(events_queue=events_queue) election_pipeline = Pipeline([ 
Node(election.check_for_quorum), @@ -98,8 +101,8 @@ def get_changefeed(): return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT) -def start(events_queue): - pipeline = create_pipeline(events_queue) +def start(events_queue=None): + pipeline = create_pipeline(events_queue=events_queue) pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 687422ca..5194c05a 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -45,7 +45,7 @@ def start(): stale.start() logger.info('Starting election') - election.start(events_queue) + election.start(events_queue=events_queue) # start the web api app_server = server.create_server(bigchaindb.config['server']) diff --git a/tests/test_processes.py b/tests/test_processes.py index bd69d52c..32d784bb 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -1,6 +1,6 @@ from unittest.mock import patch -from multiprocessing import Process +from multiprocessing import Process, Queue from bigchaindb.pipelines import vote, block, election, stale @@ -9,14 +9,16 @@ from bigchaindb.pipelines import vote, block, election, stale @patch.object(block, 'start') @patch.object(vote, 'start') @patch.object(Process, 'start') -def test_processes_start(mock_vote, mock_block, mock_election, mock_stale, - mock_process): +def test_processes_start(mock_process, mock_vote, mock_block, mock_election, + mock_stale): from bigchaindb import processes processes.start() mock_vote.assert_called_with() mock_block.assert_called_with() - mock_election.assert_called_with() mock_stale.assert_called_with() mock_process.assert_called_with() + assert mock_election.call_count == 1 + # the events queue is declared inside processes.start() + assert type(mock_election.call_args[1]['events_queue']) == type(Queue()) From 730b7482f6c8df79d30b8de48152770d9596cfdf Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Mon, 3 Apr 2017 14:31:38 +0200 Subject: 
[PATCH 07/32] cleanup code --- bigchaindb/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigchaindb/events.py b/bigchaindb/events.py index a061ad50..bc448ce3 100644 --- a/bigchaindb/events.py +++ b/bigchaindb/events.py @@ -7,14 +7,14 @@ class EventTypes(Enum): BLOCK_INVALID = 2 -class Event(object): +class Event: def __init__(self, event_type, event_data): self.type = event_type self.data = event_data -class EventHandler(object): +class EventHandler: def __init__(self, events_queue): self.events_queue = events_queue From bcc2e1f781f5c082df091e75ba4aa8ebad7fe20f Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Mon, 3 Apr 2017 14:48:50 +0200 Subject: [PATCH 08/32] fixed pep8 issue --- tests/test_processes.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_processes.py b/tests/test_processes.py index 32d784bb..7f8ffcd9 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -19,6 +19,5 @@ def test_processes_start(mock_process, mock_vote, mock_block, mock_election, mock_block.assert_called_with() mock_stale.assert_called_with() mock_process.assert_called_with() - assert mock_election.call_count == 1 # the events queue is declared inside processes.start() - assert type(mock_election.call_args[1]['events_queue']) == type(Queue()) + assert mock_election.call_count == 1 From a92c091eeb11b4fd1cc1c20684c10e0e1576392f Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Mon, 3 Apr 2017 14:55:21 +0200 Subject: [PATCH 09/32] fix pep8 issue --- tests/test_processes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_processes.py b/tests/test_processes.py index 7f8ffcd9..00716010 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -1,6 +1,6 @@ from unittest.mock import patch -from multiprocessing import Process, Queue +from multiprocessing import Process from bigchaindb.pipelines import vote, block, election, stale From 
64a033b17a49e68b685a386a2ef258290b6c612a Mon Sep 17 00:00:00 2001 From: vrde Date: Mon, 3 Apr 2017 11:48:48 +0200 Subject: [PATCH 10/32] Code cleanup, rename some vars --- bigchaindb/web/websocket_server.py | 60 +++++++++++++++++++----------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index 9d8f5ef9..6915d54a 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -29,15 +29,15 @@ class Dispatcher: self.event_source = event_source self.subscribers = {} - def subscribe(self, uuid, ws): + def subscribe(self, uuid, websocket): """Add a websocket to the list of subscribers. Args: uuid (str): a unique identifier for the websocket. - ws: the websocket to publish information. + websocket: the websocket to publish information. """ - self.subscribers[uuid] = ws + self.subscribers[uuid] = websocket @asyncio.coroutine def publish(self): @@ -47,8 +47,8 @@ class Dispatcher: event = yield from self.event_source.get() if event == POISON_PILL: return - for uuid, ws in self.subscribers.items(): - ws.send_str(event) + for uuid, websocket in self.subscribers.items(): + websocket.send_str(event) @asyncio.coroutine @@ -56,20 +56,20 @@ def websocket_handler(request): """Handle a new socket connection.""" logger.debug('New websocket connection.') - ws = web.WebSocketResponse() - yield from ws.prepare(request) + websocket = web.WebSocketResponse() + yield from websocket.prepare(request) uuid = uuid4() - request.app['dispatcher'].subscribe(uuid, ws) + request.app['dispatcher'].subscribe(uuid, websocket) while True: # Consume input buffer - msg = yield from ws.receive() + msg = yield from websocket.receive() if msg.type == aiohttp.WSMsgType.ERROR: - logger.debug('Websocket exception: {}'.format(ws.exception())) + logger.debug('Websocket exception: %s', websocket.exception()) return -def init_app(event_source, loop=None): +def init_app(event_source, *, loop=None): 
"""Init the application server. Return: @@ -87,17 +87,33 @@ def init_app(event_source, loop=None): return app -@asyncio.coroutine -def constant_event_source(event_source): - while True: - yield from asyncio.sleep(1) - yield from event_source.put('meow') +def start(event_source, *, loop=None): + """Create and start the WebSocket server.""" + + if not loop: + loop = asyncio.get_event_loop() + + app = init_app(event_source, loop=loop) + aiohttp.web.run_app(app, port=9985) + + +def test_websocket_server(): + """Set up a server and output a message every second. + Used for testing purposes.""" + + @asyncio.coroutine + def constant_event_source(event_source): + """Put a message in ``event_source`` every second.""" + + while True: + yield from asyncio.sleep(1) + yield from event_source.put('meow') + + loop = asyncio.get_event_loop() + event_source = asyncio.Queue() + loop.create_task(constant_event_source(event_source)) + start(event_source, loop=loop) if __name__ == '__main__': - loop = asyncio.get_event_loop() - event_source = asyncio.Queue() - - loop.create_task(constant_event_source(event_source)) - app = init_app(event_source, loop=loop) - aiohttp.web.run_app(app, port=9985) + test_websocket_server() From f23faaa65fa1bdbe487266432faea9e4331cecae Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 09:16:22 +0200 Subject: [PATCH 11/32] Add WebSocket server --- bigchaindb/pipelines/election.py | 2 +- bigchaindb/processes.py | 12 +- bigchaindb/web/views/base.py | 2 +- bigchaindb/web/websocket_server.py | 103 ++++++++++++---- setup.py | 1 + tests/web/test_info.py | 2 +- tests/web/test_websocket_server.py | 183 +++++++++++++++++++++++++---- 7 files changed, 246 insertions(+), 59 deletions(-) diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 8f3116cc..fc7cb077 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -81,7 +81,7 @@ class Election: elif result['status'] == self.bigchain.BLOCK_VALID: 
event_type = EventTypes.BLOCK_VALID - event = Event(event_type, {'block_id': block_id}) + event = Event(event_type, self.bigchain.get_block(block_id)) self.event_handler.put_event(event) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 5194c05a..205cdd3c 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -3,9 +3,8 @@ import multiprocessing as mp import bigchaindb from bigchaindb.pipelines import vote, block, election, stale -from bigchaindb.pipelines.events_consumer_example import events_consumer from bigchaindb.events import setup_events_queue -from bigchaindb.web import server +from bigchaindb.web import server, websocket_server logger = logging.getLogger(__name__) @@ -52,10 +51,11 @@ def start(): p_webapi = mp.Process(name='webapi', target=app_server.run) p_webapi.start() - # start the example events consumer - logger.info('Starting the events consumer example') - p_events_consumer = events_consumer(events_queue) - p_events_consumer.start() + logger.info('WebSocket server started') + p_websocket_server = mp.Process(name='ws', + target=websocket_server.start, + args=(events_queue,)) + p_websocket_server.start() # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 5a0ec97b..5ab409b0 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -25,4 +25,4 @@ def base_url(): def base_ws_uri(): """Base websocket uri.""" - return '%s://%s/' % ('ws', request.environ['HTTP_HOST']) + return 'ws://localhost:9985/' diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index 6915d54a..dc320754 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -1,15 +1,64 @@ """WebSocket server for the BigchainDB Event Stream API.""" +# NOTE +# +# This module contains some functions and utilities that might belong to other +# modules. 
For now, I prefer to keep everything in this module. Why? Because +# those functions are needed only here. +# +# When we will extend this part of the project and we find that we need those +# functionalities elsewhere, we can start creating new modules and organizing +# things in a better way. + + +import json import asyncio import logging +import threading from uuid import uuid4 import aiohttp from aiohttp import web +from bigchaindb.events import EventTypes + logger = logging.getLogger(__name__) POISON_PILL = 'POISON_PILL' +EVENTS_ENDPOINT = '/api/v1/streams/' + + +def _put_into_capped_queue(queue, value): + """Put a new item in a capped queue. + + If the queue reached its limit, get the first element + ready and put the new one. Note that the first element + will be lost (that's the purpose of a capped queue). + + Args: + queue: a queue + value: the value to put + """ + while True: + try: + queue.put_nowait(value) + return + except asyncio.QueueFull: + queue.get_nowait() + + +def _multiprocessing_to_asyncio(in_queue, out_queue, loop): + """Bridge between a synchronous multiprocessing queue + and an asynchronous asyncio queue. 
+ + Args: + in_queue (multiprocessing.Queue): input queue + out_queue (asyncio.Queue): output queue + """ + + while True: + value = in_queue.get() + loop.call_soon_threadsafe(_put_into_capped_queue, out_queue, value) class Dispatcher: @@ -45,10 +94,27 @@ class Dispatcher: while True: event = yield from self.event_source.get() + str_buffer = [] + if event == POISON_PILL: return - for uuid, websocket in self.subscribers.items(): - websocket.send_str(event) + + if isinstance(event, str): + str_buffer.append(event) + + elif event.type == EventTypes.BLOCK_VALID: + block = event.data + + for tx in block['block']['transactions']: + asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id'] + data = {'blockid': block['id'], + 'assetid': asset_id, + 'txid': tx['id']} + str_buffer.append(json.dumps(data)) + + for _, websocket in self.subscribers.items(): + for str_item in str_buffer: + websocket.send_str(str_item) @asyncio.coroutine @@ -83,37 +149,22 @@ def init_app(event_source, *, loop=None): app = web.Application(loop=loop) app['dispatcher'] = dispatcher - app.router.add_get('/', websocket_handler) + app.router.add_get(EVENTS_ENDPOINT, websocket_handler) return app -def start(event_source, *, loop=None): +def start(sync_event_source, loop=None): """Create and start the WebSocket server.""" if not loop: loop = asyncio.get_event_loop() + event_source = asyncio.Queue(maxsize=1024, loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_event_source, event_source, loop), + daemon=True) + bridge.start() + app = init_app(event_source, loop=loop) aiohttp.web.run_app(app, port=9985) - - -def test_websocket_server(): - """Set up a server and output a message every second. 
- Used for testing purposes.""" - - @asyncio.coroutine - def constant_event_source(event_source): - """Put a message in ``event_source`` every second.""" - - while True: - yield from asyncio.sleep(1) - yield from event_source.put('meow') - - loop = asyncio.get_event_loop() - event_source = asyncio.Queue() - loop.create_task(constant_event_source(event_source)) - start(event_source, loop=loop) - - -if __name__ == '__main__': - test_websocket_server() diff --git a/setup.py b/setup.py index ee8871d4..45d6f04f 100644 --- a/setup.py +++ b/setup.py @@ -77,6 +77,7 @@ install_requires = [ 'multipipes~=0.1.0', 'jsonschema~=2.5.1', 'pyyaml~=3.12', + 'aiohttp~=2.0', ] setup( diff --git a/tests/web/test_info.py b/tests/web/test_info.py index 93e14cbd..4dc60168 100644 --- a/tests/web/test_info.py +++ b/tests/web/test_info.py @@ -31,6 +31,6 @@ def test_api_v1_endpoint(client): 'self': 'http://localhost/api/v1/', 'statuses': 'http://localhost/api/v1/statuses/', 'transactions': 'http://localhost/api/v1/transactions/', - 'streams_v1': 'ws://localhost/api/v1/streams/', + 'streams_v1': 'ws://localhost:9985/api/v1/streams/', } } diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 382a20f0..b205fb25 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,6 +1,23 @@ +import json +import random + import pytest import asyncio +from bigchaindb.models import Transaction + + +def create_block(b, total=1): + transactions = [ + Transaction.create( + [b.me], + [([b.me], 1)], + metadata={'msg': random.random()}, + ).sign([b.me_private]) + for _ in range(total) + ] + return b.create_block(transactions) + class MockWebSocket: def __init__(self): @@ -11,39 +28,100 @@ class MockWebSocket: @asyncio.coroutine -@pytest.mark.skipif(reason='This test raises a RuntimeError, dunno how to solve it now.') -def test_dispatcher(loop): - from bigchaindb.web.websocket_server import Dispatcher, POISON_PILL +def 
test_bridge_sync_async_queue(loop): + import queue + import threading + from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio - ws0 = MockWebSocket() - ws1 = MockWebSocket() + sync_queue = queue.Queue() + async_queue = asyncio.Queue(loop=loop) - event_source = asyncio.Queue(loop=loop) - dispatcher = Dispatcher(event_source) + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_queue, async_queue, loop), + daemon=True) + bridge.start() - dispatcher.subscribe(0, ws0) - dispatcher.subscribe(1, ws1) + sync_queue.put('fahren') + sync_queue.put('auf') + sync_queue.put('der') + sync_queue.put('Autobahn') - yield from event_source.put('hack') - yield from event_source.put('the') + result = yield from async_queue.get() + assert result == 'fahren' - yield from event_source.put('planet!') - yield from event_source.put(POISON_PILL) + result = yield from async_queue.get() + assert result == 'auf' - loop.run_until_complete(dispatcher.publish()) + result = yield from async_queue.get() + assert result == 'der' - assert ws0.received == ['hack', 'the', 'planet!'] - assert ws1.received == ['planet!'] + result = yield from async_queue.get() + assert result == 'Autobahn' + + assert async_queue.qsize() == 0 @asyncio.coroutine -def test_websocket(test_client, loop): - from bigchaindb.web.websocket_server import init_app, POISON_PILL +def test_put_into_capped_queue(loop): + from bigchaindb.web.websocket_server import _put_into_capped_queue + q = asyncio.Queue(maxsize=2, loop=loop) + + _put_into_capped_queue(q, 'Friday') + assert q._queue[0] == 'Friday' + + _put_into_capped_queue(q, "I'm") + assert q._queue[0] == 'Friday' + assert q._queue[1] == "I'm" + + _put_into_capped_queue(q, 'in') + assert q._queue[0] == "I'm" + assert q._queue[1] == 'in' + + _put_into_capped_queue(q, 'love') + assert q._queue[0] == 'in' + assert q._queue[1] == 'love' + + +@asyncio.coroutine +def test_capped_queue(loop): + import queue + import threading + import time + from 
bigchaindb.web.websocket_server import _multiprocessing_to_asyncio + + sync_queue = queue.Queue() + async_queue = asyncio.Queue(maxsize=2, loop=loop) + + bridge = threading.Thread(target=_multiprocessing_to_asyncio, + args=(sync_queue, async_queue, loop), + daemon=True) + bridge.start() + + sync_queue.put('we') + sync_queue.put('are') + sync_queue.put('the') + sync_queue.put('robots') + + # Wait until the thread processes all the items + time.sleep(1) + + result = yield from async_queue.get() + assert result == 'the' + + result = yield from async_queue.get() + assert result == 'robots' + + assert async_queue.qsize() == 0 + + +@asyncio.coroutine +def test_websocket_string_event(test_client, loop): + from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT event_source = asyncio.Queue(loop=loop) app = init_app(event_source, loop=loop) client = yield from test_client(app) - ws = yield from client.ws_connect('/') + ws = yield from client.ws_connect(EVENTS_ENDPOINT) yield from event_source.put('hack') yield from event_source.put('the') @@ -62,15 +140,72 @@ def test_websocket(test_client, loop): @asyncio.coroutine -@pytest.mark.skipif(reason="Still don't understand how to trigger custom errors.") -def test_websocket_error(test_client, loop): - from bigchaindb.web.websocket_server import init_app, POISON_PILL +def test_websocket_block_event(b, test_client, loop): + from bigchaindb import events + from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT event_source = asyncio.Queue(loop=loop) app = init_app(event_source, loop=loop) client = yield from test_client(app) - ws = yield from client.ws_connect('/') + ws = yield from client.ws_connect(EVENTS_ENDPOINT) + block = create_block(b, 10).to_dict() + block_event = events.Event(events.EventTypes.BLOCK_VALID, block) - yield from ws.close() + yield from event_source.put(block_event) + + for tx in block['block']['transactions']: + result = yield from ws.receive() + 
json_result = json.loads(result.data) + assert json_result['txid'] == tx['id'] + # Since the transactions are all CREATEs, asset id == transaction id + assert json_result['assetid'] == tx['id'] + assert json_result['blockid'] == block['id'] yield from event_source.put(POISON_PILL) + + +@pytest.mark.skip('Processes are not stopping properly, and the whole test suite would hang') +@pytest.mark.genesis +def test_integration_from_webapi_to_websocket(monkeypatch, client, loop): + # XXX: I think that the `pytest-aiohttp` plugin is sparkling too much + # magic in the `asyncio` module: running this test without monkey-patching + # `asycio.get_event_loop` (and without the `loop` fixture) raises a: + # RuntimeError: There is no current event loop in thread 'MainThread'. + # + # That's pretty weird because this test doesn't use the pytest-aiohttp + # plugin explicitely. + monkeypatch.setattr('asyncio.get_event_loop', lambda: loop) + + import json + import random + import aiohttp + + from bigchaindb.common import crypto + from bigchaindb import processes + from bigchaindb.models import Transaction + + # Start BigchainDB + processes.start() + + loop = asyncio.get_event_loop() + + import time + time.sleep(1) + + ws_url = client.get('http://localhost:9984/api/v1/').json['_links']['streams_v1'] + + # Connect to the WebSocket endpoint + session = aiohttp.ClientSession() + ws = loop.run_until_complete(session.ws_connect(ws_url)) + + # Create a keypair and generate a new asset + user_priv, user_pub = crypto.generate_key_pair() + asset = {'random': random.random()} + tx = Transaction.create([user_pub], [([user_pub], 1)], asset=asset) + tx = tx.sign([user_priv]) + # Post the transaction to the BigchainDB Web API + client.post('/api/v1/transactions/', data=json.dumps(tx.to_dict())) + + result = loop.run_until_complete(ws.receive()) + json_result = json.loads(result.data) + assert json_result['txid'] == tx.id From d260e16f117a2bfc75a0fc7f03b325e802978ba2 Mon Sep 17 00:00:00 2001 From: 
vrde Date: Fri, 7 Apr 2017 10:51:00 +0200 Subject: [PATCH 12/32] Add configuration for websocket server --- bigchaindb/__init__.py | 4 ++++ bigchaindb/commands/bigchaindb.py | 4 ++++ bigchaindb/web/views/base.py | 5 ++++- bigchaindb/web/websocket_server.py | 5 ++++- 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index 4c555e47..98e6b27b 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -59,6 +59,10 @@ config = { 'workers': None, # if none, the value will be cpu_count * 2 + 1 'threads': None, # if none, the value will be cpu_count * 2 + 1 }, + 'wsserver': { + 'host': os.environ.get('BIGCHAINDB_WSSERVER_HOST') or 'localhost', + 'port': int(os.environ.get('BIGCHAINDB_WSSERVER_PORT', 9985)), + }, 'database': _database_map[ os.environ.get('BIGCHAINDB_DATABASE_BACKEND', 'rethinkdb') ], diff --git a/bigchaindb/commands/bigchaindb.py b/bigchaindb/commands/bigchaindb.py index d4e37daa..a46019da 100644 --- a/bigchaindb/commands/bigchaindb.py +++ b/bigchaindb/commands/bigchaindb.py @@ -96,6 +96,10 @@ def run_configure(args, skip_if_exists=False): val = conf['server'][key] conf['server'][key] = input_on_stderr('API Server {}? (default `{}`): '.format(key, val), val) + for key in ('host', 'port'): + val = conf['wsserver'][key] + conf['wsserver'][key] = input_on_stderr('WebSocket Server {}? (default `{}`): '.format(key, val), val) + for key in database_keys: val = conf['database'][key] conf['database'][key] = input_on_stderr('Database {}? 
(default `{}`): '.format(key, val), val) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 5ab409b0..7b12c5bb 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -5,6 +5,9 @@ import logging from flask import jsonify, request +from bigchaindb import config + + logger = logging.getLogger(__name__) @@ -25,4 +28,4 @@ def base_url(): def base_ws_uri(): """Base websocket uri.""" - return 'ws://localhost:9985/' + return 'ws://{host}:{port}/'.format(**config['wsserver']) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index dc320754..dad06b94 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -20,6 +20,7 @@ from uuid import uuid4 import aiohttp from aiohttp import web +from bigchaindb import config from bigchaindb.events import EventTypes @@ -167,4 +168,6 @@ def start(sync_event_source, loop=None): bridge.start() app = init_app(event_source, loop=loop) - aiohttp.web.run_app(app, port=9985) + aiohttp.web.run_app(app, + host=config['wsserver']['host'], + port=config['wsserver']['port']) From be763022ad7c448cdee4629e9e5f4565d35bd7ce Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 14:07:05 +0200 Subject: [PATCH 13/32] Update documentation (tnx @ttmc) --- docs/server/source/drivers-clients/index.rst | 1 - docs/server/source/index.rst | 1 + .../{drivers-clients => }/websocket-event-stream-api.rst | 3 --- 3 files changed, 1 insertion(+), 4 deletions(-) rename docs/server/source/{drivers-clients => }/websocket-event-stream-api.rst (97%) diff --git a/docs/server/source/drivers-clients/index.rst b/docs/server/source/drivers-clients/index.rst index 704832c0..18894f60 100644 --- a/docs/server/source/drivers-clients/index.rst +++ b/docs/server/source/drivers-clients/index.rst @@ -15,7 +15,6 @@ community projects listed below. 
:maxdepth: 1 http-client-server-api - websocket-event-stream-api The Python Driver Transaction CLI diff --git a/docs/server/source/index.rst b/docs/server/source/index.rst index 6ac4b9f5..7a458934 100644 --- a/docs/server/source/index.rst +++ b/docs/server/source/index.rst @@ -11,6 +11,7 @@ BigchainDB Server Documentation nodes/index dev-and-test/index server-reference/index + websocket-event-stream-api drivers-clients/index clusters-feds/index data-models/index diff --git a/docs/server/source/drivers-clients/websocket-event-stream-api.rst b/docs/server/source/websocket-event-stream-api.rst similarity index 97% rename from docs/server/source/drivers-clients/websocket-event-stream-api.rst rename to docs/server/source/websocket-event-stream-api.rst index 22effbc1..88efb7bb 100644 --- a/docs/server/source/drivers-clients/websocket-event-stream-api.rst +++ b/docs/server/source/websocket-event-stream-api.rst @@ -1,9 +1,6 @@ The WebSocket Event Stream API ============================== -.. important:: - This is currently scheduled to be implemented in BigchainDB Server 0.10. - BigchainDB provides real-time event streams over the WebSocket protocol with the Event Stream API. 
From aeb8827e30bd313eee756b88318c6c5f69654d19 Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 14:07:24 +0200 Subject: [PATCH 14/32] Use try..except..else --- bigchaindb/web/websocket_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index dad06b94..a725f9ee 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -43,9 +43,10 @@ def _put_into_capped_queue(queue, value): while True: try: queue.put_nowait(value) - return except asyncio.QueueFull: queue.get_nowait() + else: + return def _multiprocessing_to_asyncio(in_queue, out_queue, loop): From be3f62dd108f021ce64d8623ba3dad3aefbd9cd3 Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 7 Apr 2017 14:57:11 +0200 Subject: [PATCH 15/32] Update endpoints and docs --- bigchaindb/web/views/base.py | 2 +- bigchaindb/web/views/info.py | 5 +++-- bigchaindb/web/websocket_server.py | 2 +- docs/server/source/websocket-event-stream-api.rst | 7 ++++++- tests/test_config_utils.py | 8 ++++++++ tests/web/test_info.py | 2 +- 6 files changed, 20 insertions(+), 6 deletions(-) diff --git a/bigchaindb/web/views/base.py b/bigchaindb/web/views/base.py index 7b12c5bb..0c226d7d 100644 --- a/bigchaindb/web/views/base.py +++ b/bigchaindb/web/views/base.py @@ -28,4 +28,4 @@ def base_url(): def base_ws_uri(): """Base websocket uri.""" - return 'ws://{host}:{port}/'.format(**config['wsserver']) + return 'ws://{host}:{port}'.format(**config['wsserver']) diff --git a/bigchaindb/web/views/info.py b/bigchaindb/web/views/info.py index b35c6378..9b084ac5 100644 --- a/bigchaindb/web/views/info.py +++ b/bigchaindb/web/views/info.py @@ -6,6 +6,7 @@ from flask_restful import Resource import bigchaindb from bigchaindb.web.views.base import base_url, base_ws_uri from bigchaindb import version +from bigchaindb.web.websocket_server import EVENTS_ENDPOINT class RootIndex(Resource): @@ -30,7 +31,7 @@ class 
RootIndex(Resource): class ApiV1Index(Resource): def get(self): api_root = base_url() + 'api/v1/' - websocket_root = base_ws_uri() + 'api/v1/' + websocket_root = base_ws_uri() + EVENTS_ENDPOINT docs_url = [ 'https://docs.bigchaindb.com/projects/server/en/v', version.__version__, @@ -43,6 +44,6 @@ class ApiV1Index(Resource): 'statuses': api_root + 'statuses/', 'transactions': api_root + 'transactions/', # TODO: The version should probably not be hardcoded - 'streams_v1': websocket_root + 'streams/', + 'streams_v1': websocket_root, }, }) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index a725f9ee..ae7d6da2 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -26,7 +26,7 @@ from bigchaindb.events import EventTypes logger = logging.getLogger(__name__) POISON_PILL = 'POISON_PILL' -EVENTS_ENDPOINT = '/api/v1/streams/' +EVENTS_ENDPOINT = '/api/v1/streams/valid_tx' def _put_into_capped_queue(queue, value): diff --git a/docs/server/source/websocket-event-stream-api.rst b/docs/server/source/websocket-event-stream-api.rst index 88efb7bb..1dedc45f 100644 --- a/docs/server/source/websocket-event-stream-api.rst +++ b/docs/server/source/websocket-event-stream-api.rst @@ -1,6 +1,11 @@ The WebSocket Event Stream API ============================== +.. important:: + The WebSocket Event Stream runs on a different port than the Web API. The + default port for the Web API is `9984`, while the one for the Event Stream + is `9985`. + BigchainDB provides real-time event streams over the WebSocket protocol with the Event Stream API. 
@@ -25,7 +30,7 @@ response contains a ``streams_`` property in ``_links``:: { "_links": { - "streams_v1": "ws://example.com:9984/api/v1/streams/" + "streams_v1": "ws://example.com:9985/api/v1/streams/" } } diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index 04c70325..7ee74432 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -144,6 +144,8 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): DATABASE_PORT = 4242 DATABASE_BACKEND = request.config.getoption('--database-backend') SERVER_BIND = '1.2.3.4:56' + WSSERVER_HOST = '1.2.3.4' + WSSERVER_PORT = 57 KEYRING = 'pubkey_0:pubkey_1:pubkey_2' file_config = { @@ -157,6 +159,8 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): 'BIGCHAINDB_DATABASE_PORT': str(DATABASE_PORT), 'BIGCHAINDB_DATABASE_BACKEND': DATABASE_BACKEND, 'BIGCHAINDB_SERVER_BIND': SERVER_BIND, + 'BIGCHAINDB_WSSERVER_HOST': WSSERVER_HOST, + 'BIGCHAINDB_WSSERVER_PORT': WSSERVER_PORT, 'BIGCHAINDB_KEYRING': KEYRING}) import bigchaindb @@ -198,6 +202,10 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): 'workers': None, 'threads': None, }, + 'wsserver': { + 'host': WSSERVER_HOST, + 'port': WSSERVER_PORT, + }, 'database': database, 'keypair': { 'public': None, diff --git a/tests/web/test_info.py b/tests/web/test_info.py index 4dc60168..eeb80f78 100644 --- a/tests/web/test_info.py +++ b/tests/web/test_info.py @@ -31,6 +31,6 @@ def test_api_v1_endpoint(client): 'self': 'http://localhost/api/v1/', 'statuses': 'http://localhost/api/v1/statuses/', 'transactions': 'http://localhost/api/v1/transactions/', - 'streams_v1': 'ws://localhost:9985/api/v1/streams/', + 'streams_v1': 'ws://localhost:9985/api/v1/streams/valid_tx', } } From da29bbc605caeb2f0ea9ab1ef712176b73c0ecee Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Fri, 7 Apr 2017 15:02:49 +0200 Subject: [PATCH 16/32] added tests for the events --- 
.../pipelines/events_consumer_example.py | 14 ----------- tests/pipelines/test_election.py | 24 +++++++++++++++++++ tests/test_events.py | 21 ++++++++++++++++ 3 files changed, 45 insertions(+), 14 deletions(-) delete mode 100644 bigchaindb/pipelines/events_consumer_example.py create mode 100644 tests/test_events.py diff --git a/bigchaindb/pipelines/events_consumer_example.py b/bigchaindb/pipelines/events_consumer_example.py deleted file mode 100644 index 7e833c82..00000000 --- a/bigchaindb/pipelines/events_consumer_example.py +++ /dev/null @@ -1,14 +0,0 @@ -import multiprocessing as mp - -from bigchaindb.events import EventHandler - - -def consume_events(events_queue): - event_handler = EventHandler(events_queue) - while True: - event = event_handler.get_event() - print('Event type: {} Event data: {}'.format(event.type, event.data)) - - -def events_consumer(events_queue): - return mp.Process(target=consume_events, args=(events_queue,)) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index 3127dcaf..c3254601 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -199,3 +199,27 @@ def test_full_pipeline(b, user_pk): tx_from_block = set([tx.id for tx in invalid_block.transactions]) tx_from_backlog = set([tx['id'] for tx in list(query.get_stale_transactions(b.connection, 0))]) assert tx_from_block == tx_from_backlog + + +def test_handle_block_events(): + from bigchaindb.events import setup_events_queue, EventTypes + + events_queue = setup_events_queue() + e = election.Election(events_queue=events_queue) + block_id = 'a' * 64 + + assert events_queue.qsize() == 0 + + # no event should be emited in case a block is undecided + e.handle_block_events({'status': Bigchain.BLOCK_UNDECIDED}, block_id) + assert events_queue.qsize() == 0 + + # put an invalid block event in the queue + e.handle_block_events({'status': Bigchain.BLOCK_INVALID}, block_id) + event = e.event_handler.get_event() + assert event.type == 
EventTypes.BLOCK_INVALID + + # put an valid block event in the queue + e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id) + event = e.event_handler.get_event() + assert event.type == EventTypes.BLOCK_VALID diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 00000000..22369b51 --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,21 @@ +def tests_event_handler(): + from bigchaindb.events import (EventTypes, Event, EventHandler, + setup_events_queue) + + # create and event + event_data = {'msg': 'some data'} + event = Event(EventTypes.BLOCK_VALID, event_data) + # create the events queue + events_queue = setup_events_queue() + + # create event handler + event_handler = EventHandler(events_queue) + + # push and event to the queue + event_handler.put_event(event) + + # get the event from the queue + event_from_queue = event_handler.get_event() + + assert event_from_queue.type == event.type + assert event_from_queue.data == event.data From a673d9c6efcc2d37b72f545b2170ead2995762d3 Mon Sep 17 00:00:00 2001 From: vrde Date: Tue, 11 Apr 2017 16:34:50 +0200 Subject: [PATCH 17/32] Add more code coverage --- tests/web/test_websocket_server.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index b205fb25..ee0cfc6e 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,5 +1,6 @@ import json import random +from unittest.mock import patch import pytest import asyncio @@ -114,6 +115,19 @@ def test_capped_queue(loop): assert async_queue.qsize() == 0 +@patch('threading.Thread.start') +@patch('aiohttp.web.run_app') +@patch('bigchaindb.web.websocket_server.init_app') +@patch('asyncio.get_event_loop', return_value='event-loop') +@patch('asyncio.Queue', return_value='event-queue') +def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock, init_app_mock, run_app_mock, thread_start_mock): + from 
bigchaindb.web.websocket_server import start + + start(None) + + init_app_mock.assert_called_with('event-queue', loop='event-loop') + + @asyncio.coroutine def test_websocket_string_event(test_client, loop): from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT From 79997848cd469fe75c237c1f97312c34f5f1c2f5 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Tue, 11 Apr 2017 17:21:25 +0200 Subject: [PATCH 18/32] Refine test for the election pipeline process test that the process is started with the events_queue kwargs --- tests/test_processes.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/test_processes.py b/tests/test_processes.py index 00716010..e6503541 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -9,8 +9,9 @@ from bigchaindb.pipelines import vote, block, election, stale @patch.object(block, 'start') @patch.object(vote, 'start') @patch.object(Process, 'start') -def test_processes_start(mock_process, mock_vote, mock_block, mock_election, - mock_stale): +@patch('bigchaindb.events.setup_events_queue', spec_set=True, autospec=True) +def test_processes_start(mock_setup_events_queue, mock_process, mock_vote, + mock_block, mock_election, mock_stale): from bigchaindb import processes processes.start() @@ -19,5 +20,5 @@ def test_processes_start(mock_process, mock_vote, mock_block, mock_election, mock_block.assert_called_with() mock_stale.assert_called_with() mock_process.assert_called_with() - # the events queue is declared inside processes.start() - assert mock_election.call_count == 1 + mock_election.assert_called_once_with( + events_queue=mock_setup_events_queue.return_value) From e0e997755e8666dc495a1e0c15c831437baf7731 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Tue, 11 Apr 2017 18:31:56 +0200 Subject: [PATCH 19/32] Re-order imports (pep8) --- tests/web/test_websocket_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index ee0cfc6e..55564ec2 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,9 +1,9 @@ +import asyncio import json import random from unittest.mock import patch import pytest -import asyncio from bigchaindb.models import Transaction From 98e52e047e866027d0210c7d1d6749414afc4e35 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Tue, 11 Apr 2017 18:32:21 +0200 Subject: [PATCH 20/32] Make utility test function into a fixture --- tests/web/test_websocket_server.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 55564ec2..403b037d 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -8,7 +8,9 @@ import pytest from bigchaindb.models import Transaction -def create_block(b, total=1): +@pytest.fixture +def _block(b, request): + total = getattr(request, 'param', 1) transactions = [ Transaction.create( [b.me], @@ -154,7 +156,8 @@ def test_websocket_string_event(test_client, loop): @asyncio.coroutine -def test_websocket_block_event(b, test_client, loop): +@pytest.mark.parametrize('_block', (10,), indirect=('_block',), ids=('block',)) +def test_websocket_block_event(b, _block, test_client, loop): from bigchaindb import events from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT @@ -162,7 +165,7 @@ def test_websocket_block_event(b, test_client, loop): app = init_app(event_source, loop=loop) client = yield from test_client(app) ws = yield from client.ws_connect(EVENTS_ENDPOINT) - block = create_block(b, 10).to_dict() + block = _block.to_dict() block_event = events.Event(events.EventTypes.BLOCK_VALID, block) yield from event_source.put(block_event) From 75dd645ec9dfdc2b39918bce11c2a31c215b2b75 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 13:47:58 +0200 Subject: 
[PATCH 21/32] Import stdlib pkgs at the top of the test module --- tests/web/test_websocket_server.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 403b037d..3b3f2e39 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -1,6 +1,9 @@ import asyncio import json +import queue import random +import threading +import time from unittest.mock import patch import pytest @@ -32,8 +35,6 @@ class MockWebSocket: @asyncio.coroutine def test_bridge_sync_async_queue(loop): - import queue - import threading from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio sync_queue = queue.Queue() @@ -87,9 +88,6 @@ def test_put_into_capped_queue(loop): @asyncio.coroutine def test_capped_queue(loop): - import queue - import threading - import time from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio sync_queue = queue.Queue() From e614834a0360ce8e474fb5334972710a99e2821d Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 13:49:10 +0200 Subject: [PATCH 22/32] Import Transaction class within fixture --- tests/web/test_websocket_server.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 3b3f2e39..4323685a 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -8,11 +8,10 @@ from unittest.mock import patch import pytest -from bigchaindb.models import Transaction - @pytest.fixture def _block(b, request): + from bigchaindb.models import Transaction total = getattr(request, 'param', 1) transactions = [ Transaction.create( From 0347fbccf49d95c3add28ce9339d58025aad439d Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 13:50:09 +0200 Subject: [PATCH 23/32] Add a few more checks to the test --- tests/web/test_websocket_server.py | 21 
+++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 4323685a..13015dbb 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -114,17 +114,30 @@ def test_capped_queue(loop): assert async_queue.qsize() == 0 -@patch('threading.Thread.start') +@patch('threading.Thread') @patch('aiohttp.web.run_app') @patch('bigchaindb.web.websocket_server.init_app') @patch('asyncio.get_event_loop', return_value='event-loop') @patch('asyncio.Queue', return_value='event-queue') -def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock, init_app_mock, run_app_mock, thread_start_mock): - from bigchaindb.web.websocket_server import start +def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock, + init_app_mock, run_app_mock, + thread_mock): + from bigchaindb import config + from bigchaindb.web.websocket_server import start, _multiprocessing_to_asyncio start(None) - + thread_mock.assert_called_once_with( + target=_multiprocessing_to_asyncio, + args=(None, queue_mock.return_value, get_event_loop_mock.return_value), + daemon=True, + ) + thread_mock.return_value.start.assert_called_once_with() init_app_mock.assert_called_with('event-queue', loop='event-loop') + run_app_mock.assert_called_once_with( + init_app_mock.return_value, + host=config['wsserver']['host'], + port=config['wsserver']['port'], + ) @asyncio.coroutine From 2bedc9b059a3ff7d25e574978f927c977d05f4c1 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 12 Apr 2017 14:39:15 +0200 Subject: [PATCH 24/32] Fix typos --- tests/pipelines/test_election.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index c3254601..f0dd232d 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -210,7 +210,7 @@ def test_handle_block_events(): assert 
events_queue.qsize() == 0 - # no event should be emited in case a block is undecided + # no event should be emitted in case a block is undecided e.handle_block_events({'status': Bigchain.BLOCK_UNDECIDED}, block_id) assert events_queue.qsize() == 0 @@ -219,7 +219,7 @@ def test_handle_block_events(): event = e.event_handler.get_event() assert event.type == EventTypes.BLOCK_INVALID - # put an valid block event in the queue + # put a valid block event in the queue e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id) event = e.event_handler.get_event() assert event.type == EventTypes.BLOCK_VALID From 4c9adededd558a4c4d29965e11c60b86da4bdafe Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 12 Apr 2017 15:54:11 +0200 Subject: [PATCH 25/32] Remove TODO --- bigchaindb/web/views/info.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bigchaindb/web/views/info.py b/bigchaindb/web/views/info.py index 9b084ac5..51b59643 100644 --- a/bigchaindb/web/views/info.py +++ b/bigchaindb/web/views/info.py @@ -43,7 +43,6 @@ class ApiV1Index(Resource): 'self': api_root, 'statuses': api_root + 'statuses/', 'transactions': api_root + 'transactions/', - # TODO: The version should probably not be hardcoded 'streams_v1': websocket_root, }, }) From a7ed28e539a1ff605a5f5954efeabb078eb9bf26 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 16:12:41 +0200 Subject: [PATCH 26/32] Test command helper _run_init --- tests/commands/test_commands.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/commands/test_commands.py b/tests/commands/test_commands.py index 6fb424d6..fa3ecf42 100644 --- a/tests/commands/test_commands.py +++ b/tests/commands/test_commands.py @@ -130,6 +130,22 @@ def test_bigchain_run_init_when_db_exists(mock_db_init_with_existing_db): run_init(args) +def test__run_init(mocker): + from bigchaindb.commands.bigchaindb import _run_init + bigchain_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.bigchaindb.Bigchain') + 
init_db_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.schema.init_database', + autospec=True, + spec_set=True, + ) + _run_init() + bigchain_mock.assert_called_once_with() + init_db_mock.assert_called_once_with( + connection=bigchain_mock.return_value.connection) + bigchain_mock.return_value.create_genesis_block.assert_called_once_with() + + @patch('bigchaindb.backend.schema.drop_database') def test_drop_db_when_assumed_yes(mock_db_drop): from bigchaindb.commands.bigchaindb import run_drop From 303e12ee280befb2cbe0bc707c5b62f7ef896066 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Wed, 12 Apr 2017 16:38:18 +0200 Subject: [PATCH 27/32] Test command run_init when db already exists --- tests/commands/test_commands.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/commands/test_commands.py b/tests/commands/test_commands.py index fa3ecf42..087e1afe 100644 --- a/tests/commands/test_commands.py +++ b/tests/commands/test_commands.py @@ -124,10 +124,23 @@ def test_bigchain_export_my_pubkey_when_pubkey_not_set(monkeypatch): "This node's public key wasn't set anywhere so it can't be exported" -def test_bigchain_run_init_when_db_exists(mock_db_init_with_existing_db): +def test_bigchain_run_init_when_db_exists(mocker, capsys): from bigchaindb.commands.bigchaindb import run_init + from bigchaindb.common.exceptions import DatabaseAlreadyExists + init_db_mock = mocker.patch( + 'bigchaindb.commands.bigchaindb.schema.init_database', + autospec=True, + spec_set=True, + ) + init_db_mock.side_effect = DatabaseAlreadyExists args = Namespace(config=None) run_init(args) + output_message = capsys.readouterr()[1] + print(output_message) + assert output_message == ( + 'The database already exists.\n' + 'If you wish to re-initialize it, first drop it.\n' + ) def test__run_init(mocker): From 414d915033c9e37476a37449a3899abc6a69ba7d Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 13 Apr 2017 08:54:34 +0200 Subject: [PATCH 28/32] 
Snakecaseify keys --- bigchaindb/web/websocket_server.py | 6 +++--- docs/server/source/websocket-event-stream-api.rst | 6 +++--- tests/web/test_websocket_server.py | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index ae7d6da2..5507f504 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -109,9 +109,9 @@ class Dispatcher: for tx in block['block']['transactions']: asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id'] - data = {'blockid': block['id'], - 'assetid': asset_id, - 'txid': tx['id']} + data = {'block_id': block['id'], + 'asset_id': asset_id, + 'tx_id': tx['id']} str_buffer.append(json.dumps(data)) for _, websocket in self.subscribers.items(): diff --git a/docs/server/source/websocket-event-stream-api.rst b/docs/server/source/websocket-event-stream-api.rst index 1dedc45f..3ce86553 100644 --- a/docs/server/source/websocket-event-stream-api.rst +++ b/docs/server/source/websocket-event-stream-api.rst @@ -82,9 +82,9 @@ the transaction's ID, associated asset ID, and containing block's ID. 
Example message:: { - "txid": "", - "assetid": "", - "blockid": "" + "tx_id": "", + "asset_id": "", + "block_id": "" } diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 13015dbb..6484ef4e 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -183,10 +183,10 @@ def test_websocket_block_event(b, _block, test_client, loop): for tx in block['block']['transactions']: result = yield from ws.receive() json_result = json.loads(result.data) - assert json_result['txid'] == tx['id'] + assert json_result['tx_id'] == tx['id'] # Since the transactions are all CREATEs, asset id == transaction id - assert json_result['assetid'] == tx['id'] - assert json_result['blockid'] == block['id'] + assert json_result['asset_id'] == tx['id'] + assert json_result['block_id'] == block['id'] yield from event_source.put(POISON_PILL) @@ -235,4 +235,4 @@ def test_integration_from_webapi_to_websocket(monkeypatch, client, loop): result = loop.run_until_complete(ws.receive()) json_result = json.loads(result.data) - assert json_result['txid'] == tx.id + assert json_result['tx_id'] == tx.id From aa4d532e47230ba01b1c72910e4b1cbf9bb8dd1d Mon Sep 17 00:00:00 2001 From: Troy McConaghy Date: Sun, 16 Apr 2017 21:22:12 +0200 Subject: [PATCH 29/32] added docs re enforcing max tx size with a reverse proxy --- .../source/data-models/inputs-outputs.rst | 5 +- docs/server/source/production-nodes/index.rst | 2 + .../production-nodes/reverse-proxy-notes.md | 72 +++++++++++++++++++ 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 docs/server/source/production-nodes/reverse-proxy-notes.md diff --git a/docs/server/source/data-models/inputs-outputs.rst b/docs/server/source/data-models/inputs-outputs.rst index e81aa3b2..5ad360ec 100644 --- a/docs/server/source/data-models/inputs-outputs.rst +++ b/docs/server/source/data-models/inputs-outputs.rst @@ -26,7 +26,10 @@ When one creates a condition, one can calculate its 
fulfillment length (e.g. 96). The more complex the condition, the larger its fulfillment length will be. A BigchainDB federation can put an upper limit on the complexity of the conditions, either directly by setting an allowed maximum fulfillment length, -or indirectly by setting a maximum allowed transaction size which would limit +or +`indirectly `_ +by :ref:`setting a maximum allowed transaction size ` +which would limit the overall complexity accross all inputs and outputs of a transaction. If someone tries to make a condition where the output of a threshold condition feeds into the input of another “earlier” threshold condition (i.e. in a closed logical circuit), then their computer will take forever to calculate the (infinite) “condition URI”, at least in theory. In practice, their computer will run out of memory or their client software will timeout after a while. diff --git a/docs/server/source/production-nodes/index.rst b/docs/server/source/production-nodes/index.rst index 7b42cbaa..4a9cb15b 100644 --- a/docs/server/source/production-nodes/index.rst +++ b/docs/server/source/production-nodes/index.rst @@ -8,3 +8,5 @@ Production Nodes node-components node-requirements setup-run-node + reverse-proxy-notes + \ No newline at end of file diff --git a/docs/server/source/production-nodes/reverse-proxy-notes.md b/docs/server/source/production-nodes/reverse-proxy-notes.md new file mode 100644 index 00000000..18930942 --- /dev/null +++ b/docs/server/source/production-nodes/reverse-proxy-notes.md @@ -0,0 +1,72 @@ +# Using a Reverse Proxy + +You may want to: + +* rate limit inbound HTTP requests, +* authenticate/authorize inbound HTTP requests, +* block requests with an HTTP request body that's too large, or +* enable HTTPS (TLS) between your users and your node. + +While we could have built all that into BigchainDB Server, +we didn't, because you can do all that (and more) +using a reverse proxy such as NGINX or HAProxy. 
+(You would put it in front of your BigchainDB Server, +so that all inbound HTTP requests would arrive +at the reverse proxy before *maybe* being proxied +onwards to your BigchainDB Server.) +For detailed instructions, see the documentation +for your reverse proxy. + +Below, we note how a reverse proxy can be used +to do some BigchainDB-specific things. + +You may also be interested in +[our NGINX configuration file template](https://github.com/bigchaindb/nginx_3scale/blob/master/nginx.conf.template) +(open source, on GitHub). + + +## Enforcing a Max Transaction Size + +The BigchainDB HTTP API has several endpoints, +but only one of them, the `POST /transactions` endpoint, +expects a non-empty HTTP request body: +the transaction (JSON) being submitted by the user. + +If you want to enforce a maximum-allowed transaction size +(discarding any that are larger), +then you can do so by configuring a maximum request body size +in your reverse proxy. +For example, NGINX has the `client_max_body_size` +configuration setting. You could set it to 15 kB +with the following line in your NGINX config file: + +```text +client_max_body_size 15k; +``` + +For more information, see +[the NGINX docs about client_max_body_size](https://nginx.org/en/docs/http/ngx_http_core_module.html#client_max_body_size). + +Note: By enforcing a maximum transaction size, you +[indirectly enforce a maximum crypto-conditions complexity](https://github.com/bigchaindb/bigchaindb/issues/356#issuecomment-288085251). + + +**Aside: Why 15 kB?** + +Both [RethinkDB](https://rethinkdb.com/limitations/) and +[MongoDB have a maximum document size of 16 MB](https://docs.mongodb.com/manual/reference/limits/#limit-bson-document-size). +In BigchainDB, the biggest documents are the blocks. +A BigchainDB block can contain up to 1000 transactions, +plus some other data (e.g. the timestamp). 
+If we ignore the other data as negligible relative to all the transactions, +then a block of size 16 MB +will have an average transaction size of (16 MB)/1000 = 16 kB. +Therefore by limiting the max transaction size to 15 kB, +you can be fairly sure that no blocks will ever be +bigger than 16 MB. + +Note: Technically, the documents that MongoDB stores aren't the JSON +that BigchainDB users think of; they're JSON converted to BSON. +Moreover, [one can use GridFS with MongoDB to store larger documents](https://docs.mongodb.com/manual/core/gridfs/). +Therefore the above calculation should be seen as a rough guide, +not the last word. From 0ec29abd24f372f81c302aecedad0437247de6ba Mon Sep 17 00:00:00 2001 From: Troy McConaghy Date: Sun, 16 Apr 2017 21:57:05 +0200 Subject: [PATCH 30/32] docs: added note re only real way to limit CC complexity today --- docs/server/source/data-models/inputs-outputs.rst | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/server/source/data-models/inputs-outputs.rst b/docs/server/source/data-models/inputs-outputs.rst index 5ad360ec..4309a4c8 100644 --- a/docs/server/source/data-models/inputs-outputs.rst +++ b/docs/server/source/data-models/inputs-outputs.rst @@ -25,12 +25,16 @@ The (single) output of a threshold condition can be used as one of the inputs of When one creates a condition, one can calculate its fulfillment length (e.g. 96). The more complex the condition, the larger its fulfillment length will be. A BigchainDB federation can put an upper limit on the complexity of the -conditions, either directly by setting an allowed maximum fulfillment length, +conditions, either directly by setting a maximum allowed fulfillment length, or `indirectly `_ by :ref:`setting a maximum allowed transaction size ` which would limit the overall complexity accross all inputs and outputs of a transaction. 
+Note: At the time of writing, there was no configuration setting +to set a maximum allowed fulfillment length, +so the only real option was to +:ref:`set a maximum allowed transaction size `. If someone tries to make a condition where the output of a threshold condition feeds into the input of another “earlier” threshold condition (i.e. in a closed logical circuit), then their computer will take forever to calculate the (infinite) “condition URI”, at least in theory. In practice, their computer will run out of memory or their client software will timeout after a while. From a65c8799dd8b6c86a352d0ec28bddd41ea4908c9 Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Thu, 13 Apr 2017 16:18:02 +0200 Subject: [PATCH 31/32] Document gunicorn loglevel setting --- docs/server/source/server-reference/configuration.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/server/source/server-reference/configuration.md b/docs/server/source/server-reference/configuration.md index 91fa4efb..15726659 100644 --- a/docs/server/source/server-reference/configuration.md +++ b/docs/server/source/server-reference/configuration.md @@ -17,6 +17,7 @@ For convenience, here's a list of all the relevant environment variables (docume `BIGCHAINDB_DATABASE_NAME`
`BIGCHAINDB_DATABASE_REPLICASET`
`BIGCHAINDB_SERVER_BIND`
+`BIGCHAINDB_SERVER_LOGLEVEL`
`BIGCHAINDB_SERVER_WORKERS`
`BIGCHAINDB_SERVER_THREADS`
`BIGCHAINDB_CONFIG_PATH`
@@ -121,17 +122,22 @@ If you used `bigchaindb -y configure mongodb` to create a default local config f ``` -## server.bind, server.workers & server.threads +## server.bind, server.loglevel, server.workers & server.threads These settings are for the [Gunicorn HTTP server](http://gunicorn.org/), which is used to serve the [HTTP client-server API](../drivers-clients/http-client-server-api.html). `server.bind` is where to bind the Gunicorn HTTP server socket. It's a string. It can be any valid value for [Gunicorn's bind setting](http://docs.gunicorn.org/en/stable/settings.html#bind). If you want to allow IPv4 connections from anyone, on port 9984, use '0.0.0.0:9984'. In a production setting, we recommend you use Gunicorn behind a reverse proxy server. If Gunicorn and the reverse proxy are running on the same machine, then use 'localhost:PORT' where PORT is _not_ 9984 (because the reverse proxy needs to listen on port 9984). Maybe use PORT=9983 in that case because we know 9983 isn't used. If Gunicorn and the reverse proxy are running on different machines, then use 'A.B.C.D:9984' where A.B.C.D is the IP address of the reverse proxy. There's [more information about deploying behind a reverse proxy in the Gunicorn documentation](http://docs.gunicorn.org/en/stable/deploy.html). (They call it a proxy.) +`server.loglevel` sets the log level of Gunicorn's Error log outputs. See +[Gunicorn's documentation](http://docs.gunicorn.org/en/latest/settings.html#loglevel) +for more information. + `server.workers` is [the number of worker processes](http://docs.gunicorn.org/en/stable/settings.html#workers) for handling requests. If `None` (the default), the value will be (cpu_count * 2 + 1). `server.threads` is [the number of threads-per-worker](http://docs.gunicorn.org/en/stable/settings.html#threads) for handling requests. If `None` (the default), the value will be (cpu_count * 2 + 1). 
The HTTP server will be able to handle `server.workers` * `server.threads` requests simultaneously. **Example using environment variables** ```text export BIGCHAINDB_SERVER_BIND=0.0.0.0:9984 +export BIGCHAINDB_SERVER_LOGLEVEL=debug export BIGCHAINDB_SERVER_WORKERS=5 export BIGCHAINDB_SERVER_THREADS=5 ``` @@ -140,6 +146,7 @@ export BIGCHAINDB_SERVER_THREADS=5 ```js "server": { "bind": "0.0.0.0:9984", + "loglevel": "debug", "workers": 5, "threads": 5 } @@ -149,6 +156,7 @@ export BIGCHAINDB_SERVER_THREADS=5 ```js "server": { "bind": "localhost:9984", + "loglevel": "info", "workers": null, "threads": null } From 6921b1386c5c4dbfce72bd7cf693c1d2b5a5dc8d Mon Sep 17 00:00:00 2001 From: Troy McConaghy Date: Tue, 18 Apr 2017 10:53:27 +0200 Subject: [PATCH 32/32] docs: noted that rethinkdb doesn't use database.connection_timeout setting yet --- docs/server/source/server-reference/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/server/source/server-reference/configuration.md b/docs/server/source/server-reference/configuration.md index 42f22d4e..32672129 100644 --- a/docs/server/source/server-reference/configuration.md +++ b/docs/server/source/server-reference/configuration.md @@ -97,7 +97,7 @@ The settings with names of the form `database.*` are for the database backend * `database.port` is self-explanatory. * `database.name` is a user-chosen name for the database inside RethinkDB or MongoDB, e.g. `bigchain`. * `database.replicaset` is only relevant if using MongoDB; it's the name of the MongoDB replica set, e.g. `bigchain-rs`. -* `database.connection_timeout` is the maximum number of milliseconds that BigchainDB will wait before giving up on one attempt to connect to the database backend. +* `database.connection_timeout` is the maximum number of milliseconds that BigchainDB will wait before giving up on one attempt to connect to the database backend. 
Note: At the time of writing, this setting was only used by MongoDB; there was an open [issue to make RethinkDB use it as well](https://github.com/bigchaindb/bigchaindb/issues/1337). * `database.max_tries` is the maximum number of times that BigchainDB will try to establish a connection with the database backend. If 0, then it will try forever. **Example using environment variables**