From 94e006125e3828a9129c9fd00312dfa5801b5781 Mon Sep 17 00:00:00 2001 From: troymc Date: Tue, 6 Sep 2016 12:06:33 +0200 Subject: [PATCH 1/4] docs: listed all ports expecting unsolicited inbound traffic --- docs/source/appendices/firewall-notes.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/docs/source/appendices/firewall-notes.md b/docs/source/appendices/firewall-notes.md index dca89ac6..bad09b05 100644 --- a/docs/source/appendices/firewall-notes.md +++ b/docs/source/appendices/firewall-notes.md @@ -1,6 +1,17 @@ # Notes for Firewall Setup -This is a page of notes on the ports used by BigchainDB nodes and the traffic they should expect, to help with firewall setup (or security group setup on AWS). This page is _not_ a firewall tutorial or step-by-step guide. +This is a page of notes on the ports potentially used by BigchainDB nodes and the traffic they should expect, to help with firewall setup (and security group setup on AWS). This page is _not_ a firewall tutorial or step-by-step guide. + + +## Expected Unsolicited Inbound Traffic + +Assuming you aren't exposing the RethinkDB web interface on port 8080 (or any other port, because [there are more secure ways to access it](https://www.rethinkdb.com/docs/security/#binding-the-web-interface-port)), there are only three ports that should expect unsolicited inbound traffic: + +1. **Port 22** can expect inbound SSH (TCP) traffic from the node administrator (i.e. a small set of IP addresses). +2. **Port 9984** can expect inbound HTTP (TCP) traffic from BigchainDB clients sending transactions to the BigchainDB HTTP API. +3. **Port 29015** can expect inbound TCP traffic from other RethinkDB nodes in the RethinkDB cluster (for RethinkDB intracluster communications). + +All other ports should only get inbound traffic in response to specific requests from inside the node. 
## Port 22 From 92981e003dbe37d1e54144950ae0e3c009ccb60d Mon Sep 17 00:00:00 2001 From: Ryan Henderson Date: Wed, 7 Sep 2016 16:26:41 +0200 Subject: [PATCH 2/4] Core/198/handle stale transactions (#359) * add timestamp to transaction assignment * add reassignment delay to configuration * refactor to multipipes * # This is a combination of 7 commits. # The first commit's message is: stale transaction monitor and tests # The 2nd commit message will be skipped: # simplify logic # The 3rd commit message will be skipped: # node will assign to self # The 4th commit message will be skipped: # block listens for insert and update # The 5th commit message will be skipped: # more test coverage # The 6th commit message will be skipped: # test coverage # The 7th commit message will be skipped: # test coverage * stale transaction monitor and tests * update operation only returns new value --- bigchaindb/__init__.py | 1 + bigchaindb/commands/bigchain.py | 5 ++ bigchaindb/core.py | 47 +++++++++- bigchaindb/pipelines/block.py | 3 +- bigchaindb/pipelines/stale.py | 76 ++++++++++++++++ bigchaindb/pipelines/utils.py | 2 +- bigchaindb/processes.py | 5 +- tests/db/test_bigchain_api.py | 1 + tests/pipelines/test_block_creation.py | 1 + tests/pipelines/test_election.py | 1 + tests/pipelines/test_stale_monitor.py | 116 +++++++++++++++++++++++++ tests/pipelines/test_utils.py | 6 +- tests/test_commands.py | 1 + tests/test_config_utils.py | 4 +- tests/test_core.py | 3 +- tests/test_processes.py | 23 +++++ 16 files changed, 285 insertions(+), 10 deletions(-) create mode 100644 bigchaindb/pipelines/stale.py create mode 100644 tests/pipelines/test_stale_monitor.py create mode 100644 tests/test_processes.py diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index ee864ead..d5fe15e6 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -31,6 +31,7 @@ config = { }, 'api_endpoint': 'http://localhost:9984/api/v1', 'consensus_plugin': 'default', + 'backlog_reassign_delay': 30 } 
# We need to maintain a backup copy of the original config dict in case diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 1943167b..5edca2c4 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -106,6 +106,11 @@ def run_configure(args, skip_if_exists=False): input('Statsd {}? (default `{}`): '.format(key, val)) \ or val + val = conf['backlog_reassign_delay'] + conf['backlog_reassign_delay'] = \ + input('Stale transaction reassignment delay (in seconds)? (default `{}`): '.format(val)) \ + or val + if config_path != '-': bigchaindb.config_utils.write_config(conf, config_path) else: diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 95cccfc7..cc753a42 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -2,6 +2,7 @@ import random import math import collections from copy import deepcopy +from time import time from itertools import compress import rethinkdb as r @@ -28,7 +29,7 @@ class Bigchain(object): def __init__(self, host=None, port=None, dbname=None, public_key=None, private_key=None, keyring=[], - consensus_plugin=None): + consensus_plugin=None, backlog_reassign_delay=None): """Initialize the Bigchain instance A Bigchain instance has several configuration parameters (e.g. host). @@ -56,6 +57,7 @@ class Bigchain(object): self.me = public_key or bigchaindb.config['keypair']['public'] self.me_private = private_key or bigchaindb.config['keypair']['private'] self.nodes_except_me = keyring or bigchaindb.config['keyring'] + self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay'] self.consensus = config_utils.load_consensus_plugin(consensus_plugin) # change RethinkDB read mode to majority. 
This ensures consistency in query results self.read_mode = 'majority' @@ -136,11 +138,54 @@ class Bigchain(object): signed_transaction = deepcopy(signed_transaction) # update the transaction signed_transaction.update({'assignee': assignee}) + signed_transaction.update({'assignment_timestamp': time()}) # write to the backlog response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn) return response + def reassign_transaction(self, transaction, durability='hard'): + """Assign a transaction to a new node + + Args: + transaction (dict): assigned transaction + + Returns: + dict: database response or None if no reassignment is possible + """ + + if self.nodes_except_me: + try: + federation_nodes = self.nodes_except_me + [self.me] + index_current_assignee = federation_nodes.index(transaction['assignee']) + new_assignee = random.choice(federation_nodes[:index_current_assignee] + + federation_nodes[index_current_assignee + 1:]) + except ValueError: + # current assignee not in federation + new_assignee = random.choice(self.nodes_except_me) + + else: + # There is no other node to assign to + new_assignee = self.me + + response = r.table('backlog')\ + .get(transaction['id'])\ + .update({'assignee': new_assignee, + 'assignment_timestamp': time()}, + durability=durability).run(self.conn) + return response + + def get_stale_transactions(self): + """Get a RethinkDB cursor of stale transactions + + Transactions are considered stale if they have been assigned a node, but are still in the + backlog after some amount of time specified in the configuration + """ + + return r.table('backlog')\ + .filter(lambda tx: time() - tx['assignment_timestamp'] > + self.backlog_reassign_delay).run(self.conn) + def get_transaction(self, txid, include_status=False): """Retrieve a transaction with `txid` from bigchain. 
diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 59375c57..47fbd008 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -130,7 +130,8 @@ def initial(): def get_changefeed(): """Create and return the changefeed for the backlog.""" - return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial()) + return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE, + prefeed=initial()) def create_pipeline(): diff --git a/bigchaindb/pipelines/stale.py b/bigchaindb/pipelines/stale.py new file mode 100644 index 00000000..e1e14e21 --- /dev/null +++ b/bigchaindb/pipelines/stale.py @@ -0,0 +1,76 @@ +"""This module monitors for stale transactions. + +It reassigns transactions which have been assigned a node but +remain in the backlog past a certain amount of time. +""" + +import logging +from multipipes import Pipeline, Node +from bigchaindb import Bigchain +from time import sleep + + +logger = logging.getLogger(__name__) + + +class StaleTransactionMonitor: + """This class encapsulates the logic for re-assigning stale transactions. + + Note: + Methods of this class will be executed in different processes. + """ + + def __init__(self, timeout=5, backlog_reassign_delay=None): + """Initialize StaleTransaction monitor + + Args: + timeout: how often to check for stale tx (in sec) + backlog_reassign_delay: How stale a transaction should + be before reassignment (in sec). If supplied, overrides the + Bigchain default value. 
+ """ + self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay) + self.timeout = timeout + + def check_transactions(self): + """Poll backlog for stale transactions + + Returns: + txs (list): txs to be reassigned + """ + sleep(self.timeout) + for tx in self.bigchain.get_stale_transactions(): + yield tx + + def reassign_transactions(self, tx): + """Put tx back in backlog with new assignee + + Returns: + transaction + """ + self.bigchain.reassign_transaction(tx) + return tx + + +def create_pipeline(timeout=5, backlog_reassign_delay=5): + """Create and return the pipeline of operations to be distributed + on different processes.""" + + stm = StaleTransactionMonitor(timeout=timeout, + backlog_reassign_delay=backlog_reassign_delay) + + monitor_pipeline = Pipeline([ + Node(stm.check_transactions), + Node(stm.reassign_transactions) + ]) + + return monitor_pipeline + + +def start(timeout=5, backlog_reassign_delay=5): + """Create, start, and return the stale transaction monitor pipeline.""" + pipeline = create_pipeline(timeout=timeout, + backlog_reassign_delay=backlog_reassign_delay) + pipeline.setup() + pipeline.start() + return pipeline diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index 22a5f9bc..9c28907e 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -58,5 +58,5 @@ class ChangeFeed(Node): elif is_delete and (self.operation & ChangeFeed.DELETE): self.outqueue.put(change['old_val']) elif is_update and (self.operation & ChangeFeed.UPDATE): - self.outqueue.put(change) + self.outqueue.put(change['new_val']) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 4b8aa0eb..c3625fdc 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -2,7 +2,7 @@ import logging import multiprocessing as mp import bigchaindb -from bigchaindb.pipelines import vote, block, election +from bigchaindb.pipelines import vote, block, election, stale from bigchaindb.web import server @@ -31,6 +31,9 @@ def start():
logger.info('Starting voter') vote.start() + logger.info('Starting stale transaction monitor') + stale.start() + logger.info('Starting election') election.start() diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 7242402a..166e4929 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -132,6 +132,7 @@ class TestBigchainApi(object): response, status = b.get_transaction(tx_signed["id"], include_status=True) response.pop('assignee') + response.pop('assignment_timestamp') # add validity information, which will be returned assert util.serialize(tx_signed) == util.serialize(response) assert status == b.TX_IN_BACKLOG diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index ce300730..4abf72cc 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -78,6 +78,7 @@ def test_delete_tx(b, user_vk): tx_backlog = r.table('backlog').get(tx['id']).run(b.conn) tx_backlog.pop('assignee') + tx_backlog.pop('assignment_timestamp') assert tx_backlog == tx returned_tx = block_maker.delete_tx(tx) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index a71714df..02a0e39d 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -99,6 +99,7 @@ def test_check_requeue_transaction(b, user_vk): e.requeue_transactions(test_block) tx_backlog = r.table('backlog').get(tx1['id']).run(b.conn) tx_backlog.pop('assignee') + tx_backlog.pop('assignment_timestamp') assert tx_backlog == tx1 diff --git a/tests/pipelines/test_stale_monitor.py b/tests/pipelines/test_stale_monitor.py new file mode 100644 index 00000000..f6cb4a0b --- /dev/null +++ b/tests/pipelines/test_stale_monitor.py @@ -0,0 +1,116 @@ +import rethinkdb as r +from bigchaindb import Bigchain +from bigchaindb.pipelines import stale +from multipipes import Pipe, Pipeline +from unittest.mock import patch +from bigchaindb import config_utils 
+import time +import os + + +def test_get_stale(b, user_vk): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + tx_stale = stm.check_transactions() + + for _tx in tx_stale: + _tx.pop('assignee') + _tx.pop('assignment_timestamp') + assert tx == _tx + + +def test_reassign_transactions(b, user_vk): + # test with single node + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + stm.reassign_transactions(tx) + + # test with federation + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc'] + tx = list(r.table('backlog').run(b.conn))[0] + stm.reassign_transactions(tx) + + reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn) + assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp'] + assert reassigned_tx['assignee'] != tx['assignee'] + + # test with node not in federation + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + tx.update({'assignee': 'lol'}) + tx.update({'assignment_timestamp': time.time()}) + r.table('backlog').insert(tx, durability='hard').run(b.conn) + + tx = list(r.table('backlog').run(b.conn))[0] + stm.reassign_transactions(tx) + assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol' + + +def test_full_pipeline(user_vk): + CONFIG = { + 'database': { + 'name': 'bigchain_test_{}'.format(os.getpid()) + }, + 'keypair': { + 'private': 
'31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA', + 'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8' + }, + 'keyring': ['aaa', 'bbb'], + 'backlog_reassign_delay': 0.01 + } + config_utils.set_config(CONFIG) + b = Bigchain() + outpipe = Pipe() + + original_txs = {} + + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + + b.write_transaction(tx) + original_txs[tx['id']] = r.table('backlog').get(tx['id']).run(b.conn) + + assert r.table('backlog').count().run(b.conn) == 100 + + pipeline = stale.create_pipeline(backlog_reassign_delay=1, + timeout=1) + pipeline.setup(outdata=outpipe) + pipeline.start() + + # timing should be careful -- test will fail if reassignment happens multiple times + time.sleep(2) + pipeline.terminate() + + # to terminate + outpipe.get() + + assert r.table('backlog').count().run(b.conn) == 100 + reassigned_txs = list(r.table('backlog').run(b.conn)) + + # check that every assignment timestamp has increased, and every tx has a new assignee + for reassigned_tx in reassigned_txs: + assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp'] + assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee'] + +@patch.object(Pipeline, 'start') +def test_start(mock_start): + # TODO: `stale.start` is just a wrapper around `stale.create_pipeline`, + # that is tested by `test_full_pipeline`.
+ # If anyone has better ideas on how to test this, please do a PR :) + stale.start() + mock_start.assert_called_with() diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py index ebef38c4..f46c1a9d 100644 --- a/tests/pipelines/test_utils.py +++ b/tests/pipelines/test_utils.py @@ -44,8 +44,7 @@ def test_changefeed_update(mock_run): changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE) changefeed.outqueue = outpipe changefeed.run_forever() - assert outpipe.get() == {'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here'} + assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 @@ -56,8 +55,7 @@ def test_changefeed_multiple_operations(mock_run): changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.get() == {'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here'} + assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 diff --git a/tests/test_commands.py b/tests/test_commands.py index c515479f..0b8df348 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -57,6 +57,7 @@ def mock_bigchaindb_backup_config(monkeypatch): 'keypair': {}, 'database': {'host': 'host', 'port': 12345, 'name': 'adbname'}, 'statsd': {'host': 'host', 'port': 12345, 'rate': 0.1}, + 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb._config', config) diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index bce2c9a8..4d2b3a1e 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -145,7 +145,8 @@ def test_env_config(monkeypatch): def test_autoconfigure_read_both_from_file_and_env(monkeypatch): file_config = { - 'database': {'host': 'test-host'} + 'database': {'host': 'test-host'}, + 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, 
**kwargs: file_config) monkeypatch.setattr('os.environ', {'BIGCHAINDB_DATABASE_NAME': 'test-dbname', @@ -180,6 +181,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch): }, 'api_endpoint': 'http://localhost:9984/api/v1', 'consensus_plugin': 'default', + 'backlog_reassign_delay': 5 } diff --git a/tests/test_core.py b/tests/test_core.py index 2650ff37..397158d0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -19,7 +19,8 @@ def config(request, monkeypatch): }, 'keyring': [], 'CONFIGURED': True, - 'consensus_plugin': 'default' + 'consensus_plugin': 'default', + 'backlog_reassign_delay': 30 } monkeypatch.setattr('bigchaindb.config', config) diff --git a/tests/test_processes.py b/tests/test_processes.py new file mode 100644 index 00000000..7013dc03 --- /dev/null +++ b/tests/test_processes.py @@ -0,0 +1,23 @@ +from unittest.mock import patch + +from multiprocessing import Process +from bigchaindb.pipelines import vote, block, election, stale + + +@patch.object(stale, 'start') +@patch.object(election, 'start') +@patch.object(block, 'start') +@patch.object(vote, 'start') +@patch.object(Process, 'start') +def test_processes_start(mock_vote, mock_block, mock_election, mock_stale, + mock_process): + from bigchaindb import processes + + processes.start() + + mock_vote.assert_called_with() + mock_block.assert_called_with() + mock_election.assert_called_with() + mock_stale.assert_called_with() + mock_process.assert_called_with() + From 9426c7f866e67f096549be50dd66b6f04ca0aa79 Mon Sep 17 00:00:00 2001 From: Ryan Henderson Date: Thu, 8 Sep 2016 11:26:25 +0200 Subject: [PATCH 3/4] fix assignment timestamp in block (#627) --- bigchaindb/pipelines/block.py | 1 + tests/pipelines/test_block_creation.py | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 47fbd008..1d42423f 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -42,6 +42,7 @@ 
class Block: if tx['assignee'] == self.bigchain.me: tx.pop('assignee') + tx.pop('assignment_timestamp') return tx def delete_tx(self, tx): diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index 4abf72cc..a41f65b5 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -14,9 +14,14 @@ def test_filter_by_assignee(b, user_vk): tx = b.create_transaction(b.me, user_vk, None, 'CREATE') tx = b.sign_transaction(tx, b.me_private) tx['assignee'] = b.me + tx['assignment_timestamp'] = 111 # filter_tx has side effects on the `tx` instance by popping 'assignee' - assert block_maker.filter_tx(tx) == tx + # and 'assignment_timestamp' + filtered_tx = block_maker.filter_tx(tx) + assert filtered_tx == tx + assert 'assignee' not in filtered_tx + assert 'assignment_timestamp' not in filtered_tx tx = b.create_transaction(b.me, user_vk, None, 'CREATE') tx = b.sign_transaction(tx, b.me_private) @@ -116,6 +121,7 @@ def test_full_pipeline(b, user_vk): tx = b.sign_transaction(tx, b.me_private) assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc']) tx['assignee'] = assignee + tx['assignment_timestamp'] = time.time() if assignee == b.me: count_assigned_to_me += 1 r.table('backlog').insert(tx, durability='hard').run(b.conn) From 404f3a1c4582607e95b0d8a567d6f9b77f19ec2b Mon Sep 17 00:00:00 2001 From: Ryan Henderson Date: Thu, 8 Sep 2016 11:45:48 +0200 Subject: [PATCH 4/4] Delete transactions after block is written (#609) * delete transactions after block is written * cleanup transaction_exists * check for duplicate transactions * delete invalid tx from backlog * test duplicate transaction --- bigchaindb/core.py | 3 +- bigchaindb/pipelines/block.py | 63 ++++++++++++++++++-------- tests/pipelines/test_block_creation.py | 57 +++++++++++++++++++---- 3 files changed, 92 insertions(+), 31 deletions(-) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index cc753a42..132a9ce7 100644 --- a/bigchaindb/core.py +++ 
b/bigchaindb/core.py @@ -573,11 +573,10 @@ class Bigchain(object): block_serialized = rapidjson.dumps(block) r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn) - # TODO: Decide if we need this method def transaction_exists(self, transaction_id): response = r.table('bigchain', read_mode=self.read_mode)\ .get_all(transaction_id, index='transaction_id').run(self.conn) - return True if len(response.items) > 0 else False + return len(response.items) > 0 def prepare_genesis_block(self): """Prepare a genesis block.""" diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 1d42423f..1cd2f6c9 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -45,34 +45,43 @@ class Block: tx.pop('assignment_timestamp') return tx - def delete_tx(self, tx): - """Delete a transaction. - - Args: - tx (dict): the transaction to delete. - - Returns: - The transaction. - """ - r.table('backlog')\ - .get(tx['id'])\ - .delete(durability='hard')\ - .run(self.bigchain.conn) - - return tx - def validate_tx(self, tx): """Validate a transaction. + Also checks if the transaction already exists in the blockchain. If it + does, or it's invalid, it's deleted from the backlog immediately. + Args: tx (dict): the transaction to validate. Returns: The transaction if valid, ``None`` otherwise. """ - tx = self.bigchain.is_valid_transaction(tx) - if tx: + if self.bigchain.transaction_exists(tx['id']): + # if the transaction already exists, we must check whether + # it's in a valid or undecided block + tx, status = self.bigchain.get_transaction(tx['id'], + include_status=True) + if status == self.bigchain.TX_VALID \ + or status == self.bigchain.TX_UNDECIDED: + # if the tx is already in a valid or undecided block, + # then it no longer should be in the backlog, or added + # to a new block. We can delete and drop it. 
+ r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None + + tx_validated = self.bigchain.is_valid_transaction(tx) + if tx_validated: return tx + else: + # if the transaction is not valid, remove it from the + # backlog + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None def create(self, tx, timeout=False): """Create a block. @@ -113,6 +122,22 @@ class Block: self.bigchain.write_block(block) return block + def delete_tx(self, block): + """Delete transactions. + + Args: + block (dict): the block containing the transactions to delete. + + Returns: + The block. + """ + r.table('backlog')\ + .get_all(*[tx['id'] for tx in block['block']['transactions']])\ + .delete(durability='hard')\ + .run(self.bigchain.conn) + + return block + def initial(): """Return old transactions from the backlog.""" @@ -143,10 +168,10 @@ def create_pipeline(): block_pipeline = Pipeline([ Node(block.filter_tx), - Node(block.delete_tx), Node(block.validate_tx, fraction_of_cores=1), Node(block.create, timeout=1), Node(block.write), + Node(block.delete_tx), ]) return block_pipeline diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index a41f65b5..a1ab6a19 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -74,22 +74,59 @@ def test_write_block(b, user_vk): assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc +def test_duplicate_transaction(b, user_vk): + block_maker = block.Block() + + txs = [] + for i in range(10): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + txs.append(tx) + + block_doc = b.create_block(txs) + block_maker.write(block_doc) + + # block is in bigchain + assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc + + b.write_transaction(txs[0]) + + # verify tx is in the backlog + assert
r.table('backlog').get(txs[0]['id']).run(b.conn) is not None + + # try to validate a transaction that's already in the chain; should not + # work + assert block_maker.validate_tx(txs[0]) is None + + # duplicate tx should be removed from backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is None + + def test_delete_tx(b, user_vk): block_maker = block.Block() - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + block_maker.create(tx) + # make sure the tx appears in the backlog + b.write_transaction(tx) - tx_backlog = r.table('backlog').get(tx['id']).run(b.conn) - tx_backlog.pop('assignee') - tx_backlog.pop('assignment_timestamp') - assert tx_backlog == tx + # force the output triggering a `timeout` + block_doc = block_maker.create(None, timeout=True) - returned_tx = block_maker.delete_tx(tx) + for tx in block_doc['block']['transactions']: + returned_tx = r.table('backlog').get(tx['id']).run(b.conn) + returned_tx.pop('assignee') + returned_tx.pop('assignment_timestamp') + assert returned_tx == tx - assert returned_tx == tx - assert r.table('backlog').get(tx['id']).run(b.conn) is None + returned_block = block_maker.delete_tx(block_doc) + + assert returned_block == block_doc + + for tx in block_doc['block']['transactions']: + assert r.table('backlog').get(tx['id']).run(b.conn) is None def test_prefeed(b, user_vk):