diff --git a/bigchaindb/backend/connection.py b/bigchaindb/backend/connection.py index 0fda4078..bb4688da 100644 --- a/bigchaindb/backend/connection.py +++ b/bigchaindb/backend/connection.py @@ -32,6 +32,7 @@ def connect(backend=None, host=None, port=None, name=None, replicaset=None): based on the given (or defaulted) :attr:`backend`. Raises: + :exc:`~ConnectionError`: If the connection to the database fails. :exc:`~ConfigurationError`: If the given (or defaulted) :attr:`backend` is not supported or could not be loaded. """ @@ -77,6 +78,13 @@ class Connection: Args: query: the query to run + Raises: + :exc:`~DuplicateKeyError`: If the query fails because of a + duplicate key constraint. + :exc:`~OperationError`: If the query fails for any other + reason. + :exc:`~ConnectionError`: If the connection to the database + fails. """ raise NotImplementedError() diff --git a/bigchaindb/backend/exceptions.py b/bigchaindb/backend/exceptions.py index 41bac0c6..017e19e4 100644 --- a/bigchaindb/backend/exceptions.py +++ b/bigchaindb/backend/exceptions.py @@ -1,9 +1,17 @@ from bigchaindb.exceptions import BigchainDBError -class ConnectionError(BigchainDBError): - """Exception raised when the connection to the DataBase fails.""" +class BackendError(BigchainDBError): + """Top level exception for any backend exception.""" -class DatabaseOpFailedError(BigchainDBError): - """Exception for database operation errors.""" +class ConnectionError(BackendError): + """Exception raised when the connection to the backend fails.""" + + +class OperationError(BackendError): + """Exception raised when a backend operation fails.""" + + +class DuplicateKeyError(OperationError): + """Exception raised when an insert fails because the key is not unique.""" diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index ecf36bcc..0cf37943 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -1,15 +1,15 @@ +import 
os import logging import time import pymongo -from pymongo import errors from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.backend.utils import module_dispatch_registrar from bigchaindb.backend.mongodb.connection import MongoDBConnection -from bigchaindb.backend.exceptions import (DatabaseOpFailedError, - ConnectionError) +from bigchaindb.backend.exceptions import BackendError + logger = logging.getLogger(__name__) register_changefeed = module_dispatch_registrar(backend.changefeed) @@ -30,11 +30,8 @@ class MongoDBChangeFeed(ChangeFeed): try: self.run_changefeed() break - except (errors.ConnectionFailure, errors.OperationFailure, - errors.AutoReconnect, - errors.ServerSelectionTimeoutError, - DatabaseOpFailedError, ConnectionError) as exc: - logger.exception(exc) + except BackendError: + logger.exception('Error connecting to the database, retrying') time.sleep(1) def run_changefeed(self): @@ -43,9 +40,10 @@ class MongoDBChangeFeed(ChangeFeed): namespace = '{}.{}'.format(dbname, table) # last timestamp in the oplog. We only care for operations happening # in the future. - last_ts = self.connection.conn.local.oplog.rs.find()\ - .sort('$natural', pymongo.DESCENDING).limit(1)\ - .next()['ts'] + last_ts = self.connection.run( + self.connection.query().local.oplog.rs.find() + .sort('$natural', pymongo.DESCENDING).limit(1) + .next()['ts']) # tailable cursor. A tailable cursor will remain open even after the # last result was returned. ``TAILABLE_AWAIT`` will block for some # timeout after the last result was returned. 
If no result is received @@ -56,6 +54,7 @@ class MongoDBChangeFeed(ChangeFeed): ) while cursor.alive: + print(os.getpid(), 'alive') try: record = cursor.next() except StopIteration: diff --git a/bigchaindb/backend/mongodb/connection.py b/bigchaindb/backend/mongodb/connection.py index 79a085fb..5bd0d017 100644 --- a/bigchaindb/backend/mongodb/connection.py +++ b/bigchaindb/backend/mongodb/connection.py @@ -2,13 +2,14 @@ import time import logging from itertools import repeat -from pymongo import MongoClient -from pymongo import errors +import pymongo import bigchaindb from bigchaindb.utils import Lazy -from bigchaindb.common import exceptions -from bigchaindb.backend import exceptions as backend_exceptions +from bigchaindb.common.exceptions import ConfigurationError +from bigchaindb.backend.exceptions import (DuplicateKeyError, + OperationError, + ConnectionError) from bigchaindb.backend.connection import Connection logger = logging.getLogger(__name__) @@ -59,18 +60,33 @@ class MongoDBConnection(Connection): def db(self): return self.conn[self.dbname] + def query(self): + return Lazy() + + def collection(self, name): + """Return a lazy object that can be used to compose a query. + + Args: + name (str): the name of the collection to query. + """ + return self.query()[self.dbname][name] + def run(self, query): - attempt = 0 - for i in self.max_tries_counter: - attempt += 1 - try: - return query.run(self.conn[self.dbname]) - except errors.AutoReconnect: - if attempt == self.max_tries: - raise - self._connect() + try: + return query.run(self.conn) + except pymongo.errors.DuplicateKeyError as exc: + raise DuplicateKeyError from exc + except pymongo.errors.OperationFailure as exc: + raise OperationError from exc def _connect(self): + """Try to connect to the database. + + Raises: + :exc:`~ConnectionError`: If the connection to the database + fails. 
+ """ + attempt = 0 for i in self.max_tries_counter: attempt += 1 @@ -83,30 +99,24 @@ class MongoDBConnection(Connection): # FYI: this might raise a `ServerSelectionTimeoutError`, # that is a subclass of `ConnectionFailure`. - self.connection = MongoClient(self.host, - self.port, - replicaset=self.replicaset, - serverselectiontimeoutms=self.connection_timeout) - except (errors.ConnectionFailure, errors.AutoReconnect) as exc: + self.connection = pymongo.MongoClient(self.host, + self.port, + replicaset=self.replicaset, + serverselectiontimeoutms=self.connection_timeout) + + # `initialize_replica_set` might raise `ConnectionFailure` or `OperationFailure`. + except (pymongo.errors.ConnectionFailure, + pymongo.errors.OperationFailure) as exc: logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.', attempt, self.max_tries if self.max_tries != 0 else '∞', self.host, self.port, self.connection_timeout) if attempt == self.max_tries: logger.critical('Cannot connect to the Database. Giving up.') - raise backend_exceptions.ConnectionError() from exc + raise ConnectionError() from exc else: break -def collection(name): - """Return a lazy object that can be used to compose a query. - - Args: - name (str): the name of the collection to query. - """ - return Lazy()[name] - - def initialize_replica_set(host, port, connection_timeout): """Initialize a replica set. 
If already initialized skip.""" @@ -114,7 +124,7 @@ def initialize_replica_set(host, port, connection_timeout): # The reason we do this instead of `backend.connect` is that # `backend.connect` will connect you to a replica set but this fails if # you try to connect to a replica set that is not yet initialized - conn = MongoClient(host=host, + conn = pymongo.MongoClient(host=host, port=port, serverselectiontimeoutms=connection_timeout) _check_replica_set(conn) @@ -125,7 +135,7 @@ def initialize_replica_set(host, port, connection_timeout): try: conn.admin.command('replSetInitiate', config) - except errors.OperationFailure as exc_info: + except pymongo.errors.OperationFailure as exc_info: if exc_info.details['codeName'] == 'AlreadyInitialized': return raise @@ -153,12 +163,12 @@ def _check_replica_set(conn): repl_opts = options['parsed']['replication'] repl_set_name = repl_opts.get('replSetName', None) or repl_opts['replSet'] except KeyError: - raise exceptions.ConfigurationError('mongod was not started with' + raise ConfigurationError('mongod was not started with' ' the replSet option.') bdb_repl_set_name = bigchaindb.config['database']['replicaset'] if repl_set_name != bdb_repl_set_name: - raise exceptions.ConfigurationError('The replicaset configuration of ' + raise ConfigurationError('The replicaset configuration of ' 'bigchaindb (`{}`) needs to match ' 'the replica set name from MongoDB' ' (`{}`)' diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index b4739dc2..e68608d7 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -10,8 +10,9 @@ from pymongo import errors from bigchaindb import backend from bigchaindb.common.exceptions import CyclicBlockchainError from bigchaindb.common.transaction import Transaction +from bigchaindb.backend.exceptions import DuplicateKeyError from bigchaindb.backend.utils import module_dispatch_registrar -from bigchaindb.backend.mongodb.connection import 
MongoDBConnection, collection +from bigchaindb.backend.mongodb.connection import MongoDBConnection register_query = module_dispatch_registrar(backend.query) @@ -21,9 +22,9 @@ register_query = module_dispatch_registrar(backend.query) def write_transaction(conn, signed_transaction): try: return conn.run( - collection('backlog') + conn.collection('backlog') .insert_one(signed_transaction)) - except errors.DuplicateKeyError: + except DuplicateKeyError: return @@ -32,7 +33,7 @@ def update_transaction(conn, transaction_id, doc): # with mongodb we need to add update operators to the doc doc = {'$set': doc} return conn.run( - collection('backlog') + conn.collection('backlog') .find_one_and_update( {'id': transaction_id}, doc, @@ -42,14 +43,14 @@ def update_transaction(conn, transaction_id, doc): @register_query(MongoDBConnection) def delete_transaction(conn, *transaction_id): return conn.run( - collection('backlog') + conn.collection('backlog') .delete_many({'id': {'$in': transaction_id}})) @register_query(MongoDBConnection) def get_stale_transactions(conn, reassign_delay): return conn.run( - collection('backlog') + conn.collection('backlog') .find({'assignment_timestamp': {'$lt': time() - reassign_delay}}, projection={'_id': False})) @@ -58,7 +59,7 @@ def get_stale_transactions(conn, reassign_delay): def get_transaction_from_block(conn, transaction_id, block_id): try: return conn.run( - collection('bigchain') + conn.collection('bigchain') .aggregate([ {'$match': {'id': block_id}}, {'$project': { @@ -84,7 +85,7 @@ def get_transaction_from_block(conn, transaction_id, block_id): @register_query(MongoDBConnection) def get_transaction_from_backlog(conn, transaction_id): return conn.run( - collection('backlog') + conn.collection('backlog') .find_one({'id': transaction_id}, projection={'_id': False, 'assignee': False, @@ -94,7 +95,7 @@ def get_transaction_from_backlog(conn, transaction_id): @register_query(MongoDBConnection) def get_blocks_status_from_transaction(conn, 
transaction_id): return conn.run( - collection('bigchain') + conn.collection('bigchain') .find({'block.transactions.id': transaction_id}, projection=['id', 'block.voters'])) @@ -139,7 +140,7 @@ def get_txids_filtered(conn, asset_id, operation=None): @register_query(MongoDBConnection) def get_asset_by_id(conn, asset_id): cursor = conn.run( - collection('bigchain') + conn.collection('bigchain') .aggregate([ {'$match': { 'block.transactions.id': asset_id, @@ -160,7 +161,7 @@ def get_asset_by_id(conn, asset_id): @register_query(MongoDBConnection) def get_spent(conn, transaction_id, output): cursor = conn.run( - collection('bigchain').aggregate([ + conn.collection('bigchain').aggregate([ {'$unwind': '$block.transactions'}, {'$match': { 'block.transactions.inputs.fulfills.txid': transaction_id, @@ -175,7 +176,7 @@ def get_spent(conn, transaction_id, output): @register_query(MongoDBConnection) def get_owned_ids(conn, owner): cursor = conn.run( - collection('bigchain') + conn.collection('bigchain') .aggregate([ {'$unwind': '$block.transactions'}, {'$match': { @@ -192,7 +193,7 @@ def get_owned_ids(conn, owner): @register_query(MongoDBConnection) def get_votes_by_block_id(conn, block_id): return conn.run( - collection('votes') + conn.collection('votes') .find({'vote.voting_for_block': block_id}, projection={'_id': False})) @@ -200,7 +201,7 @@ def get_votes_by_block_id(conn, block_id): @register_query(MongoDBConnection) def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey): return conn.run( - collection('votes') + conn.collection('votes') .find({'vote.voting_for_block': block_id, 'node_pubkey': node_pubkey}, projection={'_id': False})) @@ -209,14 +210,14 @@ def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey): @register_query(MongoDBConnection) def write_block(conn, block): return conn.run( - collection('bigchain') + conn.collection('bigchain') .insert_one(block.to_dict())) @register_query(MongoDBConnection) def get_block(conn, block_id): return 
conn.run( - collection('bigchain') + conn.collection('bigchain') .find_one({'id': block_id}, projection={'_id': False})) @@ -224,27 +225,27 @@ def get_block(conn, block_id): @register_query(MongoDBConnection) def has_transaction(conn, transaction_id): return bool(conn.run( - collection('bigchain') + conn.collection('bigchain') .find_one({'block.transactions.id': transaction_id}))) @register_query(MongoDBConnection) def count_blocks(conn): return conn.run( - collection('bigchain') + conn.collection('bigchain') .count()) @register_query(MongoDBConnection) def count_backlog(conn): return conn.run( - collection('backlog') + conn.collection('backlog') .count()) @register_query(MongoDBConnection) def write_vote(conn, vote): - conn.run(collection('votes').insert_one(vote)) + conn.run(conn.collection('votes').insert_one(vote)) vote.pop('_id') return vote @@ -252,7 +253,7 @@ def write_vote(conn, vote): @register_query(MongoDBConnection) def get_genesis_block(conn): return conn.run( - collection('bigchain') + conn.collection('bigchain') .find_one( {'block.transactions.0.operation': 'GENESIS'}, {'_id': False} @@ -262,7 +263,7 @@ def get_genesis_block(conn): @register_query(MongoDBConnection) def get_last_voted_block(conn, node_pubkey): last_voted = conn.run( - collection('votes') + conn.collection('votes') .find({'node_pubkey': node_pubkey}, sort=[('vote.timestamp', -1)])) diff --git a/bigchaindb/backend/rethinkdb/admin.py b/bigchaindb/backend/rethinkdb/admin.py index 63548f87..23b55048 100644 --- a/bigchaindb/backend/rethinkdb/admin.py +++ b/bigchaindb/backend/rethinkdb/admin.py @@ -5,7 +5,7 @@ import rethinkdb as r from bigchaindb.backend import admin from bigchaindb.backend.schema import TABLES -from bigchaindb.backend.exceptions import DatabaseOpFailedError +from bigchaindb.backend.exceptions import OperationError from bigchaindb.backend.utils import module_dispatch_registrar from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection @@ -78,7 +78,7 @@ def 
reconfigure(connection, *, table, shards, replicas, `_. Raises: - DatabaseOpFailedError: If the reconfiguration fails due to a + OperationError: If the reconfiguration fails due to a RethinkDB :exc:`ReqlOpFailedError` or :exc:`ReqlQueryLogicError`. @@ -96,7 +96,7 @@ def reconfigure(connection, *, table, shards, replicas, try: return connection.run(r.table(table).reconfigure(**params)) except (r.ReqlOpFailedError, r.ReqlQueryLogicError) as e: - raise DatabaseOpFailedError from e + raise OperationError from e @register_admin(RethinkDBConnection) diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 4e18543d..89b84dba 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -23,7 +23,7 @@ from bigchaindb.utils import ProcessGroup from bigchaindb import backend from bigchaindb.backend import schema from bigchaindb.backend.admin import set_replicas, set_shards -from bigchaindb.backend.exceptions import (DatabaseOpFailedError, +from bigchaindb.backend.exceptions import (OperationError, ConnectionError) from bigchaindb.commands import utils from bigchaindb import processes @@ -252,7 +252,7 @@ def run_set_shards(args): conn = backend.connect() try: set_shards(conn, shards=args.num_shards) - except DatabaseOpFailedError as e: + except OperationError as e: logger.warn(e) @@ -260,7 +260,7 @@ def run_set_replicas(args): conn = backend.connect() try: set_replicas(conn, replicas=args.num_replicas) - except DatabaseOpFailedError as e: + except OperationError as e: logger.warn(e) diff --git a/tests/backend/mongodb/test_changefeed.py b/tests/backend/mongodb/test_changefeed.py index e7581b34..de07613c 100644 --- a/tests/backend/mongodb/test_changefeed.py +++ b/tests/backend/mongodb/test_changefeed.py @@ -1,7 +1,6 @@ from unittest import mock import pytest -from pymongo.errors import ConnectionFailure from multipipes import Pipe @@ -152,14 +151,15 @@ def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive, 
@pytest.mark.bdb @mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) -@mock.patch('bigchaindb.backend.mongodb.changefeed.MongoDBChangeFeed.run_changefeed') # noqa +@mock.patch('bigchaindb.backend.mongodb.connection.MongoDBConnection.run') # noqa def test_connection_failure(mock_run_changefeed, mock_cursor_alive): from bigchaindb.backend import get_changefeed, connect + from bigchaindb.backend.exceptions import ConnectionError from bigchaindb.backend.changefeed import ChangeFeed conn = connect() mock_cursor_alive.return_value = False - mock_run_changefeed.side_effect = [ConnectionFailure(), mock.DEFAULT] + mock_run_changefeed.side_effect = [ConnectionError(), mock.DEFAULT] changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT) changefeed.run_forever() diff --git a/tests/backend/mongodb/test_connection.py b/tests/backend/mongodb/test_connection.py index 60181f25..feda77ed 100644 --- a/tests/backend/mongodb/test_connection.py +++ b/tests/backend/mongodb/test_connection.py @@ -58,7 +58,7 @@ def test_connection_error(mock_sleep, mock_client, mock_init_repl_set): from bigchaindb.backend import connect from bigchaindb.backend.exceptions import ConnectionError - # force the driver to trow ConnectionFailure + # force the driver to throw ConnectionFailure # the mock on time.sleep is to prevent the actual sleep when running # the tests mock_client.side_effect = ConnectionFailure() diff --git a/tests/backend/rethinkdb/test_admin.py b/tests/backend/rethinkdb/test_admin.py index 8c4f0528..0f71ca21 100644 --- a/tests/backend/rethinkdb/test_admin.py +++ b/tests/backend/rethinkdb/test_admin.py @@ -177,8 +177,8 @@ def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn, db_name, db_conn): from bigchaindb.backend.rethinkdb.admin import reconfigure - from bigchaindb.backend.exceptions import DatabaseOpFailedError - with pytest.raises(DatabaseOpFailedError) as exc: + from bigchaindb.backend.exceptions import OperationError + with 
pytest.raises(OperationError) as exc: reconfigure(db_conn, table='backlog', shards=1, replicas={'default': 1}, primary_replica_tag='default') assert isinstance(exc.value.__cause__, r.ReqlQueryLogicError) @@ -187,8 +187,8 @@ def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn, @pytest.mark.bdb def test_reconfigure_too_many_replicas(rdb_conn, db_name, db_conn): from bigchaindb.backend.rethinkdb.admin import reconfigure - from bigchaindb.backend.exceptions import DatabaseOpFailedError + from bigchaindb.backend.exceptions import OperationError replicas = _count_rethinkdb_servers() + 1 - with pytest.raises(DatabaseOpFailedError) as exc: + with pytest.raises(OperationError) as exc: reconfigure(db_conn, table='backlog', shards=1, replicas=replicas) assert isinstance(exc.value.__cause__, r.ReqlOpFailedError)