mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
dedupe tx in block, reject duplicate tx in block
This commit is contained in:
parent
4a5a5566e7
commit
1d935b9ae6
@ -41,6 +41,10 @@ class InvalidSignature(BigchainDBError):
|
||||
operation"""
|
||||
|
||||
|
||||
class DuplicateTransaction(ValidationError):
|
||||
"""Raised if a duplicated transaction is found"""
|
||||
|
||||
|
||||
class DatabaseAlreadyExists(BigchainDBError):
|
||||
"""Raised when trying to create the database but the db is already there"""
|
||||
|
||||
|
@ -3,7 +3,8 @@ from bigchaindb.common.exceptions import (InvalidHash, InvalidSignature,
|
||||
OperationError, DoubleSpend,
|
||||
TransactionDoesNotExist,
|
||||
TransactionNotInValidBlock,
|
||||
AssetIdMismatch, AmountError)
|
||||
AssetIdMismatch, AmountError,
|
||||
DuplicateTransaction)
|
||||
from bigchaindb.common.transaction import Transaction
|
||||
from bigchaindb.common.utils import gen_timestamp, serialize
|
||||
from bigchaindb.common.schema import validate_transaction_schema
|
||||
@ -261,7 +262,12 @@ class Block(object):
|
||||
DoubleSpend: if the transaction is a double spend
|
||||
InvalidHash: if the hash of the transaction is wrong
|
||||
InvalidSignature: if the signature of the transaction is wrong
|
||||
ValidationError: If the block contains a duplicated TX
|
||||
"""
|
||||
txids = [tx.id for tx in self.transactions]
|
||||
if len(txids) != len(set(txids)):
|
||||
raise DuplicateTransaction('Block has duplicate transaction')
|
||||
|
||||
for tx in self.transactions:
|
||||
# If a transaction is not valid, `validate_transactions` will
|
||||
# throw an an exception and block validation will be canceled.
|
||||
|
@ -31,7 +31,7 @@ class BlockPipeline:
|
||||
def __init__(self):
|
||||
"""Initialize the BlockPipeline creator"""
|
||||
self.bigchain = Bigchain()
|
||||
self.txs = []
|
||||
self.txs = tx_collector()
|
||||
|
||||
def filter_tx(self, tx):
|
||||
"""Filter a transaction.
|
||||
@ -98,11 +98,10 @@ class BlockPipeline:
|
||||
:class:`~bigchaindb.models.Block`: The block,
|
||||
if a block is ready, or ``None``.
|
||||
"""
|
||||
if tx:
|
||||
self.txs.append(tx)
|
||||
if len(self.txs) == 1000 or (timeout and self.txs):
|
||||
block = self.bigchain.create_block(self.txs)
|
||||
self.txs = []
|
||||
txs = self.txs.send(tx)
|
||||
if len(txs) == 1000 or (timeout and txs):
|
||||
block = self.bigchain.create_block(txs)
|
||||
self.txs = tx_collector()
|
||||
return block
|
||||
|
||||
def write(self, block):
|
||||
@ -134,6 +133,27 @@ class BlockPipeline:
|
||||
return block
|
||||
|
||||
|
||||
def tx_collector():
|
||||
""" A helper to deduplicate transactions """
|
||||
|
||||
def snowflake():
|
||||
txids = set()
|
||||
txs = []
|
||||
while True:
|
||||
tx = yield txs
|
||||
if tx:
|
||||
if tx.id not in txids:
|
||||
txids.add(tx.id)
|
||||
txs.append(tx)
|
||||
else:
|
||||
logger.info('Refusing to add tx to block twice: ' +
|
||||
tx.id)
|
||||
|
||||
s = snowflake()
|
||||
s.send(None)
|
||||
return s
|
||||
|
||||
|
||||
def create_pipeline():
|
||||
"""Create and return the pipeline of operations to be distributed
|
||||
on different processes."""
|
||||
|
@ -5,27 +5,6 @@ import pytest
|
||||
pytestmark = [pytest.mark.bdb, pytest.mark.usefixtures('processes')]
|
||||
|
||||
|
||||
def test_fast_double_create(b, user_pk):
|
||||
from bigchaindb.models import Transaction
|
||||
from bigchaindb.backend.query import count_blocks
|
||||
tx = Transaction.create([b.me], [([user_pk], 1)],
|
||||
metadata={'test': 'test'}).sign([b.me_private])
|
||||
|
||||
# write everything fast
|
||||
b.write_transaction(tx)
|
||||
b.write_transaction(tx)
|
||||
|
||||
time.sleep(2)
|
||||
tx_returned = b.get_transaction(tx.id)
|
||||
|
||||
# test that the tx can be queried
|
||||
assert tx_returned == tx
|
||||
# test the transaction appears only once
|
||||
last_voted_block = b.get_last_voted_block()
|
||||
assert len(last_voted_block.transactions) == 1
|
||||
assert count_blocks(b.connection) == 2
|
||||
|
||||
|
||||
def test_double_create(b, user_pk):
|
||||
from bigchaindb.models import Transaction
|
||||
from bigchaindb.backend.query import count_blocks
|
||||
|
@ -72,6 +72,7 @@ class MultipipesStepper:
|
||||
r = f(**kwargs)
|
||||
if r is not None:
|
||||
self._enqueue(next_name, r)
|
||||
return r
|
||||
|
||||
self.tasks[name] = functools.wraps(f)(inner)
|
||||
self.input_tasks.add(name)
|
||||
@ -90,6 +91,7 @@ class MultipipesStepper:
|
||||
out = f(*args, **kwargs)
|
||||
if out is not None and next:
|
||||
self._enqueue(next_name, out)
|
||||
return out
|
||||
|
||||
task = functools.wraps(f)(inner)
|
||||
self.tasks[name] = task
|
||||
@ -111,12 +113,12 @@ class MultipipesStepper:
|
||||
logging.debug('Stepping %s', name)
|
||||
task = self.tasks[name]
|
||||
if name in self.input_tasks:
|
||||
task(**kwargs)
|
||||
return task(**kwargs)
|
||||
else:
|
||||
queue = self.queues.get(name, [])
|
||||
if not queue:
|
||||
raise Empty(name)
|
||||
task(*queue.pop(0), **kwargs)
|
||||
return task(*queue.pop(0), **kwargs)
|
||||
logging.debug('Stepped %s', name)
|
||||
|
||||
@property
|
||||
|
@ -226,3 +226,12 @@ def test_full_pipeline(b, user_pk):
|
||||
block_len = len(block_doc.transactions)
|
||||
assert chained_block == block_doc
|
||||
assert number_assigned_to_others == 100 - block_len
|
||||
|
||||
|
||||
def test_block_snowflake(create_tx, signed_transfer_tx):
|
||||
from bigchaindb.pipelines.block import tx_collector
|
||||
snowflake = tx_collector()
|
||||
snowflake.send(create_tx)
|
||||
snowflake.send(signed_transfer_tx)
|
||||
snowflake.send(create_tx)
|
||||
assert snowflake.send(None) == [create_tx, signed_transfer_tx]
|
||||
|
@ -20,9 +20,26 @@ def test_stepping_changefeed_produces_update(b, steps):
|
||||
[tx.id, tx.id])
|
||||
|
||||
|
||||
@pytest.mark.bdb
|
||||
@pytest.mark.genesis
|
||||
def test_dupe_tx_in_block(b, steps):
|
||||
tx = input_single_create(b)
|
||||
for i in range(2):
|
||||
steps.stale_check_transactions()
|
||||
steps.stale_reassign_transactions()
|
||||
steps.block_changefeed()
|
||||
steps.block_filter_tx()
|
||||
steps.block_validate_tx()
|
||||
steps.block_validate_tx()
|
||||
assert steps.counts == {'block_create': 2}
|
||||
steps.block_create(timeout=False)
|
||||
block = steps.block_create(timeout=True)
|
||||
assert block.transactions == [tx]
|
||||
|
||||
|
||||
def input_single_create(b):
|
||||
from bigchaindb.common.transaction import Transaction
|
||||
metadata = {'r': random.random()}
|
||||
tx = Transaction.create([b.me], [([b.me], 1)], metadata)
|
||||
tx = Transaction.create([b.me], [([b.me], 1)], metadata).sign([b.me_private])
|
||||
b.write_transaction(tx)
|
||||
return tx
|
||||
|
@ -163,3 +163,11 @@ class TestBlockModel(object):
|
||||
|
||||
public_key = PublicKey(b.me)
|
||||
assert public_key.verify(expected_block_serialized, block.signature)
|
||||
|
||||
def test_block_dupe_tx(self, b):
|
||||
from bigchaindb.models import Transaction
|
||||
from bigchaindb.common.exceptions import DuplicateTransaction
|
||||
tx = Transaction.create([b.me], [([b.me], 1)])
|
||||
block = b.create_block([tx, tx])
|
||||
with raises(DuplicateTransaction):
|
||||
block._validate_block_transactions(b)
|
||||
|
Loading…
x
Reference in New Issue
Block a user