mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Problem: Event stream api crashing (#2173)
Solution: Fix Exchange initialization
This commit is contained in:
parent
7683ea00fc
commit
ea8ac4b80e
@ -35,8 +35,6 @@ BANNER = """
|
|||||||
def start():
|
def start():
|
||||||
# Exchange object for event stream api
|
# Exchange object for event stream api
|
||||||
exchange = Exchange()
|
exchange = Exchange()
|
||||||
p_exchange = Process(name='exchange', target=exchange.run)
|
|
||||||
p_exchange.start()
|
|
||||||
|
|
||||||
# start the web api
|
# start the web api
|
||||||
app_server = server.create_server(
|
app_server = server.create_server(
|
||||||
@ -61,6 +59,9 @@ def start():
|
|||||||
args=(exchange.get_publisher_queue(),))
|
args=(exchange.get_publisher_queue(),))
|
||||||
p_websocket_client.start()
|
p_websocket_client.start()
|
||||||
|
|
||||||
|
p_exchange = Process(name='exchange', target=exchange.run)
|
||||||
|
p_exchange.start()
|
||||||
|
|
||||||
# We need to import this after spawning the web server
|
# We need to import this after spawning the web server
|
||||||
# because import ABCIServer will monkeypatch all sockets
|
# because import ABCIServer will monkeypatch all sockets
|
||||||
# for gevent.
|
# for gevent.
|
||||||
|
@ -96,9 +96,9 @@ class Dispatcher:
|
|||||||
elif event.type == EventTypes.BLOCK_VALID:
|
elif event.type == EventTypes.BLOCK_VALID:
|
||||||
block = event.data
|
block = event.data
|
||||||
|
|
||||||
for tx in block['block']['transactions']:
|
for tx in block['transactions']:
|
||||||
asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id']
|
asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id']
|
||||||
data = {'block_id': block['id'],
|
data = {'height': block['height'],
|
||||||
'asset_id': asset_id,
|
'asset_id': asset_id,
|
||||||
'transaction_id': tx['id']}
|
'transaction_id': tx['id']}
|
||||||
str_buffer.append(json.dumps(data))
|
str_buffer.append(json.dumps(data))
|
||||||
|
@ -121,23 +121,29 @@ def test_websocket_string_event(test_client, loop):
|
|||||||
def test_websocket_block_event(b, _block, test_client, loop):
|
def test_websocket_block_event(b, _block, test_client, loop):
|
||||||
from bigchaindb import events
|
from bigchaindb import events
|
||||||
from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
|
from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
|
||||||
|
from bigchaindb.models import Transaction
|
||||||
|
from bigchaindb.common import crypto
|
||||||
|
|
||||||
|
user_priv, user_pub = crypto.generate_key_pair()
|
||||||
|
tx = Transaction.create([user_pub], [([user_pub], 1)])
|
||||||
|
tx = tx.sign([user_priv])
|
||||||
|
|
||||||
event_source = asyncio.Queue(loop=loop)
|
event_source = asyncio.Queue(loop=loop)
|
||||||
app = init_app(event_source, loop=loop)
|
app = init_app(event_source, loop=loop)
|
||||||
client = yield from test_client(app)
|
client = yield from test_client(app)
|
||||||
ws = yield from client.ws_connect(EVENTS_ENDPOINT)
|
ws = yield from client.ws_connect(EVENTS_ENDPOINT)
|
||||||
block = _block.to_dict()
|
block = {'height': 1, 'transactions': [tx.to_dict()]}
|
||||||
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
|
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
|
||||||
|
|
||||||
yield from event_source.put(block_event)
|
yield from event_source.put(block_event)
|
||||||
|
|
||||||
for tx in block['block']['transactions']:
|
for tx in block['transactions']:
|
||||||
result = yield from ws.receive()
|
result = yield from ws.receive()
|
||||||
json_result = json.loads(result.data)
|
json_result = json.loads(result.data)
|
||||||
assert json_result['transaction_id'] == tx['id']
|
assert json_result['transaction_id'] == tx['id']
|
||||||
# Since the transactions are all CREATEs, asset id == transaction id
|
# Since the transactions are all CREATEs, asset id == transaction id
|
||||||
assert json_result['asset_id'] == tx['id']
|
assert json_result['asset_id'] == tx['id']
|
||||||
assert json_result['block_id'] == block['id']
|
assert json_result['height'] == block['height']
|
||||||
|
|
||||||
yield from event_source.put(POISON_PILL)
|
yield from event_source.put(POISON_PILL)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user