From c11808ecc55c62022ab286f9044670972f824c87 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 23 Feb 2017 17:20:21 +0100 Subject: [PATCH] Move common stuff to generic Connection class --- bigchaindb/__init__.py | 4 ++ bigchaindb/backend/connection.py | 54 ++++++++++++++- bigchaindb/backend/mongodb/connection.py | 79 ++++++---------------- bigchaindb/backend/rethinkdb/connection.py | 51 ++------------ tests/backend/mongodb/test_admin.py | 2 +- tests/test_config_utils.py | 5 ++ tests/test_core.py | 2 + 7 files changed, 90 insertions(+), 107 deletions(-) diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index 10e9e6ce..1df2551c 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -10,6 +10,8 @@ _database_rethinkdb = { 'host': os.environ.get('BIGCHAINDB_DATABASE_HOST', 'localhost'), 'port': int(os.environ.get('BIGCHAINDB_DATABASE_PORT', 28015)), 'name': os.environ.get('BIGCHAINDB_DATABASE_NAME', 'bigchain'), + 'connection_timeout': 5000, + 'max_tries': 3, } _database_mongodb = { @@ -18,6 +20,8 @@ _database_mongodb = { 'port': int(os.environ.get('BIGCHAINDB_DATABASE_PORT', 27017)), 'name': os.environ.get('BIGCHAINDB_DATABASE_NAME', 'bigchain'), 'replicaset': os.environ.get('BIGCHAINDB_DATABASE_REPLICASET', 'bigchain-rs'), + 'connection_timeout': 5000, + 'max_tries': 3, } _database_map = { diff --git a/bigchaindb/backend/connection.py b/bigchaindb/backend/connection.py index df21321d..d0913cf6 100644 --- a/bigchaindb/backend/connection.py +++ b/bigchaindb/backend/connection.py @@ -1,8 +1,10 @@ +from itertools import repeat from importlib import import_module import logging import bigchaindb from bigchaindb.common.exceptions import ConfigurationError +from bigchaindb.backend.exceptions import ConnectionError BACKENDS = { @@ -13,7 +15,8 @@ BACKENDS = { logger = logging.getLogger(__name__) -def connect(backend=None, host=None, port=None, name=None, replicaset=None): +def connect(backend=None, host=None, port=None, name=None, max_tries=None, + connection_timeout=None, replicaset=None): """Create a new connection to the database backend. All arguments default to the current configuration's values if not @@ -58,7 +61,7 @@ def connect(backend=None, host=None, port=None, name=None, replicaset=None): raise ConfigurationError('Error loading backend `{}`'.format(backend)) from exc logger.debug('Connection: {}'.format(Class)) - return Class(host, port, dbname, replicaset=replicaset) + return Class(host=host, port=port, dbname=dbname, replicaset=replicaset) class Connection: @@ -68,17 +71,41 @@ class Connection: from and implements this class. """ - def __init__(self, host=None, port=None, dbname=None, *args, **kwargs): + def __init__(self, host=None, port=None, dbname=None, + connection_timeout=None, max_tries=None, + **kwargs): """Create a new :class:`~.Connection` instance. Args: host (str): the host to connect to. port (int): the port to connect to. dbname (str): the name of the database to use. + connection_timeout (int, optional): the milliseconds to wait + until timing out the database connection attempt. + Defaults to 5000ms. + max_tries (int, optional): how many tries before giving up, + if 0 then try forever. Defaults to 3. **kwargs: arbitrary keyword arguments provided by the configuration's ``database`` settings """ + dbconf = bigchaindb.config['database'] + + self.host = host or dbconf['host'] + self.port = port or dbconf['port'] + self.dbname = dbname or dbconf['name'] + self.connection_timeout = connection_timeout if connection_timeout is not None\ + else dbconf['connection_timeout'] + self.max_tries = max_tries if max_tries is not None else dbconf['max_tries'] + self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0) + self._conn = None + + @property + def conn(self): + if self._conn is None: + self.connect() + return self._conn + def run(self, query): """Run a query. @@ -94,3 +121,24 @@ class Connection: """ raise NotImplementedError() + + def connect(self): + """Try to connect to the database. + + Raises: + :exc:`~ConnectionError`: If the connection to the database + fails. + """ + + attempt = 0 + for i in self.max_tries_counter: + attempt += 1 + try: + self._conn = self._connect() + except ConnectionError as exc: + logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.', + attempt, self.max_tries if self.max_tries != 0 else '∞', + self.host, self.port, self.connection_timeout) + if attempt == self.max_tries: + logger.critical('Cannot connect to the Database. Giving up.') + raise ConnectionError() from exc diff --git a/bigchaindb/backend/mongodb/connection.py b/bigchaindb/backend/mongodb/connection.py index d01d5861..271d0e8e 100644 --- a/bigchaindb/backend/mongodb/connection.py +++ b/bigchaindb/backend/mongodb/connection.py @@ -1,6 +1,5 @@ import time import logging -from itertools import repeat import pymongo @@ -15,46 +14,20 @@ from bigchaindb.backend.connection import Connection logger = logging.getLogger(__name__) -# TODO: waiting for #1082 to be merged -# to move this constants in the configuration. - -CONNECTION_TIMEOUT = 4000 # in milliseconds -MAX_RETRIES = 3 # number of tries before giving up, if 0 then try forever - - class MongoDBConnection(Connection): - def __init__(self, host=None, port=None, dbname=None, - connection_timeout=None, max_tries=None, - replicaset=None): + def __init__(self, replicaset=None, **kwargs): """Create a new Connection instance. Args: - host (str, optional): the host to connect to. - port (int, optional): the port to connect to. - dbname (str, optional): the database to use. - connection_timeout (int, optional): the milliseconds to wait - until timing out the database connection attempt. - max_tries (int, optional): how many tries before giving up, - if 0 then try forever. replicaset (str, optional): the name of the replica set to connect to. + **kwargs: arbitrary keyword arguments provided by the + configuration's ``database`` settings """ - self.host = host or bigchaindb.config['database']['host'] - self.port = port or bigchaindb.config['database']['port'] + super().__init__(**kwargs) self.replicaset = replicaset or bigchaindb.config['database']['replicaset'] - self.dbname = dbname or bigchaindb.config['database']['name'] - self.connection_timeout = connection_timeout if connection_timeout is not None else CONNECTION_TIMEOUT - self.max_tries = max_tries if max_tries is not None else MAX_RETRIES - self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0) - self.connection = None - - @property - def conn(self): - if self.connection is None: - self._connect() - return self.connection @property def db(self): @@ -94,34 +67,23 @@ class MongoDBConnection(Connection): fails. """ - attempt = 0 - for i in self.max_tries_counter: - attempt += 1 + try: + # we should only return a connection if the replica set is + # initialized. initialize_replica_set will check if the + # replica set is initialized else it will initialize it. + initialize_replica_set(self.host, self.port, self.connection_timeout) - try: - # we should only return a connection if the replica set is - # initialized. initialize_replica_set will check if the - # replica set is initialized else it will initialize it. - initialize_replica_set(self.host, self.port, self.connection_timeout) + # FYI: this might raise a `ServerSelectionTimeoutError`, + # that is a subclass of `ConnectionFailure`. + return pymongo.MongoClient(self.host, + self.port, + replicaset=self.replicaset, + serverselectiontimeoutms=self.connection_timeout) - # FYI: this might raise a `ServerSelectionTimeoutError`, - # that is a subclass of `ConnectionFailure`. - self.connection = pymongo.MongoClient(self.host, - self.port, - replicaset=self.replicaset, - serverselectiontimeoutms=self.connection_timeout) - - # `initialize_replica_set` might raise `ConnectionFailure` or `OperationFailure`. - except (pymongo.errors.ConnectionFailure, - pymongo.errors.OperationFailure) as exc: - logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.', - attempt, self.max_tries if self.max_tries != 0 else '∞', - self.host, self.port, self.connection_timeout) - if attempt == self.max_tries: - logger.critical('Cannot connect to the Database. Giving up.') - raise ConnectionError() from exc - else: - break + # `initialize_replica_set` might raise `ConnectionFailure` or `OperationFailure`. + except (pymongo.errors.ConnectionFailure, + pymongo.errors.OperationFailure) as exc: + raise ConnectionError() from exc def initialize_replica_set(host, port, connection_timeout): @@ -166,9 +128,10 @@ def _check_replica_set(conn): replSet option. """ options = conn.admin.command('getCmdLineOpts') + print(options) try: repl_opts = options['parsed']['replication'] - repl_set_name = repl_opts.get('replSetName', None) or repl_opts['replSet'] + repl_set_name = repl_opts.get('replSetName', repl_opts.get('replSet')) except KeyError: raise ConfigurationError('mongod was not started with' ' the replSet option.') diff --git a/bigchaindb/backend/rethinkdb/connection.py b/bigchaindb/backend/rethinkdb/connection.py index 988573f6..e4d2c524 100644 --- a/bigchaindb/backend/rethinkdb/connection.py +++ b/bigchaindb/backend/rethinkdb/connection.py @@ -1,11 +1,7 @@ -import time -import logging - import rethinkdb as r from bigchaindb.backend.connection import Connection - -logger = logging.getLogger(__name__) +from bigchaindb.backend.exceptions import ConnectionError class RethinkDBConnection(Connection): @@ -17,23 +13,6 @@ class RethinkDBConnection(Connection): more times to run the query or open a connection. """ - def __init__(self, host, port, dbname, max_tries=3, **kwargs): - """Create a new :class:`~.RethinkDBConnection` instance. - - See :meth:`.Connection.__init__` for - :attr:`host`, :attr:`port`, and :attr:`dbname`. - - Args: - max_tries (int, optional): how many tries before giving up. - Defaults to 3. - """ - - self.host = host - self.port = port - self.dbname = dbname - self.max_tries = max_tries - self.conn = None - def run(self, query): """Run a RethinkDB query. @@ -45,16 +24,7 @@ class RethinkDBConnection(Connection): :attr:`~.RethinkDBConnection.max_tries`. """ - if self.conn is None: - self._connect() - - for i in range(self.max_tries): - try: - return query.run(self.conn) - except r.ReqlDriverError: - if i + 1 == self.max_tries: - raise - self._connect() + return query.run(self.conn) def _connect(self): """Set a connection to RethinkDB. @@ -66,16 +36,7 @@ class RethinkDBConnection(Connection): :attr:`~.RethinkDBConnection.max_tries`. """ - for i in range(1, self.max_tries + 1): - logging.debug('Connecting to database %s:%s/%s. (Attempt %s/%s)', - self.host, self.port, self.dbname, i, self.max_tries) - try: - self.conn = r.connect(host=self.host, port=self.port, db=self.dbname) - except r.ReqlDriverError: - if i == self.max_tries: - raise - wait_time = 2**i - logging.debug('Error connecting to database, waiting %ss', wait_time) - time.sleep(wait_time) - else: - break + try: + return r.connect(host=self.host, port=self.port, db=self.dbname) + except r.ReqlDriverError as exc: + raise ConnectionError() from exc diff --git a/tests/backend/mongodb/test_admin.py b/tests/backend/mongodb/test_admin.py index 148c853a..075ea2f9 100644 --- a/tests/backend/mongodb/test_admin.py +++ b/tests/backend/mongodb/test_admin.py @@ -40,7 +40,7 @@ def connection(): # executed to make sure that the replica set is correctly initialized. # Here we force the the connection setup so that all required # `Database.command` are executed before we mock them it in the tests. - connection._connect() + connection.connect() return connection diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index d69b789a..727c3ba3 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -139,12 +139,17 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): 'host': DATABASE_HOST, 'port': DATABASE_PORT, 'name': DATABASE_NAME, + 'connection_timeout': 5000, + 'max_tries': 3 } + database_mongodb = { 'backend': 'mongodb', 'host': DATABASE_HOST, 'port': DATABASE_PORT, 'name': DATABASE_NAME, + 'connection_timeout': 5000, + 'max_tries': 3, 'replicaset': 'bigchain-rs', } diff --git a/tests/test_core.py b/tests/test_core.py index 6bcabdc9..f6f56ed1 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -10,6 +10,8 @@ def config(request, monkeypatch): 'port': 28015, 'name': 'bigchain', 'replicaset': 'bigchain-rs', + 'connection_timeout': 5000, + 'max_tries': 3 }, 'keypair': { 'public': 'pubkey',