add election class, process starts (#221)

* add election class, process starts

* remove else clause

* resolve splitvote

* move quorum check to core

* remove comments

* fixed vote counting

* add test

* poison pill

* queue test

* generalize election status

* get correct part of block

* clean shut down

* test block liquidation

* add vote signature checking

* add tests

* user_public_key to user_vk

* python3 style floordivs

* correct verdict for odd number of voters, and tests

* remove whitespace

* simplify verify_vote_signature

* election checks pubkey is in block voters

* tests for elector pubkey

* count invalid votes as votes for invalid block

* test update

* test mismatched vote, block sigs

* variable name change

* comments on voting logic

* comments on voting logic

* remove unused queue

* correct block model

* imperative docstring

* election status class constants

* election tests use constants
This commit is contained in:
Ryan Henderson 2016-05-19 14:03:52 +02:00
parent b1101747c0
commit 9f03b2b771
6 changed files with 299 additions and 5 deletions

View File

@ -89,6 +89,19 @@ class AbstractConsensusRules(metaclass=ABCMeta):
and correct, False otherwise.
"""
@abstractmethod
def verify_vote_signature(block, signed_vote):
"""Verify a cast vote.
Args:
block (dict): block under election
signed_vote (dict): signed vote to verify
Returns:
bool: True if the votes's required signature data is present
and correct, False otherwise.
"""
raise NotImplementedError
class BaseConsensusRules(AbstractConsensusRules):
"""Base consensus rules for Bigchain.
@ -223,3 +236,12 @@ class BaseConsensusRules(AbstractConsensusRules):
"""
return util.validate_fulfillments(signed_transaction)
@staticmethod
def verify_vote_signature(block, signed_vote):
"""Verify the signature of a vote.
Refer to the documentation of ``bigchaindb.util.verify_signature``
"""
return util.verify_vote_signature(block, signed_vote)

View File

@ -1,4 +1,6 @@
import random
import math
import operator
import rethinkdb as r
import rapidjson
@ -20,6 +22,10 @@ class Bigchain(object):
Create, read, sign, write transactions to the database
"""
BLOCK_INVALID = 'invalid'
BLOCK_VALID = 'valid'
BLOCK_UNDECIDED = 'undecided'
def __init__(self, host=None, port=None, dbname=None,
public_key=None, private_key=None, keyring=[],
consensus_plugin=None):
@ -487,3 +493,29 @@ class Bigchain(object):
unvoted.pop(0)
return unvoted
def block_election_status(self, block):
"""Tally the votes on a block, and return the status: valid, invalid, or undecided."""
n_voters = len(block['block']['voters'])
vote_cast = [vote['vote']['is_block_valid'] for vote in block['votes']]
vote_validity = [self.consensus.verify_vote_signature(block, vote) for vote in block['votes']]
# element-wise product of stated vote and validity of vote
vote_list = list(map(operator.mul, vote_cast, vote_validity))
# validate votes here
n_valid_votes = sum(vote_list)
n_invalid_votes = len(vote_list) - n_valid_votes
# The use of ceiling and floor is to account for the case of an
# even number of voters where half the voters have voted 'invalid'
# and half 'valid'. In this case, the block should be marked invalid
# to avoid a tie. In the case of an odd number of voters this is not
# relevant, since one side must be a majority.
if n_invalid_votes >= math.ceil(n_voters / 2):
return Bigchain.BLOCK_INVALID
elif n_valid_votes > math.floor(n_voters / 2):
return Bigchain.BLOCK_VALID
else:
return Bigchain.BLOCK_UNDECIDED

View File

@ -5,7 +5,7 @@ import rethinkdb as r
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb.voter import Voter
from bigchaindb.voter import Voter, Election
from bigchaindb.block import Block
from bigchaindb.web import server
@ -31,6 +31,7 @@ class Processes(object):
# initialize the class
self.q_new_block = mp.Queue()
self.q_new_transaction = mp.Queue()
self.q_block_new_vote = mp.Queue()
def map_backlog(self):
# listen to changes on the backlog and redirect the changes
@ -70,9 +71,9 @@ class Processes(object):
elif change['new_val'] is None:
pass
# update
# update (new vote)
elif change['new_val'] is not None and change['old_val'] is not None:
pass
self.q_block_new_vote.put(change['new_val'])
def start(self):
logger.info('Initializing BigchainDB...')
@ -90,6 +91,7 @@ class Processes(object):
p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog)
p_block = mp.Process(name='block', target=block.start)
p_voter = Voter(self.q_new_block)
p_election = Election(self.q_block_new_vote)
# start the processes
logger.info('starting bigchain mapper')
@ -101,6 +103,8 @@ class Processes(object):
logger.info('starting voter')
p_voter.start()
logger.info('starting election')
p_election.start()
# start message
block.initialized.wait()

View File

@ -545,6 +545,30 @@ def get_hash_data(transaction):
return crypto.hash_data(serialize(tx))
def verify_vote_signature(block, signed_vote):
"""Verify the signature of a vote
A valid vote should have been signed `current_owner` corresponding private key.
Args:
block (dict): block under election
signed_vote (dict): a vote with the `signature` included.
Returns:
bool: True if the signature is correct, False otherwise.
"""
signature = signed_vote['signature']
vk_base58 = signed_vote['node_pubkey']
# immediately return False if the voter is not in the block voter list
if vk_base58 not in block['block']['voters']:
return False
public_key = crypto.VerifyingKey(vk_base58)
return public_key.verify(serialize(signed_vote['vote']), signature)
def transform_create(tx):
"""Change the owner and signature for a ``CREATE`` transaction created by a node"""

View File

@ -196,3 +196,68 @@ class Voter(object):
p_validate.start()
p_vote.start()
p_update.start()
class Election(object):
def __init__(self, q_block_new_vote):
"""
Initialize the class with the needed queues.
Initialize a queue where blocks with new votes will be held
"""
self.q_block_new_vote = q_block_new_vote
self.q_invalid_blocks = mp.Queue()
def check_for_quorum(self):
"""
Checks if block has enough invalid votes to make a decision
"""
b = Bigchain()
while True:
next_block = self.q_block_new_vote.get()
# poison pill
if next_block == 'stop':
self.q_invalid_blocks.put('stop')
logger.info('clean exit')
return
if b.block_election_status(next_block) == 'invalid':
self.q_invalid_blocks.put(next_block)
def requeue_transactions(self):
"""
Liquidates transactions from invalid blocks so they can be processed again
"""
while True:
invalid_block = self.q_invalid_blocks.get()
# poison pill
if invalid_block == 'stop':
logger.info('clean exit')
return
b = Bigchain()
for tx in invalid_block['block']['transactions']:
b.write_transaction(tx)
def kill(self):
"""
Terminate processes
"""
self.q_block_new_vote.put('stop')
def start(self):
"""
Initialize, spawn, and start the processes
"""
# initialize the processes
p_quorum_check = mp.Process(name='check_for_quorum', target=self.check_for_quorum)
p_requeue_tx = mp.Process(name='requeue_tx', target=self.requeue_transactions)
# start the processes
p_quorum_check.start()
p_requeue_tx.start()

View File

@ -5,8 +5,8 @@ import multiprocessing as mp
from bigchaindb import util
from bigchaindb.voter import Voter, BlockStream
from bigchaindb import crypto
from bigchaindb.voter import Voter, Election, BlockStream
from bigchaindb import crypto, Bigchain
class TestBigchainVoter(object):
@ -297,6 +297,153 @@ class TestBigchainVoter(object):
pass
class TestBlockElection(object):
def test_quorum(self, b):
# create a new block
test_block = b.create_block([])
# simulate a federation with four voters
key_pairs = [crypto.generate_key_pair() for _ in range(4)]
test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0])
for key_pair in key_pairs]
# dummy block with test federation public keys as voters
test_block['block']['voters'] = [key_pair[1] for key_pair in key_pairs]
# fake "yes" votes
valid_vote = [member.vote(test_block, 'abc', True)
for member in test_federation]
# fake "no" votes
invalid_vote = [member.vote(test_block, 'abc', False)
for member in test_federation]
# fake "yes" votes with incorrect signatures
improperly_signed_valid_vote = [member.vote(test_block, 'abc', True) for
member in test_federation]
[vote['vote'].update(this_should_ruin_things='lol')
for vote in improperly_signed_valid_vote]
# test unanimously valid block
test_block['votes'] = valid_vote
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
# test partial quorum situations
test_block['votes'] = valid_vote[:2]
assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED
#
test_block['votes'] = valid_vote[:3]
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
#
test_block['votes'] = invalid_vote[:2]
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
# test unanimously valid block with one improperly signed vote -- should still succeed
test_block['votes'] = valid_vote[:3] + improperly_signed_valid_vote[:1]
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
# test unanimously valid block with two improperly signed votes -- should fail
test_block['votes'] = valid_vote[:2] + improperly_signed_valid_vote[:2]
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
# test block with minority invalid vote
test_block['votes'] = invalid_vote[:1] + valid_vote[:3]
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
# test split vote
test_block['votes'] = invalid_vote[:2] + valid_vote[:2]
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
# test undecided
test_block['votes'] = valid_vote[:2]
assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED
# change signatures in block, should fail
test_block['block']['voters'][0] = 'abc'
test_block['block']['voters'][1] = 'abc'
test_block['votes'] = valid_vote
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
def test_quorum_odd(self, b):
# test partial quorum situations for odd numbers of voters
# create a new block
test_block = b.create_block([])
# simulate a federation with four voters
key_pairs = [crypto.generate_key_pair() for _ in range(5)]
test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0])
for key_pair in key_pairs]
# dummy block with test federation public keys as voters
test_block['block']['voters'] = [key_pair[1] for key_pair in key_pairs]
# fake "yes" votes
valid_vote = [member.vote(test_block, 'abc', True)
for member in test_federation]
# fake "no" votes
invalid_vote = [member.vote(test_block, 'abc', False)
for member in test_federation]
test_block['votes'] = valid_vote[:2]
assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED
test_block['votes'] = invalid_vote[:2]
assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED
test_block['votes'] = valid_vote[:3]
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
test_block['votes'] = invalid_vote[:3]
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
def test_tx_rewritten_after_invalid(self, b, user_vk):
q_block_new_vote = mp.Queue()
# create blocks with transactions
tx1 = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx2 = b.create_transaction(b.me, user_vk, None, 'CREATE')
test_block_1 = b.create_block([tx1])
test_block_2 = b.create_block([tx2])
# simulate a federation with four voters
key_pairs = [crypto.generate_key_pair() for _ in range(4)]
test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0])
for key_pair in key_pairs]
# simulate a federation with four voters
test_block_1['block']['voters'] = [key_pair[1] for key_pair in key_pairs]
test_block_2['block']['voters'] = [key_pair[1] for key_pair in key_pairs]
# votes for block one
vote_1 = [member.vote(test_block_1, 'abc', True)
for member in test_federation]
# votes for block two
vote_2 = [member.vote(test_block_2, 'abc', True) for member in test_federation[:2]] + \
[member.vote(test_block_2, 'abc', False) for member in test_federation[2:]]
# construct valid block
test_block_1['votes'] = vote_1
q_block_new_vote.put(test_block_1)
# construct invalid block
test_block_2['votes'] = vote_2
q_block_new_vote.put(test_block_2)
election = Election(q_block_new_vote)
election.start()
time.sleep(1)
election.kill()
# tx1 was in a valid block, and should not be in the backlog
assert r.table('backlog').get(tx1['id']).run(b.conn) is None
# tx2 was in an invalid block and SHOULD be in the backlog
assert r.table('backlog').get(tx2['id']).run(b.conn)['id'] == tx2['id']
class TestBlockStream(object):
def test_if_federation_size_is_greater_than_one_ignore_past_blocks(self, b):