This commit is contained in:
vrde 2016-07-28 13:55:57 +02:00
parent ff042b5954
commit 74a5412cd9
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
4 changed files with 91 additions and 22 deletions

View File

@ -565,13 +565,9 @@ class Bigchain(object):
return vote_signed
def write_vote(self, block, vote):
def write_vote(self, vote):
"""Write the vote to the database."""
# First, make sure this block doesn't contain a vote from this node
if self.has_previous_vote(block):
return None
r.table('votes') \
.insert(vote) \
.run(self.conn)

View File

@ -0,0 +1,73 @@
from collections import Counter
from multipipes import Pipeline, Node
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
class Vote:
def __init__(self):
self.bigchain = Bigchain()
last_voted = self.bigchain.get_last_voted_block()
self.last_voted_id = last_voted['id']
self.last_voted_number = last_voted['block_number']
self.counters = Counter()
self.validity = {}
def ungroup(self, block):
num_tx = len(block['block']['transactions'])
for tx in block['block']['transactions']:
yield tx, block['id'], num_tx
def validate_tx(self, tx, block_id, num_tx):
return bool(self.bigchain.is_valid_transaction(tx)), block_id, num_tx
def vote(self, tx_validity, block_id, num_tx):
self.counters[block_id] += 1
self.validity[block_id] = tx_validity and self.validity.get(block_id,
True)
if self.counters[block_id] == num_tx:
vote = self.bigchain.vote(block_id,
self.last_voted_id,
self.validity[block_id])
self.last_voted_id = block_id
del self.counters[block_id]
del self.validity[block_id]
return vote
def write_vote(self, vote):
self.bigchain.write_vote(vote)
def initial():
b = Bigchain()
initial = b.get_unvoted_blocks()
return initial
def get_changefeed():
return ChangeFeed('bigchain', 'insert', prefeed=initial())
def create_pipeline():
voter = Vote()
vote_pipeline = Pipeline([
Node(voter.ungroup),
Node(voter.validate_tx, fraction_of_cores=1),
Node(voter.vote),
Node(voter.write_vote)
])
return vote_pipeline
def start():
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline

View File

@ -158,7 +158,7 @@ class Voter(object):
block, vote = elem
pretty_vote = 'valid' if vote['vote']['is_block_valid'] else 'invalid'
logger.info('voting %s for block %s', pretty_vote, block['id'])
b.write_vote(block, vote)
b.write_vote(vote)
def bootstrap(self):
"""

View File

@ -140,7 +140,7 @@ class TestBigchainApi(object):
# vote the block invalid
vote = b.vote(block['id'], b.get_last_voted_block()['id'], False)
b.write_vote(block, vote)
b.write_vote(vote)
response = b.get_transaction(tx_signed["id"])
# should be None, because invalid blocks are ignored
@ -280,13 +280,13 @@ class TestBigchainApi(object):
# make sure all the blocks are written at the same time
monkeypatch.setattr(util, 'timestamp', lambda: '1')
b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True))
assert b.get_last_voted_block()['id'] == block_1['id']
b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True))
assert b.get_last_voted_block()['id'] == block_2['id']
b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True))
assert b.get_last_voted_block()['id'] == block_3['id']
@ -305,15 +305,15 @@ class TestBigchainApi(object):
# make sure all the blocks are written at different timestamps
monkeypatch.setattr(util, 'timestamp', lambda: '1')
b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True))
assert b.get_last_voted_block()['id'] == block_1['id']
monkeypatch.setattr(util, 'timestamp', lambda: '2')
b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True))
assert b.get_last_voted_block()['id'] == block_2['id']
monkeypatch.setattr(util, 'timestamp', lambda: '3')
b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True))
assert b.get_last_voted_block()['id'] == block_3['id']
def test_no_vote_written_if_block_already_has_vote(self, b):
@ -323,11 +323,11 @@ class TestBigchainApi(object):
b.write_block(block_1, durability='hard')
b.write_vote(block_1, b.vote(block_1['id'], genesis['id'], True))
b.write_vote(b.vote(block_1['id'], genesis['id'], True))
retrieved_block_1 = r.table('bigchain').get(block_1['id']).run(b.conn)
# try to vote again on the retrieved block, should do nothing
b.write_vote(retrieved_block_1, b.vote(retrieved_block_1['id'], genesis['id'], True))
b.write_vote(b.vote(retrieved_block_1['id'], genesis['id'], True))
retrieved_block_2 = r.table('bigchain').get(block_1['id']).run(b.conn)
assert retrieved_block_1 == retrieved_block_2
@ -622,9 +622,9 @@ class TestBigchainBlock(object):
b.write_block(block_2, durability='hard')
b.write_block(block_3, durability='hard')
b.write_vote(block_1, b.vote(block_1['id'], b.get_last_voted_block()['id'], True))
b.write_vote(block_2, b.vote(block_2['id'], b.get_last_voted_block()['id'], True))
b.write_vote(block_3, b.vote(block_3['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_1['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_2['id'], b.get_last_voted_block()['id'], True))
b.write_vote(b.vote(block_3['id'], b.get_last_voted_block()['id'], True))
q_revert_delete = mp.Queue()
@ -922,7 +922,7 @@ class TestMultipleInputs(object):
# vote the block VALID
vote = b.vote(block['id'], genesis['id'], True)
b.write_vote(block, vote)
b.write_vote(vote)
# get input
owned_inputs_user1 = b.get_owned_ids(user_vk)
@ -938,7 +938,7 @@ class TestMultipleInputs(object):
# vote the block invalid
vote = b.vote(block['id'], b.get_last_voted_block()['id'], False)
b.write_vote(block, vote)
b.write_vote(vote)
owned_inputs_user1 = b.get_owned_ids(user_vk)
owned_inputs_user2 = b.get_owned_ids(user2_vk)
@ -1052,7 +1052,7 @@ class TestMultipleInputs(object):
# vote the block VALID
vote = b.vote(block['id'], genesis['id'], True)
b.write_vote(block, vote)
b.write_vote(vote)
# get input
owned_inputs_user1 = b.get_owned_ids(user_vk)
@ -1069,7 +1069,7 @@ class TestMultipleInputs(object):
# vote the block invalid
vote = b.vote(block['id'], b.get_last_voted_block()['id'], False)
b.write_vote(block, vote)
b.write_vote(vote)
response = b.get_transaction(tx_signed["id"])
spent_inputs_user1 = b.get_spent(owned_inputs_user1[0])