From 1c444cda790b8253d5d44435c81743f83b7b9a32 Mon Sep 17 00:00:00 2001 From: andrei Date: Wed, 9 Feb 2022 17:16:31 +0200 Subject: [PATCH] restored back query.py (mongodb), and query.py(tarantool) move to folder tarantool --- planetmint/backend/localmongodb/query.py | 408 ++++++++------------ planetmint/backend/tarantool/query.py | 457 +++++++++++++++++++++++ 2 files changed, 621 insertions(+), 244 deletions(-) create mode 100644 planetmint/backend/tarantool/query.py diff --git a/planetmint/backend/localmongodb/query.py b/planetmint/backend/localmongodb/query.py index bdf5152..217a775 100644 --- a/planetmint/backend/localmongodb/query.py +++ b/planetmint/backend/localmongodb/query.py @@ -1,5 +1,5 @@ # Copyright © 2020 Interplanetary Database Association e.V., -# Planetmint and IPDB software contributors. +# BigchainDB and IPDB software contributors. # SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) # Code is Apache-2.0 and docs are CC-BY-4.0 @@ -7,11 +7,11 @@ from pymongo import DESCENDING -from planetmint import backend -from planetmint.backend.exceptions import DuplicateKeyError -from planetmint.backend.utils import module_dispatch_registrar -from planetmint.backend.localmongodb.connection import LocalMongoDBConnection -from planetmint.common.transaction import Transaction +from bigchaindb import backend +from bigchaindb.backend.exceptions import DuplicateKeyError +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.localmongodb.connection import LocalMongoDBConnection +from bigchaindb.common.transaction import Transaction register_query = module_dispatch_registrar(backend.query) @@ -41,10 +41,10 @@ def get_transactions(conn, transaction_ids): @register_query(LocalMongoDBConnection) -def store_metadatas(metadata, connection): - space = connection.space("meta_data") - for meta in metadata: - space.insert((meta["id"], meta)) +def store_metadatas(conn, metadata): + return conn.run( + conn.collection('metadata') + .insert_many(metadata, ordered=False)) @register_query(LocalMongoDBConnection) @@ -56,122 +56,89 @@ def get_metadata(conn, transaction_ids): @register_query(LocalMongoDBConnection) -def store_asset(asset, connection): - space = connection.space("assets") - unique = token_hex(8) - space.insert((asset["id"], unique, asset["data"])) +def store_asset(conn, asset): + try: + return conn.run( + conn.collection('assets') + .insert_one(asset)) + except DuplicateKeyError: + pass @register_query(LocalMongoDBConnection) -def store_assets(assets, connection): - space = connection.space("assets") - for asset in assets: - unique = token_hex(8) - space.insert((asset["id"], unique, asset["data"])) +def store_assets(conn, assets): + return conn.run( + conn.collection('assets') + .insert_many(assets, ordered=False)) @register_query(LocalMongoDBConnection) -def get_asset(asset_id: str, space): - _data = space.select(asset_id, index="assetid_search") - _data = _data.data[0] - return {"data": _data[1]} +def get_asset(conn, asset_id): + try: + return conn.run( + conn.collection('assets') + .find_one({'id': asset_id}, {'_id': 0, 'id': 0})) + except IndexError: + pass @register_query(LocalMongoDBConnection) -def get_assets(assets_ids: list, space): - _returned_data = [] - for _id in assets_ids: - asset = space.select(_id, index="assetid_search") - asset = asset.data[0] - _returned_data.append({"id": asset[0], "data": asset[1]}) - return _returned_data +def get_assets(conn, asset_ids): + return conn.run( + conn.collection('assets') + .find({'id': {'$in': asset_ids}}, + projection={'_id': False})) @register_query(LocalMongoDBConnection) -def get_spent(fullfil_transaction_id: str, fullfil_output_index: str, connection): - _transaction_object = formats.transactions.copy() - _transaction_object["inputs"] = [] - _transaction_object["outputs"] = [] - space = connection.space("inputs") - _inputs = space.select([fullfil_transaction_id, fullfil_output_index], index="spent_search") - _inputs = _inputs.data - _transaction_object["id"] = _inputs[0][0] - _transaction_object["inputs"] = [ - { - "owners_before": _in[2], - "fulfills": {"transaction_id": _in[3], "output_index": _in[4]}, - "fulfillment": _in[1] - } for _in in _inputs - ] - space = connection.space("outputs") - _outputs = space.select(_transaction_object["id"], index="id_search") - _outputs = _outputs.data - _transaction_object["outputs"] = [ - { - "public_keys": _out[5], - "amount": _out[1], - "condition": {"details": {"type": _out[3], "public_key": _out[4]}, "uri": _out[2]} - } for _out in _outputs - ] - return _transaction_object +def get_spent(conn, transaction_id, output): + query = {'inputs': + {'$elemMatch': + {'$and': [{'fulfills.transaction_id': transaction_id}, + {'fulfills.output_index': output}]}}} + + return conn.run( + conn.collection('transactions') + .find(query, {'_id': 0})) @register_query(LocalMongoDBConnection) -def latest_block(connection): # TODO Here is used DESCENDING OPERATOR - space = connection.space("blocks") - _all_blocks = space.select() - _all_blocks = _all_blocks.data - _block = sorted(_all_blocks, key=itemgetter(1))[0] - space = connection.space("blocks_tx") - _txids = space.select(_block[2], index="block_search") - _txids = _txids.data - return {"app_hash": _block[1], "height": _block[1], "transactions": [tx[0] for tx in _txids]} +def get_latest_block(conn): + return conn.run( + conn.collection('blocks') + .find_one(projection={'_id': False}, + sort=[('height', DESCENDING)])) @register_query(LocalMongoDBConnection) -def store_block(block, connection): - space = connection.space("blocks") - block_unique_id = token_hex(8) - space.insert((block["app_hash"], - block["height"], - block_unique_id)) - space = connection.space("blocks_tx") - for txid in block["transactions"]: - space.insert((txid, block_unique_id)) +def store_block(conn, block): + try: + return conn.run( + conn.collection('blocks') + .insert_one(block)) + except DuplicateKeyError: + pass @register_query(LocalMongoDBConnection) -def get_txids_filtered(connection, asset_id, operation=None, last_tx=None): # TODO here is used 'OR' operator - _transaction_object = formats.transactions.copy() - _transaction_object["inputs"] = [] - _transaction_object["outputs"] = [] +def get_txids_filtered(conn, asset_id, operation=None, last_tx=None): - actions = { - "CREATE": {"sets": ["CREATE", asset_id], "index": "transaction_search"}, - # 1 - operation, 2 - id (only in transactions) + - "TRANSFER": {"sets": ["TRANSFER", asset_id], "index": "asset_search"}, - # 1 - operation, 2 - asset.id (linked mode) + OPERATOR OR - None: {"sets": [asset_id, asset_id], "index": "both_search"} + match = { + Transaction.CREATE: {'operation': 'CREATE', 'id': asset_id}, + Transaction.TRANSFER: {'operation': 'TRANSFER', 'asset.id': asset_id}, + None: {'$or': [{'asset.id': asset_id}, {'id': asset_id}]}, }[operation] - space = connection.space("transactions") - if actions["sets"][0] == "CREATE": - _transactions = space.select([operation, asset_id], index=actions["index"]) - _transactions = _transactions.data - elif actions["sets"][0] == "TRANSFER": - _transactions = space.select([operation, asset_id], index=actions["index"]) - _transactions = _transactions.data - else: - _transactions = space.select([asset_id, asset_id], index=actions["index"]) - _transactions = _transactions.data + + cursor = conn.run(conn.collection('transactions').find(match)) if last_tx: - _transactions = [_transactions[0]] + cursor = cursor.sort([('$natural', DESCENDING)]).limit(1) - return tuple([elem[0] for elem in _transactions]) + return (elem['id'] for elem in cursor) @register_query(LocalMongoDBConnection) -def text_search(conn, search, *, language='english', case_sensitive=False, # TODO review text search in tarantool (maybe, remove) +def text_search(conn, search, *, language='english', case_sensitive=False, diacritic_sensitive=False, text_score=False, limit=0, table='assets'): cursor = conn.run( conn.collection(table) @@ -196,100 +163,46 @@ def _remove_text_score(asset): @register_query(LocalMongoDBConnection) -def get_owned_ids(connection, owner): # TODO implement 'group_transactions_by_id' - space = connection.space("keys") - _keys = space.select(owner, index="keys_search") - - _outputid = _keys[0][1] - _transactionid = _keys[0][0] - - _transaction_object = formats.transactions.copy() - _transaction_object["inputs"] = [] - _transaction_object["outputs"] = [] - - _transactions = [] - - _all_keys = space.select(_outputid, index="id_search") - _all_keys = _all_keys.data - - space = connection.space("transactions") - _all_transactions = space.select(_transactionid, index="id_search") - _all_transactions = _all_transactions.data - - space = connection.space("inputs") - _all_inputs = space.select(_transactionid, index="id_search") - _all_inputs = _all_inputs.data - - space = connection.space("outputs") - _all_outputs = space.select(_transactionid, index="id_search") - _all_outputs = _all_outputs.data - - for tsobject in _all_transactions: - local_ts = _transaction_object.copy() - local_ts["id"] = tsobject[0] - local_ts["operation"] = tsobject[1] - local_ts["version"] = tsobject[2] - - for _in in _all_inputs: - if _in[0] == tsobject[0]: - local_ts["inputs"].append( - { - "owners_before": _in[2], - "fulffils": {"transaction_id": _in[3], "output_index": _in[4]}, - "fulffilment": _in[1] - } - ) - for _out in _all_outputs: - if _out[0] == tsobject[0]: - local_ts["outputs"].append( - { - "public_keys": [_key[2] for _key in _all_keys if _out[5] == _key[1]], - "condition": {"details": {"type": _out[3], "public_key": _out[4]}, "uri": _out[2]}, - "amount": _out[1] - } - ) - _transactions.append(local_ts) - - return _transactions +def get_owned_ids(conn, owner): + cursor = conn.run( + conn.collection('transactions').aggregate([ + {'$match': {'outputs.public_keys': owner}}, + {'$project': {'_id': False}} + ])) + return cursor @register_query(LocalMongoDBConnection) -def get_spending_transactions(inputs, connection): +def get_spending_transactions(conn, inputs): transaction_ids = [i['transaction_id'] for i in inputs] output_indexes = [i['output_index'] for i in inputs] + query = {'inputs': + {'$elemMatch': + {'$and': + [ + {'fulfills.transaction_id': {'$in': transaction_ids}}, + {'fulfills.output_index': {'$in': output_indexes}} + ]}}} - _transactions = [] - - for i in range(0, len(transaction_ids)): - ts_id = transaction_ids[i] - ot_id = output_indexes[i] - - _trans_object = get_spent(fullfil_transaction_id=ts_id, fullfil_output_index=ot_id, connection=connection) - _transactions.append(_trans_object) - - return _transactions + cursor = conn.run( + conn.collection('transactions').find(query, {'_id': False})) + return cursor @register_query(LocalMongoDBConnection) -def get_block(block_id, connection): - space = connection.space("blocks") - _block = space.select(block_id, index="block_search", limit=1) - _block = _block.data[0] - _txblock = space.select(_block[2], index="block_search") - _txblock = _txblock.data - return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]} +def get_block(conn, block_id): + return conn.run( + conn.collection('blocks') + .find_one({'height': block_id}, + projection={'_id': False})) @register_query(LocalMongoDBConnection) -def get_block_with_transaction(txid, connection): - space = connection.space("blocks_tx") - _all_blocks_tx = space.select(txid, index="id_search") - _all_blocks_tx = _all_blocks_tx.data - space = connection.space("blocks") - - _block = space.select(_all_blocks_tx[0][1], index="block_id_search") - _block = _block.data[0] - return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _all_blocks_tx]} +def get_block_with_transaction(conn, txid): + return conn.run( + conn.collection('blocks') + .find({'transactions': txid}, + projection={'_id': False, 'height': True})) @register_query(LocalMongoDBConnection) @@ -338,15 +251,11 @@ def get_unspent_outputs(conn, *, query=None): @register_query(LocalMongoDBConnection) -def store_pre_commit_state(state, connection): - space = connection.space("pre_commits") - _precommit = space.select(state["height"], index="height_search", limit=1) - unique_id = token_hex(8) if (len(_precommit.data) == 0) else _precommit.data[0][0] - space.upsert((unique_id, state["height"], state["transactions"]), - op_list=[('=', 0, unique_id), - ('=', 1, state["height"]), - ('=', 2, state["transactions"])], - limit=1) +def store_pre_commit_state(conn, state): + return conn.run( + conn.collection('pre_commit') + .replace_one({}, state, upsert=True) + ) @register_query(LocalMongoDBConnection) @@ -355,15 +264,15 @@ def get_pre_commit_state(conn): @register_query(LocalMongoDBConnection) -def store_validator_set(validators_update, connection): - space = connection.space("validators") - _validator = space.select(validators_update["height"], index="height_search", limit=1) - unique_id = token_hex(8) if (len(_validator.data) == 0) else _validator.data[0][0] - space.upsert((unique_id, validators_update["height"], validators_update["validators"]), - op_list=[('=', 0, unique_id), - ('=', 1, validators_update["height"]), - ('=', 2, validators_update["validators"])], - limit=1) +def store_validator_set(conn, validators_update): + height = validators_update['height'] + return conn.run( + conn.collection('validators').replace_one( + {'height': height}, + validators_update, + upsert=True + ) + ) @register_query(LocalMongoDBConnection) @@ -374,22 +283,24 @@ def delete_validator_set(conn, height): @register_query(LocalMongoDBConnection) -def store_election(election_id, height, is_concluded, connection): - space = connection.space("elections") - space.upsert((election_id, height, is_concluded), - op_list=[('=', 0, election_id), - ('=', 1, height), - ('=', 2, is_concluded)], - limit=1) +def store_election(conn, election_id, height, is_concluded): + return conn.run( + conn.collection('elections').replace_one( + {'election_id': election_id, + 'height': height}, + {'election_id': election_id, + 'height': height, + 'is_concluded': is_concluded}, + upsert=True, + ) + ) @register_query(LocalMongoDBConnection) -def store_elections(elections, connection): - space = connection.space("elections") - for election in elections: - _election = space.insert((election["election_id"], - election["height"], - election["is_concluded"])) +def store_elections(conn, elections): + return conn.run( + conn.collection('elections').insert_many(elections) + ) @register_query(LocalMongoDBConnection) @@ -400,46 +311,55 @@ def delete_elections(conn, height): @register_query(LocalMongoDBConnection) -def get_validator_set(connection, height=None): - space = connection.space("validators") - _validators = space.select() - _validators = _validators.data +def get_validator_set(conn, height=None): + query = {} if height is not None: - _validators = [validator for validator in _validators if validator[1] <= height] - return next(iter(sorted(_validators, key=itemgetter(1))), None) + query = {'height': {'$lte': height}} - return next(iter(sorted(_validators, key=itemgetter(1))), None) + cursor = conn.run( + conn.collection('validators') + .find(query, projection={'_id': False}) + .sort([('height', DESCENDING)]) + .limit(1) + ) + + return next(cursor, None) @register_query(LocalMongoDBConnection) -def get_election(election_id, connection): - space = connection.space("elections") - _elections = space.select(election_id, index="id_search") - _elections = _elections.data - _election = sorted(_elections, key=itemgetter(0))[0] - return {"election_id": _election[0], "height": _election[1], "is_concluded": _election[2]} +def get_election(conn, election_id): + query = {'election_id': election_id} + + return conn.run( + conn.collection('elections') + .find_one(query, projection={'_id': False}, + sort=[('height', DESCENDING)]) + ) @register_query(LocalMongoDBConnection) -def get_asset_tokens_for_public_key(connection, asset_id, public_key): - space = connection.space("keys") - _keys = space.select([public_key], index="keys_search") - space = connection.space("transactions") - _transactions = space.select([asset_id], index="only_asset_search") - _transactions = _transactions.data - _keys = _keys.data - _grouped_transactions = group_transaction_by_ids(connection=connection, txids=[_tx[0] for _tx in _transactions]) - return _grouped_transactions +def get_asset_tokens_for_public_key(conn, asset_id, public_key): + query = {'outputs.public_keys': [public_key], + 'asset.id': asset_id} + + cursor = conn.run( + conn.collection('transactions').aggregate([ + {'$match': query}, + {'$project': {'_id': False}} + ])) + return cursor @register_query(LocalMongoDBConnection) -def store_abci_chain(height, chain_id, connection, is_synced=True): - space = connection.space("abci_chains") - space.upsert((height, chain_id, is_synced), - op_list=[('=', 0, height), - ('=', 1, chain_id), - ('=', 2, is_synced)], - limit=1) +def store_abci_chain(conn, height, chain_id, is_synced=True): + return conn.run( + conn.collection('abci_chains').replace_one( + {'height': height}, + {'height': height, 'chain_id': chain_id, + 'is_synced': is_synced}, + upsert=True, + ) + ) @register_query(LocalMongoDBConnection) @@ -450,8 +370,8 @@ def delete_abci_chain(conn, height): @register_query(LocalMongoDBConnection) -def get_latest_abci_chain(connection): - space = connection.space("abci_chains") - _all_chains = space.select() - _chain = sorted(_all_chains.data, key=itemgetter(0))[0] - return {"height": _chain[0], "is_synced": _chain[1], "chain_id": _chain[2]} +def get_latest_abci_chain(conn): + return conn.run( + conn.collection('abci_chains') + .find_one(projection={'_id': False}, sort=[('height', DESCENDING)]) + ) diff --git a/planetmint/backend/tarantool/query.py b/planetmint/backend/tarantool/query.py new file mode 100644 index 0000000..bdf5152 --- /dev/null +++ b/planetmint/backend/tarantool/query.py @@ -0,0 +1,457 @@ +# Copyright © 2020 Interplanetary Database Association e.V., +# Planetmint and IPDB software contributors. +# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) +# Code is Apache-2.0 and docs are CC-BY-4.0 + +"""Query implementation for MongoDB""" + +from pymongo import DESCENDING + +from planetmint import backend +from planetmint.backend.exceptions import DuplicateKeyError +from planetmint.backend.utils import module_dispatch_registrar +from planetmint.backend.localmongodb.connection import LocalMongoDBConnection +from planetmint.common.transaction import Transaction + +register_query = module_dispatch_registrar(backend.query) + + +@register_query(LocalMongoDBConnection) +def store_transactions(conn, signed_transactions): + return conn.run(conn.collection('transactions') + .insert_many(signed_transactions)) + + +@register_query(LocalMongoDBConnection) +def get_transaction(conn, transaction_id): + return conn.run( + conn.collection('transactions') + .find_one({'id': transaction_id}, {'_id': 0})) + + +@register_query(LocalMongoDBConnection) +def get_transactions(conn, transaction_ids): + try: + return conn.run( + conn.collection('transactions') + .find({'id': {'$in': transaction_ids}}, + projection={'_id': False})) + except IndexError: + pass + + +@register_query(LocalMongoDBConnection) +def store_metadatas(metadata, connection): + space = connection.space("meta_data") + for meta in metadata: + space.insert((meta["id"], meta)) + + +@register_query(LocalMongoDBConnection) +def get_metadata(conn, transaction_ids): + return conn.run( + conn.collection('metadata') + .find({'id': {'$in': transaction_ids}}, + projection={'_id': False})) + + +@register_query(LocalMongoDBConnection) +def store_asset(asset, connection): + space = connection.space("assets") + unique = token_hex(8) + space.insert((asset["id"], unique, asset["data"])) + + +@register_query(LocalMongoDBConnection) +def store_assets(assets, connection): + space = connection.space("assets") + for asset in assets: + unique = token_hex(8) + space.insert((asset["id"], unique, asset["data"])) + + +@register_query(LocalMongoDBConnection) +def get_asset(asset_id: str, space): + _data = space.select(asset_id, index="assetid_search") + _data = _data.data[0] + return {"data": _data[1]} + + +@register_query(LocalMongoDBConnection) +def get_assets(assets_ids: list, space): + _returned_data = [] + for _id in assets_ids: + asset = space.select(_id, index="assetid_search") + asset = asset.data[0] + _returned_data.append({"id": asset[0], "data": asset[1]}) + return _returned_data + + +@register_query(LocalMongoDBConnection) +def get_spent(fullfil_transaction_id: str, fullfil_output_index: str, connection): + _transaction_object = formats.transactions.copy() + _transaction_object["inputs"] = [] + _transaction_object["outputs"] = [] + space = connection.space("inputs") + _inputs = space.select([fullfil_transaction_id, fullfil_output_index], index="spent_search") + _inputs = _inputs.data + _transaction_object["id"] = _inputs[0][0] + _transaction_object["inputs"] = [ + { + "owners_before": _in[2], + "fulfills": {"transaction_id": _in[3], "output_index": _in[4]}, + "fulfillment": _in[1] + } for _in in _inputs + ] + space = connection.space("outputs") + _outputs = space.select(_transaction_object["id"], index="id_search") + _outputs = _outputs.data + _transaction_object["outputs"] = [ + { + "public_keys": _out[5], + "amount": _out[1], + "condition": {"details": {"type": _out[3], "public_key": _out[4]}, "uri": _out[2]} + } for _out in _outputs + ] + return _transaction_object + + +@register_query(LocalMongoDBConnection) +def latest_block(connection): # TODO Here is used DESCENDING OPERATOR + space = connection.space("blocks") + _all_blocks = space.select() + _all_blocks = _all_blocks.data + _block = sorted(_all_blocks, key=itemgetter(1))[0] + space = connection.space("blocks_tx") + _txids = space.select(_block[2], index="block_search") + _txids = _txids.data + return {"app_hash": _block[1], "height": _block[1], "transactions": [tx[0] for tx in _txids]} + + +@register_query(LocalMongoDBConnection) +def store_block(block, connection): + space = connection.space("blocks") + block_unique_id = token_hex(8) + space.insert((block["app_hash"], + block["height"], + block_unique_id)) + space = connection.space("blocks_tx") + for txid in block["transactions"]: + space.insert((txid, block_unique_id)) + + +@register_query(LocalMongoDBConnection) +def get_txids_filtered(connection, asset_id, operation=None, last_tx=None): # TODO here is used 'OR' operator + _transaction_object = formats.transactions.copy() + _transaction_object["inputs"] = [] + _transaction_object["outputs"] = [] + + actions = { + "CREATE": {"sets": ["CREATE", asset_id], "index": "transaction_search"}, + # 1 - operation, 2 - id (only in transactions) + + "TRANSFER": {"sets": ["TRANSFER", asset_id], "index": "asset_search"}, + # 1 - operation, 2 - asset.id (linked mode) + OPERATOR OR + None: {"sets": [asset_id, asset_id], "index": "both_search"} + }[operation] + space = connection.space("transactions") + if actions["sets"][0] == "CREATE": + _transactions = space.select([operation, asset_id], index=actions["index"]) + _transactions = _transactions.data + elif actions["sets"][0] == "TRANSFER": + _transactions = space.select([operation, asset_id], index=actions["index"]) + _transactions = _transactions.data + else: + _transactions = space.select([asset_id, asset_id], index=actions["index"]) + _transactions = _transactions.data + + if last_tx: + _transactions = [_transactions[0]] + + return tuple([elem[0] for elem in _transactions]) + + +@register_query(LocalMongoDBConnection) +def text_search(conn, search, *, language='english', case_sensitive=False, # TODO review text search in tarantool (maybe, remove) + diacritic_sensitive=False, text_score=False, limit=0, table='assets'): + cursor = conn.run( + conn.collection(table) + .find({'$text': { + '$search': search, + '$language': language, + '$caseSensitive': case_sensitive, + '$diacriticSensitive': diacritic_sensitive}}, + {'score': {'$meta': 'textScore'}, '_id': False}) + .sort([('score', {'$meta': 'textScore'})]) + .limit(limit)) + + if text_score: + return cursor + + return (_remove_text_score(obj) for obj in cursor) + + +def _remove_text_score(asset): + asset.pop('score', None) + return asset + + +@register_query(LocalMongoDBConnection) +def get_owned_ids(connection, owner): # TODO implement 'group_transactions_by_id' + space = connection.space("keys") + _keys = space.select(owner, index="keys_search") + + _outputid = _keys[0][1] + _transactionid = _keys[0][0] + + _transaction_object = formats.transactions.copy() + _transaction_object["inputs"] = [] + _transaction_object["outputs"] = [] + + _transactions = [] + + _all_keys = space.select(_outputid, index="id_search") + _all_keys = _all_keys.data + + space = connection.space("transactions") + _all_transactions = space.select(_transactionid, index="id_search") + _all_transactions = _all_transactions.data + + space = connection.space("inputs") + _all_inputs = space.select(_transactionid, index="id_search") + _all_inputs = _all_inputs.data + + space = connection.space("outputs") + _all_outputs = space.select(_transactionid, index="id_search") + _all_outputs = _all_outputs.data + + for tsobject in _all_transactions: + local_ts = _transaction_object.copy() + local_ts["id"] = tsobject[0] + local_ts["operation"] = tsobject[1] + local_ts["version"] = tsobject[2] + + for _in in _all_inputs: + if _in[0] == tsobject[0]: + local_ts["inputs"].append( + { + "owners_before": _in[2], + "fulffils": {"transaction_id": _in[3], "output_index": _in[4]}, + "fulffilment": _in[1] + } + ) + for _out in _all_outputs: + if _out[0] == tsobject[0]: + local_ts["outputs"].append( + { + "public_keys": [_key[2] for _key in _all_keys if _out[5] == _key[1]], + "condition": {"details": {"type": _out[3], "public_key": _out[4]}, "uri": _out[2]}, + "amount": _out[1] + } + ) + _transactions.append(local_ts) + + return _transactions + + +@register_query(LocalMongoDBConnection) +def get_spending_transactions(inputs, connection): + transaction_ids = [i['transaction_id'] for i in inputs] + output_indexes = [i['output_index'] for i in inputs] + + _transactions = [] + + for i in range(0, len(transaction_ids)): + ts_id = transaction_ids[i] + ot_id = output_indexes[i] + + _trans_object = get_spent(fullfil_transaction_id=ts_id, fullfil_output_index=ot_id, connection=connection) + _transactions.append(_trans_object) + + return _transactions + + +@register_query(LocalMongoDBConnection) +def get_block(block_id, connection): + space = connection.space("blocks") + _block = space.select(block_id, index="block_search", limit=1) + _block = _block.data[0] + _txblock = space.select(_block[2], index="block_search") + _txblock = _txblock.data + return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]} + + +@register_query(LocalMongoDBConnection) +def get_block_with_transaction(txid, connection): + space = connection.space("blocks_tx") + _all_blocks_tx = space.select(txid, index="id_search") + _all_blocks_tx = _all_blocks_tx.data + space = connection.space("blocks") + + _block = space.select(_all_blocks_tx[0][1], index="block_id_search") + _block = _block.data[0] + return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _all_blocks_tx]} + + +@register_query(LocalMongoDBConnection) +def delete_transactions(conn, txn_ids): + conn.run(conn.collection('assets').delete_many({'id': {'$in': txn_ids}})) + conn.run(conn.collection('metadata').delete_many({'id': {'$in': txn_ids}})) + conn.run(conn.collection('transactions').delete_many({'id': {'$in': txn_ids}})) + + +@register_query(LocalMongoDBConnection) +def store_unspent_outputs(conn, *unspent_outputs): + if unspent_outputs: + try: + return conn.run( + conn.collection('utxos').insert_many( + unspent_outputs, + ordered=False, + ) + ) + except DuplicateKeyError: + # TODO log warning at least + pass + + +@register_query(LocalMongoDBConnection) +def delete_unspent_outputs(conn, *unspent_outputs): + if unspent_outputs: + return conn.run( + conn.collection('utxos').delete_many({ + '$or': [{ + '$and': [ + {'transaction_id': unspent_output['transaction_id']}, + {'output_index': unspent_output['output_index']}, + ], + } for unspent_output in unspent_outputs] + }) + ) + + +@register_query(LocalMongoDBConnection) +def get_unspent_outputs(conn, *, query=None): + if query is None: + query = {} + return conn.run(conn.collection('utxos').find(query, + projection={'_id': False})) + + +@register_query(LocalMongoDBConnection) +def store_pre_commit_state(state, connection): + space = connection.space("pre_commits") + _precommit = space.select(state["height"], index="height_search", limit=1) + unique_id = token_hex(8) if (len(_precommit.data) == 0) else _precommit.data[0][0] + space.upsert((unique_id, state["height"], state["transactions"]), + op_list=[('=', 0, unique_id), + ('=', 1, state["height"]), + ('=', 2, state["transactions"])], + limit=1) + + +@register_query(LocalMongoDBConnection) +def get_pre_commit_state(conn): + return conn.run(conn.collection('pre_commit').find_one()) + + +@register_query(LocalMongoDBConnection) +def store_validator_set(validators_update, connection): + space = connection.space("validators") + _validator = space.select(validators_update["height"], index="height_search", limit=1) + unique_id = token_hex(8) if (len(_validator.data) == 0) else _validator.data[0][0] + space.upsert((unique_id, validators_update["height"], validators_update["validators"]), + op_list=[('=', 0, unique_id), + ('=', 1, validators_update["height"]), + ('=', 2, validators_update["validators"])], + limit=1) + + +@register_query(LocalMongoDBConnection) +def delete_validator_set(conn, height): + return conn.run( + conn.collection('validators').delete_many({'height': height}) + ) + + +@register_query(LocalMongoDBConnection) +def store_election(election_id, height, is_concluded, connection): + space = connection.space("elections") + space.upsert((election_id, height, is_concluded), + op_list=[('=', 0, election_id), + ('=', 1, height), + ('=', 2, is_concluded)], + limit=1) + + +@register_query(LocalMongoDBConnection) +def store_elections(elections, connection): + space = connection.space("elections") + for election in elections: + _election = space.insert((election["election_id"], + election["height"], + election["is_concluded"])) + + +@register_query(LocalMongoDBConnection) +def delete_elections(conn, height): + return conn.run( + conn.collection('elections').delete_many({'height': height}) + ) + + +@register_query(LocalMongoDBConnection) +def get_validator_set(connection, height=None): + space = connection.space("validators") + _validators = space.select() + _validators = _validators.data + if height is not None: + _validators = [validator for validator in _validators if validator[1] <= height] + return next(iter(sorted(_validators, key=itemgetter(1))), None) + + return next(iter(sorted(_validators, key=itemgetter(1))), None) + + +@register_query(LocalMongoDBConnection) +def get_election(election_id, connection): + space = connection.space("elections") + _elections = space.select(election_id, index="id_search") + _elections = _elections.data + _election = sorted(_elections, key=itemgetter(0))[0] + return {"election_id": _election[0], "height": _election[1], "is_concluded": _election[2]} + + +@register_query(LocalMongoDBConnection) +def get_asset_tokens_for_public_key(connection, asset_id, public_key): + space = connection.space("keys") + _keys = space.select([public_key], index="keys_search") + space = connection.space("transactions") + _transactions = space.select([asset_id], index="only_asset_search") + _transactions = _transactions.data + _keys = _keys.data + _grouped_transactions = group_transaction_by_ids(connection=connection, txids=[_tx[0] for _tx in _transactions]) + return _grouped_transactions + + +@register_query(LocalMongoDBConnection) +def store_abci_chain(height, chain_id, connection, is_synced=True): + space = connection.space("abci_chains") + space.upsert((height, chain_id, is_synced), + op_list=[('=', 0, height), + ('=', 1, chain_id), + ('=', 2, is_synced)], + limit=1) + + +@register_query(LocalMongoDBConnection) +def delete_abci_chain(conn, height): + return conn.run( + conn.collection('abci_chains').delete_many({'height': height}) + ) + + +@register_query(LocalMongoDBConnection) +def get_latest_abci_chain(connection): + space = connection.space("abci_chains") + _all_chains = space.select() + _chain = sorted(_all_chains.data, key=itemgetter(0))[0] + return {"height": _chain[0], "is_synced": _chain[1], "chain_id": _chain[2]}