diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py index 5507f504..0aa51ecb 100644 --- a/bigchaindb/web/websocket_server.py +++ b/bigchaindb/web/websocket_server.py @@ -29,26 +29,6 @@ POISON_PILL = 'POISON_PILL' EVENTS_ENDPOINT = '/api/v1/streams/valid_tx' -def _put_into_capped_queue(queue, value): - """Put a new item in a capped queue. - - If the queue reached its limit, get the first element - ready and put the new one. Note that the first element - will be lost (that's the purpose of a capped queue). - - Args: - queue: a queue - value: the value to put - """ - while True: - try: - queue.put_nowait(value) - except asyncio.QueueFull: - queue.get_nowait() - else: - return - - def _multiprocessing_to_asyncio(in_queue, out_queue, loop): """Bridge between a synchronous multiprocessing queue and an asynchronous asyncio queue. @@ -60,7 +40,7 @@ def _multiprocessing_to_asyncio(in_queue, out_queue, loop): while True: value = in_queue.get() - loop.call_soon_threadsafe(_put_into_capped_queue, out_queue, value) + loop.call_soon_threadsafe(out_queue.put_nowait, value) class Dispatcher: @@ -161,7 +141,7 @@ def start(sync_event_source, loop=None): if not loop: loop = asyncio.get_event_loop() - event_source = asyncio.Queue(maxsize=1024, loop=loop) + event_source = asyncio.Queue(loop=loop) bridge = threading.Thread(target=_multiprocessing_to_asyncio, args=(sync_event_source, event_source, loop), diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 6484ef4e..f25e183f 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -3,7 +3,6 @@ import json import queue import random import threading -import time from unittest.mock import patch import pytest @@ -64,56 +63,6 @@ def test_bridge_sync_async_queue(loop): assert async_queue.qsize() == 0 -@asyncio.coroutine -def test_put_into_capped_queue(loop): - from bigchaindb.web.websocket_server import _put_into_capped_queue - q = asyncio.Queue(maxsize=2, loop=loop) - - _put_into_capped_queue(q, 'Friday') - assert q._queue[0] == 'Friday' - - _put_into_capped_queue(q, "I'm") - assert q._queue[0] == 'Friday' - assert q._queue[1] == "I'm" - - _put_into_capped_queue(q, 'in') - assert q._queue[0] == "I'm" - assert q._queue[1] == 'in' - - _put_into_capped_queue(q, 'love') - assert q._queue[0] == 'in' - assert q._queue[1] == 'love' - - -@asyncio.coroutine -def test_capped_queue(loop): - from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio - - sync_queue = queue.Queue() - async_queue = asyncio.Queue(maxsize=2, loop=loop) - - bridge = threading.Thread(target=_multiprocessing_to_asyncio, - args=(sync_queue, async_queue, loop), - daemon=True) - bridge.start() - - sync_queue.put('we') - sync_queue.put('are') - sync_queue.put('the') - sync_queue.put('robots') - - # Wait until the thread processes all the items - time.sleep(1) - - result = yield from async_queue.get() - assert result == 'the' - - result = yield from async_queue.get() - assert result == 'robots' - - assert async_queue.qsize() == 0 - - @patch('threading.Thread') @patch('aiohttp.web.run_app') @patch('bigchaindb.web.websocket_server.init_app')