Problem: Events API is unreliable (#2529)

Solution: Stop getting events from the Tendermint events API. Get the
event right before returning from the COMMIT phase of Tendermint, and
publish it to the events queue.
This commit is contained in:
vrde 2018-09-12 16:55:23 +02:00 committed by Vanshdeep Singh
parent 0f41869bea
commit 4b54e702f8
9 changed files with 47 additions and 235 deletions

View File

@ -26,6 +26,7 @@ from bigchaindb.lib import Block, PreCommitState
from bigchaindb.backend.query import PRE_COMMIT_ID
from bigchaindb.upsert_validator import ValidatorElection
import bigchaindb.upsert_validator.validator_utils as vutils
from bigchaindb.events import EventTypes, Event
CodeTypeOk = 0
@ -41,7 +42,8 @@ class App(BaseApplication):
State Machine.
"""
def __init__(self, bigchaindb=None):
def __init__(self, bigchaindb=None, events_queue=None):
self.events_queue = events_queue
self.bigchaindb = bigchaindb or BigchainDB()
self.block_txn_ids = []
self.block_txn_hash = ''
@ -245,4 +247,12 @@ class App(BaseApplication):
'height=%s, txn ids=%s', data, self.new_height,
self.block_txn_ids)
logger.benchmark('COMMIT_BLOCK, height:%s', self.new_height)
if self.events_queue:
event = Event(EventTypes.BLOCK_VALID, {
'height': self.new_height,
'transactions': self.block_transactions
})
self.events_queue.put(event)
return ResponseCommit(data=data)

View File

@ -1,89 +0,0 @@
# Copyright BigchainDB GmbH and BigchainDB contributors
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import asyncio
import json
import logging
import time
import aiohttp
from bigchaindb import config
from bigchaindb.common.utils import gen_timestamp
from bigchaindb.events import EventTypes, Event
from bigchaindb.tendermint_utils import decode_transaction_base64
HOST = config['tendermint']['host']
PORT = config['tendermint']['port']
URL = 'ws://{}:{}/websocket'.format(HOST, PORT)
logger = logging.getLogger(__name__)
@asyncio.coroutine
def connect_and_recv(event_queue):
session = aiohttp.ClientSession()
ws = yield from session.ws_connect(URL)
logger.info('Connected to tendermint ws server')
stream_id = 'bigchaindb_stream_{}'.format(gen_timestamp())
yield from subscribe_events(ws, stream_id)
while True:
msg = yield from ws.receive()
process_event(event_queue, msg.data, stream_id)
if msg.type in (aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR):
session.close()
raise aiohttp.ClientConnectionError()
def process_event(event_queue, event, stream_id):
event_stream_id = stream_id + '#event'
event = json.loads(event)
if (event['id'] == event_stream_id and event['result']['query'] == 'tm.event=\'NewBlock\''):
block = event['result']['data']['value']['block']
block_id = block['header']['height']
block_txs = block['data']['txs']
# Only push non empty blocks
if block_txs:
block_txs = [decode_transaction_base64(txn) for txn in block_txs]
new_block = {'height': block_id, 'transactions': block_txs}
event = Event(EventTypes.BLOCK_VALID, new_block)
event_queue.put(event)
@asyncio.coroutine
def subscribe_events(ws, stream_id):
payload = {
'method': 'subscribe',
'jsonrpc': '2.0',
'params': ['tm.event=\'NewBlock\''],
'id': stream_id
}
yield from ws.send_str(json.dumps(payload))
@asyncio.coroutine
def try_connect_and_recv(event_queue):
try:
yield from connect_and_recv(event_queue)
except Exception as e:
logger.warning('WebSocket connection failed with exception %s', e)
time.sleep(3)
yield from try_connect_and_recv(event_queue)
def start(event_queue):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(try_connect_and_recv(event_queue))
except (KeyboardInterrupt, SystemExit):
logger.info('Shutting down Tendermint event stream connection')

View File

@ -9,7 +9,6 @@ import bigchaindb
from bigchaindb.lib import BigchainDB
from bigchaindb.core import App
from bigchaindb.web import server, websocket_server
from bigchaindb import event_stream
from bigchaindb.events import Exchange, EventTypes
from bigchaindb.utils import Process
@ -57,13 +56,6 @@ def start():
args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),))
p_websocket_server.start()
# connect to tendermint event stream
p_websocket_client = Process(name='bigchaindb_ws_to_tendermint',
target=event_stream.start,
daemon=True,
args=(exchange.get_publisher_queue(),))
p_websocket_client.start()
p_exchange = Process(name='bigchaindb_exchange', target=exchange.run, daemon=True)
p_exchange.start()
@ -75,7 +67,7 @@ def start():
setproctitle.setproctitle('bigchaindb')
# Start the ABCIServer
app = ABCIServer(app=App())
app = ABCIServer(app=App(events_queue=exchange.get_publisher_queue()))
app.run()

View File

@ -50,13 +50,13 @@ def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
def eventify_block(block):
for tx in block['transactions']:
try:
asset_id = tx['asset']['id']
except KeyError:
asset_id = tx['id']
if tx.asset:
asset_id = tx.asset.get('id', tx.id)
else:
asset_id = tx.id
yield {'height': block['height'],
'asset_id': asset_id,
'transaction_id': tx['id']}
'transaction_id': tx.id}
class Dispatcher:

View File

@ -90,7 +90,7 @@ Valid Transactions
Streams an event for any newly valid transactions committed to a block. Message
bodies contain the transaction's ID, associated asset ID, and containing
block's ID.
block's height.
Example message:
@ -99,7 +99,7 @@ Example message:
{
"transaction_id": "<sha3-256 hash>",
"asset_id": "<sha3-256 hash>",
"block_id": "<int>"
"height": <int>
}

View File

@ -235,19 +235,21 @@ def test_check_tx__unsigned_create_is_error(b):
assert result.code == CodeTypeError
def test_deliver_tx__valid_create_updates_db(b, init_chain_request):
def test_deliver_tx__valid_create_updates_db_and_emits_event(b, init_chain_request):
import multiprocessing as mp
from bigchaindb import App
from bigchaindb.models import Transaction
from bigchaindb.common.crypto import generate_key_pair
alice = generate_key_pair()
bob = generate_key_pair()
events = mp.Queue()
tx = Transaction.create([alice.public_key],
[([bob.public_key], 1)])\
.sign([alice.private_key])
app = App(b)
app = App(b, events)
app.init_chain(init_chain_request)
@ -260,6 +262,8 @@ def test_deliver_tx__valid_create_updates_db(b, init_chain_request):
app.end_block(RequestEndBlock(height=99))
app.commit()
assert b.get_transaction(tx.id).id == tx.id
block_event = events.get()
assert block_event.data['transactions'] == [tx]
# unspent_outputs = b.get_unspent_outputs()
# unspent_output = next(unspent_outputs)

View File

@ -1,111 +0,0 @@
# Copyright BigchainDB GmbH and BigchainDB contributors
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import json
import base64
from queue import Queue
from aiohttp import ClientSession
import pytest
def test_process_event_new_block():
from bigchaindb.event_stream import process_event
event = '{"jsonrpc": "2.0", "id": "test_stream_id#event", "result": {'\
'"query": "tm.event=\'NewBlock\'", "data": { "type": "CF18EA939D3240",'\
'"value": { "block": { "header": { "chain_id": "test-chain-ipQIAa",'\
'"height": 1, "time": "2018-04-23T14:49:30.509920098Z", "num_txs": 1,'\
'"last_block_id": { "hash": "", "parts": { "total": 0, "hash": "" }},'\
'"total_txs": 1, "last_commit_hash": "", "data_hash": "38792142CE6D7F6F46F71777CB53F94CD9497B23",'\
'"validators_hash": "BF0D0EC2E13C76E69FA572516B6D93E64F3C58EF",'\
'"consensus_hash": "F66EF1DF8BA6DAC7A1ECCE40CC84E54A1CEBC6A5", "app_hash": "",'\
'"last_results_hash": "", "evidence_hash": "" }, "data": {"txs": ['\
'"eyJpbnB1dHMiOiBbeyJvd25lcnNfYmVmb3JlIjogWyJFb2Z0Z0FNd2hKQXM0cW81b'\
'0dhOU1GWXF5dFp5WEdaNmVmZFVYc1dXTDdmZSJdLCAiZnVsZmlsbHMiOiBudWxsLCA'\
'iZnVsZmlsbG1lbnQiOiAicEdTQUlNMGNueFFGeTZrSE1PcGxBbzh1ZncwNDlsZ2VxN'\
'HBOeDFNdksya0pjRjBCZ1VETjN2RTlsWmhaT21jMWZHbFpLUFZmZDdCTi1RVTdBa0N'\
'TZ1NKWVRPYzB3YVlmQ1RXc1FQS1VmOE5fODFKd21YOUJxcnlLejYyTmVubHg0dGszN'\
'GtVRCJ9XSwgIm91dHB1dHMiOiBbeyJwdWJsaWNfa2V5cyI6IFsiRW9mdGdBTXdoSkF'\
'zNHFvNW9HYTlNRllxeXRaeVhHWjZlZmRVWHNXV0w3ZmUiXSwgImNvbmRpdGlvbiI6I'\
'HsiZGV0YWlscyI6IHsidHlwZSI6ICJlZDI1NTE5LXNoYS0yNTYiLCAicHVibGljX2t'\
'leSI6ICJFb2Z0Z0FNd2hKQXM0cW81b0dhOU1GWXF5dFp5WEdaNmVmZFVYc1dXTDdmZ'\
'SJ9LCAidXJpIjogIm5pOi8vL3NoYS0yNTY7cFJZWTJQQUE0S3dHd0dUNVQtUXRCQUY'\
'0VWY1WG5JcVkxWmExVER0N0hMQT9mcHQ9ZWQyNTUxOS1zaGEtMjU2JmNvc3Q9MTMxM'\
'DcyIn0sICJhbW91bnQiOiAiMSJ9XSwgIm9wZXJhdGlvbiI6ICJDUkVBVEUiLCAibWV'\
'0YWRhdGEiOiBudWxsLCAiYXNzZXQiOiB7ImRhdGEiOiBudWxsfSwgInZlcnNpb24iO'\
'iAiMi4wIiwgImlkIjogImUwMmM0ZWM3MmExYzUzMmJkNjUyNWZkNGMxODU3ZDhmN2E'\
'wYWVkYTgyNGVjY2NhZGY4NTlmNzc0Zjc3ZTgwZGUifQ=="]}, "evidence": {'\
'"evidence": null}, "last_commit": { "blockID": { "hash": "", "parts":'\
'{"total": 0, "hash": ""} }, "precommits": null } } } } } }'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert not event_queue.empty()
block = event_queue.get()
assert isinstance(block.data['height'], int)
def test_process_event_empty_block():
from bigchaindb.event_stream import process_event
event = '{"jsonrpc": "2.0", "id": "bigchaindb_stream_1524555674#event",'\
'"result": {"query": "tm.event=\'NewBlock\'", "data": {"type": '\
'"CF18EA939D3240", "value": {"block": {"header": {"chain_id": '\
'"test-chain-ipQIAa", "height": 1, "time": "2018-04-24T07:41:16.838038877Z",'\
'"num_txs": 0, "last_block_id": {"hash": "", "parts": {"total": 0, "hash": ""}},'\
'"total_txs": 0, "last_commit_hash": "", "data_hash": "", "validators_hash":'\
'"BF0D0EC2E13C76E69FA572516B6D93E64F3C58EF", "consensus_hash": '\
'"F66EF1DF8BA6DAC7A1ECCE40CC84E54A1CEBC6A5", "app_hash": "", '\
'"last_results_hash": "", "evidence_hash": ""}, "data": {"txs": null},'\
'"evidence": {"evidence": null}, "last_commit": {"blockID": {"hash": "", '\
'"parts": {"total": 0, "hash": ""}}, "precommits": null}}}}}}'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert event_queue.empty()
def test_process_unknown_event():
from bigchaindb.event_stream import process_event
event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\
' "result": { "query": "tm.event=\'UnknownEvent\'" }}'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert event_queue.empty()
@pytest.mark.asyncio
@pytest.mark.abci
async def test_subscribe_events(tendermint_ws_url, b):
from bigchaindb.event_stream import subscribe_events
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.models import Transaction
session = ClientSession()
ws = await session.ws_connect(tendermint_ws_url)
stream_id = 'bigchaindb_stream_01'
await subscribe_events(ws, stream_id)
msg = await ws.receive()
assert msg.data
msg_data_dict = json.loads(msg.data)
assert msg_data_dict['id'] == stream_id
assert msg_data_dict['jsonrpc'] == '2.0'
assert msg_data_dict['result'] == {}
alice = generate_key_pair()
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)],
asset=None)\
.sign([alice.private_key])
b.post_transaction(tx, 'broadcast_tx_async')
msg = await ws.receive()
msg_data_dict = json.loads(msg.data)
raw_txn = msg_data_dict['result']['data']['value']['block']['data']['txs'][0]
transaction = json.loads(base64.b64decode(raw_txn).decode('utf8'))
assert transaction == tx.to_dict()

View File

@ -229,6 +229,7 @@ def test_upsert_validator(b, node_key, node_keys, ed25519_node_keys):
latest_block = b.get_latest_block()
# reset the validator set
b.store_validator_set(latest_block['height'], validators)
generate_block(b)
power = 1
public_key = '9B3119650DF82B9A5D8A12E38953EA47475C09F0C48A4E6A0ECE182944B24403'

View File

@ -21,25 +21,30 @@ class MockWebSocket:
def test_eventify_block_works_with_any_transaction():
from bigchaindb.web.websocket_server import eventify_block
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.lib import Transaction
block = {
'height': 1,
'transactions': [{
'id': 1
}, {
'id': 2,
'asset': {'id': 1}
}]
}
alice = generate_key_pair()
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)])\
.sign([alice.private_key])
tx_transfer = Transaction.transfer(tx.to_inputs(),
[([alice.public_key], 1)],
asset_id=tx.id)\
.sign([alice.private_key])
block = {'height': 1,
'transactions': [tx, tx_transfer]}
expected_events = [{
'height': 1,
'asset_id': 1,
'transaction_id': 1
'asset_id': tx.id,
'transaction_id': tx.id
}, {
'height': 1,
'asset_id': 1,
'transaction_id': 2
'asset_id': tx_transfer.asset['id'],
'transaction_id': tx_transfer.id
}]
for event, expected in zip(eventify_block(block), expected_events):
@ -144,7 +149,7 @@ def test_websocket_block_event(b, test_client, loop):
app = init_app(event_source, loop=loop)
client = yield from test_client(app)
ws = yield from client.ws_connect(EVENTS_ENDPOINT)
block = {'height': 1, 'transactions': [tx.to_dict()]}
block = {'height': 1, 'transactions': [tx]}
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
yield from event_source.put(block_event)
@ -152,9 +157,9 @@ def test_websocket_block_event(b, test_client, loop):
for tx in block['transactions']:
result = yield from ws.receive()
json_result = json.loads(result.data)
assert json_result['transaction_id'] == tx['id']
assert json_result['transaction_id'] == tx.id
# Since the transactions are all CREATEs, asset id == transaction id
assert json_result['asset_id'] == tx['id']
assert json_result['asset_id'] == tx.id
assert json_result['height'] == block['height']
yield from event_source.put(POISON_PILL)