Major improvs for MongoDBConnection class

This commit is contained in:
vrde 2017-01-31 01:27:55 +01:00
parent d8ba1f8f67
commit a8bbc87c1c
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
2 changed files with 55 additions and 22 deletions

View File

@ -1,5 +1,6 @@
import time import time
import logging import logging
from itertools import repeat
from pymongo import MongoClient from pymongo import MongoClient
from pymongo import errors from pymongo import errors
@ -12,9 +13,17 @@ from bigchaindb.backend.connection import Connection
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# TODO: waiting for #1082 to be merged
# to move this constants in the configuration.
CONNECTION_TIMEOUT = 4000 # in milliseconds
MAX_RETRIES = 3 # number of tries before giving up, if 0 then try forever
class MongoDBConnection(Connection): class MongoDBConnection(Connection):
def __init__(self, host=None, port=None, dbname=None, max_tries=3, def __init__(self, host=None, port=None, dbname=None,
connection_timeout=None, max_tries=None,
replicaset=None): replicaset=None):
"""Create a new Connection instance. """Create a new Connection instance.
@ -22,7 +31,10 @@ class MongoDBConnection(Connection):
host (str, optional): the host to connect to. host (str, optional): the host to connect to.
port (int, optional): the port to connect to. port (int, optional): the port to connect to.
dbname (str, optional): the database to use. dbname (str, optional): the database to use.
max_tries (int, optional): how many tries before giving up. connection_timeout (int, optional): the milliseconds to wait
until timing out the database connection attempt.
max_tries (int, optional): how many tries before giving up,
if 0 then try forever.
replicaset (str, optional): the name of the replica set to replicaset (str, optional): the name of the replica set to
connect to. connect to.
""" """
@ -31,13 +43,15 @@ class MongoDBConnection(Connection):
self.port = port or bigchaindb.config['database']['port'] self.port = port or bigchaindb.config['database']['port']
self.replicaset = replicaset or bigchaindb.config['database']['replicaset'] self.replicaset = replicaset or bigchaindb.config['database']['replicaset']
self.dbname = dbname or bigchaindb.config['database']['name'] self.dbname = dbname or bigchaindb.config['database']['name']
self.max_tries = max_tries self.connection_timeout = connection_timeout if connection_timeout is not None else CONNECTION_TIMEOUT
self.max_tries = max_tries if max_tries is not None else MAX_RETRIES
self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0)
self.connection = None self.connection = None
@property @property
def conn(self): def conn(self):
if self.connection is None: if self.connection is None:
self._connect() self.connection = self._connect()
return self.connection return self.connection
@property @property
@ -45,23 +59,41 @@ class MongoDBConnection(Connection):
return self.conn[self.dbname] return self.conn[self.dbname]
def run(self, query): def run(self, query):
return query.run(self.db) attempt = 0
for i in self.max_tries_counter:
attempt += 1
try:
return query.run(self.conn[self.dbname])
except errors.AutoReconnect:
if attempt == self.max_tries:
raise
self._connect()
def _connect(self): def _connect(self):
# we should only return a connection if the replica set is attempt = 0
# initialized. initialize_replica_set will check if the for i in self.max_tries_counter:
# replica set is initialized else it will initialize it. attempt += 1
initialize_replica_set()
for i in range(self.max_tries):
try: try:
self.connection = MongoClient(self.host, self.port, # FYI: this might raise a `ServerSelectionTimeoutError`,
replicaset=self.replicaset) # that is a subclass of `ConnectionFailure`.
except errors.ConnectionFailure: connection = MongoClient(self.host,
if i + 1 == self.max_tries: self.port,
raise replicaset=self.replicaset,
else: serverselectiontimeoutms=self.connection_timeout)
time.sleep(2**i)
# we should only return a connection if the replica set is
# initialized. initialize_replica_set will check if the
# replica set is initialized else it will initialize it.
initialize_replica_set(self.host, self.port, self.connection_timeout)
return connection
except (errors.ConnectionFailure, errors.AutoReconnect) as exc:
logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.',
attempt, self.max_tries if self.max_tries != 0 else '',
self.host, self.port, self.connection_timeout)
if attempt == self.max_tries:
logger.exception('Cannot connect to the Database. Giving up.')
raise errors.ConnectionFailure() from exc
def collection(name): def collection(name):
@ -73,15 +105,16 @@ def collection(name):
return Lazy()[name] return Lazy()[name]
def initialize_replica_set(): def initialize_replica_set(host, port, connection_timeout):
"""Initialize a replica set. If already initialized skip.""" """Initialize a replica set. If already initialized skip."""
# Setup a MongoDB connection # Setup a MongoDB connection
# The reason we do this instead of `backend.connect` is that # The reason we do this instead of `backend.connect` is that
# `backend.connect` will connect you to a replica set but this fails if # `backend.connect` will connect you to a replica set but this fails if
# you try to connect to a replica set that is not yet initialized # you try to connect to a replica set that is not yet initialized
conn = MongoClient(host=bigchaindb.config['database']['host'], conn = MongoClient(host=host,
port=bigchaindb.config['database']['port']) port=port,
serverselectiontimeoutms=connection_timeout)
_check_replica_set(conn) _check_replica_set(conn)
host = '{}:{}'.format(bigchaindb.config['database']['host'], host = '{}:{}'.format(bigchaindb.config['database']['host'],
bigchaindb.config['database']['port']) bigchaindb.config['database']['port'])

View File

@ -138,7 +138,7 @@ def test_initialize_replica_set(mock_cmd_line_opts):
] ]
# check that it returns # check that it returns
assert initialize_replica_set() is None assert initialize_replica_set('host', 1337, 1000) is None
# test it raises OperationError if anything wrong # test it raises OperationError if anything wrong
with mock.patch.object(Database, 'command') as mock_command: with mock.patch.object(Database, 'command') as mock_command:
@ -148,4 +148,4 @@ def test_initialize_replica_set(mock_cmd_line_opts):
] ]
with pytest.raises(OperationFailure): with pytest.raises(OperationFailure):
initialize_replica_set() initialize_replica_set('host', 1337, 1000)