diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index ee864ead..d5fe15e6 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -31,6 +31,7 @@ config = { }, 'api_endpoint': 'http://localhost:9984/api/v1', 'consensus_plugin': 'default', + 'backlog_reassign_delay': 30 } # We need to maintain a backup copy of the original config dict in case diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 1943167b..5edca2c4 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -106,6 +106,11 @@ def run_configure(args, skip_if_exists=False): input('Statsd {}? (default `{}`): '.format(key, val)) \ or val + val = conf['backlog_reassign_delay'] + conf['backlog_reassign_delay'] = \ + input('Stale transaction reassignment delay (in seconds)? (default `{}`): '.format(val)) \ + or val + if config_path != '-': bigchaindb.config_utils.write_config(conf, config_path) else: diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 95cccfc7..cc753a42 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -2,6 +2,7 @@ import random import math import collections from copy import deepcopy +from time import time from itertools import compress import rethinkdb as r @@ -28,7 +29,7 @@ class Bigchain(object): def __init__(self, host=None, port=None, dbname=None, public_key=None, private_key=None, keyring=[], - consensus_plugin=None): + consensus_plugin=None, backlog_reassign_delay=None): """Initialize the Bigchain instance A Bigchain instance has several configuration parameters (e.g. host). 
@@ -56,6 +57,7 @@ class Bigchain(object): self.me = public_key or bigchaindb.config['keypair']['public'] self.me_private = private_key or bigchaindb.config['keypair']['private'] self.nodes_except_me = keyring or bigchaindb.config['keyring'] + self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay'] self.consensus = config_utils.load_consensus_plugin(consensus_plugin) # change RethinkDB read mode to majority. This ensures consistency in query results self.read_mode = 'majority' @@ -136,11 +138,54 @@ class Bigchain(object): signed_transaction = deepcopy(signed_transaction) # update the transaction signed_transaction.update({'assignee': assignee}) + signed_transaction.update({'assignment_timestamp': time()}) # write to the backlog response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn) return response + def reassign_transaction(self, transaction, durability='hard'): + """Assign a transaction to a new node + + Args: + transaction (dict): assigned transaction + + Returns: + dict: database response or None if no reassignment is possible + """ + + if self.nodes_except_me: + try: + federation_nodes = self.nodes_except_me + [self.me] + index_current_assignee = federation_nodes.index(transaction['assignee']) + new_assignee = random.choice(federation_nodes[:index_current_assignee] + + federation_nodes[index_current_assignee + 1:]) + except ValueError: + # current assignee not in federation + new_assignee = random.choice(self.nodes_except_me) + + else: + # There is no other node to assign to + new_assignee = self.me + + response = r.table('backlog')\ + .get(transaction['id'])\ + .update({'assignee': new_assignee, + 'assignment_timestamp': time()}, + durability=durability).run(self.conn) + return response + + def get_stale_transactions(self): + """Get a RethinkDB cursor of stale transactions + + Transactions are considered stale if they have been assigned a node, but are still in the + backlog 
after some amount of time specified in the configuration + """ + + return r.table('backlog')\ + .filter(lambda tx: time() - tx['assignment_timestamp'] > + self.backlog_reassign_delay).run(self.conn) + def get_transaction(self, txid, include_status=False): """Retrieve a transaction with `txid` from bigchain. diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 59375c57..47fbd008 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -130,7 +130,8 @@ def initial(): def get_changefeed(): """Create and return the changefeed for the backlog.""" - return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial()) + return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE, + prefeed=initial()) def create_pipeline(): diff --git a/bigchaindb/pipelines/stale.py b/bigchaindb/pipelines/stale.py new file mode 100644 index 00000000..e1e14e21 --- /dev/null +++ b/bigchaindb/pipelines/stale.py @@ -0,0 +1,76 @@ +"""This module monitors for stale transactions. + +It reassigns transactions which have been assigned a node but +remain in the backlog past a certain amount of time. +""" + +import logging +from multipipes import Pipeline, Node +from bigchaindb import Bigchain +from time import sleep + + +logger = logging.getLogger(__name__) + + +class StaleTransactionMonitor: + """This class encapsulates the logic for re-assigning stale transactions. + + Note: + Methods of this class will be executed in different processes. + """ + + def __init__(self, timeout=5, backlog_reassign_delay=None): + """Initialize StaleTransaction monitor + + Args: + timeout: how often to check for stale tx (in sec) + backlog_reassign_delay: How stale a transaction should + be before reassignment (in sec). If supplied, overrides the + Bigchain default value. 
+ """ + self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay) + self.timeout = timeout + + def check_transactions(self): + """Poll backlog for stale transactions + + Returns: + txs (list): txs to be re assigned + """ + sleep(self.timeout) + for tx in self.bigchain.get_stale_transactions(): + yield tx + + def reassign_transactions(self, tx): + """Put tx back in backlog with new assignee + + Returns: + transaction + """ + self.bigchain.reassign_transaction(tx) + return tx + + +def create_pipeline(timeout=5, backlog_reassign_delay=5): + """Create and return the pipeline of operations to be distributed + on different processes.""" + + stm = StaleTransactionMonitor(timeout=timeout, + backlog_reassign_delay=backlog_reassign_delay) + + monitor_pipeline = Pipeline([ + Node(stm.check_transactions), + Node(stm.reassign_transactions) + ]) + + return monitor_pipeline + + +def start(timeout=5, backlog_reassign_delay=5): + """Create, start, and return the block pipeline.""" + pipeline = create_pipeline(timeout=timeout, + backlog_reassign_delay=backlog_reassign_delay) + pipeline.setup() + pipeline.start() + return pipeline diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index 22a5f9bc..9c28907e 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -58,5 +58,5 @@ class ChangeFeed(Node): elif is_delete and (self.operation & ChangeFeed.DELETE): self.outqueue.put(change['old_val']) elif is_update and (self.operation & ChangeFeed.UPDATE): - self.outqueue.put(change) + self.outqueue.put(change['new_val']) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 4b8aa0eb..c3625fdc 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -2,7 +2,7 @@ import logging import multiprocessing as mp import bigchaindb -from bigchaindb.pipelines import vote, block, election +from bigchaindb.pipelines import vote, block, election, stale from bigchaindb.web import server @@ -31,6 +31,9 @@ def start(): 
logger.info('Starting voter') vote.start() + logger.info('Starting stale transaction monitor') + stale.start() + logger.info('Starting election') election.start() diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 7242402a..166e4929 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -132,6 +132,7 @@ class TestBigchainApi(object): response, status = b.get_transaction(tx_signed["id"], include_status=True) response.pop('assignee') + response.pop('assignment_timestamp') # add validity information, which will be returned assert util.serialize(tx_signed) == util.serialize(response) assert status == b.TX_IN_BACKLOG diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index ce300730..4abf72cc 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -78,6 +78,7 @@ def test_delete_tx(b, user_vk): tx_backlog = r.table('backlog').get(tx['id']).run(b.conn) tx_backlog.pop('assignee') + tx_backlog.pop('assignment_timestamp') assert tx_backlog == tx returned_tx = block_maker.delete_tx(tx) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index a71714df..02a0e39d 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -99,6 +99,7 @@ def test_check_requeue_transaction(b, user_vk): e.requeue_transactions(test_block) tx_backlog = r.table('backlog').get(tx1['id']).run(b.conn) tx_backlog.pop('assignee') + tx_backlog.pop('assignment_timestamp') assert tx_backlog == tx1 diff --git a/tests/pipelines/test_stale_monitor.py b/tests/pipelines/test_stale_monitor.py new file mode 100644 index 00000000..f6cb4a0b --- /dev/null +++ b/tests/pipelines/test_stale_monitor.py @@ -0,0 +1,116 @@ +import rethinkdb as r +from bigchaindb import Bigchain +from bigchaindb.pipelines import stale +from multipipes import Pipe, Pipeline +from unittest.mock import patch +from bigchaindb import config_utils 
+import time +import os + + +def test_get_stale(b, user_vk): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + tx_stale = stm.check_transactions() + + for _tx in tx_stale: + _tx.pop('assignee') + _tx.pop('assignment_timestamp') + assert tx == _tx + + +def test_reassign_transactions(b, user_vk): + # test with single node + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + stm.reassign_transactions(tx) + + # test with federation + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc'] + tx = list(r.table('backlog').run(b.conn))[0] + stm.reassign_transactions(tx) + + reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn) + assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp'] + assert reassigned_tx['assignee'] != tx['assignee'] + + # test with node not in federation + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + tx.update({'assignee': 'lol'}) + tx.update({'assignment_timestamp': time.time()}) + r.table('backlog').insert(tx, durability='hard').run(b.conn) + + tx = list(r.table('backlog').run(b.conn))[0] + stm.reassign_transactions(tx) + assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol' + + +def test_full_pipeline(user_vk): + CONFIG = { + 'database': { + 'name': 'bigchain_test_{}'.format(os.getpid()) + }, + 'keypair': { + 'private': 
'31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA', + 'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8' + }, + 'keyring': ['aaa', 'bbb'], + 'backlog_reassign_delay': 0.01 + } + config_utils.set_config(CONFIG) + b = Bigchain() + outpipe = Pipe() + + original_txs = {} + + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + + b.write_transaction(tx) + original_txs[tx['id']] = r.table('backlog').get(tx['id']).run(b.conn) + + assert r.table('backlog').count().run(b.conn) == 100 + + pipeline = stale.create_pipeline(backlog_reassign_delay=1, + timeout=1) + pipeline.setup(outdata=outpipe) + pipeline.start() + + # timing should be careful -- test will fail if reassignment happens multiple times + time.sleep(2) + pipeline.terminate() + + # to terminate + outpipe.get() + + assert r.table('backlog').count().run(b.conn) == 100 + reassigned_txs = list(r.table('backlog').run(b.conn)) + + # check that every assignment timestamp has increased, and every tx has a new assignee + for reassigned_tx in reassigned_txs: + assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp'] + assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee'] + +@patch.object(Pipeline, 'start') +def test_start(mock_start): + # TODO: `stale.start` is just a wrapper around `stale.create_pipeline`, + # that is tested by `test_full_pipeline`.
+ # If anyone has better ideas on how to test this, please do a PR :) + stale.start() + mock_start.assert_called_with() diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py index ebef38c4..f46c1a9d 100644 --- a/tests/pipelines/test_utils.py +++ b/tests/pipelines/test_utils.py @@ -44,8 +44,7 @@ def test_changefeed_update(mock_run): changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE) changefeed.outqueue = outpipe changefeed.run_forever() - assert outpipe.get() == {'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here'} + assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 @@ -56,8 +55,7 @@ def test_changefeed_multiple_operations(mock_run): changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.get() == {'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here'} + assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 diff --git a/tests/test_commands.py b/tests/test_commands.py index c515479f..0b8df348 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -57,6 +57,7 @@ def mock_bigchaindb_backup_config(monkeypatch): 'keypair': {}, 'database': {'host': 'host', 'port': 12345, 'name': 'adbname'}, 'statsd': {'host': 'host', 'port': 12345, 'rate': 0.1}, + 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb._config', config) diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index bce2c9a8..4d2b3a1e 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -145,7 +145,8 @@ def test_env_config(monkeypatch): def test_autoconfigure_read_both_from_file_and_env(monkeypatch): file_config = { - 'database': {'host': 'test-host'} + 'database': {'host': 'test-host'}, + 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, 
**kwargs: file_config) monkeypatch.setattr('os.environ', {'BIGCHAINDB_DATABASE_NAME': 'test-dbname', @@ -180,6 +181,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch): }, 'api_endpoint': 'http://localhost:9984/api/v1', 'consensus_plugin': 'default', + 'backlog_reassign_delay': 5 } diff --git a/tests/test_core.py b/tests/test_core.py index 2650ff37..397158d0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -19,7 +19,8 @@ def config(request, monkeypatch): }, 'keyring': [], 'CONFIGURED': True, - 'consensus_plugin': 'default' + 'consensus_plugin': 'default', + 'backlog_reassign_delay': 30 } monkeypatch.setattr('bigchaindb.config', config) diff --git a/tests/test_processes.py b/tests/test_processes.py new file mode 100644 index 00000000..7013dc03 --- /dev/null +++ b/tests/test_processes.py @@ -0,0 +1,23 @@ +from unittest.mock import patch + +from multiprocessing import Process +from bigchaindb.pipelines import vote, block, election, stale + + +@patch.object(stale, 'start') +@patch.object(election, 'start') +@patch.object(block, 'start') +@patch.object(vote, 'start') +@patch.object(Process, 'start') +def test_processes_start(mock_vote, mock_block, mock_election, mock_stale, + mock_process): + from bigchaindb import processes + + processes.start() + + mock_vote.assert_called_with() + mock_block.assert_called_with() + mock_election.assert_called_with() + mock_stale.assert_called_with() + mock_process.assert_called_with() +