Merge remote-tracking branch 'remotes/origin/feat/128/multiple-input-output' into feat/201/escrow

Conflicts:
	bigchaindb/util.py
This commit is contained in:
diminator 2016-04-21 19:07:56 +02:00
commit c928669230
No known key found for this signature in database
GPG Key ID: C3D8590E6D0D439A
15 changed files with 102 additions and 35 deletions

View File

@ -37,4 +37,4 @@ config = {
# for more info. # for more info.
_config = copy.deepcopy(config) _config = copy.deepcopy(config)
from bigchaindb.core import Bigchain # noqa from bigchaindb.core import Bigchain # noqa
from bigchaindb.version import __version__ # noqa

View File

@ -11,8 +11,6 @@ from bigchaindb.monitor import Monitor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
monitor = Monitor()
class Block(object): class Block(object):
@ -27,6 +25,7 @@ class Block(object):
self.q_tx_delete = mp.Queue() self.q_tx_delete = mp.Queue()
self.q_block = mp.Queue() self.q_block = mp.Queue()
self.initialized = mp.Event() self.initialized = mp.Event()
self.monitor = Monitor()
def filter_by_assignee(self): def filter_by_assignee(self):
""" """
@ -57,7 +56,9 @@ class Block(object):
b = Bigchain() b = Bigchain()
while True: while True:
monitor.gauge('tx_queue_gauge', self.q_tx_to_validate.qsize(), rate=bigchaindb.config['statsd']['rate']) self.monitor.gauge('tx_queue_gauge',
self.q_tx_to_validate.qsize(),
rate=bigchaindb.config['statsd']['rate'])
tx = self.q_tx_to_validate.get() tx = self.q_tx_to_validate.get()
# poison pill # poison pill
@ -67,7 +68,11 @@ class Block(object):
return return
self.q_tx_delete.put(tx['id']) self.q_tx_delete.put(tx['id'])
if b.is_valid_transaction(tx):
with self.monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate']):
is_valid_transaction = b.is_valid_transaction(tx)
if is_valid_transaction:
self.q_tx_validated.put(tx) self.q_tx_validated.put(tx)
def create_blocks(self): def create_blocks(self):
@ -122,7 +127,8 @@ class Block(object):
if block == 'stop': if block == 'stop':
return return
b.write_block(block) with self.monitor.timer('write_block'):
b.write_block(block)
def delete_transactions(self): def delete_transactions(self):
""" """

View File

@ -1,7 +1,6 @@
import requests import requests
import bigchaindb import bigchaindb
from bigchaindb import util
from bigchaindb import config_utils from bigchaindb import config_utils
from bigchaindb import exceptions from bigchaindb import exceptions
from bigchaindb import crypto from bigchaindb import crypto
@ -112,5 +111,5 @@ def temp_client():
""" """
private_key, public_key = crypto.generate_key_pair() private_key, public_key = crypto.generate_key_pair()
return Client(private_key=private_key, public_key=public_key, api_endpoint='http://localhost:5000/api/v1') return Client(private_key=private_key, public_key=public_key, api_endpoint=bigchaindb.config['api_endpoint'])

View File

@ -8,23 +8,21 @@ import logstats
import bigchaindb import bigchaindb
import bigchaindb.config_utils import bigchaindb.config_utils
from bigchaindb.util import ProcessGroup from bigchaindb.util import ProcessGroup
from bigchaindb.client import temp_client
from bigchaindb.commands.utils import base_parser, start from bigchaindb.commands.utils import base_parser, start
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
USER_PUBLIC_KEY = 'qZAN9Ngs1v4qP1T5UBYw75M5f2ej7mAJx8gBMF4BBWtZ'
def _run_load(tx_left, stats): def _run_load(tx_left, stats):
logstats.thread.start(stats) logstats.thread.start(stats)
b = bigchaindb.Bigchain() client = temp_client()
# b = bigchaindb.Bigchain()
while True: while True:
tx = b.create_transaction(b.me, USER_PUBLIC_KEY, None, 'CREATE') tx = client.create()
tx_signed = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx_signed)
stats['transactions'] += 1 stats['transactions'] += 1

View File

@ -5,6 +5,8 @@ for ``argparse.ArgumentParser``.
import argparse import argparse
import multiprocessing as mp import multiprocessing as mp
from bigchaindb.version import __version__
def start(parser, scope): def start(parser, scope):
"""Utility function to execute a subcommand. """Utility function to execute a subcommand.
@ -46,7 +48,7 @@ def start(parser, scope):
func(args) func(args)
base_parser = argparse.ArgumentParser(add_help=False) base_parser = argparse.ArgumentParser(add_help=False, prog='bigchaindb')
base_parser.add_argument('-c', '--config', base_parser.add_argument('-c', '--config',
help='Specify the location of the configuration file') help='Specify the location of the configuration file')
@ -55,3 +57,7 @@ base_parser.add_argument('-y', '--yes', '--yes-please',
action='store_true', action='store_true',
help='Assume "yes" as answer to all prompts and run ' help='Assume "yes" as answer to all prompts and run '
'non-interactively') 'non-interactively')
base_parser.add_argument('-v', '--version',
action='version',
version='%(prog)s {}'.format(__version__))

View File

@ -22,6 +22,8 @@ from pkg_resources import iter_entry_points, ResolutionError
import bigchaindb import bigchaindb
from bigchaindb.consensus import AbstractConsensusRules from bigchaindb.consensus import AbstractConsensusRules
# TODO: move this to a proper configuration file for logging
logging.getLogger('requests').setLevel(logging.WARNING)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
CONFIG_DEFAULT_PATH = os.environ.setdefault( CONFIG_DEFAULT_PATH = os.environ.setdefault(

View File

@ -1,5 +1,6 @@
import rethinkdb as r
import random import random
import rethinkdb as r
import rapidjson import rapidjson
import bigchaindb import bigchaindb
@ -7,9 +8,6 @@ from bigchaindb import util
from bigchaindb import config_utils from bigchaindb import config_utils
from bigchaindb import exceptions from bigchaindb import exceptions
from bigchaindb import crypto from bigchaindb import crypto
from bigchaindb.monitor import Monitor
monitor = Monitor()
class GenesisBlockAlreadyExistsError(Exception): class GenesisBlockAlreadyExistsError(Exception):
@ -66,7 +64,6 @@ class Bigchain(object):
def reconnect(self): def reconnect(self):
return r.connect(host=self.host, port=self.port, db=self.dbname) return r.connect(host=self.host, port=self.port, db=self.dbname)
@monitor.timer('create_transaction', rate=bigchaindb.config['statsd']['rate'])
def create_transaction(self, *args, **kwargs): def create_transaction(self, *args, **kwargs):
"""Create a new transaction """Create a new transaction
@ -102,7 +99,6 @@ class Bigchain(object):
return self.consensus.verify_signature( return self.consensus.verify_signature(
signed_transaction, *args, **kwargs) signed_transaction, *args, **kwargs)
@monitor.timer('write_transaction', rate=bigchaindb.config['statsd']['rate'])
def write_transaction(self, signed_transaction, durability='soft'): def write_transaction(self, signed_transaction, durability='soft'):
"""Write the transaction to bigchain. """Write the transaction to bigchain.
@ -248,7 +244,6 @@ class Bigchain(object):
return owned return owned
@monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate'])
def validate_transaction(self, transaction): def validate_transaction(self, transaction):
"""Validate a transaction. """Validate a transaction.
@ -317,7 +312,6 @@ class Bigchain(object):
return block return block
@monitor.timer('validate_block')
# TODO: check that the votings structure is correctly constructed # TODO: check that the votings structure is correctly constructed
def validate_block(self, block): def validate_block(self, block):
"""Validate a block. """Validate a block.
@ -360,7 +354,6 @@ class Bigchain(object):
except Exception: except Exception:
return False return False
@monitor.timer('write_block')
def write_block(self, block, durability='soft'): def write_block(self, block, durability='soft'):
"""Write a block to bigchain. """Write a block to bigchain.

View File

@ -1,13 +1,14 @@
import statsd
from platform import node from platform import node
import statsd
import bigchaindb import bigchaindb
from bigchaindb import config_utils from bigchaindb import config_utils
class Monitor(statsd.StatsClient):
"""Set up statsd monitoring
""" class Monitor(statsd.StatsClient):
"""Set up statsd monitoring."""
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
"""Overrides statsd client, fixing prefix to messages and loading configuration """Overrides statsd client, fixing prefix to messages and loading configuration
@ -15,6 +16,7 @@ class Monitor(statsd.StatsClient):
*args: arguments (identical to Statsclient) *args: arguments (identical to Statsclient)
**kwargs: keyword arguments (identical to Statsclient) **kwargs: keyword arguments (identical to Statsclient)
""" """
config_utils.autoconfigure() config_utils.autoconfigure()
if not kwargs: if not kwargs:
@ -28,3 +30,4 @@ class Monitor(statsd.StatsClient):
if 'port' not in kwargs: if 'port' not in kwargs:
kwargs['port'] = bigchaindb.config['statsd']['port'] kwargs['port'] = bigchaindb.config['statsd']['port']
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)

2
bigchaindb/version.py Normal file
View File

@ -0,0 +1,2 @@
__version__ = '0.1.5'
__short_version__ = '0.1'

View File

@ -3,6 +3,7 @@ import multiprocessing as mp
import ctypes import ctypes
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb.monitor import Monitor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -56,6 +57,9 @@ class Voter(object):
Initialize with a queue where new blocks added to the bigchain will be put Initialize with a queue where new blocks added to the bigchain will be put
""" """
self.monitor = Monitor()
self.q_new_block = q_new_block self.q_new_block = q_new_block
self.q_blocks_to_validate = mp.Queue() self.q_blocks_to_validate = mp.Queue()
self.q_validated_block = mp.Queue() self.q_validated_block = mp.Queue()
@ -102,7 +106,9 @@ class Voter(object):
logger.info('new_block arrived to voter') logger.info('new_block arrived to voter')
block_number = self.v_previous_block_number.value + 1 block_number = self.v_previous_block_number.value + 1
validity = b.is_valid_block(new_block)
with self.monitor.timer('validate_block'):
validity = b.is_valid_block(new_block)
self.q_validated_block.put((new_block, self.q_validated_block.put((new_block,
self.v_previous_block_id.value.decode(), self.v_previous_block_id.value.decode(),

View File

@ -7,11 +7,12 @@ import copy
import multiprocessing import multiprocessing
from flask import Flask from flask import Flask
import gunicorn.app.base
from bigchaindb import util from bigchaindb import util
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb.web import views from bigchaindb.web import views
import gunicorn.app.base from bigchaindb.monitor import Monitor
class StandaloneApplication(gunicorn.app.base.BaseApplication): class StandaloneApplication(gunicorn.app.base.BaseApplication):
@ -55,8 +56,12 @@ def create_app(settings):
""" """
app = Flask(__name__) app = Flask(__name__)
app.debug = settings.get('debug', False) app.debug = settings.get('debug', False)
app.config['bigchain_pool'] = util.pool(Bigchain, size=settings.get('threads', 4)) app.config['bigchain_pool'] = util.pool(Bigchain, size=settings.get('threads', 4))
app.config['monitor'] = Monitor()
app.register_blueprint(views.basic_views, url_prefix='/api/v1') app.register_blueprint(views.basic_views, url_prefix='/api/v1')
return app return app

View File

@ -7,20 +7,33 @@ For more information please refer to the documentation in Apiary:
import flask import flask
from flask import current_app, request, Blueprint from flask import current_app, request, Blueprint
import bigchaindb
from bigchaindb import util from bigchaindb import util
basic_views = Blueprint('basic_views', __name__) basic_views = Blueprint('basic_views', __name__)
# Unfortunately I cannot find a reference to this decorator.
# This answer on SO is quite useful tho:
# - http://stackoverflow.com/a/13432373/597097
@basic_views.record @basic_views.record
def get_bigchain(state): def record(state):
"""This function checks if the blueprint can be initialized
with the provided state."""
bigchain_pool = state.app.config.get('bigchain_pool') bigchain_pool = state.app.config.get('bigchain_pool')
monitor = state.app.config.get('monitor')
if bigchain_pool is None: if bigchain_pool is None:
raise Exception('This blueprint expects you to provide ' raise Exception('This blueprint expects you to provide '
'a pool of Bigchain instances called `bigchain_pool`') 'a pool of Bigchain instances called `bigchain_pool`')
if monitor is None:
raise ValueError('This blueprint expects you to provide '
'a monitor instance to record system '
'performance.')
@basic_views.route('/transactions/<tx_id>') @basic_views.route('/transactions/<tx_id>')
def get_transaction(tx_id): def get_transaction(tx_id):
@ -49,6 +62,7 @@ def create_transaction():
A JSON string containing the data about the transaction. A JSON string containing the data about the transaction.
""" """
pool = current_app.config['bigchain_pool'] pool = current_app.config['bigchain_pool']
monitor = current_app.config['monitor']
val = {} val = {}
@ -64,7 +78,8 @@ def create_transaction():
if not bigchain.consensus.verify_signature(tx): if not bigchain.consensus.verify_signature(tx):
val['error'] = 'Invalid transaction signature' val['error'] = 'Invalid transaction signature'
val = bigchain.write_transaction(tx) with monitor.timer('write_transaction', rate=bigchaindb.config['statsd']['rate']):
val = bigchain.write_transaction(tx)
return flask.jsonify(**tx) return flask.jsonify(**tx)

View File

@ -30,6 +30,11 @@ from recommonmark.parser import CommonMarkParser
# ones. # ones.
import sphinx_rtd_theme import sphinx_rtd_theme
# get version
_version = {}
with open('../../bigchaindb/version.py') as fp:
exec(fp.read(), _version)
extensions = [ extensions = [
'sphinx.ext.autodoc', 'sphinx.ext.autodoc',
@ -69,9 +74,9 @@ author = 'BigchainDB Contributors'
# built documents. # built documents.
# #
# The short X.Y version. # The short X.Y version.
version = '0.1' version = _version['__short_version__']
# The full version, including alpha/beta/rc tags. # The full version, including alpha/beta/rc tags.
release = '0.1.5' release = _version['__version__']
# The language for content autogenerated by Sphinx. Refer to documentation # The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages. # for a list of supported languages.

View File

@ -6,6 +6,11 @@ For full docs visit https://bigchaindb.readthedocs.org
""" """
from setuptools import setup, find_packages from setuptools import setup, find_packages
# get the version
version = {}
with open('bigchaindb/version.py') as fp:
exec(fp.read(), version)
tests_require = [ tests_require = [
'pytest', 'pytest',
'coverage', 'coverage',
@ -32,7 +37,7 @@ docs_require = [
setup( setup(
name='BigchainDB', name='BigchainDB',
version='0.1.5', version=version['__version__'],
description='BigchainDB: A Scalable Blockchain Database', description='BigchainDB: A Scalable Blockchain Database',
long_description=__doc__, long_description=__doc__,
url='https://github.com/BigchainDB/bigchaindb/', url='https://github.com/BigchainDB/bigchaindb/',

View File

@ -1,3 +1,4 @@
from unittest.mock import patch, call
import pytest import pytest
import queue import queue
@ -120,3 +121,24 @@ def test_pool_raises_empty_exception_when_timeout(mock_queue):
with pool() as instance: with pool() as instance:
assert instance == 'hello' assert instance == 'hello'
@patch('multiprocessing.Process')
def test_process_group_instantiates_and_start_processes(mock_process):
from bigchaindb.util import ProcessGroup
def noop():
pass
concurrency = 10
pg = ProcessGroup(concurrency=concurrency, group='test_group', target=noop)
pg.start()
mock_process.assert_has_calls([call(group='test_group', target=noop,
name=None, args=(), kwargs={},
daemon=None)
for i in range(concurrency)], any_order=True)
for process in pg.processes:
process.start.assert_called_with()