From 632ea7ffc1b243ca17220b832229a1c145dc0cba Mon Sep 17 00:00:00 2001 From: andrei Date: Mon, 30 May 2022 15:32:34 +0300 Subject: [PATCH 01/10] implemented decorator for running queries in try catch --- planetmint/backend/tarantool/query.py | 159 +++++++++++++++----------- 1 file changed, 91 insertions(+), 68 deletions(-) diff --git a/planetmint/backend/tarantool/query.py b/planetmint/backend/tarantool/query.py index c9ab8a4..487ab51 100644 --- a/planetmint/backend/tarantool/query.py +++ b/planetmint/backend/tarantool/query.py @@ -19,7 +19,21 @@ from json import dumps, loads register_query = module_dispatch_registrar(query) -@register_query(TarantoolDBConnection) +def run_handled_query(func): + def wrapper_run_query(*args, **kwargs): + try: + resp = func(*args, **kwargs) + return resp + except tarantool.error.SchemaError: + return None + except tarantool.error.NetworkError: + return None + except Exception as err: + raise err + + return wrapper_run_query + + def _group_transaction_by_ids(connection, txids: list): txspace = connection.space("transactions") inxspace = connection.space("inputs") @@ -55,6 +69,7 @@ def _group_transaction_by_ids(connection, txids: list): return _transactions +@run_handled_query @register_query(TarantoolDBConnection) def store_transactions(connection, signed_transactions: list): txspace = connection.space("transactions") @@ -88,18 +103,21 @@ def store_transactions(connection, signed_transactions: list): assetsxspace.insert(txtuples["asset"]) +@run_handled_query @register_query(TarantoolDBConnection) def get_transaction(connection, transaction_id: str): _transactions = _group_transaction_by_ids(txids=[transaction_id], connection=connection) return next(iter(_transactions), None) +@run_handled_query @register_query(TarantoolDBConnection) def get_transactions(connection, transactions_ids: list): _transactions = _group_transaction_by_ids(txids=transactions_ids, connection=connection) return _transactions +@run_handled_query @register_query(TarantoolDBConnection) def store_metadatas(connection, metadata: list): space = connection.space("meta_data") @@ -107,6 +125,7 @@ def store_metadatas(connection, metadata: list): space.insert((meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"])) +@run_handled_query @register_query(TarantoolDBConnection) def get_metadata(connection, transaction_ids: list): _returned_data = [] @@ -118,6 +137,7 @@ def get_metadata(connection, transaction_ids: list): return _returned_data if len(_returned_data) > 0 else None +@run_handled_query @register_query(TarantoolDBConnection) def store_asset(connection, asset): space = connection.space("assets") @@ -128,6 +148,7 @@ def store_asset(connection, asset): print("DUPLICATE ERROR") +@run_handled_query @register_query(TarantoolDBConnection) def store_assets(connection, assets: list): space = connection.space("assets") @@ -139,6 +160,7 @@ def store_assets(connection, assets: list): print(f"EXCEPTION : {ex} ") +@run_handled_query @register_query(TarantoolDBConnection) def get_asset(connection, asset_id: str): space = connection.space("assets") @@ -147,6 +169,7 @@ def get_asset(connection, asset_id: str): return _data[0][0] if len(_data) > 0 else [] +@run_handled_query @register_query(TarantoolDBConnection) def get_assets(connection, assets_ids: list) -> list: _returned_data = [] @@ -156,6 +179,7 @@ def get_assets(connection, assets_ids: list) -> list: return sorted(_returned_data, key=lambda k: k["id"], reverse=False) +@run_handled_query @register_query(TarantoolDBConnection) def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str): space = connection.space("inputs") @@ -165,31 +189,28 @@ def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str return _transactions +@run_handled_query @register_query(TarantoolDBConnection) def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR - try: - space = connection.space("blocks") - _all_blocks = space.select() - _all_blocks = _all_blocks.data - block = {"app_hash": '', "height": 0, "transactions": []} + space = connection.space("blocks") + _all_blocks = space.select() + _all_blocks = _all_blocks.data + block = {"app_hash": '', "height": 0, "transactions": []} - if len(_all_blocks) > 0: - _block = sorted(_all_blocks, key=itemgetter(1), reverse=True)[0] - space = connection.space("blocks_tx") - _txids = space.select(_block[2], index="block_search") - _txids = _txids.data - block["app_hash"] = _block[0] - block["height"] = _block[1] - block["transactions"] = [tx[0] for tx in _txids] - else: - block = None - return block - except tarantool.error.SchemaError: - return None - except Exception as err: - raise err + if len(_all_blocks) > 0: + _block = sorted(_all_blocks, key=itemgetter(1), reverse=True)[0] + space = connection.space("blocks_tx") + _txids = space.select(_block[2], index="block_search") + _txids = _txids.data + block["app_hash"] = _block[0] + block["height"] = _block[1] + block["transactions"] = [tx[0] for tx in _txids] + else: + block = None + return block +@run_handled_query @register_query(TarantoolDBConnection) def store_block(connection, block: dict): space = connection.space("blocks") @@ -202,6 +223,7 @@ def store_block(connection, block: dict): space.insert((txid, block_unique_id)) +@run_handled_query @register_query(TarantoolDBConnection) def get_txids_filtered(connection, asset_id: str, operation: str = None, last_tx: any = None): # TODO here is used 'OR' operator @@ -263,6 +285,7 @@ def _remove_text_score(asset): return asset +@run_handled_query @register_query(TarantoolDBConnection) def get_owned_ids(connection, owner: str): space = connection.space("keys") @@ -274,6 +297,7 @@ def get_owned_ids(connection, owner: str): return _transactions +@run_handled_query @register_query(TarantoolDBConnection) def get_spending_transactions(connection, inputs): _transactions = [] @@ -287,12 +311,12 @@ def get_spending_transactions(connection, inputs): return _transactions +@run_handled_query @register_query(TarantoolDBConnection) def get_block(connection, block_id=[]): space = connection.space("blocks") _block = space.select(block_id, index="block_search", limit=1) _block = _block.data - print(f"QUERY 1 :: {_block}") if len(_block) == 0: return [] _block = _block[0] @@ -302,6 +326,7 @@ def get_block(connection, block_id=[]): return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]} +@run_handled_query @register_query(TarantoolDBConnection) def get_block_with_transaction(connection, txid: str): space = connection.space("blocks_tx") @@ -314,6 +339,7 @@ def get_block_with_transaction(connection, txid: str): return [{"height": _height[1]} for _height in _block.data] +@run_handled_query @register_query(TarantoolDBConnection) def delete_transactions(connection, txn_ids: list): tx_space = connection.space("transactions") @@ -342,6 +368,7 @@ def delete_transactions(connection, txn_ids: list): assets_space.delete(_id, index="txid_search") +@run_handled_query @register_query(TarantoolDBConnection) def store_unspent_outputs(connection, *unspent_outputs: list): space = connection.space('utxos') @@ -352,6 +379,8 @@ def store_unspent_outputs(connection, *unspent_outputs: list): result.append(output.data) return result + +@run_handled_query @register_query(TarantoolDBConnection) def delete_unspent_outputs(connection, *unspent_outputs: list): space = connection.space('utxos') @@ -363,6 +392,7 @@ def delete_unspent_outputs(connection, *unspent_outputs: list): return result +@run_handled_query @register_query(TarantoolDBConnection) def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'. space = connection.space('utxos') @@ -370,6 +400,7 @@ def get_unspent_outputs(connection, query=None): # for now we don't have implem return [loads(utx[2]) for utx in _utxos] +@run_handled_query @register_query(TarantoolDBConnection) def store_pre_commit_state(connection, state: dict): space = connection.space("pre_commits") @@ -382,21 +413,18 @@ def store_pre_commit_state(connection, state: dict): limit=1) +@run_handled_query @register_query(TarantoolDBConnection) def get_pre_commit_state(connection): - try: - space = connection.space("pre_commits") - _commit = space.select([], index="id_search").data - if len(_commit) == 0: - return None - _commit = sorted(_commit, key=itemgetter(1), reverse=True)[0] - return {"height": _commit[1], "transactions": _commit[2]} - except tarantool.error.SchemaError: + space = connection.space("pre_commits") + _commit = space.select([], index="id_search").data + if len(_commit) == 0: return None - except Exception as err: - raise err + _commit = sorted(_commit, key=itemgetter(1), reverse=True)[0] + return {"height": _commit[1], "transactions": _commit[2]} +@run_handled_query @register_query(TarantoolDBConnection) def store_validator_set(conn, validators_update: dict): space = conn.space("validators") @@ -409,6 +437,7 @@ def store_validator_set(conn, validators_update: dict): limit=1) +@run_handled_query @register_query(TarantoolDBConnection) def delete_validator_set(connection, height: int): space = connection.space("validators") @@ -417,6 +446,7 @@ def delete_validator_set(connection, height: int): space.delete(_valid[0]) +@run_handled_query @register_query(TarantoolDBConnection) def store_election(connection, election_id: str, height: int, is_concluded: bool): space = connection.space("elections") @@ -427,6 +457,7 @@ def store_election(connection, election_id: str, height: int, is_concluded: bool limit=1) +@run_handled_query @register_query(TarantoolDBConnection) def store_elections(connection, elections: list): space = connection.space("elections") @@ -436,6 +467,7 @@ def store_elections(connection, elections: list): election["is_concluded"])) +@run_handled_query @register_query(TarantoolDBConnection) def delete_elections(connection, height: int): space = connection.space("elections") @@ -444,41 +476,34 @@ def delete_elections(connection, height: int): space.delete(_elec[0]) +@run_handled_query @register_query(TarantoolDBConnection) def get_validator_set(connection, height: int = None): - try: - space = connection.space("validators") - _validators = space.select() - _validators = _validators.data - if height is not None: - _validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators if - validator[1] <= height] - return next(iter(sorted(_validators, key=lambda k: k["height"], reverse=True)), None) - else: - _validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators] - return next(iter(sorted(_validators, key=lambda k: k["height"], reverse=True)), None) - except tarantool.error.SchemaError: - return None - except Exception as err: - raise err + space = connection.space("validators") + _validators = space.select() + _validators = _validators.data + if height is not None: + _validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators if + validator[1] <= height] + return next(iter(sorted(_validators, key=lambda k: k["height"], reverse=True)), None) + else: + _validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators] + return next(iter(sorted(_validators, key=lambda k: k["height"], reverse=True)), None) +@run_handled_query @register_query(TarantoolDBConnection) def get_election(connection, election_id: str): - try: - space = connection.space("elections") - _elections = space.select(election_id, index="id_search") - _elections = _elections.data - if len(_elections) == 0: - return None - _election = sorted(_elections, key=itemgetter(0), reverse=True)[0] - return {"election_id": _election[0], "height": _election[1], "is_concluded": _election[2]} - except tarantool.error.SchemaError: + space = connection.space("elections") + _elections = space.select(election_id, index="id_search") + _elections = _elections.data + if len(_elections) == 0: return None - except Exception as err: - raise err + _election = sorted(_elections, key=itemgetter(0), reverse=True)[0] + return {"election_id": _election[0], "height": _election[1], "is_concluded": _election[2]} +@run_handled_query @register_query(TarantoolDBConnection) def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str): space = connection.space("keys") @@ -491,6 +516,7 @@ def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str): return _grouped_transactions +@run_handled_query @register_query(TarantoolDBConnection) def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True): space = connection.space("abci_chains") @@ -501,6 +527,7 @@ def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = T limit=1) +@run_handled_query @register_query(TarantoolDBConnection) def delete_abci_chain(connection, height: int): space = connection.space("abci_chains") @@ -509,16 +536,12 @@ def delete_abci_chain(connection, height: int): space.delete(_chain[2]) +@run_handled_query @register_query(TarantoolDBConnection) def get_latest_abci_chain(connection): - try: - space = connection.space("abci_chains") - _all_chains = space.select().data - if len(_all_chains) == 0: - return None - _chain = sorted(_all_chains, key=itemgetter(0), reverse=True)[0] - return {"height": _chain[0], "is_synced": _chain[1], "chain_id": _chain[2]} - except tarantool.error.SchemaError: + space = connection.space("abci_chains") + _all_chains = space.select().data + if len(_all_chains) == 0: return None - except Exception as err: - raise err + _chain = sorted(_all_chains, key=itemgetter(0), reverse=True)[0] + return {"height": _chain[0], "is_synced": _chain[1], "chain_id": _chain[2]} From dad02b1ebc7cbf50b6b4027718adc14bb6cd02fc Mon Sep 17 00:00:00 2001 From: andrei Date: Mon, 30 May 2022 15:35:17 +0300 Subject: [PATCH 02/10] removed import of interface --- tests/backend/tarantool/test_queries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/backend/tarantool/test_queries.py b/tests/backend/tarantool/test_queries.py index 7435df6..3a25c34 100644 --- a/tests/backend/tarantool/test_queries.py +++ b/tests/backend/tarantool/test_queries.py @@ -230,7 +230,7 @@ def test_get_owned_ids(signed_create_tx, user_pk, db_conn): def test_get_spending_transactions(user_pk, user_sk, db_conn): from planetmint.models import Transaction - # from planetmint.backend.connection import Connection + # from planetmint.backend.tarantool import query conn = db_conn.get_connection() From ccb2df14773a40888bb84c389cc27b728f2c46ce Mon Sep 17 00:00:00 2001 From: andrei Date: Mon, 30 May 2022 15:36:12 +0300 Subject: [PATCH 03/10] Removed print + removed import of interface directly --- tests/conftest.py | 8 ++++---- tests/utils.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5c1d2a0..f4a097f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -123,12 +123,12 @@ def _configure_planetmint(request): @pytest.fixture(scope='session') def _setup_database(_configure_planetmint): # TODO Here is located setup database - from planetmint.backend.connection import Connection + from planetmint.backend.connection import connect from planetmint.config import Config print('Initializing test db') dbname = Config().get()['database']['name'] - conn = Connection() + conn = connect() _drop_db(conn, dbname) schema.init_database(conn) @@ -137,7 +137,7 @@ def _setup_database(_configure_planetmint): # TODO Here is located setup databa yield print('Deleting `{}` database'.format(dbname)) - conn = Connection() + conn = connect() _drop_db(conn, dbname) print('Finished deleting `{}`'.format(dbname)) @@ -145,7 +145,6 @@ def _setup_database(_configure_planetmint): # TODO Here is located setup databa @pytest.fixture def _bdb(_setup_database, _configure_planetmint): - print(f"BDB CALL") from planetmint.backend import Connection from planetmint.transactions.common.memoize import to_dict, from_dict from planetmint.models import Transaction @@ -363,6 +362,7 @@ def inputs(user_pk, b, alice): def _drop_db(conn, dbname): + print(f"CONNECTION FOR DROPPING {conn}") try: schema.drop_database(conn, dbname) except DatabaseDoesNotExist: diff --git a/tests/utils.py b/tests/utils.py index 786b02d..ac4d40c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ import random from functools import singledispatch from planetmint.backend.localmongodb.connection import LocalMongoDBConnection -from planetmint.backend.tarantool.connection import TarantoolDB +from planetmint.backend.tarantool.connection import TarantoolDBConnection from planetmint.backend.schema import TABLES, SPACE_NAMES from planetmint.transactions.common import crypto from planetmint.transactions.common.transaction_mode_types import BROADCAST_TX_COMMIT @@ -30,7 +30,7 @@ def flush_localmongo_db(connection, dbname): getattr(connection.conn[dbname], t).delete_many({}) -@flush_db.register(TarantoolDB) +@flush_db.register(TarantoolDBConnection) def flush_tarantool_db(connection, dbname): for s in SPACE_NAMES: _space = connection.space(space_name=s) From 76faca34fceb16d3073ada18c4c3a1f342e22075 Mon Sep 17 00:00:00 2001 From: andrei Date: Mon, 30 May 2022 15:39:56 +0300 Subject: [PATCH 04/10] removed importing of interface --- tests/conftest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index f4a097f..01bd5c1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -145,12 +145,12 @@ def _setup_database(_configure_planetmint): # TODO Here is located setup databa @pytest.fixture def _bdb(_setup_database, _configure_planetmint): - from planetmint.backend import Connection + from planetmint.backend.connection import connect from planetmint.transactions.common.memoize import to_dict, from_dict from planetmint.models import Transaction from .utils import flush_db from planetmint.config import Config - conn = Connection() + conn = connect() yield dbname = Config().get()['database']['name'] flush_db(conn, dbname) From dc8138f6499467e5d8bbb69630a7f515e916f095 Mon Sep 17 00:00:00 2001 From: andrei Date: Tue, 31 May 2022 15:13:16 +0300 Subject: [PATCH 05/10] added new requirement to setup.py --- planetmint/backend/query.py | 1 - planetmint/backend/tarantool/query.py | 49 --------------------------- setup.py | 3 +- 3 files changed, 2 insertions(+), 51 deletions(-) diff --git a/planetmint/backend/query.py b/planetmint/backend/query.py index 353b080..0f4d044 100644 --- a/planetmint/backend/query.py +++ b/planetmint/backend/query.py @@ -6,7 +6,6 @@ """Query interfaces for backends.""" from functools import singledispatch - from planetmint.backend.exceptions import OperationError diff --git a/planetmint/backend/tarantool/query.py b/planetmint/backend/tarantool/query.py index 487ab51..26869ff 100644 --- a/planetmint/backend/tarantool/query.py +++ b/planetmint/backend/tarantool/query.py @@ -19,21 +19,6 @@ from json import dumps, loads register_query = module_dispatch_registrar(query) -def run_handled_query(func): - def wrapper_run_query(*args, **kwargs): - try: - resp = func(*args, **kwargs) - return resp - except tarantool.error.SchemaError: - return None - except tarantool.error.NetworkError: - return None - except Exception as err: - raise err - - return wrapper_run_query - - def _group_transaction_by_ids(connection, txids: list): txspace = connection.space("transactions") inxspace = connection.space("inputs") @@ -69,7 +54,6 @@ def _group_transaction_by_ids(connection, txids: list): return _transactions -@run_handled_query @register_query(TarantoolDBConnection) def store_transactions(connection, signed_transactions: list): txspace = connection.space("transactions") @@ -103,21 +87,18 @@ def store_transactions(connection, signed_transactions: list): assetsxspace.insert(txtuples["asset"]) -@run_handled_query @register_query(TarantoolDBConnection) def get_transaction(connection, transaction_id: str): _transactions = _group_transaction_by_ids(txids=[transaction_id], connection=connection) return next(iter(_transactions), None) -@run_handled_query @register_query(TarantoolDBConnection) def get_transactions(connection, transactions_ids: list): _transactions = _group_transaction_by_ids(txids=transactions_ids, connection=connection) return _transactions -@run_handled_query @register_query(TarantoolDBConnection) def store_metadatas(connection, metadata: list): space = connection.space("meta_data") @@ -125,7 +106,6 @@ def store_metadatas(connection, metadata: list): space.insert((meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"])) -@run_handled_query @register_query(TarantoolDBConnection) def get_metadata(connection, transaction_ids: list): _returned_data = [] @@ -137,7 +117,6 @@ def get_metadata(connection, transaction_ids: list): return _returned_data if len(_returned_data) > 0 else None -@run_handled_query @register_query(TarantoolDBConnection) def store_asset(connection, asset): space = connection.space("assets") @@ -148,7 +127,6 @@ def store_asset(connection, asset): print("DUPLICATE ERROR") -@run_handled_query @register_query(TarantoolDBConnection) def store_assets(connection, assets: list): space = connection.space("assets") @@ -160,7 +138,6 @@ def store_assets(connection, assets: list): print(f"EXCEPTION : {ex} ") -@run_handled_query @register_query(TarantoolDBConnection) def get_asset(connection, asset_id: str): space = connection.space("assets") @@ -169,7 +146,6 @@ def get_asset(connection, asset_id: str): return _data[0][0] if len(_data) > 0 else [] -@run_handled_query @register_query(TarantoolDBConnection) def get_assets(connection, assets_ids: list) -> list: _returned_data = [] @@ -179,7 +155,6 @@ def get_assets(connection, assets_ids: list) -> list: return sorted(_returned_data, key=lambda k: k["id"], reverse=False) -@run_handled_query @register_query(TarantoolDBConnection) def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str): space = connection.space("inputs") @@ -189,7 +164,6 @@ def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str return _transactions -@run_handled_query @register_query(TarantoolDBConnection) def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR space = connection.space("blocks") @@ -210,7 +184,6 @@ def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR return block -@run_handled_query @register_query(TarantoolDBConnection) def store_block(connection, block: dict): space = connection.space("blocks") @@ -223,7 +196,6 @@ def store_block(connection, block: dict): space.insert((txid, block_unique_id)) -@run_handled_query @register_query(TarantoolDBConnection) def get_txids_filtered(connection, asset_id: str, operation: str = None, last_tx: any = None): # TODO here is used 'OR' operator @@ -285,7 +257,6 @@ def _remove_text_score(asset): return asset -@run_handled_query @register_query(TarantoolDBConnection) def get_owned_ids(connection, owner: str): space = connection.space("keys") @@ -297,7 +268,6 @@ def get_owned_ids(connection, owner: str): return _transactions -@run_handled_query @register_query(TarantoolDBConnection) def get_spending_transactions(connection, inputs): _transactions = [] @@ -311,7 +281,6 @@ def get_spending_transactions(connection, inputs): return _transactions -@run_handled_query @register_query(TarantoolDBConnection) def get_block(connection, block_id=[]): space = connection.space("blocks") @@ -326,7 +295,6 @@ def get_block(connection, block_id=[]): return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]} -@run_handled_query @register_query(TarantoolDBConnection) def get_block_with_transaction(connection, txid: str): space = connection.space("blocks_tx") @@ -339,7 +307,6 @@ def get_block_with_transaction(connection, txid: str): return [{"height": _height[1]} for _height in _block.data] -@run_handled_query @register_query(TarantoolDBConnection) def delete_transactions(connection, txn_ids: list): tx_space = connection.space("transactions") @@ -368,7 +335,6 @@ def delete_transactions(connection, txn_ids: list): assets_space.delete(_id, index="txid_search") -@run_handled_query @register_query(TarantoolDBConnection) def store_unspent_outputs(connection, *unspent_outputs: list): space = connection.space('utxos') @@ -380,7 +346,6 @@ def store_unspent_outputs(connection, *unspent_outputs: list): return result -@run_handled_query @register_query(TarantoolDBConnection) def delete_unspent_outputs(connection, *unspent_outputs: list): space = connection.space('utxos') @@ -392,7 +357,6 @@ def delete_unspent_outputs(connection, *unspent_outputs: list): return result -@run_handled_query @register_query(TarantoolDBConnection) def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'. space = connection.space('utxos') @@ -400,7 +364,6 @@ def get_unspent_outputs(connection, query=None): # for now we don't have implem return [loads(utx[2]) for utx in _utxos] -@run_handled_query @register_query(TarantoolDBConnection) def store_pre_commit_state(connection, state: dict): space = connection.space("pre_commits") @@ -413,7 +376,6 @@ def store_pre_commit_state(connection, state: dict): limit=1) -@run_handled_query @register_query(TarantoolDBConnection) def get_pre_commit_state(connection): space = connection.space("pre_commits") @@ -424,7 +386,6 @@ def get_pre_commit_state(connection): return {"height": _commit[1], "transactions": _commit[2]} -@run_handled_query @register_query(TarantoolDBConnection) def store_validator_set(conn, validators_update: dict): space = conn.space("validators") @@ -437,7 +398,6 @@ def store_validator_set(conn, validators_update: dict): limit=1) -@run_handled_query @register_query(TarantoolDBConnection) def delete_validator_set(connection, height: int): space = connection.space("validators") @@ -446,7 +406,6 @@ def delete_validator_set(connection, height: int): space.delete(_valid[0]) -@run_handled_query @register_query(TarantoolDBConnection) def store_election(connection, election_id: str, height: int, is_concluded: bool): space = connection.space("elections") @@ -457,7 +416,6 @@ def store_election(connection, election_id: str, height: int, is_concluded: bool limit=1) -@run_handled_query @register_query(TarantoolDBConnection) def store_elections(connection, elections: list): space = connection.space("elections") @@ -467,7 +425,6 @@ def store_elections(connection, elections: list): election["is_concluded"])) -@run_handled_query @register_query(TarantoolDBConnection) def delete_elections(connection, height: int): space = connection.space("elections") @@ -476,7 +433,6 @@ def delete_elections(connection, height: int): space.delete(_elec[0]) -@run_handled_query @register_query(TarantoolDBConnection) def get_validator_set(connection, height: int = None): space = connection.space("validators") @@ -491,7 +447,6 @@ def get_validator_set(connection, height: int = None): return next(iter(sorted(_validators, key=lambda k: k["height"], reverse=True)), None) -@run_handled_query @register_query(TarantoolDBConnection) def get_election(connection, election_id: str): space = connection.space("elections") @@ -503,7 +458,6 @@ def get_election(connection, election_id: str): return {"election_id": _election[0], "height": _election[1], "is_concluded": _election[2]} -@run_handled_query @register_query(TarantoolDBConnection) def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str): space = connection.space("keys") @@ -516,7 +470,6 @@ def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str): return _grouped_transactions -@run_handled_query @register_query(TarantoolDBConnection) def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True): space = connection.space("abci_chains") @@ -527,7 +480,6 @@ def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = T limit=1) -@run_handled_query @register_query(TarantoolDBConnection) def delete_abci_chain(connection, height: int): space = connection.space("abci_chains") @@ -536,7 +488,6 @@ def delete_abci_chain(connection, height: int): space.delete(_chain[2]) -@run_handled_query @register_query(TarantoolDBConnection) def get_latest_abci_chain(connection): space = connection.space("abci_chains") diff --git a/setup.py b/setup.py index cec7758..2c27b8e 100644 --- a/setup.py +++ b/setup.py @@ -94,7 +94,8 @@ install_requires = [ 'requests==2.25.1', 'setproctitle==1.2.2', 'werkzeug==2.0.3', - 'nest-asyncio==1.5.5' + 'nest-asyncio==1.5.5', + 'protobuf==3.19.0' ] From 69c3c93ae6e5fac73196616d1942faab2684186c Mon Sep 17 00:00:00 2001 From: andrei Date: Tue, 31 May 2022 15:21:50 +0300 Subject: [PATCH 06/10] added query() method, that will return Lazy Object --- planetmint/backend/tarantool/connection.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/planetmint/backend/tarantool/connection.py b/planetmint/backend/tarantool/connection.py index d2c640e..47dcac6 100644 --- a/planetmint/backend/tarantool/connection.py +++ b/planetmint/backend/tarantool/connection.py @@ -8,6 +8,7 @@ import tarantool from planetmint.config import Config from planetmint.transactions.common.exceptions import ConfigurationError +from planetmint.utils import Lazy from planetmint.backend.connection import Connection logger = logging.getLogger(__name__) @@ -47,6 +48,9 @@ class TarantoolDBConnection(Connection): f.close() return "".join(execute).encode() + def query(self): + return Lazy + def _reconnect(self): self.db_connect = tarantool.connect(host=self.host, port=self.port) From 1e00abcf4eae07c66a3f67b94a2ac57b935fec57 Mon Sep 17 00:00:00 2001 From: andrei Date: Tue, 31 May 2022 15:22:31 +0300 Subject: [PATCH 07/10] removed unecessary comments from connection class --- planetmint/backend/tarantool/connection.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/planetmint/backend/tarantool/connection.py b/planetmint/backend/tarantool/connection.py index 47dcac6..e9c3786 100644 --- a/planetmint/backend/tarantool/connection.py +++ b/planetmint/backend/tarantool/connection.py @@ -22,16 +22,10 @@ class TarantoolDBConnection(Connection): # TODO add user support later on print(f"host : {host}") print(f"port : {port}") - # self.db_connect = tarantool.connect(host=host, port=port, user=user, password=password) # TODO : raise configuraiton error if the connection cannot be established self.db_connect = tarantool.connect(host=self.host, port=self.port) self.init_path = Config().get()["database"]["init_config"]["absolute_path"] self.drop_path = Config().get()["database"]["drop_config"]["absolute_path"] - # args_reset_db = kwargs.get("kwargs").get("reset_database") if "kwargs" in kwargs else None - # if reset_database or args_reset_db is True: - # self.drop_database() - # self.init_database() - # self._reconnect() self.SPACE_NAMES = ["abci_chains", "assets", "blocks", "blocks_tx", "elections", "meta_data", "pre_commits", "validators", "transactions", "inputs", "outputs", "keys"] From 79f4dd4c7ec242de2557e769f21e8cc5a132de97 Mon Sep 17 00:00:00 2001 From: andrei Date: Tue, 31 May 2022 15:27:30 +0300 Subject: [PATCH 08/10] get_space method is the same as old space method. --- planetmint/backend/tarantool/connection.py | 5 +- planetmint/backend/tarantool/query.py | 88 +++++++++++----------- tests/backend/tarantool/test_queries.py | 4 +- tests/backend/tarantool/test_schema.py | 2 +- tests/tendermint/test_lib.py | 4 +- 5 files changed, 53 insertions(+), 50 deletions(-) diff --git a/planetmint/backend/tarantool/connection.py b/planetmint/backend/tarantool/connection.py index e9c3786..11f22c9 100644 --- a/planetmint/backend/tarantool/connection.py +++ b/planetmint/backend/tarantool/connection.py @@ -48,9 +48,12 @@ class TarantoolDBConnection(Connection): def _reconnect(self): self.db_connect = tarantool.connect(host=self.host, port=self.port) - def space(self, space_name: str): + def get_space(self, space_name: str): return self.db_connect.space(space_name) + def space(self, space_name: str): + return self.query().space(space_name) + def get_connection(self): return self.db_connect diff --git a/planetmint/backend/tarantool/query.py b/planetmint/backend/tarantool/query.py index 26869ff..3a64e04 100644 --- a/planetmint/backend/tarantool/query.py +++ b/planetmint/backend/tarantool/query.py @@ -20,12 +20,12 @@ register_query = module_dispatch_registrar(query) def _group_transaction_by_ids(connection, txids: list): - txspace = connection.space("transactions") - inxspace = connection.space("inputs") - outxspace = connection.space("outputs") - keysxspace = connection.space("keys") - assetsxspace = connection.space("assets") - metaxspace = connection.space("meta_data") + txspace = connection.get_space("transactions") + inxspace = connection.get_space("inputs") + outxspace = connection.get_space("outputs") + keysxspace = connection.get_space("keys") + assetsxspace = connection.get_space("assets") + metaxspace = connection.get_space("meta_data") _transactions = [] for txid in txids: _txobject = txspace.select(txid, index="id_search") @@ -56,12 +56,12 @@ def _group_transaction_by_ids(connection, txids: list): @register_query(TarantoolDBConnection) def store_transactions(connection, signed_transactions: list): - txspace = connection.space("transactions") - inxspace = connection.space("inputs") - outxspace = connection.space("outputs") - keysxspace = connection.space("keys") - metadatasxspace = connection.space("meta_data") - assetsxspace = connection.space("assets") + txspace = connection.get_space("transactions") + inxspace = connection.get_space("inputs") + outxspace = connection.get_space("outputs") + keysxspace = connection.get_space("keys") + metadatasxspace = connection.get_space("meta_data") + assetsxspace = connection.get_space("assets") for transaction in signed_transactions: txprepare = TransactionDecompose(transaction) @@ -101,7 +101,7 @@ def get_transactions(connection, transactions_ids: list): @register_query(TarantoolDBConnection) def store_metadatas(connection, metadata: list): - space = connection.space("meta_data") + space = connection.get_space("meta_data") for meta in metadata: space.insert((meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"])) @@ -109,7 +109,7 @@ def store_metadatas(connection, metadata: list): @register_query(TarantoolDBConnection) def get_metadata(connection, transaction_ids: list): _returned_data = [] - space = connection.space("meta_data") + space = connection.get_space("meta_data") for _id in transaction_ids: metadata = space.select(_id, index="id_search").data if len(metadata) > 0: @@ -119,7 +119,7 @@ def get_metadata(connection, transaction_ids: list): @register_query(TarantoolDBConnection) def store_asset(connection, asset): - space = connection.space("assets") + space = connection.get_space("assets") convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"]) try: space.insert(convert(asset)) @@ -129,7 +129,7 @@ def store_asset(connection, asset): @register_query(TarantoolDBConnection) def store_assets(connection, assets: list): - space = connection.space("assets") + space = connection.get_space("assets") convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"]) for asset in assets: try: @@ -140,7 +140,7 @@ def store_assets(connection, assets: list): @register_query(TarantoolDBConnection) def get_asset(connection, asset_id: str): - space = connection.space("assets") + space = connection.get_space("assets") _data = space.select(asset_id, index="txid_search") _data = _data.data return _data[0][0] if len(_data) > 0 else [] @@ -157,7 +157,7 @@ def get_assets(connection, assets_ids: list) -> list: @register_query(TarantoolDBConnection) def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str): - space = connection.space("inputs") + space = connection.get_space("inputs") _inputs = space.select([fullfil_transaction_id, str(fullfil_output_index)], index="spent_search") _inputs = _inputs.data _transactions = _group_transaction_by_ids(txids=[inp[0] for inp in _inputs], connection=connection) @@ -166,14 +166,14 @@ def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str @register_query(TarantoolDBConnection) def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR - space = connection.space("blocks") + space = connection.get_space("blocks") _all_blocks = space.select() _all_blocks = _all_blocks.data block = {"app_hash": '', "height": 0, "transactions": []} if len(_all_blocks) > 0: _block = sorted(_all_blocks, key=itemgetter(1), reverse=True)[0] - space = connection.space("blocks_tx") + space = connection.get_space("blocks_tx") _txids = space.select(_block[2], index="block_search") _txids = _txids.data block["app_hash"] = _block[0] @@ -186,12 +186,12 @@ def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR @register_query(TarantoolDBConnection) def store_block(connection, block: dict): - space = connection.space("blocks") + space = connection.get_space("blocks") block_unique_id = token_hex(8) space.insert((block["app_hash"], block["height"], block_unique_id)) - space = connection.space("blocks_tx") + space = connection.get_space("blocks_tx") for txid in block["transactions"]: space.insert((txid, block_unique_id)) @@ -206,8 +206,8 @@ def get_txids_filtered(connection, asset_id: str, operation: str = None, # 1 - operation, 2 - asset.id (linked mode) + OPERATOR OR None: {"sets": [asset_id, asset_id]} }[operation] - tx_space = connection.space("transactions") - assets_space = connection.space("assets") + tx_space = connection.get_space("transactions") + assets_space = connection.get_space("assets") _transactions = [] if actions["sets"][0] == "CREATE": # + _transactions = tx_space.select([operation, asset_id], index=actions["index"]) @@ -259,7 +259,7 @@ def _remove_text_score(asset): @register_query(TarantoolDBConnection) def get_owned_ids(connection, owner: str): - space = connection.space("keys") + space = connection.get_space("keys") _keys = space.select(owner, index="keys_search") if len(_keys.data) == 0: return [] @@ -283,13 +283,13 @@ def get_spending_transactions(connection, inputs): @register_query(TarantoolDBConnection) def get_block(connection, block_id=[]): - space = connection.space("blocks") + space = connection.get_space("blocks") _block = space.select(block_id, index="block_search", limit=1) _block = _block.data if len(_block) == 0: return [] _block = _block[0] - space = connection.space("blocks_tx") + space = connection.get_space("blocks_tx") _txblock = space.select(_block[2], index="block_search") _txblock = _txblock.data return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]} @@ -297,24 +297,24 @@ def get_block(connection, block_id=[]): @register_query(TarantoolDBConnection) def get_block_with_transaction(connection, txid: str): - space = connection.space("blocks_tx") + space = connection.get_space("blocks_tx") _all_blocks_tx = space.select(txid, index="id_search") _all_blocks_tx = _all_blocks_tx.data if len(_all_blocks_tx) == 0: return [] - space = connection.space("blocks") + space = connection.get_space("blocks") _block = space.select(_all_blocks_tx[0][1], index="block_id_search") return [{"height": _height[1]} for _height in _block.data] @register_query(TarantoolDBConnection) def delete_transactions(connection, txn_ids: list): - tx_space = connection.space("transactions") + tx_space = connection.get_space("transactions") for _id in txn_ids: tx_space.delete(_id) - inputs_space = connection.space("inputs") - outputs_space = connection.space("outputs") - k_space = connection.space("keys") + inputs_space = connection.get_space("inputs") + outputs_space = connection.get_space("outputs") + k_space = connection.get_space("keys") for _id in txn_ids: _inputs = inputs_space.select(_id, index="id_search") _outputs = outputs_space.select(_id, index="id_search") @@ -326,18 +326,18 @@ def delete_transactions(connection, txn_ids: list): for _outpID in _outputs: outputs_space.delete(_outpID[5], index="unique_search") - meta_space = connection.space("meta_data") + meta_space = connection.get_space("meta_data") for _id in txn_ids: meta_space.delete(_id, index="id_search") - assets_space = connection.space("assets") + assets_space = connection.get_space("assets") for _id in txn_ids: assets_space.delete(_id, index="txid_search") @register_query(TarantoolDBConnection) def store_unspent_outputs(connection, *unspent_outputs: list): - space = connection.space('utxos') + space = connection.get_space('utxos') result = [] if unspent_outputs: for utxo in unspent_outputs: @@ -348,7 +348,7 @@ def store_unspent_outputs(connection, *unspent_outputs: list): @register_query(TarantoolDBConnection) def delete_unspent_outputs(connection, *unspent_outputs: list): - space = connection.space('utxos') + space = connection.get_space('utxos') result = [] if unspent_outputs: for utxo in unspent_outputs: @@ -359,14 +359,14 @@ def delete_unspent_outputs(connection, *unspent_outputs: list): @register_query(TarantoolDBConnection) def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'. - space = connection.space('utxos') + space = connection.get_space('utxos') _utxos = space.select([]).data return [loads(utx[2]) for utx in _utxos] @register_query(TarantoolDBConnection) def store_pre_commit_state(connection, state: dict): - space = connection.space("pre_commits") + space = connection.get_space("pre_commits") _precommit = space.select(state["height"], index="height_search", limit=1) unique_id = token_hex(8) if (len(_precommit.data) == 0) else _precommit.data[0][0] space.upsert((unique_id, state["height"], state["transactions"]), @@ -378,7 +378,7 @@ def store_pre_commit_state(connection, state: dict): @register_query(TarantoolDBConnection) def get_pre_commit_state(connection): - space = connection.space("pre_commits") + space = connection.get_space("pre_commits") _commit = space.select([], index="id_search").data if len(_commit) == 0: return None @@ -388,7 +388,7 @@ def get_pre_commit_state(connection): @register_query(TarantoolDBConnection) def store_validator_set(conn, validators_update: dict): - space = conn.space("validators") + space = conn.get_space("validators") _validator = space.select(validators_update["height"], index="height_search", limit=1) unique_id = token_hex(8) if (len(_validator.data) == 0) else _validator.data[0][0] space.upsert((unique_id, validators_update["height"], validators_update["validators"]), @@ -400,7 +400,7 @@ def store_validator_set(conn, validators_update: dict): @register_query(TarantoolDBConnection) def delete_validator_set(connection, height: int): - space = connection.space("validators") + space = connection.get_space("validators") _validators = space.select(height, index="height_search") for _valid in _validators.data: space.delete(_valid[0]) @@ -408,7 +408,7 @@ def delete_validator_set(connection, height: int): @register_query(TarantoolDBConnection) def store_election(connection, election_id: str, height: int, is_concluded: bool): - space = connection.space("elections") + space = connection.get_space("elections") space.upsert((election_id, height, is_concluded), op_list=[('=', 0, election_id), ('=', 1, height), @@ -418,7 +418,7 @@ def store_election(connection, election_id: str, height: int, is_concluded: bool @register_query(TarantoolDBConnection) def store_elections(connection, elections: list): - space = connection.space("elections") + space = connection.get_space("elections") for election in elections: _election = space.insert((election["election_id"], election["height"], diff --git a/tests/backend/tarantool/test_queries.py b/tests/backend/tarantool/test_queries.py index 3a25c34..ef316fa 100644 --- a/tests/backend/tarantool/test_queries.py +++ b/tests/backend/tarantool/test_queries.py @@ -186,7 +186,7 @@ def test_write_metadata(db_conn): query.store_metadatas(connection=conn, metadata=metadata) # check that 3 assets were written to the database - space = conn.space("meta_data") + space = conn.get_space("meta_data") metadatas = [] for meta in metadata: _data = space.select(meta["id"]) @@ -306,7 +306,7 @@ def test_store_block(db_conn): transactions=[]) query.store_block(connection=conn, block=block._asdict()) # block = query.get_block(connection=conn) - blocks = conn.space("blocks").select([]) + blocks = conn.get_space("blocks").select([]) assert len(blocks.data) == 1 diff --git a/tests/backend/tarantool/test_schema.py b/tests/backend/tarantool/test_schema.py index c9d144f..d6e7485 100644 --- a/tests/backend/tarantool/test_schema.py +++ b/tests/backend/tarantool/test_schema.py @@ -10,7 +10,7 @@ def _check_spaces_by_list(conn, space_names): _exists = [] for name in space_names: try: - conn.space(name) + conn.get_space(name) _exists.append(name) except: pass diff --git a/tests/tendermint/test_lib.py b/tests/tendermint/test_lib.py index 592ce3e..3533835 100644 --- a/tests/tendermint/test_lib.py +++ b/tests/tendermint/test_lib.py @@ -157,7 +157,7 @@ def test_post_transaction_invalid_mode(b): @pytest.mark.bdb def test_update_utxoset(b, signed_create_tx, signed_transfer_tx, db_conn): b.update_utxoset(signed_create_tx) - utxoset = db_conn.space('utxos') + utxoset = db_conn.get_space('utxos') assert utxoset.select().rowcount == 1 utxo = utxoset.select().data assert utxo[0][0] == signed_create_tx.id @@ -318,7 +318,7 @@ def test_delete_one_unspent_outputs(b, utxoset): assert utxo_collection.count_documents( {'transaction_id': 'a', 'output_index': 0}) == 0 else: - utx_space = b.connection.space("utxos") + utx_space = b.connection.get_space("utxos") res1 = utx_space.select(['a', 1], index="id_search").data res2 = utx_space.select(['b', 0], index="id_search").data assert len(res1) + len(res2) == 2 From f1e4e386b680b0adf0475bfcaf0219a610a7f844 Mon Sep 17 00:00:00 2001 From: andrei Date: Tue, 31 May 2022 15:31:48 +0300 Subject: [PATCH 09/10] Added run() method to TarantoolDBConnection Class. --- planetmint/backend/tarantool/connection.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/planetmint/backend/tarantool/connection.py b/planetmint/backend/tarantool/connection.py index 11f22c9..34b769d 100644 --- a/planetmint/backend/tarantool/connection.py +++ b/planetmint/backend/tarantool/connection.py @@ -54,6 +54,16 @@ class TarantoolDBConnection(Connection): def space(self, space_name: str): return self.query().space(space_name) + def run(self, query): + try: + return query.run(self.db_connect) + except tarantool.error.NetworkError: + return None + except tarantool.error.OperationalError as op_error: + raise op_error + except tarantool.error.SchemaError as schema_error: + raise schema_error + def get_connection(self): return self.db_connect From 3d44b50a54876a63f774b51bb94ddf3b71b33215 Mon Sep 17 00:00:00 2001 From: andrei Date: Tue, 31 May 2022 16:22:39 +0300 Subject: [PATCH 10/10] tarantool/query.py changed for using Lazy() implementation + little changes --- planetmint/backend/tarantool/connection.py | 12 +- planetmint/backend/tarantool/query.py | 300 +++++++++++---------- 2 files changed, 171 insertions(+), 141 deletions(-) diff --git a/planetmint/backend/tarantool/connection.py b/planetmint/backend/tarantool/connection.py index 34b769d..7889a91 100644 --- a/planetmint/backend/tarantool/connection.py +++ b/planetmint/backend/tarantool/connection.py @@ -43,7 +43,7 @@ class TarantoolDBConnection(Connection): return "".join(execute).encode() def query(self): - return Lazy + return Lazy() def _reconnect(self): self.db_connect = tarantool.connect(host=self.host, port=self.port) @@ -54,15 +54,15 @@ class TarantoolDBConnection(Connection): def space(self, space_name: str): return self.query().space(space_name) - def run(self, query): + def run(self, query, only_data=True): try: - return query.run(self.db_connect) - except tarantool.error.NetworkError: + return query.run(self.db_connect).data if only_data else query.run(self.db_connect) + except tarantool.error.SchemaError: return None except tarantool.error.OperationalError as op_error: raise op_error - except tarantool.error.SchemaError as schema_error: - raise schema_error + except tarantool.error.NetworkError as net_error: + raise net_error def get_connection(self): return self.db_connect diff --git a/planetmint/backend/tarantool/query.py b/planetmint/backend/tarantool/query.py index 3a64e04..dc9acfa 100644 --- a/planetmint/backend/tarantool/query.py +++ b/planetmint/backend/tarantool/query.py @@ -101,17 +101,20 @@ def get_transactions(connection, transactions_ids: list): @register_query(TarantoolDBConnection) def store_metadatas(connection, metadata: list): - space = connection.get_space("meta_data") for meta in metadata: - space.insert((meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"])) + connection.run( + connection.space("meta_data").insert( + (meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"])) + ) @register_query(TarantoolDBConnection) def get_metadata(connection, transaction_ids: list): _returned_data = [] - space = connection.get_space("meta_data") for _id in transaction_ids: - metadata = space.select(_id, index="id_search").data + metadata = connection.run( + connection.space("meta_data").select(_id, index="id_search") + ).data if len(metadata) > 0: _returned_data.append(metadata) return _returned_data if len(_returned_data) > 0 else None @@ -119,30 +122,32 @@ def get_metadata(connection, transaction_ids: list): @register_query(TarantoolDBConnection) def store_asset(connection, asset): - space = connection.get_space("assets") convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"]) try: - space.insert(convert(asset)) - except: # TODO Add Raise For Duplicate - print("DUPLICATE ERROR") + return connection.run( + connection.space("assets").insert(convert(asset)) + ) + except tarantool.error.DatabaseError: + pass @register_query(TarantoolDBConnection) def store_assets(connection, assets: list): - space = connection.get_space("assets") convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"]) for asset in assets: try: - space.insert(convert(asset)) - except Exception as ex: # TODO Raise ERROR for Duplicate - print(f"EXCEPTION : {ex} ") + connection.run( + connection.space("assets").insert(convert(asset)) + ) + except tarantool.error.DatabaseError: + pass @register_query(TarantoolDBConnection) def get_asset(connection, asset_id: str): - space = connection.get_space("assets") - _data = space.select(asset_id, index="txid_search") - _data = _data.data + _data = connection.run( + connection.space("assets").select(asset_id, index="txid_search") + ).data return _data[0][0] if len(_data) > 0 else [] @@ -157,25 +162,25 @@ def get_assets(connection, assets_ids: list) -> list: @register_query(TarantoolDBConnection) def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str): - space = connection.get_space("inputs") - _inputs = space.select([fullfil_transaction_id, str(fullfil_output_index)], index="spent_search") - _inputs = _inputs.data + _inputs = connection.run( + connection.space("inputs").select([fullfil_transaction_id, str(fullfil_output_index)], index="spent_search") + ).data _transactions = _group_transaction_by_ids(txids=[inp[0] for inp in _inputs], connection=connection) return _transactions @register_query(TarantoolDBConnection) def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR - space = connection.get_space("blocks") - _all_blocks = space.select() - _all_blocks = _all_blocks.data + _all_blocks = connection.run( + connection.space("blocks").select() + ).data block = {"app_hash": '', "height": 0, "transactions": []} if len(_all_blocks) > 0: _block = sorted(_all_blocks, key=itemgetter(1), reverse=True)[0] - space = connection.get_space("blocks_tx") - _txids = space.select(_block[2], index="block_search") - _txids = _txids.data + _txids = connection.run( + connection.space("blocks_tx").select(_block[2], index="block_search") + ).data block["app_hash"] = _block[0] block["height"] = _block[1] block["transactions"] = [tx[0] for tx in _txids] @@ -186,14 +191,16 @@ def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR @register_query(TarantoolDBConnection) def store_block(connection, block: dict): - space = connection.get_space("blocks") block_unique_id = token_hex(8) - space.insert((block["app_hash"], - block["height"], - block_unique_id)) - space = connection.get_space("blocks_tx") + connection.run( + connection.space("blocks").insert((block["app_hash"], + block["height"], + block_unique_id)) + ) for txid in block["transactions"]: - space.insert((txid, block_unique_id)) + connection.run( + connection.space("blocks_tx").insert((txid, block_unique_id)) + ) @register_query(TarantoolDBConnection) @@ -206,24 +213,30 @@ def get_txids_filtered(connection, asset_id: str, operation: str = None, # 1 - operation, 2 - asset.id (linked mode) + OPERATOR OR None: {"sets": [asset_id, asset_id]} }[operation] - tx_space = connection.get_space("transactions") - assets_space = connection.get_space("assets") _transactions = [] if actions["sets"][0] == "CREATE": # + - _transactions = tx_space.select([operation, asset_id], index=actions["index"]) - _transactions = _transactions.data + _transactions = connection.run( + connection.space("transactions").select([operation, asset_id], index=actions["index"]) + ).data elif actions["sets"][0] == "TRANSFER": # + - _assets = assets_space.select([asset_id], index="only_asset_search").data + _assets = connection.run( + connection.space("assets").select([asset_id], index="only_asset_search") + ).data for asset in _assets: _txid = asset[1] - _transactions = tx_space.select([operation, _txid], index=actions["index"]).data + _transactions = connection.run( + connection.space("transactions").select([operation, _txid], index=actions["index"]) + ).data if len(_transactions) != 0: break else: - _tx_ids = tx_space.select([asset_id], index="id_search") - # _assets_ids = tx_space.select([asset_id], index="only_asset_search") - _assets_ids = assets_space.select([asset_id], index="only_asset_search") - return tuple(set([sublist[1] for sublist in _assets_ids.data] + [sublist[0] for sublist in _tx_ids.data])) + _tx_ids = connection.run( + connection.space("transactions").select([asset_id], index="id_search") + ).data + _assets_ids = connection.run( + connection.space("assets").select([asset_id], index="only_asset_search") + ) + return tuple(set([sublist[1] for sublist in _assets_ids] + [sublist[0] for sublist in _tx_ids])) if last_tx: return tuple(next(iter(_transactions))) @@ -259,11 +272,12 @@ def _remove_text_score(asset): @register_query(TarantoolDBConnection) def get_owned_ids(connection, owner: str): - space = connection.get_space("keys") - _keys = space.select(owner, index="keys_search") - if len(_keys.data) == 0: + _keys = connection.run( + connection.space("keys").select(owner, index="keys_search") + ).data + if len(_keys) == 0: return [] - _transactionids = list(set([key[1] for key in _keys.data])) + _transactionids = list(set([key[1] for key in _keys])) _transactions = _group_transaction_by_ids(txids=_transactionids, connection=connection) return _transactions @@ -283,28 +297,29 @@ def get_spending_transactions(connection, inputs): @register_query(TarantoolDBConnection) def get_block(connection, block_id=[]): - space = connection.get_space("blocks") - _block = space.select(block_id, index="block_search", limit=1) - _block = _block.data + _block = connection.run( + connection.space("blocks").select(block_id, index="block_search", limit=1) + ).data if len(_block) == 0: return [] _block = _block[0] - space = connection.get_space("blocks_tx") - _txblock = space.select(_block[2], index="block_search") - _txblock = _txblock.data + _txblock = connection.run( + connection.space("blocks_tx").select(_block[2], index="block_search") + ).data return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]} @register_query(TarantoolDBConnection) def get_block_with_transaction(connection, txid: str): - space = connection.get_space("blocks_tx") - _all_blocks_tx = space.select(txid, index="id_search") - _all_blocks_tx = _all_blocks_tx.data + _all_blocks_tx = connection.run( + connection.space("blocks_tx").select(txid, index="id_search") + ).data if len(_all_blocks_tx) == 0: return [] - space = connection.get_space("blocks") - _block = space.select(_all_blocks_tx[0][1], index="block_id_search") - return [{"height": _height[1]} for _height in _block.data] + _block = connection.run( + connection.space("blocks").select(_all_blocks_tx[0][1], index="block_id_search") + ).data + return [{"height": _height[1]} for _height in _block] @register_query(TarantoolDBConnection) @@ -312,75 +327,77 @@ def delete_transactions(connection, txn_ids: list): tx_space = connection.get_space("transactions") for _id in txn_ids: tx_space.delete(_id) - inputs_space = connection.get_space("inputs") - outputs_space = connection.get_space("outputs") - k_space = connection.get_space("keys") for _id in txn_ids: - _inputs = inputs_space.select(_id, index="id_search") - _outputs = outputs_space.select(_id, index="id_search") - _keys = k_space.select(_id, index="txid_search") + _inputs = connection.run(connection.space("inputs").select(_id, index="id_search")) + _outputs = connection.run(connection.space("outputs").select(_id, index="id_search")) + _keys = connection.run(connection.space("keys").select(_id, index="txid_search")) for _kID in _keys: - k_space.delete(_kID[0], index="id_search") + connection.run(connection.space("keys").delete(_kID[0], index="id_search")) for _inpID in _inputs: - inputs_space.delete(_inpID[5], index="delete_search") + connection.run(connection.space("inputs").delete(_inpID[5], index="delete_search")) for _outpID in _outputs: - outputs_space.delete(_outpID[5], index="unique_search") + connection.run(connection.space("outputs").delete(_outpID[5], index="unique_search")) - meta_space = connection.get_space("meta_data") for _id in txn_ids: - meta_space.delete(_id, index="id_search") + connection.run(connection.space("meta_data").delete(_id, index="id_search")) - assets_space = connection.get_space("assets") for _id in txn_ids: - assets_space.delete(_id, index="txid_search") + connection.run(connection.space("assets").delete(_id, index="txid_search")) @register_query(TarantoolDBConnection) def store_unspent_outputs(connection, *unspent_outputs: list): - space = connection.get_space('utxos') result = [] if unspent_outputs: for utxo in unspent_outputs: - output = space.insert((utxo['transaction_id'], utxo['output_index'], dumps(utxo))) - result.append(output.data) + output = connection.run( + connection.space("utxos").insert((utxo['transaction_id'], utxo['output_index'], dumps(utxo))) + ).data + result.append(output) return result @register_query(TarantoolDBConnection) def delete_unspent_outputs(connection, *unspent_outputs: list): - space = connection.get_space('utxos') result = [] if unspent_outputs: for utxo in unspent_outputs: - output = space.delete((utxo['transaction_id'], utxo['output_index'])) - result.append(output.data) + output = connection.run( + connection.space("utxos").delete((utxo['transaction_id'], utxo['output_index'])) + ).data + result.append(output) return result @register_query(TarantoolDBConnection) def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'. - space = connection.get_space('utxos') - _utxos = space.select([]).data + _utxos = connection.run( + connection.space("utxos").select([]) + ).data return [loads(utx[2]) for utx in _utxos] @register_query(TarantoolDBConnection) def store_pre_commit_state(connection, state: dict): - space = connection.get_space("pre_commits") - _precommit = space.select(state["height"], index="height_search", limit=1) + _precommit = connection.run( + connection.space("pre_commits").select(state["height"], index="height_search", limit=1) + ) unique_id = token_hex(8) if (len(_precommit.data) == 0) else _precommit.data[0][0] - space.upsert((unique_id, state["height"], state["transactions"]), - op_list=[('=', 0, unique_id), - ('=', 1, state["height"]), - ('=', 2, state["transactions"])], - limit=1) + connection.run( + connection.space("pre_commits").upsert((unique_id, state["height"], state["transactions"]), + op_list=[('=', 0, unique_id), + ('=', 1, state["height"]), + ('=', 2, state["transactions"])], + limit=1) + ) @register_query(TarantoolDBConnection) def get_pre_commit_state(connection): - space = connection.get_space("pre_commits") - _commit = space.select([], index="id_search").data - if len(_commit) == 0: + _commit = connection.run( + connection.space("pre_commits").select([], index="id_search") + ) + if _commit is None or len(_commit) == 0: return None _commit = sorted(_commit, key=itemgetter(1), reverse=True)[0] return {"height": _commit[1], "transactions": _commit[2]} @@ -388,56 +405,67 @@ def get_pre_commit_state(connection): @register_query(TarantoolDBConnection) def store_validator_set(conn, validators_update: dict): - space = conn.get_space("validators") - _validator = space.select(validators_update["height"], index="height_search", limit=1) + _validator = conn.run( + conn.space("validators").select(validators_update["height"], index="height_search", limit=1) + ) unique_id = token_hex(8) if (len(_validator.data) == 0) else _validator.data[0][0] - space.upsert((unique_id, validators_update["height"], validators_update["validators"]), - op_list=[('=', 0, unique_id), - ('=', 1, validators_update["height"]), - ('=', 2, validators_update["validators"])], - limit=1) + conn.run( + conn.space("validators").upsert((unique_id, validators_update["height"], validators_update["validators"]), + op_list=[('=', 0, unique_id), + ('=', 1, validators_update["height"]), + ('=', 2, validators_update["validators"])], + limit=1) + ) @register_query(TarantoolDBConnection) def delete_validator_set(connection, height: int): - space = connection.get_space("validators") - _validators = space.select(height, index="height_search") + _validators = connection.run( + connection.space("validators").select(height, index="height_search") + ) for _valid in _validators.data: - space.delete(_valid[0]) + connection.run( + connection.space("validators").delete(_valid[0]) + ) @register_query(TarantoolDBConnection) def store_election(connection, election_id: str, height: int, is_concluded: bool): - space = connection.get_space("elections") - space.upsert((election_id, height, is_concluded), - op_list=[('=', 0, election_id), - ('=', 1, height), - ('=', 2, is_concluded)], - limit=1) + connection.run( + connection.space("elections").upsert((election_id, height, is_concluded), + op_list=[('=', 0, election_id), + ('=', 1, height), + ('=', 2, is_concluded)], + limit=1) + ) @register_query(TarantoolDBConnection) def store_elections(connection, elections: list): - space = connection.get_space("elections") for election in elections: - _election = space.insert((election["election_id"], - election["height"], - election["is_concluded"])) + _election = connection.run( + connection.space("elections").insert((election["election_id"], + election["height"], + election["is_concluded"])) + ) @register_query(TarantoolDBConnection) def delete_elections(connection, height: int): - space = connection.space("elections") - _elections = space.select(height, index="height_search") - for _elec in _elections.data: - space.delete(_elec[0]) + _elections = connection.run( + connection.space("elections").select(height, index="height_search") + ).data + for _elec in _elections: + connection.run( + connection.space("elections").delete(_elec[0]) + ) @register_query(TarantoolDBConnection) def get_validator_set(connection, height: int = None): - space = connection.space("validators") - _validators = space.select() - _validators = _validators.data + _validators = connection.run( + connection.space("validators").select() + ).data if height is not None: _validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators if validator[1] <= height] @@ -449,9 +477,9 @@ def get_validator_set(connection, height: int = None): @register_query(TarantoolDBConnection) def get_election(connection, election_id: str): - space = connection.space("elections") - _elections = space.select(election_id, index="id_search") - _elections = _elections.data + _elections = connection.run( + connection.space("elections").select(election_id, index="id_search") + ).data if len(_elections) == 0: return None _election = sorted(_elections, key=itemgetter(0), reverse=True)[0] @@ -460,38 +488,40 @@ def get_election(connection, election_id: str): @register_query(TarantoolDBConnection) def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str): - space = connection.space("keys") - # _keys = space.select([public_key], index="keys_search") - space = connection.space("assets") - _transactions = space.select([asset_id], index="assetid_search") - # _transactions = _transactions - # _keys = _keys.data + _transactions = connection.run( + connection.space("assets").select([asset_id], index="assetid_search") + ).data _grouped_transactions = _group_transaction_by_ids(connection=connection, txids=[_tx[1] for _tx in _transactions]) return _grouped_transactions @register_query(TarantoolDBConnection) def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True): - space = connection.space("abci_chains") - space.upsert((height, is_synced, chain_id), - op_list=[('=', 0, height), - ('=', 1, is_synced), - ('=', 2, chain_id)], - limit=1) + connection.run( + connection.space("abci_chains").upsert((height, is_synced, chain_id), + op_list=[('=', 0, height), + ('=', 1, is_synced), + ('=', 2, chain_id)], + limit=1) + ) @register_query(TarantoolDBConnection) def delete_abci_chain(connection, height: int): - space = connection.space("abci_chains") - _chains = space.select(height, index="height_search") - for _chain in _chains.data: - space.delete(_chain[2]) + _chains = connection.run( + connection.space("abci_chains").select(height, index="height_search") + ).data + for _chain in _chains: + connection.run( + connection.space("abci_chains").delete(_chain[2]) + ) @register_query(TarantoolDBConnection) def get_latest_abci_chain(connection): - space = connection.space("abci_chains") - _all_chains = space.select().data + _all_chains = connection.run( + connection.space("abci_chains").select() + ).data if len(_all_chains) == 0: return None _chain = sorted(_all_chains, key=itemgetter(0), reverse=True)[0]