diff --git a/bigchaindb/events.py b/bigchaindb/events.py new file mode 100644 index 00000000..a061ad50 --- /dev/null +++ b/bigchaindb/events.py @@ -0,0 +1,33 @@ +from enum import Enum +from multiprocessing import Queue + + +class EventTypes(Enum): + BLOCK_VALID = 1 + BLOCK_INVALID = 2 + + +class Event(object): + + def __init__(self, event_type, event_data): + self.type = event_type + self.data = event_data + + +class EventHandler(object): + + def __init__(self, events_queue): + self.events_queue = events_queue + + def put_event(self, event, timeout=None): + # TODO: handle timeouts + self.events_queue.put(event, timeout=None) + + def get_event(self, timeout=None): + # TODO: handle timeouts + return self.events_queue.get(timeout=None) + + +def setup_events_queue(): + # TODO: set bounds to the queue + return Queue() diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index a5818b3e..b17f5722 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -13,6 +13,7 @@ from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block from bigchaindb import Bigchain +from bigchaindb.events import EventHandler, Event, EventTypes logger = logging.getLogger(__name__) @@ -22,8 +23,9 @@ logger_results = logging.getLogger('pipeline.election.results') class Election: """Election class.""" - def __init__(self): + def __init__(self, events_queue): self.bigchain = Bigchain() + self.event_handler = EventHandler(events_queue) def check_for_quorum(self, next_vote): """ @@ -42,6 +44,7 @@ class Election: next_block = self.bigchain.get_block(block_id) result = self.bigchain.block_election(next_block) + self.handle_block_events(result, block_id) if result['status'] == self.bigchain.BLOCK_INVALID: return Block.from_dict(next_block) @@ -67,9 +70,20 @@ class Election: self.bigchain.write_transaction(tx) return invalid_block + def handle_block_events(self, result, block_id): + if result['status'] == self.bigchain.BLOCK_UNDECIDED: + return + elif result['status'] == self.bigchain.BLOCK_INVALID: + event_type = EventTypes.BLOCK_INVALID + elif result['status'] == self.bigchain.BLOCK_VALID: + event_type = EventTypes.BLOCK_VALID -def create_pipeline(): - election = Election() + event = Event(event_type, {'block_id': block_id}) + self.event_handler.put_event(event) + + +def create_pipeline(events_queue): + election = Election(events_queue) election_pipeline = Pipeline([ Node(election.check_for_quorum), @@ -84,8 +98,8 @@ def get_changefeed(): return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT) -def start(): - pipeline = create_pipeline() +def start(events_queue): + pipeline = create_pipeline(events_queue) pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/bigchaindb/pipelines/events_consumer_example.py b/bigchaindb/pipelines/events_consumer_example.py new file mode 100644 index 00000000..7e833c82 --- /dev/null +++ b/bigchaindb/pipelines/events_consumer_example.py @@ -0,0 +1,14 @@ +import multiprocessing as mp + +from bigchaindb.events import EventHandler + + +def consume_events(events_queue): + event_handler = EventHandler(events_queue) + while True: + event = event_handler.get_event() + print('Event type: {} Event data: {}'.format(event.type, event.data)) + + +def events_consumer(events_queue): + return mp.Process(target=consume_events, args=(events_queue,)) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 01d7a55a..687422ca 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -3,6 +3,8 @@ import multiprocessing as mp import bigchaindb from bigchaindb.pipelines import vote, block, election, stale +from bigchaindb.pipelines.events_consumer_example import events_consumer +from bigchaindb.events import setup_events_queue from bigchaindb.web import server @@ -25,6 +27,13 @@ BANNER = """ def start(): logger.info('Initializing BigchainDB...') + # Create the events queue + # The events queue needs to be initialized once and shared between + # processes. This seems the best way to do it + # At this point only the election processs and the event consumer require + # this queue. + events_queue = setup_events_queue() + # start the processes logger.info('Starting block') block.start() @@ -36,12 +45,17 @@ def start(): stale.start() logger.info('Starting election') - election.start() + election.start(events_queue) # start the web api app_server = server.create_server(bigchaindb.config['server']) p_webapi = mp.Process(name='webapi', target=app_server.run) p_webapi.start() + # start the example events consumer + logger.info('Starting the events consumer example') + p_events_consumer = events_consumer(events_queue) + p_events_consumer.start() + # start message logger.info(BANNER.format(bigchaindb.config['server']['bind']))