diff --git a/bigchaindb/core.py b/bigchaindb/core.py index ee18bcb8..5e45a89d 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -133,8 +133,20 @@ class Bigchain(object): transaction['id'], {'assignee': new_assignee, 'assignment_timestamp': time()}) + def delete_transaction(self, *transaction_id): + """Delete a transaction from the backlog. + + Args: + *transaction_id (str): the transaction(s) to delete + + Returns: + The database response. + """ + + return self.backend.delete_transaction(*transaction_id) + def get_stale_transactions(self): - """Get a cursor of stale transactions + """Get a cursor of stale transactions. Transactions are considered stale if they have been assigned a node, but are still in the backlog after some amount of time specified in the configuration diff --git a/bigchaindb/db/backends/rethinkdb.py b/bigchaindb/db/backends/rethinkdb.py index 83aa0279..0fb84741 100644 --- a/bigchaindb/db/backends/rethinkdb.py +++ b/bigchaindb/db/backends/rethinkdb.py @@ -15,6 +15,14 @@ from bigchaindb.common import exceptions class RethinkDBBackend: def __init__(self, host=None, port=None, db=None): + """Initialize a new RethinkDB Backend instance. + + Args: + host (str): the host to connect to. + port (int): the port to connect to. + db (str): the name of the database to use. + """ + self.read_mode = 'majority' self.durability = 'soft' self.connection = Connection(host=host, port=port, db=db) @@ -49,6 +57,21 @@ class RethinkDBBackend: .get(transaction_id) .update(doc)) + def delete_transaction(self, *transaction_id): + """Delete a transaction from the backlog. + + Args: + *transaction_id (str): the transaction(s) to delete + + Returns: + The database response. + """ + + return self.connection.run( + r.table('backlog') + .get_all(*transaction_id) + .delete(durability='hard')) + def get_stale_transactions(self, reassign_delay): """Get a cursor of stale transactions. @@ -68,6 +91,15 @@ class RethinkDBBackend: .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay)) def get_transaction_from_block(self, transaction_id, block_id): + """Get a transaction from a specific block. + + Args: + transaction_id (str): the id of the transaction. + block_id (str): the id of the block. + + Returns: + The matching transaction. + """ return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .get(block_id) @@ -76,6 +108,14 @@ class RethinkDBBackend: .filter(lambda tx: tx['id'] == transaction_id))[0] def get_transaction_from_backlog(self, transaction_id): + """Get a transaction from backlog. + + Args: + transaction_id (str): the id of the transaction. + + Returns: + The matching transaction. + """ return self.connection.run( r.table('backlog') .get(transaction_id) @@ -133,6 +173,7 @@ class RethinkDBBackend: A list of transactions containing related to the asset. If no transaction exists for that asset it returns an empty list `[]` """ + return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .get_all(asset_id, index='asset_id') @@ -140,6 +181,19 @@ class RethinkDBBackend: .filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id)) def get_spent(self, transaction_id, condition_id): + """Check if a `txid` was already used as an input. + + A transaction can be used as an input for another transaction. Bigchain needs to make sure that a + given `txid` is only used once. + + Args: + transaction_id (str): The id of the transaction. + condition_id (int): The index of the condition in the respective transaction. + + Returns: + The transaction that used the `txid` as an input else `None` + """ + return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .concat_map(lambda doc: doc['block']['transactions']) @@ -147,6 +201,15 @@ class RethinkDBBackend: lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id}))) def get_owned_ids(self, owner): + """Retrieve a list of `txids` that can we used has inputs. + + Args: + owner (str): base58 encoded public key. + + Returns: + A cursor for the matching transactions. + """ + return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .concat_map(lambda doc: doc['block']['transactions']) @@ -154,36 +217,92 @@ class RethinkDBBackend: lambda c: c['owners_after'].contains(owner)))) def get_votes_by_block_id(self, block_id): + """Get all the votes casted for a specific block. + + Args: + block_id (str): the block id to use. + + Returns: + A cursor for the matching votes. + """ return self.connection.run( r.table('votes', read_mode=self.read_mode) .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')) def get_votes_by_block_id_and_voter(self, block_id, node_pubkey): + """Get all the votes casted for a specific block by a specific voter. + + Args: + block_id (str): the block id to use. + node_pubkey (str): base58 encoded public key + + Returns: + A cursor for the matching votes. + """ return self.connection.run( r.table('votes', read_mode=self.read_mode) .get_all([block_id, node_pubkey], index='block_and_voter')) def write_block(self, block, durability='soft'): + """Write a block to the bigchain table. + + Args: + block (dict): the block to write. + + Returns: + The database response. + """ return self.connection.run( r.table('bigchain') .insert(r.json(block), durability=durability)) def has_transaction(self, transaction_id): + """Check if a transaction exists in the bigchain table. + + Args: + transaction_id (str): the id of the transaction to check. + + Returns: + ``True`` if the transaction exists, ``False`` otherwise. + """ return bool(self.connection.run( r.table('bigchain', read_mode=self.read_mode) .get_all(transaction_id, index='transaction_id').count())) def count_blocks(self): + """Count the number of blocks in the bigchain table. + + Returns: + The number of blocks. + """ + return self.connection.run( r.table('bigchain', read_mode=self.read_mode) .count()) def write_vote(self, vote): + """Write a vote to the votes table. + + Args: + vote (dict): the vote to write. + + Returns: + The database response. + """ return self.connection.run( r.table('votes') .insert(vote)) def get_last_voted_block(self, node_pubkey): + """Get the last voted block for a specific node. + + Args: + node_pubkey (str): base58 encoded public key. + + Returns: + The last block the node has voted on. If the node didn't cast + any vote then the genesis block is returned. + """ try: # get the latest value for the vote timestamp (over all votes) max_timestamp = self.connection.run( @@ -238,7 +357,10 @@ class RethinkDBBackend: .get(last_block_id)) def get_unvoted_blocks(self, node_pubkey): - """Return all the blocks that have not been voted on by this node. + """Return all the blocks that have not been voted by the specified node. + + Args: + node_pubkey (str): base58 encoded public key Returns: :obj:`list` of :obj:`dict`: a list of unvoted blocks diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 0d5e24b2..4142b234 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -69,10 +69,7 @@ class BlockPipeline: # if the tx is already in a valid or undecided block, # then it no longer should be in the backlog, or added # to a new block. We can delete and drop it. - self.bigchain.connection.run( - r.table('backlog') - .get(tx.id) - .delete(durability='hard')) + self.bigchain.delete_transaction(tx.id) return None tx_validated = self.bigchain.is_valid_transaction(tx) @@ -81,10 +78,7 @@ class BlockPipeline: else: # if the transaction is not valid, remove it from the # backlog - self.bigchain.connection.run( - r.table('backlog') - .get(tx.id) - .delete(durability='hard')) + self.bigchain.delete_transaction(tx.id) return None def create(self, tx, timeout=False): @@ -136,10 +130,7 @@ class BlockPipeline: Returns: :class:`~bigchaindb.models.Block`: The block. """ - self.bigchain.connection.run( - r.table('backlog') - .get_all(*[tx.id for tx in block.transactions]) - .delete(durability='hard')) + self.bigchain.delete_transaction(*[tx.id for tx in block.transactions]) return block