Plugins can subscribe to specific events

This commit is contained in:
vrde 2017-08-03 09:06:38 +02:00
parent 8d72421d7c
commit 21a7da1c9d
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
5 changed files with 47 additions and 19 deletions

View File

@ -289,14 +289,14 @@ def load_consensus_plugin(name=None):
return plugin return plugin
def load_block_publisher_plugins(names): def load_events_plugins(names=None):
plugins = [] plugins = []
if names is None: if names is None:
return return plugins
for name in names: for name in names:
for entry_point in iter_entry_points('bigchaindb.block_publisher', name): for entry_point in iter_entry_points('bigchaindb.events', name):
plugins.append((name, entry_point.load())) plugins.append((name, entry_point.load()))
return plugins return plugins

View File

@ -1,13 +1,18 @@
from enum import Enum from collections import defaultdict
from multiprocessing import Queue from multiprocessing import Queue
POISON_PILL = 'POISON_PILL' POISON_PILL = 'POISON_PILL'
class EventTypes(Enum): class EventTypes:
ALL = ~0
BLOCK_VALID = 1 BLOCK_VALID = 1
BLOCK_INVALID = 2 BLOCK_INVALID = 2
# NEW_EVENT = 4
# NEW_EVENT = 8
# NEW_EVENT = 16...
class Event: class Event:
@ -21,19 +26,22 @@ class PubSub:
def __init__(self): def __init__(self):
self.publisher_queue = Queue() self.publisher_queue = Queue()
self.queues = [] # Map <event_types -> queues>
self.queues = defaultdict(list)
def get_publisher_queue(self): def get_publisher_queue(self):
return self.publisher_queue return self.publisher_queue
def get_subscriber_queue(self): def get_subscriber_queue(self, event_types=EventTypes.ALL):
queue = Queue() queue = Queue()
self.queues.append(queue) self.queues[event_types].append(queue)
return queue return queue
def publish(self, event): def publish(self, event):
for queue in self.queues: for event_types, queues in self.queues.items():
queue.put(event) if event.type & event_types:
for queue in queues:
queue.put(event)
def run(self): def run(self):
while True: while True:

View File

@ -13,7 +13,7 @@ from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Block from bigchaindb.models import Block
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb.events import Event, EventTypes from bigchaindb.events import EventTypes, Event
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -2,7 +2,7 @@ import logging
import multiprocessing as mp import multiprocessing as mp
import bigchaindb import bigchaindb
from bigchaindb.config_utils import load_block_publisher_plugins from bigchaindb.config_utils import load_events_plugins
from bigchaindb.pipelines import vote, block, election, stale from bigchaindb.pipelines import vote, block, election, stale
from bigchaindb.events import PubSub from bigchaindb.events import PubSub
from bigchaindb.web import server, websocket_server from bigchaindb.web import server, websocket_server
@ -24,6 +24,20 @@ BANNER = """
""" """
def start_events_plugins(pubsub):
plugins = load_events_plugins(bigchaindb.config.get('events_plugins'))
for name, plugin in plugins:
logger.info('Loading events plugin %s', name)
event_types = getattr(plugin, 'event_types', None)
queue = pubsub.get_subscriber_queue(event_types)
mp.Process(name='events_plugin_{}'.format(name),
target=plugin.run,
args=(queue, )).start()
def start(): def start():
logger.info('Initializing BigchainDB...') logger.info('Initializing BigchainDB...')
@ -61,10 +75,6 @@ def start():
# start message # start message
logger.info(BANNER.format(bigchaindb.config['server']['bind'])) logger.info(BANNER.format(bigchaindb.config['server']['bind']))
for name, plugin in load_block_publisher_plugins(bigchaindb.config.get('block_publishers', [])): start_events_plugins(pubsub)
logger.info('Loading block publisher plugin %s', name)
mp.Process(name='block_publisher_{}'.format(name),
target=plugin.run,
args=(pubsub.get_subscriber_queue(),)).start()
pubsub.run() pubsub.run()

View File

@ -8,8 +8,12 @@ def test_event_handler():
# create the events pub sub # create the events pub sub
pubsub = PubSub() pubsub = PubSub()
sub0 = pubsub.get_subscriber_queue() sub0 = pubsub.get_subscriber_queue(EventTypes.BLOCK_VALID)
sub1 = pubsub.get_subscriber_queue() sub1 = pubsub.get_subscriber_queue(EventTypes.BLOCK_VALID |
EventTypes.BLOCK_INVALID)
# Subscribe to all events
sub2 = pubsub.get_subscriber_queue()
sub3 = pubsub.get_subscriber_queue(EventTypes.BLOCK_INVALID)
# push and event to the queue # push and event to the queue
pubsub.publish(event) pubsub.publish(event)
@ -17,9 +21,15 @@ def test_event_handler():
# get the event from the queue # get the event from the queue
event_sub0 = sub0.get() event_sub0 = sub0.get()
event_sub1 = sub1.get() event_sub1 = sub1.get()
event_sub2 = sub2.get()
assert event_sub0.type == event.type assert event_sub0.type == event.type
assert event_sub0.data == event.data assert event_sub0.data == event.data
assert event_sub1.type == event.type assert event_sub1.type == event.type
assert event_sub1.data == event.data assert event_sub1.data == event.data
assert event_sub2.type == event.type
assert event_sub2.data == event.data
assert sub3.qsize() == 0