From 9f03b2b771d73af6b247e8d97b0092fa00467a59 Mon Sep 17 00:00:00 2001 From: Ryan Henderson Date: Thu, 19 May 2016 14:03:52 +0200 Subject: [PATCH] add election class, process starts (#221) * add election class, process starts * remove else clause * resolve splitvote * move quorum check to core * remove comments * fixed vote counting * add test * poison pill * queue test * generalize election status * get correct part of block * clean shut down * test block liquidation * add vote signature checking * add tests * user_public_key to user_vk * python3 style floordivs * correct verdict for odd number of voters, and tests * remove whitespace * simplify verify_vote_signature * election checks pubkey is in block voters * tests for elector pubkey * count invalid votes as votes for invalid block * test update * test mismatched vote, block sigs * variable name change * comments on voting logic * comments on voting logic * remove unused queue * correct block model * imperative docstring * election status class constants * election tests use constants --- bigchaindb/consensus.py | 22 ++++++ bigchaindb/core.py | 32 +++++++++ bigchaindb/processes.py | 10 ++- bigchaindb/util.py | 24 +++++++ bigchaindb/voter.py | 65 +++++++++++++++++ tests/db/test_voter.py | 151 +++++++++++++++++++++++++++++++++++++++- 6 files changed, 299 insertions(+), 5 deletions(-) diff --git a/bigchaindb/consensus.py b/bigchaindb/consensus.py index 29cca142..c54fbc1e 100644 --- a/bigchaindb/consensus.py +++ b/bigchaindb/consensus.py @@ -89,6 +89,19 @@ class AbstractConsensusRules(metaclass=ABCMeta): and correct, False otherwise. """ + @abstractmethod + def verify_vote_signature(block, signed_vote): + """Verify a cast vote. + + Args: + block (dict): block under election + signed_vote (dict): signed vote to verify + + Returns: + bool: True if the votes's required signature data is present + and correct, False otherwise. + """ + raise NotImplementedError class BaseConsensusRules(AbstractConsensusRules): """Base consensus rules for Bigchain. @@ -223,3 +236,12 @@ class BaseConsensusRules(AbstractConsensusRules): """ return util.validate_fulfillments(signed_transaction) + + @staticmethod + def verify_vote_signature(block, signed_vote): + """Verify the signature of a vote. + + Refer to the documentation of ``bigchaindb.util.verify_signature`` + """ + + return util.verify_vote_signature(block, signed_vote) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index b7ed151a..9a433697 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -1,4 +1,6 @@ import random +import math +import operator import rethinkdb as r import rapidjson @@ -20,6 +22,10 @@ class Bigchain(object): Create, read, sign, write transactions to the database """ + BLOCK_INVALID = 'invalid' + BLOCK_VALID = 'valid' + BLOCK_UNDECIDED = 'undecided' + def __init__(self, host=None, port=None, dbname=None, public_key=None, private_key=None, keyring=[], consensus_plugin=None): @@ -487,3 +493,29 @@ class Bigchain(object): unvoted.pop(0) return unvoted + + def block_election_status(self, block): + """Tally the votes on a block, and return the status: valid, invalid, or undecided.""" + + n_voters = len(block['block']['voters']) + vote_cast = [vote['vote']['is_block_valid'] for vote in block['votes']] + vote_validity = [self.consensus.verify_vote_signature(block, vote) for vote in block['votes']] + + # element-wise product of stated vote and validity of vote + vote_list = list(map(operator.mul, vote_cast, vote_validity)) + + # validate votes here + n_valid_votes = sum(vote_list) + n_invalid_votes = len(vote_list) - n_valid_votes + + # The use of ceiling and floor is to account for the case of an + # even number of voters where half the voters have voted 'invalid' + # and half 'valid'. In this case, the block should be marked invalid + # to avoid a tie. In the case of an odd number of voters this is not + # relevant, since one side must be a majority. + if n_invalid_votes >= math.ceil(n_voters / 2): + return Bigchain.BLOCK_INVALID + elif n_valid_votes > math.floor(n_voters / 2): + return Bigchain.BLOCK_VALID + else: + return Bigchain.BLOCK_UNDECIDED diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 20b8df8d..b51df516 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -5,7 +5,7 @@ import rethinkdb as r import bigchaindb from bigchaindb import Bigchain -from bigchaindb.voter import Voter +from bigchaindb.voter import Voter, Election from bigchaindb.block import Block from bigchaindb.web import server @@ -31,6 +31,7 @@ class Processes(object): # initialize the class self.q_new_block = mp.Queue() self.q_new_transaction = mp.Queue() + self.q_block_new_vote = mp.Queue() def map_backlog(self): # listen to changes on the backlog and redirect the changes @@ -70,9 +71,9 @@ class Processes(object): elif change['new_val'] is None: pass - # update + # update (new vote) elif change['new_val'] is not None and change['old_val'] is not None: - pass + self.q_block_new_vote.put(change['new_val']) def start(self): logger.info('Initializing BigchainDB...') @@ -90,6 +91,7 @@ class Processes(object): p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog) p_block = mp.Process(name='block', target=block.start) p_voter = Voter(self.q_new_block) + p_election = Election(self.q_block_new_vote) # start the processes logger.info('starting bigchain mapper') @@ -101,6 +103,8 @@ class Processes(object): logger.info('starting voter') p_voter.start() + logger.info('starting election') + p_election.start() # start message block.initialized.wait() diff --git a/bigchaindb/util.py b/bigchaindb/util.py index 65362a17..5f664a40 100644 --- a/bigchaindb/util.py +++ b/bigchaindb/util.py @@ -545,6 +545,30 @@ def get_hash_data(transaction): return crypto.hash_data(serialize(tx)) +def verify_vote_signature(block, signed_vote): + """Verify the signature of a vote + + A valid vote should have been signed `current_owner` corresponding private key. + + Args: + block (dict): block under election + signed_vote (dict): a vote with the `signature` included. + + Returns: + bool: True if the signature is correct, False otherwise. + """ + + signature = signed_vote['signature'] + vk_base58 = signed_vote['node_pubkey'] + + # immediately return False if the voter is not in the block voter list + if vk_base58 not in block['block']['voters']: + return False + + public_key = crypto.VerifyingKey(vk_base58) + return public_key.verify(serialize(signed_vote['vote']), signature) + + def transform_create(tx): """Change the owner and signature for a ``CREATE`` transaction created by a node""" diff --git a/bigchaindb/voter.py b/bigchaindb/voter.py index 9a34c0d8..b1b3b618 100644 --- a/bigchaindb/voter.py +++ b/bigchaindb/voter.py @@ -196,3 +196,68 @@ class Voter(object): p_validate.start() p_vote.start() p_update.start() + + +class Election(object): + + def __init__(self, q_block_new_vote): + """ + Initialize the class with the needed queues. + + Initialize a queue where blocks with new votes will be held + """ + self.q_block_new_vote = q_block_new_vote + self.q_invalid_blocks = mp.Queue() + + def check_for_quorum(self): + """ + Checks if block has enough invalid votes to make a decision + """ + b = Bigchain() + + while True: + next_block = self.q_block_new_vote.get() + + # poison pill + if next_block == 'stop': + self.q_invalid_blocks.put('stop') + logger.info('clean exit') + return + + if b.block_election_status(next_block) == 'invalid': + self.q_invalid_blocks.put(next_block) + + def requeue_transactions(self): + """ + Liquidates transactions from invalid blocks so they can be processed again + """ + while True: + invalid_block = self.q_invalid_blocks.get() + + # poison pill + if invalid_block == 'stop': + logger.info('clean exit') + return + + b = Bigchain() + for tx in invalid_block['block']['transactions']: + b.write_transaction(tx) + + def kill(self): + """ + Terminate processes + """ + self.q_block_new_vote.put('stop') + + def start(self): + """ + Initialize, spawn, and start the processes + """ + + # initialize the processes + p_quorum_check = mp.Process(name='check_for_quorum', target=self.check_for_quorum) + p_requeue_tx = mp.Process(name='requeue_tx', target=self.requeue_transactions) + + # start the processes + p_quorum_check.start() + p_requeue_tx.start() diff --git a/tests/db/test_voter.py b/tests/db/test_voter.py index ce1b42e1..8ad397a1 100644 --- a/tests/db/test_voter.py +++ b/tests/db/test_voter.py @@ -5,8 +5,8 @@ import multiprocessing as mp from bigchaindb import util -from bigchaindb.voter import Voter, BlockStream -from bigchaindb import crypto +from bigchaindb.voter import Voter, Election, BlockStream +from bigchaindb import crypto, Bigchain class TestBigchainVoter(object): @@ -297,6 +297,153 @@ class TestBigchainVoter(object): pass +class TestBlockElection(object): + + def test_quorum(self, b): + # create a new block + test_block = b.create_block([]) + + # simulate a federation with four voters + key_pairs = [crypto.generate_key_pair() for _ in range(4)] + test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0]) + for key_pair in key_pairs] + + # dummy block with test federation public keys as voters + test_block['block']['voters'] = [key_pair[1] for key_pair in key_pairs] + + # fake "yes" votes + valid_vote = [member.vote(test_block, 'abc', True) + for member in test_federation] + + # fake "no" votes + invalid_vote = [member.vote(test_block, 'abc', False) + for member in test_federation] + + # fake "yes" votes with incorrect signatures + improperly_signed_valid_vote = [member.vote(test_block, 'abc', True) for + member in test_federation] + [vote['vote'].update(this_should_ruin_things='lol') + for vote in improperly_signed_valid_vote] + + # test unanimously valid block + test_block['votes'] = valid_vote + assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID + + # test partial quorum situations + test_block['votes'] = valid_vote[:2] + assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED + # + test_block['votes'] = valid_vote[:3] + assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID + # + test_block['votes'] = invalid_vote[:2] + assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID + + # test unanimously valid block with one improperly signed vote -- should still succeed + test_block['votes'] = valid_vote[:3] + improperly_signed_valid_vote[:1] + assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID + + # test unanimously valid block with two improperly signed votes -- should fail + test_block['votes'] = valid_vote[:2] + improperly_signed_valid_vote[:2] + assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID + + # test block with minority invalid vote + test_block['votes'] = invalid_vote[:1] + valid_vote[:3] + assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID + + # test split vote + test_block['votes'] = invalid_vote[:2] + valid_vote[:2] + assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID + + # test undecided + test_block['votes'] = valid_vote[:2] + assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED + + # change signatures in block, should fail + test_block['block']['voters'][0] = 'abc' + test_block['block']['voters'][1] = 'abc' + test_block['votes'] = valid_vote + assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID + + def test_quorum_odd(self, b): + # test partial quorum situations for odd numbers of voters + # create a new block + test_block = b.create_block([]) + + # simulate a federation with four voters + key_pairs = [crypto.generate_key_pair() for _ in range(5)] + test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0]) + for key_pair in key_pairs] + + # dummy block with test federation public keys as voters + test_block['block']['voters'] = [key_pair[1] for key_pair in key_pairs] + + # fake "yes" votes + valid_vote = [member.vote(test_block, 'abc', True) + for member in test_federation] + + # fake "no" votes + invalid_vote = [member.vote(test_block, 'abc', False) + for member in test_federation] + + test_block['votes'] = valid_vote[:2] + assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED + + test_block['votes'] = invalid_vote[:2] + assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED + + test_block['votes'] = valid_vote[:3] + assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID + + test_block['votes'] = invalid_vote[:3] + assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID + + def test_tx_rewritten_after_invalid(self, b, user_vk): + q_block_new_vote = mp.Queue() + + # create blocks with transactions + tx1 = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx2 = b.create_transaction(b.me, user_vk, None, 'CREATE') + test_block_1 = b.create_block([tx1]) + test_block_2 = b.create_block([tx2]) + + # simulate a federation with four voters + key_pairs = [crypto.generate_key_pair() for _ in range(4)] + test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0]) + for key_pair in key_pairs] + + # simulate a federation with four voters + test_block_1['block']['voters'] = [key_pair[1] for key_pair in key_pairs] + test_block_2['block']['voters'] = [key_pair[1] for key_pair in key_pairs] + + # votes for block one + vote_1 = [member.vote(test_block_1, 'abc', True) + for member in test_federation] + + # votes for block two + vote_2 = [member.vote(test_block_2, 'abc', True) for member in test_federation[:2]] + \ + [member.vote(test_block_2, 'abc', False) for member in test_federation[2:]] + + # construct valid block + test_block_1['votes'] = vote_1 + q_block_new_vote.put(test_block_1) + + # construct invalid block + test_block_2['votes'] = vote_2 + q_block_new_vote.put(test_block_2) + + election = Election(q_block_new_vote) + election.start() + time.sleep(1) + election.kill() + + # tx1 was in a valid block, and should not be in the backlog + assert r.table('backlog').get(tx1['id']).run(b.conn) is None + + # tx2 was in an invalid block and SHOULD be in the backlog + assert r.table('backlog').get(tx2['id']).run(b.conn)['id'] == tx2['id'] + + class TestBlockStream(object): def test_if_federation_size_is_greater_than_one_ignore_past_blocks(self, b):