From 955b018be8995743015f3efdfa78b55332c3fd26 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Wed, 15 Feb 2017 14:58:16 +0100 Subject: [PATCH 1/6] add stepping pipeline and fix issue #1191 --- bigchaindb/backend/mongodb/changefeed.py | 4 +- bigchaindb/core.py | 4 +- tests/pipelines/conftest.py | 10 ++ tests/pipelines/stepping.py | 138 +++++++++++++++++++++++ tests/pipelines/test_steps.py | 24 ++++ 5 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 tests/pipelines/conftest.py create mode 100644 tests/pipelines/stepping.py create mode 100644 tests/pipelines/test_steps.py diff --git a/bigchaindb/backend/mongodb/changefeed.py b/bigchaindb/backend/mongodb/changefeed.py index 4a5a5b7e..3abcbeda 100644 --- a/bigchaindb/backend/mongodb/changefeed.py +++ b/bigchaindb/backend/mongodb/changefeed.py @@ -85,11 +85,13 @@ class MongoDBChangeFeed(ChangeFeed): # document itself. So here we first read the document # and then return it. doc = self.connection.conn[dbname][table].find_one( - {'_id': record['o2']}, + {'_id': record['o2']['_id']}, {'_id': False} ) self.outqueue.put(doc) + logger.debug('Record in changefeed: %s:%s', table, record['op']) + @register_changefeed(MongoDBConnection) def get_changefeed(connection, table, operation, *, prefeed=None): diff --git a/bigchaindb/core.py b/bigchaindb/core.py index e4dddb6f..a7ed93f0 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -55,7 +55,9 @@ class Bigchain(object): self.me = public_key or bigchaindb.config['keypair']['public'] self.me_private = private_key or bigchaindb.config['keypair']['private'] self.nodes_except_me = keyring or bigchaindb.config['keyring'] - self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay'] + if backlog_reassign_delay is None: + backlog_reassign_delay = bigchaindb.config['backlog_reassign_delay'] + self.backlog_reassign_delay = backlog_reassign_delay self.consensus = BaseConsensusRules self.connection = connection if connection 
else backend.connect(**bigchaindb.config['database']) if not self.me or not self.me_private: diff --git a/tests/pipelines/conftest.py b/tests/pipelines/conftest.py new file mode 100644 index 00000000..5b66f048 --- /dev/null +++ b/tests/pipelines/conftest.py @@ -0,0 +1,10 @@ +import pytest + +from .stepping import create_stepper + + +@pytest.fixture +def steps(): + stepper = create_stepper() + with stepper.start(): + yield stepper diff --git a/tests/pipelines/stepping.py b/tests/pipelines/stepping.py new file mode 100644 index 00000000..567dc846 --- /dev/null +++ b/tests/pipelines/stepping.py @@ -0,0 +1,138 @@ +""" +Pipeline stepping is a way to advance the asynchronous data pipeline +deterministically by exposing each step separately and advancing the states +manually. +""" + + +import functools +import time +import types +import logging +from contextlib import contextmanager +from unittest.mock import patch + +import bigchaindb.core +from bigchaindb.backend.changefeed import ChangeFeed +import bigchaindb.pipelines.block +import bigchaindb.pipelines.stale + + +class MultipipesStepper: + def __init__(self): + self.queues = {} + self.tasks = {} + self.input_tasks = set() + self.processes = [] + + def add_input(self, prefix, node, next): + name = '%s_%s' % (prefix, node.name) + next_name = '%s_%s' % (prefix, next.name) + + if isinstance(node, ChangeFeed): + self.processes.append(node) + + def f(*args, **kwargs): + _kwargs = {'timeout': 0.1} + _kwargs.update(kwargs) + return node.outqueue.get(*args, **kwargs) + else: + f = node.target + + def inner(**kwargs): + r = f(**kwargs) + if r is not None: + self.enqueue(next_name, r) + + self.tasks[name] = functools.wraps(f)(inner) + self.input_tasks.add(name) + + def add_stage(self, prefix, node, next): + """ + Convert pipeline stage into task. 
+ """ + f = node.target + name = '%s_%s' % (prefix, node.name) + if next: + next_name = '%s_%s' % (prefix, next.name) + + def inner(*args): + out = f(*args) + if out is not None and next: + self.enqueue(next_name, out) + + task = functools.wraps(f)(inner) + self.tasks[name] = task + + def enqueue(self, name, item): + queue = self.queues.setdefault(name, []) + if isinstance(item, types.GeneratorType): + queue.extend(list(item)) + else: + queue.append(item) + + def step(self, name, **kwargs): + logging.debug('Stepping %s', name) + task = self.tasks[name] + if name in self.input_tasks: + task(**kwargs) + else: + queue = self.queues.get(name, []) + if not queue: + raise Empty(name) + task(queue.pop(0), **kwargs) + logging.debug('Stepped %s', name) + + def get_counts(self): + counts = {} + for name in self.queues: + n = len(self.queues[name]) + if n: + counts[name] = n + return counts + + def __getattr__(self, name): + return lambda **kwargs: self.step(name, **kwargs) + + @contextmanager + def start(self): + for p in self.processes: + p.start() + # It would be nice to have a better way to wait for changefeeds here. + # We have to wait some amount of time because the feed setup is + # happening in a different process and won't include any writes we + # perform before it is ready. 
+ time.sleep(0.2) + try: + yield + finally: + for p in self.processes: + p.terminate() + + +class Empty(Exception): + pass + + +def update_stepper(stepper, prefix, pipeline): + nodes = pipeline.nodes + for i in range(len(nodes)): + n0 = nodes[i] + n1 = (nodes + [None])[i+1] + f = stepper.add_input if i == 0 else stepper.add_stage + f(prefix, n0, n1) + + +def create_stepper(): + stepper = MultipipesStepper() + + with patch('bigchaindb.pipelines.block.Pipeline.start'): + pipeline = bigchaindb.pipelines.block.start() + update_stepper(stepper, 'block', pipeline) + + with patch('bigchaindb.pipelines.stale.Pipeline.start'): + pipeline = bigchaindb.pipelines.stale.start( + timeout=0, backlog_reassign_delay=0) + update_stepper(stepper, 'stale', pipeline) + + return stepper diff --git a/tests/pipelines/test_steps.py b/tests/pipelines/test_steps.py new file mode 100644 index 00000000..76862bd9 --- /dev/null +++ b/tests/pipelines/test_steps.py @@ -0,0 +1,24 @@ +import pytest +import random + + +@pytest.mark.bdb +def test_stepping_changefeed_produces_update(b, steps): + input_single_create(b) + steps.block_changefeed() + steps.block_filter_tx() + + # timeouts are 0 so will reassign immediately + steps.stale_check_transactions() + steps.stale_reassign_transactions() + + # We expect 2 changefeed events + steps.block_changefeed() + steps.block_filter_tx() + + +def input_single_create(b): + from bigchaindb.common.transaction import Transaction + metadata = {'r': random.random()} + tx = Transaction.create([b.me], [([b.me], 1)], metadata) + b.write_transaction(tx) From 7469f60d17dbc68b61bcb663645ef3a328c2f90a Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Wed, 15 Feb 2017 15:15:20 +0100 Subject: [PATCH 2/6] more assertions in test --- tests/pipelines/test_steps.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/pipelines/test_steps.py b/tests/pipelines/test_steps.py index 76862bd9..287a17c4 100644 --- a/tests/pipelines/test_steps.py +++ 
b/tests/pipelines/test_steps.py @@ -4,9 +4,7 @@ import random @pytest.mark.bdb def test_stepping_changefeed_produces_update(b, steps): - input_single_create(b) - steps.block_changefeed() - steps.block_filter_tx() + tx = input_single_create(b) # timeouts are 0 so will reassign immediately steps.stale_check_transactions() @@ -14,7 +12,11 @@ def test_stepping_changefeed_produces_update(b, steps): # We expect 2 changefeed events steps.block_changefeed() - steps.block_filter_tx() + steps.block_changefeed() + + assert steps.get_counts() == {'block_filter_tx': 2} + assert ([tx['id'] for tx in steps.queues['block_filter_tx']] == + [tx.id, tx.id]) def input_single_create(b): @@ -22,3 +24,4 @@ def input_single_create(b): metadata = {'r': random.random()} tx = Transaction.create([b.me], [([b.me], 1)], metadata) b.write_transaction(tx) + return tx From fc04cd7bcd94a3b34801a8433c57655bf85c4f71 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Wed, 15 Feb 2017 17:31:36 +0100 Subject: [PATCH 3/6] update changefeed test for update --- tests/backend/mongodb/test_changefeed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/backend/mongodb/test_changefeed.py b/tests/backend/mongodb/test_changefeed.py index 67b54cd8..00dcaca5 100644 --- a/tests/backend/mongodb/test_changefeed.py +++ b/tests/backend/mongodb/test_changefeed.py @@ -19,7 +19,7 @@ def mock_changefeed_data(): { 'op': 'u', 'o': {'msg': 'seems like we have an update here'}, - 'o2': 'some-id' + 'o2': {'_id': 'some-id'} }, ] From 6110693ae847d83c0f1306d6fa9484e118a10f26 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Tue, 21 Feb 2017 13:04:43 +0100 Subject: [PATCH 4/6] provide more documentation for MultipipesStepper --- tests/pipelines/stepping.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/pipelines/stepping.py b/tests/pipelines/stepping.py index 567dc846..bb70add0 100644 --- a/tests/pipelines/stepping.py +++ b/tests/pipelines/stepping.py @@ -2,6 +2,33 @@ 
Pipeline stepping is a way to advance the asynchronous data pipeline deterministically by exposing each step separately and advancing the states manually. + +The multipipes.Pipeline class implements a pipeline that advances +asynchronously and concurrently. This module provides an interface to the +BigchainDB pipelines that is static, i.e., does not advance without prompting. + +Rather than having a pipeline that is in an all or nothing running / not running +state, one can do the following: + + +steps = create_stepper() + +with steps.start(): + tx = my_create_and_write_tx() + steps.block_changefeed(timeout=1) + steps.block_filter_tx() + steps.block_validate_tx() + steps.block_create(timeout=True) + +assert steps.counts == {'block_write': 1} + +Pending items are held in the `.queues` attribute, and every task has its own +queue (as in multipipes.Pipeline). Queues are just lists though so they can +be easily inspected. + +As a shortcut, the `.counts` attribute is provided which returns the number of +pending items for each task. This is useful to assert the expected status of +the queues after performing some sequence. """ From a23a741253d74f07f51b38ebc5d25e38e0eca9d1 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Tue, 21 Feb 2017 13:13:40 +0100 Subject: [PATCH 5/6] document MultipipesStepper --- tests/pipelines/stepping.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/pipelines/stepping.py b/tests/pipelines/stepping.py index bb70add0..5f099750 100644 --- a/tests/pipelines/stepping.py +++ b/tests/pipelines/stepping.py @@ -53,6 +53,7 @@ class MultipipesStepper: self.processes = [] def add_input(self, prefix, node, next): + """ Add an input task; Reads from the outqueue of the Node """ name = '%s_%s' % (prefix, node.name) next_name = '%s_%s' % (prefix, next.name) @@ -76,7 +77,8 @@ class MultipipesStepper: def add_stage(self, prefix, node, next): """ - Convert pipeline stage into task.
+ Add a stage task, popping from own queue and appending to the queue + of the next node """ f = node.target name = '%s_%s' % (prefix, node.name) @@ -91,7 +93,8 @@ class MultipipesStepper: task = functools.wraps(f)(inner) self.tasks[name] = task - def enqueue(self, name, item): + def _enqueue(self, name, item): + """ internal function; add item(s) to queue) """ queue = self.queues.setdefault(name, []) if isinstance(item, types.GeneratorType): queue.extend(list(item)) @@ -99,6 +102,7 @@ class MultipipesStepper: queue.append(item) def step(self, name, **kwargs): + """ Advance pipeline stage. Throws Empty if no data to consume. """ logging.debug('Stepping %s', name) task = self.tasks[name] if name in self.input_tasks: @@ -111,6 +115,7 @@ class MultipipesStepper: logging.debug('Stepped %s', name) def get_counts(self): + """ Get sizes of non empty queues """ counts = {} for name in self.queues: n = len(self.queues[name]) @@ -119,10 +124,12 @@ class MultipipesStepper: return counts def __getattr__(self, name): + """ Shortcut to get a queue """ return lambda **kwargs: self.step(name, **kwargs) @contextmanager def start(self): + """ Start async inputs; changefeeds etc """ for p in self.processes: p.start() # It would be nice to have a better way to wait for changefeeds here. 
@@ -141,7 +148,7 @@ class Empty(Exception): pass -def update_stepper(stepper, prefix, pipeline): +def _update_stepper(stepper, prefix, pipeline): nodes = pipeline.nodes for i in range(len(nodes)): n0 = nodes[i] @@ -155,11 +162,11 @@ def create_stepper(): with patch('bigchaindb.pipelines.block.Pipeline.start'): pipeline = bigchaindb.pipelines.block.start() - update_stepper(stepper, 'block', pipeline) + _update_stepper(stepper, 'block', pipeline) with patch('bigchaindb.pipelines.stale.Pipeline.start'): pipeline = bigchaindb.pipelines.stale.start( timeout=0, backlog_reassign_delay=0) - update_stepper(stepper, 'stale', pipeline) + _update_stepper(stepper, 'stale', pipeline) return stepper From 37f5298962fcc7ade4893489921f92400cf77db0 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Tue, 21 Feb 2017 13:51:09 +0100 Subject: [PATCH 6/6] pull stepper changes from no-double-inclusion --- tests/pipelines/stepping.py | 24 +++++++++++++++++------- tests/pipelines/test_steps.py | 5 +++-- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/tests/pipelines/stepping.py b/tests/pipelines/stepping.py index 5f099750..0e286829 100644 --- a/tests/pipelines/stepping.py +++ b/tests/pipelines/stepping.py @@ -43,6 +43,7 @@ import bigchaindb.core from bigchaindb.backend.changefeed import ChangeFeed import bigchaindb.pipelines.block import bigchaindb.pipelines.stale +import bigchaindb.pipelines.vote class MultipipesStepper: @@ -70,7 +71,7 @@ class MultipipesStepper: def inner(**kwargs): r = f(**kwargs) if r is not None: - self.enqueue(next_name, r) + self._enqueue(next_name, r) self.tasks[name] = functools.wraps(f)(inner) self.input_tasks.add(name) @@ -85,10 +86,10 @@ class MultipipesStepper: if next: next_name = '%s_%s' % (prefix, next.name) - def inner(*args): - out = f(*args) + def inner(*args, **kwargs): + out = f(*args, **kwargs) if out is not None and next: - self.enqueue(next_name, out) + self._enqueue(next_name, out) task = functools.wraps(f)(inner) self.tasks[name] = 
task @@ -97,8 +98,12 @@ class MultipipesStepper: """ internal function; add item(s) to queue) """ queue = self.queues.setdefault(name, []) if isinstance(item, types.GeneratorType): - queue.extend(list(item)) + items = list(item) else: + items = [item] + for item in items: + if type(item) != tuple: + item = (item,) queue.append(item) def step(self, name, **kwargs): @@ -111,10 +116,11 @@ class MultipipesStepper: queue = self.queues.get(name, []) if not queue: raise Empty(name) - task(queue.pop(0), **kwargs) + task(*queue.pop(0), **kwargs) logging.debug('Stepped %s', name) - def get_counts(self): + @property + def counts(self): """ Get sizes of non empty queues """ counts = {} for name in self.queues: @@ -169,4 +175,8 @@ def create_stepper(): timeout=0, backlog_reassign_delay=0) _update_stepper(stepper, 'stale', pipeline) + with patch('bigchaindb.pipelines.vote.Pipeline.start'): + pipeline = bigchaindb.pipelines.vote.start() + _update_stepper(stepper, 'vote', pipeline) + return stepper diff --git a/tests/pipelines/test_steps.py b/tests/pipelines/test_steps.py index 287a17c4..c63a673a 100644 --- a/tests/pipelines/test_steps.py +++ b/tests/pipelines/test_steps.py @@ -3,6 +3,7 @@ import random @pytest.mark.bdb +@pytest.mark.genesis def test_stepping_changefeed_produces_update(b, steps): tx = input_single_create(b) @@ -14,8 +15,8 @@ def test_stepping_changefeed_produces_update(b, steps): steps.block_changefeed() steps.block_changefeed() - assert steps.get_counts() == {'block_filter_tx': 2} - assert ([tx['id'] for tx in steps.queues['block_filter_tx']] == + assert steps.counts == {'block_filter_tx': 2} + assert ([tx['id'] for (tx,) in steps.queues['block_filter_tx']] == [tx.id, tx.id])