From 74a5412cd9fe82366db24e4f60d5ef44ad74dfba Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 28 Jul 2016 13:55:57 +0200 Subject: [PATCH 01/39] WIP --- bigchaindb/core.py | 6 +-- bigchaindb/pipelines/vote.py | 73 +++++++++++++++++++++++++++++++++++ bigchaindb/voter.py | 2 +- tests/db/test_bigchain_api.py | 32 +++++++-------- 4 files changed, 91 insertions(+), 22 deletions(-) create mode 100644 bigchaindb/pipelines/vote.py diff --git a/bigchaindb/core.py b/bigchaindb/core.py index a221a177..6b15d489 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -565,13 +565,9 @@ class Bigchain(object): return vote_signed - def write_vote(self, block, vote): + def write_vote(self, vote): """Write the vote to the database.""" - # First, make sure this block doesn't contain a vote from this node - if self.has_previous_vote(block): - return None - r.table('votes') \ .insert(vote) \ .run(self.conn) diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py new file mode 100644 index 00000000..63659596 --- /dev/null +++ b/bigchaindb/pipelines/vote.py @@ -0,0 +1,73 @@ +from collections import Counter + +from multipipes import Pipeline, Node + +from bigchaindb.pipelines.utils import ChangeFeed +from bigchaindb import Bigchain + + +class Vote: + + def __init__(self): + self.bigchain = Bigchain() + last_voted = self.bigchain.get_last_voted_block() + self.last_voted_id = last_voted['id'] + self.last_voted_number = last_voted['block_number'] + + self.counters = Counter() + self.validity = {} + + def ungroup(self, block): + num_tx = len(block['block']['transactions']) + for tx in block['block']['transactions']: + yield tx, block['id'], num_tx + + def validate_tx(self, tx, block_id, num_tx): + return bool(self.bigchain.is_valid_transaction(tx)), block_id, num_tx + + def vote(self, tx_validity, block_id, num_tx): + self.counters[block_id] += 1 + self.validity[block_id] = tx_validity and self.validity.get(block_id, + True) + + if self.counters[block_id] == num_tx: + vote 
= self.bigchain.vote(block_id, + self.last_voted_id, + self.validity[block_id]) + self.last_voted_id = block_id + del self.counters[block_id] + del self.validity[block_id] + return vote + + def write_vote(self, vote): + self.bigchain.write_vote(vote) + + +def initial(): + b = Bigchain() + initial = b.get_unvoted_blocks() + return initial + + +def get_changefeed(): + return ChangeFeed('bigchain', 'insert', prefeed=initial()) + + +def create_pipeline(): + voter = Vote() + + vote_pipeline = Pipeline([ + Node(voter.ungroup), + Node(voter.validate_tx, fraction_of_cores=1), + Node(voter.vote), + Node(voter.write_vote) + ]) + + return vote_pipeline + + +def start(): + pipeline = create_pipeline() + pipeline.setup(indata=get_changefeed()) + pipeline.start() + return pipeline diff --git a/bigchaindb/voter.py b/bigchaindb/voter.py index f7953451..485a0738 100644 --- a/bigchaindb/voter.py +++ b/bigchaindb/voter.py @@ -158,7 +158,7 @@ class Voter(object): block, vote = elem pretty_vote = 'valid' if vote['vote']['is_block_valid'] else 'invalid' logger.info('voting %s for block %s', pretty_vote, block['id']) - b.write_vote(block, vote) + b.write_vote(vote) def bootstrap(self): """ diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 7082e52f..7a75371b 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -140,7 +140,7 @@ class TestBigchainApi(object): # vote the block invalid vote = b.vote(block['id'], b.get_last_voted_block()['id'], False) - b.write_vote(block, vote) + b.write_vote(vote) response = b.get_transaction(tx_signed["id"]) # should be None, because invalid blocks are ignored @@ -280,13 +280,13 @@ class TestBigchainApi(object): # make sure all the blocks are written at the same time monkeypatch.setattr(util, 'timestamp', lambda: '1') - b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) assert 
b.get_last_voted_block()['id'] == block_1['id'] - b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_2['id'] - b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_3['id'] @@ -305,15 +305,15 @@ class TestBigchainApi(object): # make sure all the blocks are written at different timestamps monkeypatch.setattr(util, 'timestamp', lambda: '1') - b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_1['id'] monkeypatch.setattr(util, 'timestamp', lambda: '2') - b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_2['id'] monkeypatch.setattr(util, 'timestamp', lambda: '3') - b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) assert b.get_last_voted_block()['id'] == block_3['id'] def test_no_vote_written_if_block_already_has_vote(self, b): @@ -323,11 +323,11 @@ class TestBigchainApi(object): b.write_block(block_1, durability='hard') - b.write_vote(block_1, b.vote(block_1['id'], genesis['id'], True)) + b.write_vote(b.vote(block_1['id'], genesis['id'], True)) retrieved_block_1 = r.table('bigchain').get(block_1['id']).run(b.conn) # try to vote again on the retrieved block, should do nothing - b.write_vote(retrieved_block_1, b.vote(retrieved_block_1['id'], genesis['id'], True)) + b.write_vote(b.vote(retrieved_block_1['id'], genesis['id'], True)) retrieved_block_2 = 
r.table('bigchain').get(block_1['id']).run(b.conn) assert retrieved_block_1 == retrieved_block_2 @@ -622,9 +622,9 @@ class TestBigchainBlock(object): b.write_block(block_2, durability='hard') b.write_block(block_3, durability='hard') - b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) - b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) - b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True)) + b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True)) q_revert_delete = mp.Queue() @@ -922,7 +922,7 @@ class TestMultipleInputs(object): # vote the block VALID vote = b.vote(block['id'], genesis['id'], True) - b.write_vote(block, vote) + b.write_vote(vote) # get input owned_inputs_user1 = b.get_owned_ids(user_vk) @@ -938,7 +938,7 @@ class TestMultipleInputs(object): # vote the block invalid vote = b.vote(block['id'], b.get_last_voted_block()['id'], False) - b.write_vote(block, vote) + b.write_vote(vote) owned_inputs_user1 = b.get_owned_ids(user_vk) owned_inputs_user2 = b.get_owned_ids(user2_vk) @@ -1052,7 +1052,7 @@ class TestMultipleInputs(object): # vote the block VALID vote = b.vote(block['id'], genesis['id'], True) - b.write_vote(block, vote) + b.write_vote(vote) # get input owned_inputs_user1 = b.get_owned_ids(user_vk) @@ -1069,7 +1069,7 @@ class TestMultipleInputs(object): # vote the block invalid vote = b.vote(block['id'], b.get_last_voted_block()['id'], False) - b.write_vote(block, vote) + b.write_vote(vote) response = b.get_transaction(tx_signed["id"]) spent_inputs_user1 = b.get_spent(owned_inputs_user1[0]) From 199e8633bbad36cadca76e6396a305bf49bc8b92 Mon Sep 17 00:00:00 2001 From: vrde Date: Mon, 1 Aug 2016 22:45:53 +0200 Subject: [PATCH 02/39] Port tests to the new pipeline process --- 
bigchaindb/pipelines/vote.py | 5 +- tests/db/test_voter.py | 208 ---------------------------- tests/pipelines/test_vote.py | 258 +++++++++++++++++++++++++++++++++++ 3 files changed, 261 insertions(+), 210 deletions(-) create mode 100644 tests/pipelines/test_vote.py diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index 63659596..633f9e2f 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -9,10 +9,10 @@ from bigchaindb import Bigchain class Vote: def __init__(self): + last_voted = Bigchain().get_last_voted_block() + self.bigchain = Bigchain() - last_voted = self.bigchain.get_last_voted_block() self.last_voted_id = last_voted['id'] - self.last_voted_number = last_voted['block_number'] self.counters = Counter() self.validity = {} @@ -41,6 +41,7 @@ class Vote: def write_vote(self, vote): self.bigchain.write_vote(vote) + return vote def initial(): diff --git a/tests/db/test_voter.py b/tests/db/test_voter.py index e6e200cb..9627d12b 100644 --- a/tests/db/test_voter.py +++ b/tests/db/test_voter.py @@ -25,214 +25,6 @@ def dummy_block(): class TestBigchainVoter(object): - def test_valid_block_voting(self, b): - q_new_block = mp.Queue() - - genesis = b.create_genesis_block() - - # create valid block - # sleep so that `block` as a higher timestamp then genesis - time.sleep(1) - block = dummy_block() - # assert block is valid - assert b.is_valid_block(block) - b.write_block(block, durability='hard') - - # create queue and voter - voter = Voter(q_new_block) - - # vote - voter.start() - # wait for vote to be written - time.sleep(1) - voter.kill() - - # retrive block from bigchain - blocks = list(r.table('bigchain') - .order_by(r.asc((r.row['block']['timestamp']))) - .run(b.conn)) - - # retrieve vote - vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn) - vote = vote.next() - - # validate vote - assert vote is not None - - assert vote['vote']['voting_for_block'] == block['id'] - assert 
vote['vote']['previous_block'] == genesis['id'] - assert vote['vote']['is_block_valid'] is True - assert vote['vote']['invalid_reason'] is None - assert vote['node_pubkey'] == b.me - assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True - - def test_valid_block_voting_with_create_transaction(self, b): - q_new_block = mp.Queue() - - genesis = b.create_genesis_block() - - # create a `CREATE` transaction - test_user_priv, test_user_pub = crypto.generate_key_pair() - tx = b.create_transaction(b.me, test_user_pub, None, 'CREATE') - tx_signed = b.sign_transaction(tx, b.me_private) - assert b.is_valid_transaction(tx_signed) - - # create valid block - # sleep so that block as a higher timestamp then genesis - time.sleep(1) - block = b.create_block([tx_signed]) - # assert block is valid - assert b.is_valid_block(block) - b.write_block(block, durability='hard') - - # create queue and voter - voter = Voter(q_new_block) - - # vote - voter.start() - # wait for vote to be written - time.sleep(1) - voter.kill() - - # retrive block from bigchain - blocks = list(r.table('bigchain') - .order_by(r.asc((r.row['block']['timestamp']))) - .run(b.conn)) - # retrieve vote - vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn) - vote = vote.next() - - # validate vote - assert vote is not None - - assert vote['vote']['voting_for_block'] == block['id'] - assert vote['vote']['previous_block'] == genesis['id'] - assert vote['vote']['is_block_valid'] is True - assert vote['vote']['invalid_reason'] is None - assert vote['node_pubkey'] == b.me - assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True - - def test_valid_block_voting_with_transfer_transactions(self, b): - q_new_block = mp.Queue() - - b.create_genesis_block() - - # create a `CREATE` transaction - test_user_priv, test_user_pub = crypto.generate_key_pair() - tx = b.create_transaction(b.me, test_user_pub, None, 
'CREATE') - tx_signed = b.sign_transaction(tx, b.me_private) - assert b.is_valid_transaction(tx_signed) - - # create valid block - block = b.create_block([tx_signed]) - # assert block is valid - assert b.is_valid_block(block) - b.write_block(block, durability='hard') - - # create queue and voter - voter = Voter(q_new_block) - - # vote - voter.start() - # wait for vote to be written - time.sleep(1) - voter.kill() - - # retrive block from bigchain - blocks = list(r.table('bigchain') - .order_by(r.asc((r.row['block']['timestamp']))) - .run(b.conn)) - - # retrieve vote - vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn) - vote = vote.next() - - # validate vote - assert vote is not None - - # create a `TRANSFER` transaction - test_user2_priv, test_user2_pub = crypto.generate_key_pair() - tx2 = b.create_transaction(test_user_pub, test_user2_pub, {'txid': tx['id'], 'cid': 0}, 'TRANSFER') - tx2_signed = b.sign_transaction(tx2, test_user_priv) - assert b.is_valid_transaction(tx2_signed) - - # create valid block - block = b.create_block([tx2_signed]) - # assert block is valid - assert b.is_valid_block(block) - b.write_block(block, durability='hard') - - # create queue and voter - voter = Voter(q_new_block) - - # vote - voter.start() - # wait for vote to be written - time.sleep(1) - voter.kill() - - # retrive block from bigchain - blocks = list(r.table('bigchain') - .order_by(r.asc((r.row['block']['timestamp']))) - .run(b.conn)) - - # retrieve vote - vote = r.table('votes').get_all([blocks[2]['id'], b.me], index='block_and_voter').run(b.conn) - vote = vote.next() - - # validate vote - assert vote is not None - - assert vote['vote']['voting_for_block'] == block['id'] - assert vote['vote']['is_block_valid'] is True - assert vote['vote']['invalid_reason'] is None - assert vote['node_pubkey'] == b.me - assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True - - def test_invalid_block_voting(self, b, 
user_vk): - # create queue and voter - q_new_block = mp.Queue() - voter = Voter(q_new_block) - - # create transaction - transaction = b.create_transaction(b.me, user_vk, None, 'CREATE') - transaction_signed = b.sign_transaction(transaction, b.me_private) - - genesis = b.create_genesis_block() - - # create invalid block - # sleep so that `block` as a higher timestamp then `genesis` - time.sleep(1) - block = b.create_block([transaction_signed]) - # change transaction id to make it invalid - block['block']['transactions'][0]['id'] = 'abc' - assert not b.is_valid_block(block) - b.write_block(block, durability='hard') - - # vote - voter.start() - time.sleep(1) - voter.kill() - - # retrive block from bigchain - blocks = list(r.table('bigchain') - .order_by(r.asc((r.row['block']['timestamp']))) - .run(b.conn)) - - # retrieve vote - vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn) - vote = vote.next() - - # validate vote - assert vote is not None - - assert vote['vote']['voting_for_block'] == block['id'] - assert vote['vote']['previous_block'] == genesis['id'] - assert vote['vote']['is_block_valid'] is False - assert vote['vote']['invalid_reason'] is None - assert vote['node_pubkey'] == b.me - assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True - def test_vote_creation_valid(self, b): # create valid block block = dummy_block() diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py new file mode 100644 index 00000000..e369b2db --- /dev/null +++ b/tests/pipelines/test_vote.py @@ -0,0 +1,258 @@ +import pytest +import rethinkdb as r +from multipipes import Pipe + +from bigchaindb import util +from bigchaindb import crypto + + +def dummy_tx(b): + tx = b.create_transaction(b.me, b.me, None, 'CREATE') + tx_signed = b.sign_transaction(tx, b.me_private) + return tx_signed + + +def dummy_block(b): + block = b.create_block([dummy_tx(b) for _ in range(10)]) + return block + + +def 
test_vote_ungroup_returns_a_set_of_results(b): + from bigchaindb.pipelines import vote + + b.create_genesis_block() + block = dummy_block(b) + vote_obj = vote.Vote() + txs = list(vote_obj.ungroup(block)) + + assert len(txs) == 10 + + +def test_vote_validate_transaction(b): + from bigchaindb.pipelines import vote + + b.create_genesis_block() + tx = dummy_tx(b) + vote_obj = vote.Vote() + validation = vote_obj.validate_tx(tx, 123, 1) + assert validation == (True, 123, 1) + + tx['id'] = 'a' * 64 + validation = vote_obj.validate_tx(tx, 456, 10) + assert validation == (False, 456, 10) + + +def test_vote_accumulates_transactions(b): + from bigchaindb.pipelines import vote + + b.create_genesis_block() + vote_obj = vote.Vote() + + for _ in range(10): + tx = dummy_tx(b) + + validation = vote_obj.validate_tx(tx, 123, 1) + assert validation == (True, 123, 1) + + tx['id'] = 'a' * 64 + validation = vote_obj.validate_tx(tx, 456, 10) + assert validation == (False, 456, 10) + + +def test_valid_block_voting_sequential(b, monkeypatch): + from bigchaindb.pipelines import vote + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + genesis = b.create_genesis_block() + vote_obj = vote.Vote() + block = dummy_block(b) + + for tx, block_id, num_tx in vote_obj.ungroup(block): + last_vote = vote_obj.vote(*vote_obj.validate_tx(tx, block_id, num_tx)) + + vote_obj.write_vote(last_vote) + vote_rs = r.table('votes').get_all([block['id'], b.me], + index='block_and_voter').run(b.conn) + vote_doc = vote_rs.next() + + assert vote_doc['vote'] == {'voting_for_block': block['id'], + 'previous_block': genesis['id'], + 'is_block_valid': True, + 'invalid_reason': None, + 'timestamp': '1'} + + assert vote_doc['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), + vote_doc['signature']) is True + + +def test_valid_block_voting_multiprocessing(b, monkeypatch): + from bigchaindb.pipelines import vote + + inpipe = Pipe() + outpipe = Pipe() + + 
monkeypatch.setattr(util, 'timestamp', lambda: '1') + genesis = b.create_genesis_block() + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=inpipe, outdata=outpipe) + + block = dummy_block(b) + + inpipe.put(block) + vote_pipeline.start() + vote_out = outpipe.get() + vote_pipeline.terminate() + + vote_rs = r.table('votes').get_all([block['id'], b.me], + index='block_and_voter').run(b.conn) + vote_doc = vote_rs.next() + assert vote_out['vote'] == vote_doc['vote'] + assert vote_doc['vote'] == {'voting_for_block': block['id'], + 'previous_block': genesis['id'], + 'is_block_valid': True, + 'invalid_reason': None, + 'timestamp': '1'} + + assert vote_doc['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), + vote_doc['signature']) is True + + +def test_valid_block_voting_with_create_transaction(b, monkeypatch): + from bigchaindb.pipelines import vote + + genesis = b.create_genesis_block() + + # create a `CREATE` transaction + test_user_priv, test_user_pub = crypto.generate_key_pair() + tx = b.create_transaction(b.me, test_user_pub, None, 'CREATE') + tx_signed = b.sign_transaction(tx, b.me_private) + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + block = b.create_block([tx_signed]) + + inpipe = Pipe() + outpipe = Pipe() + + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=inpipe, outdata=outpipe) + + inpipe.put(block) + vote_pipeline.start() + vote_out = outpipe.get() + vote_pipeline.terminate() + + vote_rs = r.table('votes').get_all([block['id'], b.me], + index='block_and_voter').run(b.conn) + vote_doc = vote_rs.next() + assert vote_out['vote'] == vote_doc['vote'] + assert vote_doc['vote'] == {'voting_for_block': block['id'], + 'previous_block': genesis['id'], + 'is_block_valid': True, + 'invalid_reason': None, + 'timestamp': '1'} + + assert vote_doc['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), + vote_doc['signature']) is True 
+ +def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): + from bigchaindb.pipelines import vote + + genesis = b.create_genesis_block() + + # create a `CREATE` transaction + test_user_priv, test_user_pub = crypto.generate_key_pair() + tx = b.create_transaction(b.me, test_user_pub, None, 'CREATE') + tx_signed = b.sign_transaction(tx, b.me_private) + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + block = b.create_block([tx_signed]) + b.write_block(block, durability='hard') + + # create a `TRANSFER` transaction + test_user2_priv, test_user2_pub = crypto.generate_key_pair() + tx2 = b.create_transaction(test_user_pub, test_user2_pub, + {'txid': tx['id'], 'cid': 0}, 'TRANSFER') + tx2_signed = b.sign_transaction(tx2, test_user_priv) + + monkeypatch.setattr(util, 'timestamp', lambda: '2') + block2 = b.create_block([tx2_signed]) + b.write_block(block2, durability='hard') + + inpipe = Pipe() + outpipe = Pipe() + + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=inpipe, outdata=outpipe) + + inpipe.put(block) + inpipe.put(block2) + vote_pipeline.start() + vote_out = outpipe.get() + vote2_out = outpipe.get() + vote_pipeline.terminate() + + vote_rs = r.table('votes').get_all([block['id'], b.me], + index='block_and_voter').run(b.conn) + vote_doc = vote_rs.next() + assert vote_out['vote'] == vote_doc['vote'] + assert vote_doc['vote'] == {'voting_for_block': block['id'], + 'previous_block': genesis['id'], + 'is_block_valid': True, + 'invalid_reason': None, + 'timestamp': '2'} + + assert vote_doc['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), + vote_doc['signature']) is True + + + vote2_rs = r.table('votes').get_all([block2['id'], b.me], + index='block_and_voter').run(b.conn) + vote2_doc = vote2_rs.next() + assert vote2_out['vote'] == vote2_doc['vote'] + assert vote2_doc['vote'] == {'voting_for_block': block2['id'], + 'previous_block': block['id'], + 'is_block_valid': True, + 
'invalid_reason': None, + 'timestamp': '2'} + + assert vote2_doc['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote2_doc['vote']), + vote2_doc['signature']) is True + + +def test_invalid_block_voting(monkeypatch, b, user_vk): + from bigchaindb.pipelines import vote + + inpipe = Pipe() + outpipe = Pipe() + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + genesis = b.create_genesis_block() + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=inpipe, outdata=outpipe) + + block = dummy_block(b) + block['block']['transactions'][0]['id'] = 'abc' + + inpipe.put(block) + vote_pipeline.start() + vote_out = outpipe.get() + vote_pipeline.terminate() + + vote_rs = r.table('votes').get_all([block['id'], b.me], + index='block_and_voter').run(b.conn) + vote_doc = vote_rs.next() + assert vote_out['vote'] == vote_doc['vote'] + assert vote_doc['vote'] == {'voting_for_block': block['id'], + 'previous_block': genesis['id'], + 'is_block_valid': False, + 'invalid_reason': None, + 'timestamp': '1'} + + assert vote_doc['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), + vote_doc['signature']) is True From cdcb6b4e1ecd258a2ab8f9cbf80448b48cad471d Mon Sep 17 00:00:00 2001 From: vrde Date: Mon, 1 Aug 2016 22:55:17 +0200 Subject: [PATCH 03/39] Remove old code --- bigchaindb/processes.py | 16 ++++------------ tests/pipelines/test_vote.py | 18 ++++++++++++++---- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index fe641750..80ed43ad 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -4,9 +4,9 @@ import multiprocessing as mp import rethinkdb as r import bigchaindb -from bigchaindb.pipelines import block +from bigchaindb.pipelines import block, vote from bigchaindb import Bigchain -from bigchaindb.voter import Voter, Election +from bigchaindb.voter import Election from bigchaindb.block import 
BlockDeleteRevert from bigchaindb.web import server @@ -30,7 +30,6 @@ class Processes(object): def __init__(self): # initialize the class - self.q_new_block = mp.Queue() self.q_block_new_vote = mp.Queue() self.q_revert_delete = mp.Queue() @@ -43,12 +42,8 @@ class Processes(object): for change in r.table('bigchain').changes().run(b.conn): - # insert - if change['old_val'] is None: - self.q_new_block.put(change['new_val']) - # delete - elif change['new_val'] is None: + if change['new_val'] is None: # this should never happen in regular operation self.q_revert_delete.put(change['old_val']) @@ -69,21 +64,18 @@ class Processes(object): # initialize the processes p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain) p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start) - p_voter = Voter(self.q_new_block) p_election = Election(self.q_block_new_vote) # start the processes logger.info('starting bigchain mapper') p_map_bigchain.start() - logger.info('starting backlog mapper') logger.info('starting block') block.start() p_block_delete_revert.start() logger.info('starting voter') - p_voter.start() + vote.start() logger.info('starting election') p_election.start() # start message - p_voter.initialized.wait() logger.info(BANNER.format(bigchaindb.config['server']['bind'])) diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index e369b2db..e2d8e596 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -1,6 +1,6 @@ -import pytest +from pytest import patch import rethinkdb as r -from multipipes import Pipe +from multipipes import Pipe, Pipeline from bigchaindb import util from bigchaindb import crypto @@ -208,9 +208,8 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), vote_doc['signature']) is True - vote2_rs = r.table('votes').get_all([block2['id'], b.me], - 
index='block_and_voter').run(b.conn) + index='block_and_voter').run(b.conn) vote2_doc = vote2_rs.next() assert vote2_out['vote'] == vote2_doc['vote'] assert vote2_doc['vote'] == {'voting_for_block': block2['id'], @@ -256,3 +255,14 @@ def test_invalid_block_voting(monkeypatch, b, user_vk): assert vote_doc['node_pubkey'] == b.me assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), vote_doc['signature']) is True + + +@patch.object(Pipeline, 'start') +def test_start(mock_start): + # TODO: `block.start` is just a wrapper around `vote.create_pipeline`, + # that is tested by `test_full_pipeline`. + # If anyone has better ideas on how to test this, please do a PR :) + from bigchaindb.pipelines import vote + + vote.start() + mock_start.assert_called_with() From 54984364b1fe8c6de9c22ba2d11c653a1766b5e9 Mon Sep 17 00:00:00 2001 From: vrde Date: Tue, 2 Aug 2016 11:53:46 +0200 Subject: [PATCH 04/39] Fix stoopid error in tests --- tests/pipelines/test_vote.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index e2d8e596..43b3abf5 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -1,4 +1,4 @@ -from pytest import patch +from unittest.mock import patch import rethinkdb as r from multipipes import Pipe, Pipeline @@ -258,11 +258,13 @@ def test_invalid_block_voting(monkeypatch, b, user_vk): @patch.object(Pipeline, 'start') -def test_start(mock_start): +def test_start(mock_start, b): # TODO: `block.start` is just a wrapper around `vote.create_pipeline`, # that is tested by `test_full_pipeline`. 
# If anyone has better ideas on how to test this, please do a PR :) from bigchaindb.pipelines import vote + genesis = b.create_genesis_block() + vote.start() mock_start.assert_called_with() From 17c11e593542627628bbfe3def94f79f2a439ca1 Mon Sep 17 00:00:00 2001 From: vrde Date: Tue, 2 Aug 2016 14:27:25 +0200 Subject: [PATCH 05/39] Add docstrings --- bigchaindb/pipelines/block.py | 1 + bigchaindb/pipelines/vote.py | 60 +++++++++++++++++++++++++++++++++++ tests/pipelines/test_vote.py | 2 +- 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index a73054a7..59375c57 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -152,6 +152,7 @@ def create_pipeline(): def start(): """Create, start, and return the block pipeline.""" + pipeline = create_pipeline() pipeline.setup(indata=get_changefeed()) pipeline.start() diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index 633f9e2f..4a0afbf7 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -1,3 +1,10 @@ +"""This module takes care of all the logic related to block voting. + +The logic is encapsulated in the ``Vote`` class, while the sequence +of actions to do on transactions is specified in the ``create_pipeline`` +function. +""" + from collections import Counter from multipipes import Pipeline, Node @@ -7,8 +14,15 @@ from bigchaindb import Bigchain class Vote: + """This class encapsulates the logic to vote on blocks. + + Note: + Methods of this class will be executed in different processes. + """ def __init__(self): + """Initialize the Block voter.""" + last_voted = Bigchain().get_last_voted_block() self.bigchain = Bigchain() @@ -18,14 +32,46 @@ class Vote: self.validity = {} def ungroup(self, block): + """Given a block, ungroup the transactions in it. 
+ + Args: + block (dict): the block to process + + Returns: + An iterator that yields a transaction, block id, and the total + number of transactions contained in the block. + """ + num_tx = len(block['block']['transactions']) for tx in block['block']['transactions']: yield tx, block['id'], num_tx def validate_tx(self, tx, block_id, num_tx): + """Validate a transaction. + + Args: + tx (dict): the transaction to validate + block_id (str): the id of block containing the transaction + num_tx (int): the total number of transactions to process + + Returns: + Three values are returned, the validity of the transaction, + ``block_id``, ``num_tx``. + """ return bool(self.bigchain.is_valid_transaction(tx)), block_id, num_tx def vote(self, tx_validity, block_id, num_tx): + """Collect the validity of transactions and cast a vote when ready. + + Args: + tx_validity (bool): the validity of the transaction + block_id (str): the id of block containing the transaction + num_tx (int): the total number of transactions to process + + Returns: + None, or a vote if a decision has been reached. + """ + self.counters[block_id] += 1 self.validity[block_id] = tx_validity and self.validity.get(block_id, True) @@ -40,21 +86,33 @@ class Vote: return vote def write_vote(self, vote): + """Write vote to the database. + + Args: + vote: the vote to write. 
+ """ + self.bigchain.write_vote(vote) return vote def initial(): + """Return unvoted blocks.""" b = Bigchain() initial = b.get_unvoted_blocks() return initial def get_changefeed(): + """Create and return the changefeed for the bigchain table.""" + return ChangeFeed('bigchain', 'insert', prefeed=initial()) def create_pipeline(): + """Create and return the pipeline of operations to be distributed + on different processes.""" + voter = Vote() vote_pipeline = Pipeline([ @@ -68,6 +126,8 @@ def create_pipeline(): def start(): + """Create, start, and return the block pipeline.""" + pipeline = create_pipeline() pipeline.setup(indata=get_changefeed()) pipeline.start() diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 43b3abf5..08732916 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -264,7 +264,7 @@ def test_start(mock_start, b): # If anyone has better ideas on how to test this, please do a PR :) from bigchaindb.pipelines import vote - genesis = b.create_genesis_block() + b.create_genesis_block() vote.start() mock_start.assert_called_with() From 074cae4484ac7244c7f5bde8aafac00fddf5fb2b Mon Sep 17 00:00:00 2001 From: vrde Date: Tue, 2 Aug 2016 15:39:08 +0200 Subject: [PATCH 06/39] Add last tests --- bigchaindb/voter.py | 192 ---------------------------------- tests/db/test_bigchain_api.py | 2 - tests/db/test_voter.py | 37 +------ tests/pipelines/test_vote.py | 120 +++++++++++++++++++++ 4 files changed, 121 insertions(+), 230 deletions(-) diff --git a/bigchaindb/voter.py b/bigchaindb/voter.py index 485a0738..2cef51bf 100644 --- a/bigchaindb/voter.py +++ b/bigchaindb/voter.py @@ -1,204 +1,12 @@ import logging import multiprocessing as mp -import ctypes from bigchaindb import Bigchain -from bigchaindb.monitor import Monitor logger = logging.getLogger(__name__) -class BlockStream(object): - """ - Combine the stream of new blocks coming from the changefeed with the list of unvoted blocks. 
- - This is a utility class that abstracts the source of data for the `Voter`. - """ - - def __init__(self, new_blocks): - """ - Create a new BlockStream instance. - - Args: - new_block (queue): a queue of new blocks - """ - - b = Bigchain() - self.new_blocks = new_blocks - # TODO: there might be duplicate blocks since we *first* get the changefeed and only *then* we query the - # database to get the old blocks. - - # TODO how about a one liner, something like: - # self.unvoted_blocks = b.get_unvoted_blocks() if not b.nodes_except_me else [] - self.unvoted_blocks = [] - if not b.nodes_except_me: - self.unvoted_blocks = b.get_unvoted_blocks() - - def get(self): - """ - Return the next block to be processed. - """ - try: - # FIXME: apparently RethinkDB returns a list instead of a cursor when using `order_by`. - # We might change the `pop` in the future, when the driver will return a cursor. - # We have a test for this, so if the driver implementation changes we will get a failure: - # - tests/test_voter.py::TestBlockStream::test_if_old_blocks_get_should_return_old_block_first - return self.unvoted_blocks.pop(0) - except IndexError: - return self.new_blocks.get() - - -class Voter(object): - - def __init__(self, q_new_block): - """ - Initialize the class with the needed queues. 
- - Initialize with a queue where new blocks added to the bigchain will be put - """ - - self.monitor = Monitor() - - self.q_new_block = q_new_block - self.q_blocks_to_validate = mp.Queue() - self.q_validated_block = mp.Queue() - self.q_voted_block = mp.Queue() - self.v_previous_block_id = mp.Value(ctypes.c_char_p) - self.initialized = mp.Event() - - def feed_blocks(self): - """ - Prepare the queue with blocks to validate - """ - - block_stream = BlockStream(self.q_new_block) - while True: - # poison pill - block = block_stream.get() - if block == 'stop': - self.q_blocks_to_validate.put('stop') - return - - self.q_blocks_to_validate.put(block) - - def validate(self): - """ - Checks if incoming blocks are valid or not - """ - - # create a bigchain instance. All processes should create their own bigchcain instance so that they all - # have their own connection to the database - b = Bigchain() - - logger.info('voter waiting for new blocks') - # signal initialization complete - self.initialized.set() - - while True: - new_block = self.q_blocks_to_validate.get() - - # poison pill - if new_block == 'stop': - self.q_validated_block.put('stop') - return - - logger.info('new_block arrived to voter') - - with self.monitor.timer('validate_block'): - # FIXME: the following check is done also in `is_valid_block`, - # but validity can be true even if the block has already - # a vote. 
- if b.has_previous_vote(new_block): - continue - validity = b.is_valid_block(new_block) - - self.q_validated_block.put((new_block, - self.v_previous_block_id.value.decode(), - validity)) - - self.v_previous_block_id.value = new_block['id'].encode() - - def vote(self): - """ - Votes on the block based on the decision of the validation - """ - - # create a bigchain instance - b = Bigchain() - - while True: - elem = self.q_validated_block.get() - - # poison pill - if elem == 'stop': - self.q_voted_block.put('stop') - return - - validated_block, previous_block_id, decision = elem - vote = b.vote(validated_block['id'], previous_block_id, decision) - self.q_voted_block.put((validated_block, vote)) - - def update_block(self): - """ - Appends the vote in the bigchain table - """ - - # create a bigchain instance - b = Bigchain() - - while True: - elem = self.q_voted_block.get() - - # poison pill - if elem == 'stop': - logger.info('clean exit') - return - - block, vote = elem - pretty_vote = 'valid' if vote['vote']['is_block_valid'] else 'invalid' - logger.info('voting %s for block %s', pretty_vote, block['id']) - b.write_vote(vote) - - def bootstrap(self): - """ - Before starting handling the new blocks received by the changefeed we need to handle unvoted blocks - added to the bigchain while the process was down - - We also need to set the previous_block_id. 
- """ - - b = Bigchain() - last_voted = b.get_last_voted_block() - - self.v_previous_block_id.value = last_voted['id'].encode() - - def kill(self): - """ - Terminate processes - """ - self.q_new_block.put('stop') - - def start(self): - """ - Initialize, spawn, and start the processes - """ - - self.bootstrap() - - # initialize the processes - p_feed_blocks = mp.Process(name='block_feeder', target=self.feed_blocks) - p_validate = mp.Process(name='block_validator', target=self.validate) - p_vote = mp.Process(name='block_voter', target=self.vote) - p_update = mp.Process(name='block_updater', target=self.update_block) - - # start the processes - p_feed_blocks.start() - p_validate.start() - p_vote.start() - p_update.start() - - class Election(object): def __init__(self, q_block_new_vote): diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 7a75371b..d279cc9a 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -1,6 +1,5 @@ import copy import multiprocessing as mp -import random import time import pytest @@ -9,7 +8,6 @@ import cryptoconditions as cc import bigchaindb from bigchaindb import crypto, exceptions, util -from bigchaindb.voter import Voter from bigchaindb.block import BlockDeleteRevert diff --git a/tests/db/test_voter.py b/tests/db/test_voter.py index 9627d12b..fb6cb495 100644 --- a/tests/db/test_voter.py +++ b/tests/db/test_voter.py @@ -5,7 +5,7 @@ import multiprocessing as mp from bigchaindb import util -from bigchaindb.voter import Voter, Election, BlockStream +from bigchaindb.voter import Election, BlockStream from bigchaindb import crypto, Bigchain @@ -308,38 +308,3 @@ class TestBlockElection(object): # tx2 was in an invalid block and SHOULD be in the backlog assert r.table('backlog').get(tx2['id']).run(b.conn)['id'] == tx2['id'] - -class TestBlockStream(object): - - def test_if_federation_size_is_greater_than_one_ignore_past_blocks(self, b): - for _ in range(5): - 
b.nodes_except_me.append(crypto.generate_key_pair()[1]) - new_blocks = mp.Queue() - bs = BlockStream(new_blocks) - block_1 = dummy_block() - new_blocks.put(block_1) - assert block_1 == bs.get() - - def test_if_no_old_blocks_get_should_return_new_blocks(self, b): - new_blocks = mp.Queue() - bs = BlockStream(new_blocks) - - # create two blocks - block_1 = dummy_block() - block_2 = dummy_block() - - # write the blocks - b.write_block(block_1, durability='hard') - b.write_block(block_2, durability='hard') - - # simulate a changefeed - new_blocks.put(block_1) - new_blocks.put(block_2) - - # and check if we get exactly these two blocks - assert bs.get() == block_1 - assert bs.get() == block_2 - - @pytest.mark.skipif(reason='We may have duplicated blocks when retrieving the BlockStream') - def test_ignore_duplicated_blocks_when_retrieving_the_blockstream(self): - pass diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 08732916..42960c52 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -157,6 +157,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch): assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), vote_doc['signature']) is True + def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): from bigchaindb.pipelines import vote @@ -257,6 +258,125 @@ def test_invalid_block_voting(monkeypatch, b, user_vk): vote_doc['signature']) is True +def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): + from bigchaindb.pipelines import vote + + outpipe = Pipe() + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + b.create_genesis_block() + + # insert blocks in the database while the voter process is not listening + # (these blocks won't appear in the changefeed) + block_1 = dummy_block(b) + b.write_block(block_1, durability='hard') + block_2 = dummy_block(b) + b.write_block(block_2, durability='hard') + + vote_pipeline = vote.create_pipeline() + 
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) + vote_pipeline.start() + + # We expects two votes, so instead of waiting an arbitrary amount + # of time, we can do two blocking calls to `get` + outpipe.get() + outpipe.get() + + # create a new block that will appear in the changefeed + block_3 = dummy_block(b) + b.write_block(block_3, durability='hard') + + # Same as before with the two `get`s + outpipe.get() + + vote_pipeline.terminate() + + # retrive blocks from bigchain + blocks = list(r.table('bigchain') + .order_by(r.asc((r.row['block']['timestamp']))) + .run(b.conn)) + + # FIXME: remove genesis block, we don't vote on it (might change in the future) + blocks.pop(0) + vote_pipeline.terminate() + + # retrieve vote + votes = r.table('votes').run(b.conn) + votes = list(votes) + + assert all(vote['node_pubkey'] == b.me for vote in votes) + + +def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): + from bigchaindb.pipelines import vote + + outpipe = Pipe() + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + b.create_genesis_block() + + monkeypatch.setattr(util, 'timestamp', lambda: '2') + block_1 = dummy_block(b) + b.write_block(block_1, durability='hard') + + monkeypatch.setattr(util, 'timestamp', lambda: '3') + block_2 = dummy_block(b) + b.write_block(block_2, durability='hard') + + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) + vote_pipeline.start() + + # We expects two votes, so instead of waiting an arbitrary amount + # of time, we can do two blocking calls to `get` + outpipe.get() + outpipe.get() + vote_pipeline.terminate() + + # retrive blocks from bigchain + blocks = list(r.table('bigchain') + .order_by(r.asc((r.row['block']['timestamp']))) + .run(b.conn)) + + # retrieve votes + votes = list(r.table('votes').run(b.conn)) + + assert votes[0]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id']) + assert votes[1]['vote']['voting_for_block'] in 
(blocks[1]['id'], blocks[2]['id']) + + +def test_voter_checks_for_previous_vote(monkeypatch, b): + from bigchaindb.pipelines import vote + + inpipe = Pipe() + outpipe = Pipe() + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + b.create_genesis_block() + + block_1 = dummy_block(b) + inpipe.put(block_1) + + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=inpipe, outdata=outpipe) + vote_pipeline.start() + + # wait for the result + outpipe.get() + + retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) + + # queue block for voting AGAIN + inpipe.put(retrieved_block) + + re_retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) + + vote_pipeline.terminate() + + # block should be unchanged + assert retrieved_block == re_retrieved_block + + @patch.object(Pipeline, 'start') def test_start(mock_start, b): # TODO: `block.start` is just a wrapper around `vote.create_pipeline`, From 79980c08a6cb081de03cc03213d02d43a3e05f8a Mon Sep 17 00:00:00 2001 From: vrde Date: Tue, 2 Aug 2016 16:04:08 +0200 Subject: [PATCH 07/39] Port final tests --- bigchaindb/pipelines/vote.py | 8 ++- tests/db/test_voter.py | 131 +---------------------------------- tests/pipelines/test_vote.py | 15 ++-- 3 files changed, 16 insertions(+), 138 deletions(-) diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index 4a0afbf7..d618f24b 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -38,10 +38,14 @@ class Vote: block (dict): the block to process Returns: - An iterator that yields a transaction, block id, and the total - number of transactions contained in the block. + ``None`` if the block has been already voted, an iterator that + yields a transaction, block id, and the total number of + transactions contained in the block otherwise. 
""" + if self.bigchain.has_previous_vote(block): + return + num_tx = len(block['block']['transactions']) for tx in block['block']['transactions']: yield tx, block['id'], num_tx diff --git a/tests/db/test_voter.py b/tests/db/test_voter.py index fb6cb495..b5749a36 100644 --- a/tests/db/test_voter.py +++ b/tests/db/test_voter.py @@ -1,11 +1,8 @@ -import pytest import time import rethinkdb as r import multiprocessing as mp -from bigchaindb import util - -from bigchaindb.voter import Election, BlockStream +from bigchaindb.voter import Election from bigchaindb import crypto, Bigchain @@ -23,132 +20,6 @@ def dummy_block(): return block -class TestBigchainVoter(object): - - def test_vote_creation_valid(self, b): - # create valid block - block = dummy_block() - # retrieve vote - vote = b.vote(block['id'], 'abc', True) - - # assert vote is correct - assert vote['vote']['voting_for_block'] == block['id'] - assert vote['vote']['previous_block'] == 'abc' - assert vote['vote']['is_block_valid'] is True - assert vote['vote']['invalid_reason'] is None - assert vote['node_pubkey'] == b.me - assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True - - def test_vote_creation_invalid(self, b): - # create valid block - block = dummy_block() - # retrieve vote - vote = b.vote(block['id'], 'abc', False) - - # assert vote is correct - assert vote['vote']['voting_for_block'] == block['id'] - assert vote['vote']['previous_block'] == 'abc' - assert vote['vote']['is_block_valid'] is False - assert vote['vote']['invalid_reason'] is None - assert vote['node_pubkey'] == b.me - assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True - - def test_voter_considers_unvoted_blocks_when_single_node(self, b): - # simulate a voter going donw in a single node environment - b.create_genesis_block() - - # insert blocks in the database while the voter process is not listening - # (these blocks won't appear in the changefeed) - 
block_1 = dummy_block() - b.write_block(block_1, durability='hard') - block_2 = dummy_block() - b.write_block(block_2, durability='hard') - - # voter is back online, we simulate that by creating a queue and a Voter instance - q_new_block = mp.Queue() - voter = Voter(q_new_block) - - # vote - voter.start() - time.sleep(1) - - # create a new block that will appear in the changefeed - block_3 = dummy_block() - b.write_block(block_3, durability='hard') - - time.sleep(1) - voter.kill() - - # retrive blocks from bigchain - blocks = list(r.table('bigchain') - .order_by(r.asc((r.row['block']['timestamp']))) - .run(b.conn)) - - # FIXME: remove genesis block, we don't vote on it (might change in the future) - blocks.pop(0) - - # retrieve vote - votes = r.table('votes').run(b.conn) - votes = list(votes) - - assert all(vote['node_pubkey'] == b.me for vote in votes) - - def test_voter_chains_blocks_with_the_previous_ones(self, b): - b.create_genesis_block() - # sleep so that `block_*` as a higher timestamp then `genesis` - time.sleep(1) - block_1 = dummy_block() - b.write_block(block_1, durability='hard') - time.sleep(1) - block_2 = dummy_block() - b.write_block(block_2, durability='hard') - - q_new_block = mp.Queue() - - voter = Voter(q_new_block) - voter.start() - time.sleep(1) - voter.kill() - - # retrive blocks from bigchain - blocks = list(r.table('bigchain') - .order_by(r.asc((r.row['block']['timestamp']))) - .run(b.conn)) - - # retrieve votes - votes = list(r.table('votes').run(b.conn)) - - assert votes[0]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id']) - assert votes[1]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id']) - - def test_voter_checks_for_previous_vote(self, b): - b.create_genesis_block() - block_1 = dummy_block() - b.write_block(block_1, durability='hard') - - q_new_block = mp.Queue() - - voter = Voter(q_new_block) - voter.start() - - time.sleep(1) - retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) - - # 
queue block for voting AGAIN - q_new_block.put(retrieved_block) - time.sleep(1) - voter.kill() - - re_retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) - - # block should be unchanged - assert retrieved_block == re_retrieved_block - - @pytest.mark.skipif(reason='Updating the block_number must be atomic') - def test_updating_block_number_must_be_atomic(self): - pass - - class TestBlockElection(object): def test_quorum(self, b): diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 42960c52..b9c55779 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -357,6 +357,8 @@ def test_voter_checks_for_previous_vote(monkeypatch, b): block_1 = dummy_block(b) inpipe.put(block_1) + assert r.table('votes').count().run(b.conn) == 0 + vote_pipeline = vote.create_pipeline() vote_pipeline.setup(indata=inpipe, outdata=outpipe) vote_pipeline.start() @@ -364,17 +366,18 @@ def test_voter_checks_for_previous_vote(monkeypatch, b): # wait for the result outpipe.get() - retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) - # queue block for voting AGAIN - inpipe.put(retrieved_block) + inpipe.put(block_1) - re_retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn) + # queue another block + inpipe.put(dummy_block(b)) + + # wait for the result of the new block + outpipe.get() vote_pipeline.terminate() - # block should be unchanged - assert retrieved_block == re_retrieved_block + assert r.table('votes').count().run(b.conn) == 2 @patch.object(Pipeline, 'start') From 13bd69f9771580ce3f7d42466bc4998ba240459f Mon Sep 17 00:00:00 2001 From: vrde Date: Tue, 2 Aug 2016 17:28:22 +0200 Subject: [PATCH 08/39] Move tests to new file --- tests/pipelines/test_vote.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index b9c55779..daa4c644 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ 
-16,6 +16,33 @@ def dummy_block(b): block = b.create_block([dummy_tx(b) for _ in range(10)]) return block +def test_vote_creation_valid(b): + # create valid block + block = dummy_block() + # retrieve vote + vote = b.vote(block['id'], 'abc', True) + + # assert vote is correct + assert vote['vote']['voting_for_block'] == block['id'] + assert vote['vote']['previous_block'] == 'abc' + assert vote['vote']['is_block_valid'] is True + assert vote['vote']['invalid_reason'] is None + assert vote['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True + +def test_vote_creation_invalid(b): + # create valid block + block = dummy_block() + # retrieve vote + vote = b.vote(block['id'], 'abc', False) + + # assert vote is correct + assert vote['vote']['voting_for_block'] == block['id'] + assert vote['vote']['previous_block'] == 'abc' + assert vote['vote']['is_block_valid'] is False + assert vote['vote']['invalid_reason'] is None + assert vote['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True def test_vote_ungroup_returns_a_set_of_results(b): from bigchaindb.pipelines import vote From 3ad72077d3b9f7fe97e1d077359d5123efeb6f3e Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 3 Aug 2016 15:55:24 +0200 Subject: [PATCH 09/39] Fix failing tests --- tests/pipelines/test_vote.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index daa4c644..783e5373 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -16,9 +16,10 @@ def dummy_block(b): block = b.create_block([dummy_tx(b) for _ in range(10)]) return block + def test_vote_creation_valid(b): # create valid block - block = dummy_block() + block = dummy_block(b) # retrieve vote vote = b.vote(block['id'], 'abc', True) @@ -28,11 +29,13 @@ def test_vote_creation_valid(b): assert 
vote['vote']['is_block_valid'] is True assert vote['vote']['invalid_reason'] is None assert vote['node_pubkey'] == b.me - assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), + vote['signature']) is True + def test_vote_creation_invalid(b): # create valid block - block = dummy_block() + block = dummy_block(b) # retrieve vote vote = b.vote(block['id'], 'abc', False) @@ -42,7 +45,9 @@ def test_vote_creation_invalid(b): assert vote['vote']['is_block_valid'] is False assert vote['vote']['invalid_reason'] is None assert vote['node_pubkey'] == b.me - assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), vote['signature']) is True + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote['vote']), + vote['signature']) is True + def test_vote_ungroup_returns_a_set_of_results(b): from bigchaindb.pipelines import vote @@ -323,7 +328,8 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): .order_by(r.asc((r.row['block']['timestamp']))) .run(b.conn)) - # FIXME: remove genesis block, we don't vote on it (might change in the future) + # FIXME: remove genesis block, we don't vote on it + # (might change in the future) blocks.pop(0) vote_pipeline.terminate() From 25d10957cada656772df5ddeca73fd2efb327c5a Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 5 Aug 2016 11:58:07 +0200 Subject: [PATCH 10/39] Validate block metadata --- bigchaindb/pipelines/vote.py | 53 ++++++++++++++++++++++++++------ tests/pipelines/test_vote.py | 59 ++++++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 12 deletions(-) diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index d618f24b..2dc259ed 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -9,10 +9,21 @@ from collections import Counter from multipipes import Pipeline, Node +from bigchaindb import config_utils, exceptions from 
bigchaindb.pipelines.utils import ChangeFeed from bigchaindb import Bigchain +def create_invalid_tx(): + """Create and return an invalid transaction. + + The transaction is invalid because it's missing the signature.""" + + b = Bigchain() + tx = b.create_transaction(b.me, b.me, None, 'CREATE') + return tx + + class Vote: """This class encapsulates the logic to vote on blocks. @@ -23,15 +34,34 @@ class Vote: def __init__(self): """Initialize the Block voter.""" + # Since cannot share a connection to RethinkDB using multiprocessing, + # we need to create a temporary instance of BigchainDB that we use + # only to query RethinkDB last_voted = Bigchain().get_last_voted_block() + self.consensus = config_utils.load_consensus_plugin() + # This is the Bigchain instance that will be "shared" (aka: copied) + # by all the subprocesses self.bigchain = Bigchain() self.last_voted_id = last_voted['id'] self.counters = Counter() self.validity = {} - def ungroup(self, block): + self.invalid_dummy_tx = create_invalid_tx() + + def validate_block(self, block): + if not self.bigchain.has_previous_vote(block): + try: + self.consensus.validate_block(self.bigchain, block) + valid = True + except (exceptions.InvalidHash, + exceptions.OperationError, + exceptions.InvalidSignature) as e: + valid = False + return block, valid + + def ungroup(self, block, valid): """Given a block, ungroup the transactions in it. Args: @@ -43,12 +73,16 @@ class Vote: transactions contained in the block otherwise. """ - if self.bigchain.has_previous_vote(block): - return - - num_tx = len(block['block']['transactions']) - for tx in block['block']['transactions']: - yield tx, block['id'], num_tx + # XXX: if a block is invalid we should skip the `validate_tx` step, + # but since we are in a pipeline we cannot just jump to another + # function. 
Hackish solution: generate an invalid transaction + # and propagate it to the next steps of the pipeline + if valid: + num_tx = len(block['block']['transactions']) + for tx in block['block']['transactions']: + yield tx, block['id'], num_tx + else: + yield self.invalid_dummy_tx, block['id'], 1 def validate_tx(self, tx, block_id, num_tx): """Validate a transaction. @@ -103,8 +137,8 @@ class Vote: def initial(): """Return unvoted blocks.""" b = Bigchain() - initial = b.get_unvoted_blocks() - return initial + rs = b.get_unvoted_blocks() + return rs def get_changefeed(): @@ -120,6 +154,7 @@ def create_pipeline(): voter = Vote() vote_pipeline = Pipeline([ + Node(voter.validate_block), Node(voter.ungroup), Node(voter.validate_tx, fraction_of_cores=1), Node(voter.vote), diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 783e5373..d3d4fa2e 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -55,11 +55,30 @@ def test_vote_ungroup_returns_a_set_of_results(b): b.create_genesis_block() block = dummy_block(b) vote_obj = vote.Vote() - txs = list(vote_obj.ungroup(block)) + txs = list(vote_obj.ungroup(block, True)) assert len(txs) == 10 +def test_vote_validate_block(b): + from bigchaindb.pipelines import vote + + b.create_genesis_block() + tx = dummy_tx(b) + block = b.create_block([tx]) + + vote_obj = vote.Vote() + validation = vote_obj.validate_block(block) + assert validation == (block, True) + + block = b.create_block([tx]) + block['block']['id'] = 'this-is-not-a-valid-hash' + + vote_obj = vote.Vote() + validation = vote_obj.validate_block(block) + assert validation == (block, False) + + def test_vote_validate_transaction(b): from bigchaindb.pipelines import vote @@ -99,7 +118,7 @@ def test_valid_block_voting_sequential(b, monkeypatch): vote_obj = vote.Vote() block = dummy_block(b) - for tx, block_id, num_tx in vote_obj.ungroup(block): + for tx, block_id, num_tx in vote_obj.ungroup(block, True): last_vote = 
vote_obj.vote(*vote_obj.validate_tx(tx, block_id, num_tx)) vote_obj.write_vote(last_vote) @@ -256,7 +275,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): vote2_doc['signature']) is True -def test_invalid_block_voting(monkeypatch, b, user_vk): +def test_invalid_tx_in_block_voting(monkeypatch, b, user_vk): from bigchaindb.pipelines import vote inpipe = Pipe() @@ -290,6 +309,40 @@ def test_invalid_block_voting(monkeypatch, b, user_vk): vote_doc['signature']) is True +def test_invalid_block_voting(monkeypatch, b, user_vk): + from bigchaindb.pipelines import vote + + inpipe = Pipe() + outpipe = Pipe() + + monkeypatch.setattr(util, 'timestamp', lambda: '1') + genesis = b.create_genesis_block() + vote_pipeline = vote.create_pipeline() + vote_pipeline.setup(indata=inpipe, outdata=outpipe) + + block = dummy_block(b) + block['block']['id'] = 'this-is-not-a-valid-hash' + + inpipe.put(block) + vote_pipeline.start() + vote_out = outpipe.get() + vote_pipeline.terminate() + + vote_rs = r.table('votes').get_all([block['id'], b.me], + index='block_and_voter').run(b.conn) + vote_doc = vote_rs.next() + assert vote_out['vote'] == vote_doc['vote'] + assert vote_doc['vote'] == {'voting_for_block': block['id'], + 'previous_block': genesis['id'], + 'is_block_valid': False, + 'invalid_reason': None, + 'timestamp': '1'} + + assert vote_doc['node_pubkey'] == b.me + assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']), + vote_doc['signature']) is True + + def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): from bigchaindb.pipelines import vote From b379d1c549eab06fba9be165995cce9a657b4a65 Mon Sep 17 00:00:00 2001 From: troymc Date: Mon, 8 Aug 2016 14:42:25 +0200 Subject: [PATCH 11/39] docs: moved 'Nodes, Clusters & Federations' earlier --- docs/source/clusters-feds/index.rst | 1 - docs/source/index.rst | 3 ++- docs/source/{clusters-feds => }/node-cluster-fed.md | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) 
rename docs/source/{clusters-feds => }/node-cluster-fed.md (64%) diff --git a/docs/source/clusters-feds/index.rst b/docs/source/clusters-feds/index.rst index ab7571d8..433aa2cd 100644 --- a/docs/source/clusters-feds/index.rst +++ b/docs/source/clusters-feds/index.rst @@ -7,7 +7,6 @@ BigchainDB Clusters & Federations .. toctree:: :maxdepth: 1 - node-cluster-fed set-up-a-federation backup deploy-on-aws diff --git a/docs/source/index.rst b/docs/source/index.rst index f54767f5..f51daf07 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -9,9 +9,10 @@ Table of Contents introduction quickstart + node-cluster-fed nodes/index - clusters-feds/index drivers-clients/index + clusters-feds/index topic-guides/index release-notes appendices/index diff --git a/docs/source/clusters-feds/node-cluster-fed.md b/docs/source/node-cluster-fed.md similarity index 64% rename from docs/source/clusters-feds/node-cluster-fed.md rename to docs/source/node-cluster-fed.md index 2987d477..32359675 100644 --- a/docs/source/clusters-feds/node-cluster-fed.md +++ b/docs/source/node-cluster-fed.md @@ -1,6 +1,6 @@ # Nodes, Clusters & Federations -A **BigchainDB node** is a server or set of closely-linked servers running RethinkDB Server, BigchainDB Server, and other BigchainDB-related software. Each node is controlled by one person or organization. +A **BigchainDB node** is a server or set of closely-linked servers running RethinkDB Server, BigchainDB Server, and related software. Each node is controlled by one person or organization. A set of BigchainDB nodes can connect to each other to form a **cluster**. Each node in the cluster runs the same software. A cluster contains one logical RethinkDB datastore. A cluster may have additional servers to do things such as cluster monitoring. 
@@ -8,6 +8,6 @@ The people and organizations that run the nodes in a cluster belong to a **feder **What's the Difference Between a Cluster and a Federation?** -A cluster is just a bunch of connected nodes (computers). A cluster might be operated by just one person. A federation is an organization which has a cluster, and where each node in the cluster has a different operator. +A cluster is just a bunch of connected nodes. A federation is an organization which has a cluster, and where each node in the cluster has a different operator. Confusingly, we sometimes call a federation's cluster its "federation." You can probably tell what we mean from context. \ No newline at end of file From bfd8c29ae964d55d3af5926d330395474c5a2ec3 Mon Sep 17 00:00:00 2001 From: troymc Date: Mon, 8 Aug 2016 15:02:25 +0200 Subject: [PATCH 12/39] docs: Added new 'Node Components' page --- docs/source/nodes/index.rst | 1 + docs/source/nodes/node-components.md | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 docs/source/nodes/node-components.md diff --git a/docs/source/nodes/index.rst b/docs/source/nodes/index.rst index 7ae86d44..b5b34876 100644 --- a/docs/source/nodes/index.rst +++ b/docs/source/nodes/index.rst @@ -7,6 +7,7 @@ BigchainDB Nodes .. toctree:: :maxdepth: 1 + node-components node-requirements setup-run-node run-with-docker diff --git a/docs/source/nodes/node-components.md b/docs/source/nodes/node-components.md new file mode 100644 index 00000000..a7b432ad --- /dev/null +++ b/docs/source/nodes/node-components.md @@ -0,0 +1,17 @@ +# Node Components + +A BigchainDB node must include, at least: + +* BigchainDB Server and +* RethinkDB Server. + +When doing development and testing, it's common to install both on the same machine, but in a production environment, it may make more sense to install them on separate machines. 
+ +In a production environment, a BigchainDB node can have several other components, including: + +* nginx or similar, as a reverse proxy and/or load balancer for the web server +* An NTP daemon running on all machines running BigchainDB code, and possibly other machines +* A RethinkDB proxy server +* Monitoring software, to monitor all the machines in the node +* Maybe more, e.g. a configuration management server and agents on all machines + From d53523189aa48f94f0cb9beca52bcb272072ff13 Mon Sep 17 00:00:00 2001 From: troymc Date: Mon, 8 Aug 2016 15:08:31 +0200 Subject: [PATCH 13/39] docs: Updated 'Node Requirements' --- docs/source/nodes/node-requirements.md | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/docs/source/nodes/node-requirements.md b/docs/source/nodes/node-requirements.md index a68234b1..60df03df 100644 --- a/docs/source/nodes/node-requirements.md +++ b/docs/source/nodes/node-requirements.md @@ -1,14 +1,10 @@ # Node Requirements (OS, Memory, Storage, etc.) -For now, we will assume that a BigchainDB node is just one server. In the future, a node may consist of several closely-coupled servers run by one node operator (federation member). - - ## OS Requirements * RethinkDB Server [will run on any modern OS](https://www.rethinkdb.com/docs/install/). Note that the Fedora package isn't officially supported. Also, official support for Windows is fairly recent ([April 2016](https://rethinkdb.com/blog/2.3-release/)). -* Python 3.4+ [will run on any modern OS](https://docs.python.org/3.4/using/index.html). -* [Some functionality in the `multiprocessing` package doesn't work on OS X](https://docs.python.org/3.4/library/multiprocessing.html#multiprocessing.Queue.qsize). You can still use Mac OS X if you use Docker or a virtual machine. -* ZeroMQ [will run on any modern OS](http://zeromq.org/area:download). +* BigchainDB Server requires Python 3.4+, which [will run on any modern OS](https://docs.python.org/3.4/using/index.html).
+* BigchainDB Server uses the Python `multiprocessing` package and [some functionality in the `multiprocessing` package doesn't work on OS X](https://docs.python.org/3.4/library/multiprocessing.html#multiprocessing.Queue.qsize). You can still use Mac OS X if you use Docker or a virtual machine. The BigchainDB core dev team uses Ubuntu 14.04 or Fedora 23. From d9321a83957bc08aca91c88fe658b46ec84ed785 Mon Sep 17 00:00:00 2001 From: troymc Date: Mon, 8 Aug 2016 15:26:27 +0200 Subject: [PATCH 14/39] docs: Moved 'Configuration Settings' to new 'Server Reference' section --- docs/source/appendices/firewall-notes.md | 2 +- docs/source/clusters-feds/backup.md | 2 +- docs/source/clusters-feds/deploy-on-aws.md | 2 +- docs/source/index.rst | 1 + docs/source/nodes/index.rst | 1 - docs/source/nodes/setup-run-node.md | 2 +- docs/source/{nodes => server-reference}/configuration.md | 4 +++- 7 files changed, 8 insertions(+), 6 deletions(-) rename docs/source/{nodes => server-reference}/configuration.md (94%) diff --git a/docs/source/appendices/firewall-notes.md b/docs/source/appendices/firewall-notes.md index 4f1e780c..dca89ac6 100644 --- a/docs/source/appendices/firewall-notes.md +++ b/docs/source/appendices/firewall-notes.md @@ -47,7 +47,7 @@ Port 8080 is the default port used by RethinkDB for its adminstrative web (HTTP) Port 9984 is the default port for the BigchainDB client-server HTTP API (TCP), which is served by Gunicorn HTTP Server. It's _possible_ allow port 9984 to accept inbound traffic from anyone, but we recommend against doing that. Instead, set up a reverse proxy server (e.g. using Nginx) and only allow traffic from there. Information about how to do that can be found [in the Gunicorn documentation](http://docs.gunicorn.org/en/stable/deploy.html). (They call it a proxy.) -If Gunicorn and the reverse proxy are running on the same server, then you'll have to tell Gunicorn to listen on some port other than 9984 (so that the reverse proxy can listen on port 9984).
You can do that by setting `server.bind` to 'localhost:PORT' in the [BigchainDB Configuration Settings](../nodes/configuration.html), where PORT is whatever port you chose (e.g. 9983). +If Gunicorn and the reverse proxy are running on the same server, then you'll have to tell Gunicorn to listen on some port other than 9984 (so that the reverse proxy can listen on port 9984). You can do that by setting `server.bind` to 'localhost:PORT' in the [BigchainDB Configuration Settings](../server-reference/configuration.html), where PORT is whatever port you chose (e.g. 9983). You may want to have Gunicorn and the reverse proxy running on different servers, so that both can listen on port 9984. That would also help isolate the effects of a denial-of-service attack. diff --git a/docs/source/clusters-feds/backup.md b/docs/source/clusters-feds/backup.md index ff45ef47..e42a115b 100644 --- a/docs/source/clusters-feds/backup.md +++ b/docs/source/clusters-feds/backup.md @@ -39,7 +39,7 @@ rethinkdb dump -e bigchain.bigchain -e bigchain.votes ``` That should write a file named `rethinkdb_dump__