diff --git a/Dockerfile b/Dockerfile index 0fcac07f..089c3ffe 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,7 +19,7 @@ ENV BIGCHAINDB_CONFIG_PATH /data/.bigchaindb ENV BIGCHAINDB_SERVER_BIND 0.0.0.0:9984 ENV BIGCHAINDB_API_ENDPOINT http://bigchaindb:9984/api/v1 -ENTRYPOINT ["bigchaindb", "--experimental-start-rethinkdb"] +ENTRYPOINT ["bigchaindb", "--dev-start-rethinkdb", "--dev-allow-temp-keypair"] CMD ["start"] diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index ee864ead..d5fe15e6 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -31,6 +31,7 @@ config = { }, 'api_endpoint': 'http://localhost:9984/api/v1', 'consensus_plugin': 'default', + 'backlog_reassign_delay': 30 } # We need to maintain a backup copy of the original config dict in case diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 1943167b..46622867 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -106,6 +106,11 @@ def run_configure(args, skip_if_exists=False): input('Statsd {}? (default `{}`): '.format(key, val)) \ or val + val = conf['backlog_reassign_delay'] + conf['backlog_reassign_delay'] = \ + input('Stale transaction reassignment delay (in seconds)? (default `{}`): '.format(val)) \ + or val + if config_path != '-': bigchaindb.config_utils.write_config(conf, config_path) else: @@ -152,8 +157,20 @@ def run_drop(args): def run_start(args): """Start the processes to run the node""" logger.info('BigchainDB Version {}'.format(bigchaindb.__version__)) + bigchaindb.config_utils.autoconfigure(filename=args.config, force=True) + if args.allow_temp_keypair: + if not (bigchaindb.config['keypair']['private'] or + bigchaindb.config['keypair']['public']): + + private_key, public_key = crypto.generate_key_pair() + bigchaindb.config['keypair']['private'] = private_key + bigchaindb.config['keypair']['public'] = public_key + else: + logger.warning('Keypair found, no need to create one on the fly.') + + if args.start_rethinkdb: try: proc = utils.start_rethinkdb() @@ -169,7 +186,8 @@ def run_start(args): sys.exit("Can't start BigchainDB, no keypair found. 
" 'Did you run `bigchaindb configure`?') - logger.info('Starting BigchainDB main process') + logger.info('Starting BigchainDB main process with public key %s', + bigchaindb.config['keypair']['public']) processes.start() @@ -227,17 +245,21 @@ def run_set_replicas(args): except r.ReqlOpFailedError as e: logger.warn(e) - -def main(): +def create_parser(): parser = argparse.ArgumentParser( description='Control your BigchainDB node.', parents=[utils.base_parser]) - parser.add_argument('--experimental-start-rethinkdb', + parser.add_argument('--dev-start-rethinkdb', dest='start_rethinkdb', action='store_true', help='Run RethinkDB on start') + parser.add_argument('--dev-allow-temp-keypair', + dest='allow_temp_keypair', + action='store_true', + help='Generate a random keypair on start') + # all the commands are contained in the subparsers object, # the command selected by the user will be stored in `args.command` # that is used by the `main` function to select which other @@ -302,8 +324,8 @@ def main(): 'is set, the count is distributed equally to all the ' 'processes') - utils.start(parser, globals()) + return parser -if __name__ == '__main__': - main() +def main(): + utils.start(create_parser(), sys.argv[1:], globals()) diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py index dc035de6..36b78653 100644 --- a/bigchaindb/commands/utils.py +++ b/bigchaindb/commands/utils.py @@ -18,6 +18,11 @@ def start_rethinkdb(): """Start RethinkDB as a child process and wait for it to be available. + Args: + wait_for_db (bool): wait for the database to be ready + extra_opts (list): a list of extra options to be used when + starting the db + Raises: ``bigchaindb.exceptions.StartupError`` if RethinkDB cannot be started. @@ -33,11 +38,11 @@ def start_rethinkdb(): for line in proc.stdout: if line.startswith('Server ready'): + # FIXME: seems like tables are not ready when the server is ready, # that's why we need to query RethinkDB to know the state # of the database. This code assumes the tables are ready # when the database is ready. This seems a valid assumption. - try: conn = db.get_conn() # Before checking if the db is ready, we need to query @@ -47,7 +52,6 @@ def start_rethinkdb(): except (r.ReqlOpFailedError, r.ReqlDriverError) as exc: raise StartupError('Error waiting for the database `{}` ' 'to be ready'.format(dbname)) from exc - return proc # We are here when we exhaust the stdout of the process. @@ -55,7 +59,7 @@ def start_rethinkdb(): raise StartupError(line) -def start(parser, scope): +def start(parser, argv, scope): """Utility function to execute a subcommand. The function will look up in the ``scope`` @@ -64,17 +68,18 @@ def start(parser, scope): Args: parser: an ArgumentParser instance. + argv: the list of command line arguments without the script name. scope (dict): map containing (eventually) the functions to be called. Raises: NotImplementedError: if ``scope`` doesn't contain a function called ``run_``. 
""" - args = parser.parse_args() + args = parser.parse_args(argv) if not args.command: parser.print_help() - return + raise SystemExit() # look up in the current scope for a function called 'run_' # replacing all the dashes '-' with the lowercase character '_' @@ -92,7 +97,7 @@ def start(parser, scope): elif args.multiprocess is None: args.multiprocess = mp.cpu_count() - func(args) + return func(args) base_parser = argparse.ArgumentParser(add_help=False, prog='bigchaindb') diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 95cccfc7..6a53cda6 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -2,12 +2,14 @@ import random import math import collections from copy import deepcopy +from time import time from itertools import compress import rethinkdb as r import rapidjson import bigchaindb +from bigchaindb.db.utils import Connection from bigchaindb import config_utils, crypto, exceptions, util @@ -28,7 +30,7 @@ class Bigchain(object): def __init__(self, host=None, port=None, dbname=None, public_key=None, private_key=None, keyring=[], - consensus_plugin=None): + consensus_plugin=None, backlog_reassign_delay=None): """Initialize the Bigchain instance A Bigchain instance has several configuration parameters (e.g. host). @@ -56,6 +58,7 @@ class Bigchain(object): self.me = public_key or bigchaindb.config['keypair']['public'] self.me_private = private_key or bigchaindb.config['keypair']['private'] self.nodes_except_me = keyring or bigchaindb.config['keyring'] + self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay'] self.consensus = config_utils.load_consensus_plugin(consensus_plugin) # change RethinkDB read mode to majority. This ensures consistency in query results self.read_mode = 'majority' @@ -64,6 +67,7 @@ class Bigchain(object): raise exceptions.KeypairNotFoundException() self._conn = None + self.connection = Connection(host=self.host, port=self.port, db=self.dbname) @property def conn(self): @@ -136,11 +140,56 @@ class Bigchain(object): signed_transaction = deepcopy(signed_transaction) # update the transaction signed_transaction.update({'assignee': assignee}) + signed_transaction.update({'assignment_timestamp': time()}) # write to the backlog - response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn) + response = self.connection.run( + r.table('backlog') + .insert(signed_transaction, durability=durability)) return response + def reassign_transaction(self, transaction, durability='hard'): + """Assign a transaction to a new node + + Args: + transaction (dict): assigned transaction + + Returns: + dict: database response or None if no reassignment is possible + """ + + if self.nodes_except_me: + try: + federation_nodes = self.nodes_except_me + [self.me] + index_current_assignee = federation_nodes.index(transaction['assignee']) + new_assignee = random.choice(federation_nodes[:index_current_assignee] + + federation_nodes[index_current_assignee + 1:]) + except ValueError: + # current assignee not in federation + new_assignee = random.choice(self.nodes_except_me) + + else: + # There is no other node to assign to + new_assignee = self.me + + response = r.table('backlog')\ + .get(transaction['id'])\ + .update({'assignee': new_assignee, + 'assignment_timestamp': time()}, + durability=durability).run(self.conn) + return response + + def get_stale_transactions(self): + """Get a RethinkDB cursor of stale transactions + + Transactions are considered stale if they have been assigned a node, but are still in the 
+ backlog after some amount of time specified in the configuration + """ + + return r.table('backlog')\ + .filter(lambda tx: time() - tx['assignment_timestamp'] > + self.backlog_reassign_delay).run(self.conn) + def get_transaction(self, txid, include_status=False): """Retrieve a transaction with `txid` from bigchain. @@ -179,13 +228,16 @@ class Bigchain(object): break # Query the transaction in the target block and return - response = r.table('bigchain', read_mode=self.read_mode).get(target_block_id)\ - .get_field('block').get_field('transactions')\ - .filter(lambda tx: tx['id'] == txid).run(self.conn)[0] + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get(target_block_id) + .get_field('block') + .get_field('transactions') + .filter(lambda tx: tx['id'] == txid))[0] else: # Otherwise, check the backlog - response = r.table('backlog').get(txid).run(self.conn) + response = self.connection.run(r.table('backlog').get(txid)) if response: tx_status = self.TX_IN_BACKLOG @@ -219,8 +271,10 @@ class Bigchain(object): A list of blocks with with only election information """ # First, get information on all blocks which contain this transaction - response = r.table('bigchain', read_mode=self.read_mode).get_all(value, index=index)\ - .pluck('votes', 'id', {'block': ['voters']}).run(self.conn) + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(value, index=index) + .pluck('votes', 'id', {'block': ['voters']})) return list(response) @@ -262,8 +316,8 @@ class Bigchain(object): When creating a transaction one of the optional arguments is the `payload`. The payload is a generic dict that contains information about the digital asset. - To make it easy to query the bigchain for that digital asset we create a UUID for the payload and - store it with the transaction. This makes it easy for developers to keep track of their digital + To make it easy to query the bigchain for that digital asset we create a UUID for the payload and + store it with the transaction. This makes it easy for developers to keep track of their digital assets in bigchain. Args: @@ -273,11 +327,11 @@ class Bigchain(object): A list of transactions containing that payload. 
If no transaction exists with that payload it returns an empty list `[]` """ - cursor = r.table('bigchain', read_mode=self.read_mode) \ - .get_all(payload_uuid, index='payload_uuid') \ - .concat_map(lambda block: block['block']['transactions']) \ - .filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid) \ - .run(self.conn) + cursor = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(payload_uuid, index='payload_uuid') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid)) transactions = list(cursor) return transactions @@ -296,11 +350,11 @@ class Bigchain(object): """ # checks if an input was already spent # checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...} - response = r.table('bigchain', read_mode=self.read_mode)\ - .concat_map(lambda doc: doc['block']['transactions'])\ - .filter(lambda transaction: transaction['transaction']['fulfillments'] - .contains(lambda fulfillment: fulfillment['input'] == tx_input))\ - .run(self.conn) + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['fulfillments'] + .contains(lambda fulfillment: fulfillment['input'] == tx_input))) transactions = list(response) @@ -335,12 +389,12 @@ class Bigchain(object): """ # get all transactions in which owner is in the `owners_after` list - response = r.table('bigchain', read_mode=self.read_mode) \ - .concat_map(lambda doc: doc['block']['transactions']) \ - .filter(lambda tx: tx['transaction']['conditions'] + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda tx: tx['transaction']['conditions'] .contains(lambda c: c['owners_after'] - .contains(owner))) \ - .run(self.conn) + .contains(owner)))) owned = [] for tx in response: @@ -484,8 +538,9 @@ class Bigchain(object): but the vote is invalid. """ - votes = list(r.table('votes', read_mode=self.read_mode)\ - .get_all([block['id'], self.me], index='block_and_voter').run(self.conn)) + votes = list(self.connection.run( + r.table('votes', read_mode=self.read_mode) + .get_all([block['id'], self.me], index='block_and_voter'))) if len(votes) > 1: raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}' @@ -526,13 +581,15 @@ class Bigchain(object): """ block_serialized = rapidjson.dumps(block) - r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn) + self.connection.run( + r.table('bigchain') + .insert(r.json(block_serialized), durability=durability)) - # TODO: Decide if we need this method def transaction_exists(self, transaction_id): - response = r.table('bigchain', read_mode=self.read_mode)\ - .get_all(transaction_id, index='transaction_id').run(self.conn) - return True if len(response.items) > 0 else False + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode)\ + .get_all(transaction_id, index='transaction_id')) + return len(response.items) > 0 def prepare_genesis_block(self): """Prepare a genesis block.""" @@ -558,7 +615,9 @@ class Bigchain(object): # 2. create the block with one transaction # 3. 
write the block to the bigchain - blocks_count = r.table('bigchain', read_mode=self.read_mode).count().run(self.conn) + blocks_count = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .count()) if blocks_count: raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block') @@ -604,30 +663,30 @@ class Bigchain(object): def write_vote(self, vote): """Write the vote to the database.""" - r.table('votes') \ - .insert(vote) \ - .run(self.conn) + self.connection.run( + r.table('votes') + .insert(vote)) def get_last_voted_block(self): """Returns the last block that this node voted on.""" try: # get the latest value for the vote timestamp (over all votes) - max_timestamp = r.table('votes', read_mode=self.read_mode) \ - .filter(r.row['node_pubkey'] == self.me) \ - .max(r.row['vote']['timestamp']) \ - .run(self.conn)['vote']['timestamp'] + max_timestamp = self.connection.run( + r.table('votes', read_mode=self.read_mode) + .filter(r.row['node_pubkey'] == self.me) + .max(r.row['vote']['timestamp']))['vote']['timestamp'] - last_voted = list(r.table('votes', read_mode=self.read_mode) \ - .filter(r.row['vote']['timestamp'] == max_timestamp) \ - .filter(r.row['node_pubkey'] == self.me) \ - .run(self.conn)) + last_voted = list(self.connection.run( + r.table('votes', read_mode=self.read_mode) + .filter(r.row['vote']['timestamp'] == max_timestamp) + .filter(r.row['node_pubkey'] == self.me))) except r.ReqlNonExistenceError: # return last vote if last vote exists else return Genesis block - return list(r.table('bigchain', read_mode=self.read_mode) - .filter(util.is_genesis_block) - .run(self.conn))[0] + return list(self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .filter(util.is_genesis_block)))[0] # Now the fun starts. Since the resolution of timestamp is a second, # we might have more than one vote per timestamp. If this is the case @@ -659,19 +718,21 @@ class Bigchain(object): except KeyError: break - res = r.table('bigchain', read_mode=self.read_mode).get(last_block_id).run(self.conn) + res = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get(last_block_id)) return res def get_unvoted_blocks(self): """Return all the blocks that has not been voted by this node.""" - unvoted = r.table('bigchain', read_mode=self.read_mode) \ - .filter(lambda block: r.table('votes', read_mode=self.read_mode) + unvoted = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .filter(lambda block: r.table('votes', read_mode=self.read_mode) .get_all([block['id'], self.me], index='block_and_voter') - .is_empty()) \ - .order_by(r.asc(r.row['block']['timestamp'])) \ - .run(self.conn) + .is_empty()) + .order_by(r.asc(r.row['block']['timestamp']))) # FIXME: I (@vrde) don't like this solution. Filtering should be done at a # database level. 
Solving issue #444 can help untangling the situation @@ -682,9 +743,8 @@ class Bigchain(object): def block_election_status(self, block): """Tally the votes on a block, and return the status: valid, invalid, or undecided.""" - votes = r.table('votes', read_mode=self.read_mode) \ - .between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter') \ - .run(self.conn) + votes = self.connection.run(r.table('votes', read_mode=self.read_mode) + .between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter')) votes = list(votes) diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py index 603f143c..84aabdd4 100644 --- a/bigchaindb/db/utils.py +++ b/bigchaindb/db/utils.py @@ -1,5 +1,6 @@ """Utils to initialize and drop the database.""" +import time import logging import rethinkdb as r @@ -11,6 +12,61 @@ from bigchaindb import exceptions logger = logging.getLogger(__name__) +class Connection: + """This class is a proxy to run queries against the database, + it is: + - lazy, since it creates a connection only when needed + - resilient, because before raising exceptions it tries + more times to run the query or open a connection. + """ + + def __init__(self, host=None, port=None, db=None, max_tries=3): + """Create a new Connection instance. + + Args: + host (str, optional): the host to connect to. + port (int, optional): the port to connect to. + db (str, optional): the database to use. + max_tries (int, optional): how many tries before giving up. + """ + + self.host = host or bigchaindb.config['database']['host'] + self.port = port or bigchaindb.config['database']['port'] + self.db = db or bigchaindb.config['database']['name'] + self.max_tries = max_tries + self.conn = None + + def run(self, query): + """Run a query. + + Args: + query: the RethinkDB query. + """ + + if self.conn is None: + self._connect() + + for i in range(self.max_tries): + try: + return query.run(self.conn) + except r.ReqlDriverError as exc: + if i + 1 == self.max_tries: + raise + else: + self._connect() + + def _connect(self): + for i in range(self.max_tries): + try: + self.conn = r.connect(host=self.host, port=self.port, + db=self.db) + except r.ReqlDriverError as exc: + if i + 1 == self.max_tries: + raise + else: + time.sleep(2**i) + + def get_conn(): '''Get the connection to the database.''' diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 59375c57..69feb705 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -42,36 +42,46 @@ class Block: if tx['assignee'] == self.bigchain.me: tx.pop('assignee') + tx.pop('assignment_timestamp') return tx - def delete_tx(self, tx): - """Delete a transaction. - - Args: - tx (dict): the transaction to delete. - - Returns: - The transaction. - """ - r.table('backlog')\ - .get(tx['id'])\ - .delete(durability='hard')\ - .run(self.bigchain.conn) - - return tx - def validate_tx(self, tx): """Validate a transaction. + Also checks if the transaction already exists in the blockchain. If it + does, or it's invalid, it's deleted from the backlog immediately. + Args: tx (dict): the transaction to validate. Returns: The transaction if valid, ``None`` otherwise. 
""" - tx = self.bigchain.is_valid_transaction(tx) - if tx: + if self.bigchain.transaction_exists(tx['id']): + # if the transaction already exists, we must check whether + # it's in a valid or undecided block + tx, status = self.bigchain.get_transaction(tx['id'], + include_status=True) + if status == self.bigchain.TX_VALID \ + or status == self.bigchain.TX_UNDECIDED: + # if the tx is already in a valid or undecided block, + # then it no longer should be in the backlog, or added + # to a new block. We can delete and drop it. + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None + + tx_validated = self.bigchain.is_valid_transaction(tx) + if tx_validated: return tx + else: + # if the transaction is not valid, remove it from the + # backlog + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None def create(self, tx, timeout=False): """Create a block. @@ -112,25 +122,44 @@ class Block: self.bigchain.write_block(block) return block + def delete_tx(self, block): + """Delete transactions. + + Args: + block (dict): the block containg the transactions to delete. + + Returns: + The block. + """ + r.table('backlog')\ + .get_all(*[tx['id'] for tx in block['block']['transactions']])\ + .delete(durability='hard')\ + .run(self.bigchain.conn) + + return block + def initial(): """Return old transactions from the backlog.""" b = Bigchain() - rs = r.table('backlog')\ - .between([b.me, r.minval], - [b.me, r.maxval], - index='assignee__transaction_timestamp')\ - .order_by(index=r.asc('assignee__transaction_timestamp'))\ - .run(b.conn) + rs = b.connection.run( + r.table('backlog') + .between( + [b.me, r.minval], + [b.me, r.maxval], + index='assignee__transaction_timestamp') + .order_by(index=r.asc('assignee__transaction_timestamp'))) + return rs def get_changefeed(): """Create and return the changefeed for the backlog.""" - return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial()) + return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE, + prefeed=initial()) def create_pipeline(): @@ -141,10 +170,10 @@ def create_pipeline(): block_pipeline = Pipeline([ Node(block.filter_tx), - Node(block.delete_tx), Node(block.validate_tx, fraction_of_cores=1), Node(block.create, timeout=1), Node(block.write), + Node(block.delete_tx), ]) return block_pipeline diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 7a0e114c..cf464e5c 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -25,9 +25,10 @@ class Election: """ Checks if block has enough invalid votes to make a decision """ - next_block = r.table('bigchain')\ - .get(next_vote['vote']['voting_for_block'])\ - .run(self.bigchain.conn) + next_block = self.bigchain.connection.run( + r.table('bigchain') + .get(next_vote['vote']['voting_for_block'])) + if self.bigchain.block_election_status(next_block) == self.bigchain.BLOCK_INVALID: return next_block diff --git a/bigchaindb/pipelines/stale.py b/bigchaindb/pipelines/stale.py new file mode 100644 index 00000000..e1e14e21 --- /dev/null +++ b/bigchaindb/pipelines/stale.py @@ -0,0 +1,76 @@ +"""This module monitors for stale transactions. + +It reassigns transactions which have been assigned a node but +remain in the backlog past a certain amount of time. 
+""" + +import logging +from multipipes import Pipeline, Node +from bigchaindb import Bigchain +from time import sleep + + +logger = logging.getLogger(__name__) + + +class StaleTransactionMonitor: + """This class encapsulates the logic for re-assigning stale transactions. + + Note: + Methods of this class will be executed in different processes. + """ + + def __init__(self, timeout=5, backlog_reassign_delay=None): + """Initialize StaleTransaction monitor + + Args: + timeout: how often to check for stale tx (in sec) + backlog_reassign_delay: How stale a transaction should + be before reassignment (in sec). If supplied, overrides the + Bigchain default value. + """ + self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay) + self.timeout = timeout + + def check_transactions(self): + """Poll backlog for stale transactions + + Returns: + txs (list): txs to be re assigned + """ + sleep(self.timeout) + for tx in self.bigchain.get_stale_transactions(): + yield tx + + def reassign_transactions(self, tx): + """Put tx back in backlog with new assignee + + Returns: + transaction + """ + self.bigchain.reassign_transaction(tx) + return tx + + +def create_pipeline(timeout=5, backlog_reassign_delay=5): + """Create and return the pipeline of operations to be distributed + on different processes.""" + + stm = StaleTransactionMonitor(timeout=timeout, + backlog_reassign_delay=backlog_reassign_delay) + + monitor_pipeline = Pipeline([ + Node(stm.check_transactions), + Node(stm.reassign_transactions) + ]) + + return monitor_pipeline + + +def start(timeout=5, backlog_reassign_delay=5): + """Create, start, and return the block pipeline.""" + pipeline = create_pipeline(timeout=timeout, + backlog_reassign_delay=backlog_reassign_delay) + pipeline.setup() + pipeline.start() + return pipeline diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index 22a5f9bc..0a8dbcd1 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -1,12 +1,17 @@ """Utility classes and functions to work with the pipelines.""" +import time import rethinkdb as r +import logging from multipipes import Node from bigchaindb import Bigchain +logger = logging.getLogger(__name__) + + class ChangeFeed(Node): """This class wraps a RethinkDB changefeed adding a `prefeed`. @@ -24,7 +29,7 @@ class ChangeFeed(Node): DELETE = 2 UPDATE = 4 - def __init__(self, table, operation, prefeed=None): + def __init__(self, table, operation, prefeed=None, bigchain=None): """Create a new RethinkDB ChangeFeed. Args: @@ -35,20 +40,29 @@ class ChangeFeed(Node): (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) prefeed (iterable): whatever set of data you want to be published first. + bigchain (``Bigchain``): the bigchain instance to use (can be None). 
""" super().__init__(name='changefeed') self.prefeed = prefeed if prefeed else [] self.table = table self.operation = operation - self.bigchain = Bigchain() + self.bigchain = bigchain or Bigchain() def run_forever(self): for element in self.prefeed: self.outqueue.put(element) - for change in r.table(self.table).changes().run(self.bigchain.conn): + while True: + try: + self.run_changefeed() + break + except (r.ReqlDriverError, r.ReqlOpFailedError) as exc: + logger.exception(exc) + time.sleep(1) + def run_changefeed(self): + for change in self.bigchain.connection.run(r.table(self.table).changes()): is_insert = change['old_val'] is None is_delete = change['new_val'] is None is_update = not is_insert and not is_delete @@ -58,5 +72,5 @@ class ChangeFeed(Node): elif is_delete and (self.operation & ChangeFeed.DELETE): self.outqueue.put(change['old_val']) elif is_update and (self.operation & ChangeFeed.UPDATE): - self.outqueue.put(change) + self.outqueue.put(change['new_val']) diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 4b8aa0eb..c3625fdc 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -2,7 +2,7 @@ import logging import multiprocessing as mp import bigchaindb -from bigchaindb.pipelines import vote, block, election +from bigchaindb.pipelines import vote, block, election, stale from bigchaindb.web import server @@ -31,6 +31,9 @@ def start(): logger.info('Starting voter') vote.start() + logger.info('Starting stale transaction monitor') + stale.start() + logger.info('Starting election') election.start() diff --git a/docs/source/server-reference/bigchaindb-cli.md b/docs/source/server-reference/bigchaindb-cli.md index 15dd4475..3e11446a 100644 --- a/docs/source/server-reference/bigchaindb-cli.md +++ b/docs/source/server-reference/bigchaindb-cli.md @@ -58,8 +58,9 @@ Drop (erase) the RethinkDB database. You will be prompted to make sure. If you w ## bigchaindb start Start BigchainDB. It always begins by trying a `bigchaindb init` first. See the note in the documentation for `bigchaindb init`. -You can also use the `--experimental-start-rethinkdb` command line option to automatically start rethinkdb with bigchaindb if rethinkdb is not already running, -e.g. `bigchaindb --experimental-start-rethinkdb start`. Note that this will also shutdown rethinkdb when the bigchaindb process stops. +You can also use the `--dev-start-rethinkdb` command line option to automatically start rethinkdb with bigchaindb if rethinkdb is not already running, +e.g. `bigchaindb --dev-start-rethinkdb start`. Note that this will also shutdown rethinkdb when the bigchaindb process stops. +The option `--dev-allow-temp-keypair` will generate a keypair on the fly if no keypair is found, this is useful when you want to run a temporary instance of BigchainDB in a Docker container, for example. 
## bigchaindb load diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 7242402a..166e4929 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -132,6 +132,7 @@ class TestBigchainApi(object): response, status = b.get_transaction(tx_signed["id"], include_status=True) response.pop('assignee') + response.pop('assignment_timestamp') # add validity information, which will be returned assert util.serialize(tx_signed) == util.serialize(response) assert status == b.TX_IN_BACKLOG diff --git a/tests/db/test_utils.py b/tests/db/test_utils.py index 0299224c..957373f9 100644 --- a/tests/db/test_utils.py +++ b/tests/db/test_utils.py @@ -4,7 +4,6 @@ import pytest import rethinkdb as r import bigchaindb -from bigchaindb import util from bigchaindb.db import utils from .conftest import setup_database as _setup_database diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index ce300730..a1ab6a19 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -14,9 +14,14 @@ def test_filter_by_assignee(b, user_vk): tx = b.create_transaction(b.me, user_vk, None, 'CREATE') tx = b.sign_transaction(tx, b.me_private) tx['assignee'] = b.me + tx['assignment_timestamp'] = 111 # filter_tx has side effects on the `tx` instance by popping 'assignee' - assert block_maker.filter_tx(tx) == tx + # and 'assignment_timestamp' + filtered_tx = block_maker.filter_tx(tx) + assert filtered_tx == tx + assert 'assignee' not in filtered_tx + assert 'assignment_timestamp' not in filtered_tx tx = b.create_transaction(b.me, user_vk, None, 'CREATE') tx = b.sign_transaction(tx, b.me_private) @@ -69,21 +74,59 @@ def test_write_block(b, user_vk): assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc +def test_duplicate_transaction(b, user_vk): + block_maker = block.Block() + + txs = [] + for i in range(10): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + txs.append(tx) + + block_doc = b.create_block(txs) + block_maker.write(block_doc) + + # block is in bigchain + assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc + + b.write_transaction(txs[0]) + + # verify tx is in the backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is not None + + # try to validate a transaction that's already in the chain; should not + # work + assert block_maker.validate_tx(txs[0]) is None + + # duplicate tx should be removed from backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is None + + def test_delete_tx(b, user_vk): block_maker = block.Block() - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + block_maker.create(tx) + # make sure the tx appears in the backlog + b.write_transaction(tx) - tx_backlog = r.table('backlog').get(tx['id']).run(b.conn) - tx_backlog.pop('assignee') - assert tx_backlog == tx + # force the output triggering a `timeout` + block_doc = block_maker.create(None, timeout=True) - returned_tx = block_maker.delete_tx(tx) + for tx in block_doc['block']['transactions']: + returned_tx = r.table('backlog').get(tx['id']).run(b.conn) + returned_tx.pop('assignee') + returned_tx.pop('assignment_timestamp') + assert returned_tx == tx - assert returned_tx == tx - assert 
r.table('backlog').get(tx['id']).run(b.conn) is None + returned_block = block_maker.delete_tx(block_doc) + + assert returned_block == block_doc + + for tx in block_doc['block']['transactions']: + assert r.table('backlog').get(tx['id']).run(b.conn) is None def test_prefeed(b, user_vk): @@ -115,6 +158,7 @@ def test_full_pipeline(b, user_vk): tx = b.sign_transaction(tx, b.me_private) assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc']) tx['assignee'] = assignee + tx['assignment_timestamp'] = time.time() if assignee == b.me: count_assigned_to_me += 1 r.table('backlog').insert(tx, durability='hard').run(b.conn) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index a71714df..02a0e39d 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -99,6 +99,7 @@ def test_check_requeue_transaction(b, user_vk): e.requeue_transactions(test_block) tx_backlog = r.table('backlog').get(tx1['id']).run(b.conn) tx_backlog.pop('assignee') + tx_backlog.pop('assignment_timestamp') assert tx_backlog == tx1 diff --git a/tests/pipelines/test_stale_monitor.py b/tests/pipelines/test_stale_monitor.py new file mode 100644 index 00000000..f6cb4a0b --- /dev/null +++ b/tests/pipelines/test_stale_monitor.py @@ -0,0 +1,116 @@ +import rethinkdb as r +from bigchaindb import Bigchain +from bigchaindb.pipelines import stale +from multipipes import Pipe, Pipeline +from unittest.mock import patch +from bigchaindb import config_utils +import time +import os + + +def test_get_stale(b, user_vk): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + tx_stale = stm.check_transactions() + + for _tx in tx_stale: + _tx.pop('assignee') + _tx.pop('assignment_timestamp') + assert tx == _tx + + +def test_reassign_transactions(b, user_vk): + # test with single node + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + stm.reassign_transactions(tx) + + # test with federation + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx, durability='hard') + + stm = stale.StaleTransactionMonitor(timeout=0.001, + backlog_reassign_delay=0.001) + stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc'] + tx = list(r.table('backlog').run(b.conn))[0] + stm.reassign_transactions(tx) + + reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn) + assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp'] + assert reassigned_tx['assignee'] != tx['assignee'] + + # test with node not in federation + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + tx.update({'assignee': 'lol'}) + tx.update({'assignment_timestamp': time.time()}) + r.table('backlog').insert(tx, durability='hard').run(b.conn) + + tx = list(r.table('backlog').run(b.conn))[0] + stm.reassign_transactions(tx) + assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol' + + +def test_full_pipeline(user_vk): + CONFIG = { + 'database': { + 'name': 'bigchain_test_{}'.format(os.getpid()) + }, + 'keypair': { + 'private': '31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA', + 'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8' + }, + 
'keyring': ['aaa', 'bbb'], + 'backlog_reassign_delay': 0.01 + } + config_utils.set_config(CONFIG) + b = Bigchain() + outpipe = Pipe() + + original_txs = {} + + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + + b.write_transaction(tx) + original_txs[tx['id']] = r.table('backlog').get(tx['id']).run(b.conn) + + assert r.table('backlog').count().run(b.conn) == 100 + + pipeline = stale.create_pipeline(backlog_reassign_delay=1, + timeout=1) + pipeline.setup(outdata=outpipe) + pipeline.start() + + # timing should be careful -- test will fail if reassignment happens multiple times + time.sleep(2) + pipeline.terminate() + + # to terminate + outpipe.get() + + assert r.table('backlog').count().run(b.conn) == 100 + reassigned_txs = list(r.table('backlog').run(b.conn)) + + # check that every assignment timestamp has increased, and every tx has a new assignee + for reassigned_tx in reassigned_txs: + assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp'] + assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee'] + +@patch.object(Pipeline, 'start') +def test_start(mock_start): + # TODO: `sta,e.start` is just a wrapper around `block.create_pipeline`, + # that is tested by `test_full_pipeline`. + # If anyone has better ideas on how to test this, please do a PR :) + stale.start() + mock_start.assert_called_with() diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py index ebef38c4..66f1bdd9 100644 --- a/tests/pipelines/test_utils.py +++ b/tests/pipelines/test_utils.py @@ -1,8 +1,7 @@ from unittest.mock import patch -import rethinkdb - from multipipes import Pipe +from bigchaindb.db.utils import Connection from bigchaindb.pipelines.utils import ChangeFeed @@ -18,7 +17,7 @@ MOCK_CHANGEFEED_DATA = [{ }] -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_insert(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.INSERT) @@ -28,7 +27,7 @@ def test_changefeed_insert(mock_run): assert outpipe.qsize() == 0 -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_delete(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.DELETE) @@ -38,30 +37,28 @@ def test_changefeed_delete(mock_run): assert outpipe.qsize() == 0 -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_update(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE) changefeed.outqueue = outpipe changefeed.run_forever() - assert outpipe.get() == {'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here'} + assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_multiple_operations(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.get() == {'new_val': 'seems 
like we have an update here', - 'old_val': 'seems like we have an update here'} + assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_prefeed(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3]) diff --git a/tests/test_commands.py b/tests/test_commands.py index c515479f..4efa8f96 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -57,23 +57,85 @@ def mock_bigchaindb_backup_config(monkeypatch): 'keypair': {}, 'database': {'host': 'host', 'port': 12345, 'name': 'adbname'}, 'statsd': {'host': 'host', 'port': 12345, 'rate': 0.1}, + 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb._config', config) +def test_make_sure_we_dont_remove_any_command(): + # thanks to: http://stackoverflow.com/a/18161115/597097 + from bigchaindb.commands.bigchain import create_parser + + parser = create_parser() + + assert parser.parse_args(['configure']).command + assert parser.parse_args(['show-config']).command + assert parser.parse_args(['export-my-pubkey']).command + assert parser.parse_args(['init']).command + assert parser.parse_args(['drop']).command + assert parser.parse_args(['start']).command + assert parser.parse_args(['set-shards', '1']).command + assert parser.parse_args(['set-replicas', '1']).command + assert parser.parse_args(['load']).command + + +def test_start_raises_if_command_not_implemented(): + from bigchaindb.commands.bigchain import utils + from bigchaindb.commands.bigchain import create_parser + + parser = create_parser() + + with pytest.raises(NotImplementedError): + # Will raise because `scope`, the third parameter, + # doesn't contain the function `run_configure` + utils.start(parser, ['configure'], {}) + + +def test_start_raises_if_no_arguments_given(): + from bigchaindb.commands.bigchain import utils + from bigchaindb.commands.bigchain import create_parser + + parser = create_parser() + + with pytest.raises(SystemExit): + utils.start(parser, [], {}) + + +@patch('multiprocessing.cpu_count', return_value=42) +def test_start_sets_multiprocess_var_based_on_cli_args(mock_cpu_count): + from bigchaindb.commands.bigchain import utils + from bigchaindb.commands.bigchain import create_parser + + def run_load(args): + return args + + parser = create_parser() + + assert utils.start(parser, ['load'], {'run_load': run_load}).multiprocess == 1 + assert utils.start(parser, ['load', '--multiprocess'], {'run_load': run_load}).multiprocess == 42 + + +@patch('bigchaindb.commands.utils.start') +def test_main_entrypoint(mock_start): + from bigchaindb.commands.bigchain import main + main() + + assert mock_start.called + + def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_init_with_existing_db): from bigchaindb.commands.bigchain import run_start - args = Namespace(start_rethinkdb=False, config=None, yes=True) + args = Namespace(start_rethinkdb=False, allow_temp_keypair=False, config=None, yes=True) run_start(args) -@patch('bigchaindb.commands.utils.start_rethinkdb') +@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=Mock()) def test_bigchain_run_start_with_rethinkdb(mock_start_rethinkdb, mock_run_configure, mock_processes_start, mock_db_init_with_existing_db): from bigchaindb.commands.bigchain import run_start - args = Namespace(start_rethinkdb=True, config=None, yes=True) + args = 
Namespace(start_rethinkdb=True, allow_temp_keypair=False, config=None, yes=True) run_start(args) mock_start_rethinkdb.assert_called_with() @@ -213,7 +275,9 @@ def test_run_configure_when_config_does_exist(monkeypatch, @patch('subprocess.Popen') def test_start_rethinkdb_returns_a_process_when_successful(mock_popen): from bigchaindb.commands import utils - mock_popen.return_value = Mock(stdout=['Server ready']) + mock_popen.return_value = Mock(stdout=[ + 'Listening for client driver 1234', + 'Server ready']) assert utils.start_rethinkdb() is mock_popen.return_value @@ -226,6 +290,45 @@ def test_start_rethinkdb_exits_when_cannot_start(mock_popen): utils.start_rethinkdb() +@patch('bigchaindb.crypto.generate_key_pair', return_value=('private_key', + 'public_key')) +def test_allow_temp_keypair_generates_one_on_the_fly(mock_gen_keypair, + mock_processes_start, + mock_db_init_with_existing_db): + import bigchaindb + from bigchaindb.commands.bigchain import run_start + + bigchaindb.config['keypair'] = { 'private': None, 'public': None } + + args = Namespace(allow_temp_keypair=True, start_rethinkdb=False, config=None, yes=True) + run_start(args) + + assert bigchaindb.config['keypair']['private'] == 'private_key' + assert bigchaindb.config['keypair']['public'] == 'public_key' + + +@patch('bigchaindb.crypto.generate_key_pair', return_value=('private_key', + 'public_key')) +def test_allow_temp_keypair_doesnt_override_if_keypair_found(mock_gen_keypair, + mock_processes_start, + mock_db_init_with_existing_db): + import bigchaindb + from bigchaindb.commands.bigchain import run_start + + # Preconditions for the test + original_private_key = bigchaindb.config['keypair']['private'] + original_public_key = bigchaindb.config['keypair']['public'] + + assert isinstance(original_public_key, str) + assert isinstance(original_private_key, str) + + args = Namespace(allow_temp_keypair=True, start_rethinkdb=False, config=None, yes=True) + run_start(args) + + assert bigchaindb.config['keypair']['private'] == original_private_key + assert bigchaindb.config['keypair']['public'] == original_public_key + + @patch('rethinkdb.ast.Table.reconfigure') def test_set_shards(mock_reconfigure, monkeypatch, b): from bigchaindb.commands.bigchain import run_set_shards diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index bce2c9a8..4d2b3a1e 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -145,7 +145,8 @@ def test_env_config(monkeypatch): def test_autoconfigure_read_both_from_file_and_env(monkeypatch): file_config = { - 'database': {'host': 'test-host'} + 'database': {'host': 'test-host'}, + 'backlog_reassign_delay': 5 } monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, **kwargs: file_config) monkeypatch.setattr('os.environ', {'BIGCHAINDB_DATABASE_NAME': 'test-dbname', @@ -180,6 +181,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch): }, 'api_endpoint': 'http://localhost:9984/api/v1', 'consensus_plugin': 'default', + 'backlog_reassign_delay': 5 } diff --git a/tests/test_core.py b/tests/test_core.py index 2650ff37..397158d0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -19,7 +19,8 @@ def config(request, monkeypatch): }, 'keyring': [], 'CONFIGURED': True, - 'consensus_plugin': 'default' + 'consensus_plugin': 'default', + 'backlog_reassign_delay': 30 } monkeypatch.setattr('bigchaindb.config', config) diff --git a/tests/test_processes.py b/tests/test_processes.py new file mode 100644 index 00000000..7013dc03 --- /dev/null +++ 
b/tests/test_processes.py @@ -0,0 +1,23 @@ +from unittest.mock import patch + +from multiprocessing import Process +from bigchaindb.pipelines import vote, block, election, stale + + +@patch.object(stale, 'start') +@patch.object(election, 'start') +@patch.object(block, 'start') +@patch.object(vote, 'start') +@patch.object(Process, 'start') +def test_processes_start(mock_vote, mock_block, mock_election, mock_stale, + mock_process): + from bigchaindb import processes + + processes.start() + + mock_vote.assert_called_with() + mock_block.assert_called_with() + mock_election.assert_called_with() + mock_stale.assert_called_with() + mock_process.assert_called_with() + diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py new file mode 100644 index 00000000..f81bd232 --- /dev/null +++ b/tests/test_run_query_util.py @@ -0,0 +1,105 @@ +from threading import Thread +import pytest + +import rethinkdb as r + +from bigchaindb.db.utils import Connection + + +def test_run_a_simple_query(): + conn = Connection() + query = r.expr('1') + assert conn.run(query) == '1' + + +def test_raise_exception_when_max_tries(): + class MockQuery: + def run(self, conn): + raise r.ReqlDriverError('mock') + + conn = Connection() + + with pytest.raises(r.ReqlDriverError): + conn.run(MockQuery()) + + +def test_reconnect_when_connection_lost(): + import time + import rethinkdb as r + + def raise_exception(*args, **kwargs): + raise r.ReqlDriverError('mock') + + conn = Connection() + original_connect = r.connect + r.connect = raise_exception + + def delayed_start(): + time.sleep(1) + r.connect = original_connect + + thread = Thread(target=delayed_start) + query = r.expr('1') + thread.start() + assert conn.run(query) == '1' + + +def test_changefeed_reconnects_when_connection_lost(monkeypatch): + import time + import multiprocessing as mp + + from bigchaindb import Bigchain + from bigchaindb.pipelines.utils import ChangeFeed + + class MockConnection: + tries = 0 + + def run(self, *args, **kwargs): + return self + + def __iter__(self): + return self + + def __next__(self): + self.tries += 1 + if self.tries == 1: + raise r.ReqlDriverError('mock') + elif self.tries == 2: + return { 'new_val': { 'fact': 'A group of cats is called a clowder.' }, + 'old_val': None } + if self.tries == 3: + raise r.ReqlDriverError('mock') + elif self.tries == 4: + return { 'new_val': {'fact': 'Cats sleep 70% of their lives.' }, + 'old_val': None } + else: + time.sleep(10) + + + bigchain = Bigchain() + bigchain.connection = MockConnection() + changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT, + bigchain=bigchain) + changefeed.outqueue = mp.Queue() + t_changefeed = Thread(target=changefeed.run_forever, daemon=True) + + t_changefeed.start() + time.sleep(1) + # try 1: MockConnection raises an error that will stop the + # ChangeFeed instance from iterating for 1 second. + + # try 2: MockConnection releases a new record. The new record + # will be put in the outqueue of the ChangeFeed instance. + fact = changefeed.outqueue.get()['fact'] + assert fact == 'A group of cats is called a clowder.' + + # try 3: MockConnection raises an error that will stop the + # ChangeFeed instance from iterating for 1 second. + assert t_changefeed.is_alive() is True + + time.sleep(2) + # try 4: MockConnection releases a new record. The new record + # will be put in the outqueue of the ChangeFeed instance. + + fact = changefeed.outqueue.get()['fact'] + assert fact == 'Cats sleep 70% of their lives.'
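
Taken together, these tests pin down the retry contract of the new `Connection` proxy introduced in `bigchaindb/db/utils.py`. A minimal usage sketch (the host, port, and db values below are illustrative defaults, not mandated by the patch):

```python
import rethinkdb as r
from bigchaindb.db.utils import Connection

# Lazy: no socket is opened until the first .run() call.
conn = Connection(host='localhost', port=28015, db='bigchain', max_tries=3)

# Resilient: on r.ReqlDriverError the proxy reconnects and retries the
# query up to max_tries times; _connect() itself sleeps 2**i seconds
# between connection attempts before giving up and re-raising.
count = conn.run(r.table('backlog').count())
```

This is the same behaviour `ChangeFeed.run_forever` leans on: when the changefeed cursor dies mid-iteration, it logs the exception, sleeps one second, and re-enters `run_changefeed`, which is exactly what `test_changefeed_reconnects_when_connection_lost` exercises above.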