diff --git a/bigchaindb/events.py b/bigchaindb/events.py index 8b1da2e5..7959d284 100644 --- a/bigchaindb/events.py +++ b/bigchaindb/events.py @@ -1,3 +1,4 @@ +from queue import Empty from collections import defaultdict from multiprocessing import Queue @@ -41,6 +42,7 @@ class Exchange: def __init__(self): self.publisher_queue = Queue() + self.started_queue = Queue() # Map queues> self.queues = defaultdict(list) @@ -60,7 +62,16 @@ class Exchange: Returns: a :class:`multiprocessing.Queue`. + Raises: + RuntimeError if called after `run` """ + + try: + self.started_queue.get_nowait() + raise RuntimeError('Cannot create a new subscriber queue while Exchange is running.') + except Empty: + pass + if event_types is None: event_types = EventTypes.ALL @@ -83,6 +94,7 @@ class Exchange: def run(self): """Start the exchange""" + self.started_queue.put('STARTED') while True: event = self.publisher_queue.get() diff --git a/tests/test_events.py b/tests/test_events.py index 045497cf..f5cc7f18 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -1,3 +1,7 @@ +import pytest +pytestmark = pytest.mark.tendermint + + def test_event_handler(): from bigchaindb.events import EventTypes, Event, Exchange @@ -35,6 +39,18 @@ def test_event_handler(): assert sub3.qsize() == 0 +def test_event_handler_raises_when_called_after_start(): + from bigchaindb.events import Exchange, POISON_PILL + + exchange = Exchange() + publisher_queue = exchange.get_publisher_queue() + publisher_queue.put(POISON_PILL) + exchange.run() + + with pytest.raises(RuntimeError): + exchange.get_subscriber_queue() + + def test_exchange_stops_with_poison_pill(): from bigchaindb.events import EventTypes, Event, Exchange, POISON_PILL