From 00458cfa685d57da191a16049104d16b3fbf2cfd Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Fri, 16 Dec 2016 09:53:21 +0100 Subject: [PATCH 1/9] Initial implementation of the mongodb changefeed --- bigchaindb/backend/mongodb/changefeed.py | 89 ++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 bigchaindb/backend/mongodb/changefeed.py diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py new file mode 100644 index 00000000..1f7f540b --- /dev/null +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -0,0 +1,89 @@ +import logging +import time + +import pymongo +from pymongo.errors import ConnectionFailure, OperationFailure + +from bigchaindb import backend +from bigchaindb.backend.changefeed import ChangeFeed +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.mongodb.connection import MongoDBConnection + + +logger = logging.getLogger(__name__) +register_changefeed = module_dispatch_registrar(backend.changefeed) + + +class MongoDBChangeFeed(ChangeFeed): + """This class implements a MongoDB changefeed. + + We emulate the behaviour of the RethinkDB changefeed by using a tailable + cursor that listens for events on the oplog. + """ + + def run_forever(self): + for element in self.prefeed: + self.outqueue.put(element) + + while True: + try: + self.run_changefeed() + break + except (ConnectionFailure, OperationFailure) as exc: + logger.exception(exc) + time.sleep(1) + + def run_changefeed(self): + # this is kinda a hack to make sure that the connection object is + # setup + self.connection._connect() + # namespace to allow us to only listen to changes in a single + # collection + namespace = '{}.{}'.format(self.connection.dbname, self.table) + # last timestamp in the oplog. We only care for operations happening + # in the future. 
+ last_ts = self.connection.conn.local.oplog.rs.find()\ + .sort('$natural', pymongo.DESCENDING).limit(1)\ + .next()['ts'] + # tailable cursor. A tailable cursor will remain open even after the + # last result was returned. ``TAILABLE_AWAIT`` will block for some + # timeout after the last result was returned. If no result is received + # in the meantime it will raise a StopIteration excetiption. + cursor = self.connection.conn.local.oplog.rs.find( + {'ns': namespace, 'ts': {'$gt': last_ts}}, + cursor_type=pymongo.CursorType.TAILABLE_AWAIT, + oplog_replay=True + ) + + while cursor.alive: + try: + record = cursor.next() + except StopIteration: + print('mongodb cursor waiting') + else: + is_insert = record['op'] == 'i' + is_delete = record['op'] == 'd' + is_update = record['op'] == 'u' + + if is_insert and self.operation == ChangeFeed.INSERT: + self.outqueue.put(record['o']) + elif is_delete and self.operation == ChangeFeed.DELETE: + # on delete it only returns the id of the document + self.outqueue.put(record['o']) + elif is_update and self.operation == ChangeFeed.UPDATE: + # not sure what to do here. Lets return the entire + # operation + self.outqueue.put(record) + + +@register_changefeed(MongoDBConnection) +def get_changefeed(connection, table, operation, *, prefeed=None): + """Return a MongoDB changefeed. + + Returns: + An instance of + :class:`~bigchaindb.backend.mongodb.MongoDBChangeFeed`. 
+ """ + + return MongoDBChangeFeed(table, operation, prefeed=prefeed, + connection=connection) From 9f738cd58d98f60221ddab315d6c0e5f1cafd0ab Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Thu, 22 Dec 2016 14:18:31 +0100 Subject: [PATCH 2/9] re-enable changefeed for mongodb --- bigchaindb/backend/mongodb/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigchaindb/backend/mongodb/__init__.py b/bigchaindb/backend/mongodb/__init__.py index b43a9c24..af5293ac 100644 --- a/bigchaindb/backend/mongodb/__init__.py +++ b/bigchaindb/backend/mongodb/__init__.py @@ -16,7 +16,7 @@ generic backend interfaces to the implementations in this module. """ # Register the single dispatched modules on import. -from bigchaindb.backend.mongodb import schema, query # noqa no changefeed for now +from bigchaindb.backend.mongodb import schema, query, changefeed # noqa # MongoDBConnection should always be accessed via # ``bigchaindb.backend.connect()``. From a0952df9fb7e6327aed5b565d962b5aa037b1776 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Thu, 22 Dec 2016 14:30:19 +0100 Subject: [PATCH 3/9] fixed mongodb queries to return genesis block and last voted block --- bigchaindb/backend/mongodb/query.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index 369cb39d..bf7b7da7 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -133,8 +133,9 @@ def write_vote(conn, vote): @register_query(MongoDBConnection) def get_genesis_block(conn): - return conn.db['bigchain'].find_one({'block.transactions.0.operation' == - 'GENESIS'}) + return conn.db['bigchain'].find_one({ + 'block.transactions.0.operation': 'GENESIS' + }) @register_query(MongoDBConnection) @@ -142,7 +143,10 @@ def get_last_voted_block(conn, node_pubkey): last_voted = conn.db['votes']\ .find({'node_pubkey': node_pubkey}, sort=[('vote.timestamp', -1)]) - if 
not last_voted: + + # pymongo seems to return a cursor even if there are no results + # so we actually need to check the count + if last_voted.count() == 0: return get_genesis_block(conn) mapping = {v['vote']['previous_block']: v['vote']['voting_for_block'] From 0f88776537d2a1261d9558b90e063d8eaa90c65c Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Thu, 22 Dec 2016 15:43:22 +0100 Subject: [PATCH 4/9] small fixes to the mongodb changefeed --- bigchaindb/backend/mongodb/changefeed.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index 1f7f540b..311ecbfe 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -51,26 +51,25 @@ class MongoDBChangeFeed(ChangeFeed): # in the meantime it will raise a StopIteration excetiption. cursor = self.connection.conn.local.oplog.rs.find( {'ns': namespace, 'ts': {'$gt': last_ts}}, - cursor_type=pymongo.CursorType.TAILABLE_AWAIT, - oplog_replay=True + cursor_type=pymongo.CursorType.TAILABLE_AWAIT ) while cursor.alive: try: record = cursor.next() except StopIteration: - print('mongodb cursor waiting') + continue else: is_insert = record['op'] == 'i' is_delete = record['op'] == 'd' is_update = record['op'] == 'u' - if is_insert and self.operation == ChangeFeed.INSERT: + if is_insert and (self.operation & ChangeFeed.INSERT): self.outqueue.put(record['o']) - elif is_delete and self.operation == ChangeFeed.DELETE: + elif is_delete and (self.operation & ChangeFeed.DELETE): # on delete it only returns the id of the document self.outqueue.put(record['o']) - elif is_update and self.operation == ChangeFeed.UPDATE: + elif is_update and (self.operation & ChangeFeed.UPDATE): # not sure what to do here. 
Lets return the entire # operation self.outqueue.put(record) From 0fdcff90f9bced8ddfcfa83cfaa491b97b0fe76a Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Thu, 22 Dec 2016 15:49:20 +0100 Subject: [PATCH 5/9] Remove uniqueness constraint of mongodb secondary indexes. --- bigchaindb/backend/mongodb/schema.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/bigchaindb/backend/mongodb/schema.py b/bigchaindb/backend/mongodb/schema.py index 7eaff782..fed2d1e4 100644 --- a/bigchaindb/backend/mongodb/schema.py +++ b/bigchaindb/backend/mongodb/schema.py @@ -61,13 +61,12 @@ def create_bigchain_secondary_index(conn, dbname): # to query the bigchain for a transaction id, this field is unique conn.conn[dbname]['bigchain'].create_index('block.transactions.id', - name='transaction_id', - unique=True) + name='transaction_id') # secondary index for asset uuid, this field is unique conn.conn[dbname]['bigchain']\ .create_index('block.transactions.transaction.asset.id', - name='asset_id', unique=True) + name='asset_id') def create_backlog_secondary_index(conn, dbname): From 3529bf91147d9832766e779bb02cf3f44cae74e1 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Tue, 3 Jan 2017 15:09:23 +0100 Subject: [PATCH 6/9] Fixed mongodb queries to work with pipelines. 
Added some logging to the pipeline processes --- bigchaindb/backend/mongodb/changefeed.py | 14 ++++++++++---- bigchaindb/backend/mongodb/query.py | 11 ++++++++--- bigchaindb/pipelines/block.py | 2 ++ bigchaindb/pipelines/stale.py | 2 ++ bigchaindb/pipelines/vote.py | 10 ++++++++-- 5 files changed, 30 insertions(+), 9 deletions(-) diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index 311ecbfe..34650df3 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -39,7 +39,9 @@ class MongoDBChangeFeed(ChangeFeed): self.connection._connect() # namespace to allow us to only listen to changes in a single # collection - namespace = '{}.{}'.format(self.connection.dbname, self.table) + dbname = self.connection.dbname + table = self.table + namespace = '{}.{}'.format(dbname, table) # last timestamp in the oplog. We only care for operations happening # in the future. last_ts = self.connection.conn.local.oplog.rs.find()\ @@ -70,9 +72,13 @@ class MongoDBChangeFeed(ChangeFeed): # on delete it only returns the id of the document self.outqueue.put(record['o']) elif is_update and (self.operation & ChangeFeed.UPDATE): - # not sure what to do here. Lets return the entire - # operation - self.outqueue.put(record) + # the oplog entry for updates only returns the update + # operations to apply to the document and not the + # document itself. 
So here we first read the document + # and then return it + doc = self.connection.conn[dbname][table]\ + .find_one(record['o2']) + self.outqueue.put(doc) @register_changefeed(MongoDBConnection) diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index bf7b7da7..2e608845 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -20,10 +20,15 @@ def write_transaction(conn, signed_transaction): @register_query(MongoDBConnection) def update_transaction(conn, transaction_id, doc): + # with mongodb we need to add update operators to the doc + update_doc = {'$set': { + 'assignee': doc['assignee'], + 'assignment_timestamp': doc['assignment_timestamp'] + }} return conn.db['backlog']\ - .find_one_and_update({'id': transaction_id}, - doc, - return_document=ReturnDocument.AFTER) + .find_one_and_update({'id': transaction_id}, + update_doc, + return_document=ReturnDocument.AFTER) @register_query(MongoDBConnection) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index d5c81741..7dda6578 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -44,6 +44,8 @@ class BlockPipeline: if tx['assignee'] == self.bigchain.me: tx.pop('assignee') tx.pop('assignment_timestamp') + # required for mongodb + tx.pop('_id', None) return tx def validate_tx(self, tx): diff --git a/bigchaindb/pipelines/stale.py b/bigchaindb/pipelines/stale.py index b560f8bb..010bb8e1 100644 --- a/bigchaindb/pipelines/stale.py +++ b/bigchaindb/pipelines/stale.py @@ -48,6 +48,8 @@ class StaleTransactionMonitor: Returns: transaction """ + # NOTE: Maybe this is to verbose? 
+ logger.info('Reassigning transaction with id %s', tx['id']) self.bigchain.reassign_transaction(tx) return tx diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index dd138d41..2a9b0ee5 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -5,10 +5,10 @@ of actions to do on transactions is specified in the ``create_pipeline`` function. """ +import logging from collections import Counter from multipipes import Pipeline, Node -from bigchaindb.common import exceptions import bigchaindb from bigchaindb import Bigchain @@ -16,6 +16,10 @@ from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Transaction, Block +from bigchaindb.common import exceptions + + +logger = logging.getLogger(__name__) class Vote: @@ -132,7 +136,9 @@ class Vote: Args: vote: the vote to write. """ - + validity = 'valid' if vote['vote']['is_block_valid'] else 'invalid' + logger.info("Voting '%s' for block %s", validity, + vote['vote']['voting_for_block']) self.bigchain.write_vote(vote) return vote From 0d11c3a7a88531f51ede02fd64ae1b2c564f432e Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Tue, 3 Jan 2017 15:54:06 +0100 Subject: [PATCH 7/9] simplify update transaction --- bigchaindb/backend/mongodb/query.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index 2e608845..5bb17239 100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -21,13 +21,10 @@ def write_transaction(conn, signed_transaction): @register_query(MongoDBConnection) def update_transaction(conn, transaction_id, doc): # with mongodb we need to add update operators to the doc - update_doc = {'$set': { - 'assignee': doc['assignee'], - 'assignment_timestamp': doc['assignment_timestamp'] - }} + doc = {'$set': doc} return conn.db['backlog']\ 
.find_one_and_update({'id': transaction_id}, - update_doc, + doc, return_document=ReturnDocument.AFTER) From ea4d01dec07e9b2a0a96e2bb289aab3697e8cc86 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Wed, 4 Jan 2017 13:27:08 +0100 Subject: [PATCH 8/9] Simplify code. Pop the `_id` when receiving the document on the changefeed --- bigchaindb/backend/mongodb/changefeed.py | 44 ++++++++++++------------ bigchaindb/pipelines/block.py | 2 -- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index 34650df3..d52927b9 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -34,11 +34,6 @@ class MongoDBChangeFeed(ChangeFeed): time.sleep(1) def run_changefeed(self): - # this is kinda a hack to make sure that the connection object is - # setup - self.connection._connect() - # namespace to allow us to only listen to changes in a single - # collection dbname = self.connection.dbname table = self.table namespace = '{}.{}'.format(dbname, table) @@ -61,24 +56,29 @@ class MongoDBChangeFeed(ChangeFeed): record = cursor.next() except StopIteration: continue - else: - is_insert = record['op'] == 'i' - is_delete = record['op'] == 'd' - is_update = record['op'] == 'u' - if is_insert and (self.operation & ChangeFeed.INSERT): - self.outqueue.put(record['o']) - elif is_delete and (self.operation & ChangeFeed.DELETE): - # on delete it only returns the id of the document - self.outqueue.put(record['o']) - elif is_update and (self.operation & ChangeFeed.UPDATE): - # the oplog entry for updates only returns the update - # operations to apply to the document and not the - # document itself. 
So here we first read the document - # and then return it - doc = self.connection.conn[dbname][table]\ - .find_one(record['o2']) - self.outqueue.put(doc) + is_insert = record['op'] == 'i' + is_delete = record['op'] == 'd' + is_update = record['op'] == 'u' + + # mongodb documents uses the `_id` for the primary key. + # We are not using this field at this point and we need to + # remove it to prevent problems with schema validation. + # See https://github.com/bigchaindb/bigchaindb/issues/992 + if is_insert and (self.operation & ChangeFeed.INSERT): + record['o'].pop('_id', None) + doc = record['o'] + elif is_delete and (self.operation & ChangeFeed.DELETE): + # on delete it only returns the id of the document + doc = record['o'] + elif is_update and (self.operation & ChangeFeed.UPDATE): + # the oplog entry for updates only returns the update + # operations to apply to the document and not the + # document itself. So here we first read the document + # and then return it. + doc = self.connection.conn[dbname][table]\ + .find_one(record['o2'], projection={'_id': False}) + self.outqueue.put(doc) @register_changefeed(MongoDBConnection) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 7dda6578..d5c81741 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -44,8 +44,6 @@ class BlockPipeline: if tx['assignee'] == self.bigchain.me: tx.pop('assignee') tx.pop('assignment_timestamp') - # required for mongodb - tx.pop('_id', None) return tx def validate_tx(self, tx): From 1b3c909d51f4285eeae099b849e488db8f9ebd4b Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Wed, 4 Jan 2017 15:43:17 +0100 Subject: [PATCH 9/9] Fixed mongodb queries Fixed some tests --- bigchaindb/backend/mongodb/query.py | 24 ++++++++++++++++++------ tests/db/test_bigchain_api.py | 4 ++-- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py index 5bb17239..8765d110 
100644 --- a/bigchaindb/backend/mongodb/query.py +++ b/bigchaindb/backend/mongodb/query.py @@ -40,10 +40,20 @@ def get_stale_transactions(conn, reassign_delay): @register_query(MongoDBConnection) -def get_transaction_from_block(conn, block_id, tx_id): - # this is definitely wrong, but it's something like this - return conn.db['bigchain'].find_one({'id': block_id, - 'block.transactions.id': tx_id}) +def get_transaction_from_block(conn, transaction_id, block_id): + return conn.db['bigchain'].aggregate([ + {'$match': {'id': block_id}}, + {'$project': { + 'block.transactions': { + '$filter': { + 'input': '$block.transactions', + 'as': 'transaction', + 'cond': { + '$eq': ['$$transaction.id', transaction_id] + } + } + } + }}]).next()['block']['transactions'][0] @register_query(MongoDBConnection) @@ -92,14 +102,16 @@ def get_owned_ids(conn, owner): @register_query(MongoDBConnection) def get_votes_by_block_id(conn, block_id): return conn.db['votes']\ - .find({'vote.voting_for_block': block_id}) + .find({'vote.voting_for_block': block_id}, + projection={'_id': False}) @register_query(MongoDBConnection) def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey): return conn.db['votes']\ .find({'vote.voting_for_block': block_id, - 'node_pubkey': node_pubkey}) + 'node_pubkey': node_pubkey}, + projection={'_id': False}) @register_query(MongoDBConnection) diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py index e294d490..1a1beb75 100644 --- a/tests/db/test_bigchain_api.py +++ b/tests/db/test_bigchain_api.py @@ -153,14 +153,14 @@ class TestBigchainApi(object): def test_get_transaction_in_invalid_and_valid_block(self, monkeypatch, b): from bigchaindb.models import Transaction - monkeypatch.setattr('time.time', lambda: 1) + monkeypatch.setattr('time.time', lambda: 1000000000) tx1 = Transaction.create([b.me], [([b.me], 1)], metadata={'msg': random.random()}) tx1 = tx1.sign([b.me_private]) block1 = b.create_block([tx1]) b.write_block(block1) - 
monkeypatch.setattr('time.time', lambda: 2222222222) + monkeypatch.setattr('time.time', lambda: 2000000000) tx2 = Transaction.create([b.me], [([b.me], 1)], metadata={'msg': random.random()}) tx2 = tx2.sign([b.me_private])