From 4c64b6642b2d8fcb5b70bcb8886b38797644f548 Mon Sep 17 00:00:00 2001
From: Rodolphe Marques
Date: Wed, 11 May 2016 14:45:10 +0200
Subject: [PATCH] Change the block process to use all the cpus instead of only
 one cpu per task

---
 bigchaindb/block.py           | 25 +++++++++++++------------
 tests/db/test_bigchain_api.py | 20 +++++++++-----------
 2 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/bigchaindb/block.py b/bigchaindb/block.py
index 2c2e108e..651257f5 100644
--- a/bigchaindb/block.py
+++ b/bigchaindb/block.py
@@ -7,6 +7,7 @@ import rethinkdb as r
 import bigchaindb
 from bigchaindb import Bigchain
 from bigchaindb.monitor import Monitor
+from bigchaindb.util import ProcessGroup
 
 
 logger = logging.getLogger(__name__)
@@ -180,7 +181,9 @@ class Block(object):
         # add results to the queue
         for result in initial_results:
             q_initial.put(result)
-        q_initial.put('stop')
+
+        for i in range(mp.cpu_count()):
+            q_initial.put('stop')
 
         return q_initial
 
@@ -203,17 +206,21 @@ class Block(object):
         self._start()
         logger.info('exiting block module...')
 
+    def kill(self):
+        for i in range(mp.cpu_count()):
+            self.q_new_transaction.put('stop')
+
     def _start(self):
         """
         Initialize, spawn, and start the processes
         """
 
         # initialize the processes
-        p_filter = mp.Process(name='filter_transactions', target=self.filter_by_assignee)
-        p_validate = mp.Process(name='validate_transactions', target=self.validate_transactions)
-        p_blocks = mp.Process(name='create_blocks', target=self.create_blocks)
-        p_write = mp.Process(name='write_blocks', target=self.write_blocks)
-        p_delete = mp.Process(name='delete_transactions', target=self.delete_transactions)
+        p_filter = ProcessGroup(name='filter_transactions', target=self.filter_by_assignee)
+        p_validate = ProcessGroup(name='validate_transactions', target=self.validate_transactions)
+        p_blocks = ProcessGroup(name='create_blocks', target=self.create_blocks)
+        p_write = ProcessGroup(name='write_blocks', target=self.write_blocks)
+        p_delete = ProcessGroup(name='delete_transactions', target=self.delete_transactions)
 
         # start the processes
         p_filter.start()
@@ -222,9 +229,3 @@ class Block(object):
         p_write.start()
         p_delete.start()
 
-        # join processes
-        p_filter.join()
-        p_validate.join()
-        p_blocks.join()
-        p_write.join()
-        p_delete.join()
diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py
index 0537e9ca..82281a8f 100644
--- a/tests/db/test_bigchain_api.py
+++ b/tests/db/test_bigchain_api.py
@@ -714,8 +714,8 @@ class TestBigchainBlock(object):
         # run bootstrap
         initial_results = block.bootstrap()
 
-        # we should have gotten a queue with 100 results
-        assert initial_results.qsize() - 1 == 100
+        # we should have gotten a queue with 100 results minus the poison pills
+        assert initial_results.qsize() - mp.cpu_count() == 100
 
     def test_start(self, b, user_vk):
         # start with 100 transactions in the backlog and 100 in the changefeed
@@ -736,7 +736,9 @@ class TestBigchainBlock(object):
             tx = b.sign_transaction(tx, b.me_private)
             b.write_transaction(tx)
             new_transactions.put(tx)
-        new_transactions.put('stop')
+
+        for i in range(mp.cpu_count()):
+            new_transactions.put('stop')
 
         # create a block instance
         block = Block(new_transactions)
@@ -744,6 +746,8 @@ class TestBigchainBlock(object):
         # start the block processes
         block.start()
 
+        time.sleep(6)
+
         assert new_transactions.qsize() == 0
         assert r.table('backlog').count() == 0
         assert r.table('bigchain').count() == 2
@@ -755,20 +759,14 @@ class TestBigchainBlock(object):
         # create block instance
         block = Block(new_transactions)
 
-        # create block_process
-        p_block = mp.Process(target=block.start)
 
         # start block process
-        p_block.start()
+        block.start()
 
         # wait for 6 seconds to give it time for an empty queue exception to occur
         time.sleep(6)
 
-        # send the poison pill
-        new_transactions.put('stop')
-
-        # join the process
-        p_block.join()
+        block.kill()
 
     def test_duplicated_transactions(self):
         pytest.skip('We may have duplicates in the initial_results and changefeed')