rethinkdb changefeed update

This commit is contained in:
Scott Sadler 2017-04-13 10:53:13 +02:00
parent 3bb5973f60
commit 8f55febefb
6 changed files with 35 additions and 58 deletions

View File

@ -15,7 +15,7 @@ register_changefeed = module_dispatch_registrar(backend.changefeed)
class MongoDBChangeFeed(ChangeFeed):
"""This class implements a MongoDB changefeed.
"""This class implements a MongoDB changefeed as a multipipes Node.
We emulate the behaviour of the RethinkDB changefeed by using a tailable
cursor that listens for events on the oplog.
@ -65,8 +65,7 @@ class MongoDBChangeFeed(ChangeFeed):
@register_changefeed(MongoDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None,
get_resumption_pointer=None):
def get_changefeed(connection, table, operation, *, prefeed=None):
"""Return a MongoDB changefeed.
Returns:
@ -75,8 +74,7 @@ def get_changefeed(connection, table, operation, *, prefeed=None,
"""
return MongoDBChangeFeed(table, operation, prefeed=prefeed,
connection=connection,
get_resumption_pointer=get_resumption_pointer)
connection=connection)
_FEED_STOP = False

View File

@ -14,22 +14,13 @@ register_changefeed = module_dispatch_registrar(backend.changefeed)
class RethinkDBChangeFeed(ChangeFeed):
"""This class wraps a RethinkDB changefeed."""
"""This class wraps a RethinkDB changefeed as a multipipes Node."""
def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)
while True:
try:
self.run_changefeed()
break
except (BackendError, r.ReqlDriverError) as exc:
logger.exception('Error connecting to the database, retrying')
time.sleep(1)
def run_changefeed(self):
for change in self.connection.run(r.table(self.table).changes()):
for change in run_changefeed(self.connection, self.table):
is_insert = change['old_val'] is None
is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete
@ -42,6 +33,19 @@ class RethinkDBChangeFeed(ChangeFeed):
self.outqueue.put(change['new_val'])
def run_changefeed(connection, table):
"""Encapsulate operational logic of tailing changefeed from RethinkDB
"""
while True:
try:
for change in connection.run(r.table(table).changes()):
yield change
break
except (BackendError, r.ReqlDriverError) as exc:
logger.exception('Error connecting to the database, retrying')
time.sleep(1)
@register_changefeed(RethinkDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None):
"""Return a RethinkDB changefeed.

View File

@ -1,15 +1,20 @@
from itertools import chain
import logging as logger
from time import time
import rethinkdb as r
from bigchaindb import backend, utils
from bigchaindb.backend.rethinkdb import changefeed
from bigchaindb.common import exceptions
from bigchaindb.common.transaction import Transaction
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
logger = logger.getLogger(__name__)
READ_MODE = 'majority'
WRITE_DURABILITY = 'hard'
@ -238,3 +243,12 @@ def get_last_voted_block(connection, node_pubkey):
return connection.run(
r.table('bigchain', read_mode=READ_MODE)
.get(last_block_id))
@register_query(RethinkDBConnection)
def get_new_blocks_feed(connection, start_block_id):
logger.warning('RethinkDB changefeed unable to resume from given block: %s',
start_block_id)
# In order to get blocks in the correct order, it may be acceptable to
# look in the votes table to see what order other nodes have used.
return changefeed.run_changefeed(connection, 'bigchain')

View File

@ -167,8 +167,8 @@ def get_changefeed():
"""Create and return ordered changefeed of blocks starting from
last voted block"""
b = Bigchain()
last_voted = b.get_last_voted_block().id
feed = backend.query.get_new_blocks_feed(b.connection, last_voted)
last_block_id = b.get_last_voted_block().id
feed = backend.query.get_new_blocks_feed(b.connection, last_block_id)
return Node(feed.__next__)

View File

@ -426,5 +426,5 @@ def test_get_new_blocks_feed(b, create_tx):
assert feed.__next__() == b2
b3 = create_block()
assert list(feed) == [b3]

View File

@ -509,45 +509,6 @@ def test_invalid_block_voting(monkeypatch, b, user_pk, genesis_block):
vote_doc['signature']) is True
@pytest.mark.genesis
def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b):
from bigchaindb.backend import query
from bigchaindb.pipelines import vote
outpipe = Pipe()
block_ids = []
block_1 = dummy_block(b)
block_1.timestamp = str(random.random())
block_ids.append(block_1.id)
b.write_block(block_1)
block_2 = dummy_block(b)
block_2.timestamp = str(random.random())
block_ids.append(block_2.id)
b.write_block(block_2)
vote_pipeline = vote.create_pipeline()
vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe)
vote_pipeline.start()
# We expects two votes, so instead of waiting an arbitrary amount
# of time, we can do two blocking calls to `get`
outpipe.get()
outpipe.get()
vote_pipeline.terminate()
# retrive blocks from bigchain
blocks = [b.get_block(_id) for _id in block_ids]
# retrieve votes
votes = [list(query.get_votes_by_block_id(b.connection, _id))[0]
for _id in block_ids]
assert ({v['vote']['voting_for_block'] for v in votes} ==
{block['id'] for block in blocks})
@pytest.mark.genesis
def test_voter_checks_for_previous_vote(monkeypatch, b):
from bigchaindb.backend import query