From 341f43267ac9fe9d24dee1e3d9337f247f28dfc1 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Thu, 8 Dec 2016 18:16:16 +0100 Subject: [PATCH 01/12] Initial implementation of the changefeed abstraction --- bigchaindb/backend/__init__.py | 1 + bigchaindb/backend/changefeed.py | 87 ++++++++++++++++++++++ bigchaindb/backend/rethinkdb/changefeed.py | 66 ++++++++++++++++ bigchaindb/pipelines/block.py | 18 ++--- 4 files changed, 163 insertions(+), 9 deletions(-) diff --git a/bigchaindb/backend/__init__.py b/bigchaindb/backend/__init__.py index 7e37b5d2..c1deaa92 100644 --- a/bigchaindb/backend/__init__.py +++ b/bigchaindb/backend/__init__.py @@ -10,3 +10,4 @@ configuration or the ``BIGCHAINDB_DATABASE_BACKEND`` environment variable. from bigchaindb.backend import changefeed, schema, query # noqa from bigchaindb.backend.connection import connect # noqa +from bigchaindb.backend.changefeed import get_changefeed # noqa diff --git a/bigchaindb/backend/changefeed.py b/bigchaindb/backend/changefeed.py index ccba02d3..cd8d721a 100644 --- a/bigchaindb/backend/changefeed.py +++ b/bigchaindb/backend/changefeed.py @@ -1 +1,88 @@ """Changefeed interfaces for backends.""" + +from functools import singledispatch +from multipipes import Node + +import bigchaindb + + +class ChangeFeed(Node): + """Create a new changefeed. + + It extends :class:`multipipes.Node` to make it pluggable in other + Pipelines instances, and makes usage of ``self.outqueue`` to output + the data. + + A changefeed is a real time feed on inserts, updates, and deletes, and + is volatile. This class is a helper to create changefeeds. Moreover, + it provides a way to specify a ``prefeed`` of iterable data to output + before the actual changefeed. + """ + + INSERT = 1 + DELETE = 2 + UPDATE = 4 + + def __init__(self, table, operation, *, prefeed=None, connection=None): + """Create a new ChangeFeed. + + Args: + table (str): name of the table to listen to for changes. + operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or + ChangeFeed.UPDATE. Combining multiple operation is possible + with the bitwise ``|`` operator + (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) + prefeed (iterable): whatever set of data you want to be published + first. + connection (:class:`~bigchaindb.backend.connection.Connection`): + A connection to the database (can be None). + """ + + super().__init__(name='changefeed') + self.prefeed = prefeed if prefeed else [] + self.table = table + self.operation = operation + if connection: + self.connection = connection + else: + self.connection = bigchaindb.backend.connect( + **bigchaindb.config['database']) + + def run_forever(self): + """Main loop of the ``multipipes.Node`` + + This method is responsible for first feeding the prefeed to the + outqueue and after that start the changefeed and recover from any + errors that may occur in the backend. + """ + raise NotImplementedError + + def run_changefeed(self): + """Backend specific method to run the changefeed. + + The changefeed is is usually a backend cursor that is not closed when + all the results are exausted. Instead it remains open waiting for new + results. + + This method should also filter each result based on the ``operation`` + and put all matching results on the outqueue of ``multipipes.Node``. + """ + raise NotImplementedError + + +@singledispatch +def get_changefeed(connection, table, operation, *, prefeed=None): + """Return a ChangeFeed. + + Args: + connection (:class:`~bigchaindb.backend.connection.Connection`): + A connection to the database. + table (str): name of the table to listen to for changes. + operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or + ChangeFeed.UPDATE. Combining multiple operation is possible + with the bitwise ``|`` operator + (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) + prefeed (iterable): whatever set of data you want to be published + first. + """ + raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/changefeed.py b/bigchaindb/backend/rethinkdb/changefeed.py index e69de29b..8a5e4973 100644 --- a/bigchaindb/backend/rethinkdb/changefeed.py +++ b/bigchaindb/backend/rethinkdb/changefeed.py @@ -0,0 +1,66 @@ +import time +import logging +import rethinkdb as r + +from bigchaindb import backend +from bigchaindb.backend.changefeed import ChangeFeed +from bigchaindb.backend.utils import module_dispatch_registrar +from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection + + +logger = logging.getLogger(__name__) +register_changefeed = module_dispatch_registrar(backend.changefeed) + + +class RethinkDBChangeFeed(ChangeFeed): + """This class wraps a RethinkDB changefeed.""" + + def run_forever(self): + for element in self.prefeed: + self.outqueue.put(element) + + while True: + try: + self.run_changefeed() + break + except (r.ReqlDriverError, r.ReqlOpFailedError) as exc: + logger.exception(exc) + time.sleep(1) + + def run_changefeed(self): + for change in self.bigchain.connection.run(r.table(self.table) + .changes()): + is_insert = change['old_val'] is None + is_delete = change['new_val'] is None + is_update = not is_insert and not is_delete + + if is_insert and (self.operation & ChangeFeed.INSERT): + self.outqueue.put(change['new_val']) + elif is_delete and (self.operation & ChangeFeed.DELETE): + self.outqueue.put(change['old_val']) + elif is_update and (self.operation & ChangeFeed.UPDATE): + self.outqueue.put(change['new_val']) + + +@register_changefeed(RethinkDBConnection) +def get_changefeed(connection, table, operation, *, prefeed=None): + """Return a RethinkDB changefeed. + + Args: + connection (:class:`~bigchaindb.backend.rethinkdb.connection.RethinkDBConnection`): # noqa + A connection to the database. + table (str): name of the table to listen to for changes. + operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or + ChangeFeed.UPDATE. Combining multiple operation is possible + with the bitwise ``|`` operator + (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) + prefeed (iterable): whatever set of data you want to be published + first. + + Returns: + An instance of + :class:`~bigchaindb.backend.rethinkdb.RethinkDBChangeFeed`. + """ + + return RethinkDBChangeFeed(table, operation, prefeed=prefeed, + connection=connection) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 0394aa23..dc0e1163 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -10,8 +10,11 @@ import logging import rethinkdb as r from multipipes import Pipeline, Node, Pipe +import bigchaindb +from bigchaindb.backend import connect +from bigchaindb.backend.changefeed import ChangeFeed +from bigchaindb.backend import get_changefeed from bigchaindb.models import Transaction -from bigchaindb.pipelines.utils import ChangeFeed from bigchaindb import Bigchain @@ -147,13 +150,6 @@ def initial(): .order_by(index=r.asc('assignee__transaction_timestamp'))) -def get_changefeed(): - """Create and return the changefeed for the backlog.""" - - return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE, - prefeed=initial()) - - def create_pipeline(): """Create and return the pipeline of operations to be distributed on different processes.""" @@ -174,7 +170,11 @@ def create_pipeline(): def start(): """Create, start, and return the block pipeline.""" + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'backlog', + ChangeFeed.INSER | ChangeFeed.UPDATE, + preefed=initial()) pipeline = create_pipeline() - pipeline.setup(indata=get_changefeed()) + pipeline.setup(indata=changefeed) pipeline.start() return pipeline From dbf53c80e7533e1e2afca23f92ff74c69d615533 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Thu, 8 Dec 2016 18:33:52 +0100 Subject: [PATCH 02/12] Removed rethinkdb dependencies in pipelines.block --- bigchaindb/backend/query.py | 15 +++++++++++++++ bigchaindb/backend/rethinkdb/query.py | 11 +++++++++++ bigchaindb/core.py | 12 ++++++++++++ bigchaindb/pipelines/block.py | 14 ++++---------- 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 88f215c4..930fb3e1 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -318,3 +318,18 @@ def get_unvoted_blocks(connection, node_pubkey): """ raise NotImplementedError + + +@singledispatch +def get_old_transactions(connection, node_pubkey): + """Return all the transactions from the backlog that have not been + processed by the specified node. + + Args: + node_pubkey (str): base58 encoded public key + + Returns: + :obj:`list` of :obj:`dict`: a list of unprocessed transactions + """ + + raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 8fa6a512..57563ad3 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -244,3 +244,14 @@ def get_unvoted_blocks(connection, node_pubkey): # database level. Solving issue #444 can help untangling the situation unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) return unvoted_blocks + + +@register_query(RethinkDBConnection) +def get_old_transactions(connection, node_pubkey): + return connection.run( + r.table('backlog') + .between([node_pubkey, r.minval], + [node_pubkey, r.maxval], + index='assignee__transaction_timestamp') + .order_by(index=r.asc('assignee_transaction_timestamp')) + ) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index f0e1b89c..5a61f117 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -676,3 +676,15 @@ class Bigchain(object): return Bigchain.BLOCK_INVALID else: return Bigchain.BLOCK_UNDECIDED + + def get_old_transactions(self): + """Return all the transactions from the backlog that have not been + processed by the specified node. + + Args: + node_pubkey (str): base58 encoded public key + + Returns: + :obj:`list` of :obj:`dict`: a list of unprocessed transactions + """ + return backend.query.get_old_transactions(self.connection, self.me) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index dc0e1163..d7cc2735 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -6,9 +6,7 @@ function. """ import logging - -import rethinkdb as r -from multipipes import Pipeline, Node, Pipe +from multipipes import Pipeline, Node import bigchaindb from bigchaindb.backend import connect @@ -119,7 +117,8 @@ class BlockPipeline: Returns: :class:`~bigchaindb.models.Block`: The Block. """ - logger.info('Write new block %s with %s transactions', block.id, len(block.transactions)) + logger.info('Write new block %s with %s transactions', + block.id, len(block.transactions)) self.bigchain.write_block(block) return block @@ -142,12 +141,7 @@ def initial(): bigchain = Bigchain() - return bigchain.connection.run( - r.table('backlog') - .between([bigchain.me, r.minval], - [bigchain.me, r.maxval], - index='assignee__transaction_timestamp') - .order_by(index=r.asc('assignee__transaction_timestamp'))) + return bigchain.get_old_transactions() def create_pipeline(): From 8266dfadb0a5305bacbffdc34af25c71eae868a1 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Thu, 8 Dec 2016 18:41:11 +0100 Subject: [PATCH 03/12] Fixed typos Fixed failing test --- bigchaindb/backend/rethinkdb/query.py | 2 +- bigchaindb/pipelines/block.py | 4 ++-- tests/pipelines/test_block_creation.py | 13 +++++++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 57563ad3..61f008c2 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -253,5 +253,5 @@ def get_old_transactions(connection, node_pubkey): .between([node_pubkey, r.minval], [node_pubkey, r.maxval], index='assignee__transaction_timestamp') - .order_by(index=r.asc('assignee_transaction_timestamp')) + .order_by(index=r.asc('assignee__transaction_timestamp')) ) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index d7cc2735..0f490ae4 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -166,8 +166,8 @@ def start(): """Create, start, and return the block pipeline.""" connection = connect(**bigchaindb.config['database']) changefeed = get_changefeed(connection, 'backlog', - ChangeFeed.INSER | ChangeFeed.UPDATE, - preefed=initial()) + ChangeFeed.INSERT | ChangeFeed.UPDATE, + prefeed=initial()) pipeline = create_pipeline() pipeline.setup(indata=changefeed) pipeline.start() diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index 0616365d..a48749fc 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -95,7 +95,8 @@ def test_duplicate_transaction(b, user_pk): # verify tx is in the backlog assert b.get_transaction(txs[0].id) is not None - # try to validate a transaction that's already in the chain; should not work + # try to validate a transaction that's already in the chain; should not + # work assert block_maker.validate_tx(txs[0].to_dict()) is None # duplicate tx should be removed from backlog @@ -159,9 +160,10 @@ def test_start(create_pipeline): def test_full_pipeline(b, user_pk): import random - from bigchaindb.backend import query + from bigchaindb.backend import query, get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block, Transaction - from bigchaindb.pipelines.block import create_pipeline, get_changefeed + from bigchaindb.pipelines.block import create_pipeline, initial outpipe = Pipe() # include myself here, so that some tx are actually assigned to me @@ -175,8 +177,11 @@ def test_full_pipeline(b, user_pk): assert query.count_backlog(b.connection) == 100 + changefeed = get_changefeed(b.connection, 'backlog', + ChangeFeed.INSERT | ChangeFeed.UPDATE, + prefeed=initial()) pipeline = create_pipeline() - pipeline.setup(indata=get_changefeed(), outdata=outpipe) + pipeline.setup(indata=changefeed, outdata=outpipe) pipeline.start() time.sleep(2) From f09285d32f193c7e0156fb458507aa5fbe74e105 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Fri, 9 Dec 2016 11:29:44 +0100 Subject: [PATCH 04/12] Fixed some code typos Changed election pipeline to use the new changefeed abstraction --- bigchaindb/backend/rethinkdb/changefeed.py | 3 +-- bigchaindb/pipelines/block.py | 3 +-- bigchaindb/pipelines/election.py | 12 ++++++------ tests/pipelines/test_block_creation.py | 6 ++++-- tests/pipelines/test_election.py | 9 +++++++-- 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/bigchaindb/backend/rethinkdb/changefeed.py b/bigchaindb/backend/rethinkdb/changefeed.py index 8a5e4973..921f76ba 100644 --- a/bigchaindb/backend/rethinkdb/changefeed.py +++ b/bigchaindb/backend/rethinkdb/changefeed.py @@ -28,8 +28,7 @@ class RethinkDBChangeFeed(ChangeFeed): time.sleep(1) def run_changefeed(self): - for change in self.bigchain.connection.run(r.table(self.table) - .changes()): + for change in self.connection.run(r.table(self.table).changes()): is_insert = change['old_val'] is None is_delete = change['new_val'] is None is_update = not is_insert and not is_delete diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 0f490ae4..14e9e5a4 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -9,9 +9,8 @@ import logging from multipipes import Pipeline, Node import bigchaindb -from bigchaindb.backend import connect +from bigchaindb.backend import connect, get_changefeed from bigchaindb.backend.changefeed import ChangeFeed -from bigchaindb.backend import get_changefeed from bigchaindb.models import Transaction from bigchaindb import Bigchain diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 7140761a..4ba11e8e 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -8,7 +8,9 @@ import logging from multipipes import Pipeline, Node -from bigchaindb.pipelines.utils import ChangeFeed +import bigchaindb +from bigchaindb.backend import connect, get_changefeed +from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block from bigchaindb import Bigchain @@ -50,10 +52,6 @@ class Election: return invalid_block -def get_changefeed(): - return ChangeFeed(table='votes', operation=ChangeFeed.INSERT) - - def create_pipeline(): election = Election() @@ -66,7 +64,9 @@ def create_pipeline(): def start(): + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'votes', ChangeFeed.INSERT) pipeline = create_pipeline() - pipeline.setup(indata=get_changefeed()) + pipeline.setup(indata=changefeed) pipeline.start() return pipeline diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index a48749fc..ec07a57f 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -160,7 +160,8 @@ def test_start(create_pipeline): def test_full_pipeline(b, user_pk): import random - from bigchaindb.backend import query, get_changefeed + import bigchaindb + from bigchaindb.backend import query, get_changefeed, connect from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block, Transaction from bigchaindb.pipelines.block import create_pipeline, initial @@ -177,7 +178,8 @@ def test_full_pipeline(b, user_pk): assert query.count_backlog(b.connection) == 100 - changefeed = get_changefeed(b.connection, 'backlog', + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE, prefeed=initial()) pipeline = create_pipeline() diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index f9fee0bb..0b5b5ca8 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -136,7 +136,9 @@ def test_start(mock_start): def test_full_pipeline(b, user_pk): import random - from bigchaindb.backend import query + import bigchaindb + from bigchaindb.backend import query, get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Transaction outpipe = Pipe() @@ -162,10 +164,13 @@ def test_full_pipeline(b, user_pk): invalid_block = b.create_block(txs) b.write_block(invalid_block) + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'votes', ChangeFeed.INSERT) pipeline = election.create_pipeline() - pipeline.setup(indata=election.get_changefeed(), outdata=outpipe) + pipeline.setup(indata=changefeed, outdata=outpipe) pipeline.start() time.sleep(1) + # vote one block valid, one invalid vote_valid = b.vote(valid_block.id, 'b' * 64, True) vote_invalid = b.vote(invalid_block.id, 'c' * 64, False) From 134f9e85a00b117bc5221f37dd94c8926994e7ce Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Fri, 9 Dec 2016 14:02:11 +0100 Subject: [PATCH 05/12] Updated vote pipeline to use the changefeed abstraction Fixed tests. --- bigchaindb/pipelines/vote.py | 17 ++++++++--------- tests/pipelines/test_vote.py | 18 ++++++++++++++---- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index b89e0786..cb751d0f 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -10,10 +10,12 @@ from collections import Counter from multipipes import Pipeline, Node from bigchaindb.common import exceptions +import bigchaindb +from bigchaindb import Bigchain +from bigchaindb.backend import connect, get_changefeed +from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Transaction, Block -from bigchaindb.pipelines.utils import ChangeFeed -from bigchaindb import Bigchain class Vote: @@ -142,12 +144,6 @@ def initial(): return rs -def get_changefeed(): - """Create and return the changefeed for the bigchain table.""" - - return ChangeFeed('bigchain', operation=ChangeFeed.INSERT, prefeed=initial()) - - def create_pipeline(): """Create and return the pipeline of operations to be distributed on different processes.""" @@ -168,7 +164,10 @@ def create_pipeline(): def start(): """Create, start, and return the block pipeline.""" + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, + prefeed=initial()) pipeline = create_pipeline() - pipeline.setup(indata=get_changefeed()) + pipeline.setup(indata=changefeed) pipeline.start() return pipeline diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index 0f471c0b..feac58bc 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -487,7 +487,9 @@ def test_invalid_block_voting(monkeypatch, b, user_pk): def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): - from bigchaindb.backend import query + import bigchaindb + from bigchaindb.backend import query, get_changefeed, connect + from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.pipelines import vote outpipe = Pipe() @@ -507,8 +509,11 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): block_ids.append(block_2.id) b.write_block(block_2) + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, + prefeed=vote.initial()) vote_pipeline = vote.create_pipeline() - vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) + vote_pipeline.setup(indata=changefeed, outdata=outpipe) vote_pipeline.start() # We expects two votes, so instead of waiting an arbitrary amount @@ -535,7 +540,9 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): - from bigchaindb.backend import query + import bigchaindb + from bigchaindb.backend import query, connect, get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.pipelines import vote outpipe = Pipe() @@ -554,8 +561,11 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): block_ids.append(block_2.id) b.write_block(block_2) + connection = connect(**bigchaindb.config['database']) + changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, + prefeed=vote.initial()) vote_pipeline = vote.create_pipeline() - vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) + vote_pipeline.setup(indata=changefeed, outdata=outpipe) vote_pipeline.start() # We expects two votes, so instead of waiting an arbitrary amount From e303e355db553db09fbe09c6687fdda49938540d Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Fri, 9 Dec 2016 14:39:48 +0100 Subject: [PATCH 06/12] Delete outdated pipelines/utils.py Reorganized and fixed tests. --- bigchaindb/pipelines/utils.py | 75 ------------------------- tests/backend/test_changefeed.py | 96 ++++++++++++++++++++++++++++++++ tests/pipelines/test_utils.py | 80 -------------------------- tests/test_run_query_util.py | 10 ++-- 4 files changed, 100 insertions(+), 161 deletions(-) delete mode 100644 bigchaindb/pipelines/utils.py create mode 100644 tests/backend/test_changefeed.py delete mode 100644 tests/pipelines/test_utils.py diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py deleted file mode 100644 index 71f740ee..00000000 --- a/bigchaindb/pipelines/utils.py +++ /dev/null @@ -1,75 +0,0 @@ -"""Utility classes and functions to work with the pipelines.""" - - -import time -import rethinkdb as r -import logging -from multipipes import Node - -from bigchaindb import Bigchain - - -logger = logging.getLogger(__name__) - - -class ChangeFeed(Node): - """This class wraps a RethinkDB changefeed adding a ``prefeed``. - - It extends :class:`multipipes.Node` to make it pluggable in other - Pipelines instances, and makes usage of ``self.outqueue`` to output - the data. - - A changefeed is a real time feed on inserts, updates, and deletes, and - is volatile. This class is a helper to create changefeeds. Moreover, - it provides a way to specify a ``prefeed`` of iterable data to output - before the actual changefeed. - """ - - INSERT = 1 - DELETE = 2 - UPDATE = 4 - - def __init__(self, table, operation, prefeed=None, bigchain=None): - """Create a new RethinkDB ChangeFeed. - - Args: - table (str): name of the table to listen to for changes. - operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or - ChangeFeed.UPDATE. Combining multiple operation is possible - with the bitwise ``|`` operator - (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) - prefeed (iterable): whatever set of data you want to be published - first. - bigchain (``Bigchain``): the bigchain instance to use (can be None). - """ - - super().__init__(name='changefeed') - self.prefeed = prefeed if prefeed else [] - self.table = table - self.operation = operation - self.bigchain = bigchain or Bigchain() - - def run_forever(self): - for element in self.prefeed: - self.outqueue.put(element) - - while True: - try: - self.run_changefeed() - break - except (r.ReqlDriverError, r.ReqlOpFailedError) as exc: - logger.exception(exc) - time.sleep(1) - - def run_changefeed(self): - for change in self.bigchain.connection.run(r.table(self.table).changes()): - is_insert = change['old_val'] is None - is_delete = change['new_val'] is None - is_update = not is_insert and not is_delete - - if is_insert and (self.operation & ChangeFeed.INSERT): - self.outqueue.put(change['new_val']) - elif is_delete and (self.operation & ChangeFeed.DELETE): - self.outqueue.put(change['old_val']) - elif is_update and (self.operation & ChangeFeed.UPDATE): - self.outqueue.put(change['new_val']) diff --git a/tests/backend/test_changefeed.py b/tests/backend/test_changefeed.py new file mode 100644 index 00000000..d9a8f45b --- /dev/null +++ b/tests/backend/test_changefeed.py @@ -0,0 +1,96 @@ +import pytest +from unittest.mock import Mock + +from multipipes import Pipe + +import bigchaindb +from bigchaindb import Bigchain +from bigchaindb.backend import connect + + +@pytest.fixture +def mock_changefeed_data(): + return [ + { + 'new_val': 'seems like we have an insert here', + 'old_val': None, + }, { + 'new_val': None, + 'old_val': 'seems like we have a delete here', + }, { + 'new_val': 'seems like we have an update here', + 'old_val': 'seems like we have an update here', + } + ] + + +@pytest.fixture +def mock_changefeed_connection(mock_changefeed_data): + connection = connect(**bigchaindb.config['database']) + connection.run = Mock(return_value=mock_changefeed_data) + return connection + + +def test_changefeed_insert(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, + 'backlog', ChangeFeed.INSERT) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have an insert here' + assert outpipe.qsize() == 0 + + +def test_changefeed_delete(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, + 'backlog', ChangeFeed.DELETE) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have a delete here' + assert outpipe.qsize() == 0 + + +def test_changefeed_update(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, + 'backlog', ChangeFeed.UPDATE) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have an update here' + assert outpipe.qsize() == 0 + + +def test_changefeed_multiple_operations(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, 'backlog', + ChangeFeed.INSERT | ChangeFeed.UPDATE) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.get() == 'seems like we have an insert here' + assert outpipe.get() == 'seems like we have an update here' + assert outpipe.qsize() == 0 + + +def test_changefeed_prefeed(mock_changefeed_connection): + from bigchaindb.backend import get_changefeed + from bigchaindb.backend.changefeed import ChangeFeed + + outpipe = Pipe() + changefeed = get_changefeed(mock_changefeed_connection, 'backlog', + ChangeFeed.INSERT, prefeed=[1, 2, 3]) + changefeed.outqueue = outpipe + changefeed.run_forever() + assert outpipe.qsize() == 4 diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py deleted file mode 100644 index ebe0db75..00000000 --- a/tests/pipelines/test_utils.py +++ /dev/null @@ -1,80 +0,0 @@ -import pytest -from unittest.mock import Mock - -from multipipes import Pipe -from bigchaindb import Bigchain -from bigchaindb.backend.connection import Connection -from bigchaindb.pipelines.utils import ChangeFeed - - -@pytest.fixture -def mock_changefeed_data(): - return [ - { - 'new_val': 'seems like we have an insert here', - 'old_val': None, - }, { - 'new_val': None, - 'old_val': 'seems like we have a delete here', - }, { - 'new_val': 'seems like we have an update here', - 'old_val': 'seems like we have an update here', - } - ] - - -@pytest.fixture -def mock_changefeed_bigchain(mock_changefeed_data): - connection = Connection(host=None, port=None, dbname=None) - connection.run = Mock(return_value=mock_changefeed_data) - return Bigchain(connection=connection) - - -def test_changefeed_insert(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.qsize() == 0 - - -def test_changefeed_delete(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.DELETE, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have a delete here' - assert outpipe.qsize() == 0 - - -def test_changefeed_update(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE, bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have an update here' - assert outpipe.qsize() == 0 - - -def test_changefeed_multiple_operations(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', - ChangeFeed.INSERT | ChangeFeed.UPDATE, - bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.get() == 'seems like we have an insert here' - assert outpipe.get() == 'seems like we have an update here' - assert outpipe.qsize() == 0 - - -def test_changefeed_prefeed(mock_changefeed_bigchain): - outpipe = Pipe() - changefeed = ChangeFeed('backlog', - ChangeFeed.INSERT, - prefeed=[1, 2, 3], - bigchain=mock_changefeed_bigchain) - changefeed.outqueue = outpipe - changefeed.run_forever() - assert outpipe.qsize() == 4 diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index 1aeea11e..b740983c 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -48,8 +48,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): import time import multiprocessing as mp - from bigchaindb import Bigchain - from bigchaindb.pipelines.utils import ChangeFeed + from bigchaindb.backend.changefeed import ChangeFeed + from bigchaindb.backend.rethinkdb.changefeed import RethinkDBChangeFeed class MockConnection: tries = 0 @@ -75,10 +75,8 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): else: time.sleep(10) - bigchain = Bigchain() - bigchain.connection = MockConnection() - changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT, - bigchain=bigchain) + changefeed = RethinkDBChangeFeed('cat_facts', ChangeFeed.INSERT, + connection=MockConnection()) changefeed.outqueue = mp.Queue() t_changefeed = Thread(target=changefeed.run_forever, daemon=True) From 007b574eb8a56d159f77adc294b3aac4ea00e2bd Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Tue, 13 Dec 2016 11:27:29 +0100 Subject: [PATCH 07/12] fixed missing import --- bigchaindb/pipelines/block.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 14e9e5a4..d666591b 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -6,7 +6,7 @@ function. """ import logging -from multipipes import Pipeline, Node +from multipipes import Pipeline, Node, Pipe import bigchaindb from bigchaindb.backend import connect, get_changefeed From 8c46e407614c390d477ca8fceb2d27db93ec7896 Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Tue, 13 Dec 2016 11:40:20 +0100 Subject: [PATCH 08/12] fixed documentation --- docs/server/source/appendices/pipelines.rst | 6 ------ 1 file changed, 6 deletions(-) diff --git a/docs/server/source/appendices/pipelines.rst b/docs/server/source/appendices/pipelines.rst index 98b993a1..8016e8cb 100644 --- a/docs/server/source/appendices/pipelines.rst +++ b/docs/server/source/appendices/pipelines.rst @@ -24,9 +24,3 @@ Stale Transaction Monitoring ============================ .. automodule:: bigchaindb.pipelines.stale - - -Utilities -========= - -.. automodule:: bigchaindb.pipelines.utils From d246a1498f78b87e01206717f6b7dc9657d9ae4b Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Tue, 13 Dec 2016 12:46:52 +0100 Subject: [PATCH 09/12] Added some tests for changefeed generics pepify import googlify docstring --- bigchaindb/backend/changefeed.py | 5 +++-- tests/backend/test_generics.py | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/bigchaindb/backend/changefeed.py b/bigchaindb/backend/changefeed.py index cd8d721a..459dc06c 100644 --- a/bigchaindb/backend/changefeed.py +++ b/bigchaindb/backend/changefeed.py @@ -1,6 +1,7 @@ """Changefeed interfaces for backends.""" from functools import singledispatch + from multipipes import Node import bigchaindb @@ -32,8 +33,8 @@ class ChangeFeed(Node): ChangeFeed.UPDATE. Combining multiple operation is possible with the bitwise ``|`` operator (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) - prefeed (iterable): whatever set of data you want to be published - first. + prefeed (:class:`~collections.abc.Iterable`): whatever set of data + you want to be published first. connection (:class:`~bigchaindb.backend.connection.Connection`): A connection to the database (can be None). """ diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py index b11d534a..d2d29921 100644 --- a/tests/backend/test_generics.py +++ b/tests/backend/test_generics.py @@ -37,9 +37,31 @@ def test_schema(schema_func_name, args_qty): ('get_votes_by_block_id_and_voter', 2), ('update_transaction', 2), ('get_transaction_from_block', 2), + ('get_old_transactions', 1), )) def test_query(query_func_name, args_qty): from bigchaindb.backend import query query_func = getattr(query, query_func_name) with raises(NotImplementedError): query_func(None, *range(args_qty)) + + +@mark.parametrize('changefeed_func_name,args_qty', ( + ('get_changefeed', 2), +)) +def test_changefeed(changefeed_func_name, args_qty): + from bigchaindb.backend import changefeed + changefeed_func = getattr(changefeed, changefeed_func_name) + with raises(NotImplementedError): + changefeed_func(None, *range(args_qty)) + + +@mark.parametrize('changefeed_class_func_name,args_qty', ( + ('run_forever', 0), + ('run_changefeed', 0), +)) +def test_changefeed_class(changefeed_class_func_name, args_qty): + from bigchaindb.backend.changefeed import ChangeFeed + changefeed_class_func = getattr(ChangeFeed, changefeed_class_func_name) + with raises(NotImplementedError): + changefeed_class_func(None, *range(args_qty)) From 530681c9054b1acd1aa02c646bb882ccdd4d6b1b Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Tue, 13 Dec 2016 15:11:25 +0100 Subject: [PATCH 10/12] Re-added the get_changefeed methos in the pipelines. Fixed tests. Fixed typos. --- bigchaindb/backend/changefeed.py | 13 +++++++------ bigchaindb/backend/rethinkdb/changefeed.py | 11 ----------- bigchaindb/pipelines/block.py | 15 +++++++++------ bigchaindb/pipelines/election.py | 11 +++++++---- bigchaindb/pipelines/vote.py | 13 ++++++++----- tests/pipelines/test_block_creation.py | 12 +++--------- tests/pipelines/test_election.py | 8 ++------ tests/pipelines/test_vote.py | 18 ++++-------------- 8 files changed, 40 insertions(+), 61 deletions(-) diff --git a/bigchaindb/backend/changefeed.py b/bigchaindb/backend/changefeed.py index 459dc06c..3b2104f1 100644 --- a/bigchaindb/backend/changefeed.py +++ b/bigchaindb/backend/changefeed.py @@ -30,13 +30,14 @@ class ChangeFeed(Node): Args: table (str): name of the table to listen to for changes. operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or - ChangeFeed.UPDATE. Combining multiple operation is possible + ChangeFeed.UPDATE. Combining multiple operations is possible with the bitwise ``|`` operator (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) - prefeed (:class:`~collections.abc.Iterable`): whatever set of data - you want to be published first. - connection (:class:`~bigchaindb.backend.connection.Connection`): - A connection to the database (can be None). + prefeed (:class:`~collections.abc.Iterable`, optional): whatever + set of data you want to be published first. + connection (:class:`~bigchaindb.backend.connection.Connection`, optional): # noqa + A connection to the database. If no connection is provided a + default connection will be created. """ super().__init__(name='changefeed') @@ -53,7 +54,7 @@ class ChangeFeed(Node): """Main loop of the ``multipipes.Node`` This method is responsible for first feeding the prefeed to the - outqueue and after that start the changefeed and recover from any + outqueue and after that starting the changefeed and recovering from any errors that may occur in the backend. """ raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/changefeed.py b/bigchaindb/backend/rethinkdb/changefeed.py index 921f76ba..e762d905 100644 --- a/bigchaindb/backend/rethinkdb/changefeed.py +++ b/bigchaindb/backend/rethinkdb/changefeed.py @@ -45,17 +45,6 @@ class RethinkDBChangeFeed(ChangeFeed): def get_changefeed(connection, table, operation, *, prefeed=None): """Return a RethinkDB changefeed. - Args: - connection (:class:`~bigchaindb.backend.rethinkdb.connection.RethinkDBConnection`): # noqa - A connection to the database. - table (str): name of the table to listen to for changes. - operation (int): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or - ChangeFeed.UPDATE. Combining multiple operation is possible - with the bitwise ``|`` operator - (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) - prefeed (iterable): whatever set of data you want to be published - first. - Returns: An instance of :class:`~bigchaindb.backend.rethinkdb.RethinkDBChangeFeed`. diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index d666591b..bd62a677 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -9,7 +9,7 @@ import logging from multipipes import Pipeline, Node, Pipe import bigchaindb -from bigchaindb.backend import connect, get_changefeed +from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Transaction from bigchaindb import Bigchain @@ -161,13 +161,16 @@ def create_pipeline(): return pipeline +def get_changefeed(): + connection = backend.connect(**bigchaindb.config['database']) + return backend.get_changefeed(connection, 'backlog', + ChangeFeed.INSERT | ChangeFeed.UPDATE, + prefeed=initial()) + + def start(): """Create, start, and return the block pipeline.""" - connection = connect(**bigchaindb.config['database']) - changefeed = get_changefeed(connection, 'backlog', - ChangeFeed.INSERT | ChangeFeed.UPDATE, - prefeed=initial()) pipeline = create_pipeline() - pipeline.setup(indata=changefeed) + pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 4ba11e8e..850613a3 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -9,7 +9,7 @@ import logging from multipipes import Pipeline, Node import bigchaindb -from bigchaindb.backend import connect, get_changefeed +from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.models import Block from bigchaindb import Bigchain @@ -63,10 +63,13 @@ def create_pipeline(): return election_pipeline +def get_changefeed(): + connection = backend.connect(**bigchaindb.config['database']) + return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT) + + def start(): - connection = connect(**bigchaindb.config['database']) - changefeed = get_changefeed(connection, 'votes', ChangeFeed.INSERT) pipeline = create_pipeline() - pipeline.setup(indata=changefeed) + pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/bigchaindb/pipelines/vote.py b/bigchaindb/pipelines/vote.py index cb751d0f..dd138d41 100644 --- a/bigchaindb/pipelines/vote.py +++ b/bigchaindb/pipelines/vote.py @@ -12,7 +12,7 @@ from bigchaindb.common import exceptions import bigchaindb from bigchaindb import Bigchain -from bigchaindb.backend import connect, get_changefeed +from bigchaindb import backend from bigchaindb.backend.changefeed import ChangeFeed from bigchaindb.consensus import BaseConsensusRules from bigchaindb.models import Transaction, Block @@ -161,13 +161,16 @@ def create_pipeline(): return vote_pipeline +def get_changefeed(): + connection = backend.connect(**bigchaindb.config['database']) + return backend.get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, + prefeed=initial()) + + def start(): """Create, start, and return the block pipeline.""" - connection = connect(**bigchaindb.config['database']) - changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, - prefeed=initial()) pipeline = create_pipeline() - pipeline.setup(indata=changefeed) + pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index ec07a57f..5c09f14e 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -160,11 +160,9 @@ def test_start(create_pipeline): def test_full_pipeline(b, user_pk): import random - import bigchaindb - from bigchaindb.backend import query, get_changefeed, connect - from bigchaindb.backend.changefeed import ChangeFeed + from bigchaindb.backend import query from bigchaindb.models import Block, Transaction - from bigchaindb.pipelines.block import create_pipeline, initial + from bigchaindb.pipelines.block import create_pipeline, get_changefeed outpipe = Pipe() # include myself here, so that some tx are actually assigned to me @@ -178,12 +176,8 @@ def test_full_pipeline(b, user_pk): assert query.count_backlog(b.connection) == 100 - connection = connect(**bigchaindb.config['database']) - changefeed = get_changefeed(connection, 'backlog', - ChangeFeed.INSERT | ChangeFeed.UPDATE, - prefeed=initial()) pipeline = create_pipeline() - pipeline.setup(indata=changefeed, outdata=outpipe) + pipeline.setup(indata=get_changefeed(), outdata=outpipe) pipeline.start() time.sleep(2) diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index 0b5b5ca8..51dad9e4 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -136,9 +136,7 @@ def test_start(mock_start): def test_full_pipeline(b, user_pk): import random - import bigchaindb - from bigchaindb.backend import query, get_changefeed, connect - from bigchaindb.backend.changefeed import ChangeFeed + from bigchaindb.backend import query from bigchaindb.models import Transaction outpipe = Pipe() @@ -164,10 +162,8 @@ def test_full_pipeline(b, user_pk): invalid_block = b.create_block(txs) b.write_block(invalid_block) - connection = connect(**bigchaindb.config['database']) - changefeed = get_changefeed(connection, 'votes', ChangeFeed.INSERT) pipeline = election.create_pipeline() - pipeline.setup(indata=changefeed, outdata=outpipe) + pipeline.setup(indata=election.get_changefeed(), outdata=outpipe) pipeline.start() time.sleep(1) diff --git a/tests/pipelines/test_vote.py b/tests/pipelines/test_vote.py index feac58bc..0f471c0b 100644 --- a/tests/pipelines/test_vote.py +++ b/tests/pipelines/test_vote.py @@ -487,9 +487,7 @@ def test_invalid_block_voting(monkeypatch, b, user_pk): def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): - import bigchaindb - from bigchaindb.backend import query, get_changefeed, connect - from bigchaindb.backend.changefeed import ChangeFeed + from bigchaindb.backend import query from bigchaindb.pipelines import vote outpipe = Pipe() @@ -509,11 +507,8 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): block_ids.append(block_2.id) b.write_block(block_2) - connection = connect(**bigchaindb.config['database']) - changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, - prefeed=vote.initial()) vote_pipeline = vote.create_pipeline() - vote_pipeline.setup(indata=changefeed, outdata=outpipe) + vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) vote_pipeline.start() # We expects two votes, so instead of waiting an arbitrary amount @@ -540,9 +535,7 @@ def test_voter_considers_unvoted_blocks_when_single_node(monkeypatch, b): def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): - import bigchaindb - from bigchaindb.backend import query, connect, get_changefeed - from bigchaindb.backend.changefeed import ChangeFeed + from bigchaindb.backend import query from bigchaindb.pipelines import vote outpipe = Pipe() @@ -561,11 +554,8 @@ def test_voter_chains_blocks_with_the_previous_ones(monkeypatch, b): block_ids.append(block_2.id) b.write_block(block_2) - connection = connect(**bigchaindb.config['database']) - changefeed = get_changefeed(connection, 'bigchain', ChangeFeed.INSERT, - prefeed=vote.initial()) vote_pipeline = vote.create_pipeline() - vote_pipeline.setup(indata=changefeed, outdata=outpipe) + vote_pipeline.setup(indata=vote.get_changefeed(), outdata=outpipe) vote_pipeline.start() # We expects two votes, so instead of waiting an arbitrary amount From 14a2c13a06ce2568835f1a8e8f44aa3379415a9a Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Wed, 14 Dec 2016 13:12:47 +0100 Subject: [PATCH 11/12] Removed get_old_transactions Fixed tests. --- bigchaindb/backend/query.py | 15 --------------- bigchaindb/backend/rethinkdb/query.py | 11 ----------- bigchaindb/core.py | 12 ------------ bigchaindb/pipelines/block.py | 3 +-- tests/backend/test_generics.py | 1 - 5 files changed, 1 insertion(+), 41 deletions(-) diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 930fb3e1..88f215c4 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -318,18 +318,3 @@ def get_unvoted_blocks(connection, node_pubkey): """ raise NotImplementedError - - -@singledispatch -def get_old_transactions(connection, node_pubkey): - """Return all the transactions from the backlog that have not been - processed by the specified node. - - Args: - node_pubkey (str): base58 encoded public key - - Returns: - :obj:`list` of :obj:`dict`: a list of unprocessed transactions - """ - - raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 61f008c2..8fa6a512 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -244,14 +244,3 @@ def get_unvoted_blocks(connection, node_pubkey): # database level. Solving issue #444 can help untangling the situation unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) return unvoted_blocks - - -@register_query(RethinkDBConnection) -def get_old_transactions(connection, node_pubkey): - return connection.run( - r.table('backlog') - .between([node_pubkey, r.minval], - [node_pubkey, r.maxval], - index='assignee__transaction_timestamp') - .order_by(index=r.asc('assignee__transaction_timestamp')) - ) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 5a61f117..f0e1b89c 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -676,15 +676,3 @@ class Bigchain(object): return Bigchain.BLOCK_INVALID else: return Bigchain.BLOCK_UNDECIDED - - def get_old_transactions(self): - """Return all the transactions from the backlog that have not been - processed by the specified node. - - Args: - node_pubkey (str): base58 encoded public key - - Returns: - :obj:`list` of :obj:`dict`: a list of unprocessed transactions - """ - return backend.query.get_old_transactions(self.connection, self.me) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index b8fcfa30..d5c81741 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -157,8 +157,7 @@ def create_pipeline(): def get_changefeed(): connection = backend.connect(**bigchaindb.config['database']) return backend.get_changefeed(connection, 'backlog', - ChangeFeed.INSERT | ChangeFeed.UPDATE, - prefeed=initial()) + ChangeFeed.INSERT | ChangeFeed.UPDATE) def start(): diff --git a/tests/backend/test_generics.py b/tests/backend/test_generics.py index d2d29921..f9a76e6d 100644 --- a/tests/backend/test_generics.py +++ b/tests/backend/test_generics.py @@ -37,7 +37,6 @@ def test_schema(schema_func_name, args_qty): ('get_votes_by_block_id_and_voter', 2), ('update_transaction', 2), ('get_transaction_from_block', 2), - ('get_old_transactions', 1), )) def test_query(query_func_name, args_qty): from bigchaindb.backend import query From 3cfe8cb160ab85859a592e13e1f2a4e39c8ed3eb Mon Sep 17 00:00:00 2001 From: Rodolphe Marques Date: Wed, 14 Dec 2016 13:28:37 +0100 Subject: [PATCH 12/12] fixed typo --- bigchaindb/backend/changefeed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigchaindb/backend/changefeed.py b/bigchaindb/backend/changefeed.py index 3b2104f1..8012bf62 100644 --- a/bigchaindb/backend/changefeed.py +++ b/bigchaindb/backend/changefeed.py @@ -62,8 +62,8 @@ class ChangeFeed(Node): def run_changefeed(self): """Backend specific method to run the changefeed. - The changefeed is is usually a backend cursor that is not closed when - all the results are exausted. Instead it remains open waiting for new + The changefeed is usually a backend cursor that is not closed when all + the results are exausted. Instead it remains open waiting for new results. This method should also filter each result based on the ``operation``