mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Merge pull request #1193 from bigchaindb/stepping-pipeline
Add stepping pipeline (test tool) and fix issue #1191
This commit is contained in:
commit
631a435a63
@ -85,11 +85,13 @@ class MongoDBChangeFeed(ChangeFeed):
|
|||||||
# document itself. So here we first read the document
|
# document itself. So here we first read the document
|
||||||
# and then return it.
|
# and then return it.
|
||||||
doc = self.connection.conn[dbname][table].find_one(
|
doc = self.connection.conn[dbname][table].find_one(
|
||||||
{'_id': record['o2']},
|
{'_id': record['o2']['_id']},
|
||||||
{'_id': False}
|
{'_id': False}
|
||||||
)
|
)
|
||||||
self.outqueue.put(doc)
|
self.outqueue.put(doc)
|
||||||
|
|
||||||
|
logger.debug('Record in changefeed: %s:%s', table, record['op'])
|
||||||
|
|
||||||
|
|
||||||
@register_changefeed(MongoDBConnection)
|
@register_changefeed(MongoDBConnection)
|
||||||
def get_changefeed(connection, table, operation, *, prefeed=None):
|
def get_changefeed(connection, table, operation, *, prefeed=None):
|
||||||
|
@ -55,7 +55,9 @@ class Bigchain(object):
|
|||||||
self.me = public_key or bigchaindb.config['keypair']['public']
|
self.me = public_key or bigchaindb.config['keypair']['public']
|
||||||
self.me_private = private_key or bigchaindb.config['keypair']['private']
|
self.me_private = private_key or bigchaindb.config['keypair']['private']
|
||||||
self.nodes_except_me = keyring or bigchaindb.config['keyring']
|
self.nodes_except_me = keyring or bigchaindb.config['keyring']
|
||||||
self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay']
|
if backlog_reassign_delay is None:
|
||||||
|
backlog_reassign_delay = bigchaindb.config['backlog_reassign_delay']
|
||||||
|
self.backlog_reassign_delay = backlog_reassign_delay
|
||||||
self.consensus = BaseConsensusRules
|
self.consensus = BaseConsensusRules
|
||||||
self.connection = connection if connection else backend.connect(**bigchaindb.config['database'])
|
self.connection = connection if connection else backend.connect(**bigchaindb.config['database'])
|
||||||
if not self.me or not self.me_private:
|
if not self.me or not self.me_private:
|
||||||
|
@ -19,7 +19,7 @@ def mock_changefeed_data():
|
|||||||
{
|
{
|
||||||
'op': 'u',
|
'op': 'u',
|
||||||
'o': {'msg': 'seems like we have an update here'},
|
'o': {'msg': 'seems like we have an update here'},
|
||||||
'o2': 'some-id'
|
'o2': {'_id': 'some-id'}
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
10
tests/pipelines/conftest.py
Normal file
10
tests/pipelines/conftest.py
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from .stepping import create_stepper
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def steps():
    """
    Provide a pipeline stepper to a test.

    The stepper is built with `create_stepper()` and handed to the test from
    inside its `start()` context, so whatever `start()` sets up stays active
    for the duration of the test and is cleaned up afterwards.
    """
    pipeline_stepper = create_stepper()
    with pipeline_stepper.start():
        yield pipeline_stepper
|
182
tests/pipelines/stepping.py
Normal file
182
tests/pipelines/stepping.py
Normal file
@ -0,0 +1,182 @@
|
|||||||
|
"""
|
||||||
|
Pipeline stepping is a way to advance the asynchronous data pipeline
|
||||||
|
deterministically by exposing each step separately and advancing the states
|
||||||
|
manually.
|
||||||
|
|
||||||
|
The multipipes.Pipeline class implements a pipeline that advances
|
||||||
|
asynchronously and concurrently. This module provides an interface to the
|
||||||
|
BigchainDB pipelines that is static, i.e., does not advance without prompting.
|
||||||
|
|
||||||
|
Rather than having a pipeline that is in an all or nothing running / not running
|
||||||
|
state, one can do the following:
|
||||||
|
|
||||||
|
|
||||||
|
steps = create_stepper()
|
||||||
|
|
||||||
|
with steps.start():
|
||||||
|
tx = my_create_and_write_tx()
|
||||||
|
steps.block_changefeed(timeout=1)
|
||||||
|
steps.block_filter_tx()
|
||||||
|
steps.block_validate_tx()
|
||||||
|
steps.block_create(timeout=True)
|
||||||
|
|
||||||
|
assert steps.counts == {'block_write': 1}
|
||||||
|
|
||||||
|
Pending items are held in the `.queues` attribute, and every task has its own
|
||||||
|
queue (as in multipipes.Pipeline). Queues are just lists though so they can
|
||||||
|
be easily inspected.
|
||||||
|
|
||||||
|
As a shortcut, the `.counts` attribute is provided which returns the number of
|
||||||
|
pending items for each task. This is useful to assert the expected status of
|
||||||
|
the queues after performing some sequence.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import time
|
||||||
|
import types
|
||||||
|
import logging
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import bigchaindb.core
|
||||||
|
from bigchaindb.backend.changefeed import ChangeFeed
|
||||||
|
import bigchaindb.pipelines.block
|
||||||
|
import bigchaindb.pipelines.stale
|
||||||
|
import bigchaindb.pipelines.vote
|
||||||
|
|
||||||
|
|
||||||
|
class MultipipesStepper:
    """
    Manually advanced registry of pipeline tasks.

    Every pipeline node is registered as a task named `<prefix>_<node.name>`.
    Nothing runs until a task is stepped explicitly, either through
    `step(name)` or through the attribute shortcut `stepper.<name>()`.
    The output of a task is appended to the queue of the next task.
    """

    def __init__(self):
        # task name -> list of pending argument tuples for that task
        self.queues = {}
        # task name -> callable that advances that task once
        self.tasks = {}
        # names of tasks that produce data instead of consuming a queue
        self.input_tasks = set()
        # ChangeFeed nodes that `start()` has to start and terminate
        self.processes = []

    def add_input(self, prefix, node, next):
        """
        Add an input task; reads from the outqueue of the Node

        :param prefix: pipeline name used to namespace the task names.
        :param node: first node of the pipeline. A ChangeFeed is read through
            its `outqueue`; any other node is driven by calling `node.target`.
        :param next: node whose queue receives whatever the input produces.
        """
        name = '%s_%s' % (prefix, node.name)
        next_name = '%s_%s' % (prefix, next.name)

        if isinstance(node, ChangeFeed):
            self.processes.append(node)

            def f(*args, **kwargs):
                # Default to a short timeout so stepping a changefeed that
                # has nothing to deliver does not wait indefinitely; an
                # explicit `timeout=` from the caller overrides it.
                _kwargs = {'timeout': 0.1}
                _kwargs.update(kwargs)
                return node.outqueue.get(*args, **_kwargs)
        else:
            f = node.target

        def inner(**kwargs):
            r = f(**kwargs)
            if r is not None:
                self._enqueue(next_name, r)

        self.tasks[name] = functools.wraps(f)(inner)
        self.input_tasks.add(name)

    def add_stage(self, prefix, node, next):
        """
        Add a stage task, popping from own queue and appending to the queue
        of the next node

        :param prefix: pipeline name used to namespace the task names.
        :param node: node whose `target` is called with the popped item.
        :param next: following node, or a falsy value for the last stage, in
            which case the output of `target` is discarded.
        """
        f = node.target
        name = '%s_%s' % (prefix, node.name)
        next_name = '%s_%s' % (prefix, next.name) if next else None

        def inner(*args, **kwargs):
            out = f(*args, **kwargs)
            if out is not None and next:
                self._enqueue(next_name, out)

        self.tasks[name] = functools.wraps(f)(inner)

    def _enqueue(self, name, item):
        """ internal function; add item(s) to queue """
        queue = self.queues.setdefault(name, [])
        # A generator result stands for several outputs; expand it so that
        # each output becomes its own queue entry.
        if isinstance(item, types.GeneratorType):
            produced = list(item)
        else:
            produced = [item]
        for entry in produced:
            # Entries are stored as argument tuples because `step` unpacks
            # them with `*`. The exact type check is deliberate: only plain
            # tuples are taken as ready-made argument lists.
            if type(entry) is not tuple:
                entry = (entry,)
            queue.append(entry)

    def step(self, name, **kwargs):
        """
        Advance pipeline stage.

        Input tasks are simply called with `kwargs`. Stage tasks consume the
        oldest entry of their own queue and raise `Empty` when that queue has
        nothing in it.
        """
        logging.debug('Stepping %s', name)
        task = self.tasks[name]
        if name in self.input_tasks:
            task(**kwargs)
        else:
            queue = self.queues.get(name, [])
            if not queue:
                raise Empty(name)
            task(*queue.pop(0), **kwargs)
        logging.debug('Stepped %s', name)

    @property
    def counts(self):
        """ Get sizes of non empty queues """
        return {name: len(queue)
                for name, queue in self.queues.items()
                if len(queue)}

    def __getattr__(self, name):
        """
        Shortcut to step a task: `stepper.<task>(**kwargs)` is the same as
        `stepper.step('<task>', **kwargs)`.
        """
        # Only reached for attributes that do not otherwise exist. Dunder
        # names are never task names, so report them as missing; otherwise
        # probes such as hasattr(stepper, '__deepcopy__') would be answered
        # with a task shortcut.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        return lambda **kwargs: self.step(name, **kwargs)

    @contextmanager
    def start(self):
        """ Start async inputs; changefeeds etc """
        # Remember which processes really started so that a failure half way
        # through start-up still terminates the ones already running.
        started = []
        try:
            for p in self.processes:
                p.start()
                started.append(p)
            # It would be nice to have a better way to wait for changefeeds
            # here. We have to wait some amount of time because the feed
            # setup is happening in a different process and won't include
            # any writes we perform before it is ready.
            time.sleep(0.2)
            yield
        finally:
            for p in started:
                p.terminate()
|
||||||
|
|
||||||
|
|
||||||
|
class Empty(Exception):
    """ Raised when a stage task is stepped while its queue holds no items """
|
||||||
|
|
||||||
|
|
||||||
|
def _update_stepper(stepper, prefix, pipeline):
|
||||||
|
nodes = pipeline.nodes
|
||||||
|
for i in range(len(nodes)):
|
||||||
|
n0 = nodes[i]
|
||||||
|
n1 = (nodes + [None])[i+1]
|
||||||
|
f = stepper.add_input if i == 0 else stepper.add_stage
|
||||||
|
f(prefix, n0, n1)
|
||||||
|
|
||||||
|
|
||||||
|
def create_stepper():
    """
    Build a MultipipesStepper holding the block, stale and vote pipelines.

    For each pipeline module, `start()` is called while `Pipeline.start` is
    patched out (presumably so that the pipeline object is assembled and
    returned without actually running), and the nodes of the resulting
    pipeline are registered on the stepper under the pipeline's prefix.
    The stale pipeline is created with both of its delays set to 0.
    """
    stepper = MultipipesStepper()

    # (prefix, patch target, pipeline module, keyword arguments for start())
    pipelines = (
        ('block', 'bigchaindb.pipelines.block.Pipeline.start',
         bigchaindb.pipelines.block, {}),
        ('stale', 'bigchaindb.pipelines.stale.Pipeline.start',
         bigchaindb.pipelines.stale,
         {'timeout': 0, 'backlog_reassign_delay': 0}),
        ('vote', 'bigchaindb.pipelines.vote.Pipeline.start',
         bigchaindb.pipelines.vote, {}),
    )

    for prefix, patch_target, module, start_kwargs in pipelines:
        with patch(patch_target):
            pipeline = module.start(**start_kwargs)
            _update_stepper(stepper, prefix, pipeline)

    return stepper
|
28
tests/pipelines/test_steps.py
Normal file
28
tests/pipelines/test_steps.py
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
import pytest
|
||||||
|
import random
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.bdb
@pytest.mark.genesis
def test_stepping_changefeed_produces_update(b, steps):
    """
    Writing a transaction and then reassigning it through the stale pipeline
    must surface the transaction twice on the block changefeed.
    """
    tx = input_single_create(b)

    # timeouts are 0 so will reassign immediately
    steps.stale_check_transactions()
    steps.stale_reassign_transactions()

    # We expect 2 changefeed events. Pass an explicit timeout so a missing
    # event fails the test instead of leaving it waiting on the feed.
    steps.block_changefeed(timeout=1)
    steps.block_changefeed(timeout=1)

    assert steps.counts == {'block_filter_tx': 2}
    queued_ids = [item['id'] for (item,) in steps.queues['block_filter_tx']]
    assert queued_ids == [tx.id, tx.id]
|
||||||
|
|
||||||
|
|
||||||
|
def input_single_create(b):
    """
    Create a transaction owned by `b.me`, write it through
    `b.write_transaction` and return it.

    The metadata carries a random number, presumably so that every call
    produces a distinct transaction.
    """
    from bigchaindb.common.transaction import Transaction

    create_tx = Transaction.create(
        [b.me],
        [([b.me], 1)],
        {'r': random.random()},
    )
    b.write_transaction(create_tx)
    return create_tx
|
Loading…
x
Reference in New Issue
Block a user