Add hack to handle reconnection to changefeed

This commit is contained in:
vrde 2017-02-02 20:59:17 +01:00
parent 6d3c04169c
commit 98135c7df9
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D

View File

@ -1,4 +1,3 @@
import os
import logging import logging
import time import time
@ -28,9 +27,13 @@ class MongoDBChangeFeed(ChangeFeed):
while True: while True:
try: try:
# XXX: hack to force reconnection,
# the correct way to fix this is to manage errors
# for cursors.conn
self.connection.connection = None
self.run_changefeed() self.run_changefeed()
break break
except BackendError: except (BackendError, pymongo.errors.ConnectionFailure):
logger.exception('Error connecting to the database, retrying') logger.exception('Error connecting to the database, retrying')
time.sleep(1) time.sleep(1)
@ -48,13 +51,13 @@ class MongoDBChangeFeed(ChangeFeed):
# last result was returned. ``TAILABLE_AWAIT`` will block for some # last result was returned. ``TAILABLE_AWAIT`` will block for some
# timeout after the last result was returned. If no result is received # timeout after the last result was returned. If no result is received
# in the meantime it will raise a StopIteration excetiption. # in the meantime it will raise a StopIteration excetiption.
cursor = self.connection.conn.local.oplog.rs.find( cursor = self.connection.run(
self.connection.query().local.oplog.rs.find(
{'ns': namespace, 'ts': {'$gt': last_ts}}, {'ns': namespace, 'ts': {'$gt': last_ts}},
cursor_type=pymongo.CursorType.TAILABLE_AWAIT cursor_type=pymongo.CursorType.TAILABLE_AWAIT
) ))
while cursor.alive: while cursor.alive:
print(os.getpid(), 'alive')
try: try:
record = cursor.next() record = cursor.next()
except StopIteration: except StopIteration: