test are passing without lazy

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2023-02-28 15:48:00 +01:00
parent 6d2187915a
commit 5875338c3a
No known key found for this signature in database
18 changed files with 688 additions and 103 deletions

View File

@ -11,7 +11,7 @@ from transactions.common.exceptions import ConfigurationError
from planetmint.config import Config
BACKENDS = {
"tarantool_db": "planetmint.backend.tarantool.connection.TarantoolDBConnection",
"tarantool_db": "planetmint.backend.tarantool.sync_io.connection.TarantoolDBConnection",
"localmongodb": "planetmint.backend.localmongodb.connection.LocalMongoDBConnection",
}

View File

@ -1,5 +1,2 @@
# Register the single dispatched modules on import.
from planetmint.backend.tarantool import query, connection, schema # noqa
# MongoDBConnection should always be accessed via
# ``planetmint.backend.connect()``.
from planetmint.backend.tarantool.sync_io import connection, query, schema

View File

@ -0,0 +1,82 @@
# Copyright © 2020 Interplanetary Database Association e.V.,
# Planetmint and IPDB software contributors.
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import asyncio
import asynctnt
from planetmint.config import Config
from transactions.common.exceptions import ConfigurationError
from planetmint.utils import Lazy
from planetmint.backend.connection import DBConnection
from planetmint.backend.exceptions import ConnectionError
logger = logging.getLogger(__name__)
class TarantoolDBConnection(DBConnection):
def __init__(
self,
host: str = None,
port: int = None,
login: str = None,
password: str = None,
**kwargs,
):
try:
super().__init__(host=host, port=port, login=login, password=password, **kwargs)
dbconf = Config().get()["database"]
self.init_path = dbconf["init_config"]["absolute_path"]
self.drop_path = dbconf["drop_config"]["absolute_path"]
self.__conn = None
self.connect()
self.SPACE_NAMES = [
"abci_chains",
"blocks",
"elections",
"pre_commits",
"validator_sets",
"transactions",
"outputs",
]
except tarantool.error.NetworkError as network_err:
logger.info("Host cant be reached")
raise ConnectionError
except ConfigurationError:
logger.info("Exception in _connect(): {}")
raise ConfigurationError
def query(self):
return Lazy()
def _file_content_to_bytes(self, path):
with open(path, "r") as f:
execute = f.readlines()
f.close()
return "".join(execute).encode()
async def connect(self):
if not self.__conn:
self.__conn = asynctnt.Connection(host='127.0.0.1', port=3301)
await self.__conn.connect()
return self.__conn
async def close(self):
try:
if self.__conn:
await self.__conn.disconnect()
self.__conn = None
except Exception as exc:
logger.info("Exception in planetmint.backend.tarantool.close(): {}".format(exc))
raise ConnectionError(str(exc)) from exc
async def run(self, query, only_data=True):
try:
conn = self.connect()
return conn.query.run(conn).data if only_data else query.run(conn)
except tarantool.error.OperationalError as op_error:
raise op_error
except tarantool.error.NetworkError as net_error:
raise net_error

View File

@ -4,7 +4,6 @@
# Code is Apache-2.0 and docs are CC-BY-4.0
"""Query implementation for Tarantool"""
import json
import logging
from uuid import uuid4
from operator import itemgetter
@ -16,8 +15,6 @@ from planetmint.backend.models.dbtransaction import DbTransaction
from planetmint.backend.exceptions import OperationDataInsertionError
from planetmint.exceptions import CriticalDoubleSpend
from planetmint.backend.tarantool.const import (
TARANT_TABLE_META_DATA,
TARANT_TABLE_ASSETS,
TARANT_TABLE_TRANSACTION,
TARANT_TABLE_OUTPUT,
TARANT_TABLE_SCRIPT,
@ -35,7 +32,7 @@ from planetmint.backend.tarantool.const import (
)
from planetmint.backend.utils import module_dispatch_registrar
from planetmint.backend.models import Asset, Block, Output
from planetmint.backend.tarantool.connection import TarantoolDBConnection
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
from transactions.common.transaction import Transaction

View File

@ -6,6 +6,7 @@
import logging
import tarantool
from planetmint.config import Config
from transactions.common.exceptions import ConfigurationError
from planetmint.utils import Lazy
@ -75,64 +76,19 @@ class TarantoolDBConnection(DBConnection):
return self.connect().space(space_name)
def space(self, space_name: str):
return self.query().space(space_name)
def exec(self, query, only_data=True):
try:
conn = self.connect()
conn.execute(query) if only_data else conn.execute(query)
except tarantool.error.OperationalError as op_error:
raise op_error
except tarantool.error.NetworkError as net_error:
raise net_error
def run(self, query, only_data=True):
try:
conn = self.connect()
return query.run(conn).data if only_data else query.run(conn)
except tarantool.error.OperationalError as op_error:
raise op_error
except tarantool.error.NetworkError as net_error:
raise net_error
return self.get_space(space_name)
def drop_database(self):
self.connect().call("drop")
# def run(self, query, only_data=True):
# try:
# conn = self.connect()
# return query.run(conn).data if only_data else query.run(conn)
# except tarantool.error.OperationalError as op_error:
# raise op_error
# except tarantool.error.NetworkError as net_error:
# raise net_error
def init_database(self):
self.connect().call("init")
def run_command(self, command: str, config: dict):
from subprocess import run
try:
self.close()
except ConnectionError:
pass
print(f" commands: {command}")
host_port = "%s:%s" % (self.host, self.port)
execute_cmd = self._file_content_to_bytes(path=command)
output = run(
["tarantoolctl", "connect", host_port],
input=execute_cmd,
capture_output=True,
).stderr
output = output.decode()
return output
def run_command_with_output(self, command: str):
from subprocess import run
try:
self.close()
except ConnectionError:
pass
host_port = "%s:%s" % (
Config().get()["database"]["host"],
Config().get()["database"]["port"],
)
output = run(["tarantoolctl", "connect", host_port], input=command, capture_output=True)
if output.returncode != 0:
raise Exception(f"Error while trying to execute cmd {command} on host:port {host_port}: {output.stderr}")
return output.stdout

View File

@ -0,0 +1,521 @@
# Copyright © 2020 Interplanetary Database Association e.V.,
# Planetmint and IPDB software contributors.
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
"""Query implementation for Tarantool"""
import logging
from uuid import uuid4
from operator import itemgetter
from typing import Union
from planetmint.backend import query
from planetmint.backend.models.dbtransaction import DbTransaction
from planetmint.backend.exceptions import OperationDataInsertionError
from planetmint.exceptions import CriticalDoubleSpend
from planetmint.backend.tarantool.const import (
TARANT_TABLE_TRANSACTION,
TARANT_TABLE_OUTPUT,
TARANT_TABLE_SCRIPT,
TARANT_TX_ID_SEARCH,
TARANT_ID_SEARCH,
TARANT_INDEX_TX_BY_ASSET_ID,
TARANT_INDEX_SPENDING_BY_ID_AND_OUTPUT_INDEX,
TARANT_TABLE_GOVERNANCE,
TARANT_TABLE_ABCI_CHAINS,
TARANT_TABLE_BLOCKS,
TARANT_TABLE_VALIDATOR_SETS,
TARANT_TABLE_UTXOS,
TARANT_TABLE_PRE_COMMITS,
TARANT_TABLE_ELECTIONS,
)
from planetmint.backend.utils import module_dispatch_registrar
from planetmint.backend.models import Asset, Block, Output
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
from transactions.common.transaction import Transaction
#from planetmint.backend.tarantool.sync_io import catch_db_exception
logger = logging.getLogger(__name__)
register_query = module_dispatch_registrar(query)
from tarantool.error import OperationalError, NetworkError
from functools import wraps
def catch_db_exception(function_to_decorate):
@wraps(function_to_decorate)
def wrapper(*args, **kw):
try:
output = function_to_decorate(*args, **kw)
except OperationalError as op_error:
raise op_error
except NetworkError as net_error:
raise net_error
return output
return wrapper
@register_query(TarantoolDBConnection)
def get_complete_transactions_by_ids(connection, txids: list) -> list[DbTransaction]:
_transactions = []
for txid in txids:
tx = get_transaction_by_id(connection, txid, TARANT_TABLE_TRANSACTION)
if tx is None:
tx = get_transaction_by_id(connection, txid, TARANT_TABLE_GOVERNANCE)
if tx is None:
continue
outputs = get_outputs_by_tx_id(connection, txid)
tx.outputs = outputs
_transactions.append(tx)
return _transactions
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_outputs_by_tx_id(connection, tx_id: str) -> list[Output]:
_outputs = connection.space(TARANT_TABLE_OUTPUT).select(tx_id, index=TARANT_TX_ID_SEARCH).data
_sorted_outputs = sorted(_outputs, key=itemgetter(4))
return [Output.from_tuple(output) for output in _sorted_outputs]
@register_query(TarantoolDBConnection)
def get_transaction(connection, tx_id: str) -> Union[DbTransaction, None]:
transactions = get_complete_transactions_by_ids(connection, (tx_id))
if len(transactions) > 1 or len(transactions) == 0:
return None
else:
return transactions[0]
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_transactions_by_asset(connection, asset: str, limit: int = 1000) -> list[DbTransaction]:
txs = connection.space(TARANT_TABLE_TRANSACTION).select(asset, limit=limit, index="transactions_by_asset_cid").data
tx_ids = [tx[0] for tx in txs]
return get_complete_transactions_by_ids(connection, tx_ids)
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_transactions_by_metadata(connection, metadata: str, limit: int = 1000) -> list[DbTransaction]:
txs = connection.space(TARANT_TABLE_TRANSACTION).select(metadata, limit=limit, index="transactions_by_metadata_cid").data
tx_ids = [tx[0] for tx in txs]
return get_complete_transactions_by_ids(connection, tx_ids)
@catch_db_exception
def store_transaction_outputs(connection, output: Output, index: int) -> str:
output_id = uuid4().hex
try:
connection.space(TARANT_TABLE_OUTPUT).insert(
(
output_id,
int(output.amount),
output.public_keys,
output.condition.to_dict(),
index,
output.transaction_id,
)
).data
return output_id
# TODO handle excpection as before
except Exception as e:
logger.info(f"Could not insert Output: {e}")
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
def store_transactions(connection, signed_transactions: list, table=TARANT_TABLE_TRANSACTION):
for transaction in signed_transactions:
store_transaction(connection, transaction, table)
[
store_transaction_outputs(connection, Output.outputs_dict(output, transaction["id"]), index)
for index, output in enumerate(transaction[TARANT_TABLE_OUTPUT])
]
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_transaction(connection, transaction, table=TARANT_TABLE_TRANSACTION):
scripts = None
if TARANT_TABLE_SCRIPT in transaction:
scripts = transaction[TARANT_TABLE_SCRIPT]
asset_obj = Transaction.get_assets_tag(transaction["version"])
if transaction["version"] == "2.0":
asset_array = [transaction[asset_obj]]
else:
asset_array = transaction[asset_obj]
tx = (
transaction["id"],
transaction["operation"],
transaction["version"],
transaction["metadata"],
asset_array,
transaction["inputs"],
scripts,
)
try:
connection.space(table).insert(tx)
# TODO handle excpection as before
except Exception as e:
logger.info(f"Could not insert transactions: {e}")
if e.args[0] == 3 and e.args[1].startswith("Duplicate key exists in"):
raise CriticalDoubleSpend()
else:
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_transaction_by_id(connection, transaction_id, table=TARANT_TABLE_TRANSACTION):
txs = connection.space(table).select(transaction_id, index=TARANT_ID_SEARCH)
if len(txs) == 0:
return None
return DbTransaction.from_tuple(txs[0])
@register_query(TarantoolDBConnection)
def get_transaction_single(connection, transaction_id) -> Union[DbTransaction, None]:
txs = get_complete_transactions_by_ids(txids=[transaction_id], connection=connection)
return txs[0] if len(txs) == 1 else None
@register_query(TarantoolDBConnection)
def get_transactions(connection, transactions_ids: list) -> list[DbTransaction]:
return get_complete_transactions_by_ids(txids=transactions_ids, connection=connection)
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_asset(connection, asset_id: str) -> Asset:
connection.space(TARANT_TABLE_TRANSACTION).select(asset_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
return Asset.from_dict(_data[0])
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_assets(connection, assets_ids: list) -> list[Asset]:
_returned_data = []
for _id in list(set(assets_ids)):
res = connection.space(TARANT_TABLE_TRANSACTION).select(_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
if len(res) == 0:
continue
_returned_data.append(res[0])
sorted_assets = sorted(_returned_data, key=lambda k: k[1], reverse=False)
return [Asset.from_dict(asset) for asset in sorted_assets]
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str) -> list[DbTransaction]:
_inputs = connection.space(TARANT_TABLE_TRANSACTION).select(
[fullfil_transaction_id, fullfil_output_index], index=TARANT_INDEX_SPENDING_BY_ID_AND_OUTPUT_INDEX
).data
return get_complete_transactions_by_ids(txids=[inp[0] for inp in _inputs], connection=connection)
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_latest_block(connection) -> Union[dict, None]:
blocks = connection.space(TARANT_TABLE_BLOCKS).select().data
if not blocks:
return None
blocks = sorted(blocks, key=itemgetter(2), reverse=True)
latest_block = Block.from_tuple(blocks[0])
return latest_block.to_dict()
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_block(connection, block: dict):
block_unique_id = uuid4().hex
try:
connection.space(TARANT_TABLE_BLOCKS).insert(
(block_unique_id, block["app_hash"], block["height"], block[TARANT_TABLE_TRANSACTION])
)
# TODO fix exception handling to be as before
except Exception as e:
logger.info(f"Could not insert block: {e}")
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_txids_filtered(connection, asset_ids: list[str], operation: str = "", last_tx: bool = False) -> list[str]:
transactions = []
if operation == "CREATE":
transactions = connection.space(TARANT_TABLE_TRANSACTION).select(
[asset_ids[0], operation], index="transactions_by_id_and_operation"
).data
elif operation == "TRANSFER":
transactions = connection.space(TARANT_TABLE_TRANSACTION).select(asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
else:
txs = connection.space(TARANT_TABLE_TRANSACTION).select(asset_ids, index=TARANT_ID_SEARCH).data
asset_txs = connection.space(TARANT_TABLE_TRANSACTION).select(asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
transactions = txs + asset_txs
ids = tuple([tx[0] for tx in transactions])
# NOTE: check when and where this is used and remove if not
if last_tx:
return ids[0]
return ids
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_owned_ids(connection, owner: str) -> list[DbTransaction]:
outputs = connection.space(TARANT_TABLE_OUTPUT).select(owner, index="public_keys").data
if len(outputs) == 0:
return []
txids = [output[5] for output in outputs]
unique_set_txids = set(txids)
return get_complete_transactions_by_ids(connection, unique_set_txids)
@register_query(TarantoolDBConnection)
def get_spending_transactions(connection, inputs):
_transactions = []
for inp in inputs:
_trans_list = get_spent(
fullfil_transaction_id=inp["transaction_id"],
fullfil_output_index=inp["output_index"],
connection=connection,
)
_transactions.extend(_trans_list)
return _transactions
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_block(connection, block_id=None) -> Union[dict, None]:
_block = connection.space(TARANT_TABLE_BLOCKS).select(block_id, index="height", limit=1).data
if len(_block) == 0:
return
_block = Block.from_tuple(_block[0])
return _block.to_dict()
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_block_with_transaction(connection, txid: str) -> Union[dict, None]:
_block = connection.space(TARANT_TABLE_BLOCKS).select(txid, index="block_by_transaction_id").data
if len(_block) == 0:
return
_block = Block.from_tuple(_block[0])
return _block.to_dict()
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_transactions(connection, txn_ids: list):
try:
for _id in txn_ids:
_outputs = get_outputs_by_tx_id(connection, _id)
for x in range(len(_outputs)):
connection.connect().call("delete_output", (_outputs[x].id))
for _id in txn_ids:
connection.space(TARANT_TABLE_TRANSACTION).delete(_id)
connection.space(TARANT_TABLE_GOVERNANCE).delete(_id)
# TODO handle exceptions as before
except Exception as e:
logger.info(f"Could not insert unspent output: {e}")
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_unspent_outputs(connection, *unspent_outputs: list):
result = []
if unspent_outputs:
for utxo in unspent_outputs:
try:
output = connection.space(TARANT_TABLE_UTXOS).insert(
(uuid4().hex, utxo["transaction_id"], utxo["output_index"], utxo)
).data
result.append(output)
except Exception as e:
logger.info(f"Could not insert unspent output: {e}")
raise OperationDataInsertionError()
return result
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_unspent_outputs(connection, *unspent_outputs: list):
result = []
if unspent_outputs:
for utxo in unspent_outputs:
output = connection.space(TARANT_TABLE_UTXOS).delete(
(utxo["transaction_id"], utxo["output_index"]),
index="utxo_by_transaction_id_and_output_index"
).data
result.append(output)
return result
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'.
_utxos = connection.space(TARANT_TABLE_UTXOS).select([]).data
return [utx[3] for utx in _utxos]
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_pre_commit_state(connection, state: dict):
_precommit = connection.space(TARANT_TABLE_PRE_COMMITS).select([], limit=1).data
_precommitTuple = (
(uuid4().hex, state["height"], state[TARANT_TABLE_TRANSACTION])
if _precommit is None or len(_precommit) == 0
else _precommit[0]
)
try:
connection.space(TARANT_TABLE_PRE_COMMITS).upsert(
_precommitTuple,
op_list=[("=", 1, state["height"]), ("=", 2, state[TARANT_TABLE_TRANSACTION])],
limit=1,
)
# TODO handle exceptions as before
except Exception as e:
logger.info(f"Could not insert pre commit state: {e}")
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_pre_commit_state(connection) -> dict:
_commit = connection.space(TARANT_TABLE_PRE_COMMITS).select([], index=TARANT_ID_SEARCH).data
if _commit is None or len(_commit) == 0:
return None
_commit = sorted(_commit, key=itemgetter(1), reverse=False)[0]
return {"height": _commit[1], TARANT_TABLE_TRANSACTION: _commit[2]}
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_validator_set(conn, validators_update: dict):
_validator = conn.space(TARANT_TABLE_VALIDATOR_SETS).select(validators_update["height"], index="height", limit=1).data
unique_id = uuid4().hex if _validator is None or len(_validator) == 0 else _validator[0][0]
try:
conn.space(TARANT_TABLE_VALIDATOR_SETS).upsert(
(unique_id, validators_update["height"], validators_update["validators"]),
op_list=[("=", 1, validators_update["height"]), ("=", 2, validators_update["validators"])],
limit=1,
)
except Exception as e:
logger.info(f"Could not insert validator set: {e}")
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_validator_set(connection, height: int):
_validators = connection.space(TARANT_TABLE_VALIDATOR_SETS).select(height, index="height").data
for _valid in _validators:
connection.space(TARANT_TABLE_VALIDATOR_SETS).delete(_valid[0])
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_election(connection, election_id: str, height: int, is_concluded: bool):
try:
connection.space(TARANT_TABLE_ELECTIONS).upsert(
(election_id, height, is_concluded), op_list=[("=", 1, height), ("=", 2, is_concluded)], limit=1
)
# TODO handle excpetions as before
except Exception as e:
logger.info(f"Could not insert election: {e}")
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_elections(connection, elections: list):
try:
for election in elections:
_election = connection.space(TARANT_TABLE_ELECTIONS).insert(
(election["election_id"], election["height"], election["is_concluded"])
)
# TODO handle exceptionsa as before
except Exception as e:
logger.info(f"Could not insert elections: {e}")
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_elections(connection, height: int):
_elections = connection.space(TARANT_TABLE_ELECTIONS).select(height, index="height").data
for _elec in _elections:
connection.space(TARANT_TABLE_ELECTIONS).delete(_elec[0])
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_validator_set(connection, height: int = None):
_validators = connection.space(TARANT_TABLE_VALIDATOR_SETS).select().data
if height is not None and _validators is not None:
_validators = [
{"height": validator[1], "validators": validator[2]} for validator in _validators if validator[1] <= height
]
return next(iter(sorted(_validators, key=lambda k: k["height"], reverse=True)), None)
elif _validators is not None:
_validators = [{"height": validator[1], "validators": validator[2]} for validator in _validators]
return next(iter(sorted(_validators, key=lambda k: k["height"], reverse=True)), None)
return None
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_election(connection, election_id: str) -> dict:
_elections = connection.space(TARANT_TABLE_ELECTIONS).select(election_id, index=TARANT_ID_SEARCH).data
if _elections is None or len(_elections) == 0:
return None
_election = sorted(_elections, key=itemgetter(0), reverse=True)[0]
return {"election_id": _election[0], "height": _election[1], "is_concluded": _election[2]}
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str) -> list[DbTransaction]:
id_transactions = connection.space(TARANT_TABLE_GOVERNANCE).select([asset_id]).data
asset_id_transactions = connection.space(TARANT_TABLE_GOVERNANCE).select([asset_id], index="governance_by_asset_id").data
transactions = id_transactions + asset_id_transactions
return get_complete_transactions_by_ids(connection, [_tx[0] for _tx in transactions])
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True):
try:
connection.space(TARANT_TABLE_ABCI_CHAINS).upsert(
(chain_id, height, is_synced),
op_list=[("=", 0, chain_id), ("=", 1, height), ("=", 2, is_synced)],
)
# TODO handle exceptions as before
except Exception as e:
logger.info(f"Could not insert abci-chain: {e}")
raise OperationDataInsertionError()
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_abci_chain(connection, height: int):
chains = connection.space(TARANT_TABLE_ABCI_CHAINS).select(height, index="height")
connection.space(TARANT_TABLE_ABCI_CHAINS).delete(chains[0][0], index="id")
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_latest_abci_chain(connection) -> Union[dict, None]:
_all_chains = connection.space(TARANT_TABLE_ABCI_CHAINS).select().data
if _all_chains is None or len(_all_chains) == 0:
return None
_chain = sorted(_all_chains, key=itemgetter(1), reverse=True)[0]
return {"chain_id": _chain[0], "height": _chain[1], "is_synced": _chain[2]}

View File

@ -3,7 +3,7 @@ import logging
from planetmint.config import Config
from planetmint.backend.utils import module_dispatch_registrar
from planetmint import backend
from planetmint.backend.tarantool.connection import TarantoolDBConnection
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
logger = logging.getLogger(__name__)
register_schema = module_dispatch_registrar(backend.schema)
@ -32,19 +32,6 @@ def create_database(connection, dbname):
logger.info("Create database `%s`.", dbname)
def run_command_with_output(command):
from subprocess import run
host_port = "%s:%s" % (
Config().get()["database"]["host"],
Config().get()["database"]["port"],
)
output = run(["tarantoolctl", "connect", host_port], input=command, capture_output=True)
if output.returncode != 0:
raise Exception(f"Error while trying to execute cmd {command} on host:port {host_port}: {output.stderr}")
return output.stdout
@register_schema(TarantoolDBConnection)
def create_tables(connection, dbname):
connection.connect().call("init")

53
poetry.lock generated
View File

@ -209,6 +209,55 @@ files = [
{file = "async_timeout-4.0.2-py3-none-any.whl", hash = "sha256:8ca1e4fcf50d07413d66d1a5e416e42cfdf5851c981d679a09851a6853383b3c"},
]
[[package]]
name = "asynctnt"
version = "2.0.1"
description = "A fast Tarantool Database connector for Python/asyncio."
category = "main"
optional = false
python-versions = "*"
files = [
{file = "asynctnt-2.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7c4b9e3c5cb557e4417b8845aca997441320788a0ea7d94d40692ca669239571"},
{file = "asynctnt-2.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:448959f8a1273bf7659c464c6159fccd426cbefabced9cfdc71e4cb48d518125"},
{file = "asynctnt-2.0.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c8851db8f629568e5249a1676a0b9e849a65185533a24308822b18d93601184a"},
{file = "asynctnt-2.0.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:47cd2d10c54a17fda502144bd9c77af83589ca25a1cfdfa0e59c1042aeb4df89"},
{file = "asynctnt-2.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:2ed53209c29fe488bc42e362ccd45f1353da59338dea7bf77811ba41369f81ad"},
{file = "asynctnt-2.0.1-cp310-cp310-win32.whl", hash = "sha256:0bc9c09cf5e63a5ae82d4e335410f7375e9b9f5c89c44ba94202441811b2d0e3"},
{file = "asynctnt-2.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:eaca7eae2e0161bb16f775f4997e7b71aadf3b18deec8ddbe7a3d7e129cb3b2d"},
{file = "asynctnt-2.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:ef238ee5abf21c465d881ff94e8b5709637ce070ea84bc2071b5396320bd1eb9"},
{file = "asynctnt-2.0.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:15a2ab6be4b5cb12826a39ef57c027068d8c8ac8aee804913d288241236fc03d"},
{file = "asynctnt-2.0.1-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:18a7a9a6630336e97fa64401235e0647795c9b1633ce991638f1fa1e5fa8c97c"},
{file = "asynctnt-2.0.1-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:18086835b4db2aa6987097348156016854a43bfc6ed57de09f033393323afa7f"},
{file = "asynctnt-2.0.1-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:1bc9ec81543ea73e65c5849faa13763e054c6894b51b2fe0d05a9de075d11e48"},
{file = "asynctnt-2.0.1-cp36-cp36m-win32.whl", hash = "sha256:45bb81873916267d15d2d71600297a579bcf1698e4f75d5123b17de20e291c7b"},
{file = "asynctnt-2.0.1-cp36-cp36m-win_amd64.whl", hash = "sha256:400220c233561a494656bdf485319918cf0faf454d10b3be87ed652f93f63da2"},
{file = "asynctnt-2.0.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:4b1d9e1a389bc39ec304c00544a7ce865174079a559477eba68a19532a4313b7"},
{file = "asynctnt-2.0.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8c0dc3d0a7b9b2f12280abab0fe976984e1495407a1c7502e09d886ffeb124d4"},
{file = "asynctnt-2.0.1-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1b9e6dd3ec4a4396c2309393d828a2c5e7e9ae9b05e207219b3edd3ea691519b"},
{file = "asynctnt-2.0.1-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:61ac8b944b2a149947f07f8d5c9db10efd177deee1206c729039b8bb5e31fcb9"},
{file = "asynctnt-2.0.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:d1411d14fc31bdd7b9eb7b6e059a03fa373b8b6f6c7bd78d69a5488575182e65"},
{file = "asynctnt-2.0.1-cp37-cp37m-win32.whl", hash = "sha256:dd04c6bf05ddd78be50c8aee595aeeb4a7a49ab7ae84685567d2ba0cfcd0a606"},
{file = "asynctnt-2.0.1-cp37-cp37m-win_amd64.whl", hash = "sha256:bf6b94630b134c400daa1c7c1e2821afe7850e454c34795bdf11d2ff619e32fa"},
{file = "asynctnt-2.0.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:d5956086983cda76a5a3534d1bbf076476f36de3deb77d74f25f5b067ca92752"},
{file = "asynctnt-2.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:631c6e68e9efd1a1992c154b53bb3d0aa3ae361c47535ee72538efd92d5d9209"},
{file = "asynctnt-2.0.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:602f5e860773c6640cd4260f588dc4659962d2d810e0c7d02e872c6440bbc6c8"},
{file = "asynctnt-2.0.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:d286d82d0f5c11ded09d9a067caf23c37a19906351ca8b9991d1e61fb6b31bf9"},
{file = "asynctnt-2.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:884897f42dc0f0e5eef9fd1a8a90e1ab811d1aaae3cdda2d6021125c7c5ddce9"},
{file = "asynctnt-2.0.1-cp38-cp38-win32.whl", hash = "sha256:98ee3194e834380e5cd79cc0b3fc07510c1023b81e1b341fe442fa94614a82f7"},
{file = "asynctnt-2.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:b95fc8b7c0be5086a65b4b629114bf214f66a0cf20599e7464ddbbb06720ecd6"},
{file = "asynctnt-2.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2da8dae75300cab35fedd8dbded6954bb82c05a596af9383d5a18b9d59d5fe47"},
{file = "asynctnt-2.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afba9b5f4a1cbf3ba16ca15a0e190697d3f7e70c93e43a4fe12f39f03274ac8e"},
{file = "asynctnt-2.0.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:871b3ac7bc20bb3b383b4244e46dbad3de684a8562bdcd6765733d47c9da51c7"},
{file = "asynctnt-2.0.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:a834c3334430dd2fa3c93ce45b2c35eef1312c9931860c25b379ea69da9b96ed"},
{file = "asynctnt-2.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8bf3d7e6ac447bf3504db837c5ec1ee9d8c275a0f9cb2319f9b5405bd228120b"},
{file = "asynctnt-2.0.1-cp39-cp39-win32.whl", hash = "sha256:6d77b0988b7c9cc15883442f85c23bb284986532a0ed9b9bf9ff710872133bfc"},
{file = "asynctnt-2.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:a743511b91f4217a05e158277148c8d9523a152343a54c58b5a950ae52661c58"},
{file = "asynctnt-2.0.1.tar.gz", hash = "sha256:b11efe38122e2b9a658aadc4a4ad1182cbf7100b5d7040206a71f9f107206ebc"},
]
[package.dependencies]
PyYAML = ">=5.0"
[[package]]
name = "attrs"
version = "22.2.0"
@ -2175,7 +2224,7 @@ cffi = ">=1.4.1"
six = "*"
[package.extras]
docs = ["sphinx (>=1.6.5)", "sphinx-rtd-theme"]
docs = ["sphinx (>=1.6.5)", "sphinx_rtd_theme"]
tests = ["hypothesis (>=3.27.0)", "pytest (>=3.2.1,!=3.3.0)"]
[[package]]
@ -3308,4 +3357,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "ce9a30d4e4ed6d8bdd7c6cfe8f1908ffb787fc111e77fc1f709dbe8c4dba7fbb"
content-hash = "8e7e3423a56069acb929a5666dacf8cbd68434435e7638080f6dc030d371665a"

View File

@ -47,6 +47,7 @@ planetmint-ipld = ">=0.0.3"
pyasn1 = ">=0.4.8"
python-decouple = "^3.7"
planetmint-transactions = ">=0.7.0"
asynctnt = "^2.0.1"
[tool.poetry.group.dev.dependencies]
aafigure = "0.6"

View File

@ -10,7 +10,7 @@ pytestmark = pytest.mark.bdb
def test_get_txids_filtered(signed_create_tx, signed_transfer_tx, db_conn):
from planetmint.backend.tarantool import query
from planetmint.backend.tarantool.sync_io import query
# create and insert two blocks, one for the create and one for the
# transfer transaction
@ -36,7 +36,7 @@ def test_get_txids_filtered(signed_create_tx, signed_transfer_tx, db_conn):
def test_get_owned_ids(signed_create_tx, user_pk, db_conn):
from planetmint.backend.tarantool import query
from planetmint.backend.tarantool.sync_io import query
# insert a transaction
query.store_transactions(connection=db_conn, signed_transactions=[signed_create_tx.to_dict()])
@ -49,18 +49,18 @@ def test_get_owned_ids(signed_create_tx, user_pk, db_conn):
def test_store_block(db_conn):
from planetmint.abci.block import Block
from planetmint.backend.tarantool import query
from planetmint.backend.tarantool.sync_io import query
block = Block(app_hash="random_utxo", height=3, transactions=[])
query.store_block(connection=db_conn, block=block._asdict())
# block = query.get_block(connection=db_conn)
blocks = db_conn.run(db_conn.space("blocks").select([]))
blocks = db_conn.space("blocks").select([]).data
assert len(blocks) == 1
def test_get_block(db_conn):
from planetmint.abci.block import Block
from planetmint.backend.tarantool import query
from planetmint.backend.tarantool.sync_io import query
block = Block(app_hash="random_utxo", height=3, transactions=[])
@ -71,7 +71,7 @@ def test_get_block(db_conn):
def test_store_pre_commit_state(db_conn):
from planetmint.backend.tarantool import query
from planetmint.backend.tarantool.sync_io import query
state = dict(height=3, transactions=[])
@ -84,11 +84,11 @@ def test_store_pre_commit_state(db_conn):
def test_get_pre_commit_state(db_conn):
from planetmint.backend.tarantool import query
from planetmint.backend.tarantool.sync_io import query
all_pre = db_conn.run(db_conn.space("pre_commits").select([]))
all_pre = db_conn.space("pre_commits").select([]).data
for pre in all_pre:
db_conn.run(db_conn.space("pre_commits").delete(pre[0]), only_data=False)
db_conn.space("pre_commits").delete(pre[0])
# TODO First IN, First OUT
state = dict(height=3, transactions=[])
# db_context.conn.db.pre_commit.insert_one
@ -98,7 +98,7 @@ def test_get_pre_commit_state(db_conn):
def test_validator_update(db_conn):
from planetmint.backend.tarantool import query
from planetmint.backend.tarantool.sync_io import query
def gen_validator_update(height):
return {"validators": [], "height": height, "election_id": f"election_id_at_height_{height}"}
@ -160,7 +160,7 @@ def test_validator_update(db_conn):
],
)
def test_store_abci_chain(description, stores, expected, db_conn):
from planetmint.backend.tarantool import query
from planetmint.backend.tarantool.sync_io import query
for store in stores:
query.store_abci_chain(db_conn, **store)

View File

@ -8,7 +8,7 @@ import pytest
def test_get_connection_raises_a_configuration_error(monkeypatch):
from planetmint.backend.exceptions import ConnectionError
from planetmint.backend.tarantool.connection import TarantoolDBConnection
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
with pytest.raises(ConnectionError):
TarantoolDBConnection("localhost", "1337", "mydb", "password")
@ -17,7 +17,6 @@ def test_get_connection_raises_a_configuration_error(monkeypatch):
@pytest.mark.skip(reason="we currently do not suppport mongodb.")
def test_get_connection_raises_a_configuration_error_mongodb(monkeypatch):
from planetmint.backend.localmongodb.connection import LocalMongoDBConnection
from transactions.common.exceptions import ConfigurationError
with pytest.raises(ConnectionError):
conn = LocalMongoDBConnection("localhost", "1337", "mydb", "password")

View File

@ -109,7 +109,7 @@ def test_bigchain_show_config(capsys):
def test__run_init(mocker):
init_db_mock = mocker.patch("planetmint.backend.tarantool.connection.TarantoolDBConnection.init_database")
init_db_mock = mocker.patch("planetmint.backend.tarantool.sync_io.connection.TarantoolDBConnection.init_database")
conn = Connection()
conn.init_database()

View File

@ -22,7 +22,7 @@ from logging import getLogger
from logging.config import dictConfig
from planetmint.backend.connection import Connection
from planetmint.backend.tarantool.connection import TarantoolDBConnection
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
from transactions.common import crypto
from transactions.common.transaction_mode_types import BROADCAST_TX_COMMIT
from planetmint.abci.utils import key_from_base64
@ -30,7 +30,6 @@ from planetmint.backend import schema, query
from transactions.common.crypto import key_pair_from_ed25519_key, public_key_from_ed25519_key
from planetmint.abci.block import Block
from planetmint.abci.rpc import MODE_LIST
from planetmint.model.models import Models
from tests.utils import gen_vote
from planetmint.config import Config
from transactions.types.elections.validator_election import ValidatorElection # noqa

View File

@ -3,7 +3,6 @@
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import random
import warnings
from unittest.mock import patch
import pytest
from base58 import b58decode
@ -14,7 +13,6 @@ from transactions.common.transaction import TransactionLink
from transactions.common.transaction import Transaction
from transactions.types.assets.create import Create
from transactions.types.assets.transfer import Transfer
from planetmint.model.fastquery import FastQuery
from planetmint.exceptions import CriticalDoubleSpend
pytestmark = pytest.mark.bdb
@ -48,10 +46,9 @@ class TestBigchainApi(object):
b.models.store_bulk_transactions([transfer_tx2])
def test_double_inclusion(self, b, alice):
from tarantool.error import DatabaseError
from planetmint.backend.exceptions import OperationError
from planetmint.backend.tarantool.connection import TarantoolDBConnection
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
tx = Create.generate([alice.public_key], [([alice.public_key], 1)])
tx = tx.sign([alice.private_key])

View File

@ -29,7 +29,7 @@ from tests.utils import delete_unspent_outputs, get_utxoset_merkle_root, store_u
def test_asset_is_separated_from_transaciton(b):
import copy
from transactions.common.crypto import generate_key_pair
from planetmint.backend.tarantool.connection import TarantoolDBConnection
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
if isinstance(b.models.connection, TarantoolDBConnection):
pytest.skip("This specific function is skipped because, assets are stored differently if using Tarantool")
@ -235,7 +235,7 @@ def test_store_zero_unspent_output(b):
@pytest.mark.bdb
def test_store_one_unspent_output(b, unspent_output_1, utxo_collection):
from planetmint.backend.tarantool.connection import TarantoolDBConnection
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
res = store_unspent_outputs(b.models.connection, unspent_output_1)
if not isinstance(b.models.connection, TarantoolDBConnection):

View File

@ -13,7 +13,7 @@ from functools import singledispatch
from planetmint import backend
from planetmint.backend.localmongodb.connection import LocalMongoDBConnection
from planetmint.backend.tarantool.connection import TarantoolDBConnection
from planetmint.backend.tarantool.sync_io.connection import TarantoolDBConnection
from planetmint.backend.schema import TABLES
from transactions.common import crypto
from transactions.common.transaction_mode_types import BROADCAST_TX_COMMIT