diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index 34650df3..d52927b9 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -34,11 +34,6 @@ class MongoDBChangeFeed(ChangeFeed): time.sleep(1) def run_changefeed(self): - # this is kinda a hack to make sure that the connection object is - # setup - self.connection._connect() - # namespace to allow us to only listen to changes in a single - # collection dbname = self.connection.dbname table = self.table namespace = '{}.{}'.format(dbname, table) @@ -61,24 +56,29 @@ class MongoDBChangeFeed(ChangeFeed): record = cursor.next() except StopIteration: continue - else: - is_insert = record['op'] == 'i' - is_delete = record['op'] == 'd' - is_update = record['op'] == 'u' - if is_insert and (self.operation & ChangeFeed.INSERT): - self.outqueue.put(record['o']) - elif is_delete and (self.operation & ChangeFeed.DELETE): - # on delete it only returns the id of the document - self.outqueue.put(record['o']) - elif is_update and (self.operation & ChangeFeed.UPDATE): - # the oplog entry for updates only returns the update - # operations to apply to the document and not the - # document itself. So here we first read the document - # and then return it - doc = self.connection.conn[dbname][table]\ - .find_one(record['o2']) - self.outqueue.put(doc) + is_insert = record['op'] == 'i' + is_delete = record['op'] == 'd' + is_update = record['op'] == 'u' + + # mongodb documents uses the `_id` for the primary key. + # We are not using this field at this point and we need to + # remove it to prevent problems with schema validation. + # See https://github.com/bigchaindb/bigchaindb/issues/992 + if is_insert and (self.operation & ChangeFeed.INSERT): + record['o'].pop('_id', None) + doc = record['o'] + elif is_delete and (self.operation & ChangeFeed.DELETE): + # on delete it only returns the id of the document + doc = record['o'] + elif is_update and (self.operation & ChangeFeed.UPDATE): + # the oplog entry for updates only returns the update + # operations to apply to the document and not the + # document itself. So here we first read the document + # and then return it. + doc = self.connection.conn[dbname][table]\ + .find_one(record['o2'], projection={'_id': False}) + self.outqueue.put(doc) @register_changefeed(MongoDBConnection) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 7dda6578..d5c81741 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -44,8 +44,6 @@ class BlockPipeline: if tx['assignee'] == self.bigchain.me: tx.pop('assignee') tx.pop('assignment_timestamp') - # required for mongodb - tx.pop('_id', None) return tx def validate_tx(self, tx):