Add tests to ProcessGroup

This commit is contained in:
vrde 2016-04-20 18:18:01 +02:00
parent dcd09264d9
commit ed1e71bfec
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D

View File

@ -1,3 +1,4 @@
from unittest.mock import patch, call
import pytest
import queue
@ -121,3 +122,24 @@ def test_pool_raises_empty_exception_when_timeout(mock_queue):
with pool() as instance:
assert instance == 'hello'
@patch('multiprocessing.Process')
def test_process_group_instantiates_and_start_processes(mock_process):
from bigchaindb.util import ProcessGroup
def noop():
pass
concurrency = 10
pg = ProcessGroup(concurrency=concurrency, group='test_group', target=noop)
pg.start()
mock_process.assert_has_calls([call(group='test_group', target=noop,
name=None, args=(), kwargs={},
daemon=None)
for i in range(concurrency)], any_order=True)
for process in pg.processes:
process.start.assert_called()