diff --git a/bigchaindb/common/exceptions.py b/bigchaindb/common/exceptions.py
index 60340492..c9b741b8 100644
--- a/bigchaindb/common/exceptions.py
+++ b/bigchaindb/common/exceptions.py
@@ -41,6 +41,10 @@ class InvalidSignature(BigchainDBError):
     operation"""
 
 
+class DuplicateTransaction(ValidationError):
+    """Raised if the same transaction appears more than once in a block"""
+
+
 class DatabaseAlreadyExists(BigchainDBError):
     """Raised when trying to create the database but the db is already there"""
 
diff --git a/bigchaindb/models.py b/bigchaindb/models.py
index ee7efe8f..e6a4dc73 100644
--- a/bigchaindb/models.py
+++ b/bigchaindb/models.py
@@ -3,7 +3,8 @@ from bigchaindb.common.exceptions import (InvalidHash, InvalidSignature,
                                           OperationError, DoubleSpend,
                                           TransactionDoesNotExist,
                                           TransactionNotInValidBlock,
-                                          AssetIdMismatch, AmountError)
+                                          AssetIdMismatch, AmountError,
+                                          DuplicateTransaction)
 from bigchaindb.common.transaction import Transaction
 from bigchaindb.common.utils import gen_timestamp, serialize
 from bigchaindb.common.schema import validate_transaction_schema
@@ -261,7 +262,12 @@ class Block(object):
             DoubleSpend: if the transaction is a double spend
             InvalidHash: if the hash of the transaction is wrong
             InvalidSignature: if the signature of the transaction is wrong
+            DuplicateTransaction: if the block contains a duplicated TX
         """
+        txids = [tx.id for tx in self.transactions]
+        if len(txids) != len(set(txids)):
+            raise DuplicateTransaction('Block has duplicate transaction')
+
         for tx in self.transactions:
             # If a transaction is not valid, `validate_transactions` will
             # throw an an exception and block validation will be canceled.
diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py
index 1f2e9017..c7d7ebc1 100644
--- a/bigchaindb/pipelines/block.py
+++ b/bigchaindb/pipelines/block.py
@@ -31,7 +31,7 @@ class BlockPipeline:
     def __init__(self):
         """Initialize the BlockPipeline creator"""
         self.bigchain = Bigchain()
-        self.txs = []
+        self.txs = tx_collector()
 
     def filter_tx(self, tx):
         """Filter a transaction.
@@ -98,11 +98,10 @@ class BlockPipeline:
             :class:`~bigchaindb.models.Block`: The block,
             if a block is ready, or ``None``.
         """
-        if tx:
-            self.txs.append(tx)
-        if len(self.txs) == 1000 or (timeout and self.txs):
-            block = self.bigchain.create_block(self.txs)
-            self.txs = []
+        txs = self.txs.send(tx)
+        if len(txs) == 1000 or (timeout and txs):
+            block = self.bigchain.create_block(txs)
+            self.txs = tx_collector()
             return block
 
     def write(self, block):
@@ -134,6 +133,36 @@ class BlockPipeline:
         return block
 
 
+def tx_collector():
+    """Return a primed generator that deduplicates transactions.
+
+    Each ``send(tx)`` returns the list of unique transactions collected
+    so far, in arrival order; sending a falsy value (e.g. ``None``) adds
+    nothing. A transaction whose id was already seen is logged and
+    dropped.
+    """
+
+    def snowflake():
+        txids = set()
+        txs = []
+        while True:
+            tx = yield txs
+            if tx:
+                if tx.id not in txids:
+                    txids.add(tx.id)
+                    txs.append(tx)
+                else:
+                    # Pass the id as a logging argument so the message is
+                    # only formatted when INFO is enabled.
+                    logger.info('Refusing to add tx to block twice: %s',
+                                tx.id)
+
+    s = snowflake()
+    # Advance to the first ``yield`` so callers can ``send`` right away.
+    s.send(None)
+    return s
+
+
 def create_pipeline():
     """Create and return the pipeline of operations to be distributed
     on different processes."""
diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py
index 6597a0e7..2bf0ebcd 100644
--- a/tests/integration/test_integration.py
+++ b/tests/integration/test_integration.py
@@ -5,27 +5,6 @@ import pytest
 pytestmark = [pytest.mark.bdb, pytest.mark.usefixtures('processes')]
 
 
-def test_fast_double_create(b, user_pk):
-    from bigchaindb.models import Transaction
-    from bigchaindb.backend.query import count_blocks
-    tx = Transaction.create([b.me], [([user_pk], 1)],
-                            metadata={'test': 'test'}).sign([b.me_private])
-
-    # write everything fast
-    b.write_transaction(tx)
-    b.write_transaction(tx)
-
-    time.sleep(2)
-    tx_returned = b.get_transaction(tx.id)
-
-    # test that the tx can be queried
-    assert tx_returned == tx
-    # test the transaction appears only once
-    last_voted_block = b.get_last_voted_block()
-    assert len(last_voted_block.transactions) == 1
-    assert count_blocks(b.connection) == 2
-
-
 def test_double_create(b, user_pk):
     from bigchaindb.models import Transaction
     from bigchaindb.backend.query import count_blocks
diff --git a/tests/pipelines/stepping.py b/tests/pipelines/stepping.py
index 0e286829..030863c6 100644
--- a/tests/pipelines/stepping.py
+++ b/tests/pipelines/stepping.py
@@ -72,6 +72,7 @@ class MultipipesStepper:
             r = f(**kwargs)
             if r is not None:
                 self._enqueue(next_name, r)
+            return r
 
         self.tasks[name] = functools.wraps(f)(inner)
         self.input_tasks.add(name)
@@ -90,6 +91,7 @@ class MultipipesStepper:
             out = f(*args, **kwargs)
             if out is not None and next:
                 self._enqueue(next_name, out)
+            return out
 
         task = functools.wraps(f)(inner)
         self.tasks[name] = task
@@ -111,12 +113,13 @@ class MultipipesStepper:
         logging.debug('Stepping %s', name)
         task = self.tasks[name]
         if name in self.input_tasks:
-            task(**kwargs)
+            result = task(**kwargs)
         else:
             queue = self.queues.get(name, [])
             if not queue:
                 raise Empty(name)
-            task(*queue.pop(0), **kwargs)
+            result = task(*queue.pop(0), **kwargs)
         logging.debug('Stepped %s', name)
+        return result
 
     @property
diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py
index 2991f3cf..b7d3e03e 100644
--- a/tests/pipelines/test_block_creation.py
+++ b/tests/pipelines/test_block_creation.py
@@ -226,3 +226,12 @@ def test_full_pipeline(b, user_pk):
     block_len = len(block_doc.transactions)
     assert chained_block == block_doc
     assert number_assigned_to_others == 100 - block_len
+
+
+def test_block_snowflake(create_tx, signed_transfer_tx):
+    from bigchaindb.pipelines.block import tx_collector
+    snowflake = tx_collector()
+    assert snowflake.send(create_tx) == [create_tx]
+    snowflake.send(signed_transfer_tx)
+    snowflake.send(create_tx)
+    assert snowflake.send(None) == [create_tx, signed_transfer_tx]
diff --git a/tests/pipelines/test_steps.py b/tests/pipelines/test_steps.py
index c63a673a..834162fc 100644
--- a/tests/pipelines/test_steps.py
+++ b/tests/pipelines/test_steps.py
@@ -20,9 +20,26 @@ def test_stepping_changefeed_produces_update(b, steps):
             [tx.id, tx.id])
 
 
+@pytest.mark.bdb
+@pytest.mark.genesis
+def test_dupe_tx_in_block(b, steps):
+    tx = input_single_create(b)
+    for _ in range(2):
+        steps.stale_check_transactions()
+        steps.stale_reassign_transactions()
+        steps.block_changefeed()
+        steps.block_filter_tx()
+    steps.block_validate_tx()
+    steps.block_validate_tx()
+    assert steps.counts == {'block_create': 2}
+    steps.block_create(timeout=False)
+    block = steps.block_create(timeout=True)
+    assert block.transactions == [tx]
+
+
 def input_single_create(b):
     from bigchaindb.common.transaction import Transaction
     metadata = {'r': random.random()}
-    tx = Transaction.create([b.me], [([b.me], 1)], metadata)
+    tx = Transaction.create([b.me], [([b.me], 1)], metadata).sign([b.me_private])
     b.write_transaction(tx)
     return tx
diff --git a/tests/test_models.py b/tests/test_models.py
index 8de3a6c2..59d8e0be 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -163,3 +163,11 @@ class TestBlockModel(object):
         public_key = PublicKey(b.me)
 
         assert public_key.verify(expected_block_serialized, block.signature)
+
+    def test_block_dupe_tx(self, b):
+        from bigchaindb.models import Transaction
+        from bigchaindb.common.exceptions import DuplicateTransaction
+        tx = Transaction.create([b.me], [([b.me], 1)])
+        block = b.create_block([tx, tx])
+        with raises(DuplicateTransaction):
+            block._validate_block_transactions(b)