Fixed some code typos

Changed election pipeline to use the new changefeed abstraction
This commit is contained in:
Rodolphe Marques 2016-12-09 11:29:44 +01:00
parent 8266dfadb0
commit f09285d32f
5 changed files with 19 additions and 14 deletions

View File

@ -28,8 +28,7 @@ class RethinkDBChangeFeed(ChangeFeed):
time.sleep(1) time.sleep(1)
def run_changefeed(self): def run_changefeed(self):
for change in self.bigchain.connection.run(r.table(self.table) for change in self.connection.run(r.table(self.table).changes()):
.changes()):
is_insert = change['old_val'] is None is_insert = change['old_val'] is None
is_delete = change['new_val'] is None is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete is_update = not is_insert and not is_delete

View File

@ -9,9 +9,8 @@ import logging
from multipipes import Pipeline, Node from multipipes import Pipeline, Node
import bigchaindb import bigchaindb
from bigchaindb.backend import connect from bigchaindb.backend import connect, get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend import get_changefeed
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
from bigchaindb import Bigchain from bigchaindb import Bigchain

View File

@ -8,7 +8,9 @@ import logging
from multipipes import Pipeline, Node from multipipes import Pipeline, Node
from bigchaindb.pipelines.utils import ChangeFeed import bigchaindb
from bigchaindb.backend import connect, get_changefeed
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Block from bigchaindb.models import Block
from bigchaindb import Bigchain from bigchaindb import Bigchain
@ -50,10 +52,6 @@ class Election:
return invalid_block return invalid_block
def get_changefeed():
return ChangeFeed(table='votes', operation=ChangeFeed.INSERT)
def create_pipeline(): def create_pipeline():
election = Election() election = Election()
@ -66,7 +64,9 @@ def create_pipeline():
def start(): def start():
connection = connect(**bigchaindb.config['database'])
changefeed = get_changefeed(connection, 'votes', ChangeFeed.INSERT)
pipeline = create_pipeline() pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed()) pipeline.setup(indata=changefeed)
pipeline.start() pipeline.start()
return pipeline return pipeline

View File

@ -160,7 +160,8 @@ def test_start(create_pipeline):
def test_full_pipeline(b, user_pk): def test_full_pipeline(b, user_pk):
import random import random
from bigchaindb.backend import query, get_changefeed import bigchaindb
from bigchaindb.backend import query, get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Block, Transaction from bigchaindb.models import Block, Transaction
from bigchaindb.pipelines.block import create_pipeline, initial from bigchaindb.pipelines.block import create_pipeline, initial
@ -177,7 +178,8 @@ def test_full_pipeline(b, user_pk):
assert query.count_backlog(b.connection) == 100 assert query.count_backlog(b.connection) == 100
changefeed = get_changefeed(b.connection, 'backlog', connection = connect(**bigchaindb.config['database'])
changefeed = get_changefeed(connection, 'backlog',
ChangeFeed.INSERT | ChangeFeed.UPDATE, ChangeFeed.INSERT | ChangeFeed.UPDATE,
prefeed=initial()) prefeed=initial())
pipeline = create_pipeline() pipeline = create_pipeline()

View File

@ -136,7 +136,9 @@ def test_start(mock_start):
def test_full_pipeline(b, user_pk): def test_full_pipeline(b, user_pk):
import random import random
from bigchaindb.backend import query import bigchaindb
from bigchaindb.backend import query, get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
outpipe = Pipe() outpipe = Pipe()
@ -162,10 +164,13 @@ def test_full_pipeline(b, user_pk):
invalid_block = b.create_block(txs) invalid_block = b.create_block(txs)
b.write_block(invalid_block) b.write_block(invalid_block)
connection = connect(**bigchaindb.config['database'])
changefeed = get_changefeed(connection, 'votes', ChangeFeed.INSERT)
pipeline = election.create_pipeline() pipeline = election.create_pipeline()
pipeline.setup(indata=election.get_changefeed(), outdata=outpipe) pipeline.setup(indata=changefeed, outdata=outpipe)
pipeline.start() pipeline.start()
time.sleep(1) time.sleep(1)
# vote one block valid, one invalid # vote one block valid, one invalid
vote_valid = b.vote(valid_block.id, 'b' * 64, True) vote_valid = b.vote(valid_block.id, 'b' * 64, True)
vote_invalid = b.vote(invalid_block.id, 'c' * 64, False) vote_invalid = b.vote(invalid_block.id, 'c' * 64, False)