From 4b54e702f8c2477a8bd56a8dffeda25b241a88a0 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 12 Sep 2018 16:55:23 +0200 Subject: [PATCH] Problem: Events API is unreliable (#2529) Solution: Stop getting events from the Tendermint events API. Get the event right before returning from the COMMIT phase of Tendermint, and publish it to the events queue. --- bigchaindb/core.py | 12 +- bigchaindb/event_stream.py | 89 -------------- bigchaindb/start.py | 10 +- bigchaindb/web/websocket_server.py | 10 +- .../events/websocket-event-stream-api.rst | 4 +- tests/tendermint/test_core.py | 8 +- tests/tendermint/test_event_stream.py | 111 ------------------ .../test_upsert_validator_vote.py | 1 + tests/web/test_websocket_server.py | 37 +++--- 9 files changed, 47 insertions(+), 235 deletions(-) delete mode 100644 bigchaindb/event_stream.py delete mode 100644 tests/tendermint/test_event_stream.py diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 62bc6ec3..2f9fa23f 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -26,6 +26,7 @@ from bigchaindb.lib import Block, PreCommitState from bigchaindb.backend.query import PRE_COMMIT_ID from bigchaindb.upsert_validator import ValidatorElection import bigchaindb.upsert_validator.validator_utils as vutils +from bigchaindb.events import EventTypes, Event CodeTypeOk = 0 @@ -41,7 +42,8 @@ class App(BaseApplication): State Machine. """ - def __init__(self, bigchaindb=None): + def __init__(self, bigchaindb=None, events_queue=None): + self.events_queue = events_queue self.bigchaindb = bigchaindb or BigchainDB() self.block_txn_ids = [] self.block_txn_hash = '' @@ -245,4 +247,12 @@ class App(BaseApplication): 'height=%s, txn ids=%s', data, self.new_height, self.block_txn_ids) logger.benchmark('COMMIT_BLOCK, height:%s', self.new_height) + + if self.events_queue: + event = Event(EventTypes.BLOCK_VALID, { + 'height': self.new_height, + 'transactions': self.block_transactions + }) + self.events_queue.put(event) + return ResponseCommit(data=data) diff --git a/bigchaindb/event_stream.py b/bigchaindb/event_stream.py deleted file mode 100644 index a7d39187..00000000 --- a/bigchaindb/event_stream.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright BigchainDB GmbH and BigchainDB contributors -# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) -# Code is Apache-2.0 and docs are CC-BY-4.0 - -import asyncio -import json -import logging -import time - -import aiohttp - -from bigchaindb import config -from bigchaindb.common.utils import gen_timestamp -from bigchaindb.events import EventTypes, Event -from bigchaindb.tendermint_utils import decode_transaction_base64 - - -HOST = config['tendermint']['host'] -PORT = config['tendermint']['port'] -URL = 'ws://{}:{}/websocket'.format(HOST, PORT) - -logger = logging.getLogger(__name__) - - -@asyncio.coroutine -def connect_and_recv(event_queue): - session = aiohttp.ClientSession() - ws = yield from session.ws_connect(URL) - - logger.info('Connected to tendermint ws server') - - stream_id = 'bigchaindb_stream_{}'.format(gen_timestamp()) - yield from subscribe_events(ws, stream_id) - - while True: - msg = yield from ws.receive() - process_event(event_queue, msg.data, stream_id) - - if msg.type in (aiohttp.WSMsgType.CLOSED, - aiohttp.WSMsgType.ERROR): - session.close() - raise aiohttp.ClientConnectionError() - - -def process_event(event_queue, event, stream_id): - event_stream_id = stream_id + '#event' - event = json.loads(event) - - if (event['id'] == event_stream_id and event['result']['query'] == 'tm.event=\'NewBlock\''): - block = event['result']['data']['value']['block'] - block_id = block['header']['height'] - block_txs = block['data']['txs'] - - # Only push non empty blocks - if block_txs: - block_txs = [decode_transaction_base64(txn) for txn in block_txs] - new_block = {'height': block_id, 'transactions': block_txs} - event = Event(EventTypes.BLOCK_VALID, new_block) - event_queue.put(event) - - -@asyncio.coroutine -def subscribe_events(ws, stream_id): - payload = { - 'method': 'subscribe', - 'jsonrpc': '2.0', - 'params': ['tm.event=\'NewBlock\''], - 'id': stream_id - } - yield from ws.send_str(json.dumps(payload)) - - -@asyncio.coroutine -def try_connect_and_recv(event_queue): - try: - yield from connect_and_recv(event_queue) - - except Exception as e: - logger.warning('WebSocket connection failed with exception %s', e) - time.sleep(3) - yield from try_connect_and_recv(event_queue) - - -def start(event_queue): - loop = asyncio.get_event_loop() - try: - loop.run_until_complete(try_connect_and_recv(event_queue)) - except (KeyboardInterrupt, SystemExit): - logger.info('Shutting down Tendermint event stream connection') diff --git a/bigchaindb/start.py b/bigchaindb/start.py index 2b9667b5..41173be0 100644 --- a/bigchaindb/start.py +++ b/bigchaindb/start.py @@ -9,7 +9,6 @@ import bigchaindb from bigchaindb.lib import BigchainDB from bigchaindb.core import App from bigchaindb.web import server, websocket_server -from bigchaindb import event_stream from bigchaindb.events import Exchange, EventTypes from bigchaindb.utils import Process @@ -57,13 +56,6 @@ def start(): args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),)) p_websocket_server.start() - # connect to tendermint event stream - p_websocket_client = Process(name='bigchaindb_ws_to_tendermint', - target=event_stream.start, - daemon=True, - args=(exchange.get_publisher_queue(),)) - p_websocket_client.start() - p_exchange = Process(name='bigchaindb_exchange', target=exchange.run, daemon=True) p_exchange.start() @@ -75,7 +67,7 @@ def start(): setproctitle.setproctitle('bigchaindb') # Start the ABCIServer - app = ABCIServer(app=App()) + app = ABCIServer(app=App(events_queue=exchange.get_publisher_queue())) app.run() diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index f0a9f886..efe10151 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -50,13 +50,13 @@ def _multiprocessing_to_asyncio(in_queue, out_queue, loop): def eventify_block(block): for tx in block['transactions']: - try: - asset_id = tx['asset']['id'] - except KeyError: - asset_id = tx['id'] + if tx.asset: + asset_id = tx.asset.get('id', tx.id) + else: + asset_id = tx.id yield {'height': block['height'], 'asset_id': asset_id, - 'transaction_id': tx['id']} + 'transaction_id': tx.id} class Dispatcher: diff --git a/docs/server/source/events/websocket-event-stream-api.rst b/docs/server/source/events/websocket-event-stream-api.rst index 526df494..967b3464 100644 --- a/docs/server/source/events/websocket-event-stream-api.rst +++ b/docs/server/source/events/websocket-event-stream-api.rst @@ -90,7 +90,7 @@ Valid Transactions Streams an event for any newly valid transactions committed to a block. Message bodies contain the transaction's ID, associated asset ID, and containing -block's ID. +block's height. Example message: @@ -99,7 +99,7 @@ Example message: { "transaction_id": "", "asset_id": "", - "block_id": "" + "height": } diff --git a/tests/tendermint/test_core.py b/tests/tendermint/test_core.py index 31b4e100..5875ed9b 100644 --- a/tests/tendermint/test_core.py +++ b/tests/tendermint/test_core.py @@ -235,19 +235,21 @@ def test_check_tx__unsigned_create_is_error(b): assert result.code == CodeTypeError -def test_deliver_tx__valid_create_updates_db(b, init_chain_request): +def test_deliver_tx__valid_create_updates_db_and_emits_event(b, init_chain_request): + import multiprocessing as mp from bigchaindb import App from bigchaindb.models import Transaction from bigchaindb.common.crypto import generate_key_pair alice = generate_key_pair() bob = generate_key_pair() + events = mp.Queue() tx = Transaction.create([alice.public_key], [([bob.public_key], 1)])\ .sign([alice.private_key]) - app = App(b) + app = App(b, events) app.init_chain(init_chain_request) @@ -260,6 +262,8 @@ def test_deliver_tx__valid_create_updates_db(b, init_chain_request): app.end_block(RequestEndBlock(height=99)) app.commit() assert b.get_transaction(tx.id).id == tx.id + block_event = events.get() + assert block_event.data['transactions'] == [tx] # unspent_outputs = b.get_unspent_outputs() # unspent_output = next(unspent_outputs) diff --git a/tests/tendermint/test_event_stream.py b/tests/tendermint/test_event_stream.py deleted file mode 100644 index f23eeaba..00000000 --- a/tests/tendermint/test_event_stream.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright BigchainDB GmbH and BigchainDB contributors -# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) -# Code is Apache-2.0 and docs are CC-BY-4.0 - -import json -import base64 -from queue import Queue - -from aiohttp import ClientSession -import pytest - - -def test_process_event_new_block(): - from bigchaindb.event_stream import process_event - - event = '{"jsonrpc": "2.0", "id": "test_stream_id#event", "result": {'\ - '"query": "tm.event=\'NewBlock\'", "data": { "type": "CF18EA939D3240",'\ - '"value": { "block": { "header": { "chain_id": "test-chain-ipQIAa",'\ - '"height": 1, "time": "2018-04-23T14:49:30.509920098Z", "num_txs": 1,'\ - '"last_block_id": { "hash": "", "parts": { "total": 0, "hash": "" }},'\ - '"total_txs": 1, "last_commit_hash": "", "data_hash": "38792142CE6D7F6F46F71777CB53F94CD9497B23",'\ - '"validators_hash": "BF0D0EC2E13C76E69FA572516B6D93E64F3C58EF",'\ - '"consensus_hash": "F66EF1DF8BA6DAC7A1ECCE40CC84E54A1CEBC6A5", "app_hash": "",'\ - '"last_results_hash": "", "evidence_hash": "" }, "data": {"txs": ['\ - '"eyJpbnB1dHMiOiBbeyJvd25lcnNfYmVmb3JlIjogWyJFb2Z0Z0FNd2hKQXM0cW81b'\ - '0dhOU1GWXF5dFp5WEdaNmVmZFVYc1dXTDdmZSJdLCAiZnVsZmlsbHMiOiBudWxsLCA'\ - 'iZnVsZmlsbG1lbnQiOiAicEdTQUlNMGNueFFGeTZrSE1PcGxBbzh1ZncwNDlsZ2VxN'\ - 'HBOeDFNdksya0pjRjBCZ1VETjN2RTlsWmhaT21jMWZHbFpLUFZmZDdCTi1RVTdBa0N'\ - 'TZ1NKWVRPYzB3YVlmQ1RXc1FQS1VmOE5fODFKd21YOUJxcnlLejYyTmVubHg0dGszN'\ - 'GtVRCJ9XSwgIm91dHB1dHMiOiBbeyJwdWJsaWNfa2V5cyI6IFsiRW9mdGdBTXdoSkF'\ - 'zNHFvNW9HYTlNRllxeXRaeVhHWjZlZmRVWHNXV0w3ZmUiXSwgImNvbmRpdGlvbiI6I'\ - 'HsiZGV0YWlscyI6IHsidHlwZSI6ICJlZDI1NTE5LXNoYS0yNTYiLCAicHVibGljX2t'\ - 'leSI6ICJFb2Z0Z0FNd2hKQXM0cW81b0dhOU1GWXF5dFp5WEdaNmVmZFVYc1dXTDdmZ'\ - 'SJ9LCAidXJpIjogIm5pOi8vL3NoYS0yNTY7cFJZWTJQQUE0S3dHd0dUNVQtUXRCQUY'\ - '0VWY1WG5JcVkxWmExVER0N0hMQT9mcHQ9ZWQyNTUxOS1zaGEtMjU2JmNvc3Q9MTMxM'\ - 'DcyIn0sICJhbW91bnQiOiAiMSJ9XSwgIm9wZXJhdGlvbiI6ICJDUkVBVEUiLCAibWV'\ - '0YWRhdGEiOiBudWxsLCAiYXNzZXQiOiB7ImRhdGEiOiBudWxsfSwgInZlcnNpb24iO'\ - 'iAiMi4wIiwgImlkIjogImUwMmM0ZWM3MmExYzUzMmJkNjUyNWZkNGMxODU3ZDhmN2E'\ - 'wYWVkYTgyNGVjY2NhZGY4NTlmNzc0Zjc3ZTgwZGUifQ=="]}, "evidence": {'\ - '"evidence": null}, "last_commit": { "blockID": { "hash": "", "parts":'\ - '{"total": 0, "hash": ""} }, "precommits": null } } } } } }' - - event_queue = Queue() - process_event(event_queue, event, 'test_stream_id') - assert not event_queue.empty() - block = event_queue.get() - assert isinstance(block.data['height'], int) - - -def test_process_event_empty_block(): - from bigchaindb.event_stream import process_event - - event = '{"jsonrpc": "2.0", "id": "bigchaindb_stream_1524555674#event",'\ - '"result": {"query": "tm.event=\'NewBlock\'", "data": {"type": '\ - '"CF18EA939D3240", "value": {"block": {"header": {"chain_id": '\ - '"test-chain-ipQIAa", "height": 1, "time": "2018-04-24T07:41:16.838038877Z",'\ - '"num_txs": 0, "last_block_id": {"hash": "", "parts": {"total": 0, "hash": ""}},'\ - '"total_txs": 0, "last_commit_hash": "", "data_hash": "", "validators_hash":'\ - '"BF0D0EC2E13C76E69FA572516B6D93E64F3C58EF", "consensus_hash": '\ - '"F66EF1DF8BA6DAC7A1ECCE40CC84E54A1CEBC6A5", "app_hash": "", '\ - '"last_results_hash": "", "evidence_hash": ""}, "data": {"txs": null},'\ - '"evidence": {"evidence": null}, "last_commit": {"blockID": {"hash": "", '\ - '"parts": {"total": 0, "hash": ""}}, "precommits": null}}}}}}' - - event_queue = Queue() - process_event(event_queue, event, 'test_stream_id') - assert event_queue.empty() - - -def test_process_unknown_event(): - from bigchaindb.event_stream import process_event - - event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\ - ' "result": { "query": "tm.event=\'UnknownEvent\'" }}' - - event_queue = Queue() - process_event(event_queue, event, 'test_stream_id') - assert event_queue.empty() - - -@pytest.mark.asyncio -@pytest.mark.abci -async def test_subscribe_events(tendermint_ws_url, b): - from bigchaindb.event_stream import subscribe_events - from bigchaindb.common.crypto import generate_key_pair - from bigchaindb.models import Transaction - - session = ClientSession() - ws = await session.ws_connect(tendermint_ws_url) - stream_id = 'bigchaindb_stream_01' - await subscribe_events(ws, stream_id) - msg = await ws.receive() - assert msg.data - msg_data_dict = json.loads(msg.data) - assert msg_data_dict['id'] == stream_id - assert msg_data_dict['jsonrpc'] == '2.0' - assert msg_data_dict['result'] == {} - - alice = generate_key_pair() - tx = Transaction.create([alice.public_key], - [([alice.public_key], 1)], - asset=None)\ - .sign([alice.private_key]) - - b.post_transaction(tx, 'broadcast_tx_async') - msg = await ws.receive() - msg_data_dict = json.loads(msg.data) - raw_txn = msg_data_dict['result']['data']['value']['block']['data']['txs'][0] - transaction = json.loads(base64.b64decode(raw_txn).decode('utf8')) - - assert transaction == tx.to_dict() diff --git a/tests/upsert_validator/test_upsert_validator_vote.py b/tests/upsert_validator/test_upsert_validator_vote.py index 8567397c..b2e23af7 100644 --- a/tests/upsert_validator/test_upsert_validator_vote.py +++ b/tests/upsert_validator/test_upsert_validator_vote.py @@ -229,6 +229,7 @@ def test_upsert_validator(b, node_key, node_keys, ed25519_node_keys): latest_block = b.get_latest_block() # reset the validator set b.store_validator_set(latest_block['height'], validators) + generate_block(b) power = 1 public_key = '9B3119650DF82B9A5D8A12E38953EA47475C09F0C48A4E6A0ECE182944B24403' diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index dff74f21..7f72b795 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -21,25 +21,30 @@ class MockWebSocket: def test_eventify_block_works_with_any_transaction(): from bigchaindb.web.websocket_server import eventify_block + from bigchaindb.common.crypto import generate_key_pair + from bigchaindb.lib import Transaction - block = { - 'height': 1, - 'transactions': [{ - 'id': 1 - }, { - 'id': 2, - 'asset': {'id': 1} - }] - } + alice = generate_key_pair() + + tx = Transaction.create([alice.public_key], + [([alice.public_key], 1)])\ + .sign([alice.private_key]) + tx_transfer = Transaction.transfer(tx.to_inputs(), + [([alice.public_key], 1)], + asset_id=tx.id)\ + .sign([alice.private_key]) + + block = {'height': 1, + 'transactions': [tx, tx_transfer]} expected_events = [{ 'height': 1, - 'asset_id': 1, - 'transaction_id': 1 + 'asset_id': tx.id, + 'transaction_id': tx.id }, { 'height': 1, - 'asset_id': 1, - 'transaction_id': 2 + 'asset_id': tx_transfer.asset['id'], + 'transaction_id': tx_transfer.id }] for event, expected in zip(eventify_block(block), expected_events): @@ -144,7 +149,7 @@ def test_websocket_block_event(b, test_client, loop): app = init_app(event_source, loop=loop) client = yield from test_client(app) ws = yield from client.ws_connect(EVENTS_ENDPOINT) - block = {'height': 1, 'transactions': [tx.to_dict()]} + block = {'height': 1, 'transactions': [tx]} block_event = events.Event(events.EventTypes.BLOCK_VALID, block) yield from event_source.put(block_event) @@ -152,9 +157,9 @@ def test_websocket_block_event(b, test_client, loop): for tx in block['transactions']: result = yield from ws.receive() json_result = json.loads(result.data) - assert json_result['transaction_id'] == tx['id'] + assert json_result['transaction_id'] == tx.id # Since the transactions are all CREATEs, asset id == transaction id - assert json_result['asset_id'] == tx['id'] + assert json_result['asset_id'] == tx.id assert json_result['height'] == block['height'] yield from event_source.put(POISON_PILL)