Add block pipeline

This commit is contained in:
vrde 2016-07-21 14:16:40 +02:00
parent 9683d3f5a1
commit ecf67d1e28
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
10 changed files with 278 additions and 421 deletions

View File

@ -13,223 +13,6 @@ from bigchaindb.util import ProcessGroup
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Block(object):
def __init__(self, q_new_transaction):
"""
Initialize the class with the needed
"""
self._q_new_transaction = q_new_transaction
self.q_new_transaction = None
self.q_tx_to_validate = mp.Queue()
self.q_tx_validated = mp.Queue()
self.q_tx_delete = mp.Queue()
self.q_block = mp.Queue()
self.initialized = mp.Event()
self.monitor = Monitor()
def filter_by_assignee(self):
"""
Handle transactions that are assigned to me
"""
# create a bigchain instance
b = Bigchain()
while True:
tx = self.q_new_transaction.get()
# poison pill
if tx == 'stop':
self.q_tx_to_validate.put('stop')
return
if tx['assignee'] == b.me:
tx.pop('assignee')
self.q_tx_to_validate.put(tx)
def validate_transactions(self):
"""
Checks if the incoming transactions are valid
"""
# create a bigchain instance
b = Bigchain()
while True:
self.monitor.gauge('tx_queue_gauge',
self.q_tx_to_validate.qsize(),
rate=bigchaindb.config['statsd']['rate'])
tx = self.q_tx_to_validate.get()
# poison pill
if tx == 'stop':
self.q_tx_delete.put('stop')
self.q_tx_validated.put('stop')
return
self.q_tx_delete.put(tx['id'])
with self.monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate']):
is_valid_transaction = b.is_valid_transaction(tx)
if is_valid_transaction:
self.q_tx_validated.put(tx)
def create_blocks(self):
"""
Create a block with valid transactions
"""
# create a bigchain instance
b = Bigchain()
stop = False
while True:
# read up to 1000 transactions
validated_transactions = []
for i in range(1000):
try:
tx = self.q_tx_validated.get(timeout=5)
except queue.Empty:
break
# poison pill
if tx == 'stop':
stop = True
break
validated_transactions.append(tx)
# if there are no transactions skip block creation
if validated_transactions:
# create block
block = b.create_block(validated_transactions)
self.q_block.put(block)
if stop:
self.q_block.put('stop')
return
def write_blocks(self):
"""
Write blocks to the bigchain
"""
# create bigchain instance
b = Bigchain()
# Write blocks
while True:
block = self.q_block.get()
# poison pill
if block == 'stop':
return
with self.monitor.timer('write_block'):
b.write_block(block)
def delete_transactions(self):
"""
Delete transactions from the backlog
"""
# create bigchain instance
b = Bigchain()
stop = False
while True:
# try to delete in batch to reduce io
tx_to_delete = []
for i in range(1000):
try:
tx = self.q_tx_delete.get(timeout=5)
except queue.Empty:
break
# poison pill
if tx == 'stop':
stop = True
break
tx_to_delete.append(tx)
if tx_to_delete:
r.table('backlog').get_all(*tx_to_delete).delete(durability='soft').run(b.conn)
if stop:
return
def bootstrap(self):
"""
Get transactions from the backlog that may have been assigned to this while it was
online (not listening to the changefeed)
"""
# create bigchain instance
b = Bigchain()
# create a queue to store initial results
q_initial = mp.Queue()
# get initial results
initial_results = r.table('backlog')\
.between([b.me, r.minval], [b.me, r.maxval], index='assignee__transaction_timestamp')\
.order_by(index=r.asc('assignee__transaction_timestamp'))\
.run(b.conn)
# add results to the queue
for result in initial_results:
q_initial.put(result)
for i in range(mp.cpu_count()):
q_initial.put('stop')
return q_initial
def start(self):
"""
Bootstrap and start the processes
"""
logger.info('bootstraping block module...')
self.q_new_transaction = self.bootstrap()
logger.info('finished reading past transactions')
self._start()
logger.info('finished bootstraping block module...')
logger.info('starting block module...')
self.q_new_transaction = self._q_new_transaction
# signal initialization complete
self.initialized.set()
self._start()
logger.info('exiting block module...')
def kill(self):
for i in range(mp.cpu_count()):
self.q_new_transaction.put('stop')
def _start(self):
"""
Initialize, spawn, and start the processes
"""
# initialize the processes
p_filter = ProcessGroup(name='filter_transactions', target=self.filter_by_assignee)
p_validate = ProcessGroup(name='validate_transactions', target=self.validate_transactions)
p_blocks = ProcessGroup(name='create_blocks', target=self.create_blocks)
p_write = ProcessGroup(name='write_blocks', target=self.write_blocks)
p_delete = ProcessGroup(name='delete_transactions', target=self.delete_transactions)
# start the processes
p_filter.start()
p_validate.start()
p_blocks.start()
p_write.start()
p_delete.start()
class BlockDeleteRevert(object): class BlockDeleteRevert(object):
def __init__(self, q_delete_to_revert): def __init__(self, q_delete_to_revert):

View File

View File

@ -0,0 +1,83 @@
import rethinkdb as r
from pipes import Pipeline, Node
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
class Block:
def __init__(self):
self.bigchain = Bigchain()
self.txs = []
def filter_tx(self, tx):
if tx['assignee'] == self.bigchain.me:
tx.pop('assignee')
return tx
def delete_tx(self, tx):
r.table('backlog')\
.get(tx['id'])\
.delete(durability='hard')\
.run(self.bigchain.conn)
return tx
def validate_tx(self, tx):
tx = self.bigchain.is_valid_transaction(tx)
if tx:
return tx
def create(self, tx, timeout=False):
if tx:
self.txs.append(tx)
if self.txs and (len(self.txs) == 1000 or timeout):
block = self.bigchain.create_block(self.txs)
self.txs = []
return block
def write(self, block):
self.bigchain.write_block(block)
return block
def initial():
b = Bigchain()
rs = r.table('backlog')\
.between([b.me, r.minval],
[b.me, r.maxval],
index='assignee__transaction_timestamp')\
.order_by(index=r.asc('assignee__transaction_timestamp'))\
.run(b.conn)
return rs
def get_changefeed():
return ChangeFeed('backlog', 'insert', prefeed=initial())
def create_pipeline():
block = Block()
block_pipeline = Pipeline([
Node(block.filter_tx),
Node(block.delete_tx),
Node(block.validate_tx, fraction_of_cores=1),
Node(block.create, timeout=1),
Node(block.write),
])
return block_pipeline
def start():
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline
if __name__ == '__main__':
start()

View File

@ -0,0 +1,32 @@
import rethinkdb as r
from pipes import Node
from bigchaindb import Bigchain
class ChangeFeed(Node):
def __init__(self, table, operation, prefeed=None):
super().__init__(name='changefeed')
self.prefeed = prefeed if prefeed else []
self.table = table
self.operation = operation
self.bigchain = Bigchain()
def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)
for change in r.table(self.table).changes().run(self.bigchain.conn):
is_insert = change['old_val'] is None
is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete
if is_insert and self.operation == 'insert':
self.outqueue.put(change['new_val'])
elif is_delete and self.operation == 'delete':
self.outqueue.put(change['old_val'])
elif is_update and self.operation == 'update':
self.outqueue.put(change)

View File

@ -4,9 +4,10 @@ import multiprocessing as mp
import rethinkdb as r import rethinkdb as r
import bigchaindb import bigchaindb
from bigchaindb.pipelines import block
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb.voter import Voter, Election from bigchaindb.voter import Voter, Election
from bigchaindb.block import Block, BlockDeleteRevert from bigchaindb.block import BlockDeleteRevert
from bigchaindb.web import server from bigchaindb.web import server
@ -30,7 +31,6 @@ class Processes(object):
def __init__(self): def __init__(self):
# initialize the class # initialize the class
self.q_new_block = mp.Queue() self.q_new_block = mp.Queue()
self.q_new_transaction = mp.Queue()
self.q_block_new_vote = mp.Queue() self.q_block_new_vote = mp.Queue()
self.q_revert_delete = mp.Queue() self.q_revert_delete = mp.Queue()
@ -45,7 +45,7 @@ class Processes(object):
# insert # insert
if change['old_val'] is None: if change['old_val'] is None:
self.q_new_transaction.put(change['new_val']) pass
# delete # delete
if change['new_val'] is None: if change['new_val'] is None:
@ -80,8 +80,6 @@ class Processes(object):
def start(self): def start(self):
logger.info('Initializing BigchainDB...') logger.info('Initializing BigchainDB...')
# instantiate block and voter
block = Block(self.q_new_transaction)
delete_reverter = BlockDeleteRevert(self.q_revert_delete) delete_reverter = BlockDeleteRevert(self.q_revert_delete)
# start the web api # start the web api
@ -92,18 +90,16 @@ class Processes(object):
# initialize the processes # initialize the processes
p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain) p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain)
p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog) p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog)
p_block = mp.Process(name='block', target=block.start)
p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start) p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start)
p_voter = Voter(self.q_new_block) p_voter = Voter(self.q_new_block)
p_election = Election(self.q_block_new_vote) p_election = Election(self.q_block_new_vote)
# start the processes # start the processes
logger.info('starting bigchain mapper') logger.info('starting bigchain mapper')
p_map_bigchain.start() p_map_bigchain.start()
logger.info('starting backlog mapper') logger.info('starting backlog mapper')
p_map_backlog.start() p_map_backlog.start()
logger.info('starting block') logger.info('starting block')
p_block.start() block.start()
p_block_delete_revert.start() p_block_delete_revert.start()
logger.info('starting voter') logger.info('starting voter')
@ -112,6 +108,5 @@ class Processes(object):
p_election.start() p_election.start()
# start message # start message
block.initialized.wait()
p_voter.initialized.wait() p_voter.initialized.wait()
logger.info(BANNER.format(bigchaindb.config['server']['bind'])) logger.info(BANNER.format(bigchaindb.config['server']['bind']))

View File

@ -10,7 +10,7 @@ import cryptoconditions as cc
import bigchaindb import bigchaindb
from bigchaindb import crypto, exceptions, util from bigchaindb import crypto, exceptions, util
from bigchaindb.voter import Voter from bigchaindb.voter import Voter
from bigchaindb.block import Block, BlockDeleteRevert from bigchaindb.block import BlockDeleteRevert
@pytest.mark.skipif(reason='Some tests throw a ResourceWarning that might result in some weird ' @pytest.mark.skipif(reason='Some tests throw a ResourceWarning that might result in some weird '
@ -610,200 +610,6 @@ class TestBlockValidation(object):
class TestBigchainBlock(object): class TestBigchainBlock(object):
def test_by_assignee(self, b, user_vk):
# create transactions and randomly assigne them
transactions = mp.Queue()
count_assigned_to_me = 0
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc'])
if assignee == b.me:
count_assigned_to_me += 1
tx['assignee'] = assignee
transactions.put(tx)
transactions.put('stop')
# create a block instance
block = Block(transactions)
block.q_new_transaction = transactions
# filter the transactions
block.filter_by_assignee()
# check if the number of transactions assigned to the node is the same as the number in
# the queue minus 'stop'
assert block.q_tx_to_validate.qsize() - 1 == count_assigned_to_me
def test_validate_transactions(self, b, user_vk):
# create transactions and randomly invalidate some of them by changing the hash
transactions = mp.Queue()
count_valid = 0
for i in range(100):
valid = random.choice([True, False])
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
if not valid:
tx['id'] = 'a' * 64
else:
count_valid += 1
transactions.put(tx)
transactions.put('stop')
# create a block instance
block = Block(transactions)
block.q_tx_to_validate = transactions
# validate transactions
block.validate_transactions()
# check if the number of valid transactions
assert block.q_tx_validated.qsize() - 1 == count_valid
assert block.q_tx_delete.qsize() - 1 == 100
def test_create_block(self, b, user_vk):
# create transactions
transactions = mp.Queue()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
transactions.put(tx)
transactions.put('stop')
# create a block instance
block = Block(transactions)
block.q_tx_validated = transactions
# create blocks
block.create_blocks()
# check if the number of valid transactions
assert block.q_block.qsize() - 1 == 1
def test_write_block(self, b, user_vk):
# create transactions
transactions = []
blocks = mp.Queue()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
transactions.append(tx)
# create block
block = b.create_block(transactions)
blocks.put(block)
blocks.put('stop')
# create a block instance
block = Block(transactions)
block.q_block = blocks
# make sure that we only have the genesis block in bigchain
r.table('bigchain').delete().run(b.conn)
b.create_genesis_block()
# write blocks
block.write_blocks()
# lets give it some time for the block to be written
time.sleep(1)
# check if the number of blocks in bigchain increased
assert r.table('bigchain').count() == 2
def test_delete_transactions(self, b, user_vk):
# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)
# create and write transactions to the backlog
transactions = mp.Queue()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
transactions.put(tx['id'])
transactions.put('stop')
# create a block instance
block = Block(transactions)
block.q_tx_delete = transactions
# make sure that there are transactions on the backlog
r.table('backlog').count().run(b.conn) == 100
# run the delete process
block.delete_transactions()
# give the db time
time.sleep(1)
# check if all transactions were deleted from the backlog
assert r.table('backlog').count() == 0
def test_bootstrap(self, b, user_vk):
# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)
# create and write transactions to the backlog
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
# create a block instance
block = Block(None)
# run bootstrap
initial_results = block.bootstrap()
# we should have gotten a queue with 100 results minus the poison pills
assert initial_results.qsize() - mp.cpu_count() == 100
def test_start(self, b, user_vk):
# start with 100 transactions in the backlog and 100 in the changefeed
# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)
# create and write transactions to the backlog
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
# create 100 more transactions to emulate the changefeed
new_transactions = mp.Queue()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
new_transactions.put(tx)
for i in range(mp.cpu_count()):
new_transactions.put('stop')
# create a block instance
block = Block(new_transactions)
# start the block processes
block.start()
time.sleep(6)
assert new_transactions.qsize() == 0
assert r.table('backlog').count() == 0
assert r.table('bigchain').count() == 2
def test_empty_queues(self, b):
# create empty queue
new_transactions = mp.Queue()
# create block instance
block = Block(new_transactions)
# start block process
block.start()
# wait for 6 seconds to give it time for an empty queue exception to occur
time.sleep(6)
# join the process
block.kill()
def test_revert_delete_block(self, b): def test_revert_delete_block(self, b):
b.create_genesis_block() b.create_genesis_block()

View File

View File

@ -0,0 +1,19 @@
import pytest
from ..db import conftest
@pytest.fixture(autouse=True)
def restore_config(request, node_config):
from bigchaindb import config_utils
config_utils.set_config(node_config)
@pytest.fixture(scope='module', autouse=True)
def setup_database(request, node_config):
conftest.setup_database(request, node_config)
@pytest.fixture(scope='function', autouse=True)
def cleanup_tables(request, node_config):
conftest.cleanup_tables(request, node_config)

View File

@ -0,0 +1,124 @@
import time
import random
import rethinkdb as r
from bigchaindb.pipelines import block
from pipes import Pipe
def test_filter_by_assignee(b, user_vk):
block_maker = block.Block()
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
tx['assignee'] = b.me
# filter_tx has side effects on the `tx` instance by popping 'assignee'
assert block_maker.filter_tx(tx) == tx
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
tx['assignee'] = 'nobody'
assert block_maker.filter_tx(tx) is None
def test_validate_transaction(b, user_vk):
block_maker = block.Block()
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
tx['id'] = 'a' * 64
assert block_maker.validate_tx(tx) is None
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
assert block_maker.validate_tx(tx) == tx
def test_create_block(b, user_vk):
block_maker = block.Block()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
block_maker.create(tx)
# force the output triggering a `timeout`
block_doc = block_maker.create(None, timeout=True)
assert len(block_doc['block']['transactions']) == 100
def test_write_block(b, user_vk):
block_maker = block.Block()
txs = []
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
txs.append(tx)
block_doc = b.create_block(txs)
block_maker.write(block_doc)
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
def test_delete_tx(b, user_vk):
block_maker = block.Block()
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
assert r.table('backlog').get(tx['id']).run(b.conn) == tx
returned_tx = block_maker.delete_tx(tx)
assert returned_tx == tx
assert r.table('backlog').get(tx['id']).run(b.conn) is None
def test_prefeed(b, user_vk):
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
backlog = block.initial()
assert len(list(backlog)) == 100
def test_full_pipeline(b, user_vk):
outpipe = Pipe()
count_assigned_to_me = 0
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc'])
tx['assignee'] = assignee
if assignee == b.me:
count_assigned_to_me += 1
r.table('backlog').insert(tx, durability='hard').run(b.conn)
assert r.table('backlog').count().run(b.conn) == 100
pipeline = block.create_pipeline()
pipeline.setup(indata=block.get_changefeed(), outdata=outpipe)
pipeline.start()
time.sleep(2)
pipeline.terminate()
block_doc = outpipe.get()
assert len(block_doc['block']['transactions']) == count_assigned_to_me
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
assert r.table('backlog').count().run(b.conn) == 100 - count_assigned_to_me

View File

@ -0,0 +1,15 @@
import time
from pipes import Pipe
from bigchaindb.pipelines import utils
def test_changefeed(b, user_vk):
outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'insert', prefeed=[1, 2, 3])
changefeed.outqueue = outpipe
changefeed.start()
time.sleep(1)
changefeed.terminate()
assert outpipe.qsize() == 3