mirror of
https://github.com/planetmint/planetmint.git
synced 2025-03-30 15:08:31 +00:00

* created ABCI_RPC class to seperate RPC interaction from the other ABCI interactions * renamed validation.py to validator.py * simplified planetmint/__init__.py * moved methods used by testing to tests/utils.py * making planetmint/__init__.py lean * moved ProcessGroup object to tests as it is only used there * reintegrated disabled tests Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
132 lines
5.5 KiB
Python
132 lines
5.5 KiB
Python
# Copyright © 2020 Interplanetary Database Association e.V.,
|
|
# Planetmint and IPDB software contributors.
|
|
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
|
|
# Code is Apache-2.0 and docs are CC-BY-4.0
|
|
|
|
import pytest
|
|
|
|
from transactions.common.crypto import generate_key_pair
|
|
from transactions.types.assets.create import Create
|
|
from transactions.types.assets.transfer import Transfer
|
|
|
|
pytestmark = pytest.mark.tendermint
|
|
|
|
|
|
def generate_create_and_transfer(keypair=None):
|
|
if not keypair:
|
|
keypair = generate_key_pair()
|
|
priv_key, pub_key = keypair
|
|
create_tx = Create.generate([pub_key], [([pub_key], 10)]).sign([priv_key])
|
|
transfer_tx = Transfer.generate(create_tx.to_inputs(), [([pub_key], 10)], asset_ids=[create_tx.id]).sign(
|
|
[priv_key]
|
|
)
|
|
return create_tx, transfer_tx
|
|
|
|
|
|
def test_validation_worker_process_multiple_transactions(b):
|
|
import multiprocessing as mp
|
|
from planetmint.abci.parallel_validation import ValidationWorker, RESET, EXIT
|
|
|
|
keypair = generate_key_pair()
|
|
create_tx, transfer_tx = generate_create_and_transfer(keypair)
|
|
double_spend = Transfer.generate(
|
|
create_tx.to_inputs(), [([keypair.public_key], 10)], asset_ids=[create_tx.id]
|
|
).sign([keypair.private_key])
|
|
|
|
in_queue, results_queue = mp.Queue(), mp.Queue()
|
|
vw = ValidationWorker(in_queue, results_queue)
|
|
|
|
# Note: in the following instructions, the worker will encounter two
|
|
# `RESET` messages, and an `EXIT` message. When a worker processes a
|
|
# `RESET` message, it forgets all transactions it has validated. This allow
|
|
# us to re-validate the same transactions. This won't happen in real life,
|
|
# but it's quite handy to check if the worker actually forgot about the
|
|
# past transactions (if not, it will return `False` because the
|
|
# transactions look like a double spend).
|
|
# `EXIT` makes the worker to stop the infinite loop.
|
|
in_queue.put((0, create_tx.to_dict()))
|
|
in_queue.put((10, transfer_tx.to_dict()))
|
|
in_queue.put((20, double_spend.to_dict()))
|
|
in_queue.put(RESET)
|
|
in_queue.put((0, create_tx.to_dict()))
|
|
in_queue.put((5, transfer_tx.to_dict()))
|
|
in_queue.put(RESET)
|
|
in_queue.put((20, create_tx.to_dict()))
|
|
in_queue.put((25, double_spend.to_dict()))
|
|
in_queue.put((30, transfer_tx.to_dict()))
|
|
in_queue.put(EXIT)
|
|
|
|
vw.run()
|
|
|
|
assert results_queue.get() == (0, create_tx)
|
|
assert results_queue.get() == (10, transfer_tx)
|
|
assert results_queue.get() == (20, False)
|
|
assert results_queue.get() == (0, create_tx)
|
|
assert results_queue.get() == (5, transfer_tx)
|
|
assert results_queue.get() == (20, create_tx)
|
|
assert results_queue.get() == (25, double_spend)
|
|
assert results_queue.get() == (30, False)
|
|
|
|
|
|
def test_parallel_validator_routes_transactions_correctly(b, monkeypatch):
|
|
import os
|
|
from collections import defaultdict
|
|
import multiprocessing as mp
|
|
from json import dumps
|
|
from planetmint.abci.parallel_validation import ParallelValidator
|
|
|
|
# We want to make sure that the load is distributed across all workers.
|
|
# Since introspection on an object running on a different process is
|
|
# difficult, we create an additional queue where every worker can emit its
|
|
# PID every time validation is called.
|
|
validation_called_by = mp.Queue()
|
|
|
|
# Validate is now a passthrough, and every time it is called it will emit
|
|
# the PID of its worker to the designated queue.
|
|
def validate(self, dict_transaction):
|
|
validation_called_by.put((os.getpid(), dict_transaction["id"]))
|
|
return dict_transaction
|
|
|
|
monkeypatch.setattr("planetmint.abci.parallel_validation.ValidationWorker.validate", validate)
|
|
|
|
# Transaction routing uses the `id` of the transaction. This test strips
|
|
# down a transaction to just its `id`. We have two workers, so even ids
|
|
# will be processed by one worker, odd ids by the other.
|
|
transactions = [{"id": "0"}, {"id": "1"}, {"id": "2"}, {"id": "3"}]
|
|
|
|
pv = ParallelValidator(number_of_workers=2)
|
|
pv.start()
|
|
|
|
# ParallelValidator is instantiated once, and then used several times.
|
|
# Here we simulate this scenario by running it an arbitrary number of
|
|
# times.
|
|
# Note that the `ParallelValidator.result` call resets the object, and
|
|
# makes it ready to validate a new set of transactions.
|
|
for _ in range(2):
|
|
# First, we push the transactions to the parallel validator instance
|
|
for transaction in transactions:
|
|
pv.validate(dumps(transaction).encode("utf8"))
|
|
|
|
assert pv.result(timeout=1) == transactions
|
|
|
|
# Now we analize the transaction processed by the workers
|
|
worker_to_transactions = defaultdict(list)
|
|
for _ in transactions:
|
|
worker_pid, transaction_id = validation_called_by.get()
|
|
worker_to_transactions[worker_pid].append(transaction_id)
|
|
|
|
# The transactions are stored in two buckets.
|
|
for _, transaction_ids in worker_to_transactions.items():
|
|
assert len(transaction_ids) == 2
|
|
|
|
# We have two workers, hence we have two different routes for
|
|
# transactions. We have the route for even transactions, and the
|
|
# route for odd transactions. Since we don't know which worker
|
|
# processed what, we test that the transactions processed by a
|
|
# worker are all even or all odd.
|
|
assert all(filter(lambda x: int(x) % 2 == 0, transaction_ids)) or all(
|
|
filter(lambda x: int(x) % 2 == 1, transaction_ids)
|
|
)
|
|
|
|
pv.stop()
|