diff --git a/bigchaindb/core.py b/bigchaindb/core.py index a221a177..6b15d489 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -565,13 +565,9 @@ class Bigchain(object): return vote_signed - def write_vote(self, block, vote): + def write_vote(self, vote): """Write the vote to the database.""" - # First, make sure this block doesn't contain a vote from this node - if self.has_previous_vote(block): - return None - r.table('votes') \ .insert(vote) \ .run(self.conn) diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py new file mode 100644 index 00000000..63659596 --- /dev/null +++ b/bigchaindb/pipelines/vote.py @@ -0,0 +1,73 @@ +from collections import Counter + +from multipipes import Pipeline, Node + +from bigchaindb.pipelines.utils import ChangeFeed +from bigchaindb import Bigchain + + +class Vote: + + def __init__(self): + self.bigchain = Bigchain() + last_voted = self.bigchain.get_last_voted_block() + self.last_voted_id = last_voted['id'] + self.last_voted_number = last_voted['block_number'] + + self.counters = Counter() + self.validity = {} + + def ungroup(self, block): + num_tx = len(block['block']['transactions']) + for tx in block['block']['transactions']: + yield tx, block['id'], num_tx + + def validate_tx(self, tx, block_id, num_tx): + return bool(self.bigchain.is_valid_transaction(tx)), block_id, num_tx + + def vote(self, tx_validity, block_id, num_tx): + self.counters[block_id] += 1 + self.validity[block_id] = tx_validity and self.validity.get(block_id, + True) + + if self.counters[block_id] == num_tx: + vote = self.bigchain.vote(block_id, + self.last_voted_id, + self.validity[block_id]) + self.last_voted_id = block_id + del self.counters[block_id] + del self.validity[block_id] + return vote + + def write_vote(self, vote): + self.bigchain.write_vote(vote) + + +def initial(): + b = Bigchain() + initial = b.get_unvoted_blocks() + return initial + + +def get_changefeed(): + return ChangeFeed('bigchain', 'insert', prefeed=initial()) + + +def create_pipeline(): + voter = Vote() + + vote_pipeline = Pipeline([ + Node(voter.ungroup), + Node(voter.validate_tx, fraction_of_cores=1), + Node(voter.vote), + Node(voter.write_vote) + ]) + + return vote_pipeline + + +def start(): + pipeline = create_pipeline() + pipeline.setup(indata=get_changefeed()) + pipeline.start() + return pipeline diff --git a/bigchaindb/voter.py b/bigchaindb/voter.py index f7953451..485a0738 100644 --- a/bigchaindb/voter.py +++ b/bigchaindb/voter.py @@ -158,7 +158,7 @@ class Voter(object): block, vote = elem pretty_vote = 'valid' if vote['vote']['is_block_valid'] else 'invalid' logger.info('voting %s for block %s', pretty_vote, block['id']) - b.write_vote(block, vote) + b.write_vote(vote) def bootstrap(self): """ diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 7082e52f..7a75371b 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -140,7 +140,7 @@ class TestBigchainApi(object): # vote the block invalid vote = b.vote(block['id'], b.get_last_voted_block()['id'], False) - b.write_vote(block, vote) + b.write_vote(vote) response = b.get_transaction(tx_signed["id"]) # should be None, because invalid blocks are ignored @@ -280,13 +280,13 @@ class TestBigchainApi(object): # make sure all the blocks are written at the same time monkeypatch.setattr(util, 'timestamp', lambda: '1') - b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_1['id'] - b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_2['id'] - b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_3['id'] @@ -305,15 +305,15 @@ class TestBigchainApi(object): # make sure all the blocks are written at different timestamps monkeypatch.setattr(util, 'timestamp', lambda: '1') - b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_1['id'] monkeypatch.setattr(util, 'timestamp', lambda: '2') - b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_2['id'] monkeypatch.setattr(util, 'timestamp', lambda: '3') - b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_3['id'] def test_no_vote_written_if_block_already_has_vote(self, b): @@ -323,11 +323,11 @@ class TestBigchainApi(object): b.write_block(block_1, durability='hard') - b.write_vote(block_1, b.vote(block_1['id'], genesis['id'], True)) + b.write_vote(b.vote(block_1['id'], genesis['id'], True)) retrieved_block_1 = r.table('bigchain').get(block_1['id']).run(b.conn) # try to vote again on the retrieved block, should do nothing - b.write_vote(retrieved_block_1, b.vote(retrieved_block_1['id'], genesis['id'], True)) + b.write_vote(b.vote(retrieved_block_1['id'], genesis['id'], True)) retrieved_block_2 = r.table('bigchain').get(block_1['id']).run(b.conn) assert retrieved_block_1 == retrieved_block_2 @@ -622,9 +622,9 @@ class TestBigchainBlock(object): b.write_block(block_2, durability='hard') b.write_block(block_3, durability='hard') - b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) - b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) - b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) q_revert_delete = mp.Queue() @@ -922,7 +922,7 @@ class TestMultipleInputs(object): # vote the block VALID vote = b.vote(block['id'], genesis['id'], True) - b.write_vote(block, vote) + b.write_vote(vote) # get input owned_inputs_user1 = b.get_owned_ids(user_vk) @@ -938,7 +938,7 @@ class TestMultipleInputs(object): # vote the block invalid vote = b.vote(block['id'], b.get_last_voted_block()['id'], False) - b.write_vote(block, vote) + b.write_vote(vote) owned_inputs_user1 = b.get_owned_ids(user_vk) owned_inputs_user2 = b.get_owned_ids(user2_vk) @@ -1052,7 +1052,7 @@ class TestMultipleInputs(object): # vote the block VALID vote = b.vote(block['id'], genesis['id'], True) - b.write_vote(block, vote) + b.write_vote(vote) # get input owned_inputs_user1 = b.get_owned_ids(user_vk) @@ -1069,7 +1069,7 @@ class TestMultipleInputs(object): # vote the block invalid vote = b.vote(block['id'], b.get_last_voted_block()['id'], False) - b.write_vote(block, vote) + b.write_vote(vote) response = b.get_transaction(tx_signed["id"]) spent_inputs_user1 = b.get_spent(owned_inputs_user1[0])