mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Removed rethinkdb dependencies in pipelines.block
This commit is contained in:
parent
341f43267a
commit
dbf53c80e7
@ -318,3 +318,18 @@ def get_unvoted_blocks(connection, node_pubkey):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
@singledispatch
|
||||||
|
def get_old_transactions(connection, node_pubkey):
|
||||||
|
"""Return all the transactions from the backlog that have not been
|
||||||
|
processed by the specified node.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node_pubkey (str): base58 encoded public key
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`list` of :obj:`dict`: a list of unprocessed transactions
|
||||||
|
"""
|
||||||
|
|
||||||
|
raise NotImplementedError
|
||||||
|
@ -244,3 +244,14 @@ def get_unvoted_blocks(connection, node_pubkey):
|
|||||||
# database level. Solving issue #444 can help untangling the situation
|
# database level. Solving issue #444 can help untangling the situation
|
||||||
unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted)
|
unvoted_blocks = filter(lambda block: not util.is_genesis_block(block), unvoted)
|
||||||
return unvoted_blocks
|
return unvoted_blocks
|
||||||
|
|
||||||
|
|
||||||
|
@register_query(RethinkDBConnection)
|
||||||
|
def get_old_transactions(connection, node_pubkey):
|
||||||
|
return connection.run(
|
||||||
|
r.table('backlog')
|
||||||
|
.between([node_pubkey, r.minval],
|
||||||
|
[node_pubkey, r.maxval],
|
||||||
|
index='assignee__transaction_timestamp')
|
||||||
|
.order_by(index=r.asc('assignee_transaction_timestamp'))
|
||||||
|
)
|
||||||
|
@ -676,3 +676,15 @@ class Bigchain(object):
|
|||||||
return Bigchain.BLOCK_INVALID
|
return Bigchain.BLOCK_INVALID
|
||||||
else:
|
else:
|
||||||
return Bigchain.BLOCK_UNDECIDED
|
return Bigchain.BLOCK_UNDECIDED
|
||||||
|
|
||||||
|
def get_old_transactions(self):
|
||||||
|
"""Return all the transactions from the backlog that have not been
|
||||||
|
processed by the specified node.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node_pubkey (str): base58 encoded public key
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`list` of :obj:`dict`: a list of unprocessed transactions
|
||||||
|
"""
|
||||||
|
return backend.query.get_old_transactions(self.connection, self.me)
|
||||||
|
@ -6,9 +6,7 @@ function.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from multipipes import Pipeline, Node
|
||||||
import rethinkdb as r
|
|
||||||
from multipipes import Pipeline, Node, Pipe
|
|
||||||
|
|
||||||
import bigchaindb
|
import bigchaindb
|
||||||
from bigchaindb.backend import connect
|
from bigchaindb.backend import connect
|
||||||
@ -119,7 +117,8 @@ class BlockPipeline:
|
|||||||
Returns:
|
Returns:
|
||||||
:class:`~bigchaindb.models.Block`: The Block.
|
:class:`~bigchaindb.models.Block`: The Block.
|
||||||
"""
|
"""
|
||||||
logger.info('Write new block %s with %s transactions', block.id, len(block.transactions))
|
logger.info('Write new block %s with %s transactions',
|
||||||
|
block.id, len(block.transactions))
|
||||||
self.bigchain.write_block(block)
|
self.bigchain.write_block(block)
|
||||||
return block
|
return block
|
||||||
|
|
||||||
@ -142,12 +141,7 @@ def initial():
|
|||||||
|
|
||||||
bigchain = Bigchain()
|
bigchain = Bigchain()
|
||||||
|
|
||||||
return bigchain.connection.run(
|
return bigchain.get_old_transactions()
|
||||||
r.table('backlog')
|
|
||||||
.between([bigchain.me, r.minval],
|
|
||||||
[bigchain.me, r.maxval],
|
|
||||||
index='assignee__transaction_timestamp')
|
|
||||||
.order_by(index=r.asc('assignee__transaction_timestamp')))
|
|
||||||
|
|
||||||
|
|
||||||
def create_pipeline():
|
def create_pipeline():
|
||||||
|
Loading…
x
Reference in New Issue
Block a user