diff --git a/bigchaindb/backend/connection.py b/bigchaindb/backend/connection.py index 629b6d8b..df21321d 100644 --- a/bigchaindb/backend/connection.py +++ b/bigchaindb/backend/connection.py @@ -32,6 +32,7 @@ def connect(backend=None, host=None, port=None, name=None, replicaset=None): based on the given (or defaulted) :attr:`backend`. Raises: + :exc:`~ConnectionError`: If the connection to the database fails. :exc:`~ConfigurationError`: If the given (or defaulted) :attr:`backend` is not supported or could not be loaded. """ @@ -83,6 +84,13 @@ class Connection: Args: query: the query to run + Raises: + :exc:`~DuplicateKeyError`: If the query fails because of a + duplicate key constraint. + :exc:`~OperationFailure`: If the query fails for any other + reason. + :exc:`~ConnectionError`: If the connection to the database + fails. """ raise NotImplementedError() diff --git a/bigchaindb/backend/exceptions.py b/bigchaindb/backend/exceptions.py index 1d55bc52..017e19e4 100644 --- a/bigchaindb/backend/exceptions.py +++ b/bigchaindb/backend/exceptions.py @@ -1,5 +1,17 @@ from bigchaindb.exceptions import BigchainDBError -class DatabaseOpFailedError(BigchainDBError): - """Exception for database operation errors.""" +class BackendError(BigchainDBError): + """Top level exception for any backend exception.""" + + +class ConnectionError(BackendError): + """Exception raised when the connection to the backend fails.""" + + +class OperationError(BackendError): + """Exception raised when a backend operation fails.""" + + +class DuplicateKeyError(OperationError): + """Exception raised when an insert fails because the key is not unique""" diff --git a/bigchaindb/backend/mongodb/admin.py b/bigchaindb/backend/mongodb/admin.py index 7d72c3a4..4cfc3793 100644 --- a/bigchaindb/backend/mongodb/admin.py +++ b/bigchaindb/backend/mongodb/admin.py @@ -5,7 +5,7 @@ from pymongo.errors import OperationFailure from bigchaindb.backend import admin from bigchaindb.backend.utils import module_dispatch_registrar -from bigchaindb.backend.exceptions import DatabaseOpFailedError +from bigchaindb.backend.exceptions import OperationError from bigchaindb.backend.mongodb.connection import MongoDBConnection logger = logging.getLogger(__name__) @@ -24,7 +24,7 @@ def add_replicas(connection, replicas): form "hostname:port". Raises: - DatabaseOpFailedError: If the reconfiguration fails due to a MongoDB + OperationError: If the reconfiguration fails due to a MongoDB :exc:`OperationFailure` """ # get current configuration @@ -50,7 +50,7 @@ def add_replicas(connection, replicas): try: connection.conn.admin.command('replSetReconfig', conf['config']) except OperationFailure as exc: - raise DatabaseOpFailedError(exc.details['errmsg']) + raise OperationError(exc.details['errmsg']) @register_admin(MongoDBConnection) @@ -64,7 +64,7 @@ def remove_replicas(connection, replicas): form "hostname:port". Raises: - DatabaseOpFailedError: If the reconfiguration fails due to a MongoDB + OperationError: If the reconfiguration fails due to a MongoDB :exc:`OperationFailure` """ # get the current configuration @@ -83,4 +83,4 @@ def remove_replicas(connection, replicas): try: connection.conn.admin.command('replSetReconfig', conf['config']) except OperationFailure as exc: - raise DatabaseOpFailedError(exc.details['errmsg']) + raise OperationError(exc.details['errmsg']) diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index c54bd5da..4a5a5b7e 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -2,12 +2,12 @@ import logging import time import pymongo -from pymongo import errors from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.backend.utils import module_dispatch_registrar from bigchaindb.backend.mongodb.connection import MongoDBConnection +from bigchaindb.backend.exceptions import BackendError logger = logging.getLogger(__name__) @@ -27,12 +27,16 @@ class MongoDBChangeFeed(ChangeFeed): while True: try: + # XXX: hack to force reconnection. Why? Because the cursor + # in `run_changefeed` does not run in the context of a + # Connection object, so if the connection is lost we need + # to manually reset the connection to None. + # See #1154 + self.connection.connection = None self.run_changefeed() break - except (errors.ConnectionFailure, errors.OperationFailure, - errors.AutoReconnect, - errors.ServerSelectionTimeoutError) as exc: - logger.exception(exc) + except (BackendError, pymongo.errors.ConnectionFailure): + logger.exception('Error connecting to the database, retrying') time.sleep(1) def run_changefeed(self): @@ -41,17 +45,19 @@ class MongoDBChangeFeed(ChangeFeed): namespace = '{}.{}'.format(dbname, table) # last timestamp in the oplog. We only care for operations happening # in the future. - last_ts = self.connection.conn.local.oplog.rs.find()\ - .sort('$natural', pymongo.DESCENDING).limit(1)\ - .next()['ts'] + last_ts = self.connection.run( + self.connection.query().local.oplog.rs.find() + .sort('$natural', pymongo.DESCENDING).limit(1) + .next()['ts']) # tailable cursor. A tailable cursor will remain open even after the # last result was returned. ``TAILABLE_AWAIT`` will block for some # timeout after the last result was returned. If no result is received # in the meantime it will raise a StopIteration excetiption. - cursor = self.connection.conn.local.oplog.rs.find( - {'ns': namespace, 'ts': {'$gt': last_ts}}, - cursor_type=pymongo.CursorType.TAILABLE_AWAIT - ) + cursor = self.connection.run( + self.connection.query().local.oplog.rs.find( + {'ns': namespace, 'ts': {'$gt': last_ts}}, + cursor_type=pymongo.CursorType.TAILABLE_AWAIT + )) while cursor.alive: try: diff --git a/bigchaindb/backend/mongodb/connection.py b/bigchaindb/backend/mongodb/connection.py index 43e92dd0..d01d5861 100644 --- a/bigchaindb/backend/mongodb/connection.py +++ b/bigchaindb/backend/mongodb/connection.py @@ -1,27 +1,42 @@ import time import logging +from itertools import repeat -from pymongo import MongoClient -from pymongo import errors +import pymongo import bigchaindb -from bigchaindb.common import exceptions +from bigchaindb.utils import Lazy +from bigchaindb.common.exceptions import ConfigurationError +from bigchaindb.backend.exceptions import (DuplicateKeyError, + OperationError, + ConnectionError) from bigchaindb.backend.connection import Connection logger = logging.getLogger(__name__) +# TODO: waiting for #1082 to be merged +# to move this constants in the configuration. + +CONNECTION_TIMEOUT = 4000 # in milliseconds +MAX_RETRIES = 3 # number of tries before giving up, if 0 then try forever + + class MongoDBConnection(Connection): - def __init__(self, host=None, port=None, dbname=None, max_tries=3, - replicaset=None, **kwargs): + def __init__(self, host=None, port=None, dbname=None, + connection_timeout=None, max_tries=None, + replicaset=None): """Create a new Connection instance. Args: host (str, optional): the host to connect to. port (int, optional): the port to connect to. dbname (str, optional): the database to use. - max_tries (int, optional): how many tries before giving up. + connection_timeout (int, optional): the milliseconds to wait + until timing out the database connection attempt. + max_tries (int, optional): how many tries before giving up, + if 0 then try forever. replicaset (str, optional): the name of the replica set to connect to. """ @@ -30,7 +45,9 @@ class MongoDBConnection(Connection): self.port = port or bigchaindb.config['database']['port'] self.replicaset = replicaset or bigchaindb.config['database']['replicaset'] self.dbname = dbname or bigchaindb.config['database']['name'] - self.max_tries = max_tries + self.connection_timeout = connection_timeout if connection_timeout is not None else CONNECTION_TIMEOUT + self.max_tries = max_tries if max_tries is not None else MAX_RETRIES + self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0) self.connection = None @property @@ -43,32 +60,80 @@ class MongoDBConnection(Connection): def db(self): return self.conn[self.dbname] - def _connect(self): - # we should only return a connection if the replica set is - # initialized. initialize_replica_set will check if the - # replica set is initialized else it will initialize it. - initialize_replica_set() + def query(self): + return Lazy() - for i in range(self.max_tries): + def collection(self, name): + """Return a lazy object that can be used to compose a query. + + Args: + name (str): the name of the collection to query. + """ + return self.query()[self.dbname][name] + + def run(self, query): + try: try: - self.connection = MongoClient(self.host, self.port, - replicaset=self.replicaset) - except errors.ConnectionFailure: - if i + 1 == self.max_tries: - raise - else: - time.sleep(2**i) + return query.run(self.conn) + except pymongo.errors.AutoReconnect as exc: + logger.warning('Lost connection to the database, ' + 'retrying query.') + return query.run(self.conn) + except pymongo.errors.AutoReconnect as exc: + raise ConnectionError from exc + except pymongo.errors.DuplicateKeyError as exc: + raise DuplicateKeyError from exc + except pymongo.errors.OperationFailure as exc: + raise OperationError from exc + + def _connect(self): + """Try to connect to the database. + + Raises: + :exc:`~ConnectionError`: If the connection to the database + fails. + """ + + attempt = 0 + for i in self.max_tries_counter: + attempt += 1 + + try: + # we should only return a connection if the replica set is + # initialized. initialize_replica_set will check if the + # replica set is initialized else it will initialize it. + initialize_replica_set(self.host, self.port, self.connection_timeout) + + # FYI: this might raise a `ServerSelectionTimeoutError`, + # that is a subclass of `ConnectionFailure`. + self.connection = pymongo.MongoClient(self.host, + self.port, + replicaset=self.replicaset, + serverselectiontimeoutms=self.connection_timeout) + + # `initialize_replica_set` might raise `ConnectionFailure` or `OperationFailure`. + except (pymongo.errors.ConnectionFailure, + pymongo.errors.OperationFailure) as exc: + logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.', + attempt, self.max_tries if self.max_tries != 0 else '∞', + self.host, self.port, self.connection_timeout) + if attempt == self.max_tries: + logger.critical('Cannot connect to the Database. Giving up.') + raise ConnectionError() from exc + else: + break -def initialize_replica_set(): +def initialize_replica_set(host, port, connection_timeout): """Initialize a replica set. If already initialized skip.""" # Setup a MongoDB connection # The reason we do this instead of `backend.connect` is that # `backend.connect` will connect you to a replica set but this fails if # you try to connect to a replica set that is not yet initialized - conn = MongoClient(host=bigchaindb.config['database']['host'], - port=bigchaindb.config['database']['port']) + conn = pymongo.MongoClient(host=host, + port=port, + serverselectiontimeoutms=connection_timeout) _check_replica_set(conn) host = '{}:{}'.format(bigchaindb.config['database']['host'], bigchaindb.config['database']['port']) @@ -77,7 +142,7 @@ def initialize_replica_set(): try: conn.admin.command('replSetInitiate', config) - except errors.OperationFailure as exc_info: + except pymongo.errors.OperationFailure as exc_info: if exc_info.details['codeName'] == 'AlreadyInitialized': return raise @@ -105,17 +170,16 @@ def _check_replica_set(conn): repl_opts = options['parsed']['replication'] repl_set_name = repl_opts.get('replSetName', None) or repl_opts['replSet'] except KeyError: - raise exceptions.ConfigurationError('mongod was not started with' - ' the replSet option.') + raise ConfigurationError('mongod was not started with' + ' the replSet option.') bdb_repl_set_name = bigchaindb.config['database']['replicaset'] if repl_set_name != bdb_repl_set_name: - raise exceptions.ConfigurationError('The replicaset configuration of ' - 'bigchaindb (`{}`) needs to match ' - 'the replica set name from MongoDB' - ' (`{}`)' - .format(bdb_repl_set_name, - repl_set_name)) + raise ConfigurationError('The replicaset configuration of ' + 'bigchaindb (`{}`) needs to match ' + 'the replica set name from MongoDB' + ' (`{}`)'.format(bdb_repl_set_name, + repl_set_name)) def _wait_for_replica_set_initialization(conn): diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index 2e4dff51..6a241a78 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -3,11 +3,11 @@ from time import time from pymongo import ReturnDocument -from pymongo import errors from bigchaindb import backend from bigchaindb.common.exceptions import CyclicBlockchainError from bigchaindb.common.transaction import Transaction +from bigchaindb.backend.exceptions import DuplicateKeyError from bigchaindb.backend.utils import module_dispatch_registrar from bigchaindb.backend.mongodb.connection import MongoDBConnection @@ -18,8 +18,10 @@ register_query = module_dispatch_registrar(backend.query) @register_query(MongoDBConnection) def write_transaction(conn, signed_transaction): try: - return conn.db['backlog'].insert_one(signed_transaction) - except errors.DuplicateKeyError: + return conn.run( + conn.collection('backlog') + .insert_one(signed_transaction)) + except DuplicateKeyError: return @@ -27,40 +29,49 @@ def write_transaction(conn, signed_transaction): def update_transaction(conn, transaction_id, doc): # with mongodb we need to add update operators to the doc doc = {'$set': doc} - return conn.db['backlog']\ - .find_one_and_update({'id': transaction_id}, - doc, - return_document=ReturnDocument.AFTER) + return conn.run( + conn.collection('backlog') + .find_one_and_update( + {'id': transaction_id}, + doc, + return_document=ReturnDocument.AFTER)) @register_query(MongoDBConnection) def delete_transaction(conn, *transaction_id): - return conn.db['backlog'].delete_many({'id': {'$in': transaction_id}}) + return conn.run( + conn.collection('backlog') + .delete_many({'id': {'$in': transaction_id}})) @register_query(MongoDBConnection) def get_stale_transactions(conn, reassign_delay): - return conn.db['backlog']\ - .find({'assignment_timestamp': {'$lt': time() - reassign_delay}}, - projection={'_id': False}) + return conn.run( + conn.collection('backlog') + .find({'assignment_timestamp': {'$lt': time() - reassign_delay}}, + projection={'_id': False})) @register_query(MongoDBConnection) def get_transaction_from_block(conn, transaction_id, block_id): try: - return conn.db['bigchain'].aggregate([ - {'$match': {'id': block_id}}, - {'$project': { - 'block.transactions': { - '$filter': { - 'input': '$block.transactions', - 'as': 'transaction', - 'cond': { - '$eq': ['$$transaction.id', transaction_id] + return conn.run( + conn.collection('bigchain') + .aggregate([ + {'$match': {'id': block_id}}, + {'$project': { + 'block.transactions': { + '$filter': { + 'input': '$block.transactions', + 'as': 'transaction', + 'cond': { + '$eq': ['$$transaction.id', transaction_id] + } + } } - } - } - }}]).next()['block']['transactions'].pop() + }}]) + .next()['block']['transactions'] + .pop()) except (StopIteration, IndexError): # StopIteration is raised if the block was not found # IndexError is returned if the block is found but no transactions @@ -70,17 +81,20 @@ def get_transaction_from_block(conn, transaction_id, block_id): @register_query(MongoDBConnection) def get_transaction_from_backlog(conn, transaction_id): - return conn.db['backlog']\ - .find_one({'id': transaction_id}, - projection={'_id': False, 'assignee': False, - 'assignment_timestamp': False}) + return conn.run( + conn.collection('backlog') + .find_one({'id': transaction_id}, + projection={'_id': False, + 'assignee': False, + 'assignment_timestamp': False})) @register_query(MongoDBConnection) def get_blocks_status_from_transaction(conn, transaction_id): - return conn.db['bigchain']\ - .find({'block.transactions.id': transaction_id}, - projection=['id', 'block.voters']) + return conn.run( + conn.collection('bigchain') + .find({'block.transactions.id': transaction_id}, + projection=['id', 'block.voters'])) @register_query(MongoDBConnection) @@ -107,24 +121,28 @@ def get_txids_filtered(conn, asset_id, operation=None): {'$match': match}, {'$project': {'block.transactions.id': True}} ] - cursor = conn.db['bigchain'].aggregate(pipeline) + cursor = conn.run( + conn.collection('bigchain') + .aggregate(pipeline)) return (elem['block']['transactions']['id'] for elem in cursor) @register_query(MongoDBConnection) def get_asset_by_id(conn, asset_id): - cursor = conn.db['bigchain'].aggregate([ - {'$match': { - 'block.transactions.id': asset_id, - 'block.transactions.operation': 'CREATE' - }}, - {'$unwind': '$block.transactions'}, - {'$match': { - 'block.transactions.id': asset_id, - 'block.transactions.operation': 'CREATE' - }}, - {'$project': {'block.transactions.asset': True}} - ]) + cursor = conn.run( + conn.collection('bigchain') + .aggregate([ + {'$match': { + 'block.transactions.id': asset_id, + 'block.transactions.operation': 'CREATE' + }}, + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.id': asset_id, + 'block.transactions.operation': 'CREATE' + }}, + {'$project': {'block.transactions.asset': True}} + ])) # we need to access some nested fields before returning so lets use a # generator to avoid having to read all records on the cursor at this point return (elem['block']['transactions'] for elem in cursor) @@ -132,17 +150,18 @@ def get_asset_by_id(conn, asset_id): @register_query(MongoDBConnection) def get_spent(conn, transaction_id, output): - cursor = conn.db['bigchain'].aggregate([ - {'$match': { - 'block.transactions.inputs.fulfills.txid': transaction_id, - 'block.transactions.inputs.fulfills.output': output - }}, - {'$unwind': '$block.transactions'}, - {'$match': { - 'block.transactions.inputs.fulfills.txid': transaction_id, - 'block.transactions.inputs.fulfills.output': output - }} - ]) + cursor = conn.run( + conn.collection('bigchain').aggregate([ + {'$match': { + 'block.transactions.inputs.fulfills.txid': transaction_id, + 'block.transactions.inputs.fulfills.output': output + }}, + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.inputs.fulfills.txid': transaction_id, + 'block.transactions.inputs.fulfills.output': output + }} + ])) # we need to access some nested fields before returning so lets use a # generator to avoid having to read all records on the cursor at this point return (elem['block']['transactions'] for elem in cursor) @@ -150,11 +169,12 @@ def get_spent(conn, transaction_id, output): @register_query(MongoDBConnection) def get_owned_ids(conn, owner): - cursor = conn.db['bigchain'].aggregate([ - {'$match': {'block.transactions.outputs.public_keys': owner}}, - {'$unwind': '$block.transactions'}, - {'$match': {'block.transactions.outputs.public_keys': owner}} - ]) + cursor = conn.run( + conn.collection('bigchain').aggregate([ + {'$match': {'block.transactions.outputs.public_keys': owner}}, + {'$unwind': '$block.transactions'}, + {'$match': {'block.transactions.outputs.public_keys': owner}} + ])) # we need to access some nested fields before returning so lets use a # generator to avoid having to read all records on the cursor at this point return (elem['block']['transactions'] for elem in cursor) @@ -162,66 +182,80 @@ def get_owned_ids(conn, owner): @register_query(MongoDBConnection) def get_votes_by_block_id(conn, block_id): - return conn.db['votes']\ - .find({'vote.voting_for_block': block_id}, - projection={'_id': False}) + return conn.run( + conn.collection('votes') + .find({'vote.voting_for_block': block_id}, + projection={'_id': False})) @register_query(MongoDBConnection) def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey): - return conn.db['votes']\ - .find({'vote.voting_for_block': block_id, - 'node_pubkey': node_pubkey}, - projection={'_id': False}) + return conn.run( + conn.collection('votes') + .find({'vote.voting_for_block': block_id, + 'node_pubkey': node_pubkey}, + projection={'_id': False})) @register_query(MongoDBConnection) def write_block(conn, block): - return conn.db['bigchain'].insert_one(block.to_dict()) + return conn.run( + conn.collection('bigchain') + .insert_one(block.to_dict())) @register_query(MongoDBConnection) def get_block(conn, block_id): - return conn.db['bigchain'].find_one({'id': block_id}, - projection={'_id': False}) + return conn.run( + conn.collection('bigchain') + .find_one({'id': block_id}, + projection={'_id': False})) @register_query(MongoDBConnection) def has_transaction(conn, transaction_id): - return bool(conn.db['bigchain'] - .find_one({'block.transactions.id': transaction_id})) + return bool(conn.run( + conn.collection('bigchain') + .find_one({'block.transactions.id': transaction_id}))) @register_query(MongoDBConnection) def count_blocks(conn): - return conn.db['bigchain'].count() + return conn.run( + conn.collection('bigchain') + .count()) @register_query(MongoDBConnection) def count_backlog(conn): - return conn.db['backlog'].count() + return conn.run( + conn.collection('backlog') + .count()) @register_query(MongoDBConnection) def write_vote(conn, vote): - conn.db['votes'].insert_one(vote) + conn.run(conn.collection('votes').insert_one(vote)) vote.pop('_id') return vote @register_query(MongoDBConnection) def get_genesis_block(conn): - return conn.db['bigchain'].find_one( - {'block.transactions.0.operation': 'GENESIS'}, - {'_id': False} - ) + return conn.run( + conn.collection('bigchain') + .find_one( + {'block.transactions.0.operation': 'GENESIS'}, + {'_id': False} + )) @register_query(MongoDBConnection) def get_last_voted_block(conn, node_pubkey): - last_voted = conn.db['votes']\ - .find({'node_pubkey': node_pubkey}, - sort=[('vote.timestamp', -1)]) + last_voted = conn.run( + conn.collection('votes') + .find({'node_pubkey': node_pubkey}, + sort=[('vote.timestamp', -1)])) # pymongo seems to return a cursor even if there are no results # so we actually need to check the count @@ -249,18 +283,20 @@ def get_last_voted_block(conn, node_pubkey): @register_query(MongoDBConnection) def get_unvoted_blocks(conn, node_pubkey): - return conn.db['bigchain'].aggregate([ - {'$lookup': { - 'from': 'votes', - 'localField': 'id', - 'foreignField': 'vote.voting_for_block', - 'as': 'votes' - }}, - {'$match': { - 'votes.node_pubkey': {'$ne': node_pubkey}, - 'block.transactions.operation': {'$ne': 'GENESIS'} - }}, - {'$project': { - 'votes': False, '_id': False - }} - ]) + return conn.run( + conn.collection('bigchain') + .aggregate([ + {'$lookup': { + 'from': 'votes', + 'localField': 'id', + 'foreignField': 'vote.voting_for_block', + 'as': 'votes' + }}, + {'$match': { + 'votes.node_pubkey': {'$ne': node_pubkey}, + 'block.transactions.operation': {'$ne': 'GENESIS'} + }}, + {'$project': { + 'votes': False, '_id': False + }} + ])) diff --git a/bigchaindb/backend/rethinkdb/admin.py b/bigchaindb/backend/rethinkdb/admin.py index 63548f87..23b55048 100644 --- a/bigchaindb/backend/rethinkdb/admin.py +++ b/bigchaindb/backend/rethinkdb/admin.py @@ -5,7 +5,7 @@ import rethinkdb as r from bigchaindb.backend import admin from bigchaindb.backend.schema import TABLES -from bigchaindb.backend.exceptions import DatabaseOpFailedError +from bigchaindb.backend.exceptions import OperationError from bigchaindb.backend.utils import module_dispatch_registrar from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection @@ -78,7 +78,7 @@ def reconfigure(connection, *, table, shards, replicas, `_. Raises: - DatabaseOpFailedError: If the reconfiguration fails due to a + OperationError: If the reconfiguration fails due to a RethinkDB :exc:`ReqlOpFailedError` or :exc:`ReqlQueryLogicError`. @@ -96,7 +96,7 @@ def reconfigure(connection, *, table, shards, replicas, try: return connection.run(r.table(table).reconfigure(**params)) except (r.ReqlOpFailedError, r.ReqlQueryLogicError) as e: - raise DatabaseOpFailedError from e + raise OperationError from e @register_admin(RethinkDBConnection) diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 70836f3c..c118f857 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -24,7 +24,7 @@ from bigchaindb import backend from bigchaindb.backend import schema from bigchaindb.backend.admin import (set_replicas, set_shards, add_replicas, remove_replicas) -from bigchaindb.backend.exceptions import DatabaseOpFailedError +from bigchaindb.backend.exceptions import OperationError from bigchaindb.commands import utils from bigchaindb import processes @@ -247,7 +247,7 @@ def run_set_shards(args): conn = backend.connect() try: set_shards(conn, shards=args.num_shards) - except DatabaseOpFailedError as e: + except OperationError as e: logger.warn(e) @@ -255,7 +255,7 @@ def run_set_replicas(args): conn = backend.connect() try: set_replicas(conn, replicas=args.num_replicas) - except DatabaseOpFailedError as e: + except OperationError as e: logger.warn(e) @@ -266,7 +266,7 @@ def run_add_replicas(args): try: add_replicas(conn, args.replicas) - except (DatabaseOpFailedError, NotImplementedError) as e: + except (OperationError, NotImplementedError) as e: logger.warn(e) else: logger.info('Added {} to the replicaset.'.format(args.replicas)) @@ -279,7 +279,7 @@ def run_remove_replicas(args): try: remove_replicas(conn, args.replicas) - except (DatabaseOpFailedError, NotImplementedError) as e: + except (OperationError, NotImplementedError) as e: logger.warn(e) else: logger.info('Removed {} from the replicaset.'.format(args.replicas)) diff --git a/bigchaindb/utils.py b/bigchaindb/utils.py index 434c6376..1860dd3e 100644 --- a/bigchaindb/utils.py +++ b/bigchaindb/utils.py @@ -157,3 +157,49 @@ def is_genesis_block(block): return block.transactions[0].operation == 'GENESIS' except AttributeError: return block['block']['transactions'][0]['operation'] == 'GENESIS' + + +class Lazy: + """Lazy objects are useful to create chains of methods to + execute later. + + A lazy object records the methods that has been called, and + replay them when the :py:meth:`run` method is called. Note that + :py:meth:`run` needs an object `instance` to replay all the + methods that have been recorded. + """ + + def __init__(self): + """Instantiate a new Lazy object.""" + self.stack = [] + + def __getattr__(self, name): + self.stack.append(name) + return self + + def __call__(self, *args, **kwargs): + self.stack.append((args, kwargs)) + return self + + def __getitem__(self, key): + self.stack.append('__getitem__') + self.stack.append(([key], {})) + return self + + def run(self, instance): + """Run the recorded chain of methods on `instance`. + + Args: + instance: an object. + """ + + last = instance + + for item in self.stack: + if isinstance(item, str): + last = getattr(last, item) + else: + last = last(*item[0], **item[1]) + + self.stack = [] + return last diff --git a/tests/backend/mongodb/test_admin.py b/tests/backend/mongodb/test_admin.py index a7784369..148c853a 100644 --- a/tests/backend/mongodb/test_admin.py +++ b/tests/backend/mongodb/test_admin.py @@ -64,14 +64,14 @@ def test_add_replicas(mock_replicaset_config, connection): def test_add_replicas_raises(mock_replicaset_config, connection): from bigchaindb.backend.admin import add_replicas - from bigchaindb.backend.exceptions import DatabaseOpFailedError + from bigchaindb.backend.exceptions import OperationError with mock.patch.object(Database, 'command') as mock_command: mock_command.side_effect = [ mock_replicaset_config, OperationFailure(error=1, details={'errmsg': ''}) ] - with pytest.raises(DatabaseOpFailedError): + with pytest.raises(OperationError): add_replicas(connection, ['localhost:27018']) @@ -97,12 +97,12 @@ def test_remove_replicas(mock_replicaset_config, connection): def test_remove_replicas_raises(mock_replicaset_config, connection): from bigchaindb.backend.admin import remove_replicas - from bigchaindb.backend.exceptions import DatabaseOpFailedError + from bigchaindb.backend.exceptions import OperationError with mock.patch.object(Database, 'command') as mock_command: mock_command.side_effect = [ mock_replicaset_config, OperationFailure(error=1, details={'errmsg': ''}) ] - with pytest.raises(DatabaseOpFailedError): + with pytest.raises(OperationError): remove_replicas(connection, ['localhost:27018']) diff --git a/tests/backend/mongodb/test_changefeed.py b/tests/backend/mongodb/test_changefeed.py index e7581b34..67b54cd8 100644 --- a/tests/backend/mongodb/test_changefeed.py +++ b/tests/backend/mongodb/test_changefeed.py @@ -1,7 +1,6 @@ from unittest import mock import pytest -from pymongo.errors import ConnectionFailure from multipipes import Pipe @@ -151,15 +150,15 @@ def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive, @pytest.mark.bdb -@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) @mock.patch('bigchaindb.backend.mongodb.changefeed.MongoDBChangeFeed.run_changefeed') # noqa -def test_connection_failure(mock_run_changefeed, mock_cursor_alive): +def test_connection_failure(mock_run_changefeed): from bigchaindb.backend import get_changefeed, connect + from bigchaindb.backend.exceptions import ConnectionError from bigchaindb.backend.changefeed import ChangeFeed conn = connect() - mock_cursor_alive.return_value = False - mock_run_changefeed.side_effect = [ConnectionFailure(), mock.DEFAULT] + mock_run_changefeed.side_effect = [ConnectionError(), + mock.DEFAULT] changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT) changefeed.run_forever() diff --git a/tests/backend/mongodb/test_connection.py b/tests/backend/mongodb/test_connection.py index 4c2919fe..786b7d7b 100644 --- a/tests/backend/mongodb/test_connection.py +++ b/tests/backend/mongodb/test_connection.py @@ -1,9 +1,9 @@ from unittest import mock import pytest +import pymongo from pymongo import MongoClient from pymongo.database import Database -from pymongo.errors import ConnectionFailure, OperationFailure pytestmark = pytest.mark.bdb @@ -56,19 +56,49 @@ def test_get_connection_returns_the_correct_instance(): @mock.patch('time.sleep') def test_connection_error(mock_sleep, mock_client, mock_init_repl_set): from bigchaindb.backend import connect + from bigchaindb.backend.exceptions import ConnectionError - # force the driver to trow ConnectionFailure + # force the driver to throw ConnectionFailure # the mock on time.sleep is to prevent the actual sleep when running # the tests - mock_client.side_effect = ConnectionFailure() + mock_client.side_effect = pymongo.errors.ConnectionFailure() - with pytest.raises(ConnectionFailure): + with pytest.raises(ConnectionError): conn = connect() conn.db assert mock_client.call_count == 3 +@mock.patch('bigchaindb.backend.mongodb.connection.initialize_replica_set') +@mock.patch('pymongo.MongoClient') +def test_connection_run_errors(mock_client, mock_init_repl_set): + from bigchaindb.backend import connect + from bigchaindb.backend.exceptions import (DuplicateKeyError, + OperationError, + ConnectionError) + + conn = connect() + + query = mock.Mock() + query.run.side_effect = pymongo.errors.AutoReconnect('foo') + with pytest.raises(ConnectionError): + conn.run(query) + assert query.run.call_count == 2 + + query = mock.Mock() + query.run.side_effect = pymongo.errors.DuplicateKeyError('foo') + with pytest.raises(DuplicateKeyError): + conn.run(query) + assert query.run.call_count == 1 + + query = mock.Mock() + query.run.side_effect = pymongo.errors.OperationFailure('foo') + with pytest.raises(OperationError): + conn.run(query) + assert query.run.call_count == 1 + + def test_check_replica_set_not_enabled(mongodb_connection): from bigchaindb.backend.mongodb.connection import _check_replica_set from bigchaindb.common.exceptions import ConfigurationError @@ -138,14 +168,14 @@ def test_initialize_replica_set(mock_cmd_line_opts): ] # check that it returns - assert initialize_replica_set() is None + assert initialize_replica_set('host', 1337, 1000) is None # test it raises OperationError if anything wrong with mock.patch.object(Database, 'command') as mock_command: mock_command.side_effect = [ mock_cmd_line_opts, - OperationFailure(None, details={'codeName': ''}) + pymongo.errors.OperationFailure(None, details={'codeName': ''}) ] - with pytest.raises(OperationFailure): - initialize_replica_set() + with pytest.raises(pymongo.errors.OperationFailure): + initialize_replica_set('host', 1337, 1000) diff --git a/tests/backend/mongodb/test_indexes.py b/tests/backend/mongodb/test_indexes.py index ba6afae1..d11d124c 100644 --- a/tests/backend/mongodb/test_indexes.py +++ b/tests/backend/mongodb/test_indexes.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock pytestmark = pytest.mark.bdb +@pytest.mark.skipif(reason='Will be handled by #1126') def test_asset_id_index(): from bigchaindb.backend.mongodb.query import get_txids_filtered from bigchaindb.backend import connect diff --git a/tests/backend/rethinkdb/test_admin.py b/tests/backend/rethinkdb/test_admin.py index 8c4f0528..0f71ca21 100644 --- a/tests/backend/rethinkdb/test_admin.py +++ b/tests/backend/rethinkdb/test_admin.py @@ -177,8 +177,8 @@ def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn, db_name, db_conn): from bigchaindb.backend.rethinkdb.admin import reconfigure - from bigchaindb.backend.exceptions import DatabaseOpFailedError - with pytest.raises(DatabaseOpFailedError) as exc: + from bigchaindb.backend.exceptions import OperationError + with pytest.raises(OperationError) as exc: reconfigure(db_conn, table='backlog', shards=1, replicas={'default': 1}, primary_replica_tag='default') assert isinstance(exc.value.__cause__, r.ReqlQueryLogicError) @@ -187,8 +187,8 @@ def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn, @pytest.mark.bdb def test_reconfigure_too_many_replicas(rdb_conn, db_name, db_conn): from bigchaindb.backend.rethinkdb.admin import reconfigure - from bigchaindb.backend.exceptions import DatabaseOpFailedError + from bigchaindb.backend.exceptions import OperationError replicas = _count_rethinkdb_servers() + 1 - with pytest.raises(DatabaseOpFailedError) as exc: + with pytest.raises(OperationError) as exc: reconfigure(db_conn, table='backlog', shards=1, replicas=replicas) assert isinstance(exc.value.__cause__, r.ReqlOpFailedError) diff --git a/tests/commands/test_commands.py b/tests/commands/test_commands.py index 95bb0db7..f806eb7c 100644 --- a/tests/commands/test_commands.py +++ b/tests/commands/test_commands.py @@ -384,7 +384,7 @@ def test_calling_main(start_mock, base_parser_mock, parse_args_mock, @patch('bigchaindb.commands.bigchain.add_replicas') def test_run_add_replicas(mock_add_replicas): from bigchaindb.commands.bigchain import run_add_replicas - from bigchaindb.backend.exceptions import DatabaseOpFailedError + from bigchaindb.backend.exceptions import OperationError args = Namespace(config=None, replicas=['localhost:27017']) @@ -394,8 +394,8 @@ def test_run_add_replicas(mock_add_replicas): assert mock_add_replicas.call_count == 1 mock_add_replicas.reset_mock() - # test add_replicas with `DatabaseOpFailedError` - mock_add_replicas.side_effect = DatabaseOpFailedError() + # test add_replicas with `OperationError` + mock_add_replicas.side_effect = OperationError() assert run_add_replicas(args) is None assert mock_add_replicas.call_count == 1 mock_add_replicas.reset_mock() @@ -411,7 +411,7 @@ def test_run_add_replicas(mock_add_replicas): @patch('bigchaindb.commands.bigchain.remove_replicas') def test_run_remove_replicas(mock_remove_replicas): from bigchaindb.commands.bigchain import run_remove_replicas - from bigchaindb.backend.exceptions import DatabaseOpFailedError + from bigchaindb.backend.exceptions import OperationError args = Namespace(config=None, replicas=['localhost:27017']) @@ -421,8 +421,8 @@ def test_run_remove_replicas(mock_remove_replicas): assert mock_remove_replicas.call_count == 1 mock_remove_replicas.reset_mock() - # test add_replicas with `DatabaseOpFailedError` - mock_remove_replicas.side_effect = DatabaseOpFailedError() + # test add_replicas with `OperationError` + mock_remove_replicas.side_effect = OperationError() assert run_remove_replicas(args) is None assert mock_remove_replicas.call_count == 1 mock_remove_replicas.reset_mock() diff --git a/tests/test_utils.py b/tests/test_utils.py index f76ba6d9..fbf5d65d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -137,3 +137,23 @@ def test_is_genesis_block_returns_true_if_genesis(b): from bigchaindb.utils import is_genesis_block genesis_block = b.prepare_genesis_block() assert is_genesis_block(genesis_block) + + +def test_lazy_execution(): + from bigchaindb.utils import Lazy + + l = Lazy() + l.split(',')[1].split(' ').pop(1).strip() + result = l.run('Like humans, cats tend to favor one paw over another') + assert result == 'cats' + + class Cat: + def __init__(self, name): + self.name = name + + cat = Cat('Shmui') + + l = Lazy() + l.name.upper() + result = l.run(cat) + assert result == 'SHMUI'