diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index ee864ead..d5fe15e6 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -31,6 +31,7 @@ config = { }, 'api_endpoint': 'http://localhost:9984/api/v1', 'consensus_plugin': 'default', + 'backlog_reassign_delay': 30 } # We need to maintain a backup copy of the original config dict in case diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 1943167b..5edca2c4 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -106,6 +106,11 @@ def run_configure(args, skip_if_exists=False): input('Statsd {}? (default `{}`): '.format(key, val)) \ or val + val = conf['backlog_reassign_delay'] + conf['backlog_reassign_delay'] = \ + input('Stale transaction reassignment delay (in seconds)? (default `{}`): '.format(val)) \ + or val + if config_path != '-': bigchaindb.config_utils.write_config(conf, config_path) else: diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 2248be5e..6a53cda6 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -2,6 +2,7 @@ import random import math import collections from copy import deepcopy +from time import time from itertools import compress import rethinkdb as r @@ -29,7 +30,7 @@ class Bigchain(object): def __init__(self, host=None, port=None, dbname=None, public_key=None, private_key=None, keyring=[], - consensus_plugin=None): + consensus_plugin=None, backlog_reassign_delay=None): """Initialize the Bigchain instance A Bigchain instance has several configuration parameters (e.g. host). @@ -57,6 +58,7 @@ class Bigchain(object): self.me = public_key or bigchaindb.config['keypair']['public'] self.me_private = private_key or bigchaindb.config['keypair']['private'] self.nodes_except_me = keyring or bigchaindb.config['keyring'] + self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay'] self.consensus = config_utils.load_consensus_plugin(consensus_plugin) # change RethinkDB read mode to majority. This ensures consistency in query results self.read_mode = 'majority' @@ -138,6 +140,7 @@ class Bigchain(object): signed_transaction = deepcopy(signed_transaction) # update the transaction signed_transaction.update({'assignee': assignee}) + signed_transaction.update({'assignment_timestamp': time()}) # write to the backlog response = self.connection.run( @@ -145,6 +148,48 @@ class Bigchain(object): .insert(signed_transaction, durability=durability)) return response + def reassign_transaction(self, transaction, durability='hard'): + """Assign a transaction to a new node + + Args: + transaction (dict): assigned transaction + + Returns: + dict: database response or None if no reassignment is possible + """ + + if self.nodes_except_me: + try: + federation_nodes = self.nodes_except_me + [self.me] + index_current_assignee = federation_nodes.index(transaction['assignee']) + new_assignee = random.choice(federation_nodes[:index_current_assignee] + + federation_nodes[index_current_assignee + 1:]) + except ValueError: + # current assignee not in federation + new_assignee = random.choice(self.nodes_except_me) + + else: + # There is no other node to assign to + new_assignee = self.me + + response = r.table('backlog')\ + .get(transaction['id'])\ + .update({'assignee': new_assignee, + 'assignment_timestamp': time()}, + durability=durability).run(self.conn) + return response + + def get_stale_transactions(self): + """Get a RethinkDB cursor of stale transactions + + Transactions are considered stale if they have been assigned a node, but are still in the + backlog after some amount of time specified in the configuration + """ + + return r.table('backlog')\ + .filter(lambda tx: time() - tx['assignment_timestamp'] > + self.backlog_reassign_delay).run(self.conn) + def get_transaction(self, txid, include_status=False): """Retrieve a transaction with `txid` from bigchain. @@ -540,12 +585,11 @@ class Bigchain(object): r.table('bigchain') .insert(r.json(block_serialized), durability=durability)) - # TODO: Decide if we need this method def transaction_exists(self, transaction_id): response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) + r.table('bigchain', read_mode=self.read_mode)\ .get_all(transaction_id, index='transaction_id')) - return True if len(response.items) > 0 else False + return len(response.items) > 0 def prepare_genesis_block(self): """Prepare a genesis block.""" diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 0e9f8d04..69feb705 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -42,36 +42,46 @@ class Block: if tx['assignee'] == self.bigchain.me: tx.pop('assignee') + tx.pop('assignment_timestamp') return tx - def delete_tx(self, tx): - """Delete a transaction. - - Args: - tx (dict): the transaction to delete. - - Returns: - The transaction. - """ - self.bigchain.connection.run( - r.table('backlog') - .get(tx['id']) - .delete(durability='hard')) - - return tx - def validate_tx(self, tx): """Validate a transaction. + Also checks if the transaction already exists in the blockchain. If it + does, or it's invalid, it's deleted from the backlog immediately. + Args: tx (dict): the transaction to validate. Returns: The transaction if valid, ``None`` otherwise. """ - tx = self.bigchain.is_valid_transaction(tx) - if tx: + if self.bigchain.transaction_exists(tx['id']): + # if the transaction already exists, we must check whether + # it's in a valid or undecided block + tx, status = self.bigchain.get_transaction(tx['id'], + include_status=True) + if status == self.bigchain.TX_VALID \ + or status == self.bigchain.TX_UNDECIDED: + # if the tx is already in a valid or undecided block, + # then it no longer should be in the backlog, or added + # to a new block. We can delete and drop it. + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None + + tx_validated = self.bigchain.is_valid_transaction(tx) + if tx_validated: return tx + else: + # if the transaction is not valid, remove it from the + # backlog + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None def create(self, tx, timeout=False): """Create a block. @@ -112,6 +122,22 @@ class Block: self.bigchain.write_block(block) return block + def delete_tx(self, block): + """Delete transactions. + + Args: + block (dict): the block containg the transactions to delete. + + Returns: + The block. + """ + r.table('backlog')\ + .get_all(*[tx['id'] for tx in block['block']['transactions']])\ + .delete(durability='hard')\ + .run(self.bigchain.conn) + + return block + def initial(): """Return old transactions from the backlog.""" @@ -132,7 +158,8 @@ def initial(): def get_changefeed(): """Create and return the changefeed for the backlog.""" - return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial()) + return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE, + prefeed=initial()) def create_pipeline(): @@ -143,10 +170,10 @@ def create_pipeline(): block_pipeline = Pipeline([ Node(block.filter_tx), - Node(block.delete_tx), Node(block.validate_tx, fraction_of_cores=1), Node(block.create, timeout=1), Node(block.write), + Node(block.delete_tx), ]) return block_pipeline diff --git a/bigchaindb/pipelines/stale.py b/bigchaindb/pipelines/stale.py new file mode 100644 index 00000000..e1e14e21 --- /dev/null +++ b/bigchaindb/pipelines/stale.py @@ -0,0 +1,76 @@ +"""This module monitors for stale transactions. + +It reassigns transactions which have been assigned a node but +remain in the backlog past a certain amount of time. +""" + +import logging +from multipipes import Pipeline, Node +from bigchaindb import Bigchain +from time import sleep + + +logger = logging.getLogger(__name__) + + +class StaleTransactionMonitor: + """This class encapsulates the logic for re-assigning stale transactions. + + Note: + Methods of this class will be executed in different processes. + """ + + def __init__(self, timeout=5, backlog_reassign_delay=None): + """Initialize StaleTransaction monitor + + Args: + timeout: how often to check for stale tx (in sec) + backlog_reassign_delay: How stale a transaction should + be before reassignment (in sec). If supplied, overrides the + Bigchain default value. + """ + self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay) + self.timeout = timeout + + def check_transactions(self): + """Poll backlog for stale transactions + + Returns: + txs (list): txs to be re assigned + """ + sleep(self.timeout) + for tx in self.bigchain.get_stale_transactions(): + yield tx + + def reassign_transactions(self, tx): + """Put tx back in backlog with new assignee + + Returns: + transaction + """ + self.bigchain.reassign_transaction(tx) + return tx + + +def create_pipeline(timeout=5, backlog_reassign_delay=5): + """Create and return the pipeline of operations to be distributed + on different processes.""" + + stm = StaleTransactionMonitor(timeout=timeout, + backlog_reassign_delay=backlog_reassign_delay) + + monitor_pipeline = Pipeline([ + Node(stm.check_transactions), + Node(stm.reassign_transactions) + ]) + + return monitor_pipeline + + +def start(timeout=5, backlog_reassign_delay=5): + """Create, start, and return the block pipeline.""" + pipeline = create_pipeline(timeout=timeout, + backlog_reassign_delay=backlog_reassign_delay) + pipeline.setup() + pipeline.start() + return pipeline diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index 8f4c3b99..0a8dbcd1 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -72,5 +72,5 @@ class ChangeFeed(Node): elif is_delete and (self.operation & ChangeFeed.DELETE): self.outqueue.put(change['old_val']) elif is_update and (self.operation & ChangeFeed.UPDATE): - self.outqueue.put(change) + self.outqueue.put(change['new_val']) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 4b8aa0eb..c3625fdc 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -2,7 +2,7 @@ import logging import multiprocessing as mp import bigchaindb -from bigchaindb.pipelines import vote, block, election +from bigchaindb.pipelines import vote, block, election, stale from bigchaindb.web import server @@ -31,6 +31,9 @@ def start(): logger.info('Starting voter') vote.start() + logger.info('Starting stale transaction monitor') + stale.start() + logger.info('Starting election') election.start() diff --git a/docs/source/appendices/firewall-notes.md b/docs/source/appendices/firewall-notes.md index dca89ac6..bad09b05 100644 --- a/docs/source/appendices/firewall-notes.md +++ b/docs/source/appendices/firewall-notes.md @@ -1,6 +1,17 @@ # Notes for Firewall Setup -This is a page of notes on the ports used by BigchainDB nodes and the traffic they should expect, to help with firewall setup (or security group setup on AWS). This page is _not_ a firewall tutorial or step-by-step guide. +This is a page of notes on the ports potentially used by BigchainDB nodes and the traffic they should expect, to help with firewall setup (and security group setup on AWS). This page is _not_ a firewall tutorial or step-by-step guide. + + +## Expected Unsolicited Inbound Traffic + +Assuming you aren't exposing the RethinkDB web interface on port 8080 (or any other port, because [there are more secure ways to access it](https://www.rethinkdb.com/docs/security/#binding-the-web-interface-port)), there are only three ports that should expect unsolicited inbound traffic: + +1. **Port 22** can expect inbound SSH (TCP) traffic from the node administrator (i.e. a small set of IP addresses). +2. **Port 9984** can expect inbound HTTP (TCP) traffic from BigchainDB clients sending transactions to the BigchainDB HTTP API. +3. **Port 29015** can expect inbound TCP traffic from other RethinkDB nodes in the RethinkDB cluster (for RethinkDB intracluster communications). + +All other ports should only get inbound traffic in response to specific requests from inside the node. ## Port 22 diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 7242402a..166e4929 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -132,6 +132,7 @@ class TestBigchainApi(object): response, status = b.get_transaction(tx_signed["id"], include_status=True) response.pop('assignee') + response.pop('assignment_timestamp') # add validity information, which will be returned assert util.serialize(tx_signed) == util.serialize(response) assert status == b.TX_IN_BACKLOG diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index ce300730..a1ab6a19 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -14,9 +14,14 @@ def test_filter_by_assignee(b, user_vk): tx = b.create_transaction(b.me, user_vk, None, 'CREATE') tx = b.sign_transaction(tx, b.me_private) tx['assignee'] = b.me + tx['assignment_timestamp'] = 111 # filter_tx has side effects on the `tx` instance by popping 'assignee' - assert block_maker.filter_tx(tx) == tx + # and 'assignment_timestamp' + filtered_tx = block_maker.filter_tx(tx) + assert filtered_tx == tx + assert 'assignee' not in filtered_tx + assert 'assignment_timestamp' not in filtered_tx tx = b.create_transaction(b.me, user_vk, None, 'CREATE') tx = b.sign_transaction(tx, b.me_private) @@ -69,21 +74,59 @@ def test_write_block(b, user_vk): assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc +def test_duplicate_transaction(b, user_vk): + block_maker = block.Block() + + txs = [] + for i in range(10): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + txs.append(tx) + + block_doc = b.create_block(txs) + block_maker.write(block_doc) + + # block is in bigchain + assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc + + b.write_transaction(txs[0]) + + # verify tx is in the backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is not None + + # try to validate a transaction that's already in the chain; should not + # work + assert block_maker.validate_tx(txs[0]) is None + + # duplicate tx should be removed from backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is None + + def test_delete_tx(b, user_vk): block_maker = block.Block() - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + block_maker.create(tx) + # make sure the tx appears in the backlog + b.write_transaction(tx) - tx_backlog = r.table('backlog').get(tx['id']).run(b.conn) - tx_backlog.pop('assignee') - assert tx_backlog == tx + # force the output triggering a `timeout` + block_doc = block_maker.create(None, timeout=True) - returned_tx = block_maker.delete_tx(tx) + for tx in block_doc['block']['transactions']: + returned_tx = r.table('backlog').get(tx['id']).run(b.conn) + returned_tx.pop('assignee') + returned_tx.pop('assignment_timestamp') + assert returned_tx == tx - assert returned_tx == tx - assert r.table('backlog').get(tx['id']).run(b.conn) is None + returned_block = block_maker.delete_tx(block_doc) + + assert returned_block == block_doc + + for tx in block_doc['block']['transactions']: + assert r.table('backlog').get(tx['id']).run(b.conn) is None def test_prefeed(b, user_vk): @@ -115,6 +158,7 @@ def test_full_pipeline(b, user_vk): tx = b.sign_transaction(tx, b.me_private) assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc']) tx['assignee'] = assignee + tx['assignment_timestamp'] = time.time() if assignee == b.me: count_assigned_to_me += 1 r.table('backlog').insert(tx, durability='hard').run(b.conn) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index a71714df..02a0e39d 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -99,6 +99,7 @@ def test_check_requeue_transaction(b, user_vk): e.requeue_transactions(test_block) tx_backlog = r.table('backlog').get(tx1['id']).run(b.conn) tx_backlog.pop('assignee') + tx_backlog.pop('assignment_timestamp') assert tx_backlog == tx1 diff --git a/tests/pipelines/test_stale_monitor.py b/tests/pipelines/test_stale_monitor.py new file mode 100644 index 00000000..f6cb4a0b --- /dev/null +++ b/tests/pipelines/test_stale_monitor.py @@ -0,0 +1,116 @@ +import rethinkdb as r +from bigchaindb import Bigchain +from bigchaindb.pipelines import stale +from multipipes import Pipe, Pipeline +from unittest.mock import patch +from bigchaindb import config_utils +import time +import os + + +def test_get_stale(b, user_vk): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + tx_stale = stm.check_transactions() + + for _tx in tx_stale: + _tx.pop('assignee') + _tx.pop('assignment_timestamp') + assert tx == _tx + + +def test_reassign_transactions(b, user_vk): + # test with single node + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + stm.reassign_transactions(tx) + + # test with federation + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc'] + tx = list(r.table('backlog').run(b.conn))[0] + stm.reassign_transactions(tx) + + reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn) + assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp'] + assert reassigned_tx['assignee'] != tx['assignee'] + + # test with node not in federation + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + tx.update({'assignee': 'lol'}) + tx.update({'assignment_timestamp': time.time()}) + r.table('backlog').insert(tx, durability='hard').run(b.conn) + + tx = list(r.table('backlog').run(b.conn))[0] + stm.reassign_transactions(tx) + assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol' + + +def test_full_pipeline(user_vk): + CONFIG = { + 'database': { + 'name': 'bigchain_test_{}'.format(os.getpid()) + }, + 'keypair': { + 'private': '31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA', + 'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8' + }, + 'keyring': ['aaa', 'bbb'], + 'backlog_reassign_delay': 0.01 + } + config_utils.set_config(CONFIG) + b = Bigchain() + outpipe = Pipe() + + original_txs = {} + + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + + b.write_transaction(tx) + original_txs[tx['id']] = r.table('backlog').get(tx['id']).run(b.conn) + + assert r.table('backlog').count().run(b.conn) == 100 + + pipeline = stale.create_pipeline(backlog_reassign_delay=1, + timeout=1) + pipeline.setup(outdata=outpipe) + pipeline.start() + + # timing should be careful -- test will fail if reassignment happens multiple times + time.sleep(2) + pipeline.terminate() + + # to terminate + outpipe.get() + + assert r.table('backlog').count().run(b.conn) == 100 + reassigned_txs = list(r.table('backlog').run(b.conn)) + + # check that every assignment timestamp has increased, and every tx has a new assignee + for reassigned_tx in reassigned_txs: + assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp'] + assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee'] + +@patch.object(Pipeline, 'start') +def test_start(mock_start): + # TODO: `sta,e.start` is just a wrapper around `block.create_pipeline`, + # that is tested by `test_full_pipeline`. + # If anyone has better ideas on how to test this, please do a PR :) + stale.start() + mock_start.assert_called_with() diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py index 459ae7f0..66f1bdd9 100644 --- a/tests/pipelines/test_utils.py +++ b/tests/pipelines/test_utils.py @@ -43,8 +43,7 @@ def test_changefeed_update(mock_run): changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE) changefeed.outqueue = outpipe changefeed.run_forever() - assert outpipe.get() == {'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here'} + assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 @@ -55,8 +54,7 @@ def test_changefeed_multiple_operations(mock_run): changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.get() == {'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here'} + assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 diff --git a/tests/test_commands.py b/tests/test_commands.py index 87b05b89..587d38f8 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -57,6 +57,7 @@ def mock_bigchaindb_backup_config(monkeypatch): 'keypair': {}, 'database': {'host': 'host', 'port': 12345, 'name': 'adbname'}, 'statsd': {'host': 'host', 'port': 12345, 'rate': 0.1}, + 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb._config', config) diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index bce2c9a8..4d2b3a1e 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -145,7 +145,8 @@ def test_env_config(monkeypatch): def test_autoconfigure_read_both_from_file_and_env(monkeypatch): file_config = { - 'database': {'host': 'test-host'} + 'database': {'host': 'test-host'}, + 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, **kwargs: file_config) monkeypatch.setattr('os.environ', {'BIGCHAINDB_DATABASE_NAME': 'test-dbname', @@ -180,6 +181,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch): }, 'api_endpoint': 'http://localhost:9984/api/v1', 'consensus_plugin': 'default', + 'backlog_reassign_delay': 5 } diff --git a/tests/test_core.py b/tests/test_core.py index 2650ff37..397158d0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -19,7 +19,8 @@ def config(request, monkeypatch): }, 'keyring': [], 'CONFIGURED': True, - 'consensus_plugin': 'default' + 'consensus_plugin': 'default', + 'backlog_reassign_delay': 30 } monkeypatch.setattr('bigchaindb.config', config) diff --git a/tests/test_processes.py b/tests/test_processes.py new file mode 100644 index 00000000..7013dc03 --- /dev/null +++ b/tests/test_processes.py @@ -0,0 +1,23 @@ +from unittest.mock import patch + +from multiprocessing import Process +from bigchaindb.pipelines import vote, block, election, stale + + +@patch.object(stale, 'start') +@patch.object(election, 'start') +@patch.object(block, 'start') +@patch.object(vote, 'start') +@patch.object(Process, 'start') +def test_processes_start(mock_vote, mock_block, mock_election, mock_stale, + mock_process): + from bigchaindb import processes + + processes.start() + + mock_vote.assert_called_with() + mock_block.assert_called_with() + mock_election.assert_called_with() + mock_stale.assert_called_with() + mock_process.assert_called_with() +