Updated vote pipeline to use the changefeed abstraction

Fixed tests.
This commit is contained in:
Rodolphe Marques 2016-12-09 14:02:11 +01:00
parent f09285d32f
commit 134f9e85a0
2 changed files with 22 additions and 13 deletions

View File

@ -10,10 +10,12 @@ from collections import Counter
from multipipes import Pipeline, Node from multipipes import Pipeline, Node
from bigchaindb.common import exceptions from bigchaindb.common import exceptions
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb.backend import connect, get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.consensus import BaseConsensusRules from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Transaction, Block from bigchaindb.models import Transaction, Block
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
class Vote: class Vote:
@ -142,12 +144,6 @@ def initial():
return rs return rs
def get_changefeed():
"""Create and return the changefeed for the bigchain table."""
return ChangeFeed('bigchain', operation=ChangeFeed.INSERT, prefeed=initial())
def create_pipeline(): def create_pipeline():
"""Create and return the pipeline of operations to be distributed """Create and return the pipeline of operations to be distributed
on different processes.""" on different processes."""
@ -168,7 +164,10 @@ def create_pipeline():
def start(): def start():
"""Create, start, and return the block pipeline.""" """Create, start, and return the block pipeline."""
connection = connect(**bigchaindb.config['database'])
changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT,
prefeed=initial())
pipeline = create_pipeline() pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed()) pipeline.setup(indata=changefeed)
pipeline.start() pipeline.start()
return pipeline return pipeline

View File

@ -487,7 +487,9 @@ def test_invalid_block_voting(monkeypatch, b, user_pk):
def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
from bigchaindb.backend import query import bigchaindb
from bigchaindb.backend import query, get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.pipelines import vote from bigchaindb.pipelines import vote
outpipe = Pipe() outpipe = Pipe()
@ -507,8 +509,11 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
block_ids.append(block_2.id) block_ids.append(block_2.id)
b.write_block(block_2) b.write_block(block_2)
connection = connect(**bigchaindb.config['database'])
changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT,
prefeed=vote.initial())
vote_pipeline = vote.create_pipeline() vote_pipeline = vote.create_pipeline()
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) vote_pipeline.setup(indata=changefeed, outdata=outpipe)
vote_pipeline.start() vote_pipeline.start()
# We expects two votes, so instead of waiting an arbitrary amount # We expects two votes, so instead of waiting an arbitrary amount
@ -535,7 +540,9 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b):
def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
from bigchaindb.backend import query import bigchaindb
from bigchaindb.backend import query, connect, get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.pipelines import vote from bigchaindb.pipelines import vote
outpipe = Pipe() outpipe = Pipe()
@ -554,8 +561,11 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
block_ids.append(block_2.id) block_ids.append(block_2.id)
b.write_block(block_2) b.write_block(block_2)
connection = connect(**bigchaindb.config['database'])
changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT,
prefeed=vote.initial())
vote_pipeline = vote.create_pipeline() vote_pipeline = vote.create_pipeline()
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) vote_pipeline.setup(indata=changefeed, outdata=outpipe)
vote_pipeline.start() vote_pipeline.start()
# We expects two votes, so instead of waiting an arbitrary amount # We expects two votes, so instead of waiting an arbitrary amount