Rename things, add tests

This commit is contained in:
vrde 2017-08-09 09:28:28 +02:00
parent 21a7da1c9d
commit a0cbb63db8
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
6 changed files with 88 additions and 31 deletions

View File

@ -6,6 +6,8 @@ POISON_PILL = 'POISON_PILL'
class EventTypes:
"""Container class that holds all the possible
events BigchainDB manages."""
ALL = ~0
BLOCK_VALID = 1
@ -16,37 +18,69 @@ class EventTypes:
class Event:
"""An Event."""
def __init__(self, event_type, event_data):
"""Creates a new event.
Args:
event_type (int): the type of the event, see
:class:`~bigchaindb.events.EventTypes`
event_data (obj): the data of the event.
"""
self.type = event_type
self.data = event_data
class PubSub:
class Exchange:
"""Dispatch events to subscribers."""
def __init__(self):
self.publisher_queue = Queue()
# Map <event_types -> queues>
self.queues = defaultdict(list)
def get_publisher_queue(self):
"""Get the queue used by the publisher.
Returns:
a :class:`multiprocessing.Queue`.
"""
return self.publisher_queue
def get_subscriber_queue(self, event_types=EventTypes.ALL):
"""Create a new queue for a specific combination of event types
and return it.
Returns:
a :class:`multiprocessing.Queue`.
"""
queue = Queue()
self.queues[event_types].append(queue)
return queue
def publish(self, event):
def dispatch(self, event):
"""Given an event, send it to all the subscribers.
Args
event (:class:`~bigchaindb.events.EventTypes`): the event to
dispatch to all the subscribers.
"""
for event_types, queues in self.queues.items():
if event.type & event_types:
for queue in queues:
queue.put(event)
def run(self):
"""Start the exchange"""
while True:
event = self.publisher_queue.get()
if event is POISON_PILL:
return
else:
self.publish(event)
self.dispatch(event)

View File

@ -2,9 +2,9 @@ import logging
import multiprocessing as mp
import bigchaindb
from bigchaindb.config_utils import load_events_plugins
from bigchaindb import config_utils
from bigchaindb.pipelines import vote, block, election, stale
from bigchaindb.events import PubSub
from bigchaindb.events import Exchange
from bigchaindb.web import server, websocket_server
@ -24,14 +24,15 @@ BANNER = """
"""
def start_events_plugins(pubsub):
plugins = load_events_plugins(bigchaindb.config.get('events_plugins'))
def start_events_plugins(exchange):
plugins = config_utils.load_events_plugins(
bigchaindb.config.get('events_plugins'))
for name, plugin in plugins:
logger.info('Loading events plugin %s', name)
event_types = getattr(plugin, 'event_types', None)
queue = pubsub.get_subscriber_queue(event_types)
queue = exchange.get_subscriber_queue(event_types)
mp.Process(name='events_plugin_{}'.format(name),
target=plugin.run,
@ -41,12 +42,12 @@ def start_events_plugins(pubsub):
def start():
logger.info('Initializing BigchainDB...')
# Create a PubSub object.
# Create a Exchange object.
# The events queue needs to be initialized once and shared between
# processes. This seems the best way to do it
# At this point only the election processs and the event consumer require
# this queue.
pubsub = PubSub()
exchange = Exchange()
# start the processes
logger.info('Starting block')
@ -59,7 +60,7 @@ def start():
stale.start()
logger.info('Starting election')
election.start(events_queue=pubsub.get_publisher_queue())
election.start(events_queue=exchange.get_publisher_queue())
# start the web api
app_server = server.create_server(bigchaindb.config['server'])
@ -69,12 +70,12 @@ def start():
logger.info('WebSocket server started')
p_websocket_server = mp.Process(name='ws',
target=websocket_server.start,
args=(pubsub.get_subscriber_queue(),))
args=(exchange.get_subscriber_queue(),))
p_websocket_server.start()
# start message
logger.info(BANNER.format(bigchaindb.config['server']['bind']))
start_events_plugins(pubsub)
start_events_plugins(exchange)
pubsub.run()
exchange.run()

View File

@ -202,10 +202,10 @@ def test_full_pipeline(b, user_pk):
def test_handle_block_events():
from bigchaindb.events import PubSub, EventTypes
from bigchaindb.events import Exchange, EventTypes
pubsub = PubSub()
events_queue = pubsub.get_publisher_queue()
exchange = Exchange()
events_queue = exchange.get_publisher_queue()
e = election.Election(events_queue=events_queue)
block_id = 'a' * 64

View File

@ -80,13 +80,13 @@ def test_load_consensus_plugin_raises_with_invalid_subclass(monkeypatch):
config_utils.load_consensus_plugin(str(time.time()))
def test_load_block_publisher(monkeypatch):
def test_load_events_plugins(monkeypatch):
from bigchaindb import config_utils
monkeypatch.setattr(config_utils,
'iter_entry_points',
lambda *args: [type('entry_point', (object, ), {'load': lambda: object})])
plugins = config_utils.load_block_publisher_plugins(['one', 'two'])
plugins = config_utils.load_events_plugins(['one', 'two'])
assert len(plugins) == 2

View File

@ -1,22 +1,22 @@
def test_event_handler():
from bigchaindb.events import EventTypes, Event, PubSub
from bigchaindb.events import EventTypes, Event, Exchange
# create and event
event_data = {'msg': 'some data'}
event = Event(EventTypes.BLOCK_VALID, event_data)
# create the events pub sub
pubsub = PubSub()
exchange = Exchange()
sub0 = pubsub.get_subscriber_queue(EventTypes.BLOCK_VALID)
sub1 = pubsub.get_subscriber_queue(EventTypes.BLOCK_VALID |
EventTypes.BLOCK_INVALID)
sub0 = exchange.get_subscriber_queue(EventTypes.BLOCK_VALID)
sub1 = exchange.get_subscriber_queue(EventTypes.BLOCK_VALID |
EventTypes.BLOCK_INVALID)
# Subscribe to all events
sub2 = pubsub.get_subscriber_queue()
sub3 = pubsub.get_subscriber_queue(EventTypes.BLOCK_INVALID)
sub2 = exchange.get_subscriber_queue()
sub3 = exchange.get_subscriber_queue(EventTypes.BLOCK_INVALID)
# push and event to the queue
pubsub.publish(event)
exchange.dispatch(event)
# get the event from the queue
event_sub0 = sub0.get()

View File

@ -9,9 +9,9 @@ from bigchaindb.pipelines import vote, block, election, stale
@patch.object(block, 'start')
@patch.object(vote, 'start')
@patch.object(Process, 'start')
@patch('bigchaindb.events.PubSub.get_publisher_queue', spec_set=True, autospec=True)
@patch('bigchaindb.events.PubSub.run', spec_set=True, autospec=True)
def test_processes_start(mock_pub_sub_run, mock_pub_sub, mock_process, mock_vote,
@patch('bigchaindb.events.Exchange.get_publisher_queue', spec_set=True, autospec=True)
@patch('bigchaindb.events.Exchange.run', spec_set=True, autospec=True)
def test_processes_start(mock_exchange_run, mock_exchange, mock_process, mock_vote,
mock_block, mock_election, mock_stale):
from bigchaindb import processes
@ -22,4 +22,26 @@ def test_processes_start(mock_pub_sub_run, mock_pub_sub, mock_process, mock_vote
mock_stale.assert_called_with()
mock_process.assert_called_with()
mock_election.assert_called_once_with(
events_queue=mock_pub_sub.return_value)
events_queue=mock_exchange.return_value)
@patch.object(Process, 'start')
def test_start_events_plugins(mock_process, monkeypatch):
class MockPlugin:
def __init__(self, event_types):
self.event_types = event_types
def run(self, queue):
pass
monkeypatch.setattr('bigchaindb.config_utils.load_events_plugins',
lambda names: [('one', MockPlugin(1)),
('two', MockPlugin(2))])
from bigchaindb import processes
from bigchaindb.events import Exchange
exchange = Exchange()
processes.start_events_plugins(exchange)
assert len(exchange.queues) == 2