Merge pull request #1411 from bigchaindb/fast-unspents

Fast unspents
This commit is contained in:
libscott 2017-05-23 13:21:33 +02:00 committed by GitHub
commit 488074fa09
10 changed files with 317 additions and 68 deletions

View File

@ -175,6 +175,25 @@ def get_spent(conn, transaction_id, output):
return (elem['block']['transactions'] for elem in cursor)
@register_query(MongoDBConnection)
def get_spending_transactions(conn, inputs):
cursor = conn.run(
conn.collection('bigchain').aggregate([
{'$match': {
'block.transactions.inputs.fulfills': {
'$in': inputs,
},
}},
{'$unwind': '$block.transactions'},
{'$match': {
'block.transactions.inputs.fulfills': {
'$in': inputs,
},
}},
]))
return ((b['id'], b['block']['transactions']) for b in cursor)
@register_query(MongoDBConnection)
def get_owned_ids(conn, owner):
cursor = conn.run(
@ -183,9 +202,7 @@ def get_owned_ids(conn, owner):
{'$unwind': '$block.transactions'},
{'$match': {'block.transactions.outputs.public_keys': owner}}
]))
# we need to access some nested fields before returning so lets use a
# generator to avoid having to read all records on the cursor at this point
return (elem['block']['transactions'] for elem in cursor)
return ((b['id'], b['block']['transactions']) for b in cursor)
@register_query(MongoDBConnection)
@ -196,6 +213,15 @@ def get_votes_by_block_id(conn, block_id):
projection={'_id': False}))
@register_query(MongoDBConnection)
def get_votes_for_blocks_by_voter(conn, block_ids, node_pubkey):
return conn.run(
conn.collection('votes')
.find({'vote.voting_for_block': {'$in': block_ids},
'node_pubkey': node_pubkey},
projection={'_id': False}))
@register_query(MongoDBConnection)
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
return conn.run(

View File

@ -140,6 +140,20 @@ def get_spent(connection, transaction_id, condition_id):
raise NotImplementedError
@singledispatch
def get_spending_transactions(connection, inputs):
"""Return transactions which spend given inputs
Args:
inputs (list): list of {txid, output}
Returns:
Iterator of (block_ids, transaction) for transactions that
spend given inputs.
"""
raise NotImplementedError
@singledispatch
def get_owned_ids(connection, owner):
"""Retrieve a list of `txids` that can we used has inputs.
@ -148,9 +162,9 @@ def get_owned_ids(connection, owner):
owner (str): base58 encoded public key.
Returns:
A cursor for the matching transactions.
Iterator of (block_id, transaction) for transactions
that list given owner in conditions.
"""
raise NotImplementedError
@ -183,6 +197,20 @@ def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey):
raise NotImplementedError
@singledispatch
def get_votes_for_blocks_by_voter(connection, block_ids, pubkey):
"""Return votes for many block_ids
Args:
block_ids (set): block_ids
pubkey (str): public key of voting node
Returns:
A cursor of votes matching given block_ids and public key
"""
raise NotImplementedError
@singledispatch
def write_block(connection, block):
"""Write a block to the bigchain table.

View File

@ -121,13 +121,14 @@ def get_spent(connection, transaction_id, output):
@register_query(RethinkDBConnection)
def get_owned_ids(connection, owner):
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
query = (r.table('bigchain', read_mode=READ_MODE)
.get_all(owner, index='outputs')
.distinct()
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda tx: tx['outputs'].contains(
.concat_map(unwind_block_transactions)
.filter(lambda doc: doc['tx']['outputs'].contains(
lambda c: c['public_keys'].contains(owner))))
cursor = connection.run(query)
return ((b['id'], b['tx']) for b in cursor)
@register_query(RethinkDBConnection)
@ -253,3 +254,30 @@ def get_unvoted_blocks(connection, node_pubkey):
# database level. Solving issue #444 can help untangling the situation
unvoted_blocks = filter(lambda block: not utils.is_genesis_block(block), unvoted)
return unvoted_blocks
@register_query(RethinkDBConnection)
def get_votes_for_blocks_by_voter(connection, block_ids, node_pubkey):
return connection.run(
r.table('votes')
.filter(lambda row: r.expr(block_ids).contains(row['vote']['voting_for_block']))
.filter(lambda row: row['node_pubkey'] == node_pubkey))
def unwind_block_transactions(block):
""" Yield a block for each transaction in given block """
return block['block']['transactions'].map(lambda tx: block.merge({'tx': tx}))
@register_query(RethinkDBConnection)
def get_spending_transactions(connection, links):
query = (
r.table('bigchain')
.get_all(*[(l['txid'], l['output']) for l in links], index='inputs')
.concat_map(unwind_block_transactions)
# filter transactions spending output
.filter(lambda doc: r.expr(links).set_intersection(
doc['tx']['inputs'].map(lambda i: i['fulfills'])))
)
cursor = connection.run(query)
return ((b['id'], b['tx']) for b in cursor)

View File

@ -161,6 +161,9 @@ class TransactionLink(object):
# TODO: If `other !== TransactionLink` return `False`
return self.to_dict() == other.to_dict()
def __hash__(self):
return hash((self.txid, self.output))
@classmethod
def from_dict(cls, link):
"""Transforms a Python dictionary to a TransactionLink object.

View File

@ -4,11 +4,10 @@ from time import time
from bigchaindb import exceptions as core_exceptions
from bigchaindb.common import crypto, exceptions
from bigchaindb.common.utils import gen_timestamp, serialize
from bigchaindb.common.transaction import TransactionLink
import bigchaindb
from bigchaindb import backend, config_utils, utils
from bigchaindb import backend, config_utils, fastquery
from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Block, Transaction
@ -376,51 +375,6 @@ class Bigchain(object):
# Either no transaction was returned spending the `(txid, output)` as
# input or the returned transactions are not valid.
def get_outputs(self, owner):
"""Retrieve a list of links to transaction outputs for a given public
key.
Args:
owner (str): base58 encoded public key.
Returns:
:obj:`list` of TransactionLink: list of ``txid`` s and ``output`` s
pointing to another transaction's condition
"""
# get all transactions in which owner is in the `owners_after` list
response = backend.query.get_owned_ids(self.connection, owner)
return [
TransactionLink(tx['id'], index)
for tx in response
if not self.is_tx_strictly_in_invalid_block(tx['id'])
for index, output in enumerate(tx['outputs'])
if utils.output_has_owner(output, owner)
]
def is_tx_strictly_in_invalid_block(self, txid):
"""
Checks whether the transaction with the given ``txid``
*strictly* belongs to an invalid block.
Args:
txid (str): Transaction id.
Returns:
bool: ``True`` if the transaction *strictly* belongs to a
block that is invalid. ``False`` otherwise.
Note:
Since a transaction may be in multiple blocks, with
different statuses, the term "strictly" is used to
emphasize that if a transaction is said to be in an invalid
block, it means that it is not in any other block that is
either valid or undecided.
"""
validity = self.get_blocks_status_containing_tx(txid)
return (Bigchain.BLOCK_VALID not in validity.values() and
Bigchain.BLOCK_UNDECIDED not in validity.values())
def get_owned_ids(self, owner):
"""Retrieve a list of ``txid`` s that can be used as inputs.
@ -433,14 +387,17 @@ class Bigchain(object):
"""
return self.get_outputs_filtered(owner, include_spent=False)
@property
def fastquery(self):
return fastquery.FastQuery(self.connection, self.me)
def get_outputs_filtered(self, owner, include_spent=True):
"""
Get a list of output links filtered on some criteria
"""
outputs = self.get_outputs(owner)
outputs = self.fastquery.get_outputs_by_public_key(owner)
if not include_spent:
outputs = [o for o in outputs
if not self.get_spent(o.txid, o.output)]
outputs = self.fastquery.filter_spent_outputs(outputs)
return outputs
def get_transactions_filtered(self, asset_id, operation=None):

70
bigchaindb/fastquery.py Normal file
View File

@ -0,0 +1,70 @@
from bigchaindb.utils import output_has_owner
from bigchaindb.backend import query
from bigchaindb.common.transaction import TransactionLink
class FastQuery:
"""
Database queries that join on block results from a single node.
* Votes are not validated for security (security is a replication concern)
* Votes come from only one node, and as such, non-byzantine fault tolerance
is reduced.
Previously, to consider the status of a block, all votes for that block
were retrieved and the election results were counted. This meant that a
faulty node may still have been able to obtain a correct election result.
However, from the point of view of a client, it is still neccesary to
query multiple nodes to insure against getting an incorrect response from
a byzantine node.
"""
def __init__(self, connection, me):
self.connection = connection
self.me = me
def filter_valid_block_ids(self, block_ids, include_undecided=False):
"""
Given block ids, return only the ones that are valid.
"""
block_ids = list(set(block_ids))
votes = query.get_votes_for_blocks_by_voter(
self.connection, block_ids, self.me)
votes = {vote['vote']['voting_for_block']: vote['vote']['is_block_valid']
for vote in votes}
return [block_id for block_id in block_ids
if votes.get(block_id, include_undecided)]
def filter_valid_items(self, items, block_id_key=lambda b: b[0]):
"""
Given items with block ids, return only the ones that are valid or undecided.
"""
items = list(items)
block_ids = map(block_id_key, items)
valid_block_ids = set(self.filter_valid_block_ids(block_ids, True))
return [b for b in items if block_id_key(b) in valid_block_ids]
def get_outputs_by_public_key(self, public_key):
"""
Get outputs for a public key
"""
res = list(query.get_owned_ids(self.connection, public_key))
txs = [tx for _, tx in self.filter_valid_items(res)]
return [TransactionLink(tx['id'], index)
for tx in txs
for index, output in enumerate(tx['outputs'])
if output_has_owner(output, public_key)]
def filter_spent_outputs(self, outputs):
"""
Remove outputs that have been spent
Args:
outputs: list of TransactionLink
"""
links = [o.to_dict() for o in outputs]
res = query.get_spending_transactions(self.connection, links)
txs = [tx for _, tx in self.filter_valid_items(res)]
spends = {TransactionLink.from_dict(input_['fulfills'])
for tx in txs
for input_ in tx['inputs']}
return [ff for ff in outputs if ff not in spends]

View File

@ -205,10 +205,10 @@ def test_get_owned_ids(signed_create_tx, user_pk):
block = Block(transactions=[signed_create_tx])
conn.db.bigchain.insert_one(block.to_dict())
owned_ids = list(query.get_owned_ids(conn, user_pk))
[(block_id, tx)] = list(query.get_owned_ids(conn, user_pk))
assert len(owned_ids) == 1
assert owned_ids[0] == signed_create_tx.to_dict()
assert block_id == block.id
assert tx == signed_create_tx.to_dict()
def test_get_votes_by_block_id(signed_create_tx, structurally_valid_vote):
@ -417,3 +417,52 @@ def test_get_txids_filtered(signed_create_tx, signed_transfer_tx):
# Test get by asset and TRANSFER
txids = set(query.get_txids_filtered(conn, asset_id, Transaction.TRANSFER))
assert txids == {signed_transfer_tx.id}
def test_get_spending_transactions(user_pk):
from bigchaindb.backend import connect, query
from bigchaindb.models import Block, Transaction
conn = connect()
out = [([user_pk], 1)]
tx1 = Transaction.create([user_pk], out * 3)
inputs = tx1.to_inputs()
tx2 = Transaction.transfer([inputs[0]], out, tx1.id)
tx3 = Transaction.transfer([inputs[1]], out, tx1.id)
tx4 = Transaction.transfer([inputs[2]], out, tx1.id)
block = Block([tx1, tx2, tx3, tx4])
conn.db.bigchain.insert_one(block.to_dict())
links = [inputs[0].fulfills.to_dict(), inputs[2].fulfills.to_dict()]
res = list(query.get_spending_transactions(conn, links))
# tx3 not a member because input 1 not asked for
assert res == [(block.id, tx2.to_dict()), (block.id, tx4.to_dict())]
def test_get_votes_for_blocks_by_voter():
from bigchaindb.backend import connect, query
conn = connect()
votes = [
{
'node_pubkey': 'a',
'vote': {'voting_for_block': 'block1'},
},
{
'node_pubkey': 'b',
'vote': {'voting_for_block': 'block1'},
},
{
'node_pubkey': 'a',
'vote': {'voting_for_block': 'block2'},
},
{
'node_pubkey': 'a',
'vote': {'voting_for_block': 'block3'},
}
]
for vote in votes:
conn.db.votes.insert_one(vote.copy())
res = query.get_votes_for_blocks_by_voter(conn, ['block1', 'block2'], 'a')
assert list(res) == [votes[0], votes[2]]

View File

@ -36,6 +36,8 @@ def test_schema(schema_func_name, args_qty):
('get_votes_by_block_id_and_voter', 2),
('update_transaction', 2),
('get_transaction_from_block', 2),
('get_votes_for_blocks_by_voter', 2),
('get_spending_transactions', 1),
))
def test_query(query_func_name, args_qty):
from bigchaindb.backend import query

View File

@ -1119,11 +1119,11 @@ def test_get_owned_ids_calls_get_outputs_filtered():
def test_get_outputs_filtered_only_unspent():
from bigchaindb.common.transaction import TransactionLink
from bigchaindb.core import Bigchain
with patch('bigchaindb.core.Bigchain.get_outputs') as get_outputs:
with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_public_key') as get_outputs:
get_outputs.return_value = [TransactionLink('a', 1),
TransactionLink('b', 2)]
with patch('bigchaindb.core.Bigchain.get_spent') as get_spent:
get_spent.side_effect = [True, False]
with patch('bigchaindb.fastquery.FastQuery.filter_spent_outputs') as filter_spent:
filter_spent.return_value = [TransactionLink('b', 2)]
out = Bigchain().get_outputs_filtered('abc', include_spent=False)
get_outputs.assert_called_once_with('abc')
assert out == [TransactionLink('b', 2)]
@ -1132,13 +1132,13 @@ def test_get_outputs_filtered_only_unspent():
def test_get_outputs_filtered():
from bigchaindb.common.transaction import TransactionLink
from bigchaindb.core import Bigchain
with patch('bigchaindb.core.Bigchain.get_outputs') as get_outputs:
with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_public_key') as get_outputs:
get_outputs.return_value = [TransactionLink('a', 1),
TransactionLink('b', 2)]
with patch('bigchaindb.core.Bigchain.get_spent') as get_spent:
with patch('bigchaindb.fastquery.FastQuery.filter_spent_outputs') as filter_spent:
out = Bigchain().get_outputs_filtered('abc')
get_outputs.assert_called_once_with('abc')
get_spent.assert_not_called()
filter_spent.assert_not_called()
assert out == get_outputs.return_value

86
tests/test_fastquery.py Normal file
View File

@ -0,0 +1,86 @@
import pytest
from bigchaindb.common.transaction import TransactionLink
from bigchaindb.models import Block, Transaction
pytestmark = pytest.mark.bdb
@pytest.fixture
def blockdata(b, user_pk, user2_pk):
txs = [Transaction.create([user_pk], [([user2_pk], 1)]),
Transaction.create([user2_pk], [([user_pk], 1)]),
Transaction.create([user_pk], [([user_pk], 1), ([user2_pk], 1)])]
blocks = []
for i in range(3):
block = Block([txs[i]])
b.write_block(block)
blocks.append(block.to_dict())
b.write_vote(b.vote(blocks[1]['id'], '', True))
b.write_vote(b.vote(blocks[2]['id'], '', False))
return blocks, [b['id'] for b in blocks]
def test_filter_valid_block_ids_with_undecided(b, blockdata):
blocks, block_ids = blockdata
valid_ids = b.fastquery.filter_valid_block_ids(block_ids, include_undecided=True)
assert set(valid_ids) == {blocks[0]['id'], blocks[1]['id']}
def test_filter_valid_block_ids_only_valid(b, blockdata):
blocks, block_ids = blockdata
valid_ids = b.fastquery.filter_valid_block_ids(block_ids)
assert set(valid_ids) == {blocks[1]['id']}
def test_filter_valid_items(b, blockdata):
blocks, _ = blockdata
assert (b.fastquery.filter_valid_items(blocks, block_id_key=lambda b: b['id'])
== [blocks[0], blocks[1]])
def test_get_outputs_by_public_key(b, user_pk, user2_pk, blockdata):
blocks, _ = blockdata
assert b.fastquery.get_outputs_by_public_key(user_pk) == [
TransactionLink(blocks[1]['block']['transactions'][0]['id'], 0)
]
assert b.fastquery.get_outputs_by_public_key(user2_pk) == [
TransactionLink(blocks[0]['block']['transactions'][0]['id'], 0)
]
def test_filter_spent_outputs(b, user_pk):
out = [([user_pk], 1)]
tx1 = Transaction.create([user_pk], out * 3)
# There are 3 inputs
inputs = tx1.to_inputs()
# Each spent individually
tx2 = Transaction.transfer([inputs[0]], out, tx1.id)
tx3 = Transaction.transfer([inputs[1]], out, tx1.id)
tx4 = Transaction.transfer([inputs[2]], out, tx1.id)
# The CREATE and first TRANSFER are valid. tx2 produces a new unspent.
for tx in [tx1, tx2]:
block = Block([tx])
b.write_block(block)
b.write_vote(b.vote(block.id, '', True))
# The second TRANSFER is invalid. inputs[1] remains unspent.
block = Block([tx3])
b.write_block(block)
b.write_vote(b.vote(block.id, '', False))
# The third TRANSFER is undecided. It procuces a new unspent.
block = Block([tx4])
b.write_block(block)
outputs = b.fastquery.get_outputs_by_public_key(user_pk)
unspents = b.fastquery.filter_spent_outputs(outputs)
assert set(unspents) == {
inputs[1].fulfills,
tx2.to_inputs()[0].fulfills,
tx4.to_inputs()[0].fulfills
}