From 4f991227586dffe457c1fc04f4f7eba783bdd3a4 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Wed, 19 Apr 2017 15:47:58 +0200 Subject: [PATCH 1/7] fast unspents --- bigchaindb/backend/mongodb/query.py | 31 ++++++++++- bigchaindb/backend/query.py | 28 ++++++++++ bigchaindb/common/transaction.py | 3 + bigchaindb/core.py | 11 ++-- bigchaindb/fastquery.py | 56 +++++++++++++++++++ tests/backend/mongodb/test_queries.py | 51 +++++++++++++++++ tests/db/test_bigchain_api.py | 12 ++-- tests/test_fastquery.py | 80 +++++++++++++++++++++++++++ 8 files changed, 261 insertions(+), 11 deletions(-) create mode 100644 bigchaindb/fastquery.py create mode 100644 tests/test_fastquery.py diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index 74b9c35a..d564e242 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -176,13 +176,33 @@ def get_spent(conn, transaction_id, output): @register_query(MongoDBConnection) -def get_owned_ids(conn, owner): +def get_spending_transactions(conn, inputs): + return conn.run( + conn.collection('bigchain').aggregate([ + {'$match': { + 'block.transactions.inputs.fulfills': { + '$in': inputs, + }, + }}, + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.inputs.fulfills': { + '$in': inputs, + }, + }}, + ])) + + +@register_query(MongoDBConnection) +def get_owned_ids(conn, owner, unwrap=True): cursor = conn.run( conn.collection('bigchain').aggregate([ {'$match': {'block.transactions.outputs.public_keys': owner}}, {'$unwind': '$block.transactions'}, {'$match': {'block.transactions.outputs.public_keys': owner}} ])) + if not unwrap: + return cursor # we need to access some nested fields before returning so lets use a # generator to avoid having to read all records on the cursor at this point return (elem['block']['transactions'] for elem in cursor) @@ -196,6 +216,15 @@ def get_votes_by_block_id(conn, block_id): projection={'_id': False})) 
+@register_query(MongoDBConnection) +def get_votes_for_blocks_by_voter(conn, block_ids, node_pubkey): + return conn.run( + conn.collection('votes') + .find({'vote.voting_for_block': {'$in': block_ids}, + 'node_pubkey': node_pubkey}, + projection={'_id': False})) + + @register_query(MongoDBConnection) def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey): return conn.run( diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 9aa653d7..5ae39557 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -140,6 +140,19 @@ def get_spent(connection, transaction_id, condition_id): raise NotImplementedError +@singledispatch +def get_spending_transactions(connection, inputs): + """Return transactions which spend given inputs + + Args: + inputs (list): list of {txid, output} + + Returns: + List of transactions that spend given inputs + """ + raise NotImplementedError + + @singledispatch def get_owned_ids(connection, owner): """Retrieve a list of `txids` that can we used has inputs. @@ -183,6 +196,21 @@ def get_votes_by_block_id_and_voter(connection, block_id, node_pubkey): raise NotImplementedError +@singledispatch +def get_votes_for_blocks_by_voter(connection, block_ids, pubkey): + """Return votes for many block_ids + + Args: + block_ids (set): block_ids + pubkey (str): public key of voting node + + Returns: + A cursor of votes matching given votes. + """ + + raise NotImplementedError + + @singledispatch def write_block(connection, block): """Write a block to the bigchain table. 
diff --git a/bigchaindb/common/transaction.py b/bigchaindb/common/transaction.py index e956812f..7d17172a 100644 --- a/bigchaindb/common/transaction.py +++ b/bigchaindb/common/transaction.py @@ -161,6 +161,9 @@ class TransactionLink(object): # TODO: If `other !== TransactionLink` return `False` return self.to_dict() == other.to_dict() + def __hash__(self): + return hash((self.txid, self.output)) + @classmethod def from_dict(cls, link): """Transforms a Python dictionary to a TransactionLink object. diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 5d2e9c03..be7cdcbc 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -8,7 +8,7 @@ from bigchaindb.common.transaction import TransactionLink import bigchaindb -from bigchaindb import backend, config_utils, utils +from bigchaindb import backend, config_utils, utils, fastquery from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Block, Transaction @@ -433,14 +433,17 @@ class Bigchain(object): """ return self.get_outputs_filtered(owner, include_spent=False) + @property + def fastquery(self): + return fastquery.FastQuery(self.connection, self.me) + def get_outputs_filtered(self, owner, include_spent=True): """ Get a list of output links filtered on some criteria """ - outputs = self.get_outputs(owner) + outputs = self.fastquery.get_outputs_by_pubkey(owner) if not include_spent: - outputs = [o for o in outputs - if not self.get_spent(o.txid, o.output)] + outputs = self.fastquery.filter_spent_outputs(outputs) return outputs def get_transactions_filtered(self, asset_id, operation=None): diff --git a/bigchaindb/fastquery.py b/bigchaindb/fastquery.py new file mode 100644 index 00000000..f2a8fb09 --- /dev/null +++ b/bigchaindb/fastquery.py @@ -0,0 +1,56 @@ +from bigchaindb.utils import output_has_owner +from bigchaindb.backend import query +from bigchaindb.common.transaction import TransactionLink + + +class FastQuery: + + """ + Database queries that join on block results from a single 
node. + + * Votes are not validated for security (security is replication concern) + * Votes come from only one node, and as such, fault tolerance is not provided + (elected Blockchain table not yet available) + + In return, these queries offer good performance, as it is not neccesary to validate + each result separately. + """ + + def __init__(self, connection, me): + self.connection = connection + self.me = me + + def filter_block_ids(self, block_ids, include_undecided=True): + votes = query.get_votes_for_blocks_by_voter( + self.connection, block_ids, self.me) + votes = {v['vote']['voting_for_block']: v['vote']['is_block_valid'] for v in votes} + return [b for b in block_ids if votes.get(b, include_undecided)] + + def filter_valid_blocks(self, blocks): + block_ids = list(set(b['id'] for b in blocks)) + valid_block_ids = self.filter_block_ids(block_ids) + return [b for b in blocks if b['id'] in valid_block_ids] + + def get_outputs_by_pubkey(self, pubkey): + cursor = query.get_owned_ids(self.connection, pubkey, unwrap=False) + wrapped_txs = self.filter_valid_blocks(list(cursor)) + txs = [wrapped['block']['transactions'] for wrapped in wrapped_txs] + return [TransactionLink(tx['id'], i) + for tx in txs + for i, o in enumerate(tx['outputs']) + if output_has_owner(o, pubkey)] + + def filter_spent_outputs(self, outputs): + """ + Remove outputs that have been spent + + Args: + outputs: list of TransactionLink + """ + links = [o.to_dict() for o in outputs] + wrapped = self.filter_valid_blocks( + list(query.get_spending_transactions(self.connection, links))) + spends = {TransactionLink.from_dict(input_['fulfills']) + for block in wrapped + for input_ in block['block']['transactions']['inputs']} + return [ff for ff in outputs if ff not in spends] diff --git a/tests/backend/mongodb/test_queries.py b/tests/backend/mongodb/test_queries.py index bd7e75f1..c8a03011 100644 --- a/tests/backend/mongodb/test_queries.py +++ b/tests/backend/mongodb/test_queries.py @@ -417,3 +417,54 
@@ def test_get_txids_filtered(signed_create_tx, signed_transfer_tx): # Test get by asset and TRANSFER txids = set(query.get_txids_filtered(conn, asset_id, Transaction.TRANSFER)) assert txids == {signed_transfer_tx.id} + + +def test_get_spending_transactions(user_pk): + from bigchaindb.backend import connect, query + from bigchaindb.models import Block, Transaction + conn = connect() + + out = [([user_pk], 1)] + tx1 = Transaction.create([user_pk], out * 3) + inputs = tx1.to_inputs() + tx2 = Transaction.transfer([inputs[0]], out, tx1.id) + tx3 = Transaction.transfer([inputs[1]], out, tx1.id) + tx4 = Transaction.transfer([inputs[2]], out, tx1.id) + block = Block([tx1, tx2, tx3, tx4]) + conn.db.bigchain.insert_one(block.to_dict()) + + links = [inputs[0].fulfills.to_dict(), inputs[2].fulfills.to_dict()] + # discard block noise + res = [(r['id'], r['block']['transactions']) + for r in list(query.get_spending_transactions(conn, links))] + + # tx3 not a member because input 1 not asked for + assert res == [(block.id, tx2.to_dict()), (block.id, tx4.to_dict())] + + +def test_get_votes_for_blocks_by_voter(): + from bigchaindb.backend import connect, query + + conn = connect() + votes = [ + { + 'node_pubkey': 'a', + 'vote': {'voting_for_block': 'block1'}, + }, + { + 'node_pubkey': 'b', + 'vote': {'voting_for_block': 'block1'}, + }, + { + 'node_pubkey': 'a', + 'vote': {'voting_for_block': 'block2'}, + }, + { + 'node_pubkey': 'a', + 'vote': {'voting_for_block': 'block3'}, + } + ] + for vote in votes: + conn.db.votes.insert_one(vote.copy()) + res = query.get_votes_for_blocks_by_voter(conn, ['block1', 'block2'], 'a') + assert list(res) == [votes[0], votes[2]] diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 3f05385c..589671e5 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -1119,11 +1119,11 @@ def test_get_owned_ids_calls_get_outputs_filtered(): def test_get_outputs_filtered_only_unspent(): from 
bigchaindb.common.transaction import TransactionLink from bigchaindb.core import Bigchain - with patch('bigchaindb.core.Bigchain.get_outputs') as get_outputs: + with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_pubkey') as get_outputs: get_outputs.return_value = [TransactionLink('a', 1), TransactionLink('b', 2)] - with patch('bigchaindb.core.Bigchain.get_spent') as get_spent: - get_spent.side_effect = [True, False] + with patch('bigchaindb.fastquery.FastQuery.filter_spent_outputs') as filter_spent: + filter_spent.return_value = [TransactionLink('b', 2)] out = Bigchain().get_outputs_filtered('abc', include_spent=False) get_outputs.assert_called_once_with('abc') assert out == [TransactionLink('b', 2)] @@ -1132,13 +1132,13 @@ def test_get_outputs_filtered_only_unspent(): def test_get_outputs_filtered(): from bigchaindb.common.transaction import TransactionLink from bigchaindb.core import Bigchain - with patch('bigchaindb.core.Bigchain.get_outputs') as get_outputs: + with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_pubkey') as get_outputs: get_outputs.return_value = [TransactionLink('a', 1), TransactionLink('b', 2)] - with patch('bigchaindb.core.Bigchain.get_spent') as get_spent: + with patch('bigchaindb.fastquery.FastQuery.filter_spent_outputs') as filter_spent: out = Bigchain().get_outputs_filtered('abc') get_outputs.assert_called_once_with('abc') - get_spent.assert_not_called() + filter_spent.assert_not_called() assert out == get_outputs.return_value diff --git a/tests/test_fastquery.py b/tests/test_fastquery.py new file mode 100644 index 00000000..b730eee9 --- /dev/null +++ b/tests/test_fastquery.py @@ -0,0 +1,80 @@ +import pytest + +from bigchaindb.common.transaction import TransactionLink +from bigchaindb.models import Block, Transaction + +pytestmark = pytest.mark.bdb + + +@pytest.fixture +def blockdata(b, user_pk, user2_pk): + txs = [Transaction.create([user_pk], [([user2_pk], 1)]), + Transaction.create([user2_pk], [([user_pk], 1)]), + 
Transaction.create([user_pk], [([user_pk], 1), ([user2_pk], 1)])] + blocks = [] + for i in range(3): + block = Block([txs[i]]) + b.write_block(block) + blocks.append(block.to_dict()) + if i > 0: + b.write_vote(b.vote(block.id, '', i % 2 == 1)) + return blocks, [b['id'] for b in blocks] + + +def test_filter_block_ids_with_undecided(b, blockdata): + blocks, block_ids = blockdata + valid_ids = b.fastquery.filter_block_ids(block_ids) + assert set(valid_ids) == {blocks[0]['id'], blocks[1]['id']} + + +def test_filter_block_ids_only_valid(b, blockdata): + blocks, block_ids = blockdata + valid_ids = b.fastquery.filter_block_ids(block_ids, include_undecided=False) + assert set(valid_ids) == {blocks[1]['id']} + + +def test_filter_valid_blocks(b, blockdata): + blocks, _ = blockdata + assert b.fastquery.filter_valid_blocks(blocks) == [blocks[0], blocks[1]] + + +def test_get_outputs_by_pubkey(b, user_pk, user2_pk, blockdata): + blocks, _ = blockdata + assert b.fastquery.get_outputs_by_pubkey(user_pk) == [ + TransactionLink(blocks[1]['block']['transactions'][0]['id'], 0) + ] + assert b.fastquery.get_outputs_by_pubkey(user2_pk) == [ + TransactionLink(blocks[0]['block']['transactions'][0]['id'], 0) + ] + + +def test_filter_spent_outputs(b, user_pk): + out = [([user_pk], 1)] + tx1 = Transaction.create([user_pk], out * 3) + inputs = tx1.to_inputs() + tx2 = Transaction.transfer([inputs[0]], out, tx1.id) + tx3 = Transaction.transfer([inputs[1]], out, tx1.id) + tx4 = Transaction.transfer([inputs[2]], out, tx1.id) + + for tx in [tx1, tx2]: + block = Block([tx]) + b.write_block(block) + b.write_vote(b.vote(block.id, '', True)) + + # mark invalid + block = Block([tx3]) + b.write_block(block) + b.write_vote(b.vote(block.id, '', False)) + + # undecided + block = Block([tx4]) + b.write_block(block) + + unspents = b.fastquery.filter_spent_outputs( + b.fastquery.get_outputs_by_pubkey(user_pk)) + + assert set(unspents) == { + inputs[1].fulfills, + tx2.to_inputs()[0].fulfills, + 
tx4.to_inputs()[0].fulfills + } From 5b6fa13d798a6bed27fc662f1db6b2a11e7ef885 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Thu, 20 Apr 2017 16:58:29 +0200 Subject: [PATCH 2/7] fast unspent queries for RethinkDB --- bigchaindb/backend/rethinkdb/query.py | 38 +++++++++++++++++++++++---- bigchaindb/fastquery.py | 4 +-- tests/test_fastquery.py | 13 ++++++--- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 6011cc8c..6a2c644e 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -120,14 +120,16 @@ def get_spent(connection, transaction_id, output): @register_query(RethinkDBConnection) -def get_owned_ids(connection, owner): - return connection.run( - r.table('bigchain', read_mode=READ_MODE) +def get_owned_ids(connection, owner, unwrap=True): + query = (r.table('bigchain', read_mode=READ_MODE) .get_all(owner, index='outputs') .distinct() - .concat_map(lambda doc: doc['block']['transactions']) - .filter(lambda tx: tx['outputs'].contains( + .concat_map(unroll_block_transactions) + .filter(lambda doc: doc['block']['transactions']['outputs'].contains( lambda c: c['public_keys'].contains(owner)))) + if unwrap: + query = query.map(lambda doc: doc['block']['transactions']) + return connection.run(query) @register_query(RethinkDBConnection) @@ -253,3 +255,29 @@ def get_unvoted_blocks(connection, node_pubkey): # database level. 
Solving issue #444 can help untangling the situation unvoted_blocks = filter(lambda block: not utils.is_genesis_block(block), unvoted) return unvoted_blocks + + +@register_query(RethinkDBConnection) +def get_votes_for_blocks_by_voter(connection, block_ids, node_pubkey): + return connection.run( + r.table('votes') + .filter(lambda row: r.expr(block_ids).contains(row['vote']['voting_for_block'])) + .filter(lambda row: row['node_pubkey'] == node_pubkey)) + + +def unroll_block_transactions(block): + """ Simulate unrolling a transaction into block in MongoDB """ + return block['block']['transactions'].map( + lambda tx: block.merge({'block': {'transactions': tx}})) + + +@register_query(RethinkDBConnection) +def get_spending_transactions(connection, links): + query = ( + r.table('bigchain') + .get_all(*[(l['txid'], l['output']) for l in links], index='inputs') + .concat_map(unroll_block_transactions) + .filter(lambda doc: r.expr(links).set_intersection( + doc['block']['transactions']['inputs'].map(lambda i: i['fulfills']))) + ) + return connection.run(query) diff --git a/bigchaindb/fastquery.py b/bigchaindb/fastquery.py index f2a8fb09..7b73ab64 100644 --- a/bigchaindb/fastquery.py +++ b/bigchaindb/fastquery.py @@ -48,8 +48,8 @@ class FastQuery: outputs: list of TransactionLink """ links = [o.to_dict() for o in outputs] - wrapped = self.filter_valid_blocks( - list(query.get_spending_transactions(self.connection, links))) + spending_txs = query.get_spending_transactions(self.connection, links) + wrapped = self.filter_valid_blocks(list(spending_txs)) spends = {TransactionLink.from_dict(input_['fulfills']) for block in wrapped for input_ in block['block']['transactions']['inputs']} diff --git a/tests/test_fastquery.py b/tests/test_fastquery.py index b730eee9..8621751a 100644 --- a/tests/test_fastquery.py +++ b/tests/test_fastquery.py @@ -51,27 +51,32 @@ def test_get_outputs_by_pubkey(b, user_pk, user2_pk, blockdata): def test_filter_spent_outputs(b, user_pk): out = 
[([user_pk], 1)] tx1 = Transaction.create([user_pk], out * 3) + + # There are 3 inputs inputs = tx1.to_inputs() + + # Each spent individually tx2 = Transaction.transfer([inputs[0]], out, tx1.id) tx3 = Transaction.transfer([inputs[1]], out, tx1.id) tx4 = Transaction.transfer([inputs[2]], out, tx1.id) + # The CREATE and first TRANSFER are valid. tx2 produces a new unspent. for tx in [tx1, tx2]: block = Block([tx]) b.write_block(block) b.write_vote(b.vote(block.id, '', True)) - # mark invalid + # The second TRANSFER is invalid. inputs[1] remains unspent. block = Block([tx3]) b.write_block(block) b.write_vote(b.vote(block.id, '', False)) - # undecided + # The third TRANSFER is undecided. It procuces a new unspent. block = Block([tx4]) b.write_block(block) - unspents = b.fastquery.filter_spent_outputs( - b.fastquery.get_outputs_by_pubkey(user_pk)) + outputs = b.fastquery.get_outputs_by_pubkey(user_pk) + unspents = b.fastquery.filter_spent_outputs(outputs) assert set(unspents) == { inputs[1].fulfills, From 29247a9994758e2d511a2f686abb3cc4b8f94e54 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Thu, 20 Apr 2017 16:58:40 +0200 Subject: [PATCH 3/7] test clarification --- tests/test_fastquery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_fastquery.py b/tests/test_fastquery.py index 8621751a..200bd7f9 100644 --- a/tests/test_fastquery.py +++ b/tests/test_fastquery.py @@ -16,8 +16,8 @@ def blockdata(b, user_pk, user2_pk): block = Block([txs[i]]) b.write_block(block) blocks.append(block.to_dict()) - if i > 0: - b.write_vote(b.vote(block.id, '', i % 2 == 1)) + b.write_vote(b.vote(blocks[1]['id'], '', True)) + b.write_vote(b.vote(blocks[2]['id'], '', False)) return blocks, [b['id'] for b in blocks] From 5d767c1162effa3f657f5d6c5c5a3594bdc11662 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Fri, 21 Apr 2017 12:23:53 +0200 Subject: [PATCH 4/7] light refactoring to remove some redundancies and fix test coverage --- 
bigchaindb/backend/mongodb/query.py | 11 +++--- bigchaindb/backend/rethinkdb/query.py | 19 +++++------ bigchaindb/core.py | 48 +-------------------------- bigchaindb/fastquery.py | 32 ++++++++++-------- tests/backend/mongodb/test_queries.py | 9 +++-- tests/backend/test_generics.py | 2 ++ tests/test_fastquery.py | 3 +- 7 files changed, 41 insertions(+), 83 deletions(-) diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index d564e242..0eab8c6b 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -177,7 +177,7 @@ def get_spent(conn, transaction_id, output): @register_query(MongoDBConnection) def get_spending_transactions(conn, inputs): - return conn.run( + cursor = conn.run( conn.collection('bigchain').aggregate([ {'$match': { 'block.transactions.inputs.fulfills': { @@ -191,21 +191,18 @@ def get_spending_transactions(conn, inputs): }, }}, ])) + return ((b['id'], b['block']['transactions']) for b in cursor) @register_query(MongoDBConnection) -def get_owned_ids(conn, owner, unwrap=True): +def get_owned_ids(conn, owner): cursor = conn.run( conn.collection('bigchain').aggregate([ {'$match': {'block.transactions.outputs.public_keys': owner}}, {'$unwind': '$block.transactions'}, {'$match': {'block.transactions.outputs.public_keys': owner}} ])) - if not unwrap: - return cursor - # we need to access some nested fields before returning so lets use a - # generator to avoid having to read all records on the cursor at this point - return (elem['block']['transactions'] for elem in cursor) + return ((b['id'], b['block']['transactions']) for b in cursor) @register_query(MongoDBConnection) diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 6a2c644e..fca65bdf 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -120,16 +120,15 @@ def get_spent(connection, transaction_id, output): @register_query(RethinkDBConnection) 
-def get_owned_ids(connection, owner, unwrap=True): +def get_owned_ids(connection, owner): query = (r.table('bigchain', read_mode=READ_MODE) .get_all(owner, index='outputs') .distinct() .concat_map(unroll_block_transactions) - .filter(lambda doc: doc['block']['transactions']['outputs'].contains( + .filter(lambda doc: doc['tx']['outputs'].contains( lambda c: c['public_keys'].contains(owner)))) - if unwrap: - query = query.map(lambda doc: doc['block']['transactions']) - return connection.run(query) + cursor = connection.run(query) + return ((b['id'], b['tx']) for b in cursor) @register_query(RethinkDBConnection) @@ -266,9 +265,8 @@ def get_votes_for_blocks_by_voter(connection, block_ids, node_pubkey): def unroll_block_transactions(block): - """ Simulate unrolling a transaction into block in MongoDB """ - return block['block']['transactions'].map( - lambda tx: block.merge({'block': {'transactions': tx}})) + """ Unroll block transactions """ + return block['block']['transactions'].map(lambda tx: block.merge({'tx': tx})) @register_query(RethinkDBConnection) @@ -278,6 +276,7 @@ def get_spending_transactions(connection, links): .get_all(*[(l['txid'], l['output']) for l in links], index='inputs') .concat_map(unroll_block_transactions) .filter(lambda doc: r.expr(links).set_intersection( - doc['block']['transactions']['inputs'].map(lambda i: i['fulfills']))) + doc['tx']['inputs'].map(lambda i: i['fulfills']))) ) - return connection.run(query) + cursor = connection.run(query) + return ((b['id'], b['tx']) for b in cursor) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index be7cdcbc..63cab1d2 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -4,11 +4,10 @@ from time import time from bigchaindb import exceptions as core_exceptions from bigchaindb.common import crypto, exceptions from bigchaindb.common.utils import gen_timestamp, serialize -from bigchaindb.common.transaction import TransactionLink import bigchaindb -from bigchaindb import backend, config_utils, 
utils, fastquery +from bigchaindb import backend, config_utils, fastquery from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Block, Transaction @@ -376,51 +375,6 @@ class Bigchain(object): # Either no transaction was returned spending the `(txid, output)` as # input or the returned transactions are not valid. - def get_outputs(self, owner): - """Retrieve a list of links to transaction outputs for a given public - key. - - Args: - owner (str): base58 encoded public key. - - Returns: - :obj:`list` of TransactionLink: list of ``txid`` s and ``output`` s - pointing to another transaction's condition - """ - # get all transactions in which owner is in the `owners_after` list - response = backend.query.get_owned_ids(self.connection, owner) - return [ - TransactionLink(tx['id'], index) - for tx in response - if not self.is_tx_strictly_in_invalid_block(tx['id']) - for index, output in enumerate(tx['outputs']) - if utils.output_has_owner(output, owner) - ] - - def is_tx_strictly_in_invalid_block(self, txid): - """ - Checks whether the transaction with the given ``txid`` - *strictly* belongs to an invalid block. - - Args: - txid (str): Transaction id. - - Returns: - bool: ``True`` if the transaction *strictly* belongs to a - block that is invalid. ``False`` otherwise. - - Note: - Since a transaction may be in multiple blocks, with - different statuses, the term "strictly" is used to - emphasize that if a transaction is said to be in an invalid - block, it means that it is not in any other block that is - either valid or undecided. - - """ - validity = self.get_blocks_status_containing_tx(txid) - return (Bigchain.BLOCK_VALID not in validity.values() and - Bigchain.BLOCK_UNDECIDED not in validity.values()) - def get_owned_ids(self, owner): """Retrieve a list of ``txid`` s that can be used as inputs. 
diff --git a/bigchaindb/fastquery.py b/bigchaindb/fastquery.py index 7b73ab64..a7147722 100644 --- a/bigchaindb/fastquery.py +++ b/bigchaindb/fastquery.py @@ -9,8 +9,7 @@ class FastQuery: Database queries that join on block results from a single node. * Votes are not validated for security (security is replication concern) - * Votes come from only one node, and as such, fault tolerance is not provided - (elected Blockchain table not yet available) + * Votes come from only one node, and as such, fault tolerance is reduced In return, these queries offer good performance, as it is not neccesary to validate each result separately. @@ -21,20 +20,27 @@ class FastQuery: self.me = me def filter_block_ids(self, block_ids, include_undecided=True): + """ + Given block ids, filter the invalid blocks + """ + block_ids = list(set(block_ids)) votes = query.get_votes_for_blocks_by_voter( self.connection, block_ids, self.me) votes = {v['vote']['voting_for_block']: v['vote']['is_block_valid'] for v in votes} return [b for b in block_ids if votes.get(b, include_undecided)] - def filter_valid_blocks(self, blocks): - block_ids = list(set(b['id'] for b in blocks)) - valid_block_ids = self.filter_block_ids(block_ids) - return [b for b in blocks if b['id'] in valid_block_ids] + def filter_valid_blocks(self, blocks, key=lambda b: b[0]): + """ + Given things with block ids, remove the invalid ones. 
+ """ + blocks = list(blocks) + valid_block_ids = set(self.filter_block_ids(key(b) for b in blocks)) + return [b for b in blocks if key(b) in valid_block_ids] def get_outputs_by_pubkey(self, pubkey): - cursor = query.get_owned_ids(self.connection, pubkey, unwrap=False) - wrapped_txs = self.filter_valid_blocks(list(cursor)) - txs = [wrapped['block']['transactions'] for wrapped in wrapped_txs] + """ Get outputs for a public key """ + res = list(query.get_owned_ids(self.connection, pubkey)) + txs = [tx for _, tx in self.filter_valid_blocks(res)] return [TransactionLink(tx['id'], i) for tx in txs for i, o in enumerate(tx['outputs']) @@ -48,9 +54,9 @@ class FastQuery: outputs: list of TransactionLink """ links = [o.to_dict() for o in outputs] - spending_txs = query.get_spending_transactions(self.connection, links) - wrapped = self.filter_valid_blocks(list(spending_txs)) + res = query.get_spending_transactions(self.connection, links) + txs = [tx for _, tx in self.filter_valid_blocks(res)] spends = {TransactionLink.from_dict(input_['fulfills']) - for block in wrapped - for input_ in block['block']['transactions']['inputs']} + for tx in txs + for input_ in tx['inputs']} return [ff for ff in outputs if ff not in spends] diff --git a/tests/backend/mongodb/test_queries.py b/tests/backend/mongodb/test_queries.py index c8a03011..426ac12d 100644 --- a/tests/backend/mongodb/test_queries.py +++ b/tests/backend/mongodb/test_queries.py @@ -205,10 +205,10 @@ def test_get_owned_ids(signed_create_tx, user_pk): block = Block(transactions=[signed_create_tx]) conn.db.bigchain.insert_one(block.to_dict()) - owned_ids = list(query.get_owned_ids(conn, user_pk)) + [(block_id, tx)] = list(query.get_owned_ids(conn, user_pk)) - assert len(owned_ids) == 1 - assert owned_ids[0] == signed_create_tx.to_dict() + assert block_id == block.id + assert tx == signed_create_tx.to_dict() def test_get_votes_by_block_id(signed_create_tx, structurally_valid_vote): @@ -435,8 +435,7 @@ def 
test_get_spending_transactions(user_pk): links = [inputs[0].fulfills.to_dict(), inputs[2].fulfills.to_dict()] # discard block noise - res = [(r['id'], r['block']['transactions']) - for r in list(query.get_spending_transactions(conn, links))] + res = list(query.get_spending_transactions(conn, links)) # tx3 not a member because input 1 not asked for assert res == [(block.id, tx2.to_dict()), (block.id, tx4.to_dict())] diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py index 57a644ee..581a0b31 100644 --- a/tests/backend/test_generics.py +++ b/tests/backend/test_generics.py @@ -36,6 +36,8 @@ def test_schema(schema_func_name, args_qty): ('get_votes_by_block_id_and_voter', 2), ('update_transaction', 2), ('get_transaction_from_block', 2), + ('get_votes_for_blocks_by_voter', 2), + ('get_spending_transactions', 1), )) def test_query(query_func_name, args_qty): from bigchaindb.backend import query diff --git a/tests/test_fastquery.py b/tests/test_fastquery.py index 200bd7f9..c54ef10e 100644 --- a/tests/test_fastquery.py +++ b/tests/test_fastquery.py @@ -35,7 +35,8 @@ def test_filter_block_ids_only_valid(b, blockdata): def test_filter_valid_blocks(b, blockdata): blocks, _ = blockdata - assert b.fastquery.filter_valid_blocks(blocks) == [blocks[0], blocks[1]] + assert (b.fastquery.filter_valid_blocks(blocks, key=lambda b: b['id']) + == [blocks[0], blocks[1]]) def test_get_outputs_by_pubkey(b, user_pk, user2_pk, blockdata): From 2200a7bda417e4d5ef4fd1f3297e7231249dbb85 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Mon, 24 Apr 2017 12:21:00 +0200 Subject: [PATCH 5/7] cleanup --- bigchaindb/backend/query.py | 1 - bigchaindb/backend/rethinkdb/query.py | 9 +++++---- tests/backend/mongodb/test_queries.py | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 5ae39557..0f746132 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -207,7 +207,6 @@ def 
get_votes_for_blocks_by_voter(connection, block_ids, pubkey): Returns: A cursor of votes matching given votes. """ - raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index fca65bdf..18116fc1 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -124,7 +124,7 @@ def get_owned_ids(connection, owner): query = (r.table('bigchain', read_mode=READ_MODE) .get_all(owner, index='outputs') .distinct() - .concat_map(unroll_block_transactions) + .concat_map(unwind_block_transactions) .filter(lambda doc: doc['tx']['outputs'].contains( lambda c: c['public_keys'].contains(owner)))) cursor = connection.run(query) @@ -264,8 +264,8 @@ def get_votes_for_blocks_by_voter(connection, block_ids, node_pubkey): .filter(lambda row: row['node_pubkey'] == node_pubkey)) -def unroll_block_transactions(block): - """ Unroll block transactions """ +def unwind_block_transactions(block): + """ Yield a block for each transaction in given block """ return block['block']['transactions'].map(lambda tx: block.merge({'tx': tx})) @@ -274,7 +274,8 @@ def get_spending_transactions(connection, links): query = ( r.table('bigchain') .get_all(*[(l['txid'], l['output']) for l in links], index='inputs') - .concat_map(unroll_block_transactions) + .concat_map(unwind_block_transactions) + # filter transactions spending output .filter(lambda doc: r.expr(links).set_intersection( doc['tx']['inputs'].map(lambda i: i['fulfills']))) ) diff --git a/tests/backend/mongodb/test_queries.py b/tests/backend/mongodb/test_queries.py index 426ac12d..0c295b9b 100644 --- a/tests/backend/mongodb/test_queries.py +++ b/tests/backend/mongodb/test_queries.py @@ -434,7 +434,6 @@ def test_get_spending_transactions(user_pk): conn.db.bigchain.insert_one(block.to_dict()) links = [inputs[0].fulfills.to_dict(), inputs[2].fulfills.to_dict()] - # discard block noise res = list(query.get_spending_transactions(conn, links)) # tx3 not a 
member because input 1 not asked for From 01ba01083d26ba2c1d22eeedd4089a718f9589a9 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Tue, 25 Apr 2017 13:12:32 +0200 Subject: [PATCH 6/7] update comments --- bigchaindb/backend/query.py | 9 +++++---- bigchaindb/fastquery.py | 17 ++++++++++------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 0f746132..6f76249b 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -148,7 +148,8 @@ def get_spending_transactions(connection, inputs): inputs (list): list of {txid, output} Returns: - List of transactions that spend given inputs + Iterator of (block_id, transaction) for transactions that + spend given inputs. """ raise NotImplementedError @@ -161,9 +162,9 @@ def get_owned_ids(connection, owner): owner (str): base58 encoded public key. Returns: - A cursor for the matching transactions. + Iterator of (block_id, transaction) for transactions + that list given owner in conditions. """ - raise NotImplementedError @@ -205,7 +206,7 @@ def get_votes_for_blocks_by_voter(connection, block_ids, pubkey): pubkey (str): public key of voting node Returns: - A cursor of votes matching given votes. + A cursor of votes matching given block_ids and public key """ raise NotImplementedError diff --git a/bigchaindb/fastquery.py b/bigchaindb/fastquery.py index a7147722..deffabe1 100644 --- a/bigchaindb/fastquery.py +++ b/bigchaindb/fastquery.py @@ -4,24 +4,27 @@ from bigchaindb.common.transaction import TransactionLink class FastQuery: - """ Database queries that join on block results from a single node. - * Votes are not validated for security (security is replication concern) - * Votes come from only one node, and as such, fault tolerance is reduced + * Votes are not validated for security (security is a replication concern) + * Votes come from only one node, and as such, non-byzantine fault tolerance + is reduced.
- In return, these queries offer good performance, as it is not neccesary to validate - each result separately. + Previously, to consider the status of a block, all votes for that block + were retrieved and the election results were counted. This meant that a + faulty node may still have been able to obtain a correct election result. + However, from the point of view of a client, it is still necessary to + query multiple nodes to insure against getting an incorrect response from + a byzantine node. """ - def __init__(self, connection, me): self.connection = connection self.me = me def filter_block_ids(self, block_ids, include_undecided=True): """ - Given block ids, filter the invalid blocks + Given block ids, filter the invalid blocks. """ block_ids = list(set(block_ids)) votes = query.get_votes_for_blocks_by_voter( From b4f14b26ce264e7c24053b6e9be4cbea21ade054 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Mon, 8 May 2017 14:25:39 +0200 Subject: [PATCH 7/7] address nomenclature issues --- bigchaindb/core.py | 2 +- bigchaindb/fastquery.py | 39 ++++++++++++++++++++--------------- tests/db/test_bigchain_api.py | 4 ++-- tests/test_fastquery.py | 20 +++++++++--------- 4 files changed, 35 insertions(+), 30 deletions(-) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 63cab1d2..91666224 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -395,7 +395,7 @@ class Bigchain(object): """ Get a list of output links filtered on some criteria """ - outputs = self.fastquery.get_outputs_by_pubkey(owner) + outputs = self.fastquery.get_outputs_by_public_key(owner) if not include_spent: outputs = self.fastquery.filter_spent_outputs(outputs) return outputs diff --git a/bigchaindb/fastquery.py b/bigchaindb/fastquery.py index deffabe1..d19294ce 100644 --- a/bigchaindb/fastquery.py +++ b/bigchaindb/fastquery.py @@ -22,32 +22,37 @@ class FastQuery: self.connection = connection self.me = me - def filter_block_ids(self, block_ids, include_undecided=True): + def
filter_valid_block_ids(self, block_ids, include_undecided=False): """ - Given block ids, filter the invalid blocks. + Given block ids, return only the ones that are valid. """ block_ids = list(set(block_ids)) votes = query.get_votes_for_blocks_by_voter( self.connection, block_ids, self.me) - votes = {v['vote']['voting_for_block']: v['vote']['is_block_valid'] for v in votes} - return [b for b in block_ids if votes.get(b, include_undecided)] + votes = {vote['vote']['voting_for_block']: vote['vote']['is_block_valid'] + for vote in votes} + return [block_id for block_id in block_ids + if votes.get(block_id, include_undecided)] - def filter_valid_blocks(self, blocks, key=lambda b: b[0]): + def filter_valid_items(self, items, block_id_key=lambda b: b[0]): """ - Given things with block ids, remove the invalid ones. + Given items with block ids, return only the ones that are valid or undecided. """ - blocks = list(blocks) - valid_block_ids = set(self.filter_block_ids(key(b) for b in blocks)) - return [b for b in blocks if key(b) in valid_block_ids] + items = list(items) + block_ids = map(block_id_key, items) + valid_block_ids = set(self.filter_valid_block_ids(block_ids, True)) + return [b for b in items if block_id_key(b) in valid_block_ids] - def get_outputs_by_pubkey(self, pubkey): - """ Get outputs for a public key """ - res = list(query.get_owned_ids(self.connection, pubkey)) - txs = [tx for _, tx in self.filter_valid_blocks(res)] - return [TransactionLink(tx['id'], i) + def get_outputs_by_public_key(self, public_key): + """ + Get outputs for a public key + """ + res = list(query.get_owned_ids(self.connection, public_key)) + txs = [tx for _, tx in self.filter_valid_items(res)] + return [TransactionLink(tx['id'], index) for tx in txs - for i, o in enumerate(tx['outputs']) - if output_has_owner(o, pubkey)] + for index, output in enumerate(tx['outputs']) + if output_has_owner(output, public_key)] def filter_spent_outputs(self, outputs): """ @@ -58,7 +63,7 @@ class 
FastQuery: """ links = [o.to_dict() for o in outputs] res = query.get_spending_transactions(self.connection, links) - txs = [tx for _, tx in self.filter_valid_blocks(res)] + txs = [tx for _, tx in self.filter_valid_items(res)] spends = {TransactionLink.from_dict(input_['fulfills']) for tx in txs for input_ in tx['inputs']} diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index 589671e5..8d17a9e7 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -1119,7 +1119,7 @@ def test_get_owned_ids_calls_get_outputs_filtered(): def test_get_outputs_filtered_only_unspent(): from bigchaindb.common.transaction import TransactionLink from bigchaindb.core import Bigchain - with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_pubkey') as get_outputs: + with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_public_key') as get_outputs: get_outputs.return_value = [TransactionLink('a', 1), TransactionLink('b', 2)] with patch('bigchaindb.fastquery.FastQuery.filter_spent_outputs') as filter_spent: @@ -1132,7 +1132,7 @@ def test_get_outputs_filtered_only_unspent(): def test_get_outputs_filtered(): from bigchaindb.common.transaction import TransactionLink from bigchaindb.core import Bigchain - with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_pubkey') as get_outputs: + with patch('bigchaindb.fastquery.FastQuery.get_outputs_by_public_key') as get_outputs: get_outputs.return_value = [TransactionLink('a', 1), TransactionLink('b', 2)] with patch('bigchaindb.fastquery.FastQuery.filter_spent_outputs') as filter_spent: diff --git a/tests/test_fastquery.py b/tests/test_fastquery.py index c54ef10e..8fb3378c 100644 --- a/tests/test_fastquery.py +++ b/tests/test_fastquery.py @@ -21,30 +21,30 @@ def blockdata(b, user_pk, user2_pk): return blocks, [b['id'] for b in blocks] -def test_filter_block_ids_with_undecided(b, blockdata): +def test_filter_valid_block_ids_with_undecided(b, blockdata): blocks, block_ids = blockdata - 
valid_ids = b.fastquery.filter_block_ids(block_ids) + valid_ids = b.fastquery.filter_valid_block_ids(block_ids, include_undecided=True) assert set(valid_ids) == {blocks[0]['id'], blocks[1]['id']} -def test_filter_block_ids_only_valid(b, blockdata): +def test_filter_valid_block_ids_only_valid(b, blockdata): blocks, block_ids = blockdata - valid_ids = b.fastquery.filter_block_ids(block_ids, include_undecided=False) + valid_ids = b.fastquery.filter_valid_block_ids(block_ids) assert set(valid_ids) == {blocks[1]['id']} -def test_filter_valid_blocks(b, blockdata): +def test_filter_valid_items(b, blockdata): blocks, _ = blockdata - assert (b.fastquery.filter_valid_blocks(blocks, key=lambda b: b['id']) + assert (b.fastquery.filter_valid_items(blocks, block_id_key=lambda b: b['id']) == [blocks[0], blocks[1]]) -def test_get_outputs_by_pubkey(b, user_pk, user2_pk, blockdata): +def test_get_outputs_by_public_key(b, user_pk, user2_pk, blockdata): blocks, _ = blockdata - assert b.fastquery.get_outputs_by_pubkey(user_pk) == [ + assert b.fastquery.get_outputs_by_public_key(user_pk) == [ TransactionLink(blocks[1]['block']['transactions'][0]['id'], 0) ] - assert b.fastquery.get_outputs_by_pubkey(user2_pk) == [ + assert b.fastquery.get_outputs_by_public_key(user2_pk) == [ TransactionLink(blocks[0]['block']['transactions'][0]['id'], 0) ] @@ -76,7 +76,7 @@ def test_filter_spent_outputs(b, user_pk): block = Block([tx4]) b.write_block(block) - outputs = b.fastquery.get_outputs_by_pubkey(user_pk) + outputs = b.fastquery.get_outputs_by_public_key(user_pk) unspents = b.fastquery.filter_spent_outputs(outputs) assert set(unspents) == {