diff --git a/bigchaindb/events.py b/bigchaindb/events.py index da464ec0..d5172082 100644 --- a/bigchaindb/events.py +++ b/bigchaindb/events.py @@ -6,6 +6,8 @@ POISON_PILL = 'POISON_PILL' class EventTypes: + """Container class that holds all the possible + events BigchainDB manages.""" ALL = ~0 BLOCK_VALID = 1 @@ -16,37 +18,69 @@ class EventTypes: class Event: + """An Event.""" def __init__(self, event_type, event_data): + """Creates a new event. + + Args: + event_type (int): the type of the event, see + :class:`~bigchaindb.events.EventTypes` + event_data (obj): the data of the event. + """ + self.type = event_type self.data = event_data -class PubSub: +class Exchange: + """Dispatch events to subscribers.""" def __init__(self): self.publisher_queue = Queue() + # Map queues> self.queues = defaultdict(list) def get_publisher_queue(self): + """Get the queue used by the publisher. + + Returns: + a :class:`multiprocessing.Queue`. + """ + return self.publisher_queue def get_subscriber_queue(self, event_types=EventTypes.ALL): + """Create a new queue for a specific combination of event types + and return it. + + Returns: + a :class:`multiprocessing.Queue`. + """ queue = Queue() self.queues[event_types].append(queue) return queue - def publish(self, event): + def dispatch(self, event): + """Given an event, send it to all the subscribers. + + Args + event (:class:`~bigchaindb.events.EventTypes`): the event to + dispatch to all the subscribers. + """ + for event_types, queues in self.queues.items(): if event.type & event_types: for queue in queues: queue.put(event) def run(self): + """Start the exchange""" + while True: event = self.publisher_queue.get() if event is POISON_PILL: return else: - self.publish(event) + self.dispatch(event) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 08c72472..682a9210 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -2,9 +2,9 @@ import logging import multiprocessing as mp import bigchaindb -from bigchaindb.config_utils import load_events_plugins +from bigchaindb import config_utils from bigchaindb.pipelines import vote, block, election, stale -from bigchaindb.events import PubSub +from bigchaindb.events import Exchange from bigchaindb.web import server, websocket_server @@ -24,14 +24,15 @@ BANNER = """ """ -def start_events_plugins(pubsub): - plugins = load_events_plugins(bigchaindb.config.get('events_plugins')) +def start_events_plugins(exchange): + plugins = config_utils.load_events_plugins( + bigchaindb.config.get('events_plugins')) for name, plugin in plugins: logger.info('Loading events plugin %s', name) event_types = getattr(plugin, 'event_types', None) - queue = pubsub.get_subscriber_queue(event_types) + queue = exchange.get_subscriber_queue(event_types) mp.Process(name='events_plugin_{}'.format(name), target=plugin.run, @@ -41,12 +42,12 @@ def start_events_plugins(pubsub): def start(): logger.info('Initializing BigchainDB...') - # Create a PubSub object. + # Create a Exchange object. # The events queue needs to be initialized once and shared between # processes. This seems the best way to do it # At this point only the election processs and the event consumer require # this queue. - pubsub = PubSub() + exchange = Exchange() # start the processes logger.info('Starting block') @@ -59,7 +60,7 @@ def start(): stale.start() logger.info('Starting election') - election.start(events_queue=pubsub.get_publisher_queue()) + election.start(events_queue=exchange.get_publisher_queue()) # start the web api app_server = server.create_server(bigchaindb.config['server']) @@ -69,12 +70,12 @@ def start(): logger.info('WebSocket server started') p_websocket_server = mp.Process(name='ws', target=websocket_server.start, - args=(pubsub.get_subscriber_queue(),)) + args=(exchange.get_subscriber_queue(),)) p_websocket_server.start() # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) - start_events_plugins(pubsub) + start_events_plugins(exchange) - pubsub.run() + exchange.run() diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index a91bd012..05b1ef37 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -202,10 +202,10 @@ def test_full_pipeline(b, user_pk): def test_handle_block_events(): - from bigchaindb.events import PubSub, EventTypes + from bigchaindb.events import Exchange, EventTypes - pubsub = PubSub() - events_queue = pubsub.get_publisher_queue() + exchange = Exchange() + events_queue = exchange.get_publisher_queue() e = election.Election(events_queue=events_queue) block_id = 'a' * 64 diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index 66e5f804..45462367 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -80,13 +80,13 @@ def test_load_consensus_plugin_raises_with_invalid_subclass(monkeypatch): config_utils.load_consensus_plugin(str(time.time())) -def test_load_block_publisher(monkeypatch): +def test_load_events_plugins(monkeypatch): from bigchaindb import config_utils monkeypatch.setattr(config_utils, 'iter_entry_points', lambda *args: [type('entry_point', (object, ), {'load': lambda: object})]) - plugins = config_utils.load_block_publisher_plugins(['one', 'two']) + plugins = config_utils.load_events_plugins(['one', 'two']) assert len(plugins) == 2 diff --git a/tests/test_events.py b/tests/test_events.py index fdeef7df..08bd7747 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -1,22 +1,22 @@ def test_event_handler(): - from bigchaindb.events import EventTypes, Event, PubSub + from bigchaindb.events import EventTypes, Event, Exchange # create and event event_data = {'msg': 'some data'} event = Event(EventTypes.BLOCK_VALID, event_data) # create the events pub sub - pubsub = PubSub() + exchange = Exchange() - sub0 = pubsub.get_subscriber_queue(EventTypes.BLOCK_VALID) - sub1 = pubsub.get_subscriber_queue(EventTypes.BLOCK_VALID | - EventTypes.BLOCK_INVALID) + sub0 = exchange.get_subscriber_queue(EventTypes.BLOCK_VALID) + sub1 = exchange.get_subscriber_queue(EventTypes.BLOCK_VALID | + EventTypes.BLOCK_INVALID) # Subscribe to all events - sub2 = pubsub.get_subscriber_queue() - sub3 = pubsub.get_subscriber_queue(EventTypes.BLOCK_INVALID) + sub2 = exchange.get_subscriber_queue() + sub3 = exchange.get_subscriber_queue(EventTypes.BLOCK_INVALID) # push and event to the queue - pubsub.publish(event) + exchange.dispatch(event) # get the event from the queue event_sub0 = sub0.get() diff --git a/tests/test_processes.py b/tests/test_processes.py index 712c1700..3618b85e 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -9,9 +9,9 @@ from bigchaindb.pipelines import vote, block, election, stale @patch.object(block, 'start') @patch.object(vote, 'start') @patch.object(Process, 'start') -@patch('bigchaindb.events.PubSub.get_publisher_queue', spec_set=True, autospec=True) -@patch('bigchaindb.events.PubSub.run', spec_set=True, autospec=True) -def test_processes_start(mock_pub_sub_run, mock_pub_sub, mock_process, mock_vote, +@patch('bigchaindb.events.Exchange.get_publisher_queue', spec_set=True, autospec=True) +@patch('bigchaindb.events.Exchange.run', spec_set=True, autospec=True) +def test_processes_start(mock_exchange_run, mock_exchange, mock_process, mock_vote, mock_block, mock_election, mock_stale): from bigchaindb import processes @@ -22,4 +22,26 @@ def test_processes_start(mock_pub_sub_run, mock_pub_sub, mock_process, mock_vote mock_stale.assert_called_with() mock_process.assert_called_with() mock_election.assert_called_once_with( - events_queue=mock_pub_sub.return_value) + events_queue=mock_exchange.return_value) + + +@patch.object(Process, 'start') +def test_start_events_plugins(mock_process, monkeypatch): + + class MockPlugin: + def __init__(self, event_types): + self.event_types = event_types + + def run(self, queue): + pass + + monkeypatch.setattr('bigchaindb.config_utils.load_events_plugins', + lambda names: [('one', MockPlugin(1)), + ('two', MockPlugin(2))]) + + from bigchaindb import processes + from bigchaindb.events import Exchange + + exchange = Exchange() + processes.start_events_plugins(exchange) + assert len(exchange.queues) == 2