remove old tx recovery on block process start (#903)

This commit is contained in:
Ryan Henderson 2016-12-14 12:33:47 +01:00 committed by Brett Sun
parent dcede16aae
commit f725279a51
3 changed files with 19 additions and 39 deletions

View File

@@ -7,7 +7,6 @@ function.
import logging
import rethinkdb as r
from multipipes import Pipeline, Node, Pipe
from bigchaindb.models import Transaction
@@ -134,24 +133,10 @@ class BlockPipeline:
return block
def initial():
"""Return old transactions from the backlog."""
bigchain = Bigchain()
return bigchain.connection.run(
r.table('backlog')
.between([bigchain.me, r.minval],
[bigchain.me, r.maxval],
index='assignee__transaction_timestamp')
.order_by(index=r.asc('assignee__transaction_timestamp')))
def get_changefeed():
"""Create and return the changefeed for the backlog."""
return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE,
prefeed=initial())
return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE)
def create_pipeline():

View File

@@ -130,22 +130,6 @@ def test_delete_tx(b, user_pk):
assert status != b.TX_IN_BACKLOG
def test_prefeed(b, user_pk):
import random
from bigchaindb.models import Transaction
from bigchaindb.pipelines.block import initial
for i in range(100):
tx = Transaction.create([b.me], [([user_pk], 1)],
{'msg': random.random()})
tx = tx.sign([b.me_private])
b.write_transaction(tx)
backlog = initial()
assert len(list(backlog)) == 100
@patch('bigchaindb.pipelines.block.create_pipeline')
def test_start(create_pipeline):
from bigchaindb.pipelines import block
@@ -164,28 +148,39 @@ def test_full_pipeline(b, user_pk):
from bigchaindb.pipelines.block import create_pipeline, get_changefeed
outpipe = Pipe()
pipeline = create_pipeline()
pipeline.setup(outdata=outpipe)
inpipe = pipeline.items[0]
# include myself here, so that some tx are actually assigned to me
b.nodes_except_me = [b.me, 'aaa', 'bbb', 'ccc']
number_assigned_to_others = 0
for i in range(100):
tx = Transaction.create([b.me], [([user_pk], 1)],
{'msg': random.random()})
tx = tx.sign([b.me_private])
b.write_transaction(tx)
tx = tx.to_dict()
assert query.count_backlog(b.connection) == 100
# simulate write_transaction
tx['assignee'] = random.choice(b.nodes_except_me)
if tx['assignee'] != b.me:
number_assigned_to_others += 1
tx['assignment_timestamp'] = time.time()
inpipe.put(tx)
assert inpipe.qsize() == 100
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed(), outdata=outpipe)
pipeline.start()
time.sleep(2)
pipeline.terminate()
pipeline.terminate()
block_doc = outpipe.get()
chained_block = b.get_block(block_doc.id)
chained_block = Block.from_dict(chained_block)
block_len = len(block_doc.transactions)
assert chained_block == block_doc
assert query.count_backlog(b.connection) == 100 - block_len
assert number_assigned_to_others == 100 - block_len

View File

@@ -118,8 +118,8 @@ def test_check_requeue_transaction(b, user_pk):
e.requeue_transactions(test_block)
time.sleep(1)
backlog_tx, status = b.get_transaction(tx1.id, include_status=True)
#backlog_tx = b.connection.run(r.table('backlog').get(tx1.id))
assert status == b.TX_IN_BACKLOG
assert backlog_tx == tx1