From 5abe6dd7eae05b1f760fa08551cf69a2d3ad5121 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Fri, 13 Jan 2017 10:18:58 +0100 Subject: [PATCH] Added tests to the mongodb changefeed (#1035) * Added tests to the mongodb changefeed * pep8 and typo fixes --- bigchaindb/backend/mongodb/changefeed.py | 12 +- tests/backend/mongodb/test_changefeed.py | 170 ++++++++++++++++++++++- 2 files changed, 177 insertions(+), 5 deletions(-) diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index 05ae7150..c54bd5da 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -2,7 +2,7 @@ import logging import time import pymongo -from pymongo.errors import ConnectionFailure, OperationFailure +from pymongo import errors from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed @@ -29,7 +29,9 @@ class MongoDBChangeFeed(ChangeFeed): try: self.run_changefeed() break - except (ConnectionFailure, OperationFailure) as exc: + except (errors.ConnectionFailure, errors.OperationFailure, + errors.AutoReconnect, + errors.ServerSelectionTimeoutError) as exc: logger.exception(exc) time.sleep(1) @@ -76,8 +78,10 @@ class MongoDBChangeFeed(ChangeFeed): # operations to apply to the document and not the # document itself. So here we first read the document # and then return it. - doc = self.connection.conn[dbname][table]\ - .find_one(record['o2'], projection={'_id': False}) + doc = self.connection.conn[dbname][table].find_one( + {'_id': record['o2']}, + {'_id': False} + ) self.outqueue.put(doc) diff --git a/tests/backend/mongodb/test_changefeed.py b/tests/backend/mongodb/test_changefeed.py index 90179ab8..e7581b34 100644 --- a/tests/backend/mongodb/test_changefeed.py +++ b/tests/backend/mongodb/test_changefeed.py @@ -1 +1,169 @@ -"""MongoDB changefeed tests""" +from unittest import mock + +import pytest +from pymongo.errors import ConnectionFailure + +from multipipes import Pipe + + +@pytest.fixture +def mock_changefeed_data(): + return [ + { + 'op': 'i', + 'o': {'_id': '', 'msg': 'seems like we have an insert here'} + }, + { + 'op': 'd', + 'o': {'msg': 'seems like we have a delete here'} + }, + { + 'op': 'u', + 'o': {'msg': 'seems like we have an update here'}, + 'o2': 'some-id' + }, + ] + + +@pytest.mark.bdb +@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) +@mock.patch('pymongo.cursor.Cursor.next') +def test_changefeed_insert(mock_cursor_next, mock_cursor_alive, + mock_changefeed_data): + from bigchaindb.backend import get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed + + # setup connection and mocks + conn = connect() + # changefeed.run_forever only returns when the cursor is closed + # so we mock `alive` to be False it finishes reading the mocked data + mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, + mock.DEFAULT, mock.DEFAULT, False] + # mock the `next` method of the cursor to return the mocked data + mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data + + outpipe = Pipe() + changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT) + changefeed.outqueue = outpipe + changefeed.run_forever() + + assert outpipe.get()['msg'] == 'seems like we have an insert here' + assert outpipe.qsize() == 0 + + +@pytest.mark.bdb +@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) +@mock.patch('pymongo.cursor.Cursor.next') +def test_changefeed_delete(mock_cursor_next, mock_cursor_alive, + mock_changefeed_data): + from bigchaindb.backend import get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed + + conn = connect() + mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, + mock.DEFAULT, mock.DEFAULT, False] + mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data + + outpipe = Pipe() + changefeed = get_changefeed(conn, 'backlog', ChangeFeed.DELETE) + changefeed.outqueue = outpipe + changefeed.run_forever() + + assert outpipe.get()['msg'] == 'seems like we have a delete here' + assert outpipe.qsize() == 0 + + +@pytest.mark.bdb +@mock.patch('pymongo.collection.Collection.find_one') +@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) +@mock.patch('pymongo.cursor.Cursor.next') +def test_changefeed_update(mock_cursor_next, mock_cursor_alive, + mock_cursor_find_one, mock_changefeed_data): + from bigchaindb.backend import get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed + + conn = connect() + mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, + mock.DEFAULT, mock.DEFAULT, False] + mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data + mock_cursor_find_one.return_value = mock_changefeed_data[2]['o'] + + outpipe = Pipe() + changefeed = get_changefeed(conn, 'backlog', ChangeFeed.UPDATE) + changefeed.outqueue = outpipe + changefeed.run_forever() + + assert outpipe.get()['msg'] == 'seems like we have an update here' + assert outpipe.qsize() == 0 + assert mock_cursor_find_one.called_once_with( + {'_id': mock_changefeed_data[2]['o']}, + {'_id': False} + ) + + +@pytest.mark.bdb +@mock.patch('pymongo.collection.Collection.find_one') +@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) +@mock.patch('pymongo.cursor.Cursor.next') +def test_changefeed_multiple_operations(mock_cursor_next, mock_cursor_alive, + mock_cursor_find_one, + mock_changefeed_data): + from bigchaindb.backend import get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed + + conn = connect() + mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, + mock.DEFAULT, mock.DEFAULT, False] + mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data + mock_cursor_find_one.return_value = mock_changefeed_data[2]['o'] + + outpipe = Pipe() + changefeed = get_changefeed(conn, 'backlog', + ChangeFeed.INSERT | ChangeFeed.UPDATE) + changefeed.outqueue = outpipe + changefeed.run_forever() + + assert outpipe.get()['msg'] == 'seems like we have an insert here' + assert outpipe.get()['msg'] == 'seems like we have an update here' + assert outpipe.qsize() == 0 + + +@pytest.mark.bdb +@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) +@mock.patch('pymongo.cursor.Cursor.next') +def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive, + mock_changefeed_data): + from bigchaindb.backend import get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed + + conn = connect() + mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT, + mock.DEFAULT, mock.DEFAULT, False] + mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data + + outpipe = Pipe() + changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT, + prefeed=[1, 2, 3]) + changefeed.outqueue = outpipe + changefeed.run_forever() + + assert outpipe.qsize() == 4 + + +@pytest.mark.bdb +@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock) +@mock.patch('bigchaindb.backend.mongodb.changefeed.MongoDBChangeFeed.run_changefeed') # noqa +def test_connection_failure(mock_run_changefeed, mock_cursor_alive): + from bigchaindb.backend import get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed + + conn = connect() + mock_cursor_alive.return_value = False + mock_run_changefeed.side_effect = [ConnectionFailure(), mock.DEFAULT] + + changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT) + changefeed.run_forever() + + # run_changefeed raises an exception the first time its called and then + # it's called again + assert mock_run_changefeed.call_count == 2