add election class, process starts

This commit is contained in:
ryan 2016-04-25 15:55:37 +02:00
parent e9bc552bb1
commit d3cc56b4af
2 changed files with 73 additions and 3 deletions

View File

@ -5,7 +5,7 @@ import rethinkdb as r
import bigchaindb import bigchaindb
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb.voter import Voter from bigchaindb.voter import Voter, Election
from bigchaindb.block import Block from bigchaindb.block import Block
from bigchaindb.web import server from bigchaindb.web import server
@ -31,6 +31,7 @@ class Processes(object):
# initialize the class # initialize the class
self.q_new_block = mp.Queue() self.q_new_block = mp.Queue()
self.q_new_transaction = mp.Queue() self.q_new_transaction = mp.Queue()
self.q_block_new_vote = mp.Queue()
def map_backlog(self): def map_backlog(self):
# listen to changes on the backlog and redirect the changes # listen to changes on the backlog and redirect the changes
@ -70,9 +71,9 @@ class Processes(object):
elif change['new_val'] is None: elif change['new_val'] is None:
pass pass
# update # update (new vote)
elif change['new_val'] is not None and change['old_val'] is not None: elif change['new_val'] is not None and change['old_val'] is not None:
pass self.q_block_new_vote.put(change['new_val'])
def start(self): def start(self):
logger.info('Initializing BigchainDB...') logger.info('Initializing BigchainDB...')
@ -90,6 +91,7 @@ class Processes(object):
p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog) p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog)
p_block = mp.Process(name='block', target=block.start) p_block = mp.Process(name='block', target=block.start)
p_voter = Voter(self.q_new_block) p_voter = Voter(self.q_new_block)
p_election = Election(self.q_block_new_vote)
# start the processes # start the processes
logger.info('starting bigchain mapper') logger.info('starting bigchain mapper')
@ -101,6 +103,8 @@ class Processes(object):
logger.info('starting voter') logger.info('starting voter')
p_voter.start() p_voter.start()
logger.info('starting election')
p_election.start()
# start message # start message
block.initialized.wait() block.initialized.wait()

View File

@ -190,3 +190,69 @@ class Voter(object):
p_validate.start() p_validate.start()
p_vote.start() p_vote.start()
p_update.start() p_update.start()
class Election(object):
def __init__(self, q_block_new_vote):
"""
Initialize the class with the needed queues.
Initialize a queue where blocks with new votes will be held
"""
self.q_block_new_vote = q_block_new_vote
self.q_blocks_with_quorum = mp.Queue()
self.q_invalid_blocks = mp.Queue()
def check_for_quorum(self):
"""
Checks if block has enough invalid votes to make a decision
"""
while True:
next_block = self.q_block_new_vote.get()
n_voters = len(next_block['block']['voters'])
vote_list = [vote['vote']['is_block_valid'] for vote in next_block['votes']]
n_invalid_votes = vote_list.count(False)
if n_invalid_votes > int(n_voters/2):
self.q_invalid_blocks.put(next_block)
else:
# no quorum reached, do nothing
pass
def requeue_transactions(self):
"""
Liquidates transactions from invalid blocks so they can be processed again
"""
while True:
invalid_block = self.q_invalid_blocks.get()
b = Bigchain()
# FIXME: this is unsafe and could lose transactions
# Since queue items are removed immediately, there is no guarantee
# all transactions will be rewritten. Imagine if node 5/5 casts a
# deciding invalid vote. This block will never show up in the
# "update" changefeed again. Suppose the below loop is writing
# transactions and the worker crashes. Those un-written transactions
# will never be reviewed again.
#
# Ideally, queue item should be removed on completion, a la celery or
# SQS. mp.JoinableQueue is also not appropriate, since it uses
# a counter.
for tx in invalid_block['transactions']:
b.write_transaction(tx)
def start(self):
"""
Initialize, spawn, and start the processes
"""
# initialize the processes
p_quorum_check = mp.Process(name='check_for_quorum', target=self.check_for_quorum)
p_requeue_tx = mp.Process(name='requeue_tx', target=self.requeue_transactions)
# start the processes
p_quorum_check.start()
p_requeue_tx.start()