diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py
index 59d4e721..59d53b72 100644
--- a/bigchaindb/backend/mongodb/changefeed.py
+++ b/bigchaindb/backend/mongodb/changefeed.py
@@ -15,7 +15,7 @@ register_changefeed = module_dispatch_registrar(backend.changefeed)
 
 
 class MongoDBChangeFeed(ChangeFeed):
-    """This class implements a MongoDB changefeed.
+    """This class implements a MongoDB changefeed as a multipipes Node.
 
     We emulate the behaviour of the RethinkDB changefeed by using a tailable
     cursor that listens for events on the oplog.
@@ -65,8 +65,7 @@ class MongoDBChangeFeed(ChangeFeed):
 
 
 @register_changefeed(MongoDBConnection)
-def get_changefeed(connection, table, operation, *, prefeed=None,
-                   get_resumption_pointer=None):
+def get_changefeed(connection, table, operation, *, prefeed=None):
     """Return a MongoDB changefeed.
 
     Returns:
@@ -75,8 +74,7 @@ def get_changefeed(connection, table, operation, *, prefeed=None,
     """
 
     return MongoDBChangeFeed(table, operation, prefeed=prefeed,
-                             connection=connection,
-                             get_resumption_pointer=get_resumption_pointer)
+                             connection=connection)
 
 
 _FEED_STOP = False
diff --git a/bigchaindb/backend/rethinkdb/changefeed.py b/bigchaindb/backend/rethinkdb/changefeed.py
index 390ada9a..d0cfd5bb 100644
--- a/bigchaindb/backend/rethinkdb/changefeed.py
+++ b/bigchaindb/backend/rethinkdb/changefeed.py
@@ -14,22 +14,13 @@ register_changefeed = module_dispatch_registrar(backend.changefeed)
 
 
 class RethinkDBChangeFeed(ChangeFeed):
-    """This class wraps a RethinkDB changefeed."""
+    """This class wraps a RethinkDB changefeed as a multipipes Node."""
 
     def run_forever(self):
         for element in self.prefeed:
             self.outqueue.put(element)
 
-        while True:
-            try:
-                self.run_changefeed()
-                break
-            except (BackendError, r.ReqlDriverError) as exc:
-                logger.exception('Error connecting to the database, retrying')
-                time.sleep(1)
-
-    def run_changefeed(self):
-        for change in self.connection.run(r.table(self.table).changes()):
+        for change in run_changefeed(self.connection, self.table):
             is_insert = change['old_val'] is None
             is_delete = change['new_val'] is None
             is_update = not is_insert and not is_delete
@@ -42,6 +33,19 @@ class RethinkDBChangeFeed(ChangeFeed):
                 self.outqueue.put(change['new_val'])
 
 
+def run_changefeed(connection, table):
+    """Encapsulate operational logic of tailing changefeed from RethinkDB
+    """
+    while True:
+        try:
+            for change in connection.run(r.table(table).changes()):
+                yield change
+            break
+        except (BackendError, r.ReqlDriverError) as exc:
+            logger.exception('Error connecting to the database, retrying')
+            time.sleep(1)
+
+
 @register_changefeed(RethinkDBConnection)
 def get_changefeed(connection, table, operation, *, prefeed=None):
     """Return a RethinkDB changefeed.
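
Note on the refactor above: the reconnect-and-retry loop moves out of `RethinkDBChangeFeed.run_forever` into the module-level `run_changefeed` generator, so the same tail-and-retry logic can be reused by `get_new_blocks_feed` in the next file. A minimal, self-contained sketch of that generator pattern follows; `tail_with_retry`, `fetch_changes`, and `retry_delay` are hypothetical stand-ins for illustration, not names from the diff.

```python
import logging
import time

logger = logging.getLogger(__name__)


def tail_with_retry(fetch_changes, retry_delay=1):
    """Yield items from ``fetch_changes()``, reconnecting on failure.

    ``fetch_changes`` is a hypothetical stand-in for something like
    ``connection.run(r.table(table).changes())`` in the diff above.
    """
    while True:
        try:
            # Tail the feed and hand each change to the caller as it arrives.
            for change in fetch_changes():
                yield change
            # The feed ended without error (a real tailing cursor never does),
            # so stop retrying.
            break
        except ConnectionError:
            # On a dropped connection, log and retry after a short pause.
            logger.exception('Error connecting to the database, retrying')
            time.sleep(retry_delay)


# Example: consume a fake one-item feed.
feed = tail_with_retry(lambda: iter([{'old_val': None, 'new_val': {'id': 'abc'}}]))
print(next(feed))  # {'old_val': None, 'new_val': {'id': 'abc'}}
```
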
diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py
index deb02244..cc6bb96f 100644
--- a/bigchaindb/backend/rethinkdb/query.py
+++ b/bigchaindb/backend/rethinkdb/query.py
@@ -1,15 +1,20 @@
 from itertools import chain
+import logging as logger
 from time import time
 
 import rethinkdb as r
 
 from bigchaindb import backend, utils
+from bigchaindb.backend.rethinkdb import changefeed
 from bigchaindb.common import exceptions
 from bigchaindb.common.transaction import Transaction
 from bigchaindb.backend.utils import module_dispatch_registrar
 from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
 
 
+logger = logger.getLogger(__name__)
+
+
 READ_MODE = 'majority'
 WRITE_DURABILITY = 'hard'
 
@@ -238,3 +243,12 @@ def get_last_voted_block(connection, node_pubkey):
     return connection.run(
             r.table('bigchain', read_mode=READ_MODE)
             .get(last_block_id))
+
+
+@register_query(RethinkDBConnection)
+def get_new_blocks_feed(connection, start_block_id):
+    logger.warning('RethinkDB changefeed unable to resume from given block: %s',
+                   start_block_id)
+    # In order to get blocks in the correct order, it may be acceptable to
+    # look in the votes table to see what order other nodes have used.
+    return changefeed.run_changefeed(connection, 'bigchain')
diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py
index ee5c3ada..5140a793 100644
--- a/bigchaindb/pipelines/vote.py
+++ b/bigchaindb/pipelines/vote.py
@@ -167,8 +167,8 @@ def get_changefeed():
     """Create and return ordered changefeed of blocks starting from
     last voted block"""
     b = Bigchain()
-    last_voted = b.get_last_voted_block().id
-    feed = backend.query.get_new_blocks_feed(b.connection, last_voted)
+    last_block_id = b.get_last_voted_block().id
+    feed = backend.query.get_new_blocks_feed(b.connection, last_block_id)
     return Node(feed.__next__)
 
 
diff --git a/tests/backend/mongodb/test_queries.py b/tests/backend/mongodb/test_queries.py
index f58ba464..7df1022a 100644
--- a/tests/backend/mongodb/test_queries.py
+++ b/tests/backend/mongodb/test_queries.py
@@ -426,5 +426,5 @@ def test_get_new_blocks_feed(b, create_tx):
     assert feed.__next__() == b2
 
     b3 = create_block()
-    
+
     assert list(feed) == [b3]
diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py
index 12f84592..bc8903cb 100644
--- a/tests/pipelines/test_vote.py
+++ b/tests/pipelines/test_vote.py
@@ -509,45 +509,6 @@ def test_invalid_block_voting(monkeypatch, b, user_pk, genesis_block):
                                          vote_doc['signature']) is True
 
 
-@pytest.mark.genesis
-def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
-    from bigchaindb.backend import query
-    from bigchaindb.pipelines import vote
-
-    outpipe = Pipe()
-
-    block_ids = []
-    block_1 = dummy_block(b)
-    block_1.timestamp = str(random.random())
-    block_ids.append(block_1.id)
-    b.write_block(block_1)
-
-    block_2 = dummy_block(b)
-    block_2.timestamp = str(random.random())
-    block_ids.append(block_2.id)
-    b.write_block(block_2)
-
-    vote_pipeline = vote.create_pipeline()
-    vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe)
-    vote_pipeline.start()
-
-    # We expects two votes, so instead of waiting an arbitrary amount
-    # of time, we can do two blocking calls to `get`
-    outpipe.get()
-    outpipe.get()
-    vote_pipeline.terminate()
-
-    # retrive blocks from bigchain
-    blocks = [b.get_block(_id) for _id in block_ids]
-
-    # retrieve votes
-    votes = [list(query.get_votes_by_block_id(b.connection, _id))[0]
-             for _id in block_ids]
-
-    assert ({v['vote']['voting_for_block'] for v in votes} ==
-            {block['id'] for block in blocks})
-
-
 @pytest.mark.genesis
 def test_voter_checks_for_previous_vote(monkeypatch, b):
     from bigchaindb.backend import query
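
Side note on the `Node(feed.__next__)` line in `bigchaindb/pipelines/vote.py`: since `get_new_blocks_feed` now returns a generator, the pipeline node's target is just the generator's `__next__` method, so each invocation pulls exactly one block from the feed. A toy illustration of that consumption pattern follows; the `blocks_feed` generator is hypothetical, standing in for the backend changefeed.

```python
def blocks_feed():
    """Hypothetical stand-in for backend.query.get_new_blocks_feed()."""
    for block_id in ('block-1', 'block-2', 'block-3'):
        yield {'id': block_id}


feed = blocks_feed()

# The pipeline repeatedly invokes the node's target; passing the generator's
# __next__ as that target means each call yields the next block in order.
pull_one_block = feed.__next__

print(pull_one_block())  # {'id': 'block-1'}
print(pull_one_block())  # {'id': 'block-2'}
```
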