blackified

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
Jürgen Eckel 2023-03-01 16:11:54 +01:00
parent 367b101fb2
commit 50d1276e29
7 changed files with 99 additions and 72 deletions
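For reference, the hunks below are mechanical reformatting as produced by the Black code formatter named in the commit title. A minimal sketch of the same normalization via Black's Python API (assumption: the black package is installed; the line length configured for this repository is not visible in this commit):

# Sketch only, not part of the commit: black.format_str() applies the same
# normalization seen in the hunks below, e.g. spaces around "=" and after commas.
import black

src = "block= None\n"
print(black.format_str(src, mode=black.Mode()))  # -> "block = None\n"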

View File

@@ -81,7 +81,9 @@ class ApplicationLogic(BaseApplication):
chain_id = known_chain["chain_id"]
if known_chain["is_synced"]:
msg = f"Got invalid InitChain ABCI request ({genesis}) - " f"the chain {chain_id} is already synced."
msg = (
f"Got invalid InitChain ABCI request ({genesis}) - " f"the chain {chain_id} is already synced."
)
logger.error(msg)
sys.exit(1)
if chain_id != genesis.chain_id:
@@ -127,13 +129,13 @@ class ApplicationLogic(BaseApplication):
# logger.info(f"Tendermint version: {request.version}")
r = ResponseInfo()
- block= None
+ block = None
try:
block = self.validator.models.get_latest_block()
except DBConcurrencyError:
- block= None
+ block = None
except ValueError:
- block=None
+ block = None
if block:
chain_shift = 0 if self.chain is None else self.chain["height"]
r.last_block_height = block["height"] - chain_shift
@@ -166,7 +168,6 @@ class ApplicationLogic(BaseApplication):
sys.exit(1)
except ValueError:
sys.exit(1)
def begin_block(self, req_begin_block):
"""Initialize list of transaction.
@@ -196,12 +197,14 @@ class ApplicationLogic(BaseApplication):
logger.debug("deliver_tx: %s", raw_transaction)
transaction = None
try:
- transaction = self.validator.is_valid_transaction(decode_transaction(raw_transaction), self.block_transactions)
+ transaction = self.validator.is_valid_transaction(
+ decode_transaction(raw_transaction), self.block_transactions
+ )
except DBConcurrencyError:
sys.exit(1)
except ValueError:
sys.exit(1)
if not transaction:
logger.debug("deliver_tx: INVALID")
return ResponseDeliverTx(code=CodeTypeError)
@@ -247,7 +250,7 @@ class ApplicationLogic(BaseApplication):
sys.exit(1)
except ValueError:
sys.exit(1)
return ResponseEndBlock(validator_updates=validator_update)
def commit(self):
@@ -273,7 +276,7 @@ class ApplicationLogic(BaseApplication):
sys.exit(1)
except ValueError:
sys.exit(1)
logger.debug(
"Commit-ing new block with hash: apphash=%s ," "height=%s, txn ids=%s",
data,

View File

@@ -21,8 +21,10 @@ class OperationError(BackendError):
class OperationDataInsertionError(BackendError):
"""Exception raised when a Database operation fails."""
class DBConcurrencyError(BackendError):
"""Exception raised when a Database operation fails."""
class DuplicateKeyError(OperationError):
"""Exception raised when an insert fails because the key is not unique"""

View File

@@ -89,7 +89,7 @@ def get_complete_transactions_by_ids(connection, txids: list) -> list[DbTransact
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_outputs_by_tx_id(connection, tx_id: str) -> list[Output]:
- _outputs = connection.connect().select(TARANT_TABLE_OUTPUT,tx_id, index=TARANT_TX_ID_SEARCH).data
+ _outputs = connection.connect().select(TARANT_TABLE_OUTPUT, tx_id, index=TARANT_TX_ID_SEARCH).data
_sorted_outputs = sorted(_outputs, key=itemgetter(4))
return [Output.from_tuple(output) for output in _sorted_outputs]
@@ -106,7 +106,11 @@ def get_transaction(connection, tx_id: str) -> Union[DbTransaction, None]:
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_transactions_by_asset(connection, asset: str, limit: int = 1000) -> list[DbTransaction]:
- txs = connection.connect().select(TARANT_TABLE_TRANSACTION,asset, limit=limit, index="transactions_by_asset_cid").data
+ txs = (
+ connection.connect()
+ .select(TARANT_TABLE_TRANSACTION, asset, limit=limit, index="transactions_by_asset_cid")
+ .data
+ )
tx_ids = [tx[0] for tx in txs]
return get_complete_transactions_by_ids(connection, tx_ids)
@@ -115,8 +119,9 @@ def get_transactions_by_asset(connection, asset: str, limit: int = 1000) -> list
@catch_db_exception
def get_transactions_by_metadata(connection, metadata: str, limit: int = 1000) -> list[DbTransaction]:
txs = (
- connection.connect().select(TARANT_TABLE_TRANSACTION,
- metadata, limit=limit, index="transactions_by_metadata_cid").data
+ connection.connect()
+ .select(TARANT_TABLE_TRANSACTION, metadata, limit=limit, index="transactions_by_metadata_cid")
+ .data
)
tx_ids = [tx[0] for tx in txs]
return get_complete_transactions_by_ids(connection, tx_ids)
@@ -125,16 +130,17 @@ def get_transactions_by_metadata(connection, metadata: str, limit: int = 1000) -
@catch_db_exception
def store_transaction_outputs(connection, output: Output, index: int) -> str:
output_id = uuid4().hex
- connection.connect().insert(TARANT_TABLE_OUTPUT,
- (
- output_id,
- int(output.amount),
- output.public_keys,
- output.condition.to_dict(),
- index,
- output.transaction_id,
- )
- ).data
+ connection.connect().insert(
+ TARANT_TABLE_OUTPUT,
+ (
+ output_id,
+ int(output.amount),
+ output.public_keys,
+ output.condition.to_dict(),
+ index,
+ output.transaction_id,
+ ),
+ ).data
return output_id
@@ -168,7 +174,7 @@ def store_transaction(connection, transaction, table=TARANT_TABLE_TRANSACTION):
transaction["inputs"],
scripts,
)
- connection.connect().insert(table,tx)
+ connection.connect().insert(table, tx)
@register_query(TarantoolDBConnection)
@@ -194,7 +200,7 @@ def get_transactions(connection, transactions_ids: list) -> list[DbTransaction]:
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_asset(connection, asset_id: str) -> Asset:
- connection.connect().select(TARANT_TABLE_TRANSACTION,asset_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
+ connection.connect().select(TARANT_TABLE_TRANSACTION, asset_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
return Asset.from_dict(_data[0])
@@ -203,7 +209,7 @@ def get_asset(connection, asset_id: str) -> Asset:
def get_assets(connection, assets_ids: list) -> list[Asset]:
_returned_data = []
for _id in list(set(assets_ids)):
- res = connection.connect().select(TARANT_TABLE_TRANSACTION,_id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
+ res = connection.connect().select(TARANT_TABLE_TRANSACTION, _id, index=TARANT_INDEX_TX_BY_ASSET_ID).data
if len(res) == 0:
continue
_returned_data.append(res[0])
@@ -216,7 +222,12 @@ def get_assets(connection, assets_ids: list) -> list[Asset]:
@catch_db_exception
def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str) -> list[DbTransaction]:
_inputs = (
- connection.connect().select(TARANT_TABLE_TRANSACTION, [fullfil_transaction_id, fullfil_output_index], index=TARANT_INDEX_SPENDING_BY_ID_AND_OUTPUT_INDEX)
+ connection.connect()
+ .select(
+ TARANT_TABLE_TRANSACTION,
+ [fullfil_transaction_id, fullfil_output_index],
+ index=TARANT_INDEX_SPENDING_BY_ID_AND_OUTPUT_INDEX,
+ )
.data
)
return get_complete_transactions_by_ids(txids=[inp[0] for inp in _inputs], connection=connection)
@@ -238,7 +249,9 @@ def get_latest_block(connection) -> Union[dict, None]:
@catch_db_exception
def store_block(connection, block: dict):
block_unique_id = uuid4().hex
- connection.connect().insert(TARANT_TABLE_BLOCKS, (block_unique_id, block["app_hash"], block["height"], block[TARANT_TABLE_TRANSACTION]))
+ connection.connect().insert(
+ TARANT_TABLE_BLOCKS, (block_unique_id, block["app_hash"], block["height"], block[TARANT_TABLE_TRANSACTION])
+ )
@register_query(TarantoolDBConnection)
@@ -247,17 +260,18 @@ def get_txids_filtered(connection, asset_ids: list[str], operation: str = "", la
transactions = []
if operation == "CREATE":
transactions = (
- connection.connect().select(TARANT_TABLE_TRANSACTION,[asset_ids[0], operation], index="transactions_by_id_and_operation")
+ connection.connect()
+ .select(TARANT_TABLE_TRANSACTION, [asset_ids[0], operation], index="transactions_by_id_and_operation")
.data
)
elif operation == "TRANSFER":
transactions = (
- connection.connect().select(TARANT_TABLE_TRANSACTION,asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
+ connection.connect().select(TARANT_TABLE_TRANSACTION, asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
)
else:
- txs = connection.connect().select(TARANT_TABLE_TRANSACTION,asset_ids, index=TARANT_ID_SEARCH).data
+ txs = connection.connect().select(TARANT_TABLE_TRANSACTION, asset_ids, index=TARANT_ID_SEARCH).data
asset_txs = (
- connection.connect().select(TARANT_TABLE_TRANSACTION,asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
+ connection.connect().select(TARANT_TABLE_TRANSACTION, asset_ids, index=TARANT_INDEX_TX_BY_ASSET_ID).data
)
transactions = txs + asset_txs
@@ -273,7 +287,7 @@ def get_txids_filtered(connection, asset_ids: list[str], operation: str = "", la
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_owned_ids(connection, owner: str) -> list[DbTransaction]:
- outputs = connection.connect().select(TARANT_TABLE_OUTPUT,owner, index="public_keys").data
+ outputs = connection.connect().select(TARANT_TABLE_OUTPUT, owner, index="public_keys").data
if len(outputs) == 0:
return []
txids = [output[5] for output in outputs]
@@ -324,8 +338,8 @@ def delete_transactions(connection, txn_ids: list):
for x in range(len(_outputs)):
connection.connect().call("delete_output", (_outputs[x].id))
for _id in txn_ids:
- connection.connect().delete(TARANT_TABLE_TRANSACTION,_id)
- connection.connect().delete(TARANT_TABLE_GOVERNANCE,_id)
+ connection.connect().delete(TARANT_TABLE_TRANSACTION, _id)
+ connection.connect().delete(TARANT_TABLE_GOVERNANCE, _id)
@register_query(TarantoolDBConnection)
@@ -336,8 +350,8 @@ def store_unspent_outputs(connection, *unspent_outputs: list):
for utxo in unspent_outputs:
try:
output = (
- connection.connect().insert(TARANT_TABLE_UTXOS,
- (uuid4().hex, utxo["transaction_id"], utxo["output_index"], utxo))
+ connection.connect()
+ .insert(TARANT_TABLE_UTXOS, (uuid4().hex, utxo["transaction_id"], utxo["output_index"], utxo))
.data
)
result.append(output)
@@ -354,8 +368,11 @@ def delete_unspent_outputs(connection, *unspent_outputs: list):
if unspent_outputs:
for utxo in unspent_outputs:
output = (
- connection.connect().delete(TARANT_TABLE_UTXOS,
- (utxo["transaction_id"], utxo["output_index"]), index="utxo_by_transaction_id_and_output_index"
+ connection.connect()
+ .delete(
+ TARANT_TABLE_UTXOS,
+ (utxo["transaction_id"], utxo["output_index"]),
+ index="utxo_by_transaction_id_and_output_index",
+ )
)
.data
)
@@ -366,20 +383,21 @@ def delete_unspent_outputs(connection, *unspent_outputs: list):
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'.
- _utxos = connection.connect().select(TARANT_TABLE_UTXOS,[]).data
+ _utxos = connection.connect().select(TARANT_TABLE_UTXOS, []).data
return [utx[3] for utx in _utxos]
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_pre_commit_state(connection, state: dict):
- _precommit = connection.connect().select(TARANT_TABLE_PRE_COMMITS,[], limit=1).data
+ _precommit = connection.connect().select(TARANT_TABLE_PRE_COMMITS, [], limit=1).data
_precommitTuple = (
(uuid4().hex, state["height"], state[TARANT_TABLE_TRANSACTION])
if _precommit is None or len(_precommit) == 0
else _precommit[0]
)
- connection.connect().upsert(TARANT_TABLE_PRE_COMMITS,
+ connection.connect().upsert(
+ TARANT_TABLE_PRE_COMMITS,
_precommitTuple,
op_list=[("=", 1, state["height"]), ("=", 2, state[TARANT_TABLE_TRANSACTION])],
)
@@ -388,7 +406,7 @@ def store_pre_commit_state(connection, state: dict):
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_pre_commit_state(connection) -> dict:
- _commit = connection.connect().select(TARANT_TABLE_PRE_COMMITS,[], index=TARANT_ID_SEARCH).data
+ _commit = connection.connect().select(TARANT_TABLE_PRE_COMMITS, [], index=TARANT_ID_SEARCH).data
if _commit is None or len(_commit) == 0:
return None
_commit = sorted(_commit, key=itemgetter(1), reverse=False)[0]
@@ -402,7 +420,8 @@ def store_validator_set(conn, validators_update: dict):
conn.connect().select(TARANT_TABLE_VALIDATOR_SETS, validators_update["height"], index="height", limit=1).data
)
unique_id = uuid4().hex if _validator is None or len(_validator) == 0 else _validator[0][0]
- conn.connect().upsert(TARANT_TABLE_VALIDATOR_SETS,
+ conn.connect().upsert(
+ TARANT_TABLE_VALIDATOR_SETS,
(unique_id, validators_update["height"], validators_update["validators"]),
op_list=[("=", 1, validators_update["height"]), ("=", 2, validators_update["validators"])],
)
@@ -411,16 +430,16 @@ def store_validator_set(conn, validators_update: dict):
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_validator_set(connection, height: int):
- _validators = connection.connect().select(TARANT_TABLE_VALIDATOR_SETS,height, index="height").data
+ _validators = connection.connect().select(TARANT_TABLE_VALIDATOR_SETS, height, index="height").data
for _valid in _validators:
- connection.connect().delete(TARANT_TABLE_VALIDATOR_SETS,_valid[0])
+ connection.connect().delete(TARANT_TABLE_VALIDATOR_SETS, _valid[0])
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_election(connection, election_id: str, height: int, is_concluded: bool):
- connection.connect().upsert(TARANT_TABLE_ELECTIONS,
- (election_id, height, is_concluded), op_list=[("=", 1, height), ("=", 2, is_concluded)]
+ connection.connect().upsert(
+ TARANT_TABLE_ELECTIONS, (election_id, height, is_concluded), op_list=[("=", 1, height), ("=", 2, is_concluded)]
)
@@ -428,17 +447,17 @@ def store_election(connection, election_id: str, height: int, is_concluded: bool
@catch_db_exception
def store_elections(connection, elections: list):
for election in elections:
- _election = connection.connect().insert(TARANT_TABLE_ELECTIONS,
- (election["election_id"], election["height"], election["is_concluded"])
+ _election = connection.connect().insert(
+ TARANT_TABLE_ELECTIONS, (election["election_id"], election["height"], election["is_concluded"])
)
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_elections(connection, height: int):
- _elections = connection.connect().select(TARANT_TABLE_ELECTIONS,height, index="height").data
+ _elections = connection.connect().select(TARANT_TABLE_ELECTIONS, height, index="height").data
for _elec in _elections:
- connection.connect().delete(TARANT_TABLE_ELECTIONS,_elec[0])
+ connection.connect().delete(TARANT_TABLE_ELECTIONS, _elec[0])
@register_query(TarantoolDBConnection)
@@ -459,7 +478,7 @@ def get_validator_set(connection, height: int = None):
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_election(connection, election_id: str) -> dict:
- _elections = connection.connect().select(TARANT_TABLE_ELECTIONS,election_id, index=TARANT_ID_SEARCH).data
+ _elections = connection.connect().select(TARANT_TABLE_ELECTIONS, election_id, index=TARANT_ID_SEARCH).data
if _elections is None or len(_elections) == 0:
return None
_election = sorted(_elections, key=itemgetter(0), reverse=True)[0]
@@ -469,9 +488,9 @@ def get_election(connection, election_id: str) -> dict:
@register_query(TarantoolDBConnection)
@catch_db_exception
def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str) -> list[DbTransaction]:
- id_transactions = connection.connect().select(TARANT_TABLE_GOVERNANCE,[asset_id]).data
+ id_transactions = connection.connect().select(TARANT_TABLE_GOVERNANCE, [asset_id]).data
asset_id_transactions = (
- connection.connect().select(TARANT_TABLE_GOVERNANCE,[asset_id], index="governance_by_asset_id").data
+ connection.connect().select(TARANT_TABLE_GOVERNANCE, [asset_id], index="governance_by_asset_id").data
)
transactions = id_transactions + asset_id_transactions
@@ -481,7 +500,8 @@ def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str)
@register_query(TarantoolDBConnection)
@catch_db_exception
def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True):
- connection.connect().upsert(TARANT_TABLE_ABCI_CHAINS,
+ connection.connect().upsert(
+ TARANT_TABLE_ABCI_CHAINS,
(chain_id, height, is_synced),
op_list=[("=", 0, chain_id), ("=", 1, height), ("=", 2, is_synced)],
)
@@ -490,8 +510,8 @@ def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = T
@register_query(TarantoolDBConnection)
@catch_db_exception
def delete_abci_chain(connection, height: int):
- chains = connection.connect().select(TARANT_TABLE_ABCI_CHAINS,height, index="height")
- connection.connect().delete(TARANT_TABLE_ABCI_CHAINS,chains[0][0], index="id")
+ chains = connection.connect().select(TARANT_TABLE_ABCI_CHAINS, height, index="height")
+ connection.connect().delete(TARANT_TABLE_ABCI_CHAINS, chains[0][0], index="id")
@register_query(TarantoolDBConnection)

View File

@@ -291,7 +291,7 @@ def run_start(args):
validator = Validator()
validator.rollback()
del validator
logger.info("Starting Planetmint main process.")
from planetmint.start import start
@@ -383,7 +383,7 @@ def create_parser():
action="store_true",
help="💀 EXPERIMENTAL: parallelize validation for better throughput 💀",
)
start_parser.add_argument(
"--web-api-only",
dest="web_api_only",

View File

@@ -33,6 +33,8 @@ BANNER = """
* *
****************************************************************************
"""
def start_web_api(args):
app_server = server.create_server(
settings=Config().get()["server"], log_config=Config().get()["log"], planetmint_factory=Validator
@@ -43,11 +45,11 @@ def start_web_api(args):
p_webapi = Process(name="planetmint_webapi", target=app_server.run, daemon=True)
p_webapi.start()
def start_abci_server(args):
logger.info(BANNER.format(__version__, Config().get()["server"]["bind"]))
exchange = Exchange()
# start websocket server
p_websocket_server = Process(
name="planetmint_ws",
@@ -68,16 +70,16 @@ def start_abci_server(args):
setproctitle.setproctitle("planetmint")
abci_server_app = None
- publisher_queue=exchange.get_publisher_queue()
+ publisher_queue = exchange.get_publisher_queue()
if args.experimental_parallel_validation:
abci_server_app = ParallelValidationApp(events_queue=publisher_queue)
else:
abci_server_app = ApplicationLogic(events_queue=publisher_queue)
- app = ABCIServer( abci_server_app )
+ app = ABCIServer(abci_server_app)
app.run()
def start(args):
logger.info("Starting Planetmint")
@@ -87,7 +89,7 @@ def start(args):
elif args.abci_only:
start_abci_server(args)
else:
- start_web_api(args)
+ start_web_api(args)
start_abci_server(args)

View File

@@ -54,7 +54,7 @@ def test_store_block(db_conn):
block = Block(app_hash="random_utxo", height=3, transactions=[])
query.store_block(connection=db_conn, block=block._asdict())
# block = query.get_block(connection=db_conn)
blocks = db_conn.connect().select("blocks",[]).data
blocks = db_conn.connect().select("blocks", []).data
assert len(blocks) == 1
@@ -86,9 +86,9 @@ def test_store_pre_commit_state(db_conn):
def test_get_pre_commit_state(db_conn):
from planetmint.backend.tarantool.sync_io import query
all_pre = db_conn.connect().select("pre_commits",[]).data
all_pre = db_conn.connect().select("pre_commits", []).data
for pre in all_pre:
db_conn.connect().delete("pre_commits",pre[0])
db_conn.connect().delete("pre_commits", pre[0])
# TODO First IN, First OUT
state = dict(height=3, transactions=[])
# db_context.conn.db.pre_commit.insert_one

View File

@@ -38,7 +38,7 @@ def test_middleware_does_notstrip_content_type_from_other_methods():
assert "CONTENT_TYPE" in mock.call_args[0][0]
- def test_get_outputs_endpoint_with_content_type(client, user_pk,_bdb):
+ def test_get_outputs_endpoint_with_content_type(client, user_pk, _bdb):
res = client.get(
OUTPUTS_ENDPOINT + "?public_key={}".format(user_pk), headers=[("Content-Type", "application/json")]
)