diff --git a/bigchaindb/block.py b/bigchaindb/block.py index 01fd0d56..2267ef0e 100644 --- a/bigchaindb/block.py +++ b/bigchaindb/block.py @@ -13,223 +13,6 @@ from bigchaindb.util import ProcessGroup logger = logging.getLogger(__name__) -class Block(object): - - def __init__(self, q_new_transaction): - """ - Initialize the class with the needed - """ - self._q_new_transaction = q_new_transaction - self.q_new_transaction = None - self.q_tx_to_validate = mp.Queue() - self.q_tx_validated = mp.Queue() - self.q_tx_delete = mp.Queue() - self.q_block = mp.Queue() - self.initialized = mp.Event() - self.monitor = Monitor() - - def filter_by_assignee(self): - """ - Handle transactions that are assigned to me - """ - - # create a bigchain instance - b = Bigchain() - - while True: - tx = self.q_new_transaction.get() - - # poison pill - if tx == 'stop': - self.q_tx_to_validate.put('stop') - return - - if tx['assignee'] == b.me: - tx.pop('assignee') - self.q_tx_to_validate.put(tx) - - def validate_transactions(self): - """ - Checks if the incoming transactions are valid - """ - - # create a bigchain instance - b = Bigchain() - - while True: - self.monitor.gauge('tx_queue_gauge', - self.q_tx_to_validate.qsize(), - rate=bigchaindb.config['statsd']['rate']) - tx = self.q_tx_to_validate.get() - - # poison pill - if tx == 'stop': - self.q_tx_delete.put('stop') - self.q_tx_validated.put('stop') - return - - self.q_tx_delete.put(tx['id']) - - with self.monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate']): - is_valid_transaction = b.is_valid_transaction(tx) - - if is_valid_transaction: - self.q_tx_validated.put(tx) - - def create_blocks(self): - """ - Create a block with valid transactions - """ - - # create a bigchain instance - b = Bigchain() - stop = False - - while True: - - # read up to 1000 transactions - validated_transactions = [] - for i in range(1000): - try: - tx = self.q_tx_validated.get(timeout=5) - except queue.Empty: - break - - # poison pill - if tx == 'stop': - stop = True - break - - validated_transactions.append(tx) - - # if there are no transactions skip block creation - if validated_transactions: - # create block - block = b.create_block(validated_transactions) - self.q_block.put(block) - - if stop: - self.q_block.put('stop') - return - - def write_blocks(self): - """ - Write blocks to the bigchain - """ - - # create bigchain instance - b = Bigchain() - - # Write blocks - while True: - block = self.q_block.get() - - # poison pill - if block == 'stop': - return - - with self.monitor.timer('write_block'): - b.write_block(block) - - def delete_transactions(self): - """ - Delete transactions from the backlog - """ - # create bigchain instance - b = Bigchain() - stop = False - - while True: - # try to delete in batch to reduce io - tx_to_delete = [] - for i in range(1000): - try: - tx = self.q_tx_delete.get(timeout=5) - except queue.Empty: - break - - # poison pill - if tx == 'stop': - stop = True - break - - tx_to_delete.append(tx) - - if tx_to_delete: - r.table('backlog').get_all(*tx_to_delete).delete(durability='soft').run(b.conn) - - if stop: - return - - def bootstrap(self): - """ - Get transactions from the backlog that may have been assigned to this while it was - online (not listening to the changefeed) - """ - # create bigchain instance - b = Bigchain() - - # create a queue to store initial results - q_initial = mp.Queue() - - # get initial results - initial_results = r.table('backlog')\ - .between([b.me, r.minval], [b.me, r.maxval], index='assignee__transaction_timestamp')\ - .order_by(index=r.asc('assignee__transaction_timestamp'))\ - .run(b.conn) - - # add results to the queue - for result in initial_results: - q_initial.put(result) - - for i in range(mp.cpu_count()): - q_initial.put('stop') - - return q_initial - - def start(self): - """ - Bootstrap and start the processes - """ - logger.info('bootstraping block module...') - self.q_new_transaction = self.bootstrap() - logger.info('finished reading past transactions') - self._start() - logger.info('finished bootstraping block module...') - - logger.info('starting block module...') - self.q_new_transaction = self._q_new_transaction - - # signal initialization complete - self.initialized.set() - - self._start() - logger.info('exiting block module...') - - def kill(self): - for i in range(mp.cpu_count()): - self.q_new_transaction.put('stop') - - def _start(self): - """ - Initialize, spawn, and start the processes - """ - - # initialize the processes - p_filter = ProcessGroup(name='filter_transactions', target=self.filter_by_assignee) - p_validate = ProcessGroup(name='validate_transactions', target=self.validate_transactions) - p_blocks = ProcessGroup(name='create_blocks', target=self.create_blocks) - p_write = ProcessGroup(name='write_blocks', target=self.write_blocks) - p_delete = ProcessGroup(name='delete_transactions', target=self.delete_transactions) - - # start the processes - p_filter.start() - p_validate.start() - p_blocks.start() - p_write.start() - p_delete.start() - - class BlockDeleteRevert(object): def __init__(self, q_delete_to_revert): diff --git a/bigchaindb/pipelines/__init__.py b/bigchaindb/pipelines/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py new file mode 100644 index 00000000..07cf8d28 --- /dev/null +++ b/bigchaindb/pipelines/block.py @@ -0,0 +1,83 @@ +import rethinkdb as r +from pipes import Pipeline, Node + +from bigchaindb.pipelines.utils import ChangeFeed +from bigchaindb import Bigchain + + +class Block: + + def __init__(self): + self.bigchain = Bigchain() + self.txs = [] + + def filter_tx(self, tx): + if tx['assignee'] == self.bigchain.me: + tx.pop('assignee') + return tx + + def delete_tx(self, tx): + r.table('backlog')\ + .get(tx['id'])\ + .delete(durability='hard')\ + .run(self.bigchain.conn) + + return tx + + def validate_tx(self, tx): + tx = self.bigchain.is_valid_transaction(tx) + if tx: + return tx + + def create(self, tx, timeout=False): + if tx: + self.txs.append(tx) + if self.txs and (len(self.txs) == 1000 or timeout): + block = self.bigchain.create_block(self.txs) + self.txs = [] + return block + + def write(self, block): + self.bigchain.write_block(block) + return block + + +def initial(): + b = Bigchain() + + rs = r.table('backlog')\ + .between([b.me, r.minval], + [b.me, r.maxval], + index='assignee__transaction_timestamp')\ + .order_by(index=r.asc('assignee__transaction_timestamp'))\ + .run(b.conn) + return rs + + +def get_changefeed(): + return ChangeFeed('backlog', 'insert', prefeed=initial()) + + +def create_pipeline(): + block = Block() + + block_pipeline = Pipeline([ + Node(block.filter_tx), + Node(block.delete_tx), + Node(block.validate_tx, fraction_of_cores=1), + Node(block.create, timeout=1), + Node(block.write), + ]) + + return block_pipeline + + +def start(): + pipeline = create_pipeline() + pipeline.setup(indata=get_changefeed()) + pipeline.start() + return pipeline + + +if __name__ == '__main__': + start() diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py new file mode 100644 index 00000000..b9baf083 --- /dev/null +++ b/bigchaindb/pipelines/utils.py @@ -0,0 +1,32 @@ +import rethinkdb as r +from pipes import Node + +from bigchaindb import Bigchain + + +class ChangeFeed(Node): + + def __init__(self, table, operation, prefeed=None): + super().__init__(name='changefeed') + self.prefeed = prefeed if prefeed else [] + self.table = table + self.operation = operation + self.bigchain = Bigchain() + + def run_forever(self): + for element in self.prefeed: + self.outqueue.put(element) + + for change in r.table(self.table).changes().run(self.bigchain.conn): + + is_insert = change['old_val'] is None + is_delete = change['new_val'] is None + is_update = not is_insert and not is_delete + + if is_insert and self.operation == 'insert': + self.outqueue.put(change['new_val']) + elif is_delete and self.operation == 'delete': + self.outqueue.put(change['old_val']) + elif is_update and self.operation == 'update': + self.outqueue.put(change) + diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py index 2abd4879..029aeeb9 100644 --- a/bigchaindb/processes.py +++ b/bigchaindb/processes.py @@ -4,9 +4,10 @@ import multiprocessing as mp import rethinkdb as r import bigchaindb +from bigchaindb.pipelines import block from bigchaindb import Bigchain from bigchaindb.voter import Voter, Election -from bigchaindb.block import Block, BlockDeleteRevert +from bigchaindb.block import BlockDeleteRevert from bigchaindb.web import server @@ -30,7 +31,6 @@ class Processes(object): def __init__(self): # initialize the class self.q_new_block = mp.Queue() - self.q_new_transaction = mp.Queue() self.q_block_new_vote = mp.Queue() self.q_revert_delete = mp.Queue() @@ -45,7 +45,7 @@ class Processes(object): # insert if change['old_val'] is None: - self.q_new_transaction.put(change['new_val']) + pass # delete if change['new_val'] is None: @@ -80,8 +80,6 @@ class Processes(object): def start(self): logger.info('Initializing BigchainDB...') - # instantiate block and voter - block = Block(self.q_new_transaction) delete_reverter = BlockDeleteRevert(self.q_revert_delete) # start the web api @@ -92,18 +90,16 @@ class Processes(object): # initialize the processes p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain) p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog) - p_block = mp.Process(name='block', target=block.start) p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start) p_voter = Voter(self.q_new_block) p_election = Election(self.q_block_new_vote) - # start the processes logger.info('starting bigchain mapper') p_map_bigchain.start() logger.info('starting backlog mapper') p_map_backlog.start() logger.info('starting block') - p_block.start() + block.start() p_block_delete_revert.start() logger.info('starting voter') @@ -112,6 +108,5 @@ class Processes(object): p_election.start() # start message - block.initialized.wait() p_voter.initialized.wait() logger.info(BANNER.format(bigchaindb.config['server']['bind'])) diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index a636315b..7082e52f 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -10,7 +10,7 @@ import cryptoconditions as cc import bigchaindb from bigchaindb import crypto, exceptions, util from bigchaindb.voter import Voter -from bigchaindb.block import Block, BlockDeleteRevert +from bigchaindb.block import BlockDeleteRevert @pytest.mark.skipif(reason='Some tests throw a ResourceWarning that might result in some weird ' @@ -610,200 +610,6 @@ class TestBlockValidation(object): class TestBigchainBlock(object): - def test_by_assignee(self, b, user_vk): - # create transactions and randomly assigne them - transactions = mp.Queue() - count_assigned_to_me = 0 - for i in range(100): - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc']) - if assignee == b.me: - count_assigned_to_me += 1 - - tx['assignee'] = assignee - transactions.put(tx) - transactions.put('stop') - - # create a block instance - block = Block(transactions) - block.q_new_transaction = transactions - # filter the transactions - block.filter_by_assignee() - - # check if the number of transactions assigned to the node is the same as the number in - # the queue minus 'stop' - assert block.q_tx_to_validate.qsize() - 1 == count_assigned_to_me - - def test_validate_transactions(self, b, user_vk): - # create transactions and randomly invalidate some of them by changing the hash - transactions = mp.Queue() - count_valid = 0 - for i in range(100): - valid = random.choice([True, False]) - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - if not valid: - tx['id'] = 'a' * 64 - else: - count_valid += 1 - transactions.put(tx) - transactions.put('stop') - - # create a block instance - block = Block(transactions) - block.q_tx_to_validate = transactions - # validate transactions - block.validate_transactions() - - # check if the number of valid transactions - assert block.q_tx_validated.qsize() - 1 == count_valid - assert block.q_tx_delete.qsize() - 1 == 100 - - def test_create_block(self, b, user_vk): - # create transactions - transactions = mp.Queue() - for i in range(100): - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - transactions.put(tx) - transactions.put('stop') - - # create a block instance - block = Block(transactions) - block.q_tx_validated = transactions - # create blocks - block.create_blocks() - - # check if the number of valid transactions - assert block.q_block.qsize() - 1 == 1 - - def test_write_block(self, b, user_vk): - # create transactions - transactions = [] - blocks = mp.Queue() - for i in range(100): - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - transactions.append(tx) - - # create block - block = b.create_block(transactions) - blocks.put(block) - blocks.put('stop') - - # create a block instance - block = Block(transactions) - block.q_block = blocks - - # make sure that we only have the genesis block in bigchain - r.table('bigchain').delete().run(b.conn) - b.create_genesis_block() - - # write blocks - block.write_blocks() - # lets give it some time for the block to be written - time.sleep(1) - - # check if the number of blocks in bigchain increased - assert r.table('bigchain').count() == 2 - - def test_delete_transactions(self, b, user_vk): - # make sure that there are no transactions in the backlog - r.table('backlog').delete().run(b.conn) - - # create and write transactions to the backlog - transactions = mp.Queue() - for i in range(100): - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) - transactions.put(tx['id']) - transactions.put('stop') - - # create a block instance - block = Block(transactions) - block.q_tx_delete = transactions - - # make sure that there are transactions on the backlog - r.table('backlog').count().run(b.conn) == 100 - - # run the delete process - block.delete_transactions() - # give the db time - time.sleep(1) - - # check if all transactions were deleted from the backlog - assert r.table('backlog').count() == 0 - - def test_bootstrap(self, b, user_vk): - # make sure that there are no transactions in the backlog - r.table('backlog').delete().run(b.conn) - - # create and write transactions to the backlog - for i in range(100): - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) - - # create a block instance - block = Block(None) - - # run bootstrap - initial_results = block.bootstrap() - - # we should have gotten a queue with 100 results minus the poison pills - assert initial_results.qsize() - mp.cpu_count() == 100 - - def test_start(self, b, user_vk): - # start with 100 transactions in the backlog and 100 in the changefeed - - # make sure that there are no transactions in the backlog - r.table('backlog').delete().run(b.conn) - - # create and write transactions to the backlog - for i in range(100): - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) - - # create 100 more transactions to emulate the changefeed - new_transactions = mp.Queue() - for i in range(100): - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) - new_transactions.put(tx) - - for i in range(mp.cpu_count()): - new_transactions.put('stop') - - # create a block instance - block = Block(new_transactions) - - # start the block processes - block.start() - - time.sleep(6) - - assert new_transactions.qsize() == 0 - assert r.table('backlog').count() == 0 - assert r.table('bigchain').count() == 2 - - def test_empty_queues(self, b): - # create empty queue - new_transactions = mp.Queue() - - # create block instance - block = Block(new_transactions) - - # start block process - block.start() - - # wait for 6 seconds to give it time for an empty queue exception to occur - time.sleep(6) - - # join the process - block.kill() def test_revert_delete_block(self, b): b.create_genesis_block() diff --git a/tests/pipelines/__init__.py b/tests/pipelines/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/pipelines/conftest.py b/tests/pipelines/conftest.py new file mode 100644 index 00000000..1a3a77e2 --- /dev/null +++ b/tests/pipelines/conftest.py @@ -0,0 +1,19 @@ +import pytest +from ..db import conftest + + +@pytest.fixture(autouse=True) +def restore_config(request, node_config): + from bigchaindb import config_utils + config_utils.set_config(node_config) + + +@pytest.fixture(scope='module', autouse=True) +def setup_database(request, node_config): + conftest.setup_database(request, node_config) + + +@pytest.fixture(scope='function', autouse=True) +def cleanup_tables(request, node_config): + conftest.cleanup_tables(request, node_config) + diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py new file mode 100644 index 00000000..5c6f5848 --- /dev/null +++ b/tests/pipelines/test_block_creation.py @@ -0,0 +1,124 @@ +import time +import random + +import rethinkdb as r + +from bigchaindb.pipelines import block +from pipes import Pipe + + +def test_filter_by_assignee(b, user_vk): + block_maker = block.Block() + + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + tx['assignee'] = b.me + + # filter_tx has side effects on the `tx` instance by popping 'assignee' + assert block_maker.filter_tx(tx) == tx + + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + tx['assignee'] = 'nobody' + + assert block_maker.filter_tx(tx) is None + + +def test_validate_transaction(b, user_vk): + block_maker = block.Block() + + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + tx['id'] = 'a' * 64 + + assert block_maker.validate_tx(tx) is None + + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + + assert block_maker.validate_tx(tx) == tx + + +def test_create_block(b, user_vk): + block_maker = block.Block() + + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + block_maker.create(tx) + + # force the output triggering a `timeout` + block_doc = block_maker.create(None, timeout=True) + + assert len(block_doc['block']['transactions']) == 100 + + +def test_write_block(b, user_vk): + block_maker = block.Block() + + txs = [] + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + txs.append(tx) + + block_doc = b.create_block(txs) + block_maker.write(block_doc) + + assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc + + +def test_delete_tx(b, user_vk): + block_maker = block.Block() + + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx) + + assert r.table('backlog').get(tx['id']).run(b.conn) == tx + + returned_tx = block_maker.delete_tx(tx) + + assert returned_tx == tx + assert r.table('backlog').get(tx['id']).run(b.conn) is None + + +def test_prefeed(b, user_vk): + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + b.write_transaction(tx) + + backlog = block.initial() + + assert len(list(backlog)) == 100 + + +def test_full_pipeline(b, user_vk): + outpipe = Pipe() + + count_assigned_to_me = 0 + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc']) + tx['assignee'] = assignee + if assignee == b.me: + count_assigned_to_me += 1 + r.table('backlog').insert(tx, durability='hard').run(b.conn) + + assert r.table('backlog').count().run(b.conn) == 100 + + pipeline = block.create_pipeline() + pipeline.setup(indata=block.get_changefeed(), outdata=outpipe) + + pipeline.start() + time.sleep(2) + pipeline.terminate() + + block_doc = outpipe.get() + + assert len(block_doc['block']['transactions']) == count_assigned_to_me + assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc + assert r.table('backlog').count().run(b.conn) == 100 - count_assigned_to_me + diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py new file mode 100644 index 00000000..97443698 --- /dev/null +++ b/tests/pipelines/test_utils.py @@ -0,0 +1,15 @@ +import time + +from pipes import Pipe +from bigchaindb.pipelines import utils + + +def test_changefeed(b, user_vk): + outpipe = Pipe() + changefeed = utils.ChangeFeed('backlog', 'insert', prefeed=[1, 2, 3]) + changefeed.outqueue = outpipe + changefeed.start() + time.sleep(1) + changefeed.terminate() + + assert outpipe.qsize() == 3