diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 20b8df8d..b51df516 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -5,7 +5,7 @@ import rethinkdb as r import bigchaindb from bigchaindb import Bigchain -from bigchaindb.voter import Voter +from bigchaindb.voter import Voter, Election from bigchaindb.block import Block from bigchaindb.web import server @@ -31,6 +31,7 @@ class Processes(object): # initialize the class self.q_new_block = mp.Queue() self.q_new_transaction = mp.Queue() + self.q_block_new_vote = mp.Queue() def map_backlog(self): # listen to changes on the backlog and redirect the changes @@ -70,9 +71,9 @@ class Processes(object): elif change['new_val'] is None: pass - # update + # update (new vote) elif change['new_val'] is not None and change['old_val'] is not None: - pass + self.q_block_new_vote.put(change['new_val']) def start(self): logger.info('Initializing BigchainDB...') @@ -90,6 +91,7 @@ class Processes(object): p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog) p_block = mp.Process(name='block', target=block.start) p_voter = Voter(self.q_new_block) + p_election = Election(self.q_block_new_vote) # start the processes logger.info('starting bigchain mapper') @@ -101,6 +103,8 @@ class Processes(object): logger.info('starting voter') p_voter.start() + logger.info('starting election') + p_election.start() # start message block.initialized.wait() diff --git a/bigchaindb/voter.py b/bigchaindb/voter.py index 3ed73636..a92d825c 100644 --- a/bigchaindb/voter.py +++ b/bigchaindb/voter.py @@ -190,3 +190,69 @@ class Voter(object): p_validate.start() p_vote.start() p_update.start() + + +class Election(object): + + def __init__(self, q_block_new_vote): + """ + Initialize the class with the needed queues. + + Initialize a queue where blocks with new votes will be held + """ + self.q_block_new_vote = q_block_new_vote + self.q_blocks_with_quorum = mp.Queue() + self.q_invalid_blocks = mp.Queue() + + def check_for_quorum(self): + """ + Checks if block has enough invalid votes to make a decision + """ + while True: + next_block = self.q_block_new_vote.get() + n_voters = len(next_block['block']['voters']) + + vote_list = [vote['vote']['is_block_valid'] for vote in next_block['votes']] + + n_invalid_votes = vote_list.count(False) + + if n_invalid_votes > int(n_voters/2): + self.q_invalid_blocks.put(next_block) + else: + # no quorum reached, do nothing + pass + + def requeue_transactions(self): + """ + Liquidates transactions from invalid blocks so they can be processed again + """ + while True: + invalid_block = self.q_invalid_blocks.get() + b = Bigchain() + # FIXME: this is unsafe and could lose transactions + # Since queue items are removed immediately, there is no guarantee + # all transactions will be rewritten. Imagine if node 5/5 casts a + # deciding invalid vote. This block will never show up in the + # "update" changefeed again. Suppose the below loop is writing + # transactions and the worker crashes. Those un-written transactions + # will never be reviewed again. + # + # Ideally, queue item should be removed on completion, a la celery or + # SQS. mp.JoinableQueue is also not appropriate, since it uses + # a counter. + + for tx in invalid_block['transactions']: + b.write_transaction(tx) + + def start(self): + """ + Initialize, spawn, and start the processes + """ + + # initialize the processes + p_quorum_check = mp.Process(name='check_for_quorum', target=self.check_for_quorum) + p_requeue_tx = mp.Process(name='requeue_tx', target=self.requeue_transactions) + + # start the processes + p_quorum_check.start() + p_requeue_tx.start()