mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
BigchainDB does not neet to initialize or check for replicaset
Signed-off-by: Shahbaz Nazir <shahbaz@bigchaindb.com>
This commit is contained in:
parent
3f7b521809
commit
630895449e
@ -88,23 +88,6 @@ class LocalMongoDBConnection(Connection):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if self.replicaset:
|
|
||||||
# we should only return a connection if the replica set is
|
|
||||||
# initialized. initialize_replica_set will check if the
|
|
||||||
# replica set is initialized else it will initialize it.
|
|
||||||
initialize_replica_set(self.host,
|
|
||||||
self.port,
|
|
||||||
self.connection_timeout,
|
|
||||||
self.dbname,
|
|
||||||
self.ssl,
|
|
||||||
self.login,
|
|
||||||
self.password,
|
|
||||||
self.ca_cert,
|
|
||||||
self.certfile,
|
|
||||||
self.keyfile,
|
|
||||||
self.keyfile_passphrase,
|
|
||||||
self.crlfile)
|
|
||||||
|
|
||||||
# FYI: the connection process might raise a
|
# FYI: the connection process might raise a
|
||||||
# `ServerSelectionTimeoutError`, that is a subclass of
|
# `ServerSelectionTimeoutError`, that is a subclass of
|
||||||
# `ConnectionFailure`.
|
# `ConnectionFailure`.
|
||||||
@ -140,8 +123,6 @@ class LocalMongoDBConnection(Connection):
|
|||||||
|
|
||||||
return client
|
return client
|
||||||
|
|
||||||
# `initialize_replica_set` might raise `ConnectionFailure`,
|
|
||||||
# `OperationFailure` or `ConfigurationError`.
|
|
||||||
except (pymongo.errors.ConnectionFailure,
|
except (pymongo.errors.ConnectionFailure,
|
||||||
pymongo.errors.OperationFailure) as exc:
|
pymongo.errors.OperationFailure) as exc:
|
||||||
logger.info('Exception in _connect(): {}'.format(exc))
|
logger.info('Exception in _connect(): {}'.format(exc))
|
||||||
@ -153,120 +134,3 @@ class LocalMongoDBConnection(Connection):
|
|||||||
MONGO_OPTS = {
|
MONGO_OPTS = {
|
||||||
'socketTimeoutMS': 20000,
|
'socketTimeoutMS': 20000,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def initialize_replica_set(host, port, connection_timeout, dbname, ssl, login,
|
|
||||||
password, ca_cert, certfile, keyfile,
|
|
||||||
keyfile_passphrase, crlfile):
|
|
||||||
"""Initialize a replica set. If already initialized skip."""
|
|
||||||
|
|
||||||
# Setup a MongoDB connection
|
|
||||||
# The reason we do this instead of `backend.connect` is that
|
|
||||||
# `backend.connect` will connect you to a replica set but this fails if
|
|
||||||
# you try to connect to a replica set that is not yet initialized
|
|
||||||
try:
|
|
||||||
# The presence of ca_cert, certfile, keyfile, crlfile implies the
|
|
||||||
# use of certificates for TLS connectivity.
|
|
||||||
if ca_cert is None or certfile is None or keyfile is None or \
|
|
||||||
crlfile is None:
|
|
||||||
conn = pymongo.MongoClient(host,
|
|
||||||
port,
|
|
||||||
serverselectiontimeoutms=connection_timeout,
|
|
||||||
ssl=ssl,
|
|
||||||
**MONGO_OPTS)
|
|
||||||
if login is not None and password is not None:
|
|
||||||
conn[dbname].authenticate(login, password)
|
|
||||||
else:
|
|
||||||
logger.info('Connecting to MongoDB over TLS/SSL...')
|
|
||||||
conn = pymongo.MongoClient(host,
|
|
||||||
port,
|
|
||||||
serverselectiontimeoutms=connection_timeout,
|
|
||||||
ssl=ssl,
|
|
||||||
ssl_ca_certs=ca_cert,
|
|
||||||
ssl_certfile=certfile,
|
|
||||||
ssl_keyfile=keyfile,
|
|
||||||
ssl_pem_passphrase=keyfile_passphrase,
|
|
||||||
ssl_crlfile=crlfile,
|
|
||||||
ssl_cert_reqs=CERT_REQUIRED,
|
|
||||||
**MONGO_OPTS)
|
|
||||||
if login is not None:
|
|
||||||
logger.info('Authenticating to the database...')
|
|
||||||
conn[dbname].authenticate(login, mechanism='MONGODB-X509')
|
|
||||||
|
|
||||||
except (pymongo.errors.ConnectionFailure,
|
|
||||||
pymongo.errors.OperationFailure) as exc:
|
|
||||||
logger.info('Exception in _connect(): {}'.format(exc))
|
|
||||||
raise ConnectionError(str(exc)) from exc
|
|
||||||
except pymongo.errors.ConfigurationError as exc:
|
|
||||||
raise ConfigurationError from exc
|
|
||||||
|
|
||||||
_check_replica_set(conn)
|
|
||||||
host = '{}:{}'.format(bigchaindb.config['database']['host'],
|
|
||||||
bigchaindb.config['database']['port'])
|
|
||||||
config = {'_id': bigchaindb.config['database']['replicaset'],
|
|
||||||
'members': [{'_id': 0, 'host': host}]}
|
|
||||||
|
|
||||||
try:
|
|
||||||
conn.admin.command('replSetInitiate', config)
|
|
||||||
except pymongo.errors.OperationFailure as exc_info:
|
|
||||||
if exc_info.details['codeName'] == 'AlreadyInitialized':
|
|
||||||
return
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
_wait_for_replica_set_initialization(conn)
|
|
||||||
logger.info('Initialized replica set')
|
|
||||||
finally:
|
|
||||||
if conn is not None:
|
|
||||||
logger.info('Closing initial connection to MongoDB')
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
def _check_replica_set(conn):
|
|
||||||
"""Checks if the replSet option was enabled either through the command
|
|
||||||
line option or config file and if it matches the one provided by
|
|
||||||
bigchaindb configuration.
|
|
||||||
|
|
||||||
Note:
|
|
||||||
The setting we are looking for will have a different name depending
|
|
||||||
if it was set by the config file (`replSetName`) or by command
|
|
||||||
line arguments (`replSet`).
|
|
||||||
|
|
||||||
Raise:
|
|
||||||
:exc:`~ConfigurationError`: If mongod was not started with the
|
|
||||||
replSet option.
|
|
||||||
"""
|
|
||||||
options = conn.admin.command('getCmdLineOpts')
|
|
||||||
try:
|
|
||||||
repl_opts = options['parsed']['replication']
|
|
||||||
repl_set_name = repl_opts.get('replSetName', repl_opts.get('replSet'))
|
|
||||||
except KeyError:
|
|
||||||
raise ConfigurationError('mongod was not started with'
|
|
||||||
' the replSet option.')
|
|
||||||
|
|
||||||
bdb_repl_set_name = bigchaindb.config['database']['replicaset']
|
|
||||||
if repl_set_name != bdb_repl_set_name:
|
|
||||||
raise ConfigurationError('The replicaset configuration of '
|
|
||||||
'bigchaindb (`{}`) needs to match '
|
|
||||||
'the replica set name from MongoDB'
|
|
||||||
' (`{}`)'.format(bdb_repl_set_name,
|
|
||||||
repl_set_name))
|
|
||||||
|
|
||||||
|
|
||||||
def _wait_for_replica_set_initialization(conn):
|
|
||||||
"""Wait for a replica set to finish initialization.
|
|
||||||
|
|
||||||
If a replica set is being initialized for the first time it takes some
|
|
||||||
time. Nodes need to discover each other and an election needs to take
|
|
||||||
place. During this time the database is not writable so we need to wait
|
|
||||||
before continuing with the rest of the initialization
|
|
||||||
"""
|
|
||||||
|
|
||||||
# I did not find a better way to do this for now.
|
|
||||||
# To check if the database is ready we will poll the mongodb logs until
|
|
||||||
# we find the line that says the database is ready
|
|
||||||
logger.info('Waiting for mongodb replica set initialization')
|
|
||||||
while True:
|
|
||||||
logs = conn.admin.command('getLog', 'rs')['log']
|
|
||||||
if any('database writes are now permitted' in line for line in logs):
|
|
||||||
return
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|||||||
@ -109,87 +109,3 @@ def test_connection_with_credentials(mock_authenticate):
|
|||||||
password='secret')
|
password='secret')
|
||||||
conn.connect()
|
conn.connect()
|
||||||
assert mock_authenticate.call_count == 1
|
assert mock_authenticate.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
def test_check_replica_set_not_enabled(mongodb_connection):
|
|
||||||
from bigchaindb.backend.localmongodb.connection import _check_replica_set
|
|
||||||
from bigchaindb.common.exceptions import ConfigurationError
|
|
||||||
|
|
||||||
# no replSet option set
|
|
||||||
cmd_line_opts = {'argv': ['mongod', '--dbpath=/data'],
|
|
||||||
'ok': 1.0,
|
|
||||||
'parsed': {'storage': {'dbPath': '/data'}}}
|
|
||||||
with mock.patch.object(Database, 'command', return_value=cmd_line_opts):
|
|
||||||
with pytest.raises(ConfigurationError):
|
|
||||||
_check_replica_set(mongodb_connection)
|
|
||||||
|
|
||||||
|
|
||||||
def test_check_replica_set_command_line(mongodb_connection,
|
|
||||||
mock_cmd_line_opts):
|
|
||||||
from bigchaindb.backend.localmongodb.connection import _check_replica_set
|
|
||||||
|
|
||||||
# replSet option set through the command line
|
|
||||||
with mock.patch.object(Database, 'command',
|
|
||||||
return_value=mock_cmd_line_opts):
|
|
||||||
assert _check_replica_set(mongodb_connection) is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_check_replica_set_config_file(mongodb_connection, mock_config_opts):
|
|
||||||
from bigchaindb.backend.localmongodb.connection import _check_replica_set
|
|
||||||
|
|
||||||
# replSet option set through the config file
|
|
||||||
with mock.patch.object(Database, 'command', return_value=mock_config_opts):
|
|
||||||
assert _check_replica_set(mongodb_connection) is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_check_replica_set_name_mismatch(mongodb_connection,
|
|
||||||
mock_cmd_line_opts):
|
|
||||||
from bigchaindb.backend.localmongodb.connection import _check_replica_set
|
|
||||||
from bigchaindb.common.exceptions import ConfigurationError
|
|
||||||
|
|
||||||
# change the replica set name so it does not match the bigchaindb config
|
|
||||||
mock_cmd_line_opts['parsed']['replication']['replSet'] = 'rs0'
|
|
||||||
|
|
||||||
with mock.patch.object(Database, 'command',
|
|
||||||
return_value=mock_cmd_line_opts):
|
|
||||||
with pytest.raises(ConfigurationError):
|
|
||||||
_check_replica_set(mongodb_connection)
|
|
||||||
|
|
||||||
|
|
||||||
def test_wait_for_replica_set_initialization(mongodb_connection):
|
|
||||||
from bigchaindb.backend.localmongodb.connection import _wait_for_replica_set_initialization # noqa
|
|
||||||
|
|
||||||
with mock.patch.object(Database, 'command') as mock_command:
|
|
||||||
mock_command.side_effect = [
|
|
||||||
{'log': ['a line']},
|
|
||||||
{'log': ['database writes are now permitted']},
|
|
||||||
]
|
|
||||||
|
|
||||||
# check that it returns
|
|
||||||
assert _wait_for_replica_set_initialization(mongodb_connection) is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_initialize_replica_set(mock_cmd_line_opts):
|
|
||||||
from bigchaindb.backend.localmongodb.connection import initialize_replica_set
|
|
||||||
|
|
||||||
with mock.patch.object(Database, 'command') as mock_command:
|
|
||||||
mock_command.side_effect = [
|
|
||||||
mock_cmd_line_opts,
|
|
||||||
None,
|
|
||||||
{'log': ['database writes are now permitted']},
|
|
||||||
]
|
|
||||||
|
|
||||||
# check that it returns
|
|
||||||
assert initialize_replica_set('host', 1337, 1000, 'dbname', False, None, None,
|
|
||||||
None, None, None, None, None) is None
|
|
||||||
|
|
||||||
# test it raises OperationError if anything wrong
|
|
||||||
with mock.patch.object(Database, 'command') as mock_command:
|
|
||||||
mock_command.side_effect = [
|
|
||||||
mock_cmd_line_opts,
|
|
||||||
pymongo.errors.OperationFailure(None, details={'codeName': ''})
|
|
||||||
]
|
|
||||||
|
|
||||||
with pytest.raises(pymongo.errors.OperationFailure):
|
|
||||||
initialize_replica_set('host', 1337, 1000, 'dbname', False, None,
|
|
||||||
None, None, None, None, None, None) is None
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user