mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Add last tests
This commit is contained in:
parent
17c11e5935
commit
074cae4484
@ -1,204 +1,12 @@
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import ctypes
|
||||
|
||||
from bigchaindb import Bigchain
|
||||
from bigchaindb.monitor import Monitor
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BlockStream(object):
|
||||
"""
|
||||
Combine the stream of new blocks coming from the changefeed with the list of unvoted blocks.
|
||||
|
||||
This is a utility class that abstracts the source of data for the `Voter`.
|
||||
"""
|
||||
|
||||
def __init__(self, new_blocks):
|
||||
"""
|
||||
Create a new BlockStream instance.
|
||||
|
||||
Args:
|
||||
new_block (queue): a queue of new blocks
|
||||
"""
|
||||
|
||||
b = Bigchain()
|
||||
self.new_blocks = new_blocks
|
||||
# TODO: there might be duplicate blocks since we *first* get the changefeed and only *then* we query the
|
||||
# database to get the old blocks.
|
||||
|
||||
# TODO how about a one liner, something like:
|
||||
# self.unvoted_blocks = b.get_unvoted_blocks() if not b.nodes_except_me else []
|
||||
self.unvoted_blocks = []
|
||||
if not b.nodes_except_me:
|
||||
self.unvoted_blocks = b.get_unvoted_blocks()
|
||||
|
||||
def get(self):
|
||||
"""
|
||||
Return the next block to be processed.
|
||||
"""
|
||||
try:
|
||||
# FIXME: apparently RethinkDB returns a list instead of a cursor when using `order_by`.
|
||||
# We might change the `pop` in the future, when the driver will return a cursor.
|
||||
# We have a test for this, so if the driver implementation changes we will get a failure:
|
||||
# - tests/test_voter.py::TestBlockStream::test_if_old_blocks_get_should_return_old_block_first
|
||||
return self.unvoted_blocks.pop(0)
|
||||
except IndexError:
|
||||
return self.new_blocks.get()
|
||||
|
||||
|
||||
class Voter(object):
|
||||
|
||||
def __init__(self, q_new_block):
|
||||
"""
|
||||
Initialize the class with the needed queues.
|
||||
|
||||
Initialize with a queue where new blocks added to the bigchain will be put
|
||||
"""
|
||||
|
||||
self.monitor = Monitor()
|
||||
|
||||
self.q_new_block = q_new_block
|
||||
self.q_blocks_to_validate = mp.Queue()
|
||||
self.q_validated_block = mp.Queue()
|
||||
self.q_voted_block = mp.Queue()
|
||||
self.v_previous_block_id = mp.Value(ctypes.c_char_p)
|
||||
self.initialized = mp.Event()
|
||||
|
||||
def feed_blocks(self):
|
||||
"""
|
||||
Prepare the queue with blocks to validate
|
||||
"""
|
||||
|
||||
block_stream = BlockStream(self.q_new_block)
|
||||
while True:
|
||||
# poison pill
|
||||
block = block_stream.get()
|
||||
if block == 'stop':
|
||||
self.q_blocks_to_validate.put('stop')
|
||||
return
|
||||
|
||||
self.q_blocks_to_validate.put(block)
|
||||
|
||||
def validate(self):
|
||||
"""
|
||||
Checks if incoming blocks are valid or not
|
||||
"""
|
||||
|
||||
# create a bigchain instance. All processes should create their own bigchcain instance so that they all
|
||||
# have their own connection to the database
|
||||
b = Bigchain()
|
||||
|
||||
logger.info('voter waiting for new blocks')
|
||||
# signal initialization complete
|
||||
self.initialized.set()
|
||||
|
||||
while True:
|
||||
new_block = self.q_blocks_to_validate.get()
|
||||
|
||||
# poison pill
|
||||
if new_block == 'stop':
|
||||
self.q_validated_block.put('stop')
|
||||
return
|
||||
|
||||
logger.info('new_block arrived to voter')
|
||||
|
||||
with self.monitor.timer('validate_block'):
|
||||
# FIXME: the following check is done also in `is_valid_block`,
|
||||
# but validity can be true even if the block has already
|
||||
# a vote.
|
||||
if b.has_previous_vote(new_block):
|
||||
continue
|
||||
validity = b.is_valid_block(new_block)
|
||||
|
||||
self.q_validated_block.put((new_block,
|
||||
self.v_previous_block_id.value.decode(),
|
||||
validity))
|
||||
|
||||
self.v_previous_block_id.value = new_block['id'].encode()
|
||||
|
||||
def vote(self):
|
||||
"""
|
||||
Votes on the block based on the decision of the validation
|
||||
"""
|
||||
|
||||
# create a bigchain instance
|
||||
b = Bigchain()
|
||||
|
||||
while True:
|
||||
elem = self.q_validated_block.get()
|
||||
|
||||
# poison pill
|
||||
if elem == 'stop':
|
||||
self.q_voted_block.put('stop')
|
||||
return
|
||||
|
||||
validated_block, previous_block_id, decision = elem
|
||||
vote = b.vote(validated_block['id'], previous_block_id, decision)
|
||||
self.q_voted_block.put((validated_block, vote))
|
||||
|
||||
def update_block(self):
|
||||
"""
|
||||
Appends the vote in the bigchain table
|
||||
"""
|
||||
|
||||
# create a bigchain instance
|
||||
b = Bigchain()
|
||||
|
||||
while True:
|
||||
elem = self.q_voted_block.get()
|
||||
|
||||
# poison pill
|
||||
if elem == 'stop':
|
||||
logger.info('clean exit')
|
||||
return
|
||||
|
||||
block, vote = elem
|
||||
pretty_vote = 'valid' if vote['vote']['is_block_valid'] else 'invalid'
|
||||
logger.info('voting %s for block %s', pretty_vote, block['id'])
|
||||
b.write_vote(vote)
|
||||
|
||||
def bootstrap(self):
|
||||
"""
|
||||
Before starting handling the new blocks received by the changefeed we need to handle unvoted blocks
|
||||
added to the bigchain while the process was down
|
||||
|
||||
We also need to set the previous_block_id.
|
||||
"""
|
||||
|
||||
b = Bigchain()
|
||||
last_voted = b.get_last_voted_block()
|
||||
|
||||
self.v_previous_block_id.value = last_voted['id'].encode()
|
||||
|
||||
def kill(self):
|
||||
"""
|
||||
Terminate processes
|
||||
"""
|
||||
self.q_new_block.put('stop')
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Initialize, spawn, and start the processes
|
||||
"""
|
||||
|
||||
self.bootstrap()
|
||||
|
||||
# initialize the processes
|
||||
p_feed_blocks = mp.Process(name='block_feeder', target=self.feed_blocks)
|
||||
p_validate = mp.Process(name='block_validator', target=self.validate)
|
||||
p_vote = mp.Process(name='block_voter', target=self.vote)
|
||||
p_update = mp.Process(name='block_updater', target=self.update_block)
|
||||
|
||||
# start the processes
|
||||
p_feed_blocks.start()
|
||||
p_validate.start()
|
||||
p_vote.start()
|
||||
p_update.start()
|
||||
|
||||
|
||||
class Election(object):
|
||||
|
||||
def __init__(self, q_block_new_vote):
|
||||
|
@ -1,6 +1,5 @@
|
||||
import copy
|
||||
import multiprocessing as mp
|
||||
import random
|
||||
import time
|
||||
|
||||
import pytest
|
||||
@ -9,7 +8,6 @@ import cryptoconditions as cc
|
||||
|
||||
import bigchaindb
|
||||
from bigchaindb import crypto, exceptions, util
|
||||
from bigchaindb.voter import Voter
|
||||
from bigchaindb.block import BlockDeleteRevert
|
||||
|
||||
|
||||
|
@ -5,7 +5,7 @@ import multiprocessing as mp
|
||||
|
||||
from bigchaindb import util
|
||||
|
||||
from bigchaindb.voter import Voter, Election, BlockStream
|
||||
from bigchaindb.voter import Election, BlockStream
|
||||
from bigchaindb import crypto, Bigchain
|
||||
|
||||
|
||||
@ -308,38 +308,3 @@ class TestBlockElection(object):
|
||||
# tx2 was in an invalid block and SHOULD be in the backlog
|
||||
assert r.table('backlog').get(tx2['id']).run(b.conn)['id'] == tx2['id']
|
||||
|
||||
|
||||
class TestBlockStream(object):
|
||||
|
||||
def test_if_federation_size_is_greater_than_one_ignore_past_blocks(self, b):
|
||||
for _ in range(5):
|
||||
b.nodes_except_me.append(crypto.generate_key_pair()[1])
|
||||
new_blocks = mp.Queue()
|
||||
bs = BlockStream(new_blocks)
|
||||
block_1 = dummy_block()
|
||||
new_blocks.put(block_1)
|
||||
assert block_1 == bs.get()
|
||||
|
||||
def test_if_no_old_blocks_get_should_return_new_blocks(self, b):
|
||||
new_blocks = mp.Queue()
|
||||
bs = BlockStream(new_blocks)
|
||||
|
||||
# create two blocks
|
||||
block_1 = dummy_block()
|
||||
block_2 = dummy_block()
|
||||
|
||||
# write the blocks
|
||||
b.write_block(block_1, durability='hard')
|
||||
b.write_block(block_2, durability='hard')
|
||||
|
||||
# simulate a changefeed
|
||||
new_blocks.put(block_1)
|
||||
new_blocks.put(block_2)
|
||||
|
||||
# and check if we get exactly these two blocks
|
||||
assert bs.get() == block_1
|
||||
assert bs.get() == block_2
|
||||
|
||||
@pytest.mark.skipif(reason='We may have duplicated blocks when retrieving the BlockStream')
|
||||
def test_ignore_duplicated_blocks_when_retrieving_the_blockstream(self):
|
||||
pass
|
||||
|
@ -157,6 +157,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch):
|
||||
assert crypto.VerifyingKey(b.me).verify(util.serialize(vote_doc['vote']),
|
||||
vote_doc['signature']) is True
|
||||
|
||||
|
||||
def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
|
||||
from bigchaindb.pipelines import vote
|
||||
|
||||
@ -257,6 +258,125 @@ def test_invalid_block_voting(monkeypatch, b, user_vk):
|
||||
vote_doc['signature']) is True
|
||||
|
||||
|
||||
def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
|
||||
from bigchaindb.pipelines import vote
|
||||
|
||||
outpipe = Pipe()
|
||||
|
||||
monkeypatch.setattr(util, 'timestamp', lambda: '1')
|
||||
b.create_genesis_block()
|
||||
|
||||
# insert blocks in the database while the voter process is not listening
|
||||
# (these blocks won't appear in the changefeed)
|
||||
block_1 = dummy_block(b)
|
||||
b.write_block(block_1, durability='hard')
|
||||
block_2 = dummy_block(b)
|
||||
b.write_block(block_2, durability='hard')
|
||||
|
||||
vote_pipeline = vote.create_pipeline()
|
||||
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe)
|
||||
vote_pipeline.start()
|
||||
|
||||
# We expects two votes, so instead of waiting an arbitrary amount
|
||||
# of time, we can do two blocking calls to `get`
|
||||
outpipe.get()
|
||||
outpipe.get()
|
||||
|
||||
# create a new block that will appear in the changefeed
|
||||
block_3 = dummy_block(b)
|
||||
b.write_block(block_3, durability='hard')
|
||||
|
||||
# Same as before with the two `get`s
|
||||
outpipe.get()
|
||||
|
||||
vote_pipeline.terminate()
|
||||
|
||||
# retrive blocks from bigchain
|
||||
blocks = list(r.table('bigchain')
|
||||
.order_by(r.asc((r.row['block']['timestamp'])))
|
||||
.run(b.conn))
|
||||
|
||||
# FIXME: remove genesis block, we don't vote on it (might change in the future)
|
||||
blocks.pop(0)
|
||||
vote_pipeline.terminate()
|
||||
|
||||
# retrieve vote
|
||||
votes = r.table('votes').run(b.conn)
|
||||
votes = list(votes)
|
||||
|
||||
assert all(vote['node_pubkey'] == b.me for vote in votes)
|
||||
|
||||
|
||||
def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
|
||||
from bigchaindb.pipelines import vote
|
||||
|
||||
outpipe = Pipe()
|
||||
|
||||
monkeypatch.setattr(util, 'timestamp', lambda: '1')
|
||||
b.create_genesis_block()
|
||||
|
||||
monkeypatch.setattr(util, 'timestamp', lambda: '2')
|
||||
block_1 = dummy_block(b)
|
||||
b.write_block(block_1, durability='hard')
|
||||
|
||||
monkeypatch.setattr(util, 'timestamp', lambda: '3')
|
||||
block_2 = dummy_block(b)
|
||||
b.write_block(block_2, durability='hard')
|
||||
|
||||
vote_pipeline = vote.create_pipeline()
|
||||
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe)
|
||||
vote_pipeline.start()
|
||||
|
||||
# We expects two votes, so instead of waiting an arbitrary amount
|
||||
# of time, we can do two blocking calls to `get`
|
||||
outpipe.get()
|
||||
outpipe.get()
|
||||
vote_pipeline.terminate()
|
||||
|
||||
# retrive blocks from bigchain
|
||||
blocks = list(r.table('bigchain')
|
||||
.order_by(r.asc((r.row['block']['timestamp'])))
|
||||
.run(b.conn))
|
||||
|
||||
# retrieve votes
|
||||
votes = list(r.table('votes').run(b.conn))
|
||||
|
||||
assert votes[0]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id'])
|
||||
assert votes[1]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id'])
|
||||
|
||||
|
||||
def test_voter_checks_for_previous_vote(monkeypatch, b):
|
||||
from bigchaindb.pipelines import vote
|
||||
|
||||
inpipe = Pipe()
|
||||
outpipe = Pipe()
|
||||
|
||||
monkeypatch.setattr(util, 'timestamp', lambda: '1')
|
||||
b.create_genesis_block()
|
||||
|
||||
block_1 = dummy_block(b)
|
||||
inpipe.put(block_1)
|
||||
|
||||
vote_pipeline = vote.create_pipeline()
|
||||
vote_pipeline.setup(indata=inpipe, outdata=outpipe)
|
||||
vote_pipeline.start()
|
||||
|
||||
# wait for the result
|
||||
outpipe.get()
|
||||
|
||||
retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn)
|
||||
|
||||
# queue block for voting AGAIN
|
||||
inpipe.put(retrieved_block)
|
||||
|
||||
re_retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn)
|
||||
|
||||
vote_pipeline.terminate()
|
||||
|
||||
# block should be unchanged
|
||||
assert retrieved_block == re_retrieved_block
|
||||
|
||||
|
||||
@patch.object(Pipeline, 'start')
|
||||
def test_start(mock_start, b):
|
||||
# TODO: `block.start` is just a wrapper around `vote.create_pipeline`,
|
||||
|
Loading…
x
Reference in New Issue
Block a user