mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Core/197/revert bigchain deletes (#330)
* naive revert * skip voting if already voted * typo * poison pill * tests * reversions * block reverter is separate process * factor out previous vote checking * add tests
This commit is contained in:
@@ -229,3 +229,41 @@ class Block(object):
|
||||
p_write.start()
|
||||
p_delete.start()
|
||||
|
||||
|
||||
class BlockDeleteRevert(object):
|
||||
|
||||
def __init__(self, q_delete_to_revert):
|
||||
self.q_delete_to_revert = q_delete_to_revert
|
||||
|
||||
def write_blocks(self):
|
||||
"""
|
||||
Write blocks to the bigchain
|
||||
"""
|
||||
|
||||
# create bigchain instance
|
||||
b = Bigchain()
|
||||
|
||||
# Write blocks
|
||||
while True:
|
||||
block = self.q_delete_to_revert.get()
|
||||
|
||||
# poison pill
|
||||
if block == 'stop':
|
||||
return
|
||||
|
||||
b.write_block(block)
|
||||
|
||||
def kill(self):
|
||||
for i in range(mp.cpu_count()):
|
||||
self.q_delete_to_revert.put('stop')
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Initialize, spawn, and start the processes
|
||||
"""
|
||||
|
||||
# initialize the processes
|
||||
p_write = ProcessGroup(name='write_blocks', target=self.write_blocks)
|
||||
|
||||
# start the processes
|
||||
p_write.start()
|
||||
|
||||
@@ -16,6 +16,10 @@ class GenesisBlockAlreadyExistsError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ImproperVoteError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Bigchain(object):
|
||||
"""Bigchain API
|
||||
|
||||
@@ -415,8 +419,11 @@ class Bigchain(object):
|
||||
The block if the block is valid else it raises and exception
|
||||
describing the reason why the block is invalid.
|
||||
"""
|
||||
# First, make sure this node hasn't already voted on this block
|
||||
if self.has_previous_vote(block):
|
||||
return block
|
||||
|
||||
# First: Run the plugin block validation logic
|
||||
# Run the plugin block validation logic
|
||||
self.consensus.validate_block(self, block)
|
||||
|
||||
# Finally: Tentative assumption that every blockchain will want to
|
||||
@@ -428,6 +435,26 @@ class Bigchain(object):
|
||||
|
||||
return block
|
||||
|
||||
def has_previous_vote(self, block):
|
||||
"""Check for previous votes from this node
|
||||
|
||||
Args:
|
||||
block (dict): block to check.
|
||||
|
||||
Returns:
|
||||
True if this block already has a valid vote from this node, False otherwise. If
|
||||
there is already a vote, but the vote is invalid, raises an ImproperVoteError
|
||||
"""
|
||||
if block['votes']:
|
||||
for vote in block['votes']:
|
||||
if vote['node_pubkey'] == self.me:
|
||||
if util.verify_vote_signature(block, vote):
|
||||
return True
|
||||
else:
|
||||
raise ImproperVoteError('Block {block_id} already has an incorrectly signed vote '
|
||||
'from public key {me}').format(block_id=block['id'], me=self.me)
|
||||
return False
|
||||
|
||||
def is_valid_block(self, block):
|
||||
"""Check whether a block is valid or invalid.
|
||||
|
||||
@@ -525,6 +552,10 @@ class Bigchain(object):
|
||||
def write_vote(self, block, vote, block_number):
|
||||
"""Write the vote to the database."""
|
||||
|
||||
# First, make sure this block doesn't contain a vote from this node
|
||||
if self.has_previous_vote(block):
|
||||
return None
|
||||
|
||||
update = {'votes': r.row['votes'].append(vote)}
|
||||
|
||||
# We need to *not* override the existing block_number, if any
|
||||
|
||||
@@ -6,7 +6,7 @@ import rethinkdb as r
|
||||
import bigchaindb
|
||||
from bigchaindb import Bigchain
|
||||
from bigchaindb.voter import Voter, Election
|
||||
from bigchaindb.block import Block
|
||||
from bigchaindb.block import Block, BlockDeleteRevert
|
||||
from bigchaindb.web import server
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ class Processes(object):
|
||||
self.q_new_block = mp.Queue()
|
||||
self.q_new_transaction = mp.Queue()
|
||||
self.q_block_new_vote = mp.Queue()
|
||||
self.q_revert_delete = mp.Queue()
|
||||
|
||||
def map_backlog(self):
|
||||
# listen to changes on the backlog and redirect the changes
|
||||
@@ -69,7 +70,8 @@ class Processes(object):
|
||||
|
||||
# delete
|
||||
elif change['new_val'] is None:
|
||||
pass
|
||||
# this should never happen in regular operation
|
||||
self.q_revert_delete.put(change['old_val'])
|
||||
|
||||
# update (new vote)
|
||||
elif change['new_val'] is not None and change['old_val'] is not None:
|
||||
@@ -80,6 +82,7 @@ class Processes(object):
|
||||
|
||||
# instantiate block and voter
|
||||
block = Block(self.q_new_transaction)
|
||||
delete_reverter = BlockDeleteRevert(self.q_revert_delete)
|
||||
|
||||
# start the web api
|
||||
app_server = server.create_server(bigchaindb.config['server'])
|
||||
@@ -90,6 +93,7 @@ class Processes(object):
|
||||
p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain)
|
||||
p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog)
|
||||
p_block = mp.Process(name='block', target=block.start)
|
||||
p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start)
|
||||
p_voter = Voter(self.q_new_block)
|
||||
p_election = Election(self.q_block_new_vote)
|
||||
|
||||
@@ -100,6 +104,7 @@ class Processes(object):
|
||||
p_map_backlog.start()
|
||||
logger.info('starting block')
|
||||
p_block.start()
|
||||
p_block_delete_revert.start()
|
||||
|
||||
logger.info('starting voter')
|
||||
p_voter.start()
|
||||
|
||||
@@ -13,7 +13,7 @@ from bigchaindb import util
|
||||
from bigchaindb import exceptions
|
||||
from bigchaindb import crypto
|
||||
from bigchaindb.voter import Voter
|
||||
from bigchaindb.block import Block
|
||||
from bigchaindb.block import Block, BlockDeleteRevert
|
||||
|
||||
|
||||
@pytest.mark.skipif(reason='Some tests throw a ResourceWarning that might result in some weird '
|
||||
@@ -257,6 +257,21 @@ class TestBigchainApi(object):
|
||||
b.write_vote(block_3, b.vote(block_3, b.get_last_voted_block(), True), 3)
|
||||
assert b.get_last_voted_block()['id'] == block_3['id']
|
||||
|
||||
def test_no_vote_written_if_block_already_has_vote(self, b):
|
||||
b.create_genesis_block()
|
||||
|
||||
block_1 = b.create_block([])
|
||||
|
||||
b.write_block(block_1, durability='hard')
|
||||
|
||||
b.write_vote(block_1, b.vote(block_1, b.get_last_voted_block(), True), 1)
|
||||
retrieved_block_1 = r.table('bigchain').get(block_1['id']).run(b.conn)
|
||||
|
||||
# try to vote again on the retrieved block, should do nothing
|
||||
b.write_vote(retrieved_block_1, b.vote(retrieved_block_1, b.get_last_voted_block(), True), 1)
|
||||
retrieved_block_2 = r.table('bigchain').get(block_1['id']).run(b.conn)
|
||||
|
||||
assert retrieved_block_1 == retrieved_block_2
|
||||
|
||||
class TestTransactionValidation(object):
|
||||
@pytest.mark.usefixtures('inputs')
|
||||
@@ -787,6 +802,39 @@ class TestBigchainBlock(object):
|
||||
# join the process
|
||||
block.kill()
|
||||
|
||||
def test_revert_delete_block(self, b):
|
||||
b.create_genesis_block()
|
||||
|
||||
block_1 = b.create_block([])
|
||||
block_2 = b.create_block([])
|
||||
block_3 = b.create_block([])
|
||||
|
||||
b.write_block(block_1, durability='hard')
|
||||
b.write_block(block_2, durability='hard')
|
||||
b.write_block(block_3, durability='hard')
|
||||
|
||||
b.write_vote(block_1, b.vote(block_1, b.get_last_voted_block(), True), 1)
|
||||
b.write_vote(block_2, b.vote(block_2, b.get_last_voted_block(), True), 2)
|
||||
b.write_vote(block_3, b.vote(block_3, b.get_last_voted_block(), True), 3)
|
||||
|
||||
q_revert_delete = mp.Queue()
|
||||
|
||||
reverter = BlockDeleteRevert(q_revert_delete)
|
||||
|
||||
# simulate changefeed
|
||||
r.table('bigchain').get(block_2['id']).delete().run(b.conn)
|
||||
q_revert_delete.put(block_2)
|
||||
|
||||
assert r.table('bigchain').get(block_2['id']).run(b.conn) is None
|
||||
|
||||
reverter.start()
|
||||
time.sleep(1)
|
||||
reverter.kill()
|
||||
|
||||
reverted_block_2 = r.table('bigchain').get(block_2['id']).run(b.conn)
|
||||
|
||||
assert reverted_block_2 == block_2
|
||||
|
||||
def test_duplicated_transactions(self):
|
||||
pytest.skip('We may have duplicates in the initial_results and changefeed')
|
||||
|
||||
|
||||
@@ -292,6 +292,31 @@ class TestBigchainVoter(object):
|
||||
assert blocks[1]['votes'][0]['vote']['voting_for_block'] == block_1['id']
|
||||
assert blocks[2]['votes'][0]['vote']['voting_for_block'] == block_2['id']
|
||||
|
||||
def test_voter_checks_for_previous_vote(self, b):
|
||||
b.create_genesis_block()
|
||||
block_1 = b.create_block([])
|
||||
b.write_block(block_1, durability='hard')
|
||||
|
||||
q_new_block = mp.Queue()
|
||||
|
||||
voter = Voter(q_new_block)
|
||||
voter.start()
|
||||
|
||||
# queue block for voting
|
||||
q_new_block.put(block_1)
|
||||
time.sleep(1)
|
||||
retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn)
|
||||
|
||||
# queue block for voting AGAIN
|
||||
q_new_block.put(retrieved_block)
|
||||
time.sleep(1)
|
||||
voter.kill()
|
||||
|
||||
re_retrieved_block = r.table('bigchain').get(block_1['id']).run(b.conn)
|
||||
|
||||
# block should be unchanged
|
||||
assert retrieved_block == re_retrieved_block
|
||||
|
||||
@pytest.mark.skipif(reason='Updating the block_number must be atomic')
|
||||
def test_updating_block_number_must_be_atomic(self):
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user