Add WebSocket server

This commit is contained in:
vrde 2017-04-07 09:16:22 +02:00
parent 64a033b17a
commit f23faaa65f
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
7 changed files with 246 additions and 59 deletions

View File

@ -81,7 +81,7 @@ class Election:
elif result['status'] == self.bigchain.BLOCK_VALID: elif result['status'] == self.bigchain.BLOCK_VALID:
event_type = EventTypes.BLOCK_VALID event_type = EventTypes.BLOCK_VALID
event = Event(event_type, {'block_id': block_id}) event = Event(event_type, self.bigchain.get_block(block_id))
self.event_handler.put_event(event) self.event_handler.put_event(event)

View File

@ -3,9 +3,8 @@ import multiprocessing as mp
import bigchaindb import bigchaindb
from bigchaindb.pipelines import vote, block, election, stale from bigchaindb.pipelines import vote, block, election, stale
from bigchaindb.pipelines.events_consumer_example import events_consumer
from bigchaindb.events import setup_events_queue from bigchaindb.events import setup_events_queue
from bigchaindb.web import server from bigchaindb.web import server, websocket_server
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -52,10 +51,11 @@ def start():
p_webapi = mp.Process(name='webapi', target=app_server.run) p_webapi = mp.Process(name='webapi', target=app_server.run)
p_webapi.start() p_webapi.start()
# start the example events consumer logger.info('WebSocket server started')
logger.info('Starting the events consumer example') p_websocket_server = mp.Process(name='ws',
p_events_consumer = events_consumer(events_queue) target=websocket_server.start,
p_events_consumer.start() args=(events_queue,))
p_websocket_server.start()
# start message # start message
logger.info(BANNER.format(bigchaindb.config['server']['bind'])) logger.info(BANNER.format(bigchaindb.config['server']['bind']))

View File

@ -25,4 +25,4 @@ def base_url():
def base_ws_uri(): def base_ws_uri():
"""Base websocket uri.""" """Base websocket uri."""
return '%s://%s/' % ('ws', request.environ['HTTP_HOST']) return 'ws://localhost:9985/'

View File

@ -1,15 +1,64 @@
"""WebSocket server for the BigchainDB Event Stream API.""" """WebSocket server for the BigchainDB Event Stream API."""
# NOTE
#
# This module contains some functions and utilities that might belong to other
# modules. For now, I prefer to keep everything in this module. Why? Because
# those functions are needed only here.
#
# When we will extend this part of the project and we find that we need those
# functionalities elsewhere, we can start creating new modules and organizing
# things in a better way.
import json
import asyncio import asyncio
import logging import logging
import threading
from uuid import uuid4 from uuid import uuid4
import aiohttp import aiohttp
from aiohttp import web from aiohttp import web
from bigchaindb.events import EventTypes
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
POISON_PILL = 'POISON_PILL' POISON_PILL = 'POISON_PILL'
EVENTS_ENDPOINT = '/api/v1/streams/'
def _put_into_capped_queue(queue, value):
"""Put a new item in a capped queue.
If the queue reached its limit, get the first element
ready and put the new one. Note that the first element
will be lost (that's the purpose of a capped queue).
Args:
queue: a queue
value: the value to put
"""
while True:
try:
queue.put_nowait(value)
return
except asyncio.QueueFull:
queue.get_nowait()
def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
"""Bridge between a synchronous multiprocessing queue
and an asynchronous asyncio queue.
Args:
in_queue (multiprocessing.Queue): input queue
out_queue (asyncio.Queue): output queue
"""
while True:
value = in_queue.get()
loop.call_soon_threadsafe(_put_into_capped_queue, out_queue, value)
class Dispatcher: class Dispatcher:
@ -45,10 +94,27 @@ class Dispatcher:
while True: while True:
event = yield from self.event_source.get() event = yield from self.event_source.get()
str_buffer = []
if event == POISON_PILL: if event == POISON_PILL:
return return
for uuid, websocket in self.subscribers.items():
websocket.send_str(event) if isinstance(event, str):
str_buffer.append(event)
elif event.type == EventTypes.BLOCK_VALID:
block = event.data
for tx in block['block']['transactions']:
asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id']
data = {'blockid': block['id'],
'assetid': asset_id,
'txid': tx['id']}
str_buffer.append(json.dumps(data))
for _, websocket in self.subscribers.items():
for str_item in str_buffer:
websocket.send_str(str_item)
@asyncio.coroutine @asyncio.coroutine
@ -83,37 +149,22 @@ def init_app(event_source, *, loop=None):
app = web.Application(loop=loop) app = web.Application(loop=loop)
app['dispatcher'] = dispatcher app['dispatcher'] = dispatcher
app.router.add_get('/', websocket_handler) app.router.add_get(EVENTS_ENDPOINT, websocket_handler)
return app return app
def start(event_source, *, loop=None): def start(sync_event_source, loop=None):
"""Create and start the WebSocket server.""" """Create and start the WebSocket server."""
if not loop: if not loop:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
event_source = asyncio.Queue(maxsize=1024, loop=loop)
bridge = threading.Thread(target=_multiprocessing_to_asyncio,
args=(sync_event_source, event_source, loop),
daemon=True)
bridge.start()
app = init_app(event_source, loop=loop) app = init_app(event_source, loop=loop)
aiohttp.web.run_app(app, port=9985) aiohttp.web.run_app(app, port=9985)
def test_websocket_server():
"""Set up a server and output a message every second.
Used for testing purposes."""
@asyncio.coroutine
def constant_event_source(event_source):
"""Put a message in ``event_source`` every second."""
while True:
yield from asyncio.sleep(1)
yield from event_source.put('meow')
loop = asyncio.get_event_loop()
event_source = asyncio.Queue()
loop.create_task(constant_event_source(event_source))
start(event_source, loop=loop)
if __name__ == '__main__':
test_websocket_server()

View File

@ -77,6 +77,7 @@ install_requires = [
'multipipes~=0.1.0', 'multipipes~=0.1.0',
'jsonschema~=2.5.1', 'jsonschema~=2.5.1',
'pyyaml~=3.12', 'pyyaml~=3.12',
'aiohttp~=2.0',
] ]
setup( setup(

View File

@ -31,6 +31,6 @@ def test_api_v1_endpoint(client):
'self': 'http://localhost/api/v1/', 'self': 'http://localhost/api/v1/',
'statuses': 'http://localhost/api/v1/statuses/', 'statuses': 'http://localhost/api/v1/statuses/',
'transactions': 'http://localhost/api/v1/transactions/', 'transactions': 'http://localhost/api/v1/transactions/',
'streams_v1': 'ws://localhost/api/v1/streams/', 'streams_v1': 'ws://localhost:9985/api/v1/streams/',
} }
} }

View File

@ -1,6 +1,23 @@
import json
import random
import pytest import pytest
import asyncio import asyncio
from bigchaindb.models import Transaction
def create_block(b, total=1):
transactions = [
Transaction.create(
[b.me],
[([b.me], 1)],
metadata={'msg': random.random()},
).sign([b.me_private])
for _ in range(total)
]
return b.create_block(transactions)
class MockWebSocket: class MockWebSocket:
def __init__(self): def __init__(self):
@ -11,39 +28,100 @@ class MockWebSocket:
@asyncio.coroutine @asyncio.coroutine
@pytest.mark.skipif(reason='This test raises a RuntimeError, dunno how to solve it now.') def test_bridge_sync_async_queue(loop):
def test_dispatcher(loop): import queue
from bigchaindb.web.websocket_server import Dispatcher, POISON_PILL import threading
from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio
ws0 = MockWebSocket() sync_queue = queue.Queue()
ws1 = MockWebSocket() async_queue = asyncio.Queue(loop=loop)
event_source = asyncio.Queue(loop=loop) bridge = threading.Thread(target=_multiprocessing_to_asyncio,
dispatcher = Dispatcher(event_source) args=(sync_queue, async_queue, loop),
daemon=True)
bridge.start()
dispatcher.subscribe(0, ws0) sync_queue.put('fahren')
dispatcher.subscribe(1, ws1) sync_queue.put('auf')
sync_queue.put('der')
sync_queue.put('Autobahn')
yield from event_source.put('hack') result = yield from async_queue.get()
yield from event_source.put('the') assert result == 'fahren'
yield from event_source.put('planet!') result = yield from async_queue.get()
yield from event_source.put(POISON_PILL) assert result == 'auf'
loop.run_until_complete(dispatcher.publish()) result = yield from async_queue.get()
assert result == 'der'
assert ws0.received == ['hack', 'the', 'planet!'] result = yield from async_queue.get()
assert ws1.received == ['planet!'] assert result == 'Autobahn'
assert async_queue.qsize() == 0
@asyncio.coroutine @asyncio.coroutine
def test_websocket(test_client, loop): def test_put_into_capped_queue(loop):
from bigchaindb.web.websocket_server import init_app, POISON_PILL from bigchaindb.web.websocket_server import _put_into_capped_queue
q = asyncio.Queue(maxsize=2, loop=loop)
_put_into_capped_queue(q, 'Friday')
assert q._queue[0] == 'Friday'
_put_into_capped_queue(q, "I'm")
assert q._queue[0] == 'Friday'
assert q._queue[1] == "I'm"
_put_into_capped_queue(q, 'in')
assert q._queue[0] == "I'm"
assert q._queue[1] == 'in'
_put_into_capped_queue(q, 'love')
assert q._queue[0] == 'in'
assert q._queue[1] == 'love'
@asyncio.coroutine
def test_capped_queue(loop):
import queue
import threading
import time
from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio
sync_queue = queue.Queue()
async_queue = asyncio.Queue(maxsize=2, loop=loop)
bridge = threading.Thread(target=_multiprocessing_to_asyncio,
args=(sync_queue, async_queue, loop),
daemon=True)
bridge.start()
sync_queue.put('we')
sync_queue.put('are')
sync_queue.put('the')
sync_queue.put('robots')
# Wait until the thread processes all the items
time.sleep(1)
result = yield from async_queue.get()
assert result == 'the'
result = yield from async_queue.get()
assert result == 'robots'
assert async_queue.qsize() == 0
@asyncio.coroutine
def test_websocket_string_event(test_client, loop):
from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
event_source = asyncio.Queue(loop=loop) event_source = asyncio.Queue(loop=loop)
app = init_app(event_source, loop=loop) app = init_app(event_source, loop=loop)
client = yield from test_client(app) client = yield from test_client(app)
ws = yield from client.ws_connect('/') ws = yield from client.ws_connect(EVENTS_ENDPOINT)
yield from event_source.put('hack') yield from event_source.put('hack')
yield from event_source.put('the') yield from event_source.put('the')
@ -62,15 +140,72 @@ def test_websocket(test_client, loop):
@asyncio.coroutine @asyncio.coroutine
@pytest.mark.skipif(reason="Still don't understand how to trigger custom errors.") def test_websocket_block_event(b, test_client, loop):
def test_websocket_error(test_client, loop): from bigchaindb import events
from bigchaindb.web.websocket_server import init_app, POISON_PILL from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
event_source = asyncio.Queue(loop=loop) event_source = asyncio.Queue(loop=loop)
app = init_app(event_source, loop=loop) app = init_app(event_source, loop=loop)
client = yield from test_client(app) client = yield from test_client(app)
ws = yield from client.ws_connect('/') ws = yield from client.ws_connect(EVENTS_ENDPOINT)
block = create_block(b, 10).to_dict()
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
yield from ws.close() yield from event_source.put(block_event)
for tx in block['block']['transactions']:
result = yield from ws.receive()
json_result = json.loads(result.data)
assert json_result['txid'] == tx['id']
# Since the transactions are all CREATEs, asset id == transaction id
assert json_result['assetid'] == tx['id']
assert json_result['blockid'] == block['id']
yield from event_source.put(POISON_PILL) yield from event_source.put(POISON_PILL)
@pytest.mark.skip('Processes are not stopping properly, and the whole test suite would hang')
@pytest.mark.genesis
def test_integration_from_webapi_to_websocket(monkeypatch, client, loop):
# XXX: I think that the `pytest-aiohttp` plugin is sparkling too much
# magic in the `asyncio` module: running this test without monkey-patching
# `asycio.get_event_loop` (and without the `loop` fixture) raises a:
# RuntimeError: There is no current event loop in thread 'MainThread'.
#
# That's pretty weird because this test doesn't use the pytest-aiohttp
# plugin explicitely.
monkeypatch.setattr('asyncio.get_event_loop', lambda: loop)
import json
import random
import aiohttp
from bigchaindb.common import crypto
from bigchaindb import processes
from bigchaindb.models import Transaction
# Start BigchainDB
processes.start()
loop = asyncio.get_event_loop()
import time
time.sleep(1)
ws_url = client.get('http://localhost:9984/api/v1/').json['_links']['streams_v1']
# Connect to the WebSocket endpoint
session = aiohttp.ClientSession()
ws = loop.run_until_complete(session.ws_connect(ws_url))
# Create a keypair and generate a new asset
user_priv, user_pub = crypto.generate_key_pair()
asset = {'random': random.random()}
tx = Transaction.create([user_pub], [([user_pub], 1)], asset=asset)
tx = tx.sign([user_priv])
# Post the transaction to the BigchainDB Web API
client.post('/api/v1/transactions/', data=json.dumps(tx.to_dict()))
result = loop.run_until_complete(ws.receive())
json_result = json.loads(result.data)
assert json_result['txid'] == tx.id