Problem: Tests in class TestBigChainAPI from tests/db/test_bigchain_api.py were disabled

Solution: Fixed the tests that still made sense, and deleted the rest.
This commit is contained in:
z-bowen 2018-07-27 16:10:06 +02:00
parent 0a65cb57ee
commit a231c06b2e
2 changed files with 34 additions and 373 deletions

View File

@ -325,17 +325,20 @@ def _get_height(b):
def inputs(user_pk, b, alice): def inputs(user_pk, b, alice):
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
# create blocks with transactions for `USER` to spend # create blocks with transactions for `USER` to spend
for block in range(4): print('Fuck you')
for height in range(1,4):
transactions = [ transactions = [
Transaction.create( Transaction.create(
[alice_pubkey(alice)], [alice_pubkey(alice)],
[([user_pk], 1)], [([user_pk], 1)],
metadata={'msg': random.random()}, metadata={'msg': random.random()},
).sign([alice_privkey(alice)]).to_dict() ).sign([alice_privkey(alice)])
for _ in range(10) for _ in range(10)
] ]
block = Block(app_hash='', height=_get_height(b), transactions=transactions) tx_ids = [tx.id for tx in transactions]
block = Block(app_hash='hash'+str(height), height=height, transactions=tx_ids)
b.store_block(block._asdict()) b.store_block(block._asdict())
b.store_bulk_transactions(transactions)
@pytest.fixture @pytest.fixture

View File

@ -7,196 +7,73 @@ from base58 import b58decode
pytestmark = pytest.mark.bdb pytestmark = pytest.mark.bdb
@pytest.mark.skipif(reason='Some tests throw a ResourceWarning that might result in some weird '
'exceptions while running the tests. The problem seems to *not* '
'interfere with the correctness of the tests. ')
def test_remove_unclosed_sockets():
pass
class TestBigchainApi(object): class TestBigchainApi(object):
@pytest.mark.genesis @pytest.mark.genesis
def test_get_last_voted_block_cyclic_blockchain(self, b, monkeypatch, alice): @pytest.mark.tendermint
from bigchaindb.common.crypto import PrivateKey def test_get_spent_with_double_inclusion_detected(self, b, alice):
from bigchaindb.common.exceptions import CyclicBlockchainError
from bigchaindb.common.utils import serialize
from bigchaindb.models import Transaction
tx = Transaction.create([alice.public_key], [([alice.public_key], 1)])
tx = tx.sign([alice.private_key])
monkeypatch.setattr('time.time', lambda: 1)
block1 = b.create_block([tx])
b.write_block(block1)
# Manipulate vote to create a cyclic Blockchain
vote = b.vote(block1.id, b.get_last_voted_block().id, True)
vote['vote']['previous_block'] = block1.id
vote_data = serialize(vote['vote'])
vote['signature'] = PrivateKey(alice.private_key).sign(vote_data.encode())
b.write_vote(vote)
with pytest.raises(CyclicBlockchainError):
b.get_last_voted_block()
@pytest.mark.genesis
def test_try_voting_while_constructing_cyclic_blockchain(self, b,
monkeypatch, alice):
from bigchaindb.common.exceptions import CyclicBlockchainError
from bigchaindb.models import Transaction
tx = Transaction.create([alice.public_key], [([alice.public_key], 1)])
tx = tx.sign([alice.private_key])
block1 = b.create_block([tx])
# We can simply submit twice the same block id and check if `Bigchain`
# throws
with pytest.raises(CyclicBlockchainError):
b.vote(block1.id, block1.id, True)
@pytest.mark.genesis
def test_has_previous_vote_when_already_voted(self, b, monkeypatch, alice):
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
from bigchaindb.backend.exceptions import OperationError
tx = Transaction.create([alice.public_key], [([alice.public_key], 1)]) tx = Transaction.create([alice.public_key], [([alice.public_key], 1)])
tx = tx.sign([alice.private_key]) tx = tx.sign([alice.private_key])
monkeypatch.setattr('time.time', lambda: 1) b.store_bulk_transactions([tx])
block = b.create_block([tx])
b.write_block(block)
assert b.has_previous_vote(block.id) is False
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
assert b.has_previous_vote(block.id) is True
@pytest.mark.genesis
def test_get_spent_with_double_inclusion_detected(self, b, monkeypatch, alice):
from bigchaindb.exceptions import CriticalDoubleInclusion
from bigchaindb.models import Transaction
tx = Transaction.create([alice.public_key], [([alice.public_key], 1)])
tx = tx.sign([alice.private_key])
monkeypatch.setattr('time.time', lambda: 1000000000)
block1 = b.create_block([tx])
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 1000000020)
transfer_tx = Transaction.transfer(tx.to_inputs(), [([alice.public_key], 1)], transfer_tx = Transaction.transfer(tx.to_inputs(), [([alice.public_key], 1)],
asset_id=tx.id) asset_id=tx.id)
transfer_tx = transfer_tx.sign([alice.private_key]) transfer_tx = transfer_tx.sign([alice.private_key])
block2 = b.create_block([transfer_tx]) b.store_bulk_transactions([transfer_tx])
b.write_block(block2)
monkeypatch.setattr('time.time', lambda: 1000000030)
transfer_tx2 = Transaction.transfer(tx.to_inputs(), [([alice.public_key], 1)], transfer_tx2 = Transaction.transfer(tx.to_inputs(), [([alice.public_key], 1)],
asset_id=tx.id) asset_id=tx.id)
transfer_tx2 = transfer_tx2.sign([alice.private_key]) transfer_tx2 = transfer_tx2.sign([alice.private_key])
block3 = b.create_block([transfer_tx2]) with pytest.raises(OperationError):
b.write_block(block3) b.store_bulk_transactions([transfer_tx2])
# Vote both block2 and block3 valid
vote = b.vote(block2.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
vote = b.vote(block3.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
with pytest.raises(CriticalDoubleInclusion):
b.get_spent(tx.id, 0)
@pytest.mark.genesis @pytest.mark.genesis
def test_get_spent_with_double_spend_detected(self, b, monkeypatch, alice): @pytest.mark.tendermint
from bigchaindb.exceptions import CriticalDoubleSpend def test_get_spent_with_double_spend_detected(self, b, alice):
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
from bigchaindb.common.exceptions import DoubleSpend
tx = Transaction.create([alice.public_key], [([alice.public_key], 1)]) tx = Transaction.create([alice.public_key], [([alice.public_key], 1)])
tx = tx.sign([alice.private_key]) tx = tx.sign([alice.private_key])
monkeypatch.setattr('time.time', lambda: 1000000000) b.store_bulk_transactions([tx])
block1 = b.create_block([tx])
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 1000000020)
transfer_tx = Transaction.transfer(tx.to_inputs(), [([alice.public_key], 1)], transfer_tx = Transaction.transfer(tx.to_inputs(), [([alice.public_key], 1)],
asset_id=tx.id) asset_id=tx.id)
transfer_tx = transfer_tx.sign([alice.private_key]) transfer_tx = transfer_tx.sign([alice.private_key])
block2 = b.create_block([transfer_tx])
b.write_block(block2)
monkeypatch.setattr('time.time', lambda: 1000000030)
transfer_tx2 = Transaction.transfer(tx.to_inputs(), [([alice.public_key], 2)], transfer_tx2 = Transaction.transfer(tx.to_inputs(), [([alice.public_key], 2)],
asset_id=tx.id) asset_id=tx.id)
transfer_tx2 = transfer_tx2.sign([alice.private_key]) transfer_tx2 = transfer_tx2.sign([alice.private_key])
block3 = b.create_block([transfer_tx2])
b.write_block(block3)
# Vote both block2 and block3 valid with pytest.raises(DoubleSpend):
vote = b.vote(block2.id, b.get_last_voted_block().id, True) b.validate_transaction(transfer_tx2, [transfer_tx])
b.write_vote(vote)
vote = b.vote(block3.id, b.get_last_voted_block().id, True) b.store_bulk_transactions([transfer_tx])
b.write_vote(vote)
with pytest.raises(DoubleSpend):
b.validate_transaction(transfer_tx2)
with pytest.raises(CriticalDoubleSpend):
b.get_spent(tx.id, 0)
@pytest.mark.genesis @pytest.mark.genesis
def test_get_block_status_for_tx_with_double_inclusion(self, b, monkeypatch, alice): @pytest.mark.tendermint
from bigchaindb.exceptions import CriticalDoubleInclusion def test_get_block_status_for_tx_with_double_inclusion(self, b, alice):
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
from bigchaindb.backend.exceptions import OperationError
tx = Transaction.create([alice.public_key], [([alice.public_key], 1)]) tx = Transaction.create([alice.public_key], [([alice.public_key], 1)])
tx = tx.sign([alice.private_key]) tx = tx.sign([alice.private_key])
monkeypatch.setattr('time.time', lambda: 1000000000) b.store_bulk_transactions([tx])
block1 = b.create_block([tx])
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 1000000020) with pytest.raises(OperationError):
block2 = b.create_block([tx]) b.store_bulk_transactions([tx])
b.write_block(block2)
# Vote both blocks valid (creating a double spend)
vote = b.vote(block1.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
vote = b.vote(block2.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
with pytest.raises(CriticalDoubleInclusion):
b.get_blocks_status_containing_tx(tx.id)
@pytest.mark.genesis
def test_get_transaction_in_invalid_and_valid_block(self, monkeypatch, b, alice):
from bigchaindb.models import Transaction
monkeypatch.setattr('time.time', lambda: 1000000000)
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
metadata={'msg': 1})
tx1 = tx1.sign([alice.private_key])
block1 = b.create_block([tx1])
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 1000000020)
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
metadata={'msg': 2})
tx2 = tx2.sign([alice.private_key])
block2 = b.create_block([tx2])
b.write_block(block2)
# vote the first block invalid
vote = b.vote(block1.id, b.get_last_voted_block().id, False)
b.write_vote(vote)
# vote the second block valid
vote = b.vote(block2.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
assert b.get_transaction(tx1.id) is None
assert b.get_transaction(tx2.id) == tx2
@pytest.mark.genesis @pytest.mark.genesis
@pytest.mark.tendermint
def test_text_search(self, b, alice): def test_text_search(self, b, alice):
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
from bigchaindb.backend.exceptions import OperationError from bigchaindb.backend.exceptions import OperationError
@ -216,12 +93,7 @@ class TestBigchainApi(object):
asset=asset3).sign([alice.private_key]) asset=asset3).sign([alice.private_key])
# create the block # create the block
block = b.create_block([tx1, tx2, tx3]) b.store_bulk_transactions([tx1, tx2, tx3])
b.write_block(block)
# vote valid
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
# get the assets through text search # get the assets through text search
# this query only works with MongoDB # this query only works with MongoDB
@ -232,214 +104,13 @@ class TestBigchainApi(object):
else: else:
assert len(assets) == 3 assert len(assets) == 3
@pytest.mark.genesis
def test_text_search_returns_valid_only(self, monkeypatch, b, alice):
from bigchaindb.models import Transaction
from bigchaindb.backend.exceptions import OperationError
from bigchaindb.backend.localmongodb.connection import LocalMongoDBConnection
asset_valid = {'msg': 'Hello BigchainDB!'}
asset_invalid = {'msg': 'Goodbye BigchainDB!'}
monkeypatch.setattr('time.time', lambda: 1000000000)
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset_valid)
tx1 = tx1.sign([alice.private_key])
block1 = b.create_block([tx1])
b.write_block(block1)
monkeypatch.setattr('time.time', lambda: 1000000020)
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset_invalid)
tx2 = tx2.sign([alice.private_key])
block2 = b.create_block([tx2])
b.write_block(block2)
# vote the first block valid
vote = b.vote(block1.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
# vote the second block invalid
vote = b.vote(block2.id, b.get_last_voted_block().id, False)
b.write_vote(vote)
# get assets with text search
try:
assets = list(b.text_search('bigchaindb'))
except OperationError:
assert not isinstance(b.connection, LocalMongoDBConnection)
return
# should only return one asset
assert len(assets) == 1
# should return the asset created by tx1
assert assets[0] == {
'data': {'msg': 'Hello BigchainDB!'},
'id': tx1.id
}
@pytest.mark.usefixtures('inputs')
def test_write_transaction(self, b, user_pk, user_sk):
from bigchaindb import BigchainDB
from bigchaindb.models import Transaction
input_tx = b.get_owned_ids(user_pk).pop()
input_tx = b.get_transaction(input_tx.txid)
inputs = input_tx.to_inputs()
tx = Transaction.transfer(inputs, [([user_pk], 1)],
asset_id=input_tx.id)
tx = tx.sign([user_sk])
b.write_transaction(tx)
tx_from_db, status = b.get_transaction(tx.id, include_status=True)
assert tx_from_db.to_dict() == tx.to_dict()
assert status == BigchainDB.TX_IN_BACKLOG
@pytest.mark.usefixtures('inputs')
def test_read_transaction(self, b, user_pk, user_sk):
from bigchaindb.models import Transaction
input_tx = b.get_owned_ids(user_pk).pop()
input_tx = b.get_transaction(input_tx.txid)
inputs = input_tx.to_inputs()
tx = Transaction.transfer(inputs, [([user_pk], 1)],
asset_id=input_tx.id)
tx = tx.sign([user_sk])
b.write_transaction(tx)
# create block and write it to the bighcain before retrieving the transaction
block = b.create_block([tx])
b.write_block(block)
response, status = b.get_transaction(tx.id, include_status=True)
# add validity information, which will be returned
assert tx.to_dict() == response.to_dict()
assert status == b.TX_UNDECIDED
@pytest.mark.usefixtures('inputs')
def test_read_transaction_invalid_block(self, b, user_pk, user_sk):
from bigchaindb.models import Transaction
input_tx = b.get_owned_ids(user_pk).pop()
input_tx = b.get_transaction(input_tx.txid)
inputs = input_tx.to_inputs()
tx = Transaction.transfer(inputs, [([user_pk], 1)],
asset_id=input_tx.id)
tx = tx.sign([user_sk])
# There's no need to b.write_transaction(tx) to the backlog
# create block
block = b.create_block([tx])
b.write_block(block)
# vote the block invalid
vote = b.vote(block.id, b.get_last_voted_block().id, False)
b.write_vote(vote)
response = b.get_transaction(tx.id)
# should be None, because invalid blocks are ignored
# and a copy of the tx is not in the backlog
assert response is None
@pytest.mark.usefixtures('inputs')
def test_read_transaction_invalid_block_and_backlog(self, b, user_pk, user_sk):
from bigchaindb.models import Transaction
input_tx = b.get_owned_ids(user_pk).pop()
input_tx = b.get_transaction(input_tx.txid)
inputs = input_tx.to_inputs()
tx = Transaction.transfer(inputs, [([user_pk], 1)],
asset_id=input_tx.id)
tx = tx.sign([user_sk])
# Make sure there's a copy of tx in the backlog
b.write_transaction(tx)
# create block
block = b.create_block([tx])
b.write_block(block)
# vote the block invalid
vote = b.vote(block.id, b.get_last_voted_block().id, False)
b.write_vote(vote)
# a copy of the tx is both in the backlog and in an invalid
# block, so get_transaction should return a transaction,
# and a status of TX_IN_BACKLOG
response, status = b.get_transaction(tx.id, include_status=True)
assert tx.to_dict() == response.to_dict()
assert status == b.TX_IN_BACKLOG
@pytest.mark.usefixtures('inputs')
def test_genesis_block(self, b):
from bigchaindb.backend import query
block = query.get_genesis_block(b.connection)
assert len(block['block']['transactions']) == 1
assert block['block']['transactions'][0]['operation'] == 'GENESIS'
assert block['block']['transactions'][0]['inputs'][0]['fulfills'] is None
@pytest.mark.genesis
def test_create_genesis_block_fails_if_table_not_empty(self, b):
from bigchaindb.common.exceptions import GenesisBlockAlreadyExistsError
with pytest.raises(GenesisBlockAlreadyExistsError):
b.create_genesis_block()
@pytest.mark.skipif(reason='This test may not make sense after changing the chainification mode')
def test_get_last_block(self, b):
from bigchaindb.backend import query
# get the number of blocks
num_blocks = query.count_blocks(b.connection)
# get the last block
last_block = b.get_last_block()
assert last_block['block']['block_number'] == num_blocks - 1
@pytest.mark.skipif(reason='This test may not make sense after changing the chainification mode')
def test_get_last_block_id(self, b):
last_block = b.get_last_block()
last_block_id = b.get_last_block_id()
assert last_block_id == last_block['id']
@pytest.mark.skipif(reason='This test may not make sense after changing the chainification mode')
def test_get_previous_block(self, b):
last_block = b.get_last_block()
new_block = b.create_block([])
b.write_block(new_block)
prev_block = b.get_previous_block(new_block)
assert prev_block == last_block
@pytest.mark.skipif(reason='This test may not make sense after changing the chainification mode')
def test_get_previous_block_id(self, b):
last_block = b.get_last_block()
new_block = b.create_block([])
b.write_block(new_block)
prev_block_id = b.get_previous_block_id(new_block)
assert prev_block_id == last_block['id']
def test_create_empty_block(self, b):
from bigchaindb.common.exceptions import OperationError
with pytest.raises(OperationError) as excinfo:
b.create_block([])
assert excinfo.value.args[0] == 'Empty block creation is not allowed'
@pytest.mark.usefixtures('inputs') @pytest.mark.usefixtures('inputs')
@pytest.mark.tendermint
def test_non_create_input_not_found(self, b, user_pk): def test_non_create_input_not_found(self, b, user_pk):
from cryptoconditions import Ed25519Sha256 from cryptoconditions import Ed25519Sha256
from bigchaindb.common.exceptions import InputDoesNotExist from bigchaindb.common.exceptions import InputDoesNotExist
from bigchaindb.common.transaction import Input, TransactionLink from bigchaindb.common.transaction import Input, TransactionLink
from bigchaindb.models import Transaction from bigchaindb.models import Transaction
from bigchaindb import BigchainDB
# Create an input for a non existing transaction # Create an input for a non existing transaction
input = Input(Ed25519Sha256(public_key=b58decode(user_pk)), input = Input(Ed25519Sha256(public_key=b58decode(user_pk)),
@ -447,21 +118,8 @@ class TestBigchainApi(object):
TransactionLink('somethingsomething', 0)) TransactionLink('somethingsomething', 0))
tx = Transaction.transfer([input], [([user_pk], 1)], tx = Transaction.transfer([input], [([user_pk], 1)],
asset_id='mock_asset_link') asset_id='mock_asset_link')
with pytest.raises(InputDoesNotExist): with pytest.raises(InputDoesNotExist):
tx.validate(BigchainDB()) tx.validate(b)
def test_count_backlog(self, b, user_pk, alice):
from bigchaindb.backend import query
from bigchaindb.models import Transaction
for i in range(4):
tx = Transaction.create([alice.public_key], [([user_pk], 1)],
metadata={'msg': i}) \
.sign([alice.private_key])
b.write_transaction(tx)
assert query.count_backlog(b.connection) == 4
class TestTransactionValidation(object): class TestTransactionValidation(object):