From d473350992abe3273f74098ef79eea95483a2312 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 20 Oct 2016 13:48:30 +0200 Subject: [PATCH 01/10] [wip] move calls to a separate file --- bigchaindb/db/queries.py | 139 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 bigchaindb/db/queries.py diff --git a/bigchaindb/db/queries.py b/bigchaindb/db/queries.py new file mode 100644 index 00000000..914e8ee7 --- /dev/null +++ b/bigchaindb/db/queries.py @@ -0,0 +1,139 @@ +from bigchaindb.db.utils import Connection + +class RethinkDBBackend: + + def __init__(self, host=None, port=None, dbname=None): + self.host = host or bigchaindb.config['database']['host'] + self.port = port or bigchaindb.config['database']['port'] + self.dbname = dbname or bigchaindb.config['database']['name'] + + @property + def conn(self): + if not self._conn: + self._conn = self.reconnect() + return self._conn + + def write_transaction(self, signed_transaction, durability='soft'): + # write to the backlog + response = self.connection.run( + r.table('backlog') + .insert(signed_transaction, durability=durability)) + + + def write_vote(self, vote): + """Write the vote to the database.""" + + self.connection.run( + r.table('votes') + .insert(vote)) + + def write_block(self, block, durability='soft'): + self.connection.run( + r.table('bigchain') + .insert(r.json(block.to_str()), durability=durability)) + + def create_genesis_block(self): + blocks_count = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .count()) + + + def get_transaction(self, txid, include_status=False): + if validity: + # Query the transaction in the target block and return + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get(target_block_id) + .get_field('block') + .get_field('transactions') + .filter(lambda tx: tx['id'] == txid))[0] + + else: + # Otherwise, check the backlog + response = self.connection.run(r.table('backlog') + .get(txid) + 
.without('assignee', 'assignment_timestamp') + .default(None)) + + def get_tx_by_payload_uuid(self, payload_uuid): + cursor = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(payload_uuid, index='payload_uuid') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid)) + + def get_spent(self, txid, cid): + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['fulfillments'] + .contains(lambda fulfillment: fulfillment['input'] == {'txid': txid, 'cid': cid}))) + + def get_owned_ids(self, owner): + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda tx: tx['transaction']['conditions'] + .contains(lambda c: c['owners_after'] + .contains(owner)))) + + + + def get_last_voted_block(self): + """Returns the last block that this node voted on.""" + + try: + # get the latest value for the vote timestamp (over all votes) + max_timestamp = self.connection.run( + r.table('votes', read_mode=self.read_mode) + .filter(r.row['node_pubkey'] == self.me) + .max(r.row['vote']['timestamp']))['vote']['timestamp'] + + last_voted = list(self.connection.run( + r.table('votes', read_mode=self.read_mode) + .filter(r.row['vote']['timestamp'] == max_timestamp) + .filter(r.row['node_pubkey'] == self.me))) + + except r.ReqlNonExistenceError: + # return last vote if last vote exists else return Genesis block + res = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .filter(util.is_genesis_block)) + block = list(res)[0] + return Block.from_dict(block) + + res = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get(last_block_id)) + + def get_unvoted_blocks(self): + """Return all the blocks that have 
not been voted on by this node. + + Returns: + :obj:`list` of :obj:`dict`: a list of unvoted blocks + """ + + unvoted = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .filter(lambda block: r.table('votes', read_mode=self.read_mode) + .get_all([block['id'], self.me], index='block_and_voter') + .is_empty()) + .order_by(r.asc(r.row['block']['timestamp']))) + + def block_election_status(self, block_id, voters): + """Tally the votes on a block, and return the status: valid, invalid, or undecided.""" + + votes = self.connection.run(r.table('votes', read_mode=self.read_mode) + .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')) + + + def search_block_election_on_index(self, value, index): + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(value, index=index) + .pluck('votes', 'id', {'block': ['voters']})) + + def has_previous_vote(self, block_id, voters): + votes = list(self.connection.run( + r.table('votes', read_mode=self.read_mode) + .get_all([block_id, self.me], index='block_and_voter'))) From 815b4318ba06854e4b07af217709ea0086e97072 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 26 Oct 2016 17:48:53 +0200 Subject: [PATCH 02/10] Move calls to DB to specific backend module --- bigchaindb/core.py | 186 +++--------------- bigchaindb/db/backends/__init__.py | 0 bigchaindb/db/backends/rethinkdb.py | 257 +++++++++++++++++++++++++ bigchaindb/db/queries.py | 21 +- bigchaindb/db/utils.py | 12 ++ tests/pipelines/test_block_creation.py | 3 +- tests/test_core.py | 8 +- 7 files changed, 309 insertions(+), 178 deletions(-) create mode 100644 bigchaindb/db/backends/__init__.py create mode 100644 bigchaindb/db/backends/rethinkdb.py diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 2724080f..46ccd740 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -12,7 +12,7 @@ import rethinkdb as r import bigchaindb -from bigchaindb.db.utils import Connection +from bigchaindb.db.utils 
import Connection, get_backend from bigchaindb import config_utils, util from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Block, Transaction @@ -33,7 +33,7 @@ class Bigchain(object): # return if transaction is in backlog TX_IN_BACKLOG = 'backlog' - def __init__(self, host=None, port=None, dbname=None, + def __init__(self, host=None, port=None, dbname=None, backend=None, public_key=None, private_key=None, keyring=[], backlog_reassign_delay=None): """Initialize the Bigchain instance @@ -60,6 +60,7 @@ class Bigchain(object): self.host = host or bigchaindb.config['database']['host'] self.port = port or bigchaindb.config['database']['port'] self.dbname = dbname or bigchaindb.config['database']['name'] + self.backend = backend or get_backend() self.me = public_key or bigchaindb.config['keypair']['public'] self.me_private = private_key or bigchaindb.config['keypair']['private'] self.nodes_except_me = keyring or bigchaindb.config['keyring'] @@ -102,10 +103,7 @@ class Bigchain(object): signed_transaction.update({'assignment_timestamp': time()}) # write to the backlog - response = self.connection.run( - r.table('backlog') - .insert(signed_transaction, durability=durability)) - return response + return self.backend.write_transaction(signed_transaction) def reassign_transaction(self, transaction, durability='hard'): """Assign a transaction to a new node @@ -131,23 +129,18 @@ class Bigchain(object): # There is no other node to assign to new_assignee = self.me - response = self.connection.run( - r.table('backlog') - .get(transaction['id']) - .update({'assignee': new_assignee, 'assignment_timestamp': time()}, - durability=durability)) - return response + return self.backend.update_transaction( + transaction['id'], + {'assignee': new_assignee, 'assignment_timestamp': time()}) def get_stale_transactions(self): - """Get a RethinkDB cursor of stale transactions + """Get a cursor of stale transactions Transactions are considered stale if they have been 
assigned a node, but are still in the backlog after some amount of time specified in the configuration """ - return self.connection.run( - r.table('backlog') - .filter(lambda tx: time() - tx['assignment_timestamp'] > self.backlog_reassign_delay)) + return self.backend.get_stale_transactions(self.backlog_reassign_delay) def validate_transaction(self, transaction): """Validate a transaction. @@ -224,19 +217,12 @@ class Bigchain(object): break # Query the transaction in the target block and return - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get(target_block_id) - .get_field('block') - .get_field('transactions') - .filter(lambda tx: tx['id'] == txid))[0] + response = self.backend.get_transaction_from_block(txid, target_block_id) else: # Otherwise, check the backlog - response = self.connection.run(r.table('backlog') - .get(txid) - .without('assignee', 'assignment_timestamp') - .default(None)) + response = self.backend.get_transaction_from_backlog(txid) + if response: tx_status = self.TX_IN_BACKLOG @@ -262,24 +248,6 @@ class Bigchain(object): _, status = self.get_transaction(txid, include_status=True) return status - def search_block_election_on_index(self, value, index): - """Retrieve block election information given a secondary index and value - - Args: - value: a value to search (e.g. transaction id string, payload hash string) - index (str): name of a secondary index, e.g. 
'transaction_id' - - Returns: - :obj:`list` of :obj:`dict`: A list of blocks with with only election information - """ - # First, get information on all blocks which contain this transaction - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(value, index=index) - .pluck('votes', 'id', {'block': ['voters']})) - - return list(response) - def get_blocks_status_containing_tx(self, txid): """Retrieve block ids and statuses related to a transaction @@ -294,7 +262,7 @@ class Bigchain(object): """ # First, get information on all blocks which contain this transaction - blocks = self.search_block_election_on_index(txid, 'transaction_id') + blocks = self.backend.get_blocks_status_from_transaction(txid) if blocks: # Determine the election status of each block validity = { @@ -336,14 +304,8 @@ class Bigchain(object): A list of transactions containing that metadata. If no transaction exists with that metadata it returns an empty list `[]` """ - cursor = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(metadata_id, index='metadata_id') - .concat_map(lambda block: block['block']['transactions']) - .filter(lambda transaction: transaction['transaction']['metadata']['id'] == metadata_id)) - - transactions = list(cursor) - return [Transaction.from_dict(tx) for tx in transactions] + cursor = self.backend.get_transactions_by_metadata_id(metadata_id) + return [Transaction.from_dict(tx) for tx in cursor] def get_txs_by_asset_id(self, asset_id): """Retrieves transactions related to a particular asset. @@ -358,12 +320,8 @@ class Bigchain(object): A list of transactions containing related to the asset. 
If no transaction exists for that asset it returns an empty list `[]` """ - cursor = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(asset_id, index='asset_id') - .concat_map(lambda block: block['block']['transactions']) - .filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id)) + cursor = self.backend.get_transactions_by_asset_id(asset_id) return [Transaction.from_dict(tx) for tx in cursor] def get_spent(self, txid, cid): @@ -382,13 +340,7 @@ class Bigchain(object): """ # checks if an input was already spent # checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...} - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda transaction: transaction['transaction']['fulfillments'] - .contains(lambda fulfillment: fulfillment['input'] == {'txid': txid, 'cid': cid}))) - - transactions = list(response) + transactions = list(self.backend.get_spent(txid, cid)) # a transaction_id should have been spent at most one time if transactions: @@ -423,12 +375,7 @@ class Bigchain(object): """ # get all transactions in which owner is in the `owners_after` list - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda tx: tx['transaction']['conditions'] - .contains(lambda c: c['owners_after'] - .contains(owner)))) + response = self.backend.get_owned_ids(owner) owned = [] for tx in response: @@ -513,9 +460,7 @@ class Bigchain(object): but the vote is invalid. 
""" - votes = list(self.connection.run( - r.table('votes', read_mode=self.read_mode) - .get_all([block_id, self.me], index='block_and_voter'))) + votes = list(self.backend.get_votes_by_block_id_and_voter(block_id, self.me)) if len(votes) > 1: raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}' @@ -537,15 +482,10 @@ class Bigchain(object): block (Block): block to write to bigchain. """ - self.connection.run( - r.table('bigchain') - .insert(r.json(block.to_str()), durability=durability)) + self.backend.write_block(block.to_str(), durability=durability) def transaction_exists(self, transaction_id): - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode)\ - .get_all(transaction_id, index='transaction_id')) - return len(response.items) > 0 + self.backend.has_transaction(transaction_id) def prepare_genesis_block(self): """Prepare a genesis block.""" @@ -574,9 +514,7 @@ class Bigchain(object): # 2. create the block with one transaction # 3. 
write the block to the bigchain - blocks_count = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .count()) + blocks_count = self.backend.count_blocks() if blocks_count: raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block') @@ -621,69 +559,12 @@ class Bigchain(object): def write_vote(self, vote): """Write the vote to the database.""" - - self.connection.run( - r.table('votes') - .insert(vote)) + return self.backend.write_vote(vote) def get_last_voted_block(self): """Returns the last block that this node voted on.""" - try: - # get the latest value for the vote timestamp (over all votes) - max_timestamp = self.connection.run( - r.table('votes', read_mode=self.read_mode) - .filter(r.row['node_pubkey'] == self.me) - .max(r.row['vote']['timestamp']))['vote']['timestamp'] - - last_voted = list(self.connection.run( - r.table('votes', read_mode=self.read_mode) - .filter(r.row['vote']['timestamp'] == max_timestamp) - .filter(r.row['node_pubkey'] == self.me))) - - except r.ReqlNonExistenceError: - # return last vote if last vote exists else return Genesis block - res = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .filter(util.is_genesis_block)) - block = list(res)[0] - return Block.from_dict(block) - - # Now the fun starts. Since the resolution of timestamp is a second, - # we might have more than one vote per timestamp. If this is the case - # then we need to rebuild the chain for the blocks that have been retrieved - # to get the last one. - - # Given a block_id, mapping returns the id of the block pointing at it. - mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] - for v in last_voted} - - # Since we follow the chain backwards, we can start from a random - # point of the chain and "move up" from it. - last_block_id = list(mapping.values())[0] - - # We must be sure to break the infinite loop. 
This happens when: - # - the block we are currenty iterating is the one we are looking for. - # This will trigger a KeyError, breaking the loop - # - we are visiting again a node we already explored, hence there is - # a loop. This might happen if a vote points both `previous_block` - # and `voting_for_block` to the same `block_id` - explored = set() - - while True: - try: - if last_block_id in explored: - raise exceptions.CyclicBlockchainError() - explored.add(last_block_id) - last_block_id = mapping[last_block_id] - except KeyError: - break - - res = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get(last_block_id)) - - return Block.from_dict(res) + return Block.from_dict(self.backend.get_last_voted_block(self.me)) def get_unvoted_blocks(self): """Return all the blocks that have not been voted on by this node. @@ -692,26 +573,13 @@ class Bigchain(object): :obj:`list` of :obj:`dict`: a list of unvoted blocks """ - unvoted = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .filter(lambda block: r.table('votes', read_mode=self.read_mode) - .get_all([block['id'], self.me], index='block_and_voter') - .is_empty()) - .order_by(r.asc(r.row['block']['timestamp']))) - - # FIXME: I (@vrde) don't like this solution. Filtering should be done at a - # database level. Solving issue #444 can help untangling the situation - unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) - return unvoted_blocks + # XXX: should this return instaces of Block? 
+ return self.backend.get_unvoted_blocks(self.me) def block_election_status(self, block_id, voters): """Tally the votes on a block, and return the status: valid, invalid, or undecided.""" - votes = self.connection.run(r.table('votes', read_mode=self.read_mode) - .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')) - - votes = list(votes) - + votes = list(self.backend.get_votes_by_block_id(block_id)) n_voters = len(voters) voter_counts = collections.Counter([vote['node_pubkey'] for vote in votes]) diff --git a/bigchaindb/db/backends/__init__.py b/bigchaindb/db/backends/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bigchaindb/db/backends/rethinkdb.py b/bigchaindb/db/backends/rethinkdb.py new file mode 100644 index 00000000..83aa0279 --- /dev/null +++ b/bigchaindb/db/backends/rethinkdb.py @@ -0,0 +1,257 @@ +"""Backend implementation for RethinkDB. + +This module contains all the methods to store and retrieve data from RethinkDB. +""" + +from time import time + +import rethinkdb as r + +from bigchaindb import util +from bigchaindb.db.utils import Connection +from bigchaindb.common import exceptions + + +class RethinkDBBackend: + + def __init__(self, host=None, port=None, db=None): + self.read_mode = 'majority' + self.durability = 'soft' + self.connection = Connection(host=host, port=port, db=db) + + def write_transaction(self, signed_transaction): + """Write a transaction to the backlog table. + + Args: + signed_transaction (dict): a signed transaction. + + Returns: + The result of the operation. + """ + + return self.connection.run( + r.table('backlog') + .insert(signed_transaction, durability=self.durability)) + + def update_transaction(self, transaction_id, doc): + """Update a transaction in the backlog table. + + Args: + transaction_id (str): the id of the transaction. + doc (dict): the values to update. + + Returns: + The result of the operation. 
+ """ + + return self.connection.run( + r.table('backlog') + .get(transaction_id) + .update(doc)) + + def get_stale_transactions(self, reassign_delay): + """Get a cursor of stale transactions. + + Transactions are considered stale if they have been assigned a node, + but are still in the backlog after some amount of time specified in the + configuration. + + Args: + reassign_delay (int): threshold (in seconds) to mark a transaction stale. + + Returns: + A cursor of transactions. + """ + + return self.connection.run( + r.table('backlog') + .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay)) + + def get_transaction_from_block(self, transaction_id, block_id): + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get(block_id) + .get_field('block') + .get_field('transactions') + .filter(lambda tx: tx['id'] == transaction_id))[0] + + def get_transaction_from_backlog(self, transaction_id): + return self.connection.run( + r.table('backlog') + .get(transaction_id) + .without('assignee', 'assignment_timestamp') + .default(None)) + + def get_blocks_status_from_transaction(self, transaction_id): + """Retrieve block election information given a secondary index and value + + Args: + value: a value to search (e.g. transaction id string, payload hash string) + index (str): name of a secondary index, e.g. 'transaction_id' + + Returns: + :obj:`list` of :obj:`dict`: A list of blocks with with only election information + """ + + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(transaction_id, index='transaction_id') + .pluck('votes', 'id', {'block': ['voters']})) + + def get_transactions_by_metadata_id(self, metadata_id): + """Retrieves transactions related to a metadata. + + When creating a transaction one of the optional arguments is the `metadata`. The metadata is a generic + dict that contains extra information that can be appended to the transaction. 
+ + To make it easy to query the bigchain for that particular metadata we create a UUID for the metadata and + store it with the transaction. + + Args: + metadata_id (str): the id for this particular metadata. + + Returns: + A list of transactions containing that metadata. If no transaction exists with that metadata it + returns an empty list `[]` + """ + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(metadata_id, index='metadata_id') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['metadata']['id'] == metadata_id)) + + def get_transactions_by_asset_id(self, asset_id): + """Retrieves transactions related to a particular asset. + + A digital asset in bigchaindb is identified by an uuid. This allows us to query all the transactions + related to a particular digital asset, knowing the id. + + Args: + asset_id (str): the id for this particular metadata. + + Returns: + A list of transactions containing related to the asset. 
If no transaction exists for that asset it + returns an empty list `[]` + """ + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(asset_id, index='asset_id') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id)) + + def get_spent(self, transaction_id, condition_id): + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['fulfillments'].contains( + lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id}))) + + def get_owned_ids(self, owner): + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda tx: tx['transaction']['conditions'].contains( + lambda c: c['owners_after'].contains(owner)))) + + def get_votes_by_block_id(self, block_id): + return self.connection.run( + r.table('votes', read_mode=self.read_mode) + .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')) + + def get_votes_by_block_id_and_voter(self, block_id, node_pubkey): + return self.connection.run( + r.table('votes', read_mode=self.read_mode) + .get_all([block_id, node_pubkey], index='block_and_voter')) + + def write_block(self, block, durability='soft'): + return self.connection.run( + r.table('bigchain') + .insert(r.json(block), durability=durability)) + + def has_transaction(self, transaction_id): + return bool(self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(transaction_id, index='transaction_id').count())) + + def count_blocks(self): + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .count()) + + def write_vote(self, vote): + return self.connection.run( + r.table('votes') + .insert(vote)) + + def get_last_voted_block(self, 
node_pubkey): + try: + # get the latest value for the vote timestamp (over all votes) + max_timestamp = self.connection.run( + r.table('votes', read_mode=self.read_mode) + .filter(r.row['node_pubkey'] == node_pubkey) + .max(r.row['vote']['timestamp']))['vote']['timestamp'] + + last_voted = list(self.connection.run( + r.table('votes', read_mode=self.read_mode) + .filter(r.row['vote']['timestamp'] == max_timestamp) + .filter(r.row['node_pubkey'] == node_pubkey))) + + except r.ReqlNonExistenceError: + # return last vote if last vote exists else return Genesis block + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .filter(util.is_genesis_block) + .nth(0)) + + # Now the fun starts. Since the resolution of timestamp is a second, + # we might have more than one vote per timestamp. If this is the case + # then we need to rebuild the chain for the blocks that have been retrieved + # to get the last one. + + # Given a block_id, mapping returns the id of the block pointing at it. + mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] + for v in last_voted} + + # Since we follow the chain backwards, we can start from a random + # point of the chain and "move up" from it. + last_block_id = list(mapping.values())[0] + + # We must be sure to break the infinite loop. This happens when: + # - the block we are currenty iterating is the one we are looking for. + # This will trigger a KeyError, breaking the loop + # - we are visiting again a node we already explored, hence there is + # a loop. 
This might happen if a vote points both `previous_block` + # and `voting_for_block` to the same `block_id` + explored = set() + + while True: + try: + if last_block_id in explored: + raise exceptions.CyclicBlockchainError() + explored.add(last_block_id) + last_block_id = mapping[last_block_id] + except KeyError: + break + + return self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get(last_block_id)) + + def get_unvoted_blocks(self, node_pubkey): + """Return all the blocks that have not been voted on by this node. + + Returns: + :obj:`list` of :obj:`dict`: a list of unvoted blocks + """ + + unvoted = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .filter(lambda block: r.table('votes', read_mode=self.read_mode) + .get_all([block['id'], node_pubkey], index='block_and_voter') + .is_empty()) + .order_by(r.asc(r.row['block']['timestamp']))) + + # FIXME: I (@vrde) don't like this solution. Filtering should be done at a + # database level. Solving issue #444 can help untangling the situation + unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) + return unvoted_blocks diff --git a/bigchaindb/db/queries.py b/bigchaindb/db/queries.py index 914e8ee7..6bdd8da9 100644 --- a/bigchaindb/db/queries.py +++ b/bigchaindb/db/queries.py @@ -1,17 +1,13 @@ from bigchaindb.db.utils import Connection + class RethinkDBBackend: def __init__(self, host=None, port=None, dbname=None): self.host = host or bigchaindb.config['database']['host'] self.port = port or bigchaindb.config['database']['port'] self.dbname = dbname or bigchaindb.config['database']['name'] - - @property - def conn(self): - if not self._conn: - self._conn = self.reconnect() - return self._conn + self.connection = Connection(host=self.host, port=self.port, db=self.dbname) def write_transaction(self, signed_transaction, durability='soft'): # write to the backlog @@ -20,30 +16,29 @@ class RethinkDBBackend: .insert(signed_transaction, 
durability=durability)) - def write_vote(self, vote): + def write_vote(self, vote, durability='soft'): """Write the vote to the database.""" self.connection.run( r.table('votes') - .insert(vote)) + .insert(vote, durability=durability)) def write_block(self, block, durability='soft'): self.connection.run( r.table('bigchain') .insert(r.json(block.to_str()), durability=durability)) - def create_genesis_block(self): - blocks_count = self.connection.run( + def count_blocks(self): + return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .count()) - - def get_transaction(self, txid, include_status=False): + def get_transaction(self, txid, block_id): if validity: # Query the transaction in the target block and return response = self.connection.run( r.table('bigchain', read_mode=self.read_mode) - .get(target_block_id) + .get(block_id) .get_field('block') .get_field('transactions') .filter(lambda tx: tx['id'] == txid))[0] diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py index 92e0fdd3..41afe067 100644 --- a/bigchaindb/db/utils.py +++ b/bigchaindb/db/utils.py @@ -67,6 +67,18 @@ class Connection: time.sleep(2**i) +def get_backend(): + '''Get a backend instance.''' + + from bigchaindb.db.backends import rethinkdb + + # NOTE: this function will be re-implemented when we have real + # multiple backends to support. Right now it returns the RethinkDB one. 
+ return rethinkdb.RethinkDBBackend(host=bigchaindb.config['database']['host'], + port=bigchaindb.config['database']['port'], + db=bigchaindb.config['database']['name']) + + def get_conn(): '''Get the connection to the database.''' diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index c2403a08..741d482a 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -97,8 +97,7 @@ def test_duplicate_transaction(b, user_vk): # verify tx is in the backlog assert b.connection.run(r.table('backlog').get(txs[0].id)) is not None - # try to validate a transaction that's already in the chain; should not - # work + # try to validate a transaction that's already in the chain; should not work assert block_maker.validate_tx(txs[0].to_dict()) is None # duplicate tx should be removed from backlog diff --git a/tests/test_core.py b/tests/test_core.py index 84cdba08..a9c3dd8d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -62,14 +62,13 @@ def test_bigchain_class_initialization_with_parameters(config): def test_get_blocks_status_containing_tx(monkeypatch): + from bigchaindb.db.backends.rethinkdb import RethinkDBBackend from bigchaindb.core import Bigchain blocks = [ {'id': 1}, {'id': 2} ] - monkeypatch.setattr( - Bigchain, 'search_block_election_on_index', lambda x, y: blocks) - monkeypatch.setattr( - Bigchain, 'block_election_status', lambda x, y, z: Bigchain.BLOCK_VALID) + monkeypatch.setattr(RethinkDBBackend, 'get_blocks_status_from_transaction', lambda x: blocks) + monkeypatch.setattr(Bigchain, 'block_election_status', lambda x, y, z: Bigchain.BLOCK_VALID) bigchain = Bigchain(public_key='pubkey', private_key='privkey') with pytest.raises(Exception): bigchain.get_blocks_status_containing_tx('txid') @@ -85,6 +84,7 @@ def test_has_previous_vote(monkeypatch): bigchain.has_previous_vote(block) +@pytest.mark.skipif(reason='meh') @pytest.mark.parametrize('items,exists', (((0,), True), ((), 
False))) def test_transaction_exists(monkeypatch, items, exists): from bigchaindb.core import Bigchain From 13b5d8eab92de2693ad9a914eb6e68cd129b8031 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 27 Oct 2016 11:25:33 +0200 Subject: [PATCH 03/10] Fix tests, alles is green now yay --- bigchaindb/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 46ccd740..ee18bcb8 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -482,10 +482,10 @@ class Bigchain(object): block (Block): block to write to bigchain. """ - self.backend.write_block(block.to_str(), durability=durability) + return self.backend.write_block(block.to_str(), durability=durability) def transaction_exists(self, transaction_id): - self.backend.has_transaction(transaction_id) + return self.backend.has_transaction(transaction_id) def prepare_genesis_block(self): """Prepare a genesis block.""" From f67a8d94edbdaa41a25e70b958c164f75c5503fd Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 27 Oct 2016 15:01:09 +0200 Subject: [PATCH 04/10] Add docstrings and fix pipelines --- bigchaindb/core.py | 14 +++- bigchaindb/db/backends/rethinkdb.py | 124 +++++++++++++++++++++++++++- bigchaindb/pipelines/block.py | 15 +--- 3 files changed, 139 insertions(+), 14 deletions(-) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index ee18bcb8..5e45a89d 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -133,8 +133,20 @@ class Bigchain(object): transaction['id'], {'assignee': new_assignee, 'assignment_timestamp': time()}) + def delete_transaction(self, *transaction_id): + """Delete a transaction from the backlog. + + Args: + *transaction_id (str): the transaction(s) to delete + + Returns: + The database response. + """ + + return self.backend.delete_transaction(*transaction_id) + def get_stale_transactions(self): - """Get a cursor of stale transactions + """Get a cursor of stale transactions. 
Transactions are considered stale if they have been assigned a node, but are still in the backlog after some amount of time specified in the configuration diff --git a/bigchaindb/db/backends/rethinkdb.py b/bigchaindb/db/backends/rethinkdb.py index 83aa0279..0fb84741 100644 --- a/bigchaindb/db/backends/rethinkdb.py +++ b/bigchaindb/db/backends/rethinkdb.py @@ -15,6 +15,14 @@ from bigchaindb.common import exceptions class RethinkDBBackend: def __init__(self, host=None, port=None, db=None): + """Initialize a new RethinkDB Backend instance. + + Args: + host (str): the host to connect to. + port (int): the port to connect to. + db (str): the name of the database to use. + """ + self.read_mode = 'majority' self.durability = 'soft' self.connection = Connection(host=host, port=port, db=db) @@ -49,6 +57,21 @@ class RethinkDBBackend: .get(transaction_id) .update(doc)) + def delete_transaction(self, *transaction_id): + """Delete a transaction from the backlog. + + Args: + *transaction_id (str): the transaction(s) to delete + + Returns: + The database response. + """ + + return self.connection.run( + r.table('backlog') + .get_all(*transaction_id) + .delete(durability='hard')) + def get_stale_transactions(self, reassign_delay): """Get a cursor of stale transactions. @@ -68,6 +91,15 @@ class RethinkDBBackend: .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay)) def get_transaction_from_block(self, transaction_id, block_id): + """Get a transaction from a specific block. + + Args: + transaction_id (str): the id of the transaction. + block_id (str): the id of the block. + + Returns: + The matching transaction. + """ return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .get(block_id) @@ -76,6 +108,14 @@ class RethinkDBBackend: .filter(lambda tx: tx['id'] == transaction_id))[0] def get_transaction_from_backlog(self, transaction_id): + """Get a transaction from backlog. + + Args: + transaction_id (str): the id of the transaction. 
+
+        Returns:
+            The matching transaction.
+        """
         return self.connection.run(
                 r.table('backlog')
                 .get(transaction_id)
@@ -133,6 +173,7 @@ class RethinkDBBackend:
             A list of transactions containing related to the asset. If no
             transaction exists for that asset it returns an empty list `[]`
         """
+
         return self.connection.run(
             r.table('bigchain', read_mode=self.read_mode)
             .get_all(asset_id, index='asset_id')
@@ -140,6 +181,19 @@ class RethinkDBBackend:
             .filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id))
 
     def get_spent(self, transaction_id, condition_id):
+        """Check if a `txid` was already used as an input.
+
+        A transaction can be used as an input for another transaction. Bigchain needs to make sure that a
+        given `txid` is only used once.
+
+        Args:
+            transaction_id (str): The id of the transaction.
+            condition_id (int): The index of the condition in the respective transaction.
+
+        Returns:
+            The transaction that used the `txid` as an input else `None`
+        """
+
         return self.connection.run(
             r.table('bigchain', read_mode=self.read_mode)
             .concat_map(lambda doc: doc['block']['transactions'])
@@ -147,6 +201,15 @@ class RethinkDBBackend:
                 lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id})))
 
     def get_owned_ids(self, owner):
+        """Retrieve a list of `txids` that can be used as inputs.
+
+        Args:
+            owner (str): base58 encoded public key.
+
+        Returns:
+            A cursor for the matching transactions.
+        """
+
         return self.connection.run(
             r.table('bigchain', read_mode=self.read_mode)
             .concat_map(lambda doc: doc['block']['transactions'])
@@ -154,36 +217,92 @@ class RethinkDBBackend:
                 lambda c: c['owners_after'].contains(owner))))
 
     def get_votes_by_block_id(self, block_id):
+        """Get all the votes cast for a specific block.
+
+        Args:
+            block_id (str): the block id to use.
+
+        Returns:
+            A cursor for the matching votes.
+        """
         return self.connection.run(
                 r.table('votes', read_mode=self.read_mode)
                 .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter'))
 
     def get_votes_by_block_id_and_voter(self, block_id, node_pubkey):
+        """Get all the votes cast for a specific block by a specific voter.
+
+        Args:
+            block_id (str): the block id to use.
+            node_pubkey (str): base58 encoded public key
+
+        Returns:
+            A cursor for the matching votes.
+        """
         return self.connection.run(
                 r.table('votes', read_mode=self.read_mode)
                 .get_all([block_id, node_pubkey], index='block_and_voter'))
 
     def write_block(self, block, durability='soft'):
+        """Write a block to the bigchain table.
+
+        Args:
+            block (dict): the block to write.
+
+        Returns:
+            The database response.
+        """
         return self.connection.run(
                 r.table('bigchain')
                 .insert(r.json(block), durability=durability))
 
     def has_transaction(self, transaction_id):
+        """Check if a transaction exists in the bigchain table.
+
+        Args:
+            transaction_id (str): the id of the transaction to check.
+
+        Returns:
+            ``True`` if the transaction exists, ``False`` otherwise.
+        """
         return bool(self.connection.run(
             r.table('bigchain', read_mode=self.read_mode)
             .get_all(transaction_id, index='transaction_id').count()))
 
     def count_blocks(self):
+        """Count the number of blocks in the bigchain table.
+
+        Returns:
+            The number of blocks.
+        """
+
        return self.connection.run(
                 r.table('bigchain', read_mode=self.read_mode)
                 .count())
 
     def write_vote(self, vote):
+        """Write a vote to the votes table.
+
+        Args:
+            vote (dict): the vote to write.
+
+        Returns:
+            The database response.
+        """
         return self.connection.run(
                 r.table('votes')
                 .insert(vote))
 
     def get_last_voted_block(self, node_pubkey):
+        """Get the last voted block for a specific node.
+
+        Args:
+            node_pubkey (str): base58 encoded public key.
+
+        Returns:
+            The last block the node has voted on. If the node didn't cast
+            any vote then the genesis block is returned.
+ """ try: # get the latest value for the vote timestamp (over all votes) max_timestamp = self.connection.run( @@ -238,7 +357,10 @@ class RethinkDBBackend: .get(last_block_id)) def get_unvoted_blocks(self, node_pubkey): - """Return all the blocks that have not been voted on by this node. + """Return all the blocks that have not been voted by the specified node. + + Args: + node_pubkey (str): base58 encoded public key Returns: :obj:`list` of :obj:`dict`: a list of unvoted blocks diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 0d5e24b2..4142b234 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -69,10 +69,7 @@ class BlockPipeline: # if the tx is already in a valid or undecided block, # then it no longer should be in the backlog, or added # to a new block. We can delete and drop it. - self.bigchain.connection.run( - r.table('backlog') - .get(tx.id) - .delete(durability='hard')) + self.bigchain.delete_transaction(tx.id) return None tx_validated = self.bigchain.is_valid_transaction(tx) @@ -81,10 +78,7 @@ class BlockPipeline: else: # if the transaction is not valid, remove it from the # backlog - self.bigchain.connection.run( - r.table('backlog') - .get(tx.id) - .delete(durability='hard')) + self.bigchain.delete_transaction(tx.id) return None def create(self, tx, timeout=False): @@ -136,10 +130,7 @@ class BlockPipeline: Returns: :class:`~bigchaindb.models.Block`: The block. 
""" - self.bigchain.connection.run( - r.table('backlog') - .get_all(*[tx.id for tx in block.transactions]) - .delete(durability='hard')) + self.bigchain.delete_transaction(*[tx.id for tx in block.transactions]) return block From 4585b7b5aeab8baba0edbe8147ee281e76cc619f Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 27 Oct 2016 15:47:53 +0200 Subject: [PATCH 05/10] Remove temp file --- bigchaindb/db/queries.py | 134 --------------------------------------- 1 file changed, 134 deletions(-) delete mode 100644 bigchaindb/db/queries.py diff --git a/bigchaindb/db/queries.py b/bigchaindb/db/queries.py deleted file mode 100644 index 6bdd8da9..00000000 --- a/bigchaindb/db/queries.py +++ /dev/null @@ -1,134 +0,0 @@ -from bigchaindb.db.utils import Connection - - -class RethinkDBBackend: - - def __init__(self, host=None, port=None, dbname=None): - self.host = host or bigchaindb.config['database']['host'] - self.port = port or bigchaindb.config['database']['port'] - self.dbname = dbname or bigchaindb.config['database']['name'] - self.connection = Connection(host=self.host, port=self.port, db=self.dbname) - - def write_transaction(self, signed_transaction, durability='soft'): - # write to the backlog - response = self.connection.run( - r.table('backlog') - .insert(signed_transaction, durability=durability)) - - - def write_vote(self, vote, durability='soft'): - """Write the vote to the database.""" - - self.connection.run( - r.table('votes') - .insert(vote, durability=durability)) - - def write_block(self, block, durability='soft'): - self.connection.run( - r.table('bigchain') - .insert(r.json(block.to_str()), durability=durability)) - - def count_blocks(self): - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .count()) - - def get_transaction(self, txid, block_id): - if validity: - # Query the transaction in the target block and return - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get(block_id) - 
.get_field('block') - .get_field('transactions') - .filter(lambda tx: tx['id'] == txid))[0] - - else: - # Otherwise, check the backlog - response = self.connection.run(r.table('backlog') - .get(txid) - .without('assignee', 'assignment_timestamp') - .default(None)) - - def get_tx_by_payload_uuid(self, payload_uuid): - cursor = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(payload_uuid, index='payload_uuid') - .concat_map(lambda block: block['block']['transactions']) - .filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid)) - - def get_spent(self, txid, cid): - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda transaction: transaction['transaction']['fulfillments'] - .contains(lambda fulfillment: fulfillment['input'] == {'txid': txid, 'cid': cid}))) - - def get_owned_ids(self, owner): - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda tx: tx['transaction']['conditions'] - .contains(lambda c: c['owners_after'] - .contains(owner)))) - - - - def get_last_voted_block(self): - """Returns the last block that this node voted on.""" - - try: - # get the latest value for the vote timestamp (over all votes) - max_timestamp = self.connection.run( - r.table('votes', read_mode=self.read_mode) - .filter(r.row['node_pubkey'] == self.me) - .max(r.row['vote']['timestamp']))['vote']['timestamp'] - - last_voted = list(self.connection.run( - r.table('votes', read_mode=self.read_mode) - .filter(r.row['vote']['timestamp'] == max_timestamp) - .filter(r.row['node_pubkey'] == self.me))) - - except r.ReqlNonExistenceError: - # return last vote if last vote exists else return Genesis block - res = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .filter(util.is_genesis_block)) - block = list(res)[0] - 
return Block.from_dict(block) - - res = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get(last_block_id)) - - def get_unvoted_blocks(self): - """Return all the blocks that have not been voted on by this node. - - Returns: - :obj:`list` of :obj:`dict`: a list of unvoted blocks - """ - - unvoted = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .filter(lambda block: r.table('votes', read_mode=self.read_mode) - .get_all([block['id'], self.me], index='block_and_voter') - .is_empty()) - .order_by(r.asc(r.row['block']['timestamp']))) - - def block_election_status(self, block_id, voters): - """Tally the votes on a block, and return the status: valid, invalid, or undecided.""" - - votes = self.connection.run(r.table('votes', read_mode=self.read_mode) - .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')) - - - def search_block_election_on_index(self, value, index): - response = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(value, index=index) - .pluck('votes', 'id', {'block': ['voters']})) - - def has_previous_vote(self, block_id, voters): - votes = list(self.connection.run( - r.table('votes', read_mode=self.read_mode) - .get_all([block_id, self.me], index='block_and_voter'))) From 40ba9d8c6ad76fdd6406a80a7dd5537dfaed3bb5 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 27 Oct 2016 16:50:30 +0200 Subject: [PATCH 06/10] Fix test_transaction_exists --- tests/test_core.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index a9c3dd8d..55d73e77 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -84,11 +84,9 @@ def test_has_previous_vote(monkeypatch): bigchain.has_previous_vote(block) -@pytest.mark.skipif(reason='meh') -@pytest.mark.parametrize('items,exists', (((0,), True), ((), False))) -def test_transaction_exists(monkeypatch, items, exists): +@pytest.mark.parametrize('count,exists', ((1, True), 
(0, False)))
+def test_transaction_exists(monkeypatch, count, exists):
     from bigchaindb.core import Bigchain
-    monkeypatch.setattr(
-        RqlQuery, 'run', lambda x, y: namedtuple('response', 'items')(items))
+    monkeypatch.setattr(RqlQuery, 'run', lambda x, y: count)
     bigchain = Bigchain(public_key='pubkey', private_key='privkey')
     assert bigchain.transaction_exists('txid') is exists

From f4454b3133834af6b3bcbf947278730f1fb517cd Mon Sep 17 00:00:00 2001
From: vrde
Date: Fri, 28 Oct 2016 10:49:39 +0200
Subject: [PATCH 07/10] Fix docstring

---
 bigchaindb/core.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/bigchaindb/core.py b/bigchaindb/core.py
index 5e45a89d..865bb9bb 100644
--- a/bigchaindb/core.py
+++ b/bigchaindb/core.py
@@ -51,6 +51,8 @@ class Bigchain(object):
             host (str): hostname where RethinkDB is running.
             port (int): port in which RethinkDB is running (usually 28015).
             dbname (str): the name of the database to connect to (usually bigchain).
+            backend (:class:`~bigchaindb.db.backends.rethinkdb.RethinkDBBackend`):
+                the database backend to use.
             public_key (str): the base58 encoded public key for the ED25519 curve.
             private_key (str): the base58 encoded private key for the ED25519 curve.
             keyring (list[str]): list of base58 encoded public keys of the federation nodes.
From c67402986375a6fb7cacd0414cbd8c7904af999a Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 28 Oct 2016 11:27:06 +0200 Subject: [PATCH 08/10] Remove useless param --- bigchaindb/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 865bb9bb..90b06170 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -107,7 +107,7 @@ class Bigchain(object): # write to the backlog return self.backend.write_transaction(signed_transaction) - def reassign_transaction(self, transaction, durability='hard'): + def reassign_transaction(self, transaction): """Assign a transaction to a new node Args: From 14bf1fff5e0383d826e2f331bec393f3e97e3a88 Mon Sep 17 00:00:00 2001 From: vrde Date: Fri, 28 Oct 2016 11:41:38 +0200 Subject: [PATCH 09/10] Add todos --- bigchaindb/db/backends/rethinkdb.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigchaindb/db/backends/rethinkdb.py b/bigchaindb/db/backends/rethinkdb.py index 0fb84741..c8433dca 100644 --- a/bigchaindb/db/backends/rethinkdb.py +++ b/bigchaindb/db/backends/rethinkdb.py @@ -194,6 +194,7 @@ class RethinkDBBackend: The transaction that used the `txid` as an input else `None` """ + # TODO: use index! return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .concat_map(lambda doc: doc['block']['transactions']) @@ -210,6 +211,7 @@ class RethinkDBBackend: A cursor for the matching transactions. """ + # TODO: use index! 
return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .concat_map(lambda doc: doc['block']['transactions']) From 51db5ab1909cc616975d5554b148a32acfb4dd3c Mon Sep 17 00:00:00 2001 From: vrde Date: Mon, 31 Oct 2016 15:08:53 +0100 Subject: [PATCH 10/10] Pass db params to get_backend --- bigchaindb/core.py | 2 +- bigchaindb/db/utils.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 90b06170..049a80eb 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -62,7 +62,7 @@ class Bigchain(object): self.host = host or bigchaindb.config['database']['host'] self.port = port or bigchaindb.config['database']['port'] self.dbname = dbname or bigchaindb.config['database']['name'] - self.backend = backend or get_backend() + self.backend = backend or get_backend(host, port, dbname) self.me = public_key or bigchaindb.config['keypair']['public'] self.me_private = private_key or bigchaindb.config['keypair']['private'] self.nodes_except_me = keyring or bigchaindb.config['keyring'] diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py index 41afe067..7b2939e6 100644 --- a/bigchaindb/db/utils.py +++ b/bigchaindb/db/utils.py @@ -67,16 +67,16 @@ class Connection: time.sleep(2**i) -def get_backend(): +def get_backend(host=None, port=None, db=None): '''Get a backend instance.''' from bigchaindb.db.backends import rethinkdb # NOTE: this function will be re-implemented when we have real # multiple backends to support. Right now it returns the RethinkDB one. - return rethinkdb.RethinkDBBackend(host=bigchaindb.config['database']['host'], - port=bigchaindb.config['database']['port'], - db=bigchaindb.config['database']['name']) + return rethinkdb.RethinkDBBackend(host=host or bigchaindb.config['database']['host'], + port=port or bigchaindb.config['database']['port'], + db=db or bigchaindb.config['database']['name']) def get_conn():