From f94a1e020c2f77ef412efa0859414bdd48112828 Mon Sep 17 00:00:00 2001
From: vrde <agranzot@gmail.com>
Date: Tue, 18 Apr 2017 15:57:50 +0200
Subject: [PATCH] Dispatcher is a consumer, no capped queue needed

---
 bigchaindb/web/websocket_server.py | 24 ++------------
 tests/web/test_websocket_server.py | 51 ------------------------------
 2 files changed, 2 insertions(+), 73 deletions(-)

diff --git a/bigchaindb/web/websocket_server.py b/bigchaindb/web/websocket_server.py
index 5507f504..0aa51ecb 100644
--- a/bigchaindb/web/websocket_server.py
+++ b/bigchaindb/web/websocket_server.py
@@ -29,26 +29,6 @@ POISON_PILL = 'POISON_PILL'
 EVENTS_ENDPOINT = '/api/v1/streams/valid_tx'
 
 
-def _put_into_capped_queue(queue, value):
-    """Put a new item in a capped queue.
-
-    If the queue reached its limit, get the first element
-    ready and put the new one. Note that the first element
-    will be lost (that's the purpose of a capped queue).
-
-    Args:
-        queue: a queue
-        value: the value to put
-    """
-    while True:
-        try:
-            queue.put_nowait(value)
-        except asyncio.QueueFull:
-            queue.get_nowait()
-        else:
-            return
-
-
 def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
     """Bridge between a synchronous multiprocessing queue
     and an asynchronous asyncio queue.
@@ -60,7 +40,7 @@ def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
 
     while True:
         value = in_queue.get()
-        loop.call_soon_threadsafe(_put_into_capped_queue, out_queue, value)
+        loop.call_soon_threadsafe(out_queue.put_nowait, value)
 
 
 class Dispatcher:
@@ -161,7 +141,7 @@ def start(sync_event_source, loop=None):
     if not loop:
         loop = asyncio.get_event_loop()
 
-    event_source = asyncio.Queue(maxsize=1024, loop=loop)
+    event_source = asyncio.Queue(loop=loop)
 
     bridge = threading.Thread(target=_multiprocessing_to_asyncio,
                               args=(sync_event_source, event_source, loop),
diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py
index 6484ef4e..f25e183f 100644
--- a/tests/web/test_websocket_server.py
+++ b/tests/web/test_websocket_server.py
@@ -3,7 +3,6 @@ import json
 import queue
 import random
 import threading
-import time
 from unittest.mock import patch
 
 import pytest
@@ -64,56 +63,6 @@ def test_bridge_sync_async_queue(loop):
     assert async_queue.qsize() == 0
 
 
-@asyncio.coroutine
-def test_put_into_capped_queue(loop):
-    from bigchaindb.web.websocket_server import _put_into_capped_queue
-    q = asyncio.Queue(maxsize=2, loop=loop)
-
-    _put_into_capped_queue(q, 'Friday')
-    assert q._queue[0] == 'Friday'
-
-    _put_into_capped_queue(q, "I'm")
-    assert q._queue[0] == 'Friday'
-    assert q._queue[1] == "I'm"
-
-    _put_into_capped_queue(q, 'in')
-    assert q._queue[0] == "I'm"
-    assert q._queue[1] == 'in'
-
-    _put_into_capped_queue(q, 'love')
-    assert q._queue[0] == 'in'
-    assert q._queue[1] == 'love'
-
-
-@asyncio.coroutine
-def test_capped_queue(loop):
-    from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio
-
-    sync_queue = queue.Queue()
-    async_queue = asyncio.Queue(maxsize=2, loop=loop)
-
-    bridge = threading.Thread(target=_multiprocessing_to_asyncio,
-                              args=(sync_queue, async_queue, loop),
-                              daemon=True)
-    bridge.start()
-
-    sync_queue.put('we')
-    sync_queue.put('are')
-    sync_queue.put('the')
-    sync_queue.put('robots')
-
-    # Wait until the thread processes all the items
-    time.sleep(1)
-
-    result = yield from async_queue.get()
-    assert result == 'the'
-
-    result = yield from async_queue.get()
-    assert result == 'robots'
-
-    assert async_queue.qsize() == 0
-
-
 @patch('threading.Thread')
 @patch('aiohttp.web.run_app')
 @patch('bigchaindb.web.websocket_server.init_app')