Delete outdated pipelines/utils.py

Reorganized and fixed tests.
This commit is contained in:
Rodolphe Marques 2016-12-09 14:39:48 +01:00
parent 134f9e85a0
commit e303e355db
4 changed files with 100 additions and 161 deletions

View File

@@ -1,75 +0,0 @@
"""Utility classes and functions to work with the pipelines."""
import time
import rethinkdb as r
import logging
from multipipes import Node
from bigchaindb import Bigchain
logger = logging.getLogger(__name__)
class ChangeFeed(Node):
    """This class wraps a RethinkDB changefeed adding a ``prefeed``.

    It extends :class:`multipipes.Node` to make it pluggable in other
    Pipelines instances, and makes usage of ``self.outqueue`` to output
    the data.

    A changefeed is a real time feed on inserts, updates, and deletes, and
    is volatile. This class is a helper to create changefeeds. Moreover,
    it provides a way to specify a ``prefeed`` of iterable data to output
    before the actual changefeed.
    """

    # Bit flags, so multiple operations can be combined with bitwise ``|``.
    INSERT = 1
    DELETE = 2
    UPDATE = 4

    def __init__(self, table, operation, prefeed=None, bigchain=None):
        """Create a new RethinkDB ChangeFeed.

        Args:
            table (str): name of the table to listen to for changes.
            operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
                ChangeFeed.UPDATE. Combining multiple operations is possible
                with the bitwise ``|`` operator
                (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
            prefeed (iterable): whatever set of data you want to be published
                first.
            bigchain (``Bigchain``): the bigchain instance to use (can be None).
        """
        super().__init__(name='changefeed')
        # Normalize a falsy prefeed (None, empty) to an empty iterable.
        self.prefeed = prefeed if prefeed else []
        self.table = table
        self.operation = operation
        # Fall back to a default Bigchain instance when none is supplied.
        self.bigchain = bigchain or Bigchain()

    def run_forever(self):
        """Publish the prefeed, then stream table changes indefinitely.

        Driver/operation failures from RethinkDB are logged and retried
        once per second instead of crashing the node.
        """
        for element in self.prefeed:
            self.outqueue.put(element)

        while True:
            try:
                self.run_changefeed()
                break
            except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
                # Transient connection problem: log it and reconnect shortly.
                logger.exception(exc)
                time.sleep(1)

    def run_changefeed(self):
        """Forward each change matching ``self.operation`` to ``outqueue``."""
        for change in self.bigchain.connection.run(r.table(self.table).changes()):
            # RethinkDB encodes the operation kind in the null-ness of the
            # old/new values: inserts have no old_val, deletes no new_val.
            is_insert = change['old_val'] is None
            is_delete = change['new_val'] is None
            is_update = not is_insert and not is_delete

            if is_insert and (self.operation & ChangeFeed.INSERT):
                self.outqueue.put(change['new_val'])
            elif is_delete and (self.operation & ChangeFeed.DELETE):
                self.outqueue.put(change['old_val'])
            elif is_update and (self.operation & ChangeFeed.UPDATE):
                self.outqueue.put(change['new_val'])

View File

@@ -0,0 +1,96 @@
import pytest
from unittest.mock import Mock
from multipipes import Pipe
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb.backend import connect
@pytest.fixture
def mock_changefeed_data():
    """Return a canned stream of RethinkDB-style change records.

    One record per operation kind: an insert (no old value), a delete
    (no new value), and an update (both values present).
    """
    insert_change = {
        'new_val': 'seems like we have an insert here',
        'old_val': None,
    }
    delete_change = {
        'new_val': None,
        'old_val': 'seems like we have a delete here',
    }
    update_change = {
        'new_val': 'seems like we have an update here',
        'old_val': 'seems like we have an update here',
    }
    return [insert_change, delete_change, update_change]
@pytest.fixture
def mock_changefeed_connection(mock_changefeed_data):
    """Return a backend connection whose ``run`` method is mocked.

    ``connection.run`` simply returns the canned change records, so the
    changefeed under test sees a deterministic stream without a live
    database.
    """
    connection = connect(**bigchaindb.config['database'])
    connection.run = Mock(return_value=mock_changefeed_data)
    return connection
def test_changefeed_insert(mock_changefeed_connection):
    """Listening for INSERT must forward only the insert record's new value."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection,
                          'backlog', ChangeFeed.INSERT)
    feed.outqueue = out
    feed.run_forever()

    assert out.get() == 'seems like we have an insert here'
    assert out.qsize() == 0
def test_changefeed_delete(mock_changefeed_connection):
    """Listening for DELETE must forward only the delete record's old value."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    pipe_out = Pipe()
    feed = get_changefeed(mock_changefeed_connection,
                          'backlog', ChangeFeed.DELETE)
    feed.outqueue = pipe_out
    feed.run_forever()

    assert pipe_out.get() == 'seems like we have a delete here'
    assert pipe_out.qsize() == 0
def test_changefeed_update(mock_changefeed_connection):
    """Listening for UPDATE must forward only the update record's new value."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    results = Pipe()
    feed = get_changefeed(mock_changefeed_connection,
                          'backlog', ChangeFeed.UPDATE)
    feed.outqueue = results
    feed.run_forever()

    assert results.get() == 'seems like we have an update here'
    assert results.qsize() == 0
def test_changefeed_multiple_operations(mock_changefeed_connection):
    """Combining INSERT | UPDATE forwards both records and skips the delete."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    combined_ops = ChangeFeed.INSERT | ChangeFeed.UPDATE
    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection, 'backlog', combined_ops)
    feed.outqueue = out
    feed.run_forever()

    assert out.get() == 'seems like we have an insert here'
    assert out.get() == 'seems like we have an update here'
    assert out.qsize() == 0
def test_changefeed_prefeed(mock_changefeed_connection):
    """The three prefeed items plus the single insert yield four outputs."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    destination = Pipe()
    feed = get_changefeed(mock_changefeed_connection, 'backlog',
                          ChangeFeed.INSERT, prefeed=[1, 2, 3])
    feed.outqueue = destination
    feed.run_forever()

    assert destination.qsize() == 4

View File

@@ -1,80 +0,0 @@
import pytest
from unittest.mock import Mock
from multipipes import Pipe
from bigchaindb import Bigchain
from bigchaindb.backend.connection import Connection
from bigchaindb.pipelines.utils import ChangeFeed
@pytest.fixture
def mock_changefeed_data():
    """Return canned change records: one insert, one delete, one update."""
    return [
        {
            # Insert: only a new value, no previous one.
            'new_val': 'seems like we have an insert here',
            'old_val': None,
        }, {
            # Delete: only an old value, no new one.
            'new_val': None,
            'old_val': 'seems like we have a delete here',
        }, {
            # Update: both old and new values present.
            'new_val': 'seems like we have an update here',
            'old_val': 'seems like we have an update here',
        }
    ]
@pytest.fixture
def mock_changefeed_bigchain(mock_changefeed_data):
    """Return a Bigchain whose connection replays the canned change records."""
    connection = Connection(host=None, port=None, dbname=None)
    # Replace the query runner so no real database is contacted.
    connection.run = Mock(return_value=mock_changefeed_data)
    return Bigchain(connection=connection)
def test_changefeed_insert(mock_changefeed_bigchain):
    """Only the insert record is emitted when listening for INSERT."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.get() == 'seems like we have an insert here'
    assert outpipe.qsize() == 0
def test_changefeed_delete(mock_changefeed_bigchain):
    """Only the delete record's old value is emitted when listening for DELETE."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog', ChangeFeed.DELETE, bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.get() == 'seems like we have a delete here'
    assert outpipe.qsize() == 0
def test_changefeed_update(mock_changefeed_bigchain):
    """Only the update record's new value is emitted when listening for UPDATE."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE, bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.get() == 'seems like we have an update here'
    assert outpipe.qsize() == 0
def test_changefeed_multiple_operations(mock_changefeed_bigchain):
    """INSERT | UPDATE forwards both matching records and skips the delete."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog',
                            ChangeFeed.INSERT | ChangeFeed.UPDATE,
                            bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.get() == 'seems like we have an insert here'
    assert outpipe.get() == 'seems like we have an update here'
    assert outpipe.qsize() == 0
def test_changefeed_prefeed(mock_changefeed_bigchain):
    """Prefeed items are published before the changefeed output (3 + 1 = 4)."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog',
                            ChangeFeed.INSERT,
                            prefeed=[1, 2, 3],
                            bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.qsize() == 4

View File

@@ -48,8 +48,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):
import time
import multiprocessing as mp
from bigchaindb import Bigchain
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.rethinkdb.changefeed import RethinkDBChangeFeed
class MockConnection:
tries = 0
@@ -75,10 +75,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):
else:
time.sleep(10)
bigchain = Bigchain()
bigchain.connection = MockConnection()
changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT,
bigchain=bigchain)
changefeed = RethinkDBChangeFeed('cat_facts', ChangeFeed.INSERT,
connection=MockConnection())
changefeed.outqueue = mp.Queue()
t_changefeed = Thread(target=changefeed.run_forever, daemon=True)