document MultipipesStepper

This commit is contained in:
Scott Sadler 2017-02-21 13:13:40 +01:00
parent 6110693ae8
commit a23a741253

View File

@ -53,6 +53,7 @@ class MultipipesStepper:
self.processes = [] self.processes = []
def add_input(self, prefix, node, next): def add_input(self, prefix, node, next):
""" Add an input task; Reads from the outqueue of the Node """
name = '%s_%s' % (prefix, node.name) name = '%s_%s' % (prefix, node.name)
next_name = '%s_%s' % (prefix, next.name) next_name = '%s_%s' % (prefix, next.name)
@ -76,7 +77,8 @@ class MultipipesStepper:
def add_stage(self, prefix, node, next): def add_stage(self, prefix, node, next):
""" """
Convert pipeline stage into task. Add a stage task, popping from own queue and appending to the queue
of the next node
""" """
f = node.target f = node.target
name = '%s_%s' % (prefix, node.name) name = '%s_%s' % (prefix, node.name)
@ -91,7 +93,8 @@ class MultipipesStepper:
task = functools.wraps(f)(inner) task = functools.wraps(f)(inner)
self.tasks[name] = task self.tasks[name] = task
def enqueue(self, name, item): def _enqueue(self, name, item):
""" internal function; add item(s) to queue) """
queue = self.queues.setdefault(name, []) queue = self.queues.setdefault(name, [])
if isinstance(item, types.GeneratorType): if isinstance(item, types.GeneratorType):
queue.extend(list(item)) queue.extend(list(item))
@ -99,6 +102,7 @@ class MultipipesStepper:
queue.append(item) queue.append(item)
def step(self, name, **kwargs): def step(self, name, **kwargs):
""" Advance pipeline stage. Throws Empty if no data to consume. """
logging.debug('Stepping %s', name) logging.debug('Stepping %s', name)
task = self.tasks[name] task = self.tasks[name]
if name in self.input_tasks: if name in self.input_tasks:
@ -111,6 +115,7 @@ class MultipipesStepper:
logging.debug('Stepped %s', name) logging.debug('Stepped %s', name)
def get_counts(self): def get_counts(self):
""" Get sizes of non empty queues """
counts = {} counts = {}
for name in self.queues: for name in self.queues:
n = len(self.queues[name]) n = len(self.queues[name])
@ -119,10 +124,12 @@ class MultipipesStepper:
return counts return counts
def __getattr__(self, name): def __getattr__(self, name):
""" Shortcut to get a queue """
return lambda **kwargs: self.step(name, **kwargs) return lambda **kwargs: self.step(name, **kwargs)
@contextmanager @contextmanager
def start(self): def start(self):
""" Start async inputs; changefeeds etc """
for p in self.processes: for p in self.processes:
p.start() p.start()
# It would be nice to have a better way to wait for changefeeds here. # It would be nice to have a better way to wait for changefeeds here.
@ -141,7 +148,7 @@ class Empty(Exception):
pass pass
def update_stepper(stepper, prefix, pipeline): def _update_stepper(stepper, prefix, pipeline):
nodes = pipeline.nodes nodes = pipeline.nodes
for i in range(len(nodes)): for i in range(len(nodes)):
n0 = nodes[i] n0 = nodes[i]
@ -155,11 +162,11 @@ def create_stepper():
with patch('bigchaindb.pipelines.block.Pipeline.start'): with patch('bigchaindb.pipelines.block.Pipeline.start'):
pipeline = bigchaindb.pipelines.block.start() pipeline = bigchaindb.pipelines.block.start()
update_stepper(stepper, 'block', pipeline) _update_stepper(stepper, 'block', pipeline)
with patch('bigchaindb.pipelines.stale.Pipeline.start'): with patch('bigchaindb.pipelines.stale.Pipeline.start'):
pipeline = bigchaindb.pipelines.stale.start( pipeline = bigchaindb.pipelines.stale.start(
timeout=0, backlog_reassign_delay=0) timeout=0, backlog_reassign_delay=0)
update_stepper(stepper, 'stale', pipeline) _update_stepper(stepper, 'stale', pipeline)
return stepper return stepper