Simplify code.

Pop the `_id` when receiving the document on the changefeed
This commit is contained in:
Rodolphe Marques 2017-01-04 13:27:08 +01:00
parent 0d11c3a7a8
commit ea4d01dec0
2 changed files with 22 additions and 24 deletions

View File

@ -34,11 +34,6 @@ class MongoDBChangeFeed(ChangeFeed):
time.sleep(1) time.sleep(1)
def run_changefeed(self): def run_changefeed(self):
# this is kinda a hack to make sure that the connection object is
# setup
self.connection._connect()
# namespace to allow us to only listen to changes in a single
# collection
dbname = self.connection.dbname dbname = self.connection.dbname
table = self.table table = self.table
namespace = '{}.{}'.format(dbname, table) namespace = '{}.{}'.format(dbname, table)
@ -61,24 +56,29 @@ class MongoDBChangeFeed(ChangeFeed):
record = cursor.next() record = cursor.next()
except StopIteration: except StopIteration:
continue continue
else:
is_insert = record['op'] == 'i'
is_delete = record['op'] == 'd'
is_update = record['op'] == 'u'
if is_insert and (self.operation & ChangeFeed.INSERT): is_insert = record['op'] == 'i'
self.outqueue.put(record['o']) is_delete = record['op'] == 'd'
elif is_delete and (self.operation & ChangeFeed.DELETE): is_update = record['op'] == 'u'
# on delete it only returns the id of the document
self.outqueue.put(record['o']) # mongodb documents uses the `_id` for the primary key.
elif is_update and (self.operation & ChangeFeed.UPDATE): # We are not using this field at this point and we need to
# the oplog entry for updates only returns the update # remove it to prevent problems with schema validation.
# operations to apply to the document and not the # See https://github.com/bigchaindb/bigchaindb/issues/992
# document itself. So here we first read the document if is_insert and (self.operation & ChangeFeed.INSERT):
# and then return it record['o'].pop('_id', None)
doc = self.connection.conn[dbname][table]\ doc = record['o']
.find_one(record['o2']) elif is_delete and (self.operation & ChangeFeed.DELETE):
self.outqueue.put(doc) # on delete it only returns the id of the document
doc = record['o']
elif is_update and (self.operation & ChangeFeed.UPDATE):
# the oplog entry for updates only returns the update
# operations to apply to the document and not the
# document itself. So here we first read the document
# and then return it.
doc = self.connection.conn[dbname][table]\
.find_one(record['o2'], projection={'_id': False})
self.outqueue.put(doc)
@register_changefeed(MongoDBConnection) @register_changefeed(MongoDBConnection)

View File

@ -44,8 +44,6 @@ class BlockPipeline:
if tx['assignee'] == self.bigchain.me: if tx['assignee'] == self.bigchain.me:
tx.pop('assignee') tx.pop('assignee')
tx.pop('assignment_timestamp') tx.pop('assignment_timestamp')
# required for mongodb
tx.pop('_id', None)
return tx return tx
def validate_tx(self, tx): def validate_tx(self, tx):