diff --git a/bigchaindb/backend/mongodb/__init__.py b/bigchaindb/backend/mongodb/__init__.py
index b43a9c24..af5293ac 100644
--- a/bigchaindb/backend/mongodb/__init__.py
+++ b/bigchaindb/backend/mongodb/__init__.py
@@ -16,7 +16,7 @@ generic backend interfaces to the implementations in this module.
 """
 
 # Register the single dispatched modules on import.
-from bigchaindb.backend.mongodb import schema, query  # noqa no changefeed for now
+from bigchaindb.backend.mongodb import schema, query, changefeed  # noqa
 
 # MongoDBConnection should always be accessed via
 # ``bigchaindb.backend.connect()``.
diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py
new file mode 100644
index 00000000..d52927b9
--- /dev/null
+++ b/bigchaindb/backend/mongodb/changefeed.py
@@ -0,0 +1,97 @@
+import logging
+import time
+
+import pymongo
+from pymongo.errors import ConnectionFailure, OperationFailure
+
+from bigchaindb import backend
+from bigchaindb.backend.changefeed import ChangeFeed
+from bigchaindb.backend.utils import module_dispatch_registrar
+from bigchaindb.backend.mongodb.connection import MongoDBConnection
+
+
+logger = logging.getLogger(__name__)
+register_changefeed = module_dispatch_registrar(backend.changefeed)
+
+
+class MongoDBChangeFeed(ChangeFeed):
+    """This class implements a MongoDB changefeed.
+
+    We emulate the behaviour of the RethinkDB changefeed by using a
+    tailable cursor that listens for events on the oplog.
+    """
+
+    def run_forever(self):
+        for element in self.prefeed:
+            self.outqueue.put(element)
+
+        while True:
+            try:
+                self.run_changefeed()
+                break
+            except (ConnectionFailure, OperationFailure) as exc:
+                logger.exception(exc)
+                time.sleep(1)
+
+    def run_changefeed(self):
+        dbname = self.connection.dbname
+        table = self.table
+        namespace = '{}.{}'.format(dbname, table)
+        # last timestamp in the oplog. We only care about operations
+        # happening in the future.
+        last_ts = self.connection.conn.local.oplog.rs.find()\
+                      .sort('$natural', pymongo.DESCENDING).limit(1)\
+                      .next()['ts']
+        # tailable cursor. A tailable cursor will remain open even after the
+        # last result was returned. ``TAILABLE_AWAIT`` will block for some
+        # timeout after the last result was returned. If no result is
+        # received in the meantime it will raise a StopIteration exception.
+        cursor = self.connection.conn.local.oplog.rs.find(
+            {'ns': namespace, 'ts': {'$gt': last_ts}},
+            cursor_type=pymongo.CursorType.TAILABLE_AWAIT
+        )
+
+        while cursor.alive:
+            try:
+                record = cursor.next()
+            except StopIteration:
+                continue
+
+            is_insert = record['op'] == 'i'
+            is_delete = record['op'] == 'd'
+            is_update = record['op'] == 'u'
+
+            # MongoDB documents use `_id` as the primary key.
+            # We are not using this field at this point and we need to
+            # remove it to prevent problems with schema validation.
+            # See https://github.com/bigchaindb/bigchaindb/issues/992
+            if is_insert and (self.operation & ChangeFeed.INSERT):
+                record['o'].pop('_id', None)
+                doc = record['o']
+            elif is_delete and (self.operation & ChangeFeed.DELETE):
+                # on delete the oplog only returns the id of the document
+                doc = record['o']
+            elif is_update and (self.operation & ChangeFeed.UPDATE):
+                # the oplog entry for updates only contains the update
+                # operations to apply to the document, not the document
+                # itself. So here we first read the document and then
+                # return it.
+                doc = self.connection.conn[dbname][table]\
+                    .find_one(record['o2'], projection={'_id': False})
+            else:
+                # not an operation we are listening for
+                continue
+            self.outqueue.put(doc)
+
+
+@register_changefeed(MongoDBConnection)
+def get_changefeed(connection, table, operation, *, prefeed=None):
+    """Return a MongoDB changefeed.
+
+    Returns:
+        An instance of
+        :class:`~bigchaindb.backend.mongodb.MongoDBChangeFeed`.
+    """
+
+    return MongoDBChangeFeed(table, operation, prefeed=prefeed,
+                             connection=connection)
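For reviewers unfamiliar with oplog tailing, the technique used by `run_changefeed` works against any MongoDB replica set. Below is a minimal, self-contained sketch of the same idea; the connection parameters and the `bigchain.backlog` namespace are illustrative, not taken from the patch:

```python
import pymongo

# The oplog lives in the 'local' database of a replica set member.
client = pymongo.MongoClient('localhost', 27017)
oplog = client.local.oplog.rs

# Remember the newest entry so only future operations are reported.
last_ts = oplog.find().sort('$natural', pymongo.DESCENDING)\
               .limit(1).next()['ts']

# TAILABLE_AWAIT keeps the cursor open and blocks briefly for new data.
cursor = oplog.find({'ns': 'bigchain.backlog', 'ts': {'$gt': last_ts}},
                    cursor_type=pymongo.CursorType.TAILABLE_AWAIT)

while cursor.alive:
    try:
        record = cursor.next()
        # 'op' is 'i' (insert), 'u' (update) or 'd' (delete);
        # 'o' carries the inserted document, update spec or deleted id.
        print(record['op'], record['o'])
    except StopIteration:
        continue  # the await timeout expired; keep tailing
```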
diff --git a/bigchaindb/backend/mongodb/query.py b/bigchaindb/backend/mongodb/query.py
index 369cb39d..8765d110 100644
--- a/bigchaindb/backend/mongodb/query.py
+++ b/bigchaindb/backend/mongodb/query.py
@@ -20,10 +20,12 @@ def write_transaction(conn, signed_transaction):
 
 
 @register_query(MongoDBConnection)
 def update_transaction(conn, transaction_id, doc):
+    # with MongoDB we need to wrap the doc in an update operator
+    doc = {'$set': doc}
     return conn.db['backlog']\
-            .find_one_and_update({'id': transaction_id},
-                                 doc,
-                                 return_document=ReturnDocument.AFTER)
+        .find_one_and_update({'id': transaction_id},
+                             doc,
+                             return_document=ReturnDocument.AFTER)
 
 
@@ -38,10 +40,20 @@ def get_stale_transactions(conn, reassign_delay):
 
 
 @register_query(MongoDBConnection)
-def get_transaction_from_block(conn, block_id, tx_id):
-    # this is definitely wrong, but it's something like this
-    return conn.db['bigchain'].find_one({'id': block_id,
-                                         'block.transactions.id': tx_id})
+def get_transaction_from_block(conn, transaction_id, block_id):
+    return conn.db['bigchain'].aggregate([
+        {'$match': {'id': block_id}},
+        {'$project': {
+            'block.transactions': {
+                '$filter': {
+                    'input': '$block.transactions',
+                    'as': 'transaction',
+                    'cond': {
+                        '$eq': ['$$transaction.id', transaction_id]
+                    }
+                }
+            }
+        }}]).next()['block']['transactions'][0]
 
 
@@ -90,14 +102,16 @@ def get_owned_ids(conn, owner):
 
 
 @register_query(MongoDBConnection)
 def get_votes_by_block_id(conn, block_id):
     return conn.db['votes']\
-            .find({'vote.voting_for_block': block_id})
+            .find({'vote.voting_for_block': block_id},
+                  projection={'_id': False})
 
 
 @register_query(MongoDBConnection)
 def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
     return conn.db['votes']\
             .find({'vote.voting_for_block': block_id,
-                   'node_pubkey': node_pubkey})
+                   'node_pubkey': node_pubkey},
+                  projection={'_id': False})
 
 
@@ -133,8 +147,9 @@ def write_vote(conn, vote):
 
 
 @register_query(MongoDBConnection)
 def get_genesis_block(conn):
-    return conn.db['bigchain'].find_one({'block.transactions.0.operation' ==
-                                         'GENESIS'})
+    return conn.db['bigchain'].find_one({
+        'block.transactions.0.operation': 'GENESIS'
+    })
 
 
@@ -142,7 +157,10 @@ def get_last_voted_block(conn, node_pubkey):
     last_voted = conn.db['votes']\
                     .find({'node_pubkey': node_pubkey},
                           sort=[('vote.timestamp', -1)])
-    if not last_voted:
+
+    # pymongo returns a cursor even when there are no results,
+    # so we need to check the count explicitly
+    if last_voted.count() == 0:
         return get_genesis_block(conn)
 
     mapping = {v['vote']['previous_block']: v['vote']['voting_for_block']
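The `$set` wrapper added to `update_transaction` is required, not cosmetic: pymongo's `find_one_and_update` accepts only update documents built from update operators and raises a `ValueError` for a plain replacement document. A sketch, assuming a `backlog` collection keyed by `id` (the ids and field names are made up):

```python
from pymongo import MongoClient, ReturnDocument

backlog = MongoClient()['bigchain']['backlog']

# backlog.find_one_and_update({'id': 'tx1'}, {'assignee': 'node-a'})
# -> ValueError: pymongo rejects update documents without $ operators

# With $set the given fields are merged into the matched document;
# ReturnDocument.AFTER returns the document as it looks post-update.
updated = backlog.find_one_and_update(
    {'id': 'tx1'},
    {'$set': {'assignee': 'node-a'}},
    return_document=ReturnDocument.AFTER)
```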
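On the `get_last_voted_block` fix: a pymongo `Cursor` defines neither `__bool__` nor `__len__`, so it is always truthy and `if not last_voted` can never fire, even for an empty result set. `count()` (available in the pymongo 3.x series this code targets; removed in 4.x) asks the server how many documents matched:

```python
from pymongo import MongoClient

votes = MongoClient()['bigchain']['votes']

cursor = votes.find({'node_pubkey': 'no-such-key'})  # hypothetical key
print(bool(cursor))         # True, even though nothing matched
print(cursor.count() == 0)  # True: the reliable emptiness check
```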
diff --git a/bigchaindb/backend/mongodb/schema.py b/bigchaindb/backend/mongodb/schema.py
index 7eaff782..fed2d1e4 100644
--- a/bigchaindb/backend/mongodb/schema.py
+++ b/bigchaindb/backend/mongodb/schema.py
@@ -61,13 +61,12 @@ def create_bigchain_secondary_index(conn, dbname):
 
     # to query the bigchain for a transaction id, this field is unique
     conn.conn[dbname]['bigchain'].create_index('block.transactions.id',
-                                               name='transaction_id',
-                                               unique=True)
+                                               name='transaction_id')
 
     # secondary index for asset uuid, this field is unique
     conn.conn[dbname]['bigchain']\
         .create_index('block.transactions.transaction.asset.id',
-                      name='asset_id', unique=True)
+                      name='asset_id')
 
 
 def create_backlog_secondary_index(conn, dbname):
diff --git a/bigchaindb/pipelines/stale.py b/bigchaindb/pipelines/stale.py
index b560f8bb..010bb8e1 100644
--- a/bigchaindb/pipelines/stale.py
+++ b/bigchaindb/pipelines/stale.py
@@ -48,6 +48,8 @@ class StaleTransactionMonitor:
         Returns:
             transaction
         """
+        # NOTE: Maybe this is too verbose?
+        logger.info('Reassigning transaction with id %s', tx['id'])
         self.bigchain.reassign_transaction(tx)
         return tx
diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py
index dd138d41..2a9b0ee5 100644
--- a/bigchaindb/pipelines/vote.py
+++ b/bigchaindb/pipelines/vote.py
@@ -5,10 +5,10 @@ of actions to do on transactions is specified in the ``create_pipeline``
 function.
 """
 
+import logging
 from collections import Counter
 
 from multipipes import Pipeline, Node
-from bigchaindb.common import exceptions
 
 import bigchaindb
 from bigchaindb import Bigchain
@@ -16,6 +16,10 @@ from bigchaindb import backend
 from bigchaindb.backend.changefeed import ChangeFeed
 from bigchaindb.consensus import BaseConsensusRules
 from bigchaindb.models import Transaction, Block
+from bigchaindb.common import exceptions
+
+
+logger = logging.getLogger(__name__)
 
 
 class Vote:
@@ -132,7 +136,9 @@ class Vote:
         Args:
             vote: the vote to write.
         """
-
+        validity = 'valid' if vote['vote']['is_block_valid'] else 'invalid'
+        logger.info("Voting '%s' for block %s", validity,
+                    vote['vote']['voting_for_block'])
         self.bigchain.write_vote(vote)
         return vote
diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py
index 6be54146..2d4c48e5 100644
--- a/tests/db/test_bigchain_api.py
+++ b/tests/db/test_bigchain_api.py
@@ -153,14 +153,14 @@ class TestBigchainApi(object):
     def test_get_transaction_in_invalid_and_valid_block(self, monkeypatch, b):
         from bigchaindb.models import Transaction
 
-        monkeypatch.setattr('time.time', lambda: 1)
+        monkeypatch.setattr('time.time', lambda: 1000000000)
         tx1 = Transaction.create([b.me], [([b.me], 1)],
                                  metadata={'msg': random.random()})
         tx1 = tx1.sign([b.me_private])
         block1 = b.create_block([tx1])
         b.write_block(block1)
 
-        monkeypatch.setattr('time.time', lambda: 2222222222)
+        monkeypatch.setattr('time.time', lambda: 2000000000)
         tx2 = Transaction.create([b.me], [([b.me], 1)],
                                  metadata={'msg': random.random()})
         tx2 = tx2.sign([b.me_private])
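On dropping `unique=True` in `schema.py`: the indexes still speed up lookups by transaction id and asset id; what changes is that duplicates are no longer rejected. Presumably this is because the same transaction id can legitimately appear in more than one block (for example, when a block is voted invalid and its transactions are re-included later); the patch itself does not state the rationale. A sketch of the revised behaviour, against a throwaway database:

```python
from pymongo import MongoClient

bigchain = MongoClient()['bigchain']['bigchain']

# A named, non-unique secondary index on an embedded array field.
bigchain.create_index('block.transactions.id', name='transaction_id')

# Both inserts succeed even though they share a transaction id;
# with unique=True the second would raise DuplicateKeyError.
bigchain.insert_one({'id': 'block1',
                     'block': {'transactions': [{'id': 'tx1'}]}})
bigchain.insert_one({'id': 'block2',
                     'block': {'transactions': [{'id': 'tx1'}]}})
```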