Add more docs

This commit is contained in:
vrde 2016-07-28 15:43:45 +02:00
parent 40966816c9
commit 171112003f
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
2 changed files with 103 additions and 4 deletions

View File

@ -1,3 +1,10 @@
"""This module takes care of all the logic related to block creation.
The logic is encapsulated in the ``Block`` class, while the sequence
of actions to do on transactions is specified in the ``create_pipeline``
function.
"""
import rethinkdb as r import rethinkdb as r
from multipipes import Pipeline, Node from multipipes import Pipeline, Node
@ -6,17 +13,41 @@ from bigchaindb import Bigchain
class Block: class Block:
"""This class encapsulates the logic to create blocks.
Note:
Methods of this class will be executed in different processes.
"""
def __init__(self): def __init__(self):
"""Initialize the Block creator"""
self.bigchain = Bigchain() self.bigchain = Bigchain()
self.txs = [] self.txs = []
def filter_tx(self, tx): def filter_tx(self, tx):
"""Filter a transaction.
Args:
tx (dict): the transaction to process.
Returns:
The transaction if assigned to the current node,
``None`` otherwise.
"""
if tx['assignee'] == self.bigchain.me: if tx['assignee'] == self.bigchain.me:
tx.pop('assignee') tx.pop('assignee')
return tx return tx
def delete_tx(self, tx): def delete_tx(self, tx):
"""Delete a transaction.
Args:
tx (dict): the transaction to delete.
Returns:
The transaction.
"""
r.table('backlog')\ r.table('backlog')\
.get(tx['id'])\ .get(tx['id'])\
.delete(durability='hard')\ .delete(durability='hard')\
@ -25,11 +56,35 @@ class Block:
return tx return tx
def validate_tx(self, tx): def validate_tx(self, tx):
"""Validate a transaction.
Args:
tx (dict): the transaction to validate.
Returns:
The transaction if valid, ``None`` otherwise.
"""
tx = self.bigchain.is_valid_transaction(tx) tx = self.bigchain.is_valid_transaction(tx)
if tx: if tx:
return tx return tx
def create(self, tx, timeout=False): def create(self, tx, timeout=False):
"""Create a block.
This method accumulates transactions to put in a block and outputs
a block when one of the following conditions is true:
- the size limit of the block has been reached, or
- a timeout happened.
Args:
tx (dict): the transaction to validate, might be None if
a timeout happens.
timeout (bool): ``True`` if a timeout happened
(Default: ``False``).
Returns:
The block.
"""
if tx: if tx:
self.txs.append(tx) self.txs.append(tx)
if self.txs and (len(self.txs) == 1000 or timeout): if self.txs and (len(self.txs) == 1000 or timeout):
@ -38,11 +93,21 @@ class Block:
return block return block
def write(self, block): def write(self, block):
"""Write the block to the Database.
Args:
block (dict): the block of transactions to write to the database.
Returns:
The block.
"""
self.bigchain.write_block(block) self.bigchain.write_block(block)
return block return block
def initial(): def initial():
"""Return old transactions from the backlog."""
b = Bigchain() b = Bigchain()
rs = r.table('backlog')\ rs = r.table('backlog')\
@ -55,10 +120,15 @@ def initial():
def get_changefeed(): def get_changefeed():
return ChangeFeed('backlog', 'insert', prefeed=initial()) """Create and return the changefeed for the backlog."""
return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial())
def create_pipeline(): def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
block = Block() block = Block()
block_pipeline = Pipeline([ block_pipeline = Pipeline([
@ -73,6 +143,7 @@ def create_pipeline():
def start(): def start():
"""Create, start, and return the block pipeline."""
pipeline = create_pipeline() pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed()) pipeline.setup(indata=get_changefeed())
pipeline.start() pipeline.start()

View File

@ -1,3 +1,6 @@
"""Utility classes and functions to work with the pipelines."""
import rethinkdb as r import rethinkdb as r
from multipipes import Node from multipipes import Node
@ -5,8 +8,33 @@ from bigchaindb import Bigchain
class ChangeFeed(Node): class ChangeFeed(Node):
"""This class wraps a RethinkDB changefeed adding a `prefeed`.
It extends the ``multipipes::Node`` class to make it pluggable in
other Pipelines instances, and it makes usage of ``self.outqueue``
to output the data.
A changefeed is a real time feed on inserts, updates, and deletes, and
it's volatile. This class is a helper to create changefeeds. Moreover
it provides a way to specify a `prefeed`, that is a set of data (iterable)
to output before the actual changefeed.
"""
INSERT = 'insert'
DELETE = 'delete'
UPDATE = 'update'
def __init__(self, table, operation, prefeed=None): def __init__(self, table, operation, prefeed=None):
"""Create a new RethinkDB ChangeFeed.
Args:
table (str): name of the table to listen to for changes.
operation (str): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE.
prefeed (iterable): whatever set of data you want to be published
first.
"""
super().__init__(name='changefeed') super().__init__(name='changefeed')
self.prefeed = prefeed if prefeed else [] self.prefeed = prefeed if prefeed else []
self.table = table self.table = table
@ -23,10 +51,10 @@ class ChangeFeed(Node):
is_delete = change['new_val'] is None is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete is_update = not is_insert and not is_delete
if is_insert and self.operation == 'insert': if is_insert and self.operation == ChangeFeed.INSERT:
self.outqueue.put(change['new_val']) self.outqueue.put(change['new_val'])
elif is_delete and self.operation == 'delete': elif is_delete and self.operation == ChangeFeed.DELETE:
self.outqueue.put(change['old_val']) self.outqueue.put(change['old_val'])
elif is_update and self.operation == 'update': elif is_update and self.operation == ChangeFeed.UPDATE:
self.outqueue.put(change) self.outqueue.put(change)