From 171112003fb4cbdb924a43e52f53b9307fc25f32 Mon Sep 17 00:00:00 2001 From: vrde Date: Thu, 28 Jul 2016 15:43:45 +0200 Subject: [PATCH] Add more docs --- bigchaindb/pipelines/block.py | 73 ++++++++++++++++++++++++++++++++++- bigchaindb/pipelines/utils.py | 34 ++++++++++++++-- 2 files changed, 103 insertions(+), 4 deletions(-) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 4041d7a1..a08546c8 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -1,3 +1,10 @@ +"""This module takes care of all the logic related to block creation. + +The logic is encapsulated in the ``Block`` class, while the sequence +of actions to do on transactions is specified in the ``create_pipeline`` +function. +""" + import rethinkdb as r from multipipes import Pipeline, Node @@ -6,17 +13,41 @@ from bigchaindb import Bigchain class Block: + """This class encapsulates the logic to create blocks. + + Note: + Methods of this class will be executed in different processes. + """ def __init__(self): + """Initialize the Block creator""" self.bigchain = Bigchain() self.txs = [] def filter_tx(self, tx): + """Filter a transaction. + + Args: + tx (dict): the transaction to process. + + Returns: + The transaction if assigned to the current node, + ``None`` otherwise. + """ + if tx['assignee'] == self.bigchain.me: tx.pop('assignee') return tx def delete_tx(self, tx): + """Delete a transaction. + + Args: + tx (dict): the transaction to delete. + + Returns: + The transaction. + """ r.table('backlog')\ .get(tx['id'])\ .delete(durability='hard')\ @@ -25,11 +56,35 @@ class Block: return tx def validate_tx(self, tx): + """Validate a transaction. + + Args: + tx (dict): the transaction to validate. + + Returns: + The transaction if valid, ``None`` otherwise. + """ tx = self.bigchain.is_valid_transaction(tx) if tx: return tx def create(self, tx, timeout=False): + """Create a block. 
+ + This method accumulates transactions to put in a block and outputs + a block when one of the following conditions is true: + - the size limit of the block has been reached, or + - a timeout happened. + + Args: + tx (dict): the transaction to add to the block; might be ``None`` if + a timeout happens. + timeout (bool): ``True`` if a timeout happened + (Default: ``False``). + + Returns: + The block. + """ if tx: self.txs.append(tx) if self.txs and (len(self.txs) == 1000 or timeout): @@ -38,11 +93,21 @@ class Block: return block def write(self, block): + """Write the block to the database. + + Args: + block (dict): the block of transactions to write to the database. + + Returns: + The block. + """ self.bigchain.write_block(block) return block def initial(): + """Return old transactions from the backlog.""" + b = Bigchain() rs = r.table('backlog')\ @@ -55,10 +120,15 @@ def initial(): def get_changefeed(): - return ChangeFeed('backlog', 'insert', prefeed=initial()) + """Create and return the changefeed for the backlog.""" + + return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial()) def create_pipeline(): + """Create and return the pipeline of operations to be distributed + on different processes.""" + block = Block() block_pipeline = Pipeline([ @@ -73,6 +143,7 @@ def create_pipeline(): def start(): + """Create, start, and return the block pipeline.""" pipeline = create_pipeline() pipeline.setup(indata=get_changefeed()) pipeline.start() diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index e879b015..fb018453 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -1,3 +1,6 @@ +"""Utility classes and functions to work with the pipelines.""" + + import rethinkdb as r from multipipes import Node @@ -5,8 +8,33 @@ from bigchaindb import Bigchain class ChangeFeed(Node): + """This class wraps a RethinkDB changefeed adding a `prefeed`. 
+ + It extends the ``multipipes::Node`` class to make it pluggable in + other ``Pipeline`` instances, and it makes use of ``self.outqueue`` + to output the data. + + A changefeed is a real-time feed on inserts, updates, and deletes, and + it's volatile. This class is a helper to create changefeeds. Moreover + it provides a way to specify a `prefeed`, that is a set of data (iterable) + to output before the actual changefeed. + """ + + INSERT = 'insert' + DELETE = 'delete' + UPDATE = 'update' def __init__(self, table, operation, prefeed=None): + """Create a new RethinkDB ChangeFeed. + + Args: + table (str): name of the table to listen to for changes. + operation (str): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or + ChangeFeed.UPDATE. + prefeed (iterable): whatever set of data you want to be published + first. + """ + super().__init__(name='changefeed') self.prefeed = prefeed if prefeed else [] self.table = table @@ -23,10 +51,10 @@ class ChangeFeed(Node): is_delete = change['new_val'] is None is_update = not is_insert and not is_delete - if is_insert and self.operation == 'insert': + if is_insert and self.operation == ChangeFeed.INSERT: self.outqueue.put(change['new_val']) - elif is_delete and self.operation == 'delete': + elif is_delete and self.operation == ChangeFeed.DELETE: self.outqueue.put(change['old_val']) - elif is_update and self.operation == 'update': + elif is_update and self.operation == ChangeFeed.UPDATE: self.outqueue.put(change)