mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Delete transactions after block is written (#609)
* delete transactions after block is written * cleanup transaction_exists * check for duplicate transactions * delete invalid tx from backlog * test duplicate transaction
This commit is contained in:
parent
9426c7f866
commit
404f3a1c45
@ -573,11 +573,10 @@ class Bigchain(object):
|
|||||||
block_serialized = rapidjson.dumps(block)
|
block_serialized = rapidjson.dumps(block)
|
||||||
r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn)
|
r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn)
|
||||||
|
|
||||||
# TODO: Decide if we need this method
|
|
||||||
def transaction_exists(self, transaction_id):
|
def transaction_exists(self, transaction_id):
|
||||||
response = r.table('bigchain', read_mode=self.read_mode)\
|
response = r.table('bigchain', read_mode=self.read_mode)\
|
||||||
.get_all(transaction_id, index='transaction_id').run(self.conn)
|
.get_all(transaction_id, index='transaction_id').run(self.conn)
|
||||||
return True if len(response.items) > 0 else False
|
return len(response.items) > 0
|
||||||
|
|
||||||
def prepare_genesis_block(self):
|
def prepare_genesis_block(self):
|
||||||
"""Prepare a genesis block."""
|
"""Prepare a genesis block."""
|
||||||
|
@ -45,34 +45,43 @@ class Block:
|
|||||||
tx.pop('assignment_timestamp')
|
tx.pop('assignment_timestamp')
|
||||||
return tx
|
return tx
|
||||||
|
|
||||||
def delete_tx(self, tx):
|
|
||||||
"""Delete a transaction.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
tx (dict): the transaction to delete.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The transaction.
|
|
||||||
"""
|
|
||||||
r.table('backlog')\
|
|
||||||
.get(tx['id'])\
|
|
||||||
.delete(durability='hard')\
|
|
||||||
.run(self.bigchain.conn)
|
|
||||||
|
|
||||||
return tx
|
|
||||||
|
|
||||||
def validate_tx(self, tx):
|
def validate_tx(self, tx):
|
||||||
"""Validate a transaction.
|
"""Validate a transaction.
|
||||||
|
|
||||||
|
Also checks if the transaction already exists in the blockchain. If it
|
||||||
|
does, or it's invalid, it's deleted from the backlog immediately.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
tx (dict): the transaction to validate.
|
tx (dict): the transaction to validate.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The transaction if valid, ``None`` otherwise.
|
The transaction if valid, ``None`` otherwise.
|
||||||
"""
|
"""
|
||||||
tx = self.bigchain.is_valid_transaction(tx)
|
if self.bigchain.transaction_exists(tx['id']):
|
||||||
if tx:
|
# if the transaction already exists, we must check whether
|
||||||
|
# it's in a valid or undecided block
|
||||||
|
tx, status = self.bigchain.get_transaction(tx['id'],
|
||||||
|
include_status=True)
|
||||||
|
if status == self.bigchain.TX_VALID \
|
||||||
|
or status == self.bigchain.TX_UNDECIDED:
|
||||||
|
# if the tx is already in a valid or undecided block,
|
||||||
|
# then it no longer should be in the backlog, or added
|
||||||
|
# to a new block. We can delete and drop it.
|
||||||
|
r.table('backlog').get(tx['id']) \
|
||||||
|
.delete(durability='hard') \
|
||||||
|
.run(self.bigchain.conn)
|
||||||
|
return None
|
||||||
|
|
||||||
|
tx_validated = self.bigchain.is_valid_transaction(tx)
|
||||||
|
if tx_validated:
|
||||||
return tx
|
return tx
|
||||||
|
else:
|
||||||
|
# if the transaction is not valid, remove it from the
|
||||||
|
# backlog
|
||||||
|
r.table('backlog').get(tx['id']) \
|
||||||
|
.delete(durability='hard') \
|
||||||
|
.run(self.bigchain.conn)
|
||||||
|
return None
|
||||||
|
|
||||||
def create(self, tx, timeout=False):
|
def create(self, tx, timeout=False):
|
||||||
"""Create a block.
|
"""Create a block.
|
||||||
@ -113,6 +122,22 @@ class Block:
|
|||||||
self.bigchain.write_block(block)
|
self.bigchain.write_block(block)
|
||||||
return block
|
return block
|
||||||
|
|
||||||
|
def delete_tx(self, block):
|
||||||
|
"""Delete transactions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
block (dict): the block containg the transactions to delete.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The block.
|
||||||
|
"""
|
||||||
|
r.table('backlog')\
|
||||||
|
.get_all(*[tx['id'] for tx in block['block']['transactions']])\
|
||||||
|
.delete(durability='hard')\
|
||||||
|
.run(self.bigchain.conn)
|
||||||
|
|
||||||
|
return block
|
||||||
|
|
||||||
|
|
||||||
def initial():
|
def initial():
|
||||||
"""Return old transactions from the backlog."""
|
"""Return old transactions from the backlog."""
|
||||||
@ -143,10 +168,10 @@ def create_pipeline():
|
|||||||
|
|
||||||
block_pipeline = Pipeline([
|
block_pipeline = Pipeline([
|
||||||
Node(block.filter_tx),
|
Node(block.filter_tx),
|
||||||
Node(block.delete_tx),
|
|
||||||
Node(block.validate_tx, fraction_of_cores=1),
|
Node(block.validate_tx, fraction_of_cores=1),
|
||||||
Node(block.create, timeout=1),
|
Node(block.create, timeout=1),
|
||||||
Node(block.write),
|
Node(block.write),
|
||||||
|
Node(block.delete_tx),
|
||||||
])
|
])
|
||||||
|
|
||||||
return block_pipeline
|
return block_pipeline
|
||||||
|
@ -74,22 +74,59 @@ def test_write_block(b, user_vk):
|
|||||||
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
|
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
|
||||||
|
|
||||||
|
|
||||||
|
def test_duplicate_transaction(b, user_vk):
|
||||||
|
block_maker = block.Block()
|
||||||
|
|
||||||
|
txs = []
|
||||||
|
for i in range(10):
|
||||||
|
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||||
|
tx = b.sign_transaction(tx, b.me_private)
|
||||||
|
txs.append(tx)
|
||||||
|
|
||||||
|
block_doc = b.create_block(txs)
|
||||||
|
block_maker.write(block_doc)
|
||||||
|
|
||||||
|
# block is in bigchain
|
||||||
|
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
|
||||||
|
|
||||||
|
b.write_transaction(txs[0])
|
||||||
|
|
||||||
|
# verify tx is in the backlog
|
||||||
|
assert r.table('backlog').get(txs[0]['id']).run(b.conn) is not None
|
||||||
|
|
||||||
|
# try to validate a transaction that's already in the chain; should not
|
||||||
|
# work
|
||||||
|
assert block_maker.validate_tx(txs[0]) is None
|
||||||
|
|
||||||
|
# duplicate tx should be removed from backlog
|
||||||
|
assert r.table('backlog').get(txs[0]['id']).run(b.conn) is None
|
||||||
|
|
||||||
|
|
||||||
def test_delete_tx(b, user_vk):
|
def test_delete_tx(b, user_vk):
|
||||||
block_maker = block.Block()
|
block_maker = block.Block()
|
||||||
|
|
||||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
for i in range(100):
|
||||||
tx = b.sign_transaction(tx, b.me_private)
|
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||||
b.write_transaction(tx)
|
tx = b.sign_transaction(tx, b.me_private)
|
||||||
|
block_maker.create(tx)
|
||||||
|
# make sure the tx appears in the backlog
|
||||||
|
b.write_transaction(tx)
|
||||||
|
|
||||||
tx_backlog = r.table('backlog').get(tx['id']).run(b.conn)
|
# force the output triggering a `timeout`
|
||||||
tx_backlog.pop('assignee')
|
block_doc = block_maker.create(None, timeout=True)
|
||||||
tx_backlog.pop('assignment_timestamp')
|
|
||||||
assert tx_backlog == tx
|
|
||||||
|
|
||||||
returned_tx = block_maker.delete_tx(tx)
|
for tx in block_doc['block']['transactions']:
|
||||||
|
returned_tx = r.table('backlog').get(tx['id']).run(b.conn)
|
||||||
|
returned_tx.pop('assignee')
|
||||||
|
returned_tx.pop('assignment_timestamp')
|
||||||
|
assert returned_tx == tx
|
||||||
|
|
||||||
assert returned_tx == tx
|
returned_block = block_maker.delete_tx(block_doc)
|
||||||
assert r.table('backlog').get(tx['id']).run(b.conn) is None
|
|
||||||
|
assert returned_block == block_doc
|
||||||
|
|
||||||
|
for tx in block_doc['block']['transactions']:
|
||||||
|
assert r.table('backlog').get(tx['id']).run(b.conn) is None
|
||||||
|
|
||||||
|
|
||||||
def test_prefeed(b, user_vk):
|
def test_prefeed(b, user_vk):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user