mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00

Replaced SigningKey with PrivateKey. Replaced all occurrences of signing key with private key. Replaced all occurrences of verifying key with public key.
1324 lines
52 KiB
Python
1324 lines
52 KiB
Python
from time import sleep
|
|
|
|
import pytest
|
|
|
|
|
|
# `skip` is the marker for an unconditional skip; `skipif` expects a condition.
@pytest.mark.skip(reason='Some tests throw a ResourceWarning that might result in some weird '
                         'exceptions while running the tests. The problem seems to *not* '
                         'interfere with the correctness of the tests. ')
def test_remove_unclosed_sockets():
    """Placeholder test; always skipped, see the skip reason above."""
    pass
|
|
|
|
|
|
# TODO: Get rid of this and move to conftest
|
|
def dummy_tx():
    """Build a CREATE transaction owned by, and signed with, this node's key.

    A fresh ``Bigchain`` instance is created only to read the node's
    public key (``me``) and private key (``me_private``).
    """
    import bigchaindb
    from bigchaindb.models import Transaction

    node = bigchaindb.Bigchain()
    unsigned = Transaction.create([node.me], [node.me])
    return unsigned.sign([node.me_private])
|
|
|
|
|
|
# TODO: Get rid of this and move to conftest
|
|
def dummy_block():
    """Return a block (not written anywhere) holding one ``dummy_tx()``."""
    import bigchaindb

    node = bigchaindb.Bigchain()
    return node.create_block([dummy_tx()])
|
|
|
|
|
|
class TestBigchainApi(object):
    """Tests for the ``Bigchain`` API exposed through the ``b`` fixture.

    Most tests create a genesis block, write blocks and votes, and then
    check what the query methods return or which exception they raise.
    """

    def test_get_last_voted_block_cyclic_blockchain(self, b, monkeypatch):
        """A vote whose previous block is the voted block itself must raise."""
        from bigchaindb.common.crypto import PrivateKey
        from bigchaindb.common.exceptions import CyclicBlockchainError
        from bigchaindb.common.util import serialize
        from bigchaindb.models import Transaction

        b.create_genesis_block()

        tx = Transaction.create([b.me], [b.me])
        tx = tx.sign([b.me_private])
        monkeypatch.setattr('time.time', lambda: 1)
        block1 = b.create_block([tx])
        b.write_block(block1, durability='hard')

        # Manipulate vote to create a cyclic Blockchain
        vote = b.vote(block1.id, b.get_last_voted_block().id, True)
        vote['vote']['previous_block'] = block1.id
        # The vote body changed, so it has to be signed again.
        vote_data = serialize(vote['vote'])
        vote['signature'] = PrivateKey(b.me_private).sign(vote_data.encode())
        b.write_vote(vote)

        with pytest.raises(CyclicBlockchainError):
            b.get_last_voted_block()

    def test_try_voting_while_constructing_cyclic_blockchain(self, b,
                                                             monkeypatch):
        """``vote`` must refuse a block id that equals its previous block id."""
        from bigchaindb.common.exceptions import CyclicBlockchainError
        from bigchaindb.models import Transaction

        b.create_genesis_block()

        tx = Transaction.create([b.me], [b.me])
        tx = tx.sign([b.me_private])
        block1 = b.create_block([tx])

        # We can simply submit twice the same block id and check if `Bigchain`
        # throws
        with pytest.raises(CyclicBlockchainError):
            b.vote(block1.id, block1.id, True)

    def test_has_previous_vote_when_already_voted(self, b, monkeypatch):
        """``has_previous_vote`` flips from False to True once a vote is written."""
        from bigchaindb.models import Transaction

        b.create_genesis_block()

        tx = Transaction.create([b.me], [b.me])
        tx = tx.sign([b.me_private])

        monkeypatch.setattr('time.time', lambda: 1)
        block = b.create_block([tx])
        b.write_block(block, durability='hard')

        assert b.has_previous_vote(block.id, block.voters) is False

        vote = b.vote(block.id, b.get_last_voted_block().id, True)
        b.write_vote(vote)

        assert b.has_previous_vote(block.id, block.voters) is True

    def test_get_transactions_for_metadata_mismatch(self, b):
        """Looking up an unknown metadata id yields no matches."""
        matches = b.get_tx_by_metadata_id('missing')
        assert not matches

    def test_get_spent_with_double_spend(self, b, monkeypatch):
        """Two valid blocks spending the same output make ``get_spent`` raise."""
        from bigchaindb.common.exceptions import DoubleSpend
        from bigchaindb.models import Transaction

        b.create_genesis_block()

        tx = Transaction.create([b.me], [b.me])
        tx = tx.sign([b.me_private])

        monkeypatch.setattr('time.time', lambda: 1)
        block1 = b.create_block([tx])
        b.write_block(block1, durability='hard')

        monkeypatch.setattr('time.time', lambda: 2)
        transfer_tx = Transaction.transfer(tx.to_inputs(), [b.me], tx.asset)
        transfer_tx = transfer_tx.sign([b.me_private])
        block2 = b.create_block([transfer_tx])
        b.write_block(block2, durability='hard')

        monkeypatch.setattr('time.time', lambda: 3)
        transfer_tx2 = Transaction.transfer(tx.to_inputs(), [b.me], tx.asset)
        transfer_tx2 = transfer_tx2.sign([b.me_private])
        block3 = b.create_block([transfer_tx2])
        b.write_block(block3, durability='hard')

        # Vote both block2 and block3 valid to provoke a double spend
        vote = b.vote(block2.id, b.get_last_voted_block().id, True)
        b.write_vote(vote)
        vote = b.vote(block3.id, b.get_last_voted_block().id, True)
        b.write_vote(vote)

        with pytest.raises(DoubleSpend):
            b.get_spent(tx.id, 0)

    def test_get_block_status_for_tx_with_double_spend(self, b, monkeypatch):
        """The same transaction in two valid blocks is reported as a double spend."""
        from bigchaindb.common.exceptions import DoubleSpend
        from bigchaindb.models import Transaction

        b.create_genesis_block()

        tx = Transaction.create([b.me], [b.me])
        tx = tx.sign([b.me_private])

        monkeypatch.setattr('time.time', lambda: 1)
        block1 = b.create_block([tx])
        b.write_block(block1, durability='hard')

        monkeypatch.setattr('time.time', lambda: 2)
        block2 = b.create_block([tx])
        b.write_block(block2, durability='hard')

        # Vote both blocks valid (creating a double spend)
        vote = b.vote(block1.id, b.get_last_voted_block().id, True)
        b.write_vote(vote)
        vote = b.vote(block2.id, b.get_last_voted_block().id, True)
        b.write_vote(vote)

        with pytest.raises(DoubleSpend):
            b.get_blocks_status_containing_tx(tx.id)

    def test_get_transaction_in_invalid_and_valid_block(self, monkeypatch, b):
        """Only the transaction from the block voted valid can be retrieved."""
        from bigchaindb.models import Transaction

        b.create_genesis_block()

        monkeypatch.setattr('time.time', lambda: 1)
        tx1 = Transaction.create([b.me], [b.me])
        tx1 = tx1.sign([b.me_private])
        block1 = b.create_block([tx1])
        b.write_block(block1, durability='hard')

        monkeypatch.setattr('time.time', lambda: 2)
        tx2 = Transaction.create([b.me], [b.me])
        tx2 = tx2.sign([b.me_private])
        block2 = b.create_block([tx2])
        b.write_block(block2, durability='hard')

        # vote the first block invalid
        vote = b.vote(block1.id, b.get_last_voted_block().id, False)
        b.write_vote(vote)

        # vote the second block valid
        vote = b.vote(block2.id, b.get_last_voted_block().id, True)
        b.write_vote(vote)

        assert b.get_transaction(tx1.id) is None
        assert b.get_transaction(tx2.id) == tx2

    def test_get_transactions_for_metadata(self, b, user_vk):
        """A transaction written in a block can be found by its metadata id."""
        from bigchaindb.models import Transaction

        metadata = {'msg': 'Hello BigchainDB!'}
        tx = Transaction.create([b.me], [user_vk], metadata=metadata)

        block = b.create_block([tx])
        b.write_block(block, durability='hard')

        # Same lookup method the mismatch test uses.
        matches = b.get_tx_by_metadata_id(tx.metadata.data_id)
        assert len(matches) == 1
        assert matches[0].id == tx.id

    @pytest.mark.usefixtures('inputs')
    def test_write_transaction(self, b, user_vk, user_sk):
        """Writing a transaction reports exactly one inserted document."""
        from bigchaindb.models import Transaction

        input_tx = b.get_owned_ids(user_vk).pop()
        input_tx = b.get_transaction(input_tx.txid)
        inputs = input_tx.to_inputs()
        tx = Transaction.transfer(inputs, [user_vk], input_tx.asset)
        tx = tx.sign([user_sk])
        response = b.write_transaction(tx)

        assert response['skipped'] == 0
        assert response['deleted'] == 0
        assert response['unchanged'] == 0
        assert response['errors'] == 0
        assert response['replaced'] == 0
        assert response['inserted'] == 1

    @pytest.mark.usefixtures('inputs')
    def test_read_transaction(self, b, user_vk, user_sk):
        """A transaction in an unvoted block is returned as ``TX_UNDECIDED``."""
        from bigchaindb.models import Transaction

        input_tx = b.get_owned_ids(user_vk).pop()
        input_tx = b.get_transaction(input_tx.txid)
        inputs = input_tx.to_inputs()
        tx = Transaction.transfer(inputs, [user_vk], input_tx.asset)
        tx = tx.sign([user_sk])
        b.write_transaction(tx)

        # create block and write it to the bigchain before retrieving the transaction
        block = b.create_block([tx])
        b.write_block(block, durability='hard')

        response, status = b.get_transaction(tx.id, include_status=True)
        # add validity information, which will be returned
        assert tx.to_dict() == response.to_dict()
        assert status == b.TX_UNDECIDED

    @pytest.mark.usefixtures('inputs')
    def test_read_transaction_invalid_block(self, b, user_vk, user_sk):
        """A transaction found only in an invalid block is not returned."""
        from bigchaindb.models import Transaction

        input_tx = b.get_owned_ids(user_vk).pop()
        input_tx = b.get_transaction(input_tx.txid)
        inputs = input_tx.to_inputs()
        tx = Transaction.transfer(inputs, [user_vk], input_tx.asset)
        tx = tx.sign([user_sk])
        # There's no need to b.write_transaction(tx) to the backlog

        # create block
        block = b.create_block([tx])
        b.write_block(block, durability='hard')

        # vote the block invalid
        vote = b.vote(block.id, b.get_last_voted_block().id, False)
        b.write_vote(vote)
        response = b.get_transaction(tx.id)

        # should be None, because invalid blocks are ignored
        # and a copy of the tx is not in the backlog
        assert response is None

    @pytest.mark.usefixtures('inputs')
    def test_read_transaction_invalid_block_and_backlog(self, b, user_vk, user_sk):
        """A transaction in an invalid block and the backlog is ``TX_IN_BACKLOG``."""
        from bigchaindb.models import Transaction

        input_tx = b.get_owned_ids(user_vk).pop()
        input_tx = b.get_transaction(input_tx.txid)
        inputs = input_tx.to_inputs()
        tx = Transaction.transfer(inputs, [user_vk], input_tx.asset)
        tx = tx.sign([user_sk])

        # Make sure there's a copy of tx in the backlog
        b.write_transaction(tx)

        # create block
        block = b.create_block([tx])
        b.write_block(block, durability='hard')

        # vote the block invalid
        vote = b.vote(block.id, b.get_last_voted_block().id, False)
        b.write_vote(vote)

        # a copy of the tx is both in the backlog and in an invalid
        # block, so get_transaction should return a transaction,
        # and a status of TX_IN_BACKLOG
        response, status = b.get_transaction(tx.id, include_status=True)
        assert tx.to_dict() == response.to_dict()
        assert status == b.TX_IN_BACKLOG

    @pytest.mark.usefixtures('inputs')
    def test_genesis_block(self, b):
        """Exactly one genesis block exists, with one GENESIS transaction."""
        import rethinkdb as r
        from bigchaindb.util import is_genesis_block
        from bigchaindb.db.utils import get_conn

        response = list(r.table('bigchain')
                         .filter(is_genesis_block)
                         .run(get_conn()))

        assert len(response) == 1
        block = response[0]
        assert len(block['block']['transactions']) == 1
        assert block['block']['transactions'][0]['transaction']['operation'] == 'GENESIS'
        assert block['block']['transactions'][0]['transaction']['fulfillments'][0]['input'] is None

    def test_create_genesis_block_fails_if_table_not_empty(self, b):
        """A second ``create_genesis_block`` call raises and adds no block."""
        import rethinkdb as r
        from bigchaindb.common.exceptions import GenesisBlockAlreadyExistsError
        from bigchaindb.util import is_genesis_block
        from bigchaindb.db.utils import get_conn

        b.create_genesis_block()

        with pytest.raises(GenesisBlockAlreadyExistsError):
            b.create_genesis_block()

        genesis_blocks = list(r.table('bigchain')
                               .filter(is_genesis_block)
                               .run(get_conn()))

        assert len(genesis_blocks) == 1

    @pytest.mark.skip(reason='This test may not make sense after changing the chainification mode')
    def test_get_last_block(self, b):
        """(Skipped) The last block's number is the block count minus one."""
        import rethinkdb as r
        from bigchaindb.db.utils import get_conn

        # get the number of blocks
        num_blocks = r.table('bigchain').count().run(get_conn())

        # get the last block
        last_block = b.get_last_block()

        assert last_block['block']['block_number'] == num_blocks - 1

    @pytest.mark.skip(reason='This test may not make sense after changing the chainification mode')
    def test_get_last_block_id(self, b):
        """(Skipped) ``get_last_block_id`` matches the id of ``get_last_block``."""
        last_block = b.get_last_block()
        last_block_id = b.get_last_block_id()

        assert last_block_id == last_block['id']

    @pytest.mark.skip(reason='This test may not make sense after changing the chainification mode')
    def test_get_previous_block(self, b):
        """(Skipped) The block before a newly written block is the old last block."""
        last_block = b.get_last_block()
        new_block = b.create_block([])
        b.write_block(new_block, durability='hard')

        prev_block = b.get_previous_block(new_block)

        assert prev_block == last_block

    @pytest.mark.skip(reason='This test may not make sense after changing the chainification mode')
    def test_get_previous_block_id(self, b):
        """(Skipped) The previous-block id of a new block is the old last block's id."""
        last_block = b.get_last_block()
        new_block = b.create_block([])
        b.write_block(new_block, durability='hard')

        prev_block_id = b.get_previous_block_id(new_block)

        assert prev_block_id == last_block['id']

    def test_create_empty_block(self, b):
        """Creating a block without transactions raises ``OperationError``."""
        from bigchaindb.common.exceptions import OperationError

        with pytest.raises(OperationError) as excinfo:
            b.create_block([])

        assert excinfo.value.args[0] == 'Empty block creation is not allowed'

    def test_get_last_voted_block_returns_genesis_if_no_votes_has_been_casted(self, b):
        """With no votes written, the last voted block is the genesis block."""
        import rethinkdb as r
        from bigchaindb import util
        from bigchaindb.models import Block
        from bigchaindb.db.utils import get_conn

        b.create_genesis_block()
        genesis = list(r.table('bigchain')
                        .filter(util.is_genesis_block)
                        .run(get_conn()))[0]
        genesis = Block.from_dict(genesis)
        gb = b.get_last_voted_block()
        assert gb == genesis
        assert b.validate_block(gb) == gb

    def test_get_last_voted_block_returns_the_correct_block_same_timestamp(self, b, monkeypatch):
        """The last voted block advances even when all votes share a timestamp."""
        genesis = b.create_genesis_block()

        assert b.get_last_voted_block() == genesis

        monkeypatch.setattr('time.time', lambda: 1)
        block_1 = dummy_block()
        monkeypatch.setattr('time.time', lambda: 2)
        block_2 = dummy_block()
        monkeypatch.setattr('time.time', lambda: 3)
        block_3 = dummy_block()

        b.write_block(block_1, durability='hard')
        b.write_block(block_2, durability='hard')
        b.write_block(block_3, durability='hard')

        # make sure all the votes are written with the same timestamps
        monkeypatch.setattr('time.time', lambda: 4)
        b.write_vote(b.vote(block_1.id, b.get_last_voted_block().id, True))
        assert b.get_last_voted_block().id == block_1.id

        b.write_vote(b.vote(block_2.id, b.get_last_voted_block().id, True))
        assert b.get_last_voted_block().id == block_2.id

        b.write_vote(b.vote(block_3.id, b.get_last_voted_block().id, True))
        assert b.get_last_voted_block().id == block_3.id

    def test_get_last_voted_block_returns_the_correct_block_different_timestamps(self, b, monkeypatch):
        """The last voted block advances when each vote has its own timestamp."""
        genesis = b.create_genesis_block()

        assert b.get_last_voted_block() == genesis

        monkeypatch.setattr('time.time', lambda: 1)
        block_1 = dummy_block()
        monkeypatch.setattr('time.time', lambda: 2)
        block_2 = dummy_block()
        monkeypatch.setattr('time.time', lambda: 3)
        block_3 = dummy_block()

        b.write_block(block_1, durability='hard')
        b.write_block(block_2, durability='hard')
        b.write_block(block_3, durability='hard')

        # make sure all the votes are written with different timestamps
        monkeypatch.setattr('time.time', lambda: 4)
        b.write_vote(b.vote(block_1.id, b.get_last_voted_block().id, True))
        assert b.get_last_voted_block().id == block_1.id

        monkeypatch.setattr('time.time', lambda: 5)
        b.write_vote(b.vote(block_2.id, b.get_last_voted_block().id, True))
        assert b.get_last_voted_block().id == block_2.id

        monkeypatch.setattr('time.time', lambda: 6)
        b.write_vote(b.vote(block_3.id, b.get_last_voted_block().id, True))
        assert b.get_last_voted_block().id == block_3.id

    def test_no_vote_written_if_block_already_has_vote(self, b):
        """Voting a second time on a block leaves the stored block unchanged."""
        import rethinkdb as r
        from bigchaindb.models import Block
        from bigchaindb.db.utils import get_conn

        genesis = b.create_genesis_block()
        block_1 = dummy_block()
        b.write_block(block_1, durability='hard')

        b.write_vote(b.vote(block_1.id, genesis.id, True))
        retrieved_block_1 = r.table('bigchain').get(block_1.id).run(get_conn())
        retrieved_block_1 = Block.from_dict(retrieved_block_1)

        # try to vote again on the retrieved block, should do nothing
        b.write_vote(b.vote(retrieved_block_1.id, genesis.id, True))
        retrieved_block_2 = r.table('bigchain').get(block_1.id).run(get_conn())
        retrieved_block_2 = Block.from_dict(retrieved_block_2)

        assert retrieved_block_1 == retrieved_block_2

    def test_more_votes_than_voters(self, b):
        """More votes than voters on a block raises ``MultipleVotesError``."""
        import rethinkdb as r
        from bigchaindb.common.exceptions import MultipleVotesError
        from bigchaindb.db.utils import get_conn

        b.create_genesis_block()
        block_1 = dummy_block()
        b.write_block(block_1, durability='hard')
        # insert duplicate votes
        vote_1 = b.vote(block_1.id, b.get_last_voted_block().id, True)
        vote_2 = b.vote(block_1.id, b.get_last_voted_block().id, True)
        vote_2['node_pubkey'] = 'aaaaaaa'
        # Insert directly into the table instead of going through write_vote.
        r.table('votes').insert(vote_1).run(get_conn())
        r.table('votes').insert(vote_2).run(get_conn())

        with pytest.raises(MultipleVotesError) as excinfo:
            b.block_election_status(block_1.id, block_1.voters)
        assert excinfo.value.args[0] == 'Block {block_id} has {n_votes} votes cast, but only {n_voters} voters'\
            .format(block_id=block_1.id, n_votes=str(2), n_voters=str(1))

    def test_multiple_votes_single_node(self, b):
        """Two votes from the same node on one block raise ``MultipleVotesError``."""
        import rethinkdb as r
        from bigchaindb.common.exceptions import MultipleVotesError
        from bigchaindb.db.utils import get_conn

        genesis = b.create_genesis_block()
        block_1 = dummy_block()
        b.write_block(block_1, durability='hard')
        # insert duplicate votes
        for i in range(2):
            r.table('votes').insert(b.vote(block_1.id, genesis.id, True)).run(get_conn())

        with pytest.raises(MultipleVotesError) as excinfo:
            b.block_election_status(block_1.id, block_1.voters)
        assert excinfo.value.args[0] == 'Block {block_id} has multiple votes ({n_votes}) from voting node {node_id}'\
            .format(block_id=block_1.id, n_votes=str(2), node_id=b.me)

        with pytest.raises(MultipleVotesError) as excinfo:
            b.has_previous_vote(block_1.id, block_1.voters)
        assert excinfo.value.args[0] == 'Block {block_id} has {n_votes} votes from public key {me}'\
            .format(block_id=block_1.id, n_votes=str(2), me=b.me)

    def test_improper_vote_error(self, b):
        """A stored vote with a mangled signature raises ``ImproperVoteError``."""
        import rethinkdb as r
        from bigchaindb.common.exceptions import ImproperVoteError
        from bigchaindb.db.utils import get_conn

        b.create_genesis_block()
        block_1 = dummy_block()
        b.write_block(block_1, durability='hard')
        vote_1 = b.vote(block_1.id, b.get_last_voted_block().id, True)
        # mangle the signature
        vote_1['signature'] = 'a' * 87
        r.table('votes').insert(vote_1).run(get_conn())
        with pytest.raises(ImproperVoteError) as excinfo:
            # Pass the block's voters, as the other has_previous_vote calls
            # in this class do (the block id was passed here before).
            b.has_previous_vote(block_1.id, block_1.voters)
        assert excinfo.value.args[0] == 'Block {block_id} already has an incorrectly signed ' \
                                        'vote from public key {me}'.format(block_id=block_1.id, me=b.me)

    @pytest.mark.usefixtures('inputs')
    def test_assign_transaction_one_node(self, b, user_vk, user_sk):
        """The backlog entry of a written transaction is assigned to this node."""
        import rethinkdb as r
        from bigchaindb.models import Transaction
        from bigchaindb.db.utils import get_conn

        input_tx = b.get_owned_ids(user_vk).pop()
        input_tx = b.get_transaction(input_tx.txid)
        inputs = input_tx.to_inputs()
        tx = Transaction.transfer(inputs, [user_vk], input_tx.asset)
        tx = tx.sign([user_sk])
        b.write_transaction(tx)

        # retrieve the transaction
        response = r.table('backlog').get(tx.id).run(get_conn())

        # check if the assignee is the current node
        assert response['assignee'] == b.me

    @pytest.mark.usefixtures('inputs')
    def test_assign_transaction_multiple_nodes(self, b, user_vk, user_sk):
        """With other nodes registered, transactions are assigned to them."""
        import rethinkdb as r
        from bigchaindb.common.crypto import generate_key_pair
        from bigchaindb.models import Transaction
        from bigchaindb.db.utils import get_conn

        # create 5 federation nodes
        for _ in range(5):
            b.nodes_except_me.append(generate_key_pair()[1])

        # test assignee for several transactions
        for _ in range(20):
            input_tx = b.get_owned_ids(user_vk).pop()
            input_tx = b.get_transaction(input_tx.txid)
            inputs = input_tx.to_inputs()
            tx = Transaction.transfer(inputs, [user_vk], input_tx.asset)
            tx = tx.sign([user_sk])
            b.write_transaction(tx)

            # retrieve the transaction
            response = r.table('backlog').get(tx.id).run(get_conn())

            # check if the assignee is one of the _other_ federation nodes
            assert response['assignee'] in b.nodes_except_me

    @pytest.mark.usefixtures('inputs')
    def test_non_create_input_not_found(self, b, user_vk):
        """A transfer pointing at an unknown transaction fails validation."""
        from cryptoconditions import Ed25519Fulfillment
        from bigchaindb.common.exceptions import TransactionDoesNotExist
        from bigchaindb.common.transaction import (Fulfillment, Asset,
                                                   TransactionLink)
        from bigchaindb.models import Transaction
        from bigchaindb import Bigchain

        # Create a fulfillment for a non existing transaction
        fulfillment = Fulfillment(Ed25519Fulfillment(public_key=user_vk),
                                  [user_vk],
                                  TransactionLink('somethingsomething', 0))
        tx = Transaction.transfer([fulfillment], [user_vk], Asset())

        with pytest.raises(TransactionDoesNotExist):
            tx.validate(Bigchain())
|
|
|
|
|
|
class TestTransactionValidation(object):
    """Checks of ``Bigchain.validate_transaction`` against malformed or
    conflicting transactions."""

    def test_create_operation_with_inputs(self, b, user_vk, create_tx):
        """A CREATE transaction carrying an input link is rejected."""
        from bigchaindb.common.transaction import TransactionLink

        # Give the fulfillment a `tx_input` it is not supposed to have.
        bogus_link = TransactionLink('abc', 0)
        create_tx.fulfillments[0].tx_input = bogus_link

        with pytest.raises(ValueError) as err:
            b.validate_transaction(create_tx)
        assert err.value.args[0] == 'A CREATE operation has no inputs'

    def test_transfer_operation_no_inputs(self, b, user_vk,
                                          signed_transfer_tx):
        """A TRANSFER transaction whose input link is null is rejected."""
        first_fulfillment = signed_transfer_tx.fulfillments[0]
        first_fulfillment.tx_input = None

        with pytest.raises(ValueError) as err:
            b.validate_transaction(signed_transfer_tx)

        expected = 'Only `CREATE` transactions can have null inputs'
        assert err.value.args[0] == expected

    def test_non_create_input_not_found(self, b, user_vk, signed_transfer_tx):
        """A TRANSFER whose input names an unknown transaction id is rejected."""
        from bigchaindb.common.exceptions import TransactionDoesNotExist
        from bigchaindb.common.transaction import TransactionLink

        unknown_link = TransactionLink('c', 0)
        signed_transfer_tx.fulfillments[0].tx_input = unknown_link

        with pytest.raises(TransactionDoesNotExist):
            b.validate_transaction(signed_transfer_tx)

    @pytest.mark.usefixtures('inputs')
    def test_non_create_valid_input_wrong_owner(self, b, user_vk):
        """An existing input combined with a foreign key pair is rejected
        with ``InvalidSignature``."""
        from bigchaindb.common.crypto import generate_key_pair
        from bigchaindb.common.exceptions import InvalidSignature
        from bigchaindb.models import Transaction

        owned_link = b.get_owned_ids(user_vk).pop()
        owned_tx = b.get_transaction(owned_link.txid)
        _stranger_sk, stranger_vk = generate_key_pair()

        # Start from a CREATE made with the foreign key and turn it, by
        # hand, into a TRANSFER of the existing output.
        forged = Transaction.create([stranger_vk], [user_vk])
        forged.operation = 'TRANSFER'
        forged.asset = owned_tx.asset
        forged.fulfillments[0].tx_input = owned_link

        with pytest.raises(InvalidSignature):
            b.validate_transaction(forged)

    @pytest.mark.usefixtures('inputs')
    def test_non_create_double_spend(self, b, signed_create_tx,
                                     signed_transfer_tx):
        """Re-validating a transfer already in a valid block is a double spend."""
        from bigchaindb.common.exceptions import DoubleSpend

        create_block = b.create_block([signed_create_tx])
        b.write_block(create_block)

        # vote block valid
        b.write_vote(b.vote(create_block.id, b.get_last_voted_block().id, True))

        b.write_transaction(signed_transfer_tx)
        transfer_block = b.create_block([signed_transfer_tx])
        b.write_block(transfer_block, durability='hard')

        # vote block valid
        b.write_vote(b.vote(transfer_block.id, b.get_last_voted_block().id, True))

        sleep(1)

        signed_transfer_tx.timestamp = 123
        # FIXME: https://github.com/bigchaindb/bigchaindb/issues/592
        with pytest.raises(DoubleSpend):
            b.validate_transaction(signed_transfer_tx)

    @pytest.mark.usefixtures('inputs')
    def test_valid_non_create_transaction_after_block_creation(self, b,
                                                               user_vk,
                                                               user_sk):
        """A transfer validates both before and after its block is written."""
        from bigchaindb.models import Transaction

        owned_link = b.get_owned_ids(user_vk).pop()
        source_tx = b.get_transaction(owned_link.txid)
        spend_inputs = source_tx.to_inputs()
        transfer_tx = Transaction.transfer(spend_inputs, [user_vk],
                                           source_tx.asset)
        transfer_tx = transfer_tx.sign([user_sk])

        assert transfer_tx == b.validate_transaction(transfer_tx)

        # create block
        new_block = b.create_block([transfer_tx])
        assert b.validate_block(new_block) == new_block
        b.write_block(new_block, durability='hard')

        # the transaction must still validate once it is stored in the
        # bigchain
        assert transfer_tx == b.validate_transaction(transfer_tx)

    @pytest.mark.usefixtures('inputs')
    def test_fulfillment_not_in_valid_block(self, b, user_vk, user_sk):
        """Spending an output whose block has not been voted on is rejected."""
        from bigchaindb.models import Transaction
        from bigchaindb.common.exceptions import FulfillmentNotInValidBlock

        owned_link = b.get_owned_ids(user_vk).pop()
        source_tx = b.get_transaction(owned_link.txid)
        spend_inputs = source_tx.to_inputs()

        # a valid transaction that will sit in a block nobody voted on
        transfer_tx = Transaction.transfer(spend_inputs, [user_vk],
                                           source_tx.asset)
        transfer_tx = transfer_tx.sign([user_sk])

        assert transfer_tx == b.validate_transaction(transfer_tx)

        # create block
        undecided_block = b.create_block([transfer_tx])
        b.write_block(undecided_block, durability='hard')

        # create transaction with the undecided input
        dependent_tx = Transaction.transfer(transfer_tx.to_inputs(), [user_vk],
                                            transfer_tx.asset)
        dependent_tx = dependent_tx.sign([user_sk])

        with pytest.raises(FulfillmentNotInValidBlock):
            b.validate_transaction(dependent_tx)
|
|
|
|
|
|
class TestBlockValidation(object):
    """Tests for ``Bigchain.validate_block`` on tampered or foreign blocks."""

    # `skip` is the marker for an unconditional skip; `skipif` expects a
    # condition.
    @pytest.mark.skip(reason='Separated tx validation from block creation.')
    @pytest.mark.usefixtures('inputs')
    def test_invalid_transactions_in_block(self, b, user_vk):
        """(Skipped) A block holding an invalid transaction fails validation.

        Still written against the old dict-based block API; see the NOTE
        and TODO comments below.
        """
        from bigchaindb.common import crypto
        from bigchaindb.common.exceptions import TransactionOwnerError
        from bigchaindb.common.util import gen_timestamp

        from bigchaindb import util

        # invalid transaction
        valid_input = b.get_owned_ids(user_vk).pop()
        tx_invalid = b.create_transaction('a', 'b', valid_input, 'c')

        block = b.create_block([tx_invalid])

        # create a block with invalid transactions
        block = {
            'timestamp': gen_timestamp(),
            'transactions': [tx_invalid],
            'node_pubkey': b.me,
            'voters': b.nodes_except_me
        }

        # NOTE: This is not the correct function anymore, but this test is
        #       skipped
        block_data = util.serialize_block(block)
        block_hash = crypto.hash_data(block_data)
        block_signature = crypto.PrivateKey(b.me_private).sign(block_data)

        block = {
            'id': block_hash,
            'block': block,
            'signature': block_signature,
            'votes': []
        }

        with pytest.raises(TransactionOwnerError) as excinfo:
            # TODO: Adjust this to the new Block model (test is currently
            #       skipped.
            b.validate_block(block)

        assert excinfo.value.args[0] == 'owner_before `a` does not own the input `{}`'.format(valid_input)

    def test_invalid_signature(self, b):
        """A block whose signature covers other data raises ``InvalidSignature``."""
        from bigchaindb.common.exceptions import InvalidSignature
        from bigchaindb.common import crypto

        # create a valid block
        block = dummy_block()

        # replace the block signature with an invalid one
        block.signature = crypto.PrivateKey(b.me_private).sign(b'wrongdata')

        # check that validate_block raises an InvalidSignature exception
        with pytest.raises(InvalidSignature):
            b.validate_block(block)

    def test_invalid_node_pubkey(self, b):
        """A block signed by a freshly generated key raises ``OperationError``."""
        from bigchaindb.common.exceptions import OperationError
        from bigchaindb.common import crypto

        # blocks can only be created by a federation node
        # create a valid block
        block = dummy_block()

        # create some temp keys
        tmp_sk, tmp_vk = crypto.generate_key_pair()

        # change the block node_pubkey
        block.node_pubkey = tmp_vk

        # just to make sure lets re-hash the block and create a valid signature
        # from a non federation node
        block = block.sign(tmp_sk)

        # check that validate_block raises an OperationError
        with pytest.raises(OperationError):
            b.validate_block(block)
|
|
|
|
|
|
class TestMultipleInputs(object):
|
|
def test_transfer_single_owner_single_input(self, b, inputs, user_vk,
                                            user_sk):
    """Transfer one owned output to a new key and check the result is valid
    with one fulfillment and one condition."""
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    _recipient_sk, recipient_vk = crypto.generate_key_pair()

    owned_link = b.get_owned_ids(user_vk).pop()
    source_tx = b.get_transaction(owned_link.txid)
    transfer = Transaction.transfer(source_tx.to_inputs(), [recipient_vk],
                                    source_tx.asset)
    transfer = transfer.sign([user_sk])

    # validate transaction
    assert b.is_valid_transaction(transfer) == transfer
    assert len(transfer.fulfillments) == 1
    assert len(transfer.conditions) == 1
|
|
|
|
# The reason is one string (a stray comma used to make it a tuple), and
# `skip` is the marker for an unconditional skip.
@pytest.mark.skip(reason=('Multiple inputs are only allowed for the '
                          'same asset. Remove this after implementing '
                          'multiple assets'))
@pytest.mark.usefixtures('inputs')
def test_transfer_single_owners_multiple_inputs(self, b, user_sk, user_vk):
    """(Skipped) Transfer all owned outputs in one transaction.

    Expects one fulfillment and one condition per collected input.
    """
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    user2_sk, user2_vk = crypto.generate_key_pair()

    # get inputs
    owned_inputs = b.get_owned_ids(user_vk)
    input_txs = [b.get_transaction(tx_link.txid) for tx_link
                 in owned_inputs]
    # Flatten the per-transaction input lists into a single list.
    inputs = sum([input_tx.to_inputs() for input_tx in input_txs], [])
    tx = Transaction.transfer(inputs, len(inputs) * [[user_vk]])
    tx = tx.sign([user_sk])
    assert b.validate_transaction(tx) == tx
    assert len(tx.fulfillments) == len(inputs)
    assert len(tx.conditions) == len(inputs)
|
|
|
|
# The reason is one string (a stray comma used to make it a tuple), and
# `skip` is the marker for an unconditional skip.
@pytest.mark.skip(reason=('Multiple inputs are only allowed for the '
                          'same asset. Remove this after implementing '
                          'multiple assets'))
@pytest.mark.usefixtures('inputs')
def test_transfer_single_owners_single_input_from_multiple_outputs(self, b,
                                                                   user_sk,
                                                                   user_vk):
    """(Skipped) Spend a single output of a multi-output transaction.

    All of the user's outputs are first transferred to a second key in
    one transaction; one of the resulting outputs is then sent back.
    """
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    user2_sk, user2_vk = crypto.generate_key_pair()

    # get inputs
    owned_inputs = b.get_owned_ids(user_vk)
    input_txs = [b.get_transaction(tx_link.txid) for tx_link
                 in owned_inputs]
    # Flatten the per-transaction input lists into a single list.
    inputs = sum([input_tx.to_inputs() for input_tx in input_txs], [])
    tx = Transaction.transfer(inputs, len(inputs) * [[user2_vk]])
    tx = tx.sign([user_sk])

    # create block with the transaction
    block = b.create_block([tx])
    b.write_block(block, durability='hard')

    # vote block valid
    vote = b.vote(block.id, b.get_last_voted_block().id, True)
    b.write_vote(vote)

    # get inputs from user2
    owned_inputs = b.get_owned_ids(user2_vk)
    assert len(owned_inputs) == len(inputs)

    # create a transaction with a single input from a multiple output transaction
    tx_link = owned_inputs.pop()
    inputs = b.get_transaction(tx_link.txid).to_inputs([0])
    tx = Transaction.transfer(inputs, [user_vk])
    tx = tx.sign([user2_sk])

    assert b.is_valid_transaction(tx) == tx
    assert len(tx.fulfillments) == 1
    assert len(tx.conditions) == 1
|
|
|
|
def test_single_owner_before_multiple_owners_after_single_input(self, b,
                                                                user_sk,
                                                                user_vk,
                                                                inputs):
    """Transfer one owned output to a pair of new keys and check the result
    is valid with one fulfillment and one condition."""
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    _second_sk, second_vk = crypto.generate_key_pair()
    _third_sk, third_vk = crypto.generate_key_pair()

    owned_link = b.get_owned_ids(user_vk).pop()
    source_tx = b.get_transaction(owned_link.txid)
    joint_owners = [[second_vk, third_vk]]
    transfer = Transaction.transfer(source_tx.to_inputs(), joint_owners,
                                    source_tx.asset)
    transfer = transfer.sign([user_sk])

    assert b.is_valid_transaction(transfer) == transfer
    assert len(transfer.fulfillments) == 1
    assert len(transfer.conditions) == 1
|
|
|
|
@pytest.mark.skipif(reason=('Multiple inputs are only allowed for the '
                            'same asset. Remove this after implementing '
                            'multiple assets'))
@pytest.mark.usefixtures('inputs')
def test_single_owner_before_multiple_owners_after_multiple_inputs(self, b,
                                                                   user_sk,
                                                                   user_vk):
    """A single owner transfers all of their inputs to two new owners.

    The transfer is written in a block that is voted valid, then checked
    for validity and for one fulfillment/condition per input.

    The ``skipif`` reason must be ONE string: the adjacent literals are
    concatenated implicitly, so no comma may appear between them.
    """
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    # Only the public keys of the new owners are needed.
    _, user2_vk = crypto.generate_key_pair()
    _, user3_vk = crypto.generate_key_pair()

    owned_inputs = b.get_owned_ids(user_vk)
    input_txs = [b.get_transaction(tx_link.txid) for tx_link
                 in owned_inputs]
    # Flatten the per-transaction input lists into one list.
    inputs = sum([input_tx.to_inputs() for input_tx in input_txs], [])

    # One ``[user2_vk, user3_vk]`` owner list per input.
    tx = Transaction.transfer(inputs, len(inputs) * [[user2_vk, user3_vk]])
    tx = tx.sign([user_sk])

    # create block with the transaction
    block = b.create_block([tx])
    b.write_block(block, durability='hard')

    # vote block valid
    vote = b.vote(block.id, b.get_last_voted_block().id, True)
    b.write_vote(vote)

    # validate transaction
    assert b.is_valid_transaction(tx) == tx
    assert len(tx.fulfillments) == len(inputs)
    assert len(tx.conditions) == len(inputs)
|
|
|
|
@pytest.mark.usefixtures('inputs')
def test_multiple_owners_before_single_owner_after_single_input(self, b,
                                                                user_sk,
                                                                user_vk):
    """Two co-owners transfer a jointly owned input to a single new owner.

    A CREATE transaction owned by ``user_vk`` and a second user is written
    in a block voted valid; the transfer spending it is signed by both
    co-owners and must validate with one fulfillment and one condition.
    """
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    co_owner_sk, co_owner_vk = crypto.generate_key_pair()
    # The receiver never signs, so only the public key is kept.
    _, receiver_vk = crypto.generate_key_pair()

    create_tx = Transaction.create([b.me], [user_vk, co_owner_vk])
    create_tx = create_tx.sign([b.me_private])
    new_block = b.create_block([create_tx])
    b.write_block(new_block, durability='hard')

    # Mark the freshly written block as valid.
    valid_vote = b.vote(new_block.id, b.get_last_voted_block().id, True)
    b.write_vote(valid_vote)

    link = b.get_owned_ids(user_vk).pop()
    source_tx = b.get_transaction(link.txid)
    spendable = source_tx.to_inputs()

    transfer_tx = Transaction.transfer(spendable, [receiver_vk],
                                       source_tx.asset)
    # Both co-owners have to sign.
    transfer_tx = transfer_tx.sign([user_sk, co_owner_sk])

    assert b.is_valid_transaction(transfer_tx) == transfer_tx
    assert len(transfer_tx.fulfillments) == 1
    assert len(transfer_tx.conditions) == 1
|
|
|
|
@pytest.mark.skipif(reason=('Multiple inputs are only allowed for the '
                            'same asset. Remove this after implementing '
                            'multiple assets'))
@pytest.mark.usefixtures('inputs_shared')
def test_multiple_owners_before_single_owner_after_multiple_inputs(self, b,
                                                                   user_sk, user_vk, user2_vk, user2_sk):
    """Two co-owners transfer all of their inputs to a single new owner.

    The transfer is signed with both ``user_sk`` and ``user2_sk`` and must
    validate with one fulfillment/condition per input.

    The ``skipif`` reason must be ONE string: the adjacent literals are
    concatenated implicitly, so no comma may appear between them.
    """
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    # create a new user (only the public key is needed)
    _, user3_vk = crypto.generate_key_pair()

    tx_links = b.get_owned_ids(user_vk)
    # Flatten the per-transaction input lists into one list.
    inputs = sum([b.get_transaction(tx_link.txid).to_inputs() for tx_link
                  in tx_links], [])

    # One ``[user3_vk]`` owner list per input.
    tx = Transaction.transfer(inputs, len(inputs) * [[user3_vk]])
    tx = tx.sign([user_sk, user2_sk])

    assert b.is_valid_transaction(tx) == tx
    assert len(tx.fulfillments) == len(inputs)
    assert len(tx.conditions) == len(inputs)
|
|
|
|
@pytest.mark.usefixtures('inputs')
def test_multiple_owners_before_multiple_owners_after_single_input(self, b,
                                                                   user_sk,
                                                                   user_vk):
    """Two co-owners transfer a jointly owned input to two new owners.

    A CREATE transaction owned by ``user_vk`` and a second user is written
    in a block voted valid; the transfer spending it is signed by both
    co-owners and must validate with one fulfillment and one condition.
    """
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    co_owner_sk, co_owner_vk = crypto.generate_key_pair()
    # The receivers never sign, so only their public keys are kept.
    _, receiver_a_vk = crypto.generate_key_pair()
    _, receiver_b_vk = crypto.generate_key_pair()

    create_tx = Transaction.create([b.me], [user_vk, co_owner_vk])
    create_tx = create_tx.sign([b.me_private])
    new_block = b.create_block([create_tx])
    b.write_block(new_block, durability='hard')

    # Mark the freshly written block as valid.
    valid_vote = b.vote(new_block.id, b.get_last_voted_block().id, True)
    b.write_vote(valid_vote)

    # Load the transaction behind one of the user's owned outputs.
    link = b.get_owned_ids(user_vk).pop()
    source_tx = b.get_transaction(link.txid)

    transfer_tx = Transaction.transfer(source_tx.to_inputs(),
                                       [[receiver_a_vk, receiver_b_vk]],
                                       source_tx.asset)
    # Both co-owners have to sign.
    transfer_tx = transfer_tx.sign([user_sk, co_owner_sk])

    assert b.is_valid_transaction(transfer_tx) == transfer_tx
    assert len(transfer_tx.fulfillments) == 1
    assert len(transfer_tx.conditions) == 1
|
|
|
|
@pytest.mark.skipif(reason=('Multiple inputs are only allowed for the '
                            'same asset. Remove this after implementing '
                            'multiple assets'))
@pytest.mark.usefixtures('inputs_shared')
def test_multiple_owners_before_multiple_owners_after_multiple_inputs(self, b,
                                                                      user_sk, user_vk,
                                                                      user2_sk, user2_vk):
    """Two co-owners transfer all of their inputs to two new owners.

    The transfer is signed with both ``user_sk`` and ``user2_sk`` and must
    validate with one fulfillment/condition per input.

    The ``skipif`` reason must be ONE string: the adjacent literals are
    concatenated implicitly, so no comma may appear between them.
    """
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    # create new users (only their public keys are needed)
    _, user3_vk = crypto.generate_key_pair()
    _, user4_vk = crypto.generate_key_pair()

    tx_links = b.get_owned_ids(user_vk)
    # Flatten the per-transaction input lists into one list.
    inputs = sum([b.get_transaction(tx_link.txid).to_inputs() for tx_link
                  in tx_links], [])

    # One ``[user3_vk, user4_vk]`` owner list per input.
    tx = Transaction.transfer(inputs, len(inputs) * [[user3_vk, user4_vk]])
    tx = tx.sign([user_sk, user2_sk])

    assert b.is_valid_transaction(tx) == tx
    assert len(tx.fulfillments) == len(inputs)
    assert len(tx.conditions) == len(inputs)
|
|
|
|
def test_get_owned_ids_single_tx_single_output(self, b, user_sk, user_vk):
    """``get_owned_ids`` tracks a single output moving from user1 to user2.

    Both blocks are written without any vote being cast.
    """
    from bigchaindb.common import crypto
    from bigchaindb.common.transaction import TransactionLink
    from bigchaindb.models import Transaction

    # The second user only receives, so the secret key is discarded.
    _, user2_vk = crypto.generate_key_pair()

    create_tx = Transaction.create([b.me], [user_vk])
    create_tx = create_tx.sign([b.me_private])
    create_block = b.create_block([create_tx])
    b.write_block(create_block, durability='hard')

    # After the CREATE only user1 owns something.
    owned_by_user1 = b.get_owned_ids(user_vk)
    owned_by_user2 = b.get_owned_ids(user2_vk)
    assert owned_by_user1 == [TransactionLink(create_tx.id, 0)]
    assert owned_by_user2 == []

    transfer_tx = Transaction.transfer(create_tx.to_inputs(), [user2_vk],
                                       create_tx.asset)
    transfer_tx = transfer_tx.sign([user_sk])
    transfer_block = b.create_block([transfer_tx])
    b.write_block(transfer_block, durability='hard')

    # After the TRANSFER ownership has moved to user2.
    owned_by_user1 = b.get_owned_ids(user_vk)
    owned_by_user2 = b.get_owned_ids(user2_vk)
    assert owned_by_user1 == []
    assert owned_by_user2 == [TransactionLink(transfer_tx.id, 0)]
|
|
|
|
def test_get_owned_ids_single_tx_single_output_invalid_block(self, b,
                                                             user_sk,
                                                             user_vk):
    """A transfer inside a block voted invalid must not change ownership."""
    from bigchaindb.common import crypto
    from bigchaindb.common.transaction import TransactionLink
    from bigchaindb.models import Transaction

    genesis_block = b.create_genesis_block()
    # The second user only receives, so the secret key is discarded.
    _, user2_vk = crypto.generate_key_pair()

    create_tx = Transaction.create([b.me], [user_vk])
    create_tx = create_tx.sign([b.me_private])
    valid_block = b.create_block([create_tx])
    b.write_block(valid_block, durability='hard')

    # Vote the block holding the CREATE as VALID, on top of genesis.
    b.write_vote(b.vote(valid_block.id, genesis_block.id, True))

    owned_by_user1 = b.get_owned_ids(user_vk)
    owned_by_user2 = b.get_owned_ids(user2_vk)
    assert owned_by_user1 == [TransactionLink(create_tx.id, 0)]
    assert owned_by_user2 == []

    # NOTE: The transfer itself is valid; its block is voted invalid below
    # to mock the behavior.
    rejected_tx = Transaction.transfer(create_tx.to_inputs(), [user2_vk],
                                       create_tx.asset)
    rejected_tx = rejected_tx.sign([user_sk])
    rejected_block = b.create_block([rejected_tx])
    b.write_block(rejected_block, durability='hard')

    # Vote the block holding the transfer as INVALID.
    invalid_vote = b.vote(rejected_block.id, b.get_last_voted_block().id,
                          False)
    b.write_vote(invalid_vote)

    owned_by_user1 = b.get_owned_ids(user_vk)
    owned_by_user2 = b.get_owned_ids(user2_vk)

    # Same as before: the link points at the CREATE, not the rejected tx.
    assert owned_by_user1 == [TransactionLink(create_tx.id, 0)]
    assert owned_by_user2 == []
|
|
|
|
@pytest.mark.skipif(reason=('Multiple inputs are only allowed for the '
                            'same asset. Remove this after implementing '
                            'multiple assets'))
def test_get_owned_ids_single_tx_multiple_outputs(self, b, user_sk,
                                                  user_vk):
    """``get_owned_ids`` reports every output of a multi-output transfer.

    Two CREATE transactions owned by user1 are merged into one transfer
    to user2; afterwards user2 must own outputs 0 and 1 of that transfer.

    The ``skipif`` reason must be ONE string: the adjacent literals are
    concatenated implicitly, so no comma may appear between them.
    """
    import random
    from bigchaindb.common import crypto
    from bigchaindb.common.transaction import TransactionLink
    from bigchaindb.models import Transaction

    # user2 only receives, so the secret key is discarded.
    _, user2_vk = crypto.generate_key_pair()

    transactions = []
    for _ in range(2):
        payload = {'somedata': random.randint(0, 255)}
        tx = Transaction.create([b.me], [user_vk], payload)
        tx = tx.sign([b.me_private])
        transactions.append(tx)
    block = b.create_block(transactions)
    b.write_block(block, durability='hard')

    # get input
    owned_inputs_user1 = b.get_owned_ids(user_vk)
    owned_inputs_user2 = b.get_owned_ids(user2_vk)

    expected_owned_inputs_user1 = [TransactionLink(tx.id, 0) for tx
                                   in transactions]
    assert owned_inputs_user1 == expected_owned_inputs_user1
    assert owned_inputs_user2 == []

    # Flatten the per-transaction input lists into one list and send every
    # input to user2 (one ``[user2_vk]`` owner list per input).
    inputs = sum([tx.to_inputs() for tx in transactions], [])
    tx = Transaction.transfer(inputs, len(inputs) * [[user2_vk]])
    tx = tx.sign([user_sk])
    block = b.create_block([tx])
    b.write_block(block, durability='hard')

    owned_inputs_user1 = b.get_owned_ids(user_vk)
    owned_inputs_user2 = b.get_owned_ids(user2_vk)
    assert owned_inputs_user1 == []
    assert owned_inputs_user2 == [TransactionLink(tx.id, 0),
                                  TransactionLink(tx.id, 1)]
|
|
|
|
def test_get_owned_ids_multiple_owners(self, b, user_sk, user_vk):
    """Co-owners of one output see identical results from ``get_owned_ids``.

    Before the transfer both see the CREATE output; after both of them
    sign it away to a third user, both see nothing.
    """
    from bigchaindb.common import crypto
    from bigchaindb.common.transaction import TransactionLink
    from bigchaindb.models import Transaction

    co_owner_sk, co_owner_vk = crypto.generate_key_pair()
    # The third user only receives, so the secret key is discarded.
    _, receiver_vk = crypto.generate_key_pair()

    create_tx = Transaction.create([b.me], [user_vk, co_owner_vk])
    create_tx = create_tx.sign([b.me_private])
    create_block = b.create_block([create_tx])
    b.write_block(create_block, durability='hard')

    owned_by_user = b.get_owned_ids(user_vk)
    owned_by_co_owner = b.get_owned_ids(co_owner_vk)
    expected_links = [TransactionLink(create_tx.id, 0)]

    assert owned_by_user == owned_by_co_owner
    assert owned_by_user == expected_links

    transfer_tx = Transaction.transfer(create_tx.to_inputs(), [receiver_vk],
                                       create_tx.asset)
    # Both co-owners have to sign.
    transfer_tx = transfer_tx.sign([user_sk, co_owner_sk])
    transfer_block = b.create_block([transfer_tx])
    b.write_block(transfer_block, durability='hard')

    owned_by_user = b.get_owned_ids(user_vk)
    owned_by_co_owner = b.get_owned_ids(co_owner_vk)
    assert owned_by_user == owned_by_co_owner
    assert owned_by_user == []
|
|
|
|
def test_get_spent_single_tx_single_output(self, b, user_sk, user_vk):
    """``get_spent`` is ``None`` until the output is transferred.

    Once the transfer is written in a block, ``get_spent`` must return
    that transfer transaction.
    """
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    # The second user only receives, so the secret key is discarded.
    _, user2_vk = crypto.generate_key_pair()

    create_tx = Transaction.create([b.me], [user_vk])
    create_tx = create_tx.sign([b.me_private])
    create_block = b.create_block([create_tx])
    b.write_block(create_block, durability='hard')

    owned_link = b.get_owned_ids(user_vk).pop()

    # Nothing has spent the output yet.
    spent_txid = owned_link.txid
    spent_cid = owned_link.cid
    spending_tx = b.get_spent(spent_txid, spent_cid)
    assert spending_tx is None

    # Spend the output and persist the transfer in a new block.
    transfer_tx = Transaction.transfer(create_tx.to_inputs(), [user2_vk],
                                       create_tx.asset)
    transfer_tx = transfer_tx.sign([user_sk])
    transfer_block = b.create_block([transfer_tx])
    b.write_block(transfer_block, durability='hard')

    spending_tx = b.get_spent(spent_txid, spent_cid)
    assert spending_tx == transfer_tx
|
|
|
|
def test_get_spent_single_tx_single_output_invalid_block(self, b, user_sk, user_vk):
    """A transfer inside a block voted invalid does not count as a spend."""
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    genesis_block = b.create_genesis_block()

    # The new user only receives, so the secret key is discarded.
    _, user2_vk = crypto.generate_key_pair()

    create_tx = Transaction.create([b.me], [user_vk])
    create_tx = create_tx.sign([b.me_private])
    valid_block = b.create_block([create_tx])
    b.write_block(valid_block, durability='hard')

    # Vote the block holding the CREATE as VALID, on top of genesis.
    b.write_vote(b.vote(valid_block.id, genesis_block.id, True))

    owned_link = b.get_owned_ids(user_vk).pop()

    # Nothing has spent the output yet.
    spent_txid = owned_link.txid
    spent_cid = owned_link.cid
    spending_tx = b.get_spent(spent_txid, spent_cid)
    assert spending_tx is None

    # Spend the output and persist the transfer in a new block.
    transfer_tx = Transaction.transfer(create_tx.to_inputs(), [user2_vk],
                                       create_tx.asset)
    transfer_tx = transfer_tx.sign([user_sk])
    rejected_block = b.create_block([transfer_tx])
    b.write_block(rejected_block, durability='hard')

    # Vote the block holding the transfer as INVALID.
    invalid_vote = b.vote(rejected_block.id, b.get_last_voted_block().id,
                          False)
    b.write_vote(invalid_vote)
    # NOTE: the result of this lookup is discarded; the original author
    # noted not knowing why the call is here. It is kept unchanged.
    b.get_transaction(transfer_tx.id)
    spending_tx = b.get_spent(spent_txid, spent_cid)

    # Now there should be no spents (the block is invalid)
    assert spending_tx is None
|
|
|
|
@pytest.mark.skipif(reason=('Multiple inputs are only allowed for the '
                            'same asset. Remove this after implementing '
                            'multiple assets'))
def test_get_spent_single_tx_multiple_outputs(self, b, user_sk, user_vk):
    """``get_spent`` marks only the inputs consumed by a multi-input transfer.

    Three CREATE transactions are written; the first two are spent in one
    transfer and the third must remain unspent.

    The ``skipif`` reason must be ONE string: the adjacent literals are
    concatenated implicitly, so no comma may appear between them.
    """
    import random
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    # create a new user (only the public key is needed)
    _, user2_vk = crypto.generate_key_pair()

    transactions = []
    for _ in range(3):
        payload = {'somedata': random.randint(0, 255)}
        tx = Transaction.create([b.me], [user_vk], payload)
        tx = tx.sign([b.me_private])
        transactions.append(tx)
    block = b.create_block(transactions)
    b.write_block(block, durability='hard')

    owned_inputs_user1 = b.get_owned_ids(user_vk)

    # check spents
    for input_tx in owned_inputs_user1:
        assert b.get_spent(input_tx.txid, input_tx.cid) is None

    # select inputs to use: only the first two transactions are spent
    inputs = sum([tx.to_inputs() for tx in transactions[:2]], [])

    # create a transaction and block
    tx = Transaction.transfer(inputs, len(inputs) * [[user2_vk]])
    tx = tx.sign([user_sk])
    block = b.create_block([tx])
    b.write_block(block, durability='hard')

    # check that used inputs are marked as spent
    for ffill in inputs:
        assert b.get_spent(ffill.tx_input.txid, ffill.tx_input.cid) == tx

    # check if remaining transaction that was unspent is also perceived
    # spendable by BigchainDB
    assert b.get_spent(transactions[2].id, 0) is None
|
|
|
|
def test_get_spent_multiple_owners(self, b, user_sk, user_vk):
    """``get_spent`` with co-owned outputs marks only the transferred one.

    Three CREATE transactions co-owned by two users are written; the first
    is transferred to a third user and the other two must stay unspent.
    """
    import random
    from bigchaindb.common import crypto
    from bigchaindb.models import Transaction

    co_owner_sk, co_owner_vk = crypto.generate_key_pair()
    # The third user only receives, so the secret key is discarded.
    _, receiver_vk = crypto.generate_key_pair()

    created = []
    for _ in range(3):
        metadata = {'somedata': random.randint(0, 255)}
        create_tx = Transaction.create([b.me], [user_vk, co_owner_vk],
                                       metadata)
        created.append(create_tx.sign([b.me_private]))
    create_block = b.create_block(created)
    b.write_block(create_block, durability='hard')

    # None of the outputs owned by the user has been spent yet.
    for link in b.get_owned_ids(user_vk):
        assert b.get_spent(link.txid, link.cid) is None

    # Transfer only the first transaction; both co-owners have to sign.
    first_tx = created[0]
    transfer_tx = Transaction.transfer(first_tx.to_inputs(), [receiver_vk],
                                       first_tx.asset)
    transfer_tx = transfer_tx.sign([user_sk, co_owner_sk])
    transfer_block = b.create_block([transfer_tx])
    b.write_block(transfer_block, durability='hard')

    # The transferred output is reported as spent by the transfer ...
    assert b.get_spent(first_tx.id, 0) == transfer_tx

    # ... while the remaining ones are still unspent.
    for untouched in created[1:]:
        assert b.get_spent(untouched.id, 0) is None
|