diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index 59d53b72..c155bb6f 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -89,13 +89,14 @@ def run_changefeed(conn, table, last_ts): try: # XXX: hack to force reconnection, in case the connection # is lost while waiting on the cursor. See #1154. - conn.connection = 1 + conn._conn = None namespace = conn.dbname + '.' + table - cursor = conn.conn.local.oplog.rs.find( + query = conn.query().local.oplog.rs.find( {'ns': namespace, 'ts': {'$gt': last_ts}}, {'o._id': False}, cursor_type=pymongo.CursorType.TAILABLE_AWAIT ) + cursor = conn.run(query) logging.debug('Tailing oplog at %s/%s', namespace, last_ts) while cursor.alive: try: diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index a0d8ed12..17d50f02 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -286,8 +286,9 @@ def get_last_voted_block(conn, node_pubkey): @register_query(MongoDBConnection) def get_new_blocks_feed(conn, start_block_id): namespace = conn.dbname + '.bigchain' - query = {'o.id': start_block_id, 'op': 'i', 'ns': namespace} + match = {'o.id': start_block_id, 'op': 'i', 'ns': namespace} # Neccesary to find in descending order since tests may write same block id several times - last_ts = conn.conn.local.oplog.rs.find(query).sort('$natural', -1).next()['ts'] + query = conn.query().local.oplog.rs.find(match).sort('$natural', -1).next()['ts'] + last_ts = conn.run(query) feed = run_changefeed(conn, 'bigchain', last_ts) return (evt['o'] for evt in feed if evt['op'] == 'i') diff --git a/tests/backend/mongodb/test_changefeed.py b/tests/backend/mongodb/test_changefeed.py index cfe50e57..e1d11706 100644 --- a/tests/backend/mongodb/test_changefeed.py +++ b/tests/backend/mongodb/test_changefeed.py @@ -142,8 +142,7 @@ def test_connection_failure(): from bigchaindb.backend.mongodb.changefeed import run_changefeed conn = mock.MagicMock() - find = conn.conn.local.oplog.rs.find - find.side_effect = [ConnectionError(), RuntimeError()] + conn.run.side_effect = [ConnectionError(), RuntimeError()] changefeed = run_changefeed(conn, 'backlog', -1) with pytest.raises(RuntimeError): for record in changefeed: