diff --git a/bigchaindb/backend/admin.py b/bigchaindb/backend/admin.py index 057b5995..f0ea62fd 100644 --- a/bigchaindb/backend/admin.py +++ b/bigchaindb/backend/admin.py @@ -20,3 +20,15 @@ def set_shards(connection, *, shards): @singledispatch def set_replicas(connection, *, replicas): raise NotImplementedError + + +@singledispatch +def add_replicas(connection, replicas): + raise NotImplementedError('This command is specific to the ' + 'MongoDB backend.') + + +@singledispatch +def remove_replicas(connection, replicas): + raise NotImplementedError('This command is specific to the ' + 'MongoDB backend.') diff --git a/bigchaindb/backend/mongodb/__init__.py b/bigchaindb/backend/mongodb/__init__.py index af5293ac..e3746fa3 100644 --- a/bigchaindb/backend/mongodb/__init__.py +++ b/bigchaindb/backend/mongodb/__init__.py @@ -16,7 +16,7 @@ generic backend interfaces to the implementations in this module. """ # Register the single dispatched modules on import. -from bigchaindb.backend.mongodb import schema, query, changefeed # noqa +from bigchaindb.backend.mongodb import admin, schema, query, changefeed # noqa # MongoDBConnection should always be accessed via # ``bigchaindb.backend.connect()``. diff --git a/bigchaindb/backend/mongodb/admin.py b/bigchaindb/backend/mongodb/admin.py new file mode 100644 index 00000000..7d72c3a4 --- /dev/null +++ b/bigchaindb/backend/mongodb/admin.py @@ -0,0 +1,86 @@ +"""Database configuration functions.""" +import logging + +from pymongo.errors import OperationFailure + +from bigchaindb.backend import admin +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.exceptions import DatabaseOpFailedError +from bigchaindb.backend.mongodb.connection import MongoDBConnection + +logger = logging.getLogger(__name__) + +register_admin = module_dispatch_registrar(admin) + + +@register_admin(MongoDBConnection) +def add_replicas(connection, replicas): + """Add a set of replicas to the replicaset + + Args: + connection (:class:`~bigchaindb.backend.connection.Connection`): + A connection to the database. + replicas (:obj:`list` of :obj:`str`): replica addresses in the + form "hostname:port". + + Raises: + DatabaseOpFailedError: If the reconfiguration fails due to a MongoDB + :exc:`OperationFailure` + """ + # get current configuration + conf = connection.conn.admin.command('replSetGetConfig') + + # MongoDB does not automatically add an id for the members so we need + # to choose one that does not exist yet. The safest way is to use + # incrementing ids, so we first check what is the highest id already in + # the set and continue from there. + cur_id = max([member['_id'] for member in conf['config']['members']]) + + # add the nodes to the members list of the replica set + for replica in replicas: + cur_id += 1 + conf['config']['members'].append({'_id': cur_id, 'host': replica}) + + # increase the configuration version number + # when reconfiguring, mongodb expects a version number higher than the one + # it currently has + conf['config']['version'] += 1 + + # apply new configuration + try: + connection.conn.admin.command('replSetReconfig', conf['config']) + except OperationFailure as exc: + raise DatabaseOpFailedError(exc.details['errmsg']) + + +@register_admin(MongoDBConnection) +def remove_replicas(connection, replicas): + """Remove a set of replicas from the replicaset + + Args: + connection (:class:`~bigchaindb.backend.connection.Connection`): + A connection to the database. + replicas (:obj:`list` of :obj:`str`): replica addresses in the + form "hostname:port". + + Raises: + DatabaseOpFailedError: If the reconfiguration fails due to a MongoDB + :exc:`OperationFailure` + """ + # get the current configuration + conf = connection.conn.admin.command('replSetGetConfig') + + # remove the nodes from the members list in the replica set + conf['config']['members'] = list( + filter(lambda member: member['host'] not in replicas, + conf['config']['members']) + ) + + # increase the configuration version number + conf['config']['version'] += 1 + + # apply new configuration + try: + connection.conn.admin.command('replSetReconfig', conf['config']) + except OperationFailure as exc: + raise DatabaseOpFailedError(exc.details['errmsg']) diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 272f8107..4e8de28b 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -22,7 +22,8 @@ from bigchaindb.models import Transaction from bigchaindb.utils import ProcessGroup from bigchaindb import backend from bigchaindb.backend import schema -from bigchaindb.backend.admin import set_replicas, set_shards +from bigchaindb.backend.admin import (set_replicas, set_shards, add_replicas, + remove_replicas) from bigchaindb.backend.exceptions import DatabaseOpFailedError from bigchaindb.commands import utils from bigchaindb import processes @@ -264,6 +265,32 @@ def run_set_replicas(args): logger.warn(e) +def run_add_replicas(args): + # Note: This command is specific to MongoDB + bigchaindb.config_utils.autoconfigure(filename=args.config, force=True) + conn = backend.connect() + + try: + add_replicas(conn, args.replicas) + except (DatabaseOpFailedError, NotImplementedError) as e: + logger.warn(e) + else: + logger.info('Added {} to the replicaset.'.format(args.replicas)) + + +def run_remove_replicas(args): + # Note: This command is specific to MongoDB + bigchaindb.config_utils.autoconfigure(filename=args.config, force=True) + conn = backend.connect() + + try: + remove_replicas(conn, args.replicas) + except (DatabaseOpFailedError, NotImplementedError) as e: + logger.warn(e) + else: + logger.info('Removed {} from the replicaset.'.format(args.replicas)) + + def create_parser(): parser = argparse.ArgumentParser( description='Control your BigchainDB node.', @@ -329,6 +356,32 @@ def create_parser(): type=int, default=1, help='Number of replicas (i.e. the replication factor)') + # parser for adding nodes to the replica set + add_replicas_parser = subparsers.add_parser('add-replicas', + help='Add a set of nodes to the ' + 'replica set. This command ' + 'is specific to the MongoDB' + ' backend.') + + add_replicas_parser.add_argument('replicas', nargs='+', + type=utils.mongodb_host, + help='A list of space separated hosts to ' + 'add to the replicaset. Each host ' + 'should be in the form `host:port`.') + + # parser for removing nodes from the replica set + rm_replicas_parser = subparsers.add_parser('remove-replicas', + help='Remove a set of nodes from the ' + 'replica set. This command ' + 'is specific to the MongoDB' + ' backend.') + + rm_replicas_parser.add_argument('replicas', nargs='+', + type=utils.mongodb_host, + help='A list of space separated hosts to ' + 'remove from the replicaset. Each host ' + 'should be in the form `host:port`.') + load_parser = subparsers.add_parser('load', help='Write transactions to the backlog') diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py index 510eb2f6..80ee7a6b 100644 --- a/bigchaindb/commands/utils.py +++ b/bigchaindb/commands/utils.py @@ -3,14 +3,15 @@ for ``argparse.ArgumentParser``. """ import argparse -from bigchaindb.common.exceptions import StartupError import multiprocessing as mp import subprocess import rethinkdb as r +from pymongo import uri_parser import bigchaindb from bigchaindb import backend +from bigchaindb.common.exceptions import StartupError from bigchaindb.version import __version__ @@ -95,6 +96,34 @@ def start(parser, argv, scope): return func(args) +def mongodb_host(host): + """Utility function that works as a type for mongodb ``host`` args. + + This function validates the ``host`` args provided by to the + ``add-replicas`` and ``remove-replicas`` commands and checks if each arg + is in the form "host:port" + + Args: + host (str): A string containing hostname and port (e.g. "host:port") + + Raises: + ArgumentTypeError: if it fails to parse the argument + """ + # check if mongodb can parse the host + try: + hostname, port = uri_parser.parse_host(host, default_port=None) + except ValueError as exc: + raise argparse.ArgumentTypeError(exc.args[0]) + + # we do require the port to be provided. + if port is None or hostname == '': + raise argparse.ArgumentTypeError('expected host in the form ' + '`host:port`. Got `{}` instead.' + .format(host)) + + return host + + base_parser = argparse.ArgumentParser(add_help=False, prog='bigchaindb') base_parser.add_argument('-c', '--config', diff --git a/tests/backend/mongodb/test_admin.py b/tests/backend/mongodb/test_admin.py new file mode 100644 index 00000000..a7784369 --- /dev/null +++ b/tests/backend/mongodb/test_admin.py @@ -0,0 +1,108 @@ +"""Tests for the :mod:`bigchaindb.backend.mongodb.admin` module.""" +import copy +from unittest import mock + +import pytest +from pymongo.database import Database +from pymongo.errors import OperationFailure + + +@pytest.fixture +def mock_replicaset_config(): + return { + 'config': { + '_id': 'bigchain-rs', + 'members': [ + { + '_id': 0, + 'arbiterOnly': False, + 'buildIndexes': True, + 'hidden': False, + 'host': 'localhost:27017', + 'priority': 1.0, + 'slaveDelay': 0, + 'tags': {}, + 'votes': 1 + } + ], + 'version': 1 + } + } + + +@pytest.fixture +def connection(): + from bigchaindb.backend import connect + connection = connect() + # connection is a lazy object. It only actually creates a connection to + # the database when its first used. + # During the setup of a MongoDBConnection some `Database.command` are + # executed to make sure that the replica set is correctly initialized. + # Here we force the the connection setup so that all required + # `Database.command` are executed before we mock them it in the tests. + connection._connect() + return connection + + +def test_add_replicas(mock_replicaset_config, connection): + from bigchaindb.backend.admin import add_replicas + + expected_config = copy.deepcopy(mock_replicaset_config) + expected_config['config']['members'] += [ + {'_id': 1, 'host': 'localhost:27018'}, + {'_id': 2, 'host': 'localhost:27019'} + ] + expected_config['config']['version'] += 1 + + with mock.patch.object(Database, 'command') as mock_command: + mock_command.return_value = mock_replicaset_config + add_replicas(connection, ['localhost:27018', 'localhost:27019']) + + mock_command.assert_called_with('replSetReconfig', + expected_config['config']) + + +def test_add_replicas_raises(mock_replicaset_config, connection): + from bigchaindb.backend.admin import add_replicas + from bigchaindb.backend.exceptions import DatabaseOpFailedError + + with mock.patch.object(Database, 'command') as mock_command: + mock_command.side_effect = [ + mock_replicaset_config, + OperationFailure(error=1, details={'errmsg': ''}) + ] + with pytest.raises(DatabaseOpFailedError): + add_replicas(connection, ['localhost:27018']) + + +def test_remove_replicas(mock_replicaset_config, connection): + from bigchaindb.backend.admin import remove_replicas + + expected_config = copy.deepcopy(mock_replicaset_config) + expected_config['config']['version'] += 1 + + # add some hosts to the configuration to remove + mock_replicaset_config['config']['members'] += [ + {'_id': 1, 'host': 'localhost:27018'}, + {'_id': 2, 'host': 'localhost:27019'} + ] + + with mock.patch.object(Database, 'command') as mock_command: + mock_command.return_value = mock_replicaset_config + remove_replicas(connection, ['localhost:27018', 'localhost:27019']) + + mock_command.assert_called_with('replSetReconfig', + expected_config['config']) + + +def test_remove_replicas_raises(mock_replicaset_config, connection): + from bigchaindb.backend.admin import remove_replicas + from bigchaindb.backend.exceptions import DatabaseOpFailedError + + with mock.patch.object(Database, 'command') as mock_command: + mock_command.side_effect = [ + mock_replicaset_config, + OperationFailure(error=1, details={'errmsg': ''}) + ] + with pytest.raises(DatabaseOpFailedError): + remove_replicas(connection, ['localhost:27018']) diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py index daf50c10..2f86d417 100644 --- a/tests/backend/test_generics.py +++ b/tests/backend/test_generics.py @@ -71,6 +71,8 @@ def test_changefeed_class(changefeed_class_func_name, args_qty): ('reconfigure', {'table': None, 'shards': None, 'replicas': None}), ('set_shards', {'shards': None}), ('set_replicas', {'replicas': None}), + ('add_replicas', {'replicas': None}), + ('remove_replicas', {'replicas': None}), )) def test_admin(admin_func_name, kwargs): from bigchaindb.backend import admin diff --git a/tests/commands/test_commands.py b/tests/commands/test_commands.py index 1a1291e3..95bb0db7 100644 --- a/tests/commands/test_commands.py +++ b/tests/commands/test_commands.py @@ -1,6 +1,6 @@ import json from unittest.mock import Mock, patch -from argparse import Namespace +from argparse import Namespace, ArgumentTypeError import copy import pytest @@ -22,6 +22,8 @@ def test_make_sure_we_dont_remove_any_command(): assert parser.parse_args(['set-shards', '1']).command assert parser.parse_args(['set-replicas', '1']).command assert parser.parse_args(['load']).command + assert parser.parse_args(['add-replicas', 'localhost:27017']).command + assert parser.parse_args(['remove-replicas', 'localhost:27017']).command def test_start_raises_if_command_not_implemented(): @@ -376,3 +378,73 @@ def test_calling_main(start_mock, base_parser_mock, parse_args_mock, 'distributed equally to all ' 'the processes') assert start_mock.called is True + + +@pytest.mark.usefixtures('ignore_local_config_file') +@patch('bigchaindb.commands.bigchain.add_replicas') +def test_run_add_replicas(mock_add_replicas): + from bigchaindb.commands.bigchain import run_add_replicas + from bigchaindb.backend.exceptions import DatabaseOpFailedError + + args = Namespace(config=None, replicas=['localhost:27017']) + + # test add_replicas no raises + mock_add_replicas.return_value = None + assert run_add_replicas(args) is None + assert mock_add_replicas.call_count == 1 + mock_add_replicas.reset_mock() + + # test add_replicas with `DatabaseOpFailedError` + mock_add_replicas.side_effect = DatabaseOpFailedError() + assert run_add_replicas(args) is None + assert mock_add_replicas.call_count == 1 + mock_add_replicas.reset_mock() + + # test add_replicas with `NotImplementedError` + mock_add_replicas.side_effect = NotImplementedError() + assert run_add_replicas(args) is None + assert mock_add_replicas.call_count == 1 + mock_add_replicas.reset_mock() + + +@pytest.mark.usefixtures('ignore_local_config_file') +@patch('bigchaindb.commands.bigchain.remove_replicas') +def test_run_remove_replicas(mock_remove_replicas): + from bigchaindb.commands.bigchain import run_remove_replicas + from bigchaindb.backend.exceptions import DatabaseOpFailedError + + args = Namespace(config=None, replicas=['localhost:27017']) + + # test add_replicas no raises + mock_remove_replicas.return_value = None + assert run_remove_replicas(args) is None + assert mock_remove_replicas.call_count == 1 + mock_remove_replicas.reset_mock() + + # test add_replicas with `DatabaseOpFailedError` + mock_remove_replicas.side_effect = DatabaseOpFailedError() + assert run_remove_replicas(args) is None + assert mock_remove_replicas.call_count == 1 + mock_remove_replicas.reset_mock() + + # test add_replicas with `NotImplementedError` + mock_remove_replicas.side_effect = NotImplementedError() + assert run_remove_replicas(args) is None + assert mock_remove_replicas.call_count == 1 + mock_remove_replicas.reset_mock() + + +def test_mongodb_host_type(): + from bigchaindb.commands.utils import mongodb_host + + # bad port provided + with pytest.raises(ArgumentTypeError): + mongodb_host('localhost:11111111111') + + # no port information provided + with pytest.raises(ArgumentTypeError): + mongodb_host('localhost') + + # bad host provided + with pytest.raises(ArgumentTypeError): + mongodb_host(':27017')