From dabb81ac9817b48dbc6fbb45d72460cbe5059b39 Mon Sep 17 00:00:00 2001 From: Brett Sun Date: Fri, 2 Dec 2016 14:44:27 +0100 Subject: [PATCH] Replace cherrypicked class-based architecture with singledispatch --- bigchaindb/backend/__init__.py | 5 +- bigchaindb/backend/changefeed.py | 1 + bigchaindb/backend/query.py | 379 +++++----- bigchaindb/backend/rethinkdb/connection.py | 23 +- bigchaindb/backend/rethinkdb/query.py | 834 +++++++++++---------- bigchaindb/backend/rethinkdb/schema.py | 147 ++-- bigchaindb/backend/schema.py | 28 +- 7 files changed, 731 insertions(+), 686 deletions(-) diff --git a/bigchaindb/backend/__init__.py b/bigchaindb/backend/__init__.py index e46d1073..d8a7540d 100644 --- a/bigchaindb/backend/__init__.py +++ b/bigchaindb/backend/__init__.py @@ -1,4 +1 @@ -from bigchaindb.db.factory import get_backend_factory -from bigchaindb.db.query import Query -from bigchaindb.db.schema import Schema -from bigchaindb.db.connection import Connection +from bigchaindb.backend.connection import Connection # noqa diff --git a/bigchaindb/backend/changefeed.py b/bigchaindb/backend/changefeed.py index e69de29b..a3d9afd7 100644 --- a/bigchaindb/backend/changefeed.py +++ b/bigchaindb/backend/changefeed.py @@ -0,0 +1 @@ +"""Changefeed interfaces for backend databases""" diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 2778bb35..79e1ba28 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -1,254 +1,287 @@ +"""Query interfaces for backend databases""" -"""Interface to query the database. +from functools import singledispatch -This module contains all the methods to store and retrieve data from a generic database. -""" +@singledispatch +def write_transaction(connection, signed_transaction): + """Write a transaction to the backlog table. -class Query: + Args: + signed_transaction (dict): a signed transaction. - def write_transaction(self, signed_transaction): - """Write a transaction to the backlog table. + Returns: + The result of the operation. + """ + raise NotImplementedError() - Args: - signed_transaction (dict): a signed transaction. - Returns: - The result of the operation. - """ - raise NotImplementedError() +@singledispatch +def update_transaction(connection, transaction_id, doc): + """Update a transaction in the backlog table. - def update_transaction(self, transaction_id, doc): - """Update a transaction in the backlog table. + Args: + transaction_id (str): the id of the transaction. + doc (dict): the values to update. - Args: - transaction_id (str): the id of the transaction. - doc (dict): the values to update. + Returns: + The result of the operation. + """ + raise NotImplementedError() - Returns: - The result of the operation. - """ - raise NotImplementedError() - def delete_transaction(self, *transaction_id): - """Delete a transaction from the backlog. +@singledispatch +def delete_transaction(connection, *transaction_id): + """Delete a transaction from the backlog. - Args: - *transaction_id (str): the transaction(s) to delete + Args: + *transaction_id (str): the transaction(s) to delete - Returns: - The database response. - """ - raise NotImplementedError() + Returns: + The database response. + """ + raise NotImplementedError() - def get_stale_transactions(self, reassign_delay): - """Get a cursor of stale transactions. - Transactions are considered stale if they have been assigned a node, - but are still in the backlog after some amount of time specified in the - configuration. 
+@singledispatch
+def get_stale_transactions(connection, reassign_delay):
+    """Get a cursor of stale transactions.
-        Args:
-            reassign_delay (int): threshold (in seconds) to mark a transaction stale.
+
+    Transactions are considered stale if they have been assigned a node,
+    but are still in the backlog after some amount of time specified in the
+    configuration.
-        Returns:
-            A cursor of transactions.
-        """
+
+    Args:
+        reassign_delay (int): threshold (in seconds) to mark a transaction stale.
-        raise NotImplementedError()
+
+    Returns:
+        A cursor of transactions.
+    """
-    def get_transaction_from_block(self, transaction_id, block_id):
-        """Get a transaction from a specific block.
+
+    raise NotImplementedError()
-        Args:
-            transaction_id (str): the id of the transaction.
-            block_id (str): the id of the block.
-        Returns:
-            The matching transaction.
-        """
+
+
+@singledispatch
+def get_transaction_from_block(connection, transaction_id, block_id):
+    """Get a transaction from a specific block.
-        raise NotImplementedError()
+
+    Args:
+        transaction_id (str): the id of the transaction.
+        block_id (str): the id of the block.
-    def get_transaction_from_backlog(self, transaction_id):
-        """Get a transaction from backlog.
+
+    Returns:
+        The matching transaction.
+    """
-        Args:
-            transaction_id (str): the id of the transaction.
+
+    raise NotImplementedError()
-        Returns:
-            The matching transaction.
-        """
-        raise NotImplementedError()
+
+
+@singledispatch
+def get_transaction_from_backlog(connection, transaction_id):
+    """Get a transaction from backlog.
-    def get_blocks_status_from_transaction(self, transaction_id):
-        """Retrieve block election information given a secondary index and value
+
+    Args:
+        transaction_id (str): the id of the transaction.
-        Args:
-            value: a value to search (e.g. transaction id string, payload hash string)
-            index (str): name of a secondary index, e.g. 'transaction_id'
+
+    Returns:
+        The matching transaction.
+    """
-        Returns:
-            :obj:`list` of :obj:`dict`: A list of blocks with with only election information
-        """
+
+    raise NotImplementedError()
-        raise NotImplementedError()
-    def get_transactions_by_metadata_id(self, metadata_id):
-        """Retrieves transactions related to a metadata.
+
+
+@singledispatch
+def get_blocks_status_from_transaction(connection, transaction_id):
+    """Retrieve block election information given a secondary index and value
-        When creating a transaction one of the optional arguments is the `metadata`. The metadata is a generic
-        dict that contains extra information that can be appended to the transaction.
+
+    Args:
+        transaction_id (str): the transaction id to search for.
-        To make it easy to query the bigchain for that particular metadata we create a UUID for the metadata and
-        store it with the transaction.
+
+    Returns:
+        :obj:`list` of :obj:`dict`: A list of blocks with only election information
+    """
-        Args:
-            metadata_id (str): the id for this particular metadata.
+
+    raise NotImplementedError()
-        Returns:
-            A list of transactions containing that metadata. If no transaction exists with that metadata it
-            returns an empty list `[]`
-        """
-        raise NotImplementedError()
+
+
+@singledispatch
+def get_transactions_by_metadata_id(connection, metadata_id):
+    """Retrieves transactions related to particular metadata.
-    def get_transactions_by_asset_id(self, asset_id):
-        """Retrieves transactions related to a particular asset.
+
+    When creating a transaction one of the optional arguments is the `metadata`. The metadata is a generic
+    dict that contains extra information that can be appended to the transaction.
-        A digital asset in bigchaindb is identified by an uuid. This allows us to query all the transactions
-        related to a particular digital asset, knowing the id.
+
+    To make it easy to query the bigchain for that particular metadata we create a UUID for the metadata and
+    store it with the transaction.
-        Args:
-            asset_id (str): the id for this particular metadata.
+
+    Args:
+        metadata_id (str): the id for this particular metadata.
-        Returns:
-            A list of transactions containing related to the asset. If no transaction exists for that asset it
-            returns an empty list `[]`
-        """
+
+    Returns:
+        A list of transactions containing that metadata. If no transaction exists with that metadata it
+        returns an empty list `[]`
+    """
-        raise NotImplementedError()
+
+    raise NotImplementedError()
-    def get_spent(self, transaction_id, condition_id):
-        """Check if a `txid` was already used as an input.
-        A transaction can be used as an input for another transaction. Bigchain needs to make sure that a
-        given `txid` is only used once.
+
+
+@singledispatch
+def get_transactions_by_asset_id(connection, asset_id):
+    """Retrieves transactions related to a particular asset.
-        Args:
-            transaction_id (str): The id of the transaction.
-            condition_id (int): The index of the condition in the respective transaction.
+
+    A digital asset in bigchaindb is identified by a uuid. This allows us to query all the transactions
+    related to a particular digital asset, knowing the id.
-        Returns:
-            The transaction that used the `txid` as an input else `None`
-        """
+
+    Args:
+        asset_id (str): the id for this particular asset.
-        raise NotImplementedError()
+
+    Returns:
+        A list of transactions related to the asset. If no transaction exists for that asset it
+        returns an empty list `[]`
+    """
-    def get_owned_ids(self, owner):
-        """Retrieve a list of `txids` that can we used has inputs.
+
+    raise NotImplementedError()
-        Args:
-            owner (str): base58 encoded public key.
-        Returns:
-            A cursor for the matching transactions.
-        """
+
+
+@singledispatch
+def get_spent(connection, transaction_id, condition_id):
+    """Check if a `txid` was already used as an input.
-        raise NotImplementedError()
+
+    A transaction can be used as an input for another transaction. Bigchain needs to make sure that a
+    given `txid` is only used once.
-    def get_votes_by_block_id(self, block_id):
-        """Get all the votes casted for a specific block.
+
+    Args:
+        transaction_id (str): The id of the transaction.
+        condition_id (int): The index of the condition in the respective transaction.
-        Args:
-            block_id (str): the block id to use.
+
+    Returns:
+        The transaction that used the `txid` as an input else `None`
+    """
-        Returns:
-            A cursor for the matching votes.
-        """
+
+    raise NotImplementedError()
-        raise NotImplementedError()
-    def get_votes_by_block_id_and_voter(self, block_id, node_pubkey):
-        """Get all the votes casted for a specific block by a specific voter.
+
+
+@singledispatch
+def get_owned_ids(connection, owner):
+    """Retrieve a list of `txids` that can be used as inputs.
-        Args:
-            block_id (str): the block id to use.
-            node_pubkey (str): base58 encoded public key
+
+    Args:
+        owner (str): base58 encoded public key.
-        Returns:
-            A cursor for the matching votes.
-        """
+
+    Returns:
+        A cursor for the matching transactions.
+    """
-        raise NotImplementedError()
+
+    raise NotImplementedError()
-    def write_block(self, block, durability='soft'):
-        """Write a block to the bigchain table.
-        Args:
-            block (dict): the block to write.
+
+
+@singledispatch
+def get_votes_by_block_id(connection, block_id):
+    """Get all the votes cast for a specific block.
-        Returns:
-            The database response.
-        """
+
+    Args:
+        block_id (str): the block id to use.
-        raise NotImplementedError()
+
+    Returns:
+        A cursor for the matching votes.
+    """
-    def has_transaction(self, transaction_id):
-        """Check if a transaction exists in the bigchain table.
+
+    raise NotImplementedError()
-        Args:
-            transaction_id (str): the id of the transaction to check.
-        Returns:
-            ``True`` if the transaction exists, ``False`` otherwise.
-        """
+
+
+@singledispatch
+def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey):
+    """Get all the votes cast for a specific block by a specific voter.
-        raise NotImplementedError()
+
+    Args:
+        block_id (str): the block id to use.
+        node_pubkey (str): base58 encoded public key
-    def count_blocks(self):
-        """Count the number of blocks in the bigchain table.
+
+    Returns:
+        A cursor for the matching votes.
+    """
-        Returns:
-            The number of blocks.
-        """
+
+    raise NotImplementedError()
-        raise NotImplementedError()
-    def write_vote(self, vote):
-        """Write a vote to the votes table.
+
+
+@singledispatch
+def write_block(connection, block):
+    """Write a block to the bigchain table.
-        Args:
-            vote (dict): the vote to write.
+
+    Args:
+        block (dict): the block to write.
-        Returns:
-            The database response.
-        """
+
+    Returns:
+        The database response.
+    """
-        raise NotImplementedError()
+
+    raise NotImplementedError()
-    def get_last_voted_block(self, node_pubkey):
-        """Get the last voted block for a specific node.
-        Args:
-            node_pubkey (str): base58 encoded public key.
+
+
+@singledispatch
+def has_transaction(connection, transaction_id):
+    """Check if a transaction exists in the bigchain table.
-        Returns:
-            The last block the node has voted on. If the node didn't cast
-            any vote then the genesis block is returned.
-        """
+
+    Args:
+        transaction_id (str): the id of the transaction to check.
-        raise NotImplementedError()
+
+    Returns:
+        ``True`` if the transaction exists, ``False`` otherwise.
+    """
-    def get_unvoted_blocks(self, node_pubkey):
-        """Return all the blocks that have not been voted by the specified node.
+
+    raise NotImplementedError()
-        Args:
-            node_pubkey (str): base58 encoded public key
-        Returns:
-            :obj:`list` of :obj:`dict`: a list of unvoted blocks
-        """
+
+
+@singledispatch
+def count_blocks(connection):
+    """Count the number of blocks in the bigchain table.
-        raise NotImplementedError()
+
+    Returns:
+        The number of blocks.
+    """
+
+    raise NotImplementedError()
+
+
+@singledispatch
+def write_vote(connection, vote):
+    """Write a vote to the votes table.
+
+    Args:
+        vote (dict): the vote to write.
+
+    Returns:
+        The database response.
+    """
+
+    raise NotImplementedError()
+
+
+@singledispatch
+def get_last_voted_block(connection, node_pubkey):
+    """Get the last voted block for a specific node.
+
+    Args:
+        node_pubkey (str): base58 encoded public key.
+
+    Returns:
+        The last block the node has voted on. If the node didn't cast
+        any vote then the genesis block is returned.
+    """
+
+    raise NotImplementedError()
+
+
+@singledispatch
+def get_unvoted_blocks(connection, node_pubkey):
+    """Return all the blocks that have not been voted by the specified node.
+ + Args: + node_pubkey (str): base58 encoded public key + + Returns: + :obj:`list` of :obj:`dict`: a list of unvoted blocks + """ + + raise NotImplementedError() diff --git a/bigchaindb/backend/rethinkdb/connection.py b/bigchaindb/backend/rethinkdb/connection.py index 4be9ca26..04a82dc6 100644 --- a/bigchaindb/backend/rethinkdb/connection.py +++ b/bigchaindb/backend/rethinkdb/connection.py @@ -1,11 +1,9 @@ import time -import logging import rethinkdb as r from bigchaindb.backend.connection import Connection - -logger = logging.getLogger(__name__) +import bigchaindb class RethinkDBConnection(Connection): @@ -16,19 +14,19 @@ class RethinkDBConnection(Connection): more times to run the query or open a connection. """ - def __init__(self, host, port, dbname, max_tries=3): + def __init__(self, host=None, port=None, db=None, max_tries=3): """Create a new Connection instance. Args: host (str, optional): the host to connect to. port (int, optional): the port to connect to. - dbname (str, optional): the name of the database to use. + db (str, optional): the database to use. max_tries (int, optional): how many tries before giving up. """ - self.host = host - self.port = port - self.dbname = dbname + self.host = host or bigchaindb.config['database']['host'] + self.port = port or bigchaindb.config['database']['port'] + self.db = db or bigchaindb.config['database']['name'] self.max_tries = max_tries self.conn = None @@ -40,7 +38,7 @@ class RethinkDBConnection(Connection): """ if self.conn is None: - self._connect() + self.connect() for i in range(self.max_tries): try: @@ -49,12 +47,13 @@ class RethinkDBConnection(Connection): if i + 1 == self.max_tries: raise else: - self._connect() + self.connect() - def _connect(self): + def connect(self): for i in range(self.max_tries): try: - self.conn = r.connect(host=self.host, port=self.port, db=self.dbname) + self.conn = r.connect(host=self.host, port=self.port, + db=self.db) except r.ReqlDriverError as exc: if i + 1 == self.max_tries: raise diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 070c3bb8..605af9ef 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -1,439 +1,445 @@ -"""Backend implementation for RethinkDB. - -This module contains all the methods to store and retrieve data from RethinkDB. -""" +"""Query implementation for RethinkDB""" from time import time import rethinkdb as r -from bigchaindb.db import Query from bigchaindb import util -from bigchaindb.db.utils import Connection from bigchaindb.common import exceptions -class RethinkDBBackend(Query): +READ_MODE = 'majority' +WRITE_DURABILITY = 'hard' - def __init__(self, host=None, port=None, db=None): - """Initialize a new RethinkDB Backend instance. + +def write_transaction(connection, signed_transaction): + """Write a transaction to the backlog table. + + Args: + signed_transaction (dict): a signed transaction. + + Returns: + The result of the operation. + """ + + return connection.run( + r.table('backlog') + .insert(signed_transaction, durability=WRITE_DURABILITY)) + + +def update_transaction(connection, transaction_id, doc): + """Update a transaction in the backlog table. + + Args: + transaction_id (str): the id of the transaction. + doc (dict): the values to update. + + Returns: + The result of the operation. + """ + + return connection.run( + r.table('backlog') + .get(transaction_id) + .update(doc)) + + +def delete_transaction(connection, *transaction_id): + """Delete a transaction from the backlog. 
+
+    Args:
+        *transaction_id (str): the transaction(s) to delete
+
+    Returns:
+        The database response.
+    """
+
+    return connection.run(
+            r.table('backlog')
+            .get_all(*transaction_id)
+            .delete(durability=WRITE_DURABILITY))
+
+
+def get_stale_transactions(connection, reassign_delay):
+    """Get a cursor of stale transactions.
+
+    Transactions are considered stale if they have been assigned a node,
+    but are still in the backlog after some amount of time specified in the
+    configuration.
+
+    Args:
+        reassign_delay (int): threshold (in seconds) to mark a transaction stale.
+
+    Returns:
+        A cursor of transactions.
+    """
+
+    return connection.run(
+            r.table('backlog')
+            .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay))
+
+
+def get_transaction_from_block(connection, transaction_id, block_id):
+    """Get a transaction from a specific block.
+
+    Args:
+        transaction_id (str): the id of the transaction.
+        block_id (str): the id of the block.
+
+    Returns:
+        The matching transaction.
+    """
+    return connection.run(
+            r.table('bigchain', read_mode=READ_MODE)
+            .get(block_id)
+            .get_field('block')
+            .get_field('transactions')
+            .filter(lambda tx: tx['id'] == transaction_id))[0]
+
+
+def get_transaction_from_backlog(connection, transaction_id):
+    """Get a transaction from backlog.
+
+    Args:
+        transaction_id (str): the id of the transaction.
+
+    Returns:
+        The matching transaction.
+    """
+    return connection.run(
+            r.table('backlog')
+            .get(transaction_id)
+            .without('assignee', 'assignment_timestamp')
+            .default(None))
+
+
+def get_blocks_status_from_transaction(connection, transaction_id):
+    """Retrieve block election information given a secondary index and value
+
+    Args:
+        transaction_id (str): the transaction id to search for.
+
+    Returns:
+        :obj:`list` of :obj:`dict`: A list of blocks with only election information
+    """
+
+    return connection.run(
+            r.table('bigchain', read_mode=READ_MODE)
+            .get_all(transaction_id, index='transaction_id')
+            .pluck('votes', 'id', {'block': ['voters']}))
+
+
+def get_txids_by_metadata_id(connection, metadata_id):
+    """Retrieves transaction ids related to a particular metadata.
+
+    When creating a transaction one of the optional arguments is the
+    `metadata`. The metadata is a generic dict that contains extra
+    information that can be appended to the transaction.
+
+    To make it easy to query the bigchain for that particular metadata we
+    create a UUID for the metadata and store it with the transaction.
+
+    Args:
+        metadata_id (str): the id for this particular metadata.
+
+    Returns:
+        A list of transaction ids containing that metadata. If no
+        transaction exists with that metadata it returns an empty list `[]`
+    """
+    return connection.run(
+            r.table('bigchain', read_mode=READ_MODE)
+            .get_all(metadata_id, index='metadata_id')
+            .concat_map(lambda block: block['block']['transactions'])
+            .filter(lambda transaction:
+                    transaction['transaction']['metadata']['id'] ==
+                    metadata_id)
+            .get_field('id'))
+
+
+def get_txids_by_asset_id(connection, asset_id):
+    """Retrieves transaction ids related to a particular asset.
+
+    A digital asset in bigchaindb is identified by a uuid. This allows us
+    to query all the transactions related to a particular digital asset,
+    knowing the id.
+
+    Args:
+        asset_id (str): the id for this particular asset.
+
+    Returns:
+        A list of transaction ids related to the asset.
If no transaction + exists for that asset it returns an empty list `[]` + """ + + # here we only want to return the transaction ids since later on when + # we are going to retrieve the transaction with status validation + return connection.run( + r.table('bigchain') + .get_all(asset_id, index='asset_id') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id) + .get_field('id')) + + +def get_asset_by_id(connection, asset_id): + """Returns the asset associated with an asset_id. Args: - host (str): the host to connect to. - port (int): the port to connect to. - db (str): the name of the database to use. - """ - - self.read_mode = 'majority' - self.durability = 'soft' - self.connection = Connection(host=host, port=port, db=db) - - def write_transaction(self, signed_transaction): - """Write a transaction to the backlog table. - - Args: - signed_transaction (dict): a signed transaction. + asset_id (str): The asset id. Returns: - The result of the operation. - """ - - return self.connection.run( - r.table('backlog') - .insert(signed_transaction, durability=self.durability)) + Returns a rethinkdb cursor. + """ + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .get_all(asset_id, index='asset_id') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: + transaction['transaction']['asset']['id'] == asset_id) + .filter(lambda transaction: + transaction['transaction']['operation'] == 'CREATE') + .pluck({'transaction': 'asset'})) - def update_transaction(self, transaction_id, doc): - """Update a transaction in the backlog table. - - Args: - transaction_id (str): the id of the transaction. - doc (dict): the values to update. - Returns: - The result of the operation. - """ +def get_spent(connection, transaction_id, condition_id): + """Check if a `txid` was already used as an input. - return self.connection.run( - r.table('backlog') - .get(transaction_id) - .update(doc)) - - def delete_transaction(self, *transaction_id): - """Delete a transaction from the backlog. - - Args: - *transaction_id (str): the transaction(s) to delete - - Returns: - The database response. - """ - - return self.connection.run( - r.table('backlog') - .get_all(*transaction_id) - .delete(durability='hard')) - - def get_stale_transactions(self, reassign_delay): - """Get a cursor of stale transactions. - - Transactions are considered stale if they have been assigned a node, - but are still in the backlog after some amount of time specified in the - configuration. - - Args: - reassign_delay (int): threshold (in seconds) to mark a transaction stale. - - Returns: - A cursor of transactions. - """ - - return self.connection.run( - r.table('backlog') - .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay)) - - def get_transaction_from_block(self, transaction_id, block_id): - """Get a transaction from a specific block. - - Args: - transaction_id (str): the id of the transaction. - block_id (str): the id of the block. - - Returns: - The matching transaction. - """ - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get(block_id) - .get_field('block') - .get_field('transactions') - .filter(lambda tx: tx['id'] == transaction_id))[0] - - def get_transaction_from_backlog(self, transaction_id): - """Get a transaction from backlog. - - Args: - transaction_id (str): the id of the transaction. - - Returns: - The matching transaction. 
- """ - return self.connection.run( - r.table('backlog') - .get(transaction_id) - .without('assignee', 'assignment_timestamp') - .default(None)) - - def get_blocks_status_from_transaction(self, transaction_id): - """Retrieve block election information given a secondary index and value - - Args: - value: a value to search (e.g. transaction id string, payload hash string) - index (str): name of a secondary index, e.g. 'transaction_id' - - Returns: - :obj:`list` of :obj:`dict`: A list of blocks with with only election information - """ - - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(transaction_id, index='transaction_id') - .pluck('votes', 'id', {'block': ['voters']})) - - def get_txids_by_metadata_id(self, metadata_id): - """Retrieves transaction ids related to a particular metadata. - - When creating a transaction one of the optional arguments is the - `metadata`. The metadata is a generic dict that contains extra - information that can be appended to the transaction. - - To make it easy to query the bigchain for that particular metadata we - create a UUID for the metadata and store it with the transaction. - - Args: - metadata_id (str): the id for this particular metadata. - - Returns: - A list of transaction ids containing that metadata. If no - transaction exists with that metadata it returns an empty list `[]` - """ - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(metadata_id, index='metadata_id') - .concat_map(lambda block: block['block']['transactions']) - .filter(lambda transaction: - transaction['transaction']['metadata']['id'] == - metadata_id) - .get_field('id')) - - def get_txids_by_asset_id(self, asset_id): - """Retrieves transactions ids related to a particular asset. - - A digital asset in bigchaindb is identified by an uuid. This allows us - to query all the transactions related to a particular digital asset, - knowing the id. - - Args: - asset_id (str): the id for this particular metadata. - - Returns: - A list of transactions ids related to the asset. If no transaction - exists for that asset it returns an empty list `[]` - """ - - # here we only want to return the transaction ids since later on when - # we are going to retrieve the transaction with status validation - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(asset_id, index='asset_id') - .concat_map(lambda block: block['block']['transactions']) - .filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id) - .get_field('id')) - - def get_asset_by_id(self, asset_id): - """Returns the asset associated with an asset_id. - - Args: - asset_id (str): The asset id. - - Returns: - Returns a rethinkdb cursor. - """ - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(asset_id, index='asset_id') - .concat_map(lambda block: block['block']['transactions']) - .filter(lambda transaction: - transaction['transaction']['asset']['id'] == asset_id) - .filter(lambda transaction: - transaction['transaction']['operation'] == 'CREATE') - .pluck({'transaction': 'asset'})) - - def get_spent(self, transaction_id, condition_id): - """Check if a `txid` was already used as an input. - - A transaction can be used as an input for another transaction. Bigchain needs to make sure that a - given `txid` is only used once. - - Args: - transaction_id (str): The id of the transaction. - condition_id (int): The index of the condition in the respective transaction. 
- - Returns: - The transaction that used the `txid` as an input else `None` - """ - - # TODO: use index! - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda transaction: transaction['transaction']['fulfillments'].contains( - lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id}))) - - def get_owned_ids(self, owner): - """Retrieve a list of `txids` that can we used has inputs. - - Args: - owner (str): base58 encoded public key. - - Returns: - A cursor for the matching transactions. - """ - - # TODO: use index! - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda tx: tx['transaction']['conditions'].contains( - lambda c: c['owners_after'].contains(owner)))) - - def get_votes_by_block_id(self, block_id): - """Get all the votes casted for a specific block. - - Args: - block_id (str): the block id to use. - - Returns: - A cursor for the matching votes. - """ - return self.connection.run( - r.table('votes', read_mode=self.read_mode) - .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')) - - def get_votes_by_block_id_and_voter(self, block_id, node_pubkey): - """Get all the votes casted for a specific block by a specific voter. - - Args: - block_id (str): the block id to use. - node_pubkey (str): base58 encoded public key - - Returns: - A cursor for the matching votes. - """ - return self.connection.run( - r.table('votes', read_mode=self.read_mode) - .get_all([block_id, node_pubkey], index='block_and_voter')) - - def write_block(self, block, durability='soft'): - """Write a block to the bigchain table. - - Args: - block (dict): the block to write. - - Returns: - The database response. - """ - return self.connection.run( - r.table('bigchain') - .insert(r.json(block), durability=durability)) - - def get_block(self, block_id): - """Get a block from the bigchain table - - Args: - block_id (str): block id of the block to get - - Returns: - block (dict): the block or `None` - """ - return self.connection.run(r.table('bigchain').get(block_id)) - - def has_transaction(self, transaction_id): - """Check if a transaction exists in the bigchain table. - - Args: - transaction_id (str): the id of the transaction to check. - - Returns: - ``True`` if the transaction exists, ``False`` otherwise. - """ - return bool(self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get_all(transaction_id, index='transaction_id').count())) - - def count_blocks(self): - """Count the number of blocks in the bigchain table. - - Returns: - The number of blocks. - """ - - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .count()) - - def count_backlog(self): - """Count the number of transactions in the backlog table. - - Returns: - The number of transactions in the backlog. - """ - - return self.connection.run( - r.table('backlog', read_mode=self.read_mode) - .count()) - - def write_vote(self, vote): - """Write a vote to the votes table. - - Args: - vote (dict): the vote to write. - - Returns: - The database response. 
- """ - return self.connection.run( - r.table('votes') - .insert(vote)) - - def get_genesis_block(self): - """Get the genesis block - - Returns: - The genesis block - """ - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .filter(util.is_genesis_block) - .nth(0)) - - def get_last_voted_block(self, node_pubkey): - """Get the last voted block for a specific node. - - Args: - node_pubkey (str): base58 encoded public key. - - Returns: - The last block the node has voted on. If the node didn't cast - any vote then the genesis block is returned. - """ + A transaction can be used as an input for another transaction. Bigchain needs to make sure that a + given `txid` is only used once. + + Args: + transaction_id (str): The id of the transaction. + condition_id (int): The index of the condition in the respective transaction. + + Returns: + The transaction that used the `txid` as an input else `None` + """ + + # TODO: use index! + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['fulfillments'].contains( + lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id}))) + + +def get_owned_ids(connection, owner): + """Retrieve a list of `txids` that can we used has inputs. + + Args: + owner (str): base58 encoded public key. + + Returns: + A cursor for the matching transactions. + """ + + # TODO: use index! + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda tx: tx['transaction']['conditions'].contains( + lambda c: c['owners_after'].contains(owner)))) + + +def get_votes_by_block_id(connection, block_id): + """Get all the votes casted for a specific block. + + Args: + block_id (str): the block id to use. + + Returns: + A cursor for the matching votes. + """ + return connection.run( + r.table('votes', read_mode=READ_MODE) + .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')) + + +def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey): + """Get all the votes casted for a specific block by a specific voter. + + Args: + block_id (str): the block id to use. + node_pubkey (str): base58 encoded public key + + Returns: + A cursor for the matching votes. + """ + return connection.run( + r.table('votes') + .get_all([block_id, node_pubkey], index='block_and_voter')) + + +def write_block(connection, block): + """Write a block to the bigchain table. + + Args: + block (dict): the block to write. + + Returns: + The database response. + """ + return connection.run( + r.table('bigchain') + .insert(r.json(block), durability=WRITE_DURABILITY)) + + +def get_block(connection, block_id): + """Get a block from the bigchain table + + Args: + block_id (str): block id of the block to get + + Returns: + block (dict): the block or `None` + """ + return connection.run(r.table('bigchain').get(block_id)) + + +def has_transaction(connection, transaction_id): + """Check if a transaction exists in the bigchain table. + + Args: + transaction_id (str): the id of the transaction to check. + + Returns: + ``True`` if the transaction exists, ``False`` otherwise. + """ + return bool(connection.run( + r.table('bigchain', read_mode=READ_MODE) + .get_all(transaction_id, index='transaction_id').count())) + + +def count_blocks(connection): + """Count the number of blocks in the bigchain table. + + Returns: + The number of blocks. 
+ """ + + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .count()) + + +def count_backlog(connection): + """Count the number of transactions in the backlog table. + + Returns: + The number of transactions in the backlog. + """ + + return connection.run( + r.table('backlog', read_mode=READ_MODE) + .count()) + + +def write_vote(connection, vote): + """Write a vote to the votes table. + + Args: + vote (dict): the vote to write. + + Returns: + The database response. + """ + return connection.run( + r.table('votes') + .insert(vote)) + + +def get_genesis_block(connection): + """Get the genesis block + + Returns: + The genesis block + """ + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .filter(util.is_genesis_block) + .nth(0)) + + +def get_last_voted_block(connection, node_pubkey): + """Get the last voted block for a specific node. + + Args: + node_pubkey (str): base58 encoded public key. + + Returns: + The last block the node has voted on. If the node didn't cast + any vote then the genesis block is returned. + """ + try: + # get the latest value for the vote timestamp (over all votes) + max_timestamp = connection.run( + r.table('votes', read_mode=READ_MODE) + .filter(r.row['node_pubkey'] == node_pubkey) + .max(r.row['vote']['timestamp']))['vote']['timestamp'] + + last_voted = list(connection.run( + r.table('votes', read_mode=READ_MODE) + .filter(r.row['vote']['timestamp'] == max_timestamp) + .filter(r.row['node_pubkey'] == node_pubkey))) + + except r.ReqlNonExistenceError: + # return last vote if last vote exists else return Genesis block + return get_genesis_block() + + # Now the fun starts. Since the resolution of timestamp is a second, + # we might have more than one vote per timestamp. If this is the case + # then we need to rebuild the chain for the blocks that have been retrieved + # to get the last one. + + # Given a block_id, mapping returns the id of the block pointing at it. + mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] + for v in last_voted} + + # Since we follow the chain backwards, we can start from a random + # point of the chain and "move up" from it. + last_block_id = list(mapping.values())[0] + + # We must be sure to break the infinite loop. This happens when: + # - the block we are currenty iterating is the one we are looking for. + # This will trigger a KeyError, breaking the loop + # - we are visiting again a node we already explored, hence there is + # a loop. This might happen if a vote points both `previous_block` + # and `voting_for_block` to the same `block_id` + explored = set() + + while True: try: - # get the latest value for the vote timestamp (over all votes) - max_timestamp = self.connection.run( - r.table('votes', read_mode=self.read_mode) - .filter(r.row['node_pubkey'] == node_pubkey) - .max(r.row['vote']['timestamp']))['vote']['timestamp'] + if last_block_id in explored: + raise exceptions.CyclicBlockchainError() + explored.add(last_block_id) + last_block_id = mapping[last_block_id] + except KeyError: + break - last_voted = list(self.connection.run( - r.table('votes', read_mode=self.read_mode) - .filter(r.row['vote']['timestamp'] == max_timestamp) - .filter(r.row['node_pubkey'] == node_pubkey))) + return connection.run( + r.table('bigchain', read_mode=READ_MODE) + .get(last_block_id)) - except r.ReqlNonExistenceError: - # return last vote if last vote exists else return Genesis block - return self.get_genesis_block() - # Now the fun starts. 
Since the resolution of timestamp is a second, - # we might have more than one vote per timestamp. If this is the case - # then we need to rebuild the chain for the blocks that have been retrieved - # to get the last one. +def get_unvoted_blocks(connection, node_pubkey): + """Return all the blocks that have not been voted by the specified node. - # Given a block_id, mapping returns the id of the block pointing at it. - mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] - for v in last_voted} + Args: + node_pubkey (str): base58 encoded public key - # Since we follow the chain backwards, we can start from a random - # point of the chain and "move up" from it. - last_block_id = list(mapping.values())[0] + Returns: + :obj:`list` of :obj:`dict`: a list of unvoted blocks + """ - # We must be sure to break the infinite loop. This happens when: - # - the block we are currenty iterating is the one we are looking for. - # This will trigger a KeyError, breaking the loop - # - we are visiting again a node we already explored, hence there is - # a loop. This might happen if a vote points both `previous_block` - # and `voting_for_block` to the same `block_id` - explored = set() + unvoted = connection.run( + r.table('bigchain', read_mode=READ_MODE) + .filter(lambda block: r.table('votes', read_mode=READ_MODE) + .get_all([block['id'], node_pubkey], index='block_and_voter') + .is_empty()) + .order_by(r.asc(r.row['block']['timestamp']))) - while True: - try: - if last_block_id in explored: - raise exceptions.CyclicBlockchainError() - explored.add(last_block_id) - last_block_id = mapping[last_block_id] - except KeyError: - break - - return self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .get(last_block_id)) - - def get_unvoted_blocks(self, node_pubkey): - """Return all the blocks that have not been voted by the specified node. - - Args: - node_pubkey (str): base58 encoded public key - - Returns: - :obj:`list` of :obj:`dict`: a list of unvoted blocks - """ - - unvoted = self.connection.run( - r.table('bigchain', read_mode=self.read_mode) - .filter(lambda block: r.table('votes', read_mode=self.read_mode) - .get_all([block['id'], node_pubkey], index='block_and_voter') - .is_empty()) - .order_by(r.asc(r.row['block']['timestamp']))) - - # FIXME: I (@vrde) don't like this solution. Filtering should be done at a - # database level. Solving issue #444 can help untangling the situation - unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) - return unvoted_blocks + # FIXME: I (@vrde) don't like this solution. Filtering should be done at a + # database level. 
Solving issue #444 can help untangling the situation + unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) + return unvoted_blocks diff --git a/bigchaindb/backend/rethinkdb/schema.py b/bigchaindb/backend/rethinkdb/schema.py index b55e1cee..25ccb34f 100644 --- a/bigchaindb/backend/rethinkdb/schema.py +++ b/bigchaindb/backend/rethinkdb/schema.py @@ -2,7 +2,6 @@ import logging -from bigchaindb.db import Schema from bigchaindb.common import exceptions import rethinkdb as r @@ -10,94 +9,94 @@ import rethinkdb as r logger = logging.getLogger(__name__) -class RethinkDBSchema(Schema): +def create_database(connection, name): + if connection.run(r.db_list().contains(name)): + raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'.format(name)) - def __init__(self, connection, name): - self.connection = connection - self.name = name + logger.info('Create database `%s`.', name) + connection.run(r.db_create(name)) - def create_database(self): - if self.connection.run(r.db_list().contains(self.name)): - raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'.format(self.name)) - logger.info('Create database `%s`.', self.name) - self.connection.run(r.db_create(self.name)) +def create_tables(connection, name): + for table_name in ['bigchain', 'backlog', 'votes']: + logger.info('Create `%s` table.', table_name) + connection.run(r.db(name).table_create(table_name)) - def create_tables(self): - for table_name in ['bigchain', 'backlog', 'votes']: - logger.info('Create `%s` table.', table_name) - self.connection.run(r.db(self.name).table_create(table_name)) - def create_indexes(self): - self.create_bigchain_secondary_index() +def create_indexes(connection, name): + create_bigchain_secondary_index(connection, name) - def drop_database(self): - try: - logger.info('Drop database `%s`', self.name) - self.connection.run(r.db_drop(self.name)) - logger.info('Done.') - except r.ReqlOpFailedError: - raise exceptions.DatabaseDoesNotExist('Database `{}` does not exist'.format(self.name)) - def create_bigchain_secondary_index(self): - logger.info('Create `bigchain` secondary index.') +def drop_database(connection, name): + try: + logger.info('Drop database `%s`', name) + connection.run(r.db_drop(name)) + logger.info('Done.') + except r.ReqlOpFailedError: + raise exceptions.DatabaseDoesNotExist('Database `{}` does not exist'.format(name)) - # to order blocks by timestamp - self.connection.run( - r.db(self.name) - .table('bigchain') - .index_create('block_timestamp', r.row['block']['timestamp'])) - # to query the bigchain for a transaction id - self.connection.run( - r.db(self.name) - .table('bigchain') - .index_create('transaction_id', r.row['block']['transactions']['id'], multi=True)) +def create_bigchain_secondary_index(connection, name): + logger.info('Create `bigchain` secondary index.') - # secondary index for payload data by UUID - self.connection.run( - r.db(self.name) - .table('bigchain') - .index_create('metadata_id', r.row['block']['transactions']['transaction']['metadata']['id'], multi=True)) + # to order blocks by timestamp + connection.run( + r.db(name) + .table('bigchain') + .index_create('block_timestamp', r.row['block']['timestamp'])) - # secondary index for asset uuid - self.connection.run( - r.db(self.name) - .table('bigchain') - .index_create('asset_id', r.row['block']['transactions']['transaction']['asset']['id'], multi=True)) + # to query the bigchain for a transaction id + connection.run( + r.db(name) + .table('bigchain') + 
.index_create('transaction_id', r.row['block']['transactions']['id'], multi=True)) - # wait for rethinkdb to finish creating secondary indexes - self.connection.run( - r.db(self.name) - .table('bigchain') - .index_wait()) + # secondary index for payload data by UUID + connection.run( + r.db(name) + .table('bigchain') + .index_create('metadata_id', r.row['block']['transactions']['transaction']['metadata']['id'], multi=True)) - def create_backlog_secondary_index(self): - logger.info('Create `backlog` secondary index.') + # secondary index for asset uuid + connection.run( + r.db(name) + .table('bigchain') + .index_create('asset_id', r.row['block']['transactions']['transaction']['asset']['id'], multi=True)) - # compound index to read transactions from the backlog per assignee - self.connection.run( - r.db(self.name) - .table('backlog') - .index_create('assignee__transaction_timestamp', [r.row['assignee'], r.row['assignment_timestamp']])) + # wait for rethinkdb to finish creating secondary indexes + connection.run( + r.db(name) + .table('bigchain') + .index_wait()) - # wait for rethinkdb to finish creating secondary indexes - self.connection.run( - r.db(self.name) - .table('backlog') - .index_wait()) - def create_votes_secondary_index(self): - logger.info('Create `votes` secondary index.') +def create_backlog_secondary_index(connection, name): + logger.info('Create `backlog` secondary index.') - # compound index to order votes by block id and node - self.connection.run( - r.db(self.name) - .table('votes')\ - .index_create('block_and_voter', [r.row['vote']['voting_for_block'], r.row['node_pubkey']])) + # compound index to read transactions from the backlog per assignee + connection.run( + r.db(name) + .table('backlog') + .index_create('assignee__transaction_timestamp', [r.row['assignee'], r.row['assignment_timestamp']])) - # wait for rethinkdb to finish creating secondary indexes - self.connection.run( - r.db(self.name) - .table('votes') - .index_wait()) + # wait for rethinkdb to finish creating secondary indexes + connection.run( + r.db(name) + .table('backlog') + .index_wait()) + + +def create_votes_secondary_index(connection, name): + logger.info('Create `votes` secondary index.') + + # compound index to order votes by block id and node + connection.run( + r.db(name) + .table('votes') + .index_create('block_and_voter', [r.row['vote']['voting_for_block'], r.row['node_pubkey']])) + + # wait for rethinkdb to finish creating secondary indexes + connection.run( + r.db(name) + .table('votes') + .index_wait()) diff --git a/bigchaindb/backend/schema.py b/bigchaindb/backend/schema.py index fad64e73..64321b68 100644 --- a/bigchaindb/backend/schema.py +++ b/bigchaindb/backend/schema.py @@ -1,13 +1,23 @@ -class Schema: +"""Schema-providing interfaces for backend databases""" - def create_database(self): - raise NotImplementedError() +from functools import singledispatch - def create_tables(self): - raise NotImplementedError() - def create_indexes(self): - raise NotImplementedError() +@singledispatch +def create_database(connection, name): + raise NotImplementedError() - def drop_database(self): - raise NotImplementedError() + +@singledispatch +def create_tables(connection, name): + raise NotImplementedError() + + +@singledispatch +def create_indexes(connection, name): + raise NotImplementedError() + + +@singledispatch +def drop_database(connection, name): + raise NotImplementedError()
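
A note on the dispatch mechanism in this patch: each generic function in
bigchaindb/backend/query.py and bigchaindb/backend/schema.py is decorated with
@singledispatch, so a call is routed on the type of its first argument (the
connection object) and falls through to `raise NotImplementedError()` when no
backend has registered an implementation for that connection type. The
registration glue itself is not shown in this patch, so the following is a
minimal, runnable sketch of how a backend could be wired up, assuming a
RethinkDBConnection-style class; all names below are illustrative, not code
from the BigchainDB tree:

    from functools import singledispatch


    @singledispatch
    def write_transaction(connection, signed_transaction):
        # Generic interface: dispatch happens on the type of `connection`.
        raise NotImplementedError()


    class RethinkDBConnection:
        # Stand-in for the patch's RethinkDBConnection; a real connection
        # would execute a ReQL query instead of echoing it back.
        def run(self, query):
            return 'ran: {}'.format(query)


    # A backend registers its implementation against its connection type.
    @write_transaction.register(RethinkDBConnection)
    def write_transaction_rethinkdb(connection, signed_transaction):
        return connection.run('backlog.insert({})'.format(signed_transaction))


    print(write_transaction(RethinkDBConnection(), {'id': 'abc'}))
    try:
        write_transaction(object(), {'id': 'abc'})
    except NotImplementedError:
        print('no backend is registered for plain objects')

Dispatching on the connection instance keeps the public query and schema
interfaces as plain module-level functions while still supporting multiple
backends, which is what the removed Query/Schema class hierarchy previously
provided.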