Initial implementation of the changefeed abstraction

This commit is contained in:
Rodolphe Marques 2016-12-08 18:16:16 +01:00
parent 9f57d57f24
commit 341f43267a
4 changed files with 163 additions and 9 deletions

View File

@ -10,3 +10,4 @@ configuration or the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable.
from bigchaindb.backend import changefeed, schema, query # noqa
from bigchaindb.backend.connection import connect # noqa
from bigchaindb.backend.changefeed import get_changefeed # noqa

View File

@ -1 +1,88 @@
"""Changefeed interfaces for backends."""
from functools import singledispatch
from multipipes import Node
import bigchaindb
class ChangeFeed(Node):
"""Create a new changefeed.
It extends :class:`multipipes.Node` to make it pluggable in other
Pipelines instances, and makes usage of ``self.outqueue`` to output
the data.
A changefeed is a real time feed on inserts, updates, and deletes, and
is volatile. This class is a helper to create changefeeds. Moreover,
it provides a way to specify a ``prefeed`` of iterable data to output
before the actual changefeed.
"""
INSERT = 1
DELETE = 2
UPDATE = 4
def __init__(self, table, operation, *, prefeed=None, connection=None):
"""Create a new ChangeFeed.
Args:
table (str): name of the table to listen to for changes.
operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE. Combining multiple operation is possible
with the bitwise ``|`` operator
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
prefeed (iterable): whatever set of data you want to be published
first.
connection (:class:`~bigchaindb.backend.connection.Connection`):
A connection to the database (can be None).
"""
super().__init__(name='changefeed')
self.prefeed = prefeed if prefeed else []
self.table = table
self.operation = operation
if connection:
self.connection = connection
else:
self.connection = bigchaindb.backend.connect(
**bigchaindb.config['database'])
def run_forever(self):
"""Main loop of the ``multipipes.Node``
This method is responsible for first feeding the prefeed to the
outqueue and after that start the changefeed and recover from any
errors that may occur in the backend.
"""
raise NotImplementedError
def run_changefeed(self):
"""Backend specific method to run the changefeed.
The changefeed is is usually a backend cursor that is not closed when
all the results are exausted. Instead it remains open waiting for new
results.
This method should also filter each result based on the ``operation``
and put all matching results on the outqueue of ``multipipes.Node``.
"""
raise NotImplementedError
@singledispatch
def get_changefeed(connection, table, operation, *, prefeed=None):
"""Return a ChangeFeed.
Args:
connection (:class:`~bigchaindb.backend.connection.Connection`):
A connection to the database.
table (str): name of the table to listen to for changes.
operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE. Combining multiple operation is possible
with the bitwise ``|`` operator
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
prefeed (iterable): whatever set of data you want to be published
first.
"""
raise NotImplementedError

View File

@ -0,0 +1,66 @@
import time
import logging
import rethinkdb as r
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
logger = logging.getLogger(__name__)
register_changefeed = module_dispatch_registrar(backend.changefeed)
class RethinkDBChangeFeed(ChangeFeed):
"""This class wraps a RethinkDB changefeed."""
def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)
while True:
try:
self.run_changefeed()
break
except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
logger.exception(exc)
time.sleep(1)
def run_changefeed(self):
for change in self.bigchain.connection.run(r.table(self.table)
.changes()):
is_insert = change['old_val'] is None
is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete
if is_insert and (self.operation & ChangeFeed.INSERT):
self.outqueue.put(change['new_val'])
elif is_delete and (self.operation & ChangeFeed.DELETE):
self.outqueue.put(change['old_val'])
elif is_update and (self.operation & ChangeFeed.UPDATE):
self.outqueue.put(change['new_val'])
@register_changefeed(RethinkDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None):
"""Return a RethinkDB changefeed.
Args:
connection (:class:`~bigchaindb.backend.rethinkdb.connection.RethinkDBConnection`): # noqa
A connection to the database.
table (str): name of the table to listen to for changes.
operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE. Combining multiple operation is possible
with the bitwise ``|`` operator
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
prefeed (iterable): whatever set of data you want to be published
first.
Returns:
An instance of
:class:`~bigchaindb.backend.rethinkdb.RethinkDBChangeFeed`.
"""
return RethinkDBChangeFeed(table, operation, prefeed=prefeed,
connection=connection)

View File

@ -10,8 +10,11 @@ import logging
import rethinkdb as r
from multipipes import Pipeline, Node, Pipe
import bigchaindb
from bigchaindb.backend import connect
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend import get_changefeed
from bigchaindb.models import Transaction
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
@ -147,13 +150,6 @@ def initial():
.order_by(index=r.asc('assignee__transaction_timestamp')))
def get_changefeed():
"""Create and return the changefeed for the backlog."""
return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE,
prefeed=initial())
def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
@ -174,7 +170,11 @@ def create_pipeline():
def start():
"""Create, start, and return the block pipeline."""
connection = connect(**bigchaindb.config['database'])
changefeed = get_changefeed(connection, 'backlog',
ChangeFeed.INSER | ChangeFeed.UPDATE,
preefed=initial())
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())
pipeline.setup(indata=changefeed)
pipeline.start()
return pipeline