mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
inital refactor
This commit is contained in:
parent
ff042b5954
commit
77317178ef
50
bigchaindb/pipelines/election.py
Normal file
50
bigchaindb/pipelines/election.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
import rethinkdb as r
|
||||||
|
from multipipes import Pipeline, Node
|
||||||
|
|
||||||
|
from bigchaindb.pipelines.utils import ChangeFeed
|
||||||
|
from bigchaindb import Bigchain
|
||||||
|
|
||||||
|
|
||||||
|
class Election:
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.bigchain = Bigchain()
|
||||||
|
|
||||||
|
def check_for_quorum(self, next_vote):
|
||||||
|
"""
|
||||||
|
Checks if block has enough invalid votes to make a decision
|
||||||
|
"""
|
||||||
|
next_block = r.table('bigchain')\
|
||||||
|
.get(next_vote['vote']['voting_for_block'])\
|
||||||
|
.run(self.bigchain.conn)
|
||||||
|
if self.bigchain.block_election_status(next_block) == self.bigchain.BLOCK_INVALID:
|
||||||
|
return next_block
|
||||||
|
|
||||||
|
def requeue_transactions(self, invalid_block):
|
||||||
|
"""
|
||||||
|
Liquidates transactions from invalid blocks so they can be processed again
|
||||||
|
"""
|
||||||
|
for tx in invalid_block['block']['transactions']:
|
||||||
|
self.bigchain.write_transaction(tx)
|
||||||
|
|
||||||
|
|
||||||
|
def get_changefeed():
|
||||||
|
return ChangeFeed(table='votes', operation='insert')
|
||||||
|
|
||||||
|
|
||||||
|
def create_pipeline():
|
||||||
|
election = Election()
|
||||||
|
|
||||||
|
election_pipeline = Pipeline([
|
||||||
|
Node(election.check_for_quorum),
|
||||||
|
Node(election.requeue_transactions)
|
||||||
|
])
|
||||||
|
|
||||||
|
return election_pipeline
|
||||||
|
|
||||||
|
|
||||||
|
def start():
|
||||||
|
pipeline = create_pipeline()
|
||||||
|
pipeline.setup(indata=get_changefeed())
|
||||||
|
pipeline.start()
|
||||||
|
return pipeline
|
@ -4,9 +4,9 @@ import multiprocessing as mp
|
|||||||
import rethinkdb as r
|
import rethinkdb as r
|
||||||
|
|
||||||
import bigchaindb
|
import bigchaindb
|
||||||
from bigchaindb.pipelines import block
|
from bigchaindb.pipelines import block, election
|
||||||
from bigchaindb import Bigchain
|
from bigchaindb import Bigchain
|
||||||
from bigchaindb.voter import Voter, Election
|
from bigchaindb.voter import Voter
|
||||||
from bigchaindb.block import BlockDeleteRevert
|
from bigchaindb.block import BlockDeleteRevert
|
||||||
from bigchaindb.web import server
|
from bigchaindb.web import server
|
||||||
|
|
||||||
@ -70,19 +70,18 @@ class Processes(object):
|
|||||||
p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain)
|
p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain)
|
||||||
p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start)
|
p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start)
|
||||||
p_voter = Voter(self.q_new_block)
|
p_voter = Voter(self.q_new_block)
|
||||||
p_election = Election(self.q_block_new_vote)
|
|
||||||
# start the processes
|
# start the processes
|
||||||
logger.info('starting bigchain mapper')
|
logger.info('starting bigchain mapper')
|
||||||
p_map_bigchain.start()
|
p_map_bigchain.start()
|
||||||
logger.info('starting backlog mapper')
|
logger.info('starting backlog mapper')
|
||||||
logger.info('starting block')
|
logger.info('starting block')
|
||||||
block.start()
|
block.start()
|
||||||
|
election.start()
|
||||||
p_block_delete_revert.start()
|
p_block_delete_revert.start()
|
||||||
|
|
||||||
logger.info('starting voter')
|
logger.info('starting voter')
|
||||||
p_voter.start()
|
p_voter.start()
|
||||||
logger.info('starting election')
|
logger.info('starting election')
|
||||||
p_election.start()
|
|
||||||
|
|
||||||
# start message
|
# start message
|
||||||
p_voter.initialized.wait()
|
p_voter.initialized.wait()
|
||||||
|
@ -197,68 +197,3 @@ class Voter(object):
|
|||||||
p_validate.start()
|
p_validate.start()
|
||||||
p_vote.start()
|
p_vote.start()
|
||||||
p_update.start()
|
p_update.start()
|
||||||
|
|
||||||
|
|
||||||
class Election(object):
|
|
||||||
|
|
||||||
def __init__(self, q_block_new_vote):
|
|
||||||
"""
|
|
||||||
Initialize the class with the needed queues.
|
|
||||||
|
|
||||||
Initialize a queue where blocks with new votes will be held
|
|
||||||
"""
|
|
||||||
self.q_block_new_vote = q_block_new_vote
|
|
||||||
self.q_invalid_blocks = mp.Queue()
|
|
||||||
|
|
||||||
def check_for_quorum(self):
|
|
||||||
"""
|
|
||||||
Checks if block has enough invalid votes to make a decision
|
|
||||||
"""
|
|
||||||
b = Bigchain()
|
|
||||||
|
|
||||||
while True:
|
|
||||||
next_block = self.q_block_new_vote.get()
|
|
||||||
|
|
||||||
# poison pill
|
|
||||||
if next_block == 'stop':
|
|
||||||
self.q_invalid_blocks.put('stop')
|
|
||||||
logger.info('clean exit')
|
|
||||||
return
|
|
||||||
|
|
||||||
if b.block_election_status(next_block) == 'invalid':
|
|
||||||
self.q_invalid_blocks.put(next_block)
|
|
||||||
|
|
||||||
def requeue_transactions(self):
|
|
||||||
"""
|
|
||||||
Liquidates transactions from invalid blocks so they can be processed again
|
|
||||||
"""
|
|
||||||
while True:
|
|
||||||
invalid_block = self.q_invalid_blocks.get()
|
|
||||||
|
|
||||||
# poison pill
|
|
||||||
if invalid_block == 'stop':
|
|
||||||
logger.info('clean exit')
|
|
||||||
return
|
|
||||||
|
|
||||||
b = Bigchain()
|
|
||||||
for tx in invalid_block['block']['transactions']:
|
|
||||||
b.write_transaction(tx)
|
|
||||||
|
|
||||||
def kill(self):
|
|
||||||
"""
|
|
||||||
Terminate processes
|
|
||||||
"""
|
|
||||||
self.q_block_new_vote.put('stop')
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
"""
|
|
||||||
Initialize, spawn, and start the processes
|
|
||||||
"""
|
|
||||||
|
|
||||||
# initialize the processes
|
|
||||||
p_quorum_check = mp.Process(name='check_for_quorum', target=self.check_for_quorum)
|
|
||||||
p_requeue_tx = mp.Process(name='requeue_tx', target=self.requeue_transactions)
|
|
||||||
|
|
||||||
# start the processes
|
|
||||||
p_quorum_check.start()
|
|
||||||
p_requeue_tx.start()
|
|
||||||
|
17
tests/pipelines/test_election.py
Normal file
17
tests/pipelines/test_election.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
import time
|
||||||
|
import random
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import rethinkdb as r
|
||||||
|
|
||||||
|
from bigchaindb.pipelines import election
|
||||||
|
from multipipes import Pipe, Pipeline
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_for_quorum(b, user_vk):
|
||||||
|
e = election.Election()
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_requeue_transaction(b, user_vk):
|
||||||
|
pass
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user