Merge pull request #788 from bigchaindb/abstract-db-layer

Abstract database interface
This commit is contained in:
Sylvain Bellemare 2016-12-12 17:45:19 +01:00 committed by GitHub
commit 4c3d5f0e2b
42 changed files with 1584 additions and 1060 deletions

View File

@ -15,6 +15,7 @@ config = {
'threads': None, # if none, the value will be cpu_count * 2 + 1
},
'database': {
'backend': 'rethinkdb',
'host': os.environ.get('BIGCHAINDB_DATABASE_HOST', 'localhost'),
'port': 28015,
'name': 'bigchain',

View File

@ -0,0 +1,6 @@
"""Backend interfaces ..."""
# Include the backend interfaces
from bigchaindb.backend import changefeed, schema, query # noqa
from bigchaindb.backend.connection import connect # noqa

View File

@ -0,0 +1 @@
"""Changefeed interfaces for backend databases."""

View File

@ -0,0 +1,44 @@
import bigchaindb
from bigchaindb.common.exceptions import ConfigurationError
from importlib import import_module
BACKENDS = {
'rethinkdb': 'bigchaindb.backend.rethinkdb.connection.RethinkDBConnection'
}
def connect(backend=None, host=None, port=None, name=None):
"""Create a connection to the database backend.
Args:
backend (str): the name of the backend to use.
host (str): the host to connect to.
port (int): the port to connect to.
name (str): the name of the database to use.
Returns:
An instance of :class:`~bigchaindb.backend.connection.Connection`.
"""
backend = backend or bigchaindb.config['database']['backend']
host = host or bigchaindb.config['database']['host']
port = port or bigchaindb.config['database']['port']
dbname = name or bigchaindb.config['database']['name']
try:
module_name, _, class_name = BACKENDS[backend].rpartition('.')
Class = getattr(import_module(module_name), class_name)
except KeyError:
raise ConfigurationError('Backend `{}` is not supported. '
'BigchainDB currently supports {}'.format(backend, BACKENDS.keys()))
except (ImportError, AttributeError) as exc:
raise ConfigurationError('Error loading backend `{}`'.format(backend)) from exc
return Class(host, port, dbname)
class Connection:
def run(self, query):
raise NotImplementedError()

320
bigchaindb/backend/query.py Normal file
View File

@ -0,0 +1,320 @@
"""Query interfaces for backend databases."""
from functools import singledispatch
@singledispatch
def write_transaction(connection, signed_transaction):
"""Write a transaction to the backlog table.
Args:
signed_transaction (dict): a signed transaction.
Returns:
The result of the operation.
"""
raise NotImplementedError
@singledispatch
def update_transaction(connection, transaction_id, doc):
"""Update a transaction in the backlog table.
Args:
transaction_id (str): the id of the transaction.
doc (dict): the values to update.
Returns:
The result of the operation.
"""
raise NotImplementedError
@singledispatch
def delete_transaction(connection, *transaction_id):
"""Delete a transaction from the backlog.
Args:
*transaction_id (str): the transaction(s) to delete.
Returns:
The database response.
"""
raise NotImplementedError
@singledispatch
def get_stale_transactions(connection, reassign_delay):
"""Get a cursor of stale transactions.
Transactions are considered stale if they have been assigned a node,
but are still in the backlog after some amount of time specified in the
configuration.
Args:
reassign_delay (int): threshold (in seconds) to mark a transaction stale.
Returns:
A cursor of transactions.
"""
raise NotImplementedError
@singledispatch
def get_transaction_from_block(connection, transaction_id, block_id):
"""Get a transaction from a specific block.
Args:
transaction_id (str): the id of the transaction.
block_id (str): the id of the block.
Returns:
The matching transaction.
"""
raise NotImplementedError
@singledispatch
def get_transaction_from_backlog(connection, transaction_id):
"""Get a transaction from backlog.
Args:
transaction_id (str): the id of the transaction.
Returns:
The matching transaction.
"""
raise NotImplementedError
@singledispatch
def get_blocks_status_from_transaction(connection, transaction_id):
"""Retrieve block election information given a secondary index and value.
Args:
value: a value to search (e.g. transaction id string, payload hash string)
index (str): name of a secondary index, e.g. 'transaction_id'
Returns:
:obj:`list` of :obj:`dict`: A list of blocks with with only election information
"""
raise NotImplementedError
@singledispatch
def get_txids_by_asset_id(connection, asset_id):
"""Retrieves transactions ids related to a particular asset.
A digital asset in bigchaindb is identified by an uuid. This allows us
to query all the transactions related to a particular digital asset,
knowing the id.
Args:
asset_id (str): the id for this particular metadata.
Returns:
A list of transactions ids related to the asset. If no transaction
exists for that asset it returns an empty list ``[]``
"""
raise NotImplementedError
@singledispatch
def get_asset_by_id(conneciton, asset_id):
"""Returns the asset associated with an asset_id.
Args:
asset_id (str): The asset id.
Returns:
Returns a rethinkdb cursor.
"""
raise NotImplementedError
@singledispatch
def get_spent(connection, transaction_id, condition_id):
"""Check if a `txid` was already used as an input.
A transaction can be used as an input for another transaction. Bigchain
needs to make sure that a given `txid` is only used once.
Args:
transaction_id (str): The id of the transaction.
condition_id (int): The index of the condition in the respective
transaction.
Returns:
The transaction that used the `txid` as an input else `None`
"""
raise NotImplementedError
@singledispatch
def get_owned_ids(connection, owner):
"""Retrieve a list of `txids` that can we used has inputs.
Args:
owner (str): base58 encoded public key.
Returns:
A cursor for the matching transactions.
"""
raise NotImplementedError
@singledispatch
def get_votes_by_block_id(connection, block_id):
"""Get all the votes casted for a specific block.
Args:
block_id (str): the block id to use.
Returns:
A cursor for the matching votes.
"""
raise NotImplementedError
@singledispatch
def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey):
"""Get all the votes casted for a specific block by a specific voter.
Args:
block_id (str): the block id to use.
node_pubkey (str): base58 encoded public key
Returns:
A cursor for the matching votes.
"""
raise NotImplementedError
@singledispatch
def write_block(connection, block):
"""Write a block to the bigchain table.
Args:
block (dict): the block to write.
Returns:
The database response.
"""
raise NotImplementedError
@singledispatch
def get_block(connection, block_id):
"""Get a block from the bigchain table.
Args:
block_id (str): block id of the block to get
Returns:
block (dict): the block or `None`
"""
raise NotImplementedError
@singledispatch
def has_transaction(connection, transaction_id):
"""Check if a transaction exists in the bigchain table.
Args:
transaction_id (str): the id of the transaction to check.
Returns:
``True`` if the transaction exists, ``False`` otherwise.
"""
raise NotImplementedError
@singledispatch
def count_blocks(connection):
"""Count the number of blocks in the bigchain table.
Returns:
The number of blocks.
"""
raise NotImplementedError
@singledispatch
def count_backlog(connection):
"""Count the number of transactions in the backlog table.
Returns:
The number of transactions in the backlog.
"""
raise NotImplementedError
@singledispatch
def write_vote(connection, vote):
"""Write a vote to the votes table.
Args:
vote (dict): the vote to write.
Returns:
The database response.
"""
raise NotImplementedError
@singledispatch
def get_genesis_block(connection):
"""Get the genesis block.
Returns:
The genesis block
"""
raise NotImplementedError
@singledispatch
def get_last_voted_block(connection, node_pubkey):
"""Get the last voted block for a specific node.
Args:
node_pubkey (str): base58 encoded public key.
Returns:
The last block the node has voted on. If the node didn't cast
any vote then the genesis block is returned.
"""
raise NotImplementedError
@singledispatch
def get_unvoted_blocks(connection, node_pubkey):
"""Return all the blocks that have not been voted by the specified node.
Args:
node_pubkey (str): base58 encoded public key
Returns:
:obj:`list` of :obj:`dict`: a list of unvoted blocks
"""
raise NotImplementedError

View File

@ -0,0 +1,4 @@
"""RethinkDB backend components ..."""
# Register the single dispatched modules on import.
from bigchaindb.backend.rethinkdb import changefeed, schema, query # noqa

View File

@ -0,0 +1,79 @@
import time
import logging
import rethinkdb as r
from bigchaindb.backend.connection import Connection
logger = logging.getLogger(__name__)
class RethinkDBConnection(Connection):
"""
This class is a proxy to run queries against the database, it is:
- lazy, since it creates a connection only when needed
- resilient, because before raising exceptions it tries
more times to run the query or open a connection.
"""
def __init__(self, host, port, dbname, max_tries=3):
"""Create a new Connection instance.
Args:
host (str): the host to connect to.
port (int): the port to connect to.
dbname (str): the name of the database to use.
max_tries (int, optional): how many tries before giving up.
Defaults to 3.
"""
self.host = host
self.port = port
self.dbname = dbname
self.max_tries = max_tries
self.conn = None
def run(self, query):
"""Run a RethinkDB query.
Args:
query: the RethinkDB query.
Raises:
:exc:`rethinkdb.ReqlDriverError`: After
:attr:`~.RethinkDBConnection.max_tries`.
"""
if self.conn is None:
self._connect()
for i in range(self.max_tries):
try:
return query.run(self.conn)
except r.ReqlDriverError:
if i + 1 == self.max_tries:
raise
self._connect()
def _connect(self):
"""Set a connection to RethinkDB.
The connection is available via :attr:`~.RethinkDBConnection.conn`.
Raises:
:exc:`rethinkdb.ReqlDriverError`: After
:attr:`~.RethinkDBConnection.max_tries`.
"""
for i in range(1, self.max_tries + 1):
logging.debug('Connecting to database %s:%s/%s. (Attempt %s/%s)',
self.host, self.port, self.dbname, i, self.max_tries)
try:
self.conn = r.connect(host=self.host, port=self.port, db=self.dbname)
except r.ReqlDriverError:
if i == self.max_tries:
raise
wait_time = 2**i
logging.debug('Error connecting to database, waiting %ss', wait_time)
time.sleep(wait_time)

View File

@ -0,0 +1,248 @@
"""Query implementation for RethinkDB"""
from time import time
import rethinkdb as r
from bigchaindb import backend, util
from bigchaindb.common import exceptions
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
READ_MODE = 'majority'
WRITE_DURABILITY = 'hard'
register_query = module_dispatch_registrar(backend.query)
@register_query(RethinkDBConnection)
def write_transaction(connection, signed_transaction):
return connection.run(
r.table('backlog')
.insert(signed_transaction, durability=WRITE_DURABILITY))
@register_query(RethinkDBConnection)
def update_transaction(connection, transaction_id, doc):
return connection.run(
r.table('backlog')
.get(transaction_id)
.update(doc))
@register_query(RethinkDBConnection)
def delete_transaction(connection, *transaction_id):
return connection.run(
r.table('backlog')
.get_all(*transaction_id)
.delete(durability=WRITE_DURABILITY))
@register_query(RethinkDBConnection)
def get_stale_transactions(connection, reassign_delay):
return connection.run(
r.table('backlog')
.filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay))
@register_query(RethinkDBConnection)
def get_transaction_from_block(connection, transaction_id, block_id):
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.get(block_id)
.get_field('block')
.get_field('transactions')
.filter(lambda tx: tx['id'] == transaction_id))[0]
@register_query(RethinkDBConnection)
def get_transaction_from_backlog(connection, transaction_id):
return connection.run(
r.table('backlog')
.get(transaction_id)
.without('assignee', 'assignment_timestamp')
.default(None))
@register_query(RethinkDBConnection)
def get_blocks_status_from_transaction(connection, transaction_id):
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.get_all(transaction_id, index='transaction_id')
.pluck('votes', 'id', {'block': ['voters']}))
@register_query(RethinkDBConnection)
def get_txids_by_asset_id(connection, asset_id):
# here we only want to return the transaction ids since later on when
# we are going to retrieve the transaction with status validation
return connection.run(
r.table('bigchain')
.get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['asset']['id'] == asset_id)
.get_field('id'))
@register_query(RethinkDBConnection)
def get_asset_by_id(connection, asset_id):
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['asset']['id'] == asset_id)
.filter(lambda transaction: transaction['operation'] == 'CREATE')
.pluck('asset'))
@register_query(RethinkDBConnection)
def get_spent(connection, transaction_id, condition_id):
# TODO: use index!
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda transaction: transaction['fulfillments'].contains(
lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id})))
@register_query(RethinkDBConnection)
def get_owned_ids(connection, owner):
# TODO: use index!
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda tx: tx['conditions'].contains(
lambda c: c['owners_after'].contains(owner))))
@register_query(RethinkDBConnection)
def get_votes_by_block_id(connection, block_id):
return connection.run(
r.table('votes', read_mode=READ_MODE)
.between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')
.without('id'))
@register_query(RethinkDBConnection)
def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey):
return connection.run(
r.table('votes')
.get_all([block_id, node_pubkey], index='block_and_voter')
.without('id'))
@register_query(RethinkDBConnection)
def write_block(connection, block):
return connection.run(
r.table('bigchain')
.insert(r.json(block), durability=WRITE_DURABILITY))
@register_query(RethinkDBConnection)
def get_block(connection, block_id):
return connection.run(r.table('bigchain').get(block_id))
@register_query(RethinkDBConnection)
def has_transaction(connection, transaction_id):
return bool(connection.run(
r.table('bigchain', read_mode=READ_MODE)
.get_all(transaction_id, index='transaction_id').count()))
@register_query(RethinkDBConnection)
def count_blocks(connection):
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.count())
@register_query(RethinkDBConnection)
def count_backlog(connection):
return connection.run(
r.table('backlog', read_mode=READ_MODE)
.count())
@register_query(RethinkDBConnection)
def write_vote(connection, vote):
return connection.run(
r.table('votes')
.insert(vote))
@register_query(RethinkDBConnection)
def get_genesis_block(connection):
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.filter(util.is_genesis_block)
.nth(0))
@register_query(RethinkDBConnection)
def get_last_voted_block(connection, node_pubkey):
try:
# get the latest value for the vote timestamp (over all votes)
max_timestamp = connection.run(
r.table('votes', read_mode=READ_MODE)
.filter(r.row['node_pubkey'] == node_pubkey)
.max(r.row['vote']['timestamp']))['vote']['timestamp']
last_voted = list(connection.run(
r.table('votes', read_mode=READ_MODE)
.filter(r.row['vote']['timestamp'] == max_timestamp)
.filter(r.row['node_pubkey'] == node_pubkey)))
except r.ReqlNonExistenceError:
# return last vote if last vote exists else return Genesis block
return get_genesis_block(connection)
# Now the fun starts. Since the resolution of timestamp is a second,
# we might have more than one vote per timestamp. If this is the case
# then we need to rebuild the chain for the blocks that have been retrieved
# to get the last one.
# Given a block_id, mapping returns the id of the block pointing at it.
mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
for v in last_voted}
# Since we follow the chain backwards, we can start from a random
# point of the chain and "move up" from it.
last_block_id = list(mapping.values())[0]
# We must be sure to break the infinite loop. This happens when:
# - the block we are currenty iterating is the one we are looking for.
# This will trigger a KeyError, breaking the loop
# - we are visiting again a node we already explored, hence there is
# a loop. This might happen if a vote points both `previous_block`
# and `voting_for_block` to the same `block_id`
explored = set()
while True:
try:
if last_block_id in explored:
raise exceptions.CyclicBlockchainError()
explored.add(last_block_id)
last_block_id = mapping[last_block_id]
except KeyError:
break
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.get(last_block_id))
@register_query(RethinkDBConnection)
def get_unvoted_blocks(connection, node_pubkey):
unvoted = connection.run(
r.table('bigchain', read_mode=READ_MODE)
.filter(lambda block: r.table('votes', read_mode=READ_MODE)
.get_all([block['id'], node_pubkey], index='block_and_voter')
.is_empty())
.order_by(r.asc(r.row['block']['timestamp'])))
# FIXME: I (@vrde) don't like this solution. Filtering should be done at a
# database level. Solving issue #444 can help untangling the situation
unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted)
return unvoted_blocks

View File

@ -0,0 +1,107 @@
"""Utils to initialize and drop the database."""
import logging
import rethinkdb as r
from bigchaindb import backend
from bigchaindb.common import exceptions
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
logger = logging.getLogger(__name__)
register_schema = module_dispatch_registrar(backend.schema)
@register_schema(RethinkDBConnection)
def create_database(connection, dbname):
if connection.run(r.db_list().contains(dbname)):
raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'.format(dbname))
logger.info('Create database `%s`.', dbname)
connection.run(r.db_create(dbname))
@register_schema(RethinkDBConnection)
def create_tables(connection, dbname):
for table_name in ['bigchain', 'backlog', 'votes']:
logger.info('Create `%s` table.', table_name)
connection.run(r.db(dbname).table_create(table_name))
@register_schema(RethinkDBConnection)
def create_indexes(connection, dbname):
create_bigchain_secondary_index(connection, dbname)
create_backlog_secondary_index(connection, dbname)
create_votes_secondary_index(connection, dbname)
@register_schema(RethinkDBConnection)
def drop_database(connection, dbname):
try:
logger.info('Drop database `%s`', dbname)
connection.run(r.db_drop(dbname))
logger.info('Done.')
except r.ReqlOpFailedError:
raise exceptions.DatabaseDoesNotExist('Database `{}` does not exist'.format(dbname))
def create_bigchain_secondary_index(connection, dbname):
logger.info('Create `bigchain` secondary index.')
# to order blocks by timestamp
connection.run(
r.db(dbname)
.table('bigchain')
.index_create('block_timestamp', r.row['block']['timestamp']))
# to query the bigchain for a transaction id
connection.run(
r.db(dbname)
.table('bigchain')
.index_create('transaction_id', r.row['block']['transactions']['id'], multi=True))
# secondary index for asset uuid
connection.run(
r.db(dbname)
.table('bigchain')
.index_create('asset_id', r.row['block']['transactions']['asset']['id'], multi=True))
# wait for rethinkdb to finish creating secondary indexes
connection.run(
r.db(dbname)
.table('bigchain')
.index_wait())
def create_backlog_secondary_index(connection, dbname):
logger.info('Create `backlog` secondary index.')
# compound index to read transactions from the backlog per assignee
connection.run(
r.db(dbname)
.table('backlog')
.index_create('assignee__transaction_timestamp', [r.row['assignee'], r.row['assignment_timestamp']]))
# wait for rethinkdb to finish creating secondary indexes
connection.run(
r.db(dbname)
.table('backlog')
.index_wait())
def create_votes_secondary_index(connection, dbname):
logger.info('Create `votes` secondary index.')
# compound index to order votes by block id and node
connection.run(
r.db(dbname)
.table('votes')
.index_create('block_and_voter', [r.row['vote']['voting_for_block'], r.row['node_pubkey']]))
# wait for rethinkdb to finish creating secondary indexes
connection.run(
r.db(dbname)
.table('votes')
.index_wait())

View File

@ -0,0 +1,85 @@
"""Schema-providing interfaces for backend databases"""
from functools import singledispatch
import bigchaindb
from bigchaindb.backend.connection import connect
@singledispatch
def create_database(connection, dbname):
"""Create database to be used by BigchainDB.
Args:
dbname (str): the name of the database to create.
Raises:
:exc:`~DatabaseAlreadyExists`: If the given :attr:`dbname` already
exists as a database.
"""
raise NotImplementedError
@singledispatch
def create_tables(connection, dbname):
"""Create the tables to be used by BigchainDB.
Args:
dbname (str): the name of the database to create tables for.
"""
raise NotImplementedError
@singledispatch
def create_indexes(connection, dbname):
"""Create the indexes to be used by BigchainDB.
Args:
dbname (str): the name of the database to create indexes for.
"""
raise NotImplementedError
@singledispatch
def drop_database(connection, dbname):
"""Drop the database used by BigchainDB.
Args:
dbname (str): the name of the database to drop.
Raises:
:exc:`~DatabaseDoesNotExist`: If the given :attr:`dbname` does not
exist as a database.
"""
raise NotImplementedError
def init_database(connection=None, dbname=None):
"""Initialize the configured backend for use with BigchainDB.
Creates a database with :attr:`dbname` with any required tables
and supporting indexes.
Args:
connection (:class:`~bigchaindb.backend.connection.Connection`): an
existing connection to use to initialize the database.
Creates one if not given.
dbname (str): the name of the database to create.
Defaults to the database name given in the BigchainDB
configuration.
Raises:
:exc:`~DatabaseAlreadyExists`: If the given :attr:`dbname` already
exists as a database.
"""
connection = connection or connect()
dbname = dbname or bigchaindb.config['database']['name']
create_database(connection, dbname)
create_tables(connection, dbname)
create_indexes(connection, dbname)

View File

@ -0,0 +1,20 @@
class ModuleDispatchRegistrationError(Exception):
"""Raised when there is a problem registering dispatched functions for a
module"""
def module_dispatch_registrar(module):
def dispatch_wrapper(obj_type):
def wrapper(func):
func_name = func.__name__
try:
dispatch_registrar = getattr(module, func_name)
return dispatch_registrar.register(obj_type)(func)
except AttributeError as ex:
raise ModuleDispatchRegistrationError(
("`{module}` does not contain a single-dispatchable "
"function named `{func}`. The module being registered "
"was not implemented correctly!").format(
func=func_name, module=module.__name__)) from ex
return wrapper
return dispatch_wrapper

View File

@ -22,7 +22,8 @@ import bigchaindb
import bigchaindb.config_utils
from bigchaindb.models import Transaction
from bigchaindb.util import ProcessGroup
from bigchaindb import db
from bigchaindb import backend
from bigchaindb.backend import schema
from bigchaindb.commands import utils
from bigchaindb import processes
@ -133,6 +134,17 @@ def run_export_my_pubkey(args):
# exits with exit code 1 (signals tha an error happened)
def _run_init():
# Try to access the keypair, throws an exception if it does not exist
b = bigchaindb.Bigchain()
schema.init_database(connection=b.connection)
logger.info('Create genesis block.')
b.create_genesis_block()
logger.info('Done, have fun!')
def run_init(args):
"""Initialize the database"""
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
@ -140,7 +152,7 @@ def run_init(args):
# 1. prompt the user to inquire whether they wish to drop the db
# 2. force the init, (e.g., via -f flag)
try:
db.init()
_run_init()
except DatabaseAlreadyExists:
print('The database already exists.', file=sys.stderr)
print('If you wish to re-initialize it, first drop it.', file=sys.stderr)
@ -149,7 +161,16 @@ def run_init(args):
def run_drop(args):
"""Drop the database"""
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
db.drop(assume_yes=args.yes)
dbname = bigchaindb.config['database']['name']
if not args.yes:
response = input('Do you want to drop `{}` database? [y/n]: '.format(dbname))
if response != 'y':
return
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
schema.drop_database(conn, dbname)
def run_start(args):
@ -176,7 +197,7 @@ def run_start(args):
logger.info('RethinkDB started with PID %s' % proc.pid)
try:
db.init()
_run_init()
except DatabaseAlreadyExists:
pass
except KeypairNotFoundException:
@ -222,23 +243,25 @@ def run_load(args):
def run_set_shards(args):
conn = backend.connect()
for table in ['bigchain', 'backlog', 'votes']:
# See https://www.rethinkdb.com/api/python/config/
table_config = r.table(table).config().run(db.get_conn())
table_config = conn.run(r.table(table).config())
num_replicas = len(table_config['shards'][0]['replicas'])
try:
r.table(table).reconfigure(shards=args.num_shards, replicas=num_replicas).run(db.get_conn())
conn.run(r.table(table).reconfigure(shards=args.num_shards, replicas=num_replicas))
except r.ReqlOpFailedError as e:
logger.warn(e)
def run_set_replicas(args):
conn = backend.connect()
for table in ['bigchain', 'backlog', 'votes']:
# See https://www.rethinkdb.com/api/python/config/
table_config = r.table(table).config().run(db.get_conn())
table_config = conn.run(r.table(table).config())
num_shards = len(table_config['shards'])
try:
r.table(table).reconfigure(shards=num_shards, replicas=args.num_replicas).run(db.get_conn())
conn.run(r.table(table).reconfigure(shards=num_shards, replicas=args.num_replicas))
except r.ReqlOpFailedError as e:
logger.warn(e)

View File

@ -10,7 +10,7 @@ import subprocess
import rethinkdb as r
import bigchaindb
from bigchaindb import db
from bigchaindb import backend
from bigchaindb.version import __version__
@ -39,11 +39,11 @@ def start_rethinkdb():
# of the database. This code assumes the tables are ready
# when the database is ready. This seems a valid assumption.
try:
conn = db.get_conn()
conn = backend.connect()
# Before checking if the db is ready, we need to query
# the server to check if it contains that db
if r.db_list().contains(dbname).run(conn):
r.db(dbname).wait().run(conn)
if conn.run(r.db_list().contains(dbname)):
conn.run(r.db(dbname).wait())
except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
raise StartupError('Error waiting for the database `{}` '
'to be ready'.format(dbname)) from exc

View File

@ -10,8 +10,7 @@ from bigchaindb.common.transaction import TransactionLink, Asset
import bigchaindb
from bigchaindb.db.utils import Connection, get_backend
from bigchaindb import config_utils, util
from bigchaindb import backend, config_utils, util
from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Block, Transaction
@ -31,9 +30,7 @@ class Bigchain(object):
# return if transaction is in backlog
TX_IN_BACKLOG = 'backlog'
def __init__(self, host=None, port=None, dbname=None, backend=None,
public_key=None, private_key=None, keyring=[],
backlog_reassign_delay=None):
def __init__(self, public_key=None, private_key=None, keyring=[], connection=None, backlog_reassign_delay=None):
"""Initialize the Bigchain instance
A Bigchain instance has several configuration parameters (e.g. host).
@ -46,35 +43,25 @@ class Bigchain(object):
its default value (defined in bigchaindb.__init__).
Args:
host (str): hostname where RethinkDB is running.
port (int): port in which RethinkDB is running (usually 28015).
dbname (str): the name of the database to connect to (usually bigchain).
backend (:class:`~bigchaindb.db.backends.rethinkdb.RehinkDBBackend`):
the database backend to use.
public_key (str): the base58 encoded public key for the ED25519 curve.
private_key (str): the base58 encoded private key for the ED25519 curve.
keyring (list[str]): list of base58 encoded public keys of the federation nodes.
connection (:class:`~bigchaindb.backend.connection.Connection`):
A connection to the database.
"""
config_utils.autoconfigure()
self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port']
self.dbname = dbname or bigchaindb.config['database']['name']
self.backend = backend or get_backend(host, port, dbname)
self.me = public_key or bigchaindb.config['keypair']['public']
self.me_private = private_key or bigchaindb.config['keypair']['private']
self.nodes_except_me = keyring or bigchaindb.config['keyring']
self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay']
self.consensus = BaseConsensusRules
# change RethinkDB read mode to majority. This ensures consistency in query results
self.read_mode = 'majority'
self.connection = connection if connection else backend.connect(**bigchaindb.config['database'])
if not self.me or not self.me_private:
raise exceptions.KeypairNotFoundException()
self.connection = Connection(host=self.host, port=self.port, db=self.dbname)
def write_transaction(self, signed_transaction, durability='soft'):
def write_transaction(self, signed_transaction):
"""Write the transaction to bigchain.
When first writing a transaction to the bigchain the transaction will be kept in a backlog until
@ -100,7 +87,7 @@ class Bigchain(object):
signed_transaction.update({'assignment_timestamp': time()})
# write to the backlog
return self.backend.write_transaction(signed_transaction)
return backend.query.write_transaction(self.connection, signed_transaction)
def reassign_transaction(self, transaction):
"""Assign a transaction to a new node
@ -126,8 +113,8 @@ class Bigchain(object):
# There is no other node to assign to
new_assignee = self.me
return self.backend.update_transaction(
transaction['id'],
return backend.query.update_transaction(
self.connection, transaction['id'],
{'assignee': new_assignee, 'assignment_timestamp': time()})
def delete_transaction(self, *transaction_id):
@ -140,7 +127,7 @@ class Bigchain(object):
The database response.
"""
return self.backend.delete_transaction(*transaction_id)
return backend.query.delete_transaction(self.connection, *transaction_id)
def get_stale_transactions(self):
"""Get a cursor of stale transactions.
@ -149,7 +136,7 @@ class Bigchain(object):
backlog after some amount of time specified in the configuration
"""
return self.backend.get_stale_transactions(self.backlog_reassign_delay)
return backend.query.get_stale_transactions(self.connection, self.backlog_reassign_delay)
def validate_transaction(self, transaction):
"""Validate a transaction.
@ -200,7 +187,7 @@ class Bigchain(object):
include_status (bool): also return the status of the block
the return value is then a tuple: (block, status)
"""
block = self.backend.get_block(block_id)
block = backend.query.get_block(self.connection, block_id)
status = None
if include_status:
@ -260,10 +247,10 @@ class Bigchain(object):
break
# Query the transaction in the target block and return
response = self.backend.get_transaction_from_block(txid, target_block_id)
response = backend.query.get_transaction_from_block(self.connection, txid, target_block_id)
if check_backlog:
response = self.backend.get_transaction_from_backlog(txid)
response = backend.query.get_transaction_from_backlog(self.connection, txid)
if response:
tx_status = self.TX_IN_BACKLOG
@ -304,7 +291,7 @@ class Bigchain(object):
"""
# First, get information on all blocks which contain this transaction
blocks = self.backend.get_blocks_status_from_transaction(txid)
blocks = backend.query.get_blocks_status_from_transaction(self.connection, txid)
if blocks:
# Determine the election status of each block
validity = {
@ -346,7 +333,7 @@ class Bigchain(object):
If no transaction exists for that asset it returns an empty list
`[]`
"""
txids = self.backend.get_txids_by_asset_id(asset_id)
txids = backend.query.get_txids_by_asset_id(self.connection, asset_id)
transactions = []
for txid in txids:
tx = self.get_transaction(txid)
@ -364,7 +351,7 @@ class Bigchain(object):
:class:`~bigchaindb.common.transaction.Asset` if the asset
exists else None.
"""
cursor = self.backend.get_asset_by_id(asset_id)
cursor = backend.query.get_asset_by_id(self.connection, asset_id)
cursor = list(cursor)
if cursor:
return Asset.from_dict(cursor[0]['asset'])
@ -385,7 +372,7 @@ class Bigchain(object):
"""
# checks if an input was already spent
# checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...}
transactions = list(self.backend.get_spent(txid, cid))
transactions = list(backend.query.get_spent(self.connection, txid, cid))
# a transaction_id should have been spent at most one time
if transactions:
@ -422,7 +409,7 @@ class Bigchain(object):
"""
# get all transactions in which owner is in the `owners_after` list
response = self.backend.get_owned_ids(owner)
response = backend.query.get_owned_ids(self.connection, owner)
owned = []
for tx in response:
@ -507,7 +494,7 @@ class Bigchain(object):
but the vote is invalid.
"""
votes = list(self.backend.get_votes_by_block_id_and_voter(block_id, self.me))
votes = list(backend.query.get_votes_by_block_id_and_voter(self.connection, block_id, self.me))
if len(votes) > 1:
raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}'
@ -522,17 +509,17 @@ class Bigchain(object):
return has_previous_vote
def write_block(self, block, durability='soft'):
def write_block(self, block):
"""Write a block to bigchain.
Args:
block (Block): block to write to bigchain.
"""
return self.backend.write_block(block.to_str(), durability=durability)
return backend.query.write_block(self.connection, block.to_str())
def transaction_exists(self, transaction_id):
return self.backend.has_transaction(transaction_id)
return backend.query.has_transaction(self.connection, transaction_id)
def prepare_genesis_block(self):
"""Prepare a genesis block."""
@ -561,13 +548,13 @@ class Bigchain(object):
# 2. create the block with one transaction
# 3. write the block to the bigchain
blocks_count = self.backend.count_blocks()
blocks_count = backend.query.count_blocks(self.connection)
if blocks_count:
raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block')
block = self.prepare_genesis_block()
self.write_block(block, durability='hard')
self.write_block(block)
return block
@ -606,12 +593,12 @@ class Bigchain(object):
def write_vote(self, vote):
"""Write the vote to the database."""
return self.backend.write_vote(vote)
return backend.query.write_vote(self.connection, vote)
def get_last_voted_block(self):
"""Returns the last block that this node voted on."""
return Block.from_dict(self.backend.get_last_voted_block(self.me))
return Block.from_dict(backend.query.get_last_voted_block(self.connection, self.me))
def get_unvoted_blocks(self):
"""Return all the blocks that have not been voted on by this node.
@ -621,12 +608,12 @@ class Bigchain(object):
"""
# XXX: should this return instaces of Block?
return self.backend.get_unvoted_blocks(self.me)
return backend.query.get_unvoted_blocks(self.connection, self.me)
def block_election_status(self, block_id, voters):
"""Tally the votes on a block, and return the status: valid, invalid, or undecided."""
votes = list(self.backend.get_votes_by_block_id(block_id))
votes = list(backend.query.get_votes_by_block_id(self.connection, block_id))
n_voters = len(voters)
voter_counts = collections.Counter([vote['node_pubkey'] for vote in votes])

View File

@ -1,2 +0,0 @@
# TODO can we use explicit imports?
from bigchaindb.db.utils import * # noqa: F401,F403

View File

@ -1,414 +0,0 @@
"""Backend implementation for RethinkDB.
This module contains all the methods to store and retrieve data from RethinkDB.
"""
from time import time
import rethinkdb as r
from bigchaindb import util
from bigchaindb.db.utils import Connection
from bigchaindb.common import exceptions
class RethinkDBBackend:
def __init__(self, host=None, port=None, db=None):
"""Initialize a new RethinkDB Backend instance.
Args:
host (str): the host to connect to.
port (int): the port to connect to.
db (str): the name of the database to use.
"""
self.read_mode = 'majority'
self.durability = 'soft'
self.connection = Connection(host=host, port=port, db=db)
def write_transaction(self, signed_transaction):
"""Write a transaction to the backlog table.
Args:
signed_transaction (dict): a signed transaction.
Returns:
The result of the operation.
"""
return self.connection.run(
r.table('backlog')
.insert(signed_transaction, durability=self.durability))
def update_transaction(self, transaction_id, doc):
"""Update a transaction in the backlog table.
Args:
transaction_id (str): the id of the transaction.
doc (dict): the values to update.
Returns:
The result of the operation.
"""
return self.connection.run(
r.table('backlog')
.get(transaction_id)
.update(doc))
def delete_transaction(self, *transaction_id):
"""Delete a transaction from the backlog.
Args:
*transaction_id (str): the transaction(s) to delete
Returns:
The database response.
"""
return self.connection.run(
r.table('backlog')
.get_all(*transaction_id)
.delete(durability='hard'))
def get_stale_transactions(self, reassign_delay):
"""Get a cursor of stale transactions.
Transactions are considered stale if they have been assigned a node,
but are still in the backlog after some amount of time specified in the
configuration.
Args:
reassign_delay (int): threshold (in seconds) to mark a transaction stale.
Returns:
A cursor of transactions.
"""
return self.connection.run(
r.table('backlog')
.filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay))
def get_transaction_from_block(self, transaction_id, block_id):
"""Get a transaction from a specific block.
Args:
transaction_id (str): the id of the transaction.
block_id (str): the id of the block.
Returns:
The matching transaction.
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(block_id)
.get_field('block')
.get_field('transactions')
.filter(lambda tx: tx['id'] == transaction_id))[0]
def get_transaction_from_backlog(self, transaction_id):
"""Get a transaction from backlog.
Args:
transaction_id (str): the id of the transaction.
Returns:
The matching transaction.
"""
return self.connection.run(
r.table('backlog')
.get(transaction_id)
.without('assignee', 'assignment_timestamp')
.default(None))
def get_blocks_status_from_transaction(self, transaction_id):
"""Retrieve block election information given a secondary index and value
Args:
value: a value to search (e.g. transaction id string, payload hash string)
index (str): name of a secondary index, e.g. 'transaction_id'
Returns:
:obj:`list` of :obj:`dict`: A list of blocks with with only election information
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(transaction_id, index='transaction_id')
.pluck('votes', 'id', {'block': ['voters']}))
def get_txids_by_asset_id(self, asset_id):
"""Retrieves transactions ids related to a particular asset.
A digital asset in bigchaindb is identified by an uuid. This allows us
to query all the transactions related to a particular digital asset,
knowing the id.
Args:
asset_id (str): the id for this particular metadata.
Returns:
A list of transactions ids related to the asset. If no transaction
exists for that asset it returns an empty list `[]`
"""
# here we only want to return the transaction ids since later on when
# we are going to retrieve the transaction with status validation
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['asset']['id'] == asset_id)
.get_field('id'))
def get_asset_by_id(self, asset_id):
"""Returns the asset associated with an asset_id.
Args:
asset_id (str): The asset id.
Returns:
Returns a rethinkdb cursor.
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction:
transaction['asset']['id'] == asset_id)
.filter(lambda transaction:
transaction['operation'] == 'CREATE')
.pluck('asset'))
def get_spent(self, transaction_id, condition_id):
"""Check if a `txid` was already used as an input.
A transaction can be used as an input for another transaction. Bigchain needs to make sure that a
given `txid` is only used once.
Args:
transaction_id (str): The id of the transaction.
condition_id (int): The index of the condition in the respective transaction.
Returns:
The transaction that used the `txid` as an input else `None`
"""
# TODO: use index!
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda transaction: transaction['fulfillments'].contains(
lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id})))
def get_owned_ids(self, owner):
"""Retrieve a list of `txids` that can we used has inputs.
Args:
owner (str): base58 encoded public key.
Returns:
A cursor for the matching transactions.
"""
# TODO: use index!
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda tx: tx['conditions'].contains(
lambda c: c['owners_after'].contains(owner))))
def get_votes_by_block_id(self, block_id):
"""Get all the votes casted for a specific block.
Args:
block_id (str): the block id to use.
Returns:
A cursor for the matching votes.
"""
return self.connection.run(
r.table('votes', read_mode=self.read_mode)
.between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')
.without('id'))
def get_votes_by_block_id_and_voter(self, block_id, node_pubkey):
"""Get all the votes casted for a specific block by a specific voter.
Args:
block_id (str): the block id to use.
node_pubkey (str): base58 encoded public key
Returns:
A cursor for the matching votes.
"""
return self.connection.run(
r.table('votes', read_mode=self.read_mode)
.get_all([block_id, node_pubkey], index='block_and_voter')
.without('id'))
def write_block(self, block, durability='soft'):
"""Write a block to the bigchain table.
Args:
block (dict): the block to write.
Returns:
The database response.
"""
return self.connection.run(
r.table('bigchain')
.insert(r.json(block), durability=durability))
def get_block(self, block_id):
"""Get a block from the bigchain table
Args:
block_id (str): block id of the block to get
Returns:
block (dict): the block or `None`
"""
return self.connection.run(r.table('bigchain').get(block_id))
def has_transaction(self, transaction_id):
"""Check if a transaction exists in the bigchain table.
Args:
transaction_id (str): the id of the transaction to check.
Returns:
``True`` if the transaction exists, ``False`` otherwise.
"""
return bool(self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(transaction_id, index='transaction_id').count()))
def count_blocks(self):
"""Count the number of blocks in the bigchain table.
Returns:
The number of blocks.
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.count())
def count_backlog(self):
"""Count the number of transactions in the backlog table.
Returns:
The number of transactions in the backlog.
"""
return self.connection.run(
r.table('backlog', read_mode=self.read_mode)
.count())
def write_vote(self, vote):
"""Write a vote to the votes table.
Args:
vote (dict): the vote to write.
Returns:
The database response.
"""
return self.connection.run(
r.table('votes')
.insert(vote))
def get_genesis_block(self):
"""Get the genesis block
Returns:
The genesis block
"""
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(util.is_genesis_block)
.nth(0))
def get_last_voted_block(self, node_pubkey):
"""Get the last voted block for a specific node.
Args:
node_pubkey (str): base58 encoded public key.
Returns:
The last block the node has voted on. If the node didn't cast
any vote then the genesis block is returned.
"""
try:
# get the latest value for the vote timestamp (over all votes)
max_timestamp = self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['node_pubkey'] == node_pubkey)
.max(r.row['vote']['timestamp']))['vote']['timestamp']
last_voted = list(self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['vote']['timestamp'] == max_timestamp)
.filter(r.row['node_pubkey'] == node_pubkey)))
except r.ReqlNonExistenceError:
# return last vote if last vote exists else return Genesis block
return self.get_genesis_block()
# Now the fun starts. Since the resolution of timestamp is a second,
# we might have more than one vote per timestamp. If this is the case
# then we need to rebuild the chain for the blocks that have been retrieved
# to get the last one.
# Given a block_id, mapping returns the id of the block pointing at it.
mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
for v in last_voted}
# Since we follow the chain backwards, we can start from a random
# point of the chain and "move up" from it.
last_block_id = list(mapping.values())[0]
# We must be sure to break the infinite loop. This happens when:
# - the block we are currenty iterating is the one we are looking for.
# This will trigger a KeyError, breaking the loop
# - we are visiting again a node we already explored, hence there is
# a loop. This might happen if a vote points both `previous_block`
# and `voting_for_block` to the same `block_id`
explored = set()
while True:
try:
if last_block_id in explored:
raise exceptions.CyclicBlockchainError()
explored.add(last_block_id)
last_block_id = mapping[last_block_id]
except KeyError:
break
return self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(last_block_id))
def get_unvoted_blocks(self, node_pubkey):
"""Return all the blocks that have not been voted by the specified node.
Args:
node_pubkey (str): base58 encoded public key
Returns:
:obj:`list` of :obj:`dict`: a list of unvoted blocks
"""
unvoted = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
.get_all([block['id'], node_pubkey], index='block_and_voter')
.is_empty())
.order_by(r.asc(r.row['block']['timestamp'])))
# FIXME: I (@vrde) don't like this solution. Filtering should be done at a
# database level. Solving issue #444 can help untangling the situation
unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted)
return unvoted_blocks

View File

@ -1,197 +0,0 @@
"""Utils to initialize and drop the database."""
import time
import logging
from bigchaindb.common import exceptions
import rethinkdb as r
import bigchaindb
logger = logging.getLogger(__name__)
class Connection:
"""This class is a proxy to run queries against the database,
it is:
- lazy, since it creates a connection only when needed
- resilient, because before raising exceptions it tries
more times to run the query or open a connection.
"""
def __init__(self, host=None, port=None, db=None, max_tries=3):
"""Create a new Connection instance.
Args:
host (str, optional): the host to connect to.
port (int, optional): the port to connect to.
db (str, optional): the database to use.
max_tries (int, optional): how many tries before giving up.
"""
self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port']
self.db = db or bigchaindb.config['database']['name']
self.max_tries = max_tries
self.conn = None
def run(self, query):
"""Run a query.
Args:
query: the RethinkDB query.
"""
if self.conn is None:
self._connect()
for i in range(self.max_tries):
try:
return query.run(self.conn)
except r.ReqlDriverError as exc:
if i + 1 == self.max_tries:
raise
else:
self._connect()
def _connect(self):
for i in range(self.max_tries):
try:
self.conn = r.connect(host=self.host, port=self.port,
db=self.db)
except r.ReqlDriverError as exc:
if i + 1 == self.max_tries:
raise
else:
time.sleep(2**i)
def get_backend(host=None, port=None, db=None):
'''Get a backend instance.'''
from bigchaindb.db.backends import rethinkdb
# NOTE: this function will be re-implemented when we have real
# multiple backends to support. Right now it returns the RethinkDB one.
return rethinkdb.RethinkDBBackend(host=host or bigchaindb.config['database']['host'],
port=port or bigchaindb.config['database']['port'],
db=db or bigchaindb.config['database']['name'])
def get_conn():
'''Get the connection to the database.'''
return r.connect(host=bigchaindb.config['database']['host'],
port=bigchaindb.config['database']['port'],
db=bigchaindb.config['database']['name'])
def get_database_name():
return bigchaindb.config['database']['name']
def create_database(conn, dbname):
if r.db_list().contains(dbname).run(conn):
raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'.format(dbname))
logger.info('Create database `%s`.', dbname)
r.db_create(dbname).run(conn)
def create_table(conn, dbname, table_name):
logger.info('Create `%s` table.', table_name)
# create the table
r.db(dbname).table_create(table_name).run(conn)
def create_bigchain_secondary_index(conn, dbname):
logger.info('Create `bigchain` secondary index.')
# to order blocks by timestamp
r.db(dbname).table('bigchain')\
.index_create('block_timestamp', r.row['block']['timestamp'])\
.run(conn)
# to query the bigchain for a transaction id
r.db(dbname).table('bigchain')\
.index_create('transaction_id',
r.row['block']['transactions']['id'], multi=True)\
.run(conn)
# secondary index for asset uuid
r.db(dbname).table('bigchain')\
.index_create('asset_id',
r.row['block']['transactions']['asset']['id'], multi=True)\
.run(conn)
# wait for rethinkdb to finish creating secondary indexes
r.db(dbname).table('bigchain').index_wait().run(conn)
def create_backlog_secondary_index(conn, dbname):
logger.info('Create `backlog` secondary index.')
# compound index to read transactions from the backlog per assignee
r.db(dbname).table('backlog')\
.index_create('assignee__transaction_timestamp',
[r.row['assignee'], r.row['assignment_timestamp']])\
.run(conn)
# wait for rethinkdb to finish creating secondary indexes
r.db(dbname).table('backlog').index_wait().run(conn)
def create_votes_secondary_index(conn, dbname):
logger.info('Create `votes` secondary index.')
# compound index to order votes by block id and node
r.db(dbname).table('votes')\
.index_create('block_and_voter',
[r.row['vote']['voting_for_block'],
r.row['node_pubkey']])\
.run(conn)
# wait for rethinkdb to finish creating secondary indexes
r.db(dbname).table('votes').index_wait().run(conn)
def init_database():
conn = get_conn()
dbname = get_database_name()
create_database(conn, dbname)
table_names = ['bigchain', 'backlog', 'votes']
for table_name in table_names:
create_table(conn, dbname, table_name)
create_bigchain_secondary_index(conn, dbname)
create_backlog_secondary_index(conn, dbname)
create_votes_secondary_index(conn, dbname)
def init():
# Try to access the keypair, throws an exception if it does not exist
b = bigchaindb.Bigchain()
init_database()
logger.info('Create genesis block.')
b.create_genesis_block()
logger.info('Done, have fun!')
def drop(assume_yes=False):
conn = get_conn()
dbname = bigchaindb.config['database']['name']
if assume_yes:
response = 'y'
else:
response = input('Do you want to drop `{}` database? [y/n]: '.format(dbname))
if response == 'y':
try:
logger.info('Drop database `%s`', dbname)
r.db_drop(dbname).run(conn)
logger.info('Done.')
except r.ReqlOpFailedError:
raise exceptions.DatabaseDoesNotExist('Database `{}` does not exist'.format(dbname))
else:
logger.info('Drop aborted')

View File

@ -0,0 +1,61 @@
###############################################
:mod:`bigchaindb.backend` -- Backend Interfaces
###############################################
.. automodule:: bigchaindb.backend
:special-members: __init__
Generic Backend
===============
:mod:`bigchaindb.backend.connection` -- Connection
--------------------------------------------------
.. automodule:: bigchaindb.backend.connection
:mod:`bigchaindb.backend.schema` -- Schema
------------------------------------------
.. automodule:: bigchaindb.backend.schema
:mod:`bigchaindb.backend.query` -- Query
----------------------------------------
.. automodule:: bigchaindb.backend.query
:mod:`bigchaindb.backend.changefeed` -- Changefeed
--------------------------------------------------
.. automodule:: bigchaindb.backend.changefeed
:mod:`bigchaindb.backend.utils`
-------------------------------
.. automodule:: bigchaindb.backend.utils
:mod:`bigchaindb.backend.rethinkdb` -- RethinkDB Backend
========================================================
.. automodule:: bigchaindb.backend.rethinkdb
:special-members: __init__
:mod:`bigchaindb.backend.rethinkdb.connection`
----------------------------------------------
.. automodule:: bigchaindb.backend.rethinkdb.connection
:special-members: __init__
:mod:`bigchaindb.backend.rethinkdb.schema`
------------------------------------------
.. automodule:: bigchaindb.backend.rethinkdb.schema
:mod:`bigchaindb.backend.rethinkdb.query`
-----------------------------------------
.. automodule:: bigchaindb.backend.rethinkdb.query
:mod:`bigchaindb.backend.rethinkdb.changefeed`
----------------------------------------------
.. automodule:: bigchaindb.backend.rethinkdb.changefeed
MongoDB Backend
===============
Stay tuned!

View File

@ -15,6 +15,7 @@ Appendices
the-Bigchain-class
consensus
pipelines
backend
aws-setup
firewall-notes
ntp-notes

View File

@ -45,6 +45,7 @@ extensions = [
'sphinx.ext.intersphinx',
'sphinx.ext.coverage',
'sphinx.ext.viewcode',
'sphinx.ext.todo',
'sphinx.ext.napoleon',
'sphinxcontrib.httpdomain',
'sphinx.ext.autosectionlabel',
@ -60,6 +61,8 @@ autodoc_default_flags = [
'members',
]
todo_include_todos = True
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']

View File

@ -91,7 +91,7 @@ def test_get_asset_id_transfer_transaction(b, user_pk, user_sk):
tx_transfer_signed = tx_transfer.sign([user_sk])
# create a block
block = b.create_block([tx_transfer_signed])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block valid
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -130,7 +130,7 @@ def test_get_transactions_by_asset_id(b, user_pk, user_sk):
tx_transfer_signed = tx_transfer.sign([user_sk])
# create the block
block = b.create_block([tx_transfer_signed])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block valid
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -163,7 +163,7 @@ def test_get_transactions_by_asset_id_with_invalid_block(b, user_pk, user_sk):
tx_transfer_signed = tx_transfer.sign([user_sk])
# create the block
block = b.create_block([tx_transfer_signed])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block invalid
vote = b.vote(block.id, b.get_last_voted_block().id, False)
b.write_vote(vote)
@ -187,7 +187,7 @@ def test_get_asset_by_id(b, user_pk, user_sk):
tx_transfer_signed = tx_transfer.sign([user_sk])
# create the block
block = b.create_block([tx_transfer_signed])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block valid
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)

View File

@ -141,7 +141,7 @@ def test_single_in_single_own_single_out_single_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -175,7 +175,7 @@ def test_single_in_single_own_multiple_out_single_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -211,7 +211,7 @@ def test_single_in_single_own_single_out_multiple_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -252,7 +252,7 @@ def test_single_in_single_own_multiple_out_mix_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -294,7 +294,7 @@ def test_single_in_multiple_own_single_out_single_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -333,7 +333,7 @@ def test_multiple_in_single_own_single_out_single_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -370,7 +370,7 @@ def test_multiple_in_multiple_own_single_out_single_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -415,7 +415,7 @@ def test_muiltiple_in_mix_own_multiple_out_single_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -460,7 +460,7 @@ def test_muiltiple_in_mix_own_multiple_out_mix_own_transfer(b, user_pk,
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -512,7 +512,7 @@ def test_multiple_in_different_transactions(b, user_pk, user_sk):
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -528,7 +528,7 @@ def test_multiple_in_different_transactions(b, user_pk, user_sk):
# create block
block = b.create_block([tx_transfer1_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -569,7 +569,7 @@ def test_amount_error_transfer(b, user_pk, user_sk):
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -612,7 +612,7 @@ def test_threshold_same_public_key(b, user_pk, user_sk):
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -641,7 +641,7 @@ def test_sum_amount(b, user_pk, user_sk):
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -670,7 +670,7 @@ def test_divide(b, user_pk, user_sk):
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -703,7 +703,7 @@ def test_non_positive_amounts_on_transfer(b, user_pk):
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
@ -729,7 +729,7 @@ def test_non_positive_amounts_on_transfer_validate(b, user_pk, user_sk):
# create block
block = b.create_block([tx_create_signed])
assert block.validate(b) == block
b.write_block(block, durability='hard')
b.write_block(block)
# vote
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)

View File

@ -0,0 +1,33 @@
import pytest
def test_get_connection_returns_the_correct_instance():
from bigchaindb.backend import connect
from bigchaindb.backend.connection import Connection
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
config = {
'backend': 'rethinkdb',
'host': 'localhost',
'port': 28015,
'name': 'test'
}
conn = connect(**config)
assert isinstance(conn, Connection)
assert isinstance(conn, RethinkDBConnection)
def test_get_connection_raises_a_configuration_error(monkeypatch):
from bigchaindb.common.exceptions import ConfigurationError
from bigchaindb.backend import connect
with pytest.raises(ConfigurationError):
connect('msaccess', 'localhost', '1337', 'mydb')
with pytest.raises(ConfigurationError):
# We need to force a misconfiguration here
monkeypatch.setattr('bigchaindb.backend.connection.BACKENDS',
{'catsandra': 'bigchaindb.backend.meowmeow.Catsandra'})
connect('catsandra', 'localhost', '1337', 'mydb')

View File

@ -0,0 +1,45 @@
from pytest import mark, raises
@mark.parametrize('schema_func_name,args_qty', (
('create_database', 1),
('create_tables', 1),
('create_indexes', 1),
('drop_database', 1),
))
def test_schema(schema_func_name, args_qty):
from bigchaindb.backend import schema
schema_func = getattr(schema, schema_func_name)
with raises(NotImplementedError):
schema_func(None, *range(args_qty))
@mark.parametrize('query_func_name,args_qty', (
('write_transaction', 1),
('count_blocks', 0),
('count_backlog', 0),
('get_genesis_block', 0),
('delete_transaction', 1),
('get_stale_transactions', 1),
('get_blocks_status_from_transaction', 1),
('get_transaction_from_backlog', 1),
('get_txids_by_asset_id', 1),
('get_asset_by_id', 1),
('get_owned_ids', 1),
('get_votes_by_block_id', 1),
('write_block', 1),
('get_block', 1),
('has_transaction', 1),
('write_vote', 1),
('get_last_voted_block', 1),
('get_unvoted_blocks', 1),
('get_spent', 2),
('get_votes_by_block_id_and_voter', 2),
('update_transaction', 2),
('get_transaction_from_block', 2),
))
def test_query(query_func_name, args_qty):
from bigchaindb.backend import query
query_func = getattr(query, query_func_name)
with raises(NotImplementedError):
query_func(None, *range(args_qty))

View File

@ -0,0 +1,72 @@
from functools import singledispatch
from types import ModuleType
import pytest
@pytest.fixture
def mock_module():
return ModuleType('mock_module')
def test_module_dispatch_registers(mock_module):
from bigchaindb.backend.utils import module_dispatch_registrar
@singledispatch
def dispatcher(t):
pass
mock_module.dispatched = dispatcher
mock_dispatch = module_dispatch_registrar(mock_module)
@mock_dispatch(str)
def dispatched(t):
pass
assert mock_module.dispatched.registry[str] == dispatched
def test_module_dispatch_dispatches(mock_module):
from bigchaindb.backend.utils import module_dispatch_registrar
@singledispatch
def dispatcher(t):
return False
mock_module.dispatched = dispatcher
mock_dispatch = module_dispatch_registrar(mock_module)
@mock_dispatch(str)
def dispatched(t):
return True
assert mock_module.dispatched(1) is False # Goes to dispatcher()
assert mock_module.dispatched('1') is True # Goes to dispatched()
def test_module_dispatch_errors_on_missing_func(mock_module):
from bigchaindb.backend.utils import (
module_dispatch_registrar,
ModuleDispatchRegistrationError,
)
mock_dispatch = module_dispatch_registrar(mock_module)
with pytest.raises(ModuleDispatchRegistrationError):
@mock_dispatch(str)
def dispatched():
pass
def test_module_dispatch_errors_on_non_dispatchable_func(mock_module):
from bigchaindb.backend.utils import (
module_dispatch_registrar,
ModuleDispatchRegistrationError,
)
def dispatcher():
pass
mock_module.dispatched = dispatcher
mock_dispatch = module_dispatch_registrar(mock_module)
with pytest.raises(ModuleDispatchRegistrationError):
@mock_dispatch(str)
def dispatched():
pass

View File

@ -16,11 +16,11 @@ DB_NAME = 'bigchain_test_{}'.format(os.getpid())
CONFIG = {
'database': {
'name': DB_NAME
'name': DB_NAME,
},
'keypair': {
'private': '31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA',
'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8'
'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8',
}
}
@ -29,6 +29,15 @@ USER_PRIVATE_KEY = '8eJ8q9ZQpReWyQT5aFCiwtZ5wDZC4eDnCen88p3tQ6ie'
USER_PUBLIC_KEY = 'JEAkEJqLbbgDRAtMm8YAjGp759Aq2qTn9eaEHUj2XePE'
def pytest_addoption(parser):
from bigchaindb.backend import connection
backends = ', '.join(connection.BACKENDS.keys())
parser.addoption('--database-backend', action='store', default='rethinkdb',
help='Defines the backend to use (available: {})'.format(backends))
# We need this function to avoid loading an existing
# conf file located in the home of the user running
# the tests. If it's too aggressive we can change it
@ -49,8 +58,10 @@ def restore_config(request, node_config):
@pytest.fixture(scope='module')
def node_config():
return copy.deepcopy(CONFIG)
def node_config(request):
config = copy.deepcopy(CONFIG)
config['database']['backend'] = request.config.getoption('--database-backend')
return config
@pytest.fixture

View File

@ -10,10 +10,11 @@ import pytest
import rethinkdb as r
from bigchaindb import Bigchain
from bigchaindb.db import get_conn, init_database
from bigchaindb.backend import connect, schema
from bigchaindb.common import crypto
from bigchaindb.common.exceptions import DatabaseAlreadyExists
USER2_SK, USER2_PK = crypto.generate_key_pair()
@ -27,13 +28,13 @@ def restore_config(request, node_config):
def setup_database(request, node_config):
print('Initializing test db')
db_name = node_config['database']['name']
conn = get_conn()
conn = connect()
if r.db_list().contains(db_name).run(conn):
r.db_drop(db_name).run(conn)
if conn.run(r.db_list().contains(db_name)):
conn.run(r.db_drop(db_name))
try:
init_database()
schema.init_database()
except DatabaseAlreadyExists:
print('Database already exists.')
@ -41,9 +42,9 @@ def setup_database(request, node_config):
def fin():
print('Deleting `{}` database'.format(db_name))
get_conn().repl()
conn = connect()
try:
r.db_drop(db_name).run()
conn.run(r.db_drop(db_name))
except r.ReqlOpFailedError as e:
if e.message != 'Database `{}` does not exist.'.format(db_name):
raise
@ -57,11 +58,11 @@ def cleanup_tables(request, node_config):
db_name = node_config['database']['name']
def fin():
get_conn().repl()
conn = connect()
try:
r.db(db_name).table('bigchain').delete().run()
r.db(db_name).table('backlog').delete().run()
r.db(db_name).table('votes').delete().run()
conn.run(r.db(db_name).table('bigchain').delete())
conn.run(r.db(db_name).table('backlog').delete())
conn.run(r.db(db_name).table('votes').delete())
except r.ReqlOpFailedError as e:
if e.message != 'Database `{}` does not exist.'.format(db_name):
raise
@ -88,7 +89,7 @@ def inputs(user_pk):
for i in range(10)
]
block = b.create_block(transactions)
b.write_block(block, durability='hard')
b.write_block(block)
# 3. vote the blocks valid, so that the inputs are valid
vote = b.vote(block.id, prev_block_id, True)
@ -126,7 +127,7 @@ def inputs_shared(user_pk, user2_pk):
for i in range(10)
]
block = b.create_block(transactions)
b.write_block(block, durability='hard')
b.write_block(block)
# 3. vote the blocks valid, so that the inputs are valid
vote = b.vote(block.id, prev_block_id, True)

View File

View File

@ -0,0 +1,130 @@
import pytest
import rethinkdb as r
import bigchaindb
from bigchaindb import backend
from bigchaindb.backend.rethinkdb import schema
from ..conftest import setup_database as _setup_database
# Since we are testing database initialization and database drop,
# we need to use the `setup_database` fixture on a function level
@pytest.fixture(scope='function', autouse=True)
def setup_database(request, node_config):
_setup_database(request, node_config)
def test_init_creates_db_tables_and_indexes():
from bigchaindb.backend.schema import init_database
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures so we need to remove it
conn.run(r.db_drop(dbname))
init_database()
assert conn.run(r.db_list().contains(dbname)) is True
assert conn.run(r.db(dbname).table_list().contains('backlog', 'bigchain')) is True
assert conn.run(r.db(dbname).table('bigchain').index_list().contains(
'block_timestamp')) is True
assert conn.run(r.db(dbname).table('backlog').index_list().contains(
'assignee__transaction_timestamp')) is True
def test_init_database_fails_if_db_exists():
from bigchaindb.backend.schema import init_database
from bigchaindb.common import exceptions
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures
assert conn.run(r.db_list().contains(dbname)) is True
with pytest.raises(exceptions.DatabaseAlreadyExists):
init_database()
def test_create_database():
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
conn.run(r.db_drop(dbname))
schema.create_database(conn, dbname)
assert conn.run(r.db_list().contains(dbname)) is True
def test_create_tables():
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
conn.run(r.db_drop(dbname))
schema.create_database(conn, dbname)
schema.create_tables(conn, dbname)
assert conn.run(r.db(dbname).table_list().contains('bigchain')) is True
assert conn.run(r.db(dbname).table_list().contains('backlog')) is True
assert conn.run(r.db(dbname).table_list().contains('votes')) is True
assert len(conn.run(r.db(dbname).table_list())) == 3
def test_create_secondary_indexes():
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
conn.run(r.db_drop(dbname))
schema.create_database(conn, dbname)
schema.create_tables(conn, dbname)
schema.create_indexes(conn, dbname)
# Bigchain table
assert conn.run(r.db(dbname).table('bigchain').index_list().contains(
'block_timestamp')) is True
assert conn.run(r.db(dbname).table('bigchain').index_list().contains(
'transaction_id')) is True
assert conn.run(r.db(dbname).table('bigchain').index_list().contains(
'asset_id')) is True
# Backlog table
assert conn.run(r.db(dbname).table('backlog').index_list().contains(
'assignee__transaction_timestamp')) is True
# Votes table
assert conn.run(r.db(dbname).table('votes').index_list().contains(
'block_and_voter')) is True
def test_drop():
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures
assert conn.run(r.db_list().contains(dbname)) is True
schema.drop_database(conn, dbname)
assert conn.run(r.db_list().contains(dbname)) is False
def test_drop_non_existent_db_raises_an_error():
from bigchaindb.common import exceptions
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures
assert conn.run(r.db_list().contains(dbname)) is True
schema.drop_database(conn, dbname)
with pytest.raises(exceptions.DatabaseDoesNotExist):
schema.drop_database(conn, dbname)

View File

@ -41,7 +41,7 @@ class TestBigchainApi(object):
tx = tx.sign([b.me_private])
monkeypatch.setattr('time.time', lambda: 1)
block1 = b.create_block([tx])
b.write_block(block1, durability='hard')
b.write_block(block1)
# Manipulate vote to create a cyclic Blockchain
vote = b.vote(block1.id, b.get_last_voted_block().id, True)
@ -79,7 +79,7 @@ class TestBigchainApi(object):
monkeypatch.setattr('time.time', lambda: 1)
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
assert b.has_previous_vote(block.id, block.voters) is False
@ -99,21 +99,21 @@ class TestBigchainApi(object):
monkeypatch.setattr('time.time', lambda: 1)
block1 = b.create_block([tx])
b.write_block(block1, durability='hard')
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 2)
transfer_tx = Transaction.transfer(tx.to_inputs(), [([b.me], 1)],
tx.asset)
transfer_tx = transfer_tx.sign([b.me_private])
block2 = b.create_block([transfer_tx])
b.write_block(block2, durability='hard')
b.write_block(block2)
monkeypatch.setattr('time.time', lambda: 3333333333)
transfer_tx2 = Transaction.transfer(tx.to_inputs(), [([b.me], 1)],
tx.asset)
transfer_tx2 = transfer_tx2.sign([b.me_private])
block3 = b.create_block([transfer_tx2])
b.write_block(block3, durability='hard')
b.write_block(block3)
# Vote both block2 and block3 valid to provoke a double spend
vote = b.vote(block2.id, b.get_last_voted_block().id, True)
@ -135,11 +135,11 @@ class TestBigchainApi(object):
monkeypatch.setattr('time.time', lambda: 1)
block1 = b.create_block([tx])
b.write_block(block1, durability='hard')
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 2222222222)
block2 = b.create_block([tx])
b.write_block(block2, durability='hard')
b.write_block(block2)
# Vote both blocks valid (creating a double spend)
vote = b.vote(block1.id, b.get_last_voted_block().id, True)
@ -159,13 +159,13 @@ class TestBigchainApi(object):
tx1 = Transaction.create([b.me], [([b.me], 1)])
tx1 = tx1.sign([b.me_private])
block1 = b.create_block([tx1])
b.write_block(block1, durability='hard')
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 2222222222)
tx2 = Transaction.create([b.me], [([b.me], 1)])
tx2 = tx2.sign([b.me_private])
block2 = b.create_block([tx2])
b.write_block(block2, durability='hard')
b.write_block(block2)
# vote the first block invalid
vote = b.vote(block1.id, b.get_last_voted_block().id, False)
@ -209,7 +209,7 @@ class TestBigchainApi(object):
# create block and write it to the bighcain before retrieving the transaction
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
response, status = b.get_transaction(tx.id, include_status=True)
# add validity information, which will be returned
@ -229,7 +229,7 @@ class TestBigchainApi(object):
# create block
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block invalid
vote = b.vote(block.id, b.get_last_voted_block().id, False)
@ -255,7 +255,7 @@ class TestBigchainApi(object):
# create block
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block invalid
vote = b.vote(block.id, b.get_last_voted_block().id, False)
@ -270,7 +270,8 @@ class TestBigchainApi(object):
@pytest.mark.usefixtures('inputs')
def test_genesis_block(self, b):
block = b.backend.get_genesis_block()
from bigchaindb.backend import query
block = query.get_genesis_block(b.connection)
assert len(block['block']['transactions']) == 1
assert block['block']['transactions'][0]['operation'] == 'GENESIS'
@ -286,8 +287,9 @@ class TestBigchainApi(object):
@pytest.mark.skipif(reason='This test may not make sense after changing the chainification mode')
def test_get_last_block(self, b):
from bigchaindb.backend import query
# get the number of blocks
num_blocks = b.backend.count_blocks()
num_blocks = query.count_blocks(b.connection)
# get the last block
last_block = b.get_last_block()
@ -305,7 +307,7 @@ class TestBigchainApi(object):
def test_get_previous_block(self, b):
last_block = b.get_last_block()
new_block = b.create_block([])
b.write_block(new_block, durability='hard')
b.write_block(new_block)
prev_block = b.get_previous_block(new_block)
@ -315,7 +317,7 @@ class TestBigchainApi(object):
def test_get_previous_block_id(self, b):
last_block = b.get_last_block()
new_block = b.create_block([])
b.write_block(new_block, durability='hard')
b.write_block(new_block)
prev_block_id = b.get_previous_block_id(new_block)
@ -332,7 +334,7 @@ class TestBigchainApi(object):
@pytest.mark.usefixtures('inputs')
def test_get_block_by_id(self, b):
new_block = dummy_block()
b.write_block(new_block, durability='hard')
b.write_block(new_block)
assert b.get_block(new_block.id) == new_block.to_dict()
block, status = b.get_block(new_block.id, include_status=True)
@ -340,9 +342,10 @@ class TestBigchainApi(object):
def test_get_last_voted_block_returns_genesis_if_no_votes_has_been_casted(self, b):
from bigchaindb.models import Block
from bigchaindb.backend import query
b.create_genesis_block()
genesis = b.backend.get_genesis_block()
genesis = query.get_genesis_block(b.connection)
genesis = Block.from_dict(genesis)
gb = b.get_last_voted_block()
assert gb == genesis
@ -360,9 +363,9 @@ class TestBigchainApi(object):
monkeypatch.setattr('time.time', lambda: 3)
block_3 = dummy_block()
b.write_block(block_1, durability='hard')
b.write_block(block_2, durability='hard')
b.write_block(block_3, durability='hard')
b.write_block(block_1)
b.write_block(block_2)
b.write_block(block_3)
# make sure all the votes are written with the same timestamps
monkeypatch.setattr('time.time', lambda: 4)
@ -387,9 +390,9 @@ class TestBigchainApi(object):
monkeypatch.setattr('time.time', lambda: 3)
block_3 = dummy_block()
b.write_block(block_1, durability='hard')
b.write_block(block_2, durability='hard')
b.write_block(block_3, durability='hard')
b.write_block(block_1)
b.write_block(block_2)
b.write_block(block_3)
# make sure all the votes are written with different timestamps
monkeypatch.setattr('time.time', lambda: 4)
@ -409,7 +412,7 @@ class TestBigchainApi(object):
genesis = b.create_genesis_block()
block_1 = dummy_block()
b.write_block(block_1, durability='hard')
b.write_block(block_1)
b.write_vote(b.vote(block_1.id, genesis.id, True))
retrieved_block_1 = b.get_block(block_1.id)
@ -427,7 +430,7 @@ class TestBigchainApi(object):
b.create_genesis_block()
block_1 = dummy_block()
b.write_block(block_1, durability='hard')
b.write_block(block_1)
# insert duplicate votes
vote_1 = b.vote(block_1.id, b.get_last_voted_block().id, True)
vote_2 = b.vote(block_1.id, b.get_last_voted_block().id, True)
@ -445,7 +448,7 @@ class TestBigchainApi(object):
genesis = b.create_genesis_block()
block_1 = dummy_block()
b.write_block(block_1, durability='hard')
b.write_block(block_1)
# insert duplicate votes
for i in range(2):
b.write_vote(b.vote(block_1.id, genesis.id, True))
@ -465,7 +468,7 @@ class TestBigchainApi(object):
b.create_genesis_block()
block_1 = dummy_block()
b.write_block(block_1, durability='hard')
b.write_block(block_1)
vote_1 = b.vote(block_1.id, b.get_last_voted_block().id, True)
# mangle the signature
vote_1['signature'] = 'a' * 87
@ -477,6 +480,7 @@ class TestBigchainApi(object):
@pytest.mark.usefixtures('inputs')
def test_assign_transaction_one_node(self, b, user_pk, user_sk):
from bigchaindb.backend import query
from bigchaindb.models import Transaction
input_tx = b.get_owned_ids(user_pk).pop()
@ -487,13 +491,14 @@ class TestBigchainApi(object):
b.write_transaction(tx)
# retrieve the transaction
response = list(b.backend.get_stale_transactions(0))[0]
response = list(query.get_stale_transactions(b.connection, 0))[0]
# check if the assignee is the current node
assert response['assignee'] == b.me
@pytest.mark.usefixtures('inputs')
def test_assign_transaction_multiple_nodes(self, b, user_pk, user_sk):
from bigchaindb.backend import query
from bigchaindb.common.crypto import generate_key_pair
from bigchaindb.models import Transaction
@ -511,13 +516,12 @@ class TestBigchainApi(object):
b.write_transaction(tx)
# retrieve the transaction
response = b.backend.get_stale_transactions(0)
response = query.get_stale_transactions(b.connection, 0)
# check if the assignee is one of the _other_ federation nodes
for tx in response:
assert tx['assignee'] in b.nodes_except_me
@pytest.mark.usefixtures('inputs')
def test_non_create_input_not_found(self, b, user_pk):
from cryptoconditions import Ed25519Fulfillment
@ -537,6 +541,7 @@ class TestBigchainApi(object):
tx.validate(Bigchain())
def test_count_backlog(self, b, user_pk):
from bigchaindb.backend import query
from bigchaindb.models import Transaction
for _ in range(4):
@ -544,7 +549,7 @@ class TestBigchainApi(object):
[([user_pk], 1)]).sign([b.me_private])
b.write_transaction(tx)
assert b.backend.count_backlog() == 4
assert query.count_backlog(b.connection) == 4
class TestTransactionValidation(object):
@ -605,7 +610,7 @@ class TestTransactionValidation(object):
b.write_transaction(signed_transfer_tx)
block = b.create_block([signed_transfer_tx])
b.write_block(block, durability='hard')
b.write_block(block)
# vote block valid
vote = b.vote(block.id, b.get_last_voted_block().id, True)
@ -636,7 +641,7 @@ class TestTransactionValidation(object):
# create block
block = b.create_block([transfer_tx])
assert b.validate_block(block) == block
b.write_block(block, durability='hard')
b.write_block(block)
# check that the transaction is still valid after being written to the
# bigchain
@ -660,7 +665,7 @@ class TestTransactionValidation(object):
# create block
block = b.create_block([transfer_tx])
b.write_block(block, durability='hard')
b.write_block(block)
# create transaction with the undecided input
tx_invalid = Transaction.transfer(transfer_tx.to_inputs(),
@ -805,7 +810,7 @@ class TestMultipleInputs(object):
tx = Transaction.create([b.me], [([user_pk, user2_pk], 1)])
tx = tx.sign([b.me_private])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# vote block valid
vote = b.vote(block.id, b.get_last_voted_block().id, True)
@ -838,7 +843,7 @@ class TestMultipleInputs(object):
tx = Transaction.create([b.me], [([user_pk, user2_pk], 1)])
tx = tx.sign([b.me_private])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# vote block valid
vote = b.vote(block.id, b.get_last_voted_block().id, True)
@ -866,7 +871,7 @@ class TestMultipleInputs(object):
tx = Transaction.create([b.me], [([user_pk], 1)])
tx = tx.sign([b.me_private])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
owned_inputs_user1 = b.get_owned_ids(user_pk)
owned_inputs_user2 = b.get_owned_ids(user2_pk)
@ -876,7 +881,7 @@ class TestMultipleInputs(object):
tx = Transaction.transfer(tx.to_inputs(), [([user2_pk], 1)], tx.asset)
tx = tx.sign([user_sk])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
owned_inputs_user1 = b.get_owned_ids(user_pk)
owned_inputs_user2 = b.get_owned_ids(user2_pk)
@ -896,7 +901,7 @@ class TestMultipleInputs(object):
tx = Transaction.create([b.me], [([user_pk], 1)])
tx = tx.sign([b.me_private])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block VALID
vote = b.vote(block.id, genesis.id, True)
@ -913,7 +918,7 @@ class TestMultipleInputs(object):
tx.asset)
tx_invalid = tx_invalid.sign([user_sk])
block = b.create_block([tx_invalid])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block invalid
vote = b.vote(block.id, b.get_last_voted_block().id, False)
@ -941,7 +946,7 @@ class TestMultipleInputs(object):
asset=asset)
tx_create_signed = tx_create.sign([b.me_private])
block = b.create_block([tx_create_signed])
b.write_block(block, durability='hard')
b.write_block(block)
# get input
owned_inputs_user1 = b.get_owned_ids(user_pk)
@ -958,7 +963,7 @@ class TestMultipleInputs(object):
asset=tx_create.asset)
tx_transfer_signed = tx_transfer.sign([user_sk])
block = b.create_block([tx_transfer_signed])
b.write_block(block, durability='hard')
b.write_block(block)
owned_inputs_user1 = b.get_owned_ids(user_pk)
owned_inputs_user2 = b.get_owned_ids(user2_pk)
@ -977,7 +982,7 @@ class TestMultipleInputs(object):
tx = Transaction.create([b.me], [([user_pk, user2_pk], 1)])
tx = tx.sign([b.me_private])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
owned_inputs_user1 = b.get_owned_ids(user_pk)
owned_inputs_user2 = b.get_owned_ids(user2_pk)
@ -989,7 +994,7 @@ class TestMultipleInputs(object):
tx = Transaction.transfer(tx.to_inputs(), [([user3_pk], 1)], tx.asset)
tx = tx.sign([user_sk, user2_sk])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
owned_inputs_user1 = b.get_owned_ids(user_pk)
owned_inputs_user2 = b.get_owned_ids(user2_pk)
@ -1005,7 +1010,7 @@ class TestMultipleInputs(object):
tx = Transaction.create([b.me], [([user_pk], 1)])
tx = tx.sign([b.me_private])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
owned_inputs_user1 = b.get_owned_ids(user_pk).pop()
@ -1019,7 +1024,7 @@ class TestMultipleInputs(object):
tx = Transaction.transfer(tx.to_inputs(), [([user2_pk], 1)], tx.asset)
tx = tx.sign([user_sk])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
spent_inputs_user1 = b.get_spent(input_txid, input_cid)
assert spent_inputs_user1 == tx
@ -1036,7 +1041,7 @@ class TestMultipleInputs(object):
tx = Transaction.create([b.me], [([user_pk], 1)])
tx = tx.sign([b.me_private])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block VALID
vote = b.vote(block.id, genesis.id, True)
@ -1054,7 +1059,7 @@ class TestMultipleInputs(object):
tx = Transaction.transfer(tx.to_inputs(), [([user2_pk], 1)], tx.asset)
tx = tx.sign([user_sk])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# vote the block invalid
vote = b.vote(block.id, b.get_last_voted_block().id, False)
@ -1083,7 +1088,7 @@ class TestMultipleInputs(object):
asset=asset)
tx_create_signed = tx_create.sign([b.me_private])
block = b.create_block([tx_create_signed])
b.write_block(block, durability='hard')
b.write_block(block)
owned_inputs_user1 = b.get_owned_ids(user_pk)
@ -1097,7 +1102,7 @@ class TestMultipleInputs(object):
asset=tx_create.asset)
tx_transfer_signed = tx_transfer.sign([user_sk])
block = b.create_block([tx_transfer_signed])
b.write_block(block, durability='hard')
b.write_block(block)
# check that used inputs are marked as spent
for ffill in tx_create.to_inputs()[:2]:
@ -1124,7 +1129,7 @@ class TestMultipleInputs(object):
tx = tx.sign([b.me_private])
transactions.append(tx)
block = b.create_block(transactions)
b.write_block(block, durability='hard')
b.write_block(block)
owned_inputs_user1 = b.get_owned_ids(user_pk)
@ -1137,7 +1142,7 @@ class TestMultipleInputs(object):
[([user3_pk], 1)], transactions[0].asset)
tx = tx.sign([user_sk, user2_sk])
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# check that used inputs are marked as spent
assert b.get_spent(transactions[0].id, 0) == tx

View File

@ -1,201 +0,0 @@
import builtins
from bigchaindb.common import exceptions
import pytest
import rethinkdb as r
import bigchaindb
from bigchaindb.db import utils
from .conftest import setup_database as _setup_database
# Since we are testing database initialization and database drop,
# we need to use the `setup_database` fixture on a function level
@pytest.fixture(scope='function', autouse=True)
def setup_database(request, node_config):
_setup_database(request, node_config)
def test_init_creates_db_tables_and_indexes():
conn = utils.get_conn()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures so we need to remove it
r.db_drop(dbname).run(conn)
utils.init()
assert r.db_list().contains(dbname).run(conn) is True
assert r.db(dbname).table_list().contains('backlog', 'bigchain').run(conn) is True
assert r.db(dbname).table('bigchain').index_list().contains(
'block_timestamp').run(conn) is True
assert r.db(dbname).table('backlog').index_list().contains(
'assignee__transaction_timestamp').run(conn) is True
def test_create_database():
conn = utils.get_conn()
dbname = utils.get_database_name()
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
r.db_drop(dbname).run(conn)
utils.create_database(conn, dbname)
assert r.db_list().contains(dbname).run(conn) is True
def test_create_bigchain_table():
conn = utils.get_conn()
dbname = utils.get_database_name()
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
r.db_drop(dbname).run(conn)
utils.create_database(conn, dbname)
utils.create_table(conn, dbname, 'bigchain')
assert r.db(dbname).table_list().contains('bigchain').run(conn) is True
assert r.db(dbname).table_list().contains('backlog').run(conn) is False
assert r.db(dbname).table_list().contains('votes').run(conn) is False
def test_create_bigchain_secondary_index():
conn = utils.get_conn()
dbname = utils.get_database_name()
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
r.db_drop(dbname).run(conn)
utils.create_database(conn, dbname)
utils.create_table(conn, dbname, 'bigchain')
utils.create_bigchain_secondary_index(conn, dbname)
assert r.db(dbname).table('bigchain').index_list().contains(
'block_timestamp').run(conn) is True
assert r.db(dbname).table('bigchain').index_list().contains(
'transaction_id').run(conn) is True
def test_create_backlog_table():
conn = utils.get_conn()
dbname = utils.get_database_name()
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
r.db_drop(dbname).run(conn)
utils.create_database(conn, dbname)
utils.create_table(conn, dbname, 'backlog')
assert r.db(dbname).table_list().contains('backlog').run(conn) is True
assert r.db(dbname).table_list().contains('bigchain').run(conn) is False
assert r.db(dbname).table_list().contains('votes').run(conn) is False
def test_create_backlog_secondary_index():
conn = utils.get_conn()
dbname = utils.get_database_name()
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
r.db_drop(dbname).run(conn)
utils.create_database(conn, dbname)
utils.create_table(conn, dbname, 'backlog')
utils.create_backlog_secondary_index(conn, dbname)
assert r.db(dbname).table('backlog').index_list().contains(
'assignee__transaction_timestamp').run(conn) is True
def test_create_votes_table():
conn = utils.get_conn()
dbname = utils.get_database_name()
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
r.db_drop(dbname).run(conn)
utils.create_database(conn, dbname)
utils.create_table(conn, dbname, 'votes')
assert r.db(dbname).table_list().contains('votes').run(conn) is True
assert r.db(dbname).table_list().contains('bigchain').run(conn) is False
assert r.db(dbname).table_list().contains('backlog').run(conn) is False
def test_create_votes_secondary_index():
conn = utils.get_conn()
dbname = utils.get_database_name()
# The db is set up by fixtures so we need to remove it
# and recreate it just with one table
r.db_drop(dbname).run(conn)
utils.create_database(conn, dbname)
utils.create_table(conn, dbname, 'votes')
utils.create_votes_secondary_index(conn, dbname)
assert r.db(dbname).table('votes').index_list().contains(
'block_and_voter').run(conn) is True
def test_init_fails_if_db_exists():
conn = utils.get_conn()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures
assert r.db_list().contains(dbname).run(conn) is True
with pytest.raises(exceptions.DatabaseAlreadyExists):
utils.init()
def test_drop_interactively_drops_the_database_when_user_says_yes(monkeypatch):
conn = utils.get_conn()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures
assert r.db_list().contains(dbname).run(conn) is True
monkeypatch.setattr(builtins, 'input', lambda x: 'y')
utils.drop()
assert r.db_list().contains(dbname).run(conn) is False
def test_drop_programmatically_drops_the_database_when_assume_yes_is_true(monkeypatch):
conn = utils.get_conn()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures
assert r.db_list().contains(dbname).run(conn) is True
utils.drop(assume_yes=True)
assert r.db_list().contains(dbname).run(conn) is False
def test_drop_interactively_does_not_drop_the_database_when_user_says_no(monkeypatch):
conn = utils.get_conn()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures
print(r.db_list().contains(dbname).run(conn))
assert r.db_list().contains(dbname).run(conn) is True
monkeypatch.setattr(builtins, 'input', lambda x: 'n')
utils.drop()
assert r.db_list().contains(dbname).run(conn) is True
def test_drop_non_existent_db_raises_an_error():
conn = utils.get_conn()
dbname = bigchaindb.config['database']['name']
# The db is set up by fixtures
assert r.db_list().contains(dbname).run(conn) is True
utils.drop(assume_yes=True)
with pytest.raises(exceptions.DatabaseDoesNotExist):
utils.drop(assume_yes=True)

View File

@ -26,6 +26,7 @@ def inputs(user_pk):
@pytest.mark.usefixtures('processes')
def test_fast_double_create(b, user_pk):
from bigchaindb.models import Transaction
from bigchaindb.backend.query import count_blocks
tx = Transaction.create([b.me], [([user_pk], 1)],
metadata={'test': 'test'}) \
.sign([b.me_private])
@ -42,12 +43,13 @@ def test_fast_double_create(b, user_pk):
# test the transaction appears only once
last_voted_block = b.get_last_voted_block()
assert len(last_voted_block.transactions) == 1
assert b.backend.count_blocks() == 2
assert count_blocks(b.connection) == 2
@pytest.mark.usefixtures('processes')
def test_double_create(b, user_pk):
from bigchaindb.models import Transaction
from bigchaindb.backend.query import count_blocks
tx = Transaction.create([b.me], [([user_pk], 1)],
metadata={'test': 'test'}) \
.sign([b.me_private])
@ -63,4 +65,4 @@ def test_double_create(b, user_pk):
# test the transaction appears only once
last_voted_block = b.get_last_voted_block()
assert len(last_voted_block.transactions) == 1
assert b.backend.count_blocks() == 2
assert count_blocks(b.connection) == 2

View File

@ -67,7 +67,7 @@ def test_write_block(b, user_pk):
block_doc = b.create_block(txs)
block_maker.write(block_doc)
expected = b.backend.get_block(block_doc.id)
expected = b.get_block(block_doc.id)
expected = Block.from_dict(expected)
assert expected == block_doc
@ -88,7 +88,7 @@ def test_duplicate_transaction(b, user_pk):
block_maker.write(block_doc)
# block is in bigchain
assert b.backend.get_block(block_doc.id) == block_doc.to_dict()
assert b.get_block(block_doc.id) == block_doc.to_dict()
b.write_transaction(txs[0])
@ -159,6 +159,7 @@ def test_start(create_pipeline):
def test_full_pipeline(b, user_pk):
import random
from bigchaindb.backend import query
from bigchaindb.models import Block, Transaction
from bigchaindb.pipelines.block import create_pipeline, get_changefeed
@ -172,7 +173,7 @@ def test_full_pipeline(b, user_pk):
b.write_transaction(tx)
assert b.backend.count_backlog() == 100
assert query.count_backlog(b.connection) == 100
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed(), outdata=outpipe)
@ -182,9 +183,9 @@ def test_full_pipeline(b, user_pk):
pipeline.terminate()
block_doc = outpipe.get()
chained_block = b.backend.get_block(block_doc.id)
chained_block = b.get_block(block_doc.id)
chained_block = Block.from_dict(chained_block)
block_len = len(block_doc.transactions)
assert chained_block == block_doc
assert b.backend.count_backlog() == 100 - block_len
assert query.count_backlog(b.connection) == 100 - block_len

View File

@ -136,6 +136,7 @@ def test_start(mock_start):
def test_full_pipeline(b, user_pk):
import random
from bigchaindb.backend import query
from bigchaindb.models import Transaction
outpipe = Pipe()
@ -177,8 +178,8 @@ def test_full_pipeline(b, user_pk):
# only transactions from the invalid block should be returned to
# the backlog
assert b.backend.count_backlog() == 100
assert query.count_backlog(b.connection) == 100
# NOTE: I'm still, I'm still tx from the block.
tx_from_block = set([tx.id for tx in invalid_block.transactions])
tx_from_backlog = set([tx['id'] for tx in list(b.backend.get_stale_transactions(0))])
tx_from_backlog = set([tx['id'] for tx in list(query.get_stale_transactions(b.connection, 0))])
assert tx_from_block == tx_from_backlog

View File

@ -10,7 +10,7 @@ def test_get_stale(b, user_pk):
from bigchaindb.models import Transaction
tx = Transaction.create([b.me], [([user_pk], 1)])
tx = tx.sign([b.me_private])
b.write_transaction(tx, durability='hard')
b.write_transaction(tx)
stm = stale.StaleTransactionMonitor(timeout=0.001,
backlog_reassign_delay=0.001)
@ -23,11 +23,12 @@ def test_get_stale(b, user_pk):
def test_reassign_transactions(b, user_pk):
from bigchaindb.backend import query
from bigchaindb.models import Transaction
# test with single node
tx = Transaction.create([b.me], [([user_pk], 1)])
tx = tx.sign([b.me_private])
b.write_transaction(tx, durability='hard')
b.write_transaction(tx)
stm = stale.StaleTransactionMonitor(timeout=0.001,
backlog_reassign_delay=0.001)
@ -36,15 +37,15 @@ def test_reassign_transactions(b, user_pk):
# test with federation
tx = Transaction.create([b.me], [([user_pk], 1)])
tx = tx.sign([b.me_private])
b.write_transaction(tx, durability='hard')
b.write_transaction(tx)
stm = stale.StaleTransactionMonitor(timeout=0.001,
backlog_reassign_delay=0.001)
stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc']
tx = list(b.backend.get_stale_transactions(0))[0]
tx = list(query.get_stale_transactions(b.connection, 0))[0]
stm.reassign_transactions(tx)
reassigned_tx = list(b.backend.get_stale_transactions(0))[0]
reassigned_tx = list(query.get_stale_transactions(b.connection, 0))[0]
assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp']
assert reassigned_tx['assignee'] != tx['assignee']
@ -52,15 +53,16 @@ def test_reassign_transactions(b, user_pk):
tx = Transaction.create([b.me], [([user_pk], 1)])
tx = tx.sign([b.me_private])
stm.bigchain.nodes_except_me = ['lol']
b.write_transaction(tx, durability='hard')
b.write_transaction(tx)
stm.bigchain.nodes_except_me = None
tx = list(b.backend.get_stale_transactions(0))[0]
tx = list(query.get_stale_transactions(b.connection, 0))[0]
stm.reassign_transactions(tx)
assert tx['assignee'] != 'lol'
def test_full_pipeline(monkeypatch, user_pk):
from bigchaindb.backend import query
from bigchaindb.models import Transaction
CONFIG = {
'database': {
@ -87,7 +89,7 @@ def test_full_pipeline(monkeypatch, user_pk):
original_txc.append(tx.to_dict())
b.write_transaction(tx)
original_txs = list(b.backend.get_stale_transactions(0))
original_txs = list(query.get_stale_transactions(b.connection, 0))
original_txs = {tx['id']: tx for tx in original_txs}
assert len(original_txs) == 100
@ -111,14 +113,15 @@ def test_full_pipeline(monkeypatch, user_pk):
pipeline.terminate()
assert len(list(b.backend.get_stale_transactions(0))) == 100
reassigned_txs= list(b.backend.get_stale_transactions(0))
assert len(list(query.get_stale_transactions(b.connection, 0))) == 100
reassigned_txs = list(query.get_stale_transactions(b.connection, 0))
# check that every assignment timestamp has increased, and every tx has a new assignee
for reassigned_tx in reassigned_txs:
assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp']
assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee']
@patch.object(Pipeline, 'start')
def test_start(mock_start):
# TODO: `sta,e.start` is just a wrapper around `block.create_pipeline`,

View File

@ -1,56 +1,67 @@
from unittest.mock import patch
import pytest
from unittest.mock import Mock
from multipipes import Pipe
from bigchaindb.db.utils import Connection
from bigchaindb import Bigchain
from bigchaindb.backend.connection import Connection
from bigchaindb.pipelines.utils import ChangeFeed
MOCK_CHANGEFEED_DATA = [{
'new_val': 'seems like we have an insert here',
'old_val': None,
}, {
'new_val': None,
'old_val': 'seems like we have a delete here',
}, {
'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here',
}]
@pytest.fixture
def mock_changefeed_data():
return [
{
'new_val': 'seems like we have an insert here',
'old_val': None,
}, {
'new_val': None,
'old_val': 'seems like we have a delete here',
}, {
'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here',
}
]
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_insert(mock_run):
@pytest.fixture
def mock_changefeed_bigchain(mock_changefeed_data):
connection = Connection()
connection.run = Mock(return_value=mock_changefeed_data)
return Bigchain(connection=connection)
def test_changefeed_insert(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT)
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.qsize() == 0
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_delete(mock_run):
def test_changefeed_delete(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.DELETE)
changefeed = ChangeFeed('backlog', ChangeFeed.DELETE, bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have a delete here'
assert outpipe.qsize() == 0
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_update(mock_run):
def test_changefeed_update(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE)
changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE, bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an update here'
assert outpipe.qsize() == 0
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_multiple_operations(mock_run):
def test_changefeed_multiple_operations(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE)
changefeed = ChangeFeed('backlog',
ChangeFeed.INSERT | ChangeFeed.UPDATE,
bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
@ -58,10 +69,12 @@ def test_changefeed_multiple_operations(mock_run):
assert outpipe.qsize() == 0
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_prefeed(mock_run):
def test_changefeed_prefeed(mock_changefeed_bigchain):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3])
changefeed = ChangeFeed('backlog',
ChangeFeed.INSERT,
prefeed=[1, 2, 3],
bigchain=mock_changefeed_bigchain)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.qsize() == 4

View File

@ -157,6 +157,7 @@ def test_vote_accumulates_transactions(b):
def test_valid_block_voting_sequential(b, monkeypatch):
from bigchaindb.backend import query
from bigchaindb.common import crypto, util
from bigchaindb.pipelines import vote
@ -169,7 +170,7 @@ def test_valid_block_voting_sequential(b, monkeypatch):
last_vote = vote_obj.vote(*vote_obj.validate_tx(tx, block_id, num_tx))
vote_obj.write_vote(last_vote)
vote_rs = b.backend.get_votes_by_block_id_and_voter(block_id, b.me)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block_id, b.me)
vote_doc = vote_rs.next()
assert vote_doc['vote'] == {'voting_for_block': block.id,
@ -185,6 +186,7 @@ def test_valid_block_voting_sequential(b, monkeypatch):
def test_valid_block_voting_multiprocessing(b, monkeypatch):
from bigchaindb.backend import query
from bigchaindb.common import crypto, util
from bigchaindb.pipelines import vote
@ -203,7 +205,7 @@ def test_valid_block_voting_multiprocessing(b, monkeypatch):
vote_out = outpipe.get()
vote_pipeline.terminate()
vote_rs = b.backend.get_votes_by_block_id_and_voter(block.id, b.me)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me)
vote_doc = vote_rs.next()
assert vote_out['vote'] == vote_doc['vote']
assert vote_doc['vote'] == {'voting_for_block': block.id,
@ -219,6 +221,7 @@ def test_valid_block_voting_multiprocessing(b, monkeypatch):
def test_valid_block_voting_with_create_transaction(b, monkeypatch):
from bigchaindb.backend import query
from bigchaindb.common import crypto, util
from bigchaindb.models import Transaction
from bigchaindb.pipelines import vote
@ -244,7 +247,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch):
vote_out = outpipe.get()
vote_pipeline.terminate()
vote_rs = b.backend.get_votes_by_block_id_and_voter(block.id, b.me)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me)
vote_doc = vote_rs.next()
assert vote_out['vote'] == vote_doc['vote']
assert vote_doc['vote'] == {'voting_for_block': block.id,
@ -260,6 +263,7 @@ def test_valid_block_voting_with_create_transaction(b, monkeypatch):
def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
from bigchaindb.backend import query
from bigchaindb.common import crypto, util
from bigchaindb.models import Transaction
from bigchaindb.pipelines import vote
@ -273,7 +277,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
monkeypatch.setattr('time.time', lambda: 1111111111)
block = b.create_block([tx])
b.write_block(block, durability='hard')
b.write_block(block)
# create a `TRANSFER` transaction
test_user2_priv, test_user2_pub = crypto.generate_key_pair()
@ -283,7 +287,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
monkeypatch.setattr('time.time', lambda: 2222222222)
block2 = b.create_block([tx2])
b.write_block(block2, durability='hard')
b.write_block(block2)
inpipe = Pipe()
outpipe = Pipe()
@ -299,7 +303,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
vote2_out = outpipe.get()
vote_pipeline.terminate()
vote_rs = b.backend.get_votes_by_block_id_and_voter(block.id, b.me)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me)
vote_doc = vote_rs.next()
assert vote_out['vote'] == vote_doc['vote']
assert vote_doc['vote'] == {'voting_for_block': block.id,
@ -313,7 +317,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
assert crypto.PublicKey(b.me).verify(serialized_vote,
vote_doc['signature']) is True
vote2_rs = b.backend.get_votes_by_block_id_and_voter(block2.id, b.me)
vote2_rs = query.get_votes_by_block_id_and_voter(b.connection, block2.id, b.me)
vote2_doc = vote2_rs.next()
assert vote2_out['vote'] == vote2_doc['vote']
assert vote2_doc['vote'] == {'voting_for_block': block2.id,
@ -329,6 +333,7 @@ def test_valid_block_voting_with_transfer_transactions(monkeypatch, b):
def test_unsigned_tx_in_block_voting(monkeypatch, b, user_pk):
from bigchaindb.backend import query
from bigchaindb.common import crypto, util
from bigchaindb.models import Transaction
from bigchaindb.pipelines import vote
@ -350,7 +355,7 @@ def test_unsigned_tx_in_block_voting(monkeypatch, b, user_pk):
vote_out = outpipe.get()
vote_pipeline.terminate()
vote_rs = b.backend.get_votes_by_block_id_and_voter(block.id, b.me)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me)
vote_doc = vote_rs.next()
assert vote_out['vote'] == vote_doc['vote']
assert vote_doc['vote'] == {'voting_for_block': block.id,
@ -366,6 +371,7 @@ def test_unsigned_tx_in_block_voting(monkeypatch, b, user_pk):
def test_invalid_id_tx_in_block_voting(monkeypatch, b, user_pk):
from bigchaindb.backend import query
from bigchaindb.common import crypto, util
from bigchaindb.models import Transaction
from bigchaindb.pipelines import vote
@ -389,7 +395,7 @@ def test_invalid_id_tx_in_block_voting(monkeypatch, b, user_pk):
vote_out = outpipe.get()
vote_pipeline.terminate()
vote_rs = b.backend.get_votes_by_block_id_and_voter(block['id'], b.me)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me)
vote_doc = vote_rs.next()
assert vote_out['vote'] == vote_doc['vote']
assert vote_doc['vote'] == {'voting_for_block': block['id'],
@ -405,6 +411,7 @@ def test_invalid_id_tx_in_block_voting(monkeypatch, b, user_pk):
def test_invalid_content_in_tx_in_block_voting(monkeypatch, b, user_pk):
from bigchaindb.backend import query
from bigchaindb.common import crypto, util
from bigchaindb.models import Transaction
from bigchaindb.pipelines import vote
@ -428,7 +435,7 @@ def test_invalid_content_in_tx_in_block_voting(monkeypatch, b, user_pk):
vote_out = outpipe.get()
vote_pipeline.terminate()
vote_rs = b.backend.get_votes_by_block_id_and_voter(block['id'], b.me)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me)
vote_doc = vote_rs.next()
assert vote_out['vote'] == vote_doc['vote']
assert vote_doc['vote'] == {'voting_for_block': block['id'],
@ -444,6 +451,7 @@ def test_invalid_content_in_tx_in_block_voting(monkeypatch, b, user_pk):
def test_invalid_block_voting(monkeypatch, b, user_pk):
from bigchaindb.backend import query
from bigchaindb.common import crypto, util
from bigchaindb.pipelines import vote
@ -463,7 +471,7 @@ def test_invalid_block_voting(monkeypatch, b, user_pk):
vote_out = outpipe.get()
vote_pipeline.terminate()
vote_rs = b.backend.get_votes_by_block_id_and_voter(block['id'], b.me)
vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me)
vote_doc = vote_rs.next()
assert vote_out['vote'] == vote_doc['vote']
assert vote_doc['vote'] == {'voting_for_block': block['id'],
@ -479,6 +487,7 @@ def test_invalid_block_voting(monkeypatch, b, user_pk):
def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
from bigchaindb.backend import query
from bigchaindb.pipelines import vote
outpipe = Pipe()
@ -492,11 +501,11 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
monkeypatch.setattr('time.time', lambda: 2222222222)
block_1 = dummy_block(b)
block_ids.append(block_1.id)
b.write_block(block_1, durability='hard')
monkeypatch.setattr('time.time', lambda: 3333333333)
b.write_block(block_1)
block_2 = dummy_block(b)
block_ids.append(block_2.id)
b.write_block(block_2, durability='hard')
b.write_block(block_2)
vote_pipeline = vote.create_pipeline()
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe)
@ -511,7 +520,7 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
monkeypatch.setattr('time.time', lambda: 4444444444)
block_3 = dummy_block(b)
block_ids.append(block_3.id)
b.write_block(block_3, durability='hard')
b.write_block(block_3)
# Same as before with the two `get`s
outpipe.get()
@ -519,13 +528,14 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
vote_pipeline.terminate()
# retrieve vote
votes = [list(b.backend.get_votes_by_block_id(_id))[0]
votes = [list(query.get_votes_by_block_id(b.connection, _id))[0]
for _id in block_ids]
assert all(vote['node_pubkey'] == b.me for vote in votes)
def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
from bigchaindb.backend import query
from bigchaindb.pipelines import vote
outpipe = Pipe()
@ -537,12 +547,12 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
monkeypatch.setattr('time.time', lambda: 2222222222)
block_1 = dummy_block(b)
block_ids.append(block_1.id)
b.write_block(block_1, durability='hard')
b.write_block(block_1)
monkeypatch.setattr('time.time', lambda: 3333333333)
block_2 = dummy_block(b)
block_ids.append(block_2.id)
b.write_block(block_2, durability='hard')
b.write_block(block_2)
vote_pipeline = vote.create_pipeline()
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe)
@ -558,7 +568,7 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
blocks = [b.get_block(_id) for _id in block_ids]
# retrieve votes
votes = [list(b.backend.get_votes_by_block_id(_id))[0]
votes = [list(query.get_votes_by_block_id(b.connection, _id))[0]
for _id in block_ids]
assert ({v['vote']['voting_for_block'] for v in votes} ==
@ -566,6 +576,7 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
def test_voter_checks_for_previous_vote(monkeypatch, b):
from bigchaindb.backend import query
from bigchaindb.pipelines import vote
inpipe = Pipe()
@ -577,7 +588,7 @@ def test_voter_checks_for_previous_vote(monkeypatch, b):
monkeypatch.setattr('time.time', lambda: 2222222222)
block_1 = dummy_block(b)
inpipe.put(block_1.to_dict())
assert len(list(b.backend.get_votes_by_block_id(block_1.id))) == 0
assert len(list(query.get_votes_by_block_id(b.connection, block_1.id))) == 0
vote_pipeline = vote.create_pipeline()
vote_pipeline.setup(indata=inpipe, outdata=outpipe)
@ -600,8 +611,8 @@ def test_voter_checks_for_previous_vote(monkeypatch, b):
vote_pipeline.terminate()
assert len(list(b.backend.get_votes_by_block_id(block_1.id))) == 1
assert len(list(b.backend.get_votes_by_block_id(block_2.id))) == 1
assert len(list(query.get_votes_by_block_id(b.connection, block_1.id))) == 1
assert len(list(query.get_votes_by_block_id(b.connection, block_2.id))) == 1
@patch.object(Pipeline, 'start')

View File

@ -21,13 +21,13 @@ def mock_write_config(monkeypatch):
@pytest.fixture
def mock_db_init_with_existing_db(monkeypatch):
from bigchaindb import db
from bigchaindb import commands
from bigchaindb.common.exceptions import DatabaseAlreadyExists
def mockreturn():
raise DatabaseAlreadyExists
monkeypatch.setattr(db, 'init', mockreturn)
monkeypatch.setattr(commands.bigchain, '_run_init', mockreturn)
@pytest.fixture
@ -36,16 +36,6 @@ def mock_processes_start(monkeypatch):
monkeypatch.setattr(processes, 'start', lambda *args: None)
@pytest.fixture
def mock_rethink_db_drop(monkeypatch):
def mockreturn(dbname):
class MockDropped(object):
def run(self, conn):
return
return MockDropped()
monkeypatch.setattr('rethinkdb.db_drop', mockreturn)
@pytest.fixture
def mock_generate_key_pair(monkeypatch):
monkeypatch.setattr('bigchaindb.common.crypto.generate_key_pair', lambda: ('privkey', 'pubkey'))
@ -225,10 +215,33 @@ def test_bigchain_run_init_when_db_exists(mock_db_init_with_existing_db):
run_init(args)
def test_drop_existing_db(mock_rethink_db_drop):
@patch('bigchaindb.backend.schema.drop_database')
def test_drop_db_when_assumed_yes(mock_db_drop):
from bigchaindb.commands.bigchain import run_drop
args = Namespace(config=None, yes=True)
run_drop(args)
assert mock_db_drop.called
@patch('bigchaindb.backend.schema.drop_database')
def test_drop_db_when_interactive_yes(mock_db_drop, monkeypatch):
from bigchaindb.commands.bigchain import run_drop
args = Namespace(config=None, yes=False)
monkeypatch.setattr('bigchaindb.commands.bigchain.input', lambda x: 'y')
run_drop(args)
assert mock_db_drop.called
@patch('bigchaindb.backend.schema.drop_database')
def test_drop_db_does_not_drop_when_interactive_no(mock_db_drop, monkeypatch):
from bigchaindb.commands.bigchain import run_drop
args = Namespace(config=None, yes=False)
monkeypatch.setattr('bigchaindb.commands.bigchain.input', lambda x: 'n')
run_drop(args)
assert not mock_db_drop.called
def test_run_configure_when_config_exists_and_skipping(monkeypatch):

View File

@ -113,7 +113,7 @@ def test_env_config(monkeypatch):
assert result == expected
def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request):
file_config = {
'database': {'host': 'test-host'},
'backlog_reassign_delay': 5
@ -136,6 +136,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
'threads': None,
},
'database': {
'backend': request.config.getoption('--database-backend'),
'host': 'test-host',
'port': 4242,
'name': 'test-dbname',

View File

@ -1,5 +1,3 @@
from collections import namedtuple
from rethinkdb.ast import RqlQuery
import pytest
@ -9,6 +7,7 @@ import pytest
def config(request, monkeypatch):
config = {
'database': {
'backend': request.config.getoption('--database-backend'),
'host': 'host',
'port': 28015,
'name': 'bigchain',
@ -30,10 +29,12 @@ def config(request, monkeypatch):
def test_bigchain_class_default_initialization(config):
from bigchaindb.core import Bigchain
from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.backend.connection import Connection
bigchain = Bigchain()
assert bigchain.host == config['database']['host']
assert bigchain.port == config['database']['port']
assert bigchain.dbname == config['database']['name']
assert isinstance(bigchain.connection, Connection)
assert bigchain.connection.host == config['database']['host']
assert bigchain.connection.port == config['database']['port']
assert bigchain.connection.dbname == config['database']['name']
assert bigchain.me == config['keypair']['public']
assert bigchain.me_private == config['keypair']['private']
assert bigchain.nodes_except_me == config['keyring']
@ -42,19 +43,25 @@ def test_bigchain_class_default_initialization(config):
def test_bigchain_class_initialization_with_parameters(config):
from bigchaindb.core import Bigchain
from bigchaindb.backend import connect
from bigchaindb.consensus import BaseConsensusRules
init_kwargs = {
'host': 'some_node',
'port': '12345',
'dbname': 'atom',
'public_key': 'white',
'private_key': 'black',
'keyring': ['key_one', 'key_two'],
}
bigchain = Bigchain(**init_kwargs)
assert bigchain.host == init_kwargs['host']
assert bigchain.port == init_kwargs['port']
assert bigchain.dbname == init_kwargs['dbname']
init_db_kwargs = {
'backend': 'rethinkdb',
'host': 'this_is_the_db_host',
'port': 12345,
'name': 'this_is_the_db_name',
}
connection = connect(**init_db_kwargs)
bigchain = Bigchain(connection=connection, **init_kwargs)
assert bigchain.connection == connection
assert bigchain.connection.host == init_db_kwargs['host']
assert bigchain.connection.port == init_db_kwargs['port']
assert bigchain.connection.dbname == init_db_kwargs['name']
assert bigchain.me == init_kwargs['public_key']
assert bigchain.me_private == init_kwargs['private_key']
assert bigchain.nodes_except_me == init_kwargs['keyring']
@ -62,12 +69,12 @@ def test_bigchain_class_initialization_with_parameters(config):
def test_get_blocks_status_containing_tx(monkeypatch):
from bigchaindb.db.backends.rethinkdb import RethinkDBBackend
from bigchaindb.backend import query as backend_query
from bigchaindb.core import Bigchain
blocks = [
{'id': 1}, {'id': 2}
]
monkeypatch.setattr(RethinkDBBackend, 'get_blocks_status_from_transaction', lambda x: blocks)
monkeypatch.setattr(backend_query, 'get_blocks_status_from_transaction', lambda x: blocks)
monkeypatch.setattr(Bigchain, 'block_election_status', lambda x, y, z: Bigchain.BLOCK_VALID)
bigchain = Bigchain(public_key='pubkey', private_key='privkey')
with pytest.raises(Exception):

View File

@ -3,11 +3,11 @@ import pytest
import rethinkdb as r
from bigchaindb.db.utils import Connection
from bigchaindb.backend import connect
def test_run_a_simple_query():
conn = Connection()
conn = connect()
query = r.expr('1')
assert conn.run(query) == '1'
@ -17,7 +17,7 @@ def test_raise_exception_when_max_tries():
def run(self, conn):
raise r.ReqlDriverError('mock')
conn = Connection()
conn = connect()
with pytest.raises(r.ReqlDriverError):
conn.run(MockQuery())
@ -30,7 +30,7 @@ def test_reconnect_when_connection_lost():
def raise_exception(*args, **kwargs):
raise r.ReqlDriverError('mock')
conn = Connection()
conn = connect()
original_connect = r.connect
r.connect = raise_exception
@ -75,7 +75,6 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):
else:
time.sleep(10)
bigchain = Bigchain()
bigchain.connection = MockConnection()
changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT,