Merge pull request #939 from bigchaindb/feat/905/abstract-db-changefeed

Feat/905/abstract db changefeed
This commit is contained in:
Rodolphe Marques 2016-12-14 15:09:46 +01:00 committed by GitHub
commit 588d6eb177
14 changed files with 297 additions and 189 deletions

View File

@ -10,3 +10,4 @@ configuration or the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable.
from bigchaindb.backend import changefeed, schema, query # noqa
from bigchaindb.backend.connection import connect # noqa
from bigchaindb.backend.changefeed import get_changefeed # noqa

View File

@ -1 +1,90 @@
"""Changefeed interfaces for backends."""
from functools import singledispatch
from multipipes import Node
import bigchaindb
class ChangeFeed(Node):
    """Abstract base class for backend changefeeds.

    Extends :class:`multipipes.Node` so a changefeed can be plugged into
    other Pipelines instances; matching changes are emitted on
    ``self.outqueue``.

    A changefeed is a volatile, real-time feed of inserts, updates, and
    deletes. An optional ``prefeed`` of iterable data can be emitted
    before the actual changefeed starts.
    """

    INSERT = 1
    DELETE = 2
    UPDATE = 4

    def __init__(self, table, operation, *, prefeed=None, connection=None):
        """Create a new ChangeFeed.

        Args:
            table (str): name of the table to listen to for changes.
            operation (int): one of ChangeFeed.INSERT, ChangeFeed.DELETE,
                or ChangeFeed.UPDATE; several can be combined with the
                bitwise ``|`` operator
                (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``).
            prefeed (:class:`~collections.abc.Iterable`, optional): whatever
                set of data you want to be published first.
            connection (:class:`~bigchaindb.backend.connection.Connection`, optional): # noqa
                A connection to the database. If no connection is provided
                a default connection will be created.
        """
        super().__init__(name='changefeed')
        self.table = table
        self.operation = operation
        self.prefeed = prefeed if prefeed else []
        if connection:
            self.connection = connection
        else:
            # Fall back to a default connection built from the global config.
            self.connection = bigchaindb.backend.connect(
                **bigchaindb.config['database'])

    def run_forever(self):
        """Main loop of the ``multipipes.Node``.

        Implementations must first feed the prefeed to the outqueue, then
        start the changefeed and recover from any errors that may occur in
        the backend.
        """
        raise NotImplementedError

    def run_changefeed(self):
        """Backend-specific method to run the changefeed.

        The changefeed is usually a backend cursor that is not closed when
        all the results are exhausted; instead it remains open, waiting for
        new results. Implementations must also filter each result based on
        the ``operation`` and put all matching results on the outqueue of
        ``multipipes.Node``.
        """
        raise NotImplementedError
@singledispatch
def get_changefeed(connection, table, operation, *, prefeed=None):
    """Return a ChangeFeed.

    Dispatches on the type of ``connection`` (``functools.singledispatch``)
    to a backend-specific implementation.

    Args:
        connection (:class:`~bigchaindb.backend.connection.Connection`):
            A connection to the database.
        table (str): name of the table to listen to for changes.
        operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
            ChangeFeed.UPDATE. Combining multiple operations is possible
            with the bitwise ``|`` operator
            (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
        prefeed (iterable): whatever set of data you want to be published
            first.

    Raises:
        NotImplementedError: if no implementation is registered for the
            type of ``connection``.
    """
    raise NotImplementedError

View File

@ -0,0 +1,54 @@
import time
import logging
import rethinkdb as r
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
logger = logging.getLogger(__name__)
register_changefeed = module_dispatch_registrar(backend.changefeed)
class RethinkDBChangeFeed(ChangeFeed):
    """This class wraps a RethinkDB changefeed."""

    def run_forever(self):
        # Publish the prefeed before opening the live changefeed.
        for item in self.prefeed:
            self.outqueue.put(item)
        while True:
            try:
                self.run_changefeed()
                break
            except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
                # Driver/cluster failure: log it and retry after a short
                # back-off instead of dying.
                logger.exception(exc)
                time.sleep(1)

    def run_changefeed(self):
        for change in self.connection.run(r.table(self.table).changes()):
            # RethinkDB encodes the change kind in old_val/new_val:
            # insert -> old_val is None; delete -> new_val is None.
            old_val = change['old_val']
            new_val = change['new_val']
            if old_val is None:
                if self.operation & ChangeFeed.INSERT:
                    self.outqueue.put(new_val)
            elif new_val is None:
                if self.operation & ChangeFeed.DELETE:
                    self.outqueue.put(old_val)
            elif self.operation & ChangeFeed.UPDATE:
                self.outqueue.put(new_val)
@register_changefeed(RethinkDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None):
    """Return a RethinkDB changefeed.

    Args:
        connection (:class:`~bigchaindb.backend.rethinkdb.connection.RethinkDBConnection`): # noqa
            A connection to RethinkDB.
        table (str): name of the table to listen to for changes.
        operation (int): ChangeFeed.INSERT, ChangeFeed.DELETE, and/or
            ChangeFeed.UPDATE, combined with the bitwise ``|`` operator.
        prefeed (iterable, optional): data to publish before the changefeed.

    Returns:
        An instance of
        :class:`~bigchaindb.backend.rethinkdb.RethinkDBChangeFeed`.
    """
    return RethinkDBChangeFeed(table, operation, prefeed=prefeed,
                               connection=connection)

View File

@ -9,8 +9,10 @@ import logging
from multipipes import Pipeline, Node, Pipe
import bigchaindb
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Transaction
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
@ -115,7 +117,8 @@ class BlockPipeline:
Returns:
:class:`~bigchaindb.models.Block`: The Block.
"""
logger.info('Write new block %s with %s transactions', block.id, len(block.transactions))
logger.info('Write new block %s with %s transactions',
block.id, len(block.transactions))
self.bigchain.write_block(block)
return block
@ -133,12 +136,6 @@ class BlockPipeline:
return block
def get_changefeed():
    """Create and return the changefeed for the backlog."""
    # Listens for both inserts and updates on the backlog table.
    return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE)
def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
@ -157,6 +154,12 @@ def create_pipeline():
return pipeline
def get_changefeed():
    """Create and return the changefeed for the backlog."""
    connection = backend.connect(**bigchaindb.config['database'])
    return backend.get_changefeed(connection, 'backlog',
                                  ChangeFeed.INSERT | ChangeFeed.UPDATE)
def start():
"""Create, start, and return the block pipeline."""
pipeline = create_pipeline()

View File

@ -8,7 +8,9 @@ import logging
from multipipes import Pipeline, Node
from bigchaindb.pipelines.utils import ChangeFeed
import bigchaindb
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Block
from bigchaindb import Bigchain
@ -50,10 +52,6 @@ class Election:
return invalid_block
def get_changefeed():
    """Create and return the changefeed for the votes table."""
    return ChangeFeed(table='votes', operation=ChangeFeed.INSERT)
def create_pipeline():
election = Election()
@ -65,6 +63,11 @@ def create_pipeline():
return election_pipeline
def get_changefeed():
    """Create and return the changefeed for the votes table."""
    connection = backend.connect(**bigchaindb.config['database'])
    return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT)
def start():
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())

View File

@ -1,75 +0,0 @@
"""Utility classes and functions to work with the pipelines."""
import time
import rethinkdb as r
import logging
from multipipes import Node
from bigchaindb import Bigchain
logger = logging.getLogger(__name__)
class ChangeFeed(Node):
    """This class wraps a RethinkDB changefeed adding a ``prefeed``.
    It extends :class:`multipipes.Node` to make it pluggable in other
    Pipelines instances, and makes usage of ``self.outqueue`` to output
    the data.
    A changefeed is a real time feed on inserts, updates, and deletes, and
    is volatile. This class is a helper to create changefeeds. Moreover,
    it provides a way to specify a ``prefeed`` of iterable data to output
    before the actual changefeed.
    """
    INSERT = 1
    DELETE = 2
    UPDATE = 4
    def __init__(self, table, operation, prefeed=None, bigchain=None):
        """Create a new RethinkDB ChangeFeed.
        Args:
            table (str): name of the table to listen to for changes.
            operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
                ChangeFeed.UPDATE. Combining multiple operations is possible
                with the bitwise ``|`` operator
                (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
            prefeed (iterable): whatever set of data you want to be published
                first.
            bigchain (``Bigchain``): the bigchain instance to use (can be None).
        """
        super().__init__(name='changefeed')
        self.prefeed = prefeed if prefeed else []
        self.table = table
        self.operation = operation
        self.bigchain = bigchain or Bigchain()
    def run_forever(self):
        """Emit the prefeed, then run the changefeed, retrying on errors."""
        for element in self.prefeed:
            self.outqueue.put(element)
        while True:
            try:
                self.run_changefeed()
                break
            except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
                # Driver/cluster failure: log and retry after a short back-off.
                logger.exception(exc)
                time.sleep(1)
    def run_changefeed(self):
        """Stream changes from RethinkDB, filtered by ``self.operation``."""
        for change in self.bigchain.connection.run(r.table(self.table).changes()):
            # RethinkDB encodes the change kind in old_val/new_val:
            # insert -> old_val is None; delete -> new_val is None.
            is_insert = change['old_val'] is None
            is_delete = change['new_val'] is None
            is_update = not is_insert and not is_delete
            if is_insert and (self.operation & ChangeFeed.INSERT):
                self.outqueue.put(change['new_val'])
            elif is_delete and (self.operation & ChangeFeed.DELETE):
                self.outqueue.put(change['old_val'])
            elif is_update and (self.operation & ChangeFeed.UPDATE):
                self.outqueue.put(change['new_val'])

View File

@ -10,10 +10,12 @@ from collections import Counter
from multipipes import Pipeline, Node
from bigchaindb.common import exceptions
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.consensus import BaseConsensusRules
from bigchaindb.models import Transaction, Block
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
class Vote:
@ -142,12 +144,6 @@ def initial():
return rs
def get_changefeed():
    """Create and return the changefeed for the bigchain table."""
    # ``initial()`` is published first as a prefeed, before live inserts.
    return ChangeFeed('bigchain', operation=ChangeFeed.INSERT, prefeed=initial())
def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
@ -165,6 +161,12 @@ def create_pipeline():
return vote_pipeline
def get_changefeed():
    """Create and return the changefeed for the bigchain table."""
    connection = backend.connect(**bigchaindb.config['database'])
    return backend.get_changefeed(connection, 'bigchain', ChangeFeed.INSERT,
                                  prefeed=initial())
def start():
"""Create, start, and return the block pipeline."""

View File

@ -24,9 +24,3 @@ Stale Transaction Monitoring
============================
.. automodule:: bigchaindb.pipelines.stale
Utilities
=========
.. automodule:: bigchaindb.pipelines.utils

View File

@ -0,0 +1,96 @@
import pytest
from unittest.mock import Mock
from multipipes import Pipe
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb.backend import connect
@pytest.fixture
def mock_changefeed_data():
    """Canned change events: one insert, one delete, one update."""
    return [
        {'new_val': 'seems like we have an insert here',
         'old_val': None},
        {'new_val': None,
         'old_val': 'seems like we have a delete here'},
        {'new_val': 'seems like we have an update here',
         'old_val': 'seems like we have an update here'},
    ]
@pytest.fixture
def mock_changefeed_connection(mock_changefeed_data):
    """A backend connection whose ``run`` returns the canned events."""
    conn = connect(**bigchaindb.config['database'])
    conn.run = Mock(return_value=mock_changefeed_data)
    return conn
def test_changefeed_insert(mock_changefeed_connection):
    """An INSERT-only changefeed emits only the inserted value."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection,
                          'backlog', ChangeFeed.INSERT)
    feed.outqueue = out
    feed.run_forever()
    assert out.get() == 'seems like we have an insert here'
    assert out.qsize() == 0
def test_changefeed_delete(mock_changefeed_connection):
    """A DELETE-only changefeed emits only the deleted value."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection,
                          'backlog', ChangeFeed.DELETE)
    feed.outqueue = out
    feed.run_forever()
    assert out.get() == 'seems like we have a delete here'
    assert out.qsize() == 0
def test_changefeed_update(mock_changefeed_connection):
    """An UPDATE-only changefeed emits only the updated value."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection,
                          'backlog', ChangeFeed.UPDATE)
    feed.outqueue = out
    feed.run_forever()
    assert out.get() == 'seems like we have an update here'
    assert out.qsize() == 0
def test_changefeed_multiple_operations(mock_changefeed_connection):
    """Bitwise-combined operations emit events for each matching kind."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection, 'backlog',
                          ChangeFeed.INSERT | ChangeFeed.UPDATE)
    feed.outqueue = out
    feed.run_forever()
    assert out.get() == 'seems like we have an insert here'
    assert out.get() == 'seems like we have an update here'
    assert out.qsize() == 0
def test_changefeed_prefeed(mock_changefeed_connection):
    """Prefeed items are emitted before the changefeed results."""
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection, 'backlog',
                          ChangeFeed.INSERT, prefeed=[1, 2, 3])
    feed.outqueue = out
    feed.run_forever()
    # 3 prefeed items + 1 insert from the mocked changefeed data.
    assert out.qsize() == 4

View File

@ -43,3 +43,24 @@ def test_query(query_func_name, args_qty):
query_func = getattr(query, query_func_name)
with raises(NotImplementedError):
query_func(None, *range(args_qty))
@mark.parametrize('changefeed_func_name,args_qty', (
    ('get_changefeed', 2),
))
def test_changefeed(changefeed_func_name, args_qty):
    """Unregistered backend changefeed functions raise NotImplementedError."""
    from bigchaindb.backend import changefeed

    func = getattr(changefeed, changefeed_func_name)
    with raises(NotImplementedError):
        func(None, *range(args_qty))
@mark.parametrize('changefeed_class_func_name,args_qty', (
    ('run_forever', 0),
    ('run_changefeed', 0),
))
def test_changefeed_class(changefeed_class_func_name, args_qty):
    """Abstract ChangeFeed methods raise NotImplementedError."""
    from bigchaindb.backend.changefeed import ChangeFeed

    method = getattr(ChangeFeed, changefeed_class_func_name)
    with raises(NotImplementedError):
        method(None, *range(args_qty))

View File

@ -95,7 +95,8 @@ def test_duplicate_transaction(b, user_pk):
# verify tx is in the backlog
assert b.get_transaction(txs[0].id) is not None
# try to validate a transaction that's already in the chain; should not work
# try to validate a transaction that's already in the chain; should not
# work
assert block_maker.validate_tx(txs[0].to_dict()) is None
# duplicate tx should be removed from backlog

View File

@ -166,6 +166,7 @@ def test_full_pipeline(b, user_pk):
pipeline.setup(indata=election.get_changefeed(), outdata=outpipe)
pipeline.start()
time.sleep(1)
# vote one block valid, one invalid
vote_valid = b.vote(valid_block.id, 'b' * 64, True)
vote_invalid = b.vote(invalid_block.id, 'c' * 64, False)

View File

@ -1,80 +0,0 @@
import pytest
from unittest.mock import Mock
from multipipes import Pipe
from bigchaindb import Bigchain
from bigchaindb.backend.connection import Connection
from bigchaindb.pipelines.utils import ChangeFeed
@pytest.fixture
def mock_changefeed_data():
    """Canned change events: one insert, one delete, one update."""
    return [
        {
            'new_val': 'seems like we have an insert here',
            'old_val': None,
        }, {
            'new_val': None,
            'old_val': 'seems like we have a delete here',
        }, {
            'new_val': 'seems like we have an update here',
            'old_val': 'seems like we have an update here',
        }
    ]
@pytest.fixture
def mock_changefeed_bigchain(mock_changefeed_data):
    """A Bigchain whose connection replays the canned change events."""
    connection = Connection(host=None, port=None, dbname=None)
    connection.run = Mock(return_value=mock_changefeed_data)
    return Bigchain(connection=connection)
def test_changefeed_insert(mock_changefeed_bigchain):
    """An INSERT-only changefeed emits only the inserted value."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.get() == 'seems like we have an insert here'
    assert outpipe.qsize() == 0
def test_changefeed_delete(mock_changefeed_bigchain):
    """A DELETE-only changefeed emits only the deleted value."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog', ChangeFeed.DELETE, bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.get() == 'seems like we have a delete here'
    assert outpipe.qsize() == 0
def test_changefeed_update(mock_changefeed_bigchain):
    """An UPDATE-only changefeed emits only the updated value."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE, bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.get() == 'seems like we have an update here'
    assert outpipe.qsize() == 0
def test_changefeed_multiple_operations(mock_changefeed_bigchain):
    """Bitwise-combined operations emit events for each matching kind."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog',
                            ChangeFeed.INSERT | ChangeFeed.UPDATE,
                            bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    assert outpipe.get() == 'seems like we have an insert here'
    assert outpipe.get() == 'seems like we have an update here'
    assert outpipe.qsize() == 0
def test_changefeed_prefeed(mock_changefeed_bigchain):
    """Prefeed items are emitted before the changefeed results."""
    outpipe = Pipe()
    changefeed = ChangeFeed('backlog',
                            ChangeFeed.INSERT,
                            prefeed=[1, 2, 3],
                            bigchain=mock_changefeed_bigchain)
    changefeed.outqueue = outpipe
    changefeed.run_forever()
    # 3 prefeed items + 1 insert from the mocked changefeed data.
    assert outpipe.qsize() == 4

View File

@ -48,8 +48,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):
import time
import multiprocessing as mp
from bigchaindb import Bigchain
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.rethinkdb.changefeed import RethinkDBChangeFeed
class MockConnection:
tries = 0
@ -75,10 +75,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):
else:
time.sleep(10)
bigchain = Bigchain()
bigchain.connection = MockConnection()
changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT,
bigchain=bigchain)
changefeed = RethinkDBChangeFeed('cat_facts', ChangeFeed.INSERT,
connection=MockConnection())
changefeed.outqueue = mp.Queue()
t_changefeed = Thread(target=changefeed.run_forever, daemon=True)