Merge pull request #942 from bigchaindb/docker-mongo

MongoDB integration
Rodolphe Marques 2016-12-16 13:54:32 +01:00 committed by GitHub
commit e704b19454
10 changed files with 421 additions and 0 deletions

View File

@@ -1,13 +1,17 @@
from importlib import import_module
import logging
import bigchaindb
from bigchaindb.common.exceptions import ConfigurationError
BACKENDS = {
'mongodb': 'bigchaindb.backend.mongodb.connection.MongoDBConnection',
'rethinkdb': 'bigchaindb.backend.rethinkdb.connection.RethinkDBConnection'
}
logger = logging.getLogger(__name__)
def connect(backend=None, host=None, port=None, name=None):
"""Create a new connection to the database backend.
@@ -44,6 +48,7 @@ def connect(backend=None, host=None, port=None, name=None):
except (ImportError, AttributeError) as exc:
raise ConfigurationError('Error loading backend `{}`'.format(backend)) from exc
logger.debug('Connection: {}'.format(Class))
return Class(host, port, dbname)
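
A minimal usage sketch of the factory above (the host, port, and database name are example values; it assumes a MongoDB server is reachable there):

from bigchaindb.backend.connection import connect

conn = connect(backend='mongodb', host='localhost', port=27017,
               name='bigchain')
# ``conn`` is a MongoDBConnection instance, resolved via the BACKENDS
# registry above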

View File

@@ -0,0 +1,22 @@
"""MongoDB backend implementation.
Contains a MongoDB-specific implementation of the
:mod:`~bigchaindb.backend.changefeed`, :mod:`~bigchaindb.backend.query`, and
:mod:`~bigchaindb.backend.schema` interfaces.
You can configure BigchainDB to use MongoDB as its database backend by either
setting ``database.backend`` to ``'mongodb'`` in your configuration file, or
setting the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable to
``'mongodb'``.
If configured to use MongoDB, BigchainDB will automatically return instances
of :class:`~bigchaindb.backend.mongodb.MongoDBConnection` for
:func:`~bigchaindb.backend.connection.connect` and dispatch calls of the
generic backend interfaces to the implementations in this module.
"""
# Register the single dispatched modules on import.
from bigchaindb.backend.mongodb import schema, query  # noqa (no changefeed for now)
# MongoDBConnection should always be accessed via
# ``bigchaindb.backend.connect()``.
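
A sketch of the configuration-driven path described in the docstring, assuming ``connect()`` falls back to ``bigchaindb.config`` for any argument left unset (as the connection class in this changeset does):

import bigchaindb
from bigchaindb.backend.connection import connect
from bigchaindb.backend.mongodb.connection import MongoDBConnection

bigchaindb.config['database']['backend'] = 'mongodb'
conn = connect()  # backend defaults to the configured value
assert isinstance(conn, MongoDBConnection)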

View File

@@ -0,0 +1,53 @@
import time
import logging
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
import bigchaindb
from bigchaindb.backend.connection import Connection
logger = logging.getLogger(__name__)
class MongoDBConnection(Connection):
def __init__(self, host=None, port=None, dbname=None, max_tries=3):
"""Create a new Connection instance.
Args:
host (str, optional): the host to connect to.
port (int, optional): the port to connect to.
dbname (str, optional): the database to use.
max_tries (int, optional): how many tries before giving up.
"""
self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port']
self.dbname = dbname or bigchaindb.config['database']['name']
self.max_tries = max_tries
self.connection = None
@property
def conn(self):
if self.connection is None:
self._connect()
return self.connection
    @property
    def db(self):
        # ``conn`` connects lazily, so it is never ``None`` here
        return self.conn[self.dbname]
    def _connect(self):
        for i in range(self.max_tries):
            try:
                self.connection = MongoClient(self.host, self.port)
                return
            except ConnectionFailure:
                if i + 1 == self.max_tries:
                    raise
                # exponential backoff: 1s, 2s, 4s, ...
                time.sleep(2**i)
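
A usage sketch of the class above (host and port are example values): the underlying client is created lazily on first access, with exponential backoff between failed attempts:

conn = MongoDBConnection(host='localhost', port=27017, dbname='bigchain')
assert conn.connection is None    # no client created yet
backlog = conn.db['backlog']      # first access triggers _connect()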

View File

@@ -0,0 +1,169 @@
"""Query implementation for MongoDB"""
from time import time
from pymongo import ReturnDocument
from bigchaindb import backend
from bigchaindb.common.exceptions import CyclicBlockchainError
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.mongodb.connection import MongoDBConnection
register_query = module_dispatch_registrar(backend.query)
@register_query(MongoDBConnection)
def write_transaction(conn, signed_transaction):
return conn.db['backlog'].insert_one(signed_transaction)
@register_query(MongoDBConnection)
def update_transaction(conn, transaction_id, doc):
return conn.db['backlog']\
.find_one_and_update({'id': transaction_id},
doc,
return_document=ReturnDocument.AFTER)
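
For illustration, a hypothetical call (the transaction id and assignee are placeholders; ``$set`` is the usual shape of the update document here, and the field names match the backlog index declared elsewhere in this changeset):

update_transaction(conn, 'abc123',
                   {'$set': {'assignee': 'node_pubkey',
                             'assignment_timestamp': time()}})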
@register_query(MongoDBConnection)
def delete_transaction(conn, *transaction_id):
return conn.db['backlog'].delete_many({'id': {'$in': transaction_id}})
@register_query(MongoDBConnection)
def get_stale_transactions(conn, reassign_delay):
return conn.db['backlog']\
.find({'assignment_timestamp': {'$lt': time() - reassign_delay}})
@register_query(MongoDBConnection)
def get_transaction_from_block(conn, block_id, tx_id):
    # Note: this matches and returns the whole block containing the
    # transaction; extracting the single matching transaction still
    # needs an aggregation step (e.g. ``$unwind`` on the transactions)
return conn.db['bigchain'].find_one({'id': block_id,
'block.transactions.id': tx_id})
@register_query(MongoDBConnection)
def get_transaction_from_backlog(conn, transaction_id):
return conn.db['backlog'].find_one({'id': transaction_id})
@register_query(MongoDBConnection)
def get_blocks_status_from_transaction(conn, transaction_id):
return conn.db['bigchain']\
.find({'block.transactions.id': transaction_id},
projection=['id', 'block.voters'])
@register_query(MongoDBConnection)
def get_txids_by_asset_id(conn, asset_id):
return conn.db['bigchain']\
.find({'block.transactions.asset.id': asset_id},
projection=['id'])
@register_query(MongoDBConnection)
def get_asset_by_id(conn, asset_id):
return conn.db['bigchain']\
.find_one({'block.transactions.asset.id': asset_id,
'block.transactions.asset.operation': 'CREATE'},
projection=['block.transactions.asset'])
@register_query(MongoDBConnection)
def get_spent(conn, transaction_id, condition_id):
return conn.db['bigchain']\
.find_one({'block.transactions.fulfillments.input.txid':
transaction_id,
'block.transactions.fulfillments.input.cid':
condition_id})
@register_query(MongoDBConnection)
def get_owned_ids(conn, owner):
return conn.db['bigchain']\
.find({'block.transactions.transaction.conditions.owners_after':
owner})
@register_query(MongoDBConnection)
def get_votes_by_block_id(conn, block_id):
return conn.db['votes']\
.find({'vote.voting_for_block': block_id})
@register_query(MongoDBConnection)
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
return conn.db['votes']\
.find({'vote.voting_for_block': block_id,
'node_pubkey': node_pubkey})
@register_query(MongoDBConnection)
def write_block(conn, block):
return conn.db['bigchain'].insert_one(block.to_dict())
@register_query(MongoDBConnection)
def get_block(conn, block_id):
return conn.db['bigchain'].find_one({'id': block_id})
@register_query(MongoDBConnection)
def has_transaction(conn, transaction_id):
return bool(conn.db['bigchain']
.find_one({'block.transactions.id': transaction_id}))
@register_query(MongoDBConnection)
def count_blocks(conn):
return conn.db['bigchain'].count()
@register_query(MongoDBConnection)
def count_backlog(conn):
return conn.db['backlog'].count()
@register_query(MongoDBConnection)
def write_vote(conn, vote):
return conn.db['votes'].insert_one(vote)
@register_query(MongoDBConnection)
def get_genesis_block(conn):
    return conn.db['bigchain'].find_one({'block.transactions.0.operation':
                                         'GENESIS'})
@register_query(MongoDBConnection)
def get_last_voted_block(conn, node_pubkey):
    # materialize the cursor into a list so the emptiness check works
    last_voted = list(conn.db['votes']
                      .find({'node_pubkey': node_pubkey},
                            sort=[('vote.timestamp', -1)]))
    if not last_voted:
        return get_genesis_block(conn)
    # map each vote's previous block to the block it voted for, then
    # follow that chain from an arbitrary starting point to its head
    mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
               for v in last_voted}
last_block_id = list(mapping.values())[0]
explored = set()
while True:
try:
if last_block_id in explored:
raise CyclicBlockchainError()
explored.add(last_block_id)
last_block_id = mapping[last_block_id]
except KeyError:
break
return get_block(conn, last_block_id)
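
A quick illustration of the traversal above, with hypothetical block ids:

# votes map previous_block -> voting_for_block
mapping = {'A': 'B', 'B': 'C'}
last_block_id = 'B'              # arbitrary starting point
while last_block_id in mapping:  # follow the chain to its head
    last_block_id = mapping[last_block_id]
assert last_block_id == 'C'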
@register_query(MongoDBConnection)
def get_unvoted_blocks(conn, node_pubkey):
    # TODO: not yet implemented for MongoDB
    pass

View File

@@ -0,0 +1,112 @@
"""Utils to initialize and drop the database."""
import logging
from pymongo import ASCENDING, DESCENDING
from bigchaindb import backend
from bigchaindb.common import exceptions
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.mongodb.connection import MongoDBConnection
logger = logging.getLogger(__name__)
register_schema = module_dispatch_registrar(backend.schema)
@register_schema(MongoDBConnection)
def create_database(conn, dbname):
if dbname in conn.conn.database_names():
raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'
.format(dbname))
logger.info('Create database `%s`.', dbname)
# TODO: read and write concerns can be declared here
conn.conn.get_database(dbname)
@register_schema(MongoDBConnection)
def create_tables(conn, dbname):
for table_name in ['bigchain', 'backlog', 'votes']:
logger.info('Create `%s` table.', table_name)
# create the table
# TODO: read and write concerns can be declared here
conn.conn[dbname].create_collection(table_name)
@register_schema(MongoDBConnection)
def create_indexes(conn, dbname):
create_bigchain_secondary_index(conn, dbname)
create_backlog_secondary_index(conn, dbname)
create_votes_secondary_index(conn, dbname)
@register_schema(MongoDBConnection)
def drop_database(conn, dbname):
conn.conn.drop_database(dbname)
def create_bigchain_secondary_index(conn, dbname):
logger.info('Create `bigchain` secondary index.')
# to select blocks by id
conn.conn[dbname]['bigchain'].create_index('id', name='block_id')
# to order blocks by timestamp
conn.conn[dbname]['bigchain'].create_index([('block.timestamp',
ASCENDING)],
name='block_timestamp')
# to query the bigchain for a transaction id, this field is unique
conn.conn[dbname]['bigchain'].create_index('block.transactions.id',
name='transaction_id',
unique=True)
# secondary index for payload data by UUID, this field is unique
conn.conn[dbname]['bigchain']\
.create_index('block.transactions.transaction.metadata.id',
name='metadata_id', unique=True)
# secondary index for asset uuid, this field is unique
conn.conn[dbname]['bigchain']\
.create_index('block.transactions.transaction.asset.id',
name='asset_id', unique=True)
# compound index on fulfillment and transactions id
conn.conn[dbname]['bigchain']\
.create_index([('block.transactions.transaction.fulfillments.txid',
ASCENDING),
('block.transactions.transaction.fulfillments.cid',
ASCENDING)],
name='tx_and_fulfillment')
def create_backlog_secondary_index(conn, dbname):
logger.info('Create `backlog` secondary index.')
# to order transactions by timestamp
conn.conn[dbname]['backlog'].create_index([('transaction.timestamp',
ASCENDING)],
name='transaction_timestamp')
# compound index to read transactions from the backlog per assignee
conn.conn[dbname]['backlog']\
.create_index([('assignee', ASCENDING),
('assignment_timestamp', DESCENDING)],
name='assignee__transaction_timestamp')
def create_votes_secondary_index(conn, dbname):
logger.info('Create `votes` secondary index.')
# index on block id to quickly poll
conn.conn[dbname]['votes'].create_index('vote.voting_for_block',
name='voting_for')
    # note: the compound index below has ``vote.voting_for_block`` as its
    # prefix, so it can serve the same queries and likely makes the
    # single-field index above redundant
# compound index to order votes by block id and node
conn.conn[dbname]['votes'].create_index([('vote.voting_for_block',
ASCENDING),
('node_pubkey',
ASCENDING)],
name='block_and_voter')
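
A sanity-check sketch after running ``create_indexes`` (``index_information()`` is standard pymongo; the index names match those declared above):

info = conn.conn[dbname]['bigchain'].index_information()
assert 'block_id' in info and 'transaction_id' in info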

View File

@@ -1,10 +1,13 @@
"""Database creation and schema-providing interfaces for backends."""
from functools import singledispatch
import logging
import bigchaindb
from bigchaindb.backend.connection import connect
logger = logging.getLogger(__name__)
@singledispatch
def create_database(connection, dbname):

View File

@@ -32,6 +32,7 @@ coverage:
- "benchmarking-tests/*"
- "speed-tests/*"
- "ntools/*"
- "bigchaindb/backend/mongodb/*"
comment:
# @stevepeak (from codecov.io) suggested we change 'suggestions' to 'uncovered'

View File

@@ -1,6 +1,11 @@
version: '2'
services:
mdb:
image: mongo
ports:
- "27017"
rdb:
image: rethinkdb
ports:
@@ -30,8 +35,30 @@ services:
- ./tox.ini:/usr/src/app/tox.ini
- ./Makefile:/usr/src/app/Makefile
environment:
BIGCHAINDB_DATABASE_BACKEND: rethinkdb
BIGCHAINDB_DATABASE_HOST: rdb
BIGCHAINDB_SERVER_BIND: 0.0.0.0:9984
ports:
- "9984"
command: bigchaindb start
bdb-mdb:
build:
context: .
dockerfile: Dockerfile-dev
volumes:
- ./bigchaindb:/usr/src/app/bigchaindb
- ./tests:/usr/src/app/tests
- ./docs:/usr/src/app/docs
- ./setup.py:/usr/src/app/setup.py
- ./setup.cfg:/usr/src/app/setup.cfg
- ./pytest.ini:/usr/src/app/pytest.ini
- ./tox.ini:/usr/src/app/tox.ini
environment:
BIGCHAINDB_DATABASE_BACKEND: mongodb
BIGCHAINDB_DATABASE_HOST: mdb
BIGCHAINDB_DATABASE_PORT: 27017
BIGCHAINDB_SERVER_BIND: 0.0.0.0:9984
ports:
- "9984"
command: bigchaindb start

View File

@@ -1,3 +1,6 @@
from importlib import import_module
from unittest.mock import patch
from pytest import mark, raises
@@ -64,3 +67,26 @@ def test_changefeed_class(changefeed_class_func_name, args_qty):
changefeed_class_func = getattr(ChangeFeed, changefeed_class_func_name)
with raises(NotImplementedError):
changefeed_class_func(None, *range(args_qty))
@mark.parametrize('db,conn_cls', (
('mongodb', 'MongoDBConnection'),
('rethinkdb', 'RethinkDBConnection'),
))
@patch('bigchaindb.backend.schema.create_indexes',
autospec=True, return_value=None)
@patch('bigchaindb.backend.schema.create_tables',
autospec=True, return_value=None)
@patch('bigchaindb.backend.schema.create_database',
autospec=True, return_value=None)
def test_init_database(mock_create_database, mock_create_tables,
mock_create_indexes, db, conn_cls):
from bigchaindb.backend.schema import init_database
conn = getattr(
import_module('bigchaindb.backend.{}.connection'.format(db)),
conn_cls,
)('host', 'port', 'dbname')
init_database(connection=conn, dbname='mickeymouse')
mock_create_database.assert_called_once_with(conn, 'mickeymouse')
mock_create_tables.assert_called_once_with(conn, 'mickeymouse')
mock_create_indexes.assert_called_once_with(conn, 'mickeymouse')

View File

@@ -64,6 +64,9 @@ def restore_config(request, node_config):
def node_config(request):
config = copy.deepcopy(CONFIG)
config['database']['backend'] = request.config.getoption('--database-backend')
if config['database']['backend'] == 'mongodb':
        # TODO: MongoDB's default port is hardcoded here; it should
        # come from configuration instead
config['database']['port'] = 27017
return config