From a23a741253d74f07f51b38ebc5d25e38e0eca9d1 Mon Sep 17 00:00:00 2001 From: Scott Sadler Date: Tue, 21 Feb 2017 13:13:40 +0100 Subject: [PATCH] document MultipipesStepper --- tests/pipelines/stepping.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/pipelines/stepping.py b/tests/pipelines/stepping.py index bb70add0..5f099750 100644 --- a/tests/pipelines/stepping.py +++ b/tests/pipelines/stepping.py @@ -53,6 +53,7 @@ class MultipipesStepper: self.processes = [] def add_input(self, prefix, node, next): + """ Add an input task; Reads from the outqueue of the Node """ name = '%s_%s' % (prefix, node.name) next_name = '%s_%s' % (prefix, next.name) @@ -76,7 +77,8 @@ class MultipipesStepper: def add_stage(self, prefix, node, next): """ - Convert pipeline stage into task. + Add a stage task, popping from own queue and appending to the queue + of the next node """ f = node.target name = '%s_%s' % (prefix, node.name) @@ -91,7 +93,8 @@ class MultipipesStepper: task = functools.wraps(f)(inner) self.tasks[name] = task - def enqueue(self, name, item): + def _enqueue(self, name, item): + """ internal function; add item(s) to queue) """ queue = self.queues.setdefault(name, []) if isinstance(item, types.GeneratorType): queue.extend(list(item)) @@ -99,6 +102,7 @@ class MultipipesStepper: queue.append(item) def step(self, name, **kwargs): + """ Advance pipeline stage. Throws Empty if no data to consume. """ logging.debug('Stepping %s', name) task = self.tasks[name] if name in self.input_tasks: @@ -111,6 +115,7 @@ class MultipipesStepper: logging.debug('Stepped %s', name) def get_counts(self): + """ Get sizes of non empty queues """ counts = {} for name in self.queues: n = len(self.queues[name]) @@ -119,10 +124,12 @@ class MultipipesStepper: return counts def __getattr__(self, name): + """ Shortcut to get a queue """ return lambda **kwargs: self.step(name, **kwargs) @contextmanager def start(self): + """ Start async inputs; changefeeds etc """ for p in self.processes: p.start() # It would be nice to have a better way to wait for changefeeds here. @@ -141,7 +148,7 @@ class Empty(Exception): pass -def update_stepper(stepper, prefix, pipeline): +def _update_stepper(stepper, prefix, pipeline): nodes = pipeline.nodes for i in range(len(nodes)): n0 = nodes[i] @@ -155,11 +162,11 @@ def create_stepper(): with patch('bigchaindb.pipelines.block.Pipeline.start'): pipeline = bigchaindb.pipelines.block.start() - update_stepper(stepper, 'block', pipeline) + _update_stepper(stepper, 'block', pipeline) with patch('bigchaindb.pipelines.stale.Pipeline.start'): pipeline = bigchaindb.pipelines.stale.start( timeout=0, backlog_reassign_delay=0) - update_stepper(stepper, 'stale', pipeline) + _update_stepper(stepper, 'stale', pipeline) return stepper