From baeae199516009e14584dbdff16f82f096c8afb9 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 25 Jan 2017 19:05:48 +0100 Subject: [PATCH] Add experimental run interface --- bigchaindb/backend/mongodb/connection.py | 8 + bigchaindb/backend/mongodb/query.py | 235 +++++++++++++---------- bigchaindb/utils.py | 28 +++ tests/test_utils.py | 8 + 4 files changed, 180 insertions(+), 99 deletions(-) diff --git a/bigchaindb/backend/mongodb/connection.py b/bigchaindb/backend/mongodb/connection.py index 19731161..eaa9852e 100644 --- a/bigchaindb/backend/mongodb/connection.py +++ b/bigchaindb/backend/mongodb/connection.py @@ -5,6 +5,7 @@ from pymongo import MongoClient from pymongo import errors import bigchaindb +from bigchaindb.utils import Lazy from bigchaindb.common import exceptions from bigchaindb.backend.connection import Connection @@ -43,6 +44,9 @@ class MongoDBConnection(Connection): def db(self): return self.conn[self.dbname] + def run(self, query): + return query.run(self.db) + def _connect(self): # we should only return a connection if the replica set is # initialized. initialize_replica_set will check if the @@ -60,6 +64,10 @@ class MongoDBConnection(Connection): time.sleep(2**i) +def table(name): + return Lazy()[name] + + def initialize_replica_set(): """Initialize a replica set. If already initialized skip.""" diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index c4e3cdc8..ee7562c1 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -5,10 +5,11 @@ from time import time from pymongo import ReturnDocument from pymongo import errors + from bigchaindb import backend from bigchaindb.common.exceptions import CyclicBlockchainError from bigchaindb.backend.utils import module_dispatch_registrar -from bigchaindb.backend.mongodb.connection import MongoDBConnection +from bigchaindb.backend.mongodb.connection import MongoDBConnection, table register_query = module_dispatch_registrar(backend.query) @@ -17,7 +18,9 @@ register_query = module_dispatch_registrar(backend.query) @register_query(MongoDBConnection) def write_transaction(conn, signed_transaction): try: - return conn.db['backlog'].insert_one(signed_transaction) + return conn.run( + table('backlog') + .insert_one(signed_transaction)) except errors.DuplicateKeyError: return @@ -26,40 +29,49 @@ def write_transaction(conn, signed_transaction): def update_transaction(conn, transaction_id, doc): # with mongodb we need to add update operators to the doc doc = {'$set': doc} - return conn.db['backlog']\ - .find_one_and_update({'id': transaction_id}, - doc, - return_document=ReturnDocument.AFTER) + return conn.run( + table('backlog') + .find_one_and_update( + {'id': transaction_id}, + doc, + return_document=ReturnDocument.AFTER)) @register_query(MongoDBConnection) def delete_transaction(conn, *transaction_id): - return conn.db['backlog'].delete_many({'id': {'$in': transaction_id}}) + return conn.run( + table('backlog') + .delete_many({'id': {'$in': transaction_id}})) @register_query(MongoDBConnection) def get_stale_transactions(conn, reassign_delay): - return conn.db['backlog']\ - .find({'assignment_timestamp': {'$lt': time() - reassign_delay}}, - projection={'_id': False}) + return conn.run( + table('backlog') + .find({'assignment_timestamp': {'$lt': time() - reassign_delay}}, + projection={'_id': False})) @register_query(MongoDBConnection) def get_transaction_from_block(conn, transaction_id, block_id): try: - return conn.db['bigchain'].aggregate([ - {'$match': {'id': block_id}}, - {'$project': { - 'block.transactions': { - '$filter': { - 'input': '$block.transactions', - 'as': 'transaction', - 'cond': { - '$eq': ['$$transaction.id', transaction_id] + return conn.run( + table('bigchain') + .aggregate([ + {'$match': {'id': block_id}}, + {'$project': { + 'block.transactions': { + '$filter': { + 'input': '$block.transactions', + 'as': 'transaction', + 'cond': { + '$eq': ['$$transaction.id', transaction_id] + } + } } - } - } - }}]).next()['block']['transactions'].pop() + }}]) + .next()['block']['transactions'] + .pop()) except (StopIteration, IndexError): # StopIteration is raised if the block was not found # IndexError is returned if the block is found but no transactions @@ -69,33 +81,38 @@ def get_transaction_from_block(conn, transaction_id, block_id): @register_query(MongoDBConnection) def get_transaction_from_backlog(conn, transaction_id): - return conn.db['backlog']\ - .find_one({'id': transaction_id}, - projection={'_id': False, 'assignee': False, - 'assignment_timestamp': False}) + return conn.run( + table('backlog') + .find_one({'id': transaction_id}, + projection={'_id': False, + 'assignee': False, + 'assignment_timestamp': False})) @register_query(MongoDBConnection) def get_blocks_status_from_transaction(conn, transaction_id): - return conn.db['bigchain']\ - .find({'block.transactions.id': transaction_id}, - projection=['id', 'block.voters']) + return conn.run( + table('bigchain') + .find({'block.transactions.id': transaction_id}, + projection=['id', 'block.voters'])) @register_query(MongoDBConnection) def get_asset_by_id(conn, asset_id): - cursor = conn.db['bigchain'].aggregate([ - {'$match': { - 'block.transactions.id': asset_id, - 'block.transactions.operation': 'CREATE' - }}, - {'$unwind': '$block.transactions'}, - {'$match': { - 'block.transactions.id': asset_id, - 'block.transactions.operation': 'CREATE' - }}, - {'$project': {'block.transactions.asset': True}} - ]) + cursor = conn.run( + table('bigchain') + .aggregate([ + {'$match': { + 'block.transactions.id': asset_id, + 'block.transactions.operation': 'CREATE' + }}, + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.id': asset_id, + 'block.transactions.operation': 'CREATE' + }}, + {'$project': {'block.transactions.asset': True}} + ])) # we need to access some nested fields before returning so lets use a # generator to avoid having to read all records on the cursor at this point return (elem['block']['transactions'] for elem in cursor) @@ -103,13 +120,14 @@ def get_asset_by_id(conn, asset_id): @register_query(MongoDBConnection) def get_spent(conn, transaction_id, output): - cursor = conn.db['bigchain'].aggregate([ - {'$unwind': '$block.transactions'}, - {'$match': { - 'block.transactions.inputs.fulfills.txid': transaction_id, - 'block.transactions.inputs.fulfills.output': output - }} - ]) + cursor = conn.run( + table('bigchain').aggregate([ + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.inputs.fulfills.txid': transaction_id, + 'block.transactions.inputs.fulfills.output': output + }} + ])) # we need to access some nested fields before returning so lets use a # generator to avoid having to read all records on the cursor at this point return (elem['block']['transactions'] for elem in cursor) @@ -117,14 +135,16 @@ def get_spent(conn, transaction_id, output): @register_query(MongoDBConnection) def get_owned_ids(conn, owner): - cursor = conn.db['bigchain'].aggregate([ - {'$unwind': '$block.transactions'}, - {'$match': { - 'block.transactions.outputs.public_keys': { - '$elemMatch': {'$eq': owner} - } - }} - ]) + cursor = conn.run( + table('bigchain') + .aggregate([ + {'$unwind': '$block.transactions'}, + {'$match': { + 'block.transactions.outputs.public_keys': { + '$elemMatch': {'$eq': owner} + } + }} + ])) # we need to access some nested fields before returning so lets use a # generator to avoid having to read all records on the cursor at this point return (elem['block']['transactions'] for elem in cursor) @@ -132,66 +152,80 @@ def get_owned_ids(conn, owner): @register_query(MongoDBConnection) def get_votes_by_block_id(conn, block_id): - return conn.db['votes']\ - .find({'vote.voting_for_block': block_id}, - projection={'_id': False}) + return conn.run( + table('votes') + .find({'vote.voting_for_block': block_id}, + projection={'_id': False})) @register_query(MongoDBConnection) def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey): - return conn.db['votes']\ - .find({'vote.voting_for_block': block_id, - 'node_pubkey': node_pubkey}, - projection={'_id': False}) + return conn.run( + table('votes') + .find({'vote.voting_for_block': block_id, + 'node_pubkey': node_pubkey}, + projection={'_id': False})) @register_query(MongoDBConnection) def write_block(conn, block): - return conn.db['bigchain'].insert_one(block.to_dict()) + return conn.run( + table('bigchain') + .insert_one(block.to_dict())) @register_query(MongoDBConnection) def get_block(conn, block_id): - return conn.db['bigchain'].find_one({'id': block_id}, - projection={'_id': False}) + return conn.run( + table('bigchain') + .find_one({'id': block_id}, + projection={'_id': False})) @register_query(MongoDBConnection) def has_transaction(conn, transaction_id): - return bool(conn.db['bigchain'] - .find_one({'block.transactions.id': transaction_id})) + return bool(conn.run( + table('bigchain') + .find_one({'block.transactions.id': transaction_id}))) @register_query(MongoDBConnection) def count_blocks(conn): - return conn.db['bigchain'].count() + return conn.run( + table('bigchain') + .count()) @register_query(MongoDBConnection) def count_backlog(conn): - return conn.db['backlog'].count() + return conn.run( + table('backlog') + .count()) @register_query(MongoDBConnection) def write_vote(conn, vote): - conn.db['votes'].insert_one(vote) + conn.run(table('votes').insert_one(vote)) vote.pop('_id') return vote @register_query(MongoDBConnection) def get_genesis_block(conn): - return conn.db['bigchain'].find_one( - {'block.transactions.0.operation': 'GENESIS'}, - {'_id': False} - ) + return conn.run( + table('bigchain') + .find_one( + {'block.transactions.0.operation': 'GENESIS'}, + {'_id': False} + )) @register_query(MongoDBConnection) def get_last_voted_block(conn, node_pubkey): - last_voted = conn.db['votes']\ - .find({'node_pubkey': node_pubkey}, - sort=[('vote.timestamp', -1)]) + last_voted = conn.run( + table('votes') + .find({'node_pubkey': node_pubkey}, + sort=[('vote.timestamp', -1)])) # pymongo seems to return a cursor even if there are no results # so we actually need to check the count @@ -219,21 +253,22 @@ def get_last_voted_block(conn, node_pubkey): @register_query(MongoDBConnection) def get_unvoted_blocks(conn, node_pubkey): - return conn.db['bigchain'].aggregate([ - {'$lookup': { - 'from': 'votes', - 'localField': 'id', - 'foreignField': 'vote.voting_for_block', - 'as': 'votes' - }}, - {'$match': { - 'votes.node_pubkey': {'$ne': node_pubkey}, - 'block.transactions.operation': {'$ne': 'GENESIS'} - }}, - {'$project': { - 'votes': False, '_id': False - }} - ]) + return conn.run( + table('bigchain').aggregate([ + {'$lookup': { + 'from': 'votes', + 'localField': 'id', + 'foreignField': 'vote.voting_for_block', + 'as': 'votes' + }}, + {'$match': { + 'votes.node_pubkey': {'$ne': node_pubkey}, + 'block.transactions.operation': {'$ne': 'GENESIS'} + }}, + {'$project': { + 'votes': False, '_id': False + }} + ])) @register_query(MongoDBConnection) @@ -243,10 +278,12 @@ def get_txids_filtered(conn, asset_id, operation=None): if operation: match['block.transactions.operation'] = operation - cursor = conn.db['bigchain'].aggregate([ - {'$match': match}, - {'$unwind': '$block.transactions'}, - {'$match': match}, - {'$project': {'block.transactions.id': True}} - ]) + cursor = conn.run( + table('bigchain') + .aggregate([ + {'$match': match}, + {'$unwind': '$block.transactions'}, + {'$match': match}, + {'$project': {'block.transactions.id': True}} + ])) return (r['block']['transactions']['id'] for r in cursor) diff --git a/bigchaindb/utils.py b/bigchaindb/utils.py index 434c6376..e89c3b6f 100644 --- a/bigchaindb/utils.py +++ b/bigchaindb/utils.py @@ -157,3 +157,31 @@ def is_genesis_block(block): return block.transactions[0].operation == 'GENESIS' except AttributeError: return block['block']['transactions'][0]['operation'] == 'GENESIS' + + +class Lazy: + + def __init__(self): + self.stack = [] + + def __getattr__(self, name): + self.stack.append(name) + return self + + def __call__(self, *args, **kwargs): + self.stack.append((args, kwargs)) + return self + + def __getitem__(self, key): + self.stack.append('__getitem__') + self.stack.append(([key], {})) + return self + + def run(self, instance): + last = instance + + for method, (args, kwargs) in zip(self.stack[::2], self.stack[1::2]): + last = getattr(last, method)(*args, **kwargs) + + self.stack = [] + return last diff --git a/tests/test_utils.py b/tests/test_utils.py index f76ba6d9..47343ace 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -137,3 +137,11 @@ def test_is_genesis_block_returns_true_if_genesis(b): from bigchaindb.utils import is_genesis_block genesis_block = b.prepare_genesis_block() assert is_genesis_block(genesis_block) + + +def test_lazy_execution(): + from bigchaindb.utils import Lazy + l = Lazy() + l.split(',')[1].split(' ').pop(1).strip() + result = l.run('Like humans, cats tend to favor one paw over another') + assert result == 'cats'