small fixes to the mongodb changefeed

This commit is contained in:
Rodolphe Marques 2016-12-22 15:43:22 +01:00
parent a0952df9fb
commit 0f88776537

View File

@ -51,26 +51,25 @@ class MongoDBChangeFeed(ChangeFeed):
# in the meantime it will raise a StopIteration excetiption. # in the meantime it will raise a StopIteration excetiption.
cursor = self.connection.conn.local.oplog.rs.find( cursor = self.connection.conn.local.oplog.rs.find(
{'ns': namespace, 'ts': {'$gt': last_ts}}, {'ns': namespace, 'ts': {'$gt': last_ts}},
cursor_type=pymongo.CursorType.TAILABLE_AWAIT, cursor_type=pymongo.CursorType.TAILABLE_AWAIT
oplog_replay=True
) )
while cursor.alive: while cursor.alive:
try: try:
record = cursor.next() record = cursor.next()
except StopIteration: except StopIteration:
print('mongodb cursor waiting') continue
else: else:
is_insert = record['op'] == 'i' is_insert = record['op'] == 'i'
is_delete = record['op'] == 'd' is_delete = record['op'] == 'd'
is_update = record['op'] == 'u' is_update = record['op'] == 'u'
if is_insert and self.operation == ChangeFeed.INSERT: if is_insert and (self.operation & ChangeFeed.INSERT):
self.outqueue.put(record['o']) self.outqueue.put(record['o'])
elif is_delete and self.operation == ChangeFeed.DELETE: elif is_delete and (self.operation & ChangeFeed.DELETE):
# on delete it only returns the id of the document # on delete it only returns the id of the document
self.outqueue.put(record['o']) self.outqueue.put(record['o'])
elif is_update and self.operation == ChangeFeed.UPDATE: elif is_update and (self.operation & ChangeFeed.UPDATE):
# not sure what to do here. Lets return the entire # not sure what to do here. Lets return the entire
# operation # operation
self.outqueue.put(record) self.outqueue.put(record)