diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index b89e0786..cb751d0f 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -10,10 +10,12 @@ from collections import Counter from multipipes import Pipeline, Node from bigchaindb.common import exceptions +import bigchaindb +from bigchaindb import Bigchain +from bigchaindb.backend import connect, get_changefeed +from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Transaction, Block -from bigchaindb.pipelines.utils import ChangeFeed -from bigchaindb import Bigchain class Vote: @@ -142,12 +144,6 @@ def initial(): return rs -def get_changefeed(): - """Create and return the changefeed for the bigchain table.""" - - return ChangeFeed('bigchain', operation=ChangeFeed.INSERT, prefeed=initial()) - - def create_pipeline(): """Create and return the pipeline of operations to be distributed on different processes.""" @@ -168,7 +164,10 @@ def create_pipeline(): def start(): """Create, start, and return the block pipeline.""" + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, + prefeed=initial()) pipeline = create_pipeline() - pipeline.setup(indata=get_changefeed()) + pipeline.setup(indata=changefeed) pipeline.start() return pipeline diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 0f471c0b..feac58bc 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -487,7 +487,9 @@ def test_invalid_block_voting(monkeypatch, b, user_pk): def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): - from bigchaindb.backend import query + import bigchaindb + from bigchaindb.backend import query, get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.pipelines import vote outpipe = Pipe() @@ -507,8 +509,11 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): block_ids.append(block_2.id) b.write_block(block_2) + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, + prefeed=vote.initial()) vote_pipeline = vote.create_pipeline() - vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) + vote_pipeline.setup(indata=changefeed, outdata=outpipe) vote_pipeline.start() # We expects two votes, so instead of waiting an arbitrary amount @@ -535,7 +540,9 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): - from bigchaindb.backend import query + import bigchaindb + from bigchaindb.backend import query, connect, get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.pipelines import vote outpipe = Pipe() @@ -554,8 +561,11 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): block_ids.append(block_2.id) b.write_block(block_2) + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, + prefeed=vote.initial()) vote_pipeline = vote.create_pipeline() - vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) + vote_pipeline.setup(indata=changefeed, outdata=outpipe) vote_pipeline.start() # We expects two votes, so instead of waiting an arbitrary amount