mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Remove dependency on external RethinkDB instance
This commit is contained in:
parent
65bc86f06e
commit
39228be454
@ -29,7 +29,7 @@ class ChangeFeed(Node):
|
|||||||
DELETE = 2
|
DELETE = 2
|
||||||
UPDATE = 4
|
UPDATE = 4
|
||||||
|
|
||||||
def __init__(self, table, operation, prefeed=None):
|
def __init__(self, table, operation, prefeed=None, bigchain=None):
|
||||||
"""Create a new RethinkDB ChangeFeed.
|
"""Create a new RethinkDB ChangeFeed.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -40,13 +40,14 @@ class ChangeFeed(Node):
|
|||||||
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
|
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
|
||||||
prefeed (iterable): whatever set of data you want to be published
|
prefeed (iterable): whatever set of data you want to be published
|
||||||
first.
|
first.
|
||||||
|
bigchain (``Bigchain``): the bigchain instance to use (can be None).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
super().__init__(name='changefeed')
|
super().__init__(name='changefeed')
|
||||||
self.prefeed = prefeed if prefeed else []
|
self.prefeed = prefeed if prefeed else []
|
||||||
self.table = table
|
self.table = table
|
||||||
self.operation = operation
|
self.operation = operation
|
||||||
self.bigchain = Bigchain()
|
self.bigchain = bigchain or Bigchain()
|
||||||
|
|
||||||
def run_forever(self):
|
def run_forever(self):
|
||||||
for element in self.prefeed:
|
for element in self.prefeed:
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
from unittest.mock import patch
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@ -26,85 +27,81 @@ def test_raise_exception_when_max_tries():
|
|||||||
|
|
||||||
def test_reconnect_when_connection_lost():
|
def test_reconnect_when_connection_lost():
|
||||||
import time
|
import time
|
||||||
|
import rethinkdb as r
|
||||||
|
|
||||||
proc = None
|
def raise_exception(*args, **kwargs):
|
||||||
|
raise r.ReqlDriverError('mock')
|
||||||
|
|
||||||
|
conn = Connection()
|
||||||
|
original_connect = r.connect
|
||||||
|
r.connect = raise_exception
|
||||||
|
|
||||||
def delayed_start():
|
def delayed_start():
|
||||||
nonlocal proc
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
proc, _ = start_temp_rethinkdb(38015)
|
r.connect = original_connect
|
||||||
|
|
||||||
thread = Thread(target=delayed_start)
|
thread = Thread(target=delayed_start)
|
||||||
conn = Connection(port=38015)
|
|
||||||
query = r.expr('1')
|
query = r.expr('1')
|
||||||
thread.start()
|
thread.start()
|
||||||
assert conn.run(query) == '1'
|
assert conn.run(query) == '1'
|
||||||
proc.terminate()
|
|
||||||
proc.wait()
|
|
||||||
|
|
||||||
|
|
||||||
def test_changefeed_reconnects_when_connection_lost(monkeypatch):
|
def test_changefeed_reconnects_when_connection_lost(monkeypatch):
|
||||||
import os
|
|
||||||
import time
|
import time
|
||||||
import tempfile
|
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
|
|
||||||
import bigchaindb
|
from bigchaindb import Bigchain
|
||||||
from bigchaindb.pipelines.utils import ChangeFeed
|
from bigchaindb.pipelines.utils import ChangeFeed
|
||||||
|
|
||||||
dbport = 38015
|
class MockConnection:
|
||||||
dbname = 'test_' + str(os.getpid())
|
tries = 0
|
||||||
directory = tempfile.mkdtemp()
|
|
||||||
|
|
||||||
monkeypatch.setitem(bigchaindb.config, 'database', {
|
def run(self, *args, **kwargs):
|
||||||
'host': 'localhost',
|
return self
|
||||||
'port': dbport,
|
|
||||||
'name': dbname
|
|
||||||
})
|
|
||||||
|
|
||||||
proc, _ = start_temp_rethinkdb(dbport, directory=directory)
|
def __iter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
# prepare DB and table
|
def __next__(self):
|
||||||
conn = r.connect(port=dbport)
|
self.tries += 1
|
||||||
r.db_create(dbname).run(conn)
|
if self.tries == 1:
|
||||||
r.db(dbname).table_create('cat_facts').run(conn)
|
raise r.ReqlDriverError('mock')
|
||||||
|
elif self.tries == 2:
|
||||||
|
return { 'new_val': { 'fact': 'A group of cats is called a clowder.' },
|
||||||
|
'old_val': None }
|
||||||
|
if self.tries == 3:
|
||||||
|
raise r.ReqlDriverError('mock')
|
||||||
|
elif self.tries == 4:
|
||||||
|
return { 'new_val': {'fact': 'Cats sleep 70% of their lives.' },
|
||||||
|
'old_val': None }
|
||||||
|
else:
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
# initialize ChangeFeed and put it in a thread
|
|
||||||
changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT)
|
bigchain = Bigchain()
|
||||||
|
bigchain.connection = MockConnection()
|
||||||
|
changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT,
|
||||||
|
bigchain=bigchain)
|
||||||
changefeed.outqueue = mp.Queue()
|
changefeed.outqueue = mp.Queue()
|
||||||
t_changefeed = Thread(target=changefeed.run_forever, daemon=True)
|
t_changefeed = Thread(target=changefeed.run_forever, daemon=True)
|
||||||
|
|
||||||
t_changefeed.start()
|
t_changefeed.start()
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
# try 1: MockConnection raises an error that will stop the
|
||||||
|
# ChangeFeed instance from iterating for 1 second.
|
||||||
|
|
||||||
# insert some records in the table to start generating
|
# try 2: MockConnection releases a new record. The new record
|
||||||
# events that changefeed will put in `outqueue`
|
# will be put in the outqueue of the ChangeFeed instance.
|
||||||
r.db(dbname).table('cat_facts').insert({
|
|
||||||
'fact': 'A group of cats is called a clowder.'
|
|
||||||
}).run(conn)
|
|
||||||
|
|
||||||
# the event should be in the outqueue
|
|
||||||
fact = changefeed.outqueue.get()['fact']
|
fact = changefeed.outqueue.get()['fact']
|
||||||
assert fact == 'A group of cats is called a clowder.'
|
assert fact == 'A group of cats is called a clowder.'
|
||||||
|
|
||||||
# stop the DB process
|
# try 3: MockConnection raises an error that will stop the
|
||||||
proc.terminate()
|
# ChangeFeed instance from iterating for 1 second.
|
||||||
proc.wait()
|
|
||||||
|
|
||||||
assert t_changefeed.is_alive() is True
|
assert t_changefeed.is_alive() is True
|
||||||
|
|
||||||
proc, _ = start_temp_rethinkdb(dbport, directory=directory)
|
|
||||||
|
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
# try 4: MockConnection releases a new record. The new record
|
||||||
conn = r.connect(port=dbport)
|
# will be put in the outqueue of the ChangeFeed instance.
|
||||||
r.db(dbname).table('cat_facts').insert({
|
|
||||||
'fact': 'Cats sleep 70% of their lives.'
|
|
||||||
}).run(conn)
|
|
||||||
|
|
||||||
fact = changefeed.outqueue.get()['fact']
|
fact = changefeed.outqueue.get()['fact']
|
||||||
assert fact == 'Cats sleep 70% of their lives.'
|
assert fact == 'Cats sleep 70% of their lives.'
|
||||||
|
|
||||||
# stop the DB process
|
|
||||||
proc.terminate()
|
|
||||||
proc.wait()
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user