Change the block process to use all the cpus instead of only one cpu per

task
This commit is contained in:
Rodolphe Marques 2016-05-11 14:45:10 +02:00
parent 3989346044
commit 4c64b6642b
2 changed files with 22 additions and 23 deletions

View File

@ -7,6 +7,7 @@ import rethinkdb as r
import bigchaindb import bigchaindb
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb.monitor import Monitor from bigchaindb.monitor import Monitor
from bigchaindb.util import ProcessGroup
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -180,7 +181,9 @@ class Block(object):
# add results to the queue # add results to the queue
for result in initial_results: for result in initial_results:
q_initial.put(result) q_initial.put(result)
q_initial.put('stop')
for i in range(mp.cpu_count()):
q_initial.put('stop')
return q_initial return q_initial
@ -203,17 +206,21 @@ class Block(object):
self._start() self._start()
logger.info('exiting block module...') logger.info('exiting block module...')
def kill(self):
for i in range(mp.cpu_count()):
self.q_new_transaction.put('stop')
def _start(self): def _start(self):
""" """
Initialize, spawn, and start the processes Initialize, spawn, and start the processes
""" """
# initialize the processes # initialize the processes
p_filter = mp.Process(name='filter_transactions', target=self.filter_by_assignee) p_filter = ProcessGroup(name='filter_transactions', target=self.filter_by_assignee)
p_validate = mp.Process(name='validate_transactions', target=self.validate_transactions) p_validate = ProcessGroup(name='validate_transactions', target=self.validate_transactions)
p_blocks = mp.Process(name='create_blocks', target=self.create_blocks) p_blocks = ProcessGroup(name='create_blocks', target=self.create_blocks)
p_write = mp.Process(name='write_blocks', target=self.write_blocks) p_write = ProcessGroup(name='write_blocks', target=self.write_blocks)
p_delete = mp.Process(name='delete_transactions', target=self.delete_transactions) p_delete = ProcessGroup(name='delete_transactions', target=self.delete_transactions)
# start the processes # start the processes
p_filter.start() p_filter.start()
@ -222,9 +229,3 @@ class Block(object):
p_write.start() p_write.start()
p_delete.start() p_delete.start()
# join processes
p_filter.join()
p_validate.join()
p_blocks.join()
p_write.join()
p_delete.join()

View File

@ -714,8 +714,8 @@ class TestBigchainBlock(object):
# run bootstrap # run bootstrap
initial_results = block.bootstrap() initial_results = block.bootstrap()
# we should have gotten a queue with 100 results # we should have gotten a queue with 100 results minus the poison pills
assert initial_results.qsize() - 1 == 100 assert initial_results.qsize() - mp.cpu_count() == 100
def test_start(self, b, user_vk): def test_start(self, b, user_vk):
# start with 100 transactions in the backlog and 100 in the changefeed # start with 100 transactions in the backlog and 100 in the changefeed
@ -736,7 +736,9 @@ class TestBigchainBlock(object):
tx = b.sign_transaction(tx, b.me_private) tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx) b.write_transaction(tx)
new_transactions.put(tx) new_transactions.put(tx)
new_transactions.put('stop')
for i in range(mp.cpu_count()):
new_transactions.put('stop')
# create a block instance # create a block instance
block = Block(new_transactions) block = Block(new_transactions)
@ -744,6 +746,8 @@ class TestBigchainBlock(object):
# start the block processes # start the block processes
block.start() block.start()
time.sleep(6)
assert new_transactions.qsize() == 0 assert new_transactions.qsize() == 0
assert r.table('backlog').count() == 0 assert r.table('backlog').count() == 0
assert r.table('bigchain').count() == 2 assert r.table('bigchain').count() == 2
@ -755,20 +759,14 @@ class TestBigchainBlock(object):
# create block instance # create block instance
block = Block(new_transactions) block = Block(new_transactions)
# create block_process
p_block = mp.Process(target=block.start)
# start block process # start block process
p_block.start() block.start()
# wait for 6 seconds to give it time for an empty queue exception to occur # wait for 6 seconds to give it time for an empty queue exception to occur
time.sleep(6) time.sleep(6)
# send the poison pill
new_transactions.put('stop')
# join the process # join the process
p_block.join() block.kill()
def test_duplicated_transactions(self): def test_duplicated_transactions(self):
pytest.skip('We may have duplicates in the initial_results and changefeed') pytest.skip('We may have duplicates in the initial_results and changefeed')