Merge branch 'isolate-database-calls'

This commit is contained in:
vrde 2016-10-31 16:33:19 +01:00
commit ac6249b42f
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
7 changed files with 445 additions and 182 deletions

View File

@ -12,7 +12,7 @@ import rethinkdb as r
import bigchaindb import bigchaindb
from bigchaindb.db.utils import Connection from bigchaindb.db.utils import Connection, get_backend
from bigchaindb import config_utils, util from bigchaindb import config_utils, util
from bigchaindb.consensus import BaseConsensusRules from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Block, Transaction from bigchaindb.models import Block, Transaction
@ -33,7 +33,7 @@ class Bigchain(object):
# return if transaction is in backlog # return if transaction is in backlog
TX_IN_BACKLOG = 'backlog' TX_IN_BACKLOG = 'backlog'
def __init__(self, host=None, port=None, dbname=None, def __init__(self, host=None, port=None, dbname=None, backend=None,
public_key=None, private_key=None, keyring=[], public_key=None, private_key=None, keyring=[],
backlog_reassign_delay=None): backlog_reassign_delay=None):
"""Initialize the Bigchain instance """Initialize the Bigchain instance
@ -51,6 +51,8 @@ class Bigchain(object):
host (str): hostname where RethinkDB is running. host (str): hostname where RethinkDB is running.
port (int): port in which RethinkDB is running (usually 28015). port (int): port in which RethinkDB is running (usually 28015).
dbname (str): the name of the database to connect to (usually bigchain). dbname (str): the name of the database to connect to (usually bigchain).
backend (:class:`~bigchaindb.db.backends.rethinkdb.RethinkDBBackend`):
the database backend to use.
public_key (str): the base58 encoded public key for the ED25519 curve. public_key (str): the base58 encoded public key for the ED25519 curve.
private_key (str): the base58 encoded private key for the ED25519 curve. private_key (str): the base58 encoded private key for the ED25519 curve.
keyring (list[str]): list of base58 encoded public keys of the federation nodes. keyring (list[str]): list of base58 encoded public keys of the federation nodes.
@ -60,6 +62,7 @@ class Bigchain(object):
self.host = host or bigchaindb.config['database']['host'] self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port'] self.port = port or bigchaindb.config['database']['port']
self.dbname = dbname or bigchaindb.config['database']['name'] self.dbname = dbname or bigchaindb.config['database']['name']
self.backend = backend or get_backend(host, port, dbname)
self.me = public_key or bigchaindb.config['keypair']['public'] self.me = public_key or bigchaindb.config['keypair']['public']
self.me_private = private_key or bigchaindb.config['keypair']['private'] self.me_private = private_key or bigchaindb.config['keypair']['private']
self.nodes_except_me = keyring or bigchaindb.config['keyring'] self.nodes_except_me = keyring or bigchaindb.config['keyring']
@ -102,12 +105,9 @@ class Bigchain(object):
signed_transaction.update({'assignment_timestamp': time()}) signed_transaction.update({'assignment_timestamp': time()})
# write to the backlog # write to the backlog
response = self.connection.run( return self.backend.write_transaction(signed_transaction)
r.table('backlog')
.insert(signed_transaction, durability=durability))
return response
def reassign_transaction(self, transaction, durability='hard'): def reassign_transaction(self, transaction):
"""Assign a transaction to a new node """Assign a transaction to a new node
Args: Args:
@ -131,23 +131,30 @@ class Bigchain(object):
# There is no other node to assign to # There is no other node to assign to
new_assignee = self.me new_assignee = self.me
response = self.connection.run( return self.backend.update_transaction(
r.table('backlog') transaction['id'],
.get(transaction['id']) {'assignee': new_assignee, 'assignment_timestamp': time()})
.update({'assignee': new_assignee, 'assignment_timestamp': time()},
durability=durability)) def delete_transaction(self, *transaction_id):
return response """Delete a transaction from the backlog.
Args:
*transaction_id (str): the transaction(s) to delete
Returns:
The database response.
"""
return self.backend.delete_transaction(*transaction_id)
def get_stale_transactions(self): def get_stale_transactions(self):
"""Get a RethinkDB cursor of stale transactions """Get a cursor of stale transactions.
Transactions are considered stale if they have been assigned a node, but are still in the Transactions are considered stale if they have been assigned a node, but are still in the
backlog after some amount of time specified in the configuration backlog after some amount of time specified in the configuration
""" """
return self.connection.run( return self.backend.get_stale_transactions(self.backlog_reassign_delay)
r.table('backlog')
.filter(lambda tx: time() - tx['assignment_timestamp'] > self.backlog_reassign_delay))
def validate_transaction(self, transaction): def validate_transaction(self, transaction):
"""Validate a transaction. """Validate a transaction.
@ -224,19 +231,12 @@ class Bigchain(object):
break break
# Query the transaction in the target block and return # Query the transaction in the target block and return
response = self.connection.run( response = self.backend.get_transaction_from_block(txid, target_block_id)
r.table('bigchain', read_mode=self.read_mode)
.get(target_block_id)
.get_field('block')
.get_field('transactions')
.filter(lambda tx: tx['id'] == txid))[0]
else: else:
# Otherwise, check the backlog # Otherwise, check the backlog
response = self.connection.run(r.table('backlog') response = self.backend.get_transaction_from_backlog(txid)
.get(txid)
.without('assignee', 'assignment_timestamp')
.default(None))
if response: if response:
tx_status = self.TX_IN_BACKLOG tx_status = self.TX_IN_BACKLOG
@ -262,24 +262,6 @@ class Bigchain(object):
_, status = self.get_transaction(txid, include_status=True) _, status = self.get_transaction(txid, include_status=True)
return status return status
def search_block_election_on_index(self, value, index):
"""Retrieve block election information given a secondary index and value
Args:
value: a value to search (e.g. transaction id string, payload hash string)
index (str): name of a secondary index, e.g. 'transaction_id'
Returns:
:obj:`list` of :obj:`dict`: A list of blocks with with only election information
"""
# First, get information on all blocks which contain this transaction
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(value, index=index)
.pluck('votes', 'id', {'block': ['voters']}))
return list(response)
def get_blocks_status_containing_tx(self, txid): def get_blocks_status_containing_tx(self, txid):
"""Retrieve block ids and statuses related to a transaction """Retrieve block ids and statuses related to a transaction
@ -294,7 +276,7 @@ class Bigchain(object):
""" """
# First, get information on all blocks which contain this transaction # First, get information on all blocks which contain this transaction
blocks = self.search_block_election_on_index(txid, 'transaction_id') blocks = self.backend.get_blocks_status_from_transaction(txid)
if blocks: if blocks:
# Determine the election status of each block # Determine the election status of each block
validity = { validity = {
@ -336,14 +318,8 @@ class Bigchain(object):
A list of transactions containing that metadata. If no transaction exists with that metadata it A list of transactions containing that metadata. If no transaction exists with that metadata it
returns an empty list `[]` returns an empty list `[]`
""" """
cursor = self.connection.run( cursor = self.backend.get_transactions_by_metadata_id(metadata_id)
r.table('bigchain', read_mode=self.read_mode) return [Transaction.from_dict(tx) for tx in cursor]
.get_all(metadata_id, index='metadata_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['metadata']['id'] == metadata_id))
transactions = list(cursor)
return [Transaction.from_dict(tx) for tx in transactions]
def get_txs_by_asset_id(self, asset_id): def get_txs_by_asset_id(self, asset_id):
"""Retrieves transactions related to a particular asset. """Retrieves transactions related to a particular asset.
@ -358,12 +334,8 @@ class Bigchain(object):
A list of transactions containing related to the asset. If no transaction exists for that asset it A list of transactions containing related to the asset. If no transaction exists for that asset it
returns an empty list `[]` returns an empty list `[]`
""" """
cursor = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(asset_id, index='asset_id')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id))
cursor = self.backend.get_transactions_by_asset_id(asset_id)
return [Transaction.from_dict(tx) for tx in cursor] return [Transaction.from_dict(tx) for tx in cursor]
def get_spent(self, txid, cid): def get_spent(self, txid, cid):
@ -382,13 +354,7 @@ class Bigchain(object):
""" """
# checks if an input was already spent # checks if an input was already spent
# checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...} # checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...}
response = self.connection.run( transactions = list(self.backend.get_spent(txid, cid))
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['fulfillments']
.contains(lambda fulfillment: fulfillment['input'] == {'txid': txid, 'cid': cid})))
transactions = list(response)
# a transaction_id should have been spent at most one time # a transaction_id should have been spent at most one time
if transactions: if transactions:
@ -423,12 +389,7 @@ class Bigchain(object):
""" """
# get all transactions in which owner is in the `owners_after` list # get all transactions in which owner is in the `owners_after` list
response = self.connection.run( response = self.backend.get_owned_ids(owner)
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda tx: tx['transaction']['conditions']
.contains(lambda c: c['owners_after']
.contains(owner))))
owned = [] owned = []
for tx in response: for tx in response:
@ -513,9 +474,7 @@ class Bigchain(object):
but the vote is invalid. but the vote is invalid.
""" """
votes = list(self.connection.run( votes = list(self.backend.get_votes_by_block_id_and_voter(block_id, self.me))
r.table('votes', read_mode=self.read_mode)
.get_all([block_id, self.me], index='block_and_voter')))
if len(votes) > 1: if len(votes) > 1:
raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}' raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}'
@ -537,15 +496,10 @@ class Bigchain(object):
block (Block): block to write to bigchain. block (Block): block to write to bigchain.
""" """
self.connection.run( return self.backend.write_block(block.to_str(), durability=durability)
r.table('bigchain')
.insert(r.json(block.to_str()), durability=durability))
def transaction_exists(self, transaction_id): def transaction_exists(self, transaction_id):
response = self.connection.run( return self.backend.has_transaction(transaction_id)
r.table('bigchain', read_mode=self.read_mode)\
.get_all(transaction_id, index='transaction_id'))
return len(response.items) > 0
def prepare_genesis_block(self): def prepare_genesis_block(self):
"""Prepare a genesis block.""" """Prepare a genesis block."""
@ -574,9 +528,7 @@ class Bigchain(object):
# 2. create the block with one transaction # 2. create the block with one transaction
# 3. write the block to the bigchain # 3. write the block to the bigchain
blocks_count = self.connection.run( blocks_count = self.backend.count_blocks()
r.table('bigchain', read_mode=self.read_mode)
.count())
if blocks_count: if blocks_count:
raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block') raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block')
@ -621,69 +573,12 @@ class Bigchain(object):
def write_vote(self, vote): def write_vote(self, vote):
"""Write the vote to the database.""" """Write the vote to the database."""
return self.backend.write_vote(vote)
self.connection.run(
r.table('votes')
.insert(vote))
def get_last_voted_block(self): def get_last_voted_block(self):
"""Returns the last block that this node voted on.""" """Returns the last block that this node voted on."""
try: return Block.from_dict(self.backend.get_last_voted_block(self.me))
# get the latest value for the vote timestamp (over all votes)
max_timestamp = self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['node_pubkey'] == self.me)
.max(r.row['vote']['timestamp']))['vote']['timestamp']
last_voted = list(self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['vote']['timestamp'] == max_timestamp)
.filter(r.row['node_pubkey'] == self.me)))
except r.ReqlNonExistenceError:
# return last vote if last vote exists else return Genesis block
res = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(util.is_genesis_block))
block = list(res)[0]
return Block.from_dict(block)
# Now the fun starts. Since the resolution of timestamp is a second,
# we might have more than one vote per timestamp. If this is the case
# then we need to rebuild the chain for the blocks that have been retrieved
# to get the last one.
# Given a block_id, mapping returns the id of the block pointing at it.
mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
for v in last_voted}
# Since we follow the chain backwards, we can start from a random
# point of the chain and "move up" from it.
last_block_id = list(mapping.values())[0]
# We must be sure to break the infinite loop. This happens when:
# - the block we are currenty iterating is the one we are looking for.
# This will trigger a KeyError, breaking the loop
# - we are visiting again a node we already explored, hence there is
# a loop. This might happen if a vote points both `previous_block`
# and `voting_for_block` to the same `block_id`
explored = set()
while True:
try:
if last_block_id in explored:
raise exceptions.CyclicBlockchainError()
explored.add(last_block_id)
last_block_id = mapping[last_block_id]
except KeyError:
break
res = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(last_block_id))
return Block.from_dict(res)
def get_unvoted_blocks(self): def get_unvoted_blocks(self):
"""Return all the blocks that have not been voted on by this node. """Return all the blocks that have not been voted on by this node.
@ -692,26 +587,13 @@ class Bigchain(object):
:obj:`list` of :obj:`dict`: a list of unvoted blocks :obj:`list` of :obj:`dict`: a list of unvoted blocks
""" """
unvoted = self.connection.run( # XXX: should this return instaces of Block?
r.table('bigchain', read_mode=self.read_mode) return self.backend.get_unvoted_blocks(self.me)
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
.get_all([block['id'], self.me], index='block_and_voter')
.is_empty())
.order_by(r.asc(r.row['block']['timestamp'])))
# FIXME: I (@vrde) don't like this solution. Filtering should be done at a
# database level. Solving issue #444 can help untangling the situation
unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted)
return unvoted_blocks
def block_election_status(self, block_id, voters): def block_election_status(self, block_id, voters):
"""Tally the votes on a block, and return the status: valid, invalid, or undecided.""" """Tally the votes on a block, and return the status: valid, invalid, or undecided."""
votes = self.connection.run(r.table('votes', read_mode=self.read_mode) votes = list(self.backend.get_votes_by_block_id(block_id))
.between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter'))
votes = list(votes)
n_voters = len(voters) n_voters = len(voters)
voter_counts = collections.Counter([vote['node_pubkey'] for vote in votes]) voter_counts = collections.Counter([vote['node_pubkey'] for vote in votes])

View File

View File

@ -0,0 +1,381 @@
"""Backend implementation for RethinkDB.
This module contains all the methods to store and retrieve data from RethinkDB.
"""
from time import time
import rethinkdb as r
from bigchaindb import util
from bigchaindb.db.utils import Connection
from bigchaindb.common import exceptions
class RethinkDBBackend:
    """RethinkDB database backend.

    Encapsulates every ReQL query used by the node, so callers
    (e.g. ``Bigchain``) never build database queries themselves.
    """

    def __init__(self, host=None, port=None, db=None):
        """Initialize a new RethinkDB Backend instance.

        Args:
            host (str): the host to connect to.
            port (int): the port to connect to.
            db (str): the name of the database to use.
        """
        # 'majority' makes reads return only data committed to a majority
        # of replicas; used for all reads issued by this backend.
        self.read_mode = 'majority'
        # 'soft' acknowledges writes before they are flushed to disk;
        # used as the default write durability.
        self.durability = 'soft'
        self.connection = Connection(host=host, port=port, db=db)

    def write_transaction(self, signed_transaction):
        """Write a transaction to the backlog table.

        Uses ``self.durability`` (soft) for the insert.

        Args:
            signed_transaction (dict): a signed transaction.

        Returns:
            The result of the operation.
        """
        return self.connection.run(
                r.table('backlog')
                .insert(signed_transaction, durability=self.durability))

    def update_transaction(self, transaction_id, doc):
        """Update a transaction in the backlog table.

        Args:
            transaction_id (str): the id of the transaction.
            doc (dict): the values to update.

        Returns:
            The result of the operation.
        """
        return self.connection.run(
                r.table('backlog')
                .get(transaction_id)
                .update(doc))

    def delete_transaction(self, *transaction_id):
        """Delete one or more transactions from the backlog.

        Deletes with hard durability so removals survive a crash.

        Args:
            *transaction_id (str): the transaction(s) to delete.

        Returns:
            The database response.
        """
        return self.connection.run(
                r.table('backlog')
                .get_all(*transaction_id)
                .delete(durability='hard'))

    def get_stale_transactions(self, reassign_delay):
        """Get a cursor of stale transactions.

        Transactions are considered stale if they have been assigned a node,
        but are still in the backlog after some amount of time specified in the
        configuration.

        Args:
            reassign_delay (int): threshold (in seconds) to mark a transaction stale.

        Returns:
            A cursor of transactions.
        """
        return self.connection.run(
                r.table('backlog')
                .filter(lambda tx: time() - tx['assignment_timestamp'] > reassign_delay))

    def get_transaction_from_block(self, transaction_id, block_id):
        """Get a transaction from a specific block.

        Args:
            transaction_id (str): the id of the transaction.
            block_id (str): the id of the block.

        Returns:
            The matching transaction.

        Raises:
            IndexError: if the block contains no transaction with
                ``transaction_id`` (the ``[0]`` lookup finds nothing).
        """
        return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .get(block_id)
                .get_field('block')
                .get_field('transactions')
                .filter(lambda tx: tx['id'] == transaction_id))[0]

    def get_transaction_from_backlog(self, transaction_id):
        """Get a transaction from the backlog.

        The ``assignee`` and ``assignment_timestamp`` bookkeeping fields
        are stripped from the result.

        Args:
            transaction_id (str): the id of the transaction.

        Returns:
            The matching transaction, or ``None`` if not found.
        """
        return self.connection.run(
                r.table('backlog')
                .get(transaction_id)
                .without('assignee', 'assignment_timestamp')
                .default(None))

    def get_blocks_status_from_transaction(self, transaction_id):
        """Retrieve block election information for the blocks containing
        a given transaction.

        Args:
            transaction_id (str): the id of the transaction to look up
                (via the ``transaction_id`` secondary index).

        Returns:
            A cursor of blocks reduced to their election information only
            (``votes``, ``id`` and the block's ``voters``).
        """
        return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .get_all(transaction_id, index='transaction_id')
                .pluck('votes', 'id', {'block': ['voters']}))

    def get_transactions_by_metadata_id(self, metadata_id):
        """Retrieves transactions related to a metadata.

        When creating a transaction one of the optional arguments is the `metadata`. The metadata is a generic
        dict that contains extra information that can be appended to the transaction.

        To make it easy to query the bigchain for that particular metadata we create a UUID for the metadata and
        store it with the transaction.

        Args:
            metadata_id (str): the id for this particular metadata.

        Returns:
            A cursor of transactions containing that metadata. If no
            transaction exists with that metadata the cursor is empty.
        """
        return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .get_all(metadata_id, index='metadata_id')
                .concat_map(lambda block: block['block']['transactions'])
                .filter(lambda transaction: transaction['transaction']['metadata']['id'] == metadata_id))

    def get_transactions_by_asset_id(self, asset_id):
        """Retrieves transactions related to a particular asset.

        A digital asset in bigchaindb is identified by an uuid. This allows us to query all the transactions
        related to a particular digital asset, knowing the id.

        Args:
            asset_id (str): the id for this particular asset.

        Returns:
            A cursor of transactions related to the asset. If no
            transaction exists for that asset the cursor is empty.
        """
        return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .get_all(asset_id, index='asset_id')
                .concat_map(lambda block: block['block']['transactions'])
                .filter(lambda transaction: transaction['transaction']['asset']['id'] == asset_id))

    def get_spent(self, transaction_id, condition_id):
        """Check if a `txid` was already used as an input.

        A transaction can be used as an input for another transaction. Bigchain needs to make sure that a
        given `txid` is only used once.

        Args:
            transaction_id (str): The id of the transaction.
            condition_id (int): The index of the condition in the respective transaction.

        Returns:
            A cursor of the transactions that spent the given
            ``(transaction_id, condition_id)`` input. Empty if unspent.
        """
        # TODO: use index!
        return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .concat_map(lambda doc: doc['block']['transactions'])
                .filter(lambda transaction: transaction['transaction']['fulfillments'].contains(
                    lambda fulfillment: fulfillment['input'] == {'txid': transaction_id, 'cid': condition_id})))

    def get_owned_ids(self, owner):
        """Retrieve a list of `txids` that can be used as inputs.

        Args:
            owner (str): base58 encoded public key.

        Returns:
            A cursor for the matching transactions.
        """
        # TODO: use index!
        return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .concat_map(lambda doc: doc['block']['transactions'])
                .filter(lambda tx: tx['transaction']['conditions'].contains(
                    lambda c: c['owners_after'].contains(owner))))

    def get_votes_by_block_id(self, block_id):
        """Get all the votes cast for a specific block.

        Args:
            block_id (str): the block id to use.

        Returns:
            A cursor for the matching votes.
        """
        return self.connection.run(
                r.table('votes', read_mode=self.read_mode)
                .between([block_id, r.minval], [block_id, r.maxval], index='block_and_voter'))

    def get_votes_by_block_id_and_voter(self, block_id, node_pubkey):
        """Get all the votes cast for a specific block by a specific voter.

        Args:
            block_id (str): the block id to use.
            node_pubkey (str): base58 encoded public key

        Returns:
            A cursor for the matching votes.
        """
        return self.connection.run(
                r.table('votes', read_mode=self.read_mode)
                .get_all([block_id, node_pubkey], index='block_and_voter'))

    def write_block(self, block, durability='soft'):
        """Write a block to the bigchain table.

        Args:
            block (str): the block to write, serialized as a JSON string
                (it is parsed server-side with ``r.json``).
            durability (str): RethinkDB write durability, ``'soft'`` or
                ``'hard'``.

        Returns:
            The database response.
        """
        return self.connection.run(
                r.table('bigchain')
                .insert(r.json(block), durability=durability))

    def has_transaction(self, transaction_id):
        """Check if a transaction exists in the bigchain table.

        Args:
            transaction_id (str): the id of the transaction to check.

        Returns:
            ``True`` if the transaction exists, ``False`` otherwise.
        """
        return bool(self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .get_all(transaction_id, index='transaction_id').count()))

    def count_blocks(self):
        """Count the number of blocks in the bigchain table.

        Returns:
            The number of blocks.
        """
        return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .count())

    def write_vote(self, vote):
        """Write a vote to the votes table.

        Args:
            vote (dict): the vote to write.

        Returns:
            The database response.
        """
        return self.connection.run(
                r.table('votes')
                .insert(vote))

    def get_last_voted_block(self, node_pubkey):
        """Get the last voted block for a specific node.

        Args:
            node_pubkey (str): base58 encoded public key.

        Returns:
            The last block the node has voted on. If the node didn't cast
            any vote then the genesis block is returned.

        Raises:
            CyclicBlockchainError: if the chain of votes loops back on
                itself instead of terminating.
        """
        try:
            # get the latest value for the vote timestamp (over all votes)
            max_timestamp = self.connection.run(
                    r.table('votes', read_mode=self.read_mode)
                    .filter(r.row['node_pubkey'] == node_pubkey)
                    .max(r.row['vote']['timestamp']))['vote']['timestamp']

            last_voted = list(self.connection.run(
                r.table('votes', read_mode=self.read_mode)
                .filter(r.row['vote']['timestamp'] == max_timestamp)
                .filter(r.row['node_pubkey'] == node_pubkey)))

        except r.ReqlNonExistenceError:
            # return last vote if last vote exists else return Genesis block
            return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .filter(util.is_genesis_block)
                .nth(0))

        # Now the fun starts. Since the resolution of timestamp is a second,
        # we might have more than one vote per timestamp. If this is the case
        # then we need to rebuild the chain for the blocks that have been retrieved
        # to get the last one.

        # Given a block_id, mapping returns the id of the block pointing at it.
        mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
                   for v in last_voted}

        # Since we follow the chain backwards, we can start from a random
        # point of the chain and "move up" from it.
        last_block_id = list(mapping.values())[0]

        # We must be sure to break the infinite loop. This happens when:
        # - the block we are currently iterating is the one we are looking for.
        #   This will trigger a KeyError, breaking the loop
        # - we are visiting again a node we already explored, hence there is
        #   a loop. This might happen if a vote points both `previous_block`
        #   and `voting_for_block` to the same `block_id`
        explored = set()

        while True:
            try:
                if last_block_id in explored:
                    raise exceptions.CyclicBlockchainError()
                explored.add(last_block_id)
                last_block_id = mapping[last_block_id]
            except KeyError:
                break

        return self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .get(last_block_id))

    def get_unvoted_blocks(self, node_pubkey):
        """Return all the blocks that have not been voted by the specified node.

        The genesis block is always excluded from the result.

        Args:
            node_pubkey (str): base58 encoded public key

        Returns:
            An iterable of unvoted blocks (dicts), ordered by block
            timestamp ascending.
        """
        unvoted = self.connection.run(
                r.table('bigchain', read_mode=self.read_mode)
                .filter(lambda block: r.table('votes', read_mode=self.read_mode)
                                       .get_all([block['id'], node_pubkey], index='block_and_voter')
                                       .is_empty())
                .order_by(r.asc(r.row['block']['timestamp'])))

        # FIXME: I (@vrde) don't like this solution. Filtering should be done at a
        #        database level. Solving issue #444 can help untangling the situation
        unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted)
        return unvoted_blocks

View File

@ -67,6 +67,18 @@ class Connection:
time.sleep(2**i) time.sleep(2**i)
def get_backend(host=None, port=None, db=None):
    '''Get a backend instance.

    Any argument left as ``None`` falls back to the corresponding value
    in ``bigchaindb.config['database']``.
    '''
    from bigchaindb.db.backends import rethinkdb

    # NOTE: this function will be re-implemented when we have real
    # multiple backends to support. Right now it returns the RethinkDB one.
    defaults = bigchaindb.config['database']
    return rethinkdb.RethinkDBBackend(host=host or defaults['host'],
                                      port=port or defaults['port'],
                                      db=db or defaults['name'])
def get_conn(): def get_conn():
'''Get the connection to the database.''' '''Get the connection to the database.'''

View File

@ -69,10 +69,7 @@ class BlockPipeline:
# if the tx is already in a valid or undecided block, # if the tx is already in a valid or undecided block,
# then it no longer should be in the backlog, or added # then it no longer should be in the backlog, or added
# to a new block. We can delete and drop it. # to a new block. We can delete and drop it.
self.bigchain.connection.run( self.bigchain.delete_transaction(tx.id)
r.table('backlog')
.get(tx.id)
.delete(durability='hard'))
return None return None
tx_validated = self.bigchain.is_valid_transaction(tx) tx_validated = self.bigchain.is_valid_transaction(tx)
@ -81,10 +78,7 @@ class BlockPipeline:
else: else:
# if the transaction is not valid, remove it from the # if the transaction is not valid, remove it from the
# backlog # backlog
self.bigchain.connection.run( self.bigchain.delete_transaction(tx.id)
r.table('backlog')
.get(tx.id)
.delete(durability='hard'))
return None return None
def create(self, tx, timeout=False): def create(self, tx, timeout=False):
@ -136,10 +130,7 @@ class BlockPipeline:
Returns: Returns:
:class:`~bigchaindb.models.Block`: The block. :class:`~bigchaindb.models.Block`: The block.
""" """
self.bigchain.connection.run( self.bigchain.delete_transaction(*[tx.id for tx in block.transactions])
r.table('backlog')
.get_all(*[tx.id for tx in block.transactions])
.delete(durability='hard'))
return block return block

View File

@ -97,8 +97,7 @@ def test_duplicate_transaction(b, user_vk):
# verify tx is in the backlog # verify tx is in the backlog
assert b.connection.run(r.table('backlog').get(txs[0].id)) is not None assert b.connection.run(r.table('backlog').get(txs[0].id)) is not None
# try to validate a transaction that's already in the chain; should not # try to validate a transaction that's already in the chain; should not work
# work
assert block_maker.validate_tx(txs[0].to_dict()) is None assert block_maker.validate_tx(txs[0].to_dict()) is None
# duplicate tx should be removed from backlog # duplicate tx should be removed from backlog

View File

@ -62,14 +62,13 @@ def test_bigchain_class_initialization_with_parameters(config):
def test_get_blocks_status_containing_tx(monkeypatch): def test_get_blocks_status_containing_tx(monkeypatch):
from bigchaindb.db.backends.rethinkdb import RethinkDBBackend
from bigchaindb.core import Bigchain from bigchaindb.core import Bigchain
blocks = [ blocks = [
{'id': 1}, {'id': 2} {'id': 1}, {'id': 2}
] ]
monkeypatch.setattr( monkeypatch.setattr(RethinkDBBackend, 'get_blocks_status_from_transaction', lambda x: blocks)
Bigchain, 'search_block_election_on_index', lambda x, y: blocks) monkeypatch.setattr(Bigchain, 'block_election_status', lambda x, y, z: Bigchain.BLOCK_VALID)
monkeypatch.setattr(
Bigchain, 'block_election_status', lambda x, y, z: Bigchain.BLOCK_VALID)
bigchain = Bigchain(public_key='pubkey', private_key='privkey') bigchain = Bigchain(public_key='pubkey', private_key='privkey')
with pytest.raises(Exception): with pytest.raises(Exception):
bigchain.get_blocks_status_containing_tx('txid') bigchain.get_blocks_status_containing_tx('txid')
@ -85,10 +84,9 @@ def test_has_previous_vote(monkeypatch):
bigchain.has_previous_vote(block) bigchain.has_previous_vote(block)
@pytest.mark.parametrize('items,exists', (((0,), True), ((), False))) @pytest.mark.parametrize('count,exists', ((1, True), (0, False)))
def test_transaction_exists(monkeypatch, items, exists): def test_transaction_exists(monkeypatch, count, exists):
from bigchaindb.core import Bigchain from bigchaindb.core import Bigchain
monkeypatch.setattr( monkeypatch.setattr(RqlQuery, 'run', lambda x, y: count)
RqlQuery, 'run', lambda x, y: namedtuple('response', 'items')(items))
bigchain = Bigchain(public_key='pubkey', private_key='privkey') bigchain = Bigchain(public_key='pubkey', private_key='privkey')
assert bigchain.transaction_exists('txid') is exists assert bigchain.transaction_exists('txid') is exists