diff --git a/bigchaindb/config_utils.py b/bigchaindb/config_utils.py index e5c58495..d4657769 100644 --- a/bigchaindb/config_utils.py +++ b/bigchaindb/config_utils.py @@ -289,14 +289,14 @@ def load_consensus_plugin(name=None): return plugin -def load_block_publisher_plugins(names): +def load_events_plugins(names=None): plugins = [] if names is None: - return + return plugins for name in names: - for entry_point in iter_entry_points('bigchaindb.block_publisher', name): + for entry_point in iter_entry_points('bigchaindb.events', name): plugins.append((name, entry_point.load())) return plugins diff --git a/bigchaindb/events.py b/bigchaindb/events.py index 43969803..da464ec0 100644 --- a/bigchaindb/events.py +++ b/bigchaindb/events.py @@ -1,13 +1,18 @@ -from enum import Enum +from collections import defaultdict from multiprocessing import Queue POISON_PILL = 'POISON_PILL' -class EventTypes(Enum): +class EventTypes: + + ALL = ~0 BLOCK_VALID = 1 BLOCK_INVALID = 2 + # NEW_EVENT = 4 + # NEW_EVENT = 8 + # NEW_EVENT = 16... class Event: @@ -21,19 +26,22 @@ class PubSub: def __init__(self): self.publisher_queue = Queue() - self.queues = [] + # Map queues> + self.queues = defaultdict(list) def get_publisher_queue(self): return self.publisher_queue - def get_subscriber_queue(self): + def get_subscriber_queue(self, event_types=EventTypes.ALL): queue = Queue() - self.queues.append(queue) + self.queues[event_types].append(queue) return queue def publish(self, event): - for queue in self.queues: - queue.put(event) + for event_types, queues in self.queues.items(): + if event.type & event_types: + for queue in queues: + queue.put(event) def run(self): while True: diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 70642e4c..47d5c248 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -13,7 +13,7 @@ from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block from bigchaindb import Bigchain -from bigchaindb.events import Event, EventTypes +from bigchaindb.events import EventTypes, Event logger = logging.getLogger(__name__) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 1bb7e2f6..08c72472 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -2,7 +2,7 @@ import logging import multiprocessing as mp import bigchaindb -from bigchaindb.config_utils import load_block_publisher_plugins +from bigchaindb.config_utils import load_events_plugins from bigchaindb.pipelines import vote, block, election, stale from bigchaindb.events import PubSub from bigchaindb.web import server, websocket_server @@ -24,6 +24,20 @@ BANNER = """ """ +def start_events_plugins(pubsub): + plugins = load_events_plugins(bigchaindb.config.get('events_plugins')) + + for name, plugin in plugins: + logger.info('Loading events plugin %s', name) + + event_types = getattr(plugin, 'event_types', None) + queue = pubsub.get_subscriber_queue(event_types) + + mp.Process(name='events_plugin_{}'.format(name), + target=plugin.run, + args=(queue, )).start() + + def start(): logger.info('Initializing BigchainDB...') @@ -61,10 +75,6 @@ def start(): # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) - for name, plugin in load_block_publisher_plugins(bigchaindb.config.get('block_publishers', [])): - logger.info('Loading block publisher plugin %s', name) - mp.Process(name='block_publisher_{}'.format(name), - target=plugin.run, - args=(pubsub.get_subscriber_queue(),)).start() + start_events_plugins(pubsub) pubsub.run() diff --git a/tests/test_events.py b/tests/test_events.py index b195a1ca..fdeef7df 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -8,8 +8,12 @@ def test_event_handler(): # create the events pub sub pubsub = PubSub() - sub0 = pubsub.get_subscriber_queue() - sub1 = pubsub.get_subscriber_queue() + sub0 = pubsub.get_subscriber_queue(EventTypes.BLOCK_VALID) + sub1 = pubsub.get_subscriber_queue(EventTypes.BLOCK_VALID | + EventTypes.BLOCK_INVALID) + # Subscribe to all events + sub2 = pubsub.get_subscriber_queue() + sub3 = pubsub.get_subscriber_queue(EventTypes.BLOCK_INVALID) # push and event to the queue pubsub.publish(event) @@ -17,9 +21,15 @@ def test_event_handler(): # get the event from the queue event_sub0 = sub0.get() event_sub1 = sub1.get() + event_sub2 = sub2.get() assert event_sub0.type == event.type assert event_sub0.data == event.data assert event_sub1.type == event.type assert event_sub1.data == event.data + + assert event_sub2.type == event.type + assert event_sub2.data == event.data + + assert sub3.qsize() == 0