Merge remote-tracking branch 'origin/refactor-multiprocessing-for-election'

This commit is contained in:
vrde 2016-08-04 14:09:54 +02:00
commit 011e840cf5
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
5 changed files with 196 additions and 121 deletions

View File

@ -0,0 +1,65 @@
"""This module takes care of all the logic related to block status.
Specifically, what happens when a block becomes invalid. The logic is
encapsulated in the ``Election`` class, while the sequence of actions
is specified in ``create_pipeline``.
"""
import logging
import rethinkdb as r
from multipipes import Pipeline, Node
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
logger = logging.getLogger(__name__)
class Election:
def __init__(self):
self.bigchain = Bigchain()
def check_for_quorum(self, next_vote):
"""
Checks if block has enough invalid votes to make a decision
"""
next_block = r.table('bigchain')\
.get(next_vote['vote']['voting_for_block'])\
.run(self.bigchain.conn)
if self.bigchain.block_election_status(next_block) == self.bigchain.BLOCK_INVALID:
return next_block
def requeue_transactions(self, invalid_block):
"""
Liquidates transactions from invalid blocks so they can be processed again
"""
logger.info('Rewriting %s transactions from invalid block %s',
len(invalid_block['block']['transactions']),
invalid_block['id'])
for tx in invalid_block['block']['transactions']:
self.bigchain.write_transaction(tx)
return invalid_block
def get_changefeed():
return ChangeFeed(table='votes', operation='insert')
def create_pipeline():
election = Election()
election_pipeline = Pipeline([
Node(election.check_for_quorum),
Node(election.requeue_transactions)
])
return election_pipeline
def start():
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline

View File

@ -4,9 +4,9 @@ import multiprocessing as mp
import rethinkdb as r
import bigchaindb
from bigchaindb.pipelines import block
from bigchaindb.pipelines import block, election
from bigchaindb import Bigchain
from bigchaindb.voter import Voter, Election
from bigchaindb.voter import Voter
from bigchaindb.block import BlockDeleteRevert
from bigchaindb.web import server
@ -31,7 +31,6 @@ class Processes(object):
def __init__(self):
# initialize the class
self.q_new_block = mp.Queue()
self.q_block_new_vote = mp.Queue()
self.q_revert_delete = mp.Queue()
def map_bigchain(self):
@ -52,10 +51,6 @@ class Processes(object):
# this should never happen in regular operation
self.q_revert_delete.put(change['old_val'])
# update (new vote)
elif change['new_val'] is not None and change['old_val'] is not None:
self.q_block_new_vote.put(change['new_val'])
def start(self):
logger.info('Initializing BigchainDB...')
@ -70,19 +65,17 @@ class Processes(object):
p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain)
p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start)
p_voter = Voter(self.q_new_block)
p_election = Election(self.q_block_new_vote)
# start the processes
logger.info('starting bigchain mapper')
p_map_bigchain.start()
logger.info('starting backlog mapper')
logger.info('starting block')
block.start()
p_block_delete_revert.start()
logger.info('starting voter')
p_voter.start()
election.start()
logger.info('starting election')
p_election.start()
# start message
p_voter.initialized.wait()

View File

@ -197,68 +197,3 @@ class Voter(object):
p_validate.start()
p_vote.start()
p_update.start()
class Election(object):
def __init__(self, q_block_new_vote):
"""
Initialize the class with the needed queues.
Initialize a queue where blocks with new votes will be held
"""
self.q_block_new_vote = q_block_new_vote
self.q_invalid_blocks = mp.Queue()
def check_for_quorum(self):
"""
Checks if block has enough invalid votes to make a decision
"""
b = Bigchain()
while True:
next_block = self.q_block_new_vote.get()
# poison pill
if next_block == 'stop':
self.q_invalid_blocks.put('stop')
logger.info('clean exit')
return
if b.block_election_status(next_block) == 'invalid':
self.q_invalid_blocks.put(next_block)
def requeue_transactions(self):
"""
Liquidates transactions from invalid blocks so they can be processed again
"""
while True:
invalid_block = self.q_invalid_blocks.get()
# poison pill
if invalid_block == 'stop':
logger.info('clean exit')
return
b = Bigchain()
for tx in invalid_block['block']['transactions']:
b.write_transaction(tx)
def kill(self):
"""
Terminate processes
"""
self.q_block_new_vote.put('stop')
def start(self):
"""
Initialize, spawn, and start the processes
"""
# initialize the processes
p_quorum_check = mp.Process(name='check_for_quorum', target=self.check_for_quorum)
p_requeue_tx = mp.Process(name='requeue_tx', target=self.requeue_transactions)
# start the processes
p_quorum_check.start()
p_requeue_tx.start()

View File

@ -5,7 +5,7 @@ import multiprocessing as mp
from bigchaindb import util
from bigchaindb.voter import Voter, Election, BlockStream
from bigchaindb.voter import Voter, BlockStream
from bigchaindb import crypto, Bigchain
@ -471,51 +471,6 @@ class TestBlockElection(object):
assert b.block_election_status(test_block) == Bigchain.BLOCK_INVALID
r.table('votes').delete().run(b.conn)
def test_tx_rewritten_after_invalid(self, b, user_vk):
q_block_new_vote = mp.Queue()
# create blocks with transactions
tx1 = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx2 = b.create_transaction(b.me, user_vk, None, 'CREATE')
test_block_1 = b.create_block([tx1])
test_block_2 = b.create_block([tx2])
# simulate a federation with four voters
key_pairs = [crypto.generate_key_pair() for _ in range(4)]
test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0])
for key_pair in key_pairs]
# simulate a federation with four voters
test_block_1['block']['voters'] = [key_pair[1] for key_pair in key_pairs]
test_block_2['block']['voters'] = [key_pair[1] for key_pair in key_pairs]
# votes for block one
vote_1 = [member.vote(test_block_1['id'], 'abc', True)
for member in test_federation]
# votes for block two
vote_2 = [member.vote(test_block_2['id'], 'abc', True) for member in test_federation[:2]] + \
[member.vote(test_block_2['id'], 'abc', False) for member in test_federation[2:]]
# construct valid block
r.table('votes').insert(vote_1, durability='hard').run(b.conn)
q_block_new_vote.put(test_block_1)
# construct invalid block
r.table('votes').insert(vote_2, durability='hard').run(b.conn)
q_block_new_vote.put(test_block_2)
election = Election(q_block_new_vote)
election.start()
time.sleep(1)
election.kill()
# tx1 was in a valid block, and should not be in the backlog
assert r.table('backlog').get(tx1['id']).run(b.conn) is None
# tx2 was in an invalid block and SHOULD be in the backlog
assert r.table('backlog').get(tx2['id']).run(b.conn)['id'] == tx2['id']
class TestBlockStream(object):

View File

@ -0,0 +1,127 @@
import time
import random
from bigchaindb import crypto, Bigchain
from unittest.mock import patch
import rethinkdb as r
from bigchaindb.pipelines import election
from multipipes import Pipe, Pipeline
def test_check_for_quorum_invalid(b, user_vk):
e = election.Election()
# create blocks with transactions
tx1 = b.create_transaction(b.me, user_vk, None, 'CREATE')
test_block = b.create_block([tx1])
# simulate a federation with four voters
key_pairs = [crypto.generate_key_pair() for _ in range(4)]
test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0])
for key_pair in key_pairs]
# add voters to block and write
test_block['block']['voters'] = [key_pair[1] for key_pair in key_pairs]
b.write_block(test_block)
# split_vote (invalid)
votes = [member.vote(test_block['id'], 'abc', True) for member in test_federation[:2]] + \
[member.vote(test_block['id'], 'abc', False) for member in test_federation[2:]]
# cast votes
r.table('votes').insert(votes, durability='hard').run(b.conn)
# since this block is now invalid, should pass to the next process
assert e.check_for_quorum(votes[-1]) == test_block
def test_check_for_quorum_valid(b, user_vk):
e = election.Election()
# create blocks with transactions
tx1 = b.create_transaction(b.me, user_vk, None, 'CREATE')
test_block = b.create_block([tx1])
# simulate a federation with four voters
key_pairs = [crypto.generate_key_pair() for _ in range(4)]
test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0])
for key_pair in key_pairs]
# add voters to block and write
test_block['block']['voters'] = [key_pair[1] for key_pair in key_pairs]
b.write_block(test_block)
# votes for block one
votes = [member.vote(test_block['id'], 'abc', True)
for member in test_federation]
# cast votes
r.table('votes').insert(votes, durability='hard').run(b.conn)
# since this block is valid, should go nowhere
assert e.check_for_quorum(votes[-1]) is None
def test_check_requeue_transaction(b, user_vk):
e = election.Election()
# create blocks with transactions
tx1 = b.create_transaction(b.me, user_vk, None, 'CREATE')
test_block = b.create_block([tx1])
e.requeue_transactions(test_block)
assert r.table('backlog').get(tx1['id']).run(b.conn) == tx1
@patch.object(Pipeline, 'start')
def test_start(mock_start):
# TODO: `block.election` is just a wrapper around `block.create_pipeline`,
# that is tested by `test_full_pipeline`.
# If anyone has better ideas on how to test this, please do a PR :)
election.start()
mock_start.assert_called_with()
def test_full_pipeline(b, user_vk):
outpipe = Pipe()
# write two blocks
txs = []
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
txs.append(tx)
valid_block = b.create_block(txs)
b.write_block(valid_block)
txs = []
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
txs.append(tx)
invalid_block = b.create_block(txs)
b.write_block(invalid_block)
pipeline = election.create_pipeline()
pipeline.setup(indata=election.get_changefeed(), outdata=outpipe)
pipeline.start()
time.sleep(1)
# vote one block valid, one invalid
vote_valid = b.vote(valid_block['id'], 'abc', True)
vote_invalid = b.vote(invalid_block['id'], 'abc', False)
r.table('votes').insert(vote_valid, durability='hard').run(b.conn)
r.table('votes').insert(vote_invalid, durability='hard').run(b.conn)
outpipe.get()
pipeline.terminate()
# only transactions from the invalid block should be returned to
# the backlog
assert r.table('backlog').count().run(b.conn) == 100
tx_from_block = set([tx['id'] for tx in invalid_block['block']['transactions']])
tx_from_backlog = set([tx['id'] for tx in list(r.table('backlog').run(b.conn))])
assert tx_from_block == tx_from_backlog