Problem: New collections are not created in existing DB. (#2520)

Solution: Do not abort the initialisation if a collection exists. Unify the index creation.
Lev Berman 2018-09-12 14:39:07 +02:00 committed by vrde
parent 8a7650c13a
commit 35e35ecd57
9 changed files with 82 additions and 243 deletions
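
The whole change boils down to one pattern, sketched below under the assumption of a plain pymongo client and an illustrative database name (a condensed illustration, not the exact BigchainDB code): collection creation tolerates collections that already exist, and every collection's indexes are declared in one table and (re)created unconditionally afterwards.

    from pymongo import MongoClient
    from pymongo.errors import CollectionInvalid

    # Declarative index table, one entry per collection (trimmed to a single
    # collection here; the real table in the diff below covers nine).
    INDEXES = {
        'transactions': [
            ('id', dict(unique=True, name='transaction_id')),
        ],
    }

    def init_tables(client, dbname):
        for table, indexes in INDEXES.items():
            try:
                client[dbname].create_collection(table)
            except CollectionInvalid:
                pass  # the collection is already there -- keep going
            for fields, kwargs in indexes:
                # Safe to repeat: an identical index specification is a no-op.
                client[dbname][table].create_index(fields, **kwargs)

    # Running this twice against the same database is harmless.
    init_tables(MongoClient(), 'bigchain_example')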

View File

@@ -7,9 +7,9 @@
 import logging
 from pymongo import ASCENDING, DESCENDING, TEXT
+from pymongo.errors import CollectionInvalid
 from bigchaindb import backend
-from bigchaindb.common import exceptions
 from bigchaindb.backend.utils import module_dispatch_registrar
 from bigchaindb.backend.localmongodb.connection import LocalMongoDBConnection
@@ -18,12 +18,47 @@ logger = logging.getLogger(__name__)
 register_schema = module_dispatch_registrar(backend.schema)

+INDEXES = {
+    'transactions': [
+        ('id', dict(unique=True, name='transaction_id')),
+        ('asset.id', dict(name='asset_id')),
+        ('outputs.public_keys', dict(name='outputs')),
+        ([('inputs.fulfills.transaction_id', ASCENDING),
+          ('inputs.fulfills.output_index', ASCENDING)], dict(name='inputs')),
+    ],
+    'assets': [
+        ('id', dict(name='asset_id', unique=True)),
+        ([('$**', TEXT)], dict(name='text')),
+    ],
+    'blocks': [
+        ([('height', DESCENDING)], dict(name='height', unique=True)),
+    ],
+    'metadata': [
+        ('id', dict(name='transaction_id', unique=True)),
+        ([('$**', TEXT)], dict(name='text')),
+    ],
+    'utxos': [
+        ([('transaction_id', ASCENDING),
+          ('output_index', ASCENDING)], dict(name='utxo', unique=True)),
+    ],
+    'pre_commit': [
+        ('commit_id', dict(name='pre_commit_id', unique=True)),
+    ],
+    'elections': [
+        ('election_id', dict(name='election_id', unique=True)),
+    ],
+    'validators': [
+        ('height', dict(name='height', unique=True)),
+    ],
+    'abci_chains': [
+        ('height', dict(name='height', unique=True)),
+        ('chain_id', dict(name='chain_id', unique=True)),
+    ],
+}
 @register_schema(LocalMongoDBConnection)
 def create_database(conn, dbname):
-    if dbname in conn.conn.database_names():
-        raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'
-                                               .format(dbname))
     logger.info('Create database `%s`.', dbname)
     # TODO: read and write concerns can be declared here
     conn.conn.get_database(dbname)
@@ -32,128 +67,22 @@ def create_database(conn, dbname):
 @register_schema(LocalMongoDBConnection)
 def create_tables(conn, dbname):
     for table_name in backend.schema.TABLES:
-        logger.info('Create `%s` table.', table_name)
         # create the table
         # TODO: read and write concerns can be declared here
+        try:
+            logger.info(f'Create `{table_name}` table.')
             conn.conn[dbname].create_collection(table_name)
+        except CollectionInvalid:
+            logger.info(f'Collection {table_name} already exists.')
+        create_indexes(conn, dbname, table_name, INDEXES[table_name])


-@register_schema(LocalMongoDBConnection)
-def create_indexes(conn, dbname):
-    create_transactions_secondary_index(conn, dbname)
-    create_assets_secondary_index(conn, dbname)
-    create_blocks_secondary_index(conn, dbname)
-    create_metadata_secondary_index(conn, dbname)
-    create_utxos_secondary_index(conn, dbname)
-    create_pre_commit_secondary_index(conn, dbname)
-    create_validators_secondary_index(conn, dbname)
-    create_abci_chains_indexes(conn, dbname)
-    create_elections_secondary_index(conn, dbname)
+def create_indexes(conn, dbname, collection, indexes):
+    logger.info(f'Ensure secondary indexes for `{collection}`.')
+    for fields, kwargs in indexes:
+        conn.conn[dbname][collection].create_index(fields, **kwargs)


 @register_schema(LocalMongoDBConnection)
 def drop_database(conn, dbname):
     conn.conn.drop_database(dbname)
-def create_transactions_secondary_index(conn, dbname):
-    logger.info('Create `transactions` secondary index.')
-    # to query the transactions for a transaction id, this field is unique
-    conn.conn[dbname]['transactions'].create_index('id',
-                                                   unique=True,
-                                                   name='transaction_id')
-    # secondary index for asset uuid, this field is unique
-    conn.conn[dbname]['transactions']\
-        .create_index('asset.id', name='asset_id')
-    # secondary index on the public keys of outputs
-    conn.conn[dbname]['transactions']\
-        .create_index('outputs.public_keys',
-                      name='outputs')
-    # secondary index on inputs/transaction links (transaction_id, output)
-    conn.conn[dbname]['transactions']\
-        .create_index([
-            ('inputs.fulfills.transaction_id', ASCENDING),
-            ('inputs.fulfills.output_index', ASCENDING),
-        ], name='inputs')
-
-def create_assets_secondary_index(conn, dbname):
-    logger.info('Create `assets` secondary index.')
-    # unique index on the id of the asset.
-    # the id is the txid of the transaction that created the asset
-    conn.conn[dbname]['assets'].create_index('id',
-                                             name='asset_id',
-                                             unique=True)
-    # full text search index
-    conn.conn[dbname]['assets'].create_index([('$**', TEXT)], name='text')
-
-def create_blocks_secondary_index(conn, dbname):
-    conn.conn[dbname]['blocks']\
-        .create_index([('height', DESCENDING)], name='height', unique=True)
-
-def create_metadata_secondary_index(conn, dbname):
-    logger.info('Create `assets` secondary index.')
-    # the id is the txid of the transaction where metadata was defined
-    conn.conn[dbname]['metadata'].create_index('id',
-                                               name='transaction_id',
-                                               unique=True)
-    # full text search index
-    conn.conn[dbname]['metadata'].create_index([('$**', TEXT)], name='text')
-
-def create_utxos_secondary_index(conn, dbname):
-    logger.info('Create `utxos` secondary index.')
-    conn.conn[dbname]['utxos'].create_index(
-        [('transaction_id', ASCENDING), ('output_index', ASCENDING)],
-        name='utxo',
-        unique=True,
-    )
-
-def create_pre_commit_secondary_index(conn, dbname):
-    logger.info('Create `pre_commit` secondary index.')
-    conn.conn[dbname]['pre_commit'].create_index('commit_id',
-                                                 name='pre_commit_id',
-                                                 unique=True)
-
-def create_validators_secondary_index(conn, dbname):
-    logger.info('Create `validators` secondary index.')
-    conn.conn[dbname]['validators'].create_index('height',
-                                                 name='height',
-                                                 unique=True,)
-
-def create_abci_chains_indexes(conn, dbname):
-    logger.info('Create `abci_chains.height` secondary index.')
-    conn.conn[dbname]['abci_chains'].create_index('height',
-                                                  name='height',
-                                                  unique=True,)
-    logger.info('Create `abci_chains.chain_id` secondary index.')
-    conn.conn[dbname]['abci_chains'].create_index('chain_id',
-                                                  name='chain_id',
-                                                  unique=True)
-
-def create_elections_secondary_index(conn, dbname):
-    logger.info('Create `elections` secondary index.')
-    conn.conn[dbname]['elections'].create_index('election_id',
-                                                name='election_id',
-                                                unique=True,)
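
The reason the unified create_indexes() helper can run on every start-up, even against a database whose indexes are already in place, is that MongoDB treats an index creation request whose keys and options match an existing index as a no-op. A hedged two-line illustration with pymongo (local MongoDB assumed, names illustrative only):

    from pymongo import MongoClient

    coll = MongoClient()['example_db']['transactions']
    coll.create_index('id', unique=True, name='transaction_id')
    coll.create_index('id', unique=True, name='transaction_id')  # no error, no duplicate index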

View File

@@ -31,10 +31,6 @@ def create_database(connection, dbname):
     Args:
         dbname (str): the name of the database to create.

-    Raises:
-        :exc:`~DatabaseAlreadyExists`: If the given :attr:`dbname` already
-            exists as a database.
     """
     raise NotImplementedError

@@ -51,17 +47,6 @@ def create_tables(connection, dbname):
     raise NotImplementedError


-@singledispatch
-def create_indexes(connection, dbname):
-    """Create the indexes to be used by BigchainDB.
-
-    Args:
-        dbname (str): the name of the database to create indexes for.
-    """
-    raise NotImplementedError
-
-
 @singledispatch
 def drop_database(connection, dbname):
     """Drop the database used by BigchainDB.

@@ -90,10 +75,6 @@ def init_database(connection=None, dbname=None):
         dbname (str): the name of the database to create.
             Defaults to the database name given in the BigchainDB
            configuration.

-    Raises:
-        :exc:`~DatabaseAlreadyExists`: If the given :attr:`dbname` already
-            exists as a database.
     """

     connection = connection or connect()

@@ -101,7 +82,6 @@
     create_database(connection, dbname)
     create_tables(connection, dbname)
-    create_indexes(connection, dbname)


 def validate_language_key(obj, key):
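
For context on why dropping the generic create_indexes() stub also removes it from every backend: each schema operation in this module is a functools.singledispatch stub that backends register implementations for (module_dispatch_registrar wraps roughly this idea), so deleting the stub deletes the extension point itself. A rough, self-contained sketch of the mechanism, with a stand-in connection class:

    from functools import singledispatch

    class LocalMongoDBConnection:  # stand-in for the real connection class
        pass

    @singledispatch
    def create_tables(connection, dbname):
        raise NotImplementedError

    @create_tables.register(LocalMongoDBConnection)
    def _(connection, dbname):
        print(f'creating tables in {dbname}')

    create_tables(LocalMongoDBConnection(), 'bigchain_example')  # dispatches on the connection type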

View File

@@ -14,8 +14,7 @@ import json
 import sys

 from bigchaindb.utils import load_node_key
-from bigchaindb.common.exceptions import (DatabaseAlreadyExists,
-                                          DatabaseDoesNotExist,
+from bigchaindb.common.exceptions import (DatabaseDoesNotExist,
                                           ValidationError)
 from bigchaindb.elections.vote import Vote
 import bigchaindb
@@ -228,14 +227,7 @@ def _run_init():
 @configure_bigchaindb
 def run_init(args):
     """Initialize the database"""
-    # TODO Provide mechanism to:
-    # 1. prompt the user to inquire whether they wish to drop the db
-    # 2. force the init, (e.g., via -f flag)
-    try:
-        _run_init()
-    except DatabaseAlreadyExists:
-        print('The database already exists.', file=sys.stderr)
-        print('If you wish to re-initialize it, first drop it.', file=sys.stderr)
+    _run_init()


 @configure_bigchaindb
@@ -279,12 +271,9 @@ def run_start(args):
     logger.info('BigchainDB Version %s', bigchaindb.__version__)
     run_recover(bigchaindb.lib.BigchainDB())

-    try:
-        if not args.skip_initialize_database:
-            logger.info('Initializing database')
-            _run_init()
-    except DatabaseAlreadyExists:
-        pass
+    if not args.skip_initialize_database:
+        logger.info('Initializing database')
+        _run_init()

     logger.info('Starting BigchainDB main process.')
     from bigchaindb.start import start

View File

@@ -11,10 +11,6 @@ class ConfigurationError(BigchainDBError):
     """Raised when there is a problem with server configuration"""


-class DatabaseAlreadyExists(BigchainDBError):
-    """Raised when trying to create the database but the db is already there"""
-
-
 class DatabaseDoesNotExist(BigchainDBError):
     """Raised when trying to delete the database but the db is not there"""

View File

@@ -2,8 +2,6 @@
 # SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
 # Code is Apache-2.0 and docs are CC-BY-4.0

-import pytest
-

 def test_init_creates_db_tables_and_indexes():
     import bigchaindb
@@ -50,11 +48,10 @@ def test_init_creates_db_tables_and_indexes():
     assert set(indexes) == {'_id_', 'election_id'}


-def test_init_database_fails_if_db_exists():
+def test_init_database_is_graceful_if_db_exists():
     import bigchaindb
     from bigchaindb import backend
     from bigchaindb.backend.schema import init_database
-    from bigchaindb.common import exceptions

     conn = backend.connect()
     dbname = bigchaindb.config['database']['name']
@@ -62,7 +59,6 @@ def test_init_database_fails_if_db_exists():
     # The db is set up by the fixtures
     assert dbname in conn.conn.database_names()

-    with pytest.raises(exceptions.DatabaseAlreadyExists):
-        init_database()
+    init_database()
@@ -85,21 +81,6 @@ def test_create_tables():
         'pre_commit', 'abci_chains',
     }

-
-def test_create_secondary_indexes():
-    import bigchaindb
-    from bigchaindb import backend
-    from bigchaindb.backend import schema
-
-    conn = backend.connect()
-    dbname = bigchaindb.config['database']['name']
-
-    # The db is set up by the fixtures so we need to remove it
-    conn.conn.drop_database(dbname)
-    schema.create_database(conn, dbname)
-    schema.create_tables(conn, dbname)
-    schema.create_indexes(conn, dbname)
-
     indexes = conn.conn[dbname]['assets'].index_information().keys()
     assert set(indexes) == {'_id_', 'asset_id', 'text'}
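
Pieced together from the hunks above, the renamed test now simply calls init_database() against an existing database and expects it to return instead of raising (the old version expected DatabaseAlreadyExists):

    def test_init_database_is_graceful_if_db_exists():
        import bigchaindb
        from bigchaindb import backend
        from bigchaindb.backend.schema import init_database

        conn = backend.connect()
        dbname = bigchaindb.config['database']['name']

        # The db is set up by the fixtures
        assert dbname in conn.conn.database_names()

        init_database()  # must no longer raise on an existing database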

View File

@@ -8,7 +8,6 @@ from pytest import mark, raises
 @mark.parametrize('schema_func_name,args_qty', (
     ('create_database', 1),
     ('create_tables', 1),
-    ('create_indexes', 1),
     ('drop_database', 1),
 ))
 def test_schema(schema_func_name, args_qty):

View File

@@ -75,25 +75,6 @@ def test_bigchain_show_config(capsys):
     assert output_config == config


-def test_bigchain_run_init_when_db_exists(mocker, capsys):
-    from bigchaindb.commands.bigchaindb import run_init
-    from bigchaindb.common.exceptions import DatabaseAlreadyExists
-    init_db_mock = mocker.patch(
-        'bigchaindb.commands.bigchaindb.schema.init_database',
-        autospec=True,
-        spec_set=True,
-    )
-    init_db_mock.side_effect = DatabaseAlreadyExists
-
-    args = Namespace(config=None)
-    run_init(args)
-
-    output_message = capsys.readouterr()[1]
-    print(output_message)
-    assert output_message == (
-        'The database already exists.\n'
-        'If you wish to re-initialize it, first drop it.\n'
-    )
-
-
 def test__run_init(mocker):
     from bigchaindb.commands.bigchaindb import _run_init
     bigchain_mock = mocker.patch(
@@ -219,23 +200,6 @@ def test_run_configure_with_backend(backend, monkeypatch, mock_write_config):
     assert value['return'] == expected_config


-def test_run_start_when_db_already_exists(mocker,
-                                          monkeypatch,
-                                          run_start_args,
-                                          mocked_setup_logging):
-    from bigchaindb.commands.bigchaindb import run_start
-    from bigchaindb.common.exceptions import DatabaseAlreadyExists
-    mocked_start = mocker.patch('bigchaindb.start.start')
-
-    def mock_run_init():
-        raise DatabaseAlreadyExists()
-
-    monkeypatch.setattr('builtins.input', lambda: '\x03')
-    monkeypatch.setattr(
-        'bigchaindb.commands.bigchaindb._run_init', mock_run_init)
-    run_start(run_start_args)
-    assert mocked_start.called
-
-
 @patch('bigchaindb.commands.utils.start')
 def test_calling_main(start_mock, monkeypatch):
     from bigchaindb.commands.bigchaindb import main

View File

@@ -23,8 +23,10 @@ from pymongo import MongoClient
 from bigchaindb.common import crypto
 from bigchaindb.log import setup_logging
 from bigchaindb.tendermint_utils import key_from_base64
+from bigchaindb.backend import schema
 from bigchaindb.common.crypto import (key_pair_from_ed25519_key,
                                       public_key_from_ed25519_key)
+from bigchaindb.common.exceptions import DatabaseDoesNotExist
 from bigchaindb.lib import Block
@@ -113,17 +115,12 @@ def _configure_bigchaindb(request):
 @pytest.fixture(scope='session')
 def _setup_database(_configure_bigchaindb):
     from bigchaindb import config
-    from bigchaindb.backend import connect, schema
-    from bigchaindb.common.exceptions import DatabaseDoesNotExist
+    from bigchaindb.backend import connect
     print('Initializing test db')
     dbname = config['database']['name']
     conn = connect()

-    try:
-        schema.drop_database(conn, dbname)
-    except DatabaseDoesNotExist:
-        pass
+    _drop_db(conn, dbname)
     schema.init_database(conn)
     print('Finishing init database')
@@ -131,10 +128,7 @@ def _setup_database(_configure_bigchaindb):
     print('Deleting `{}` database'.format(dbname))
     conn = connect()
-    try:
-        schema.drop_database(conn, dbname)
-    except DatabaseDoesNotExist:
-        pass
+    _drop_db(conn, dbname)
     print('Finished deleting `{}`'.format(dbname))
@@ -270,7 +264,6 @@ def signed_create_tx(alice, create_tx):
     return create_tx.sign([alice.private_key])


-@pytest.mark.abci
 @pytest.fixture
 def posted_create_tx(b, signed_create_tx):
     res = b.post_transaction(signed_create_tx, 'broadcast_tx_commit')
@@ -321,20 +314,22 @@ def inputs(user_pk, b, alice):
 @pytest.fixture
 def dummy_db(request):
-    from bigchaindb.backend import connect, schema
-    from bigchaindb.common.exceptions import (DatabaseDoesNotExist,
-                                              DatabaseAlreadyExists)
+    from bigchaindb.backend import connect

     conn = connect()
     dbname = request.fixturename
     xdist_suffix = getattr(request.config, 'slaveinput', {}).get('slaveid')
     if xdist_suffix:
         dbname = '{}_{}'.format(dbname, xdist_suffix)
-    try:
-        schema.init_database(conn, dbname)
-    except DatabaseAlreadyExists:
-        schema.drop_database(conn, dbname)
-        schema.init_database(conn, dbname)
+    _drop_db(conn, dbname)  # make sure we start with a clean DB
+    schema.init_database(conn, dbname)
     yield dbname
+    _drop_db(conn, dbname)
+
+
+def _drop_db(conn, dbname):
     try:
         schema.drop_database(conn, dbname)
     except DatabaseDoesNotExist:
         pass
@@ -430,7 +425,6 @@ def event_loop():
     loop.close()


-@pytest.mark.bdb
 @pytest.fixture(scope='session')
 def abci_server():
     from abci import ABCIServer
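
The conftest changes follow the same logic as the rest of the commit: since init_database() is now safe to call on an existing database, the fixtures stop catching DatabaseAlreadyExists and instead share one small _drop_db() helper that tolerates a missing database. Reassembled from the hunks above into one simplified sketch (the xdist worker-suffix handling is omitted):

    import pytest

    from bigchaindb.backend import connect, schema
    from bigchaindb.common.exceptions import DatabaseDoesNotExist


    def _drop_db(conn, dbname):
        try:
            schema.drop_database(conn, dbname)
        except DatabaseDoesNotExist:
            pass


    @pytest.fixture
    def dummy_db(request):
        conn = connect()
        dbname = request.fixturename
        _drop_db(conn, dbname)              # make sure we start with a clean DB
        schema.init_database(conn, dbname)
        yield dbname
        _drop_db(conn, dbname)              # clean up afterwards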

View File

@@ -5,11 +5,12 @@
 import pytest
 from unittest.mock import MagicMock, patch

-pytestmark = [pytest.mark.bdb, pytest.mark.usefixtures('inputs')]
-
 OUTPUTS_ENDPOINT = '/api/v1/outputs/'


+@pytest.mark.bdb
+@pytest.mark.usefixtures('inputs')
 def test_get_outputs_endpoint(client, user_pk):
     m = MagicMock()
     m.txid = 'a'
@@ -38,6 +39,8 @@ def test_get_outputs_endpoint_unspent(client, user_pk):
     gof.assert_called_once_with(user_pk, False)


+@pytest.mark.bdb
+@pytest.mark.usefixtures('inputs')
 def test_get_outputs_endpoint_spent(client, user_pk):
     m = MagicMock()
     m.txid = 'a'
@@ -51,11 +54,15 @@ def test_get_outputs_endpoint_spent(client, user_pk):
     gof.assert_called_once_with(user_pk, True)


+@pytest.mark.bdb
+@pytest.mark.usefixtures('inputs')
 def test_get_outputs_endpoint_without_public_key(client):
     res = client.get(OUTPUTS_ENDPOINT)
     assert res.status_code == 400


+@pytest.mark.bdb
+@pytest.mark.usefixtures('inputs')
 def test_get_outputs_endpoint_with_invalid_public_key(client):
     expected = {'message': {'public_key': 'Invalid base58 ed25519 key'}}
     res = client.get(OUTPUTS_ENDPOINT + '?public_key=abc')
@@ -63,6 +70,8 @@ def test_get_outputs_endpoint_with_invalid_public_key(client):
     assert res.status_code == 400


+@pytest.mark.bdb
+@pytest.mark.usefixtures('inputs')
 def test_get_outputs_endpoint_with_invalid_spent(client, user_pk):
     expected = {'message': {'spent': 'Boolean value must be "true" or "false" (lowercase)'}}
     params = '?spent=tru&public_key={}'.format(user_pk)
@@ -72,8 +81,6 @@ def test_get_outputs_endpoint_with_invalid_spent(client, user_pk):
 @pytest.mark.abci
-@pytest.mark.bdb
-@pytest.mark.usefixtures('inputs')
 def test_get_divisble_transactions_returns_500(b, client):
     from bigchaindb.models import Transaction
     from bigchaindb.common import crypto