diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 538a5453..7a0e114c 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -44,7 +44,7 @@ class Election: def get_changefeed(): - return ChangeFeed(table='votes', operation='insert') + return ChangeFeed(table='votes', operation=ChangeFeed.INSERT) def create_pipeline(): diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index fb018453..22a5f9bc 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -20,17 +20,19 @@ class ChangeFeed(Node): to output before the actual changefeed. """ - INSERT = 'insert' - DELETE = 'delete' - UPDATE = 'update' + INSERT = 1 + DELETE = 2 + UPDATE = 4 def __init__(self, table, operation, prefeed=None): """Create a new RethinkDB ChangeFeed. Args: table (str): name of the table to listen to for changes. - operation (str): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or - ChangeFeed.UPDATE. + operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or + ChangeFeed.UPDATE. Combining multiple operation is possible using + the bitwise ``|`` operator + (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) prefeed (iterable): whatever set of data you want to be published first. """ @@ -51,10 +53,10 @@ class ChangeFeed(Node): is_delete = change['new_val'] is None is_update = not is_insert and not is_delete - if is_insert and self.operation == ChangeFeed.INSERT: + if is_insert and (self.operation & ChangeFeed.INSERT): self.outqueue.put(change['new_val']) - elif is_delete and self.operation == ChangeFeed.DELETE: + elif is_delete and (self.operation & ChangeFeed.DELETE): self.outqueue.put(change['old_val']) - elif is_update and self.operation == ChangeFeed.UPDATE: + elif is_update and (self.operation & ChangeFeed.UPDATE): self.outqueue.put(change) diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index 2dc259ed..6af8a291 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -144,7 +144,7 @@ def initial(): def get_changefeed(): """Create and return the changefeed for the bigchain table.""" - return ChangeFeed('bigchain', 'insert', prefeed=initial()) + return ChangeFeed('bigchain', operation=ChangeFeed.INSERT, prefeed=initial()) def create_pipeline(): diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py index 897d64bb..ebef38c4 100644 --- a/tests/pipelines/test_utils.py +++ b/tests/pipelines/test_utils.py @@ -3,7 +3,7 @@ from unittest.mock import patch import rethinkdb from multipipes import Pipe -from bigchaindb.pipelines import utils +from bigchaindb.pipelines.utils import ChangeFeed MOCK_CHANGEFEED_DATA = [{ @@ -21,36 +21,50 @@ MOCK_CHANGEFEED_DATA = [{ @patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_insert(mock_run): outpipe = Pipe() - changefeed = utils.ChangeFeed('backlog', 'insert') + changefeed = ChangeFeed('backlog', ChangeFeed.INSERT) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have an insert here' + assert outpipe.qsize() == 0 @patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_delete(mock_run): outpipe = Pipe() - changefeed = utils.ChangeFeed('backlog', 'delete') + changefeed = ChangeFeed('backlog', ChangeFeed.DELETE) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == 'seems like we have a delete here' + assert outpipe.qsize() == 0 @patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_update(mock_run): outpipe = Pipe() - changefeed = utils.ChangeFeed('backlog', 'update') + changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.get() == {'new_val': 'seems like we have an update here', 'old_val': 'seems like we have an update here'} + assert outpipe.qsize() == 0 + + +@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +def test_changefeed_multiple_operations(mock_run): + outpipe = Pipe() + changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have an insert here' + assert outpipe.get() == {'new_val': 'seems like we have an update here', + 'old_val': 'seems like we have an update here'} + assert outpipe.qsize() == 0 @patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_prefeed(mock_run): outpipe = Pipe() - changefeed = utils.ChangeFeed('backlog', 'insert', prefeed=[1, 2, 3]) + changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3]) changefeed.outqueue = outpipe changefeed.run_forever() assert outpipe.qsize() == 4 -