diff --git a/bigchaindb/tendermint/commands.py b/bigchaindb/tendermint/commands.py index 5f2fa2c8..8ee39edb 100644 --- a/bigchaindb/tendermint/commands.py +++ b/bigchaindb/tendermint/commands.py @@ -6,7 +6,9 @@ from os import getenv import bigchaindb from bigchaindb.tendermint.lib import BigchainDB from bigchaindb.tendermint.core import App -from bigchaindb.web import server +from bigchaindb.web import server, websocket_server +from bigchaindb.tendermint import event_stream +from bigchaindb.events import Exchange, EventTypes logger = logging.getLogger(__name__) @@ -26,6 +28,10 @@ BANNER = """ def start(): + + # Exchange object for event stream api + exchange = Exchange() + # start the web api app_server = server.create_server( settings=bigchaindb.config['server'], @@ -44,6 +50,18 @@ def start(): '--consensus.create_empty_blocks=false', ]) + # start websocket server + p_websocket_server = mp.Process(name='ws', + target=websocket_server.start, + args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),)) + p_websocket_server.start() + + # connect to tendermint event stream + p_websocket_client = mp.Process(name='ws_to_tendermint', + target=event_stream.start, + args=(exchange.get_publisher_queue(),)) + p_websocket_client.start() + # We need to import this after spawning the web server # because import ABCIServer will monkeypatch all sockets # for gevent. diff --git a/bigchaindb/tendermint/event_stream.py b/bigchaindb/tendermint/event_stream.py new file mode 100644 index 00000000..f8f01346 --- /dev/null +++ b/bigchaindb/tendermint/event_stream.py @@ -0,0 +1,87 @@ +import json +import logging +import time + +import asyncio +import aiohttp + +from bigchaindb.common.utils import gen_timestamp +from bigchaindb.events import EventTypes, Event +from bigchaindb.tendermint.utils import decode_transaction_base64 + + +HOST = 'localhost' +PORT = 46657 +URL = f'ws://{HOST}:{PORT}/websocket' + +logger = logging.getLogger(__name__) + + +@asyncio.coroutine +def connect_and_recv(event_queue): + session = aiohttp.ClientSession() + ws = yield from session.ws_connect(URL) + + logger.info('Connected to tendermint ws server') + + stream_id = "bigchaindb_stream_{}".format(gen_timestamp()) + yield from subscribe_events(ws, stream_id) + + while True: + msg = yield from ws.receive() + process_event(event_queue, msg.data, stream_id) + + if msg.type in (aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.ERROR): + session.close() + raise aiohttp.ClientConnectionError() + + +def process_event(event_queue, event, stream_id): + event_stream_id = stream_id + '#event' + event = json.loads(event) + + if (event['id'] == event_stream_id and event['result']['name'] == 'NewBlock'): + block = event['result']['data']['data']['block'] + block_id = block['header']['height'] + block_txs = block['data']['txs'] + + # Only push non empty blocks + if block_txs: + block_txs = [decode_transaction_base64(txn) for txn in block_txs] + new_block = {'id': str(block_id), 'transactions': block_txs} + event = Event(EventTypes.BLOCK_VALID, new_block) + event_queue.put(event) + + +@asyncio.coroutine +def subscribe_events(ws, stream_id): + payload = { + "method": "subscribe", + "jsonrpc": "2.0", + "params": ["NewBlock"], + "id": stream_id + } + yield from ws.send_str(json.dumps(payload)) + + +@asyncio.coroutine +def try_connect_and_recv(event_queue, max_tries): + try: + yield from connect_and_recv(event_queue) + + except Exception as e: + if max_tries: + logger.warning('WebSocket connection failed with exception %s', e) + time.sleep(2) + yield from try_connect_and_recv(event_queue, max_tries-1) + else: + logger.exception('WebSocket connection failed with exception %s', e) + + +def start(event_queue): + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(try_connect_and_recv(event_queue, 5)) + except (KeyboardInterrupt, SystemExit): + logger.info('Shutting down Tendermint event stream connection') diff --git a/bigchaindb/tendermint/utils.py b/bigchaindb/tendermint/utils.py index 3deb8b2c..b84954a5 100644 --- a/bigchaindb/tendermint/utils.py +++ b/bigchaindb/tendermint/utils.py @@ -15,6 +15,12 @@ def decode_transaction(raw): return json.loads(raw.decode('utf8')) +def decode_transaction_base64(value): + """Decode a transaction from Base64.""" + + return json.loads(base64.b64decode(value.encode('utf8')).decode('utf8')) + + def calculate_hash(key_list): if not key_list: return '' diff --git a/tests/tendermint/test_event_stream.py b/tests/tendermint/test_event_stream.py new file mode 100644 index 00000000..da5c4c6f --- /dev/null +++ b/tests/tendermint/test_event_stream.py @@ -0,0 +1,64 @@ +from queue import Queue + +import pytest + + +@pytest.mark.tendermint +def test_process_event_new_block(): + from bigchaindb.tendermint.event_stream import process_event + + event = '{"id": "test_stream_id#event", "jsonrpc": "2.0", "result":'\ + ' {"data": {"data": {"block": {"data": {"txs": ["eyJpbnB1dHMiOiBb'\ + 'eyJvd25lcnNfYmVmb3JlIjogWyJCWnZLQmNSUmgyd0tOOGZuTENlZUczSGhFaWF4'\ + 'TWdyWmlib0gyeUZvYzVwTCJdLCAiZnVsZmlsbHMiOiBudWxsLCAiZnVsZmlsbG1l'\ + 'bnQiOiAicEdTQUlKMER2S2JBeXkyQ2hqT212ZWVCc0FxWktTS0k3VDNWZGhtUkI2'\ + 'V2dhdzdoZ1VDUHluUnFuQW9RWDh2UlNXeXNwYk5uYWVBaVpOU19lQ3V6ejhDZWtJ'\ + 'OHBIejJnekExeDJkOF93NTUzWFVOUGJFbnpBUzhncURqeDFkaE1JeDM1ZnpVTCJ9'\ + 'XSwgIm91dHB1dHMiOiBbeyJwdWJsaWNfa2V5cyI6IFsiQlp2S0JjUlJoMndLTjhm'\ + 'bkxDZWVHM0hoRWlheE1nclppYm9IMnlGb2M1cEwiXSwgImNvbmRpdGlvbiI6IHsi'\ + 'ZGV0YWlscyI6IHsidHlwZSI6ICJlZDI1NTE5LXNoYS0yNTYiLCAicHVibGljX2tl'\ + 'eSI6ICJCWnZLQmNSUmgyd0tOOGZuTENlZUczSGhFaWF4TWdyWmlib0gyeUZvYzVw'\ + 'TCJ9LCAidXJpIjogIm5pOi8vL3NoYS0yNTY7eHVFX1ZPNjd6aHc0LTRjN0k1YUtm'\ + 'WGtzX1Q1MjUwMnBuOC1mcVJQQkloRT9mcHQ9ZWQyNTUxOS1zaGEtMjU2JmNvc3Q9'\ + 'MTMxMDcyIn0sICJhbW91bnQiOiAiMSJ9XSwgIm9wZXJhdGlvbiI6ICJDUkVBVEUi'\ + 'LCAibWV0YWRhdGEiOiB7InNob3J0IjogImxpdHRsZSJ9LCAiYXNzZXQiOiB7ImRh'\ + 'dGEiOiB7ImJpY3ljbGUiOiB7InNlcmlhbF9udW1iZXIiOiAiYWJjZDEyMzQiLCAi'\ + 'bWFudWZhY3R1cmVyIjogImJrZmFiIn19fSwgInZlcnNpb24iOiAiMS4wIiwgImlk'\ + 'IjogIjE4NzM3Yzc0OWQxZGE2Yzc5YjFmYWZiZjkwOTkwNzEwMDA1ZWM4MTYxNGQ5'\ + 'YWFiNDkyZTgwYTkzNWRkYThjMzAifQ=="]}, "header": {"height": 1}}},'\ + ' "type": "new_block"}, "name": "NewBlock"}}' + + event_queue = Queue() + process_event(event_queue, event, 'test_stream_id') + assert not event_queue.empty() + + +@pytest.mark.tendermint +def test_process_event_empty_block(): + from bigchaindb.tendermint.event_stream import process_event + + event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\ + '"result": {"name": "NewBlock", "data": {"type": "new_block",'\ + ' "data": {"block": {"header": {"chain_id": "test-chain-cbVRwC",'\ + ' "height": 1, "time": "2017-12-04T22:42:54.33+05:30", "num_txs": 0,'\ + ' "last_block_id": {"hash": "", "parts": {"total": 0, "hash": ""}},'\ + ' "last_commit_hash": "", "data_hash": "",'\ + ' "validators_hash": "ACF23A690EB72D051931E878E8F3D6E01A17A81C",'\ + ' "app_hash": ""}, "data": {"txs": []}, "last_commit": {"blockID": '\ + ' {"hash": "", "parts": {"total": 0, "hash": ""}}, "precommits": []}}}}}}' + + event_queue = Queue() + process_event(event_queue, event, 'test_stream_id') + assert event_queue.empty() + + +@pytest.mark.tendermint +def test_process_unknown_event(): + from bigchaindb.tendermint.event_stream import process_event + + event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\ + ' "result": {"name": "UnknownEvent"}}' + + event_queue = Queue() + process_event(event_queue, event, 'test_stream_id') + assert event_queue.empty() diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index d071f9e7..4545fb59 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -7,6 +7,8 @@ from unittest.mock import patch import pytest +pytestmark = pytest.mark.tendermint + @pytest.fixture def _block(b, request):