From e303e355db553db09fbe09c6687fdda49938540d Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Fri, 9 Dec 2016 14:39:48 +0100 Subject: [PATCH] Delete outdated pipelines/utils.py Reorganized and fixed tests. --- bigchaindb/pipelines/utils.py | 75 ------------------------- tests/backend/test_changefeed.py | 96 ++++++++++++++++++++++++++++++++ tests/pipelines/test_utils.py | 80 -------------------------- tests/test_run_query_util.py | 10 ++-- 4 files changed, 100 insertions(+), 161 deletions(-) delete mode 100644 bigchaindb/pipelines/utils.py create mode 100644 tests/backend/test_changefeed.py delete mode 100644 tests/pipelines/test_utils.py diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py deleted file mode 100644 index 71f740ee..00000000 --- a/bigchaindb/pipelines/utils.py +++ /dev/null @@ -1,75 +0,0 @@ -"""Utility classes and functions to work with the pipelines.""" - - -import time -import rethinkdb as r -import logging -from multipipes import Node - -from bigchaindb import Bigchain - - -logger = logging.getLogger(__name__) - - -class ChangeFeed(Node): - """This class wraps a RethinkDB changefeed adding a ``prefeed``. - - It extends :class:`multipipes.Node` to make it pluggable in other - Pipelines instances, and makes usage of ``self.outqueue`` to output - the data. - - A changefeed is a real time feed on inserts, updates, and deletes, and - is volatile. This class is a helper to create changefeeds. Moreover, - it provides a way to specify a ``prefeed`` of iterable data to output - before the actual changefeed. - """ - - INSERT = 1 - DELETE = 2 - UPDATE = 4 - - def __init__(self, table, operation, prefeed=None, bigchain=None): - """Create a new RethinkDB ChangeFeed. - - Args: - table (str): name of the table to listen to for changes. - operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or - ChangeFeed.UPDATE. Combining multiple operation is possible - with the bitwise ``|`` operator - (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) - prefeed (iterable): whatever set of data you want to be published - first. - bigchain (``Bigchain``): the bigchain instance to use (can be None). - """ - - super().__init__(name='changefeed') - self.prefeed = prefeed if prefeed else [] - self.table = table - self.operation = operation - self.bigchain = bigchain or Bigchain() - - def run_forever(self): - for element in self.prefeed: - self.outqueue.put(element) - - while True: - try: - self.run_changefeed() - break - except (r.ReqlDriverError, r.ReqlOpFailedError) as exc: - logger.exception(exc) - time.sleep(1) - - def run_changefeed(self): - for change in self.bigchain.connection.run(r.table(self.table).changes()): - is_insert = change['old_val'] is None - is_delete = change['new_val'] is None - is_update = not is_insert and not is_delete - - if is_insert and (self.operation & ChangeFeed.INSERT): - self.outqueue.put(change['new_val']) - elif is_delete and (self.operation & ChangeFeed.DELETE): - self.outqueue.put(change['old_val']) - elif is_update and (self.operation & ChangeFeed.UPDATE): - self.outqueue.put(change['new_val']) diff --git a/tests/backend/test_changefeed.py b/tests/backend/test_changefeed.py new file mode 100644 index 00000000..d9a8f45b --- /dev/null +++ b/tests/backend/test_changefeed.py @@ -0,0 +1,96 @@ +import pytest +from unittest.mock import Mock + +from multipipes import Pipe + +import bigchaindb +from bigchaindb import Bigchain +from bigchaindb.backend import connect + + +@pytest.fixture +def mock_changefeed_data(): + return [ + { + 'new_val': 'seems like we have an insert here', + 'old_val': None, + }, { + 'new_val': None, + 'old_val': 'seems like we have a delete here', + }, { + 'new_val': 'seems like we have an update here', + 'old_val': 'seems like we have an update here', + } + ] + + +@pytest.fixture +def mock_changefeed_connection(mock_changefeed_data): + connection = connect(**bigchaindb.config['database']) + connection.run = Mock(return_value=mock_changefeed_data) + return connection + + +def test_changefeed_insert(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, + 'backlog', ChangeFeed.INSERT) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have an insert here' + assert outpipe.qsize() == 0 + + +def test_changefeed_delete(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, + 'backlog', ChangeFeed.DELETE) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have a delete here' + assert outpipe.qsize() == 0 + + +def test_changefeed_update(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, + 'backlog', ChangeFeed.UPDATE) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have an update here' + assert outpipe.qsize() == 0 + + +def test_changefeed_multiple_operations(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, 'backlog', + ChangeFeed.INSERT | ChangeFeed.UPDATE) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have an insert here' + assert outpipe.get() == 'seems like we have an update here' + assert outpipe.qsize() == 0 + + +def test_changefeed_prefeed(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, 'backlog', + ChangeFeed.INSERT, prefeed=[1, 2, 3]) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.qsize() == 4 diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py deleted file mode 100644 index ebe0db75..00000000 --- a/tests/pipelines/test_utils.py +++ /dev/null @@ -1,80 +0,0 @@ -import pytest -from unittest.mock import Mock - -from multipipes import Pipe -from bigchaindb import Bigchain -from bigchaindb.backend.connection import Connection -from bigchaindb.pipelines.utils import ChangeFeed - - -@pytest.fixture -def mock_changefeed_data(): - return [ - { - 'new_val': 'seems like we have an insert here', - 'old_val': None, - }, { - 'new_val': None, - 'old_val': 'seems like we have a delete here', - }, { - 'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here', - } - ] - - -@pytest.fixture -def mock_changefeed_bigchain(mock_changefeed_data): - connection = Connection(host=None, port=None, dbname=None) - connection.run = Mock(return_value=mock_changefeed_data) - return Bigchain(connection=connection) - - -def test_changefeed_insert(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.qsize() == 0 - - -def test_changefeed_delete(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.DELETE, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have a delete here' - assert outpipe.qsize() == 0 - - -def test_changefeed_update(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have an update here' - assert outpipe.qsize() == 0 - - -def test_changefeed_multiple_operations(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', - ChangeFeed.INSERT | ChangeFeed.UPDATE, - bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.get() == 'seems like we have an update here' - assert outpipe.qsize() == 0 - - -def test_changefeed_prefeed(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', - ChangeFeed.INSERT, - prefeed=[1, 2, 3], - bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.qsize() == 4 diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index 1aeea11e..b740983c 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -48,8 +48,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): import time import multiprocessing as mp - from bigchaindb import Bigchain - from bigchaindb.pipelines.utils import ChangeFeed + from bigchaindb.backend.changefeed import ChangeFeed + from bigchaindb.backend.rethinkdb.changefeed import RethinkDBChangeFeed class MockConnection: tries = 0 @@ -75,10 +75,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): else: time.sleep(10) - bigchain = Bigchain() - bigchain.connection = MockConnection() - changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT, - bigchain=bigchain) + changefeed = RethinkDBChangeFeed('cat_facts', ChangeFeed.INSERT, + connection=MockConnection()) changefeed.outqueue = mp.Queue() t_changefeed = Thread(target=changefeed.run_forever, daemon=True)