mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
29 lines
758 B
Python
29 lines
758 B
Python
import pytest
|
|
import random
|
|
|
|
|
|
@pytest.mark.bdb
|
|
@pytest.mark.genesis
|
|
def test_stepping_changefeed_produces_update(b, steps):
|
|
tx = input_single_create(b)
|
|
|
|
# timeouts are 0 so will reassign immediately
|
|
steps.stale_check_transactions()
|
|
steps.stale_reassign_transactions()
|
|
|
|
# We expect 2 changefeed events
|
|
steps.block_changefeed()
|
|
steps.block_changefeed()
|
|
|
|
assert steps.counts == {'block_filter_tx': 2}
|
|
assert ([tx['id'] for (tx,) in steps.queues['block_filter_tx']] ==
|
|
[tx.id, tx.id])
|
|
|
|
|
|
def input_single_create(b):
|
|
from bigchaindb.common.transaction import Transaction
|
|
metadata = {'r': random.random()}
|
|
tx = Transaction.create([b.me], [([b.me], 1)], metadata)
|
|
b.write_transaction(tx)
|
|
return tx
|