Votes table (#379)

* create votes table and indexes

* initial db call rewrite

* setup votes table in tests

* rewrite election status

* update api tests

* update tests

* bigchaindb set-shards should change 'votes' table too

* Forgot to add change to bigchain.py in last commit

* update test

* move exceptions

* collapse line

* revise test

* remove obsolete test

* remove votelist from blocks

* sharding and replication

* documentation changes

* language change

* more readable assertions

* fix exception

* test exceptions

* remove parameter

* remove loop
This commit is contained in:
Ryan Henderson 2016-07-13 09:40:07 +02:00 committed by GitHub
parent fc8facaa61
commit 073dcdaa79
8 changed files with 200 additions and 111 deletions

View File

@ -207,7 +207,7 @@ def run_load(args):
def run_set_shards(args):
b = bigchaindb.Bigchain()
for table in ['bigchain', 'backlog']:
for table in ['bigchain', 'backlog', 'votes']:
# See https://www.rethinkdb.com/api/python/config/
table_config = r.table(table).config().run(b.conn)
num_replicas = len(table_config['shards'][0]['replicas'])
@ -219,7 +219,7 @@ def run_set_shards(args):
def run_set_replicas(args):
b = bigchaindb.Bigchain()
for table in ['bigchain', 'backlog']:
for table in ['bigchain', 'backlog', 'votes']:
# See https://www.rethinkdb.com/api/python/config/
table_config = r.table(table).config().run(b.conn)
num_shards = len(table_config['shards'])

View File

@ -1,6 +1,7 @@
import random
import math
import operator
import collections
import rethinkdb as r
import rapidjson
@ -397,7 +398,6 @@ class Bigchain(object):
'id': block_hash,
'block': block,
'signature': block_signature,
'votes': []
}
return block
@ -444,15 +444,20 @@ class Bigchain(object):
but the vote is invalid.
"""
if block['votes']:
for vote in block['votes']:
if vote['node_pubkey'] == self.me:
if not util.verify_vote_signature(block, vote):
raise exceptions.ImproperVoteError(
'Block {} already has an incorrectly signed vote from public key {}'
).format(block['id'], self.me)
return True
return False
votes = list(r.table('votes').get_all([block['id'], self.me], index='block_and_voter').run(self.conn))
if len(votes) > 1:
raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}'
.format(block_id=block['id'], n_votes=str(len(votes)), me=self.me))
has_previous_vote = False
if votes:
if util.verify_vote_signature(block, votes[0]):
has_previous_vote = True
else:
raise exceptions.ImproperVoteError('Block {block_id} already has an incorrectly signed vote '
'from public key {me}'.format(block_id=block['id'], me=self.me))
return has_previous_vote
def is_valid_block(self, block):
"""Check whether a block is valid or invalid.
@ -555,44 +560,44 @@ class Bigchain(object):
if self.has_previous_vote(block):
return None
update = {'votes': r.row['votes'].append(vote)}
# We need to *not* override the existing block_number, if any
# FIXME: MIGHT HAVE RACE CONDITIONS WITH THE OTHER NODES IN THE FEDERATION
if 'block_number' not in block:
update['block_number'] = block_number
if 'block_number' not in vote:
vote['block_number'] = block_number # maybe this should be in the signed part...or better yet, removed..
r.table('bigchain') \
.get(vote['vote']['voting_for_block']) \
.update(update) \
r.table('votes') \
.insert(vote) \
.run(self.conn)
def get_last_voted_block(self):
"""Returns the last block that this node voted on."""
# query bigchain for all blocks this node is a voter but didn't voted on
last_voted = r.table('bigchain') \
.filter(r.row['block']['voters'].contains(self.me)) \
.filter(lambda doc: doc['votes'].contains(lambda vote: vote['node_pubkey'] == self.me)) \
last_voted = r.table('votes') \
.filter(r.row['node_pubkey'] == self.me) \
.order_by(r.desc('block_number')) \
.limit(1) \
.run(self.conn)
# return last vote if last vote exists else return Genesis block
last_voted = list(last_voted)
if not last_voted:
return list(r.table('bigchain')
.filter(r.row['block_number'] == 0)
.run(self.conn))[0]
return last_voted[0]
res = r.table('bigchain').get(last_voted[0]['vote']['voting_for_block']).run(self.conn)
if 'block_number' in last_voted[0]:
res['block_number'] = last_voted[0]['block_number']
return res
def get_unvoted_blocks(self):
"""Return all the blocks that has not been voted by this node."""
unvoted = r.table('bigchain') \
.filter(lambda doc: doc['votes'].contains(lambda vote: vote['node_pubkey'] == self.me).not_()) \
.order_by(r.asc((r.row['block']['timestamp']))) \
.filter(lambda block: r.table('votes').get_all([block['id'], self.me], index='block_and_voter')
.is_empty()) \
.order_by(r.desc('block_number')) \
.run(self.conn)
if unvoted and unvoted[0].get('block_number') == 0:
@ -603,9 +608,26 @@ class Bigchain(object):
def block_election_status(self, block):
"""Tally the votes on a block, and return the status: valid, invalid, or undecided."""
votes = r.table('votes') \
.between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter') \
.run(self.conn)
votes = list(votes)
n_voters = len(block['block']['voters'])
vote_cast = [vote['vote']['is_block_valid'] for vote in block['votes']]
vote_validity = [self.consensus.verify_vote_signature(block, vote) for vote in block['votes']]
voter_counts = collections.Counter([vote['node_pubkey'] for vote in votes])
for node in voter_counts:
if voter_counts[node] > 1:
raise exceptions.MultipleVotesError('Block {block_id} has multiple votes ({n_votes}) from voting node {node_id}'
.format(block_id=block['id'], n_votes=str(voter_counts[node]), node_id=node))
if len(votes) > n_voters:
raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes cast, but only {n_voters} voters'
.format(block_id=block['id'], n_votes=str(len(votes)), n_voters=str(n_voters)))
vote_cast = [vote['vote']['is_block_valid'] for vote in votes]
vote_validity = [self.consensus.verify_vote_signature(block, vote) for vote in votes]
# element-wise product of stated vote and validity of vote
vote_list = list(map(operator.mul, vote_cast, vote_validity))

View File

@ -36,6 +36,7 @@ def init():
# create the tables
r.db(dbname).table_create('bigchain').run(conn)
r.db(dbname).table_create('backlog').run(conn)
r.db(dbname).table_create('votes').run(conn)
logger.info(' - indexes')
# create the secondary indexes
@ -52,6 +53,11 @@ def init():
r.db(dbname).table('backlog')\
.index_create('assignee__transaction_timestamp', [r.row['assignee'], r.row['transaction']['timestamp']])\
.run(conn)
# compound index to order votes by block id and node
r.db(dbname).table('votes').index_create('block_and_voter',
[r.row['vote']['voting_for_block'], r.row['node_pubkey']]).run(conn)
# secondary index for payload data by UUID
r.db(dbname).table('bigchain')\
.index_create('payload_uuid', r.row['block']['transactions']['transaction']['data']['uuid'], multi=True)\
@ -60,6 +66,7 @@ def init():
# wait for rethinkdb to finish creating secondary indexes
r.db(dbname).table('backlog').index_wait().run(conn)
r.db(dbname).table('bigchain').index_wait().run(conn)
r.db(dbname).table('votes').index_wait().run(conn)
logger.info(' - genesis block')
b.create_genesis_block()

View File

@ -47,7 +47,11 @@ class StartupError(Exception):
class ImproperVoteError(Exception):
"""Raised when an invalid vote is found"""
"""Raised if a vote is not constructed correctly, or signed incorrectly"""
class MultipleVotesError(Exception):
"""Raised if a voter has voted more than once"""
class GenesisBlockAlreadyExistsError(Exception):

View File

@ -3,7 +3,7 @@
BigchainDB stores all its records in JSON documents.
The three main kinds of records are transactions, blocks and votes.
_Transactions_ are used to register, issue, create or transfer things (e.g. assets). Multiple transactions are combined with some other metadata to form _blocks_. Nodes append _votes_ to blocks. This section is a reference on the details of transactions, blocks and votes.
_Transactions_ are used to register, issue, create or transfer things (e.g. assets). Multiple transactions are combined with some other metadata to form _blocks_. Nodes vote on blocks. This section is a reference on the details of transactions, blocks and votes.
Below we often refer to cryptographic hashes, keys and signatures. The details of those are covered in [the section on cryptography](../appendices/cryptography.html).
@ -230,7 +230,6 @@ If there is only one _current owner_, the fulfillment will be a simple signature
"voters": ["<list of federation nodes public keys>"]
},
"signature": "<signature of block>",
"votes": ["<list of votes>"]
}
```
@ -244,14 +243,14 @@ If there is only one _current owner_, the fulfillment will be a simple signature
in the federation when the block was created, so that at a later point in
time we can check that the block received the correct number of votes.
- `signature`: Signature of the block by the node that created the block. (To create the signature, the node serializes the block contents and signs that with its private key.)
- `votes`: Initially an empty list. New votes are appended as they come in from the nodes.
## The Vote Model
Each node must generate a vote for each block, to be appended to that block's `votes` list. A vote has the following structure:
Each node must generate a vote for each block, to be appended the `votes` table. A vote has the following structure:
```json
{
"id": "<RethinkDB-generated ID for the vote>",
"node_pubkey": "<the public key of the voting node>",
"vote": {
"voting_for_block": "<id of the block the node is voting for>",
@ -260,7 +259,8 @@ Each node must generate a vote for each block, to be appended to that block's `v
"invalid_reason": "<None|DOUBLE_SPEND|TRANSACTIONS_HASH_MISMATCH|NODES_PUBKEYS_MISMATCH",
"timestamp": "<Unix time when the vote was generated, provided by the voting node>"
},
"signature": "<signature of vote>"
"signature": "<signature of vote>",
"block_number": "<roughly sequential integer index for block ordering>"
}
```

View File

@ -38,6 +38,8 @@ def setup_database(request, node_config):
# setup tables
r.db(db_name).table_create('bigchain').run()
r.db(db_name).table_create('backlog').run()
r.db(db_name).table_create('votes').run()
# create the secondary indexes
# to order blocks by timestamp
r.db(db_name).table('bigchain').index_create('block_timestamp', r.row['block']['timestamp']).run()
@ -55,6 +57,9 @@ def setup_database(request, node_config):
r.db(db_name).table('backlog')\
.index_create('assignee__transaction_timestamp', [r.row['assignee'], r.row['transaction']['timestamp']])\
.run()
# compound index to order votes by block id and node
r.db(db_name).table('votes').index_create('block_and_voter',
[r.row['vote']['voting_for_block'], r.row['node_pubkey']]).run()
# order transactions by id
r.db(db_name).table('bigchain').index_create('transaction_id', r.row['block']['transactions']['id'],
multi=True).run()
@ -84,6 +89,7 @@ def cleanup_tables(request, node_config):
try:
r.db(db_name).table('bigchain').delete().run()
r.db(db_name).table('backlog').delete().run()
r.db(db_name).table('votes').delete().run()
except r.ReqlOpFailedError as e:
if e.message != 'Database `{}` does not exist.'.format(db_name):
raise

View File

@ -247,7 +247,6 @@ class TestBigchainApi(object):
assert new_block['block']['node_pubkey'] == b.me
assert crypto.VerifyingKey(b.me).verify(util.serialize(new_block['block']), new_block['signature']) is True
assert new_block['id'] == block_hash
assert new_block['votes'] == []
def test_create_empty_block(self, b):
with pytest.raises(exceptions.OperationError) as excinfo:
@ -300,6 +299,57 @@ class TestBigchainApi(object):
assert retrieved_block_1 == retrieved_block_2
def test_more_votes_than_voters(self, b):
b.create_genesis_block()
block_1 = dummy_block()
b.write_block(block_1, durability='hard')
# insert duplicate votes
vote_1 = b.vote(block_1, b.get_last_voted_block(), True)
vote_2 = b.vote(block_1, b.get_last_voted_block(), True)
vote_2['node_pubkey'] = 'aaaaaaa'
r.table('votes').insert(vote_1).run(b.conn)
r.table('votes').insert(vote_2).run(b.conn)
from bigchaindb.exceptions import MultipleVotesError
with pytest.raises(MultipleVotesError) as excinfo:
b.block_election_status(block_1)
assert excinfo.value.args[0] == 'Block {block_id} has {n_votes} votes cast, but only {n_voters} voters'\
.format(block_id=block_1['id'], n_votes=str(2), n_voters=str(1))
def test_multiple_votes_single_node(self, b):
b.create_genesis_block()
block_1 = dummy_block()
b.write_block(block_1, durability='hard')
# insert duplicate votes
for i in range(2):
r.table('votes').insert(b.vote(block_1, b.get_last_voted_block(), True)).run(b.conn)
from bigchaindb.exceptions import MultipleVotesError
with pytest.raises(MultipleVotesError) as excinfo:
b.block_election_status(block_1)
assert excinfo.value.args[0] == 'Block {block_id} has multiple votes ({n_votes}) from voting node {node_id}'\
.format(block_id=block_1['id'], n_votes=str(2), node_id=b.me)
with pytest.raises(MultipleVotesError) as excinfo:
b.has_previous_vote(block_1)
assert excinfo.value.args[0] == 'Block {block_id} has {n_votes} votes from public key {me}'\
.format(block_id=block_1['id'], n_votes=str(2), me=b.me)
def test_improper_vote_error(selfs, b):
b.create_genesis_block()
block_1 = dummy_block()
b.write_block(block_1, durability='hard')
vote_1 = b.vote(block_1, b.get_last_voted_block(), True)
# mangle the signature
vote_1['signature'] = vote_1['signature'][2:] + vote_1['signature'][:1]
r.table('votes').insert(vote_1).run(b.conn)
from bigchaindb.exceptions import ImproperVoteError
with pytest.raises(ImproperVoteError) as excinfo:
b.has_previous_vote(block_1)
assert excinfo.value.args[0] == 'Block {block_id} already has an incorrectly signed ' \
'vote from public key {me}'.format(block_id=block_1['id'], me=b.me)
class TestTransactionValidation(object):
@pytest.mark.usefixtures('inputs')
def test_create_operation_with_inputs(self, b, user_vk):
@ -552,9 +602,12 @@ class TestBigchainVoter(object):
# retrive block from bigchain
bigchain_block = r.table('bigchain').get(block['id']).run(b.conn)
# retrieve vote
vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn)
vote = vote.next()
# validate vote
assert len(bigchain_block['votes']) == 1
vote = bigchain_block['votes'][0]
assert vote is not None
assert vote['vote']['voting_for_block'] == block['id']
assert vote['vote']['previous_block'] == genesis['id']
@ -593,9 +646,12 @@ class TestBigchainVoter(object):
# retrive block from bigchain
bigchain_block = r.table('bigchain').get(block['id']).run(b.conn)
# retrieve vote
vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn)
vote = vote.next()
# validate vote
assert len(bigchain_block['votes']) == 1
vote = bigchain_block['votes'][0]
assert vote is not None
assert vote['vote']['voting_for_block'] == block['id']
assert vote['vote']['previous_block'] == genesis['id']

View File

@ -52,9 +52,12 @@ class TestBigchainVoter(object):
.order_by(r.asc((r.row['block']['timestamp'])))
.run(b.conn))
# retrieve vote
vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn)
vote = vote.next()
# validate vote
assert len(blocks[1]['votes']) == 1
vote = blocks[1]['votes'][0]
assert vote is not None
assert vote['vote']['voting_for_block'] == block['id']
assert vote['vote']['previous_block'] == genesis['id']
@ -95,10 +98,12 @@ class TestBigchainVoter(object):
blocks = list(r.table('bigchain')
.order_by(r.asc((r.row['block']['timestamp'])))
.run(b.conn))
# retrieve vote
vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn)
vote = vote.next()
# validate vote
assert len(blocks[1]['votes']) == 1
vote = blocks[1]['votes'][0]
assert vote is not None
assert vote['vote']['voting_for_block'] == block['id']
assert vote['vote']['previous_block'] == genesis['id']
@ -138,8 +143,12 @@ class TestBigchainVoter(object):
.order_by(r.asc((r.row['block']['timestamp'])))
.run(b.conn))
# retrieve vote
vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn)
vote = vote.next()
# validate vote
assert len(blocks[1]['votes']) == 1
assert vote is not None
# create a `TRANSFER` transaction
test_user2_priv, test_user2_pub = crypto.generate_key_pair()
@ -167,10 +176,12 @@ class TestBigchainVoter(object):
.order_by(r.asc((r.row['block']['timestamp'])))
.run(b.conn))
# validate vote
assert len(blocks[2]['votes']) == 1
# retrieve vote
vote = r.table('votes').get_all([blocks[2]['id'], b.me], index='block_and_voter').run(b.conn)
vote = vote.next()
vote = blocks[2]['votes'][0]
# validate vote
assert vote is not None
assert vote['vote']['voting_for_block'] == block['id']
assert vote['vote']['is_block_valid'] is True
@ -208,9 +219,12 @@ class TestBigchainVoter(object):
.order_by(r.asc((r.row['block']['timestamp'])))
.run(b.conn))
# retrieve vote
vote = r.table('votes').get_all([block['id'], b.me], index='block_and_voter').run(b.conn)
vote = vote.next()
# validate vote
assert len(blocks[1]['votes']) == 1
vote = blocks[1]['votes'][0]
assert vote is not None
assert vote['vote']['voting_for_block'] == block['id']
assert vote['vote']['previous_block'] == genesis['id']
@ -282,7 +296,11 @@ class TestBigchainVoter(object):
# FIXME: remove genesis block, we don't vote on it (might change in the future)
blocks.pop(0)
assert all(block['votes'][0]['node_pubkey'] == b.me for block in blocks)
# retrieve vote
votes = r.table('votes').run(b.conn)
votes = list(votes)
assert all(vote['node_pubkey'] == b.me for vote in votes)
def test_voter_chains_blocks_with_the_previous_ones(self, b):
b.create_genesis_block()
@ -306,14 +324,14 @@ class TestBigchainVoter(object):
.order_by(r.asc((r.row['block']['timestamp'])))
.run(b.conn))
assert blocks[0]['block_number'] == 0
assert blocks[1]['block_number'] == 1
assert blocks[2]['block_number'] == 2
# retrieve votes
votes = r.table('votes')\
.order_by(r.asc((r.row['block_number'])))\
.run(b.conn)
# we don't vote on the genesis block right now
# assert blocks[0]['votes'][0]['vote']['voting_for_block'] == genesis['id']
assert blocks[1]['votes'][0]['vote']['voting_for_block'] == block_1['id']
assert blocks[2]['votes'][0]['vote']['voting_for_block'] == block_2['id']
assert blocks[0]['block_number'] == 0 # genesis block
assert votes[0]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id'])
assert votes[1]['vote']['voting_for_block'] in (blocks[1]['id'], blocks[2]['id'])
def test_voter_checks_for_previous_vote(self, b):
b.create_genesis_block()
@ -374,43 +392,52 @@ class TestBlockElection(object):
for vote in improperly_signed_valid_vote]
# test unanimously valid block
test_block['votes'] = valid_vote
r.table('votes').insert(valid_vote, durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
r.table('votes').delete().run(b.conn)
# test partial quorum situations
test_block['votes'] = valid_vote[:2]
r.table('votes').insert(valid_vote[:2], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED
r.table('votes').delete().run(b.conn)
#
test_block['votes'] = valid_vote[:3]
r.table('votes').insert(valid_vote[:3], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
r.table('votes').delete().run(b.conn)
#
test_block['votes'] = invalid_vote[:2]
r.table('votes').insert(invalid_vote[:2], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
r.table('votes').delete().run(b.conn)
# test unanimously valid block with one improperly signed vote -- should still succeed
test_block['votes'] = valid_vote[:3] + improperly_signed_valid_vote[:1]
r.table('votes').insert(valid_vote[:3] + improperly_signed_valid_vote[3:], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
r.table('votes').delete().run(b.conn)
# test unanimously valid block with two improperly signed votes -- should fail
test_block['votes'] = valid_vote[:2] + improperly_signed_valid_vote[:2]
r.table('votes').insert(valid_vote[:2] + improperly_signed_valid_vote[2:], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
r.table('votes').delete().run(b.conn)
# test block with minority invalid vote
test_block['votes'] = invalid_vote[:1] + valid_vote[:3]
r.table('votes').insert(invalid_vote[:1] + valid_vote[1:], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
r.table('votes').delete().run(b.conn)
# test split vote
test_block['votes'] = invalid_vote[:2] + valid_vote[:2]
r.table('votes').insert(invalid_vote[:2] + valid_vote[2:], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
r.table('votes').delete().run(b.conn)
# test undecided
test_block['votes'] = valid_vote[:2]
r.table('votes').insert(valid_vote[:2], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED
r.table('votes').delete().run(b.conn)
# change signatures in block, should fail
test_block['block']['voters'][0] = 'abc'
test_block['block']['voters'][1] = 'abc'
test_block['votes'] = valid_vote
r.table('votes').insert(valid_vote, durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
def test_quorum_odd(self, b):
@ -434,17 +461,21 @@ class TestBlockElection(object):
invalid_vote = [member.vote(test_block, 'abc', False)
for member in test_federation]
test_block['votes'] = valid_vote[:2]
r.table('votes').insert(valid_vote[:2], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED
r.table('votes').delete().run(b.conn)
test_block['votes'] = invalid_vote[:2]
r.table('votes').insert(invalid_vote[:2], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_UNDECIDED
r.table('votes').delete().run(b.conn)
test_block['votes'] = valid_vote[:3]
r.table('votes').insert(valid_vote[:3], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_VALID
r.table('votes').delete().run(b.conn)
test_block['votes'] = invalid_vote[:3]
r.table('votes').insert(invalid_vote[:3], durability='hard').run(b.conn)
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
r.table('votes').delete().run(b.conn)
def test_tx_rewritten_after_invalid(self, b, user_vk):
q_block_new_vote = mp.Queue()
@ -473,11 +504,11 @@ class TestBlockElection(object):
[member.vote(test_block_2, 'abc', False) for member in test_federation[2:]]
# construct valid block
test_block_1['votes'] = vote_1
r.table('votes').insert(vote_1, durability='hard').run(b.conn)
q_block_new_vote.put(test_block_1)
# construct invalid block
test_block_2['votes'] = vote_2
r.table('votes').insert(vote_2, durability='hard').run(b.conn)
q_block_new_vote.put(test_block_2)
election = Election(q_block_new_vote)
@ -523,43 +554,6 @@ class TestBlockStream(object):
assert bs.get() == block_1
assert bs.get() == block_2
def test_if_old_blocks_get_should_return_old_block_first(self, b):
# create two blocks
block_1 = dummy_block()
# sleep so that block_2 as an higher timestamp then block_1
time.sleep(1)
block_2 = dummy_block()
# write the blocks
b.write_block(block_1, durability='hard')
b.write_block(block_2, durability='hard')
new_blocks = mp.Queue()
bs = BlockStream(new_blocks)
# assert len(list(bs.old_blocks)) == 2
# import pdb; pdb.set_trace()
# from pprint import pprint as pp
# pp(bs.old_blocks)
# pp(block_1)
# pp(block_2)
# create two new blocks that will appear in the changefeed
block_3 = dummy_block()
block_4 = dummy_block()
# simulate a changefeed
new_blocks.put(block_3)
new_blocks.put(block_4)
assert len(bs.unvoted_blocks) == 2
# and check if we get the old blocks first
assert bs.get() == block_1
assert bs.get() == block_2
assert bs.get() == block_3
assert bs.get() == block_4
@pytest.mark.skipif(reason='We may have duplicated blocks when retrieving the BlockStream')
def test_ignore_duplicated_blocks_when_retrieving_the_blockstream(self):
pass