Add experimental run interface

vrde 2017-01-25 19:05:48 +01:00
parent cd7d65b63e
commit baeae19951
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
4 changed files with 180 additions and 99 deletions
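In short, query functions stop reaching into conn.db directly; they now build a lazy query with table() and hand it to the connection, which executes it through the new conn.run(). A sketch of the before/after pattern, taken from write_transaction in the diff below:

# before
conn.db['backlog'].insert_one(signed_transaction)

# after
conn.run(
    table('backlog')
    .insert_one(signed_transaction))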

bigchaindb/backend/mongodb/connection.py

@@ -5,6 +5,7 @@ from pymongo import MongoClient
from pymongo import errors
import bigchaindb
from bigchaindb.utils import Lazy
from bigchaindb.common import exceptions
from bigchaindb.backend.connection import Connection
@@ -43,6 +44,9 @@ class MongoDBConnection(Connection):
def db(self):
return self.conn[self.dbname]
def run(self, query):
return query.run(self.db)
def _connect(self):
# we should only return a connection if the replica set is
# initialized. initialize_replica_set will check if the
@@ -60,6 +64,10 @@ class MongoDBConnection(Connection):
time.sleep(2**i)
def table(name):
return Lazy()[name]
def initialize_replica_set():
"""Initialize a replica set. If already initialized skip."""

bigchaindb/backend/mongodb/query.py

@@ -5,10 +5,11 @@ from time import time
from pymongo import ReturnDocument
from pymongo import errors
from bigchaindb import backend
from bigchaindb.common.exceptions import CyclicBlockchainError
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.mongodb.connection import MongoDBConnection
from bigchaindb.backend.mongodb.connection import MongoDBConnection, table
register_query = module_dispatch_registrar(backend.query)
@@ -17,7 +18,9 @@ register_query = module_dispatch_registrar(backend.query)
@register_query(MongoDBConnection)
def write_transaction(conn, signed_transaction):
try:
return conn.db['backlog'].insert_one(signed_transaction)
return conn.run(
table('backlog')
.insert_one(signed_transaction))
except errors.DuplicateKeyError:
return
@@ -26,28 +29,35 @@ def write_transaction(conn, signed_transaction):
def update_transaction(conn, transaction_id, doc):
# with mongodb we need to add update operators to the doc
doc = {'$set': doc}
return conn.db['backlog']\
.find_one_and_update({'id': transaction_id},
return conn.run(
table('backlog')
.find_one_and_update(
{'id': transaction_id},
doc,
return_document=ReturnDocument.AFTER)
return_document=ReturnDocument.AFTER))
@register_query(MongoDBConnection)
def delete_transaction(conn, *transaction_id):
return conn.db['backlog'].delete_many({'id': {'$in': transaction_id}})
return conn.run(
table('backlog')
.delete_many({'id': {'$in': transaction_id}}))
@register_query(MongoDBConnection)
def get_stale_transactions(conn, reassign_delay):
return conn.db['backlog']\
return conn.run(
table('backlog')
.find({'assignment_timestamp': {'$lt': time() - reassign_delay}},
projection={'_id': False})
projection={'_id': False}))
@register_query(MongoDBConnection)
def get_transaction_from_block(conn, transaction_id, block_id):
try:
return conn.db['bigchain'].aggregate([
return conn.run(
table('bigchain')
.aggregate([
{'$match': {'id': block_id}},
{'$project': {
'block.transactions': {
@@ -59,7 +69,9 @@ def get_transaction_from_block(conn, transaction_id, block_id):
}
}
}
}}]).next()['block']['transactions'].pop()
}}])
.next()['block']['transactions']
.pop())
except (StopIteration, IndexError):
# StopIteration is raised if the block was not found
# IndexError is returned if the block is found but no transactions
@@ -69,22 +81,27 @@ def get_transaction_from_block(conn, transaction_id):
@register_query(MongoDBConnection)
def get_transaction_from_backlog(conn, transaction_id):
return conn.db['backlog']\
return conn.run(
table('backlog')
.find_one({'id': transaction_id},
projection={'_id': False, 'assignee': False,
'assignment_timestamp': False})
projection={'_id': False,
'assignee': False,
'assignment_timestamp': False}))
@register_query(MongoDBConnection)
def get_blocks_status_from_transaction(conn, transaction_id):
return conn.db['bigchain']\
return conn.run(
table('bigchain')
.find({'block.transactions.id': transaction_id},
projection=['id', 'block.voters'])
projection=['id', 'block.voters']))
@register_query(MongoDBConnection)
def get_asset_by_id(conn, asset_id):
cursor = conn.db['bigchain'].aggregate([
cursor = conn.run(
table('bigchain')
.aggregate([
{'$match': {
'block.transactions.id': asset_id,
'block.transactions.operation': 'CREATE'
@@ -95,7 +112,7 @@ def get_asset_by_id(conn, asset_id):
'block.transactions.operation': 'CREATE'
}},
{'$project': {'block.transactions.asset': True}}
])
]))
# we need to access some nested fields before returning so let's use a
# generator to avoid having to read all records on the cursor at this point
return (elem['block']['transactions'] for elem in cursor)
@@ -103,13 +120,14 @@ def get_asset_by_id(conn, asset_id):
@register_query(MongoDBConnection)
def get_spent(conn, transaction_id, output):
cursor = conn.db['bigchain'].aggregate([
cursor = conn.run(
table('bigchain').aggregate([
{'$unwind': '$block.transactions'},
{'$match': {
'block.transactions.inputs.fulfills.txid': transaction_id,
'block.transactions.inputs.fulfills.output': output
}}
])
]))
# we need to access some nested fields before returning so let's use a
# generator to avoid having to read all records on the cursor at this point
return (elem['block']['transactions'] for elem in cursor)
@@ -117,14 +135,16 @@ def get_spent(conn, transaction_id, output):
@register_query(MongoDBConnection)
def get_owned_ids(conn, owner):
cursor = conn.db['bigchain'].aggregate([
cursor = conn.run(
table('bigchain')
.aggregate([
{'$unwind': '$block.transactions'},
{'$match': {
'block.transactions.outputs.public_keys': {
'$elemMatch': {'$eq': owner}
}
}}
])
]))
# we need to access some nested fields before returning so let's use a
# generator to avoid having to read all records on the cursor at this point
return (elem['block']['transactions'] for elem in cursor)
@@ -132,66 +152,80 @@ def get_owned_ids(conn, owner):
@register_query(MongoDBConnection)
def get_votes_by_block_id(conn, block_id):
return conn.db['votes']\
return conn.run(
table('votes')
.find({'vote.voting_for_block': block_id},
projection={'_id': False})
projection={'_id': False}))
@register_query(MongoDBConnection)
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
return conn.db['votes']\
return conn.run(
table('votes')
.find({'vote.voting_for_block': block_id,
'node_pubkey': node_pubkey},
projection={'_id': False})
projection={'_id': False}))
@register_query(MongoDBConnection)
def write_block(conn, block):
return conn.db['bigchain'].insert_one(block.to_dict())
return conn.run(
table('bigchain')
.insert_one(block.to_dict()))
@register_query(MongoDBConnection)
def get_block(conn, block_id):
return conn.db['bigchain'].find_one({'id': block_id},
projection={'_id': False})
return conn.run(
table('bigchain')
.find_one({'id': block_id},
projection={'_id': False}))
@register_query(MongoDBConnection)
def has_transaction(conn, transaction_id):
return bool(conn.db['bigchain']
.find_one({'block.transactions.id': transaction_id}))
return bool(conn.run(
table('bigchain')
.find_one({'block.transactions.id': transaction_id})))
@register_query(MongoDBConnection)
def count_blocks(conn):
return conn.db['bigchain'].count()
return conn.run(
table('bigchain')
.count())
@register_query(MongoDBConnection)
def count_backlog(conn):
return conn.db['backlog'].count()
return conn.run(
table('backlog')
.count())
@register_query(MongoDBConnection)
def write_vote(conn, vote):
conn.db['votes'].insert_one(vote)
conn.run(table('votes').insert_one(vote))
vote.pop('_id')
return vote
@register_query(MongoDBConnection)
def get_genesis_block(conn):
return conn.db['bigchain'].find_one(
return conn.run(
table('bigchain')
.find_one(
{'block.transactions.0.operation': 'GENESIS'},
{'_id': False}
)
))
@register_query(MongoDBConnection)
def get_last_voted_block(conn, node_pubkey):
last_voted = conn.db['votes']\
last_voted = conn.run(
table('votes')
.find({'node_pubkey': node_pubkey},
sort=[('vote.timestamp', -1)])
sort=[('vote.timestamp', -1)]))
# pymongo seems to return a cursor even if there are no results
# so we actually need to check the count
@@ -219,7 +253,8 @@ def get_last_voted_block(conn, node_pubkey):
@register_query(MongoDBConnection)
def get_unvoted_blocks(conn, node_pubkey):
return conn.db['bigchain'].aggregate([
return conn.run(
table('bigchain').aggregate([
{'$lookup': {
'from': 'votes',
'localField': 'id',
@@ -233,7 +268,7 @@ def get_unvoted_blocks(conn, node_pubkey):
{'$project': {
'votes': False, '_id': False
}}
])
]))
@register_query(MongoDBConnection)
@@ -243,10 +278,12 @@ def get_txids_filtered(conn, asset_id, operation=None):
if operation:
match['block.transactions.operation'] = operation
cursor = conn.db['bigchain'].aggregate([
cursor = conn.run(
table('bigchain')
.aggregate([
{'$match': match},
{'$unwind': '$block.transactions'},
{'$match': match},
{'$project': {'block.transactions.id': True}}
])
]))
return (r['block']['transactions']['id'] for r in cursor)

bigchaindb/utils.py

@@ -157,3 +157,31 @@ def is_genesis_block(block):
return block.transactions[0].operation == 'GENESIS'
except AttributeError:
return block['block']['transactions'][0]['operation'] == 'GENESIS'
class Lazy:
def __init__(self):
self.stack = []
def __getattr__(self, name):
self.stack.append(name)
return self
def __call__(self, *args, **kwargs):
self.stack.append((args, kwargs))
return self
def __getitem__(self, key):
self.stack.append('__getitem__')
self.stack.append(([key], {}))
return self
def run(self, instance):
last = instance
for method, (args, kwargs) in zip(self.stack[::2], self.stack[1::2]):
last = getattr(last, method)(*args, **kwargs)
self.stack = []
return last
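Roughly, Lazy records every step of a method chain: an attribute access pushes a method name onto the stack, a call pushes its (args, kwargs), and indexing is recorded as a plain '__getitem__' call. run(instance) then walks the stack in pairs, applies each step to the target object, and resets the stack. A small illustration with a hypothetical document:

l = Lazy()
l['backlog'].insert_one({'id': 'abc'})  # hypothetical document
# l.stack is now ['__getitem__', (['backlog'], {}), 'insert_one', (({'id': 'abc'},), {})]
# l.run(db) replays it as db['backlog'].insert_one({'id': 'abc'}) and clears the stack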

tests/test_utils.py

@@ -137,3 +137,11 @@ def test_is_genesis_block_returns_true_if_genesis(b):
from bigchaindb.utils import is_genesis_block
genesis_block = b.prepare_genesis_block()
assert is_genesis_block(genesis_block)
def test_lazy_execution():
from bigchaindb.utils import Lazy
l = Lazy()
l.split(',')[1].split(' ').pop(1).strip()
result = l.run('Like humans, cats tend to favor one paw over another')
assert result == 'cats'
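For reference, run() replays the recorded chain on the sample sentence step by step:

# 'Like humans, cats tend to favor one paw over another'
#   .split(',')  -> ['Like humans', ' cats tend to favor one paw over another']
#   [1]          -> ' cats tend to favor one paw over another'
#   .split(' ')  -> ['', 'cats', 'tend', 'to', 'favor', 'one', 'paw', 'over', 'another']
#   .pop(1)      -> 'cats'
#   .strip()     -> 'cats'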