Replace cherry-picked class-based architecture with singledispatch

This commit is contained in:
Brett Sun 2016-12-02 14:44:27 +01:00 committed by Sylvain Bellemare
parent 1d0a7d2153
commit dabb81ac98
7 changed files with 731 additions and 686 deletions

View File

@ -1,4 +1 @@
from bigchaindb.db.factory import get_backend_factory from bigchaindb.backend.connection import Connection # noqa
from bigchaindb.db.query import Query
from bigchaindb.db.schema import Schema
from bigchaindb.db.connection import Connection

View File

@ -0,0 +1 @@
"""Changefeed interfaces for backend databases"""

View File

@ -1,13 +1,10 @@
"""Query interfaces for backend databases"""
"""Interface to query the database. from functools import singledispatch
This module contains all the methods to store and retrieve data from a generic database.
"""
class Query: @singledispatch
def write_transaction(connection, signed_transaction):
def write_transaction(self, signed_transaction):
"""Write a transaction to the backlog table. """Write a transaction to the backlog table.
Args: Args:
@ -18,7 +15,9 @@ class Query:
""" """
raise NotImplementedError() raise NotImplementedError()
def update_transaction(self, transaction_id, doc):
@singledispatch
def update_transaction(connection, transaction_id, doc):
"""Update a transaction in the backlog table. """Update a transaction in the backlog table.
Args: Args:
@ -30,7 +29,9 @@ class Query:
""" """
raise NotImplementedError() raise NotImplementedError()
def delete_transaction(self, *transaction_id):
@singledispatch
def delete_transaction(connection, *transaction_id):
"""Delete a transaction from the backlog. """Delete a transaction from the backlog.
Args: Args:
@ -41,7 +42,9 @@ class Query:
""" """
raise NotImplementedError() raise NotImplementedError()
def get_stale_transactions(self, reassign_delay):
@singledispatch
def get_stale_transactions(connection, reassign_delay):
"""Get a cursor of stale transactions. """Get a cursor of stale transactions.
Transactions are considered stale if they have been assigned a node, Transactions are considered stale if they have been assigned a node,
@ -57,7 +60,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_transaction_from_block(self, transaction_id, block_id):
@singledispatch
def get_transaction_from_block(connection, transaction_id, block_id):
"""Get a transaction from a specific block. """Get a transaction from a specific block.
Args: Args:
@ -70,7 +75,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_transaction_from_backlog(self, transaction_id):
@singledispatch
def get_transaction_from_backlog(connection, transaction_id):
"""Get a transaction from backlog. """Get a transaction from backlog.
Args: Args:
@ -82,7 +89,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_blocks_status_from_transaction(self, transaction_id):
@singledispatch
def get_blocks_status_from_transaction(connection, transaction_id):
"""Retrieve block election information given a secondary index and value """Retrieve block election information given a secondary index and value
Args: Args:
@ -95,7 +104,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_transactions_by_metadata_id(self, metadata_id):
@singledispatch
def get_transactions_by_metadata_id(connection, metadata_id):
"""Retrieves transactions related to a metadata. """Retrieves transactions related to a metadata.
When creating a transaction one of the optional arguments is the `metadata`. The metadata is a generic When creating a transaction one of the optional arguments is the `metadata`. The metadata is a generic
@ -114,7 +125,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_transactions_by_asset_id(self, asset_id):
@singledispatch
def get_transactions_by_asset_id(connection, asset_id):
"""Retrieves transactions related to a particular asset. """Retrieves transactions related to a particular asset.
A digital asset in bigchaindb is identified by an uuid. This allows us to query all the transactions A digital asset in bigchaindb is identified by an uuid. This allows us to query all the transactions
@ -130,7 +143,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_spent(self, transaction_id, condition_id):
@singledispatch
def get_spent(connection, transaction_id, condition_id):
"""Check if a `txid` was already used as an input. """Check if a `txid` was already used as an input.
A transaction can be used as an input for another transaction. Bigchain needs to make sure that a A transaction can be used as an input for another transaction. Bigchain needs to make sure that a
@ -146,7 +161,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_owned_ids(self, owner):
@singledispatch
def get_owned_ids(connection, owner):
"""Retrieve a list of `txids` that can we used has inputs. """Retrieve a list of `txids` that can we used has inputs.
Args: Args:
@ -158,7 +175,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_votes_by_block_id(self, block_id):
@singledispatch
def get_votes_by_block_id(connection, block_id):
"""Get all the votes casted for a specific block. """Get all the votes casted for a specific block.
Args: Args:
@ -170,7 +189,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_votes_by_block_id_and_voter(self, block_id, node_pubkey):
@singledispatch
def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey):
"""Get all the votes casted for a specific block by a specific voter. """Get all the votes casted for a specific block by a specific voter.
Args: Args:
@ -183,7 +204,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def write_block(self, block, durability='soft'):
@singledispatch
def write_block(connection, block, durability='soft'):
"""Write a block to the bigchain table. """Write a block to the bigchain table.
Args: Args:
@ -195,7 +218,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def has_transaction(self, transaction_id):
@singledispatch
def has_transaction(connection, transaction_id):
"""Check if a transaction exists in the bigchain table. """Check if a transaction exists in the bigchain table.
Args: Args:
@ -207,7 +232,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def count_blocks(self):
@singledispatch
def count_blocks(connection):
"""Count the number of blocks in the bigchain table. """Count the number of blocks in the bigchain table.
Returns: Returns:
@ -216,7 +243,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def write_vote(self, vote):
@singledispatch
def write_vote(connection, vote):
"""Write a vote to the votes table. """Write a vote to the votes table.
Args: Args:
@ -228,7 +257,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_last_voted_block(self, node_pubkey):
@singledispatch
def get_last_voted_block(connection, node_pubkey):
"""Get the last voted block for a specific node. """Get the last voted block for a specific node.
Args: Args:
@ -241,7 +272,9 @@ class Query:
raise NotImplementedError() raise NotImplementedError()
def get_unvoted_blocks(self, node_pubkey):
@singledispatch
def get_unvoted_blocks(connection, node_pubkey):
"""Return all the blocks that have not been voted by the specified node. """Return all the blocks that have not been voted by the specified node.
Args: Args:

View File

@ -1,11 +1,9 @@
import time import time
import logging
import rethinkdb as r import rethinkdb as r
from bigchaindb.backend.connection import Connection from bigchaindb.backend.connection import Connection
import bigchaindb
logger = logging.getLogger(__name__)
class RethinkDBConnection(Connection): class RethinkDBConnection(Connection):
@ -16,19 +14,19 @@ class RethinkDBConnection(Connection):
more times to run the query or open a connection. more times to run the query or open a connection.
""" """
def __init__(self, host, port, dbname, max_tries=3): def __init__(self, host=None, port=None, db=None, max_tries=3):
"""Create a new Connection instance. """Create a new Connection instance.
Args: Args:
host (str, optional): the host to connect to. host (str, optional): the host to connect to.
port (int, optional): the port to connect to. port (int, optional): the port to connect to.
dbname (str, optional): the name of the database to use. db (str, optional): the database to use.
max_tries (int, optional): how many tries before giving up. max_tries (int, optional): how many tries before giving up.
""" """
self.host = host self.host = host or bigchaindb.config['database']['host']
self.port = port self.port = port or bigchaindb.config['database']['port']
self.dbname = dbname self.db = db or bigchaindb.config['database']['name']
self.max_tries = max_tries self.max_tries = max_tries
self.conn = None self.conn = None
@ -40,7 +38,7 @@ class RethinkDBConnection(Connection):
""" """
if self.conn is None: if self.conn is None:
self._connect() self.connect()
for i in range(self.max_tries): for i in range(self.max_tries):
try: try:
@ -49,12 +47,13 @@ class RethinkDBConnection(Connection):
if i + 1 == self.max_tries: if i + 1 == self.max_tries:
raise raise
else: else:
self._connect() self.connect()
def _connect(self): def connect(self):
for i in range(self.max_tries): for i in range(self.max_tries):
try: try:
self.conn = r.connect(host=self.host, port=self.port, db=self.dbname) self.conn = r.connect(host=self.host, port=self.port,
db=self.db)
except r.ReqlDriverError as exc: except r.ReqlDriverError as exc:
if i + 1 == self.max_tries: if i + 1 == self.max_tries:
raise raise

View File

@ -1,34 +1,18 @@
"""Backend implementation for RethinkDB. """Query implementation for RethinkDB"""
This module contains all the methods to store and retrieve data from RethinkDB.
"""
from time import time from time import time
import rethinkdb as r import rethinkdb as r
from bigchaindb.db import Query
from bigchaindb import util from bigchaindb import util
from bigchaindb.db.utils import Connection
from bigchaindb.common import exceptions from bigchaindb.common import exceptions
class RethinkDBBackend(Query): READ_MODE = 'majority'
WRITE_DURABILITY = 'hard'
def __init__(self, host=None, port=None, db=None):
"""Initialize a new RethinkDB Backend instance.
Args: def write_transaction(connection, signed_transaction):
host (str): the host to connect to.
port (int): the port to connect to.
db (str): the name of the database to use.
"""
self.read_mode = 'majority'
self.durability = 'soft'
self.connection = Connection(host=host, port=port, db=db)
def write_transaction(self, signed_transaction):
"""Write a transaction to the backlog table. """Write a transaction to the backlog table.
Args: Args:
@ -38,11 +22,12 @@ class RethinkDBBackend(Query):
The result of the operation. The result of the operation.
""" """
return self.connection.run( return connection.run(
r.table('backlog') r.table('backlog')
.insert(signed_transaction, durability=self.durability)) .insert(signed_transaction, durability=WRITE_DURABILITY))
def update_transaction(self, transaction_id, doc):
def update_transaction(connection, transaction_id, doc):
"""Update a transaction in the backlog table. """Update a transaction in the backlog table.
Args: Args:
@ -53,12 +38,13 @@ class RethinkDBBackend(Query):
The result of the operation. The result of the operation.
""" """
return self.connection.run( return connection.run(
r.table('backlog') r.table('backlog')
.get(transaction_id) .get(transaction_id)
.update(doc)) .update(doc))
def delete_transaction(self, *transaction_id):
def delete_transaction(connection, *transaction_id):
"""Delete a transaction from the backlog. """Delete a transaction from the backlog.
Args: Args:
@ -68,12 +54,13 @@ class RethinkDBBackend(Query):
The database response. The database response.
""" """
return self.connection.run( return connection.run(
r.table('backlog') r.table('backlog')
.get_all(*transaction_id) .get_all(*transaction_id)
.delete(durability='hard')) .delete(durability=WRITE_DURABILITY))
def get_stale_transactions(self, reassign_delay):
def get_stale_transactions(connection, reassign_delay):
"""Get a cursor of stale transactions. """Get a cursor of stale transactions.
Transactions are considered stale if they have been assigned a node, Transactions are considered stale if they have been assigned a node,
@ -87,11 +74,12 @@ class RethinkDBBackend(Query):
A cursor of transactions. A cursor of transactions.
""" """
return self.connection.run( return connection.run(
r.table('backlog') r.table('backlog')
.filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay)) .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay))
def get_transaction_from_block(self, transaction_id, block_id):
def get_transaction_from_block(connection, transaction_id, block_id):
"""Get a transaction from a specific block. """Get a transaction from a specific block.
Args: Args:
@ -101,14 +89,15 @@ class RethinkDBBackend(Query):
Returns: Returns:
The matching transaction. The matching transaction.
""" """
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.get(block_id) .get(block_id)
.get_field('block') .get_field('block')
.get_field('transactions') .get_field('transactions')
.filter(lambda tx: tx['id'] == transaction_id))[0] .filter(lambda tx: tx['id'] == transaction_id))[0]
def get_transaction_from_backlog(self, transaction_id):
def get_transaction_from_backlog(connection, transaction_id):
"""Get a transaction from backlog. """Get a transaction from backlog.
Args: Args:
@ -117,13 +106,14 @@ class RethinkDBBackend(Query):
Returns: Returns:
The matching transaction. The matching transaction.
""" """
return self.connection.run( return connection.run(
r.table('backlog') r.table('backlog')
.get(transaction_id) .get(transaction_id)
.without('assignee', 'assignment_timestamp') .without('assignee', 'assignment_timestamp')
.default(None)) .default(None))
def get_blocks_status_from_transaction(self, transaction_id):
def get_blocks_status_from_transaction(connection, transaction_id):
"""Retrieve block election information given a secondary index and value """Retrieve block election information given a secondary index and value
Args: Args:
@ -134,12 +124,13 @@ class RethinkDBBackend(Query):
:obj:`list` of :obj:`dict`: A list of blocks with only election information :obj:`list` of :obj:`dict`: A list of blocks with only election information
""" """
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.get_all(transaction_id, index='transaction_id') .get_all(transaction_id, index='transaction_id')
.pluck('votes', 'id', {'block': ['voters']})) .pluck('votes', 'id', {'block': ['voters']}))
def get_txids_by_metadata_id(self, metadata_id):
def get_txids_by_metadata_id(connection, metadata_id):
"""Retrieves transaction ids related to a particular metadata. """Retrieves transaction ids related to a particular metadata.
When creating a transaction one of the optional arguments is the When creating a transaction one of the optional arguments is the
@ -156,8 +147,8 @@ class RethinkDBBackend(Query):
A list of transaction ids containing that metadata. If no A list of transaction ids containing that metadata. If no
transaction exists with that metadata it returns an empty list `[]` transaction exists with that metadata it returns an empty list `[]`
""" """
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.get_all(metadata_id, index='metadata_id') .get_all(metadata_id, index='metadata_id')
.concat_map(lambda block: block['block']['transactions']) .concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: .filter(lambda transaction:
@ -165,7 +156,8 @@ class RethinkDBBackend(Query):
metadata_id) metadata_id)
.get_field('id')) .get_field('id'))
def get_txids_by_asset_id(self, asset_id):
def get_txids_by_asset_id(connection, asset_id):
"""Retrieves transactions ids related to a particular asset. """Retrieves transactions ids related to a particular asset.
A digital asset in bigchaindb is identified by an uuid. This allows us A digital asset in bigchaindb is identified by an uuid. This allows us
@ -182,14 +174,15 @@ class RethinkDBBackend(Query):
# here we only want to return the transaction ids since later on when # here we only want to return the transaction ids since later on when
# we are going to retrieve the transaction with status validation # we are going to retrieve the transaction with status validation
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain')
.get_all(asset_id, index='asset_id') .get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions']) .concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id) .filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id)
.get_field('id')) .get_field('id'))
def get_asset_by_id(self, asset_id):
def get_asset_by_id(connection, asset_id):
"""Returns the asset associated with an asset_id. """Returns the asset associated with an asset_id.
Args: Args:
@ -198,8 +191,8 @@ class RethinkDBBackend(Query):
Returns: Returns:
Returns a rethinkdb cursor. Returns a rethinkdb cursor.
""" """
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.get_all(asset_id, index='asset_id') .get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions']) .concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: .filter(lambda transaction:
@ -208,7 +201,8 @@ class RethinkDBBackend(Query):
transaction['transaction']['operation'] == 'CREATE') transaction['transaction']['operation'] == 'CREATE')
.pluck({'transaction': 'asset'})) .pluck({'transaction': 'asset'}))
def get_spent(self, transaction_id, condition_id):
def get_spent(connection, transaction_id, condition_id):
"""Check if a `txid` was already used as an input. """Check if a `txid` was already used as an input.
A transaction can be used as an input for another transaction. Bigchain needs to make sure that a A transaction can be used as an input for another transaction. Bigchain needs to make sure that a
@ -223,13 +217,14 @@ class RethinkDBBackend(Query):
""" """
# TODO: use index! # TODO: use index!
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.concat_map(lambda doc: doc['block']['transactions']) .concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['fulfillments'].contains( .filter(lambda transaction: transaction['transaction']['fulfillments'].contains(
lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id}))) lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id})))
def get_owned_ids(self, owner):
def get_owned_ids(connection, owner):
"""Retrieve a list of `txids` that can we used has inputs. """Retrieve a list of `txids` that can we used has inputs.
Args: Args:
@ -240,13 +235,14 @@ class RethinkDBBackend(Query):
""" """
# TODO: use index! # TODO: use index!
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.concat_map(lambda doc: doc['block']['transactions']) .concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda tx: tx['transaction']['conditions'].contains( .filter(lambda tx: tx['transaction']['conditions'].contains(
lambda c: c['owners_after'].contains(owner)))) lambda c: c['owners_after'].contains(owner))))
def get_votes_by_block_id(self, block_id):
def get_votes_by_block_id(connection, block_id):
"""Get all the votes casted for a specific block. """Get all the votes casted for a specific block.
Args: Args:
@ -255,11 +251,12 @@ class RethinkDBBackend(Query):
Returns: Returns:
A cursor for the matching votes. A cursor for the matching votes.
""" """
return self.connection.run( return connection.run(
r.table('votes', read_mode=self.read_mode) r.table('votes', read_mode=READ_MODE)
.between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter')) .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter'))
def get_votes_by_block_id_and_voter(self, block_id, node_pubkey):
def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey):
"""Get all the votes casted for a specific block by a specific voter. """Get all the votes casted for a specific block by a specific voter.
Args: Args:
@ -269,11 +266,12 @@ class RethinkDBBackend(Query):
Returns: Returns:
A cursor for the matching votes. A cursor for the matching votes.
""" """
return self.connection.run( return connection.run(
r.table('votes', read_mode=self.read_mode) r.table('votes')
.get_all([block_id, node_pubkey], index='block_and_voter')) .get_all([block_id, node_pubkey], index='block_and_voter'))
def write_block(self, block, durability='soft'):
def write_block(connection, block):
"""Write a block to the bigchain table. """Write a block to the bigchain table.
Args: Args:
@ -282,11 +280,12 @@ class RethinkDBBackend(Query):
Returns: Returns:
The database response. The database response.
""" """
return self.connection.run( return connection.run(
r.table('bigchain') r.table('bigchain')
.insert(r.json(block), durability=durability)) .insert(r.json(block), durability=WRITE_DURABILITY))
def get_block(self, block_id):
def get_block(connection, block_id):
"""Get a block from the bigchain table """Get a block from the bigchain table
Args: Args:
@ -295,9 +294,10 @@ class RethinkDBBackend(Query):
Returns: Returns:
block (dict): the block or `None` block (dict): the block or `None`
""" """
return self.connection.run(r.table('bigchain').get(block_id)) return connection.run(r.table('bigchain').get(block_id))
def has_transaction(self, transaction_id):
def has_transaction(connection, transaction_id):
"""Check if a transaction exists in the bigchain table. """Check if a transaction exists in the bigchain table.
Args: Args:
@ -306,33 +306,36 @@ class RethinkDBBackend(Query):
Returns: Returns:
``True`` if the transaction exists, ``False`` otherwise. ``True`` if the transaction exists, ``False`` otherwise.
""" """
return bool(self.connection.run( return bool(connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.get_all(transaction_id, index='transaction_id').count())) .get_all(transaction_id, index='transaction_id').count()))
def count_blocks(self):
def count_blocks(connection):
"""Count the number of blocks in the bigchain table. """Count the number of blocks in the bigchain table.
Returns: Returns:
The number of blocks. The number of blocks.
""" """
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.count()) .count())
def count_backlog(self):
def count_backlog(connection):
"""Count the number of transactions in the backlog table. """Count the number of transactions in the backlog table.
Returns: Returns:
The number of transactions in the backlog. The number of transactions in the backlog.
""" """
return self.connection.run( return connection.run(
r.table('backlog', read_mode=self.read_mode) r.table('backlog', read_mode=READ_MODE)
.count()) .count())
def write_vote(self, vote):
def write_vote(connection, vote):
"""Write a vote to the votes table. """Write a vote to the votes table.
Args: Args:
@ -341,22 +344,24 @@ class RethinkDBBackend(Query):
Returns: Returns:
The database response. The database response.
""" """
return self.connection.run( return connection.run(
r.table('votes') r.table('votes')
.insert(vote)) .insert(vote))
def get_genesis_block(self):
def get_genesis_block(connection):
"""Get the genesis block """Get the genesis block
Returns: Returns:
The genesis block The genesis block
""" """
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.filter(util.is_genesis_block) .filter(util.is_genesis_block)
.nth(0)) .nth(0))
def get_last_voted_block(self, node_pubkey):
def get_last_voted_block(connection, node_pubkey):
"""Get the last voted block for a specific node. """Get the last voted block for a specific node.
Args: Args:
@ -368,19 +373,19 @@ class RethinkDBBackend(Query):
""" """
try: try:
# get the latest value for the vote timestamp (over all votes) # get the latest value for the vote timestamp (over all votes)
max_timestamp = self.connection.run( max_timestamp = connection.run(
r.table('votes', read_mode=self.read_mode) r.table('votes', read_mode=READ_MODE)
.filter(r.row['node_pubkey'] == node_pubkey) .filter(r.row['node_pubkey'] == node_pubkey)
.max(r.row['vote']['timestamp']))['vote']['timestamp'] .max(r.row['vote']['timestamp']))['vote']['timestamp']
last_voted = list(self.connection.run( last_voted = list(connection.run(
r.table('votes', read_mode=self.read_mode) r.table('votes', read_mode=READ_MODE)
.filter(r.row['vote']['timestamp'] == max_timestamp) .filter(r.row['vote']['timestamp'] == max_timestamp)
.filter(r.row['node_pubkey'] == node_pubkey))) .filter(r.row['node_pubkey'] == node_pubkey)))
except r.ReqlNonExistenceError: except r.ReqlNonExistenceError:
# return last vote if last vote exists else return Genesis block # return last vote if last vote exists else return Genesis block
return self.get_genesis_block() return get_genesis_block()
# Now the fun starts. Since the resolution of timestamp is a second, # Now the fun starts. Since the resolution of timestamp is a second,
# we might have more than one vote per timestamp. If this is the case # we might have more than one vote per timestamp. If this is the case
@ -412,11 +417,12 @@ class RethinkDBBackend(Query):
except KeyError: except KeyError:
break break
return self.connection.run( return connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.get(last_block_id)) .get(last_block_id))
def get_unvoted_blocks(self, node_pubkey):
def get_unvoted_blocks(connection, node_pubkey):
"""Return all the blocks that have not been voted by the specified node. """Return all the blocks that have not been voted by the specified node.
Args: Args:
@ -426,9 +432,9 @@ class RethinkDBBackend(Query):
:obj:`list` of :obj:`dict`: a list of unvoted blocks :obj:`list` of :obj:`dict`: a list of unvoted blocks
""" """
unvoted = self.connection.run( unvoted = connection.run(
r.table('bigchain', read_mode=self.read_mode) r.table('bigchain', read_mode=READ_MODE)
.filter(lambda block: r.table('votes', read_mode=self.read_mode) .filter(lambda block: r.table('votes', read_mode=READ_MODE)
.get_all([block['id'], node_pubkey], index='block_and_voter') .get_all([block['id'], node_pubkey], index='block_and_voter')
.is_empty()) .is_empty())
.order_by(r.asc(r.row['block']['timestamp']))) .order_by(r.asc(r.row['block']['timestamp'])))

View File

@ -2,7 +2,6 @@
import logging import logging
from bigchaindb.db import Schema
from bigchaindb.common import exceptions from bigchaindb.common import exceptions
import rethinkdb as r import rethinkdb as r
@ -10,94 +9,94 @@ import rethinkdb as r
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RethinkDBSchema(Schema): def create_database(connection, name):
if connection.run(r.db_list().contains(name)):
raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'.format(name))
def __init__(self, connection, name): logger.info('Create database `%s`.', name)
self.connection = connection connection.run(r.db_create(name))
self.name = name
def create_database(self):
if self.connection.run(r.db_list().contains(self.name)):
raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'.format(self.name))
logger.info('Create database `%s`.', self.name) def create_tables(connection, name):
self.connection.run(r.db_create(self.name))
def create_tables(self):
for table_name in ['bigchain', 'backlog', 'votes']: for table_name in ['bigchain', 'backlog', 'votes']:
logger.info('Create `%s` table.', table_name) logger.info('Create `%s` table.', table_name)
self.connection.run(r.db(self.name).table_create(table_name)) connection.run(r.db(name).table_create(table_name))
def create_indexes(self):
self.create_bigchain_secondary_index()
def drop_database(self): def create_indexes(connection, name):
create_bigchain_secondary_index(connection, name)
def drop_database(connection, name):
try: try:
logger.info('Drop database `%s`', self.name) logger.info('Drop database `%s`', name)
self.connection.run(r.db_drop(self.name)) connection.run(r.db_drop(name))
logger.info('Done.') logger.info('Done.')
except r.ReqlOpFailedError: except r.ReqlOpFailedError:
raise exceptions.DatabaseDoesNotExist('Database `{}` does not exist'.format(self.name)) raise exceptions.DatabaseDoesNotExist('Database `{}` does not exist'.format(name))
def create_bigchain_secondary_index(self):
def create_bigchain_secondary_index(connection, name):
logger.info('Create `bigchain` secondary index.') logger.info('Create `bigchain` secondary index.')
# to order blocks by timestamp # to order blocks by timestamp
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('bigchain') .table('bigchain')
.index_create('block_timestamp', r.row['block']['timestamp'])) .index_create('block_timestamp', r.row['block']['timestamp']))
# to query the bigchain for a transaction id # to query the bigchain for a transaction id
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('bigchain') .table('bigchain')
.index_create('transaction_id', r.row['block']['transactions']['id'], multi=True)) .index_create('transaction_id', r.row['block']['transactions']['id'], multi=True))
# secondary index for payload data by UUID # secondary index for payload data by UUID
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('bigchain') .table('bigchain')
.index_create('metadata_id', r.row['block']['transactions']['transaction']['metadata']['id'], multi=True)) .index_create('metadata_id', r.row['block']['transactions']['transaction']['metadata']['id'], multi=True))
# secondary index for asset uuid # secondary index for asset uuid
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('bigchain') .table('bigchain')
.index_create('asset_id', r.row['block']['transactions']['transaction']['asset']['id'], multi=True)) .index_create('asset_id', r.row['block']['transactions']['transaction']['asset']['id'], multi=True))
# wait for rethinkdb to finish creating secondary indexes # wait for rethinkdb to finish creating secondary indexes
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('bigchain') .table('bigchain')
.index_wait()) .index_wait())
def create_backlog_secondary_index(self):
def create_backlog_secondary_index(connection, name):
logger.info('Create `backlog` secondary index.') logger.info('Create `backlog` secondary index.')
# compound index to read transactions from the backlog per assignee # compound index to read transactions from the backlog per assignee
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('backlog') .table('backlog')
.index_create('assignee__transaction_timestamp', [r.row['assignee'], r.row['assignment_timestamp']])) .index_create('assignee__transaction_timestamp', [r.row['assignee'], r.row['assignment_timestamp']]))
# wait for rethinkdb to finish creating secondary indexes # wait for rethinkdb to finish creating secondary indexes
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('backlog') .table('backlog')
.index_wait()) .index_wait())
def create_votes_secondary_index(self):
def create_votes_secondary_index(connection, name):
logger.info('Create `votes` secondary index.') logger.info('Create `votes` secondary index.')
# compound index to order votes by block id and node # compound index to order votes by block id and node
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('votes')\ .table('votes')
.index_create('block_and_voter', [r.row['vote']['voting_for_block'], r.row['node_pubkey']])) .index_create('block_and_voter', [r.row['vote']['voting_for_block'], r.row['node_pubkey']]))
# wait for rethinkdb to finish creating secondary indexes # wait for rethinkdb to finish creating secondary indexes
self.connection.run( connection.run(
r.db(self.name) r.db(name)
.table('votes') .table('votes')
.index_wait()) .index_wait())

View File

@ -1,13 +1,23 @@
class Schema: """Schema-providing interfaces for backend databases"""
def create_database(self): from functools import singledispatch
@singledispatch
def create_database(connection, name):
raise NotImplementedError() raise NotImplementedError()
def create_tables(self):
@singledispatch
def create_tables(connection, name):
raise NotImplementedError() raise NotImplementedError()
def create_indexes(self):
@singledispatch
def create_indexes(connection, name):
raise NotImplementedError() raise NotImplementedError()
def drop_database(self):
@singledispatch
def drop_database(connection, name):
raise NotImplementedError() raise NotImplementedError()