Problem: The pipelines library is not used anymore. (#2286)
Solution: Remove pipelines. This is part of the BigchainDB 2.* migration cleanup.
Commit: 2c171312cf (parent: b9bceb7e3a)
@@ -26,10 +26,6 @@ Methods for managing the configuration, including loading configuration files, a

## Folders

### [`pipelines`](./pipelines)

Structure and implementation of various subprocesses started in [`processes.py`](./processes.py).

### [`commands`](./commands)

Contains code for the [CLI](https://docs.bigchaindb.com/projects/server/en/latest/server-reference/bigchaindb-cli.html) for BigchainDB.
@@ -1,39 +0,0 @@
# Overview

Code in this module concerns the long-running processes of BigchainDB. Some run as single processes, while others run as many processes in parallel. When changes are detected in the `backlog`, `block`, or `votes` tables, they are handled here. Everything is started in [`processes.py`](../processes.py).

All the classes defined in these files depend heavily on the [`multipipes`](https://github.com/bigchaindb/multipipes/) library. Each module defines a `create_pipeline` function which describes how the `Pipeline` is set up. Consider `vote.py`:

```python
vote_pipeline = Pipeline([
    Node(voter.validate_block),
    Node(voter.ungroup),
    Node(voter.validate_tx, fraction_of_cores=1),
    Node(voter.vote),
    Node(voter.write_vote)
])
```

The process flow reads straight down the pipeline: an incoming block is validated, its transactions are ungrouped and validated individually (using all available cores in parallel), a vote is created, and finally the vote is written to the votes table.

## Files

### [`block.py`](./block.py)

Handles inserts and updates to the backlog. When a node adds a transaction to the backlog, a `BlockPipeline` instance verifies it. If the transaction is valid, it is added to a new block; otherwise it is dropped. Finally, once a block accumulates 1000 transactions or a timeout is reached, the process writes the block.
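For reference, the trigger condition lives in `BlockPipeline.create` (its full source appears further down this diff):

```python
txs = self.txs.send(tx)                    # accumulate and deduplicate
if len(txs) == 1000 or (timeout and txs):  # size limit reached, or timeout
    block = self.bigchain.create_block(txs)
```
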
### [`election.py`](./election.py)

Listens for inserts to the votes table. When a new vote comes in, it checks whether there are enough votes on that block to declare it valid or invalid. If the block has been elected invalid, the transactions in that block are put back in the backlog.

### [`stale.py`](./stale.py)

Doesn't listen for any changes. Periodically checks the backlog for transactions that have been there too long and assigns them to a new node if possible.

### [`vote.py`](./vote.py)

Listens for inserts to the bigchain table, then votes the blocks valid or invalid. When a new block is written, the node running this process checks the validity of the block by checking the validity of each transaction. A vote is then created based on the block's validity and cast (written) to the votes table.

### [`utils.py`](./utils.py)

Contains the `ChangeFeed` class, an abstraction which combines `multipipes` with the RethinkDB changefeed.
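`utils.py` itself is not part of this diff. As a rough, self-contained sketch of the pattern `ChangeFeed` implements (all names here are hypothetical, not the real API; the bitmask flags mirror the `ChangeFeed.INSERT | ChangeFeed.UPDATE` usage seen in `block.py` below):

```python
from queue import Queue


class ToyChangeFeed:
    """Poll a change source and forward only the requested operations."""

    INSERT, DELETE, UPDATE = 1, 2, 4  # bitmask flags

    def __init__(self, table, operation, fetch_changes):
        self.table = table
        self.operation = operation          # bitmask of operations to forward
        self.fetch_changes = fetch_changes  # yields (op, record) tuples
        self.outqueue = Queue()             # read by the next pipeline node

    def run_once(self):
        for op, record in self.fetch_changes(self.table):
            if op & self.operation:
                self.outqueue.put(record)


# Forward only INSERTs from a fake two-event feed.
feed = ToyChangeFeed('backlog', ToyChangeFeed.INSERT,
                     lambda table: [(ToyChangeFeed.INSERT, {'id': 'abc'}),
                                    (ToyChangeFeed.UPDATE, {'id': 'abc'})])
feed.run_once()
assert feed.outqueue.qsize() == 1  # the UPDATE was filtered out
```
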
@@ -1,196 +0,0 @@
"""This module takes care of all the logic related to block creation.

The logic is encapsulated in the ``BlockPipeline`` class, while the sequence
of actions to perform on transactions is specified in the ``create_pipeline``
function.
"""

import logging

from multipipes import Pipeline, Node, Pipe

import bigchaindb
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Transaction
from bigchaindb.common.exceptions import (ValidationError,
                                          GenesisBlockAlreadyExistsError)
from bigchaindb import Bigchain


logger = logging.getLogger(__name__)


class BlockPipeline:
    """This class encapsulates the logic to create blocks.

    Note:
        Methods of this class will be executed in different processes.
    """

    def __init__(self):
        """Initialize the BlockPipeline creator."""
        self.bigchain = Bigchain()
        self.txs = tx_collector()

    def filter_tx(self, tx):
        """Filter a transaction.

        Args:
            tx (dict): the transaction to process.

        Returns:
            dict: The transaction if assigned to the current node,
            ``None`` otherwise.
        """
        if tx['assignee'] == self.bigchain.me:
            tx.pop('assignee')
            tx.pop('assignment_timestamp')
            return tx

    def validate_tx(self, tx):
        """Validate a transaction.

        Also checks if the transaction already exists in the blockchain. If it
        does, or it's invalid, it's deleted from the backlog immediately.

        Args:
            tx (dict): the transaction to validate.

        Returns:
            :class:`~bigchaindb.models.Transaction`: The transaction if valid,
            ``None`` otherwise.
        """
        try:
            tx = Transaction.from_dict(tx)
        except ValidationError:
            return None

        # If the transaction is in any VALID or UNDECIDED block we
        # should not include it again.
        if not self.bigchain.is_new_transaction(tx.id):
            self.bigchain.delete_transaction(tx.id)
            return None

        # If the transaction is not valid it should not be included.
        try:
            # Do not allow an externally submitted GENESIS transaction.
            # A simple check is enough, as a pipeline is started only after
            # the creation of a GENESIS block, or after the verification of a
            # GENESIS block. Voting will fail at a later stage if the GENESIS
            # block is absent.
            if tx.operation == Transaction.GENESIS:
                raise GenesisBlockAlreadyExistsError('Duplicate GENESIS transaction')

            tx.validate(self.bigchain)
            return tx
        except ValidationError as e:
            logger.warning('Invalid tx: %s', e)
            self.bigchain.delete_transaction(tx.id)
            return None

    def create(self, tx, timeout=False):
        """Create a block.

        This method accumulates transactions to put in a block and outputs
        a block when one of the following conditions is true:
        - the size limit of the block has been reached, or
        - a timeout happened.

        Args:
            tx (:class:`~bigchaindb.models.Transaction`): the transaction
                to validate, might be None if a timeout happens.
            timeout (bool): ``True`` if a timeout happened
                (Default: ``False``).

        Returns:
            :class:`~bigchaindb.models.Block`: The block,
            if a block is ready, or ``None``.
        """
        txs = self.txs.send(tx)
        if len(txs) == 1000 or (timeout and txs):
            block = self.bigchain.create_block(txs)
            self.txs = tx_collector()
            return block

    def write(self, block):
        """Write the block to the database.

        Args:
            block (:class:`~bigchaindb.models.Block`): the block of
                transactions to write to the database.

        Returns:
            :class:`~bigchaindb.models.Block`: The block.
        """
        logger.info('Write new block %s with %s transactions',
                    block.id, len(block.transactions))
        self.bigchain.write_block(block)
        return block

    def delete_tx(self, block):
        """Delete transactions.

        Args:
            block (:class:`~bigchaindb.models.Block`): the block
                containing the transactions to delete.

        Returns:
            :class:`~bigchaindb.models.Block`: The block.
        """
        self.bigchain.delete_transaction(*[tx.id for tx in block.transactions])
        return block


def tx_collector():
    """A helper to deduplicate transactions."""

    def snowflake():
        txids = set()
        txs = []
        while True:
            # Yield the accumulated list, then receive the next transaction.
            tx = yield txs
            if tx:
                if tx.id not in txids:
                    txids.add(tx.id)
                    txs.append(tx)
                else:
                    logger.info('Refusing to add tx to block twice: ' +
                                tx.id)

    s = snowflake()
    s.send(None)  # prime the generator
    return s
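

# Usage sketch of tx_collector (illustration only, not part of the original
# file; tx_a and tx_b stand in for Transaction objects). The collector is
# already primed, so each send() returns the accumulated, de-duplicated list:
#
#   collector = tx_collector()
#   collector.send(tx_a)   # -> [tx_a]
#   collector.send(tx_a)   # duplicate id: logged and ignored
#   collector.send(tx_b)   # -> [tx_a, tx_b]
#   collector.send(None)   # -> [tx_a, tx_b]  (peek without adding)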


def create_pipeline():
    """Create and return the pipeline of operations to be distributed
    on different processes.
    """
    block_pipeline = BlockPipeline()

    pipeline = Pipeline([
        Pipe(maxsize=1000),
        Node(block_pipeline.filter_tx),
        Node(block_pipeline.validate_tx, fraction_of_cores=1),
        Node(block_pipeline.create, timeout=1),
        Node(block_pipeline.write),
        Node(block_pipeline.delete_tx),
    ])

    return pipeline


def get_changefeed():
    connection = backend.connect(**bigchaindb.config['database'])
    return backend.get_changefeed(connection, 'backlog',
                                  ChangeFeed.INSERT | ChangeFeed.UPDATE)


def start():
    """Create, start, and return the block pipeline."""
    pipeline = create_pipeline()
    pipeline.setup(indata=get_changefeed())
    pipeline.start()
    return pipeline
@@ -1,104 +0,0 @@
"""This module takes care of all the logic related to block status.

Specifically, what happens when a block becomes invalid. The logic is
encapsulated in the ``Election`` class, while the sequence of actions
is specified in ``create_pipeline``.
"""
import logging

from multipipes import Pipeline, Node

import bigchaindb
from bigchaindb import backend
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.models import Block
from bigchaindb import Bigchain
from bigchaindb.events import EventTypes, Event


logger = logging.getLogger(__name__)
logger_results = logging.getLogger('pipeline.election.results')


class Election:
    """Election class."""

    def __init__(self, events_queue=None):
        self.bigchain = Bigchain()
        self.events_queue = events_queue

    def check_for_quorum(self, next_vote):
        """Check whether the block a vote refers to has enough votes to
        decide its status.

        Args:
            next_vote: The next vote.

        Returns:
            :class:`~bigchaindb.models.Block`: The block, if it has been
            elected invalid (so its transactions can be requeued),
            ``None`` otherwise.
        """
        try:
            block_id = next_vote['vote']['voting_for_block']
            node = next_vote['node_pubkey']
        except KeyError:
            return

        next_block = self.bigchain.get_block(block_id)

        result = self.bigchain.block_election(next_block)
        self.handle_block_events(result, block_id)
        if result['status'] == self.bigchain.BLOCK_INVALID:
            return Block.from_dict(next_block)

        # Log the result
        if result['status'] != self.bigchain.BLOCK_UNDECIDED:
            msg = 'node:%s block:%s status:%s' % \
                (node, block_id, result['status'])
            # Extra data can be accessed via the log formatter.
            # See logging.dictConfig.
            logger_results.debug(msg, extra={
                'current_vote': next_vote,
                'election_result': result,
            })

    def requeue_transactions(self, invalid_block):
        """Put the transactions of an invalid block back into the backlog
        so they can be processed again.
        """
        logger.info('Rewriting %s transactions from invalid block %s',
                    len(invalid_block.transactions),
                    invalid_block.id)
        for tx in invalid_block.transactions:
            self.bigchain.write_transaction(tx)
        return invalid_block

    def handle_block_events(self, result, block_id):
        if self.events_queue:
            if result['status'] == self.bigchain.BLOCK_UNDECIDED:
                return
            elif result['status'] == self.bigchain.BLOCK_INVALID:
                event_type = EventTypes.BLOCK_INVALID
            elif result['status'] == self.bigchain.BLOCK_VALID:
                event_type = EventTypes.BLOCK_VALID

            event = Event(event_type, self.bigchain.get_block(block_id))
            self.events_queue.put(event)


def create_pipeline(events_queue=None):
    election = Election(events_queue=events_queue)

    election_pipeline = Pipeline([
        Node(election.check_for_quorum),
        Node(election.requeue_transactions)
    ])

    return election_pipeline


def get_changefeed():
    connection = backend.connect(**bigchaindb.config['database'])
    return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT)


def start(events_queue=None):
    pipeline = create_pipeline(events_queue=events_queue)
    pipeline.setup(indata=get_changefeed())
    pipeline.start()
    return pipeline
@@ -1,78 +0,0 @@
"""This module monitors for stale transactions.

It reassigns transactions which have been assigned a node but
remain in the backlog past a certain amount of time.
"""

import logging
from multipipes import Pipeline, Node
from bigchaindb import Bigchain
from time import sleep


logger = logging.getLogger(__name__)


class StaleTransactionMonitor:
    """This class encapsulates the logic for reassigning stale transactions.

    Note:
        Methods of this class will be executed in different processes.
    """

    def __init__(self, timeout=5, backlog_reassign_delay=None):
        """Initialize the StaleTransactionMonitor.

        Args:
            timeout: how often to check for stale transactions (in seconds).
            backlog_reassign_delay: how stale a transaction should be
                before reassignment (in seconds). If supplied, overrides
                the Bigchain default value.
        """
        self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay)
        self.timeout = timeout

    def check_transactions(self):
        """Poll the backlog for stale transactions.

        Returns:
            txs (list): transactions to be reassigned.
        """
        sleep(self.timeout)
        for tx in self.bigchain.get_stale_transactions():
            yield tx

    def reassign_transactions(self, tx):
        """Put the transaction back in the backlog with a new assignee.

        Returns:
            transaction
        """
        # NOTE: Maybe this is too verbose?
        logger.info('Reassigning transaction with id %s', tx['id'])
        self.bigchain.reassign_transaction(tx)
        return tx


def create_pipeline(timeout=5, backlog_reassign_delay=5):
    """Create and return the pipeline of operations to be distributed
    on different processes.
    """
    stm = StaleTransactionMonitor(timeout=timeout,
                                  backlog_reassign_delay=backlog_reassign_delay)

    monitor_pipeline = Pipeline([
        Node(stm.check_transactions),
        Node(stm.reassign_transactions)
    ])

    return monitor_pipeline


def start(timeout=5, backlog_reassign_delay=None):
    """Create, start, and return the stale transaction monitor pipeline."""
    pipeline = create_pipeline(timeout=timeout,
                               backlog_reassign_delay=backlog_reassign_delay)
    pipeline.start()
    return pipeline
@@ -1,187 +0,0 @@
"""This module takes care of all the logic related to block voting.

The logic is encapsulated in the ``Vote`` class, while the sequence
of actions to perform on transactions is specified in the ``create_pipeline``
function.
"""

import logging
from collections import Counter

from multipipes import Pipeline, Node

from bigchaindb import backend, Bigchain
from bigchaindb.models import Transaction, Block, FastTransaction
from bigchaindb.common import exceptions


logger = logging.getLogger(__name__)


class Vote:
    """This class encapsulates the logic to vote on blocks.

    Note:
        Methods of this class will be executed in different processes.
    """

    def __init__(self):
        """Initialize the Block voter."""

        # Since we cannot share a connection to RethinkDB using
        # multiprocessing, we need to create a temporary instance of
        # BigchainDB that we use only to query RethinkDB.

        # This is the Bigchain instance that will be "shared" (i.e. copied)
        # by all the subprocesses.
        self.bigchain = Bigchain()
        self.last_voted_id = Bigchain().get_last_voted_block().id

        self.counters = Counter()
        self.blocks_validity_status = {}

        dummy_tx = Transaction.create([self.bigchain.me],
                                      [([self.bigchain.me], 1)]).to_dict()
        self.invalid_dummy_tx = dummy_tx

    def validate_block(self, block_dict):
        if not self.bigchain.has_previous_vote(block_dict['id']):
            try:
                block = Block.from_db(self.bigchain, block_dict, from_dict_kwargs={
                    'tx_construct': FastTransaction
                })
            except exceptions.InvalidHash:
                # XXX: if a block is invalid we should skip the `validate_tx`
                # step, but since we are in a pipeline we cannot just jump to
                # another function. Hackish solution: generate an invalid
                # transaction and propagate it to the next steps of the
                # pipeline.
                return block_dict['id'], [self.invalid_dummy_tx]
            try:
                block._validate_block(self.bigchain)
            except exceptions.ValidationError:
                # XXX: same hackish solution as above.
                return block.id, [self.invalid_dummy_tx]
            return block.id, block_dict['block']['transactions']

    def ungroup(self, block_id, transactions):
        """Given a block, ungroup the transactions in it.

        Args:
            block_id (str): the id of the block in progress.
            transactions (list(dict)): transactions of the block in
                progress.

        Returns:
            ``None`` if the block has already been voted on; otherwise an
            iterator that yields a transaction, the block id, and the total
            number of transactions contained in the block.
        """
        num_tx = len(transactions)
        for tx in transactions:
            yield tx, block_id, num_tx

    def validate_tx(self, tx_dict, block_id, num_tx):
        """Validate a transaction. The transaction must also not be in any
        VALID block.

        Args:
            tx_dict (dict): the transaction to validate.
            block_id (str): the id of the block containing the transaction.
            num_tx (int): the total number of transactions to process.

        Returns:
            Three values are returned: the validity of the transaction,
            ``block_id``, and ``num_tx``.
        """
        try:
            tx = Transaction.from_dict(tx_dict)
            new = self.bigchain.is_new_transaction(tx.id, exclude_block_id=block_id)
            if not new:
                raise exceptions.ValidationError('Tx already exists, %s', tx.id)
            tx.validate(self.bigchain)
            valid = True
        except exceptions.ValidationError as e:
            valid = False
            logger.warning('Invalid tx: %s', e)

        return valid, block_id, num_tx

    def vote(self, tx_validity, block_id, num_tx):
        """Collect the validity of transactions and cast a vote when ready.

        Args:
            tx_validity (bool): the validity of the transaction.
            block_id (str): the id of the block containing the transaction.
            num_tx (int): the total number of transactions to process.

        Returns:
            None, or a vote if a decision has been reached.
        """
        self.counters[block_id] += 1
        self.blocks_validity_status[block_id] = tx_validity and \
            self.blocks_validity_status.get(block_id, True)

        if self.counters[block_id] == num_tx:
            vote = self.bigchain.vote(block_id,
                                      self.last_voted_id,
                                      self.blocks_validity_status[block_id])
            self.last_voted_id = block_id
            del self.counters[block_id]
            del self.blocks_validity_status[block_id]
            return vote, num_tx
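
    # Tally sketch (illustration, not part of the original file): for a block
    # with num_tx == 3 whose transactions are all valid, three calls advance
    # counters[block_id] through 1, 2, 3; on the third call the conjunction
    # of the validity flags (True) is passed to bigchain.vote() and the
    # per-block state is discarded.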

    def write_vote(self, vote, num_tx):
        """Write the vote to the database.

        Args:
            vote: the vote to write.
        """
        validity = 'valid' if vote['vote']['is_block_valid'] else 'invalid'
        logger.info("Voting '%s' for block %s", validity,
                    vote['vote']['voting_for_block'])
        self.bigchain.write_vote(vote)
        return vote


def create_pipeline():
    """Create and return the pipeline of operations to be distributed
    on different processes.
    """
    voter = Vote()

    return Pipeline([
        Node(voter.validate_block),
        Node(voter.ungroup),
        Node(voter.validate_tx, fraction_of_cores=1),
        Node(voter.vote),
        Node(voter.write_vote)
    ])


def get_changefeed():
    """Create and return an ordered changefeed of blocks starting from
    the last voted block.
    """
    b = Bigchain()
    last_block_id = b.get_last_voted_block().id
    feed = backend.query.get_new_blocks_feed(b.connection, last_block_id)
    return Node(feed.__next__, name='changefeed')


def start():
    """Create, start, and return the vote pipeline."""

    pipeline = create_pipeline()
    pipeline.setup(indata=get_changefeed())
    pipeline.start()
    return pipeline
@@ -1,82 +0,0 @@
import logging
import multiprocessing as mp

import bigchaindb
from bigchaindb import config_utils
from bigchaindb.pipelines import vote, block, election, stale
from bigchaindb.events import Exchange, EventTypes
from bigchaindb.web import server, websocket_server


logger = logging.getLogger(__name__)

BANNER = """
****************************************************************************
*                                                                          *
*   Initialization complete. BigchainDB Server is ready and waiting.      *
*   You can send HTTP requests via the HTTP API documented in the         *
*   BigchainDB Server docs at:                                             *
*    https://bigchaindb.com/http-api                                       *
*                                                                          *
*   Listening to client connections on: {:<15}                    *
*                                                                          *
****************************************************************************
"""


def start_events_plugins(exchange):
    plugins = config_utils.load_events_plugins(
        bigchaindb.config.get('events_plugins'))

    for name, plugin in plugins:
        logger.info('Loading events plugin %s', name)

        event_types = getattr(plugin, 'event_types', None)
        queue = exchange.get_subscriber_queue(event_types)

        mp.Process(name='events_plugin_{}'.format(name),
                   target=plugin.run,
                   args=(queue, )).start()


def start():
    logger.info('Initializing BigchainDB...')

    # Create an Exchange object.
    # The events queue needs to be initialized once and shared between
    # processes. This seems the best way to do it.
    # At this point only the election process and the event consumer require
    # this queue.
    exchange = Exchange()

    # start the processes
    logger.info('Starting block')
    block.start()

    logger.info('Starting voter')
    vote.start()

    logger.info('Starting stale transaction monitor')
    stale.start()

    logger.info('Starting election')
    election.start(events_queue=exchange.get_publisher_queue())

    # start the web api
    app_server = server.create_server(settings=bigchaindb.config['server'],
                                      log_config=bigchaindb.config['log'])
    p_webapi = mp.Process(name='webapi', target=app_server.run)
    p_webapi.start()

    logger.info('WebSocket server started')
    p_websocket_server = mp.Process(name='ws',
                                    target=websocket_server.start,
                                    args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),))
    p_websocket_server.start()

    # start message
    logger.info(BANNER.format(bigchaindb.config['server']['bind']))

    start_events_plugins(exchange)

    exchange.run()
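

# Illustration only (not part of the diff): a minimal events plugin that
# would be compatible with start_events_plugins above. The class name and
# the print body are hypothetical; the contract it follows -- an optional
# `event_types` attribute and a `run(queue)` entry point -- comes from the
# code above.
#
#   class MyPlugin:
#       event_types = EventTypes.BLOCK_VALID
#
#       def run(self, queue):
#           while True:
#               event = queue.get()
#               print('saw event of type', event.type)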
@@ -1,15 +0,0 @@
import pytest
from bigchaindb.pipelines import block, election, vote, stale


@pytest.fixture
def processes(genesis_block):
    block_maker = block.start()
    voter = vote.start()
    election_runner = election.start()
    stale_monitor = stale.start()
    yield
    block_maker.terminate()
    voter.terminate()
    election_runner.terminate()
    stale_monitor.terminate()
@@ -1,184 +0,0 @@
from copy import deepcopy
import pytest
import random

import bigchaindb
from bigchaindb.core import Bigchain
from contextlib import contextmanager
from bigchaindb.common.crypto import generate_key_pair
from tests.pipelines.stepping import create_stepper


################################################################################
# Test setup code


@contextmanager
def federation(n):
    """Return a list of Bigchain objects and pipeline steppers to represent
    a BigchainDB federation.
    """
    keys = [generate_key_pair() for _ in range(n)]
    config_orig = bigchaindb.config

    @contextmanager
    def make_nodes(i):
        """make_nodes is a recursive context manager. Essentially it is doing:

        with f(a[0]) as b0:
            with f(a[1]) as b1:
                with f(a[2]) as b2:
                    yield [b0, b1, b2]

        with an arbitrary depth. It also temporarily patches the global
        configuration to simulate nodes with separate identities.
        """
        nonlocal keys
        if i == 0:
            yield []
        else:
            config = deepcopy(config_orig)
            keys = [keys[-1]] + keys[:-1]  # Rotate keys
            config['keyring'] = [pub for _, pub in keys[1:]]
            config['keypair']['private'] = keys[0][0]
            config['keypair']['public'] = keys[0][1]
            bigchaindb.config = config
            stepper = create_stepper()
            with stepper.start():
                node = (Bigchain(), stepper)
                with make_nodes(i-1) as rest:
                    yield [node] + rest

    with make_nodes(n) as steppers:
        bigchaindb.config = config_orig
        yield zip(*steppers)


@pytest.fixture
def federation_3():
    with federation(3) as f:
        yield f


def process_tx(steps):
    steps.block_changefeed(timeout=1)
    if steps.block_filter_tx():
        steps.block_validate_tx()
        steps.block_create(timeout=True)
        steps.block_write()
        steps.block_delete_tx()


def input_single_create(b):
    from bigchaindb.common.transaction import Transaction
    metadata = {'r': random.random()}
    tx = Transaction.create([b.me], [([b.me], 1)], metadata).sign([b.me_private])
    b.write_transaction(tx)
    return tx


def process_vote(steps, result=None):
    steps.vote_changefeed()
    steps.vote_validate_block()
    steps.vote_ungroup()
    steps.vote_validate_tx()
    if result is not None:
        steps.queues['vote_vote'][0][0] = result
    vote = steps.vote_vote()
    steps.vote_write_vote()
    return vote


################################################################################
# Tests here on down


@pytest.mark.bdb
@pytest.mark.genesis
@pytest.mark.skip_travis_rdb
def test_elect_valid(federation_3):
    [bx, (s0, s1, s2)] = federation_3
    tx = input_single_create(bx[0])
    process_tx(s0)
    process_tx(s1)
    process_tx(s2)
    process_vote(s2, False)
    for i in range(3):
        assert bx[i].get_transaction(tx.id, True)[1] == 'undecided'
    process_vote(s0, True)
    for i in range(3):
        assert bx[i].get_transaction(tx.id, True)[1] == 'undecided'
    process_vote(s1, True)
    for i in range(3):
        assert bx[i].get_transaction(tx.id, True)[1] == 'valid'


@pytest.mark.bdb
@pytest.mark.genesis
@pytest.mark.skip_travis_rdb
def test_elect_invalid(federation_3):
    [bx, (s0, s1, s2)] = federation_3
    tx = input_single_create(bx[0])
    process_tx(s0)
    process_tx(s1)
    process_tx(s2)
    process_vote(s1, True)
    for i in range(3):
        assert bx[i].get_transaction(tx.id, True)[1] == 'undecided'
    process_vote(s2, False)
    for i in range(3):
        assert bx[i].get_transaction(tx.id, True)[1] == 'undecided'
    process_vote(s0, False)
    for i in range(3):
        assert bx[i].get_transaction(tx.id, True)[1] is None


@pytest.mark.bdb
@pytest.mark.genesis
@pytest.mark.skip_travis_rdb
def test_elect_sybill(federation_3):
    [bx, (s0, s1, s2)] = federation_3
    tx = input_single_create(bx[0])
    process_tx(s0)
    process_tx(s1)
    process_tx(s2)
    # What we need is some votes from unknown nodes!
    # Incorrectly signed votes are ineligible.
    for s in [s0, s1, s2]:
        s.vote.bigchain.me_private = generate_key_pair()[0]
    process_vote(s0, True)
    process_vote(s1, True)
    process_vote(s2, True)
    for i in range(3):
        assert bx[i].get_transaction(tx.id, True)[1] == 'undecided'


@pytest.mark.skip()
@pytest.mark.bdb
@pytest.mark.genesis
def test_elect_dos(federation_3):
    """https://github.com/bigchaindb/bigchaindb/issues/1314

    Test that a node cannot block another node's opportunity to vote
    on a block by writing an incorrectly signed vote.
    """
    raise NotImplementedError()


@pytest.mark.skip('Revisit when we have block election status cache')
@pytest.mark.bdb
@pytest.mark.genesis
def test_elect_bad_block_voters_list(federation_3):
    """See https://github.com/bigchaindb/bigchaindb/issues/1224"""
    [bx, (s0, s1, s2)] = federation_3
    b = s0.block.bigchain
    # First remove other nodes from node 0 so that it self-assigns the tx
    b.nodes_except_me = []
    tx = input_single_create(b)
    # Now create a block voters list which will not match other keyrings
    b.nodes_except_me = [bx[1].me]
    process_tx(s0)
    process_vote(s0)
    process_vote(s1)
    process_vote(s2)
    for i in range(3):
        assert bx[i].get_transaction(tx.id, True)[1] == 'invalid'
@@ -1,62 +0,0 @@
import time

import pytest


pytestmark = [pytest.mark.bdb, pytest.mark.usefixtures('processes')]


@pytest.mark.serial
def test_double_create(b, user_pk):
    from bigchaindb.models import Transaction
    from bigchaindb.backend.query import count_blocks
    tx = Transaction.create([b.me], [([user_pk], 1)],
                            metadata={'test': 'test'}).sign([b.me_private])

    b.write_transaction(tx)
    time.sleep(5)
    b.write_transaction(tx)
    time.sleep(5)
    tx_returned = b.get_transaction(tx.id)

    # test that the tx can be queried
    assert tx_returned == tx
    # test the transaction appears only once
    last_voted_block = b.get_last_voted_block()
    assert len(last_voted_block.transactions) == 1
    assert count_blocks(b.connection) == 2


@pytest.mark.dspend
@pytest.mark.usefixtures('inputs')
def test_get_owned_ids_works_after_double_spend(b, user_pk, user_sk):
    """Test for #633 https://github.com/bigchaindb/bigchaindb/issues/633"""
    from bigchaindb.common.exceptions import DoubleSpend
    from bigchaindb.models import Transaction
    input_valid = b.get_owned_ids(user_pk).pop()
    input_valid = b.get_transaction(input_valid.txid)
    tx_valid = Transaction.transfer(input_valid.to_inputs(),
                                    [([user_pk], 1)],
                                    input_valid.id,
                                    {'1': 1}).sign([user_sk])

    # write the valid tx and wait for voting/block to catch up
    b.write_transaction(tx_valid)
    time.sleep(5)

    # doesn't throw an exception
    b.get_owned_ids(user_pk)

    # create another transaction with the same input
    tx_double_spend = Transaction.transfer(input_valid.to_inputs(),
                                           [([user_pk], 1)],
                                           input_valid.id,
                                           {'2': 2}).sign([user_sk])

    # write the double spend tx
    b.write_transaction(tx_double_spend)
    time.sleep(2)

    # still doesn't throw an exception
    b.get_owned_ids(user_pk)
    with pytest.raises(DoubleSpend):
        b.validate_transaction(tx_double_spend)
@@ -1,10 +0,0 @@
import pytest

from .stepping import create_stepper


@pytest.fixture
def steps():
    stepper = create_stepper()
    with stepper.start():
        yield stepper
@@ -1,183 +0,0 @@
"""Pipeline stepping is a way to advance the asynchronous data pipeline
deterministically, by exposing each step separately and advancing the states
manually.

The multipipes.Pipeline class implements a pipeline that advances
asynchronously and concurrently. This module provides an interface to the
BigchainDB pipelines that is static, i.e., it does not advance without
prompting.

Rather than having a pipeline that is in an all-or-nothing running / not
running state, one can do the following:

    steps = create_stepper()

    with steps.start():
        tx = my_create_and_write_tx()
        steps.block_changefeed(timeout=1)
        steps.block_filter_tx()
        steps.block_validate_tx()
        steps.block_create(timeout=True)

    assert steps.counts == {'block_write': 1}

Pending items are held in the `.queues` attribute, and every task has its own
queue (as in multipipes.Pipeline). Queues are just lists, though, so they can
be easily inspected.

As a shortcut, the `.counts` attribute is provided, which returns the number
of pending items for each task. This is useful to assert the expected status
of the queues after performing some sequence.
"""


import functools
import time
import types
import logging
from contextlib import contextmanager
from unittest.mock import patch

import bigchaindb.core
import bigchaindb.pipelines.block
import bigchaindb.pipelines.stale
import bigchaindb.pipelines.vote


class MultipipesStepper:
    def __init__(self):
        self.queues = {}
        self.tasks = {}
        self.input_tasks = set()
        self.processes = []

    def add_input(self, prefix, node, next):
        """Add an input task; reads from the outqueue of the Node."""
        name = '%s_%s' % (prefix, node.name)
        next_name = '%s_%s' % (prefix, next.name)

        if node.name == 'changefeed':
            self.processes.append(node)

            def f(*args, **kwargs):
                # Apply a default timeout so a step never blocks forever.
                _kwargs = {'timeout': 0.1}
                _kwargs.update(kwargs)
                return node.outqueue.get(*args, **_kwargs)
        else:
            f = node.target

        def inner(**kwargs):
            r = f(**kwargs)
            if r is not None:
                self._enqueue(next_name, r)
            return r

        self.tasks[name] = functools.wraps(f)(inner)
        self.input_tasks.add(name)

    def add_stage(self, prefix, node, next):
        """Add a stage task, popping from its own queue and appending to the
        queue of the next node.
        """
        f = node.target
        name = '%s_%s' % (prefix, node.name)
        if next:
            next_name = '%s_%s' % (prefix, next.name)

        def inner(*args, **kwargs):
            out = f(*args, **kwargs)
            if out is not None and next:
                self._enqueue(next_name, out)
            return out

        task = functools.wraps(f)(inner)
        self.tasks[name] = task

    def _enqueue(self, name, item):
        """Internal function; add item(s) to a queue."""
        queue = self.queues.setdefault(name, [])
        if isinstance(item, types.GeneratorType):
            items = list(item)
        else:
            items = [item]
        for item in items:
            if not isinstance(item, tuple):
                item = (item,)
            queue.append(list(item))

    def step(self, name, **kwargs):
        """Advance a pipeline stage. Raises Empty if there is no data to
        consume.
        """
        logging.debug('Stepping %s', name)
        task = self.tasks[name]
        if name in self.input_tasks:
            return task(**kwargs)
        else:
            queue = self.queues.get(name, [])
            if not queue:
                raise Empty(name)
            return task(*queue.pop(0), **kwargs)

    @property
    def counts(self):
        """Get the sizes of the non-empty queues."""
        counts = {}
        for name in self.queues:
            n = len(self.queues[name])
            if n:
                counts[name] = n
        return counts

    def __getattr__(self, name):
        """Shortcut to step a task by attribute access."""
        return lambda **kwargs: self.step(name, **kwargs)

    @contextmanager
    def start(self):
        """Start async inputs; changefeeds etc."""
        for p in self.processes:
            p.start()
        # It would be nice to have a better way to wait for changefeeds here.
        # We have to wait some amount of time because the feed setup is
        # happening in a different process and won't include any writes we
        # perform before it is ready.
        time.sleep(0.2)
        try:
            yield
        finally:
            for p in self.processes:
                p.terminate()


class Empty(Exception):
    pass


def _update_stepper(stepper, prefix, pipeline):
    # Wire each node to the next; the first node is an input task.
    nodes = pipeline.nodes
    for i in range(len(nodes)):
        n0 = nodes[i]
        n1 = (nodes + [None])[i+1]
        f = stepper.add_input if i == 0 else stepper.add_stage
        f(prefix, n0, n1)
    # Expose pipeline state
    setattr(stepper, prefix, nodes[-1].target.__self__)


def create_stepper():
    stepper = MultipipesStepper()

    with patch('bigchaindb.pipelines.block.Pipeline.start'):
        pipeline = bigchaindb.pipelines.block.start()
        _update_stepper(stepper, 'block', pipeline)

    with patch('bigchaindb.pipelines.stale.Pipeline.start'):
        pipeline = bigchaindb.pipelines.stale.start(
            timeout=0, backlog_reassign_delay=0)
        _update_stepper(stepper, 'stale', pipeline)

    with patch('bigchaindb.pipelines.vote.Pipeline.start'):
        pipeline = bigchaindb.pipelines.vote.start()
        _update_stepper(stepper, 'vote', pipeline)

    return stepper
@@ -1,238 +0,0 @@
import random
import time
from unittest.mock import patch

from multipipes import Pipe
import pytest


def test_filter_by_assignee(b, signed_create_tx):
    from bigchaindb.pipelines.block import BlockPipeline

    block_maker = BlockPipeline()

    tx = signed_create_tx.to_dict()
    tx.update({'assignee': b.me, 'assignment_timestamp': 111})

    # filter_tx has side effects on the `tx` instance by popping 'assignee'
    # and 'assignment_timestamp'
    filtered_tx = block_maker.filter_tx(tx)
    assert filtered_tx == tx
    assert 'assignee' not in filtered_tx
    assert 'assignment_timestamp' not in filtered_tx

    tx = signed_create_tx.to_dict()
    tx.update({'assignee': 'nobody'})

    assert block_maker.filter_tx(tx) is None


@pytest.mark.bdb
def test_validate_transaction(b, create_tx):
    from bigchaindb.pipelines.block import BlockPipeline

    block_maker = BlockPipeline()

    assert block_maker.validate_tx(create_tx.to_dict()) is None

    valid_tx = create_tx.sign([b.me_private])
    assert block_maker.validate_tx(valid_tx.to_dict()) == valid_tx


def test_validate_transaction_handles_exceptions(b, signed_create_tx):
    """This test makes sure that `BlockPipeline.validate_tx` handles possible
    exceptions from `Transaction.from_dict`.
    """
    from bigchaindb.pipelines.block import BlockPipeline
    block_maker = BlockPipeline()
    from bigchaindb.common.exceptions import ValidationError

    tx_dict = signed_create_tx.to_dict()

    with patch('bigchaindb.models.Transaction.validate') as validate:
        # Assert that a ValidationError gets caught
        validate.side_effect = ValidationError()
        assert block_maker.validate_tx(tx_dict) is None

        # Assert that another error doesn't
        validate.side_effect = IOError()
        with pytest.raises(IOError):
            block_maker.validate_tx(tx_dict)


def test_create_block(b, user_pk):
    from bigchaindb.models import Transaction
    from bigchaindb.pipelines.block import BlockPipeline

    block_maker = BlockPipeline()

    for _ in range(100):
        tx = Transaction.create([b.me], [([user_pk], 1)],
                                metadata={'msg': random.random()})
        tx = tx.sign([b.me_private])
        block_maker.create(tx)

    # force the output triggering a `timeout`
    block_doc = block_maker.create(None, timeout=True)

    assert len(block_doc.transactions) == 100


@pytest.mark.bdb
def test_write_block(b, user_pk):
    from bigchaindb.models import Block, Transaction
    from bigchaindb.pipelines.block import BlockPipeline

    block_maker = BlockPipeline()

    txs = []
    for _ in range(100):
        tx = Transaction.create([b.me], [([user_pk], 1)],
                                metadata={'msg': random.random()})
        tx = tx.sign([b.me_private])
        txs.append(tx)

    block_doc = b.create_block(txs)
    block_maker.write(block_doc)
    expected = b.get_block(block_doc.id)
    expected = Block.from_dict(expected)

    assert expected == block_doc


@pytest.mark.bdb
def test_duplicate_transaction(b, user_pk):
    from bigchaindb.models import Transaction
    from bigchaindb.pipelines import block
    block_maker = block.BlockPipeline()

    txs = []
    for _ in range(10):
        tx = Transaction.create([b.me], [([user_pk], 1)],
                                metadata={'msg': random.random()})
        tx = tx.sign([b.me_private])
        txs.append(tx)

    block_doc = b.create_block(txs)
    block_maker.write(block_doc)

    # block is in bigchain
    assert b.get_block(block_doc.id) == block_doc.to_dict()

    b.write_transaction(txs[0])

    # verify tx is in the backlog
    assert b.get_transaction(txs[0].id) is not None

    # try to validate a transaction that's already in the chain; should not
    # work
    assert block_maker.validate_tx(txs[0].to_dict()) is None

    # duplicate tx should be removed from backlog
    response, status = b.get_transaction(txs[0].id, include_status=True)
    assert status != b.TX_IN_BACKLOG


@pytest.mark.bdb
def test_delete_tx(b, user_pk):
    from bigchaindb.models import Transaction
    from bigchaindb.pipelines.block import BlockPipeline
    block_maker = BlockPipeline()
    for i in range(100):
        tx = Transaction.create([b.me], [([user_pk], 1)],
                                metadata={'msg': random.random()})
        tx = tx.sign([b.me_private])
        block_maker.create(tx)
        # make sure the tx appears in the backlog
        b.write_transaction(tx)

    # force the output triggering a `timeout`
    block_doc = block_maker.create(None, timeout=True)

    for tx in block_doc.to_dict()['block']['transactions']:
        returned_tx = b.get_transaction(tx['id']).to_dict()
        assert returned_tx == tx

    returned_block = block_maker.delete_tx(block_doc)

    assert returned_block == block_doc

    for tx in block_doc.to_dict()['block']['transactions']:
        returned_tx, status = b.get_transaction(tx['id'], include_status=True)
        assert status != b.TX_IN_BACKLOG


@patch('bigchaindb.pipelines.block.create_pipeline')
@pytest.mark.bdb
def test_start(create_pipeline):
    from bigchaindb.pipelines import block

    pipeline = block.start()
    assert create_pipeline.called
    assert create_pipeline.return_value.setup.called
    assert create_pipeline.return_value.start.called
    assert pipeline == create_pipeline.return_value


@pytest.mark.bdb
def test_full_pipeline(b, user_pk):
    from bigchaindb.models import Block, Transaction
    from bigchaindb.pipelines.block import create_pipeline

    outpipe = Pipe()

    pipeline = create_pipeline()
    pipeline.setup(outdata=outpipe)
    inpipe = pipeline.items[0]

    # include myself here, so that some tx are actually assigned to me
    b.nodes_except_me = [b.me, 'aaa', 'bbb', 'ccc']
    number_assigned_to_others = 0
    for i in range(100):
        tx = Transaction.create([b.me], [([user_pk], 1)],
                                metadata={'msg': random.random()})
        tx = tx.sign([b.me_private])

        tx = tx.to_dict()

        # simulate write_transaction
        tx['assignee'] = random.choice(b.nodes_except_me)
        if tx['assignee'] != b.me:
            number_assigned_to_others += 1
        tx['assignment_timestamp'] = time.time()
        inpipe.put(tx)

    assert inpipe.qsize() == 100

    pipeline.start()

    time.sleep(2)

    pipeline.terminate()
    block_doc = outpipe.get()
    chained_block = b.get_block(block_doc.id)
    chained_block = Block.from_dict(chained_block)

    block_len = len(block_doc.transactions)
    assert chained_block == block_doc
    assert number_assigned_to_others == 100 - block_len


def test_block_snowflake(create_tx, signed_transfer_tx):
    from bigchaindb.pipelines.block import tx_collector
    snowflake = tx_collector()
    assert snowflake.send(create_tx) == [create_tx]
    snowflake.send(signed_transfer_tx)
    snowflake.send(create_tx)
    assert snowflake.send(None) == [create_tx, signed_transfer_tx]


@pytest.mark.bdb
@pytest.mark.genesis
def test_duplicate_genesis_transaction(b, genesis_block, genesis_tx):
    # Try to create a duplicate GENESIS transaction.
    # Expect None, as it will be rejected during validation.

    from bigchaindb.pipelines import block
    block_maker = block.BlockPipeline()
    assert block_maker.validate_tx(genesis_tx.to_dict()) is None
@@ -1,230 +0,0 @@
import time
from unittest.mock import patch

import pytest

from bigchaindb.common import crypto
from multipipes import Pipe, Pipeline

from bigchaindb import Bigchain
from bigchaindb.pipelines import election


@pytest.mark.bdb
def test_check_for_quorum_invalid(b, user_pk):
    from bigchaindb.models import Transaction

    e = election.Election()

    # create blocks with transactions
    tx1 = Transaction.create([b.me], [([user_pk], 1)])
    tx1.sign([b.me_private])
    test_block = b.create_block([tx1])

    # simulate a federation with four voters
    key_pairs = [crypto.generate_key_pair() for _ in range(4)]
    test_federation = [Bigchain(public_key=key_pair[1], private_key=key_pair[0])
                       for key_pair in key_pairs]

    # add voters to block and write
    test_block.voters = [key_pair[1] for key_pair in key_pairs]
    test_block = test_block.sign(b.me_private)
    b.write_block(test_block)

    # split vote (invalid)
    votes = [member.vote(test_block.id, 'a' * 64, True)
             for member in test_federation[:2]] + \
            [member.vote(test_block.id, 'b' * 64, False)
             for member in test_federation[2:]]

    # cast votes
    for vote in votes:
        b.write_vote(vote)

    # since this block is now invalid, it should pass to the next process
    assert e.check_for_quorum(votes[-1]) == test_block


@pytest.mark.bdb
def test_check_for_quorum_invalid_prev_node(b, user_pk):
    from bigchaindb.models import Transaction
    e = election.Election()

    # create blocks with transactions
    tx1 = Transaction.create([b.me], [([user_pk], 1)])
    tx1.sign([b.me_private])
    test_block = b.create_block([tx1])

    # simulate a federation with four voters
    key_pairs = [crypto.generate_key_pair() for _ in range(4)]
    test_federation = [
        Bigchain(public_key=key_pair[1], private_key=key_pair[0])
        for key_pair in key_pairs
    ]

    # add voters to block and write
    test_block.voters = [key_pair[1] for key_pair in key_pairs]
    test_block = test_block.sign(b.me_private)
    b.write_block(test_block)

    # split vote over prev node
    votes = [member.vote(test_block.id, 'a' * 64, True)
             for member in test_federation[:2]] + \
            [member.vote(test_block.id, 'b' * 64, True)
             for member in test_federation[2:]]

    # cast votes
    for vote in votes:
        b.write_vote(vote)

    # since nodes cannot agree on the prev block, the block is invalid
    assert e.check_for_quorum(votes[-1]) == test_block


@pytest.mark.bdb
def test_check_for_quorum_valid(b, user_pk):
    from bigchaindb.models import Transaction

    # simulate a federation with four voters
    key_pairs = [crypto.generate_key_pair() for _ in range(4)]
    test_federation = [
        Bigchain(public_key=key_pair[1], private_key=key_pair[0])
        for key_pair in key_pairs
    ]

    b.nodes_except_me = [key_pair[1] for key_pair in key_pairs]

    # create blocks with transactions
    tx1 = Transaction.create([b.me], [([user_pk], 1)])
    tx1.sign([b.me_private])
    test_block = b.create_block([tx1])

    # add voters to block and write
    test_block = test_block.sign(b.me_private)
    b.write_block(test_block)

    # votes for block one
    votes = [member.vote(test_block.id, 'a' * 64, True)
             for member in test_federation]
    # cast votes
    for vote in votes:
        b.write_vote(vote)

    e = election.Election()
    e.bigchain = b

    # since this block is valid, it should go nowhere
    assert e.check_for_quorum(votes[-1]) is None


@patch('bigchaindb.core.Bigchain.get_block')
def test_invalid_vote(get_block, b):
    e = election.Election()
    assert e.check_for_quorum({}) is None
    get_block.assert_not_called()


@pytest.mark.bdb
def test_check_requeue_transaction(b, user_pk):
    from bigchaindb.models import Transaction

    e = election.Election()

    # create blocks with transactions
    tx1 = Transaction.create([b.me], [([user_pk], 1)])
    tx1.sign([b.me_private])
    test_block = b.create_block([tx1])

    e.requeue_transactions(test_block)

    time.sleep(1)
    backlog_tx, status = b.get_transaction(tx1.id, include_status=True)
    assert status == b.TX_IN_BACKLOG
    assert backlog_tx == tx1


@patch.object(Pipeline, 'start')
def test_start(mock_start):
    # TODO: `election.start` is just a wrapper around
    # `election.create_pipeline`, which is tested by `test_full_pipeline`.
    # If anyone has better ideas on how to test this, please do a PR :)
    election.start()
    mock_start.assert_called_with()


@pytest.mark.bdb
def test_full_pipeline(b, user_pk):
    import random
    from bigchaindb.backend import query
    from bigchaindb.models import Transaction

    outpipe = Pipe()

    # write two blocks
    txs = []
    for i in range(100):
        tx = Transaction.create([b.me], [([user_pk], 1)],
                                {'msg': random.random()})
        tx = tx.sign([b.me_private])
        txs.append(tx)

    valid_block = b.create_block(txs)
    b.write_block(valid_block)

    txs = []
    for i in range(100):
        tx = Transaction.create([b.me], [([user_pk], 1)],
                                {'msg': random.random()})
        tx = tx.sign([b.me_private])
        txs.append(tx)

    invalid_block = b.create_block(txs)
    b.write_block(invalid_block)

    pipeline = election.create_pipeline()
    pipeline.setup(indata=election.get_changefeed(), outdata=outpipe)
    pipeline.start()
    time.sleep(1)

    # vote one block valid, one invalid
    vote_valid = b.vote(valid_block.id, 'b' * 64, True)
    vote_invalid = b.vote(invalid_block.id, 'c' * 64, False)

    b.write_vote(vote_valid)
    b.write_vote(vote_invalid)

    outpipe.get()
    pipeline.terminate()

    # only transactions from the invalid block should be returned to
    # the backlog
    assert query.count_backlog(b.connection) == 100
    # NOTE: I'm still, I'm still tx from the block.
    tx_from_block = set([tx.id for tx in invalid_block.transactions])
    tx_from_backlog = set([tx['id'] for tx in
                           list(query.get_stale_transactions(b.connection, 0))])
    assert tx_from_block == tx_from_backlog


def test_handle_block_events():
    from bigchaindb.events import Exchange, EventTypes

    exchange = Exchange()
    events_queue = exchange.get_publisher_queue()
    e = election.Election(events_queue=events_queue)
    block_id = 'a' * 64

    assert events_queue.qsize() == 0

    # no event should be emitted in case a block is undecided
    e.handle_block_events({'status': Bigchain.BLOCK_UNDECIDED}, block_id)
    assert events_queue.qsize() == 0

    # put an invalid block event in the queue
    e.handle_block_events({'status': Bigchain.BLOCK_INVALID}, block_id)
    event = events_queue.get()
    assert event.type == EventTypes.BLOCK_INVALID

    # put a valid block event in the queue
    e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id)
    event = events_queue.get()
    assert event.type == EventTypes.BLOCK_VALID
@ -1,134 +0,0 @@
|
||||
import random
|
||||
from bigchaindb import Bigchain
|
||||
from bigchaindb.pipelines import stale
|
||||
from multipipes import Pipe, Pipeline
|
||||
from unittest.mock import patch
|
||||
from bigchaindb import config_utils
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.bdb
|
||||
def test_get_stale(b, user_pk):
|
||||
from bigchaindb.models import Transaction
|
||||
tx = Transaction.create([b.me], [([user_pk], 1)])
|
||||
tx = tx.sign([b.me_private])
|
||||
b.write_transaction(tx)
|
||||
|
||||
stm = stale.StaleTransactionMonitor(timeout=0.001,
|
||||
backlog_reassign_delay=0.001)
|
||||
tx_stale = stm.check_transactions()
|
||||
|
||||
for _tx in tx_stale:
|
||||
_tx.pop('assignee')
|
||||
_tx.pop('assignment_timestamp')
|
||||
assert tx.to_dict() == _tx
|
||||
|
||||
|
||||
@pytest.mark.bdb
def test_reassign_transactions(b, user_pk):
    from bigchaindb.backend import query
    from bigchaindb.models import Transaction
    # test with single node
    tx = Transaction.create([b.me], [([user_pk], 1)])
    tx = tx.sign([b.me_private])
    b.write_transaction(tx)

    stm = stale.StaleTransactionMonitor(timeout=0.001,
                                        backlog_reassign_delay=0.001)
    # This worked previously because transaction['assignee'] was only used if
    # bigchain.nodes_except_me was not empty.
    tx_dict = tx.to_dict()
    tx_dict['assignee'] = b.me
    stm.reassign_transactions(tx_dict)

    # test with federation
    tx = Transaction.create([b.me], [([user_pk], 1)])
    tx = tx.sign([b.me_private])
    b.write_transaction(tx)

    stm = stale.StaleTransactionMonitor(timeout=0.001,
                                        backlog_reassign_delay=0.001)
    stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc']
    tx = list(query.get_stale_transactions(b.connection, 0))[0]
    stm.reassign_transactions(tx)

    reassigned_tx = list(query.get_stale_transactions(b.connection, 0))[0]
    assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp']
    assert reassigned_tx['assignee'] != tx['assignee']

    # test with node not in federation
    tx = Transaction.create([b.me], [([user_pk], 1)])
    tx = tx.sign([b.me_private])
    stm.bigchain.nodes_except_me = ['lol']
    b.write_transaction(tx)
    stm.bigchain.nodes_except_me = []

    tx = list(query.get_stale_transactions(b.connection, 0))[0]
    stm.reassign_transactions(tx)
    assert tx['assignee'] != 'lol'
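
# Editor's sketch of the reassignment rule the test exercises, under the
# assumption that a new assignee is drawn from the other federation nodes
# when any exist, falling back to this node otherwise. Illustrative names;
# not the real reassign_transactions implementation.
import random
import time

def reassign_sketch(tx, me, nodes_except_me):
    candidates = [n for n in nodes_except_me if n != tx.get('assignee')]
    tx['assignee'] = random.choice(candidates) if candidates else me
    tx['assignment_timestamp'] = time.time()
    return tx
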
@pytest.mark.bdb
def test_full_pipeline(monkeypatch, user_pk):
    from bigchaindb.backend import query
    from bigchaindb.models import Transaction
    CONFIG = {
        'keyring': ['aaa', 'bbb'],
        'backlog_reassign_delay': 0.01
    }
    config_utils.update_config(CONFIG)
    b = Bigchain()

    original_txs = {}
    original_txc = []

    monkeypatch.setattr('time.time', lambda: 1)

    for i in range(100):
        tx = Transaction.create([b.me], [([user_pk], 1)],
                                metadata={'msg': random.random()})
        tx = tx.sign([b.me_private])
        original_txc.append(tx.to_dict())

        b.write_transaction(tx)
    original_txs = list(query.get_stale_transactions(b.connection, 0))
    original_txs = {tx['id']: tx for tx in original_txs}

    assert len(original_txs) == 100

    monkeypatch.undo()

    inpipe = Pipe()
    # Each time the StaleTransactionMonitor pipeline runs, it reassigns
    # all eligible transactions. Passing this inpipe prevents that from
    # taking place more than once.
    inpipe.put(())
    outpipe = Pipe()
    pipeline = stale.create_pipeline(backlog_reassign_delay=1,
                                     timeout=1)
    pipeline.setup(indata=inpipe, outdata=outpipe)
    pipeline.start()

    # to terminate
    for _ in range(100):
        outpipe.get()

    pipeline.terminate()

    assert len(list(query.get_stale_transactions(b.connection, 0))) == 100
    reassigned_txs = list(query.get_stale_transactions(b.connection, 0))

    # check that every assignment timestamp has increased, and every tx has
    # a new assignee
    for reassigned_tx in reassigned_txs:
        assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp']
        assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee']
@patch.object(Pipeline, 'start')
def test_start(mock_start):
    # TODO: `stale.start` is just a wrapper around `stale.create_pipeline`,
    # which is tested by `test_full_pipeline`.
    # If anyone has better ideas on how to test this, please do a PR :)
    stale.start()
    mock_start.assert_called_with()
@ -1,45 +0,0 @@
import pytest
import random


@pytest.mark.bdb
@pytest.mark.genesis
def test_stepping_changefeed_produces_update(b, steps):
    tx = input_single_create(b)

    # timeouts are 0 so will reassign immediately
    steps.stale_check_transactions()
    steps.stale_reassign_transactions()

    # We expect 2 changefeed events
    steps.block_changefeed()
    steps.block_changefeed()

    assert steps.counts == {'block_filter_tx': 2}
    assert ([tx['id'] for (tx,) in steps.queues['block_filter_tx']] ==
            [tx.id, tx.id])


@pytest.mark.bdb
@pytest.mark.genesis
def test_dupe_tx_in_block(b, steps):
    tx = input_single_create(b)
    for i in range(2):
        steps.stale_check_transactions()
        steps.stale_reassign_transactions()
        steps.block_changefeed()
        steps.block_filter_tx()
    steps.block_validate_tx()
    steps.block_validate_tx()
    assert steps.counts == {'block_create': 2}
    steps.block_create(timeout=False)
    block = steps.block_create(timeout=True)
    assert block.transactions == [tx]


def input_single_create(b):
    from bigchaindb.common.transaction import Transaction
    metadata = {'r': random.random()}
    tx = Transaction.create([b.me], [([b.me], 1)], metadata).sign([b.me_private])
    b.write_transaction(tx)
    return tx
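
# Editor's sketch of the dedup behaviour asserted above: a block should
# contain each transaction id at most once, even if the changefeed delivers
# the same transaction twice. Illustrative only, not the real block step.
def dedupe_txs_sketch(txs):
    seen, unique = set(), []
    for tx in txs:
        if tx['id'] not in seen:
            seen.add(tx['id'])
            unique.append(tx)
    return unique
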
@ -1,613 +0,0 @@
import random
import time
from unittest.mock import patch

from multipipes import Pipe, Pipeline
import pytest


# TODO: dummy_tx and dummy_block could be fixtures
def dummy_tx(b):
    from bigchaindb.models import Transaction
    tx = Transaction.create([b.me], [([b.me], 1)],
                            metadata={'msg': random.random()})
    tx = tx.sign([b.me_private])
    return tx


def dummy_block(b):
    block = b.create_block([dummy_tx(b) for _ in range(10)])
    return block


def decouple_assets(b, block):
    # the block coming from the database does not contain the assets,
    # so we need to pass the block without the assets and store the assets
    # so that the voting pipeline can reconstruct it
    assets, block_dict = block.decouple_assets()
    b.write_assets(assets)
    return block_dict


def decouple_metadata(b, block, block_dict):
    # the block coming from the database does not contain the metadata,
    # so we need to pass the block without the metadata and store the metadata
    # so that the voting pipeline can reconstruct it
    metadata, block_dict = block.decouple_metadata(block_dict)
    if metadata:
        b.write_metadata(metadata)
    return block_dict


DUMMY_SHA3 = '0123456789abcdef' * 4
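
# Editor's sketch of the decoupling idea, assuming block dicts of the form
# {'id': ..., 'block': {'transactions': [...]}}: asset payloads are stripped
# from each transaction and collected separately, keyed by transaction id,
# so the voting pipeline can re-attach them later. Names and document shape
# here are assumptions, not the real decouple_assets implementation.
def decouple_assets_sketch(block_dict):
    assets = []
    for tx in block_dict['block']['transactions']:
        asset = tx.pop('asset', None)
        if asset is not None:
            assets.append({'id': tx['id'], 'asset': asset})
    return assets, block_dict
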
def test_vote_creation_valid(b):
    from bigchaindb.common import crypto
    from bigchaindb.common.utils import serialize

    # create valid block
    block = dummy_block(b)
    # retrieve vote
    vote = b.vote(block.id, DUMMY_SHA3, True)

    # assert vote is correct
    assert vote['vote']['voting_for_block'] == block.id
    assert vote['vote']['previous_block'] == DUMMY_SHA3
    assert vote['vote']['is_block_valid'] is True
    assert vote['vote']['invalid_reason'] is None
    assert vote['node_pubkey'] == b.me
    assert isinstance(vote['signature'], str)
    assert crypto.PublicKey(b.me).verify(serialize(vote['vote']).encode(),
                                         vote['signature']) is True


def test_vote_creation_invalid(b):
    from bigchaindb.common import crypto
    from bigchaindb.common.utils import serialize

    # create valid block
    block = dummy_block(b)
    # retrieve vote
    vote = b.vote(block.id, DUMMY_SHA3, False)

    # assert vote is correct
    assert vote['vote']['voting_for_block'] == block.id
    assert vote['vote']['previous_block'] == DUMMY_SHA3
    assert vote['vote']['is_block_valid'] is False
    assert vote['vote']['invalid_reason'] is None
    assert vote['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialize(vote['vote']).encode(),
                                         vote['signature']) is True
@pytest.mark.genesis
def test_vote_ungroup_returns_a_set_of_results(b):
    from bigchaindb.pipelines import vote

    block = dummy_block(b)
    vote_obj = vote.Vote()
    txs = list(vote_obj.ungroup(block.id, block.transactions))

    assert len(txs) == 10


@pytest.mark.genesis
def test_vote_validate_block(b):
    from bigchaindb.pipelines import vote

    tx = dummy_tx(b)
    block = b.create_block([tx])
    block_dict = decouple_assets(b, block)
    block_dict = decouple_metadata(b, block, block_dict)

    vote_obj = vote.Vote()
    validation = vote_obj.validate_block(block_dict)
    assert validation[0] == block.id
    for tx1, tx2 in zip(validation[1], block.transactions):
        assert tx1 == tx2.to_dict()

    block = b.create_block([tx])
    # NOTE: Setting a block's signature to `None` invalidates it.
    block.signature = None

    vote_obj = vote.Vote()
    validation = vote_obj.validate_block(block.to_dict())
    assert validation[0] == block.id
    for tx1, tx2 in zip(validation[1], [vote_obj.invalid_dummy_tx]):
        assert tx1 == tx2
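
# Editor's sketch of the contract these tests pin down: validate_block
# returns (block_id, transactions) for a structurally sound block, and
# (block_id, [invalid_dummy_tx]) when the block itself is broken (bad id or
# signature), so that a "block invalid" vote still flows through the rest of
# the pipeline. Illustrative only; `is_sound` stands in for the real checks.
def validate_block_sketch(block_dict, is_sound, invalid_dummy_tx):
    if not is_sound:
        return block_dict['id'], [invalid_dummy_tx]
    return block_dict['id'], block_dict['block']['transactions']
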
@pytest.mark.genesis
def test_validate_block_with_invalid_id(b):
    from bigchaindb.pipelines import vote

    tx = dummy_tx(b)
    block = b.create_block([tx]).to_dict()
    block['id'] = 'an invalid id'

    vote_obj = vote.Vote()
    block_id, invalid_dummy_tx = vote_obj.validate_block(block)
    assert block_id == block['id']
    assert invalid_dummy_tx == [vote_obj.invalid_dummy_tx]


@pytest.mark.genesis
def test_validate_block_with_duplicated_transactions(b):
    from bigchaindb.pipelines import vote

    tx = dummy_tx(b)
    block = b.create_block([tx, tx]).to_dict()

    vote_obj = vote.Vote()
    block_id, invalid_dummy_tx = vote_obj.validate_block(block)
    assert invalid_dummy_tx == [vote_obj.invalid_dummy_tx]


@pytest.mark.genesis
def test_validate_block_with_invalid_signature(b):
    from bigchaindb.pipelines import vote

    tx = dummy_tx(b)
    block = b.create_block([tx]).to_dict()
    block['signature'] = 'an invalid signature'

    vote_obj = vote.Vote()
    block_id, invalid_dummy_tx = vote_obj.validate_block(block)
    assert block_id == block['id']
    assert invalid_dummy_tx == [vote_obj.invalid_dummy_tx]
@pytest.mark.genesis
def test_vote_validate_transaction(b):
    from bigchaindb.pipelines import vote
    from bigchaindb.common.exceptions import ValidationError

    tx = dummy_tx(b).to_dict()
    vote_obj = vote.Vote()
    validation = vote_obj.validate_tx(tx, 123, 1)
    assert validation == (True, 123, 1)

    with patch('bigchaindb.models.Transaction.validate') as validate:
        # Assert that a ValidationError gets caught
        validate.side_effect = ValidationError()
        validation = vote_obj.validate_tx(tx, 456, 10)
        assert validation == (False, 456, 10)

        # Assert that any other error doesn't
        validate.side_effect = IOError()
        with pytest.raises(IOError):
            validation = vote_obj.validate_tx(tx, 456, 10)


@pytest.mark.genesis
def test_vote_accumulates_transactions(b):
    from bigchaindb.pipelines import vote

    vote_obj = vote.Vote()

    tx = dummy_tx(b)

    validation = vote_obj.validate_tx(tx.to_dict(), 123, 1)
    assert validation == (True, 123, 1)

    tx.inputs[0].fulfillment.signature = 64 * b'z'
    validation = vote_obj.validate_tx(tx.to_dict(), 456, 10)
    assert validation == (False, 456, 10)
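
# Editor's sketch of the error-handling contract above: expected validation
# failures are converted into a (False, ...) vote, while unexpected errors
# propagate and crash the worker rather than being silently swallowed.
# `ValidationError` below is a local stand-in for bigchaindb's exception.
class ValidationErrorSketch(Exception):
    pass

def validate_tx_sketch(validate, tx, block_id, num_tx):
    try:
        validate(tx)
    except ValidationErrorSketch:
        return False, block_id, num_tx
    return True, block_id, num_tx
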
@pytest.mark.bdb
def test_valid_block_voting_sequential(b, genesis_block, monkeypatch):
    from bigchaindb.backend import query
    from bigchaindb.common import crypto, utils
    from bigchaindb.pipelines import vote

    monkeypatch.setattr('time.time', lambda: 1111111111)
    vote_obj = vote.Vote()
    block = dummy_block(b).to_dict()
    txs = block['block']['transactions']

    for tx, block_id, num_tx in vote_obj.ungroup(block['id'], txs):
        last_vote = vote_obj.vote(*vote_obj.validate_tx(tx, block_id, num_tx))

    vote_obj.write_vote(*last_vote)
    vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block_id, b.me)
    vote_doc = vote_rs.next()

    assert vote_doc['vote'] == {'voting_for_block': block['id'],
                                'previous_block': genesis_block.id,
                                'is_block_valid': True,
                                'invalid_reason': None,
                                'timestamp': '1111111111'}

    serialized_vote = utils.serialize(vote_doc['vote']).encode()
    assert vote_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote,
                                         vote_doc['signature']) is True
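
# Editor's sketch of the vote document shape asserted throughout these
# tests; the signature covers the serialized inner 'vote' dict. `sign` is a
# hypothetical callable, not bigchaindb's crypto API.
import json
import time

def make_vote_sketch(block_id, previous_block, is_valid, node_pubkey, sign):
    inner = {'voting_for_block': block_id,
             'previous_block': previous_block,
             'is_block_valid': is_valid,
             'invalid_reason': None,
             'timestamp': str(int(time.time()))}
    return {'node_pubkey': node_pubkey,
            'vote': inner,
            'signature': sign(json.dumps(inner, sort_keys=True))}
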
@pytest.mark.bdb
def test_valid_block_voting_multiprocessing(b, genesis_block, monkeypatch):
    from bigchaindb.backend import query
    from bigchaindb.common import crypto, utils
    from bigchaindb.pipelines import vote

    inpipe = Pipe()
    outpipe = Pipe()

    monkeypatch.setattr('time.time', lambda: 1111111111)
    vote_pipeline = vote.create_pipeline()
    vote_pipeline.setup(indata=inpipe, outdata=outpipe)

    block = dummy_block(b)
    block_dict = decouple_assets(b, block)
    block_dict = decouple_metadata(b, block, block_dict)

    inpipe.put(block_dict)
    vote_pipeline.start()
    vote_out = outpipe.get()
    vote_pipeline.terminate()

    vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me)
    vote_doc = vote_rs.next()
    assert vote_out['vote'] == vote_doc['vote']
    assert vote_doc['vote'] == {'voting_for_block': block.id,
                                'previous_block': genesis_block.id,
                                'is_block_valid': True,
                                'invalid_reason': None,
                                'timestamp': '1111111111'}

    serialized_vote = utils.serialize(vote_doc['vote']).encode()
    assert vote_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote,
                                         vote_doc['signature']) is True
@pytest.mark.bdb
def test_valid_block_voting_with_create_transaction(b,
                                                    genesis_block,
                                                    monkeypatch):
    from bigchaindb.backend import query
    from bigchaindb.common import crypto, utils
    from bigchaindb.models import Transaction
    from bigchaindb.pipelines import vote

    # create a `CREATE` transaction
    test_user_priv, test_user_pub = crypto.generate_key_pair()
    tx = Transaction.create([b.me], [([test_user_pub], 1)])
    tx = tx.sign([b.me_private])

    monkeypatch.setattr('time.time', lambda: 1111111111)
    block = b.create_block([tx])
    block_dict = decouple_assets(b, block)
    block_dict = decouple_metadata(b, block, block_dict)

    inpipe = Pipe()
    outpipe = Pipe()

    vote_pipeline = vote.create_pipeline()
    vote_pipeline.setup(indata=inpipe, outdata=outpipe)

    inpipe.put(block_dict)
    vote_pipeline.start()
    vote_out = outpipe.get()
    vote_pipeline.terminate()

    vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me)
    vote_doc = vote_rs.next()
    assert vote_out['vote'] == vote_doc['vote']
    assert vote_doc['vote'] == {'voting_for_block': block.id,
                                'previous_block': genesis_block.id,
                                'is_block_valid': True,
                                'invalid_reason': None,
                                'timestamp': '1111111111'}

    serialized_vote = utils.serialize(vote_doc['vote']).encode()
    assert vote_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote,
                                         vote_doc['signature']) is True
@pytest.mark.bdb
def test_valid_block_voting_with_transfer_transactions(monkeypatch,
                                                       b, genesis_block):
    from bigchaindb.backend import query
    from bigchaindb.common import crypto, utils
    from bigchaindb.models import Transaction
    from bigchaindb.pipelines import vote

    # create a `CREATE` transaction
    test_user_priv, test_user_pub = crypto.generate_key_pair()
    tx = Transaction.create([b.me], [([test_user_pub], 1)])
    tx = tx.sign([b.me_private])

    monkeypatch.setattr('time.time', lambda: 1000000000)
    block = b.create_block([tx])
    b.write_block(block)

    # create a `TRANSFER` transaction
    test_user2_priv, test_user2_pub = crypto.generate_key_pair()
    tx2 = Transaction.transfer(tx.to_inputs(), [([test_user2_pub], 1)],
                               asset_id=tx.id)
    tx2 = tx2.sign([test_user_priv])

    monkeypatch.setattr('time.time', lambda: 2000000000)
    block2 = b.create_block([tx2])
    b.write_block(block2)

    inpipe = Pipe()
    outpipe = Pipe()

    vote_pipeline = vote.create_pipeline()
    vote_pipeline.setup(indata=inpipe, outdata=outpipe)

    vote_pipeline.start()
    inpipe.put(block.to_dict())
    time.sleep(1)
    inpipe.put(block2.to_dict())
    vote_out = outpipe.get()
    vote2_out = outpipe.get()
    vote_pipeline.terminate()

    vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me)
    vote_doc = vote_rs.next()
    assert vote_out['vote'] == vote_doc['vote']
    assert vote_doc['vote'] == {'voting_for_block': block.id,
                                'previous_block': genesis_block.id,
                                'is_block_valid': True,
                                'invalid_reason': None,
                                'timestamp': '2000000000'}

    serialized_vote = utils.serialize(vote_doc['vote']).encode()
    assert vote_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote,
                                         vote_doc['signature']) is True

    vote2_rs = query.get_votes_by_block_id_and_voter(b.connection, block2.id, b.me)
    vote2_doc = vote2_rs.next()
    assert vote2_out['vote'] == vote2_doc['vote']
    assert vote2_doc['vote'] == {'voting_for_block': block2.id,
                                 'previous_block': block.id,
                                 'is_block_valid': True,
                                 'invalid_reason': None,
                                 'timestamp': '2000000000'}

    serialized_vote2 = utils.serialize(vote2_doc['vote']).encode()
    assert vote2_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote2,
                                         vote2_doc['signature']) is True
@pytest.mark.skip(
    reason=('Needs important modification following issue #1891: '
            'https://github.com/bigchaindb/bigchaindb/issues/1891')
)
@pytest.mark.bdb
def test_unsigned_tx_in_block_voting(monkeypatch, b, user_pk, genesis_block):
    from bigchaindb.backend import query
    from bigchaindb.common import crypto, utils
    from bigchaindb.models import Transaction
    from bigchaindb.pipelines import vote

    inpipe = Pipe()
    outpipe = Pipe()

    monkeypatch.setattr('time.time', lambda: 1111111111)
    vote_pipeline = vote.create_pipeline()
    vote_pipeline.setup(indata=inpipe, outdata=outpipe)

    # NOTE: `tx` is invalid, because it wasn't signed.
    tx = Transaction.create([b.me], [([b.me], 1)])
    block = b.create_block([tx])

    inpipe.put(block.to_dict())
    vote_pipeline.start()
    vote_out = outpipe.get()
    vote_pipeline.terminate()

    vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block.id, b.me)
    vote_doc = vote_rs.next()
    assert vote_out['vote'] == vote_doc['vote']
    assert vote_doc['vote'] == {'voting_for_block': block.id,
                                'previous_block': genesis_block.id,
                                'is_block_valid': False,
                                'invalid_reason': None,
                                'timestamp': '1111111111'}

    serialized_vote = utils.serialize(vote_doc['vote']).encode()
    assert vote_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote,
                                         vote_doc['signature']) is True
@pytest.mark.bdb
def test_invalid_id_tx_in_block_voting(monkeypatch, b, user_pk, genesis_block):
    from bigchaindb.backend import query
    from bigchaindb.common import crypto, utils
    from bigchaindb.models import Transaction
    from bigchaindb.pipelines import vote

    inpipe = Pipe()
    outpipe = Pipe()

    monkeypatch.setattr('time.time', lambda: 1111111111)
    vote_pipeline = vote.create_pipeline()
    vote_pipeline.setup(indata=inpipe, outdata=outpipe)

    # NOTE: `tx` is invalid, because its id does not correspond to its content
    tx = Transaction.create([b.me], [([b.me], 1)])
    tx = tx.sign([b.me_private])
    block = b.create_block([tx]).to_dict()
    block['block']['transactions'][0]['id'] = 'an invalid tx id'

    inpipe.put(block)
    vote_pipeline.start()
    vote_out = outpipe.get()
    vote_pipeline.terminate()

    vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me)
    vote_doc = vote_rs.next()
    assert vote_out['vote'] == vote_doc['vote']
    assert vote_doc['vote'] == {'voting_for_block': block['id'],
                                'previous_block': genesis_block.id,
                                'is_block_valid': False,
                                'invalid_reason': None,
                                'timestamp': '1111111111'}

    serialized_vote = utils.serialize(vote_doc['vote']).encode()
    assert vote_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote,
                                         vote_doc['signature']) is True
@pytest.mark.bdb
def test_invalid_content_in_tx_in_block_voting(monkeypatch, b,
                                               user_pk, genesis_block):
    from bigchaindb.backend import query
    from bigchaindb.common import crypto, utils
    from bigchaindb.models import Transaction
    from bigchaindb.pipelines import vote

    inpipe = Pipe()
    outpipe = Pipe()

    monkeypatch.setattr('time.time', lambda: 1111111111)
    vote_pipeline = vote.create_pipeline()
    vote_pipeline.setup(indata=inpipe, outdata=outpipe)

    # NOTE: `tx` is invalid, because its content does not correspond to its id
    tx = Transaction.create([b.me], [([b.me], 1)])
    tx = tx.sign([b.me_private])
    block = b.create_block([tx]).to_dict()
    block['block']['transactions'][0]['id'] = 'an invalid tx id'

    inpipe.put(block)
    vote_pipeline.start()
    vote_out = outpipe.get()
    vote_pipeline.terminate()

    vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me)
    vote_doc = vote_rs.next()
    assert vote_out['vote'] == vote_doc['vote']
    assert vote_doc['vote'] == {'voting_for_block': block['id'],
                                'previous_block': genesis_block.id,
                                'is_block_valid': False,
                                'invalid_reason': None,
                                'timestamp': '1111111111'}

    serialized_vote = utils.serialize(vote_doc['vote']).encode()
    assert vote_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote,
                                         vote_doc['signature']) is True
@pytest.mark.bdb
def test_invalid_block_voting(monkeypatch, b, user_pk, genesis_block):
    from bigchaindb.backend import query
    from bigchaindb.common import crypto, utils
    from bigchaindb.pipelines import vote

    inpipe = Pipe()
    outpipe = Pipe()

    monkeypatch.setattr('time.time', lambda: 1111111111)
    vote_pipeline = vote.create_pipeline()
    vote_pipeline.setup(indata=inpipe, outdata=outpipe)

    block = dummy_block(b).to_dict()
    block['block']['id'] = 'this-is-not-a-valid-hash'

    inpipe.put(block)
    vote_pipeline.start()
    vote_out = outpipe.get()
    vote_pipeline.terminate()

    vote_rs = query.get_votes_by_block_id_and_voter(b.connection, block['id'], b.me)
    vote_doc = vote_rs.next()
    assert vote_out['vote'] == vote_doc['vote']
    assert vote_doc['vote'] == {'voting_for_block': block['id'],
                                'previous_block': genesis_block.id,
                                'is_block_valid': False,
                                'invalid_reason': None,
                                'timestamp': '1111111111'}

    serialized_vote = utils.serialize(vote_doc['vote']).encode()
    assert vote_doc['node_pubkey'] == b.me
    assert crypto.PublicKey(b.me).verify(serialized_vote,
                                         vote_doc['signature']) is True
@pytest.mark.genesis
def test_voter_checks_for_previous_vote(monkeypatch, b):
    from bigchaindb.backend import query
    from bigchaindb.pipelines import vote

    inpipe = Pipe()
    outpipe = Pipe()

    monkeypatch.setattr('time.time', lambda: 1000000000)

    monkeypatch.setattr('time.time', lambda: 1000000020)
    block_1 = dummy_block(b)
    inpipe.put(block_1.to_dict())
    assert len(list(query.get_votes_by_block_id(b.connection, block_1.id))) == 0

    vote_pipeline = vote.create_pipeline()
    vote_pipeline.setup(indata=inpipe, outdata=outpipe)
    vote_pipeline.start()

    # wait for the result
    outpipe.get()

    # queue block for voting AGAIN
    monkeypatch.setattr('time.time', lambda: 1000000030)
    inpipe.put(block_1.to_dict())

    # queue another block
    monkeypatch.setattr('time.time', lambda: 1000000040)
    block_2 = dummy_block(b)
    inpipe.put(block_2.to_dict())

    # wait for the result of the new block
    outpipe.get()

    vote_pipeline.terminate()

    assert len(list(query.get_votes_by_block_id(b.connection, block_1.id))) == 1
    assert len(list(query.get_votes_by_block_id(b.connection, block_2.id))) == 1
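
# Editor's sketch of the guarantee above: a voter must cast at most one vote
# per block, so blocks it has already voted on are skipped when they show up
# on the changefeed again. Illustrative only, not the real pipeline code.
def filter_already_voted_sketch(blocks, voted_block_ids):
    for block in blocks:
        if block['id'] not in voted_block_ids:
            voted_block_ids.add(block['id'])
            yield block
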
@patch.object(Pipeline, 'start')
@pytest.mark.genesis
def test_start(mock_start, b):
    # TODO: `vote.start` is just a wrapper around `vote.create_pipeline`,
    # which is tested by the pipeline tests above.
    # If anyone has better ideas on how to test this, please do a PR :)
    from bigchaindb.pipelines import vote
    vote.start()
    mock_start.assert_called_with()
@pytest.mark.genesis
def test_vote_no_double_inclusion(b):
    from bigchaindb.pipelines import vote

    tx = dummy_tx(b)
    block = b.create_block([tx])
    r = vote.Vote().validate_tx(tx.to_dict(), block.id, 1)
    assert r == (True, block.id, 1)

    b.write_block(block)
    r = vote.Vote().validate_tx(tx.to_dict(), 'other_block_id', 1)
    assert r == (False, 'other_block_id', 1)


@pytest.mark.genesis
def test_duplicate_transaction(signed_create_tx):
    from bigchaindb.pipelines import vote

    with patch('bigchaindb.core.Bigchain.is_new_transaction') as is_new:
        is_new.return_value = False
        res = vote.Vote().validate_tx(signed_create_tx.to_dict(), 'a', 1)
        assert res == (False, 'a', 1)
        assert is_new.call_count == 1
@ -1,47 +0,0 @@
from unittest.mock import patch

from multiprocessing import Process
from bigchaindb.pipelines import vote, block, election, stale


@patch.object(stale, 'start')
@patch.object(election, 'start')
@patch.object(block, 'start')
@patch.object(vote, 'start')
@patch.object(Process, 'start')
@patch('bigchaindb.events.Exchange.get_publisher_queue', spec_set=True, autospec=True)
@patch('bigchaindb.events.Exchange.run', spec_set=True, autospec=True)
def test_processes_start(mock_exchange_run, mock_exchange, mock_process, mock_vote,
                         mock_block, mock_election, mock_stale):
    from bigchaindb import processes

    processes.start()

    mock_vote.assert_called_with()
    mock_block.assert_called_with()
    mock_stale.assert_called_with()
    mock_process.assert_called_with()
    mock_election.assert_called_once_with(
        events_queue=mock_exchange.return_value)


@patch.object(Process, 'start')
def test_start_events_plugins(mock_process, monkeypatch):

    class MockPlugin:
        def __init__(self, event_types):
            self.event_types = event_types

        def run(self, queue):
            pass

    monkeypatch.setattr('bigchaindb.config_utils.load_events_plugins',
                        lambda names: [('one', MockPlugin(1)),
                                       ('two', MockPlugin(2))])

    from bigchaindb import processes
    from bigchaindb.events import Exchange

    exchange = Exchange()
    processes.start_events_plugins(exchange)
    assert len(exchange.queues) == 2
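
# Editor's sketch of the wiring these tests cover, assuming processes.start
# spawns each pipeline in its own OS process (names illustrative, not the
# real processes module):
from multiprocessing import Process

def start_sketch(pipeline_starters):
    procs = [Process(target=start) for start in pipeline_starters]
    for p in procs:
        p.start()
    return procs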