tarantool/query.py changed for using Lazy() implementation + little changes

This commit is contained in:
andrei 2022-05-31 16:22:39 +03:00
parent f1e4e386b6
commit 3d44b50a54
2 changed files with 171 additions and 141 deletions

View File

@ -43,7 +43,7 @@ class TarantoolDBConnection(Connection):
return "".join(execute).encode() return "".join(execute).encode()
def query(self): def query(self):
return Lazy return Lazy()
def _reconnect(self): def _reconnect(self):
self.db_connect = tarantool.connect(host=self.host, port=self.port) self.db_connect = tarantool.connect(host=self.host, port=self.port)
@ -54,15 +54,15 @@ class TarantoolDBConnection(Connection):
def space(self, space_name: str): def space(self, space_name: str):
return self.query().space(space_name) return self.query().space(space_name)
def run(self, query): def run(self, query, only_data=True):
try: try:
return query.run(self.db_connect) return query.run(self.db_connect).data if only_data else query.run(self.db_connect)
except tarantool.error.NetworkError: except tarantool.error.SchemaError:
return None return None
except tarantool.error.OperationalError as op_error: except tarantool.error.OperationalError as op_error:
raise op_error raise op_error
except tarantool.error.SchemaError as schema_error: except tarantool.error.NetworkError as net_error:
raise schema_error raise net_error
def get_connection(self): def get_connection(self):
return self.db_connect return self.db_connect

View File

@ -101,17 +101,20 @@ def get_transactions(connection, transactions_ids: list):
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_metadatas(connection, metadata: list): def store_metadatas(connection, metadata: list):
space = connection.get_space("meta_data")
for meta in metadata: for meta in metadata:
space.insert((meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"])) connection.run(
connection.space("meta_data").insert(
(meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"]))
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_metadata(connection, transaction_ids: list): def get_metadata(connection, transaction_ids: list):
_returned_data = [] _returned_data = []
space = connection.get_space("meta_data")
for _id in transaction_ids: for _id in transaction_ids:
metadata = space.select(_id, index="id_search").data metadata = connection.run(
connection.space("meta_data").select(_id, index="id_search")
).data
if len(metadata) > 0: if len(metadata) > 0:
_returned_data.append(metadata) _returned_data.append(metadata)
return _returned_data if len(_returned_data) > 0 else None return _returned_data if len(_returned_data) > 0 else None
@ -119,30 +122,32 @@ def get_metadata(connection, transaction_ids: list):
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_asset(connection, asset): def store_asset(connection, asset):
space = connection.get_space("assets")
convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"]) convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"])
try: try:
space.insert(convert(asset)) return connection.run(
except: # TODO Add Raise For Duplicate connection.space("assets").insert(convert(asset))
print("DUPLICATE ERROR") )
except tarantool.error.DatabaseError:
pass
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_assets(connection, assets: list): def store_assets(connection, assets: list):
space = connection.get_space("assets")
convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"]) convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"])
for asset in assets: for asset in assets:
try: try:
space.insert(convert(asset)) connection.run(
except Exception as ex: # TODO Raise ERROR for Duplicate connection.space("assets").insert(convert(asset))
print(f"EXCEPTION : {ex} ") )
except tarantool.error.DatabaseError:
pass
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_asset(connection, asset_id: str): def get_asset(connection, asset_id: str):
space = connection.get_space("assets") _data = connection.run(
_data = space.select(asset_id, index="txid_search") connection.space("assets").select(asset_id, index="txid_search")
_data = _data.data ).data
return _data[0][0] if len(_data) > 0 else [] return _data[0][0] if len(_data) > 0 else []
@ -157,25 +162,25 @@ def get_assets(connection, assets_ids: list) -> list:
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str): def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str):
space = connection.get_space("inputs") _inputs = connection.run(
_inputs = space.select([fullfil_transaction_id, str(fullfil_output_index)], index="spent_search") connection.space("inputs").select([fullfil_transaction_id, str(fullfil_output_index)], index="spent_search")
_inputs = _inputs.data ).data
_transactions = _group_transaction_by_ids(txids=[inp[0] for inp in _inputs], connection=connection) _transactions = _group_transaction_by_ids(txids=[inp[0] for inp in _inputs], connection=connection)
return _transactions return _transactions
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR
space = connection.get_space("blocks") _all_blocks = connection.run(
_all_blocks = space.select() connection.space("blocks").select()
_all_blocks = _all_blocks.data ).data
block = {"app_hash": '', "height": 0, "transactions": []} block = {"app_hash": '', "height": 0, "transactions": []}
if len(_all_blocks) > 0: if len(_all_blocks) > 0:
_block = sorted(_all_blocks, key=itemgetter(1), reverse=True)[0] _block = sorted(_all_blocks, key=itemgetter(1), reverse=True)[0]
space = connection.get_space("blocks_tx") _txids = connection.run(
_txids = space.select(_block[2], index="block_search") connection.space("blocks_tx").select(_block[2], index="block_search")
_txids = _txids.data ).data
block["app_hash"] = _block[0] block["app_hash"] = _block[0]
block["height"] = _block[1] block["height"] = _block[1]
block["transactions"] = [tx[0] for tx in _txids] block["transactions"] = [tx[0] for tx in _txids]
@ -186,14 +191,16 @@ def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_block(connection, block: dict): def store_block(connection, block: dict):
space = connection.get_space("blocks")
block_unique_id = token_hex(8) block_unique_id = token_hex(8)
space.insert((block["app_hash"], connection.run(
block["height"], connection.space("blocks").insert((block["app_hash"],
block_unique_id)) block["height"],
space = connection.get_space("blocks_tx") block_unique_id))
)
for txid in block["transactions"]: for txid in block["transactions"]:
space.insert((txid, block_unique_id)) connection.run(
connection.space("blocks_tx").insert((txid, block_unique_id))
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
@ -206,24 +213,30 @@ def get_txids_filtered(connection, asset_id: str, operation: str = None,
# 1 - operation, 2 - asset.id (linked mode) + OPERATOR OR # 1 - operation, 2 - asset.id (linked mode) + OPERATOR OR
None: {"sets": [asset_id, asset_id]} None: {"sets": [asset_id, asset_id]}
}[operation] }[operation]
tx_space = connection.get_space("transactions")
assets_space = connection.get_space("assets")
_transactions = [] _transactions = []
if actions["sets"][0] == "CREATE": # + if actions["sets"][0] == "CREATE": # +
_transactions = tx_space.select([operation, asset_id], index=actions["index"]) _transactions = connection.run(
_transactions = _transactions.data connection.space("transactions").select([operation, asset_id], index=actions["index"])
).data
elif actions["sets"][0] == "TRANSFER": # + elif actions["sets"][0] == "TRANSFER": # +
_assets = assets_space.select([asset_id], index="only_asset_search").data _assets = connection.run(
connection.space("assets").select([asset_id], index="only_asset_search")
).data
for asset in _assets: for asset in _assets:
_txid = asset[1] _txid = asset[1]
_transactions = tx_space.select([operation, _txid], index=actions["index"]).data _transactions = connection.run(
connection.space("transactions").select([operation, _txid], index=actions["index"])
).data
if len(_transactions) != 0: if len(_transactions) != 0:
break break
else: else:
_tx_ids = tx_space.select([asset_id], index="id_search") _tx_ids = connection.run(
# _assets_ids = tx_space.select([asset_id], index="only_asset_search") connection.space("transactions").select([asset_id], index="id_search")
_assets_ids = assets_space.select([asset_id], index="only_asset_search") ).data
return tuple(set([sublist[1] for sublist in _assets_ids.data] + [sublist[0] for sublist in _tx_ids.data])) _assets_ids = connection.run(
connection.space("assets").select([asset_id], index="only_asset_search")
)
return tuple(set([sublist[1] for sublist in _assets_ids] + [sublist[0] for sublist in _tx_ids]))
if last_tx: if last_tx:
return tuple(next(iter(_transactions))) return tuple(next(iter(_transactions)))
@ -259,11 +272,12 @@ def _remove_text_score(asset):
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_owned_ids(connection, owner: str): def get_owned_ids(connection, owner: str):
space = connection.get_space("keys") _keys = connection.run(
_keys = space.select(owner, index="keys_search") connection.space("keys").select(owner, index="keys_search")
if len(_keys.data) == 0: ).data
if len(_keys) == 0:
return [] return []
_transactionids = list(set([key[1] for key in _keys.data])) _transactionids = list(set([key[1] for key in _keys]))
_transactions = _group_transaction_by_ids(txids=_transactionids, connection=connection) _transactions = _group_transaction_by_ids(txids=_transactionids, connection=connection)
return _transactions return _transactions
@ -283,28 +297,29 @@ def get_spending_transactions(connection, inputs):
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_block(connection, block_id=[]): def get_block(connection, block_id=[]):
space = connection.get_space("blocks") _block = connection.run(
_block = space.select(block_id, index="block_search", limit=1) connection.space("blocks").select(block_id, index="block_search", limit=1)
_block = _block.data ).data
if len(_block) == 0: if len(_block) == 0:
return [] return []
_block = _block[0] _block = _block[0]
space = connection.get_space("blocks_tx") _txblock = connection.run(
_txblock = space.select(_block[2], index="block_search") connection.space("blocks_tx").select(_block[2], index="block_search")
_txblock = _txblock.data ).data
return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]} return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]}
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_block_with_transaction(connection, txid: str): def get_block_with_transaction(connection, txid: str):
space = connection.get_space("blocks_tx") _all_blocks_tx = connection.run(
_all_blocks_tx = space.select(txid, index="id_search") connection.space("blocks_tx").select(txid, index="id_search")
_all_blocks_tx = _all_blocks_tx.data ).data
if len(_all_blocks_tx) == 0: if len(_all_blocks_tx) == 0:
return [] return []
space = connection.get_space("blocks") _block = connection.run(
_block = space.select(_all_blocks_tx[0][1], index="block_id_search") connection.space("blocks").select(_all_blocks_tx[0][1], index="block_id_search")
return [{"height": _height[1]} for _height in _block.data] ).data
return [{"height": _height[1]} for _height in _block]
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
@ -312,75 +327,77 @@ def delete_transactions(connection, txn_ids: list):
tx_space = connection.get_space("transactions") tx_space = connection.get_space("transactions")
for _id in txn_ids: for _id in txn_ids:
tx_space.delete(_id) tx_space.delete(_id)
inputs_space = connection.get_space("inputs")
outputs_space = connection.get_space("outputs")
k_space = connection.get_space("keys")
for _id in txn_ids: for _id in txn_ids:
_inputs = inputs_space.select(_id, index="id_search") _inputs = connection.run(connection.space("inputs").select(_id, index="id_search"))
_outputs = outputs_space.select(_id, index="id_search") _outputs = connection.run(connection.space("outputs").select(_id, index="id_search"))
_keys = k_space.select(_id, index="txid_search") _keys = connection.run(connection.space("keys").select(_id, index="txid_search"))
for _kID in _keys: for _kID in _keys:
k_space.delete(_kID[0], index="id_search") connection.run(connection.space("keys").delete(_kID[0], index="id_search"))
for _inpID in _inputs: for _inpID in _inputs:
inputs_space.delete(_inpID[5], index="delete_search") connection.run(connection.space("inputs").delete(_inpID[5], index="delete_search"))
for _outpID in _outputs: for _outpID in _outputs:
outputs_space.delete(_outpID[5], index="unique_search") connection.run(connection.space("outputs").delete(_outpID[5], index="unique_search"))
meta_space = connection.get_space("meta_data")
for _id in txn_ids: for _id in txn_ids:
meta_space.delete(_id, index="id_search") connection.run(connection.space("meta_data").delete(_id, index="id_search"))
assets_space = connection.get_space("assets")
for _id in txn_ids: for _id in txn_ids:
assets_space.delete(_id, index="txid_search") connection.run(connection.space("assets").delete(_id, index="txid_search"))
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_unspent_outputs(connection, *unspent_outputs: list): def store_unspent_outputs(connection, *unspent_outputs: list):
space = connection.get_space('utxos')
result = [] result = []
if unspent_outputs: if unspent_outputs:
for utxo in unspent_outputs: for utxo in unspent_outputs:
output = space.insert((utxo['transaction_id'], utxo['output_index'], dumps(utxo))) output = connection.run(
result.append(output.data) connection.space("utxos").insert((utxo['transaction_id'], utxo['output_index'], dumps(utxo)))
).data
result.append(output)
return result return result
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def delete_unspent_outputs(connection, *unspent_outputs: list): def delete_unspent_outputs(connection, *unspent_outputs: list):
space = connection.get_space('utxos')
result = [] result = []
if unspent_outputs: if unspent_outputs:
for utxo in unspent_outputs: for utxo in unspent_outputs:
output = space.delete((utxo['transaction_id'], utxo['output_index'])) output = connection.run(
result.append(output.data) connection.space("utxos").delete((utxo['transaction_id'], utxo['output_index']))
).data
result.append(output)
return result return result
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'. def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'.
space = connection.get_space('utxos') _utxos = connection.run(
_utxos = space.select([]).data connection.space("utxos").select([])
).data
return [loads(utx[2]) for utx in _utxos] return [loads(utx[2]) for utx in _utxos]
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_pre_commit_state(connection, state: dict): def store_pre_commit_state(connection, state: dict):
space = connection.get_space("pre_commits") _precommit = connection.run(
_precommit = space.select(state["height"], index="height_search", limit=1) connection.space("pre_commits").select(state["height"], index="height_search", limit=1)
)
unique_id = token_hex(8) if (len(_precommit.data) == 0) else _precommit.data[0][0] unique_id = token_hex(8) if (len(_precommit.data) == 0) else _precommit.data[0][0]
space.upsert((unique_id, state["height"], state["transactions"]), connection.run(
op_list=[('=', 0, unique_id), connection.space("pre_commits").upsert((unique_id, state["height"], state["transactions"]),
('=', 1, state["height"]), op_list=[('=', 0, unique_id),
('=', 2, state["transactions"])], ('=', 1, state["height"]),
limit=1) ('=', 2, state["transactions"])],
limit=1)
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_pre_commit_state(connection): def get_pre_commit_state(connection):
space = connection.get_space("pre_commits") _commit = connection.run(
_commit = space.select([], index="id_search").data connection.space("pre_commits").select([], index="id_search")
if len(_commit) == 0: )
if _commit is None or len(_commit) == 0:
return None return None
_commit = sorted(_commit, key=itemgetter(1), reverse=True)[0] _commit = sorted(_commit, key=itemgetter(1), reverse=True)[0]
return {"height": _commit[1], "transactions": _commit[2]} return {"height": _commit[1], "transactions": _commit[2]}
@ -388,56 +405,67 @@ def get_pre_commit_state(connection):
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_validator_set(conn, validators_update: dict): def store_validator_set(conn, validators_update: dict):
space = conn.get_space("validators") _validator = conn.run(
_validator = space.select(validators_update["height"], index="height_search", limit=1) conn.space("validators").select(validators_update["height"], index="height_search", limit=1)
)
unique_id = token_hex(8) if (len(_validator.data) == 0) else _validator.data[0][0] unique_id = token_hex(8) if (len(_validator.data) == 0) else _validator.data[0][0]
space.upsert((unique_id, validators_update["height"], validators_update["validators"]), conn.run(
op_list=[('=', 0, unique_id), conn.space("validators").upsert((unique_id, validators_update["height"], validators_update["validators"]),
('=', 1, validators_update["height"]), op_list=[('=', 0, unique_id),
('=', 2, validators_update["validators"])], ('=', 1, validators_update["height"]),
limit=1) ('=', 2, validators_update["validators"])],
limit=1)
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def delete_validator_set(connection, height: int): def delete_validator_set(connection, height: int):
space = connection.get_space("validators") _validators = connection.run(
_validators = space.select(height, index="height_search") connection.space("validators").select(height, index="height_search")
)
for _valid in _validators.data: for _valid in _validators.data:
space.delete(_valid[0]) connection.run(
connection.space("validators").delete(_valid[0])
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_election(connection, election_id: str, height: int, is_concluded: bool): def store_election(connection, election_id: str, height: int, is_concluded: bool):
space = connection.get_space("elections") connection.run(
space.upsert((election_id, height, is_concluded), connection.space("elections").upsert((election_id, height, is_concluded),
op_list=[('=', 0, election_id), op_list=[('=', 0, election_id),
('=', 1, height), ('=', 1, height),
('=', 2, is_concluded)], ('=', 2, is_concluded)],
limit=1) limit=1)
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_elections(connection, elections: list): def store_elections(connection, elections: list):
space = connection.get_space("elections")
for election in elections: for election in elections:
_election = space.insert((election["election_id"], _election = connection.run(
election["height"], connection.space("elections").insert((election["election_id"],
election["is_concluded"])) election["height"],
election["is_concluded"]))
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def delete_elections(connection, height: int): def delete_elections(connection, height: int):
space = connection.space("elections") _elections = connection.run(
_elections = space.select(height, index="height_search") connection.space("elections").select(height, index="height_search")
for _elec in _elections.data: ).data
space.delete(_elec[0]) for _elec in _elections:
connection.run(
connection.space("elections").delete(_elec[0])
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_validator_set(connection, height: int = None): def get_validator_set(connection, height: int = None):
space = connection.space("validators") _validators = connection.run(
_validators = space.select() connection.space("validators").select()
_validators = _validators.data ).data
if height is not None: if height is not None:
_validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators if _validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators if
validator[1] <= height] validator[1] <= height]
@ -449,9 +477,9 @@ def get_validator_set(connection, height: int = None):
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_election(connection, election_id: str): def get_election(connection, election_id: str):
space = connection.space("elections") _elections = connection.run(
_elections = space.select(election_id, index="id_search") connection.space("elections").select(election_id, index="id_search")
_elections = _elections.data ).data
if len(_elections) == 0: if len(_elections) == 0:
return None return None
_election = sorted(_elections, key=itemgetter(0), reverse=True)[0] _election = sorted(_elections, key=itemgetter(0), reverse=True)[0]
@ -460,38 +488,40 @@ def get_election(connection, election_id: str):
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str): def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str):
space = connection.space("keys") _transactions = connection.run(
# _keys = space.select([public_key], index="keys_search") connection.space("assets").select([asset_id], index="assetid_search")
space = connection.space("assets") ).data
_transactions = space.select([asset_id], index="assetid_search")
# _transactions = _transactions
# _keys = _keys.data
_grouped_transactions = _group_transaction_by_ids(connection=connection, txids=[_tx[1] for _tx in _transactions]) _grouped_transactions = _group_transaction_by_ids(connection=connection, txids=[_tx[1] for _tx in _transactions])
return _grouped_transactions return _grouped_transactions
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True): def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True):
space = connection.space("abci_chains") connection.run(
space.upsert((height, is_synced, chain_id), connection.space("abci_chains").upsert((height, is_synced, chain_id),
op_list=[('=', 0, height), op_list=[('=', 0, height),
('=', 1, is_synced), ('=', 1, is_synced),
('=', 2, chain_id)], ('=', 2, chain_id)],
limit=1) limit=1)
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def delete_abci_chain(connection, height: int): def delete_abci_chain(connection, height: int):
space = connection.space("abci_chains") _chains = connection.run(
_chains = space.select(height, index="height_search") connection.space("abci_chains").select(height, index="height_search")
for _chain in _chains.data: ).data
space.delete(_chain[2]) for _chain in _chains:
connection.run(
connection.space("abci_chains").delete(_chain[2])
)
@register_query(TarantoolDBConnection) @register_query(TarantoolDBConnection)
def get_latest_abci_chain(connection): def get_latest_abci_chain(connection):
space = connection.space("abci_chains") _all_chains = connection.run(
_all_chains = space.select().data connection.space("abci_chains").select()
).data
if len(_all_chains) == 0: if len(_all_chains) == 0:
return None return None
_chain = sorted(_all_chains, key=itemgetter(0), reverse=True)[0] _chain = sorted(_all_chains, key=itemgetter(0), reverse=True)[0]