Merge pull request #1088 from bigchaindb/feat/1055/commands-add-remove-replicas

BigchainDB commands to add and remove nodes from the replica set
This commit is contained in:
Rodolphe Marques 2017-02-02 14:00:49 +01:00 committed by GitHub
commit 90eb38e7f1
8 changed files with 366 additions and 4 deletions

View File

@ -20,3 +20,15 @@ def set_shards(connection, *, shards):
@singledispatch
def set_replicas(connection, *, replicas):
raise NotImplementedError
@singledispatch
def add_replicas(connection, replicas):
raise NotImplementedError('This command is specific to the '
'MongoDB backend.')
@singledispatch
def remove_replicas(connection, replicas):
raise NotImplementedError('This command is specific to the '
'MongoDB backend.')

View File

@ -16,7 +16,7 @@ generic backend interfaces to the implementations in this module.
"""
# Register the single dispatched modules on import.
from bigchaindb.backend.mongodb import schema, query, changefeed # noqa
from bigchaindb.backend.mongodb import admin, schema, query, changefeed # noqa
# MongoDBConnection should always be accessed via
# ``bigchaindb.backend.connect()``.

View File

@ -0,0 +1,86 @@
"""Database configuration functions."""
import logging
from pymongo.errors import OperationFailure
from bigchaindb.backend import admin
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.exceptions import DatabaseOpFailedError
from bigchaindb.backend.mongodb.connection import MongoDBConnection
logger = logging.getLogger(__name__)
register_admin = module_dispatch_registrar(admin)
@register_admin(MongoDBConnection)
def add_replicas(connection, replicas):
"""Add a set of replicas to the replicaset
Args:
connection (:class:`~bigchaindb.backend.connection.Connection`):
A connection to the database.
replicas (:obj:`list` of :obj:`str`): replica addresses in the
form "hostname:port".
Raises:
DatabaseOpFailedError: If the reconfiguration fails due to a MongoDB
:exc:`OperationFailure`
"""
# get current configuration
conf = connection.conn.admin.command('replSetGetConfig')
# MongoDB does not automatically add an id for the members so we need
# to choose one that does not exist yet. The safest way is to use
# incrementing ids, so we first check what is the highest id already in
# the set and continue from there.
cur_id = max([member['_id'] for member in conf['config']['members']])
# add the nodes to the members list of the replica set
for replica in replicas:
cur_id += 1
conf['config']['members'].append({'_id': cur_id, 'host': replica})
# increase the configuration version number
# when reconfiguring, mongodb expects a version number higher than the one
# it currently has
conf['config']['version'] += 1
# apply new configuration
try:
connection.conn.admin.command('replSetReconfig', conf['config'])
except OperationFailure as exc:
raise DatabaseOpFailedError(exc.details['errmsg'])
@register_admin(MongoDBConnection)
def remove_replicas(connection, replicas):
"""Remove a set of replicas from the replicaset
Args:
connection (:class:`~bigchaindb.backend.connection.Connection`):
A connection to the database.
replicas (:obj:`list` of :obj:`str`): replica addresses in the
form "hostname:port".
Raises:
DatabaseOpFailedError: If the reconfiguration fails due to a MongoDB
:exc:`OperationFailure`
"""
# get the current configuration
conf = connection.conn.admin.command('replSetGetConfig')
# remove the nodes from the members list in the replica set
conf['config']['members'] = list(
filter(lambda member: member['host'] not in replicas,
conf['config']['members'])
)
# increase the configuration version number
conf['config']['version'] += 1
# apply new configuration
try:
connection.conn.admin.command('replSetReconfig', conf['config'])
except OperationFailure as exc:
raise DatabaseOpFailedError(exc.details['errmsg'])

View File

@ -22,7 +22,8 @@ from bigchaindb.models import Transaction
from bigchaindb.utils import ProcessGroup
from bigchaindb import backend
from bigchaindb.backend import schema
from bigchaindb.backend.admin import set_replicas, set_shards
from bigchaindb.backend.admin import (set_replicas, set_shards, add_replicas,
remove_replicas)
from bigchaindb.backend.exceptions import DatabaseOpFailedError
from bigchaindb.commands import utils
from bigchaindb import processes
@ -264,6 +265,32 @@ def run_set_replicas(args):
logger.warn(e)
def run_add_replicas(args):
# Note: This command is specific to MongoDB
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
conn = backend.connect()
try:
add_replicas(conn, args.replicas)
except (DatabaseOpFailedError, NotImplementedError) as e:
logger.warn(e)
else:
logger.info('Added {} to the replicaset.'.format(args.replicas))
def run_remove_replicas(args):
# Note: This command is specific to MongoDB
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
conn = backend.connect()
try:
remove_replicas(conn, args.replicas)
except (DatabaseOpFailedError, NotImplementedError) as e:
logger.warn(e)
else:
logger.info('Removed {} from the replicaset.'.format(args.replicas))
def create_parser():
parser = argparse.ArgumentParser(
description='Control your BigchainDB node.',
@ -329,6 +356,32 @@ def create_parser():
type=int, default=1,
help='Number of replicas (i.e. the replication factor)')
# parser for adding nodes to the replica set
add_replicas_parser = subparsers.add_parser('add-replicas',
help='Add a set of nodes to the '
'replica set. This command '
'is specific to the MongoDB'
' backend.')
add_replicas_parser.add_argument('replicas', nargs='+',
type=utils.mongodb_host,
help='A list of space separated hosts to '
'add to the replicaset. Each host '
'should be in the form `host:port`.')
# parser for removing nodes from the replica set
rm_replicas_parser = subparsers.add_parser('remove-replicas',
help='Remove a set of nodes from the '
'replica set. This command '
'is specific to the MongoDB'
' backend.')
rm_replicas_parser.add_argument('replicas', nargs='+',
type=utils.mongodb_host,
help='A list of space separated hosts to '
'remove from the replicaset. Each host '
'should be in the form `host:port`.')
load_parser = subparsers.add_parser('load',
help='Write transactions to the backlog')

View File

@ -3,14 +3,15 @@ for ``argparse.ArgumentParser``.
"""
import argparse
from bigchaindb.common.exceptions import StartupError
import multiprocessing as mp
import subprocess
import rethinkdb as r
from pymongo import uri_parser
import bigchaindb
from bigchaindb import backend
from bigchaindb.common.exceptions import StartupError
from bigchaindb.version import __version__
@ -95,6 +96,34 @@ def start(parser, argv, scope):
return func(args)
def mongodb_host(host):
"""Utility function that works as a type for mongodb ``host`` args.
This function validates the ``host`` args provided by to the
``add-replicas`` and ``remove-replicas`` commands and checks if each arg
is in the form "host:port"
Args:
host (str): A string containing hostname and port (e.g. "host:port")
Raises:
ArgumentTypeError: if it fails to parse the argument
"""
# check if mongodb can parse the host
try:
hostname, port = uri_parser.parse_host(host, default_port=None)
except ValueError as exc:
raise argparse.ArgumentTypeError(exc.args[0])
# we do require the port to be provided.
if port is None or hostname == '':
raise argparse.ArgumentTypeError('expected host in the form '
'`host:port`. Got `{}` instead.'
.format(host))
return host
base_parser = argparse.ArgumentParser(add_help=False, prog='bigchaindb')
base_parser.add_argument('-c', '--config',

View File

@ -0,0 +1,108 @@
"""Tests for the :mod:`bigchaindb.backend.mongodb.admin` module."""
import copy
from unittest import mock
import pytest
from pymongo.database import Database
from pymongo.errors import OperationFailure
@pytest.fixture
def mock_replicaset_config():
return {
'config': {
'_id': 'bigchain-rs',
'members': [
{
'_id': 0,
'arbiterOnly': False,
'buildIndexes': True,
'hidden': False,
'host': 'localhost:27017',
'priority': 1.0,
'slaveDelay': 0,
'tags': {},
'votes': 1
}
],
'version': 1
}
}
@pytest.fixture
def connection():
from bigchaindb.backend import connect
connection = connect()
# connection is a lazy object. It only actually creates a connection to
# the database when its first used.
# During the setup of a MongoDBConnection some `Database.command` are
# executed to make sure that the replica set is correctly initialized.
# Here we force the the connection setup so that all required
# `Database.command` are executed before we mock them it in the tests.
connection._connect()
return connection
def test_add_replicas(mock_replicaset_config, connection):
from bigchaindb.backend.admin import add_replicas
expected_config = copy.deepcopy(mock_replicaset_config)
expected_config['config']['members'] += [
{'_id': 1, 'host': 'localhost:27018'},
{'_id': 2, 'host': 'localhost:27019'}
]
expected_config['config']['version'] += 1
with mock.patch.object(Database, 'command') as mock_command:
mock_command.return_value = mock_replicaset_config
add_replicas(connection, ['localhost:27018', 'localhost:27019'])
mock_command.assert_called_with('replSetReconfig',
expected_config['config'])
def test_add_replicas_raises(mock_replicaset_config, connection):
from bigchaindb.backend.admin import add_replicas
from bigchaindb.backend.exceptions import DatabaseOpFailedError
with mock.patch.object(Database, 'command') as mock_command:
mock_command.side_effect = [
mock_replicaset_config,
OperationFailure(error=1, details={'errmsg': ''})
]
with pytest.raises(DatabaseOpFailedError):
add_replicas(connection, ['localhost:27018'])
def test_remove_replicas(mock_replicaset_config, connection):
from bigchaindb.backend.admin import remove_replicas
expected_config = copy.deepcopy(mock_replicaset_config)
expected_config['config']['version'] += 1
# add some hosts to the configuration to remove
mock_replicaset_config['config']['members'] += [
{'_id': 1, 'host': 'localhost:27018'},
{'_id': 2, 'host': 'localhost:27019'}
]
with mock.patch.object(Database, 'command') as mock_command:
mock_command.return_value = mock_replicaset_config
remove_replicas(connection, ['localhost:27018', 'localhost:27019'])
mock_command.assert_called_with('replSetReconfig',
expected_config['config'])
def test_remove_replicas_raises(mock_replicaset_config, connection):
from bigchaindb.backend.admin import remove_replicas
from bigchaindb.backend.exceptions import DatabaseOpFailedError
with mock.patch.object(Database, 'command') as mock_command:
mock_command.side_effect = [
mock_replicaset_config,
OperationFailure(error=1, details={'errmsg': ''})
]
with pytest.raises(DatabaseOpFailedError):
remove_replicas(connection, ['localhost:27018'])

View File

@ -71,6 +71,8 @@ def test_changefeed_class(changefeed_class_func_name, args_qty):
('reconfigure', {'table': None, 'shards': None, 'replicas': None}),
('set_shards', {'shards': None}),
('set_replicas', {'replicas': None}),
('add_replicas', {'replicas': None}),
('remove_replicas', {'replicas': None}),
))
def test_admin(admin_func_name, kwargs):
from bigchaindb.backend import admin

View File

@ -1,6 +1,6 @@
import json
from unittest.mock import Mock, patch
from argparse import Namespace
from argparse import Namespace, ArgumentTypeError
import copy
import pytest
@ -22,6 +22,8 @@ def test_make_sure_we_dont_remove_any_command():
assert parser.parse_args(['set-shards', '1']).command
assert parser.parse_args(['set-replicas', '1']).command
assert parser.parse_args(['load']).command
assert parser.parse_args(['add-replicas', 'localhost:27017']).command
assert parser.parse_args(['remove-replicas', 'localhost:27017']).command
def test_start_raises_if_command_not_implemented():
@ -376,3 +378,73 @@ def test_calling_main(start_mock, base_parser_mock, parse_args_mock,
'distributed equally to all '
'the processes')
assert start_mock.called is True
@pytest.mark.usefixtures('ignore_local_config_file')
@patch('bigchaindb.commands.bigchain.add_replicas')
def test_run_add_replicas(mock_add_replicas):
from bigchaindb.commands.bigchain import run_add_replicas
from bigchaindb.backend.exceptions import DatabaseOpFailedError
args = Namespace(config=None, replicas=['localhost:27017'])
# test add_replicas no raises
mock_add_replicas.return_value = None
assert run_add_replicas(args) is None
assert mock_add_replicas.call_count == 1
mock_add_replicas.reset_mock()
# test add_replicas with `DatabaseOpFailedError`
mock_add_replicas.side_effect = DatabaseOpFailedError()
assert run_add_replicas(args) is None
assert mock_add_replicas.call_count == 1
mock_add_replicas.reset_mock()
# test add_replicas with `NotImplementedError`
mock_add_replicas.side_effect = NotImplementedError()
assert run_add_replicas(args) is None
assert mock_add_replicas.call_count == 1
mock_add_replicas.reset_mock()
@pytest.mark.usefixtures('ignore_local_config_file')
@patch('bigchaindb.commands.bigchain.remove_replicas')
def test_run_remove_replicas(mock_remove_replicas):
from bigchaindb.commands.bigchain import run_remove_replicas
from bigchaindb.backend.exceptions import DatabaseOpFailedError
args = Namespace(config=None, replicas=['localhost:27017'])
# test add_replicas no raises
mock_remove_replicas.return_value = None
assert run_remove_replicas(args) is None
assert mock_remove_replicas.call_count == 1
mock_remove_replicas.reset_mock()
# test add_replicas with `DatabaseOpFailedError`
mock_remove_replicas.side_effect = DatabaseOpFailedError()
assert run_remove_replicas(args) is None
assert mock_remove_replicas.call_count == 1
mock_remove_replicas.reset_mock()
# test add_replicas with `NotImplementedError`
mock_remove_replicas.side_effect = NotImplementedError()
assert run_remove_replicas(args) is None
assert mock_remove_replicas.call_count == 1
mock_remove_replicas.reset_mock()
def test_mongodb_host_type():
from bigchaindb.commands.utils import mongodb_host
# bad port provided
with pytest.raises(ArgumentTypeError):
mongodb_host('localhost:11111111111')
# no port information provided
with pytest.raises(ArgumentTypeError):
mongodb_host('localhost')
# bad host provided
with pytest.raises(ArgumentTypeError):
mongodb_host(':27017')