diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py new file mode 100644 index 00000000..98a19885 --- /dev/null +++ b/bigchaindb/pipelines/election.py @@ -0,0 +1,50 @@ +import rethinkdb as r +from multipipes import Pipeline, Node + +from bigchaindb.pipelines.utils import ChangeFeed +from bigchaindb import Bigchain + + +class Election: + + def __init__(self): + self.bigchain = Bigchain() + + def check_for_quorum(self, next_vote): + """ + Checks if block has enough invalid votes to make a decision + """ + next_block = r.table('bigchain')\ + .get(next_vote['vote']['voting_for_block'])\ + .run(self.bigchain.conn) + if self.bigchain.block_election_status(next_block) == self.bigchain.BLOCK_INVALID: + return next_block + + def requeue_transactions(self, invalid_block): + """ + Liquidates transactions from invalid blocks so they can be processed again + """ + for tx in invalid_block['block']['transactions']: + self.bigchain.write_transaction(tx) + + +def get_changefeed(): + return ChangeFeed(table='votes', operation='insert') + + +def create_pipeline(): + election = Election() + + election_pipeline = Pipeline([ + Node(election.check_for_quorum), + Node(election.requeue_transactions) + ]) + + return election_pipeline + + +def start(): + pipeline = create_pipeline() + pipeline.setup(indata=get_changefeed()) + pipeline.start() + return pipeline \ No newline at end of file diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index fe641750..d226778d 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -4,9 +4,9 @@ import multiprocessing as mp import rethinkdb as r import bigchaindb -from bigchaindb.pipelines import block +from bigchaindb.pipelines import block, election from bigchaindb import Bigchain -from bigchaindb.voter import Voter, Election +from bigchaindb.voter import Voter from bigchaindb.block import BlockDeleteRevert from bigchaindb.web import server @@ -70,19 +70,18 @@ class Processes(object): p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain) p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start) p_voter = Voter(self.q_new_block) - p_election = Election(self.q_block_new_vote) # start the processes logger.info('starting bigchain mapper') p_map_bigchain.start() logger.info('starting backlog mapper') logger.info('starting block') block.start() + election.start() p_block_delete_revert.start() logger.info('starting voter') p_voter.start() logger.info('starting election') - p_election.start() # start message p_voter.initialized.wait() diff --git a/bigchaindb/voter.py b/bigchaindb/voter.py index f7953451..4d60ecd5 100644 --- a/bigchaindb/voter.py +++ b/bigchaindb/voter.py @@ -197,68 +197,3 @@ class Voter(object): p_validate.start() p_vote.start() p_update.start() - - -class Election(object): - - def __init__(self, q_block_new_vote): - """ - Initialize the class with the needed queues. - - Initialize a queue where blocks with new votes will be held - """ - self.q_block_new_vote = q_block_new_vote - self.q_invalid_blocks = mp.Queue() - - def check_for_quorum(self): - """ - Checks if block has enough invalid votes to make a decision - """ - b = Bigchain() - - while True: - next_block = self.q_block_new_vote.get() - - # poison pill - if next_block == 'stop': - self.q_invalid_blocks.put('stop') - logger.info('clean exit') - return - - if b.block_election_status(next_block) == 'invalid': - self.q_invalid_blocks.put(next_block) - - def requeue_transactions(self): - """ - Liquidates transactions from invalid blocks so they can be processed again - """ - while True: - invalid_block = self.q_invalid_blocks.get() - - # poison pill - if invalid_block == 'stop': - logger.info('clean exit') - return - - b = Bigchain() - for tx in invalid_block['block']['transactions']: - b.write_transaction(tx) - - def kill(self): - """ - Terminate processes - """ - self.q_block_new_vote.put('stop') - - def start(self): - """ - Initialize, spawn, and start the processes - """ - - # initialize the processes - p_quorum_check = mp.Process(name='check_for_quorum', target=self.check_for_quorum) - p_requeue_tx = mp.Process(name='requeue_tx', target=self.requeue_transactions) - - # start the processes - p_quorum_check.start() - p_requeue_tx.start() diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py new file mode 100644 index 00000000..f9f18285 --- /dev/null +++ b/tests/pipelines/test_election.py @@ -0,0 +1,17 @@ +import time +import random +from unittest.mock import patch + +import rethinkdb as r + +from bigchaindb.pipelines import election +from multipipes import Pipe, Pipeline + + +def test_check_for_quorum(b, user_vk): + e = election.Election() + + +def test_check_requeue_transaction(b, user_vk): + pass +