From f5b887ac598fcc9274e9d7d0c0c5129c3b9c9639 Mon Sep 17 00:00:00 2001 From: kansi Date: Wed, 6 Dec 2017 15:42:45 +0530 Subject: [PATCH 1/2] Integrate events api --- bigchaindb/tendermint/commands.py | 20 ++++++- bigchaindb/tendermint/event_stream.py | 84 +++++++++++++++++++++++++++ bigchaindb/tendermint/utils.py | 6 ++ tests/tendermint/test_event_stream.py | 64 ++++++++++++++++++++ tests/web/test_websocket_server.py | 2 + 5 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 bigchaindb/tendermint/event_stream.py create mode 100644 tests/tendermint/test_event_stream.py diff --git a/bigchaindb/tendermint/commands.py b/bigchaindb/tendermint/commands.py index 5f2fa2c8..3e007150 100644 --- a/bigchaindb/tendermint/commands.py +++ b/bigchaindb/tendermint/commands.py @@ -6,7 +6,9 @@ from os import getenv import bigchaindb from bigchaindb.tendermint.lib import BigchainDB from bigchaindb.tendermint.core import App -from bigchaindb.web import server +from bigchaindb.web import server, websocket_server +from bigchaindb.tendermint import event_stream +from bigchaindb.events import Exchange, EventTypes logger = logging.getLogger(__name__) @@ -26,6 +28,10 @@ BANNER = """ def start(): + + # Exchange object for event stream api + exchange = Exchange() + # start the web api app_server = server.create_server( settings=bigchaindb.config['server'], @@ -44,6 +50,18 @@ def start(): '--consensus.create_empty_blocks=false', ]) + # start websocket server + p_websocket_server = mp.Process(name='ws', + target=websocket_server.start, + args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),)) + p_websocket_server.start() + + # connect to tendermint event stream + p_websocket_client = mp.Process(name='ws_client', + target=event_stream.start, + args=(exchange.get_publisher_queue(),)) + p_websocket_client.start() + # We need to import this after spawning the web server # because import ABCIServer will monkeypatch all sockets # for gevent. diff --git a/bigchaindb/tendermint/event_stream.py b/bigchaindb/tendermint/event_stream.py new file mode 100644 index 00000000..2f647139 --- /dev/null +++ b/bigchaindb/tendermint/event_stream.py @@ -0,0 +1,84 @@ +import json +import logging +import time + +import asyncio +import aiohttp + +from bigchaindb.common.utils import gen_timestamp +from bigchaindb.events import EventTypes, Event +from bigchaindb.tendermint.utils import decode_txn_base64 + + +HOST = 'localhost' +PORT = 46657 +URL = f'ws://{HOST}:{PORT}/websocket' + +PAYLOAD = { + "method": "subscribe", + "jsonrpc": "2.0", + "params": ["NewBlock"], + "id": "bigchaindb_stream" +} + + +logger = logging.getLogger(__name__) + + +async def connect_and_recv(event_queue): + session = aiohttp.ClientSession() + async with session.ws_connect(URL) as ws: + logger.info('Connected to tendermint ws server') + + stream_id = "bigchaindb_stream_{}".format(gen_timestamp()) + await subscribe_events(ws, stream_id) + + async for msg in ws: + process_event(event_queue, msg.data, stream_id) + + if msg.type in (aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.ERROR): + session.close() + raise aiohttp.ClientConnectionError() + + +def process_event(event_queue, event, stream_id): + event_stream_id = stream_id + '#event' + event = json.loads(event) + + if (event['id'] == event_stream_id and event['result']['name'] == 'NewBlock'): + block = event['result']['data']['data']['block'] + block_id = block['header']['height'] + block_txs = block['data']['txs'] + + # Only push non empty blocks + if block_txs: + block_txs = [json.loads(decode_txn_base64(txn)) for txn in block_txs] + new_block = {'id': str(block_id), 'transactions': block_txs} + event = Event(EventTypes.BLOCK_VALID, new_block) + event_queue.put(event) + + +async def subscribe_events(ws, stream_id): + PAYLOAD["id"] = stream_id + await ws.send_str(json.dumps(PAYLOAD)) + + +async def try_connect_and_recv(event_queue, gas): + try: + await connect_and_recv(event_queue) + + except Exception as e: + logger.error('WebSocket connection failed with exception: {}'.format(e)) + if gas: + time.sleep(2) + await try_connect_and_recv(event_queue, gas-1) + + +def start(event_queue): + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(try_connect_and_recv(event_queue, 5)) + except (KeyboardInterrupt, SystemExit): + logger.info('Shutting down tendermint event stream connection') + return diff --git a/bigchaindb/tendermint/utils.py b/bigchaindb/tendermint/utils.py index 3deb8b2c..5208e801 100644 --- a/bigchaindb/tendermint/utils.py +++ b/bigchaindb/tendermint/utils.py @@ -15,6 +15,12 @@ def decode_transaction(raw): return json.loads(raw.decode('utf8')) +def decode_txn_base64(value): + """Decode a transaction from Base64.""" + + return base64.b64decode(value.encode('utf8')).decode('utf8') + + def calculate_hash(key_list): if not key_list: return '' diff --git a/tests/tendermint/test_event_stream.py b/tests/tendermint/test_event_stream.py new file mode 100644 index 00000000..da5c4c6f --- /dev/null +++ b/tests/tendermint/test_event_stream.py @@ -0,0 +1,64 @@ +from queue import Queue + +import pytest + + +@pytest.mark.tendermint +def test_process_event_new_block(): + from bigchaindb.tendermint.event_stream import process_event + + event = '{"id": "test_stream_id#event", "jsonrpc": "2.0", "result":'\ + ' {"data": {"data": {"block": {"data": {"txs": ["eyJpbnB1dHMiOiBb'\ + 'eyJvd25lcnNfYmVmb3JlIjogWyJCWnZLQmNSUmgyd0tOOGZuTENlZUczSGhFaWF4'\ + 'TWdyWmlib0gyeUZvYzVwTCJdLCAiZnVsZmlsbHMiOiBudWxsLCAiZnVsZmlsbG1l'\ + 'bnQiOiAicEdTQUlKMER2S2JBeXkyQ2hqT212ZWVCc0FxWktTS0k3VDNWZGhtUkI2'\ + 'V2dhdzdoZ1VDUHluUnFuQW9RWDh2UlNXeXNwYk5uYWVBaVpOU19lQ3V6ejhDZWtJ'\ + 'OHBIejJnekExeDJkOF93NTUzWFVOUGJFbnpBUzhncURqeDFkaE1JeDM1ZnpVTCJ9'\ + 'XSwgIm91dHB1dHMiOiBbeyJwdWJsaWNfa2V5cyI6IFsiQlp2S0JjUlJoMndLTjhm'\ + 'bkxDZWVHM0hoRWlheE1nclppYm9IMnlGb2M1cEwiXSwgImNvbmRpdGlvbiI6IHsi'\ + 'ZGV0YWlscyI6IHsidHlwZSI6ICJlZDI1NTE5LXNoYS0yNTYiLCAicHVibGljX2tl'\ + 'eSI6ICJCWnZLQmNSUmgyd0tOOGZuTENlZUczSGhFaWF4TWdyWmlib0gyeUZvYzVw'\ + 'TCJ9LCAidXJpIjogIm5pOi8vL3NoYS0yNTY7eHVFX1ZPNjd6aHc0LTRjN0k1YUtm'\ + 'WGtzX1Q1MjUwMnBuOC1mcVJQQkloRT9mcHQ9ZWQyNTUxOS1zaGEtMjU2JmNvc3Q9'\ + 'MTMxMDcyIn0sICJhbW91bnQiOiAiMSJ9XSwgIm9wZXJhdGlvbiI6ICJDUkVBVEUi'\ + 'LCAibWV0YWRhdGEiOiB7InNob3J0IjogImxpdHRsZSJ9LCAiYXNzZXQiOiB7ImRh'\ + 'dGEiOiB7ImJpY3ljbGUiOiB7InNlcmlhbF9udW1iZXIiOiAiYWJjZDEyMzQiLCAi'\ + 'bWFudWZhY3R1cmVyIjogImJrZmFiIn19fSwgInZlcnNpb24iOiAiMS4wIiwgImlk'\ + 'IjogIjE4NzM3Yzc0OWQxZGE2Yzc5YjFmYWZiZjkwOTkwNzEwMDA1ZWM4MTYxNGQ5'\ + 'YWFiNDkyZTgwYTkzNWRkYThjMzAifQ=="]}, "header": {"height": 1}}},'\ + ' "type": "new_block"}, "name": "NewBlock"}}' + + event_queue = Queue() + process_event(event_queue, event, 'test_stream_id') + assert not event_queue.empty() + + +@pytest.mark.tendermint +def test_process_event_empty_block(): + from bigchaindb.tendermint.event_stream import process_event + + event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\ + '"result": {"name": "NewBlock", "data": {"type": "new_block",'\ + ' "data": {"block": {"header": {"chain_id": "test-chain-cbVRwC",'\ + ' "height": 1, "time": "2017-12-04T22:42:54.33+05:30", "num_txs": 0,'\ + ' "last_block_id": {"hash": "", "parts": {"total": 0, "hash": ""}},'\ + ' "last_commit_hash": "", "data_hash": "",'\ + ' "validators_hash": "ACF23A690EB72D051931E878E8F3D6E01A17A81C",'\ + ' "app_hash": ""}, "data": {"txs": []}, "last_commit": {"blockID": '\ + ' {"hash": "", "parts": {"total": 0, "hash": ""}}, "precommits": []}}}}}}' + + event_queue = Queue() + process_event(event_queue, event, 'test_stream_id') + assert event_queue.empty() + + +@pytest.mark.tendermint +def test_process_unknown_event(): + from bigchaindb.tendermint.event_stream import process_event + + event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\ + ' "result": {"name": "UnknownEvent"}}' + + event_queue = Queue() + process_event(event_queue, event, 'test_stream_id') + assert event_queue.empty() diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index d071f9e7..4545fb59 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -7,6 +7,8 @@ from unittest.mock import patch import pytest +pytestmark = pytest.mark.tendermint + @pytest.fixture def _block(b, request): From a8d267f429d3000ec5d29c35062b06fd88b50880 Mon Sep 17 00:00:00 2001 From: kansi Date: Tue, 12 Dec 2017 22:34:17 +0530 Subject: [PATCH 2/2] Fix asyncio compability with python 3.5 --- bigchaindb/tendermint/commands.py | 2 +- bigchaindb/tendermint/event_stream.py | 65 ++++++++++++++------------- bigchaindb/tendermint/utils.py | 4 +- 3 files changed, 37 insertions(+), 34 deletions(-) diff --git a/bigchaindb/tendermint/commands.py b/bigchaindb/tendermint/commands.py index 3e007150..8ee39edb 100644 --- a/bigchaindb/tendermint/commands.py +++ b/bigchaindb/tendermint/commands.py @@ -57,7 +57,7 @@ def start(): p_websocket_server.start() # connect to tendermint event stream - p_websocket_client = mp.Process(name='ws_client', + p_websocket_client = mp.Process(name='ws_to_tendermint', target=event_stream.start, args=(exchange.get_publisher_queue(),)) p_websocket_client.start() diff --git a/bigchaindb/tendermint/event_stream.py b/bigchaindb/tendermint/event_stream.py index 2f647139..f8f01346 100644 --- a/bigchaindb/tendermint/event_stream.py +++ b/bigchaindb/tendermint/event_stream.py @@ -7,39 +7,34 @@ import aiohttp from bigchaindb.common.utils import gen_timestamp from bigchaindb.events import EventTypes, Event -from bigchaindb.tendermint.utils import decode_txn_base64 +from bigchaindb.tendermint.utils import decode_transaction_base64 HOST = 'localhost' PORT = 46657 URL = f'ws://{HOST}:{PORT}/websocket' -PAYLOAD = { - "method": "subscribe", - "jsonrpc": "2.0", - "params": ["NewBlock"], - "id": "bigchaindb_stream" -} - - logger = logging.getLogger(__name__) -async def connect_and_recv(event_queue): +@asyncio.coroutine +def connect_and_recv(event_queue): session = aiohttp.ClientSession() - async with session.ws_connect(URL) as ws: - logger.info('Connected to tendermint ws server') + ws = yield from session.ws_connect(URL) - stream_id = "bigchaindb_stream_{}".format(gen_timestamp()) - await subscribe_events(ws, stream_id) + logger.info('Connected to tendermint ws server') - async for msg in ws: - process_event(event_queue, msg.data, stream_id) + stream_id = "bigchaindb_stream_{}".format(gen_timestamp()) + yield from subscribe_events(ws, stream_id) - if msg.type in (aiohttp.WSMsgType.CLOSED, - aiohttp.WSMsgType.ERROR): - session.close() - raise aiohttp.ClientConnectionError() + while True: + msg = yield from ws.receive() + process_event(event_queue, msg.data, stream_id) + + if msg.type in (aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.ERROR): + session.close() + raise aiohttp.ClientConnectionError() def process_event(event_queue, event, stream_id): @@ -53,26 +48,35 @@ def process_event(event_queue, event, stream_id): # Only push non empty blocks if block_txs: - block_txs = [json.loads(decode_txn_base64(txn)) for txn in block_txs] + block_txs = [decode_transaction_base64(txn) for txn in block_txs] new_block = {'id': str(block_id), 'transactions': block_txs} event = Event(EventTypes.BLOCK_VALID, new_block) event_queue.put(event) -async def subscribe_events(ws, stream_id): - PAYLOAD["id"] = stream_id - await ws.send_str(json.dumps(PAYLOAD)) +@asyncio.coroutine +def subscribe_events(ws, stream_id): + payload = { + "method": "subscribe", + "jsonrpc": "2.0", + "params": ["NewBlock"], + "id": stream_id + } + yield from ws.send_str(json.dumps(payload)) -async def try_connect_and_recv(event_queue, gas): +@asyncio.coroutine +def try_connect_and_recv(event_queue, max_tries): try: - await connect_and_recv(event_queue) + yield from connect_and_recv(event_queue) except Exception as e: - logger.error('WebSocket connection failed with exception: {}'.format(e)) - if gas: + if max_tries: + logger.warning('WebSocket connection failed with exception %s', e) time.sleep(2) - await try_connect_and_recv(event_queue, gas-1) + yield from try_connect_and_recv(event_queue, max_tries-1) + else: + logger.exception('WebSocket connection failed with exception %s', e) def start(event_queue): @@ -80,5 +84,4 @@ def start(event_queue): try: loop.run_until_complete(try_connect_and_recv(event_queue, 5)) except (KeyboardInterrupt, SystemExit): - logger.info('Shutting down tendermint event stream connection') - return + logger.info('Shutting down Tendermint event stream connection') diff --git a/bigchaindb/tendermint/utils.py b/bigchaindb/tendermint/utils.py index 5208e801..b84954a5 100644 --- a/bigchaindb/tendermint/utils.py +++ b/bigchaindb/tendermint/utils.py @@ -15,10 +15,10 @@ def decode_transaction(raw): return json.loads(raw.decode('utf8')) -def decode_txn_base64(value): +def decode_transaction_base64(value): """Decode a transaction from Base64.""" - return base64.b64decode(value.encode('utf8')).decode('utf8') + return json.loads(base64.b64decode(value.encode('utf8')).decode('utf8')) def calculate_hash(key_list):