Merge branch 'block-publisher-plugin-arch'

This commit is contained in:
vrde 2017-08-11 11:24:07 +02:00
commit e7640feaec
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
12 changed files with 277 additions and 45 deletions

View File

@ -287,3 +287,16 @@ def load_consensus_plugin(name=None):
'consensus.BaseConsensusRules`'.format(type(plugin)))
return plugin
def load_events_plugins(names=None):
plugins = []
if names is None:
return plugins
for name in names:
for entry_point in iter_entry_points('bigchaindb.events', name):
plugins.append((name, entry_point.load()))
return plugins

View File

@ -1,33 +1,91 @@
from enum import Enum
from collections import defaultdict
from multiprocessing import Queue
class EventTypes(Enum):
POISON_PILL = 'POISON_PILL'
class EventTypes:
"""Container class that holds all the possible
events BigchainDB manages."""
# If you add a new Event Type, make sure to add it
# to the docs in docs/server/source/event-plugin-api.rst
ALL = ~0
BLOCK_VALID = 1
BLOCK_INVALID = 2
# NEW_EVENT = 4
# NEW_EVENT = 8
# NEW_EVENT = 16...
class Event:
"""An Event."""
def __init__(self, event_type, event_data):
"""Creates a new event.
Args:
event_type (int): the type of the event, see
:class:`~bigchaindb.events.EventTypes`
event_data (obj): the data of the event.
"""
self.type = event_type
self.data = event_data
class EventHandler:
class Exchange:
"""Dispatch events to subscribers."""
def __init__(self, events_queue):
self.events_queue = events_queue
def __init__(self):
self.publisher_queue = Queue()
def put_event(self, event, timeout=None):
# TODO: handle timeouts
self.events_queue.put(event, timeout=None)
# Map <event_types -> queues>
self.queues = defaultdict(list)
def get_event(self, timeout=None):
# TODO: handle timeouts
return self.events_queue.get(timeout=None)
def get_publisher_queue(self):
"""Get the queue used by the publisher.
Returns:
a :class:`multiprocessing.Queue`.
"""
def setup_events_queue():
# TODO: set bounds to the queue
return Queue()
return self.publisher_queue
def get_subscriber_queue(self, event_types=None):
"""Create a new queue for a specific combination of event types
and return it.
Returns:
a :class:`multiprocessing.Queue`.
"""
if event_types is None:
event_types = EventTypes.ALL
queue = Queue()
self.queues[event_types].append(queue)
return queue
def dispatch(self, event):
"""Given an event, send it to all the subscribers.
Args
event (:class:`~bigchaindb.events.EventTypes`): the event to
dispatch to all the subscribers.
"""
for event_types, queues in self.queues.items():
if event.type & event_types:
for queue in queues:
queue.put(event)
def run(self):
"""Start the exchange"""
while True:
event = self.publisher_queue.get()
if event == POISON_PILL:
return
else:
self.dispatch(event)

View File

@ -13,7 +13,7 @@ from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Block
from bigchaindb import Bigchain
from bigchaindb.events import EventHandler, Event, EventTypes
from bigchaindb.events import EventTypes, Event
logger = logging.getLogger(__name__)
@ -25,9 +25,7 @@ class Election:
def __init__(self, events_queue=None):
self.bigchain = Bigchain()
self.event_handler = None
if events_queue:
self.event_handler = EventHandler(events_queue)
self.events_queue = events_queue
def check_for_quorum(self, next_vote):
"""
@ -73,7 +71,7 @@ class Election:
return invalid_block
def handle_block_events(self, result, block_id):
if self.event_handler:
if self.events_queue:
if result['status'] == self.bigchain.BLOCK_UNDECIDED:
return
elif result['status'] == self.bigchain.BLOCK_INVALID:
@ -82,7 +80,7 @@ class Election:
event_type = EventTypes.BLOCK_VALID
event = Event(event_type, self.bigchain.get_block(block_id))
self.event_handler.put_event(event)
self.events_queue.put(event)
def create_pipeline(events_queue=None):

View File

@ -2,8 +2,9 @@ import logging
import multiprocessing as mp
import bigchaindb
from bigchaindb import config_utils
from bigchaindb.pipelines import vote, block, election, stale
from bigchaindb.events import setup_events_queue
from bigchaindb.events import Exchange, EventTypes
from bigchaindb.web import server, websocket_server
@ -23,15 +24,30 @@ BANNER = """
"""
def start_events_plugins(exchange):
plugins = config_utils.load_events_plugins(
bigchaindb.config.get('events_plugins'))
for name, plugin in plugins:
logger.info('Loading events plugin %s', name)
event_types = getattr(plugin, 'event_types', None)
queue = exchange.get_subscriber_queue(event_types)
mp.Process(name='events_plugin_{}'.format(name),
target=plugin.run,
args=(queue, )).start()
def start():
logger.info('Initializing BigchainDB...')
# Create the events queue
# Create a Exchange object.
# The events queue needs to be initialized once and shared between
# processes. This seems the best way to do it
# At this point only the election processs and the event consumer require
# this queue.
events_queue = setup_events_queue()
exchange = Exchange()
# start the processes
logger.info('Starting block')
@ -44,7 +60,7 @@ def start():
stale.start()
logger.info('Starting election')
election.start(events_queue=events_queue)
election.start(events_queue=exchange.get_publisher_queue())
# start the web api
app_server = server.create_server(bigchaindb.config['server'])
@ -54,8 +70,12 @@ def start():
logger.info('WebSocket server started')
p_websocket_server = mp.Process(name='ws',
target=websocket_server.start,
args=(events_queue,))
args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),))
p_websocket_server.start()
# start message
logger.info(BANNER.format(bigchaindb.config['server']['bind']))
start_events_plugins(exchange)
exchange.run()

View File

@ -0,0 +1,67 @@
The Event Plugin API [experimental]
===================================
.. danger::
The Event Plugin API is **experimental** and might change in the future.
BigchainDB implements an internal event system that allows different software
components to receive updates on specific topics. The WebSocket API, for example,
is a subscriber to a stream of events called ``BLOCK_VALID``. Every time a block is
voted valid, the WebSocket API is notified, and it sends updates to all the
clients connected.
We decided to make this internal event system public, to allow developers to
integrate BigchainDB with other applications, such as AMQP systems.
Available Events
----------------
The event types are listed in the source file ``bigchaindb/events.py``.
.. list-table:: Event Types
:widths: 15 10 30
:header-rows: 1
* - event name
- event id
- description
* - BLOCK_VALID
- 1
- a block has been voted valid by the network.
* - BLOCK_INVALID
- 2
- a block has been voted invalid by the network.
Plugin Example
----------------
We developed a minimal plugin that listens to new valid blocks and prints them
to the console:
https://github.com/bigchaindb/events-plugin-example
Architecture of an Event Plugin
-------------------------------
Creating your own plugin is really easy, and can be summarized in few steps:
1. Create a new Python package that defines the entry point ``bigchaindb.events`` in its ``setup.py``.
2. In your entry point, define two properties:
- ``event_types``: a variable to tell BigchainDB which events your plugin is interested in.
A plugin can subscribe to more than one events by combining them using the
**binary or** operator, e.g. in case you want to subscribe to both valid and
invalid blocks your ``event_types`` can be ``1 | 2``.
- ``run``: a function that will process the events coming from BigchainDB.
3. Install the newly created Python package in the current environment.
4. Add the plugin name to your BigchainDB configuration.
5. (Re)start BigchainDB.
If the installation was successful, the plugin will be run in a different
process. Your plugin will receive events through a ``multiprocessing.Queue``
object.
.. note::
It's your plugin's responsibility to consume it's queue.

View File

@ -0,0 +1,8 @@
The Events API
==============
.. toctree::
:maxdepth: 1
websocket-event-stream-api
event-plugin-api

View File

@ -13,7 +13,7 @@ BigchainDB Server Documentation
dev-and-test/index
server-reference/index
http-client-server-api
websocket-event-stream-api
events/index
drivers-clients/index
data-models/index
schema/transaction

View File

@ -202,9 +202,10 @@ def test_full_pipeline(b, user_pk):
def test_handle_block_events():
from bigchaindb.events import setup_events_queue, EventTypes
from bigchaindb.events import Exchange, EventTypes
events_queue = setup_events_queue()
exchange = Exchange()
events_queue = exchange.get_publisher_queue()
e = election.Election(events_queue=events_queue)
block_id = 'a' * 64
@ -216,10 +217,10 @@ def test_handle_block_events():
# put an invalid block event in the queue
e.handle_block_events({'status': Bigchain.BLOCK_INVALID}, block_id)
event = e.event_handler.get_event()
event = events_queue.get()
assert event.type == EventTypes.BLOCK_INVALID
# put a valid block event in the queue
e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id)
event = e.event_handler.get_event()
event = events_queue.get()
assert event.type == EventTypes.BLOCK_VALID

View File

@ -72,7 +72,7 @@ def test_load_consensus_plugin_raises_with_invalid_subclass(monkeypatch):
import time
monkeypatch.setattr(config_utils,
'iter_entry_points',
lambda *args: [type('entry_point', (object), {'load': lambda: object})])
lambda *args: [type('entry_point', (object, ), {'load': lambda: object})])
with pytest.raises(TypeError):
# Since the function is decorated with `lru_cache`, we need to
@ -80,6 +80,16 @@ def test_load_consensus_plugin_raises_with_invalid_subclass(monkeypatch):
config_utils.load_consensus_plugin(str(time.time()))
def test_load_events_plugins(monkeypatch):
from bigchaindb import config_utils
monkeypatch.setattr(config_utils,
'iter_entry_points',
lambda *args: [type('entry_point', (object, ), {'load': lambda: object})])
plugins = config_utils.load_events_plugins(['one', 'two'])
assert len(plugins) == 2
def test_map_leafs_iterator():
from bigchaindb import config_utils

View File

@ -1,21 +1,55 @@
def tests_event_handler():
from bigchaindb.events import (EventTypes, Event, EventHandler,
setup_events_queue)
def test_event_handler():
from bigchaindb.events import EventTypes, Event, Exchange
# create and event
event_data = {'msg': 'some data'}
event = Event(EventTypes.BLOCK_VALID, event_data)
# create the events queue
events_queue = setup_events_queue()
# create event handler
event_handler = EventHandler(events_queue)
# create the events pub sub
exchange = Exchange()
sub0 = exchange.get_subscriber_queue(EventTypes.BLOCK_VALID)
sub1 = exchange.get_subscriber_queue(EventTypes.BLOCK_VALID |
EventTypes.BLOCK_INVALID)
# Subscribe to all events
sub2 = exchange.get_subscriber_queue()
sub3 = exchange.get_subscriber_queue(EventTypes.BLOCK_INVALID)
# push and event to the queue
event_handler.put_event(event)
exchange.dispatch(event)
# get the event from the queue
event_from_queue = event_handler.get_event()
event_sub0 = sub0.get()
event_sub1 = sub1.get()
event_sub2 = sub2.get()
assert event_from_queue.type == event.type
assert event_from_queue.data == event.data
assert event_sub0.type == event.type
assert event_sub0.data == event.data
assert event_sub1.type == event.type
assert event_sub1.data == event.data
assert event_sub2.type == event.type
assert event_sub2.data == event.data
assert sub3.qsize() == 0
def test_exchange_stops_with_poison_pill():
from bigchaindb.events import EventTypes, Event, Exchange, POISON_PILL
# create and event
event_data = {'msg': 'some data'}
event = Event(EventTypes.BLOCK_VALID, event_data)
# create the events pub sub
exchange = Exchange()
publisher_queue = exchange.get_publisher_queue()
# push and event to the queue
publisher_queue.put(event)
publisher_queue.put(POISON_PILL)
exchange.run()
assert publisher_queue.qsize() == 0

View File

@ -9,8 +9,9 @@ from bigchaindb.pipelines import vote, block, election, stale
@patch.object(block, 'start')
@patch.object(vote, 'start')
@patch.object(Process, 'start')
@patch('bigchaindb.events.setup_events_queue', spec_set=True, autospec=True)
def test_processes_start(mock_setup_events_queue, mock_process, mock_vote,
@patch('bigchaindb.events.Exchange.get_publisher_queue', spec_set=True, autospec=True)
@patch('bigchaindb.events.Exchange.run', spec_set=True, autospec=True)
def test_processes_start(mock_exchange_run, mock_exchange, mock_process, mock_vote,
mock_block, mock_election, mock_stale):
from bigchaindb import processes
@ -21,4 +22,26 @@ def test_processes_start(mock_setup_events_queue, mock_process, mock_vote,
mock_stale.assert_called_with()
mock_process.assert_called_with()
mock_election.assert_called_once_with(
events_queue=mock_setup_events_queue.return_value)
events_queue=mock_exchange.return_value)
@patch.object(Process, 'start')
def test_start_events_plugins(mock_process, monkeypatch):
class MockPlugin:
def __init__(self, event_types):
self.event_types = event_types
def run(self, queue):
pass
monkeypatch.setattr('bigchaindb.config_utils.load_events_plugins',
lambda names: [('one', MockPlugin(1)),
('two', MockPlugin(2))])
from bigchaindb import processes
from bigchaindb.events import Exchange
exchange = Exchange()
processes.start_events_plugins(exchange)
assert len(exchange.queues) == 2