Initial implementation of an event_handler

This commit is contained in:
Rodolphe Marques 2017-03-28 16:05:44 +02:00 committed by vrde
parent b90766f2c5
commit 0cbf144ddf
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
4 changed files with 81 additions and 6 deletions

33
bigchaindb/events.py Normal file
View File

@ -0,0 +1,33 @@
from enum import Enum
from multiprocessing import Queue
class EventTypes(Enum):
BLOCK_VALID = 1
BLOCK_INVALID = 2
class Event(object):
def __init__(self, event_type, event_data):
self.type = event_type
self.data = event_data
class EventHandler(object):
def __init__(self, events_queue):
self.events_queue = events_queue
def put_event(self, event, timeout=None):
# TODO: handle timeouts
self.events_queue.put(event, timeout=None)
def get_event(self, timeout=None):
# TODO: handle timeouts
return self.events_queue.get(timeout=None)
def setup_events_queue():
# TODO: set bounds to the queue
return Queue()

View File

@ -13,6 +13,7 @@ from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Block
from bigchaindb import Bigchain
from bigchaindb.events import EventHandler, Event, EventTypes
logger = logging.getLogger(__name__)
@ -22,8 +23,9 @@ logger_results = logging.getLogger('pipeline.election.results')
class Election:
"""Election class."""
def __init__(self):
def __init__(self, events_queue):
self.bigchain = Bigchain()
self.event_handler = EventHandler(events_queue)
def check_for_quorum(self, next_vote):
"""
@ -42,6 +44,7 @@ class Election:
next_block = self.bigchain.get_block(block_id)
result = self.bigchain.block_election(next_block)
self.handle_block_events(result, block_id)
if result['status'] == self.bigchain.BLOCK_INVALID:
return Block.from_dict(next_block)
@ -67,9 +70,20 @@ class Election:
self.bigchain.write_transaction(tx)
return invalid_block
def handle_block_events(self, result, block_id):
if result['status'] == self.bigchain.BLOCK_UNDECIDED:
return
elif result['status'] == self.bigchain.BLOCK_INVALID:
event_type = EventTypes.BLOCK_INVALID
elif result['status'] == self.bigchain.BLOCK_VALID:
event_type = EventTypes.BLOCK_VALID
def create_pipeline():
election = Election()
event = Event(event_type, {'block_id': block_id})
self.event_handler.put_event(event)
def create_pipeline(events_queue):
election = Election(events_queue)
election_pipeline = Pipeline([
Node(election.check_for_quorum),
@ -84,8 +98,8 @@ def get_changefeed():
return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT)
def start():
pipeline = create_pipeline()
def start(events_queue):
pipeline = create_pipeline(events_queue)
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline

View File

@ -0,0 +1,14 @@
import multiprocessing as mp
from bigchaindb.events import EventHandler
def consume_events(events_queue):
event_handler = EventHandler(events_queue)
while True:
event = event_handler.get_event()
print('Event type: {} Event data: {}'.format(event.type, event.data))
def events_consumer(events_queue):
return mp.Process(target=consume_events, args=(events_queue,))

View File

@ -3,6 +3,8 @@ import multiprocessing as mp
import bigchaindb
from bigchaindb.pipelines import vote, block, election, stale
from bigchaindb.pipelines.events_consumer_example import events_consumer
from bigchaindb.events import setup_events_queue
from bigchaindb.web import server
@ -25,6 +27,13 @@ BANNER = """
def start():
logger.info('Initializing BigchainDB...')
# Create the events queue
# The events queue needs to be initialized once and shared between
# processes. This seems the best way to do it
# At this point only the election processs and the event consumer require
# this queue.
events_queue = setup_events_queue()
# start the processes
logger.info('Starting block')
block.start()
@ -36,12 +45,17 @@ def start():
stale.start()
logger.info('Starting election')
election.start()
election.start(events_queue)
# start the web api
app_server = server.create_server(bigchaindb.config['server'])
p_webapi = mp.Process(name='webapi', target=app_server.run)
p_webapi.start()
# start the example events consumer
logger.info('Starting the events consumer example')
p_events_consumer = events_consumer(events_queue)
p_events_consumer.start()
# start message
logger.info(BANNER.format(bigchaindb.config['server']['bind']))