diff --git a/bigchaindb/commands/bigchaindb.py b/bigchaindb/commands/bigchaindb.py index 082b2753..f657b9e8 100644 --- a/bigchaindb/commands/bigchaindb.py +++ b/bigchaindb/commands/bigchaindb.py @@ -277,7 +277,7 @@ def run_start(args): logger.info('Starting BigchainDB main process.') from bigchaindb.start import start - start() + start(args) def create_parser(): @@ -360,6 +360,12 @@ def create_parser(): action='store_true', help='Skip database initialization') + start_parser.add_argument('--experimental-parallel-validation', + dest='experimental_parallel_validation', + default=False, + action='store_true', + help='💀 EXPERIMENTAL: parallelize validation for better throughput 💀') + return parser diff --git a/bigchaindb/parallel_validation.py b/bigchaindb/parallel_validation.py new file mode 100644 index 00000000..92a4d515 --- /dev/null +++ b/bigchaindb/parallel_validation.py @@ -0,0 +1,127 @@ +# Copyright BigchainDB GmbH and BigchainDB contributors +# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) +# Code is Apache-2.0 and docs are CC-BY-4.0 + +import multiprocessing as mp +from collections import defaultdict + +from abci.types_pb2 import ResponseCheckTx, ResponseDeliverTx + +from bigchaindb import BigchainDB, App +from bigchaindb.tendermint_utils import decode_transaction + + +CodeTypeOk = 0 + + +class ParallelValidationApp(App): + def __init__(self, bigchaindb=None, events_queue=None): + super().__init__(bigchaindb, events_queue) + self.parallel_validator = ParallelValidator() + self.parallel_validator.start() + + def check_tx(self, raw_transaction): + return ResponseCheckTx(code=CodeTypeOk) + + def deliver_tx(self, raw_transaction): + self.parallel_validator.validate(raw_transaction) + return ResponseDeliverTx(code=CodeTypeOk) + + def end_block(self, request_end_block): + result = self.parallel_validator.result(timeout=30) + for transaction in result: + if transaction: + self.block_txn_ids.append(transaction.id) + self.block_transactions.append(transaction) + + return super().end_block(request_end_block) + + +RESET = 'reset' +EXIT = 'exit' + + +class ParallelValidator: + def __init__(self, number_of_workers=mp.cpu_count()): + self.number_of_workers = number_of_workers + self.transaction_index = 0 + self.routing_queues = [mp.Queue() for _ in range(self.number_of_workers)] + self.workers = [] + self.results_queue = mp.Queue() + + def start(self): + for routing_queue in self.routing_queues: + worker = ValidationWorker(routing_queue, self.results_queue) + process = mp.Process(target=worker.run) + process.start() + self.workers.append(process) + + def stop(self): + for routing_queue in self.routing_queues: + routing_queue.put(EXIT) + + def validate(self, raw_transaction): + dict_transaction = decode_transaction(raw_transaction) + index = int(dict_transaction['id'], 16) % self.number_of_workers + self.routing_queues[index].put((self.transaction_index, dict_transaction)) + self.transaction_index += 1 + + def result(self, timeout=None): + result_buffer = [None] * self.transaction_index + for _ in range(self.transaction_index): + index, transaction = self.results_queue.get(timeout=timeout) + result_buffer[index] = transaction + self.transaction_index = 0 + for routing_queue in self.routing_queues: + routing_queue.put(RESET) + return result_buffer + + +class ValidationWorker: + """Run validation logic in a loop. This Worker is suitable for a Process + life: no thrills, just a queue to get some values, and a queue to return results. + + Note that a worker is expected to validate multiple transactions in + multiple rounds, and it needs to keep in memory all transactions already + validated, until a new round starts. To trigger a new round of validation, + a ValidationWorker expects a `RESET` message. To exit the infinite loop the + worker is in, it expects an `EXIT` message. + """ + + def __init__(self, in_queue, results_queue): + self.in_queue = in_queue + self.results_queue = results_queue + self.bigchaindb = BigchainDB() + self.reset() + + def reset(self): + # We need a place to store already validated transactions, + # in case of dependant transactions in the same block. + # `validated_transactions` maps an `asset_id` with the list + # of all other transactions sharing the same asset. + self.validated_transactions = defaultdict(list) + + def validate(self, dict_transaction): + try: + asset_id = dict_transaction['asset']['id'] + except KeyError: + asset_id = dict_transaction['id'] + + transaction = self.bigchaindb.is_valid_transaction( + dict_transaction, + self.validated_transactions[asset_id]) + + if transaction: + self.validated_transactions[asset_id].append(transaction) + return transaction + + def run(self): + while True: + message = self.in_queue.get() + if message == RESET: + self.reset() + elif message == EXIT: + return + else: + index, transaction = message + self.results_queue.put((index, self.validate(transaction))) diff --git a/bigchaindb/start.py b/bigchaindb/start.py index 41173be0..c23b5441 100644 --- a/bigchaindb/start.py +++ b/bigchaindb/start.py @@ -8,6 +8,7 @@ import setproctitle import bigchaindb from bigchaindb.lib import BigchainDB from bigchaindb.core import App +from bigchaindb.parallel_validation import ParallelValidationApp from bigchaindb.web import server, websocket_server from bigchaindb.events import Exchange, EventTypes from bigchaindb.utils import Process @@ -34,7 +35,7 @@ BANNER = """ """ -def start(): +def start(args): # Exchange object for event stream api logger.info('Starting BigchainDB') exchange = Exchange() @@ -46,7 +47,6 @@ def start(): p_webapi = Process(name='bigchaindb_webapi', target=app_server.run, daemon=True) p_webapi.start() - # start message logger.info(BANNER.format(bigchaindb.config['server']['bind'])) # start websocket server @@ -67,7 +67,10 @@ def start(): setproctitle.setproctitle('bigchaindb') # Start the ABCIServer - app = ABCIServer(app=App(events_queue=exchange.get_publisher_queue())) + if args.experimental_parallel_validation: + app = ABCIServer(app=ParallelValidationApp(events_queue=exchange.get_publisher_queue())) + else: + app = ABCIServer(app=App(events_queue=exchange.get_publisher_queue())) app.run() diff --git a/tests/test_parallel_validation.py b/tests/test_parallel_validation.py new file mode 100644 index 00000000..60fdf0b6 --- /dev/null +++ b/tests/test_parallel_validation.py @@ -0,0 +1,133 @@ +# Copyright BigchainDB GmbH and BigchainDB contributors +# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) +# Code is Apache-2.0 and docs are CC-BY-4.0 + +import pytest + +from bigchaindb.common.crypto import generate_key_pair +from bigchaindb.models import Transaction + + +pytestmark = pytest.mark.tendermint + + +def generate_create_and_transfer(keypair=None): + if not keypair: + keypair = generate_key_pair() + priv_key, pub_key = keypair + create_tx = Transaction.create([pub_key], [([pub_key], 10)]).sign([priv_key]) + transfer_tx = Transaction.transfer( + create_tx.to_inputs(), + [([pub_key], 10)], + asset_id=create_tx.id).sign([priv_key]) + return create_tx, transfer_tx + + +def test_validation_worker_process_multiple_transactions(b): + import multiprocessing as mp + from bigchaindb.parallel_validation import ValidationWorker, RESET, EXIT + + keypair = generate_key_pair() + create_tx, transfer_tx = generate_create_and_transfer(keypair) + double_spend = Transaction.transfer( + create_tx.to_inputs(), + [([keypair.public_key], 10)], + asset_id=create_tx.id).sign([keypair.private_key]) + + in_queue, results_queue = mp.Queue(), mp.Queue() + vw = ValidationWorker(in_queue, results_queue) + + # Note: in the following instructions, the worker will encounter two + # `RESET` messages, and an `EXIT` message. When a worker processes a + # `RESET` message, it forgets all transactions it has validated. This allow + # us to re-validate the same transactions. This won't happen in real life, + # but it's quite handy to check if the worker actually forgot about the + # past transactions (if not, it will return `False` because the + # transactions look like a double spend). + # `EXIT` makes the worker to stop the infinite loop. + in_queue.put((0, create_tx.to_dict())) + in_queue.put((10, transfer_tx.to_dict())) + in_queue.put((20, double_spend.to_dict())) + in_queue.put(RESET) + in_queue.put((0, create_tx.to_dict())) + in_queue.put((5, transfer_tx.to_dict())) + in_queue.put(RESET) + in_queue.put((20, create_tx.to_dict())) + in_queue.put((25, double_spend.to_dict())) + in_queue.put((30, transfer_tx.to_dict())) + in_queue.put(EXIT) + + vw.run() + + assert results_queue.get() == (0, create_tx) + assert results_queue.get() == (10, transfer_tx) + assert results_queue.get() == (20, False) + assert results_queue.get() == (0, create_tx) + assert results_queue.get() == (5, transfer_tx) + assert results_queue.get() == (20, create_tx) + assert results_queue.get() == (25, double_spend) + assert results_queue.get() == (30, False) + + +def test_parallel_validator_routes_transactions_correctly(b, monkeypatch): + import os + from collections import defaultdict + import multiprocessing as mp + from json import dumps + from bigchaindb.parallel_validation import ParallelValidator + + # We want to make sure that the load is distributed across all workers. + # Since introspection on an object running on a different process is + # difficult, we create an additional queue where every worker can emit its + # PID every time validation is called. + validation_called_by = mp.Queue() + + # Validate is now a passthrough, and every time it is called it will emit + # the PID of its worker to the designated queue. + def validate(self, dict_transaction): + validation_called_by.put((os.getpid(), dict_transaction['id'])) + return dict_transaction + + monkeypatch.setattr( + 'bigchaindb.parallel_validation.ValidationWorker.validate', + validate) + + # Transaction routing uses the `id` of the transaction. This test strips + # down a transaction to just its `id`. We have two workers, so even ids + # will be processed by one worker, odd ids by the other. + transactions = [{'id': '0'}, {'id': '1'}, {'id': '2'}, {'id': '3'}] + + pv = ParallelValidator(number_of_workers=2) + pv.start() + + # ParallelValidator is instantiated once, and then used several times. + # Here we simulate this scenario by running it an arbitrary number of + # times. + # Note that the `ParallelValidator.result` call resets the object, and + # makes it ready to validate a new set of transactions. + for _ in range(2): + # First, we push the transactions to the parallel validator instance + for transaction in transactions: + pv.validate(dumps(transaction).encode('utf8')) + + assert pv.result(timeout=1) == transactions + + # Now we analize the transaction processed by the workers + worker_to_transactions = defaultdict(list) + for _ in transactions: + worker_pid, transaction_id = validation_called_by.get() + worker_to_transactions[worker_pid].append(transaction_id) + + # The transactions are stored in two buckets. + for _, transaction_ids in worker_to_transactions.items(): + assert len(transaction_ids) == 2 + + # We have two workers, hence we have two different routes for + # transactions. We have the route for even transactions, and the + # route for odd transactions. Since we don't know which worker + # processed what, we test that the transactions processed by a + # worker are all even or all odd. + assert (all(filter(lambda x: int(x) % 2 == 0, transaction_ids)) or + all(filter(lambda x: int(x) % 2 == 1, transaction_ids))) + + pv.stop()