diff --git a/bigchaindb/core.py b/bigchaindb/core.py index cc753a42..132a9ce7 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -573,11 +573,10 @@ class Bigchain(object): block_serialized = rapidjson.dumps(block) r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn) - # TODO: Decide if we need this method def transaction_exists(self, transaction_id): response = r.table('bigchain', read_mode=self.read_mode)\ .get_all(transaction_id, index='transaction_id').run(self.conn) - return True if len(response.items) > 0 else False + return len(response.items) > 0 def prepare_genesis_block(self): """Prepare a genesis block.""" diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 1d42423f..1cd2f6c9 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -45,34 +45,43 @@ class Block: tx.pop('assignment_timestamp') return tx - def delete_tx(self, tx): - """Delete a transaction. - - Args: - tx (dict): the transaction to delete. - - Returns: - The transaction. - """ - r.table('backlog')\ - .get(tx['id'])\ - .delete(durability='hard')\ - .run(self.bigchain.conn) - - return tx - def validate_tx(self, tx): """Validate a transaction. + Also checks if the transaction already exists in the blockchain. If it + does, or it's invalid, it's deleted from the backlog immediately. + Args: tx (dict): the transaction to validate. Returns: The transaction if valid, ``None`` otherwise. """ - tx = self.bigchain.is_valid_transaction(tx) - if tx: + if self.bigchain.transaction_exists(tx['id']): + # if the transaction already exists, we must check whether + # it's in a valid or undecided block + tx, status = self.bigchain.get_transaction(tx['id'], + include_status=True) + if status == self.bigchain.TX_VALID \ + or status == self.bigchain.TX_UNDECIDED: + # if the tx is already in a valid or undecided block, + # then it no longer should be in the backlog, or added + # to a new block. We can delete and drop it. + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None + + tx_validated = self.bigchain.is_valid_transaction(tx) + if tx_validated: return tx + else: + # if the transaction is not valid, remove it from the + # backlog + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None def create(self, tx, timeout=False): """Create a block. @@ -113,6 +122,22 @@ class Block: self.bigchain.write_block(block) return block + def delete_tx(self, block): + """Delete transactions. + + Args: + block (dict): the block containg the transactions to delete. + + Returns: + The block. + """ + r.table('backlog')\ + .get_all(*[tx['id'] for tx in block['block']['transactions']])\ + .delete(durability='hard')\ + .run(self.bigchain.conn) + + return block + def initial(): """Return old transactions from the backlog.""" @@ -143,10 +168,10 @@ def create_pipeline(): block_pipeline = Pipeline([ Node(block.filter_tx), - Node(block.delete_tx), Node(block.validate_tx, fraction_of_cores=1), Node(block.create, timeout=1), Node(block.write), + Node(block.delete_tx), ]) return block_pipeline diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index a41f65b5..a1ab6a19 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -74,22 +74,59 @@ def test_write_block(b, user_vk): assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc +def test_duplicate_transaction(b, user_vk): + block_maker = block.Block() + + txs = [] + for i in range(10): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + txs.append(tx) + + block_doc = b.create_block(txs) + block_maker.write(block_doc) + + # block is in bigchain + assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc + + b.write_transaction(txs[0]) + + # verify tx is in the backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is not None + + # try to validate a transaction that's already in the chain; should not + # work + assert block_maker.validate_tx(txs[0]) is None + + # duplicate tx should be removed from backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is None + + def test_delete_tx(b, user_vk): block_maker = block.Block() - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + block_maker.create(tx) + # make sure the tx appears in the backlog + b.write_transaction(tx) - tx_backlog = r.table('backlog').get(tx['id']).run(b.conn) - tx_backlog.pop('assignee') - tx_backlog.pop('assignment_timestamp') - assert tx_backlog == tx + # force the output triggering a `timeout` + block_doc = block_maker.create(None, timeout=True) - returned_tx = block_maker.delete_tx(tx) + for tx in block_doc['block']['transactions']: + returned_tx = r.table('backlog').get(tx['id']).run(b.conn) + returned_tx.pop('assignee') + returned_tx.pop('assignment_timestamp') + assert returned_tx == tx - assert returned_tx == tx - assert r.table('backlog').get(tx['id']).run(b.conn) is None + returned_block = block_maker.delete_tx(block_doc) + + assert returned_block == block_doc + + for tx in block_doc['block']['transactions']: + assert r.table('backlog').get(tx['id']).run(b.conn) is None def test_prefeed(b, user_vk):