diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py index 073b1ce9..8dc0318a 100644 --- a/bigchaindb/__init__.py +++ b/bigchaindb/__init__.py @@ -15,6 +15,7 @@ config = { 'threads': None, # if none, the value will be cpu_count * 2 + 1 }, 'database': { + 'backend': 'rethinkdb', 'host': os.environ.get('BIGCHAINDB_DATABASE_HOST', 'localhost'), 'port': 28015, 'name': 'bigchain', diff --git a/bigchaindb/backend/__init__.py b/bigchaindb/backend/__init__.py new file mode 100644 index 00000000..6f7c9f88 --- /dev/null +++ b/bigchaindb/backend/__init__.py @@ -0,0 +1,6 @@ +"""Backend interfaces ...""" + +# Include the backend interfaces +from bigchaindb.backend import changefeed, schema, query # noqa + +from bigchaindb.backend.connection import connect # noqa diff --git a/bigchaindb/backend/changefeed.py b/bigchaindb/backend/changefeed.py new file mode 100644 index 00000000..f42e5f71 --- /dev/null +++ b/bigchaindb/backend/changefeed.py @@ -0,0 +1 @@ +"""Changefeed interfaces for backend databases.""" diff --git a/bigchaindb/backend/connection.py b/bigchaindb/backend/connection.py new file mode 100644 index 00000000..0f0b9243 --- /dev/null +++ b/bigchaindb/backend/connection.py @@ -0,0 +1,44 @@ +import bigchaindb +from bigchaindb.common.exceptions import ConfigurationError +from importlib import import_module + + +BACKENDS = { + 'rethinkdb': 'bigchaindb.backend.rethinkdb.connection.RethinkDBConnection' +} + + +def connect(backend=None, host=None, port=None, name=None): + """Create a connection to the database backend. + + Args: + backend (str): the name of the backend to use. + host (str): the host to connect to. + port (int): the port to connect to. + name (str): the name of the database to use. + + Returns: + An instance of :class:`~bigchaindb.backend.connection.Connection`. + """ + + backend = backend or bigchaindb.config['database']['backend'] + host = host or bigchaindb.config['database']['host'] + port = port or bigchaindb.config['database']['port'] + dbname = name or bigchaindb.config['database']['name'] + + try: + module_name, _, class_name = BACKENDS[backend].rpartition('.') + Class = getattr(import_module(module_name), class_name) + except KeyError: + raise ConfigurationError('Backend `{}` is not supported. ' + 'BigchainDB currently supports {}'.format(backend, BACKENDS.keys())) + except (ImportError, AttributeError) as exc: + raise ConfigurationError('Error loading backend `{}`'.format(backend)) from exc + + return Class(host, port, dbname) + + +class Connection: + + def run(self, query): + raise NotImplementedError() diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py new file mode 100644 index 00000000..e9fe0b46 --- /dev/null +++ b/bigchaindb/backend/query.py @@ -0,0 +1,320 @@ +"""Query interfaces for backend databases.""" + +from functools import singledispatch + + +@singledispatch +def write_transaction(connection, signed_transaction): + """Write a transaction to the backlog table. + + Args: + signed_transaction (dict): a signed transaction. + + Returns: + The result of the operation. + """ + + raise NotImplementedError + + +@singledispatch +def update_transaction(connection, transaction_id, doc): + """Update a transaction in the backlog table. + + Args: + transaction_id (str): the id of the transaction. + doc (dict): the values to update. + + Returns: + The result of the operation. + """ + + raise NotImplementedError + + +@singledispatch +def delete_transaction(connection, *transaction_id): + """Delete a transaction from the backlog. + + Args: + *transaction_id (str): the transaction(s) to delete. + + Returns: + The database response. + """ + raise NotImplementedError + + +@singledispatch +def get_stale_transactions(connection, reassign_delay): + """Get a cursor of stale transactions. + + Transactions are considered stale if they have been assigned a node, + but are still in the backlog after some amount of time specified in the + configuration. + + Args: + reassign_delay (int): threshold (in seconds) to mark a transaction stale. + + Returns: + A cursor of transactions. + """ + + raise NotImplementedError + + +@singledispatch +def get_transaction_from_block(connection, transaction_id, block_id): + """Get a transaction from a specific block. + + Args: + transaction_id (str): the id of the transaction. + block_id (str): the id of the block. + + Returns: + The matching transaction. + """ + + raise NotImplementedError + + +@singledispatch +def get_transaction_from_backlog(connection, transaction_id): + """Get a transaction from backlog. + + Args: + transaction_id (str): the id of the transaction. + + Returns: + The matching transaction. + """ + + raise NotImplementedError + + +@singledispatch +def get_blocks_status_from_transaction(connection, transaction_id): + """Retrieve block election information given a secondary index and value. + + Args: + value: a value to search (e.g. transaction id string, payload hash string) + index (str): name of a secondary index, e.g. 'transaction_id' + + Returns: + :obj:`list` of :obj:`dict`: A list of blocks with with only election information + """ + + raise NotImplementedError + + +@singledispatch +def get_txids_by_asset_id(connection, asset_id): + """Retrieves transactions ids related to a particular asset. + + A digital asset in bigchaindb is identified by an uuid. This allows us + to query all the transactions related to a particular digital asset, + knowing the id. + + Args: + asset_id (str): the id for this particular metadata. + + Returns: + A list of transactions ids related to the asset. If no transaction + exists for that asset it returns an empty list ``[]`` + """ + + raise NotImplementedError + + +@singledispatch +def get_asset_by_id(conneciton, asset_id): + """Returns the asset associated with an asset_id. + + Args: + asset_id (str): The asset id. + + Returns: + Returns a rethinkdb cursor. + """ + + raise NotImplementedError + + +@singledispatch +def get_spent(connection, transaction_id, condition_id): + """Check if a `txid` was already used as an input. + + A transaction can be used as an input for another transaction. Bigchain + needs to make sure that a given `txid` is only used once. + + Args: + transaction_id (str): The id of the transaction. + condition_id (int): The index of the condition in the respective + transaction. + + Returns: + The transaction that used the `txid` as an input else `None` + """ + + raise NotImplementedError + + +@singledispatch +def get_owned_ids(connection, owner): + """Retrieve a list of `txids` that can we used has inputs. + + Args: + owner (str): base58 encoded public key. + + Returns: + A cursor for the matching transactions. + """ + + raise NotImplementedError + + +@singledispatch +def get_votes_by_block_id(connection, block_id): + """Get all the votes casted for a specific block. + + Args: + block_id (str): the block id to use. + + Returns: + A cursor for the matching votes. + """ + + raise NotImplementedError + + +@singledispatch +def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey): + """Get all the votes casted for a specific block by a specific voter. + + Args: + block_id (str): the block id to use. + node_pubkey (str): base58 encoded public key + + Returns: + A cursor for the matching votes. + """ + + raise NotImplementedError + + +@singledispatch +def write_block(connection, block): + """Write a block to the bigchain table. + + Args: + block (dict): the block to write. + + Returns: + The database response. + """ + + raise NotImplementedError + + +@singledispatch +def get_block(connection, block_id): + """Get a block from the bigchain table. + + Args: + block_id (str): block id of the block to get + + Returns: + block (dict): the block or `None` + """ + + raise NotImplementedError + + +@singledispatch +def has_transaction(connection, transaction_id): + """Check if a transaction exists in the bigchain table. + + Args: + transaction_id (str): the id of the transaction to check. + + Returns: + ``True`` if the transaction exists, ``False`` otherwise. + """ + + raise NotImplementedError + + +@singledispatch +def count_blocks(connection): + """Count the number of blocks in the bigchain table. + + Returns: + The number of blocks. + """ + + raise NotImplementedError + + +@singledispatch +def count_backlog(connection): + """Count the number of transactions in the backlog table. + + Returns: + The number of transactions in the backlog. + """ + + raise NotImplementedError + + +@singledispatch +def write_vote(connection, vote): + """Write a vote to the votes table. + + Args: + vote (dict): the vote to write. + + Returns: + The database response. + """ + + raise NotImplementedError + + +@singledispatch +def get_genesis_block(connection): + """Get the genesis block. + + Returns: + The genesis block + """ + + raise NotImplementedError + + +@singledispatch +def get_last_voted_block(connection, node_pubkey): + """Get the last voted block for a specific node. + + Args: + node_pubkey (str): base58 encoded public key. + + Returns: + The last block the node has voted on. If the node didn't cast + any vote then the genesis block is returned. + """ + + raise NotImplementedError + + +@singledispatch +def get_unvoted_blocks(connection, node_pubkey): + """Return all the blocks that have not been voted by the specified node. + + Args: + node_pubkey (str): base58 encoded public key + + Returns: + :obj:`list` of :obj:`dict`: a list of unvoted blocks + """ + + raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/__init__.py b/bigchaindb/backend/rethinkdb/__init__.py new file mode 100644 index 00000000..42b79735 --- /dev/null +++ b/bigchaindb/backend/rethinkdb/__init__.py @@ -0,0 +1,4 @@ +"""RethinkDB backend components ...""" + +# Register the single dispatched modules on import. +from bigchaindb.backend.rethinkdb import changefeed, schema, query # noqa diff --git a/bigchaindb/db/backends/__init__.py b/bigchaindb/backend/rethinkdb/changefeed.py similarity index 100% rename from bigchaindb/db/backends/__init__.py rename to bigchaindb/backend/rethinkdb/changefeed.py diff --git a/bigchaindb/backend/rethinkdb/connection.py b/bigchaindb/backend/rethinkdb/connection.py new file mode 100644 index 00000000..03c2508c --- /dev/null +++ b/bigchaindb/backend/rethinkdb/connection.py @@ -0,0 +1,79 @@ +import time +import logging + +import rethinkdb as r + +from bigchaindb.backend.connection import Connection + +logger = logging.getLogger(__name__) + + +class RethinkDBConnection(Connection): + """ + This class is a proxy to run queries against the database, it is: + + - lazy, since it creates a connection only when needed + - resilient, because before raising exceptions it tries + more times to run the query or open a connection. + """ + + def __init__(self, host, port, dbname, max_tries=3): + """Create a new Connection instance. + + Args: + host (str): the host to connect to. + port (int): the port to connect to. + dbname (str): the name of the database to use. + max_tries (int, optional): how many tries before giving up. + Defaults to 3. + """ + + self.host = host + self.port = port + self.dbname = dbname + self.max_tries = max_tries + self.conn = None + + def run(self, query): + """Run a RethinkDB query. + + Args: + query: the RethinkDB query. + + Raises: + :exc:`rethinkdb.ReqlDriverError`: After + :attr:`~.RethinkDBConnection.max_tries`. + """ + + if self.conn is None: + self._connect() + + for i in range(self.max_tries): + try: + return query.run(self.conn) + except r.ReqlDriverError: + if i + 1 == self.max_tries: + raise + self._connect() + + def _connect(self): + """Set a connection to RethinkDB. + + The connection is available via :attr:`~.RethinkDBConnection.conn`. + + Raises: + :exc:`rethinkdb.ReqlDriverError`: After + :attr:`~.RethinkDBConnection.max_tries`. + """ + + for i in range(1, self.max_tries + 1): + logging.debug('Connecting to database %s:%s/%s. (Attempt %s/%s)', + self.host, self.port, self.dbname, i, self.max_tries) + try: + self.conn = r.connect(host=self.host, port=self.port, db=self.dbname) + except r.ReqlDriverError: + if i == self.max_tries: + raise + wait_time = 2**i + logging.debug('Error connecting to database, waiting %ss', wait_time) + time.sleep(wait_time) diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py new file mode 100644 index 00000000..dda6aaf9 --- /dev/null +++ b/bigchaindb/backend/rethinkdb/query.py @@ -0,0 +1,248 @@ +"""Query implementation for RethinkDB""" + +from time import time + +import rethinkdb as r + +from bigchaindb import backend, util +from bigchaindb.common import exceptions +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection + + +READ_MODE = 'majority' +WRITE_DURABILITY = 'hard' + +register_query = module_dispatch_registrar(backend.query) + + +@register_query(RethinkDBConnection) +def write_transaction(connection, signed_transaction): + return connection.run( + r.table('backlog') + .insert(signed_transaction, durability=WRITE_DURABILITY)) + + +@register_query(RethinkDBConnection) +def update_transaction(connection, transaction_id, doc): + return connection.run( + r.table('backlog') + .get(transaction_id) + .update(doc)) + + +@register_query(RethinkDBConnection) +def delete_transaction(connection, *transaction_id): + return connection.run( + r.table('backlog') + .get_all(*transaction_id) + .delete(durability=WRITE_DURABILITY)) + + +@register_query(RethinkDBConnection) +def get_stale_transactions(connection, reassign_delay): + return connection.run( + r.table('backlog') + .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay)) + + +@register_query(RethinkDBConnection) +def get_transaction_from_block(connection, transaction_id, block_id): + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .get(block_id) + .get_field('block') + .get_field('transactions') + .filter(lambda tx: tx['id'] == transaction_id))[0] + + +@register_query(RethinkDBConnection) +def get_transaction_from_backlog(connection, transaction_id): + return connection.run( + r.table('backlog') + .get(transaction_id) + .without('assignee', 'assignment_timestamp') + .default(None)) + + +@register_query(RethinkDBConnection) +def get_blocks_status_from_transaction(connection, transaction_id): + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .get_all(transaction_id, index='transaction_id') + .pluck('votes', 'id', {'block': ['voters']})) + + +@register_query(RethinkDBConnection) +def get_txids_by_asset_id(connection, asset_id): + # here we only want to return the transaction ids since later on when + # we are going to retrieve the transaction with status validation + return connection.run( + r.table('bigchain') + .get_all(asset_id, index='asset_id') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: transaction['asset']['id'] == asset_id) + .get_field('id')) + + +@register_query(RethinkDBConnection) +def get_asset_by_id(connection, asset_id): + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .get_all(asset_id, index='asset_id') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: transaction['asset']['id'] == asset_id) + .filter(lambda transaction: transaction['operation'] == 'CREATE') + .pluck('asset')) + + +@register_query(RethinkDBConnection) +def get_spent(connection, transaction_id, condition_id): + # TODO: use index! + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda transaction: transaction['fulfillments'].contains( + lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id}))) + + +@register_query(RethinkDBConnection) +def get_owned_ids(connection, owner): + # TODO: use index! + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda tx: tx['conditions'].contains( + lambda c: c['owners_after'].contains(owner)))) + + +@register_query(RethinkDBConnection) +def get_votes_by_block_id(connection, block_id): + return connection.run( + r.table('votes', read_mode=READ_MODE) + .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter') + .without('id')) + + +@register_query(RethinkDBConnection) +def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey): + return connection.run( + r.table('votes') + .get_all([block_id, node_pubkey], index='block_and_voter') + .without('id')) + + +@register_query(RethinkDBConnection) +def write_block(connection, block): + return connection.run( + r.table('bigchain') + .insert(r.json(block), durability=WRITE_DURABILITY)) + + +@register_query(RethinkDBConnection) +def get_block(connection, block_id): + return connection.run(r.table('bigchain').get(block_id)) + + +@register_query(RethinkDBConnection) +def has_transaction(connection, transaction_id): + return bool(connection.run( + r.table('bigchain', read_mode=READ_MODE) + .get_all(transaction_id, index='transaction_id').count())) + + +@register_query(RethinkDBConnection) +def count_blocks(connection): + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .count()) + + +@register_query(RethinkDBConnection) +def count_backlog(connection): + return connection.run( + r.table('backlog', read_mode=READ_MODE) + .count()) + + +@register_query(RethinkDBConnection) +def write_vote(connection, vote): + return connection.run( + r.table('votes') + .insert(vote)) + + +@register_query(RethinkDBConnection) +def get_genesis_block(connection): + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .filter(util.is_genesis_block) + .nth(0)) + + +@register_query(RethinkDBConnection) +def get_last_voted_block(connection, node_pubkey): + try: + # get the latest value for the vote timestamp (over all votes) + max_timestamp = connection.run( + r.table('votes', read_mode=READ_MODE) + .filter(r.row['node_pubkey'] == node_pubkey) + .max(r.row['vote']['timestamp']))['vote']['timestamp'] + + last_voted = list(connection.run( + r.table('votes', read_mode=READ_MODE) + .filter(r.row['vote']['timestamp'] == max_timestamp) + .filter(r.row['node_pubkey'] == node_pubkey))) + + except r.ReqlNonExistenceError: + # return last vote if last vote exists else return Genesis block + return get_genesis_block(connection) + + # Now the fun starts. Since the resolution of timestamp is a second, + # we might have more than one vote per timestamp. If this is the case + # then we need to rebuild the chain for the blocks that have been retrieved + # to get the last one. + + # Given a block_id, mapping returns the id of the block pointing at it. + mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] + for v in last_voted} + + # Since we follow the chain backwards, we can start from a random + # point of the chain and "move up" from it. + last_block_id = list(mapping.values())[0] + + # We must be sure to break the infinite loop. This happens when: + # - the block we are currenty iterating is the one we are looking for. + # This will trigger a KeyError, breaking the loop + # - we are visiting again a node we already explored, hence there is + # a loop. This might happen if a vote points both `previous_block` + # and `voting_for_block` to the same `block_id` + explored = set() + + while True: + try: + if last_block_id in explored: + raise exceptions.CyclicBlockchainError() + explored.add(last_block_id) + last_block_id = mapping[last_block_id] + except KeyError: + break + + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .get(last_block_id)) + + +@register_query(RethinkDBConnection) +def get_unvoted_blocks(connection, node_pubkey): + unvoted = connection.run( + r.table('bigchain', read_mode=READ_MODE) + .filter(lambda block: r.table('votes', read_mode=READ_MODE) + .get_all([block['id'], node_pubkey], index='block_and_voter') + .is_empty()) + .order_by(r.asc(r.row['block']['timestamp']))) + + # FIXME: I (@vrde) don't like this solution. Filtering should be done at a + # database level. Solving issue #444 can help untangling the situation + unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) + return unvoted_blocks diff --git a/bigchaindb/backend/rethinkdb/schema.py b/bigchaindb/backend/rethinkdb/schema.py new file mode 100644 index 00000000..49a1a8e8 --- /dev/null +++ b/bigchaindb/backend/rethinkdb/schema.py @@ -0,0 +1,107 @@ +"""Utils to initialize and drop the database.""" + +import logging + +import rethinkdb as r + +from bigchaindb import backend +from bigchaindb.common import exceptions +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection + + +logger = logging.getLogger(__name__) +register_schema = module_dispatch_registrar(backend.schema) + + +@register_schema(RethinkDBConnection) +def create_database(connection, dbname): + if connection.run(r.db_list().contains(dbname)): + raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'.format(dbname)) + + logger.info('Create database `%s`.', dbname) + connection.run(r.db_create(dbname)) + + +@register_schema(RethinkDBConnection) +def create_tables(connection, dbname): + for table_name in ['bigchain', 'backlog', 'votes']: + logger.info('Create `%s` table.', table_name) + connection.run(r.db(dbname).table_create(table_name)) + + +@register_schema(RethinkDBConnection) +def create_indexes(connection, dbname): + create_bigchain_secondary_index(connection, dbname) + create_backlog_secondary_index(connection, dbname) + create_votes_secondary_index(connection, dbname) + + +@register_schema(RethinkDBConnection) +def drop_database(connection, dbname): + try: + logger.info('Drop database `%s`', dbname) + connection.run(r.db_drop(dbname)) + logger.info('Done.') + except r.ReqlOpFailedError: + raise exceptions.DatabaseDoesNotExist('Database `{}` does not exist'.format(dbname)) + + +def create_bigchain_secondary_index(connection, dbname): + logger.info('Create `bigchain` secondary index.') + + # to order blocks by timestamp + connection.run( + r.db(dbname) + .table('bigchain') + .index_create('block_timestamp', r.row['block']['timestamp'])) + + # to query the bigchain for a transaction id + connection.run( + r.db(dbname) + .table('bigchain') + .index_create('transaction_id', r.row['block']['transactions']['id'], multi=True)) + + # secondary index for asset uuid + connection.run( + r.db(dbname) + .table('bigchain') + .index_create('asset_id', r.row['block']['transactions']['asset']['id'], multi=True)) + + # wait for rethinkdb to finish creating secondary indexes + connection.run( + r.db(dbname) + .table('bigchain') + .index_wait()) + + +def create_backlog_secondary_index(connection, dbname): + logger.info('Create `backlog` secondary index.') + + # compound index to read transactions from the backlog per assignee + connection.run( + r.db(dbname) + .table('backlog') + .index_create('assignee__transaction_timestamp', [r.row['assignee'], r.row['assignment_timestamp']])) + + # wait for rethinkdb to finish creating secondary indexes + connection.run( + r.db(dbname) + .table('backlog') + .index_wait()) + + +def create_votes_secondary_index(connection, dbname): + logger.info('Create `votes` secondary index.') + + # compound index to order votes by block id and node + connection.run( + r.db(dbname) + .table('votes') + .index_create('block_and_voter', [r.row['vote']['voting_for_block'], r.row['node_pubkey']])) + + # wait for rethinkdb to finish creating secondary indexes + connection.run( + r.db(dbname) + .table('votes') + .index_wait()) diff --git a/bigchaindb/backend/schema.py b/bigchaindb/backend/schema.py new file mode 100644 index 00000000..b3ae8fab --- /dev/null +++ b/bigchaindb/backend/schema.py @@ -0,0 +1,85 @@ +"""Schema-providing interfaces for backend databases""" + +from functools import singledispatch + +import bigchaindb +from bigchaindb.backend.connection import connect + + +@singledispatch +def create_database(connection, dbname): + """Create database to be used by BigchainDB. + + Args: + dbname (str): the name of the database to create. + + Raises: + :exc:`~DatabaseAlreadyExists`: If the given :attr:`dbname` already + exists as a database. + """ + + raise NotImplementedError + + +@singledispatch +def create_tables(connection, dbname): + """Create the tables to be used by BigchainDB. + + Args: + dbname (str): the name of the database to create tables for. + """ + + raise NotImplementedError + + +@singledispatch +def create_indexes(connection, dbname): + """Create the indexes to be used by BigchainDB. + + Args: + dbname (str): the name of the database to create indexes for. + """ + + raise NotImplementedError + + +@singledispatch +def drop_database(connection, dbname): + """Drop the database used by BigchainDB. + + Args: + dbname (str): the name of the database to drop. + + Raises: + :exc:`~DatabaseDoesNotExist`: If the given :attr:`dbname` does not + exist as a database. + """ + + raise NotImplementedError + + +def init_database(connection=None, dbname=None): + """Initialize the configured backend for use with BigchainDB. + + Creates a database with :attr:`dbname` with any required tables + and supporting indexes. + + Args: + connection (:class:`~bigchaindb.backend.connection.Connection`): an + existing connection to use to initialize the database. + Creates one if not given. + dbname (str): the name of the database to create. + Defaults to the database name given in the BigchainDB + configuration. + + Raises: + :exc:`~DatabaseAlreadyExists`: If the given :attr:`dbname` already + exists as a database. + """ + + connection = connection or connect() + dbname = dbname or bigchaindb.config['database']['name'] + + create_database(connection, dbname) + create_tables(connection, dbname) + create_indexes(connection, dbname) diff --git a/bigchaindb/backend/utils.py b/bigchaindb/backend/utils.py new file mode 100644 index 00000000..23b3e2d9 --- /dev/null +++ b/bigchaindb/backend/utils.py @@ -0,0 +1,20 @@ +class ModuleDispatchRegistrationError(Exception): + """Raised when there is a problem registering dispatched functions for a + module""" + + +def module_dispatch_registrar(module): + def dispatch_wrapper(obj_type): + def wrapper(func): + func_name = func.__name__ + try: + dispatch_registrar = getattr(module, func_name) + return dispatch_registrar.register(obj_type)(func) + except AttributeError as ex: + raise ModuleDispatchRegistrationError( + ("`{module}` does not contain a single-dispatchable " + "function named `{func}`. The module being registered " + "was not implemented correctly!").format( + func=func_name, module=module.__name__)) from ex + return wrapper + return dispatch_wrapper diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 217aeeb1..1f1a0dcc 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -22,7 +22,8 @@ import bigchaindb import bigchaindb.config_utils from bigchaindb.models import Transaction from bigchaindb.util import ProcessGroup -from bigchaindb import db +from bigchaindb import backend +from bigchaindb.backend import schema from bigchaindb.commands import utils from bigchaindb import processes @@ -133,6 +134,17 @@ def run_export_my_pubkey(args): # exits with exit code 1 (signals tha an error happened) +def _run_init(): + # Try to access the keypair, throws an exception if it does not exist + b = bigchaindb.Bigchain() + + schema.init_database(connection=b.connection) + + logger.info('Create genesis block.') + b.create_genesis_block() + logger.info('Done, have fun!') + + def run_init(args): """Initialize the database""" bigchaindb.config_utils.autoconfigure(filename=args.config, force=True) @@ -140,7 +152,7 @@ def run_init(args): # 1. prompt the user to inquire whether they wish to drop the db # 2. force the init, (e.g., via -f flag) try: - db.init() + _run_init() except DatabaseAlreadyExists: print('The database already exists.', file=sys.stderr) print('If you wish to re-initialize it, first drop it.', file=sys.stderr) @@ -149,7 +161,16 @@ def run_init(args): def run_drop(args): """Drop the database""" bigchaindb.config_utils.autoconfigure(filename=args.config, force=True) - db.drop(assume_yes=args.yes) + dbname = bigchaindb.config['database']['name'] + + if not args.yes: + response = input('Do you want to drop `{}` database? [y/n]: '.format(dbname)) + if response != 'y': + return + + conn = backend.connect() + dbname = bigchaindb.config['database']['name'] + schema.drop_database(conn, dbname) def run_start(args): @@ -176,7 +197,7 @@ def run_start(args): logger.info('RethinkDB started with PID %s' % proc.pid) try: - db.init() + _run_init() except DatabaseAlreadyExists: pass except KeypairNotFoundException: @@ -222,23 +243,25 @@ def run_load(args): def run_set_shards(args): + conn = backend.connect() for table in ['bigchain', 'backlog', 'votes']: # See https://www.rethinkdb.com/api/python/config/ - table_config = r.table(table).config().run(db.get_conn()) + table_config = conn.run(r.table(table).config()) num_replicas = len(table_config['shards'][0]['replicas']) try: - r.table(table).reconfigure(shards=args.num_shards, replicas=num_replicas).run(db.get_conn()) + conn.run(r.table(table).reconfigure(shards=args.num_shards, replicas=num_replicas)) except r.ReqlOpFailedError as e: logger.warn(e) def run_set_replicas(args): + conn = backend.connect() for table in ['bigchain', 'backlog', 'votes']: # See https://www.rethinkdb.com/api/python/config/ - table_config = r.table(table).config().run(db.get_conn()) + table_config = conn.run(r.table(table).config()) num_shards = len(table_config['shards']) try: - r.table(table).reconfigure(shards=num_shards, replicas=args.num_replicas).run(db.get_conn()) + conn.run(r.table(table).reconfigure(shards=num_shards, replicas=args.num_replicas)) except r.ReqlOpFailedError as e: logger.warn(e) diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py index 573ba785..510eb2f6 100644 --- a/bigchaindb/commands/utils.py +++ b/bigchaindb/commands/utils.py @@ -10,7 +10,7 @@ import subprocess import rethinkdb as r import bigchaindb -from bigchaindb import db +from bigchaindb import backend from bigchaindb.version import __version__ @@ -39,11 +39,11 @@ def start_rethinkdb(): # of the database. This code assumes the tables are ready # when the database is ready. This seems a valid assumption. try: - conn = db.get_conn() + conn = backend.connect() # Before checking if the db is ready, we need to query # the server to check if it contains that db - if r.db_list().contains(dbname).run(conn): - r.db(dbname).wait().run(conn) + if conn.run(r.db_list().contains(dbname)): + conn.run(r.db(dbname).wait()) except (r.ReqlOpFailedError, r.ReqlDriverError) as exc: raise StartupError('Error waiting for the database `{}` ' 'to be ready'.format(dbname)) from exc diff --git a/bigchaindb/core.py b/bigchaindb/core.py index b59f4ebd..f0e1b89c 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -10,8 +10,7 @@ from bigchaindb.common.transaction import TransactionLink, Asset import bigchaindb -from bigchaindb.db.utils import Connection, get_backend -from bigchaindb import config_utils, util +from bigchaindb import backend, config_utils, util from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Block, Transaction @@ -31,9 +30,7 @@ class Bigchain(object): # return if transaction is in backlog TX_IN_BACKLOG = 'backlog' - def __init__(self, host=None, port=None, dbname=None, backend=None, - public_key=None, private_key=None, keyring=[], - backlog_reassign_delay=None): + def __init__(self, public_key=None, private_key=None, keyring=[], connection=None, backlog_reassign_delay=None): """Initialize the Bigchain instance A Bigchain instance has several configuration parameters (e.g. host). @@ -46,35 +43,25 @@ class Bigchain(object): its default value (defined in bigchaindb.__init__). Args: - host (str): hostname where RethinkDB is running. - port (int): port in which RethinkDB is running (usually 28015). - dbname (str): the name of the database to connect to (usually bigchain). - backend (:class:`~bigchaindb.db.backends.rethinkdb.RehinkDBBackend`): - the database backend to use. public_key (str): the base58 encoded public key for the ED25519 curve. private_key (str): the base58 encoded private key for the ED25519 curve. keyring (list[str]): list of base58 encoded public keys of the federation nodes. + connection (:class:`~bigchaindb.backend.connection.Connection`): + A connection to the database. """ config_utils.autoconfigure() - self.host = host or bigchaindb.config['database']['host'] - self.port = port or bigchaindb.config['database']['port'] - self.dbname = dbname or bigchaindb.config['database']['name'] - self.backend = backend or get_backend(host, port, dbname) + self.me = public_key or bigchaindb.config['keypair']['public'] self.me_private = private_key or bigchaindb.config['keypair']['private'] self.nodes_except_me = keyring or bigchaindb.config['keyring'] self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay'] self.consensus = BaseConsensusRules - # change RethinkDB read mode to majority. This ensures consistency in query results - self.read_mode = 'majority' - + self.connection = connection if connection else backend.connect(**bigchaindb.config['database']) if not self.me or not self.me_private: raise exceptions.KeypairNotFoundException() - self.connection = Connection(host=self.host, port=self.port, db=self.dbname) - - def write_transaction(self, signed_transaction, durability='soft'): + def write_transaction(self, signed_transaction): """Write the transaction to bigchain. When first writing a transaction to the bigchain the transaction will be kept in a backlog until @@ -100,7 +87,7 @@ class Bigchain(object): signed_transaction.update({'assignment_timestamp': time()}) # write to the backlog - return self.backend.write_transaction(signed_transaction) + return backend.query.write_transaction(self.connection, signed_transaction) def reassign_transaction(self, transaction): """Assign a transaction to a new node @@ -126,8 +113,8 @@ class Bigchain(object): # There is no other node to assign to new_assignee = self.me - return self.backend.update_transaction( - transaction['id'], + return backend.query.update_transaction( + self.connection, transaction['id'], {'assignee': new_assignee, 'assignment_timestamp': time()}) def delete_transaction(self, *transaction_id): @@ -140,7 +127,7 @@ class Bigchain(object): The database response. """ - return self.backend.delete_transaction(*transaction_id) + return backend.query.delete_transaction(self.connection, *transaction_id) def get_stale_transactions(self): """Get a cursor of stale transactions. @@ -149,7 +136,7 @@ class Bigchain(object): backlog after some amount of time specified in the configuration """ - return self.backend.get_stale_transactions(self.backlog_reassign_delay) + return backend.query.get_stale_transactions(self.connection, self.backlog_reassign_delay) def validate_transaction(self, transaction): """Validate a transaction. @@ -200,7 +187,7 @@ class Bigchain(object): include_status (bool): also return the status of the block the return value is then a tuple: (block, status) """ - block = self.backend.get_block(block_id) + block = backend.query.get_block(self.connection, block_id) status = None if include_status: @@ -260,10 +247,10 @@ class Bigchain(object): break # Query the transaction in the target block and return - response = self.backend.get_transaction_from_block(txid, target_block_id) + response = backend.query.get_transaction_from_block(self.connection, txid, target_block_id) if check_backlog: - response = self.backend.get_transaction_from_backlog(txid) + response = backend.query.get_transaction_from_backlog(self.connection, txid) if response: tx_status = self.TX_IN_BACKLOG @@ -304,7 +291,7 @@ class Bigchain(object): """ # First, get information on all blocks which contain this transaction - blocks = self.backend.get_blocks_status_from_transaction(txid) + blocks = backend.query.get_blocks_status_from_transaction(self.connection, txid) if blocks: # Determine the election status of each block validity = { @@ -346,7 +333,7 @@ class Bigchain(object): If no transaction exists for that asset it returns an empty list `[]` """ - txids = self.backend.get_txids_by_asset_id(asset_id) + txids = backend.query.get_txids_by_asset_id(self.connection, asset_id) transactions = [] for txid in txids: tx = self.get_transaction(txid) @@ -364,7 +351,7 @@ class Bigchain(object): :class:`~bigchaindb.common.transaction.Asset` if the asset exists else None. """ - cursor = self.backend.get_asset_by_id(asset_id) + cursor = backend.query.get_asset_by_id(self.connection, asset_id) cursor = list(cursor) if cursor: return Asset.from_dict(cursor[0]['asset']) @@ -385,7 +372,7 @@ class Bigchain(object): """ # checks if an input was already spent # checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...} - transactions = list(self.backend.get_spent(txid, cid)) + transactions = list(backend.query.get_spent(self.connection, txid, cid)) # a transaction_id should have been spent at most one time if transactions: @@ -422,7 +409,7 @@ class Bigchain(object): """ # get all transactions in which owner is in the `owners_after` list - response = self.backend.get_owned_ids(owner) + response = backend.query.get_owned_ids(self.connection, owner) owned = [] for tx in response: @@ -507,7 +494,7 @@ class Bigchain(object): but the vote is invalid. """ - votes = list(self.backend.get_votes_by_block_id_and_voter(block_id, self.me)) + votes = list(backend.query.get_votes_by_block_id_and_voter(self.connection, block_id, self.me)) if len(votes) > 1: raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}' @@ -522,17 +509,17 @@ class Bigchain(object): return has_previous_vote - def write_block(self, block, durability='soft'): + def write_block(self, block): """Write a block to bigchain. Args: block (Block): block to write to bigchain. """ - return self.backend.write_block(block.to_str(), durability=durability) + return backend.query.write_block(self.connection, block.to_str()) def transaction_exists(self, transaction_id): - return self.backend.has_transaction(transaction_id) + return backend.query.has_transaction(self.connection, transaction_id) def prepare_genesis_block(self): """Prepare a genesis block.""" @@ -561,13 +548,13 @@ class Bigchain(object): # 2. create the block with one transaction # 3. write the block to the bigchain - blocks_count = self.backend.count_blocks() + blocks_count = backend.query.count_blocks(self.connection) if blocks_count: raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block') block = self.prepare_genesis_block() - self.write_block(block, durability='hard') + self.write_block(block) return block @@ -606,12 +593,12 @@ class Bigchain(object): def write_vote(self, vote): """Write the vote to the database.""" - return self.backend.write_vote(vote) + return backend.query.write_vote(self.connection, vote) def get_last_voted_block(self): """Returns the last block that this node voted on.""" - return Block.from_dict(self.backend.get_last_voted_block(self.me)) + return Block.from_dict(backend.query.get_last_voted_block(self.connection, self.me)) def get_unvoted_blocks(self): """Return all the blocks that have not been voted on by this node. @@ -621,12 +608,12 @@ class Bigchain(object): """ # XXX: should this return instaces of Block? - return self.backend.get_unvoted_blocks(self.me) + return backend.query.get_unvoted_blocks(self.connection, self.me) def block_election_status(self, block_id, voters): """Tally the votes on a block, and return the status: valid, invalid, or undecided.""" - votes = list(self.backend.get_votes_by_block_id(block_id)) + votes = list(backend.query.get_votes_by_block_id(self.connection, block_id)) n_voters = len(voters) voter_counts = collections.Counter([vote['node_pubkey'] for vote in votes]) diff --git a/bigchaindb/db/__init__.py b/bigchaindb/db/__init__.py deleted file mode 100644 index 28ebfc3a..00000000 --- a/bigchaindb/db/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# TODO can we use explicit imports? -from bigchaindb.db.utils import * # noqa: F401,F403 diff --git a/bigchaindb/db/backends/rethinkdb.py b/bigchaindb/db/backends/rethinkdb.py deleted file mode 100644 index 55535c65..00000000 --- a/bigchaindb/db/backends/rethinkdb.py +++ /dev/null @@ -1,414 +0,0 @@ -"""Backend implementation for RethinkDB. - -This module contains all the methods to store and retrieve data from RethinkDB. -""" - -from time import time - -import rethinkdb as r - -from bigchaindb import util -from bigchaindb.db.utils import Connection -from bigchaindb.common import exceptions - - -class RethinkDBBackend: - - def __init__(self, host=None, port=None, db=None): - """Initialize a new RethinkDB Backend instance. - - Args: - host (str): the host to connect to. - port (int): the port to connect to. - db (str): the name of the database to use. - """ - - self.read_mode = 'majority' - self.durability = 'soft' - self.connection = Connection(host=host, port=port, db=db) - - def write_transaction(self, signed_transaction): - """Write a transaction to the backlog table. - - Args: - signed_transaction (dict): a signed transaction. - - Returns: - The result of the operation. - """ - - return self.connection.run( - r.table('backlog') - .insert(signed_transaction, durability=self.durability)) - - def update_transaction(self, transaction_id, doc): - """Update a transaction in the backlog table. - - Args: - transaction_id (str): the id of the transaction. - doc (dict): the values to update. - - Returns: - The result of the operation. - """ - - return self.connection.run( - r.table('backlog') - .get(transaction_id) - .update(doc)) - - def delete_transaction(self, *transaction_id): - """Delete a transaction from the backlog. - - Args: - *transaction_id (str): the transaction(s) to delete - - Returns: - The database response. - """ - - return self.connection.run( - r.table('backlog') - .get_all(*transaction_id) - .delete(durability='hard')) - - def get_stale_transactions(self, reassign_delay): - """Get a cursor of stale transactions. - - Transactions are considered stale if they have been assigned a node, - but are still in the backlog after some amount of time specified in the - configuration. - - Args: - reassign_delay (int): threshold (in seconds) to mark a transaction stale. - - Returns: - A cursor of transactions. - """ - - return self.connection.run( - r.table('backlog') - .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay)) - - def get_transaction_from_block(self, transaction_id, block_id): - """Get a transaction from a specific block. - - Args: - transaction_id (str): the id of the transaction. - block_id (str): the id of the block. - - Returns: - The matching transaction. - """ - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get(block_id) - .get_field('block') - .get_field('transactions') - .filter(lambda tx: tx['id'] == transaction_id))[0] - - def get_transaction_from_backlog(self, transaction_id): - """Get a transaction from backlog. - - Args: - transaction_id (str): the id of the transaction. - - Returns: - The matching transaction. - """ - return self.connection.run( - r.table('backlog') - .get(transaction_id) - .without('assignee', 'assignment_timestamp') - .default(None)) - - def get_blocks_status_from_transaction(self, transaction_id): - """Retrieve block election information given a secondary index and value - - Args: - value: a value to search (e.g. transaction id string, payload hash string) - index (str): name of a secondary index, e.g. 'transaction_id' - - Returns: - :obj:`list` of :obj:`dict`: A list of blocks with with only election information - """ - - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(transaction_id, index='transaction_id') - .pluck('votes', 'id', {'block': ['voters']})) - - def get_txids_by_asset_id(self, asset_id): - """Retrieves transactions ids related to a particular asset. - - A digital asset in bigchaindb is identified by an uuid. This allows us - to query all the transactions related to a particular digital asset, - knowing the id. - - Args: - asset_id (str): the id for this particular metadata. - - Returns: - A list of transactions ids related to the asset. If no transaction - exists for that asset it returns an empty list `[]` - """ - - # here we only want to return the transaction ids since later on when - # we are going to retrieve the transaction with status validation - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(asset_id, index='asset_id') - .concat_map(lambda block: block['block']['transactions']) - .filter(lambda transaction: transaction['asset']['id'] == asset_id) - .get_field('id')) - - def get_asset_by_id(self, asset_id): - """Returns the asset associated with an asset_id. - - Args: - asset_id (str): The asset id. - - Returns: - Returns a rethinkdb cursor. - """ - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(asset_id, index='asset_id') - .concat_map(lambda block: block['block']['transactions']) - .filter(lambda transaction: - transaction['asset']['id'] == asset_id) - .filter(lambda transaction: - transaction['operation'] == 'CREATE') - .pluck('asset')) - - def get_spent(self, transaction_id, condition_id): - """Check if a `txid` was already used as an input. - - A transaction can be used as an input for another transaction. Bigchain needs to make sure that a - given `txid` is only used once. - - Args: - transaction_id (str): The id of the transaction. - condition_id (int): The index of the condition in the respective transaction. - - Returns: - The transaction that used the `txid` as an input else `None` - """ - - # TODO: use index! - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda transaction: transaction['fulfillments'].contains( - lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id}))) - - def get_owned_ids(self, owner): - """Retrieve a list of `txids` that can we used has inputs. - - Args: - owner (str): base58 encoded public key. - - Returns: - A cursor for the matching transactions. - """ - - # TODO: use index! - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda tx: tx['conditions'].contains( - lambda c: c['owners_after'].contains(owner)))) - - def get_votes_by_block_id(self, block_id): - """Get all the votes casted for a specific block. - - Args: - block_id (str): the block id to use. - - Returns: - A cursor for the matching votes. - """ - return self.connection.run( - r.table('votes', read_mode=self.read_mode) - .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter') - .without('id')) - - def get_votes_by_block_id_and_voter(self, block_id, node_pubkey): - """Get all the votes casted for a specific block by a specific voter. - - Args: - block_id (str): the block id to use. - node_pubkey (str): base58 encoded public key - - Returns: - A cursor for the matching votes. - """ - return self.connection.run( - r.table('votes', read_mode=self.read_mode) - .get_all([block_id, node_pubkey], index='block_and_voter') - .without('id')) - - def write_block(self, block, durability='soft'): - """Write a block to the bigchain table. - - Args: - block (dict): the block to write. - - Returns: - The database response. - """ - return self.connection.run( - r.table('bigchain') - .insert(r.json(block), durability=durability)) - - def get_block(self, block_id): - """Get a block from the bigchain table - - Args: - block_id (str): block id of the block to get - - Returns: - block (dict): the block or `None` - """ - return self.connection.run(r.table('bigchain').get(block_id)) - - def has_transaction(self, transaction_id): - """Check if a transaction exists in the bigchain table. - - Args: - transaction_id (str): the id of the transaction to check. - - Returns: - ``True`` if the transaction exists, ``False`` otherwise. - """ - return bool(self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(transaction_id, index='transaction_id').count())) - - def count_blocks(self): - """Count the number of blocks in the bigchain table. - - Returns: - The number of blocks. - """ - - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .count()) - - def count_backlog(self): - """Count the number of transactions in the backlog table. - - Returns: - The number of transactions in the backlog. - """ - - return self.connection.run( - r.table('backlog', read_mode=self.read_mode) - .count()) - - def write_vote(self, vote): - """Write a vote to the votes table. - - Args: - vote (dict): the vote to write. - - Returns: - The database response. - """ - return self.connection.run( - r.table('votes') - .insert(vote)) - - def get_genesis_block(self): - """Get the genesis block - - Returns: - The genesis block - """ - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .filter(util.is_genesis_block) - .nth(0)) - - def get_last_voted_block(self, node_pubkey): - """Get the last voted block for a specific node. - - Args: - node_pubkey (str): base58 encoded public key. - - Returns: - The last block the node has voted on. If the node didn't cast - any vote then the genesis block is returned. - """ - try: - # get the latest value for the vote timestamp (over all votes) - max_timestamp = self.connection.run( - r.table('votes', read_mode=self.read_mode) - .filter(r.row['node_pubkey'] == node_pubkey) - .max(r.row['vote']['timestamp']))['vote']['timestamp'] - - last_voted = list(self.connection.run( - r.table('votes', read_mode=self.read_mode) - .filter(r.row['vote']['timestamp'] == max_timestamp) - .filter(r.row['node_pubkey'] == node_pubkey))) - - except r.ReqlNonExistenceError: - # return last vote if last vote exists else return Genesis block - return self.get_genesis_block() - - # Now the fun starts. Since the resolution of timestamp is a second, - # we might have more than one vote per timestamp. If this is the case - # then we need to rebuild the chain for the blocks that have been retrieved - # to get the last one. - - # Given a block_id, mapping returns the id of the block pointing at it. - mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] - for v in last_voted} - - # Since we follow the chain backwards, we can start from a random - # point of the chain and "move up" from it. - last_block_id = list(mapping.values())[0] - - # We must be sure to break the infinite loop. This happens when: - # - the block we are currenty iterating is the one we are looking for. - # This will trigger a KeyError, breaking the loop - # - we are visiting again a node we already explored, hence there is - # a loop. This might happen if a vote points both `previous_block` - # and `voting_for_block` to the same `block_id` - explored = set() - - while True: - try: - if last_block_id in explored: - raise exceptions.CyclicBlockchainError() - explored.add(last_block_id) - last_block_id = mapping[last_block_id] - except KeyError: - break - - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get(last_block_id)) - - def get_unvoted_blocks(self, node_pubkey): - """Return all the blocks that have not been voted by the specified node. - - Args: - node_pubkey (str): base58 encoded public key - - Returns: - :obj:`list` of :obj:`dict`: a list of unvoted blocks - """ - - unvoted = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .filter(lambda block: r.table('votes', read_mode=self.read_mode) - .get_all([block['id'], node_pubkey], index='block_and_voter') - .is_empty()) - .order_by(r.asc(r.row['block']['timestamp']))) - - # FIXME: I (@vrde) don't like this solution. Filtering should be done at a - # database level. Solving issue #444 can help untangling the situation - unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) - return unvoted_blocks diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py deleted file mode 100644 index 2b3c2d0d..00000000 --- a/bigchaindb/db/utils.py +++ /dev/null @@ -1,197 +0,0 @@ -"""Utils to initialize and drop the database.""" - -import time -import logging - -from bigchaindb.common import exceptions -import rethinkdb as r - -import bigchaindb - - -logger = logging.getLogger(__name__) - - -class Connection: - """This class is a proxy to run queries against the database, - it is: - - lazy, since it creates a connection only when needed - - resilient, because before raising exceptions it tries - more times to run the query or open a connection. - """ - - def __init__(self, host=None, port=None, db=None, max_tries=3): - """Create a new Connection instance. - - Args: - host (str, optional): the host to connect to. - port (int, optional): the port to connect to. - db (str, optional): the database to use. - max_tries (int, optional): how many tries before giving up. - """ - - self.host = host or bigchaindb.config['database']['host'] - self.port = port or bigchaindb.config['database']['port'] - self.db = db or bigchaindb.config['database']['name'] - self.max_tries = max_tries - self.conn = None - - def run(self, query): - """Run a query. - - Args: - query: the RethinkDB query. - """ - - if self.conn is None: - self._connect() - - for i in range(self.max_tries): - try: - return query.run(self.conn) - except r.ReqlDriverError as exc: - if i + 1 == self.max_tries: - raise - else: - self._connect() - - def _connect(self): - for i in range(self.max_tries): - try: - self.conn = r.connect(host=self.host, port=self.port, - db=self.db) - except r.ReqlDriverError as exc: - if i + 1 == self.max_tries: - raise - else: - time.sleep(2**i) - - -def get_backend(host=None, port=None, db=None): - '''Get a backend instance.''' - - from bigchaindb.db.backends import rethinkdb - - # NOTE: this function will be re-implemented when we have real - # multiple backends to support. Right now it returns the RethinkDB one. - return rethinkdb.RethinkDBBackend(host=host or bigchaindb.config['database']['host'], - port=port or bigchaindb.config['database']['port'], - db=db or bigchaindb.config['database']['name']) - - -def get_conn(): - '''Get the connection to the database.''' - - return r.connect(host=bigchaindb.config['database']['host'], - port=bigchaindb.config['database']['port'], - db=bigchaindb.config['database']['name']) - - -def get_database_name(): - return bigchaindb.config['database']['name'] - - -def create_database(conn, dbname): - if r.db_list().contains(dbname).run(conn): - raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'.format(dbname)) - - logger.info('Create database `%s`.', dbname) - r.db_create(dbname).run(conn) - - -def create_table(conn, dbname, table_name): - logger.info('Create `%s` table.', table_name) - # create the table - r.db(dbname).table_create(table_name).run(conn) - - -def create_bigchain_secondary_index(conn, dbname): - logger.info('Create `bigchain` secondary index.') - # to order blocks by timestamp - r.db(dbname).table('bigchain')\ - .index_create('block_timestamp', r.row['block']['timestamp'])\ - .run(conn) - # to query the bigchain for a transaction id - r.db(dbname).table('bigchain')\ - .index_create('transaction_id', - r.row['block']['transactions']['id'], multi=True)\ - .run(conn) - # secondary index for asset uuid - r.db(dbname).table('bigchain')\ - .index_create('asset_id', - r.row['block']['transactions']['asset']['id'], multi=True)\ - .run(conn) - - # wait for rethinkdb to finish creating secondary indexes - r.db(dbname).table('bigchain').index_wait().run(conn) - - -def create_backlog_secondary_index(conn, dbname): - logger.info('Create `backlog` secondary index.') - # compound index to read transactions from the backlog per assignee - r.db(dbname).table('backlog')\ - .index_create('assignee__transaction_timestamp', - [r.row['assignee'], r.row['assignment_timestamp']])\ - .run(conn) - - # wait for rethinkdb to finish creating secondary indexes - r.db(dbname).table('backlog').index_wait().run(conn) - - -def create_votes_secondary_index(conn, dbname): - logger.info('Create `votes` secondary index.') - # compound index to order votes by block id and node - r.db(dbname).table('votes')\ - .index_create('block_and_voter', - [r.row['vote']['voting_for_block'], - r.row['node_pubkey']])\ - .run(conn) - - # wait for rethinkdb to finish creating secondary indexes - r.db(dbname).table('votes').index_wait().run(conn) - - -def init_database(): - conn = get_conn() - dbname = get_database_name() - create_database(conn, dbname) - - table_names = ['bigchain', 'backlog', 'votes'] - for table_name in table_names: - create_table(conn, dbname, table_name) - - create_bigchain_secondary_index(conn, dbname) - create_backlog_secondary_index(conn, dbname) - create_votes_secondary_index(conn, dbname) - - -def init(): - # Try to access the keypair, throws an exception if it does not exist - b = bigchaindb.Bigchain() - - init_database() - - logger.info('Create genesis block.') - b.create_genesis_block() - logger.info('Done, have fun!') - - -def drop(assume_yes=False): - conn = get_conn() - dbname = bigchaindb.config['database']['name'] - if assume_yes: - response = 'y' - - else: - response = input('Do you want to drop `{}` database? [y/n]: '.format(dbname)) - - if response == 'y': - try: - logger.info('Drop database `%s`', dbname) - r.db_drop(dbname).run(conn) - logger.info('Done.') - except r.ReqlOpFailedError: - raise exceptions.DatabaseDoesNotExist('Database `{}` does not exist'.format(dbname)) - - else: - logger.info('Drop aborted') diff --git a/docs/server/source/appendices/backend.rst b/docs/server/source/appendices/backend.rst new file mode 100644 index 00000000..2cb035e8 --- /dev/null +++ b/docs/server/source/appendices/backend.rst @@ -0,0 +1,61 @@ +############################################### +:mod:`bigchaindb.backend` -- Backend Interfaces +############################################### + +.. automodule:: bigchaindb.backend + :special-members: __init__ + + +Generic Backend +=============== + +:mod:`bigchaindb.backend.connection` -- Connection +-------------------------------------------------- + +.. automodule:: bigchaindb.backend.connection + +:mod:`bigchaindb.backend.schema` -- Schema +------------------------------------------ +.. automodule:: bigchaindb.backend.schema + +:mod:`bigchaindb.backend.query` -- Query +---------------------------------------- +.. automodule:: bigchaindb.backend.query + +:mod:`bigchaindb.backend.changefeed` -- Changefeed +-------------------------------------------------- +.. automodule:: bigchaindb.backend.changefeed + +:mod:`bigchaindb.backend.utils` +------------------------------- +.. automodule:: bigchaindb.backend.utils + + +:mod:`bigchaindb.backend.rethinkdb` -- RethinkDB Backend +======================================================== + +.. automodule:: bigchaindb.backend.rethinkdb + :special-members: __init__ + +:mod:`bigchaindb.backend.rethinkdb.connection` +---------------------------------------------- + +.. automodule:: bigchaindb.backend.rethinkdb.connection + :special-members: __init__ + +:mod:`bigchaindb.backend.rethinkdb.schema` +------------------------------------------ +.. automodule:: bigchaindb.backend.rethinkdb.schema + +:mod:`bigchaindb.backend.rethinkdb.query` +----------------------------------------- +.. automodule:: bigchaindb.backend.rethinkdb.query + +:mod:`bigchaindb.backend.rethinkdb.changefeed` +---------------------------------------------- +.. automodule:: bigchaindb.backend.rethinkdb.changefeed + + +MongoDB Backend +=============== +Stay tuned! diff --git a/docs/server/source/appendices/index.rst b/docs/server/source/appendices/index.rst index c251db09..41b742b9 100755 --- a/docs/server/source/appendices/index.rst +++ b/docs/server/source/appendices/index.rst @@ -15,6 +15,7 @@ Appendices the-Bigchain-class consensus pipelines + backend aws-setup firewall-notes ntp-notes diff --git a/docs/server/source/conf.py b/docs/server/source/conf.py index f4951235..5550e994 100644 --- a/docs/server/source/conf.py +++ b/docs/server/source/conf.py @@ -45,6 +45,7 @@ extensions = [ 'sphinx.ext.intersphinx', 'sphinx.ext.coverage', 'sphinx.ext.viewcode', + 'sphinx.ext.todo', 'sphinx.ext.napoleon', 'sphinxcontrib.httpdomain', 'sphinx.ext.autosectionlabel', @@ -60,6 +61,8 @@ autodoc_default_flags = [ 'members', ] +todo_include_todos = True + # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] diff --git a/tests/assets/test_digital_assets.py b/tests/assets/test_digital_assets.py index 5d8654f2..6c95284a 100644 --- a/tests/assets/test_digital_assets.py +++ b/tests/assets/test_digital_assets.py @@ -91,7 +91,7 @@ def test_get_asset_id_transfer_transaction(b, user_pk, user_sk): tx_transfer_signed = tx_transfer.sign([user_sk]) # create a block block = b.create_block([tx_transfer_signed]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block valid vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -130,7 +130,7 @@ def test_get_transactions_by_asset_id(b, user_pk, user_sk): tx_transfer_signed = tx_transfer.sign([user_sk]) # create the block block = b.create_block([tx_transfer_signed]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block valid vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -163,7 +163,7 @@ def test_get_transactions_by_asset_id_with_invalid_block(b, user_pk, user_sk): tx_transfer_signed = tx_transfer.sign([user_sk]) # create the block block = b.create_block([tx_transfer_signed]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block invalid vote = b.vote(block.id, b.get_last_voted_block().id, False) b.write_vote(vote) @@ -187,7 +187,7 @@ def test_get_asset_by_id(b, user_pk, user_sk): tx_transfer_signed = tx_transfer.sign([user_sk]) # create the block block = b.create_block([tx_transfer_signed]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block valid vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) diff --git a/tests/assets/test_divisible_assets.py b/tests/assets/test_divisible_assets.py index 13059c7c..01cb99e0 100644 --- a/tests/assets/test_divisible_assets.py +++ b/tests/assets/test_divisible_assets.py @@ -141,7 +141,7 @@ def test_single_in_single_own_single_out_single_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -175,7 +175,7 @@ def test_single_in_single_own_multiple_out_single_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -211,7 +211,7 @@ def test_single_in_single_own_single_out_multiple_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -252,7 +252,7 @@ def test_single_in_single_own_multiple_out_mix_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -294,7 +294,7 @@ def test_single_in_multiple_own_single_out_single_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -333,7 +333,7 @@ def test_multiple_in_single_own_single_out_single_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -370,7 +370,7 @@ def test_multiple_in_multiple_own_single_out_single_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -415,7 +415,7 @@ def test_muiltiple_in_mix_own_multiple_out_single_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -460,7 +460,7 @@ def test_muiltiple_in_mix_own_multiple_out_mix_own_transfer(b, user_pk, # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -512,7 +512,7 @@ def test_multiple_in_different_transactions(b, user_pk, user_sk): # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -528,7 +528,7 @@ def test_multiple_in_different_transactions(b, user_pk, user_sk): # create block block = b.create_block([tx_transfer1_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -569,7 +569,7 @@ def test_amount_error_transfer(b, user_pk, user_sk): # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -612,7 +612,7 @@ def test_threshold_same_public_key(b, user_pk, user_sk): # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -641,7 +641,7 @@ def test_sum_amount(b, user_pk, user_sk): # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -670,7 +670,7 @@ def test_divide(b, user_pk, user_sk): # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -703,7 +703,7 @@ def test_non_positive_amounts_on_transfer(b, user_pk): # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) @@ -729,7 +729,7 @@ def test_non_positive_amounts_on_transfer_validate(b, user_pk, user_sk): # create block block = b.create_block([tx_create_signed]) assert block.validate(b) == block - b.write_block(block, durability='hard') + b.write_block(block) # vote vote = b.vote(block.id, b.get_last_voted_block().id, True) b.write_vote(vote) diff --git a/tests/backend/test_connection.py b/tests/backend/test_connection.py new file mode 100644 index 00000000..bdc336a9 --- /dev/null +++ b/tests/backend/test_connection.py @@ -0,0 +1,33 @@ +import pytest + + +def test_get_connection_returns_the_correct_instance(): + from bigchaindb.backend import connect + from bigchaindb.backend.connection import Connection + from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection + + config = { + 'backend': 'rethinkdb', + 'host': 'localhost', + 'port': 28015, + 'name': 'test' + } + + conn = connect(**config) + assert isinstance(conn, Connection) + assert isinstance(conn, RethinkDBConnection) + + +def test_get_connection_raises_a_configuration_error(monkeypatch): + from bigchaindb.common.exceptions import ConfigurationError + from bigchaindb.backend import connect + + with pytest.raises(ConfigurationError): + connect('msaccess', 'localhost', '1337', 'mydb') + + with pytest.raises(ConfigurationError): + # We need to force a misconfiguration here + monkeypatch.setattr('bigchaindb.backend.connection.BACKENDS', + {'catsandra': 'bigchaindb.backend.meowmeow.Catsandra'}) + + connect('catsandra', 'localhost', '1337', 'mydb') diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py new file mode 100644 index 00000000..b11d534a --- /dev/null +++ b/tests/backend/test_generics.py @@ -0,0 +1,45 @@ +from pytest import mark, raises + + +@mark.parametrize('schema_func_name,args_qty', ( + ('create_database', 1), + ('create_tables', 1), + ('create_indexes', 1), + ('drop_database', 1), +)) +def test_schema(schema_func_name, args_qty): + from bigchaindb.backend import schema + schema_func = getattr(schema, schema_func_name) + with raises(NotImplementedError): + schema_func(None, *range(args_qty)) + + +@mark.parametrize('query_func_name,args_qty', ( + ('write_transaction', 1), + ('count_blocks', 0), + ('count_backlog', 0), + ('get_genesis_block', 0), + ('delete_transaction', 1), + ('get_stale_transactions', 1), + ('get_blocks_status_from_transaction', 1), + ('get_transaction_from_backlog', 1), + ('get_txids_by_asset_id', 1), + ('get_asset_by_id', 1), + ('get_owned_ids', 1), + ('get_votes_by_block_id', 1), + ('write_block', 1), + ('get_block', 1), + ('has_transaction', 1), + ('write_vote', 1), + ('get_last_voted_block', 1), + ('get_unvoted_blocks', 1), + ('get_spent', 2), + ('get_votes_by_block_id_and_voter', 2), + ('update_transaction', 2), + ('get_transaction_from_block', 2), +)) +def test_query(query_func_name, args_qty): + from bigchaindb.backend import query + query_func = getattr(query, query_func_name) + with raises(NotImplementedError): + query_func(None, *range(args_qty)) diff --git a/tests/backend/test_utils.py b/tests/backend/test_utils.py new file mode 100644 index 00000000..afdfdabe --- /dev/null +++ b/tests/backend/test_utils.py @@ -0,0 +1,72 @@ +from functools import singledispatch +from types import ModuleType + +import pytest + + +@pytest.fixture +def mock_module(): + return ModuleType('mock_module') + + +def test_module_dispatch_registers(mock_module): + from bigchaindb.backend.utils import module_dispatch_registrar + + @singledispatch + def dispatcher(t): + pass + mock_module.dispatched = dispatcher + mock_dispatch = module_dispatch_registrar(mock_module) + + @mock_dispatch(str) + def dispatched(t): + pass + + assert mock_module.dispatched.registry[str] == dispatched + + +def test_module_dispatch_dispatches(mock_module): + from bigchaindb.backend.utils import module_dispatch_registrar + + @singledispatch + def dispatcher(t): + return False + mock_module.dispatched = dispatcher + mock_dispatch = module_dispatch_registrar(mock_module) + + @mock_dispatch(str) + def dispatched(t): + return True + + assert mock_module.dispatched(1) is False # Goes to dispatcher() + assert mock_module.dispatched('1') is True # Goes to dispatched() + + +def test_module_dispatch_errors_on_missing_func(mock_module): + from bigchaindb.backend.utils import ( + module_dispatch_registrar, + ModuleDispatchRegistrationError, + ) + mock_dispatch = module_dispatch_registrar(mock_module) + + with pytest.raises(ModuleDispatchRegistrationError): + @mock_dispatch(str) + def dispatched(): + pass + + +def test_module_dispatch_errors_on_non_dispatchable_func(mock_module): + from bigchaindb.backend.utils import ( + module_dispatch_registrar, + ModuleDispatchRegistrationError, + ) + + def dispatcher(): + pass + mock_module.dispatched = dispatcher + mock_dispatch = module_dispatch_registrar(mock_module) + + with pytest.raises(ModuleDispatchRegistrationError): + @mock_dispatch(str) + def dispatched(): + pass diff --git a/tests/conftest.py b/tests/conftest.py index 66305b6a..013a6e9b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,11 +16,11 @@ DB_NAME = 'bigchain_test_{}'.format(os.getpid()) CONFIG = { 'database': { - 'name': DB_NAME + 'name': DB_NAME, }, 'keypair': { 'private': '31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA', - 'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8' + 'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8', } } @@ -29,6 +29,15 @@ USER_PRIVATE_KEY = '8eJ8q9ZQpReWyQT5aFCiwtZ5wDZC4eDnCen88p3tQ6ie' USER_PUBLIC_KEY = 'JEAkEJqLbbgDRAtMm8YAjGp759Aq2qTn9eaEHUj2XePE' +def pytest_addoption(parser): + from bigchaindb.backend import connection + + backends = ', '.join(connection.BACKENDS.keys()) + + parser.addoption('--database-backend', action='store', default='rethinkdb', + help='Defines the backend to use (available: {})'.format(backends)) + + # We need this function to avoid loading an existing # conf file located in the home of the user running # the tests. If it's too aggressive we can change it @@ -49,8 +58,10 @@ def restore_config(request, node_config): @pytest.fixture(scope='module') -def node_config(): - return copy.deepcopy(CONFIG) +def node_config(request): + config = copy.deepcopy(CONFIG) + config['database']['backend'] = request.config.getoption('--database-backend') + return config @pytest.fixture diff --git a/tests/db/conftest.py b/tests/db/conftest.py index b04e0a39..4959e2d2 100644 --- a/tests/db/conftest.py +++ b/tests/db/conftest.py @@ -10,10 +10,11 @@ import pytest import rethinkdb as r from bigchaindb import Bigchain -from bigchaindb.db import get_conn, init_database +from bigchaindb.backend import connect, schema from bigchaindb.common import crypto from bigchaindb.common.exceptions import DatabaseAlreadyExists + USER2_SK, USER2_PK = crypto.generate_key_pair() @@ -27,13 +28,13 @@ def restore_config(request, node_config): def setup_database(request, node_config): print('Initializing test db') db_name = node_config['database']['name'] - conn = get_conn() + conn = connect() - if r.db_list().contains(db_name).run(conn): - r.db_drop(db_name).run(conn) + if conn.run(r.db_list().contains(db_name)): + conn.run(r.db_drop(db_name)) try: - init_database() + schema.init_database() except DatabaseAlreadyExists: print('Database already exists.') @@ -41,9 +42,9 @@ def setup_database(request, node_config): def fin(): print('Deleting `{}` database'.format(db_name)) - get_conn().repl() + conn = connect() try: - r.db_drop(db_name).run() + conn.run(r.db_drop(db_name)) except r.ReqlOpFailedError as e: if e.message != 'Database `{}` does not exist.'.format(db_name): raise @@ -57,11 +58,11 @@ def cleanup_tables(request, node_config): db_name = node_config['database']['name'] def fin(): - get_conn().repl() + conn = connect() try: - r.db(db_name).table('bigchain').delete().run() - r.db(db_name).table('backlog').delete().run() - r.db(db_name).table('votes').delete().run() + conn.run(r.db(db_name).table('bigchain').delete()) + conn.run(r.db(db_name).table('backlog').delete()) + conn.run(r.db(db_name).table('votes').delete()) except r.ReqlOpFailedError as e: if e.message != 'Database `{}` does not exist.'.format(db_name): raise @@ -88,7 +89,7 @@ def inputs(user_pk): for i in range(10) ] block = b.create_block(transactions) - b.write_block(block, durability='hard') + b.write_block(block) # 3. vote the blocks valid, so that the inputs are valid vote = b.vote(block.id, prev_block_id, True) @@ -126,7 +127,7 @@ def inputs_shared(user_pk, user2_pk): for i in range(10) ] block = b.create_block(transactions) - b.write_block(block, durability='hard') + b.write_block(block) # 3. vote the blocks valid, so that the inputs are valid vote = b.vote(block.id, prev_block_id, True) diff --git a/tests/db/rethinkdb/__init__.py b/tests/db/rethinkdb/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/db/rethinkdb/test_schema.py b/tests/db/rethinkdb/test_schema.py new file mode 100644 index 00000000..20e2ad2f --- /dev/null +++ b/tests/db/rethinkdb/test_schema.py @@ -0,0 +1,130 @@ +import pytest +import rethinkdb as r + +import bigchaindb +from bigchaindb import backend +from bigchaindb.backend.rethinkdb import schema +from ..conftest import setup_database as _setup_database + + +# Since we are testing database initialization and database drop, +# we need to use the `setup_database` fixture on a function level +@pytest.fixture(scope='function', autouse=True) +def setup_database(request, node_config): + _setup_database(request, node_config) + + +def test_init_creates_db_tables_and_indexes(): + from bigchaindb.backend.schema import init_database + conn = backend.connect() + dbname = bigchaindb.config['database']['name'] + + # The db is set up by fixtures so we need to remove it + conn.run(r.db_drop(dbname)) + + init_database() + + assert conn.run(r.db_list().contains(dbname)) is True + + assert conn.run(r.db(dbname).table_list().contains('backlog', 'bigchain')) is True + + assert conn.run(r.db(dbname).table('bigchain').index_list().contains( + 'block_timestamp')) is True + + assert conn.run(r.db(dbname).table('backlog').index_list().contains( + 'assignee__transaction_timestamp')) is True + + +def test_init_database_fails_if_db_exists(): + from bigchaindb.backend.schema import init_database + from bigchaindb.common import exceptions + + conn = backend.connect() + dbname = bigchaindb.config['database']['name'] + + # The db is set up by fixtures + assert conn.run(r.db_list().contains(dbname)) is True + + with pytest.raises(exceptions.DatabaseAlreadyExists): + init_database() + + +def test_create_database(): + conn = backend.connect() + dbname = bigchaindb.config['database']['name'] + + # The db is set up by fixtures so we need to remove it + # and recreate it just with one table + conn.run(r.db_drop(dbname)) + schema.create_database(conn, dbname) + assert conn.run(r.db_list().contains(dbname)) is True + + +def test_create_tables(): + conn = backend.connect() + dbname = bigchaindb.config['database']['name'] + + # The db is set up by fixtures so we need to remove it + # and recreate it just with one table + conn.run(r.db_drop(dbname)) + schema.create_database(conn, dbname) + schema.create_tables(conn, dbname) + + assert conn.run(r.db(dbname).table_list().contains('bigchain')) is True + assert conn.run(r.db(dbname).table_list().contains('backlog')) is True + assert conn.run(r.db(dbname).table_list().contains('votes')) is True + assert len(conn.run(r.db(dbname).table_list())) == 3 + + +def test_create_secondary_indexes(): + conn = backend.connect() + dbname = bigchaindb.config['database']['name'] + + # The db is set up by fixtures so we need to remove it + # and recreate it just with one table + conn.run(r.db_drop(dbname)) + schema.create_database(conn, dbname) + schema.create_tables(conn, dbname) + schema.create_indexes(conn, dbname) + + # Bigchain table + assert conn.run(r.db(dbname).table('bigchain').index_list().contains( + 'block_timestamp')) is True + assert conn.run(r.db(dbname).table('bigchain').index_list().contains( + 'transaction_id')) is True + assert conn.run(r.db(dbname).table('bigchain').index_list().contains( + 'asset_id')) is True + + # Backlog table + assert conn.run(r.db(dbname).table('backlog').index_list().contains( + 'assignee__transaction_timestamp')) is True + + # Votes table + assert conn.run(r.db(dbname).table('votes').index_list().contains( + 'block_and_voter')) is True + + +def test_drop(): + conn = backend.connect() + dbname = bigchaindb.config['database']['name'] + + # The db is set up by fixtures + assert conn.run(r.db_list().contains(dbname)) is True + + schema.drop_database(conn, dbname) + assert conn.run(r.db_list().contains(dbname)) is False + + +def test_drop_non_existent_db_raises_an_error(): + from bigchaindb.common import exceptions + + conn = backend.connect() + dbname = bigchaindb.config['database']['name'] + + # The db is set up by fixtures + assert conn.run(r.db_list().contains(dbname)) is True + + schema.drop_database(conn, dbname) + + with pytest.raises(exceptions.DatabaseDoesNotExist): + schema.drop_database(conn, dbname) diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 9ad3732f..747bf412 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -41,7 +41,7 @@ class TestBigchainApi(object): tx = tx.sign([b.me_private]) monkeypatch.setattr('time.time', lambda: 1) block1 = b.create_block([tx]) - b.write_block(block1, durability='hard') + b.write_block(block1) # Manipulate vote to create a cyclic Blockchain vote = b.vote(block1.id, b.get_last_voted_block().id, True) @@ -79,7 +79,7 @@ class TestBigchainApi(object): monkeypatch.setattr('time.time', lambda: 1) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) assert b.has_previous_vote(block.id, block.voters) is False @@ -99,21 +99,21 @@ class TestBigchainApi(object): monkeypatch.setattr('time.time', lambda: 1) block1 = b.create_block([tx]) - b.write_block(block1, durability='hard') + b.write_block(block1) monkeypatch.setattr('time.time', lambda: 2) transfer_tx = Transaction.transfer(tx.to_inputs(), [([b.me], 1)], tx.asset) transfer_tx = transfer_tx.sign([b.me_private]) block2 = b.create_block([transfer_tx]) - b.write_block(block2, durability='hard') + b.write_block(block2) monkeypatch.setattr('time.time', lambda: 3333333333) transfer_tx2 = Transaction.transfer(tx.to_inputs(), [([b.me], 1)], tx.asset) transfer_tx2 = transfer_tx2.sign([b.me_private]) block3 = b.create_block([transfer_tx2]) - b.write_block(block3, durability='hard') + b.write_block(block3) # Vote both block2 and block3 valid to provoke a double spend vote = b.vote(block2.id, b.get_last_voted_block().id, True) @@ -135,11 +135,11 @@ class TestBigchainApi(object): monkeypatch.setattr('time.time', lambda: 1) block1 = b.create_block([tx]) - b.write_block(block1, durability='hard') + b.write_block(block1) monkeypatch.setattr('time.time', lambda: 2222222222) block2 = b.create_block([tx]) - b.write_block(block2, durability='hard') + b.write_block(block2) # Vote both blocks valid (creating a double spend) vote = b.vote(block1.id, b.get_last_voted_block().id, True) @@ -159,13 +159,13 @@ class TestBigchainApi(object): tx1 = Transaction.create([b.me], [([b.me], 1)]) tx1 = tx1.sign([b.me_private]) block1 = b.create_block([tx1]) - b.write_block(block1, durability='hard') + b.write_block(block1) monkeypatch.setattr('time.time', lambda: 2222222222) tx2 = Transaction.create([b.me], [([b.me], 1)]) tx2 = tx2.sign([b.me_private]) block2 = b.create_block([tx2]) - b.write_block(block2, durability='hard') + b.write_block(block2) # vote the first block invalid vote = b.vote(block1.id, b.get_last_voted_block().id, False) @@ -209,7 +209,7 @@ class TestBigchainApi(object): # create block and write it to the bighcain before retrieving the transaction block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) response, status = b.get_transaction(tx.id, include_status=True) # add validity information, which will be returned @@ -229,7 +229,7 @@ class TestBigchainApi(object): # create block block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block invalid vote = b.vote(block.id, b.get_last_voted_block().id, False) @@ -255,7 +255,7 @@ class TestBigchainApi(object): # create block block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block invalid vote = b.vote(block.id, b.get_last_voted_block().id, False) @@ -270,7 +270,8 @@ class TestBigchainApi(object): @pytest.mark.usefixtures('inputs') def test_genesis_block(self, b): - block = b.backend.get_genesis_block() + from bigchaindb.backend import query + block = query.get_genesis_block(b.connection) assert len(block['block']['transactions']) == 1 assert block['block']['transactions'][0]['operation'] == 'GENESIS' @@ -286,8 +287,9 @@ class TestBigchainApi(object): @pytest.mark.skipif(reason='This test may not make sense after changing the chainification mode') def test_get_last_block(self, b): + from bigchaindb.backend import query # get the number of blocks - num_blocks = b.backend.count_blocks() + num_blocks = query.count_blocks(b.connection) # get the last block last_block = b.get_last_block() @@ -305,7 +307,7 @@ class TestBigchainApi(object): def test_get_previous_block(self, b): last_block = b.get_last_block() new_block = b.create_block([]) - b.write_block(new_block, durability='hard') + b.write_block(new_block) prev_block = b.get_previous_block(new_block) @@ -315,7 +317,7 @@ class TestBigchainApi(object): def test_get_previous_block_id(self, b): last_block = b.get_last_block() new_block = b.create_block([]) - b.write_block(new_block, durability='hard') + b.write_block(new_block) prev_block_id = b.get_previous_block_id(new_block) @@ -332,7 +334,7 @@ class TestBigchainApi(object): @pytest.mark.usefixtures('inputs') def test_get_block_by_id(self, b): new_block = dummy_block() - b.write_block(new_block, durability='hard') + b.write_block(new_block) assert b.get_block(new_block.id) == new_block.to_dict() block, status = b.get_block(new_block.id, include_status=True) @@ -340,9 +342,10 @@ class TestBigchainApi(object): def test_get_last_voted_block_returns_genesis_if_no_votes_has_been_casted(self, b): from bigchaindb.models import Block + from bigchaindb.backend import query b.create_genesis_block() - genesis = b.backend.get_genesis_block() + genesis = query.get_genesis_block(b.connection) genesis = Block.from_dict(genesis) gb = b.get_last_voted_block() assert gb == genesis @@ -360,9 +363,9 @@ class TestBigchainApi(object): monkeypatch.setattr('time.time', lambda: 3) block_3 = dummy_block() - b.write_block(block_1, durability='hard') - b.write_block(block_2, durability='hard') - b.write_block(block_3, durability='hard') + b.write_block(block_1) + b.write_block(block_2) + b.write_block(block_3) # make sure all the votes are written with the same timestamps monkeypatch.setattr('time.time', lambda: 4) @@ -387,9 +390,9 @@ class TestBigchainApi(object): monkeypatch.setattr('time.time', lambda: 3) block_3 = dummy_block() - b.write_block(block_1, durability='hard') - b.write_block(block_2, durability='hard') - b.write_block(block_3, durability='hard') + b.write_block(block_1) + b.write_block(block_2) + b.write_block(block_3) # make sure all the votes are written with different timestamps monkeypatch.setattr('time.time', lambda: 4) @@ -409,7 +412,7 @@ class TestBigchainApi(object): genesis = b.create_genesis_block() block_1 = dummy_block() - b.write_block(block_1, durability='hard') + b.write_block(block_1) b.write_vote(b.vote(block_1.id, genesis.id, True)) retrieved_block_1 = b.get_block(block_1.id) @@ -427,7 +430,7 @@ class TestBigchainApi(object): b.create_genesis_block() block_1 = dummy_block() - b.write_block(block_1, durability='hard') + b.write_block(block_1) # insert duplicate votes vote_1 = b.vote(block_1.id, b.get_last_voted_block().id, True) vote_2 = b.vote(block_1.id, b.get_last_voted_block().id, True) @@ -445,7 +448,7 @@ class TestBigchainApi(object): genesis = b.create_genesis_block() block_1 = dummy_block() - b.write_block(block_1, durability='hard') + b.write_block(block_1) # insert duplicate votes for i in range(2): b.write_vote(b.vote(block_1.id, genesis.id, True)) @@ -465,7 +468,7 @@ class TestBigchainApi(object): b.create_genesis_block() block_1 = dummy_block() - b.write_block(block_1, durability='hard') + b.write_block(block_1) vote_1 = b.vote(block_1.id, b.get_last_voted_block().id, True) # mangle the signature vote_1['signature'] = 'a' * 87 @@ -477,6 +480,7 @@ class TestBigchainApi(object): @pytest.mark.usefixtures('inputs') def test_assign_transaction_one_node(self, b, user_pk, user_sk): + from bigchaindb.backend import query from bigchaindb.models import Transaction input_tx = b.get_owned_ids(user_pk).pop() @@ -487,13 +491,14 @@ class TestBigchainApi(object): b.write_transaction(tx) # retrieve the transaction - response = list(b.backend.get_stale_transactions(0))[0] + response = list(query.get_stale_transactions(b.connection, 0))[0] # check if the assignee is the current node assert response['assignee'] == b.me @pytest.mark.usefixtures('inputs') def test_assign_transaction_multiple_nodes(self, b, user_pk, user_sk): + from bigchaindb.backend import query from bigchaindb.common.crypto import generate_key_pair from bigchaindb.models import Transaction @@ -511,13 +516,12 @@ class TestBigchainApi(object): b.write_transaction(tx) # retrieve the transaction - response = b.backend.get_stale_transactions(0) + response = query.get_stale_transactions(b.connection, 0) # check if the assignee is one of the _other_ federation nodes for tx in response: assert tx['assignee'] in b.nodes_except_me - @pytest.mark.usefixtures('inputs') def test_non_create_input_not_found(self, b, user_pk): from cryptoconditions import Ed25519Fulfillment @@ -537,6 +541,7 @@ class TestBigchainApi(object): tx.validate(Bigchain()) def test_count_backlog(self, b, user_pk): + from bigchaindb.backend import query from bigchaindb.models import Transaction for _ in range(4): @@ -544,7 +549,7 @@ class TestBigchainApi(object): [([user_pk], 1)]).sign([b.me_private]) b.write_transaction(tx) - assert b.backend.count_backlog() == 4 + assert query.count_backlog(b.connection) == 4 class TestTransactionValidation(object): @@ -605,7 +610,7 @@ class TestTransactionValidation(object): b.write_transaction(signed_transfer_tx) block = b.create_block([signed_transfer_tx]) - b.write_block(block, durability='hard') + b.write_block(block) # vote block valid vote = b.vote(block.id, b.get_last_voted_block().id, True) @@ -636,7 +641,7 @@ class TestTransactionValidation(object): # create block block = b.create_block([transfer_tx]) assert b.validate_block(block) == block - b.write_block(block, durability='hard') + b.write_block(block) # check that the transaction is still valid after being written to the # bigchain @@ -660,7 +665,7 @@ class TestTransactionValidation(object): # create block block = b.create_block([transfer_tx]) - b.write_block(block, durability='hard') + b.write_block(block) # create transaction with the undecided input tx_invalid = Transaction.transfer(transfer_tx.to_inputs(), @@ -805,7 +810,7 @@ class TestMultipleInputs(object): tx = Transaction.create([b.me], [([user_pk, user2_pk], 1)]) tx = tx.sign([b.me_private]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # vote block valid vote = b.vote(block.id, b.get_last_voted_block().id, True) @@ -838,7 +843,7 @@ class TestMultipleInputs(object): tx = Transaction.create([b.me], [([user_pk, user2_pk], 1)]) tx = tx.sign([b.me_private]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # vote block valid vote = b.vote(block.id, b.get_last_voted_block().id, True) @@ -866,7 +871,7 @@ class TestMultipleInputs(object): tx = Transaction.create([b.me], [([user_pk], 1)]) tx = tx.sign([b.me_private]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) owned_inputs_user1 = b.get_owned_ids(user_pk) owned_inputs_user2 = b.get_owned_ids(user2_pk) @@ -876,7 +881,7 @@ class TestMultipleInputs(object): tx = Transaction.transfer(tx.to_inputs(), [([user2_pk], 1)], tx.asset) tx = tx.sign([user_sk]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) owned_inputs_user1 = b.get_owned_ids(user_pk) owned_inputs_user2 = b.get_owned_ids(user2_pk) @@ -896,7 +901,7 @@ class TestMultipleInputs(object): tx = Transaction.create([b.me], [([user_pk], 1)]) tx = tx.sign([b.me_private]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block VALID vote = b.vote(block.id, genesis.id, True) @@ -913,7 +918,7 @@ class TestMultipleInputs(object): tx.asset) tx_invalid = tx_invalid.sign([user_sk]) block = b.create_block([tx_invalid]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block invalid vote = b.vote(block.id, b.get_last_voted_block().id, False) @@ -941,7 +946,7 @@ class TestMultipleInputs(object): asset=asset) tx_create_signed = tx_create.sign([b.me_private]) block = b.create_block([tx_create_signed]) - b.write_block(block, durability='hard') + b.write_block(block) # get input owned_inputs_user1 = b.get_owned_ids(user_pk) @@ -958,7 +963,7 @@ class TestMultipleInputs(object): asset=tx_create.asset) tx_transfer_signed = tx_transfer.sign([user_sk]) block = b.create_block([tx_transfer_signed]) - b.write_block(block, durability='hard') + b.write_block(block) owned_inputs_user1 = b.get_owned_ids(user_pk) owned_inputs_user2 = b.get_owned_ids(user2_pk) @@ -977,7 +982,7 @@ class TestMultipleInputs(object): tx = Transaction.create([b.me], [([user_pk, user2_pk], 1)]) tx = tx.sign([b.me_private]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) owned_inputs_user1 = b.get_owned_ids(user_pk) owned_inputs_user2 = b.get_owned_ids(user2_pk) @@ -989,7 +994,7 @@ class TestMultipleInputs(object): tx = Transaction.transfer(tx.to_inputs(), [([user3_pk], 1)], tx.asset) tx = tx.sign([user_sk, user2_sk]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) owned_inputs_user1 = b.get_owned_ids(user_pk) owned_inputs_user2 = b.get_owned_ids(user2_pk) @@ -1005,7 +1010,7 @@ class TestMultipleInputs(object): tx = Transaction.create([b.me], [([user_pk], 1)]) tx = tx.sign([b.me_private]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) owned_inputs_user1 = b.get_owned_ids(user_pk).pop() @@ -1019,7 +1024,7 @@ class TestMultipleInputs(object): tx = Transaction.transfer(tx.to_inputs(), [([user2_pk], 1)], tx.asset) tx = tx.sign([user_sk]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) spent_inputs_user1 = b.get_spent(input_txid, input_cid) assert spent_inputs_user1 == tx @@ -1036,7 +1041,7 @@ class TestMultipleInputs(object): tx = Transaction.create([b.me], [([user_pk], 1)]) tx = tx.sign([b.me_private]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block VALID vote = b.vote(block.id, genesis.id, True) @@ -1054,7 +1059,7 @@ class TestMultipleInputs(object): tx = Transaction.transfer(tx.to_inputs(), [([user2_pk], 1)], tx.asset) tx = tx.sign([user_sk]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # vote the block invalid vote = b.vote(block.id, b.get_last_voted_block().id, False) @@ -1083,7 +1088,7 @@ class TestMultipleInputs(object): asset=asset) tx_create_signed = tx_create.sign([b.me_private]) block = b.create_block([tx_create_signed]) - b.write_block(block, durability='hard') + b.write_block(block) owned_inputs_user1 = b.get_owned_ids(user_pk) @@ -1097,7 +1102,7 @@ class TestMultipleInputs(object): asset=tx_create.asset) tx_transfer_signed = tx_transfer.sign([user_sk]) block = b.create_block([tx_transfer_signed]) - b.write_block(block, durability='hard') + b.write_block(block) # check that used inputs are marked as spent for ffill in tx_create.to_inputs()[:2]: @@ -1124,7 +1129,7 @@ class TestMultipleInputs(object): tx = tx.sign([b.me_private]) transactions.append(tx) block = b.create_block(transactions) - b.write_block(block, durability='hard') + b.write_block(block) owned_inputs_user1 = b.get_owned_ids(user_pk) @@ -1137,7 +1142,7 @@ class TestMultipleInputs(object): [([user3_pk], 1)], transactions[0].asset) tx = tx.sign([user_sk, user2_sk]) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # check that used inputs are marked as spent assert b.get_spent(transactions[0].id, 0) == tx diff --git a/tests/db/test_utils.py b/tests/db/test_utils.py deleted file mode 100644 index 853604e9..00000000 --- a/tests/db/test_utils.py +++ /dev/null @@ -1,201 +0,0 @@ -import builtins - -from bigchaindb.common import exceptions -import pytest -import rethinkdb as r - -import bigchaindb -from bigchaindb.db import utils -from .conftest import setup_database as _setup_database - - -# Since we are testing database initialization and database drop, -# we need to use the `setup_database` fixture on a function level -@pytest.fixture(scope='function', autouse=True) -def setup_database(request, node_config): - _setup_database(request, node_config) - - -def test_init_creates_db_tables_and_indexes(): - conn = utils.get_conn() - dbname = bigchaindb.config['database']['name'] - - # The db is set up by fixtures so we need to remove it - r.db_drop(dbname).run(conn) - - utils.init() - - assert r.db_list().contains(dbname).run(conn) is True - - assert r.db(dbname).table_list().contains('backlog', 'bigchain').run(conn) is True - - assert r.db(dbname).table('bigchain').index_list().contains( - 'block_timestamp').run(conn) is True - - assert r.db(dbname).table('backlog').index_list().contains( - 'assignee__transaction_timestamp').run(conn) is True - - -def test_create_database(): - conn = utils.get_conn() - dbname = utils.get_database_name() - - # The db is set up by fixtures so we need to remove it - # and recreate it just with one table - r.db_drop(dbname).run(conn) - utils.create_database(conn, dbname) - assert r.db_list().contains(dbname).run(conn) is True - - -def test_create_bigchain_table(): - conn = utils.get_conn() - dbname = utils.get_database_name() - - # The db is set up by fixtures so we need to remove it - # and recreate it just with one table - r.db_drop(dbname).run(conn) - utils.create_database(conn, dbname) - utils.create_table(conn, dbname, 'bigchain') - - assert r.db(dbname).table_list().contains('bigchain').run(conn) is True - assert r.db(dbname).table_list().contains('backlog').run(conn) is False - assert r.db(dbname).table_list().contains('votes').run(conn) is False - - -def test_create_bigchain_secondary_index(): - conn = utils.get_conn() - dbname = utils.get_database_name() - - # The db is set up by fixtures so we need to remove it - # and recreate it just with one table - r.db_drop(dbname).run(conn) - utils.create_database(conn, dbname) - utils.create_table(conn, dbname, 'bigchain') - utils.create_bigchain_secondary_index(conn, dbname) - - assert r.db(dbname).table('bigchain').index_list().contains( - 'block_timestamp').run(conn) is True - assert r.db(dbname).table('bigchain').index_list().contains( - 'transaction_id').run(conn) is True - - -def test_create_backlog_table(): - conn = utils.get_conn() - dbname = utils.get_database_name() - - # The db is set up by fixtures so we need to remove it - # and recreate it just with one table - r.db_drop(dbname).run(conn) - utils.create_database(conn, dbname) - utils.create_table(conn, dbname, 'backlog') - - assert r.db(dbname).table_list().contains('backlog').run(conn) is True - assert r.db(dbname).table_list().contains('bigchain').run(conn) is False - assert r.db(dbname).table_list().contains('votes').run(conn) is False - - -def test_create_backlog_secondary_index(): - conn = utils.get_conn() - dbname = utils.get_database_name() - - # The db is set up by fixtures so we need to remove it - # and recreate it just with one table - r.db_drop(dbname).run(conn) - utils.create_database(conn, dbname) - utils.create_table(conn, dbname, 'backlog') - utils.create_backlog_secondary_index(conn, dbname) - - assert r.db(dbname).table('backlog').index_list().contains( - 'assignee__transaction_timestamp').run(conn) is True - - -def test_create_votes_table(): - conn = utils.get_conn() - dbname = utils.get_database_name() - - # The db is set up by fixtures so we need to remove it - # and recreate it just with one table - r.db_drop(dbname).run(conn) - utils.create_database(conn, dbname) - utils.create_table(conn, dbname, 'votes') - - assert r.db(dbname).table_list().contains('votes').run(conn) is True - assert r.db(dbname).table_list().contains('bigchain').run(conn) is False - assert r.db(dbname).table_list().contains('backlog').run(conn) is False - - -def test_create_votes_secondary_index(): - conn = utils.get_conn() - dbname = utils.get_database_name() - - # The db is set up by fixtures so we need to remove it - # and recreate it just with one table - r.db_drop(dbname).run(conn) - utils.create_database(conn, dbname) - utils.create_table(conn, dbname, 'votes') - utils.create_votes_secondary_index(conn, dbname) - - assert r.db(dbname).table('votes').index_list().contains( - 'block_and_voter').run(conn) is True - - -def test_init_fails_if_db_exists(): - conn = utils.get_conn() - dbname = bigchaindb.config['database']['name'] - - # The db is set up by fixtures - assert r.db_list().contains(dbname).run(conn) is True - - with pytest.raises(exceptions.DatabaseAlreadyExists): - utils.init() - - -def test_drop_interactively_drops_the_database_when_user_says_yes(monkeypatch): - conn = utils.get_conn() - dbname = bigchaindb.config['database']['name'] - - # The db is set up by fixtures - assert r.db_list().contains(dbname).run(conn) is True - - monkeypatch.setattr(builtins, 'input', lambda x: 'y') - utils.drop() - - assert r.db_list().contains(dbname).run(conn) is False - - -def test_drop_programmatically_drops_the_database_when_assume_yes_is_true(monkeypatch): - conn = utils.get_conn() - dbname = bigchaindb.config['database']['name'] - - # The db is set up by fixtures - assert r.db_list().contains(dbname).run(conn) is True - - utils.drop(assume_yes=True) - - assert r.db_list().contains(dbname).run(conn) is False - - -def test_drop_interactively_does_not_drop_the_database_when_user_says_no(monkeypatch): - conn = utils.get_conn() - dbname = bigchaindb.config['database']['name'] - - # The db is set up by fixtures - print(r.db_list().contains(dbname).run(conn)) - assert r.db_list().contains(dbname).run(conn) is True - - monkeypatch.setattr(builtins, 'input', lambda x: 'n') - utils.drop() - - assert r.db_list().contains(dbname).run(conn) is True - -def test_drop_non_existent_db_raises_an_error(): - conn = utils.get_conn() - dbname = bigchaindb.config['database']['name'] - - # The db is set up by fixtures - assert r.db_list().contains(dbname).run(conn) is True - utils.drop(assume_yes=True) - - with pytest.raises(exceptions.DatabaseDoesNotExist): - utils.drop(assume_yes=True) - diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index fee4817f..b90e5926 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -26,6 +26,7 @@ def inputs(user_pk): @pytest.mark.usefixtures('processes') def test_fast_double_create(b, user_pk): from bigchaindb.models import Transaction + from bigchaindb.backend.query import count_blocks tx = Transaction.create([b.me], [([user_pk], 1)], metadata={'test': 'test'}) \ .sign([b.me_private]) @@ -42,12 +43,13 @@ def test_fast_double_create(b, user_pk): # test the transaction appears only once last_voted_block = b.get_last_voted_block() assert len(last_voted_block.transactions) == 1 - assert b.backend.count_blocks() == 2 + assert count_blocks(b.connection) == 2 @pytest.mark.usefixtures('processes') def test_double_create(b, user_pk): from bigchaindb.models import Transaction + from bigchaindb.backend.query import count_blocks tx = Transaction.create([b.me], [([user_pk], 1)], metadata={'test': 'test'}) \ .sign([b.me_private]) @@ -63,4 +65,4 @@ def test_double_create(b, user_pk): # test the transaction appears only once last_voted_block = b.get_last_voted_block() assert len(last_voted_block.transactions) == 1 - assert b.backend.count_blocks() == 2 + assert count_blocks(b.connection) == 2 diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index 9cc7b56c..0616365d 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -67,7 +67,7 @@ def test_write_block(b, user_pk): block_doc = b.create_block(txs) block_maker.write(block_doc) - expected = b.backend.get_block(block_doc.id) + expected = b.get_block(block_doc.id) expected = Block.from_dict(expected) assert expected == block_doc @@ -88,7 +88,7 @@ def test_duplicate_transaction(b, user_pk): block_maker.write(block_doc) # block is in bigchain - assert b.backend.get_block(block_doc.id) == block_doc.to_dict() + assert b.get_block(block_doc.id) == block_doc.to_dict() b.write_transaction(txs[0]) @@ -159,6 +159,7 @@ def test_start(create_pipeline): def test_full_pipeline(b, user_pk): import random + from bigchaindb.backend import query from bigchaindb.models import Block, Transaction from bigchaindb.pipelines.block import create_pipeline, get_changefeed @@ -172,7 +173,7 @@ def test_full_pipeline(b, user_pk): b.write_transaction(tx) - assert b.backend.count_backlog() == 100 + assert query.count_backlog(b.connection) == 100 pipeline = create_pipeline() pipeline.setup(indata=get_changefeed(), outdata=outpipe) @@ -182,9 +183,9 @@ def test_full_pipeline(b, user_pk): pipeline.terminate() block_doc = outpipe.get() - chained_block = b.backend.get_block(block_doc.id) + chained_block = b.get_block(block_doc.id) chained_block = Block.from_dict(chained_block) block_len = len(block_doc.transactions) assert chained_block == block_doc - assert b.backend.count_backlog() == 100 - block_len + assert query.count_backlog(b.connection) == 100 - block_len diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index 300dee3f..f9fee0bb 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -136,6 +136,7 @@ def test_start(mock_start): def test_full_pipeline(b, user_pk): import random + from bigchaindb.backend import query from bigchaindb.models import Transaction outpipe = Pipe() @@ -177,8 +178,8 @@ def test_full_pipeline(b, user_pk): # only transactions from the invalid block should be returned to # the backlog - assert b.backend.count_backlog() == 100 + assert query.count_backlog(b.connection) == 100 # NOTE: I'm still, I'm still tx from the block. tx_from_block = set([tx.id for tx in invalid_block.transactions]) - tx_from_backlog = set([tx['id'] for tx in list(b.backend.get_stale_transactions(0))]) + tx_from_backlog = set([tx['id'] for tx in list(query.get_stale_transactions(b.connection, 0))]) assert tx_from_block == tx_from_backlog diff --git a/tests/pipelines/test_stale_monitor.py b/tests/pipelines/test_stale_monitor.py index 31848122..d90cf4f0 100644 --- a/tests/pipelines/test_stale_monitor.py +++ b/tests/pipelines/test_stale_monitor.py @@ -10,7 +10,7 @@ def test_get_stale(b, user_pk): from bigchaindb.models import Transaction tx = Transaction.create([b.me], [([user_pk], 1)]) tx = tx.sign([b.me_private]) - b.write_transaction(tx, durability='hard') + b.write_transaction(tx) stm = stale.StaleTransactionMonitor(timeout=0.001, backlog_reassign_delay=0.001) @@ -23,11 +23,12 @@ def test_get_stale(b, user_pk): def test_reassign_transactions(b, user_pk): + from bigchaindb.backend import query from bigchaindb.models import Transaction # test with single node tx = Transaction.create([b.me], [([user_pk], 1)]) tx = tx.sign([b.me_private]) - b.write_transaction(tx, durability='hard') + b.write_transaction(tx) stm = stale.StaleTransactionMonitor(timeout=0.001, backlog_reassign_delay=0.001) @@ -36,15 +37,15 @@ def test_reassign_transactions(b, user_pk): # test with federation tx = Transaction.create([b.me], [([user_pk], 1)]) tx = tx.sign([b.me_private]) - b.write_transaction(tx, durability='hard') + b.write_transaction(tx) stm = stale.StaleTransactionMonitor(timeout=0.001, backlog_reassign_delay=0.001) stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc'] - tx = list(b.backend.get_stale_transactions(0))[0] + tx = list(query.get_stale_transactions(b.connection, 0))[0] stm.reassign_transactions(tx) - reassigned_tx = list(b.backend.get_stale_transactions(0))[0] + reassigned_tx = list(query.get_stale_transactions(b.connection, 0))[0] assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp'] assert reassigned_tx['assignee'] != tx['assignee'] @@ -52,15 +53,16 @@ def test_reassign_transactions(b, user_pk): tx = Transaction.create([b.me], [([user_pk], 1)]) tx = tx.sign([b.me_private]) stm.bigchain.nodes_except_me = ['lol'] - b.write_transaction(tx, durability='hard') + b.write_transaction(tx) stm.bigchain.nodes_except_me = None - tx = list(b.backend.get_stale_transactions(0))[0] + tx = list(query.get_stale_transactions(b.connection, 0))[0] stm.reassign_transactions(tx) assert tx['assignee'] != 'lol' def test_full_pipeline(monkeypatch, user_pk): + from bigchaindb.backend import query from bigchaindb.models import Transaction CONFIG = { 'database': { @@ -87,7 +89,7 @@ def test_full_pipeline(monkeypatch, user_pk): original_txc.append(tx.to_dict()) b.write_transaction(tx) - original_txs = list(b.backend.get_stale_transactions(0)) + original_txs = list(query.get_stale_transactions(b.connection, 0)) original_txs = {tx['id']: tx for tx in original_txs} assert len(original_txs) == 100 @@ -111,14 +113,15 @@ def test_full_pipeline(monkeypatch, user_pk): pipeline.terminate() - assert len(list(b.backend.get_stale_transactions(0))) == 100 - reassigned_txs= list(b.backend.get_stale_transactions(0)) + assert len(list(query.get_stale_transactions(b.connection, 0))) == 100 + reassigned_txs = list(query.get_stale_transactions(b.connection, 0)) # check that every assignment timestamp has increased, and every tx has a new assignee for reassigned_tx in reassigned_txs: assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp'] assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee'] + @patch.object(Pipeline, 'start') def test_start(mock_start): # TODO: `sta,e.start` is just a wrapper around `block.create_pipeline`, diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py index 66f1bdd9..1271068a 100644 --- a/tests/pipelines/test_utils.py +++ b/tests/pipelines/test_utils.py @@ -1,56 +1,67 @@ -from unittest.mock import patch +import pytest +from unittest.mock import Mock from multipipes import Pipe -from bigchaindb.db.utils import Connection +from bigchaindb import Bigchain +from bigchaindb.backend.connection import Connection from bigchaindb.pipelines.utils import ChangeFeed -MOCK_CHANGEFEED_DATA = [{ - 'new_val': 'seems like we have an insert here', - 'old_val': None, -}, { - 'new_val': None, - 'old_val': 'seems like we have a delete here', -}, { - 'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here', -}] +@pytest.fixture +def mock_changefeed_data(): + return [ + { + 'new_val': 'seems like we have an insert here', + 'old_val': None, + }, { + 'new_val': None, + 'old_val': 'seems like we have a delete here', + }, { + 'new_val': 'seems like we have an update here', + 'old_val': 'seems like we have an update here', + } + ] -@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) -def test_changefeed_insert(mock_run): +@pytest.fixture +def mock_changefeed_bigchain(mock_changefeed_data): + connection = Connection() + connection.run = Mock(return_value=mock_changefeed_data) + return Bigchain(connection=connection) + + +def test_changefeed_insert(mock_changefeed_bigchain): outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.INSERT) + changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, bigchain=mock_changefeed_bigchain) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have an insert here' assert outpipe.qsize() == 0 -@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) -def test_changefeed_delete(mock_run): +def test_changefeed_delete(mock_changefeed_bigchain): outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.DELETE) + changefeed = ChangeFeed('backlog', ChangeFeed.DELETE, bigchain=mock_changefeed_bigchain) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have a delete here' assert outpipe.qsize() == 0 -@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) -def test_changefeed_update(mock_run): +def test_changefeed_update(mock_changefeed_bigchain): outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE) + changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE, bigchain=mock_changefeed_bigchain) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have an update here' assert outpipe.qsize() == 0 -@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) -def test_changefeed_multiple_operations(mock_run): +def test_changefeed_multiple_operations(mock_changefeed_bigchain): outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE) + changefeed = ChangeFeed('backlog', + ChangeFeed.INSERT | ChangeFeed.UPDATE, + bigchain=mock_changefeed_bigchain) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have an insert here' @@ -58,10 +69,12 @@ def test_changefeed_multiple_operations(mock_run): assert outpipe.qsize() == 0 -@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) -def test_changefeed_prefeed(mock_run): +def test_changefeed_prefeed(mock_changefeed_bigchain): outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3]) + changefeed = ChangeFeed('backlog', + ChangeFeed.INSERT, + prefeed=[1, 2, 3], + bigchain=mock_changefeed_bigchain) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.qsize() == 4 diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index be3652b2..0f471c0b 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -157,6 +157,7 @@ def test_vote_accumulates_transactions(b): def test_valid_block_voting_sequential(b, monkeypatch): + from bigchaindb.backend import query from bigchaindb.common import crypto, util from bigchaindb.pipelines import vote @@ -169,7 +170,7 @@ def test_valid_block_voting_sequential(b, monkeypatch): last_vote = vote_obj.vote(*vote_obj.validate_tx(tx, block_id, num_tx)) vote_obj.write_vote(last_vote) - vote_rs = b.backend.get_votes_by_block_id_and_voter(block_id, b.me) + vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block_id, b.me) vote_doc = vote_rs.next() assert vote_doc['vote'] == {'voting_for_block': block.id, @@ -185,6 +186,7 @@ def test_valid_block_voting_sequential(b, monkeypatch): def test_valid_block_voting_multiprocessing(b, monkeypatch): + from bigchaindb.backend import query from bigchaindb.common import crypto, util from bigchaindb.pipelines import vote @@ -203,7 +205,7 @@ def test_valid_block_voting_multiprocessing(b, monkeypatch): vote_out = outpipe.get() vote_pipeline.terminate() - vote_rs = b.backend.get_votes_by_block_id_and_voter(block.id, b.me) + vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me) vote_doc = vote_rs.next() assert vote_out['vote'] == vote_doc['vote'] assert vote_doc['vote'] == {'voting_for_block': block.id, @@ -219,6 +221,7 @@ def test_valid_block_voting_multiprocessing(b, monkeypatch): def test_valid_block_voting_with_create_transaction(b, monkeypatch): + from bigchaindb.backend import query from bigchaindb.common import crypto, util from bigchaindb.models import Transaction from bigchaindb.pipelines import vote @@ -244,7 +247,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch): vote_out = outpipe.get() vote_pipeline.terminate() - vote_rs = b.backend.get_votes_by_block_id_and_voter(block.id, b.me) + vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me) vote_doc = vote_rs.next() assert vote_out['vote'] == vote_doc['vote'] assert vote_doc['vote'] == {'voting_for_block': block.id, @@ -260,6 +263,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch): def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): + from bigchaindb.backend import query from bigchaindb.common import crypto, util from bigchaindb.models import Transaction from bigchaindb.pipelines import vote @@ -273,7 +277,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): monkeypatch.setattr('time.time', lambda: 1111111111) block = b.create_block([tx]) - b.write_block(block, durability='hard') + b.write_block(block) # create a `TRANSFER` transaction test_user2_priv, test_user2_pub = crypto.generate_key_pair() @@ -283,7 +287,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): monkeypatch.setattr('time.time', lambda: 2222222222) block2 = b.create_block([tx2]) - b.write_block(block2, durability='hard') + b.write_block(block2) inpipe = Pipe() outpipe = Pipe() @@ -299,7 +303,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): vote2_out = outpipe.get() vote_pipeline.terminate() - vote_rs = b.backend.get_votes_by_block_id_and_voter(block.id, b.me) + vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me) vote_doc = vote_rs.next() assert vote_out['vote'] == vote_doc['vote'] assert vote_doc['vote'] == {'voting_for_block': block.id, @@ -313,7 +317,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): assert crypto.PublicKey(b.me).verify(serialized_vote, vote_doc['signature']) is True - vote2_rs = b.backend.get_votes_by_block_id_and_voter(block2.id, b.me) + vote2_rs = query.get_votes_by_block_id_and_voter(b.connection, block2.id, b.me) vote2_doc = vote2_rs.next() assert vote2_out['vote'] == vote2_doc['vote'] assert vote2_doc['vote'] == {'voting_for_block': block2.id, @@ -329,6 +333,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b): def test_unsigned_tx_in_block_voting(monkeypatch, b, user_pk): + from bigchaindb.backend import query from bigchaindb.common import crypto, util from bigchaindb.models import Transaction from bigchaindb.pipelines import vote @@ -350,7 +355,7 @@ def test_unsigned_tx_in_block_voting(monkeypatch, b, user_pk): vote_out = outpipe.get() vote_pipeline.terminate() - vote_rs = b.backend.get_votes_by_block_id_and_voter(block.id, b.me) + vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me) vote_doc = vote_rs.next() assert vote_out['vote'] == vote_doc['vote'] assert vote_doc['vote'] == {'voting_for_block': block.id, @@ -366,6 +371,7 @@ def test_unsigned_tx_in_block_voting(monkeypatch, b, user_pk): def test_invalid_id_tx_in_block_voting(monkeypatch, b, user_pk): + from bigchaindb.backend import query from bigchaindb.common import crypto, util from bigchaindb.models import Transaction from bigchaindb.pipelines import vote @@ -389,7 +395,7 @@ def test_invalid_id_tx_in_block_voting(monkeypatch, b, user_pk): vote_out = outpipe.get() vote_pipeline.terminate() - vote_rs = b.backend.get_votes_by_block_id_and_voter(block['id'], b.me) + vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me) vote_doc = vote_rs.next() assert vote_out['vote'] == vote_doc['vote'] assert vote_doc['vote'] == {'voting_for_block': block['id'], @@ -405,6 +411,7 @@ def test_invalid_id_tx_in_block_voting(monkeypatch, b, user_pk): def test_invalid_content_in_tx_in_block_voting(monkeypatch, b, user_pk): + from bigchaindb.backend import query from bigchaindb.common import crypto, util from bigchaindb.models import Transaction from bigchaindb.pipelines import vote @@ -428,7 +435,7 @@ def test_invalid_content_in_tx_in_block_voting(monkeypatch, b, user_pk): vote_out = outpipe.get() vote_pipeline.terminate() - vote_rs = b.backend.get_votes_by_block_id_and_voter(block['id'], b.me) + vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me) vote_doc = vote_rs.next() assert vote_out['vote'] == vote_doc['vote'] assert vote_doc['vote'] == {'voting_for_block': block['id'], @@ -444,6 +451,7 @@ def test_invalid_content_in_tx_in_block_voting(monkeypatch, b, user_pk): def test_invalid_block_voting(monkeypatch, b, user_pk): + from bigchaindb.backend import query from bigchaindb.common import crypto, util from bigchaindb.pipelines import vote @@ -463,7 +471,7 @@ def test_invalid_block_voting(monkeypatch, b, user_pk): vote_out = outpipe.get() vote_pipeline.terminate() - vote_rs = b.backend.get_votes_by_block_id_and_voter(block['id'], b.me) + vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me) vote_doc = vote_rs.next() assert vote_out['vote'] == vote_doc['vote'] assert vote_doc['vote'] == {'voting_for_block': block['id'], @@ -479,6 +487,7 @@ def test_invalid_block_voting(monkeypatch, b, user_pk): def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): + from bigchaindb.backend import query from bigchaindb.pipelines import vote outpipe = Pipe() @@ -492,11 +501,11 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): monkeypatch.setattr('time.time', lambda: 2222222222) block_1 = dummy_block(b) block_ids.append(block_1.id) - b.write_block(block_1, durability='hard') monkeypatch.setattr('time.time', lambda: 3333333333) + b.write_block(block_1) block_2 = dummy_block(b) block_ids.append(block_2.id) - b.write_block(block_2, durability='hard') + b.write_block(block_2) vote_pipeline = vote.create_pipeline() vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) @@ -511,7 +520,7 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): monkeypatch.setattr('time.time', lambda: 4444444444) block_3 = dummy_block(b) block_ids.append(block_3.id) - b.write_block(block_3, durability='hard') + b.write_block(block_3) # Same as before with the two `get`s outpipe.get() @@ -519,13 +528,14 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): vote_pipeline.terminate() # retrieve vote - votes = [list(b.backend.get_votes_by_block_id(_id))[0] + votes = [list(query.get_votes_by_block_id(b.connection, _id))[0] for _id in block_ids] assert all(vote['node_pubkey'] == b.me for vote in votes) def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): + from bigchaindb.backend import query from bigchaindb.pipelines import vote outpipe = Pipe() @@ -537,12 +547,12 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): monkeypatch.setattr('time.time', lambda: 2222222222) block_1 = dummy_block(b) block_ids.append(block_1.id) - b.write_block(block_1, durability='hard') + b.write_block(block_1) monkeypatch.setattr('time.time', lambda: 3333333333) block_2 = dummy_block(b) block_ids.append(block_2.id) - b.write_block(block_2, durability='hard') + b.write_block(block_2) vote_pipeline = vote.create_pipeline() vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) @@ -558,7 +568,7 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): blocks = [b.get_block(_id) for _id in block_ids] # retrieve votes - votes = [list(b.backend.get_votes_by_block_id(_id))[0] + votes = [list(query.get_votes_by_block_id(b.connection, _id))[0] for _id in block_ids] assert ({v['vote']['voting_for_block'] for v in votes} == @@ -566,6 +576,7 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): def test_voter_checks_for_previous_vote(monkeypatch, b): + from bigchaindb.backend import query from bigchaindb.pipelines import vote inpipe = Pipe() @@ -577,7 +588,7 @@ def test_voter_checks_for_previous_vote(monkeypatch, b): monkeypatch.setattr('time.time', lambda: 2222222222) block_1 = dummy_block(b) inpipe.put(block_1.to_dict()) - assert len(list(b.backend.get_votes_by_block_id(block_1.id))) == 0 + assert len(list(query.get_votes_by_block_id(b.connection, block_1.id))) == 0 vote_pipeline = vote.create_pipeline() vote_pipeline.setup(indata=inpipe, outdata=outpipe) @@ -600,8 +611,8 @@ def test_voter_checks_for_previous_vote(monkeypatch, b): vote_pipeline.terminate() - assert len(list(b.backend.get_votes_by_block_id(block_1.id))) == 1 - assert len(list(b.backend.get_votes_by_block_id(block_2.id))) == 1 + assert len(list(query.get_votes_by_block_id(b.connection, block_1.id))) == 1 + assert len(list(query.get_votes_by_block_id(b.connection, block_2.id))) == 1 @patch.object(Pipeline, 'start') diff --git a/tests/test_commands.py b/tests/test_commands.py index 95fc4179..b7d5d365 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -21,13 +21,13 @@ def mock_write_config(monkeypatch): @pytest.fixture def mock_db_init_with_existing_db(monkeypatch): - from bigchaindb import db + from bigchaindb import commands from bigchaindb.common.exceptions import DatabaseAlreadyExists def mockreturn(): raise DatabaseAlreadyExists - monkeypatch.setattr(db, 'init', mockreturn) + monkeypatch.setattr(commands.bigchain, '_run_init', mockreturn) @pytest.fixture @@ -36,16 +36,6 @@ def mock_processes_start(monkeypatch): monkeypatch.setattr(processes, 'start', lambda *args: None) -@pytest.fixture -def mock_rethink_db_drop(monkeypatch): - def mockreturn(dbname): - class MockDropped(object): - def run(self, conn): - return - return MockDropped() - monkeypatch.setattr('rethinkdb.db_drop', mockreturn) - - @pytest.fixture def mock_generate_key_pair(monkeypatch): monkeypatch.setattr('bigchaindb.common.crypto.generate_key_pair', lambda: ('privkey', 'pubkey')) @@ -225,10 +215,33 @@ def test_bigchain_run_init_when_db_exists(mock_db_init_with_existing_db): run_init(args) -def test_drop_existing_db(mock_rethink_db_drop): +@patch('bigchaindb.backend.schema.drop_database') +def test_drop_db_when_assumed_yes(mock_db_drop): from bigchaindb.commands.bigchain import run_drop args = Namespace(config=None, yes=True) + run_drop(args) + assert mock_db_drop.called + + +@patch('bigchaindb.backend.schema.drop_database') +def test_drop_db_when_interactive_yes(mock_db_drop, monkeypatch): + from bigchaindb.commands.bigchain import run_drop + args = Namespace(config=None, yes=False) + monkeypatch.setattr('bigchaindb.commands.bigchain.input', lambda x: 'y') + + run_drop(args) + assert mock_db_drop.called + + +@patch('bigchaindb.backend.schema.drop_database') +def test_drop_db_does_not_drop_when_interactive_no(mock_db_drop, monkeypatch): + from bigchaindb.commands.bigchain import run_drop + args = Namespace(config=None, yes=False) + monkeypatch.setattr('bigchaindb.commands.bigchain.input', lambda x: 'n') + + run_drop(args) + assert not mock_db_drop.called def test_run_configure_when_config_exists_and_skipping(monkeypatch): diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py index ac972b4c..548d3160 100644 --- a/tests/test_config_utils.py +++ b/tests/test_config_utils.py @@ -113,7 +113,7 @@ def test_env_config(monkeypatch): assert result == expected -def test_autoconfigure_read_both_from_file_and_env(monkeypatch): +def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request): file_config = { 'database': {'host': 'test-host'}, 'backlog_reassign_delay': 5 @@ -136,6 +136,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch): 'threads': None, }, 'database': { + 'backend': request.config.getoption('--database-backend'), 'host': 'test-host', 'port': 4242, 'name': 'test-dbname', diff --git a/tests/test_core.py b/tests/test_core.py index 55d73e77..16974098 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,5 +1,3 @@ -from collections import namedtuple - from rethinkdb.ast import RqlQuery import pytest @@ -9,6 +7,7 @@ import pytest def config(request, monkeypatch): config = { 'database': { + 'backend': request.config.getoption('--database-backend'), 'host': 'host', 'port': 28015, 'name': 'bigchain', @@ -30,10 +29,12 @@ def config(request, monkeypatch): def test_bigchain_class_default_initialization(config): from bigchaindb.core import Bigchain from bigchaindb.consensus import BaseConsensusRules + from bigchaindb.backend.connection import Connection bigchain = Bigchain() - assert bigchain.host == config['database']['host'] - assert bigchain.port == config['database']['port'] - assert bigchain.dbname == config['database']['name'] + assert isinstance(bigchain.connection, Connection) + assert bigchain.connection.host == config['database']['host'] + assert bigchain.connection.port == config['database']['port'] + assert bigchain.connection.dbname == config['database']['name'] assert bigchain.me == config['keypair']['public'] assert bigchain.me_private == config['keypair']['private'] assert bigchain.nodes_except_me == config['keyring'] @@ -42,19 +43,25 @@ def test_bigchain_class_default_initialization(config): def test_bigchain_class_initialization_with_parameters(config): from bigchaindb.core import Bigchain + from bigchaindb.backend import connect from bigchaindb.consensus import BaseConsensusRules init_kwargs = { - 'host': 'some_node', - 'port': '12345', - 'dbname': 'atom', 'public_key': 'white', 'private_key': 'black', 'keyring': ['key_one', 'key_two'], } - bigchain = Bigchain(**init_kwargs) - assert bigchain.host == init_kwargs['host'] - assert bigchain.port == init_kwargs['port'] - assert bigchain.dbname == init_kwargs['dbname'] + init_db_kwargs = { + 'backend': 'rethinkdb', + 'host': 'this_is_the_db_host', + 'port': 12345, + 'name': 'this_is_the_db_name', + } + connection = connect(**init_db_kwargs) + bigchain = Bigchain(connection=connection, **init_kwargs) + assert bigchain.connection == connection + assert bigchain.connection.host == init_db_kwargs['host'] + assert bigchain.connection.port == init_db_kwargs['port'] + assert bigchain.connection.dbname == init_db_kwargs['name'] assert bigchain.me == init_kwargs['public_key'] assert bigchain.me_private == init_kwargs['private_key'] assert bigchain.nodes_except_me == init_kwargs['keyring'] @@ -62,12 +69,12 @@ def test_bigchain_class_initialization_with_parameters(config): def test_get_blocks_status_containing_tx(monkeypatch): - from bigchaindb.db.backends.rethinkdb import RethinkDBBackend + from bigchaindb.backend import query as backend_query from bigchaindb.core import Bigchain blocks = [ {'id': 1}, {'id': 2} ] - monkeypatch.setattr(RethinkDBBackend, 'get_blocks_status_from_transaction', lambda x: blocks) + monkeypatch.setattr(backend_query, 'get_blocks_status_from_transaction', lambda x: blocks) monkeypatch.setattr(Bigchain, 'block_election_status', lambda x, y, z: Bigchain.BLOCK_VALID) bigchain = Bigchain(public_key='pubkey', private_key='privkey') with pytest.raises(Exception): diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index f81bd232..1aeea11e 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -3,11 +3,11 @@ import pytest import rethinkdb as r -from bigchaindb.db.utils import Connection +from bigchaindb.backend import connect def test_run_a_simple_query(): - conn = Connection() + conn = connect() query = r.expr('1') assert conn.run(query) == '1' @@ -17,7 +17,7 @@ def test_raise_exception_when_max_tries(): def run(self, conn): raise r.ReqlDriverError('mock') - conn = Connection() + conn = connect() with pytest.raises(r.ReqlDriverError): conn.run(MockQuery()) @@ -30,7 +30,7 @@ def test_reconnect_when_connection_lost(): def raise_exception(*args, **kwargs): raise r.ReqlDriverError('mock') - conn = Connection() + conn = connect() original_connect = r.connect r.connect = raise_exception @@ -75,7 +75,6 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): else: time.sleep(10) - bigchain = Bigchain() bigchain.connection = MockConnection() changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT,