Added tests to the mongodb changefeed (#1035)

* Added tests to the mongodb changefeed

* pep8 and typo fixes
This commit is contained in:
Rodolphe Marques 2017-01-13 10:18:58 +01:00 committed by GitHub
parent 8510f47ed4
commit 5abe6dd7ea
2 changed files with 177 additions and 5 deletions

View File

@ -2,7 +2,7 @@ import logging
import time import time
import pymongo import pymongo
from pymongo.errors import ConnectionFailure, OperationFailure from pymongo import errors
from bigchaindb import backend from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.backend.changefeed import ChangeFeed
@ -29,7 +29,9 @@ class MongoDBChangeFeed(ChangeFeed):
try: try:
self.run_changefeed() self.run_changefeed()
break break
except (ConnectionFailure, OperationFailure) as exc: except (errors.ConnectionFailure, errors.OperationFailure,
errors.AutoReconnect,
errors.ServerSelectionTimeoutError) as exc:
logger.exception(exc) logger.exception(exc)
time.sleep(1) time.sleep(1)
@ -76,8 +78,10 @@ class MongoDBChangeFeed(ChangeFeed):
# operations to apply to the document and not the # operations to apply to the document and not the
# document itself. So here we first read the document # document itself. So here we first read the document
# and then return it. # and then return it.
doc = self.connection.conn[dbname][table]\ doc = self.connection.conn[dbname][table].find_one(
.find_one(record['o2'], projection={'_id': False}) {'_id': record['o2']},
{'_id': False}
)
self.outqueue.put(doc) self.outqueue.put(doc)

View File

@ -1 +1,169 @@
"""MongoDB changefeed tests""" from unittest import mock
import pytest
from pymongo.errors import ConnectionFailure
from multipipes import Pipe
@pytest.fixture
def mock_changefeed_data():
return [
{
'op': 'i',
'o': {'_id': '', 'msg': 'seems like we have an insert here'}
},
{
'op': 'd',
'o': {'msg': 'seems like we have a delete here'}
},
{
'op': 'u',
'o': {'msg': 'seems like we have an update here'},
'o2': 'some-id'
},
]
@pytest.mark.bdb
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_insert(mock_cursor_next, mock_cursor_alive,
mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
# setup connection and mocks
conn = connect()
# changefeed.run_forever only returns when the cursor is closed
# so we mock `alive` to be False it finishes reading the mocked data
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
# mock the `next` method of the cursor to return the mocked data
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
outpipe = Pipe()
changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get()['msg'] == 'seems like we have an insert here'
assert outpipe.qsize() == 0
@pytest.mark.bdb
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_delete(mock_cursor_next, mock_cursor_alive,
mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
outpipe = Pipe()
changefeed = get_changefeed(conn, 'backlog', ChangeFeed.DELETE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get()['msg'] == 'seems like we have a delete here'
assert outpipe.qsize() == 0
@pytest.mark.bdb
@mock.patch('pymongo.collection.Collection.find_one')
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_update(mock_cursor_next, mock_cursor_alive,
mock_cursor_find_one, mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
mock_cursor_find_one.return_value = mock_changefeed_data[2]['o']
outpipe = Pipe()
changefeed = get_changefeed(conn, 'backlog', ChangeFeed.UPDATE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get()['msg'] == 'seems like we have an update here'
assert outpipe.qsize() == 0
assert mock_cursor_find_one.called_once_with(
{'_id': mock_changefeed_data[2]['o']},
{'_id': False}
)
@pytest.mark.bdb
@mock.patch('pymongo.collection.Collection.find_one')
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_multiple_operations(mock_cursor_next, mock_cursor_alive,
mock_cursor_find_one,
mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
mock_cursor_find_one.return_value = mock_changefeed_data[2]['o']
outpipe = Pipe()
changefeed = get_changefeed(conn, 'backlog',
ChangeFeed.INSERT | ChangeFeed.UPDATE)
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get()['msg'] == 'seems like we have an insert here'
assert outpipe.get()['msg'] == 'seems like we have an update here'
assert outpipe.qsize() == 0
@pytest.mark.bdb
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('pymongo.cursor.Cursor.next')
def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive,
mock_changefeed_data):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.side_effect = [mock.DEFAULT, mock.DEFAULT,
mock.DEFAULT, mock.DEFAULT, False]
mock_cursor_next.side_effect = [mock.DEFAULT] + mock_changefeed_data
outpipe = Pipe()
changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT,
prefeed=[1, 2, 3])
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.qsize() == 4
@pytest.mark.bdb
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
@mock.patch('bigchaindb.backend.mongodb.changefeed.MongoDBChangeFeed.run_changefeed') # noqa
def test_connection_failure(mock_run_changefeed, mock_cursor_alive):
from bigchaindb.backend import get_changefeed, connect
from bigchaindb.backend.changefeed import ChangeFeed
conn = connect()
mock_cursor_alive.return_value = False
mock_run_changefeed.side_effect = [ConnectionFailure(), mock.DEFAULT]
changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT)
changefeed.run_forever()
# run_changefeed raises an exception the first time its called and then
# it's called again
assert mock_run_changefeed.call_count == 2