Pipeline fixes: don't validate tx schema during block creation; parallelise tx schema validation in the vote pipeline

This commit is contained in:
Scott Sadler 2017-05-22 12:41:15 +02:00
parent b6ec3e5f5c
commit 2589e16fd6
5 changed files with 59 additions and 47 deletions

View File

@ -240,11 +240,12 @@ class Block(object):
return False return False
@classmethod @classmethod
def from_dict(cls, block_body): def from_dict(cls, block_body, tx_construct=Transaction.from_dict):
"""Transform a Python dictionary to a Block object. """Transform a Python dictionary to a Block object.
Args: Args:
block_body (dict): A block dictionary to be transformed. block_body (dict): A block dictionary to be transformed.
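tx_construct (callable): Callable applied to each transaction dictionary to build the block's transactions (defaults to ``Transaction.from_dict``).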
Returns: Returns:
:class:`~Block` :class:`~Block`
@ -261,8 +262,7 @@ class Block(object):
if block_id != block_body['id']: if block_id != block_body['id']:
raise InvalidHash() raise InvalidHash()
transactions = [Transaction.from_dict(tx) for tx transactions = [tx_construct(tx) for tx in block['transactions']]
in block['transactions']]
signature = block_body.get('signature') signature = block_body.get('signature')
@ -302,3 +302,22 @@ class Block(object):
def to_str(self): def to_str(self):
return serialize(self.to_dict()) return serialize(self.to_dict())
class FastTransaction:
    """A minimal, non-validating wrapper around a transaction dictionary.

    This is useful when validation is not required but a routine expects
    something that looks like a transaction (an ``id`` attribute and a
    ``to_dict()`` method), for example during block creation.

    The wrapped dictionary is stored by reference, not copied: changes made
    to it elsewhere are visible through this object and vice versa.

    Note: immutability could also be provided.

    Args:
        tx_dict (dict): The raw transaction dictionary to wrap. It is not
            inspected or validated on construction.
    """

    def __init__(self, tx_dict):
        # Keep the caller's dict as-is; no schema or signature checks here.
        self.data = tx_dict

    def __repr__(self):
        # Show only the id so log lines stay short; never raise from repr,
        # even if the wrapped object is not a dict or has no 'id' key.
        tx_id = self.data.get('id') if isinstance(self.data, dict) else None
        return '{}(id={!r})'.format(type(self).__name__, tx_id)

    @property
    def id(self):
        """The transaction id, read from the ``'id'`` key of the wrapped dict.

        Raises:
            KeyError: If the wrapped dictionary has no ``'id'`` key.
        """
        return self.data['id']

    def to_dict(self):
        """Return the wrapped transaction dictionary (same object, no copy)."""
        return self.data

View File

@ -12,7 +12,7 @@ from multipipes import Pipeline, Node, Pipe
import bigchaindb import bigchaindb
from bigchaindb import backend from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Transaction from bigchaindb.models import FastTransaction
from bigchaindb.common.exceptions import ValidationError from bigchaindb.common.exceptions import ValidationError
from bigchaindb import Bigchain from bigchaindb import Bigchain
@ -57,11 +57,11 @@ class BlockPipeline:
tx (dict): the transaction to validate. tx (dict): the transaction to validate.
Returns: Returns:
:class:`~bigchaindb.models.Transaction`: The transaction if valid, :class:`~bigchaindb.models.FastTransaction`: The transaction if valid,
``None`` otherwise. ``None`` otherwise.
""" """
try: try:
tx = Transaction.from_dict(tx) tx = FastTransaction(tx)
except ValidationError: except ValidationError:
return None return None
@ -71,14 +71,7 @@ class BlockPipeline:
self.bigchain.delete_transaction(tx.id) self.bigchain.delete_transaction(tx.id)
return None return None
# If transaction is not valid it should not be included
try:
tx.validate(self.bigchain)
return tx return tx
except ValidationError as e:
logger.warning('Invalid tx: %s', e)
self.bigchain.delete_transaction(tx.id)
return None
def create(self, tx, timeout=False): def create(self, tx, timeout=False):
"""Create a block. """Create a block.

View File

@ -14,7 +14,7 @@ import bigchaindb
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb import backend from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Transaction, Block from bigchaindb.models import Transaction, Block, FastTransaction
from bigchaindb.common import exceptions from bigchaindb.common import exceptions
@ -44,20 +44,21 @@ class Vote:
self.counters = Counter() self.counters = Counter()
self.validity = {} self.validity = {}
self.invalid_dummy_tx = Transaction.create([self.bigchain.me], dummy_tx = Transaction.create([self.bigchain.me],
[([self.bigchain.me], 1)]) [([self.bigchain.me], 1)]).to_dict()
self.invalid_dummy_tx = dummy_tx
def validate_block(self, block): def validate_block(self, block_dict):
if not self.bigchain.has_previous_vote(block['id']): if not self.bigchain.has_previous_vote(block_dict['id']):
try: try:
block = Block.from_dict(block) block = Block.from_dict(block_dict, tx_construct=FastTransaction)
except (exceptions.InvalidHash): except (exceptions.InvalidHash):
# XXX: if a block is invalid we should skip the `validate_tx` # XXX: if a block is invalid we should skip the `validate_tx`
# step, but since we are in a pipeline we cannot just jump to # step, but since we are in a pipeline we cannot just jump to
# another function. Hackish solution: generate an invalid # another function. Hackish solution: generate an invalid
# transaction and propagate it to the next steps of the # transaction and propagate it to the next steps of the
# pipeline. # pipeline.
return block['id'], [self.invalid_dummy_tx] return block_dict['id'], [self.invalid_dummy_tx]
try: try:
block._validate_block(self.bigchain) block._validate_block(self.bigchain)
except exceptions.ValidationError: except exceptions.ValidationError:
@ -67,14 +68,14 @@ class Vote:
# transaction and propagate it to the next steps of the # transaction and propagate it to the next steps of the
# pipeline. # pipeline.
return block.id, [self.invalid_dummy_tx] return block.id, [self.invalid_dummy_tx]
return block.id, block.transactions return block.id, block_dict['block']['transactions']
def ungroup(self, block_id, transactions): def ungroup(self, block_id, transactions):
"""Given a block, ungroup the transactions in it. """Given a block, ungroup the transactions in it.
Args: Args:
block_id (str): the id of the block in progress. block_id (str): the id of the block in progress.
transactions (list(Transaction)): transactions of the block in transactions (list(dict)): transactions of the block in
progress. progress.
Returns: Returns:
@ -87,12 +88,12 @@ class Vote:
for tx in transactions: for tx in transactions:
yield tx, block_id, num_tx yield tx, block_id, num_tx
def validate_tx(self, tx, block_id, num_tx): def validate_tx(self, tx_dict, block_id, num_tx):
"""Validate a transaction. Transaction must also not be in any VALID """Validate a transaction. Transaction must also not be in any VALID
block. block.
Args: Args:
tx (dict): the transaction to validate tx_dict (dict): the transaction to validate
block_id (str): the id of block containing the transaction block_id (str): the id of block containing the transaction
num_tx (int): the total number of transactions to process num_tx (int): the total number of transactions to process
@ -100,16 +101,17 @@ class Vote:
Three values are returned, the validity of the transaction, Three values are returned, the validity of the transaction,
``block_id``, ``num_tx``. ``block_id``, ``num_tx``.
""" """
new = self.bigchain.is_new_transaction(tx.id, exclude_block_id=block_id)
if not new:
return False, block_id, num_tx
try: try:
tx = Transaction.from_dict(tx_dict)
new = self.bigchain.is_new_transaction(tx.id, exclude_block_id=block_id)
if not new:
raise exceptions.ValidationError('Tx already exists, %s', tx.id)
tx.validate(self.bigchain) tx.validate(self.bigchain)
valid = True valid = True
except exceptions.ValidationError as e: except exceptions.ValidationError as e:
logger.warning('Invalid tx: %s', e)
valid = False valid = False
logger.warning('Invalid tx: %s', e)
return valid, block_id, num_tx return valid, block_id, num_tx

View File

@ -28,15 +28,14 @@ def test_filter_by_assignee(b, signed_create_tx):
@pytest.mark.bdb @pytest.mark.bdb
def test_validate_transaction(b, create_tx): def test_validate_transaction(b):
from bigchaindb.pipelines.block import BlockPipeline from bigchaindb.pipelines.block import BlockPipeline
# validate_tx doesn't actually validate schema anymore.
tx = {'id': 'a'}
block_maker = BlockPipeline() block_maker = BlockPipeline()
assert block_maker.validate_tx(tx).data == tx
assert block_maker.validate_tx(create_tx.to_dict()) is None
valid_tx = create_tx.sign([b.me_private])
assert block_maker.validate_tx(valid_tx.to_dict()) == valid_tx
def test_validate_transaction_handles_exceptions(b, signed_create_tx): def test_validate_transaction_handles_exceptions(b, signed_create_tx):
@ -50,7 +49,7 @@ def test_validate_transaction_handles_exceptions(b, signed_create_tx):
tx_dict = signed_create_tx.to_dict() tx_dict = signed_create_tx.to_dict()
with patch('bigchaindb.models.Transaction.validate') as validate: with patch('bigchaindb.models.FastTransaction.__init__') as validate:
# Assert that validationerror gets caught # Assert that validationerror gets caught
validate.side_effect = ValidationError() validate.side_effect = ValidationError()
assert block_maker.validate_tx(tx_dict) is None assert block_maker.validate_tx(tx_dict) is None

View File

@ -84,7 +84,7 @@ def test_vote_validate_block(b):
validation = vote_obj.validate_block(block.to_dict()) validation = vote_obj.validate_block(block.to_dict())
assert validation[0] == block.id assert validation[0] == block.id
for tx1, tx2 in zip(validation[1], block.transactions): for tx1, tx2 in zip(validation[1], block.transactions):
assert tx1 == tx2 assert tx1 == tx2.to_dict()
block = b.create_block([tx]) block = b.create_block([tx])
# NOTE: Setting a blocks signature to `None` invalidates it. # NOTE: Setting a blocks signature to `None` invalidates it.
@ -142,7 +142,7 @@ def test_vote_validate_transaction(b):
from bigchaindb.pipelines import vote from bigchaindb.pipelines import vote
from bigchaindb.common.exceptions import ValidationError from bigchaindb.common.exceptions import ValidationError
tx = dummy_tx(b) tx = dummy_tx(b).to_dict()
vote_obj = vote.Vote() vote_obj = vote.Vote()
validation = vote_obj.validate_tx(tx, 123, 1) validation = vote_obj.validate_tx(tx, 123, 1)
assert validation == (True, 123, 1) assert validation == (True, 123, 1)
@ -165,15 +165,13 @@ def test_vote_accumulates_transactions(b):
vote_obj = vote.Vote() vote_obj = vote.Vote()
for _ in range(10):
tx = dummy_tx(b) tx = dummy_tx(b)
tx = tx validation = vote_obj.validate_tx(tx.to_dict(), 123, 1)
validation = vote_obj.validate_tx(tx, 123, 1)
assert validation == (True, 123, 1) assert validation == (True, 123, 1)
tx.inputs[0].fulfillment.signature = None tx.inputs[0].fulfillment.signature = None
validation = vote_obj.validate_tx(tx, 456, 10) validation = vote_obj.validate_tx(tx.to_dict(), 456, 10)
assert validation == (False, 456, 10) assert validation == (False, 456, 10)
@ -185,16 +183,17 @@ def test_valid_block_voting_sequential(b, genesis_block, monkeypatch):
monkeypatch.setattr('time.time', lambda: 1111111111) monkeypatch.setattr('time.time', lambda: 1111111111)
vote_obj = vote.Vote() vote_obj = vote.Vote()
block = dummy_block(b) block = dummy_block(b).to_dict()
txs = block['block']['transactions']
for tx, block_id, num_tx in vote_obj.ungroup(block.id, block.transactions): for tx, block_id, num_tx in vote_obj.ungroup(block['id'], txs):
last_vote = vote_obj.vote(*vote_obj.validate_tx(tx, block_id, num_tx)) last_vote = vote_obj.vote(*vote_obj.validate_tx(tx, block_id, num_tx))
vote_obj.write_vote(last_vote) vote_obj.write_vote(last_vote)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block_id, b.me) vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block_id, b.me)
vote_doc = vote_rs.next() vote_doc = vote_rs.next()
assert vote_doc['vote'] == {'voting_for_block': block.id, assert vote_doc['vote'] == {'voting_for_block': block['id'],
'previous_block': genesis_block.id, 'previous_block': genesis_block.id,
'is_block_valid': True, 'is_block_valid': True,
'invalid_reason': None, 'invalid_reason': None,
@ -655,7 +654,7 @@ def test_vote_no_double_inclusion(b):
tx = dummy_tx(b) tx = dummy_tx(b)
block = b.create_block([tx]) block = b.create_block([tx])
r = vote.Vote().validate_tx(tx, block.id, 1) r = vote.Vote().validate_tx(tx.to_dict(), block.id, 1)
assert r == (True, block.id, 1) assert r == (True, block.id, 1)
b.write_block(block) b.write_block(block)