diff --git a/bigchaindb/backend/connection.py b/bigchaindb/backend/connection.py index 88c0eae4..2d4f9bac 100644 --- a/bigchaindb/backend/connection.py +++ b/bigchaindb/backend/connection.py @@ -1,13 +1,17 @@ from importlib import import_module +import logging import bigchaindb from bigchaindb.common.exceptions import ConfigurationError BACKENDS = { + 'mongodb': 'bigchaindb.backend.mongodb.connection.MongoDBConnection', 'rethinkdb': 'bigchaindb.backend.rethinkdb.connection.RethinkDBConnection' } +logger = logging.getLogger(__name__) + def connect(backend=None, host=None, port=None, name=None): """Create a new connection to the database backend. @@ -44,6 +48,7 @@ def connect(backend=None, host=None, port=None, name=None): except (ImportError, AttributeError) as exc: raise ConfigurationError('Error loading backend `{}`'.format(backend)) from exc + logger.debug('Connection: {}'.format(Class)) return Class(host, port, dbname) diff --git a/bigchaindb/backend/mongodb/__init__.py b/bigchaindb/backend/mongodb/__init__.py new file mode 100644 index 00000000..b43a9c24 --- /dev/null +++ b/bigchaindb/backend/mongodb/__init__.py @@ -0,0 +1,22 @@ +"""MongoDB backend implementation. + +Contains a MongoDB-specific implementation of the +:mod:`~bigchaindb.backend.changefeed`, :mod:`~bigchaindb.backend.query`, and +:mod:`~bigchaindb.backend.schema` interfaces. + +You can specify BigchainDB to use MongoDB as its database backend by either +setting ``database.backend`` to ``'mongodb'`` in your configuration file, or +setting the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable to +``'mongodb'``. + +If configured to use MongoDB, BigchainDB will automatically return instances +of :class:`~bigchaindb.backend.mongodb.MongoDBConnection` for +:func:`~bigchaindb.backend.connection.connect` and dispatch calls of the +generic backend interfaces to the implementations in this module. +""" + +# Register the single dispatched modules on import. 
+from bigchaindb.backend.mongodb import schema, query # noqa no changefeed for now + +# MongoDBConnection should always be accessed via +# ``bigchaindb.backend.connect()``. diff --git a/bigchaindb/backend/mongodb/connection.py b/bigchaindb/backend/mongodb/connection.py new file mode 100644 index 00000000..f50d8416 --- /dev/null +++ b/bigchaindb/backend/mongodb/connection.py @@ -0,0 +1,53 @@ +import time +import logging + +from pymongo import MongoClient +from pymongo.errors import ConnectionFailure + +import bigchaindb +from bigchaindb.backend.connection import Connection + +logger = logging.getLogger(__name__) + + +class MongoDBConnection(Connection): + + def __init__(self, host=None, port=None, dbname=None, max_tries=3): + """Create a new Connection instance. + + Args: + host (str, optional): the host to connect to. + port (int, optional): the port to connect to. + dbname (str, optional): the database to use. + max_tries (int, optional): how many tries before giving up. + """ + + self.host = host or bigchaindb.config['database']['host'] + self.port = port or bigchaindb.config['database']['port'] + self.dbname = dbname or bigchaindb.config['database']['name'] + self.max_tries = max_tries + self.connection = None + + @property + def conn(self): + if self.connection is None: + self._connect() + return self.connection + + @property + def db(self): + if self.conn is None: + self._connect() + + else: + return self.conn[self.dbname] + + def _connect(self): + for i in range(self.max_tries): + try: + self.connection = MongoClient(self.host, self.port) + except ConnectionFailure as exc: + if i + 1 == self.max_tries: + raise + else: + time.sleep(2**i) diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py new file mode 100644 index 00000000..369cb39d --- /dev/null +++ b/bigchaindb/backend/mongodb/query.py @@ -0,0 +1,169 @@ +"""Query implementation for MongoDB""" + +from time import time + +from pymongo import ReturnDocument + +from bigchaindb 
import backend +from bigchaindb.common.exceptions import CyclicBlockchainError +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.mongodb.connection import MongoDBConnection + + +register_query = module_dispatch_registrar(backend.query) + + +@register_query(MongoDBConnection) +def write_transaction(conn, signed_transaction): + return conn.db['backlog'].insert_one(signed_transaction) + + +@register_query(MongoDBConnection) +def update_transaction(conn, transaction_id, doc): + return conn.db['backlog']\ + .find_one_and_update({'id': transaction_id}, + doc, + return_document=ReturnDocument.AFTER) + + +@register_query(MongoDBConnection) +def delete_transaction(conn, *transaction_id): + return conn.db['backlog'].delete_many({'id': {'$in': transaction_id}}) + + +@register_query(MongoDBConnection) +def get_stale_transactions(conn, reassign_delay): + return conn.db['backlog']\ + .find({'assignment_timestamp': {'$lt': time() - reassign_delay}}) + + +@register_query(MongoDBConnection) +def get_transaction_from_block(conn, block_id, tx_id): + # this is definitely wrong, but it's something like this + return conn.db['bigchain'].find_one({'id': block_id, + 'block.transactions.id': tx_id}) + + +@register_query(MongoDBConnection) +def get_transaction_from_backlog(conn, transaction_id): + return conn.db['backlog'].find_one({'id': transaction_id}) + + +@register_query(MongoDBConnection) +def get_blocks_status_from_transaction(conn, transaction_id): + return conn.db['bigchain']\ + .find({'block.transactions.id': transaction_id}, + projection=['id', 'block.voters']) + + +@register_query(MongoDBConnection) +def get_txids_by_asset_id(conn, asset_id): + return conn.db['bigchain']\ + .find({'block.transactions.asset.id': asset_id}, + projection=['id']) + + +@register_query(MongoDBConnection) +def get_asset_by_id(conn, asset_id): + return conn.db['bigchain']\ + .find_one({'block.transactions.asset.id': asset_id, + 'block.transactions.asset.operation': 
'CREATE'}, + projection=['block.transactions.asset']) + + +@register_query(MongoDBConnection) +def get_spent(conn, transaction_id, condition_id): + return conn.db['bigchain']\ + .find_one({'block.transactions.fulfillments.input.txid': + transaction_id, + 'block.transactions.fulfillments.input.cid': + condition_id}) + + +@register_query(MongoDBConnection) +def get_owned_ids(conn, owner): + return conn.db['bigchain']\ + .find({'block.transactions.transaction.conditions.owners_after': + owner}) + + +@register_query(MongoDBConnection) +def get_votes_by_block_id(conn, block_id): + return conn.db['votes']\ + .find({'vote.voting_for_block': block_id}) + + +@register_query(MongoDBConnection) +def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey): + return conn.db['votes']\ + .find({'vote.voting_for_block': block_id, + 'node_pubkey': node_pubkey}) + + +@register_query(MongoDBConnection) +def write_block(conn, block): + return conn.db['bigchain'].insert_one(block.to_dict()) + + +@register_query(MongoDBConnection) +def get_block(conn, block_id): + return conn.db['bigchain'].find_one({'id': block_id}) + + +@register_query(MongoDBConnection) +def has_transaction(conn, transaction_id): + return bool(conn.db['bigchain'] + .find_one({'block.transactions.id': transaction_id})) + + +@register_query(MongoDBConnection) +def count_blocks(conn): + return conn.db['bigchain'].count() + + +@register_query(MongoDBConnection) +def count_backlog(conn): + return conn.db['backlog'].count() + + +@register_query(MongoDBConnection) +def write_vote(conn, vote): + return conn.db['votes'].insert_one(vote) + + +@register_query(MongoDBConnection) +def get_genesis_block(conn): + return conn.db['bigchain'].find_one({'block.transactions.0.operation' == + 'GENESIS'}) + + +@register_query(MongoDBConnection) +def get_last_voted_block(conn, node_pubkey): + last_voted = conn.db['votes']\ + .find({'node_pubkey': node_pubkey}, + sort=[('vote.timestamp', -1)]) + if not last_voted: + return 
get_genesis_block(conn) + + mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] + for v in last_voted} + + last_block_id = list(mapping.values())[0] + + explored = set() + + while True: + try: + if last_block_id in explored: + raise CyclicBlockchainError() + explored.add(last_block_id) + last_block_id = mapping[last_block_id] + except KeyError: + break + + return get_block(conn, last_block_id) + + +@register_query(MongoDBConnection) +def get_unvoted_blocks(conn, node_pubkey): + pass diff --git a/bigchaindb/backend/mongodb/schema.py b/bigchaindb/backend/mongodb/schema.py new file mode 100644 index 00000000..55fd955d --- /dev/null +++ b/bigchaindb/backend/mongodb/schema.py @@ -0,0 +1,112 @@ +"""Utils to initialize and drop the database.""" + +import logging + +from pymongo import ASCENDING, DESCENDING + +from bigchaindb import backend +from bigchaindb.common import exceptions +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.mongodb.connection import MongoDBConnection + + +logger = logging.getLogger(__name__) +register_schema = module_dispatch_registrar(backend.schema) + + +@register_schema(MongoDBConnection) +def create_database(conn, dbname): + if dbname in conn.conn.database_names(): + raise exceptions.DatabaseAlreadyExists('Database `{}` already exists' + .format(dbname)) + + logger.info('Create database `%s`.', dbname) + # TODO: read and write concerns can be declared here + conn.conn.get_database(dbname) + + +@register_schema(MongoDBConnection) +def create_tables(conn, dbname): + for table_name in ['bigchain', 'backlog', 'votes']: + logger.info('Create `%s` table.', table_name) + # create the table + # TODO: read and write concerns can be declared here + conn.conn[dbname].create_collection(table_name) + + +@register_schema(MongoDBConnection) +def create_indexes(conn, dbname): + create_bigchain_secondary_index(conn, dbname) + create_backlog_secondary_index(conn, dbname) + create_votes_secondary_index(conn, 
dbname) + + +@register_schema(MongoDBConnection) +def drop_database(conn, dbname): + conn.conn.drop_database(dbname) + + +def create_bigchain_secondary_index(conn, dbname): + logger.info('Create `bigchain` secondary index.') + + # to select blocks by id + conn.conn[dbname]['bigchain'].create_index('id', name='block_id') + + # to order blocks by timestamp + conn.conn[dbname]['bigchain'].create_index([('block.timestamp', + ASCENDING)], + name='block_timestamp') + + # to query the bigchain for a transaction id, this field is unique + conn.conn[dbname]['bigchain'].create_index('block.transactions.id', + name='transaction_id', + unique=True) + + # secondary index for payload data by UUID, this field is unique + conn.conn[dbname]['bigchain']\ + .create_index('block.transactions.transaction.metadata.id', + name='metadata_id', unique=True) + + # secondary index for asset uuid, this field is unique + conn.conn[dbname]['bigchain']\ + .create_index('block.transactions.transaction.asset.id', + name='asset_id', unique=True) + + # compound index on fulfillment and transactions id + conn.conn[dbname]['bigchain']\ + .create_index([('block.transactions.transaction.fulfillments.txid', + ASCENDING), + ('block.transactions.transaction.fulfillments.cid', + ASCENDING)], + name='tx_and_fulfillment') + + +def create_backlog_secondary_index(conn, dbname): + logger.info('Create `backlog` secondary index.') + + # to order transactions by timestamp + conn.conn[dbname]['backlog'].create_index([('transaction.timestamp', + ASCENDING)], + name='transaction_timestamp') + + # compound index to read transactions from the backlog per assignee + conn.conn[dbname]['backlog']\ + .create_index([('assignee', ASCENDING), + ('assignment_timestamp', DESCENDING)], + name='assignee__transaction_timestamp') + + +def create_votes_secondary_index(conn, dbname): + logger.info('Create `votes` secondary index.') + + # index on block id to quickly poll + 
conn.conn[dbname]['votes'].create_index('vote.voting_for_block', + name='voting_for') + + # is the first index redundant then? + # compound index to order votes by block id and node + conn.conn[dbname]['votes'].create_index([('vote.voting_for_block', + ASCENDING), + ('node_pubkey', + ASCENDING)], + name='block_and_voter') diff --git a/bigchaindb/backend/schema.py b/bigchaindb/backend/schema.py index b731a55f..086854b0 100644 --- a/bigchaindb/backend/schema.py +++ b/bigchaindb/backend/schema.py @@ -1,10 +1,13 @@ """Database creation and schema-providing interfaces for backends.""" from functools import singledispatch +import logging import bigchaindb from bigchaindb.backend.connection import connect +logger = logging.getLogger(__name__) + @singledispatch def create_database(connection, dbname): diff --git a/codecov.yml b/codecov.yml index b6f22af9..877db0d3 100644 --- a/codecov.yml +++ b/codecov.yml @@ -32,6 +32,7 @@ coverage: - "benchmarking-tests/*" - "speed-tests/*" - "ntools/*" + - "bigchaindb/backend/mongodb/*" comment: # @stevepeak (from codecov.io) suggested we change 'suggestions' to 'uncovered' diff --git a/docker-compose.yml b/docker-compose.yml index cdd88078..0d20f868 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,11 @@ version: '2' services: + mdb: + image: mongo + ports: + - "27017" + rdb: image: rethinkdb ports: @@ -30,8 +35,30 @@ services: - ./tox.ini:/usr/src/app/tox.ini - ./Makefile:/usr/src/app/Makefile environment: + BIGCHAINDB_DATABASE_BACKEND: rethinkdb BIGCHAINDB_DATABASE_HOST: rdb BIGCHAINDB_SERVER_BIND: 0.0.0.0:9984 ports: - "9984" command: bigchaindb start + + bdb-mdb: + build: + context: . 
+ dockerfile: Dockerfile-dev + volumes: + - ./bigchaindb:/usr/src/app/bigchaindb + - ./tests:/usr/src/app/tests + - ./docs:/usr/src/app/docs + - ./setup.py:/usr/src/app/setup.py + - ./setup.cfg:/usr/src/app/setup.cfg + - ./pytest.ini:/usr/src/app/pytest.ini + - ./tox.ini:/usr/src/app/tox.ini + environment: + BIGCHAINDB_DATABASE_BACKEND: mongodb + BIGCHAINDB_DATABASE_HOST: mdb + BIGCHAINDB_DATABASE_PORT: 27017 + BIGCHAINDB_SERVER_BIND: 0.0.0.0:9984 + ports: + - "9984" + command: bigchaindb start diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py index f9a76e6d..c7dffac5 100644 --- a/tests/backend/test_generics.py +++ b/tests/backend/test_generics.py @@ -1,3 +1,6 @@ +from importlib import import_module +from unittest.mock import patch + from pytest import mark, raises @@ -64,3 +67,26 @@ def test_changefeed_class(changefeed_class_func_name, args_qty): changefeed_class_func = getattr(ChangeFeed, changefeed_class_func_name) with raises(NotImplementedError): changefeed_class_func(None, *range(args_qty)) + + +@mark.parametrize('db,conn_cls', ( + ('mongodb', 'MongoDBConnection'), + ('rethinkdb', 'RethinkDBConnection'), +)) +@patch('bigchaindb.backend.schema.create_indexes', + autospec=True, return_value=None) +@patch('bigchaindb.backend.schema.create_tables', + autospec=True, return_value=None) +@patch('bigchaindb.backend.schema.create_database', + autospec=True, return_value=None) +def test_init_database(mock_create_database, mock_create_tables, + mock_create_indexes, db, conn_cls): + from bigchaindb.backend.schema import init_database + conn = getattr( + import_module('bigchaindb.backend.{}.connection'.format(db)), + conn_cls, + )('host', 'port', 'dbname') + init_database(connection=conn, dbname='mickeymouse') + mock_create_database.assert_called_once_with(conn, 'mickeymouse') + mock_create_tables.assert_called_once_with(conn, 'mickeymouse') + mock_create_indexes.assert_called_once_with(conn, 'mickeymouse') \ No newline at end of file diff --git 
a/tests/conftest.py b/tests/conftest.py index 1830ab3c..bddfa47b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,6 +64,9 @@ def restore_config(request, node_config): def node_config(request): config = copy.deepcopy(CONFIG) config['database']['backend'] = request.config.getoption('--database-backend') + if config['database']['backend'] == 'mongodb': + # not a great way to do this + config['database']['port'] = 27017 return config