Added bigchaindb commands to add and remove nodes from replicaset

This commit is contained in:
Rodolphe Marques 2017-01-24 17:55:06 +01:00
parent 5683ed5163
commit 69505a366b
5 changed files with 98 additions and 7 deletions

View File

@ -23,10 +23,10 @@ def set_replicas(connection, *, replicas):
@singledispatch @singledispatch
def add_replicas(connection, *, replicas): def add_replicas(connection, replicas):
raise NotImplementedError raise NotImplementedError
@singledispatch @singledispatch
def remove_replicas(connection, *, replicas): def remove_replicas(connection, replicas):
raise NotImplementedError raise NotImplementedError

View File

@ -16,7 +16,7 @@ generic backend interfaces to the implementations in this module.
""" """
# Register the single dispatched modules on import. # Register the single dispatched modules on import.
from bigchaindb.backend.mongodb import schema, query, changefeed # noqa from bigchaindb.backend.mongodb import admin, schema, query, changefeed # noqa
# MongoDBConnection should always be accessed via # MongoDBConnection should always be accessed via
# ``bigchaindb.backend.connect()``. # ``bigchaindb.backend.connect()``.

View File

@ -1,8 +1,11 @@
"""Database configuration functions.""" """Database configuration functions."""
import logging import logging
from pymongo.errors import OperationFailure
from bigchaindb.backend import admin from bigchaindb.backend import admin
from bigchaindb.backend.utils import module_dispatch_registrar from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.exceptions import DatabaseOpFailedError
from bigchaindb.backend.mongodb.connection import MongoDBConnection from bigchaindb.backend.mongodb.connection import MongoDBConnection
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -35,7 +38,10 @@ def add_replicas(connection, replicas):
conf['config']['version'] += 1 conf['config']['version'] += 1
# apply new configuration # apply new configuration
return connection.conn.admin.command('replSetReconfig', conf['config']) try:
return connection.conn.admin.command('replSetReconfig', conf['config'])
except OperationFailure as exc:
raise DatabaseOpFailedError(exc.details['errmsg'])
@register_admin(MongoDBConnection) @register_admin(MongoDBConnection)
@ -56,4 +62,7 @@ def remove_replicas(connection, replicas):
conf['config']['version'] += 1 conf['config']['version'] += 1
# apply new configuration # apply new configuration
return connection.conn.admin.command('replSetReconfig', conf['config']) try:
return connection.conn.admin.command('replSetReconfig', conf['config'])
except OperationFailure as exc:
raise DatabaseOpFailedError(exc.details['errmsg'])

View File

@ -22,7 +22,8 @@ from bigchaindb.models import Transaction
from bigchaindb.utils import ProcessGroup from bigchaindb.utils import ProcessGroup
from bigchaindb import backend from bigchaindb import backend
from bigchaindb.backend import schema from bigchaindb.backend import schema
from bigchaindb.backend.admin import set_replicas, set_shards from bigchaindb.backend.admin import (set_replicas, set_shards, add_replicas,
remove_replicas)
from bigchaindb.backend.exceptions import DatabaseOpFailedError from bigchaindb.backend.exceptions import DatabaseOpFailedError
from bigchaindb.commands import utils from bigchaindb.commands import utils
from bigchaindb import processes from bigchaindb import processes
@ -269,6 +270,32 @@ def run_set_replicas(args):
logger.warn(e) logger.warn(e)
def run_add_replicas(args):
# Note: This command is specific to MongoDB
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
conn = backend.connect()
try:
add_replicas(conn, args.replicas)
except DatabaseOpFailedError as e:
logger.warn(e)
else:
logger.info('Added {} to the replicaset.'.format(args.replicas))
def run_remove_replicas(args):
# Note: This command is specific to MongoDB
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
conn = backend.connect()
try:
remove_replicas(conn, args.replicas)
except DatabaseOpFailedError as e:
logger.warn(e)
else:
logger.info('Removed {} from the replicaset.'.format(args.replicas))
def create_parser(): def create_parser():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Control your BigchainDB node.', description='Control your BigchainDB node.',
@ -334,6 +361,32 @@ def create_parser():
type=int, default=1, type=int, default=1,
help='Number of replicas (i.e. the replication factor)') help='Number of replicas (i.e. the replication factor)')
# parser for adding nodes to the replica set
add_replicas_parser = subparsers.add_parser('add-replicas',
help='Add a set of nodes to the '
'replica set. This command '
'is specific to the MongoDB'
' backend.')
add_replicas_parser.add_argument('replicas', nargs='+',
type=utils.mongodb_host,
help='A list of space separated hosts to '
'add to the replicaset. Each host '
'should be in the form `host:port`.')
# parser for removing nodes from the replica set
rm_replicas_parser = subparsers.add_parser('remove-replicas',
help='Remove a set of nodes from the '
'replica set. This command '
'is specific to the MongoDB'
' backend.')
rm_replicas_parser.add_argument('replicas', nargs='+',
type=utils.mongodb_host,
help='A list of space separated hosts to '
'remove from the replicaset. Each host '
'should be in the form `host:port`.')
load_parser = subparsers.add_parser('load', load_parser = subparsers.add_parser('load',
help='Write transactions to the backlog') help='Write transactions to the backlog')

View File

@ -3,14 +3,15 @@ for ``argparse.ArgumentParser``.
""" """
import argparse import argparse
from bigchaindb.common.exceptions import StartupError
import multiprocessing as mp import multiprocessing as mp
import subprocess import subprocess
import rethinkdb as r import rethinkdb as r
from pymongo import uri_parser
import bigchaindb import bigchaindb
from bigchaindb import backend from bigchaindb import backend
from bigchaindb.common.exceptions import StartupError
from bigchaindb.version import __version__ from bigchaindb.version import __version__
@ -95,6 +96,34 @@ def start(parser, argv, scope):
return func(args) return func(args)
def mongodb_host(host):
"""Utility function that works as a type for mongodb ``host`` args.
This function validates the ``host`` args provided by to the
``add-replicas`` and ``remove-replicas`` commands and checks if each arg
is in the form "host:port"
Args:
host (str): A string containing hostname and port (e.g. "host:port")
Raises:
ArgumentTypeError: if it fails to parse the argument
"""
# check if mongodb can parse the host
try:
hostname, port = uri_parser.parse_host(host, default_port=None)
except ValueError as exc:
raise argparse.ArgumentTypeError(exc.args[0])
# we do require the port to be provided.
if port is None:
raise argparse.ArgumentTypeError('expected host in the form '
'`host:port`. Got `{}` instead.'
.format(host))
return host
base_parser = argparse.ArgumentParser(add_help=False, prog='bigchaindb') base_parser = argparse.ArgumentParser(add_help=False, prog='bigchaindb')
base_parser.add_argument('-c', '--config', base_parser.add_argument('-c', '--config',