planetmint/planetmint/parallel_validation.py
Lorenz Herzberger e401995637
updated asset migration (#276)
* started replacing asset with assets
* switched asset_id for asset_ids
* __init__ transfer now uses multiple assets
* fixed some test cases
* added VS Code debugging section to tests/README.md
* fixed typo
* minor changes to get_transaction
* Added ignore for .vscode
* convert cursor to list
* fixed get_assets
* Resolved the error of validation for assets
* added additional check to get_transaction
* adjusted backend queries for multiple assets
* adjusted common tests for multiple assets
* fixed db test cases
* fixed election test cases
* fixed almost all tendermint tests
* fixed some command test cases
* fixed validation test cases
* added comment to get_transaction_filtered
* fixed some core test cases
* Updated the unspent output function of Transaction according to new assets model
* fixed txlist tests
* added comments to backend get_asset function, removed duplicate
* fixed further test cases
* fixed some more tests
* added TODOs and changed test_client for aiohttp_client
* Solved assert comparison issue and handled key error with fallback.
* Converted asset to assets in websocket
* Resolved store transaction tests
* added check to fix vote test cases, requires future change
* fixed some test cases
* fixed test case
* fixed test case, added TODO on transaction.py for COMPOSE DECOMPOSE
* fixed last test cases
* added planetmint-driver branch reference to dockerfiles for testing
* Updated asset to assets on the acceptance/integration test suite
* changed get transaction list api point to use comma separated txids
* fixed acceptance test cases
* change cryptoconditions to asset-migration branch
* adjusted doc strings
* fixed integration tests, removed unused code and adjusted some docstrings
* adjusted some test cases
* fixed some test cases
* fixed some more test cases
* fixed tendermint test cases
* fixed web test cases
* bumped up planetmint-transactions version number
* resolved linting issues
* fixed abci test
* updated changelog and version number

Signed-off-by: Lorenz Herzberger <lorenzherzberger@gmail.com>
Co-authored-by: ArpitShukla007 <arpitnshukla@gmail.com>
2022-10-28 11:45:27 +02:00


# Copyright © 2020 Interplanetary Database Association e.V.,
# Planetmint and IPDB software contributors.
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import multiprocessing as mp
from collections import defaultdict
from planetmint import App
from planetmint.lib import Planetmint
from planetmint.tendermint_utils import decode_transaction
from abci.application import OkCode
from tendermint.abci.types_pb2 import (
    ResponseCheckTx,
    ResponseDeliverTx,
)


class ParallelValidationApp(App):
    def __init__(self, planetmint=None, events_queue=None):
        super().__init__(planetmint, events_queue)
        self.parallel_validator = ParallelValidator()
        self.parallel_validator.start()

    def check_tx(self, raw_transaction):
        return ResponseCheckTx(code=OkCode)

    def deliver_tx(self, raw_transaction):
        self.parallel_validator.validate(raw_transaction)
        return ResponseDeliverTx(code=OkCode)

    def end_block(self, request_end_block):
        result = self.parallel_validator.result(timeout=30)
        for transaction in result:
            if transaction:
                self.block_txn_ids.append(transaction.id)
                self.block_transactions.append(transaction)

        return super().end_block(request_end_block)


RESET = "reset"
EXIT = "exit"


class ParallelValidator:
    def __init__(self, number_of_workers=mp.cpu_count()):
        self.number_of_workers = number_of_workers
        self.transaction_index = 0
        self.routing_queues = [mp.Queue() for _ in range(self.number_of_workers)]
        self.workers = []
        self.results_queue = mp.Queue()

    def start(self):
        for routing_queue in self.routing_queues:
            worker = ValidationWorker(routing_queue, self.results_queue)
            process = mp.Process(target=worker.run)
            process.start()
            self.workers.append(process)

    def stop(self):
        for routing_queue in self.routing_queues:
            routing_queue.put(EXIT)

    def validate(self, raw_transaction):
        dict_transaction = decode_transaction(raw_transaction)
        index = int(dict_transaction["id"], 16) % self.number_of_workers
        self.routing_queues[index].put((self.transaction_index, dict_transaction))
        self.transaction_index += 1

    def result(self, timeout=None):
        result_buffer = [None] * self.transaction_index
        for _ in range(self.transaction_index):
            index, transaction = self.results_queue.get(timeout=timeout)
            result_buffer[index] = transaction
        self.transaction_index = 0
        for routing_queue in self.routing_queues:
            routing_queue.put(RESET)
        return result_buffer


class ValidationWorker:
    """Run validation logic in a loop. This Worker is suitable for a Process
    life: no frills, just a queue to get some values, and a queue to return results.

    Note that a worker is expected to validate multiple transactions in
    multiple rounds, and it needs to keep in memory all transactions already
    validated, until a new round starts. To trigger a new round of validation,
    a ValidationWorker expects a `RESET` message. To exit the infinite loop the
    worker is in, it expects an `EXIT` message.
    """

    def __init__(self, in_queue, results_queue):
        self.in_queue = in_queue
        self.results_queue = results_queue
        self.planetmint = Planetmint()
        self.reset()

    def reset(self):
        # We need a place to store already validated transactions,
        # in case of dependent transactions in the same block.
        # `validated_transactions` maps an `asset_id` to the list
        # of all other transactions sharing the same asset.
        self.validated_transactions = defaultdict(list)

    def validate(self, dict_transaction):
        # TODO: this will only work for now, no multiasset support => needs to be refactored for COMPOSE/DECOMPOSE
        try:
            asset_id = dict_transaction["assets"][0]["id"]
        except (KeyError, TypeError):
            # No asset id present (e.g. a CREATE transaction):
            # fall back to the transaction's own id.
            asset_id = dict_transaction["id"]

        transaction = self.planetmint.is_valid_transaction(dict_transaction, self.validated_transactions[asset_id])

        if transaction:
            self.validated_transactions[asset_id].append(transaction)
        return transaction

    def run(self):
        while True:
            message = self.in_queue.get()
            if message == RESET:
                self.reset()
            elif message == EXIT:
                return
            else:
                index, transaction = message
                self.results_queue.put((index, self.validate(transaction)))
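

The routing, result-reordering, and `RESET`/`EXIT` protocol above can be tried in isolation with a minimal sketch. It is not part of the module and makes several assumptions: it swaps processes for threads and `mp.Queue` for `queue.Queue` so that it runs without a Planetmint node, and `stub_validate`, `worker`, `run_round`, and `demo` are hypothetical names. The stub only rejects a transaction id it has already seen in the current round; it is a stand-in for `Planetmint.is_valid_transaction`, not a description of it.

```python
import queue
import threading

RESET = "reset"
EXIT = "exit"


def stub_validate(tx, seen):
    """Hypothetical validator: accept a transaction unless its id was already seen."""
    if tx["id"] in seen:
        return None
    seen.add(tx["id"])
    return tx


def worker(in_queue, results_queue):
    seen = set()  # per-round state, cleared on RESET
    while True:
        message = in_queue.get()
        if message == RESET:
            seen = set()
        elif message == EXIT:
            return
        else:
            index, tx = message
            results_queue.put((index, stub_validate(tx, seen)))


def run_round(transactions, routing_queues, results_queue, timeout=5):
    for index, tx in enumerate(transactions):
        # The same id always maps to the same worker, so duplicates
        # are checked against the same per-round state.
        target = int(tx["id"], 16) % len(routing_queues)
        routing_queues[target].put((index, tx))

    # Results arrive in arbitrary order; the index restores submission order.
    result_buffer = [None] * len(transactions)
    for _ in transactions:
        index, result = results_queue.get(timeout=timeout)
        result_buffer[index] = result

    # Start a new round in every worker.
    for routing_queue in routing_queues:
        routing_queue.put(RESET)
    return result_buffer


def demo():
    routing_queues = [queue.Queue() for _ in range(3)]
    results_queue = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(routing_queue, results_queue))
        for routing_queue in routing_queues
    ]
    for thread in threads:
        thread.start()

    transactions = [{"id": "0a"}, {"id": "0b"}, {"id": "0a"}, {"id": "0c"}]
    first = run_round(transactions, routing_queues, results_queue)
    # After RESET the id "0a" is accepted again.
    second = run_round(transactions[:1], routing_queues, results_queue)

    for routing_queue in routing_queues:
        routing_queue.put(EXIT)
    for thread in threads:
        thread.join()
    return first, second
```

Calling `demo()` runs two rounds: in the first, the repeated id `0a` comes back as `None` at its original position; in the second, after `RESET`, the same id is accepted again.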