Normalize exceptions

vrde 2017-02-02 19:18:47 +01:00
parent 1588681c5b
commit 16571b539f
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
10 changed files with 109 additions and 83 deletions

View File

@@ -32,6 +32,7 @@ def connect(backend=None, host=None, port=None, name=None, replicaset=None):
based on the given (or defaulted) :attr:`backend`.
Raises:
:exc:`~ConnectionError`: If the connection to the database fails.
:exc:`~ConfigurationError`: If the given (or defaulted) :attr:`backend`
is not supported or could not be loaded.
"""
@@ -77,6 +78,13 @@ class Connection:
Args:
query: the query to run
Raises:
:exc:`~DuplicateKeyError`: If the query fails because of a
duplicate key constraint.
:exc:`~OperationError`: If the query fails for any other
reason.
:exc:`~ConnectionError`: If the connection to the database
fails.
"""
raise NotImplementedError()
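
Aside: the Raises section above defines the contract every backend's run() must honor. A minimal sketch of a compliant subclass (purely illustrative; no such in-memory backend exists in this codebase, and real backends pass Lazy queries rather than callables):

from bigchaindb.backend.connection import Connection
from bigchaindb.backend.exceptions import OperationError

class InMemoryConnection(Connection):
    """Toy backend: a query here is a callable applied to a dict store."""

    def __init__(self, **kwargs):
        self.store = {}

    def run(self, query):
        try:
            return query(self.store)
        except Exception as exc:
            # normalize any failure per the documented contract
            raise OperationError from exc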

View File

@@ -1,9 +1,17 @@
from bigchaindb.exceptions import BigchainDBError
class ConnectionError(BigchainDBError):
"""Exception raised when the connection to the DataBase fails."""
class BackendError(BigchainDBError):
"""Top level exception for any backend exception."""
class DatabaseOpFailedError(BigchainDBError):
"""Exception for database operation errors."""
class ConnectionError(BackendError):
"""Exception raised when the connection to the backend fails."""
class OperationError(BackendError):
"""Exception raised when a backend operation fails."""
class DuplicateKeyError(OperationError):
"""Exception raised when an insert fails because the key is not unique"""

View File

@@ -1,15 +1,15 @@
import os
import logging
import time
import pymongo
from pymongo import errors
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.mongodb.connection import MongoDBConnection
from bigchaindb.backend.exceptions import (DatabaseOpFailedError,
ConnectionError)
from bigchaindb.backend.exceptions import BackendError
logger = logging.getLogger(__name__)
register_changefeed = module_dispatch_registrar(backend.changefeed)
@@ -30,11 +30,8 @@ class MongoDBChangeFeed(ChangeFeed):
try:
self.run_changefeed()
break
except (errors.ConnectionFailure, errors.OperationFailure,
errors.AutoReconnect,
errors.ServerSelectionTimeoutError,
DatabaseOpFailedError, ConnectionError) as exc:
logger.exception(exc)
except BackendError:
logger.exception('Error connecting to the database, retrying')
time.sleep(1)
def run_changefeed(self):
@@ -43,9 +40,10 @@ class MongoDBChangeFeed(ChangeFeed):
namespace = '{}.{}'.format(dbname, table)
# last timestamp in the oplog. We only care for operations happening
# in the future.
last_ts = self.connection.conn.local.oplog.rs.find()\
.sort('$natural', pymongo.DESCENDING).limit(1)\
.next()['ts']
last_ts = self.connection.run(
self.connection.query().local.oplog.rs.find()
.sort('$natural', pymongo.DESCENDING).limit(1)
.next()['ts'])
# tailable cursor. A tailable cursor will remain open even after the
# last result was returned. ``TAILABLE_AWAIT`` will block for some
# timeout after the last result was returned. If no result is received
@@ -56,6 +54,7 @@ class MongoDBChangeFeed(ChangeFeed):
)
while cursor.alive:
print(os.getpid(), 'alive')
try:
record = cursor.next()
except StopIteration:
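
Note how the rewritten retry loop above no longer enumerates driver-specific pymongo errors; a single except BackendError clause covers connection failures, reconnects, and operation failures, since every backend exception now derives from it. The same pattern in isolation (a sketch; do_work and the one-second delay are illustrative):

import time
from bigchaindb.backend.exceptions import BackendError

def run_forever(do_work):
    while True:
        try:
            do_work()
            break
        except BackendError:
            # any normalized backend failure: wait, then retry
            time.sleep(1)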

View File

@@ -2,13 +2,14 @@ import time
import logging
from itertools import repeat
from pymongo import MongoClient
from pymongo import errors
import pymongo
import bigchaindb
from bigchaindb.utils import Lazy
from bigchaindb.common import exceptions
from bigchaindb.backend import exceptions as backend_exceptions
from bigchaindb.common.exceptions import ConfigurationError
from bigchaindb.backend.exceptions import (DuplicateKeyError,
OperationError,
ConnectionError)
from bigchaindb.backend.connection import Connection
logger = logging.getLogger(__name__)
@@ -59,18 +60,33 @@ class MongoDBConnection(Connection):
def db(self):
return self.conn[self.dbname]
def query(self):
return Lazy()
def collection(self, name):
"""Return a lazy object that can be used to compose a query.
Args:
name (str): the name of the collection to query.
"""
return self.query()[self.dbname][name]
def run(self, query):
attempt = 0
for i in self.max_tries_counter:
attempt += 1
try:
return query.run(self.conn[self.dbname])
except errors.AutoReconnect:
if attempt == self.max_tries:
raise
self._connect()
try:
return query.run(self.conn)
except pymongo.errors.DuplicateKeyError as exc:
raise DuplicateKeyError from exc
except pymongo.errors.OperationFailure as exc:
raise OperationError from exc
def _connect(self):
"""Try to connect to the database.
Raises:
:exc:`~ConnectionError`: If the connection to the database
fails.
"""
attempt = 0
for i in self.max_tries_counter:
attempt += 1
@@ -83,30 +99,24 @@ class MongoDBConnection(Connection):
# FYI: this might raise a `ServerSelectionTimeoutError`,
# that is a subclass of `ConnectionFailure`.
self.connection = MongoClient(self.host,
self.port,
replicaset=self.replicaset,
serverselectiontimeoutms=self.connection_timeout)
except (errors.ConnectionFailure, errors.AutoReconnect) as exc:
self.connection = pymongo.MongoClient(self.host,
self.port,
replicaset=self.replicaset,
serverselectiontimeoutms=self.connection_timeout)
# `initialize_replica_set` might raise `ConnectionFailure` or `OperationFailure`.
except (pymongo.errors.ConnectionFailure,
pymongo.errors.OperationFailure) as exc:
logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.',
attempt, self.max_tries if self.max_tries != 0 else '∞',
self.host, self.port, self.connection_timeout)
if attempt == self.max_tries:
logger.critical('Cannot connect to the Database. Giving up.')
raise backend_exceptions.ConnectionError() from exc
raise ConnectionError() from exc
else:
break
def collection(name):
"""Return a lazy object that can be used to compose a query.
Args:
name (str): the name of the collection to query.
"""
return Lazy()[name]
def initialize_replica_set(host, port, connection_timeout):
"""Initialize a replica set. If already initialized skip."""
@@ -114,7 +124,7 @@ def initialize_replica_set(host, port, connection_timeout):
# The reason we do this instead of `backend.connect` is that
# `backend.connect` will connect you to a replica set but this fails if
# you try to connect to a replica set that is not yet initialized
conn = MongoClient(host=host,
conn = pymongo.MongoClient(host=host,
port=port,
serverselectiontimeoutms=connection_timeout)
_check_replica_set(conn)
@@ -125,7 +135,7 @@ def initialize_replica_set(host, port, connection_timeout):
try:
conn.admin.command('replSetInitiate', config)
except errors.OperationFailure as exc_info:
except pymongo.errors.OperationFailure as exc_info:
if exc_info.details['codeName'] == 'AlreadyInitialized':
return
raise
@@ -153,12 +163,12 @@ def _check_replica_set(conn):
repl_opts = options['parsed']['replication']
repl_set_name = repl_opts.get('replSetName', None) or repl_opts['replSet']
except KeyError:
raise exceptions.ConfigurationError('mongod was not started with'
raise ConfigurationError('mongod was not started with'
' the replSet option.')
bdb_repl_set_name = bigchaindb.config['database']['replicaset']
if repl_set_name != bdb_repl_set_name:
raise exceptions.ConfigurationError('The replicaset configuration of '
raise ConfigurationError('The replicaset configuration of '
'bigchaindb (`{}`) needs to match '
'the replica set name from MongoDB'
' (`{}`)'
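
Putting the new connection API together: collection(name) composes a lazy query bound to the configured database, and run() executes it while translating pymongo errors into the normalized backend exceptions. A hedged usage sketch (assumes a reachable MongoDB replica set; the document and its id are made up):

from bigchaindb.backend import connect
from bigchaindb.backend.exceptions import ConnectionError, DuplicateKeyError

conn = connect()  # a MongoDBConnection when the mongodb backend is configured
try:
    conn.run(conn.collection('backlog').insert_one({'id': 'abc123'}))
except DuplicateKeyError:
    pass  # surfaced instead of pymongo.errors.DuplicateKeyError
except ConnectionError:
    pass  # raised once max_tries connection attempts are exhausted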

View File

@@ -10,8 +10,9 @@ from pymongo import errors
from bigchaindb import backend
from bigchaindb.common.exceptions import CyclicBlockchainError
from bigchaindb.common.transaction import Transaction
from bigchaindb.backend.exceptions import DuplicateKeyError
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.mongodb.connection import MongoDBConnection, collection
from bigchaindb.backend.mongodb.connection import MongoDBConnection
register_query = module_dispatch_registrar(backend.query)
@@ -21,9 +22,9 @@ register_query = module_dispatch_registrar(backend.query)
def write_transaction(conn, signed_transaction):
try:
return conn.run(
collection('backlog')
conn.collection('backlog')
.insert_one(signed_transaction))
except errors.DuplicateKeyError:
except DuplicateKeyError:
return
@@ -32,7 +33,7 @@ def update_transaction(conn, transaction_id, doc):
# with mongodb we need to add update operators to the doc
doc = {'$set': doc}
return conn.run(
collection('backlog')
conn.collection('backlog')
.find_one_and_update(
{'id': transaction_id},
doc,
@@ -42,14 +43,14 @@ def update_transaction(conn, transaction_id, doc):
@register_query(MongoDBConnection)
def delete_transaction(conn, *transaction_id):
return conn.run(
collection('backlog')
conn.collection('backlog')
.delete_many({'id': {'$in': transaction_id}}))
@register_query(MongoDBConnection)
def get_stale_transactions(conn, reassign_delay):
return conn.run(
collection('backlog')
conn.collection('backlog')
.find({'assignment_timestamp': {'$lt': time() - reassign_delay}},
projection={'_id': False}))
@@ -58,7 +59,7 @@ def get_stale_transactions(conn, reassign_delay):
def get_transaction_from_block(conn, transaction_id, block_id):
try:
return conn.run(
collection('bigchain')
conn.collection('bigchain')
.aggregate([
{'$match': {'id': block_id}},
{'$project': {
@@ -84,7 +85,7 @@ def get_transaction_from_block(conn, transaction_id, block_id):
@register_query(MongoDBConnection)
def get_transaction_from_backlog(conn, transaction_id):
return conn.run(
collection('backlog')
conn.collection('backlog')
.find_one({'id': transaction_id},
projection={'_id': False,
'assignee': False,
@@ -94,7 +95,7 @@ def get_transaction_from_backlog(conn, transaction_id):
@register_query(MongoDBConnection)
def get_blocks_status_from_transaction(conn, transaction_id):
return conn.run(
collection('bigchain')
conn.collection('bigchain')
.find({'block.transactions.id': transaction_id},
projection=['id', 'block.voters']))
@@ -139,7 +140,7 @@ def get_txids_filtered(conn, asset_id, operation=None):
@register_query(MongoDBConnection)
def get_asset_by_id(conn, asset_id):
cursor = conn.run(
collection('bigchain')
conn.collection('bigchain')
.aggregate([
{'$match': {
'block.transactions.id': asset_id,
@@ -160,7 +161,7 @@ def get_asset_by_id(conn, asset_id):
@register_query(MongoDBConnection)
def get_spent(conn, transaction_id, output):
cursor = conn.run(
collection('bigchain').aggregate([
conn.collection('bigchain').aggregate([
{'$unwind': '$block.transactions'},
{'$match': {
'block.transactions.inputs.fulfills.txid': transaction_id,
@@ -175,7 +176,7 @@ def get_spent(conn, transaction_id, output):
@register_query(MongoDBConnection)
def get_owned_ids(conn, owner):
cursor = conn.run(
collection('bigchain')
conn.collection('bigchain')
.aggregate([
{'$unwind': '$block.transactions'},
{'$match': {
@@ -192,7 +193,7 @@ def get_owned_ids(conn, owner):
@register_query(MongoDBConnection)
def get_votes_by_block_id(conn, block_id):
return conn.run(
collection('votes')
conn.collection('votes')
.find({'vote.voting_for_block': block_id},
projection={'_id': False}))
@@ -200,7 +201,7 @@ def get_votes_by_block_id(conn, block_id):
@register_query(MongoDBConnection)
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
return conn.run(
collection('votes')
conn.collection('votes')
.find({'vote.voting_for_block': block_id,
'node_pubkey': node_pubkey},
projection={'_id': False}))
@@ -209,14 +210,14 @@ def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
@register_query(MongoDBConnection)
def write_block(conn, block):
return conn.run(
collection('bigchain')
conn.collection('bigchain')
.insert_one(block.to_dict()))
@register_query(MongoDBConnection)
def get_block(conn, block_id):
return conn.run(
collection('bigchain')
conn.collection('bigchain')
.find_one({'id': block_id},
projection={'_id': False}))
@@ -224,27 +225,27 @@ def get_block(conn, block_id):
@register_query(MongoDBConnection)
def has_transaction(conn, transaction_id):
return bool(conn.run(
collection('bigchain')
conn.collection('bigchain')
.find_one({'block.transactions.id': transaction_id})))
@register_query(MongoDBConnection)
def count_blocks(conn):
return conn.run(
collection('bigchain')
conn.collection('bigchain')
.count())
@register_query(MongoDBConnection)
def count_backlog(conn):
return conn.run(
collection('backlog')
conn.collection('backlog')
.count())
@register_query(MongoDBConnection)
def write_vote(conn, vote):
conn.run(collection('votes').insert_one(vote))
conn.run(conn.collection('votes').insert_one(vote))
vote.pop('_id')
return vote
@@ -252,7 +253,7 @@ def write_vote(conn, vote):
@register_query(MongoDBConnection)
def get_genesis_block(conn):
return conn.run(
collection('bigchain')
conn.collection('bigchain')
.find_one(
{'block.transactions.0.operation': 'GENESIS'},
{'_id': False}
@@ -262,7 +263,7 @@ def get_genesis_block(conn):
@register_query(MongoDBConnection)
def get_last_voted_block(conn, node_pubkey):
last_voted = conn.run(
collection('votes')
conn.collection('votes')
.find({'node_pubkey': node_pubkey},
sort=[('vote.timestamp', -1)]))
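
All of the queries above are registered for MongoDBConnection via module_dispatch_registrar, so replacing the module-level collection() helper with conn.collection() routes every query through the connection's retrying, exception-translating run(). Callers keep using the generic dispatch API; a sketch (assumes a configured mongodb backend and an existing block id):

from bigchaindb.backend import connect, query

conn = connect()
# dispatches to the MongoDB get_block shown above; failures surface as
# normalized backend exceptions rather than raw pymongo ones
block = query.get_block(conn, 'some-block-id')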

View File

@@ -5,7 +5,7 @@ import rethinkdb as r
from bigchaindb.backend import admin
from bigchaindb.backend.schema import TABLES
from bigchaindb.backend.exceptions import DatabaseOpFailedError
from bigchaindb.backend.exceptions import OperationError
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
@@ -78,7 +78,7 @@ def reconfigure(connection, *, table, shards, replicas,
<https://rethinkdb.com/api/python/reconfigure/>`_.
Raises:
DatabaseOpFailedError: If the reconfiguration fails due to a
OperationError: If the reconfiguration fails due to a
RethinkDB :exc:`ReqlOpFailedError` or
:exc:`ReqlQueryLogicError`.
@@ -96,7 +96,7 @@ def reconfigure(connection, *, table, shards, replicas,
try:
return connection.run(r.table(table).reconfigure(**params))
except (r.ReqlOpFailedError, r.ReqlQueryLogicError) as e:
raise DatabaseOpFailedError from e
raise OperationError from e
@register_admin(RethinkDBConnection)

View File

@@ -23,7 +23,7 @@ from bigchaindb.utils import ProcessGroup
from bigchaindb import backend
from bigchaindb.backend import schema
from bigchaindb.backend.admin import set_replicas, set_shards
from bigchaindb.backend.exceptions import (DatabaseOpFailedError,
from bigchaindb.backend.exceptions import (OperationError,
ConnectionError)
from bigchaindb.commands import utils
from bigchaindb import processes
@@ -252,7 +252,7 @@ def run_set_shards(args):
conn = backend.connect()
try:
set_shards(conn, shards=args.num_shards)
except DatabaseOpFailedError as e:
except OperationError as e:
logger.warn(e)
@@ -260,7 +260,7 @@ def run_set_replicas(args):
conn = backend.connect()
try:
set_replicas(conn, replicas=args.num_replicas)
except DatabaseOpFailedError as e:
except OperationError as e:
logger.warn(e)

View File

@@ -1,7 +1,6 @@
from unittest import mock
import pytest
from pymongo.errors import ConnectionFailure
from multipipes import Pipe
@@ -152,14 +151,15 @@ def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive,
@pytest.mark.bdb
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('bigchaindb.backend.mongodb.changefeed.MongoDBChangeFeed.run_changefeed') # noqa
@mock.patch('bigchaindb.backend.mongodb.connection.MongoDBConnection.run') # noqa
def test_connection_failure(mock_run_changefeed, mock_cursor_alive):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.exceptions import ConnectionError
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.return_value = False
mock_run_changefeed.side_effect = [ConnectionFailure(), mock.DEFAULT]
mock_run_changefeed.side_effect = [ConnectionError(), mock.DEFAULT]
changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT)
changefeed.run_forever()
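
The updated test leans on a unittest.mock behavior worth spelling out: when side_effect is a list, each call consumes the next item, raising it if it is an exception instance and deferring to return_value when the item is mock.DEFAULT. A standalone illustration (unrelated to BigchainDB):

from unittest import mock

m = mock.Mock(side_effect=[ValueError('boom'), mock.DEFAULT], return_value=42)
try:
    m()  # first call raises ValueError
except ValueError:
    pass
assert m() == 42  # second call hits mock.DEFAULT and returns return_value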

View File

@@ -58,7 +58,7 @@ def test_connection_error(mock_sleep, mock_client, mock_init_repl_set):
from bigchaindb.backend import connect
from bigchaindb.backend.exceptions import ConnectionError
# force the driver to trow ConnectionFailure
# force the driver to throw ConnectionFailure
# the mock on time.sleep is to prevent the actual sleep when running
# the tests
mock_client.side_effect = ConnectionFailure()

View File

@@ -177,8 +177,8 @@ def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn,
db_name,
db_conn):
from bigchaindb.backend.rethinkdb.admin import reconfigure
from bigchaindb.backend.exceptions import DatabaseOpFailedError
with pytest.raises(DatabaseOpFailedError) as exc:
from bigchaindb.backend.exceptions import OperationError
with pytest.raises(OperationError) as exc:
reconfigure(db_conn, table='backlog', shards=1,
replicas={'default': 1}, primary_replica_tag='default')
assert isinstance(exc.value.__cause__, r.ReqlQueryLogicError)
@@ -187,8 +188,8 @@ def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn,
@pytest.mark.bdb
def test_reconfigure_too_many_replicas(rdb_conn, db_name, db_conn):
from bigchaindb.backend.rethinkdb.admin import reconfigure
from bigchaindb.backend.exceptions import DatabaseOpFailedError
from bigchaindb.backend.exceptions import OperationError
replicas = _count_rethinkdb_servers() + 1
with pytest.raises(DatabaseOpFailedError) as exc:
with pytest.raises(OperationError) as exc:
reconfigure(db_conn, table='backlog', shards=1, replicas=replicas)
assert isinstance(exc.value.__cause__, r.ReqlOpFailedError)