From 341f43267ac9fe9d24dee1e3d9337f247f28dfc1 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Thu, 8 Dec 2016 18:16:16 +0100 Subject: [PATCH] Initial implementation of the changefeed abstraction --- bigchaindb/backend/__init__.py | 1 + bigchaindb/backend/changefeed.py | 87 ++++++++++++++++++++++ bigchaindb/backend/rethinkdb/changefeed.py | 66 ++++++++++++++++ bigchaindb/pipelines/block.py | 18 ++--- 4 files changed, 163 insertions(+), 9 deletions(-) diff --git a/bigchaindb/backend/__init__.py b/bigchaindb/backend/__init__.py index 7e37b5d2..c1deaa92 100644 --- a/bigchaindb/backend/__init__.py +++ b/bigchaindb/backend/__init__.py @@ -10,3 +10,4 @@ configuration or the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable. from bigchaindb.backend import changefeed, schema, query # noqa from bigchaindb.backend.connection import connect # noqa +from bigchaindb.backend.changefeed import get_changefeed # noqa diff --git a/bigchaindb/backend/changefeed.py b/bigchaindb/backend/changefeed.py index ccba02d3..cd8d721a 100644 --- a/bigchaindb/backend/changefeed.py +++ b/bigchaindb/backend/changefeed.py @@ -1 +1,88 @@ """Changefeed interfaces for backends.""" + +from functools import singledispatch +from multipipes import Node + +import bigchaindb + + +class ChangeFeed(Node): + """Create a new changefeed. + + It extends :class:`multipipes.Node` to make it pluggable in other + Pipelines instances, and makes usage of ``self.outqueue`` to output + the data. + + A changefeed is a real time feed on inserts, updates, and deletes, and + is volatile. This class is a helper to create changefeeds. Moreover, + it provides a way to specify a ``prefeed`` of iterable data to output + before the actual changefeed. + """ + + INSERT = 1 + DELETE = 2 + UPDATE = 4 + + def __init__(self, table, operation, *, prefeed=None, connection=None): + """Create a new ChangeFeed. + + Args: + table (str): name of the table to listen to for changes. + operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or + ChangeFeed.UPDATE. Combining multiple operation is possible + with the bitwise ``|`` operator + (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) + prefeed (iterable): whatever set of data you want to be published + first. + connection (:class:`~bigchaindb.backend.connection.Connection`): + A connection to the database (can be None). + """ + + super().__init__(name='changefeed') + self.prefeed = prefeed if prefeed else [] + self.table = table + self.operation = operation + if connection: + self.connection = connection + else: + self.connection = bigchaindb.backend.connect( + **bigchaindb.config['database']) + + def run_forever(self): + """Main loop of the ``multipipes.Node`` + + This method is responsible for first feeding the prefeed to the + outqueue and after that start the changefeed and recover from any + errors that may occur in the backend. + """ + raise NotImplementedError + + def run_changefeed(self): + """Backend specific method to run the changefeed. + + The changefeed is is usually a backend cursor that is not closed when + all the results are exausted. Instead it remains open waiting for new + results. + + This method should also filter each result based on the ``operation`` + and put all matching results on the outqueue of ``multipipes.Node``. + """ + raise NotImplementedError + + +@singledispatch +def get_changefeed(connection, table, operation, *, prefeed=None): + """Return a ChangeFeed. + + Args: + connection (:class:`~bigchaindb.backend.connection.Connection`): + A connection to the database. + table (str): name of the table to listen to for changes. + operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or + ChangeFeed.UPDATE. Combining multiple operation is possible + with the bitwise ``|`` operator + (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) + prefeed (iterable): whatever set of data you want to be published + first. + """ + raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/changefeed.py b/bigchaindb/backend/rethinkdb/changefeed.py index e69de29b..8a5e4973 100644 --- a/bigchaindb/backend/rethinkdb/changefeed.py +++ b/bigchaindb/backend/rethinkdb/changefeed.py @@ -0,0 +1,66 @@ +import time +import logging +import rethinkdb as r + +from bigchaindb import backend +from bigchaindb.backend.changefeed import ChangeFeed +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection + + +logger = logging.getLogger(__name__) +register_changefeed = module_dispatch_registrar(backend.changefeed) + + +class RethinkDBChangeFeed(ChangeFeed): + """This class wraps a RethinkDB changefeed.""" + + def run_forever(self): + for element in self.prefeed: + self.outqueue.put(element) + + while True: + try: + self.run_changefeed() + break + except (r.ReqlDriverError, r.ReqlOpFailedError) as exc: + logger.exception(exc) + time.sleep(1) + + def run_changefeed(self): + for change in self.bigchain.connection.run(r.table(self.table) + .changes()): + is_insert = change['old_val'] is None + is_delete = change['new_val'] is None + is_update = not is_insert and not is_delete + + if is_insert and (self.operation & ChangeFeed.INSERT): + self.outqueue.put(change['new_val']) + elif is_delete and (self.operation & ChangeFeed.DELETE): + self.outqueue.put(change['old_val']) + elif is_update and (self.operation & ChangeFeed.UPDATE): + self.outqueue.put(change['new_val']) + + +@register_changefeed(RethinkDBConnection) +def get_changefeed(connection, table, operation, *, prefeed=None): + """Return a RethinkDB changefeed. + + Args: + connection (:class:`~bigchaindb.backend.rethinkdb.connection.RethinkDBConnection`): # noqa + A connection to the database. + table (str): name of the table to listen to for changes. + operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or + ChangeFeed.UPDATE. Combining multiple operation is possible + with the bitwise ``|`` operator + (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) + prefeed (iterable): whatever set of data you want to be published + first. + + Returns: + An instance of + :class:`~bigchaindb.backend.rethinkdb.RethinkDBChangeFeed`. + """ + + return RethinkDBChangeFeed(table, operation, prefeed=prefeed, + connection=connection) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 0394aa23..dc0e1163 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -10,8 +10,11 @@ import logging import rethinkdb as r from multipipes import Pipeline, Node, Pipe +import bigchaindb +from bigchaindb.backend import connect +from bigchaindb.backend.changefeed import ChangeFeed +from bigchaindb.backend import get_changefeed from bigchaindb.models import Transaction -from bigchaindb.pipelines.utils import ChangeFeed from bigchaindb import Bigchain @@ -147,13 +150,6 @@ def initial(): .order_by(index=r.asc('assignee__transaction_timestamp'))) -def get_changefeed(): - """Create and return the changefeed for the backlog.""" - - return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE, - prefeed=initial()) - - def create_pipeline(): """Create and return the pipeline of operations to be distributed on different processes.""" @@ -174,7 +170,11 @@ def create_pipeline(): def start(): """Create, start, and return the block pipeline.""" + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'backlog', + ChangeFeed.INSER | ChangeFeed.UPDATE, + preefed=initial()) pipeline = create_pipeline() - pipeline.setup(indata=get_changefeed()) + pipeline.setup(indata=changefeed) pipeline.start() return pipeline