Merge branch 'master' into tm_version_compatible

This commit is contained in:
Ahmed Muawia Khan 2018-09-13 12:07:24 +02:00
commit 73352c8793
12 changed files with 52 additions and 240 deletions

View File

@ -5,5 +5,5 @@ RUN pip install --upgrade \
pycco \
websocket-client~=0.47.0 \
pytest~=3.0 \
bigchaindb-driver==0.5.2 \
bigchaindb-driver~=0.5.0 \
blns

View File

@ -28,6 +28,7 @@ from bigchaindb.lib import Block, PreCommitState
from bigchaindb.backend.query import PRE_COMMIT_ID
from bigchaindb.upsert_validator import ValidatorElection
import bigchaindb.upsert_validator.validator_utils as vutils
from bigchaindb.events import EventTypes, Event
CodeTypeOk = 0
@ -43,7 +44,8 @@ class App(BaseApplication):
State Machine.
"""
def __init__(self, bigchaindb=None):
def __init__(self, bigchaindb=None, events_queue=None):
self.events_queue = events_queue
self.bigchaindb = bigchaindb or BigchainDB()
self.block_txn_ids = []
self.block_txn_hash = ''
@ -258,4 +260,12 @@ class App(BaseApplication):
'height=%s, txn ids=%s', data, self.new_height,
self.block_txn_ids)
logger.benchmark('COMMIT_BLOCK, height:%s', self.new_height)
if self.events_queue:
event = Event(EventTypes.BLOCK_VALID, {
'height': self.new_height,
'transactions': self.block_transactions
})
self.events_queue.put(event)
return ResponseCommit(data=data)

View File

@ -1,89 +0,0 @@
# Copyright BigchainDB GmbH and BigchainDB contributors
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import asyncio
import json
import logging
import time
import aiohttp
from bigchaindb import config
from bigchaindb.common.utils import gen_timestamp
from bigchaindb.events import EventTypes, Event
from bigchaindb.tendermint_utils import decode_transaction_base64
HOST = config['tendermint']['host']
PORT = config['tendermint']['port']
URL = 'ws://{}:{}/websocket'.format(HOST, PORT)
logger = logging.getLogger(__name__)
@asyncio.coroutine
def connect_and_recv(event_queue):
session = aiohttp.ClientSession()
ws = yield from session.ws_connect(URL)
logger.info('Connected to tendermint ws server')
stream_id = 'bigchaindb_stream_{}'.format(gen_timestamp())
yield from subscribe_events(ws, stream_id)
while True:
msg = yield from ws.receive()
process_event(event_queue, msg.data, stream_id)
if msg.type in (aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR):
session.close()
raise aiohttp.ClientConnectionError()
def process_event(event_queue, event, stream_id):
event_stream_id = stream_id + '#event'
event = json.loads(event)
if (event['id'] == event_stream_id and event['result']['query'] == 'tm.event=\'NewBlock\''):
block = event['result']['data']['value']['block']
block_id = block['header']['height']
block_txs = block['data']['txs']
# Only push non empty blocks
if block_txs:
block_txs = [decode_transaction_base64(txn) for txn in block_txs]
new_block = {'height': block_id, 'transactions': block_txs}
event = Event(EventTypes.BLOCK_VALID, new_block)
event_queue.put(event)
@asyncio.coroutine
def subscribe_events(ws, stream_id):
payload = {
'method': 'subscribe',
'jsonrpc': '2.0',
'params': ['tm.event=\'NewBlock\''],
'id': stream_id
}
yield from ws.send_str(json.dumps(payload))
@asyncio.coroutine
def try_connect_and_recv(event_queue):
try:
yield from connect_and_recv(event_queue)
except Exception as e:
logger.warning('WebSocket connection failed with exception %s', e)
time.sleep(3)
yield from try_connect_and_recv(event_queue)
def start(event_queue):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(try_connect_and_recv(event_queue))
except (KeyboardInterrupt, SystemExit):
logger.info('Shutting down Tendermint event stream connection')

View File

@ -9,7 +9,6 @@ import bigchaindb
from bigchaindb.lib import BigchainDB
from bigchaindb.core import App
from bigchaindb.web import server, websocket_server
from bigchaindb import event_stream
from bigchaindb.events import Exchange, EventTypes
from bigchaindb.utils import Process
@ -57,13 +56,6 @@ def start():
args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),))
p_websocket_server.start()
# connect to tendermint event stream
p_websocket_client = Process(name='bigchaindb_ws_to_tendermint',
target=event_stream.start,
daemon=True,
args=(exchange.get_publisher_queue(),))
p_websocket_client.start()
p_exchange = Process(name='bigchaindb_exchange', target=exchange.run, daemon=True)
p_exchange.start()
@ -75,7 +67,7 @@ def start():
setproctitle.setproctitle('bigchaindb')
# Start the ABCIServer
app = ABCIServer(app=App())
app = ABCIServer(app=App(events_queue=exchange.get_publisher_queue()))
app.run()

View File

@ -50,13 +50,13 @@ def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
def eventify_block(block):
for tx in block['transactions']:
try:
asset_id = tx['asset']['id']
except KeyError:
asset_id = tx['id']
if tx.asset:
asset_id = tx.asset.get('id', tx.id)
else:
asset_id = tx.id
yield {'height': block['height'],
'asset_id': asset_id,
'transaction_id': tx['id']}
'transaction_id': tx.id}
class Dispatcher:

View File

@ -60,9 +60,9 @@ extensions = [
# autodoc settings
autodoc_member_order = 'bysource'
autodoc_default_flags = [
'members',
]
autodoc_default_options = {
'members': None,
}
todo_include_todos = True

View File

@ -90,7 +90,7 @@ Valid Transactions
Streams an event for any newly valid transactions committed to a block. Message
bodies contain the transaction's ID, associated asset ID, and containing
block's ID.
block's height.
Example message:
@ -99,7 +99,7 @@ Example message:
{
"transaction_id": "<sha3-256 hash>",
"asset_id": "<sha3-256 hash>",
"block_id": "<int>"
"height": <int>
}

View File

@ -44,7 +44,7 @@ dev_require = [
]
docs_require = [
'Sphinx>=1.4.8',
'Sphinx~=1.0',
'recommonmark>=0.4.0',
'sphinx-rtd-theme>=0.1.9',
'sphinxcontrib-httpdomain>=1.5.0',

View File

@ -236,19 +236,21 @@ def test_check_tx__unsigned_create_is_error(b):
assert result.code == CodeTypeError
def test_deliver_tx__valid_create_updates_db(b, init_chain_request):
def test_deliver_tx__valid_create_updates_db_and_emits_event(b, init_chain_request):
import multiprocessing as mp
from bigchaindb import App
from bigchaindb.models import Transaction
from bigchaindb.common.crypto import generate_key_pair
alice = generate_key_pair()
bob = generate_key_pair()
events = mp.Queue()
tx = Transaction.create([alice.public_key],
[([bob.public_key], 1)])\
.sign([alice.private_key])
app = App(b)
app = App(b, events)
app.init_chain(init_chain_request)
@ -261,6 +263,8 @@ def test_deliver_tx__valid_create_updates_db(b, init_chain_request):
app.end_block(RequestEndBlock(height=99))
app.commit()
assert b.get_transaction(tx.id).id == tx.id
block_event = events.get()
assert block_event.data['transactions'] == [tx]
# unspent_outputs = b.get_unspent_outputs()
# unspent_output = next(unspent_outputs)

View File

@ -1,111 +0,0 @@
# Copyright BigchainDB GmbH and BigchainDB contributors
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import json
import base64
from queue import Queue
from aiohttp import ClientSession
import pytest
def test_process_event_new_block():
from bigchaindb.event_stream import process_event
event = '{"jsonrpc": "2.0", "id": "test_stream_id#event", "result": {'\
'"query": "tm.event=\'NewBlock\'", "data": { "type": "CF18EA939D3240",'\
'"value": { "block": { "header": { "chain_id": "test-chain-ipQIAa",'\
'"height": 1, "time": "2018-04-23T14:49:30.509920098Z", "num_txs": 1,'\
'"last_block_id": { "hash": "", "parts": { "total": 0, "hash": "" }},'\
'"total_txs": 1, "last_commit_hash": "", "data_hash": "38792142CE6D7F6F46F71777CB53F94CD9497B23",'\
'"validators_hash": "BF0D0EC2E13C76E69FA572516B6D93E64F3C58EF",'\
'"consensus_hash": "F66EF1DF8BA6DAC7A1ECCE40CC84E54A1CEBC6A5", "app_hash": "",'\
'"last_results_hash": "", "evidence_hash": "" }, "data": {"txs": ['\
'"eyJpbnB1dHMiOiBbeyJvd25lcnNfYmVmb3JlIjogWyJFb2Z0Z0FNd2hKQXM0cW81b'\
'0dhOU1GWXF5dFp5WEdaNmVmZFVYc1dXTDdmZSJdLCAiZnVsZmlsbHMiOiBudWxsLCA'\
'iZnVsZmlsbG1lbnQiOiAicEdTQUlNMGNueFFGeTZrSE1PcGxBbzh1ZncwNDlsZ2VxN'\
'HBOeDFNdksya0pjRjBCZ1VETjN2RTlsWmhaT21jMWZHbFpLUFZmZDdCTi1RVTdBa0N'\
'TZ1NKWVRPYzB3YVlmQ1RXc1FQS1VmOE5fODFKd21YOUJxcnlLejYyTmVubHg0dGszN'\
'GtVRCJ9XSwgIm91dHB1dHMiOiBbeyJwdWJsaWNfa2V5cyI6IFsiRW9mdGdBTXdoSkF'\
'zNHFvNW9HYTlNRllxeXRaeVhHWjZlZmRVWHNXV0w3ZmUiXSwgImNvbmRpdGlvbiI6I'\
'HsiZGV0YWlscyI6IHsidHlwZSI6ICJlZDI1NTE5LXNoYS0yNTYiLCAicHVibGljX2t'\
'leSI6ICJFb2Z0Z0FNd2hKQXM0cW81b0dhOU1GWXF5dFp5WEdaNmVmZFVYc1dXTDdmZ'\
'SJ9LCAidXJpIjogIm5pOi8vL3NoYS0yNTY7cFJZWTJQQUE0S3dHd0dUNVQtUXRCQUY'\
'0VWY1WG5JcVkxWmExVER0N0hMQT9mcHQ9ZWQyNTUxOS1zaGEtMjU2JmNvc3Q9MTMxM'\
'DcyIn0sICJhbW91bnQiOiAiMSJ9XSwgIm9wZXJhdGlvbiI6ICJDUkVBVEUiLCAibWV'\
'0YWRhdGEiOiBudWxsLCAiYXNzZXQiOiB7ImRhdGEiOiBudWxsfSwgInZlcnNpb24iO'\
'iAiMi4wIiwgImlkIjogImUwMmM0ZWM3MmExYzUzMmJkNjUyNWZkNGMxODU3ZDhmN2E'\
'wYWVkYTgyNGVjY2NhZGY4NTlmNzc0Zjc3ZTgwZGUifQ=="]}, "evidence": {'\
'"evidence": null}, "last_commit": { "blockID": { "hash": "", "parts":'\
'{"total": 0, "hash": ""} }, "precommits": null } } } } } }'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert not event_queue.empty()
block = event_queue.get()
assert isinstance(block.data['height'], int)
def test_process_event_empty_block():
from bigchaindb.event_stream import process_event
event = '{"jsonrpc": "2.0", "id": "bigchaindb_stream_1524555674#event",'\
'"result": {"query": "tm.event=\'NewBlock\'", "data": {"type": '\
'"CF18EA939D3240", "value": {"block": {"header": {"chain_id": '\
'"test-chain-ipQIAa", "height": 1, "time": "2018-04-24T07:41:16.838038877Z",'\
'"num_txs": 0, "last_block_id": {"hash": "", "parts": {"total": 0, "hash": ""}},'\
'"total_txs": 0, "last_commit_hash": "", "data_hash": "", "validators_hash":'\
'"BF0D0EC2E13C76E69FA572516B6D93E64F3C58EF", "consensus_hash": '\
'"F66EF1DF8BA6DAC7A1ECCE40CC84E54A1CEBC6A5", "app_hash": "", '\
'"last_results_hash": "", "evidence_hash": ""}, "data": {"txs": null},'\
'"evidence": {"evidence": null}, "last_commit": {"blockID": {"hash": "", '\
'"parts": {"total": 0, "hash": ""}}, "precommits": null}}}}}}'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert event_queue.empty()
def test_process_unknown_event():
from bigchaindb.event_stream import process_event
event = '{"jsonrpc": "2.0", "id": "test_stream_id#event",'\
' "result": { "query": "tm.event=\'UnknownEvent\'" }}'
event_queue = Queue()
process_event(event_queue, event, 'test_stream_id')
assert event_queue.empty()
@pytest.mark.asyncio
@pytest.mark.abci
async def test_subscribe_events(tendermint_ws_url, b):
from bigchaindb.event_stream import subscribe_events
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.models import Transaction
session = ClientSession()
ws = await session.ws_connect(tendermint_ws_url)
stream_id = 'bigchaindb_stream_01'
await subscribe_events(ws, stream_id)
msg = await ws.receive()
assert msg.data
msg_data_dict = json.loads(msg.data)
assert msg_data_dict['id'] == stream_id
assert msg_data_dict['jsonrpc'] == '2.0'
assert msg_data_dict['result'] == {}
alice = generate_key_pair()
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)],
asset=None)\
.sign([alice.private_key])
b.post_transaction(tx, 'broadcast_tx_async')
msg = await ws.receive()
msg_data_dict = json.loads(msg.data)
raw_txn = msg_data_dict['result']['data']['value']['block']['data']['txs'][0]
transaction = json.loads(base64.b64decode(raw_txn).decode('utf8'))
assert transaction == tx.to_dict()

View File

@ -229,6 +229,7 @@ def test_upsert_validator(b, node_key, node_keys, ed25519_node_keys):
latest_block = b.get_latest_block()
# reset the validator set
b.store_validator_set(latest_block['height'], validators)
generate_block(b)
power = 1
public_key = '9B3119650DF82B9A5D8A12E38953EA47475C09F0C48A4E6A0ECE182944B24403'

View File

@ -21,25 +21,30 @@ class MockWebSocket:
def test_eventify_block_works_with_any_transaction():
from bigchaindb.web.websocket_server import eventify_block
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.lib import Transaction
block = {
'height': 1,
'transactions': [{
'id': 1
}, {
'id': 2,
'asset': {'id': 1}
}]
}
alice = generate_key_pair()
tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)])\
.sign([alice.private_key])
tx_transfer = Transaction.transfer(tx.to_inputs(),
[([alice.public_key], 1)],
asset_id=tx.id)\
.sign([alice.private_key])
block = {'height': 1,
'transactions': [tx, tx_transfer]}
expected_events = [{
'height': 1,
'asset_id': 1,
'transaction_id': 1
'asset_id': tx.id,
'transaction_id': tx.id
}, {
'height': 1,
'asset_id': 1,
'transaction_id': 2
'asset_id': tx_transfer.asset['id'],
'transaction_id': tx_transfer.id
}]
for event, expected in zip(eventify_block(block), expected_events):
@ -144,7 +149,7 @@ def test_websocket_block_event(b, test_client, loop):
app = init_app(event_source, loop=loop)
client = yield from test_client(app)
ws = yield from client.ws_connect(EVENTS_ENDPOINT)
block = {'height': 1, 'transactions': [tx.to_dict()]}
block = {'height': 1, 'transactions': [tx]}
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
yield from event_source.put(block_event)
@ -152,9 +157,9 @@ def test_websocket_block_event(b, test_client, loop):
for tx in block['transactions']:
result = yield from ws.receive()
json_result = json.loads(result.data)
assert json_result['transaction_id'] == tx['id']
assert json_result['transaction_id'] == tx.id
# Since the transactions are all CREATEs, asset id == transaction id
assert json_result['asset_id'] == tx['id']
assert json_result['asset_id'] == tx.id
assert json_result['height'] == block['height']
yield from event_source.put(POISON_PILL)