diff --git a/bigchaindb/block.py b/bigchaindb/block.py index 651257f5..01fd0d56 100644 --- a/bigchaindb/block.py +++ b/bigchaindb/block.py @@ -229,3 +229,41 @@ class Block(object): p_write.start() p_delete.start() + +class BlockDeleteRevert(object): + + def __init__(self, q_delete_to_revert): + self.q_delete_to_revert = q_delete_to_revert + + def write_blocks(self): + """ + Write blocks to the bigchain + """ + + # create bigchain instance + b = Bigchain() + + # Write blocks + while True: + block = self.q_delete_to_revert.get() + + # poison pill + if block == 'stop': + return + + b.write_block(block) + + def kill(self): + for i in range(mp.cpu_count()): + self.q_delete_to_revert.put('stop') + + def start(self): + """ + Initialize, spawn, and start the processes + """ + + # initialize the processes + p_write = ProcessGroup(name='write_blocks', target=self.write_blocks) + + # start the processes + p_write.start() diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 4f36377b..e1c91587 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -16,6 +16,10 @@ class GenesisBlockAlreadyExistsError(Exception): pass +class ImproperVoteError(Exception): + pass + + class Bigchain(object): """Bigchain API @@ -415,8 +419,11 @@ class Bigchain(object): The block if the block is valid else it raises and exception describing the reason why the block is invalid. """ + # First, make sure this node hasn't already voted on this block + if self.has_previous_vote(block): + return block - # First: Run the plugin block validation logic + # Run the plugin block validation logic self.consensus.validate_block(self, block) # Finally: Tentative assumption that every blockchain will want to @@ -428,6 +435,26 @@ class Bigchain(object): return block + def has_previous_vote(self, block): + """Check for previous votes from this node + + Args: + block (dict): block to check. + + Returns: + True if this block already has a valid vote from this node, False otherwise. If + there is already a vote, but the vote is invalid, raises an ImproperVoteError + """ + if block['votes']: + for vote in block['votes']: + if vote['node_pubkey'] == self.me: + if util.verify_vote_signature(block, vote): + return True + else: + raise ImproperVoteError('Block {block_id} already has an incorrectly signed vote ' + 'from public key {me}').format(block_id=block['id'], me=self.me) + return False + def is_valid_block(self, block): """Check whether a block is valid or invalid. @@ -525,6 +552,10 @@ class Bigchain(object): def write_vote(self, block, vote, block_number): """Write the vote to the database.""" + # First, make sure this block doesn't contain a vote from this node + if self.has_previous_vote(block): + return None + update = {'votes': r.row['votes'].append(vote)} # We need to *not* override the existing block_number, if any diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index b51df516..2abd4879 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -6,7 +6,7 @@ import rethinkdb as r import bigchaindb from bigchaindb import Bigchain from bigchaindb.voter import Voter, Election -from bigchaindb.block import Block +from bigchaindb.block import Block, BlockDeleteRevert from bigchaindb.web import server @@ -32,6 +32,7 @@ class Processes(object): self.q_new_block = mp.Queue() self.q_new_transaction = mp.Queue() self.q_block_new_vote = mp.Queue() + self.q_revert_delete = mp.Queue() def map_backlog(self): # listen to changes on the backlog and redirect the changes @@ -69,7 +70,8 @@ class Processes(object): # delete elif change['new_val'] is None: - pass + # this should never happen in regular operation + self.q_revert_delete.put(change['old_val']) # update (new vote) elif change['new_val'] is not None and change['old_val'] is not None: @@ -80,6 +82,7 @@ class Processes(object): # instantiate block and voter block = Block(self.q_new_transaction) + delete_reverter = BlockDeleteRevert(self.q_revert_delete) # start the web api app_server = server.create_server(bigchaindb.config['server']) @@ -90,6 +93,7 @@ class Processes(object): p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain) p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog) p_block = mp.Process(name='block', target=block.start) + p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start) p_voter = Voter(self.q_new_block) p_election = Election(self.q_block_new_vote) @@ -100,6 +104,7 @@ class Processes(object): p_map_backlog.start() logger.info('starting block') p_block.start() + p_block_delete_revert.start() logger.info('starting voter') p_voter.start() diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 4cff4c28..d8389f91 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -13,7 +13,7 @@ from bigchaindb import util from bigchaindb import exceptions from bigchaindb import crypto from bigchaindb.voter import Voter -from bigchaindb.block import Block +from bigchaindb.block import Block, BlockDeleteRevert @pytest.mark.skipif(reason='Some tests throw a ResourceWarning that might result in some weird ' @@ -257,6 +257,21 @@ class TestBigchainApi(object): b.write_vote(block_3, b.vote(block_3, b.get_last_voted_block(), True), 3) assert b.get_last_voted_block()['id'] == block_3['id'] + def test_no_vote_written_if_block_already_has_vote(self, b): + b.create_genesis_block() + + block_1 = b.create_block([]) + + b.write_block(block_1, durability='hard') + + b.write_vote(block_1, b.vote(block_1, b.get_last_voted_block(), True), 1) + retrieved_block_1 = r.table('bigchain').get(block_1['id']).run(b.conn) + + # try to vote again on the retrieved block, should do nothing + b.write_vote(retrieved_block_1, b.vote(retrieved_block_1, b.get_last_voted_block(), True), 1) + retrieved_block_2 = r.table('bigchain').get(block_1['id']).run(b.conn) + + assert retrieved_block_1 == retrieved_block_2 class TestTransactionValidation(object): @pytest.mark.usefixtures('inputs') @@ -787,6 +802,39 @@ class TestBigchainBlock(object): # join the process block.kill() + def test_revert_delete_block(self, b): + b.create_genesis_block() + + block_1 = b.create_block([]) + block_2 = b.create_block([]) + block_3 = b.create_block([]) + + b.write_block(block_1, durability='hard') + b.write_block(block_2, durability='hard') + b.write_block(block_3, durability='hard') + + b.write_vote(block_1, b.vote(block_1, b.get_last_voted_block(), True), 1) + b.write_vote(block_2, b.vote(block_2, b.get_last_voted_block(), True), 2) + b.write_vote(block_3, b.vote(block_3, b.get_last_voted_block(), True), 3) + + q_revert_delete = mp.Queue() + + reverter = BlockDeleteRevert(q_revert_delete) + + # simulate changefeed + r.table('bigchain').get(block_2['id']).delete().run(b.conn) + q_revert_delete.put(block_2) + + assert r.table('bigchain').get(block_2['id']).run(b.conn) is None + + reverter.start() + time.sleep(1) + reverter.kill() + + reverted_block_2 = r.table('bigchain').get(block_2['id']).run(b.conn) + + assert reverted_block_2 == block_2 + def test_duplicated_transactions(self): pytest.skip('We may have duplicates in the initial_results and changefeed') diff --git a/tests/db/test_voter.py b/tests/db/test_voter.py index 8ad397a1..21fc8534 100644 --- a/tests/db/test_voter.py +++ b/tests/db/test_voter.py @@ -292,6 +292,31 @@ class TestBigchainVoter(object): assert blocks[1]['votes'][0]['vote']['voting_for_block'] == block_1['id'] assert blocks[2]['votes'][0]['vote']['voting_for_block'] == block_2['id'] + def test_voter_checks_for_previous_vote(self, b): + b.create_genesis_block() + block_1 = b.create_block([]) + b.write_block(block_1, durability='hard') + + q_new_block = mp.Queue() + + voter = Voter(q_new_block) + voter.start() + + # queue block for voting + q_new_block.put(block_1) + time.sleep(1) + retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) + + # queue block for voting AGAIN + q_new_block.put(retrieved_block) + time.sleep(1) + voter.kill() + + re_retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) + + # block should be unchanged + assert retrieved_block == re_retrieved_block + @pytest.mark.skipif(reason='Updating the block_number must be atomic') def test_updating_block_number_must_be_atomic(self): pass