Merge pull request #1928 from bigchaindb/feat/integrate-event-stream

Integrate event stream api
This commit is contained in:
vrde 2017-12-20 16:06:16 +01:00 committed by GitHub
commit 57d4a8e895
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 178 additions and 1 deletions

View File

@ -6,7 +6,9 @@ from os import getenv
import bigchaindb import bigchaindb
from bigchaindb.tendermint.lib import BigchainDB from bigchaindb.tendermint.lib import BigchainDB
from bigchaindb.tendermint.core import App from bigchaindb.tendermint.core import App
from bigchaindb.web import server from bigchaindb.web import server, websocket_server
from bigchaindb.tendermint import event_stream
from bigchaindb.events import Exchange, EventTypes
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -26,6 +28,10 @@ BANNER = """
def start(): def start():
# Exchange object for event stream api
exchange = Exchange()
# start the web api # start the web api
app_server = server.create_server( app_server = server.create_server(
settings=bigchaindb.config['server'], settings=bigchaindb.config['server'],
@ -44,6 +50,18 @@ def start():
'--consensus.create_empty_blocks=false', '--consensus.create_empty_blocks=false',
]) ])
# start websocket server
p_websocket_server = mp.Process(name='ws',
target=websocket_server.start,
args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),))
p_websocket_server.start()
# connect to tendermint event stream
p_websocket_client = mp.Process(name='ws_to_tendermint',
target=event_stream.start,
args=(exchange.get_publisher_queue(),))
p_websocket_client.start()
# We need to import this after spawning the web server # We need to import this after spawning the web server
# because import ABCIServer will monkeypatch all sockets # because import ABCIServer will monkeypatch all sockets
# for gevent. # for gevent.

View File

@ -0,0 +1,87 @@
import json
import logging
import time
import asyncio
import aiohttp
from bigchaindb.common.utils import gen_timestamp
from bigchaindb.events import EventTypes, Event
from bigchaindb.tendermint.utils import decode_transaction_base64
HOST = 'localhost'
PORT = 46657
URL = f'ws://{HOST}:{PORT}/websocket'
logger = logging.getLogger(__name__)
@asyncio.coroutine
def connect_and_recv(event_queue):
session = aiohttp.ClientSession()
ws = yield from session.ws_connect(URL)
logger.info('Connected to tendermint ws server')
stream_id = "bigchaindb_stream_{}".format(gen_timestamp())
yield from subscribe_events(ws, stream_id)
while True:
msg = yield from ws.receive()
process_event(event_queue, msg.data, stream_id)
if msg.type in (aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR):
session.close()
raise aiohttp.ClientConnectionError()
def process_event(event_queue, event, stream_id):
event_stream_id = stream_id + '#event'
event = json.loads(event)
if (event['id'] == event_stream_id and event['result']['name'] == 'NewBlock'):
block = event['result']['data']['data']['block']
block_id = block['header']['height']
block_txs = block['data']['txs']
# Only push non empty blocks
if block_txs:
block_txs = [decode_transaction_base64(txn) for txn in block_txs]
new_block = {'id': str(block_id), 'transactions': block_txs}
event = Event(EventTypes.BLOCK_VALID, new_block)
event_queue.put(event)
@asyncio.coroutine
def subscribe_events(ws, stream_id):
payload = {
"method": "subscribe",
"jsonrpc": "2.0",
"params": ["NewBlock"],
"id": stream_id
}
yield from ws.send_str(json.dumps(payload))
@asyncio.coroutine
def try_connect_and_recv(event_queue, max_tries):
try:
yield from connect_and_recv(event_queue)
except Exception as e:
if max_tries:
logger.warning('WebSocket connection failed with exception %s', e)
time.sleep(2)
yield from try_connect_and_recv(event_queue, max_tries-1)
else:
logger.exception('WebSocket connection failed with exception %s', e)
def start(event_queue):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(try_connect_and_recv(event_queue, 5))
except (KeyboardInterrupt, SystemExit):
logger.info('Shutting down Tendermint event stream connection')

View File

@ -15,6 +15,12 @@ def decode_transaction(raw):
return json.loads(raw.decode('utf8')) return json.loads(raw.decode('utf8'))
def decode_transaction_base64(value):
"""Decode a transaction from Base64."""
return json.loads(base64.b64decode(value.encode('utf8')).decode('utf8'))
def calculate_hash(key_list): def calculate_hash(key_list):
if not key_list: if not key_list:
return '' return ''

View File

@ -0,0 +1,64 @@
from queue import Queue
import pytest
@pytest.mark.tendermint
def test_process_event_new_block():
from bigchaindb.tendermint.event_stream import process_event
event = '{"id": "test_stream_id#event", "jsonrpc": "2.0", "result":'\
' {"data": {"data": {"block": {"data": {"txs": ["eyJpbnB1dHMiOiBb'\
'eyJvd25lcnNfYmVmb3JlIjogWyJCWnZLQmNSUmgyd0tOOGZuTENlZUczSGhFaWF4'\
'TWdyWmlib0gyeUZvYzVwTCJdLCAiZnVsZmlsbHMiOiBudWxsLCAiZnVsZmlsbG1l'\
'bnQiOiAicEdTQUlKMER2S2JBeXkyQ2hqT212ZWVCc0FxWktTS0k3VDNWZGhtUkI2'\
'V2dhdzdoZ1VDUHluUnFuQW9RWDh2UlNXeXNwYk5uYWVBaVpOU19lQ3V6ejhDZWtJ'\
'OHBIejJnekExeDJkOF93NTUzWFVOUGJFbnpBUzhncURqeDFkaE1JeDM1ZnpVTCJ9'\
'XSwgIm91dHB1dHMiOiBbeyJwdWJsaWNfa2V5cyI6IFsiQlp2S0JjUlJoMndLTjhm'\
'bkxDZWVHM0hoRWlheE1nclppYm9IMnlGb2M1cEwiXSwgImNvbmRpdGlvbiI6IHsi'\
'ZGV0YWlscyI6IHsidHlwZSI6ICJlZDI1NTE5LXNoYS0yNTYiLCAicHVibGljX2tl'\
'eSI6ICJCWnZLQmNSUmgyd0tOOGZuTENlZUczSGhFaWF4TWdyWmlib0gyeUZvYzVw'\
'TCJ9LCAidXJpIjogIm5pOi8vL3NoYS0yNTY7eHVFX1ZPNjd6aHc0LTRjN0k1YUtm'\
'WGtzX1Q1MjUwMnBuOC1mcVJQQkloRT9mcHQ9ZWQyNTUxOS1zaGEtMjU2JmNvc3Q9'\
'MTMxMDcyIn0sICJhbW91bnQiOiAiMSJ9XSwgIm9wZXJhdGlvbiI6ICJDUkVBVEUi'\
'LCAibWV0YWRhdGEiOiB7InNob3J0IjogImxpdHRsZSJ9LCAiYXNzZXQiOiB7ImRh'\
'dGEiOiB7ImJpY3ljbGUiOiB7InNlcmlhbF9udW1iZXIiOiAiYWJjZDEyMzQiLCAi'\
'bWFudWZhY3R1cmVyIjogImJrZmFiIn19fSwgInZlcnNpb24iOiAiMS4wIiwgImlk'\
'IjogIjE4NzM3Yzc0OWQxZGE2Yzc5YjFmYWZiZjkwOTkwNzEwMDA1ZWM4MTYxNGQ5'\
'YWFiNDkyZTgwYTkzNWRkYThjMzAifQ=="]}, "header": {"height": 1}}},'\
' "type": "new_block"}, "name": "NewBlock"}}'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert not event_queue.empty()
@pytest.mark.tendermint
def test_process_event_empty_block():
from bigchaindb.tendermint.event_stream import process_event
event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\
'"result": {"name": "NewBlock", "data": {"type": "new_block",'\
' "data": {"block": {"header": {"chain_id": "test-chain-cbVRwC",'\
' "height": 1, "time": "2017-12-04T22:42:54.33+05:30", "num_txs": 0,'\
' "last_block_id": {"hash": "", "parts": {"total": 0, "hash": ""}},'\
' "last_commit_hash": "", "data_hash": "",'\
' "validators_hash": "ACF23A690EB72D051931E878E8F3D6E01A17A81C",'\
' "app_hash": ""}, "data": {"txs": []}, "last_commit": {"blockID": '\
' {"hash": "", "parts": {"total": 0, "hash": ""}}, "precommits": []}}}}}}'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert event_queue.empty()
@pytest.mark.tendermint
def test_process_unknown_event():
from bigchaindb.tendermint.event_stream import process_event
event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\
' "result": {"name": "UnknownEvent"}}'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert event_queue.empty()

View File

@ -7,6 +7,8 @@ from unittest.mock import patch
import pytest import pytest
pytestmark = pytest.mark.tendermint
@pytest.fixture @pytest.fixture
def _block(b, request): def _block(b, request):