Updated query.py with tarantool queries format

This commit is contained in:
andrei 2022-02-09 15:52:26 +02:00
parent 4a36e5b0da
commit 29069f380c

View File

@ -41,10 +41,10 @@ def get_transactions(conn, transaction_ids):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_metadatas(conn, metadata): def store_metadatas(metadata, connection):
return conn.run( space = connection.space("meta_data")
conn.collection('metadata') for meta in metadata:
.insert_many(metadata, ordered=False)) space.insert((meta["id"], meta))
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -56,89 +56,122 @@ def get_metadata(conn, transaction_ids):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_asset(conn, asset): def store_asset(asset, connection):
try: space = connection.space("assets")
return conn.run( unique = token_hex(8)
conn.collection('assets') space.insert((asset["id"], unique, asset["data"]))
.insert_one(asset))
except DuplicateKeyError:
pass
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_assets(conn, assets): def store_assets(assets, connection):
return conn.run( space = connection.space("assets")
conn.collection('assets') for asset in assets:
.insert_many(assets, ordered=False)) unique = token_hex(8)
space.insert((asset["id"], unique, asset["data"]))
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_asset(conn, asset_id): def get_asset(asset_id: str, space):
try: _data = space.select(asset_id, index="assetid_search")
return conn.run( _data = _data.data[0]
conn.collection('assets') return {"data": _data[1]}
.find_one({'id': asset_id}, {'_id': 0, 'id': 0}))
except IndexError:
pass
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_assets(conn, asset_ids): def get_assets(assets_ids: list, space):
return conn.run( _returned_data = []
conn.collection('assets') for _id in assets_ids:
.find({'id': {'$in': asset_ids}}, asset = space.select(_id, index="assetid_search")
projection={'_id': False})) asset = asset.data[0]
_returned_data.append({"id": asset[0], "data": asset[1]})
return _returned_data
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_spent(conn, transaction_id, output): def get_spent(fullfil_transaction_id: str, fullfil_output_index: str, connection):
query = {'inputs': _transaction_object = formats.transactions.copy()
{'$elemMatch': _transaction_object["inputs"] = []
{'$and': [{'fulfills.transaction_id': transaction_id}, _transaction_object["outputs"] = []
{'fulfills.output_index': output}]}}} space = connection.space("inputs")
_inputs = space.select([fullfil_transaction_id, fullfil_output_index], index="spent_search")
return conn.run( _inputs = _inputs.data
conn.collection('transactions') _transaction_object["id"] = _inputs[0][0]
.find(query, {'_id': 0})) _transaction_object["inputs"] = [
{
"owners_before": _in[2],
"fulfills": {"transaction_id": _in[3], "output_index": _in[4]},
"fulfillment": _in[1]
} for _in in _inputs
]
space = connection.space("outputs")
_outputs = space.select(_transaction_object["id"], index="id_search")
_outputs = _outputs.data
_transaction_object["outputs"] = [
{
"public_keys": _out[5],
"amount": _out[1],
"condition": {"details": {"type": _out[3], "public_key": _out[4]}, "uri": _out[2]}
} for _out in _outputs
]
return _transaction_object
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_latest_block(conn): def latest_block(connection): # TODO Here is used DESCENDING OPERATOR
return conn.run( space = connection.space("blocks")
conn.collection('blocks') _all_blocks = space.select()
.find_one(projection={'_id': False}, _all_blocks = _all_blocks.data
sort=[('height', DESCENDING)])) _block = sorted(_all_blocks, key=itemgetter(1))[0]
space = connection.space("blocks_tx")
_txids = space.select(_block[2], index="block_search")
_txids = _txids.data
return {"app_hash": _block[1], "height": _block[1], "transactions": [tx[0] for tx in _txids]}
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_block(conn, block): def store_block(block, connection):
try: space = connection.space("blocks")
return conn.run( block_unique_id = token_hex(8)
conn.collection('blocks') space.insert((block["app_hash"],
.insert_one(block)) block["height"],
except DuplicateKeyError: block_unique_id))
pass space = connection.space("blocks_tx")
for txid in block["transactions"]:
space.insert((txid, block_unique_id))
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_txids_filtered(conn, asset_id, operation=None, last_tx=None): def get_txids_filtered(connection, asset_id, operation=None, last_tx=None): # TODO here is used 'OR' operator
_transaction_object = formats.transactions.copy()
_transaction_object["inputs"] = []
_transaction_object["outputs"] = []
match = { actions = {
Transaction.CREATE: {'operation': 'CREATE', 'id': asset_id}, "CREATE": {"sets": ["CREATE", asset_id], "index": "transaction_search"},
Transaction.TRANSFER: {'operation': 'TRANSFER', 'asset.id': asset_id}, # 1 - operation, 2 - id (only in transactions) +
None: {'$or': [{'asset.id': asset_id}, {'id': asset_id}]}, "TRANSFER": {"sets": ["TRANSFER", asset_id], "index": "asset_search"},
# 1 - operation, 2 - asset.id (linked mode) + OPERATOR OR
None: {"sets": [asset_id, asset_id], "index": "both_search"}
}[operation] }[operation]
space = connection.space("transactions")
cursor = conn.run(conn.collection('transactions').find(match)) if actions["sets"][0] == "CREATE":
_transactions = space.select([operation, asset_id], index=actions["index"])
_transactions = _transactions.data
elif actions["sets"][0] == "TRANSFER":
_transactions = space.select([operation, asset_id], index=actions["index"])
_transactions = _transactions.data
else:
_transactions = space.select([asset_id, asset_id], index=actions["index"])
_transactions = _transactions.data
if last_tx: if last_tx:
cursor = cursor.sort([('$natural', DESCENDING)]).limit(1) _transactions = [_transactions[0]]
return (elem['id'] for elem in cursor) return tuple([elem[0] for elem in _transactions])
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def text_search(conn, search, *, language='english', case_sensitive=False, def text_search(conn, search, *, language='english', case_sensitive=False, # TODO review text search in tarantool (maybe, remove)
diacritic_sensitive=False, text_score=False, limit=0, table='assets'): diacritic_sensitive=False, text_score=False, limit=0, table='assets'):
cursor = conn.run( cursor = conn.run(
conn.collection(table) conn.collection(table)
@ -163,46 +196,100 @@ def _remove_text_score(asset):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_owned_ids(conn, owner): def get_owned_ids(connection, owner): # TODO implement 'group_transactions_by_id'
cursor = conn.run( space = connection.space("keys")
conn.collection('transactions').aggregate([ _keys = space.select(owner, index="keys_search")
{'$match': {'outputs.public_keys': owner}},
{'$project': {'_id': False}} _outputid = _keys[0][1]
])) _transactionid = _keys[0][0]
return cursor
_transaction_object = formats.transactions.copy()
_transaction_object["inputs"] = []
_transaction_object["outputs"] = []
_transactions = []
_all_keys = space.select(_outputid, index="id_search")
_all_keys = _all_keys.data
space = connection.space("transactions")
_all_transactions = space.select(_transactionid, index="id_search")
_all_transactions = _all_transactions.data
space = connection.space("inputs")
_all_inputs = space.select(_transactionid, index="id_search")
_all_inputs = _all_inputs.data
space = connection.space("outputs")
_all_outputs = space.select(_transactionid, index="id_search")
_all_outputs = _all_outputs.data
for tsobject in _all_transactions:
local_ts = _transaction_object.copy()
local_ts["id"] = tsobject[0]
local_ts["operation"] = tsobject[1]
local_ts["version"] = tsobject[2]
for _in in _all_inputs:
if _in[0] == tsobject[0]:
local_ts["inputs"].append(
{
"owners_before": _in[2],
"fulffils": {"transaction_id": _in[3], "output_index": _in[4]},
"fulffilment": _in[1]
}
)
for _out in _all_outputs:
if _out[0] == tsobject[0]:
local_ts["outputs"].append(
{
"public_keys": [_key[2] for _key in _all_keys if _out[5] == _key[1]],
"condition": {"details": {"type": _out[3], "public_key": _out[4]}, "uri": _out[2]},
"amount": _out[1]
}
)
_transactions.append(local_ts)
return _transactions
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_spending_transactions(conn, inputs): def get_spending_transactions(inputs, connection):
transaction_ids = [i['transaction_id'] for i in inputs] transaction_ids = [i['transaction_id'] for i in inputs]
output_indexes = [i['output_index'] for i in inputs] output_indexes = [i['output_index'] for i in inputs]
query = {'inputs':
{'$elemMatch':
{'$and':
[
{'fulfills.transaction_id': {'$in': transaction_ids}},
{'fulfills.output_index': {'$in': output_indexes}}
]}}}
cursor = conn.run( _transactions = []
conn.collection('transactions').find(query, {'_id': False}))
return cursor for i in range(0, len(transaction_ids)):
ts_id = transaction_ids[i]
ot_id = output_indexes[i]
_trans_object = get_spent(fullfil_transaction_id=ts_id, fullfil_output_index=ot_id, connection=connection)
_transactions.append(_trans_object)
return _transactions
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_block(conn, block_id): def get_block(block_id, connection):
return conn.run( space = connection.space("blocks")
conn.collection('blocks') _block = space.select(block_id, index="block_search", limit=1)
.find_one({'height': block_id}, _block = _block.data[0]
projection={'_id': False})) _txblock = space.select(_block[2], index="block_search")
_txblock = _txblock.data
return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]}
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_block_with_transaction(conn, txid): def get_block_with_transaction(txid, connection):
return conn.run( space = connection.space("blocks_tx")
conn.collection('blocks') _all_blocks_tx = space.select(txid, index="id_search")
.find({'transactions': txid}, _all_blocks_tx = _all_blocks_tx.data
projection={'_id': False, 'height': True})) space = connection.space("blocks")
_block = space.select(_all_blocks_tx[0][1], index="block_id_search")
_block = _block.data[0]
return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _all_blocks_tx]}
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -251,11 +338,15 @@ def get_unspent_outputs(conn, *, query=None):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_pre_commit_state(conn, state): def store_pre_commit_state(state, connection):
return conn.run( space = connection.space("pre_commits")
conn.collection('pre_commit') _precommit = space.select(state["height"], index="height_search", limit=1)
.replace_one({}, state, upsert=True) unique_id = token_hex(8) if (len(_precommit.data) == 0) else _precommit.data[0][0]
) space.upsert((unique_id, state["height"], state["transactions"]),
op_list=[('=', 0, unique_id),
('=', 1, state["height"]),
('=', 2, state["transactions"])],
limit=1)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -264,15 +355,15 @@ def get_pre_commit_state(conn):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_validator_set(conn, validators_update): def store_validator_set(validators_update, connection):
height = validators_update['height'] space = connection.space("validators")
return conn.run( _validator = space.select(validators_update["height"], index="height_search", limit=1)
conn.collection('validators').replace_one( unique_id = token_hex(8) if (len(_validator.data) == 0) else _validator.data[0][0]
{'height': height}, space.upsert((unique_id, validators_update["height"], validators_update["validators"]),
validators_update, op_list=[('=', 0, unique_id),
upsert=True ('=', 1, validators_update["height"]),
) ('=', 2, validators_update["validators"])],
) limit=1)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -283,24 +374,22 @@ def delete_validator_set(conn, height):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_election(conn, election_id, height, is_concluded): def store_election(election_id, height, is_concluded, connection):
return conn.run( space = connection.space("elections")
conn.collection('elections').replace_one( space.upsert((election_id, height, is_concluded),
{'election_id': election_id, op_list=[('=', 0, election_id),
'height': height}, ('=', 1, height),
{'election_id': election_id, ('=', 2, is_concluded)],
'height': height, limit=1)
'is_concluded': is_concluded},
upsert=True,
)
)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_elections(conn, elections): def store_elections(elections, connection):
return conn.run( space = connection.space("elections")
conn.collection('elections').insert_many(elections) for election in elections:
) _election = space.insert((election["election_id"],
election["height"],
election["is_concluded"]))
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -311,55 +400,46 @@ def delete_elections(conn, height):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_validator_set(conn, height=None): def get_validator_set(connection, height=None):
query = {} space = connection.space("validators")
_validators = space.select()
_validators = _validators.data
if height is not None: if height is not None:
query = {'height': {'$lte': height}} _validators = [validator for validator in _validators if validator[1] <= height]
return next(iter(sorted(_validators, key=itemgetter(1))), None)
cursor = conn.run( return next(iter(sorted(_validators, key=itemgetter(1))), None)
conn.collection('validators')
.find(query, projection={'_id': False})
.sort([('height', DESCENDING)])
.limit(1)
)
return next(cursor, None)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_election(conn, election_id): def get_election(election_id, connection):
query = {'election_id': election_id} space = connection.space("elections")
_elections = space.select(election_id, index="id_search")
return conn.run( _elections = _elections.data
conn.collection('elections') _election = sorted(_elections, key=itemgetter(0))[0]
.find_one(query, projection={'_id': False}, return {"election_id": _election[0], "height": _election[1], "is_concluded": _election[2]}
sort=[('height', DESCENDING)])
)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_asset_tokens_for_public_key(conn, asset_id, public_key): def get_asset_tokens_for_public_key(connection, asset_id, public_key):
query = {'outputs.public_keys': [public_key], space = connection.space("keys")
'asset.id': asset_id} _keys = space.select([public_key], index="keys_search")
space = connection.space("transactions")
cursor = conn.run( _transactions = space.select([asset_id], index="only_asset_search")
conn.collection('transactions').aggregate([ _transactions = _transactions.data
{'$match': query}, _keys = _keys.data
{'$project': {'_id': False}} _grouped_transactions = group_transaction_by_ids(connection=connection, txids=[_tx[0] for _tx in _transactions])
])) return _grouped_transactions
return cursor
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def store_abci_chain(conn, height, chain_id, is_synced=True): def store_abci_chain(height, chain_id, connection, is_synced=True):
return conn.run( space = connection.space("abci_chains")
conn.collection('abci_chains').replace_one( space.upsert((height, chain_id, is_synced),
{'height': height}, op_list=[('=', 0, height),
{'height': height, 'chain_id': chain_id, ('=', 1, chain_id),
'is_synced': is_synced}, ('=', 2, is_synced)],
upsert=True, limit=1)
)
)
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
@ -370,8 +450,8 @@ def delete_abci_chain(conn, height):
@register_query(LocalMongoDBConnection) @register_query(LocalMongoDBConnection)
def get_latest_abci_chain(conn): def get_latest_abci_chain(connection):
return conn.run( space = connection.space("abci_chains")
conn.collection('abci_chains') _all_chains = space.select()
.find_one(projection={'_id': False}, sort=[('height', DESCENDING)]) _chain = sorted(_all_chains.data, key=itemgetter(0))[0]
) return {"height": _chain[0], "is_synced": _chain[1], "chain_id": _chain[2]}