diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index a6b74dfc..8f4c3b99 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -29,7 +29,7 @@ class ChangeFeed(Node): DELETE = 2 UPDATE = 4 - def __init__(self, table, operation, prefeed=None): + def __init__(self, table, operation, prefeed=None, bigchain=None): """Create a new RethinkDB ChangeFeed. Args: @@ -40,13 +40,14 @@ class ChangeFeed(Node): (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) prefeed (iterable): whatever set of data you want to be published first. + bigchain (``Bigchain``): the bigchain instance to use (can be None). """ super().__init__(name='changefeed') self.prefeed = prefeed if prefeed else [] self.table = table self.operation = operation - self.bigchain = Bigchain() + self.bigchain = bigchain or Bigchain() def run_forever(self): for element in self.prefeed: diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index 3c37e140..bac63048 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -1,3 +1,4 @@ +from unittest.mock import patch from threading import Thread import pytest @@ -26,85 +27,81 @@ def test_raise_exception_when_max_tries(): def test_reconnect_when_connection_lost(): import time + import rethinkdb as r - proc = None + def raise_exception(*args, **kwargs): + raise r.ReqlDriverError('mock') + + conn = Connection() + original_connect = r.connect + r.connect = raise_exception def delayed_start(): - nonlocal proc time.sleep(1) - proc, _ = start_temp_rethinkdb(38015) + r.connect = original_connect thread = Thread(target=delayed_start) - conn = Connection(port=38015) query = r.expr('1') thread.start() assert conn.run(query) == '1' - proc.terminate() - proc.wait() def test_changefeed_reconnects_when_connection_lost(monkeypatch): - import os import time - import tempfile import multiprocessing as mp - import bigchaindb + from bigchaindb import Bigchain from bigchaindb.pipelines.utils import ChangeFeed - dbport = 38015 - dbname = 'test_' + str(os.getpid()) - directory = tempfile.mkdtemp() + class MockConnection: + tries = 0 - monkeypatch.setitem(bigchaindb.config, 'database', { - 'host': 'localhost', - 'port': dbport, - 'name': dbname - }) + def run(self, *args, **kwargs): + return self - proc, _ = start_temp_rethinkdb(dbport, directory=directory) + def __iter__(self): + return self - # prepare DB and table - conn = r.connect(port=dbport) - r.db_create(dbname).run(conn) - r.db(dbname).table_create('cat_facts').run(conn) + def __next__(self): + self.tries += 1 + if self.tries == 1: + raise r.ReqlDriverError('mock') + elif self.tries == 2: + return { 'new_val': { 'fact': 'A group of cats is called a clowder.' }, + 'old_val': None } + if self.tries == 3: + raise r.ReqlDriverError('mock') + elif self.tries == 4: + return { 'new_val': {'fact': 'Cats sleep 70% of their lives.' }, + 'old_val': None } + else: + time.sleep(10) - # initialize ChangeFeed and put it in a thread - changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT) + + bigchain = Bigchain() + bigchain.connection = MockConnection() + changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT, + bigchain=bigchain) changefeed.outqueue = mp.Queue() t_changefeed = Thread(target=changefeed.run_forever, daemon=True) t_changefeed.start() time.sleep(1) + # try 1: MockConnection raises an error that will stop the + # ChangeFeed instance from iterating for 1 second. - # insert some records in the table to start generating - # events that changefeed will put in `outqueue` - r.db(dbname).table('cat_facts').insert({ - 'fact': 'A group of cats is called a clowder.' - }).run(conn) - - # the event should be in the outqueue + # try 2: MockConnection releases a new record. The new record + # will be put in the outqueue of the ChangeFeed instance. fact = changefeed.outqueue.get()['fact'] assert fact == 'A group of cats is called a clowder.' - # stop the DB process - proc.terminate() - proc.wait() - + # try 3: MockConnection raises an error that will stop the + # ChangeFeed instance from iterating for 1 second. assert t_changefeed.is_alive() is True - proc, _ = start_temp_rethinkdb(dbport, directory=directory) - time.sleep(2) - - conn = r.connect(port=dbport) - r.db(dbname).table('cat_facts').insert({ - 'fact': 'Cats sleep 70% of their lives.' - }).run(conn) + # try 4: MockConnection releases a new record. The new record + # will be put in the outqueue of the ChangeFeed instance. fact = changefeed.outqueue.get()['fact'] assert fact == 'Cats sleep 70% of their lives.' - - # stop the DB process - proc.terminate() - proc.wait()