Merge branch 'changefeed-supports-more-operations'

This commit is contained in:
vrde 2016-08-18 16:33:33 +02:00
commit 7b767affc9
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
4 changed files with 32 additions and 16 deletions

View File

@ -44,7 +44,7 @@ class Election:
def get_changefeed(): def get_changefeed():
return ChangeFeed(table='votes', operation='insert') return ChangeFeed(table='votes', operation=ChangeFeed.INSERT)
def create_pipeline(): def create_pipeline():

View File

@ -20,17 +20,19 @@ class ChangeFeed(Node):
to output before the actual changefeed. to output before the actual changefeed.
""" """
INSERT = 'insert' INSERT = 1
DELETE = 'delete' DELETE = 2
UPDATE = 'update' UPDATE = 4
def __init__(self, table, operation, prefeed=None): def __init__(self, table, operation, prefeed=None):
"""Create a new RethinkDB ChangeFeed. """Create a new RethinkDB ChangeFeed.
Args: Args:
table (str): name of the table to listen to for changes. table (str): name of the table to listen to for changes.
operation (str): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE. ChangeFeed.UPDATE. Combining multiple operation is possible using
the bitwise ``|`` operator
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
prefeed (iterable): whatever set of data you want to be published prefeed (iterable): whatever set of data you want to be published
first. first.
""" """
@ -51,10 +53,10 @@ class ChangeFeed(Node):
is_delete = change['new_val'] is None is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete is_update = not is_insert and not is_delete
if is_insert and self.operation == ChangeFeed.INSERT: if is_insert and (self.operation & ChangeFeed.INSERT):
self.outqueue.put(change['new_val']) self.outqueue.put(change['new_val'])
elif is_delete and self.operation == ChangeFeed.DELETE: elif is_delete and (self.operation & ChangeFeed.DELETE):
self.outqueue.put(change['old_val']) self.outqueue.put(change['old_val'])
elif is_update and self.operation == ChangeFeed.UPDATE: elif is_update and (self.operation & ChangeFeed.UPDATE):
self.outqueue.put(change) self.outqueue.put(change)

View File

@ -144,7 +144,7 @@ def initial():
def get_changefeed(): def get_changefeed():
"""Create and return the changefeed for the bigchain table.""" """Create and return the changefeed for the bigchain table."""
return ChangeFeed('bigchain', 'insert', prefeed=initial()) return ChangeFeed('bigchain', operation=ChangeFeed.INSERT, prefeed=initial())
def create_pipeline(): def create_pipeline():

View File

@ -3,7 +3,7 @@ from unittest.mock import patch
import rethinkdb import rethinkdb
from multipipes import Pipe from multipipes import Pipe
from bigchaindb.pipelines import utils from bigchaindb.pipelines.utils import ChangeFeed
MOCK_CHANGEFEED_DATA = [{ MOCK_CHANGEFEED_DATA = [{
@ -21,36 +21,50 @@ MOCK_CHANGEFEED_DATA = [{
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) @patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_insert(mock_run): def test_changefeed_insert(mock_run):
outpipe = Pipe() outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'insert') changefeed = ChangeFeed('backlog', ChangeFeed.INSERT)
changefeed.outqueue = outpipe changefeed.outqueue = outpipe
changefeed.run_forever() changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here' assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.qsize() == 0
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) @patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_delete(mock_run): def test_changefeed_delete(mock_run):
outpipe = Pipe() outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'delete') changefeed = ChangeFeed('backlog', ChangeFeed.DELETE)
changefeed.outqueue = outpipe changefeed.outqueue = outpipe
changefeed.run_forever() changefeed.run_forever()
assert outpipe.get() == 'seems like we have a delete here' assert outpipe.get() == 'seems like we have a delete here'
assert outpipe.qsize() == 0
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) @patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_update(mock_run): def test_changefeed_update(mock_run):
outpipe = Pipe() outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'update') changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE)
changefeed.outqueue = outpipe changefeed.outqueue = outpipe
changefeed.run_forever() changefeed.run_forever()
assert outpipe.get() == {'new_val': 'seems like we have an update here', assert outpipe.get() == {'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here'} 'old_val': 'seems like we have an update here'}
assert outpipe.qsize() == 0
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_multiple_operations(mock_run):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
assert outpipe.get() == {'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here'}
assert outpipe.qsize() == 0
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) @patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_prefeed(mock_run): def test_changefeed_prefeed(mock_run):
outpipe = Pipe() outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'insert', prefeed=[1, 2, 3]) changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3])
changefeed.outqueue = outpipe changefeed.outqueue = outpipe
changefeed.run_forever() changefeed.run_forever()
assert outpipe.qsize() == 4 assert outpipe.qsize() == 4