Implement crash recovery for elections.

1. Save pre-commit state in the beginning of end block. 2. Provide an interface for custom elections to rollback on crash recovery. 3. Simplify pre-commit management. 4. Add crash recovery for updert-validator and chain migration elecitons.
This commit is contained in:
Lev Berman 2018-09-20 18:58:58 +02:00
parent 72d7986a58
commit 5624f9de1b
16 changed files with 307 additions and 135 deletions

View File

@ -261,18 +261,15 @@ def get_unspent_outputs(conn, *, query=None):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_pre_commit_state(conn, state): def store_pre_commit_state(conn, state):
commit_id = state['commit_id']
return conn.run( return conn.run(
conn.collection('pre_commit') conn.collection('pre_commit')
.replace_one({'commit_id': commit_id}, state, upsert=True) .replace_one({}, state, upsert=True)
) )
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_pre_commit_state(conn, commit_id): def get_pre_commit_state(conn):
return conn.run(conn.collection('pre_commit') return conn.run(conn.collection('pre_commit').find_one())
.find_one({'commit_id': commit_id},
projection={'_id': False}))
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -287,6 +284,13 @@ def store_validator_set(conn, validators_update):
) )
@register_query(LocalMongoDBConnection)
def delete_validator_set(conn, height):
return conn.run(
conn.collection('validators').delete_many({'height': height})
)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_election(conn, election_id, height, is_concluded): def store_election(conn, election_id, height, is_concluded):
return conn.run( return conn.run(
@ -308,6 +312,13 @@ def store_elections(conn, elections):
) )
@register_query(LocalMongoDBConnection)
def delete_elections(conn, height):
return conn.run(
conn.collection('elections').delete_many({'height': height})
)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_validator_set(conn, height=None): def get_validator_set(conn, height=None):
query = {} query = {}
@ -360,6 +371,13 @@ def store_abci_chain(conn, height, chain_id, is_synced=True):
) )
@register_query(LocalMongoDBConnection)
def delete_abci_chain(conn, height):
return conn.run(
conn.collection('abci_chains').delete_many({'height': height})
)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_latest_abci_chain(conn): def get_latest_abci_chain(conn):
return conn.run( return conn.run(

View File

@ -42,7 +42,7 @@ INDEXES = {
('output_index', ASCENDING)], dict(name='utxo', unique=True)), ('output_index', ASCENDING)], dict(name='utxo', unique=True)),
], ],
'pre_commit': [ 'pre_commit': [
('commit_id', dict(name='pre_commit_id', unique=True)), ('height', dict(name='height', unique=True)),
], ],
'elections': [ 'elections': [
([('height', DESCENDING), ('election_id', ASCENDING)], ([('height', DESCENDING), ('election_id', ASCENDING)],

View File

@ -8,9 +8,6 @@ from functools import singledispatch
from bigchaindb.backend.exceptions import OperationError from bigchaindb.backend.exceptions import OperationError
VALIDATOR_UPDATE_ID = 'a_unique_id_string'
PRE_COMMIT_ID = 'a_unique_id_string'
@singledispatch @singledispatch
def store_asset(connection, asset): def store_asset(connection, asset):
@ -316,12 +313,11 @@ def get_unspent_outputs(connection, *, query=None):
@singledispatch @singledispatch
def store_pre_commit_state(connection, commit_id, state): def store_pre_commit_state(connection, state):
"""Store pre-commit state in a document with `id` as `commit_id`. """Store pre-commit state.
Args: Args:
commit_id (string): `id` of document where `state` should be stored. state (dict): pre-commit state.
state (dict): commit state.
Returns: Returns:
The result of the operation. The result of the operation.
@ -331,14 +327,11 @@ def store_pre_commit_state(connection, commit_id, state):
@singledispatch @singledispatch
def get_pre_commit_state(connection, commit_id): def get_pre_commit_state(connection):
"""Get pre-commit state where `id` is `commit_id`. """Get pre-commit state.
Args:
commit_id (string): `id` of document where `state` should be stored.
Returns: Returns:
Document with `id` as `commit_id` Document representing the pre-commit state.
""" """
raise NotImplementedError raise NotImplementedError
@ -351,6 +344,13 @@ def store_validator_set(conn, validator_update):
raise NotImplementedError raise NotImplementedError
@singledispatch
def delete_validator_set(conn, height):
"""Delete the validator set at the given height."""
raise NotImplementedError
@singledispatch @singledispatch
def store_election(conn, election_id, height, is_concluded): def store_election(conn, election_id, height, is_concluded):
"""Store election record""" """Store election record"""
@ -365,6 +365,13 @@ def store_elections(conn, elections):
raise NotImplementedError raise NotImplementedError
@singledispatch
def delete_elections(conn, height):
"""Delete all election records at the given height"""
raise NotImplementedError
@singledispatch @singledispatch
def get_validator_set(conn, height): def get_validator_set(conn, height):
"""Get validator set for a given `height`, if `height` is not specified """Get validator set for a given `height`, if `height` is not specified
@ -403,6 +410,14 @@ def store_abci_chain(conn, height, chain_id, is_synced=True):
Args: Args:
is_synced: True if the chain is known by both ABCI client and server is_synced: True if the chain is known by both ABCI client and server
""" """
raise NotImplementedError
@singledispatch
def delete_abci_chain(conn, height):
"""Delete the ABCI chain at the given height."""
raise NotImplementedError raise NotImplementedError

View File

@ -13,6 +13,7 @@ import copy
import json import json
import sys import sys
from bigchaindb.core import rollback
from bigchaindb.migrations.chain_migration_election import ChainMigrationElection from bigchaindb.migrations.chain_migration_election import ChainMigrationElection
from bigchaindb.utils import load_node_key from bigchaindb.utils import load_node_key
from bigchaindb.common.exceptions import (DatabaseDoesNotExist, from bigchaindb.common.exceptions import (DatabaseDoesNotExist,
@ -22,8 +23,6 @@ import bigchaindb
from bigchaindb import (backend, ValidatorElection, from bigchaindb import (backend, ValidatorElection,
BigchainDB) BigchainDB)
from bigchaindb.backend import schema from bigchaindb.backend import schema
from bigchaindb.backend import query
from bigchaindb.backend.query import PRE_COMMIT_ID
from bigchaindb.commands import utils from bigchaindb.commands import utils
from bigchaindb.commands.utils import (configure_bigchaindb, from bigchaindb.commands.utils import (configure_bigchaindb,
input_on_stderr) input_on_stderr)
@ -270,16 +269,7 @@ def run_drop(args):
def run_recover(b): def run_recover(b):
pre_commit = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID) rollback(b)
# Initially the pre-commit collection would be empty
if pre_commit:
latest_block = query.get_latest_block(b.connection)
# NOTE: the pre-commit state can only be ahead of the commited state
# by 1 block
if latest_block and (latest_block['height'] < pre_commit['height']):
query.delete_transactions(b.connection, pre_commit['transactions'])
@configure_bigchaindb @configure_bigchaindb

View File

@ -25,8 +25,7 @@ from bigchaindb.version import __tm_supported_versions__
from bigchaindb.utils import tendermint_version_is_compatible from bigchaindb.utils import tendermint_version_is_compatible
from bigchaindb.tendermint_utils import (decode_transaction, from bigchaindb.tendermint_utils import (decode_transaction,
calculate_hash) calculate_hash)
from bigchaindb.lib import Block, PreCommitState from bigchaindb.lib import Block
from bigchaindb.backend.query import PRE_COMMIT_ID
import bigchaindb.upsert_validator.validator_utils as vutils import bigchaindb.upsert_validator.validator_utils as vutils
from bigchaindb.events import EventTypes, Event from bigchaindb.events import EventTypes, Event
@ -207,6 +206,14 @@ class App(BaseApplication):
height = request_end_block.height + chain_shift height = request_end_block.height + chain_shift
self.new_height = height self.new_height = height
# store pre-commit state to recover in case there is a crash during
# `end_block` or `commit`
logger.debug(f'Updating pre-commit state: {self.new_height}')
pre_commit_state = dict(height=self.new_height,
transactions=self.block_txn_ids)
self.bigchaindb.store_pre_commit_state(pre_commit_state)
block_txn_hash = calculate_hash(self.block_txn_ids) block_txn_hash = calculate_hash(self.block_txn_ids)
block = self.bigchaindb.get_latest_block() block = self.bigchaindb.get_latest_block()
@ -219,12 +226,6 @@ class App(BaseApplication):
self.new_height, self.new_height,
self.block_transactions) self.block_transactions)
# Store pre-commit state to recover in case there is a crash during `commit`
pre_commit_state = PreCommitState(commit_id=PRE_COMMIT_ID,
height=self.new_height,
transactions=self.block_txn_ids)
logger.debug('Updating PreCommitState: %s', self.new_height)
self.bigchaindb.store_pre_commit_state(pre_commit_state._asdict())
return ResponseEndBlock(validator_updates=validator_update) return ResponseEndBlock(validator_updates=validator_update)
def commit(self): def commit(self):
@ -257,3 +258,21 @@ class App(BaseApplication):
self.events_queue.put(event) self.events_queue.put(event)
return ResponseCommit(data=data) return ResponseCommit(data=data)
def rollback(b):
pre_commit = b.get_pre_commit_state()
if pre_commit is None:
# the pre_commit record is first stored in the first `end_block`
return
latest_block = b.get_latest_block()
if latest_block is None:
logger.error('Found precommit state but no blocks!')
sys.exit(1)
# NOTE: the pre-commit state is always at most 1 block ahead of the commited state
if latest_block['height'] < pre_commit['height']:
Election.rollback(b, pre_commit['height'], pre_commit['transactions'])
b.delete_transactions(pre_commit['transactions'])

View File

@ -250,6 +250,30 @@ class Election(Transaction):
return response return response
@classmethod
def _get_initiated_elections(cls, height, txns):
elections = []
for tx in txns:
if not isinstance(tx, Election):
continue
elections.append({'election_id': tx.id, 'height': height,
'is_concluded': False})
return elections
@classmethod
def _get_votes(cls, txns):
elections = OrderedDict()
for tx in txns:
if not isinstance(tx, Vote):
continue
election_id = tx.asset['id']
if election_id not in elections:
elections[election_id] = []
elections[election_id].append(tx)
return elections
@classmethod @classmethod
def process_block(cls, bigchain, new_height, txns): def process_block(cls, bigchain, new_height, txns):
"""Looks for election and vote transactions inside the block, records """Looks for election and vote transactions inside the block, records
@ -274,25 +298,15 @@ class Election(Transaction):
for other concluded elections, if it requires so, the method should for other concluded elections, if it requires so, the method should
rely on the database state. rely on the database state.
""" """
# elections placed in this block # elections initiated in this block
initiated_elections = [] initiated_elections = cls._get_initiated_elections(new_height, txns)
# elections voted for in this block and their votes
elections = OrderedDict()
for tx in txns:
if isinstance(tx, Election):
initiated_elections.append({'election_id': tx.id,
'height': new_height,
'is_concluded': False})
if not isinstance(tx, Vote):
continue
election_id = tx.asset['id']
if election_id not in elections:
elections[election_id] = []
elections[election_id].append(tx)
if initiated_elections: if initiated_elections:
bigchain.store_elections(initiated_elections) bigchain.store_elections(initiated_elections)
# elections voted for in this block and their votes
elections = cls._get_votes(txns)
validator_update = None validator_update = None
for election_id, votes in elections.items(): for election_id, votes in elections.items():
election = bigchain.get_transaction(election_id) election = bigchain.get_transaction(election_id)
@ -307,5 +321,34 @@ class Election(Transaction):
return [validator_update] if validator_update else [] return [validator_update] if validator_update else []
@classmethod
def rollback(cls, bigchain, new_height, txn_ids):
"""Looks for election and vote transactions inside the block and
cleans up the database artifacts possibly created in `process_blocks`.
Part of the `end_block`/`commit` crash recovery.
"""
# delete election records for elections initiated at this height and
# elections concluded at this height
bigchain.delete_elections(new_height)
txns = [bigchain.get_transaction(tx_id) for tx_id in txn_ids]
elections = cls._get_votes(txns)
for election_id in elections:
election = bigchain.get_transaction(election_id)
election.on_rollback(bigchain, new_height)
def on_approval(self, bigchain, new_height): def on_approval(self, bigchain, new_height):
"""Override to update the database state according to the
election rules. Consider the current database state to account for
other concluded elections, if required.
"""
raise NotImplementedError
def on_rollback(self, bigchain, new_height):
"""Override to clean up the database artifacts possibly created
in `on_approval`. Part of the `end_block`/`commit` crash recovery.
"""
raise NotImplementedError raise NotImplementedError

View File

@ -143,6 +143,9 @@ class BigchainDB(object):
backend.query.store_assets(self.connection, assets) backend.query.store_assets(self.connection, assets)
return backend.query.store_transactions(self.connection, txns) return backend.query.store_transactions(self.connection, txns)
def delete_transactions(self, txs):
return backend.query.delete_transactions(self.connection, txs)
def update_utxoset(self, transaction): def update_utxoset(self, transaction):
"""Update the UTXO set given ``transaction``. That is, remove """Update the UTXO set given ``transaction``. That is, remove
the outputs that the given ``transaction`` spends, and add the the outputs that the given ``transaction`` spends, and add the
@ -251,6 +254,9 @@ class BigchainDB(object):
return transaction return transaction
def get_transactions(self, txn_ids):
return backend.query.get_transactions(self.connection, txn_ids)
def get_transactions_filtered(self, asset_id, operation=None): def get_transactions_filtered(self, asset_id, operation=None):
"""Get a list of transactions filtered on some criteria """Get a list of transactions filtered on some criteria
""" """
@ -438,6 +444,9 @@ class BigchainDB(object):
def get_election(self, election_id): def get_election(self, election_id):
return backend.query.get_election(self.connection, election_id) return backend.query.get_election(self.connection, election_id)
def get_pre_commit_state(self):
return backend.query.get_pre_commit_state(self.connection)
def store_pre_commit_state(self, state): def store_pre_commit_state(self, state):
return backend.query.store_pre_commit_state(self.connection, state) return backend.query.store_pre_commit_state(self.connection, state)
@ -449,10 +458,16 @@ class BigchainDB(object):
return backend.query.store_validator_set(self.connection, {'height': height, return backend.query.store_validator_set(self.connection, {'height': height,
'validators': validators}) 'validators': validators})
def delete_validator_set(self, height):
return backend.query.delete_validator_set(self.connection, height)
def store_abci_chain(self, height, chain_id, is_synced=True): def store_abci_chain(self, height, chain_id, is_synced=True):
return backend.query.store_abci_chain(self.connection, height, return backend.query.store_abci_chain(self.connection, height,
chain_id, is_synced) chain_id, is_synced)
def delete_abci_chain(self, height):
return backend.query.delete_abci_chain(self.connection, height)
def get_latest_abci_chain(self): def get_latest_abci_chain(self):
return backend.query.get_latest_abci_chain(self.connection) return backend.query.get_latest_abci_chain(self.connection)
@ -487,7 +502,8 @@ class BigchainDB(object):
def store_elections(self, elections): def store_elections(self, elections):
return backend.query.store_elections(self.connection, elections) return backend.query.store_elections(self.connection, elections)
def delete_elections(self, height):
return backend.query.delete_elections(self.connection, height)
Block = namedtuple('Block', ('app_hash', 'height', 'transactions')) Block = namedtuple('Block', ('app_hash', 'height', 'transactions'))
PreCommitState = namedtuple('PreCommitState', ('commit_id', 'height', 'transactions'))

View File

@ -43,3 +43,6 @@ class ChainMigrationElection(Election):
] ]
output += f'\nvalidators={json.dumps(validators, indent=4)}' output += f'\nvalidators={json.dumps(validators, indent=4)}'
return output return output
def on_rollback(self, bigchain, new_height):
bigchain.delete_abci_chain(new_height)

View File

@ -61,3 +61,7 @@ class ValidatorElection(Election):
# TODO change to `new_height + 2` when upgrading to Tendermint 0.24.0. # TODO change to `new_height + 2` when upgrading to Tendermint 0.24.0.
bigchain.store_validator_set(new_height + 1, updated_validator_set) bigchain.store_validator_set(new_height + 1, updated_validator_set)
return encode_validator(self.asset['data']) return encode_validator(self.asset['data'])
def on_rollback(self, bigchaindb, new_height):
# TODO change to `new_height + 2` when upgrading to Tendermint 0.24.0.
bigchaindb.delete_validator_set(new_height + 1)

View File

@ -395,13 +395,10 @@ def test_get_unspent_outputs(db_context, utxoset):
def test_store_pre_commit_state(db_context): def test_store_pre_commit_state(db_context):
from bigchaindb.backend import query from bigchaindb.backend import query
from bigchaindb.lib import PreCommitState
state = PreCommitState(commit_id='test', state = dict(height=3, transactions=[])
height=3,
transactions=[])
query.store_pre_commit_state(db_context.conn, state._asdict()) query.store_pre_commit_state(db_context.conn, state)
cursor = db_context.conn.db.pre_commit.find({'commit_id': 'test'}, cursor = db_context.conn.db.pre_commit.find({'commit_id': 'test'},
projection={'_id': False}) projection={'_id': False})
assert cursor.collection.count_documents({}) == 1 assert cursor.collection.count_documents({}) == 1
@ -409,15 +406,11 @@ def test_store_pre_commit_state(db_context):
def test_get_pre_commit_state(db_context): def test_get_pre_commit_state(db_context):
from bigchaindb.backend import query from bigchaindb.backend import query
from bigchaindb.lib import PreCommitState
state = PreCommitState(commit_id='test2', state = dict(height=3, transactions=[])
height=3, db_context.conn.db.pre_commit.insert_one(state)
transactions=[]) resp = query.get_pre_commit_state(db_context.conn)
assert resp == state
db_context.conn.db.pre_commit.insert_one(state._asdict())
resp = query.get_pre_commit_state(db_context.conn, 'test2')
assert resp == state._asdict()
def test_validator_update(): def test_validator_update():

View File

@ -61,8 +61,8 @@ def test_create_tables():
assert indexes['election_id_height']['unique'] assert indexes['election_id_height']['unique']
indexes = conn.conn[dbname]['pre_commit'].index_information() indexes = conn.conn[dbname]['pre_commit'].index_information()
assert set(indexes.keys()) == {'_id_', 'pre_commit_id'} assert set(indexes.keys()) == {'_id_', 'height'}
assert indexes['pre_commit_id']['unique'] assert indexes['height']['unique']
def test_drop(dummy_db): def test_drop(dummy_db):

View File

@ -259,8 +259,7 @@ def test_recover_db_on_start(mock_run_recover,
def test_run_recover(b, alice, bob): def test_run_recover(b, alice, bob):
from bigchaindb.commands.bigchaindb import run_recover from bigchaindb.commands.bigchaindb import run_recover
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
from bigchaindb.lib import Block, PreCommitState from bigchaindb.lib import Block
from bigchaindb.backend.query import PRE_COMMIT_ID
from bigchaindb.backend import query from bigchaindb.backend import query
tx1 = Transaction.create([alice.public_key], tx1 = Transaction.create([alice.public_key],
@ -288,8 +287,7 @@ def test_run_recover(b, alice, bob):
b.store_block(block9) b.store_block(block9)
# create a pre_commit state which is ahead of the commit state # create a pre_commit state which is ahead of the commit state
pre_commit_state = PreCommitState(commit_id=PRE_COMMIT_ID, height=10, pre_commit_state = dict(height=10, transactions=[tx2.id])
transactions=[tx2.id])._asdict()
b.store_pre_commit_state(pre_commit_state) b.store_pre_commit_state(pre_commit_state)
run_recover(b) run_recover(b)
@ -522,11 +520,13 @@ def test_chain_migration_election_show_shows_inconclusive(b):
public_key = validators[0]['public_key'] public_key = validators[0]['public_key']
private_key = validators[0]['private_key'] private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
election, votes = generate_election(b, election, votes = generate_election(b,
ChainMigrationElection, ChainMigrationElection,
public_key, private_key, public_key, private_key,
{}) {},
voter_keys)
assert not run_election_show(Namespace(election_id=election.id), b) assert not run_election_show(Namespace(election_id=election.id), b)
@ -556,11 +556,13 @@ def test_chain_migration_election_show_shows_concluded(b):
public_key = validators[0]['public_key'] public_key = validators[0]['public_key']
private_key = validators[0]['private_key'] private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
election, votes = generate_election(b, election, votes = generate_election(b,
ChainMigrationElection, ChainMigrationElection,
public_key, private_key, public_key, private_key,
{}) {},
voter_keys)
assert not run_election_show(Namespace(election_id=election.id), b) assert not run_election_show(Namespace(election_id=election.id), b)

View File

@ -17,11 +17,13 @@ def test_process_block_concludes_all_elections(b):
public_key = validators[0]['public_key'] public_key = validators[0]['public_key']
private_key = validators[0]['private_key'] private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
election, votes = generate_election(b, election, votes = generate_election(b,
ChainMigrationElection, ChainMigrationElection,
public_key, private_key, public_key, private_key,
{}) {},
voter_keys)
txs = [election] txs = [election]
total_votes = votes total_votes = votes
@ -29,7 +31,8 @@ def test_process_block_concludes_all_elections(b):
election, votes = generate_election(b, election, votes = generate_election(b,
ValidatorElection, ValidatorElection,
public_key, private_key, public_key, private_key,
new_validator['election']) new_validator['election'],
voter_keys)
txs += [election] txs += [election]
total_votes += votes total_votes += votes
@ -67,10 +70,13 @@ def test_process_block_approves_only_one_validator_update(b):
public_key = validators[0]['public_key'] public_key = validators[0]['public_key']
private_key = validators[0]['private_key'] private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
election, votes = generate_election(b, election, votes = generate_election(b,
ValidatorElection, ValidatorElection,
public_key, private_key, public_key, private_key,
new_validator['election']) new_validator['election'],
voter_keys)
txs = [election] txs = [election]
total_votes = votes total_votes = votes
@ -79,7 +85,8 @@ def test_process_block_approves_only_one_validator_update(b):
election, votes = generate_election(b, election, votes = generate_election(b,
ValidatorElection, ValidatorElection,
public_key, private_key, public_key, private_key,
another_validator['election']) another_validator['election'],
voter_keys)
txs += [election] txs += [election]
total_votes += votes total_votes += votes
@ -109,10 +116,13 @@ def test_process_block_approves_after_pending_validator_update(b):
public_key = validators[0]['public_key'] public_key = validators[0]['public_key']
private_key = validators[0]['private_key'] private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
election, votes = generate_election(b, election, votes = generate_election(b,
ValidatorElection, ValidatorElection,
public_key, private_key, public_key, private_key,
new_validator['election']) new_validator['election'],
voter_keys)
txs = [election] txs = [election]
total_votes = votes total_votes = votes
@ -121,14 +131,16 @@ def test_process_block_approves_after_pending_validator_update(b):
election, votes = generate_election(b, election, votes = generate_election(b,
ValidatorElection, ValidatorElection,
public_key, private_key, public_key, private_key,
another_validator['election']) another_validator['election'],
voter_keys)
txs += [election] txs += [election]
total_votes += votes total_votes += votes
election, votes = generate_election(b, election, votes = generate_election(b,
ChainMigrationElection, ChainMigrationElection,
public_key, private_key, public_key, private_key,
{}) {},
voter_keys)
txs += [election] txs += [election]
total_votes += votes total_votes += votes
@ -165,10 +177,13 @@ def test_process_block_does_not_approve_after_validator_update(b):
public_key = validators[0]['public_key'] public_key = validators[0]['public_key']
private_key = validators[0]['private_key'] private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
election, votes = generate_election(b, election, votes = generate_election(b,
ValidatorElection, ValidatorElection,
public_key, private_key, public_key, private_key,
new_validator['election']) new_validator['election'],
voter_keys)
txs = [election] txs = [election]
total_votes = votes total_votes = votes
@ -181,7 +196,8 @@ def test_process_block_does_not_approve_after_validator_update(b):
second_election, second_votes = generate_election(b, second_election, second_votes = generate_election(b,
ChainMigrationElection, ChainMigrationElection,
public_key, private_key, public_key, private_key,
{}) {},
voter_keys)
Election.process_block(b, 2, total_votes + [second_election]) Election.process_block(b, 2, total_votes + [second_election])
@ -205,17 +221,21 @@ def test_process_block_applies_only_one_migration(b):
public_key = validators[0]['public_key'] public_key = validators[0]['public_key']
private_key = validators[0]['private_key'] private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
election, votes = generate_election(b, election, votes = generate_election(b,
ChainMigrationElection, ChainMigrationElection,
public_key, private_key, public_key, private_key,
{}) {},
voter_keys)
txs = [election] txs = [election]
total_votes = votes total_votes = votes
election, votes = generate_election(b, election, votes = generate_election(b,
ChainMigrationElection, ChainMigrationElection,
public_key, private_key, public_key, private_key,
{}) {},
voter_keys)
txs += [election] txs += [election]
total_votes += votes total_votes += votes

View File

@ -22,12 +22,18 @@ from bigchaindb.backend.localmongodb import query
from bigchaindb.common.crypto import generate_key_pair from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.core import (CodeTypeOk, from bigchaindb.core import (CodeTypeOk,
CodeTypeError, CodeTypeError,
rollback
) )
from bigchaindb.elections.election import Election
from bigchaindb.lib import Block from bigchaindb.lib import Block
from bigchaindb.migrations.chain_migration_election import ChainMigrationElection
from bigchaindb.upsert_validator.validator_election import ValidatorElection
from bigchaindb.upsert_validator.validator_utils import new_validator_set from bigchaindb.upsert_validator.validator_utils import new_validator_set
from bigchaindb.tendermint_utils import public_key_to_base64 from bigchaindb.tendermint_utils import public_key_to_base64
from bigchaindb.version import __tm_supported_versions__ from bigchaindb.version import __tm_supported_versions__
from tests.utils import generate_election, generate_validators
pytestmark = pytest.mark.bdb pytestmark = pytest.mark.bdb
@ -347,39 +353,45 @@ def test_deliver_transfer_tx__double_spend_fails(b, init_chain_request):
assert result.code == CodeTypeError assert result.code == CodeTypeError
# The test below has to re-written one election conclusion logic has been implemented
@pytest.mark.skip
def test_end_block_return_validator_updates(b, init_chain_request): def test_end_block_return_validator_updates(b, init_chain_request):
from bigchaindb import App
from bigchaindb.backend import query
from bigchaindb.core import encode_validator
from bigchaindb.backend.query import VALIDATOR_UPDATE_ID
app = App(b) app = App(b)
app.init_chain(init_chain_request) app.init_chain(init_chain_request)
begin_block = RequestBeginBlock() begin_block = RequestBeginBlock()
app.begin_block(begin_block) app.begin_block(begin_block)
validator = {'pub_key': {'type': 'ed25519', # generate a block containing a concluded validator election
'data': 'B0E42D2589A455EAD339A035D6CE1C8C3E25863F268120AA0162AD7D003A4014'}, validators = generate_validators([1] * 4)
'power': 10} b.store_validator_set(1, [v['storage'] for v in validators])
validator_update = {'validator': validator,
'update_id': VALIDATOR_UPDATE_ID}
query.store_validator_update(b.connection, validator_update)
resp = app.end_block(RequestEndBlock(height=99)) new_validator = generate_validators([1])[0]
assert resp.validator_updates[0] == encode_validator(validator)
updates = b.approved_update() public_key = validators[0]['public_key']
assert not updates private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
election, votes = generate_election(b,
ValidatorElection,
public_key, private_key,
new_validator['election'],
voter_keys)
b.store_block(Block(height=1, transactions=[election.id],
app_hash='')._asdict())
b.store_bulk_transactions([election])
Election.process_block(b, 1, [election])
app.block_transactions = votes
resp = app.end_block(RequestEndBlock(height=2))
assert resp.validator_updates[0].power == new_validator['election']['power']
expected = bytes.fromhex(new_validator['election']['public_key']['value'])
assert expected == resp.validator_updates[0].pub_key.data
def test_store_pre_commit_state_in_end_block(b, alice, init_chain_request): def test_store_pre_commit_state_in_end_block(b, alice, init_chain_request):
from bigchaindb import App from bigchaindb import App
from bigchaindb.backend import query from bigchaindb.backend import query
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
from bigchaindb.backend.query import PRE_COMMIT_ID
tx = Transaction.create([alice.public_key], tx = Transaction.create([alice.public_key],
[([alice.public_key], 1)], [([alice.public_key], 1)],
@ -394,16 +406,14 @@ def test_store_pre_commit_state_in_end_block(b, alice, init_chain_request):
app.deliver_tx(encode_tx_to_bytes(tx)) app.deliver_tx(encode_tx_to_bytes(tx))
app.end_block(RequestEndBlock(height=99)) app.end_block(RequestEndBlock(height=99))
resp = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID) resp = query.get_pre_commit_state(b.connection)
assert resp['commit_id'] == PRE_COMMIT_ID
assert resp['height'] == 99 assert resp['height'] == 99
assert resp['transactions'] == [tx.id] assert resp['transactions'] == [tx.id]
app.begin_block(begin_block) app.begin_block(begin_block)
app.deliver_tx(encode_tx_to_bytes(tx)) app.deliver_tx(encode_tx_to_bytes(tx))
app.end_block(RequestEndBlock(height=100)) app.end_block(RequestEndBlock(height=100))
resp = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID) resp = query.get_pre_commit_state(b.connection)
assert resp['commit_id'] == PRE_COMMIT_ID
assert resp['height'] == 100 assert resp['height'] == 100
assert resp['transactions'] == [tx.id] assert resp['transactions'] == [tx.id]
@ -413,12 +423,70 @@ def test_store_pre_commit_state_in_end_block(b, alice, init_chain_request):
app.begin_block(begin_block) app.begin_block(begin_block)
app.deliver_tx(encode_tx_to_bytes(tx)) app.deliver_tx(encode_tx_to_bytes(tx))
app.end_block(RequestEndBlock(height=1)) app.end_block(RequestEndBlock(height=1))
resp = query.get_pre_commit_state(b.connection, PRE_COMMIT_ID) resp = query.get_pre_commit_state(b.connection)
assert resp['commit_id'] == PRE_COMMIT_ID
assert resp['height'] == 101 assert resp['height'] == 101
assert resp['transactions'] == [tx.id] assert resp['transactions'] == [tx.id]
def test_rollback_pre_commit_state_after_crash(b):
validators = generate_validators([1] * 4)
b.store_validator_set(1, [v['storage'] for v in validators])
b.store_block(Block(height=1, transactions=[], app_hash='')._asdict())
public_key = validators[0]['public_key']
private_key = validators[0]['private_key']
voter_keys = [v['private_key'] for v in validators]
migration_election, votes = generate_election(b,
ChainMigrationElection,
public_key, private_key,
{},
voter_keys)
total_votes = votes
txs = [migration_election, *votes]
new_validator = generate_validators([1])[0]
validator_election, votes = generate_election(b,
ValidatorElection,
public_key, private_key,
new_validator['election'],
voter_keys)
total_votes += votes
txs += [validator_election, *votes]
b.store_bulk_transactions(txs)
b.store_abci_chain(2, 'new_chain')
b.store_validator_set(2, [v['storage'] for v in validators])
# TODO change to `4` when upgrading to Tendermint 0.22.4.
b.store_validator_set(3, [new_validator['storage']])
b.store_election(migration_election.id, 2, is_concluded=False)
b.store_election(validator_election.id, 2, is_concluded=True)
# no pre-commit state
rollback(b)
for tx in txs:
assert b.get_transaction(tx.id)
assert b.get_latest_abci_chain()
assert len(b.get_validator_change()['validators']) == 1
assert b.get_election(migration_election.id)
assert b.get_election(validator_election.id)
b.store_pre_commit_state({'height': 2, 'transactions': [tx.id for tx in txs]})
rollback(b)
for tx in txs:
assert not b.get_transaction(tx.id)
assert not b.get_latest_abci_chain()
assert len(b.get_validator_change()['validators']) == 4
assert len(b.get_validator_change(2)['validators']) == 4
assert not b.get_election(migration_election.id)
assert not b.get_election(validator_election.id)
def test_new_validator_set(b): def test_new_validator_set(b):
node1 = {'public_key': {'type': 'ed25519-base64', node1 = {'public_key': {'type': 'ed25519-base64',
'value': 'FxjS2/8AFYoIUqF6AcePTc87qOT7e4WGgH+sGCpTUDQ='}, 'value': 'FxjS2/8AFYoIUqF6AcePTc87qOT7e4WGgH+sGCpTUDQ='},

View File

@ -147,27 +147,6 @@ def test_post_transaction_invalid_mode(b):
b.write_transaction(tx, 'nope') b.write_transaction(tx, 'nope')
@pytest.mark.skip
@pytest.mark.bdb
def test_validator_updates(b, validator_pub_key):
from bigchaindb.backend import query
from bigchaindb.backend.query import VALIDATOR_UPDATE_ID
# create a validator update object
validator = {'pub_key': {'type': 'ed25519',
'data': validator_pub_key},
'power': 10}
validator_update = {'validator': validator,
'update_id': VALIDATOR_UPDATE_ID}
query.store_validator_update(b.connection, validator_update)
updates = b.approved_updates()
assert updates == validator_update['validator']
b.delete_validator_update()
assert not b.approved_updates()
@pytest.mark.bdb @pytest.mark.bdb
def test_update_utxoset(b, signed_create_tx, signed_transfer_tx, db_context): def test_update_utxoset(b, signed_create_tx, signed_transfer_tx, db_context):
mongo_client = MongoClient(host=db_context.host, port=db_context.port) mongo_client = MongoClient(host=db_context.host, port=db_context.port)

View File

@ -96,7 +96,7 @@ def generate_validators(powers):
return validators return validators
def generate_election(b, cls, public_key, private_key, asset_data): def generate_election(b, cls, public_key, private_key, asset_data, voter_keys):
voters = cls.recipients(b) voters = cls.recipients(b)
election = cls.generate([public_key], election = cls.generate([public_key],
voters, voters,
@ -106,5 +106,7 @@ def generate_election(b, cls, public_key, private_key, asset_data):
votes = [Vote.generate([election.to_inputs()[i]], votes = [Vote.generate([election.to_inputs()[i]],
[([Election.to_public_key(election.id)], power)], [([Election.to_public_key(election.id)], power)],
election.id) for i, (_, power) in enumerate(voters)] election.id) for i, (_, power) in enumerate(voters)]
for key, v in zip(voter_keys, votes):
v.sign([key])
return election, votes return election, votes