class ChangeFeed(Node):
    """Create a new changefeed.

    It extends :class:`multipipes.Node` to make it pluggable in other
    Pipelines instances, and makes usage of ``self.outqueue`` to output
    the data.

    A changefeed is a real time feed on inserts, updates, and deletes, and
    is volatile. This class is a helper to create changefeeds. Moreover,
    it provides a way to specify a ``prefeed`` of iterable data to output
    before the actual changefeed.
    """

    # Bit flags for the operations a changefeed can listen to; they can be
    # combined with the bitwise ``|`` operator.
    INSERT = 1
    DELETE = 2
    UPDATE = 4

    def __init__(self, table, operation, *, prefeed=None, connection=None):
        """Create a new ChangeFeed.

        Args:
            table (str): name of the table to listen to for changes.
            operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
                ChangeFeed.UPDATE. Combining multiple operations is possible
                with the bitwise ``|`` operator
                (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
            prefeed (:class:`~collections.abc.Iterable`, optional): whatever
                set of data you want to be published first.
            connection (:class:`~bigchaindb.backend.connection.Connection`, optional):  # noqa
                A connection to the database. If no connection is provided a
                default connection will be created.
        """
        super().__init__(name='changefeed')
        self.prefeed = prefeed if prefeed else []
        self.table = table
        self.operation = operation
        if connection:
            self.connection = connection
        else:
            # No connection given: build one from the node's configuration.
            self.connection = bigchaindb.backend.connect(
                **bigchaindb.config['database'])

    def run_forever(self):
        """Main loop of the ``multipipes.Node``.

        This method is responsible for first feeding the prefeed to the
        outqueue and after that starting the changefeed and recovering from
        any errors that may occur in the backend.
        """
        raise NotImplementedError

    def run_changefeed(self):
        """Backend specific method to run the changefeed.

        The changefeed is usually a backend cursor that is not closed when
        all the results are exhausted. Instead it remains open waiting for
        new results.

        This method should also filter each result based on the ``operation``
        and put all matching results on the outqueue of ``multipipes.Node``.
        """
        raise NotImplementedError


@singledispatch
def get_changefeed(connection, table, operation, *, prefeed=None):
    """Return a ChangeFeed.

    Args:
        connection (:class:`~bigchaindb.backend.connection.Connection`):
            A connection to the database.
        table (str): name of the table to listen to for changes.
        operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
            ChangeFeed.UPDATE. Combining multiple operations is possible
            with the bitwise ``|`` operator
            (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
        prefeed (iterable): whatever set of data you want to be published
            first.
    """
    raise NotImplementedError
class RethinkDBChangeFeed(ChangeFeed):
    """This class wraps a RethinkDB changefeed."""

    def run_forever(self):
        # Publish the optional prefeed before tapping the live changefeed.
        for item in self.prefeed:
            self.outqueue.put(item)

        while True:
            try:
                self.run_changefeed()
                break
            except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
                # The connection to RethinkDB dropped: log, wait a bit,
                # and try to re-attach the changefeed.
                logger.exception(exc)
                time.sleep(1)

    def run_changefeed(self):
        cursor = self.connection.run(r.table(self.table).changes())
        for change in cursor:
            old, new = change['old_val'], change['new_val']
            if old is None:
                # Insert: no previous value.
                if self.operation & ChangeFeed.INSERT:
                    self.outqueue.put(new)
            elif new is None:
                # Delete: no new value.
                if self.operation & ChangeFeed.DELETE:
                    self.outqueue.put(old)
            elif self.operation & ChangeFeed.UPDATE:
                # Update: both old and new values present.
                self.outqueue.put(new)


@register_changefeed(RethinkDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None):
    """Return a RethinkDB changefeed.

    Returns:
        An instance of
        :class:`~bigchaindb.backend.rethinkdb.RethinkDBChangeFeed`.
    """
    return RethinkDBChangeFeed(table, operation, prefeed=prefeed,
                               connection=connection)
def get_changefeed():
    """Create and return the changefeed for the votes table."""
    conn = backend.connect(**bigchaindb.config['database'])
    return backend.get_changefeed(conn, 'votes', ChangeFeed.INSERT)
``ChangeFeed.INSERT | ChangeFeed.UPDATE``) - prefeed (iterable): whatever set of data you want to be published - first. - bigchain (``Bigchain``): the bigchain instance to use (can be None). - """ - - super().__init__(name='changefeed') - self.prefeed = prefeed if prefeed else [] - self.table = table - self.operation = operation - self.bigchain = bigchain or Bigchain() - - def run_forever(self): - for element in self.prefeed: - self.outqueue.put(element) - - while True: - try: - self.run_changefeed() - break - except (r.ReqlDriverError, r.ReqlOpFailedError) as exc: - logger.exception(exc) - time.sleep(1) - - def run_changefeed(self): - for change in self.bigchain.connection.run(r.table(self.table).changes()): - is_insert = change['old_val'] is None - is_delete = change['new_val'] is None - is_update = not is_insert and not is_delete - - if is_insert and (self.operation & ChangeFeed.INSERT): - self.outqueue.put(change['new_val']) - elif is_delete and (self.operation & ChangeFeed.DELETE): - self.outqueue.put(change['old_val']) - elif is_update and (self.operation & ChangeFeed.UPDATE): - self.outqueue.put(change['new_val']) diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index b89e0786..dd138d41 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -10,10 +10,12 @@ from collections import Counter from multipipes import Pipeline, Node from bigchaindb.common import exceptions +import bigchaindb +from bigchaindb import Bigchain +from bigchaindb import backend +from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Transaction, Block -from bigchaindb.pipelines.utils import ChangeFeed -from bigchaindb import Bigchain class Vote: @@ -142,12 +144,6 @@ def initial(): return rs -def get_changefeed(): - """Create and return the changefeed for the bigchain table.""" - - return ChangeFeed('bigchain', operation=ChangeFeed.INSERT, prefeed=initial()) - - 
def get_changefeed():
    """Create and return the changefeed for the bigchain table."""
    conn = backend.connect(**bigchaindb.config['database'])
    return backend.get_changefeed(
        conn, 'bigchain', ChangeFeed.INSERT, prefeed=initial())
def test_changefeed_delete(mock_changefeed_connection):
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection,
                          'backlog', ChangeFeed.DELETE)
    feed.outqueue = out
    feed.run_forever()
    # Only the delete event from the mocked feed must come through.
    assert out.get() == 'seems like we have a delete here'
    assert out.qsize() == 0


def test_changefeed_update(mock_changefeed_connection):
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection,
                          'backlog', ChangeFeed.UPDATE)
    feed.outqueue = out
    feed.run_forever()
    # Only the update event from the mocked feed must come through.
    assert out.get() == 'seems like we have an update here'
    assert out.qsize() == 0


def test_changefeed_multiple_operations(mock_changefeed_connection):
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection, 'backlog',
                          ChangeFeed.INSERT | ChangeFeed.UPDATE)
    feed.outqueue = out
    feed.run_forever()
    # Insert and update pass the OR-ed filter; the delete is dropped.
    assert out.get() == 'seems like we have an insert here'
    assert out.get() == 'seems like we have an update here'
    assert out.qsize() == 0


def test_changefeed_prefeed(mock_changefeed_connection):
    from bigchaindb.backend import get_changefeed
    from bigchaindb.backend.changefeed import ChangeFeed

    out = Pipe()
    feed = get_changefeed(mock_changefeed_connection, 'backlog',
                          ChangeFeed.INSERT, prefeed=[1, 2, 3])
    feed.outqueue = out
    feed.run_forever()
    # Three prefeed items plus the single matching insert event.
    assert out.qsize() == 4
@mark.parametrize('changefeed_func_name,args_qty', (
    ('get_changefeed', 2),
))
def test_changefeed(changefeed_func_name, args_qty):
    # The generic (singledispatch) entry point must raise until a backend
    # registers a concrete implementation.
    from bigchaindb.backend import changefeed
    fn = getattr(changefeed, changefeed_func_name)
    with raises(NotImplementedError):
        fn(None, *range(args_qty))


@mark.parametrize('changefeed_class_func_name,args_qty', (
    ('run_forever', 0),
    ('run_changefeed', 0),
))
def test_changefeed_class(changefeed_class_func_name, args_qty):
    # The abstract ChangeFeed methods must raise when called unimplemented.
    from bigchaindb.backend.changefeed import ChangeFeed
    method = getattr(ChangeFeed, changefeed_class_func_name)
    with raises(NotImplementedError):
        method(None, *range(args_qty))
pipeline.start() time.sleep(1) + # vote one block valid, one invalid vote_valid = b.vote(valid_block.id, 'b' * 64, True) vote_invalid = b.vote(invalid_block.id, 'c' * 64, False) diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py deleted file mode 100644 index ebe0db75..00000000 --- a/tests/pipelines/test_utils.py +++ /dev/null @@ -1,80 +0,0 @@ -import pytest -from unittest.mock import Mock - -from multipipes import Pipe -from bigchaindb import Bigchain -from bigchaindb.backend.connection import Connection -from bigchaindb.pipelines.utils import ChangeFeed - - -@pytest.fixture -def mock_changefeed_data(): - return [ - { - 'new_val': 'seems like we have an insert here', - 'old_val': None, - }, { - 'new_val': None, - 'old_val': 'seems like we have a delete here', - }, { - 'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here', - } - ] - - -@pytest.fixture -def mock_changefeed_bigchain(mock_changefeed_data): - connection = Connection(host=None, port=None, dbname=None) - connection.run = Mock(return_value=mock_changefeed_data) - return Bigchain(connection=connection) - - -def test_changefeed_insert(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.qsize() == 0 - - -def test_changefeed_delete(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.DELETE, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have a delete here' - assert outpipe.qsize() == 0 - - -def test_changefeed_update(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - 
changefeed.run_forever() - assert outpipe.get() == 'seems like we have an update here' - assert outpipe.qsize() == 0 - - -def test_changefeed_multiple_operations(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', - ChangeFeed.INSERT | ChangeFeed.UPDATE, - bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.get() == 'seems like we have an update here' - assert outpipe.qsize() == 0 - - -def test_changefeed_prefeed(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', - ChangeFeed.INSERT, - prefeed=[1, 2, 3], - bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.qsize() == 4 diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index 1aeea11e..b740983c 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -48,8 +48,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): import time import multiprocessing as mp - from bigchaindb import Bigchain - from bigchaindb.pipelines.utils import ChangeFeed + from bigchaindb.backend.changefeed import ChangeFeed + from bigchaindb.backend.rethinkdb.changefeed import RethinkDBChangeFeed class MockConnection: tries = 0 @@ -75,10 +75,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): else: time.sleep(10) - bigchain = Bigchain() - bigchain.connection = MockConnection() - changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT, - bigchain=bigchain) + changefeed = RethinkDBChangeFeed('cat_facts', ChangeFeed.INSERT, + connection=MockConnection()) changefeed.outqueue = mp.Queue() t_changefeed = Thread(target=changefeed.run_forever, daemon=True)