Problem: New collections are not created in an existing DB.

Solution: Do not abort the initialisation if a collection exists. Unify the index creation.
This commit is contained in:
Lev Berman 2018-09-06 11:02:24 +02:00
parent b33e3808a6
commit 681aa59131
8 changed files with 62 additions and 224 deletions

View File

@ -7,9 +7,9 @@
import logging import logging
from pymongo import ASCENDING, DESCENDING, TEXT from pymongo import ASCENDING, DESCENDING, TEXT
from pymongo.errors import CollectionInvalid
from bigchaindb import backend from bigchaindb import backend
from bigchaindb.common import exceptions
from bigchaindb.backend.utils import module_dispatch_registrar from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.localmongodb.connection import LocalMongoDBConnection from bigchaindb.backend.localmongodb.connection import LocalMongoDBConnection
@ -18,12 +18,47 @@ logger = logging.getLogger(__name__)
register_schema = module_dispatch_registrar(backend.schema) register_schema = module_dispatch_registrar(backend.schema)
# Secondary indexes to create, grouped by collection name.
# Each entry is a ``(keys, options)`` pair: ``keys`` is either a single field
# name or a list of ``(field, direction)`` tuples, and ``options`` carries the
# keyword arguments (index name, uniqueness) that go with those keys.
INDEXES = {
    'transactions': [
        ('id', {'unique': True, 'name': 'transaction_id'}),
        ('asset.id', {'name': 'asset_id'}),
        ('outputs.public_keys', {'name': 'outputs'}),
        # compound index on the (transaction_id, output_index) input links
        (
            [
                ('inputs.fulfills.transaction_id', ASCENDING),
                ('inputs.fulfills.output_index', ASCENDING),
            ],
            {'name': 'inputs'},
        ),
    ],
    'assets': [
        ('id', {'name': 'asset_id', 'unique': True}),
        # text index over every field
        ([('$**', TEXT)], {'name': 'text'}),
    ],
    'blocks': [
        ([('height', DESCENDING)], {'name': 'height', 'unique': True}),
    ],
    'metadata': [
        ('id', {'name': 'transaction_id', 'unique': True}),
        # text index over every field
        ([('$**', TEXT)], {'name': 'text'}),
    ],
    'utxos': [
        (
            [
                ('transaction_id', ASCENDING),
                ('output_index', ASCENDING),
            ],
            {'name': 'utxo', 'unique': True},
        ),
    ],
    'pre_commit': [
        ('commit_id', {'name': 'pre_commit_id', 'unique': True}),
    ],
    'elections': [
        ('election_id', {'name': 'election_id', 'unique': True}),
    ],
    'validators': [
        ('height', {'name': 'height', 'unique': True}),
    ],
    'abci_chains': [
        ('height', {'name': 'height', 'unique': True}),
        ('chain_id', {'name': 'chain_id', 'unique': True}),
    ],
}
@register_schema(LocalMongoDBConnection) @register_schema(LocalMongoDBConnection)
def create_database(conn, dbname): def create_database(conn, dbname):
if dbname in conn.conn.database_names():
raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'
.format(dbname))
logger.info('Create database `%s`.', dbname) logger.info('Create database `%s`.', dbname)
# TODO: read and write concerns can be declared here # TODO: read and write concerns can be declared here
conn.conn.get_database(dbname) conn.conn.get_database(dbname)
@ -32,128 +67,22 @@ def create_database(conn, dbname):
@register_schema(LocalMongoDBConnection) @register_schema(LocalMongoDBConnection)
def create_tables(conn, dbname): def create_tables(conn, dbname):
for table_name in backend.schema.TABLES: for table_name in backend.schema.TABLES:
logger.info('Create `%s` table.', table_name)
# create the table # create the table
# TODO: read and write concerns can be declared here # TODO: read and write concerns can be declared here
conn.conn[dbname].create_collection(table_name) try:
logger.info(f'Create `{table_name}` table.')
conn.conn[dbname].create_collection(table_name)
create_indexes(conn, dbname, table_name, INDEXES[table_name])
except CollectionInvalid:
logger.info(f'Collection {table_name} already exists.')
@register_schema(LocalMongoDBConnection) def create_indexes(conn, dbname, collection, indexes):
def create_indexes(conn, dbname): logger.info(f'Create secondary indexes for `{collection}`.')
create_transactions_secondary_index(conn, dbname) for fields, kwargs in indexes:
create_assets_secondary_index(conn, dbname) conn.conn[dbname][collection].create_index(fields, **kwargs)
create_blocks_secondary_index(conn, dbname)
create_metadata_secondary_index(conn, dbname)
create_utxos_secondary_index(conn, dbname)
create_pre_commit_secondary_index(conn, dbname)
create_validators_secondary_index(conn, dbname)
create_abci_chains_indexes(conn, dbname)
create_elections_secondary_index(conn, dbname)
@register_schema(LocalMongoDBConnection) @register_schema(LocalMongoDBConnection)
def drop_database(conn, dbname): def drop_database(conn, dbname):
conn.conn.drop_database(dbname) conn.conn.drop_database(dbname)
def create_transactions_secondary_index(conn, dbname):
    """Create the secondary indexes of the `transactions` collection.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    logger.info('Create `transactions` secondary index.')
    transactions = conn.conn[dbname]['transactions']

    # unique index used to look a transaction up by its id
    transactions.create_index('id', unique=True, name='transaction_id')

    # secondary index on the asset id
    transactions.create_index('asset.id', name='asset_id')

    # secondary index on the public keys of outputs
    transactions.create_index('outputs.public_keys', name='outputs')

    # secondary index on inputs/transaction links (transaction_id, output)
    input_links = [
        ('inputs.fulfills.transaction_id', ASCENDING),
        ('inputs.fulfills.output_index', ASCENDING),
    ]
    transactions.create_index(input_links, name='inputs')
def create_assets_secondary_index(conn, dbname):
    """Create the secondary indexes of the `assets` collection.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    logger.info('Create `assets` secondary index.')
    assets = conn.conn[dbname]['assets']

    # Unique index on the asset id; the id is the txid of the transaction
    # that created the asset.
    assets.create_index('id', name='asset_id', unique=True)

    # full text search index
    text_keys = [('$**', TEXT)]
    assets.create_index(text_keys, name='text')
def create_blocks_secondary_index(conn, dbname):
    """Create the unique descending `height` index on `blocks`.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    blocks = conn.conn[dbname]['blocks']
    height_keys = [('height', DESCENDING)]
    blocks.create_index(height_keys, name='height', unique=True)
def create_metadata_secondary_index(conn, dbname):
    """Create the secondary indexes of the `metadata` collection.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    # The message used to name `assets` (copied from the assets helper),
    # which made the log misreport which collection was being indexed.
    logger.info('Create `metadata` secondary index.')
    metadata = conn.conn[dbname]['metadata']

    # the id is the txid of the transaction where metadata was defined
    metadata.create_index('id', name='transaction_id', unique=True)

    # full text search index
    metadata.create_index([('$**', TEXT)], name='text')
def create_utxos_secondary_index(conn, dbname):
    """Create the unique compound index of the `utxos` collection.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    logger.info('Create `utxos` secondary index.')

    # one index over the (transaction_id, output_index) pair
    utxo_keys = [('transaction_id', ASCENDING), ('output_index', ASCENDING)]
    utxos = conn.conn[dbname]['utxos']
    utxos.create_index(utxo_keys, name='utxo', unique=True)
def create_pre_commit_secondary_index(conn, dbname):
    """Create the unique `commit_id` index on `pre_commit`.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    logger.info('Create `pre_commit` secondary index.')
    pre_commit = conn.conn[dbname]['pre_commit']
    pre_commit.create_index('commit_id', name='pre_commit_id', unique=True)
def create_validators_secondary_index(conn, dbname):
    """Create the unique `height` index on `validators`.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    logger.info('Create `validators` secondary index.')
    validators = conn.conn[dbname]['validators']
    validators.create_index('height', name='height', unique=True)
def create_abci_chains_indexes(conn, dbname):
    """Create the unique `height` and `chain_id` indexes on `abci_chains`.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    abci_chains = conn.conn[dbname]['abci_chains']

    logger.info('Create `abci_chains.height` secondary index.')
    abci_chains.create_index('height', name='height', unique=True)

    logger.info('Create `abci_chains.chain_id` secondary index.')
    abci_chains.create_index('chain_id', name='chain_id', unique=True)
def create_elections_secondary_index(conn, dbname):
    """Create the unique `election_id` index on `elections`.

    Args:
        conn: connection object whose ``conn`` attribute is indexed by
            database name.
        dbname (str): name of the database that holds the collection.
    """
    logger.info('Create `elections` secondary index.')
    elections = conn.conn[dbname]['elections']
    elections.create_index('election_id', name='election_id', unique=True)

View File

@ -31,10 +31,6 @@ def create_database(connection, dbname):
Args: Args:
dbname (str): the name of the database to create. dbname (str): the name of the database to create.
Raises:
:exc:`~DatabaseAlreadyExists`: If the given :attr:`dbname` already
exists as a database.
""" """
raise NotImplementedError raise NotImplementedError
@ -51,17 +47,6 @@ def create_tables(connection, dbname):
raise NotImplementedError raise NotImplementedError
@singledispatch
def create_indexes(connection, dbname):
    """Build the indexes BigchainDB relies on.

    This is the generic fallback of a single-dispatch function: it is
    selected by the type of ``connection`` and does no work itself.

    Args:
        connection: the object whose type selects the implementation.
        dbname (str): the name of the database to create indexes for.

    Raises:
        NotImplementedError: always, in this generic version.
    """
    raise NotImplementedError
@singledispatch @singledispatch
def drop_database(connection, dbname): def drop_database(connection, dbname):
"""Drop the database used by BigchainDB. """Drop the database used by BigchainDB.
@ -90,10 +75,6 @@ def init_database(connection=None, dbname=None):
dbname (str): the name of the database to create. dbname (str): the name of the database to create.
Defaults to the database name given in the BigchainDB Defaults to the database name given in the BigchainDB
configuration. configuration.
Raises:
:exc:`~DatabaseAlreadyExists`: If the given :attr:`dbname` already
exists as a database.
""" """
connection = connection or connect() connection = connection or connect()
@ -101,7 +82,6 @@ def init_database(connection=None, dbname=None):
create_database(connection, dbname) create_database(connection, dbname)
create_tables(connection, dbname) create_tables(connection, dbname)
create_indexes(connection, dbname)
def validate_language_key(obj, key): def validate_language_key(obj, key):

View File

@ -14,8 +14,7 @@ import json
import sys import sys
from bigchaindb.utils import load_node_key from bigchaindb.utils import load_node_key
from bigchaindb.common.exceptions import (DatabaseAlreadyExists, from bigchaindb.common.exceptions import (DatabaseDoesNotExist,
DatabaseDoesNotExist,
ValidationError) ValidationError)
from bigchaindb.elections.vote import Vote from bigchaindb.elections.vote import Vote
import bigchaindb import bigchaindb
@ -230,14 +229,7 @@ def _run_init():
@configure_bigchaindb @configure_bigchaindb
def run_init(args): def run_init(args):
"""Initialize the database""" """Initialize the database"""
# TODO Provide mechanism to: _run_init()
# 1. prompt the user to inquire whether they wish to drop the db
# 2. force the init, (e.g., via -f flag)
try:
_run_init()
except DatabaseAlreadyExists:
print('The database already exists.', file=sys.stderr)
print('If you wish to re-initialize it, first drop it.', file=sys.stderr)
@configure_bigchaindb @configure_bigchaindb
@ -281,12 +273,9 @@ def run_start(args):
logger.info('BigchainDB Version %s', bigchaindb.__version__) logger.info('BigchainDB Version %s', bigchaindb.__version__)
run_recover(bigchaindb.lib.BigchainDB()) run_recover(bigchaindb.lib.BigchainDB())
try: if not args.skip_initialize_database:
if not args.skip_initialize_database: logger.info('Initializing database')
logger.info('Initializing database') _run_init()
_run_init()
except DatabaseAlreadyExists:
pass
logger.info('Starting BigchainDB main process.') logger.info('Starting BigchainDB main process.')
from bigchaindb.start import start from bigchaindb.start import start

View File

@ -11,10 +11,6 @@ class ConfigurationError(BigchainDBError):
"""Raised when there is a problem with server configuration""" """Raised when there is a problem with server configuration"""
class DatabaseAlreadyExists(BigchainDBError):
    """Raised on an attempt to create a database that is already present"""
class DatabaseDoesNotExist(BigchainDBError): class DatabaseDoesNotExist(BigchainDBError):
"""Raised when trying to delete the database but the db is not there""" """Raised when trying to delete the database but the db is not there"""

View File

@ -50,11 +50,10 @@ def test_init_creates_db_tables_and_indexes():
assert set(indexes) == {'_id_', 'election_id'} assert set(indexes) == {'_id_', 'election_id'}
def test_init_database_fails_if_db_exists(): def test_init_database_is_graceful_if_db_exists():
import bigchaindb import bigchaindb
from bigchaindb import backend from bigchaindb import backend
from bigchaindb.backend.schema import init_database from bigchaindb.backend.schema import init_database
from bigchaindb.common import exceptions
conn = backend.connect() conn = backend.connect()
dbname = bigchaindb.config['database']['name'] dbname = bigchaindb.config['database']['name']
@ -62,8 +61,7 @@ def test_init_database_fails_if_db_exists():
# The db is set up by the fixtures # The db is set up by the fixtures
assert dbname in conn.conn.database_names() assert dbname in conn.conn.database_names()
with pytest.raises(exceptions.DatabaseAlreadyExists): init_database()
init_database()
def test_create_tables(): def test_create_tables():
@ -85,21 +83,6 @@ def test_create_tables():
'pre_commit', 'abci_chains', 'pre_commit', 'abci_chains',
} }
def test_create_secondary_indexes():
import bigchaindb
from bigchaindb import backend
from bigchaindb.backend import schema
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
# The db is set up by the fixtures so we need to remove it
conn.conn.drop_database(dbname)
schema.create_database(conn, dbname)
schema.create_tables(conn, dbname)
schema.create_indexes(conn, dbname)
indexes = conn.conn[dbname]['assets'].index_information().keys() indexes = conn.conn[dbname]['assets'].index_information().keys()
assert set(indexes) == {'_id_', 'asset_id', 'text'} assert set(indexes) == {'_id_', 'asset_id', 'text'}

View File

@ -8,7 +8,6 @@ from pytest import mark, raises
@mark.parametrize('schema_func_name,args_qty', ( @mark.parametrize('schema_func_name,args_qty', (
('create_database', 1), ('create_database', 1),
('create_tables', 1), ('create_tables', 1),
('create_indexes', 1),
('drop_database', 1), ('drop_database', 1),
)) ))
def test_schema(schema_func_name, args_qty): def test_schema(schema_func_name, args_qty):

View File

@ -75,25 +75,6 @@ def test_bigchain_show_config(capsys):
assert output_config == config assert output_config == config
def test_bigchain_run_init_when_db_exists(mocker, capsys):
    """`run_init` prints a hint on stderr when the database already exists."""
    from bigchaindb.commands.bigchaindb import run_init
    from bigchaindb.common.exceptions import DatabaseAlreadyExists

    # Make the schema initialisation fail as if the database were there.
    patched_init = mocker.patch(
        'bigchaindb.commands.bigchaindb.schema.init_database',
        autospec=True,
        spec_set=True,
    )
    patched_init.side_effect = DatabaseAlreadyExists

    run_init(Namespace(config=None))

    # readouterr() yields (stdout, stderr); only stderr matters here.
    _, stderr_text = capsys.readouterr()
    print(stderr_text)
    expected_text = (
        'The database already exists.\n'
        'If you wish to re-initialize it, first drop it.\n'
    )
    assert stderr_text == expected_text
def test__run_init(mocker): def test__run_init(mocker):
from bigchaindb.commands.bigchaindb import _run_init from bigchaindb.commands.bigchaindb import _run_init
bigchain_mock = mocker.patch( bigchain_mock = mocker.patch(
@ -219,23 +200,6 @@ def test_run_configure_with_backend(backend, monkeypatch, mock_write_config):
assert value['return'] == expected_config assert value['return'] == expected_config
def test_run_start_when_db_already_exists(mocker,
                                          monkeypatch,
                                          run_start_args,
                                          mocked_setup_logging):
    """`run_start` still starts the node when `_run_init` reports an existing db."""
    from bigchaindb.commands.bigchaindb import run_start
    from bigchaindb.common.exceptions import DatabaseAlreadyExists

    def failing_run_init():
        raise DatabaseAlreadyExists()

    mocked_start = mocker.patch('bigchaindb.start.start')
    monkeypatch.setattr('builtins.input', lambda: '\x03')
    monkeypatch.setattr('bigchaindb.commands.bigchaindb._run_init',
                        failing_run_init)

    run_start(run_start_args)

    assert mocked_start.called
@patch('bigchaindb.commands.utils.start') @patch('bigchaindb.commands.utils.start')
def test_calling_main(start_mock, monkeypatch): def test_calling_main(start_mock, monkeypatch):
from bigchaindb.commands.bigchaindb import main from bigchaindb.commands.bigchaindb import main

View File

@ -322,19 +322,17 @@ def inputs(user_pk, b, alice):
@pytest.fixture @pytest.fixture
def dummy_db(request): def dummy_db(request):
from bigchaindb.backend import connect, schema from bigchaindb.backend import connect, schema
from bigchaindb.common.exceptions import (DatabaseDoesNotExist, from bigchaindb.common.exceptions import DatabaseDoesNotExist
DatabaseAlreadyExists)
conn = connect() conn = connect()
dbname = request.fixturename dbname = request.fixturename
xdist_suffix = getattr(request.config, 'slaveinput', {}).get('slaveid') xdist_suffix = getattr(request.config, 'slaveinput', {}).get('slaveid')
if xdist_suffix: if xdist_suffix:
dbname = '{}_{}'.format(dbname, xdist_suffix) dbname = '{}_{}'.format(dbname, xdist_suffix)
try:
schema.init_database(conn, dbname) schema.init_database(conn, dbname)
except DatabaseAlreadyExists:
schema.drop_database(conn, dbname)
schema.init_database(conn, dbname)
yield dbname yield dbname
try: try:
schema.drop_database(conn, dbname) schema.drop_database(conn, dbname)
except DatabaseDoesNotExist: except DatabaseDoesNotExist: