diff --git a/bigchaindb/voter.py b/bigchaindb/voter.py index 485a0738..2cef51bf 100644 --- a/bigchaindb/voter.py +++ b/bigchaindb/voter.py @@ -1,204 +1,12 @@ import logging import multiprocessing as mp -import ctypes from bigchaindb import Bigchain -from bigchaindb.monitor import Monitor logger = logging.getLogger(__name__) -class BlockStream(object): - """ - Combine the stream of new blocks coming from the changefeed with the list of unvoted blocks. - - This is a utility class that abstracts the source of data for the `Voter`. - """ - - def __init__(self, new_blocks): - """ - Create a new BlockStream instance. - - Args: - new_block (queue): a queue of new blocks - """ - - b = Bigchain() - self.new_blocks = new_blocks - # TODO: there might be duplicate blocks since we *first* get the changefeed and only *then* we query the - # database to get the old blocks. - - # TODO how about a one liner, something like: - # self.unvoted_blocks = b.get_unvoted_blocks() if not b.nodes_except_me else [] - self.unvoted_blocks = [] - if not b.nodes_except_me: - self.unvoted_blocks = b.get_unvoted_blocks() - - def get(self): - """ - Return the next block to be processed. - """ - try: - # FIXME: apparently RethinkDB returns a list instead of a cursor when using `order_by`. - # We might change the `pop` in the future, when the driver will return a cursor. - # We have a test for this, so if the driver implementation changes we will get a failure: - # - tests/test_voter.py::TestBlockStream::test_if_old_blocks_get_should_return_old_block_first - return self.unvoted_blocks.pop(0) - except IndexError: - return self.new_blocks.get() - - -class Voter(object): - - def __init__(self, q_new_block): - """ - Initialize the class with the needed queues. - - Initialize with a queue where new blocks added to the bigchain will be put - """ - - self.monitor = Monitor() - - self.q_new_block = q_new_block - self.q_blocks_to_validate = mp.Queue() - self.q_validated_block = mp.Queue() - self.q_voted_block = mp.Queue() - self.v_previous_block_id = mp.Value(ctypes.c_char_p) - self.initialized = mp.Event() - - def feed_blocks(self): - """ - Prepare the queue with blocks to validate - """ - - block_stream = BlockStream(self.q_new_block) - while True: - # poison pill - block = block_stream.get() - if block == 'stop': - self.q_blocks_to_validate.put('stop') - return - - self.q_blocks_to_validate.put(block) - - def validate(self): - """ - Checks if incoming blocks are valid or not - """ - - # create a bigchain instance. All processes should create their own bigchcain instance so that they all - # have their own connection to the database - b = Bigchain() - - logger.info('voter waiting for new blocks') - # signal initialization complete - self.initialized.set() - - while True: - new_block = self.q_blocks_to_validate.get() - - # poison pill - if new_block == 'stop': - self.q_validated_block.put('stop') - return - - logger.info('new_block arrived to voter') - - with self.monitor.timer('validate_block'): - # FIXME: the following check is done also in `is_valid_block`, - # but validity can be true even if the block has already - # a vote. - if b.has_previous_vote(new_block): - continue - validity = b.is_valid_block(new_block) - - self.q_validated_block.put((new_block, - self.v_previous_block_id.value.decode(), - validity)) - - self.v_previous_block_id.value = new_block['id'].encode() - - def vote(self): - """ - Votes on the block based on the decision of the validation - """ - - # create a bigchain instance - b = Bigchain() - - while True: - elem = self.q_validated_block.get() - - # poison pill - if elem == 'stop': - self.q_voted_block.put('stop') - return - - validated_block, previous_block_id, decision = elem - vote = b.vote(validated_block['id'], previous_block_id, decision) - self.q_voted_block.put((validated_block, vote)) - - def update_block(self): - """ - Appends the vote in the bigchain table - """ - - # create a bigchain instance - b = Bigchain() - - while True: - elem = self.q_voted_block.get() - - # poison pill - if elem == 'stop': - logger.info('clean exit') - return - - block, vote = elem - pretty_vote = 'valid' if vote['vote']['is_block_valid'] else 'invalid' - logger.info('voting %s for block %s', pretty_vote, block['id']) - b.write_vote(vote) - - def bootstrap(self): - """ - Before starting handling the new blocks received by the changefeed we need to handle unvoted blocks - added to the bigchain while the process was down - - We also need to set the previous_block_id. - """ - - b = Bigchain() - last_voted = b.get_last_voted_block() - - self.v_previous_block_id.value = last_voted['id'].encode() - - def kill(self): - """ - Terminate processes - """ - self.q_new_block.put('stop') - - def start(self): - """ - Initialize, spawn, and start the processes - """ - - self.bootstrap() - - # initialize the processes - p_feed_blocks = mp.Process(name='block_feeder', target=self.feed_blocks) - p_validate = mp.Process(name='block_validator', target=self.validate) - p_vote = mp.Process(name='block_voter', target=self.vote) - p_update = mp.Process(name='block_updater', target=self.update_block) - - # start the processes - p_feed_blocks.start() - p_validate.start() - p_vote.start() - p_update.start() - - class Election(object): def __init__(self, q_block_new_vote): diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 7a75371b..d279cc9a 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -1,6 +1,5 @@ import copy import multiprocessing as mp -import random import time import pytest @@ -9,7 +8,6 @@ import cryptoconditions as cc import bigchaindb from bigchaindb import crypto, exceptions, util -from bigchaindb.voter import Voter from bigchaindb.block import BlockDeleteRevert diff --git a/tests/db/test_voter.py b/tests/db/test_voter.py index 9627d12b..fb6cb495 100644 --- a/tests/db/test_voter.py +++ b/tests/db/test_voter.py @@ -5,7 +5,7 @@ import multiprocessing as mp from bigchaindb import util -from bigchaindb.voter import Voter, Election, BlockStream +from bigchaindb.voter import Election, BlockStream from bigchaindb import crypto, Bigchain @@ -308,38 +308,3 @@ class TestBlockElection(object): # tx2 was in an invalid block and SHOULD be in the backlog assert r.table('backlog').get(tx2['id']).run(b.conn)['id'] == tx2['id'] - -class TestBlockStream(object): - - def test_if_federation_size_is_greater_than_one_ignore_past_blocks(self, b): - for _ in range(5): - b.nodes_except_me.append(crypto.generate_key_pair()[1]) - new_blocks = mp.Queue() - bs = BlockStream(new_blocks) - block_1 = dummy_block() - new_blocks.put(block_1) - assert block_1 == bs.get() - - def test_if_no_old_blocks_get_should_return_new_blocks(self, b): - new_blocks = mp.Queue() - bs = BlockStream(new_blocks) - - # create two blocks - block_1 = dummy_block() - block_2 = dummy_block() - - # write the blocks - b.write_block(block_1, durability='hard') - b.write_block(block_2, durability='hard') - - # simulate a changefeed - new_blocks.put(block_1) - new_blocks.put(block_2) - - # and check if we get exactly these two blocks - assert bs.get() == block_1 - assert bs.get() == block_2 - - @pytest.mark.skipif(reason='We may have duplicated blocks when retrieving the BlockStream') - def test_ignore_duplicated_blocks_when_retrieving_the_blockstream(self): - pass diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 08732916..42960c52 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -157,6 +157,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch): assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), vote_doc['signature']) is True + def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): from bigchaindb.pipelines import vote @@ -257,6 +258,125 @@ def test_invalid_block_voting(monkeypatch, b, user_vk): vote_doc['signature']) is True +def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): + from bigchaindb.pipelines import vote + + outpipe = Pipe() + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + b.create_genesis_block() + + # insert blocks in the database while the voter process is not listening + # (these blocks won't appear in the changefeed) + block_1 = dummy_block(b) + b.write_block(block_1, durability='hard') + block_2 = dummy_block(b) + b.write_block(block_2, durability='hard') + + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) + vote_pipeline.start() + + # We expects two votes, so instead of waiting an arbitrary amount + # of time, we can do two blocking calls to `get` + outpipe.get() + outpipe.get() + + # create a new block that will appear in the changefeed + block_3 = dummy_block(b) + b.write_block(block_3, durability='hard') + + # Same as before with the two `get`s + outpipe.get() + + vote_pipeline.terminate() + + # retrive blocks from bigchain + blocks = list(r.table('bigchain') + .order_by(r.asc((r.row['block']['timestamp']))) + .run(b.conn)) + + # FIXME: remove genesis block, we don't vote on it (might change in the future) + blocks.pop(0) + vote_pipeline.terminate() + + # retrieve vote + votes = r.table('votes').run(b.conn) + votes = list(votes) + + assert all(vote['node_pubkey'] == b.me for vote in votes) + + +def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): + from bigchaindb.pipelines import vote + + outpipe = Pipe() + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + b.create_genesis_block() + + monkeypatch.setattr(util, 'timestamp', lambda: '2') + block_1 = dummy_block(b) + b.write_block(block_1, durability='hard') + + monkeypatch.setattr(util, 'timestamp', lambda: '3') + block_2 = dummy_block(b) + b.write_block(block_2, durability='hard') + + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) + vote_pipeline.start() + + # We expects two votes, so instead of waiting an arbitrary amount + # of time, we can do two blocking calls to `get` + outpipe.get() + outpipe.get() + vote_pipeline.terminate() + + # retrive blocks from bigchain + blocks = list(r.table('bigchain') + .order_by(r.asc((r.row['block']['timestamp']))) + .run(b.conn)) + + # retrieve votes + votes = list(r.table('votes').run(b.conn)) + + assert votes[0]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id']) + assert votes[1]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id']) + + +def test_voter_checks_for_previous_vote(monkeypatch, b): + from bigchaindb.pipelines import vote + + inpipe = Pipe() + outpipe = Pipe() + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + b.create_genesis_block() + + block_1 = dummy_block(b) + inpipe.put(block_1) + + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=inpipe, outdata=outpipe) + vote_pipeline.start() + + # wait for the result + outpipe.get() + + retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) + + # queue block for voting AGAIN + inpipe.put(retrieved_block) + + re_retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) + + vote_pipeline.terminate() + + # block should be unchanged + assert retrieved_block == re_retrieved_block + + @patch.object(Pipeline, 'start') def test_start(mock_start, b): # TODO: `block.start` is just a wrapper around `vote.create_pipeline`,