prepared AsyncIO seperation

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2023-02-27 22:57:09 +01:00
parent 8d3e1fd725
commit 4632a52599
No known key found for this signature in database
4 changed files with 52 additions and 19 deletions

View File

@ -24,7 +24,11 @@ from tendermint.abci.types_pb2 import (
from planetmint.application.validator import Validator
from planetmint.model.models import Models
from planetmint.abci.tendermint_utils import decode_transaction, calculate_hash, decode_validator
from planetmint.abci.tendermint_utils import (
decode_transaction,
calculate_hash,
decode_validator,
)
from planetmint.abci.block import Block
from planetmint.ipc.events import EventTypes, Event
@ -39,12 +43,18 @@ class App(BaseApplication):
transaction logic to Tendermint Core.
"""
def __init__(self, planetmint_node=None, events_queue=None, models: Models = None, validator: Validator = None):
def __init__(
self,
planetmint_node=None,
events_queue=None,
models: Models = None,
validator: Validator = None,
):
# super().__init__(abci)
logger.debug("Checking values of types")
logger.debug(dir(types_pb2))
self.events_queue = events_queue
self.validator = Validator()
self.validator = Validator()#(async_io=True)
self.models = models or Models()
self.block_txn_ids = []
self.block_txn_hash = ""
@ -56,7 +66,7 @@ class App(BaseApplication):
def log_abci_migration_error(self, chain_id, validators):
logger.error(
"An ABCI chain migration is in process. "
"Download theself.planetmint_node.get_latest_abci_chain new ABCI client and configure it with "
"Download the self.planetmint_node.get_latest_abci_chain new ABCI client and configure it with "
f"chain_id={chain_id} and validators={validators}."
)
@ -76,7 +86,10 @@ class App(BaseApplication):
chain_id = known_chain["chain_id"]
if known_chain["is_synced"]:
msg = f"Got invalid InitChain ABCI request ({genesis}) - " f"the chain {chain_id} is already synced."
msg = (
f"Got invalid InitChain ABCI request ({genesis}) - "
f"the chain {chain_id} is already synced."
)
logger.error(msg)
sys.exit(1)
if chain_id != genesis.chain_id:
@ -97,7 +110,11 @@ class App(BaseApplication):
self.models.store_validator_set(height + 1, validator_set)
abci_chain_height = 0 if known_chain is None else known_chain["height"]
self.models.store_abci_chain(abci_chain_height, genesis.chain_id, True)
self.chain = {"height": abci_chain_height, "is_synced": True, "chain_id": genesis.chain_id}
self.chain = {
"height": abci_chain_height,
"is_synced": True,
"chain_id": genesis.chain_id,
}
return ResponseInitChain()
def info(self, request):
@ -153,7 +170,9 @@ class App(BaseApplication):
chain_shift = 0 if self.chain is None else self.chain["height"]
# req_begin_block.header.num_txs not found, so removing it.
logger.debug("BEGIN BLOCK, height:%s", req_begin_block.header.height + chain_shift)
logger.debug(
"BEGIN BLOCK, height:%s", req_begin_block.header.height + chain_shift
)
self.block_txn_ids = []
self.block_transactions = []
@ -169,7 +188,9 @@ class App(BaseApplication):
self.abort_if_abci_chain_is_not_synced()
logger.debug("deliver_tx: %s", raw_transaction)
transaction = self.validator.is_valid_transaction(decode_transaction(raw_transaction), self.block_transactions)
transaction = self.validator.is_valid_transaction(
decode_transaction(raw_transaction), self.block_transactions
)
if not transaction:
logger.debug("deliver_tx: INVALID")
@ -210,7 +231,9 @@ class App(BaseApplication):
else:
self.block_txn_hash = block["app_hash"]
validator_update = self.validator.process_block(self.new_height, self.block_transactions)
validator_update = self.validator.process_block(
self.new_height, self.block_transactions
)
return ResponseEndBlock(validator_updates=validator_update)
@ -225,7 +248,11 @@ class App(BaseApplication):
if self.block_txn_ids:
self.models.store_bulk_transactions(self.block_transactions)
block = Block(app_hash=self.block_txn_hash, height=self.new_height, transactions=self.block_txn_ids)
block = Block(
app_hash=self.block_txn_hash,
height=self.new_height,
transactions=self.block_txn_ids,
)
# NOTE: storing the block should be the last operation during commit
# this effects crash recovery. Refer BEP#8 for details
self.models.store_block(block._asdict())
@ -240,7 +267,11 @@ class App(BaseApplication):
if self.events_queue:
event = Event(
EventTypes.BLOCK_VALID,
{"height": self.new_height, "hash": self.block_txn_hash, "transactions": self.block_transactions},
{
"height": self.new_height,
"hash": self.block_txn_hash,
"transactions": self.block_transactions,
},
)
self.events_queue.put(event)

View File

@ -40,11 +40,11 @@ logger = logging.getLogger(__name__)
class Validator:
def __init__(self, async_io: bool = False):
self.async_io = async_io
self.models = Models()
self.validation = Validator._get_validationmethod()
self.models = Models(async_io=async_io)
self.validation = Validator._get_validation_method()
@staticmethod
def _get_validationmethod():
def _get_validation_method():
validationPlugin = Config().get().get("validation_plugin")
if validationPlugin:

View File

@ -39,6 +39,8 @@ class DBSingleton(type):
logger.info("Backend {} not supported".format(backend))
raise ConfigurationError
modulepath, _, class_name = BACKENDS[backend].rpartition(".")
if "async_io" in kwargs and kwargs["async_io"] == True:
class_name = class_name+"AsyncIO"
Class = getattr(import_module(modulepath), class_name)
cls._instances[cls] = super(DBSingleton, Class).__call__(*args, **kwargs)
return cls._instances[cls]
@ -64,6 +66,7 @@ class DBConnection(metaclass=DBSingleton):
backend: str = None,
connection_timeout: int = None,
max_tries: int = None,
async_io: bool = False,
**kwargs
):
"""Create a new :class:`~.Connection` instance.

View File

@ -8,11 +8,10 @@ from transactions.common.exceptions import InputDoesNotExist
from planetmint import config_utils, backend
from planetmint.const import GOVERNANCE_TRANSACTION_TYPES
from planetmint.backend.connection import Connection
from planetmint.backend.tarantool.const import TARANT_TABLE_TRANSACTION, TARANT_TABLE_GOVERNANCE
from planetmint.model.fastquery import FastQuery
from planetmint.abci.tendermint_utils import key_from_base64
from planetmint.backend.connection import Connection
from planetmint.backend.tarantool.const import TARANT_TABLE_TRANSACTION, TARANT_TABLE_GOVERNANCE
from planetmint.backend.models.block import Block
from planetmint.backend.models.output import Output
from planetmint.backend.models.asset import Asset
@ -21,9 +20,9 @@ from planetmint.backend.models.dbtransaction import DbTransaction
class Models:
def __init__(self, database_connection=None):
def __init__(self, database_connection=None, async_io: bool = False):
config_utils.autoconfigure()
self.connection = database_connection if database_connection is not None else Connection()
self.connection = database_connection if database_connection is not None else Connection(async_io=async_io)
def store_bulk_transactions(self, transactions):
txns = []