From f725279a5197ade09b3746f3b91e67dfb00a0cf4 Mon Sep 17 00:00:00 2001 From: Ryan Henderson Date: Wed, 14 Dec 2016 12:33:47 +0100 Subject: [PATCH] remove old tx recovery on block process start (#903) --- bigchaindb/pipelines/block.py | 17 +---------- tests/pipelines/test_block_creation.py | 39 +++++++++++--------------- tests/pipelines/test_election.py | 2 +- 3 files changed, 19 insertions(+), 39 deletions(-) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 0394aa23..ce31eb36 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -7,7 +7,6 @@ function. import logging -import rethinkdb as r from multipipes import Pipeline, Node, Pipe from bigchaindb.models import Transaction @@ -134,24 +133,10 @@ class BlockPipeline: return block -def initial(): - """Return old transactions from the backlog.""" - - bigchain = Bigchain() - - return bigchain.connection.run( - r.table('backlog') - .between([bigchain.me, r.minval], - [bigchain.me, r.maxval], - index='assignee__transaction_timestamp') - .order_by(index=r.asc('assignee__transaction_timestamp'))) - - def get_changefeed(): """Create and return the changefeed for the backlog.""" - return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE, - prefeed=initial()) + return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE) def create_pipeline(): diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index 0616365d..544345e1 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -130,22 +130,6 @@ def test_delete_tx(b, user_pk): assert status != b.TX_IN_BACKLOG -def test_prefeed(b, user_pk): - import random - from bigchaindb.models import Transaction - from bigchaindb.pipelines.block import initial - - for i in range(100): - tx = Transaction.create([b.me], [([user_pk], 1)], - {'msg': random.random()}) - tx = tx.sign([b.me_private]) - b.write_transaction(tx) - - backlog = initial() - - assert len(list(backlog)) == 100 - - @patch('bigchaindb.pipelines.block.create_pipeline') def test_start(create_pipeline): from bigchaindb.pipelines import block @@ -164,28 +148,39 @@ def test_full_pipeline(b, user_pk): from bigchaindb.pipelines.block import create_pipeline, get_changefeed outpipe = Pipe() + + pipeline = create_pipeline() + pipeline.setup(outdata=outpipe) + inpipe = pipeline.items[0] + # include myself here, so that some tx are actually assigned to me b.nodes_except_me = [b.me, 'aaa', 'bbb', 'ccc'] + number_assigned_to_others = 0 for i in range(100): tx = Transaction.create([b.me], [([user_pk], 1)], {'msg': random.random()}) tx = tx.sign([b.me_private]) - b.write_transaction(tx) + tx = tx.to_dict() - assert query.count_backlog(b.connection) == 100 + # simulate write_transaction + tx['assignee'] = random.choice(b.nodes_except_me) + if tx['assignee'] != b.me: + number_assigned_to_others += 1 + tx['assignment_timestamp'] = time.time() + inpipe.put(tx) + + assert inpipe.qsize() == 100 - pipeline = create_pipeline() - pipeline.setup(indata=get_changefeed(), outdata=outpipe) pipeline.start() time.sleep(2) - pipeline.terminate() + pipeline.terminate() block_doc = outpipe.get() chained_block = b.get_block(block_doc.id) chained_block = Block.from_dict(chained_block) block_len = len(block_doc.transactions) assert chained_block == block_doc - assert query.count_backlog(b.connection) == 100 - block_len + assert number_assigned_to_others == 100 - block_len diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py index f9fee0bb..5efe3d46 100644 --- a/tests/pipelines/test_election.py +++ b/tests/pipelines/test_election.py @@ -118,8 +118,8 @@ def test_check_requeue_transaction(b, user_pk): e.requeue_transactions(test_block) + time.sleep(1) backlog_tx, status = b.get_transaction(tx1.id, include_status=True) - #backlog_tx = b.connection.run(r.table('backlog').get(tx1.id)) assert status == b.TX_IN_BACKLOG assert backlog_tx == tx1