diff --git a/bigchaindb/backend/query.py b/bigchaindb/backend/query.py index 88f215c4..930fb3e1 100644 --- a/bigchaindb/backend/query.py +++ b/bigchaindb/backend/query.py @@ -318,3 +318,18 @@ def get_unvoted_blocks(connection, node_pubkey): """ raise NotImplementedError + + +@singledispatch +def get_old_transactions(connection, node_pubkey): + """Return all the transactions from the backlog that have not been + processed by the specified node. + + Args: + node_pubkey (str): base58 encoded public key + + Returns: + :obj:`list` of :obj:`dict`: a list of unprocessed transactions + """ + + raise NotImplementedError diff --git a/bigchaindb/backend/rethinkdb/query.py b/bigchaindb/backend/rethinkdb/query.py index 8fa6a512..57563ad3 100644 --- a/bigchaindb/backend/rethinkdb/query.py +++ b/bigchaindb/backend/rethinkdb/query.py @@ -244,3 +244,14 @@ def get_unvoted_blocks(connection, node_pubkey): # database level. Solving issue #444 can help untangling the situation unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted) return unvoted_blocks + + +@register_query(RethinkDBConnection) +def get_old_transactions(connection, node_pubkey): + return connection.run( + r.table('backlog') + .between([node_pubkey, r.minval], + [node_pubkey, r.maxval], + index='assignee__transaction_timestamp') + .order_by(index=r.asc('assignee_transaction_timestamp')) + ) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index f0e1b89c..5a61f117 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -676,3 +676,15 @@ class Bigchain(object): return Bigchain.BLOCK_INVALID else: return Bigchain.BLOCK_UNDECIDED + + def get_old_transactions(self): + """Return all the transactions from the backlog that have not been + processed by the specified node. + + Args: + node_pubkey (str): base58 encoded public key + + Returns: + :obj:`list` of :obj:`dict`: a list of unprocessed transactions + """ + return backend.query.get_old_transactions(self.connection, self.me) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index dc0e1163..d7cc2735 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -6,9 +6,7 @@ function. """ import logging - -import rethinkdb as r -from multipipes import Pipeline, Node, Pipe +from multipipes import Pipeline, Node import bigchaindb from bigchaindb.backend import connect @@ -119,7 +117,8 @@ class BlockPipeline: Returns: :class:`~bigchaindb.models.Block`: The Block. """ - logger.info('Write new block %s with %s transactions', block.id, len(block.transactions)) + logger.info('Write new block %s with %s transactions', + block.id, len(block.transactions)) self.bigchain.write_block(block) return block @@ -142,12 +141,7 @@ def initial(): bigchain = Bigchain() - return bigchain.connection.run( - r.table('backlog') - .between([bigchain.me, r.minval], - [bigchain.me, r.maxval], - index='assignee__transaction_timestamp') - .order_by(index=r.asc('assignee__transaction_timestamp'))) + return bigchain.get_old_transactions() def create_pipeline():