diff --git a/planetmint/backend/tarantool/connection.py b/planetmint/backend/tarantool/connection.py index 34b769d..7889a91 100644 --- a/planetmint/backend/tarantool/connection.py +++ b/planetmint/backend/tarantool/connection.py @@ -43,7 +43,7 @@ class TarantoolDBConnection(Connection): return "".join(execute).encode() def query(self): - return Lazy + return Lazy() def _reconnect(self): self.db_connect = tarantool.connect(host=self.host, port=self.port) @@ -54,15 +54,15 @@ class TarantoolDBConnection(Connection): def space(self, space_name: str): return self.query().space(space_name) - def run(self, query): + def run(self, query, only_data=True): try: - return query.run(self.db_connect) - except tarantool.error.NetworkError: + return query.run(self.db_connect).data if only_data else query.run(self.db_connect) + except tarantool.error.SchemaError: return None except tarantool.error.OperationalError as op_error: raise op_error - except tarantool.error.SchemaError as schema_error: - raise schema_error + except tarantool.error.NetworkError as net_error: + raise net_error def get_connection(self): return self.db_connect diff --git a/planetmint/backend/tarantool/query.py b/planetmint/backend/tarantool/query.py index 3a64e04..dc9acfa 100644 --- a/planetmint/backend/tarantool/query.py +++ b/planetmint/backend/tarantool/query.py @@ -101,17 +101,20 @@ def get_transactions(connection, transactions_ids: list): @register_query(TarantoolDBConnection) def store_metadatas(connection, metadata: list): - space = connection.get_space("meta_data") for meta in metadata: - space.insert((meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"])) + connection.run( + connection.space("meta_data").insert( + (meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"])) + ) @register_query(TarantoolDBConnection) def get_metadata(connection, transaction_ids: list): _returned_data = [] - space = connection.get_space("meta_data") for _id in transaction_ids: - metadata = space.select(_id, index="id_search").data + metadata = connection.run( + connection.space("meta_data").select(_id, index="id_search") + ).data if len(metadata) > 0: _returned_data.append(metadata) return _returned_data if len(_returned_data) > 0 else None @@ -119,30 +122,32 @@ def get_metadata(connection, transaction_ids: list): @register_query(TarantoolDBConnection) def store_asset(connection, asset): - space = connection.get_space("assets") convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"]) try: - space.insert(convert(asset)) - except: # TODO Add Raise For Duplicate - print("DUPLICATE ERROR") + return connection.run( + connection.space("assets").insert(convert(asset)) + ) + except tarantool.error.DatabaseError: + pass @register_query(TarantoolDBConnection) def store_assets(connection, assets: list): - space = connection.get_space("assets") convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"]) for asset in assets: try: - space.insert(convert(asset)) - except Exception as ex: # TODO Raise ERROR for Duplicate - print(f"EXCEPTION : {ex} ") + connection.run( + connection.space("assets").insert(convert(asset)) + ) + except tarantool.error.DatabaseError: + pass @register_query(TarantoolDBConnection) def get_asset(connection, asset_id: str): - space = connection.get_space("assets") - _data = space.select(asset_id, index="txid_search") - _data = _data.data + _data = connection.run( + connection.space("assets").select(asset_id, index="txid_search") + ).data return _data[0][0] if len(_data) > 0 else [] @@ -157,25 +162,25 @@ def get_assets(connection, assets_ids: list) -> list: @register_query(TarantoolDBConnection) def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str): - space = connection.get_space("inputs") - _inputs = space.select([fullfil_transaction_id, str(fullfil_output_index)], index="spent_search") - _inputs = _inputs.data + _inputs = connection.run( + connection.space("inputs").select([fullfil_transaction_id, str(fullfil_output_index)], index="spent_search") + ).data _transactions = _group_transaction_by_ids(txids=[inp[0] for inp in _inputs], connection=connection) return _transactions @register_query(TarantoolDBConnection) def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR - space = connection.get_space("blocks") - _all_blocks = space.select() - _all_blocks = _all_blocks.data + _all_blocks = connection.run( + connection.space("blocks").select() + ).data block = {"app_hash": '', "height": 0, "transactions": []} if len(_all_blocks) > 0: _block = sorted(_all_blocks, key=itemgetter(1), reverse=True)[0] - space = connection.get_space("blocks_tx") - _txids = space.select(_block[2], index="block_search") - _txids = _txids.data + _txids = connection.run( + connection.space("blocks_tx").select(_block[2], index="block_search") + ).data block["app_hash"] = _block[0] block["height"] = _block[1] block["transactions"] = [tx[0] for tx in _txids] @@ -186,14 +191,16 @@ def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR @register_query(TarantoolDBConnection) def store_block(connection, block: dict): - space = connection.get_space("blocks") block_unique_id = token_hex(8) - space.insert((block["app_hash"], - block["height"], - block_unique_id)) - space = connection.get_space("blocks_tx") + connection.run( + connection.space("blocks").insert((block["app_hash"], + block["height"], + block_unique_id)) + ) for txid in block["transactions"]: - space.insert((txid, block_unique_id)) + connection.run( + connection.space("blocks_tx").insert((txid, block_unique_id)) + ) @register_query(TarantoolDBConnection) @@ -206,24 +213,30 @@ def get_txids_filtered(connection, asset_id: str, operation: str = None, # 1 - operation, 2 - asset.id (linked mode) + OPERATOR OR None: {"sets": [asset_id, asset_id]} }[operation] - tx_space = connection.get_space("transactions") - assets_space = connection.get_space("assets") _transactions = [] if actions["sets"][0] == "CREATE": # + - _transactions = tx_space.select([operation, asset_id], index=actions["index"]) - _transactions = _transactions.data + _transactions = connection.run( + connection.space("transactions").select([operation, asset_id], index=actions["index"]) + ).data elif actions["sets"][0] == "TRANSFER": # + - _assets = assets_space.select([asset_id], index="only_asset_search").data + _assets = connection.run( + connection.space("assets").select([asset_id], index="only_asset_search") + ).data for asset in _assets: _txid = asset[1] - _transactions = tx_space.select([operation, _txid], index=actions["index"]).data + _transactions = connection.run( + connection.space("transactions").select([operation, _txid], index=actions["index"]) + ).data if len(_transactions) != 0: break else: - _tx_ids = tx_space.select([asset_id], index="id_search") - # _assets_ids = tx_space.select([asset_id], index="only_asset_search") - _assets_ids = assets_space.select([asset_id], index="only_asset_search") - return tuple(set([sublist[1] for sublist in _assets_ids.data] + [sublist[0] for sublist in _tx_ids.data])) + _tx_ids = connection.run( + connection.space("transactions").select([asset_id], index="id_search") + ).data + _assets_ids = connection.run( + connection.space("assets").select([asset_id], index="only_asset_search") + ) + return tuple(set([sublist[1] for sublist in _assets_ids] + [sublist[0] for sublist in _tx_ids])) if last_tx: return tuple(next(iter(_transactions))) @@ -259,11 +272,12 @@ def _remove_text_score(asset): @register_query(TarantoolDBConnection) def get_owned_ids(connection, owner: str): - space = connection.get_space("keys") - _keys = space.select(owner, index="keys_search") - if len(_keys.data) == 0: + _keys = connection.run( + connection.space("keys").select(owner, index="keys_search") + ).data + if len(_keys) == 0: return [] - _transactionids = list(set([key[1] for key in _keys.data])) + _transactionids = list(set([key[1] for key in _keys])) _transactions = _group_transaction_by_ids(txids=_transactionids, connection=connection) return _transactions @@ -283,28 +297,29 @@ def get_spending_transactions(connection, inputs): @register_query(TarantoolDBConnection) def get_block(connection, block_id=[]): - space = connection.get_space("blocks") - _block = space.select(block_id, index="block_search", limit=1) - _block = _block.data + _block = connection.run( + connection.space("blocks").select(block_id, index="block_search", limit=1) + ).data if len(_block) == 0: return [] _block = _block[0] - space = connection.get_space("blocks_tx") - _txblock = space.select(_block[2], index="block_search") - _txblock = _txblock.data + _txblock = connection.run( + connection.space("blocks_tx").select(_block[2], index="block_search") + ).data return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]} @register_query(TarantoolDBConnection) def get_block_with_transaction(connection, txid: str): - space = connection.get_space("blocks_tx") - _all_blocks_tx = space.select(txid, index="id_search") - _all_blocks_tx = _all_blocks_tx.data + _all_blocks_tx = connection.run( + connection.space("blocks_tx").select(txid, index="id_search") + ).data if len(_all_blocks_tx) == 0: return [] - space = connection.get_space("blocks") - _block = space.select(_all_blocks_tx[0][1], index="block_id_search") - return [{"height": _height[1]} for _height in _block.data] + _block = connection.run( + connection.space("blocks").select(_all_blocks_tx[0][1], index="block_id_search") + ).data + return [{"height": _height[1]} for _height in _block] @register_query(TarantoolDBConnection) @@ -312,75 +327,77 @@ def delete_transactions(connection, txn_ids: list): tx_space = connection.get_space("transactions") for _id in txn_ids: tx_space.delete(_id) - inputs_space = connection.get_space("inputs") - outputs_space = connection.get_space("outputs") - k_space = connection.get_space("keys") for _id in txn_ids: - _inputs = inputs_space.select(_id, index="id_search") - _outputs = outputs_space.select(_id, index="id_search") - _keys = k_space.select(_id, index="txid_search") + _inputs = connection.run(connection.space("inputs").select(_id, index="id_search")) + _outputs = connection.run(connection.space("outputs").select(_id, index="id_search")) + _keys = connection.run(connection.space("keys").select(_id, index="txid_search")) for _kID in _keys: - k_space.delete(_kID[0], index="id_search") + connection.run(connection.space("keys").delete(_kID[0], index="id_search")) for _inpID in _inputs: - inputs_space.delete(_inpID[5], index="delete_search") + connection.run(connection.space("inputs").delete(_inpID[5], index="delete_search")) for _outpID in _outputs: - outputs_space.delete(_outpID[5], index="unique_search") + connection.run(connection.space("outputs").delete(_outpID[5], index="unique_search")) - meta_space = connection.get_space("meta_data") for _id in txn_ids: - meta_space.delete(_id, index="id_search") + connection.run(connection.space("meta_data").delete(_id, index="id_search")) - assets_space = connection.get_space("assets") for _id in txn_ids: - assets_space.delete(_id, index="txid_search") + connection.run(connection.space("assets").delete(_id, index="txid_search")) @register_query(TarantoolDBConnection) def store_unspent_outputs(connection, *unspent_outputs: list): - space = connection.get_space('utxos') result = [] if unspent_outputs: for utxo in unspent_outputs: - output = space.insert((utxo['transaction_id'], utxo['output_index'], dumps(utxo))) - result.append(output.data) + output = connection.run( + connection.space("utxos").insert((utxo['transaction_id'], utxo['output_index'], dumps(utxo))) + ).data + result.append(output) return result @register_query(TarantoolDBConnection) def delete_unspent_outputs(connection, *unspent_outputs: list): - space = connection.get_space('utxos') result = [] if unspent_outputs: for utxo in unspent_outputs: - output = space.delete((utxo['transaction_id'], utxo['output_index'])) - result.append(output.data) + output = connection.run( + connection.space("utxos").delete((utxo['transaction_id'], utxo['output_index'])) + ).data + result.append(output) return result @register_query(TarantoolDBConnection) def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'. - space = connection.get_space('utxos') - _utxos = space.select([]).data + _utxos = connection.run( + connection.space("utxos").select([]) + ).data return [loads(utx[2]) for utx in _utxos] @register_query(TarantoolDBConnection) def store_pre_commit_state(connection, state: dict): - space = connection.get_space("pre_commits") - _precommit = space.select(state["height"], index="height_search", limit=1) + _precommit = connection.run( + connection.space("pre_commits").select(state["height"], index="height_search", limit=1) + ) unique_id = token_hex(8) if (len(_precommit.data) == 0) else _precommit.data[0][0] - space.upsert((unique_id, state["height"], state["transactions"]), - op_list=[('=', 0, unique_id), - ('=', 1, state["height"]), - ('=', 2, state["transactions"])], - limit=1) + connection.run( + connection.space("pre_commits").upsert((unique_id, state["height"], state["transactions"]), + op_list=[('=', 0, unique_id), + ('=', 1, state["height"]), + ('=', 2, state["transactions"])], + limit=1) + ) @register_query(TarantoolDBConnection) def get_pre_commit_state(connection): - space = connection.get_space("pre_commits") - _commit = space.select([], index="id_search").data - if len(_commit) == 0: + _commit = connection.run( + connection.space("pre_commits").select([], index="id_search") + ) + if _commit is None or len(_commit) == 0: return None _commit = sorted(_commit, key=itemgetter(1), reverse=True)[0] return {"height": _commit[1], "transactions": _commit[2]} @@ -388,56 +405,67 @@ def get_pre_commit_state(connection): @register_query(TarantoolDBConnection) def store_validator_set(conn, validators_update: dict): - space = conn.get_space("validators") - _validator = space.select(validators_update["height"], index="height_search", limit=1) + _validator = conn.run( + conn.space("validators").select(validators_update["height"], index="height_search", limit=1) + ) unique_id = token_hex(8) if (len(_validator.data) == 0) else _validator.data[0][0] - space.upsert((unique_id, validators_update["height"], validators_update["validators"]), - op_list=[('=', 0, unique_id), - ('=', 1, validators_update["height"]), - ('=', 2, validators_update["validators"])], - limit=1) + conn.run( + conn.space("validators").upsert((unique_id, validators_update["height"], validators_update["validators"]), + op_list=[('=', 0, unique_id), + ('=', 1, validators_update["height"]), + ('=', 2, validators_update["validators"])], + limit=1) + ) @register_query(TarantoolDBConnection) def delete_validator_set(connection, height: int): - space = connection.get_space("validators") - _validators = space.select(height, index="height_search") + _validators = connection.run( + connection.space("validators").select(height, index="height_search") + ) for _valid in _validators.data: - space.delete(_valid[0]) + connection.run( + connection.space("validators").delete(_valid[0]) + ) @register_query(TarantoolDBConnection) def store_election(connection, election_id: str, height: int, is_concluded: bool): - space = connection.get_space("elections") - space.upsert((election_id, height, is_concluded), - op_list=[('=', 0, election_id), - ('=', 1, height), - ('=', 2, is_concluded)], - limit=1) + connection.run( + connection.space("elections").upsert((election_id, height, is_concluded), + op_list=[('=', 0, election_id), + ('=', 1, height), + ('=', 2, is_concluded)], + limit=1) + ) @register_query(TarantoolDBConnection) def store_elections(connection, elections: list): - space = connection.get_space("elections") for election in elections: - _election = space.insert((election["election_id"], - election["height"], - election["is_concluded"])) + _election = connection.run( + connection.space("elections").insert((election["election_id"], + election["height"], + election["is_concluded"])) + ) @register_query(TarantoolDBConnection) def delete_elections(connection, height: int): - space = connection.space("elections") - _elections = space.select(height, index="height_search") - for _elec in _elections.data: - space.delete(_elec[0]) + _elections = connection.run( + connection.space("elections").select(height, index="height_search") + ).data + for _elec in _elections: + connection.run( + connection.space("elections").delete(_elec[0]) + ) @register_query(TarantoolDBConnection) def get_validator_set(connection, height: int = None): - space = connection.space("validators") - _validators = space.select() - _validators = _validators.data + _validators = connection.run( + connection.space("validators").select() + ).data if height is not None: _validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators if validator[1] <= height] @@ -449,9 +477,9 @@ def get_validator_set(connection, height: int = None): @register_query(TarantoolDBConnection) def get_election(connection, election_id: str): - space = connection.space("elections") - _elections = space.select(election_id, index="id_search") - _elections = _elections.data + _elections = connection.run( + connection.space("elections").select(election_id, index="id_search") + ).data if len(_elections) == 0: return None _election = sorted(_elections, key=itemgetter(0), reverse=True)[0] @@ -460,38 +488,40 @@ def get_election(connection, election_id: str): @register_query(TarantoolDBConnection) def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str): - space = connection.space("keys") - # _keys = space.select([public_key], index="keys_search") - space = connection.space("assets") - _transactions = space.select([asset_id], index="assetid_search") - # _transactions = _transactions - # _keys = _keys.data + _transactions = connection.run( + connection.space("assets").select([asset_id], index="assetid_search") + ).data _grouped_transactions = _group_transaction_by_ids(connection=connection, txids=[_tx[1] for _tx in _transactions]) return _grouped_transactions @register_query(TarantoolDBConnection) def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True): - space = connection.space("abci_chains") - space.upsert((height, is_synced, chain_id), - op_list=[('=', 0, height), - ('=', 1, is_synced), - ('=', 2, chain_id)], - limit=1) + connection.run( + connection.space("abci_chains").upsert((height, is_synced, chain_id), + op_list=[('=', 0, height), + ('=', 1, is_synced), + ('=', 2, chain_id)], + limit=1) + ) @register_query(TarantoolDBConnection) def delete_abci_chain(connection, height: int): - space = connection.space("abci_chains") - _chains = space.select(height, index="height_search") - for _chain in _chains.data: - space.delete(_chain[2]) + _chains = connection.run( + connection.space("abci_chains").select(height, index="height_search") + ).data + for _chain in _chains: + connection.run( + connection.space("abci_chains").delete(_chain[2]) + ) @register_query(TarantoolDBConnection) def get_latest_abci_chain(connection): - space = connection.space("abci_chains") - _all_chains = space.select().data + _all_chains = connection.run( + connection.space("abci_chains").select() + ).data if len(_all_chains) == 0: return None _chain = sorted(_all_chains, key=itemgetter(0), reverse=True)[0]