mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Problem: validation is slow (#2489)
Solution: parallelize the validation of transactions. This patch adds a new flag for the `start` command, namely `--experimental-parallel-validation`, that can be used to enable this experimental feature.
This commit is contained in:
parent
901b6b9d02
commit
241aae335d
@ -277,7 +277,7 @@ def run_start(args):
|
||||
|
||||
logger.info('Starting BigchainDB main process.')
|
||||
from bigchaindb.start import start
|
||||
start()
|
||||
start(args)
|
||||
|
||||
|
||||
def create_parser():
|
||||
@ -360,6 +360,12 @@ def create_parser():
|
||||
action='store_true',
|
||||
help='Skip database initialization')
|
||||
|
||||
start_parser.add_argument('--experimental-parallel-validation',
|
||||
dest='experimental_parallel_validation',
|
||||
default=False,
|
||||
action='store_true',
|
||||
help='💀 EXPERIMENTAL: parallelize validation for better throughput 💀')
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
|
127
bigchaindb/parallel_validation.py
Normal file
127
bigchaindb/parallel_validation.py
Normal file
@ -0,0 +1,127 @@
|
||||
# Copyright BigchainDB GmbH and BigchainDB contributors
|
||||
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
|
||||
# Code is Apache-2.0 and docs are CC-BY-4.0
|
||||
|
||||
import multiprocessing as mp
|
||||
from collections import defaultdict
|
||||
|
||||
from abci.types_pb2 import ResponseCheckTx, ResponseDeliverTx
|
||||
|
||||
from bigchaindb import BigchainDB, App
|
||||
from bigchaindb.tendermint_utils import decode_transaction
|
||||
|
||||
|
||||
# Response code placed in the `ResponseCheckTx` / `ResponseDeliverTx` objects
# returned by `ParallelValidationApp` below.
CodeTypeOk = 0
|
||||
|
||||
|
||||
class ParallelValidationApp(App):
    """`App` variant that hands transaction validation to a `ParallelValidator`.

    `check_tx` and `deliver_tx` always answer with `CodeTypeOk`; the actual
    validation outcome is only collected in `end_block`, where transactions
    that did not validate are dropped.
    """

    def __init__(self, bigchaindb=None, events_queue=None):
        """Initialise the base `App` and start the validator workers."""
        super().__init__(bigchaindb, events_queue)
        self.parallel_validator = ParallelValidator()
        self.parallel_validator.start()

    def check_tx(self, raw_transaction):
        """Accept `raw_transaction` unconditionally; nothing is validated here."""
        return ResponseCheckTx(code=CodeTypeOk)

    def deliver_tx(self, raw_transaction):
        """Queue `raw_transaction` for validation and answer OK right away."""
        self.parallel_validator.validate(raw_transaction)
        return ResponseDeliverTx(code=CodeTypeOk)

    def end_block(self, request_end_block):
        """Collect this round's validation results, then defer to `App.end_block`.

        Waits up to 30 seconds for each result. Only truthy results are
        recorded in `self.block_txn_ids` and `self.block_transactions`
        (attributes not defined in this class; presumably provided by `App`).
        """
        outcomes = self.parallel_validator.result(timeout=30)
        accepted = [outcome for outcome in outcomes if outcome]
        for valid_transaction in accepted:
            self.block_txn_ids.append(valid_transaction.id)
            self.block_transactions.append(valid_transaction)

        return super().end_block(request_end_block)
|
||||
|
||||
|
||||
# Control message: tells a `ValidationWorker` to forget the transactions it
# has validated so far (see `ValidationWorker.run`).
RESET = 'reset'
|
||||
# Control message: makes `ValidationWorker.run` leave its loop and return.
EXIT = 'exit'
|
||||
|
||||
|
||||
class ParallelValidator:
    """Fan transactions out to a pool of `ValidationWorker` processes.

    Each worker owns one routing queue; all workers share one results queue.
    A round consists of any number of `validate` calls followed by a single
    `result` call, which returns the outcomes in submission order and resets
    the validator and its workers for the next round.
    """

    def __init__(self, number_of_workers=mp.cpu_count()):
        """Create the queues; no process is started until `start` is called.

        Args:
            number_of_workers: how many worker processes (and routing queues)
                to use. Defaults to the CPU count read at import time.
        """
        self.number_of_workers = number_of_workers
        # Position of the next transaction within the current round.
        self.transaction_index = 0
        self.routing_queues = [mp.Queue() for _ in range(self.number_of_workers)]
        self.workers = []
        self.results_queue = mp.Queue()

    def start(self):
        """Spawn one worker process per routing queue."""
        for routing_queue in self.routing_queues:
            worker = ValidationWorker(routing_queue, self.results_queue)
            process = mp.Process(target=worker.run)
            process.start()
            self.workers.append(process)

    def stop(self):
        """Ask every worker to exit by sending it an `EXIT` message."""
        for routing_queue in self.routing_queues:
            routing_queue.put(EXIT)

    def _worker_index(self, dict_transaction):
        """Return the index of the worker that must validate the transaction.

        Workers remember the transactions they validated per asset id, in
        their own process memory. All transactions touching the same asset
        must therefore reach the same worker, otherwise a transaction that
        depends on (or conflicts with) another one in the same round is
        validated without knowledge of it. Routing uses the asset id and
        falls back to the transaction id when the asset carries no usable
        id (the worker applies the same fallback when grouping).
        """
        try:
            routing_key = int(dict_transaction['asset']['id'], 16)
        except (KeyError, TypeError, ValueError):
            # No asset id, or one that is not a hex string: route by the
            # transaction's own id, as the original implementation did.
            routing_key = int(dict_transaction['id'], 16)
        return routing_key % self.number_of_workers

    def validate(self, raw_transaction):
        """Decode `raw_transaction` and queue it on the responsible worker.

        The transaction is tagged with its position in the current round so
        that `result` can restore the submission order.
        """
        dict_transaction = decode_transaction(raw_transaction)
        index = self._worker_index(dict_transaction)
        self.routing_queues[index].put((self.transaction_index, dict_transaction))
        self.transaction_index += 1

    def result(self, timeout=None):
        """Return the outcomes of the current round, in submission order.

        Args:
            timeout: seconds to wait for each individual result; `None`
                waits indefinitely.

        Returns:
            A list with one entry per `validate` call since the last round,
            holding whatever the worker's `validate` returned.

        Raises:
            queue.Empty: if a result does not arrive within `timeout`. In
                that case the round counter and the workers are NOT reset.
        """
        result_buffer = [None] * self.transaction_index
        for _ in range(self.transaction_index):
            index, transaction = self.results_queue.get(timeout=timeout)
            result_buffer[index] = transaction
        self.transaction_index = 0
        # Start a fresh round: workers must forget what they validated.
        for routing_queue in self.routing_queues:
            routing_queue.put(RESET)
        return result_buffer
|
||||
|
||||
|
||||
class ValidationWorker:
    """Run validation logic in a loop. This Worker is suitable for a Process
    life: no frills, just a queue to get some values, and a queue to return
    results.

    Note that a worker is expected to validate multiple transactions in
    multiple rounds, and it needs to keep in memory all transactions already
    validated, until a new round starts. To trigger a new round of validation,
    a ValidationWorker expects a `RESET` message. To exit the infinite loop the
    worker is in, it expects an `EXIT` message.
    """

    def __init__(self, in_queue, results_queue):
        """Bind the worker to its queues and create its `BigchainDB` instance.

        Args:
            in_queue: queue delivering `(index, dict_transaction)` tuples and
                the `RESET` / `EXIT` control messages.
            results_queue: queue receiving `(index, validation_result)` tuples.
        """
        self.in_queue = in_queue
        self.results_queue = results_queue
        self.bigchaindb = BigchainDB()
        self.reset()

    def reset(self):
        """Forget every transaction validated so far."""
        # We need a place to store already validated transactions,
        # in case of dependent transactions in the same block.
        # `validated_transactions` maps an `asset_id` with the list
        # of all other transactions sharing the same asset.
        self.validated_transactions = defaultdict(list)

    def validate(self, dict_transaction):
        """Validate one transaction against those already seen for its asset.

        Returns:
            Whatever `BigchainDB.is_valid_transaction` returns; a truthy
            result is remembered under the transaction's asset id until the
            next `reset`.
        """
        try:
            asset_id = dict_transaction['asset']['id']
        except (KeyError, TypeError):
            # KeyError: the asset carries no `id`, so the transaction's own
            # id is used as the grouping key.
            # TypeError: `asset` is null or not a mapping. Letting it
            # propagate would kill this worker process and stall the whole
            # round; fall back to the transaction id and leave the verdict
            # to `is_valid_transaction` (presumably it rejects such input;
            # confirm against its implementation).
            asset_id = dict_transaction['id']

        transaction = self.bigchaindb.is_valid_transaction(
            dict_transaction,
            self.validated_transactions[asset_id])

        if transaction:
            self.validated_transactions[asset_id].append(transaction)
        return transaction

    def run(self):
        """Process messages from `in_queue` until an `EXIT` message arrives."""
        while True:
            message = self.in_queue.get()
            if message == RESET:
                self.reset()
            elif message == EXIT:
                return
            else:
                index, transaction = message
                self.results_queue.put((index, self.validate(transaction)))
|
@ -8,6 +8,7 @@ import setproctitle
|
||||
import bigchaindb
|
||||
from bigchaindb.lib import BigchainDB
|
||||
from bigchaindb.core import App
|
||||
from bigchaindb.parallel_validation import ParallelValidationApp
|
||||
from bigchaindb.web import server, websocket_server
|
||||
from bigchaindb.events import Exchange, EventTypes
|
||||
from bigchaindb.utils import Process
|
||||
@ -34,7 +35,7 @@ BANNER = """
|
||||
"""
|
||||
|
||||
|
||||
def start():
|
||||
def start(args):
|
||||
# Exchange object for event stream api
|
||||
logger.info('Starting BigchainDB')
|
||||
exchange = Exchange()
|
||||
@ -46,7 +47,6 @@ def start():
|
||||
p_webapi = Process(name='bigchaindb_webapi', target=app_server.run, daemon=True)
|
||||
p_webapi.start()
|
||||
|
||||
# start message
|
||||
logger.info(BANNER.format(bigchaindb.config['server']['bind']))
|
||||
|
||||
# start websocket server
|
||||
@ -67,7 +67,10 @@ def start():
|
||||
setproctitle.setproctitle('bigchaindb')
|
||||
|
||||
# Start the ABCIServer
|
||||
app = ABCIServer(app=App(events_queue=exchange.get_publisher_queue()))
|
||||
if args.experimental_parallel_validation:
|
||||
app = ABCIServer(app=ParallelValidationApp(events_queue=exchange.get_publisher_queue()))
|
||||
else:
|
||||
app = ABCIServer(app=App(events_queue=exchange.get_publisher_queue()))
|
||||
app.run()
|
||||
|
||||
|
||||
|
133
tests/test_parallel_validation.py
Normal file
133
tests/test_parallel_validation.py
Normal file
@ -0,0 +1,133 @@
|
||||
# Copyright BigchainDB GmbH and BigchainDB contributors
|
||||
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
|
||||
# Code is Apache-2.0 and docs are CC-BY-4.0
|
||||
|
||||
import pytest
|
||||
|
||||
from bigchaindb.common.crypto import generate_key_pair
|
||||
from bigchaindb.models import Transaction
|
||||
|
||||
|
||||
# Apply the `tendermint` marker to every test in this module.
pytestmark = pytest.mark.tendermint
|
||||
|
||||
|
||||
def generate_create_and_transfer(keypair=None):
    """Build a signed CREATE transaction and a signed TRANSFER spending it.

    A fresh key pair is generated when `keypair` is not given. Both
    transactions assign an amount of 10 to the key pair's public key.

    Returns:
        The tuple `(create_tx, transfer_tx)`.
    """
    priv_key, pub_key = keypair or generate_key_pair()

    unsigned_create = Transaction.create([pub_key], [([pub_key], 10)])
    create_tx = unsigned_create.sign([priv_key])

    unsigned_transfer = Transaction.transfer(
        create_tx.to_inputs(),
        [([pub_key], 10)],
        asset_id=create_tx.id)
    transfer_tx = unsigned_transfer.sign([priv_key])

    return create_tx, transfer_tx
|
||||
|
||||
|
||||
def test_validation_worker_process_multiple_transactions(b):
    """A worker validates in rounds and forgets past rounds on `RESET`."""
    import multiprocessing as mp
    from bigchaindb.parallel_validation import ValidationWorker, RESET, EXIT

    keypair = generate_key_pair()
    create_tx, transfer_tx = generate_create_and_transfer(keypair)
    double_spend = Transaction.transfer(
        create_tx.to_inputs(),
        [([keypair.public_key], 10)],
        asset_id=create_tx.id).sign([keypair.private_key])

    in_queue, results_queue = mp.Queue(), mp.Queue()
    vw = ValidationWorker(in_queue, results_queue)

    # Note: the worker will encounter two `RESET` messages and one `EXIT`
    # message. When a worker processes a `RESET` message, it forgets all the
    # transactions it has validated. This allows us to re-validate the same
    # transactions. This won't happen in real life, but it's quite handy to
    # check that the worker actually forgot about the past transactions (if
    # not, it will return `False` because the transactions look like a
    # double spend).
    # `EXIT` makes the worker stop the infinite loop.
    messages = [
        (0, create_tx.to_dict()),
        (10, transfer_tx.to_dict()),
        (20, double_spend.to_dict()),
        RESET,
        (0, create_tx.to_dict()),
        (5, transfer_tx.to_dict()),
        RESET,
        (20, create_tx.to_dict()),
        (25, double_spend.to_dict()),
        (30, transfer_tx.to_dict()),
        EXIT,
    ]
    for message in messages:
        in_queue.put(message)

    vw.run()

    # One result per transaction message, in the order they were queued.
    expected_results = [
        (0, create_tx),
        (10, transfer_tx),
        (20, False),
        (0, create_tx),
        (5, transfer_tx),
        (20, create_tx),
        (25, double_spend),
        (30, False),
    ]
    for expected in expected_results:
        assert results_queue.get() == expected
|
||||
|
||||
|
||||
def test_parallel_validator_routes_transactions_correctly(b, monkeypatch):
    """Transactions are spread over the workers deterministically by id."""
    import os
    from collections import defaultdict
    import multiprocessing as mp
    from json import dumps
    from bigchaindb.parallel_validation import ParallelValidator

    # We want to make sure that the load is distributed across all workers.
    # Since introspection on an object running on a different process is
    # difficult, we create an additional queue where every worker can emit its
    # PID every time validation is called.
    validation_called_by = mp.Queue()

    # Validate is now a passthrough, and every time it is called it will emit
    # the PID of its worker to the designated queue.
    def validate(self, dict_transaction):
        validation_called_by.put((os.getpid(), dict_transaction['id']))
        return dict_transaction

    monkeypatch.setattr(
        'bigchaindb.parallel_validation.ValidationWorker.validate',
        validate)

    # Transaction routing uses the `id` of the transaction. This test strips
    # down a transaction to just its `id`. We have two workers, so even ids
    # will be processed by one worker, odd ids by the other.
    transactions = [{'id': '0'}, {'id': '1'}, {'id': '2'}, {'id': '3'}]

    pv = ParallelValidator(number_of_workers=2)
    pv.start()

    # The workers are regular (non-daemon) processes: always send them the
    # exit message, even when an assertion below fails, so that a failing
    # test cannot leave them running.
    try:
        # ParallelValidator is instantiated once, and then used several
        # times. Here we simulate this scenario by running it an arbitrary
        # number of times.
        # Note that the `ParallelValidator.result` call resets the object,
        # and makes it ready to validate a new set of transactions.
        for _ in range(2):
            # First, we push the transactions to the parallel validator
            # instance.
            for transaction in transactions:
                pv.validate(dumps(transaction).encode('utf8'))

            assert pv.result(timeout=1) == transactions

            # Now we analyze the transactions processed by the workers.
            # The read is bounded so a missing report fails the test
            # instead of blocking it forever.
            worker_to_transactions = defaultdict(list)
            for _ in transactions:
                worker_pid, transaction_id = validation_called_by.get(timeout=1)
                worker_to_transactions[worker_pid].append(transaction_id)

            # The transactions are stored in two buckets.
            for transaction_ids in worker_to_transactions.values():
                assert len(transaction_ids) == 2

                # We have two workers, hence we have two different routes
                # for transactions. We have the route for even transactions,
                # and the route for odd transactions. Since we don't know
                # which worker processed what, we test that the transactions
                # processed by a worker are all even or all odd.
                # (Checking each id explicitly: `all(filter(...))` only
                # tests the truthiness of the ids that already matched.)
                assert (all(int(x) % 2 == 0 for x in transaction_ids) or
                        all(int(x) % 2 == 1 for x in transaction_ids))
    finally:
        pv.stop()
|
Loading…
x
Reference in New Issue
Block a user