Merge remote-tracking branch 'refs/remotes/bigchaindb/master'

This commit is contained in:
Elad-PC\elad 2016-08-03 06:18:33 +02:00
commit 42e0aef447
21 changed files with 704 additions and 531 deletions

View File

@ -11,14 +11,55 @@ For reference, the possible headings are:
* **Removed** for deprecated features removed in this release.
* **Fixed** for any bug fixes.
* **Security** to invite users to upgrade in case of vulnerabilities.
* **External Contributors** to list contributors outside of ascribe GmbH.
* **External Contributors** to list contributors outside of BigchainDB GmbH.
* **Notes**
## [0.5.0] - YYYY-MM-DD
Tag name: v0.5.0
## [0.5.1] - YYYY-MM-DD
Tag name: v0.5.1
= commit:
committed:
committed:
### Added
- New third table, the 'votes' table: [Pull Request #379](https://github.com/bigchaindb/bigchaindb/pull/379)
- Added `get_tx_by_payload_uuid()` including index: [Pull Request #411](https://github.com/bigchaindb/bigchaindb/pull/411)
- Ability to deploy a test cluster on AWS using Amazon Elastic Block Store (EBS) for storage: [Pull Request #469](https://github.com/bigchaindb/bigchaindb/pull/469)
- Ability to add different-size payloads to transactions when filling the backlog for benchmarking: [Pull Request #273](https://github.com/bigchaindb/bigchaindb/pull/273)
### Changed
- Votes are no longer appended to the blocks inside the 'bigchain' table. They're now written to their own table, the 'votes' table: [Pull Request #379](https://github.com/bigchaindb/bigchaindb/pull/379)
- Refactored how blocks get constructed using the new approach to doing multiprocessing, using the `multipipes` package: [Pull Request #484](https://github.com/bigchaindb/bigchaindb/pull/484)
- Changed all queries to use `read_mode='majority'`: [Pull Request #497](https://github.com/bigchaindb/bigchaindb/pull/497)
- Revised how base software gets deployed on AWS instances: [Pull Request #478](https://github.com/bigchaindb/bigchaindb/pull/478)
- Refactored `db.utils.init()`: [Pull Request #471](https://github.com/bigchaindb/bigchaindb/pull/471)
### External Contributors
- @shauns - [Pull Request #411](https://github.com/bigchaindb/bigchaindb/pull/411)
- @lonelypeanut - [Pull Request #479](https://github.com/bigchaindb/bigchaindb/pull/479)
- @lluminita - Pull Requests [#435](https://github.com/bigchaindb/bigchaindb/pull/435) & [#471](https://github.com/bigchaindb/bigchaindb/pull/471)
### Notes
- Several additions and changes to the documentation: Pull Requests
[#416](https://github.com/bigchaindb/bigchaindb/pull/416),
[#417](https://github.com/bigchaindb/bigchaindb/pull/417),
[#418](https://github.com/bigchaindb/bigchaindb/pull/418),
[#420](https://github.com/bigchaindb/bigchaindb/pull/420),
[#421](https://github.com/bigchaindb/bigchaindb/pull/421),
[#422](https://github.com/bigchaindb/bigchaindb/pull/422),
[#423](https://github.com/bigchaindb/bigchaindb/pull/423),
[#425](https://github.com/bigchaindb/bigchaindb/pull/425),
[#428](https://github.com/bigchaindb/bigchaindb/pull/428),
[#430](https://github.com/bigchaindb/bigchaindb/pull/430),
[#431](https://github.com/bigchaindb/bigchaindb/pull/431),
[#435](https://github.com/bigchaindb/bigchaindb/pull/435),
[#442](https://github.com/bigchaindb/bigchaindb/pull/442),
[#472](https://github.com/bigchaindb/bigchaindb/pull/472),
[#481](https://github.com/bigchaindb/bigchaindb/pull/481)
## [0.5.0] - 2016-07-04
Tag name: v0.5.0
= commit: 38329531304952128b48f2e5603db5fa08069c26
committed: July 4, 2016, 1:07 PM GMT+2
### Added
- New `bigchaindb set-replicas` subcommand: [Pull Request #392](https://github.com/bigchaindb/bigchaindb/pull/392)

View File

@ -14,16 +14,27 @@ from bigchaindb.util import ProcessGroup
from bigchaindb.commands import utils
SIZE_OF_FILLER = {'minimal': 0,
'small': 10**3,
'medium': 10**4,
'large': 10**5}
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_write_transaction(tx_left):
def create_write_transaction(tx_left, payload_filler):
b = Bigchain()
payload_dict = {}
if payload_filler:
payload_dict['filler'] = payload_filler
while tx_left > 0:
# use uuid to prevent duplicate transactions (transactions with the same hash)
# Include a random uuid string in the payload to prevent duplicate
# transactions (i.e. transactions with the same hash)
payload_dict['msg'] = str(uuid.uuid4())
tx = b.create_transaction(b.me, b.me, None, 'CREATE',
payload={'msg': str(uuid.uuid4())})
payload=payload_dict)
tx_signed = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx_signed)
tx_left -= 1
@ -31,7 +42,9 @@ def create_write_transaction(tx_left):
def run_add_backlog(args):
tx_left = args.num_transactions // mp.cpu_count()
workers = ProcessGroup(target=create_write_transaction, args=(tx_left,))
payload_filler = 'x' * SIZE_OF_FILLER[args.payload_size]
workers = ProcessGroup(target=create_write_transaction,
args=(tx_left, payload_filler))
workers.start()
@ -105,8 +118,13 @@ def main():
# add transactions to backlog
backlog_parser = subparsers.add_parser('add-backlog',
help='Add transactions to the backlog')
backlog_parser.add_argument('num_transactions', metavar='num_transactions', type=int, default=0,
backlog_parser.add_argument('num_transactions', metavar='num_transactions',
type=int, default=0,
help='Number of transactions to add to the backlog')
backlog_parser.add_argument('-s', '--payload-size',
choices=SIZE_OF_FILLER.keys(),
default='minimal',
help='Payload size')
# set statsd host
statsd_parser = subparsers.add_parser('set-statsd-host',
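For illustration, here is a minimal sketch of how a backlog transaction's payload is assembled for a given `--payload-size` choice, mirroring `SIZE_OF_FILLER` and `create_write_transaction` from the diff above (the `'small'` size is just an example value, not part of the change itself):

```python
import uuid

# Same mapping as in the diff above: payload-size name -> filler length in characters
SIZE_OF_FILLER = {'minimal': 0, 'small': 10**3, 'medium': 10**4, 'large': 10**5}

payload_filler = 'x' * SIZE_OF_FILLER['small']        # 1000 'x' characters
payload = {'filler': payload_filler} if payload_filler else {}
payload['msg'] = str(uuid.uuid4())                    # unique per transaction, so hashes differ

print(len(payload.get('filler', '')), payload['msg'])
```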

View File

@ -13,223 +13,6 @@ from bigchaindb.util import ProcessGroup
logger = logging.getLogger(__name__)
class Block(object):
def __init__(self, q_new_transaction):
"""
Initialize the class with the needed queues.
"""
self._q_new_transaction = q_new_transaction
self.q_new_transaction = None
self.q_tx_to_validate = mp.Queue()
self.q_tx_validated = mp.Queue()
self.q_tx_delete = mp.Queue()
self.q_block = mp.Queue()
self.initialized = mp.Event()
self.monitor = Monitor()
def filter_by_assignee(self):
"""
Handle transactions that are assigned to me
"""
# create a bigchain instance
b = Bigchain()
while True:
tx = self.q_new_transaction.get()
# poison pill
if tx == 'stop':
self.q_tx_to_validate.put('stop')
return
if tx['assignee'] == b.me:
tx.pop('assignee')
self.q_tx_to_validate.put(tx)
def validate_transactions(self):
"""
Checks if the incoming transactions are valid
"""
# create a bigchain instance
b = Bigchain()
while True:
self.monitor.gauge('tx_queue_gauge',
self.q_tx_to_validate.qsize(),
rate=bigchaindb.config['statsd']['rate'])
tx = self.q_tx_to_validate.get()
# poison pill
if tx == 'stop':
self.q_tx_delete.put('stop')
self.q_tx_validated.put('stop')
return
self.q_tx_delete.put(tx['id'])
with self.monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate']):
is_valid_transaction = b.is_valid_transaction(tx)
if is_valid_transaction:
self.q_tx_validated.put(tx)
def create_blocks(self):
"""
Create a block with valid transactions
"""
# create a bigchain instance
b = Bigchain()
stop = False
while True:
# read up to 1000 transactions
validated_transactions = []
for i in range(1000):
try:
tx = self.q_tx_validated.get(timeout=5)
except queue.Empty:
break
# poison pill
if tx == 'stop':
stop = True
break
validated_transactions.append(tx)
# if there are no transactions skip block creation
if validated_transactions:
# create block
block = b.create_block(validated_transactions)
self.q_block.put(block)
if stop:
self.q_block.put('stop')
return
def write_blocks(self):
"""
Write blocks to the bigchain
"""
# create bigchain instance
b = Bigchain()
# Write blocks
while True:
block = self.q_block.get()
# poison pill
if block == 'stop':
return
with self.monitor.timer('write_block'):
b.write_block(block)
def delete_transactions(self):
"""
Delete transactions from the backlog
"""
# create bigchain instance
b = Bigchain()
stop = False
while True:
# try to delete in batch to reduce io
tx_to_delete = []
for i in range(1000):
try:
tx = self.q_tx_delete.get(timeout=5)
except queue.Empty:
break
# poison pill
if tx == 'stop':
stop = True
break
tx_to_delete.append(tx)
if tx_to_delete:
r.table('backlog').get_all(*tx_to_delete).delete(durability='soft').run(b.conn)
if stop:
return
def bootstrap(self):
"""
Get transactions from the backlog that may have been assigned to this node while it was
offline (i.e. not listening to the changefeed)
"""
# create bigchain instance
b = Bigchain()
# create a queue to store initial results
q_initial = mp.Queue()
# get initial results
initial_results = r.table('backlog')\
.between([b.me, r.minval], [b.me, r.maxval], index='assignee__transaction_timestamp')\
.order_by(index=r.asc('assignee__transaction_timestamp'))\
.run(b.conn)
# add results to the queue
for result in initial_results:
q_initial.put(result)
for i in range(mp.cpu_count()):
q_initial.put('stop')
return q_initial
def start(self):
"""
Bootstrap and start the processes
"""
logger.info('bootstrapping block module...')
self.q_new_transaction = self.bootstrap()
logger.info('finished reading past transactions')
self._start()
logger.info('finished bootstrapping block module...')
logger.info('starting block module...')
self.q_new_transaction = self._q_new_transaction
# signal initialization complete
self.initialized.set()
self._start()
logger.info('exiting block module...')
def kill(self):
for i in range(mp.cpu_count()):
self.q_new_transaction.put('stop')
def _start(self):
"""
Initialize, spawn, and start the processes
"""
# initialize the processes
p_filter = ProcessGroup(name='filter_transactions', target=self.filter_by_assignee)
p_validate = ProcessGroup(name='validate_transactions', target=self.validate_transactions)
p_blocks = ProcessGroup(name='create_blocks', target=self.create_blocks)
p_write = ProcessGroup(name='write_blocks', target=self.write_blocks)
p_delete = ProcessGroup(name='delete_transactions', target=self.delete_transactions)
# start the processes
p_filter.start()
p_validate.start()
p_blocks.start()
p_write.start()
p_delete.start()
class BlockDeleteRevert(object):
def __init__(self, q_delete_to_revert):

View File

@ -51,6 +51,8 @@ class Bigchain(object):
self.me_private = private_key or bigchaindb.config['keypair']['private']
self.nodes_except_me = keyring or bigchaindb.config['keyring']
self.consensus = config_utils.load_consensus_plugin(consensus_plugin)
# change RethinkDB read mode to majority. This ensures consistency in query results
self.read_mode = 'majority'
if not self.me or not self.me_private:
raise exceptions.KeypairNotFoundException()
@ -162,8 +164,9 @@ class Bigchain(object):
break
# Query the transaction in the target block and return
response = r.table('bigchain').get(target_block_id).get_field('block')\
.get_field('transactions').filter(lambda tx: tx['id'] == txid).run(self.conn)
response = r.table('bigchain', read_mode=self.read_mode).get(target_block_id)\
.get_field('block').get_field('transactions')\
.filter(lambda tx: tx['id'] == txid).run(self.conn)
return response[0]
@ -181,7 +184,7 @@ class Bigchain(object):
A list of blocks with only election information
"""
# First, get information on all blocks which contain this transaction
response = r.table('bigchain').get_all(value, index=index)\
response = r.table('bigchain', read_mode=self.read_mode).get_all(value, index=index)\
.pluck('votes', 'id', {'block': ['voters']}).run(self.conn)
return list(response)
@ -235,7 +238,7 @@ class Bigchain(object):
A list of transactions containing that payload. If no transaction exists with that payload, it
returns an empty list `[]`
"""
cursor = r.table('bigchain') \
cursor = r.table('bigchain', read_mode=self.read_mode) \
.get_all(payload_uuid, index='payload_uuid') \
.concat_map(lambda block: block['block']['transactions']) \
.filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid) \
@ -258,7 +261,8 @@ class Bigchain(object):
"""
# checks if an input was already spent
# checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...}
response = r.table('bigchain').concat_map(lambda doc: doc['block']['transactions'])\
response = r.table('bigchain', read_mode=self.read_mode)\
.concat_map(lambda doc: doc['block']['transactions'])\
.filter(lambda transaction: transaction['transaction']['fulfillments']
.contains(lambda fulfillment: fulfillment['input'] == tx_input))\
.run(self.conn)
@ -296,7 +300,7 @@ class Bigchain(object):
"""
# get all transactions in which owner is in the `new_owners` list
response = r.table('bigchain') \
response = r.table('bigchain', read_mode=self.read_mode) \
.concat_map(lambda doc: doc['block']['transactions']) \
.filter(lambda tx: tx['transaction']['conditions']
.contains(lambda c: c['new_owners']
@ -353,7 +357,7 @@ class Bigchain(object):
transaction (dict): transaction to check.
Returns:
bool: `True` if the transaction is valid, `False` otherwise
`transaction` if the transaction is valid, `False` otherwise
"""
try:
@ -444,7 +448,8 @@ class Bigchain(object):
but the vote is invalid.
"""
votes = list(r.table('votes').get_all([block['id'], self.me], index='block_and_voter').run(self.conn))
votes = list(r.table('votes', read_mode=self.read_mode)\
.get_all([block['id'], self.me], index='block_and_voter').run(self.conn))
if len(votes) > 1:
raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}'
@ -489,7 +494,8 @@ class Bigchain(object):
# TODO: Decide if we need this method
def transaction_exists(self, transaction_id):
response = r.table('bigchain').get_all(transaction_id, index='transaction_id').run(self.conn)
response = r.table('bigchain', read_mode=self.read_mode)\
.get_all(transaction_id, index='transaction_id').run(self.conn)
return True if len(response.items) > 0 else False
def prepare_genesis_block(self):
@ -516,7 +522,7 @@ class Bigchain(object):
# 2. create the block with one transaction
# 3. write the block to the bigchain
blocks_count = r.table('bigchain').count().run(self.conn)
blocks_count = r.table('bigchain', read_mode=self.read_mode).count().run(self.conn)
if blocks_count:
raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block')
@ -575,19 +581,19 @@ class Bigchain(object):
try:
# get the latest value for the vote timestamp (over all votes)
max_timestamp = r.table('votes') \
max_timestamp = r.table('votes', read_mode=self.read_mode) \
.filter(r.row['node_pubkey'] == self.me) \
.max(r.row['vote']['timestamp']) \
.run(self.conn)['vote']['timestamp']
last_voted = list(r.table('votes') \
last_voted = list(r.table('votes', read_mode=self.read_mode) \
.filter(r.row['vote']['timestamp'] == max_timestamp) \
.filter(r.row['node_pubkey'] == self.me) \
.run(self.conn))
except r.ReqlNonExistenceError:
# return last vote if last vote exists else return Genesis block
return list(r.table('bigchain')
return list(r.table('bigchain', read_mode=self.read_mode)
.filter(util.is_genesis_block)
.run(self.conn))[0]
@ -621,15 +627,16 @@ class Bigchain(object):
except KeyError:
break
res = r.table('bigchain').get(last_block_id).run(self.conn)
res = r.table('bigchain', read_mode=self.read_mode).get(last_block_id).run(self.conn)
return res
def get_unvoted_blocks(self):
"""Return all the blocks that has not been voted by this node."""
unvoted = r.table('bigchain') \
.filter(lambda block: r.table('votes').get_all([block['id'], self.me], index='block_and_voter')
unvoted = r.table('bigchain', read_mode=self.read_mode) \
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
.get_all([block['id'], self.me], index='block_and_voter')
.is_empty()) \
.order_by(r.asc(r.row['block']['timestamp'])) \
.run(self.conn)
@ -643,7 +650,7 @@ class Bigchain(object):
def block_election_status(self, block):
"""Tally the votes on a block, and return the status: valid, invalid, or undecided."""
votes = r.table('votes') \
votes = r.table('votes', read_mode=self.read_mode) \
.between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter') \
.run(self.conn)
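As a rough sketch of the pattern introduced above (`read_mode='majority'` passed to every `r.table()` call), assuming a locally running RethinkDB with a `bigchain` database — the connection details below are illustrative only, not part of the diff:

```python
import rethinkdb as r

conn = r.connect('localhost', 28015, db='bigchain')  # assumed local RethinkDB instance

# With read_mode='majority', RethinkDB returns only documents acknowledged by a
# majority of replicas, so queries don't observe unreplicated writes.
block_count = r.table('bigchain', read_mode='majority').count().run(conn)
print(block_count)
```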

View File

View File

@ -0,0 +1,159 @@
"""This module takes care of all the logic related to block creation.
The logic is encapsulated in the ``Block`` class, while the sequence
of actions to do on transactions is specified in the ``create_pipeline``
function.
"""
import logging
import rethinkdb as r
from multipipes import Pipeline, Node
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
logger = logging.getLogger(__name__)
class Block:
"""This class encapsulates the logic to create blocks.
Note:
Methods of this class will be executed in different processes.
"""
def __init__(self):
"""Initialize the Block creator"""
self.bigchain = Bigchain()
self.txs = []
def filter_tx(self, tx):
"""Filter a transaction.
Args:
tx (dict): the transaction to process.
Returns:
The transaction if assigned to the current node,
``None`` otherwise.
"""
if tx['assignee'] == self.bigchain.me:
tx.pop('assignee')
return tx
def delete_tx(self, tx):
"""Delete a transaction.
Args:
tx (dict): the transaction to delete.
Returns:
The transaction.
"""
r.table('backlog')\
.get(tx['id'])\
.delete(durability='hard')\
.run(self.bigchain.conn)
return tx
def validate_tx(self, tx):
"""Validate a transaction.
Args:
tx (dict): the transaction to validate.
Returns:
The transaction if valid, ``None`` otherwise.
"""
tx = self.bigchain.is_valid_transaction(tx)
if tx:
return tx
def create(self, tx, timeout=False):
"""Create a block.
This method accumulates transactions to put in a block and outputs
a block when one of the following conditions is true:
- the size limit of the block has been reached, or
- a timeout happened.
Args:
tx (dict): the transaction to validate, might be None if
a timeout happens.
timeout (bool): ``True`` if a timeout happened
(Default: ``False``).
Returns:
The block, if a block is ready, or ``None``.
"""
if tx:
self.txs.append(tx)
if len(self.txs) == 1000 or (timeout and self.txs):
block = self.bigchain.create_block(self.txs)
self.txs = []
return block
def write(self, block):
"""Write the block to the Database.
Args:
block (dict): the block of transactions to write to the database.
Returns:
The block.
"""
logger.info('Write new block %s with %s transactions',
block['id'],
len(block['block']['transactions']))
self.bigchain.write_block(block)
return block
def initial():
"""Return old transactions from the backlog."""
b = Bigchain()
rs = r.table('backlog')\
.between([b.me, r.minval],
[b.me, r.maxval],
index='assignee__transaction_timestamp')\
.order_by(index=r.asc('assignee__transaction_timestamp'))\
.run(b.conn)
return rs
def get_changefeed():
"""Create and return the changefeed for the backlog."""
return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial())
def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
block = Block()
block_pipeline = Pipeline([
Node(block.filter_tx),
Node(block.delete_tx),
Node(block.validate_tx, fraction_of_cores=1),
Node(block.create, timeout=1),
Node(block.write),
])
return block_pipeline
def start():
"""Create, start, and return the block pipeline."""
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline
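A hypothetical usage sketch of this module, assuming a configured BigchainDB node and a reachable RethinkDB instance; `start()` is defined in the diff above and `terminate()` is the `multipipes` call used in the tests further below:

```python
from bigchaindb.pipelines import block

# start() builds the pipeline, attaches the backlog changefeed (plus the
# prefeed of old transactions) and spawns the worker processes.
pipeline = block.start()

# ... transactions written to the backlog now get filtered, validated,
# accumulated into blocks, and written to the 'bigchain' table ...

pipeline.terminate()  # shut the pipeline down
```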

View File

@ -0,0 +1,60 @@
"""Utility classes and functions to work with the pipelines."""
import rethinkdb as r
from multipipes import Node
from bigchaindb import Bigchain
class ChangeFeed(Node):
"""This class wraps a RethinkDB changefeed adding a `prefeed`.
It extends the ``multipipes::Node`` class to make it pluggable in
other Pipelines instances, and it makes usage of ``self.outqueue``
to output the data.
A changefeed is a real time feed on inserts, updates, and deletes, and
it's volatile. This class is a helper to create changefeeds. Moreover
it provides a way to specify a `prefeed`, that is a set of data (iterable)
to output before the actual changefeed.
"""
INSERT = 'insert'
DELETE = 'delete'
UPDATE = 'update'
def __init__(self, table, operation, prefeed=None):
"""Create a new RethinkDB ChangeFeed.
Args:
table (str): name of the table to listen to for changes.
operation (str): can be ChangeFeed.INSERT, ChangeFeed.DELETE, or
ChangeFeed.UPDATE.
prefeed (iterable): whatever set of data you want to be published
first.
"""
super().__init__(name='changefeed')
self.prefeed = prefeed if prefeed else []
self.table = table
self.operation = operation
self.bigchain = Bigchain()
def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)
for change in r.table(self.table).changes().run(self.bigchain.conn):
is_insert = change['old_val'] is None
is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete
if is_insert and self.operation == ChangeFeed.INSERT:
self.outqueue.put(change['new_val'])
elif is_delete and self.operation == ChangeFeed.DELETE:
self.outqueue.put(change['old_val'])
elif is_update and self.operation == ChangeFeed.UPDATE:
self.outqueue.put(change)
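For illustration, a minimal sketch of plugging a `ChangeFeed` into a `Pipeline` (names are taken from this diff; a configured BigchainDB node and running RethinkDB are assumed, and the `print` node stands in for real processing steps — it is purely hypothetical):

```python
from multipipes import Pipeline, Node

from bigchaindb.pipelines.utils import ChangeFeed

# Emit the prefeed items first, then every new document inserted into 'backlog'
feed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[{'id': 'old-tx'}])

pipeline = Pipeline([Node(print)])  # replace print with real processing nodes
pipeline.setup(indata=feed)
pipeline.start()
```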

View File

@ -4,9 +4,10 @@ import multiprocessing as mp
import rethinkdb as r
import bigchaindb
from bigchaindb.pipelines import block
from bigchaindb import Bigchain
from bigchaindb.voter import Voter, Election
from bigchaindb.block import Block, BlockDeleteRevert
from bigchaindb.block import BlockDeleteRevert
from bigchaindb.web import server
@ -30,31 +31,9 @@ class Processes(object):
def __init__(self):
# initialize the class
self.q_new_block = mp.Queue()
self.q_new_transaction = mp.Queue()
self.q_block_new_vote = mp.Queue()
self.q_revert_delete = mp.Queue()
def map_backlog(self):
# listen to changes on the backlog and redirect the changes
# to the correct queues
# create a bigchain instance
b = Bigchain()
for change in r.table('backlog').changes().run(b.conn):
# insert
if change['old_val'] is None:
self.q_new_transaction.put(change['new_val'])
# delete
if change['new_val'] is None:
pass
# update
if change['new_val'] is not None and change['old_val'] is not None:
pass
def map_bigchain(self):
# listen to changes on the bigchain and redirect the changes
# to the correct queues
@ -80,8 +59,6 @@ class Processes(object):
def start(self):
logger.info('Initializing BigchainDB...')
# instantiate block and voter
block = Block(self.q_new_transaction)
delete_reverter = BlockDeleteRevert(self.q_revert_delete)
# start the web api
@ -91,19 +68,15 @@ class Processes(object):
# initialize the processes
p_map_bigchain = mp.Process(name='bigchain_mapper', target=self.map_bigchain)
p_map_backlog = mp.Process(name='backlog_mapper', target=self.map_backlog)
p_block = mp.Process(name='block', target=block.start)
p_block_delete_revert = mp.Process(name='block_delete_revert', target=delete_reverter.start)
p_voter = Voter(self.q_new_block)
p_election = Election(self.q_block_new_vote)
# start the processes
logger.info('starting bigchain mapper')
p_map_bigchain.start()
logger.info('starting backlog mapper')
p_map_backlog.start()
logger.info('starting block')
p_block.start()
block.start()
p_block_delete_revert.start()
logger.info('starting voter')
@ -112,6 +85,5 @@ class Processes(object):
p_election.start()
# start message
block.initialized.wait()
p_voter.initialized.wait()
logger.info(BANNER.format(bigchaindb.config['server']['bind']))

View File

@ -1,2 +1,2 @@
__version__ = '0.5.0'
__version__ = '0.5.1'
__short_version__ = '0.5'

View File

@ -32,6 +32,11 @@ echo "WHAT_TO_DEPLOY = "$WHAT_TO_DEPLOY
echo "USE_KEYPAIRS_FILE = "$USE_KEYPAIRS_FILE
echo "IMAGE_ID = "$IMAGE_ID
echo "INSTANCE_TYPE = "$INSTANCE_TYPE
echo "USING_EBS = "$USING_EBS
if [ "$USING_EBS" = True ]; then
echo "EBS_VOLUME_SIZE = "$EBS_VOLUME_SIZE
echo "EBS_OPTIMIZED = "$EBS_OPTIMIZED
fi
# Check for AWS private key file (.pem file)
if [ ! -f "pem/bigchaindb.pem" ]; then
@ -95,8 +100,12 @@ fab upgrade_setuptools
if [ "$WHAT_TO_DEPLOY" == "servers" ]; then
# (Re)create the RethinkDB configuration file conf/rethinkdb.conf
python create_rethinkdb_conf.py
# Rollout storage backend (RethinkDB) and start it
# Rollout RethinkDB and start it
fab prep_rethinkdb_storage:$USING_EBS
fab install_rethinkdb
fab configure_rethinkdb
fab delete_rethinkdb_data
fab start_rethinkdb
fi
# Rollout BigchainDB (but don't start it yet)
@ -148,6 +157,8 @@ if [ "$WHAT_TO_DEPLOY" == "servers" ]; then
# definition of init_bigchaindb() in fabfile.py to see why.
fab init_bigchaindb
fab set_shards:$NUM_NODES
echo "To set the replication factor to 3, do: fab set_replicas:3"
echo "To start BigchainDB on all the nodes, do: fab start_bigchaindb"
else
# Deploying clients
# The only thing to configure on clients is the api_endpoint

View File

@ -18,7 +18,7 @@
# NUM_NODES is the number of nodes to deploy
NUM_NODES=3
# PYPI_OR_BRANCH is either "pypi" or the name of a local Git branch
# BRANCH is either "pypi" or the name of a local Git branch
# (e.g. "master" or "feat/3627/optional-delimiter-in-txfile")
# It's where to get the BigchainDB code to be deployed on the nodes
BRANCH="master"
@ -49,3 +49,19 @@ IMAGE_ID="ami-accff2b1"
# Examples: "m3.2xlarge", "c3.8xlarge", "c4.8xlarge"
# For all options, see https://aws.amazon.com/ec2/instance-types/
INSTANCE_TYPE="m3.2xlarge"
# USING_EBS is True if you want to attach an Amazon EBS volume
USING_EBS=False
# EBS_VOLUME_SIZE is the size of the EBS volume to attach, in GiB
# Since we assume 'gp2' volumes (for now), the possible range is 1 to 16384
# If USING_EBS=False, EBS_VOLUME_SIZE is irrelevant and not used
EBS_VOLUME_SIZE=30
# EBS_OPTIMIZED is True or False, depending on whether you want
# EBS-optimized instances. See:
# https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSOptimized.html
# Not all instance types support EBS optimization.
# Setting EBS_OPTIMIZED=True may cost more, but not always.
# If USING_EBS=False, EBS_OPTIMIZED is irrelevant and not used
EBS_OPTIMIZED=False

View File

@ -97,45 +97,94 @@ def upgrade_setuptools():
sudo('pip3 install --upgrade setuptools')
# Install RethinkDB
# Prepare RethinkDB storage
@task
@parallel
def install_rethinkdb():
"""Installation of RethinkDB"""
with settings(warn_only=True):
# preparing filesystem
sudo("mkdir -p /data")
# Locally mounted storage (m3.2xlarge, but also c3.xxx)
def prep_rethinkdb_storage(USING_EBS):
"""Prepare RethinkDB storage"""
# Convert USING_EBS from a string to a bool
USING_EBS = (USING_EBS.lower() == 'true')
# Make the /data directory for RethinkDB data
sudo("mkdir -p /data")
# OLD: with settings(warn_only=True):
if USING_EBS: # on /dev/xvdp
# See https://tinyurl.com/h2nut68
sudo("mkfs -t ext4 /dev/xvdp")
sudo("mount /dev/xvdp /data")
# To mount this EBS volume on every system reboot,
# add an entry for the device to the /etc/fstab file.
# First, make a copy of the current /etc/fstab file
sudo("cp /etc/fstab /etc/fstab.orig")
# Append a line to /etc/fstab
sudo("echo '/dev/xvdp /data ext4 defaults,nofail,nobootwait 0 2' >> /etc/fstab")
# Verify the /etc/fstab file. If something is wrong with it,
# then this should produce an error:
sudo("mount -a")
# Set the I/O scheduler for /dev/xvdp to deadline
with settings(sudo_user='root'):
sudo("echo deadline > /sys/block/xvdp/queue/scheduler")
else: # not using EBS.
# Using the "instance store" that comes with the instance.
# If the instance store comes with more than one volume,
# this only mounts ONE of them: /dev/xvdb
# For example, m3.2xlarge instances have /dev/xvdb and /dev/xvdc
# and /mnt is mounted on /dev/xvdb by default.
try:
sudo("umount /mnt")
sudo("mkfs -t ext4 /dev/xvdb")
sudo("mount /dev/xvdb /data")
except:
pass
# persist settings to fstab
sudo("rm -rf /etc/fstab")
sudo("echo 'LABEL=cloudimg-rootfs / ext4 defaults,discard 0 0' >> /etc/fstab")
sudo("echo '/dev/xvdb /data ext4 defaults,noatime 0 0' >> /etc/fstab")
# activate deadline scheduler
# Set the I/O scheduler for /dev/xvdb to deadline
with settings(sudo_user='root'):
sudo("echo deadline > /sys/block/xvdb/queue/scheduler")
# install rethinkdb
sudo("echo 'deb http://download.rethinkdb.com/apt trusty main' | sudo tee /etc/apt/sources.list.d/rethinkdb.list")
sudo("wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add -")
sudo("apt-get update")
sudo("apt-get -y install rethinkdb")
# change fs to user
sudo('chown -R rethinkdb:rethinkdb /data')
# copy config file to target system
put('conf/rethinkdb.conf',
'/etc/rethinkdb/instances.d/instance1.conf',
mode=0600,
use_sudo=True)
# initialize data-dir
sudo('rm -rf /data/*')
# finally restart instance
sudo('/etc/init.d/rethinkdb restart')
# Install RethinkDB
@task
@parallel
def install_rethinkdb():
"""Install RethinkDB"""
sudo("echo 'deb http://download.rethinkdb.com/apt trusty main' | sudo tee /etc/apt/sources.list.d/rethinkdb.list")
sudo("wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add -")
sudo("apt-get update")
sudo("apt-get -y install rethinkdb")
# Change owner:group of the RethinkDB data directory to rethinkdb:rethinkdb
sudo('chown -R rethinkdb:rethinkdb /data')
# Configure RethinkDB
@task
@parallel
def configure_rethinkdb():
"""Copy the RethinkDB config file to the remote host"""
put('conf/rethinkdb.conf',
'/etc/rethinkdb/instances.d/instance1.conf',
mode=0600,
use_sudo=True)
# Delete RethinkDB data
@task
@parallel
def delete_rethinkdb_data():
"""Delete the contents of the RethinkDB /data directory
but not the directory itself.
"""
sudo('rm -rf /data/*')
# Start RethinkDB
@task
@parallel
def start_rethinkdb():
"""Start RethinkDB"""
sudo('/etc/init.d/rethinkdb restart')
# Install BigchainDB from PyPI
@ -197,13 +246,20 @@ def init_bigchaindb():
run('bigchaindb init', pty=False)
# Set the number of shards (in the backlog and bigchain tables)
# Set the number of shards (in all tables)
@task
@hosts(public_dns_names[0])
def set_shards(num_shards):
run('bigchaindb set-shards {}'.format(num_shards))
# Set the number of replicas (in all tables)
@task
@hosts(public_dns_names[0])
def set_replicas(num_replicas):
run('bigchaindb set-replicas {}'.format(num_replicas))
# Start BigchainDB using screen
@task
@parallel

View File

@ -24,7 +24,8 @@ from awscommon import get_naeips
SETTINGS = ['NUM_NODES', 'BRANCH', 'WHAT_TO_DEPLOY', 'USE_KEYPAIRS_FILE',
'IMAGE_ID', 'INSTANCE_TYPE']
'IMAGE_ID', 'INSTANCE_TYPE', 'USING_EBS', 'EBS_VOLUME_SIZE',
'EBS_OPTIMIZED']
class SettingsTypeError(TypeError):
@ -76,7 +77,7 @@ if not isinstance(WHAT_TO_DEPLOY, str):
raise SettingsTypeError('WHAT_TO_DEPLOY should be a string')
if not isinstance(USE_KEYPAIRS_FILE, bool):
msg = 'USE_KEYPAIRS_FILE should a boolean (True or False)'
msg = 'USE_KEYPAIRS_FILE should be a boolean (True or False)'
raise SettingsTypeError(msg)
if not isinstance(IMAGE_ID, str):
@ -85,6 +86,15 @@ if not isinstance(IMAGE_ID, str):
if not isinstance(INSTANCE_TYPE, str):
raise SettingsTypeError('INSTANCE_TYPE should be a string')
if not isinstance(USING_EBS, bool):
raise SettingsTypeError('USING_EBS should be a boolean (True or False)')
if not isinstance(EBS_VOLUME_SIZE, int):
raise SettingsTypeError('EBS_VOLUME_SIZE should be an int')
if not isinstance(EBS_OPTIMIZED, bool):
raise SettingsTypeError('EBS_OPTIMIZED should be a boolean (True or False)')
if NUM_NODES > 64:
raise ValueError('NUM_NODES should be less than or equal to 64. '
'The AWS deployment configuration file sets it to {}'.
@ -95,6 +105,12 @@ if WHAT_TO_DEPLOY not in ['servers', 'clients']:
'The AWS deployment configuration file sets it to {}'.
format(WHAT_TO_DEPLOY))
# Since we assume 'gp2' volumes (for now), the possible range is 1 to 16384
if EBS_VOLUME_SIZE > 16384:
raise ValueError('EBS_VOLUME_SIZE should be <= 16384. '
'The AWS deployment configuration file sets it to {}'.
format(EBS_VOLUME_SIZE))
# Get an AWS EC2 "resource"
# See http://boto3.readthedocs.org/en/latest/guide/resources.html
ec2 = boto3.resource(service_name='ec2')
@ -158,14 +174,40 @@ print('Commencing launch of {} instances on Amazon EC2...'.
for _ in range(NUM_NODES):
# Request the launch of one instance at a time
# (so list_of_instances should contain only one item)
list_of_instances = ec2.create_instances(
ImageId=IMAGE_ID,
MinCount=1,
MaxCount=1,
KeyName='bigchaindb',
InstanceType=INSTANCE_TYPE,
SecurityGroupIds=['bigchaindb']
)
# See https://tinyurl.com/hbjewbb
if USING_EBS:
dm = {
'DeviceName': '/dev/sdp',
# Why /dev/sdp? See https://tinyurl.com/z2zqm6n
'Ebs': {
'VolumeSize': EBS_VOLUME_SIZE, # GiB
'DeleteOnTermination': False,
'VolumeType': 'gp2',
'Encrypted': False
},
# 'NoDevice': 'device'
# Suppresses the specified device included
# in the block device mapping of the AMI.
}
list_of_instances = ec2.create_instances(
ImageId=IMAGE_ID,
MinCount=1,
MaxCount=1,
KeyName='bigchaindb',
InstanceType=INSTANCE_TYPE,
SecurityGroupIds=['bigchaindb'],
BlockDeviceMappings=[dm],
EbsOptimized=EBS_OPTIMIZED
)
else: # not USING_EBS
list_of_instances = ec2.create_instances(
ImageId=IMAGE_ID,
MinCount=1,
MaxCount=1,
KeyName='bigchaindb',
InstanceType=INSTANCE_TYPE,
SecurityGroupIds=['bigchaindb']
)
# Tag the just-launched instances (should be just one)
for instance in list_of_instances:

View File

@ -1,32 +1,19 @@
# Example RethinkDB Storage Setups
## Example 1: A Partition of an AWS Instance Store
## Example Amazon EC2 Setups
Many [AWS EC2 instance types](https://aws.amazon.com/ec2/instance-types/) come with an [instance store](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html): temporary storage that disappears when the instance disappears. The size and setup of an instance store depend on the EC2 instance type.
We have some scripts for [deploying a _test_ BigchainDB cluster on AWS](../clusters-feds/deploy-on-aws.html). Those scripts include command sequences to set up storage for RethinkDB.
In particular, look in the file [/deploy-cluster-aws/fabfile.py](https://github.com/bigchaindb/bigchaindb/blob/master/deploy-cluster-aws/fabfile.py), under `def prep_rethinkdb_storage(USING_EBS)`. Note that there are two cases:
We have some scripts for [deploying a _test_ BigchainDB cluster on AWS](../clusters-feds/deploy-on-aws.html). Those scripts include commands to set up a partition (`/dev/xvdb`) on an instance store for RethinkDB data. Those commands can be found in the file `/deploy-cluster-aws/fabfile.py`, under `def install_rethinkdb()` (i.e. the Fabric function to install RethinkDB).
1. **Using EBS ([Amazon Elastic Block Store](https://aws.amazon.com/ebs/)).** This is always an option, and for some instance types ("EBS-only"), it's the only option.
2. **Using an "instance store" volume provided with an Amazon EC2 instance.** Note that our scripts only use one of the (possibly many) volumes in the instance store.
An AWS instance store is convenient, but it's intended for "buffers, caches, scratch data, and other temporary content." Moreover:
There's some explanation of the steps in the [Amazon EC2 documentation about making an Amazon EBS volume available for use](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-using-volumes.html).
* You pay for all the storage, regardless of how much you use.
* You can't increase the size of the instance store.
* If the instance stops, terminates, or reboots, you lose the associated instance store.
* Instance store data isn't replicated, so if the underlying disk drive fails, you lose the data in the instance store.
* "You can't detach an instance store volume from one instance and attach it to a different instance."
The [AWS documentation says](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html), "...do not rely on instance store for valuable, long-term data. Instead, you can build a degree of redundancy (for example, RAID 1/5/6), or use a file system (for example, HDFS and MapR-FS) that supports redundancy and fault tolerance."
**Even if you don't use an AWS instance store partition to store your node's RethinkDB data, you may find it useful to read the steps in `def install_rethinkdb()`: [see fabfile.py](https://github.com/bigchaindb/bigchaindb/blob/master/deploy-cluster-aws/fabfile.py).**
You shouldn't use an EC2 "instance store" to store RethinkDB data for a production node, because it's not replicated and it's only intended for temporary, ephemeral data. If the associated instance crashes, is stopped, or is terminated, the data in the instance store is lost forever. Amazon EBS storage is replicated, has incremental snapshots, and is low-latency.
## Example 2: An Amazon EBS Volume
TODO
Note: Amazon EBS volumes are always replicated.
## Example 3: Using Amazon EFS
## Example Using Amazon EFS
TODO

View File

@ -103,6 +103,9 @@ WHAT_TO_DEPLOY="servers"
USE_KEYPAIRS_FILE=False
IMAGE_ID="ami-accff2b1"
INSTANCE_TYPE="m3.2xlarge"
USING_EBS=False
EBS_VOLUME_SIZE=30
EBS_OPTIMIZED=False
```
If you're happy with those settings, then you can skip to the next step. Otherwise, you could make a copy of `example_deploy_conf.py` (e.g. `cp example_deploy_conf.py my_deploy_conf.py`) and then edit the copy using a text editor.
@ -126,6 +129,8 @@ Step 3 is to launch the nodes ("instances") on AWS, to install all the necessary
cd bigchaindb
cd deploy-cluster-aws
./awsdeploy.sh my_deploy_conf.py
# Only if you want to set the replication factor to 3
fab set_replicas:3
# Only if you want to start BigchainDB on all the nodes:
fab start_bigchaindb
```

View File

@ -104,6 +104,7 @@ setup(
'flask==0.10.1',
'requests==2.9',
'gunicorn~=19.0',
'multipipes~=0.1.0',
],
setup_requires=['pytest-runner'],
tests_require=tests_require,

View File

@ -10,7 +10,7 @@ import cryptoconditions as cc
import bigchaindb
from bigchaindb import crypto, exceptions, util
from bigchaindb.voter import Voter
from bigchaindb.block import Block, BlockDeleteRevert
from bigchaindb.block import BlockDeleteRevert
@pytest.mark.skipif(reason='Some tests throw a ResourceWarning that might result in some weird '
@ -610,200 +610,6 @@ class TestBlockValidation(object):
class TestBigchainBlock(object):
def test_by_assignee(self, b, user_vk):
# create transactions and randomly assign them
transactions = mp.Queue()
count_assigned_to_me = 0
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc'])
if assignee == b.me:
count_assigned_to_me += 1
tx['assignee'] = assignee
transactions.put(tx)
transactions.put('stop')
# create a block instance
block = Block(transactions)
block.q_new_transaction = transactions
# filter the transactions
block.filter_by_assignee()
# check if the number of transactions assigned to the node is the same as the number in
# the queue minus 'stop'
assert block.q_tx_to_validate.qsize() - 1 == count_assigned_to_me
def test_validate_transactions(self, b, user_vk):
# create transactions and randomly invalidate some of them by changing the hash
transactions = mp.Queue()
count_valid = 0
for i in range(100):
valid = random.choice([True, False])
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
if not valid:
tx['id'] = 'a' * 64
else:
count_valid += 1
transactions.put(tx)
transactions.put('stop')
# create a block instance
block = Block(transactions)
block.q_tx_to_validate = transactions
# validate transactions
block.validate_transactions()
# check the number of valid transactions
assert block.q_tx_validated.qsize() - 1 == count_valid
assert block.q_tx_delete.qsize() - 1 == 100
def test_create_block(self, b, user_vk):
# create transactions
transactions = mp.Queue()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
transactions.put(tx)
transactions.put('stop')
# create a block instance
block = Block(transactions)
block.q_tx_validated = transactions
# create blocks
block.create_blocks()
# check the number of blocks created
assert block.q_block.qsize() - 1 == 1
def test_write_block(self, b, user_vk):
# create transactions
transactions = []
blocks = mp.Queue()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
transactions.append(tx)
# create block
block = b.create_block(transactions)
blocks.put(block)
blocks.put('stop')
# create a block instance
block = Block(transactions)
block.q_block = blocks
# make sure that we only have the genesis block in bigchain
r.table('bigchain').delete().run(b.conn)
b.create_genesis_block()
# write blocks
block.write_blocks()
# lets give it some time for the block to be written
time.sleep(1)
# check if the number of blocks in bigchain increased
assert r.table('bigchain').count() == 2
def test_delete_transactions(self, b, user_vk):
# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)
# create and write transactions to the backlog
transactions = mp.Queue()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
transactions.put(tx['id'])
transactions.put('stop')
# create a block instance
block = Block(transactions)
block.q_tx_delete = transactions
# make sure that there are transactions on the backlog
r.table('backlog').count().run(b.conn) == 100
# run the delete process
block.delete_transactions()
# give the db time
time.sleep(1)
# check if all transactions were deleted from the backlog
assert r.table('backlog').count() == 0
def test_bootstrap(self, b, user_vk):
# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)
# create and write transactions to the backlog
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
# create a block instance
block = Block(None)
# run bootstrap
initial_results = block.bootstrap()
# we should have gotten a queue with 100 results minus the poison pills
assert initial_results.qsize() - mp.cpu_count() == 100
def test_start(self, b, user_vk):
# start with 100 transactions in the backlog and 100 in the changefeed
# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)
# create and write transactions to the backlog
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
# create 100 more transactions to emulate the changefeed
new_transactions = mp.Queue()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
new_transactions.put(tx)
for i in range(mp.cpu_count()):
new_transactions.put('stop')
# create a block instance
block = Block(new_transactions)
# start the block processes
block.start()
time.sleep(6)
assert new_transactions.qsize() == 0
assert r.table('backlog').count() == 0
assert r.table('bigchain').count() == 2
def test_empty_queues(self, b):
# create empty queue
new_transactions = mp.Queue()
# create block instance
block = Block(new_transactions)
# start block process
block.start()
# wait for 6 seconds to give it time for an empty queue exception to occur
time.sleep(6)
# join the process
block.kill()
def test_revert_delete_block(self, b):
b.create_genesis_block()

View File

View File

@ -0,0 +1,19 @@
import pytest
from ..db import conftest
@pytest.fixture(autouse=True)
def restore_config(request, node_config):
from bigchaindb import config_utils
config_utils.set_config(node_config)
@pytest.fixture(scope='module', autouse=True)
def setup_database(request, node_config):
conftest.setup_database(request, node_config)
@pytest.fixture(scope='function', autouse=True)
def cleanup_tables(request, node_config):
conftest.cleanup_tables(request, node_config)

View File

@ -0,0 +1,134 @@
import time
import random
from unittest.mock import patch
import rethinkdb as r
from bigchaindb.pipelines import block
from multipipes import Pipe, Pipeline
def test_filter_by_assignee(b, user_vk):
block_maker = block.Block()
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
tx['assignee'] = b.me
# filter_tx has side effects on the `tx` instance by popping 'assignee'
assert block_maker.filter_tx(tx) == tx
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
tx['assignee'] = 'nobody'
assert block_maker.filter_tx(tx) is None
def test_validate_transaction(b, user_vk):
block_maker = block.Block()
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
tx['id'] = 'a' * 64
assert block_maker.validate_tx(tx) is None
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
assert block_maker.validate_tx(tx) == tx
def test_create_block(b, user_vk):
block_maker = block.Block()
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
block_maker.create(tx)
# force the output triggering a `timeout`
block_doc = block_maker.create(None, timeout=True)
assert len(block_doc['block']['transactions']) == 100
def test_write_block(b, user_vk):
block_maker = block.Block()
txs = []
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
txs.append(tx)
block_doc = b.create_block(txs)
block_maker.write(block_doc)
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
def test_delete_tx(b, user_vk):
block_maker = block.Block()
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
assert r.table('backlog').get(tx['id']).run(b.conn) == tx
returned_tx = block_maker.delete_tx(tx)
assert returned_tx == tx
assert r.table('backlog').get(tx['id']).run(b.conn) is None
def test_prefeed(b, user_vk):
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx)
backlog = block.initial()
assert len(list(backlog)) == 100
@patch.object(Pipeline, 'start')
def test_start(mock_start):
# TODO: `block.start` is just a wrapper around `block.create_pipeline`,
# that is tested by `test_full_pipeline`.
# If anyone has better ideas on how to test this, please do a PR :)
block.start()
mock_start.assert_called_with()
def test_full_pipeline(b, user_vk):
outpipe = Pipe()
count_assigned_to_me = 0
for i in range(100):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc'])
tx['assignee'] = assignee
if assignee == b.me:
count_assigned_to_me += 1
r.table('backlog').insert(tx, durability='hard').run(b.conn)
assert r.table('backlog').count().run(b.conn) == 100
pipeline = block.create_pipeline()
pipeline.setup(indata=block.get_changefeed(), outdata=outpipe)
pipeline.start()
time.sleep(2)
pipeline.terminate()
block_doc = outpipe.get()
assert len(block_doc['block']['transactions']) == count_assigned_to_me
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
assert r.table('backlog').count().run(b.conn) == 100 - count_assigned_to_me

View File

@ -0,0 +1,56 @@
from unittest.mock import patch
import rethinkdb
from multipipes import Pipe
from bigchaindb.pipelines import utils
MOCK_CHANGEFEED_DATA = [{
'new_val': 'seems like we have an insert here',
'old_val': None,
}, {
'new_val': None,
'old_val': 'seems like we have a delete here',
}, {
'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here',
}]
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_insert(mock_run):
outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'insert')
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have an insert here'
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_delete(mock_run):
outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'delete')
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == 'seems like we have a delete here'
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_update(mock_run):
outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'update')
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.get() == {'new_val': 'seems like we have an update here',
'old_val': 'seems like we have an update here'}
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_prefeed(mock_run):
outpipe = Pipe()
changefeed = utils.ChangeFeed('backlog', 'insert', prefeed=[1, 2, 3])
changefeed.outqueue = outpipe
changefeed.run_forever()
assert outpipe.qsize() == 4