* added new start options: --abci-only and --web-api-only to enable seperate execution of the services

* added exception handling to the abci app
* removed space() object usage and thereby halfed the amoutn of DB lookups

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2023-03-01 14:56:32 +01:00
parent 2d83d449cf
commit b2a76a6d17
No known key found for this signature in database
9 changed files with 228 additions and 296 deletions

View File

@ -23,10 +23,11 @@ from tendermint.abci.types_pb2 import (
)
from planetmint.application.validator import Validator
from planetmint.model.models import Models
from planetmint.abci.utils import decode_validator, decode_transaction, calculate_hash
from planetmint.abci.block import Block
from planetmint.ipc.events import EventTypes, Event
from planetmint.backend.exceptions import DBConcurrencyError
CodeTypeError = 1
logger = logging.getLogger(__name__)
@ -43,20 +44,18 @@ class ApplicationLogic(BaseApplication):
self,
validator: Validator = None,
events_queue=None,
models: Models = None,
):
# super().__init__(abci)
logger.debug("Checking values of types")
logger.debug(dir(types_pb2))
self.events_queue = events_queue
self.validator = validator if validator else Validator() # (async_io=True)
self.models = models or Models()
self.validator = validator if validator else Validator()
self.block_txn_ids = []
self.block_txn_hash = ""
self.block_transactions = []
self.validators = None
self.new_height = None
self.chain = self.models.get_latest_abci_chain()
self.chain = self.validator.models.get_latest_abci_chain()
def log_abci_migration_error(self, chain_id, validators):
logger.error(
@ -68,7 +67,7 @@ class ApplicationLogic(BaseApplication):
def abort_if_abci_chain_is_not_synced(self):
if self.chain is None or self.chain["is_synced"]:
return
validators = self.models.get_validators()
validators = self.validator.models.get_validators()
self.log_abci_migration_error(self.chain["chain_id"], validators)
sys.exit(1)
@ -76,37 +75,42 @@ class ApplicationLogic(BaseApplication):
"""Initialize chain upon genesis or a migration"""
app_hash = ""
height = 0
known_chain = self.models.get_latest_abci_chain()
if known_chain is not None:
chain_id = known_chain["chain_id"]
try:
known_chain = self.validator.models.get_latest_abci_chain()
if known_chain is not None:
chain_id = known_chain["chain_id"]
if known_chain["is_synced"]:
msg = f"Got invalid InitChain ABCI request ({genesis}) - " f"the chain {chain_id} is already synced."
logger.error(msg)
if known_chain["is_synced"]:
msg = f"Got invalid InitChain ABCI request ({genesis}) - " f"the chain {chain_id} is already synced."
logger.error(msg)
sys.exit(1)
if chain_id != genesis.chain_id:
validators = self.validator.models.get_validators()
self.log_abci_migration_error(chain_id, validators)
sys.exit(1)
# set migration values for app hash and height
block = self.validator.models.get_latest_block()
app_hash = "" if block is None else block["app_hash"]
height = 0 if block is None else block["height"] + 1
known_validators = self.validator.models.get_validators()
validator_set = [decode_validator(v) for v in genesis.validators]
if known_validators and known_validators != validator_set:
self.log_abci_migration_error(known_chain["chain_id"], known_validators)
sys.exit(1)
if chain_id != genesis.chain_id:
validators = self.models.get_validators()
self.log_abci_migration_error(chain_id, validators)
sys.exit(1)
# set migration values for app hash and height
block = self.models.get_latest_block()
app_hash = "" if block is None else block["app_hash"]
height = 0 if block is None else block["height"] + 1
known_validators = self.models.get_validators()
validator_set = [decode_validator(v) for v in genesis.validators]
if known_validators and known_validators != validator_set:
self.log_abci_migration_error(known_chain["chain_id"], known_validators)
block = Block(app_hash=app_hash, height=height, transactions=[])
self.validator.models.store_block(block._asdict())
self.validator.models.store_validator_set(height + 1, validator_set)
abci_chain_height = 0 if known_chain is None else known_chain["height"]
self.validator.models.store_abci_chain(abci_chain_height, genesis.chain_id, True)
self.chain = {
"height": abci_chain_height,
"is_synced": True,
"chain_id": genesis.chain_id,
}
except DBConcurrencyError:
sys.exit(1)
except ValueError:
sys.exit(1)
block = Block(app_hash=app_hash, height=height, transactions=[])
self.models.store_block(block._asdict())
self.models.store_validator_set(height + 1, validator_set)
abci_chain_height = 0 if known_chain is None else known_chain["height"]
self.models.store_abci_chain(abci_chain_height, genesis.chain_id, True)
self.chain = {
"height": abci_chain_height,
"is_synced": True,
"chain_id": genesis.chain_id,
}
return ResponseInitChain()
def info(self, request):
@ -123,7 +127,13 @@ class ApplicationLogic(BaseApplication):
# logger.info(f"Tendermint version: {request.version}")
r = ResponseInfo()
block = self.models.get_latest_block()
block= None
try:
block = self.validator.models.get_latest_block()
except DBConcurrencyError:
block= None
except ValueError:
block=None
if block:
chain_shift = 0 if self.chain is None else self.chain["height"]
r.last_block_height = block["height"] - chain_shift
@ -145,12 +155,18 @@ class ApplicationLogic(BaseApplication):
logger.debug("check_tx: %s", raw_transaction)
transaction = decode_transaction(raw_transaction)
if self.validator.is_valid_transaction(transaction):
logger.debug("check_tx: VALID")
return ResponseCheckTx(code=OkCode)
else:
logger.debug("check_tx: INVALID")
return ResponseCheckTx(code=CodeTypeError)
try:
if self.validator.is_valid_transaction(transaction):
logger.debug("check_tx: VALID")
return ResponseCheckTx(code=OkCode)
else:
logger.debug("check_tx: INVALID")
return ResponseCheckTx(code=CodeTypeError)
except DBConcurrencyError:
sys.exit(1)
except ValueError:
sys.exit(1)
def begin_block(self, req_begin_block):
"""Initialize list of transaction.
@ -178,8 +194,14 @@ class ApplicationLogic(BaseApplication):
self.abort_if_abci_chain_is_not_synced()
logger.debug("deliver_tx: %s", raw_transaction)
transaction = self.validator.is_valid_transaction(decode_transaction(raw_transaction), self.block_transactions)
transaction = None
try:
transaction = self.validator.is_valid_transaction(decode_transaction(raw_transaction), self.block_transactions)
except DBConcurrencyError:
sys.exit(1)
except ValueError:
sys.exit(1)
if not transaction:
logger.debug("deliver_tx: INVALID")
return ResponseDeliverTx(code=CodeTypeError)
@ -207,20 +229,25 @@ class ApplicationLogic(BaseApplication):
# `end_block` or `commit`
logger.debug(f"Updating pre-commit state: {self.new_height}")
pre_commit_state = dict(height=self.new_height, transactions=self.block_txn_ids)
self.models.store_pre_commit_state(pre_commit_state)
try:
self.validator.models.store_pre_commit_state(pre_commit_state)
block_txn_hash = calculate_hash(self.block_txn_ids)
block = self.models.get_latest_block()
block_txn_hash = calculate_hash(self.block_txn_ids)
block = self.validator.models.get_latest_block()
logger.debug("BLOCK: ", block)
logger.debug("BLOCK: ", block)
if self.block_txn_ids:
self.block_txn_hash = calculate_hash([block["app_hash"], block_txn_hash])
else:
self.block_txn_hash = block["app_hash"]
validator_update = self.validator.process_block(self.new_height, self.block_transactions)
if self.block_txn_ids:
self.block_txn_hash = calculate_hash([block["app_hash"], block_txn_hash])
else:
self.block_txn_hash = block["app_hash"]
validator_update = self.validator.process_block(self.new_height, self.block_transactions)
except DBConcurrencyError:
sys.exit(1)
except ValueError:
sys.exit(1)
return ResponseEndBlock(validator_updates=validator_update)
def commit(self):
@ -229,20 +256,24 @@ class ApplicationLogic(BaseApplication):
self.abort_if_abci_chain_is_not_synced()
data = self.block_txn_hash.encode("utf-8")
try:
# register a new block only when new transactions are received
if self.block_txn_ids:
self.validator.models.store_bulk_transactions(self.block_transactions)
# register a new block only when new transactions are received
if self.block_txn_ids:
self.models.store_bulk_transactions(self.block_transactions)
block = Block(
app_hash=self.block_txn_hash,
height=self.new_height,
transactions=self.block_txn_ids,
)
# NOTE: storing the block should be the last operation during commit
# this effects crash recovery. Refer BEP#8 for details
self.models.store_block(block._asdict())
block = Block(
app_hash=self.block_txn_hash,
height=self.new_height,
transactions=self.block_txn_ids,
)
# NOTE: storing the block should be the last operation during commit
# this effects crash recovery. Refer BEP#8 for details
self.validator.models.store_block(block._asdict())
except DBConcurrencyError:
sys.exit(1)
except ValueError:
sys.exit(1)
logger.debug(
"Commit-ing new block with hash: apphash=%s ," "height=%s, txn ids=%s",
data,

View File

@ -21,6 +21,8 @@ class OperationError(BackendError):
class OperationDataInsertionError(BackendError):
"""Exception raised when a Database operation fails."""
class DBConcurrencyError(BackendError):
"""Exception raised when a Database operation fails."""
class DuplicateKeyError(OperationError):
"""Exception raised when an insert fails because the key is not unique"""

View File

@ -79,9 +79,6 @@ class TarantoolDBConnection(DBConnection):
def get_space(self, space_name: str):
return self.connect().space(space_name)
def space(self, space_name: str):
return self.get_space(space_name)
def drop_database(self):
self.connect().call("drop")

View File

@ -14,6 +14,7 @@ from planetmint.backend import query
from planetmint.backend.models.dbtransaction import DbTransaction
from planetmint.backend.exceptions import OperationDataInsertionError
from planetmint.exceptions import CriticalDoubleSpend
from planetmint.backend.exceptions import DBConcurrencyError
from planetmint.backend.tarantool.const import (
TARANT_TABLE_TRANSACTION,
TARANT_TABLE_OUTPUT,
@ -53,6 +54,18 @@ def catch_db_exception(function_to_decorate):
raise schema_error
except NetworkError as net_error:
raise net_error
except ValueError:
logger.info(f"ValueError in Query/DB instruction: {e}: raising DBConcurrencyError")
raise DBConcurrencyError
except AttributeError:
logger.info(f"Attribute in Query/DB instruction: {e}: raising DBConcurrencyError")
raise DBConcurrencyError
except Exception as e:
logger.info(f"Could not insert transactions: {e}")
if e.args[0] == 3 and e.args[1].startswith("Duplicate key exists in"):
raise CriticalDoubleSpend()
else:
raise OperationDataInsertionError()
return output
return wrapper
@ -76,7 +89,7 @@ def get_complete_transactions_by_ids(connection, txids: list) -> list[DbTransact
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_outputs_by_tx_id(connection, tx_id: str) -> list[Output]:
_outputs = connection.space(TARANT_TABLE_OUTPUT).select(tx_id, index=TARANT_TX_ID_SEARCH).data
_outputs = connection.connect().select(TARANT_TABLE_OUTPUT,tx_id, index=TARANT_TX_ID_SEARCH).data
_sorted_outputs = sorted(_outputs, key=itemgetter(4))
return [Output.from_tuple(output) for output in _sorted_outputs]
@ -93,7 +106,7 @@ def get_transaction(connection, tx_id: str) -> Union[DbTransaction, None]:
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_transactions_by_asset(connection, asset: str, limit: int = 1000) -> list[DbTransaction]:
txs = connection.space(TARANT_TABLE_TRANSACTION).select(asset, limit=limit, index="transactions_by_asset_cid").data
txs = connection.connect().select(TARANT_TABLE_TRANSACTION,asset, limit=limit, index="transactions_by_asset_cid").data
tx_ids = [tx[0] for tx in txs]
return get_complete_transactions_by_ids(connection, tx_ids)
@ -102,9 +115,8 @@ def get_transactions_by_asset(connection, asset: str, limit: int = 1000) -> list
@catch_db_exception
def get_transactions_by_metadata(connection, metadata: str, limit: int = 1000) -> list[DbTransaction]:
txs = (
connection.space(TARANT_TABLE_TRANSACTION)
.select(metadata, limit=limit, index="transactions_by_metadata_cid")
.data
connection.connect().select(TARANT_TABLE_TRANSACTION,
metadata, limit=limit, index="transactions_by_metadata_cid").data
)
tx_ids = [tx[0] for tx in txs]
return get_complete_transactions_by_ids(connection, tx_ids)
@ -113,9 +125,8 @@ def get_transactions_by_metadata(connection, metadata: str, limit: int = 1000) -
@catch_db_exception
def store_transaction_outputs(connection, output: Output, index: int) -> str:
output_id = uuid4().hex
try:
connection.space(TARANT_TABLE_OUTPUT).insert(
(
connection.connect().insert(TARANT_TABLE_OUTPUT,
(
output_id,
int(output.amount),
output.public_keys,
@ -124,16 +135,7 @@ def store_transaction_outputs(connection, output: Output, index: int) -> str:
output.transaction_id,
)
).data
return output_id
except OperationalError as op_error:
raise op_error
except SchemaError as schema_error:
raise schema_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert Output: {e}")
raise OperationDataInsertionError()
return output_id
@register_query(TarantoolDBConnection)
@ -166,26 +168,13 @@ def store_transaction(connection, transaction, table=TARANT_TABLE_TRANSACTION):
transaction["inputs"],
scripts,
)
try:
connection.space(table).insert(tx)
except OperationalError as op_error:
raise op_error
except SchemaError as schema_error:
raise schema_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert transactions: {e}")
if e.args[0] == 3 and e.args[1].startswith("Duplicate key exists in"):
raise CriticalDoubleSpend()
else:
raise OperationDataInsertionError()
connection.connect().insert(table,tx)
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_transaction_by_id(connection, transaction_id, table=TARANT_TABLE_TRANSACTION):
txs = connection.space(table).select(transaction_id, index=TARANT_ID_SEARCH)
txs = connection.connect().select(table, transaction_id, index=TARANT_ID_SEARCH)
if len(txs) == 0:
return None
return DbTransaction.from_tuple(txs[0])
@ -205,7 +194,7 @@ def get_transactions(connection, transactions_ids: list) -> list[DbTransaction]:
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_asset(connection, asset_id: str) -> Asset:
connection.space(TARANT_TABLE_TRANSACTION).select(asset_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
connection.connect().select(TARANT_TABLE_TRANSACTION,asset_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
return Asset.from_dict(_data[0])
@ -214,7 +203,7 @@ def get_asset(connection, asset_id: str) -> Asset:
def get_assets(connection, assets_ids: list) -> list[Asset]:
_returned_data = []
for _id in list(set(assets_ids)):
res = connection.space(TARANT_TABLE_TRANSACTION).select(_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
res = connection.connect().select(TARANT_TABLE_TRANSACTION,_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
if len(res) == 0:
continue
_returned_data.append(res[0])
@ -227,8 +216,7 @@ def get_assets(connection, assets_ids: list) -> list[Asset]:
@catch_db_exception
def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str) -> list[DbTransaction]:
_inputs = (
connection.space(TARANT_TABLE_TRANSACTION)
.select([fullfil_transaction_id, fullfil_output_index], index=TARANT_INDEX_SPENDING_BY_ID_AND_OUTPUT_INDEX)
connection.connect().select(TARANT_TABLE_TRANSACTION, [fullfil_transaction_id, fullfil_output_index], index=TARANT_INDEX_SPENDING_BY_ID_AND_OUTPUT_INDEX)
.data
)
return get_complete_transactions_by_ids(txids=[inp[0] for inp in _inputs], connection=connection)
@ -237,7 +225,7 @@ def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_latest_block(connection) -> Union[dict, None]:
blocks = connection.space(TARANT_TABLE_BLOCKS).select().data
blocks = connection.connect().select(TARANT_TABLE_BLOCKS).data
if not blocks:
return None
@ -250,17 +238,7 @@ def get_latest_block(connection) -> Union[dict, None]:
@catch_db_exception
def store_block(connection, block: dict):
block_unique_id = uuid4().hex
try:
connection.space(TARANT_TABLE_BLOCKS).insert(
(block_unique_id, block["app_hash"], block["height"], block[TARANT_TABLE_TRANSACTION])
)
except OperationalError as op_error:
raise op_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert block: {e}")
raise OperationDataInsertionError()
connection.connect().insert(TARANT_TABLE_BLOCKS, (block_unique_id, block["app_hash"], block["height"], block[TARANT_TABLE_TRANSACTION]))
@register_query(TarantoolDBConnection)
@ -269,18 +247,17 @@ def get_txids_filtered(connection, asset_ids: list[str], operation: str = "", la
transactions = []
if operation == "CREATE":
transactions = (
connection.space(TARANT_TABLE_TRANSACTION)
.select([asset_ids[0], operation], index="transactions_by_id_and_operation")
connection.connect().select(TARANT_TABLE_TRANSACTION,[asset_ids[0], operation], index="transactions_by_id_and_operation")
.data
)
elif operation == "TRANSFER":
transactions = (
connection.space(TARANT_TABLE_TRANSACTION).select(asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
connection.connect().select(TARANT_TABLE_TRANSACTION,asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
)
else:
txs = connection.space(TARANT_TABLE_TRANSACTION).select(asset_ids, index=TARANT_ID_SEARCH).data
txs = connection.connect().select(TARANT_TABLE_TRANSACTION,asset_ids, index=TARANT_ID_SEARCH).data
asset_txs = (
connection.space(TARANT_TABLE_TRANSACTION).select(asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
connection.connect().select(TARANT_TABLE_TRANSACTION,asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
)
transactions = txs + asset_txs
@ -296,7 +273,7 @@ def get_txids_filtered(connection, asset_ids: list[str], operation: str = "", la
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_owned_ids(connection, owner: str) -> list[DbTransaction]:
outputs = connection.space(TARANT_TABLE_OUTPUT).select(owner, index="public_keys").data
outputs = connection.connect().select(TARANT_TABLE_OUTPUT,owner, index="public_keys").data
if len(outputs) == 0:
return []
txids = [output[5] for output in outputs]
@ -322,7 +299,7 @@ def get_spending_transactions(connection, inputs):
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_block(connection, block_id=None) -> Union[dict, None]:
_block = connection.space(TARANT_TABLE_BLOCKS).select(block_id, index="height", limit=1).data
_block = connection.connect().select(TARANT_TABLE_BLOCKS, block_id, index="height", limit=1).data
if len(_block) == 0:
return
_block = Block.from_tuple(_block[0])
@ -332,7 +309,7 @@ def get_block(connection, block_id=None) -> Union[dict, None]:
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_block_with_transaction(connection, txid: str) -> Union[dict, None]:
_block = connection.space(TARANT_TABLE_BLOCKS).select(txid, index="block_by_transaction_id").data
_block = connection.connect().select(TARANT_TABLE_BLOCKS, txid, index="block_by_transaction_id").data
if len(_block) == 0:
return
_block = Block.from_tuple(_block[0])
@ -342,21 +319,13 @@ def get_block_with_transaction(connection, txid: str) -> Union[dict, None]:
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_transactions(connection, txn_ids: list):
try:
for _id in txn_ids:
_outputs = get_outputs_by_tx_id(connection, _id)
for x in range(len(_outputs)):
connection.connect().call("delete_output", (_outputs[x].id))
for _id in txn_ids:
connection.space(TARANT_TABLE_TRANSACTION).delete(_id)
connection.space(TARANT_TABLE_GOVERNANCE).delete(_id)
except OperationalError as op_error:
raise op_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert unspent output: {e}")
raise OperationDataInsertionError()
for _id in txn_ids:
_outputs = get_outputs_by_tx_id(connection, _id)
for x in range(len(_outputs)):
connection.connect().call("delete_output", (_outputs[x].id))
for _id in txn_ids:
connection.connect().delete(TARANT_TABLE_TRANSACTION,_id)
connection.connect().delete(TARANT_TABLE_GOVERNANCE,_id)
@register_query(TarantoolDBConnection)
@ -367,8 +336,8 @@ def store_unspent_outputs(connection, *unspent_outputs: list):
for utxo in unspent_outputs:
try:
output = (
connection.space(TARANT_TABLE_UTXOS)
.insert((uuid4().hex, utxo["transaction_id"], utxo["output_index"], utxo))
connection.connect().insert(TARANT_TABLE_UTXOS,
(uuid4().hex, utxo["transaction_id"], utxo["output_index"], utxo))
.data
)
result.append(output)
@ -385,8 +354,7 @@ def delete_unspent_outputs(connection, *unspent_outputs: list):
if unspent_outputs:
for utxo in unspent_outputs:
output = (
connection.space(TARANT_TABLE_UTXOS)
.delete(
connection.connect().delete(TARANT_TABLE_UTXOS,
(utxo["transaction_id"], utxo["output_index"]), index="utxo_by_transaction_id_and_output_index"
)
.data
@ -398,39 +366,29 @@ def delete_unspent_outputs(connection, *unspent_outputs: list):
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'.
_utxos = connection.space(TARANT_TABLE_UTXOS).select([]).data
_utxos = connection.connect().select(TARANT_TABLE_UTXOS,[]).data
return [utx[3] for utx in _utxos]
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_pre_commit_state(connection, state: dict):
_precommit = connection.space(TARANT_TABLE_PRE_COMMITS).select([], limit=1).data
_precommit = connection.connect().select(TARANT_TABLE_PRE_COMMITS,[], limit=1).data
_precommitTuple = (
(uuid4().hex, state["height"], state[TARANT_TABLE_TRANSACTION])
if _precommit is None or len(_precommit) == 0
else _precommit[0]
)
try:
connection.space(TARANT_TABLE_PRE_COMMITS).upsert(
_precommitTuple,
op_list=[("=", 1, state["height"]), ("=", 2, state[TARANT_TABLE_TRANSACTION])],
)
except OperationalError as op_error:
raise op_error
except SchemaError as schema_error:
raise schema_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert pre commit state: {e}")
raise OperationDataInsertionError()
connection.connect().upsert(TARANT_TABLE_PRE_COMMITS,
_precommitTuple,
op_list=[("=", 1, state["height"]), ("=", 2, state[TARANT_TABLE_TRANSACTION])],
)
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_pre_commit_state(connection) -> dict:
_commit = connection.space(TARANT_TABLE_PRE_COMMITS).select([], index=TARANT_ID_SEARCH).data
_commit = connection.connect().select(TARANT_TABLE_PRE_COMMITS,[], index=TARANT_ID_SEARCH).data
if _commit is None or len(_commit) == 0:
return None
_commit = sorted(_commit, key=itemgetter(1), reverse=False)[0]
@ -441,82 +399,52 @@ def get_pre_commit_state(connection) -> dict:
@catch_db_exception
def store_validator_set(conn, validators_update: dict):
_validator = (
conn.space(TARANT_TABLE_VALIDATOR_SETS).select(validators_update["height"], index="height", limit=1).data
conn.connect().select(TARANT_TABLE_VALIDATOR_SETS, validators_update["height"], index="height", limit=1).data
)
unique_id = uuid4().hex if _validator is None or len(_validator) == 0 else _validator[0][0]
try:
conn.space(TARANT_TABLE_VALIDATOR_SETS).upsert(
(unique_id, validators_update["height"], validators_update["validators"]),
op_list=[("=", 1, validators_update["height"]), ("=", 2, validators_update["validators"])],
)
except OperationalError as op_error:
raise op_error
except SchemaError as schema_error:
raise schema_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert validator set: {e}")
raise OperationDataInsertionError()
conn.connect().upsert(TARANT_TABLE_VALIDATOR_SETS,
(unique_id, validators_update["height"], validators_update["validators"]),
op_list=[("=", 1, validators_update["height"]), ("=", 2, validators_update["validators"])],
)
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_validator_set(connection, height: int):
_validators = connection.space(TARANT_TABLE_VALIDATOR_SETS).select(height, index="height").data
_validators = connection.connect().select(TARANT_TABLE_VALIDATOR_SETS,height, index="height").data
for _valid in _validators:
connection.space(TARANT_TABLE_VALIDATOR_SETS).delete(_valid[0])
connection.connect().delete(TARANT_TABLE_VALIDATOR_SETS,_valid[0])
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_election(connection, election_id: str, height: int, is_concluded: bool):
try:
connection.space(TARANT_TABLE_ELECTIONS).upsert(
(election_id, height, is_concluded), op_list=[("=", 1, height), ("=", 2, is_concluded)]
)
except OperationalError as op_error:
raise op_error
except SchemaError as schema_error:
raise schema_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert election: {e}")
raise OperationDataInsertionError()
connection.connect().upsert(TARANT_TABLE_ELECTIONS,
(election_id, height, is_concluded), op_list=[("=", 1, height), ("=", 2, is_concluded)]
)
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_elections(connection, elections: list):
try:
for election in elections:
_election = connection.space(TARANT_TABLE_ELECTIONS).insert(
(election["election_id"], election["height"], election["is_concluded"])
)
except OperationalError as op_error:
raise op_error
except SchemaError as schema_error:
raise schema_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert elections: {e}")
raise OperationDataInsertionError()
for election in elections:
_election = connection.connect().insert(TARANT_TABLE_ELECTIONS,
(election["election_id"], election["height"], election["is_concluded"])
)
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_elections(connection, height: int):
_elections = connection.space(TARANT_TABLE_ELECTIONS).select(height, index="height").data
_elections = connection.connect().select(TARANT_TABLE_ELECTIONS,height, index="height").data
for _elec in _elections:
connection.space(TARANT_TABLE_ELECTIONS).delete(_elec[0])
connection.connect().delete(TARANT_TABLE_ELECTIONS,_elec[0])
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_validator_set(connection, height: int = None):
_validators = connection.space(TARANT_TABLE_VALIDATOR_SETS).select().data
_validators = connection.connect().select(TARANT_TABLE_VALIDATOR_SETS).data
if height is not None and _validators is not None:
_validators = [
{"height": validator[1], "validators": validator[2]} for validator in _validators if validator[1] <= height
@ -531,7 +459,7 @@ def get_validator_set(connection, height: int = None):
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_election(connection, election_id: str) -> dict:
_elections = connection.space(TARANT_TABLE_ELECTIONS).select(election_id, index=TARANT_ID_SEARCH).data
_elections = connection.connect().select(TARANT_TABLE_ELECTIONS,election_id, index=TARANT_ID_SEARCH).data
if _elections is None or len(_elections) == 0:
return None
_election = sorted(_elections, key=itemgetter(0), reverse=True)[0]
@ -541,9 +469,9 @@ def get_election(connection, election_id: str) -> dict:
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str) -> list[DbTransaction]:
id_transactions = connection.space(TARANT_TABLE_GOVERNANCE).select([asset_id]).data
id_transactions = connection.connect().select(TARANT_TABLE_GOVERNANCE,[asset_id]).data
asset_id_transactions = (
connection.space(TARANT_TABLE_GOVERNANCE).select([asset_id], index="governance_by_asset_id").data
connection.connect().select(TARANT_TABLE_GOVERNANCE,[asset_id], index="governance_by_asset_id").data
)
transactions = id_transactions + asset_id_transactions
@ -553,33 +481,23 @@ def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str)
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True):
try:
connection.space(TARANT_TABLE_ABCI_CHAINS).upsert(
(chain_id, height, is_synced),
op_list=[("=", 0, chain_id), ("=", 1, height), ("=", 2, is_synced)],
)
except OperationalError as op_error:
raise op_error
except SchemaError as schema_error:
raise schema_error
except NetworkError as net_error:
raise net_error
except Exception as e:
logger.info(f"Could not insert abci-chain: {e}")
raise OperationDataInsertionError()
connection.connect().upsert(TARANT_TABLE_ABCI_CHAINS,
(chain_id, height, is_synced),
op_list=[("=", 0, chain_id), ("=", 1, height), ("=", 2, is_synced)],
)
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_abci_chain(connection, height: int):
chains = connection.space(TARANT_TABLE_ABCI_CHAINS).select(height, index="height")
connection.space(TARANT_TABLE_ABCI_CHAINS).delete(chains[0][0], index="id")
chains = connection.connect().select(TARANT_TABLE_ABCI_CHAINS,height, index="height")
connection.connect().delete(TARANT_TABLE_ABCI_CHAINS,chains[0][0], index="id")
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_latest_abci_chain(connection) -> Union[dict, None]:
_all_chains = connection.space(TARANT_TABLE_ABCI_CHAINS).select().data
_all_chains = connection.connect().select(TARANT_TABLE_ABCI_CHAINS).data
if _all_chains is None or len(_all_chains) == 0:
return None
_chain = sorted(_all_chains, key=itemgetter(1), reverse=True)[0]

View File

@ -24,7 +24,6 @@ from transactions.types.elections.validator_election import ValidatorElection
from transactions.common.transaction import Transaction
from planetmint.abci.rpc import ABCI_RPC
from planetmint.abci.utils import load_node_key, public_key_from_base64
from planetmint.application.validator import Validator
from planetmint.backend import schema
@ -112,7 +111,7 @@ def run_configure(args):
def run_election(args):
"""Initiate and manage elections"""
b = Planetmint()
b = Validator()
# Call the function specified by args.action, as defined above
globals()[f"run_election_{args.action}"](args, b)
@ -292,7 +291,8 @@ def run_start(args):
validator = Validator()
validator.rollback()
del validator
logger.info("Starting Planetmint main process.")
from planetmint.start import start
@ -384,6 +384,21 @@ def create_parser():
action="store_true",
help="💀 EXPERIMENTAL: parallelize validation for better throughput 💀",
)
start_parser.add_argument(
"--web-api-only",
dest="web_api_only",
default=False,
action="store_true",
help="💀 EXPERIMENTAL: seperate web API from ABCI server 💀",
)
start_parser.add_argument(
"--abci-only",
dest="abci_only",
default=False,
action="store_true",
help="💀 EXPERIMENTAL: seperate web API from ABCI server 💀",
)
return parser

View File

@ -33,21 +33,21 @@ BANNER = """
* *
****************************************************************************
"""
def start(args):
# Exchange object for event stream api
logger.info("Starting Planetmint")
exchange = Exchange()
# start the web api
def start_web_api(args):
app_server = server.create_server(
settings=Config().get()["server"], log_config=Config().get()["log"], planetmint_factory=Validator
)
p_webapi = Process(name="planetmint_webapi", target=app_server.run, daemon=True)
p_webapi.start()
if args.web_api_only:
app_server.run()
else:
p_webapi = Process(name="planetmint_webapi", target=app_server.run, daemon=True)
p_webapi.start()
def start_abci_server(args):
logger.info(BANNER.format(__version__, Config().get()["server"]["bind"]))
exchange = Exchange()
# start websocket server
p_websocket_server = Process(
name="planetmint_ws",
@ -67,20 +67,28 @@ def start(args):
setproctitle.setproctitle("planetmint")
# Start the ABCIServer
abci_server_app = None
publisher_queue=exchange.get_publisher_queue()
if args.experimental_parallel_validation:
app = ABCIServer(
app=ParallelValidationApp(
events_queue=exchange.get_publisher_queue(),
)
)
abci_server_app = ParallelValidationApp(events_queue=publisher_queue)
else:
app = ABCIServer(
app=ApplicationLogic(
events_queue=exchange.get_publisher_queue(),
)
)
abci_server_app = ApplicationLogic(events_queue=publisher_queue)
app = ABCIServer( abci_server_app )
app.run()
def start(args):
logger.info("Starting Planetmint")
if args.web_api_only:
start_web_api(args)
elif args.abci_only:
start_abci_server(args)
else:
start_web_api(args)
start_abci_server(args)
if __name__ == "__main__":

39
poetry.lock generated
View File

@ -12,25 +12,6 @@ files = [
{file = "aafigure-0.6.tar.gz", hash = "sha256:49f2c1fd2b579c1fffbac1386a2670b3f6f475cc7ff6cc04d8b984888c2d9e1e"},
]
[[package]]
name = "abci"
version = "0.8.3"
description = "Python based ABCI Server for Tendermint"
category = "main"
optional = false
python-versions = ">=3.9"
files = [
{file = "abci-0.8.3-py3-none-any.whl", hash = "sha256:9f6b9d9a28ccb90bc158572fd05d7a196e09cbe774c72053a18a7b5f602e406f"},
{file = "abci-0.8.3.tar.gz", hash = "sha256:2c85c7aa78b7a1785b72e6af789ca9f132088b2ba63d90e85af114d65b609fb3"},
]
[package.dependencies]
colorlog = ">=3.1.4"
protobuf = ">=3.6.1"
[package.extras]
dev = ["black", "build", "pytest", "pytest-cov", "twine"]
[[package]]
name = "aiohttp"
version = "3.8.1"
@ -554,24 +535,6 @@ files = [
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
[[package]]
name = "colorlog"
version = "6.7.0"
description = "Add colours to the output of Python's logging module."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "colorlog-6.7.0-py2.py3-none-any.whl", hash = "sha256:0d33ca236784a1ba3ff9c532d4964126d8a2c44f1f0cb1d2b0728196f512f662"},
{file = "colorlog-6.7.0.tar.gz", hash = "sha256:bd94bd21c1e13fac7bd3153f4bc3a7dc0eb0974b8bc2fdf1a989e474f6e582e5"},
]
[package.dependencies]
colorama = {version = "*", markers = "sys_platform == \"win32\""}
[package.extras]
development = ["black", "flake8", "mypy", "pytest", "types-colorama"]
[[package]]
name = "commonmark"
version = "0.9.1"
@ -3461,4 +3424,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "897f5a79e5084fc44f88d55ad8900da39fec9fa7dc14abb33a50c569e8e7c841"
content-hash = "ca3fd9f0bf5cec090ba164e79ab12e68bf9fece00c66b321933f721918faf349"

View File

@ -26,7 +26,6 @@ python = "^3.9"
chardet = "3.0.4"
base58 = "2.1.1"
aiohttp = "3.8.1"
abci = "0.8.3"
flask-cors = "3.0.10"
flask-restful = "0.3.9"
flask = "2.1.2"

View File

@ -37,7 +37,6 @@ def test_get_txids_filtered(signed_create_tx, signed_transfer_tx, db_conn):
def test_get_owned_ids(signed_create_tx, user_pk, db_conn):
from planetmint.backend.tarantool.sync_io import query
from planetmint.backend.connection import Connection
# insert a transaction
query.store_transactions(connection=db_conn, signed_transactions=[signed_create_tx.to_dict()])
@ -55,7 +54,7 @@ def test_store_block(db_conn):
block = Block(app_hash="random_utxo", height=3, transactions=[])
query.store_block(connection=db_conn, block=block._asdict())
# block = query.get_block(connection=db_conn)
blocks = db_conn.space("blocks").select([]).data
blocks = db_conn.connect().select("blocks",[]).data
assert len(blocks) == 1
@ -87,9 +86,9 @@ def test_store_pre_commit_state(db_conn):
def test_get_pre_commit_state(db_conn):
from planetmint.backend.tarantool.sync_io import query
all_pre = db_conn.space("pre_commits").select([]).data
all_pre = db_conn.connect().select("pre_commits",[]).data
for pre in all_pre:
db_conn.space("pre_commits").delete(pre[0])
db_conn.connect().delete("pre_commits",pre[0])
# TODO First IN, First OUT
state = dict(height=3, transactions=[])
# db_context.conn.db.pre_commit.insert_one