mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
use MongoDBConnection.run() and fix changefeed reconnect
This commit is contained in:
parent
8f55febefb
commit
99b4777766
@ -89,13 +89,14 @@ def run_changefeed(conn, table, last_ts):
|
||||
try:
|
||||
# XXX: hack to force reconnection, in case the connection
|
||||
# is lost while waiting on the cursor. See #1154.
|
||||
conn.connection = 1
|
||||
conn._conn = None
|
||||
namespace = conn.dbname + '.' + table
|
||||
cursor = conn.conn.local.oplog.rs.find(
|
||||
query = conn.query().local.oplog.rs.find(
|
||||
{'ns': namespace, 'ts': {'$gt': last_ts}},
|
||||
{'o._id': False},
|
||||
cursor_type=pymongo.CursorType.TAILABLE_AWAIT
|
||||
)
|
||||
cursor = conn.run(query)
|
||||
logging.debug('Tailing oplog at %s/%s', namespace, last_ts)
|
||||
while cursor.alive:
|
||||
try:
|
||||
|
@ -286,8 +286,9 @@ def get_last_voted_block(conn, node_pubkey):
|
||||
@register_query(MongoDBConnection)
|
||||
def get_new_blocks_feed(conn, start_block_id):
|
||||
namespace = conn.dbname + '.bigchain'
|
||||
query = {'o.id': start_block_id, 'op': 'i', 'ns': namespace}
|
||||
match = {'o.id': start_block_id, 'op': 'i', 'ns': namespace}
|
||||
# Neccesary to find in descending order since tests may write same block id several times
|
||||
last_ts = conn.conn.local.oplog.rs.find(query).sort('$natural', -1).next()['ts']
|
||||
query = conn.query().local.oplog.rs.find(match).sort('$natural', -1).next()['ts']
|
||||
last_ts = conn.run(query)
|
||||
feed = run_changefeed(conn, 'bigchain', last_ts)
|
||||
return (evt['o'] for evt in feed if evt['op'] == 'i')
|
||||
|
@ -142,8 +142,7 @@ def test_connection_failure():
|
||||
from bigchaindb.backend.mongodb.changefeed import run_changefeed
|
||||
|
||||
conn = mock.MagicMock()
|
||||
find = conn.conn.local.oplog.rs.find
|
||||
find.side_effect = [ConnectionError(), RuntimeError()]
|
||||
conn.run.side_effect = [ConnectionError(), RuntimeError()]
|
||||
changefeed = run_changefeed(conn, 'backlog', -1)
|
||||
with pytest.raises(RuntimeError):
|
||||
for record in changefeed:
|
||||
|
Loading…
x
Reference in New Issue
Block a user