diff --git a/bigchaindb/block.py b/bigchaindb/block.py index 96da9207..2c2e108e 100644 --- a/bigchaindb/block.py +++ b/bigchaindb/block.py @@ -11,8 +11,6 @@ from bigchaindb.monitor import Monitor logger = logging.getLogger(__name__) -monitor = Monitor() - class Block(object): @@ -27,6 +25,7 @@ class Block(object): self.q_tx_delete = mp.Queue() self.q_block = mp.Queue() self.initialized = mp.Event() + self.monitor = Monitor() def filter_by_assignee(self): """ @@ -57,7 +56,9 @@ class Block(object): b = Bigchain() while True: - monitor.gauge('tx_queue_gauge', self.q_tx_to_validate.qsize(), rate=bigchaindb.config['statsd']['rate']) + self.monitor.gauge('tx_queue_gauge', + self.q_tx_to_validate.qsize(), + rate=bigchaindb.config['statsd']['rate']) tx = self.q_tx_to_validate.get() # poison pill @@ -67,7 +68,11 @@ class Block(object): return self.q_tx_delete.put(tx['id']) - if b.is_valid_transaction(tx): + + with self.monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate']): + is_valid_transaction = b.is_valid_transaction(tx) + + if is_valid_transaction: self.q_tx_validated.put(tx) def create_blocks(self): @@ -122,7 +127,8 @@ class Block(object): if block == 'stop': return - b.write_block(block) + with self.monitor.timer('write_block'): + b.write_block(block) def delete_transactions(self): """ diff --git a/bigchaindb/client.py b/bigchaindb/client.py index 67ce51a9..fcf1a7f8 100644 --- a/bigchaindb/client.py +++ b/bigchaindb/client.py @@ -1,7 +1,6 @@ import requests import bigchaindb -from bigchaindb import util from bigchaindb import config_utils from bigchaindb import exceptions from bigchaindb import crypto @@ -112,5 +111,5 @@ def temp_client(): """ private_key, public_key = crypto.generate_key_pair() - return Client(private_key=private_key, public_key=public_key, api_endpoint='http://localhost:5000/api/v1') + return Client(private_key=private_key, public_key=public_key, api_endpoint=bigchaindb.config['api_endpoint']) diff --git a/bigchaindb/commands/bigchain_benchmark.py b/bigchaindb/commands/bigchain_benchmark.py index 177f0934..9c0edc63 100644 --- a/bigchaindb/commands/bigchain_benchmark.py +++ b/bigchaindb/commands/bigchain_benchmark.py @@ -8,23 +8,21 @@ import logstats import bigchaindb import bigchaindb.config_utils from bigchaindb.util import ProcessGroup +from bigchaindb.client import temp_client from bigchaindb.commands.utils import base_parser, start logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -USER_PUBLIC_KEY = 'qZAN9Ngs1v4qP1T5UBYw75M5f2ej7mAJx8gBMF4BBWtZ' - def _run_load(tx_left, stats): logstats.thread.start(stats) - b = bigchaindb.Bigchain() + client = temp_client() + # b = bigchaindb.Bigchain() while True: - tx = b.create_transaction(b.me, USER_PUBLIC_KEY, None, 'CREATE') - tx_signed = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx_signed) + tx = client.create() stats['transactions'] += 1 diff --git a/bigchaindb/config_utils.py b/bigchaindb/config_utils.py index 98be7fc9..3ec17656 100644 --- a/bigchaindb/config_utils.py +++ b/bigchaindb/config_utils.py @@ -22,6 +22,8 @@ from pkg_resources import iter_entry_points, ResolutionError import bigchaindb from bigchaindb.consensus import AbstractConsensusRules +# TODO: move this to a proper configuration file for logging +logging.getLogger('requests').setLevel(logging.WARNING) logger = logging.getLogger(__name__) CONFIG_DEFAULT_PATH = os.environ.setdefault( diff --git a/bigchaindb/core.py b/bigchaindb/core.py index ea4b2576..0683170b 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -1,17 +1,13 @@ -import rethinkdb as r import random -import rapidjson +import rethinkdb as r +import rapidjson import bigchaindb from bigchaindb import util from bigchaindb import config_utils from bigchaindb import exceptions from bigchaindb import crypto -from bigchaindb.monitor import Monitor - - -monitor = Monitor() class GenesisBlockAlreadyExistsError(Exception): @@ -68,7 +64,6 @@ class Bigchain(object): def reconnect(self): return r.connect(host=self.host, port=self.port, db=self.dbname) - @monitor.timer('create_transaction', rate=bigchaindb.config['statsd']['rate']) def create_transaction(self, *args, **kwargs): """Create a new transaction @@ -104,7 +99,6 @@ class Bigchain(object): return self.consensus.verify_signature( signed_transaction, *args, **kwargs) - @monitor.timer('write_transaction', rate=bigchaindb.config['statsd']['rate']) def write_transaction(self, signed_transaction, durability='soft'): """Write the transaction to bigchain. @@ -239,7 +233,6 @@ class Bigchain(object): return owned - @monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate']) def validate_transaction(self, transaction): """Validate a transaction. @@ -308,7 +301,6 @@ class Bigchain(object): return block - @monitor.timer('validate_block') # TODO: check that the votings structure is correctly constructed def validate_block(self, block): """Validate a block. @@ -351,7 +343,6 @@ class Bigchain(object): except Exception: return False - @monitor.timer('write_block') def write_block(self, block, durability='soft'): """Write a block to bigchain. diff --git a/bigchaindb/monitor.py b/bigchaindb/monitor.py index 9094fea3..3c14da1e 100644 --- a/bigchaindb/monitor.py +++ b/bigchaindb/monitor.py @@ -1,13 +1,14 @@ -import statsd from platform import node +import statsd + import bigchaindb from bigchaindb import config_utils -class Monitor(statsd.StatsClient): - """Set up statsd monitoring - """ +class Monitor(statsd.StatsClient): + """Set up statsd monitoring.""" + def __init__(self, *args, **kwargs): """Overrides statsd client, fixing prefix to messages and loading configuration @@ -15,6 +16,7 @@ class Monitor(statsd.StatsClient): *args: arguments (identical to Statsclient) **kwargs: keyword arguments (identical to Statsclient) """ + config_utils.autoconfigure() if not kwargs: @@ -28,3 +30,4 @@ class Monitor(statsd.StatsClient): if 'port' not in kwargs: kwargs['port'] = bigchaindb.config['statsd']['port'] super().__init__(*args, **kwargs) + diff --git a/bigchaindb/voter.py b/bigchaindb/voter.py index 3ed73636..9a34c0d8 100644 --- a/bigchaindb/voter.py +++ b/bigchaindb/voter.py @@ -3,6 +3,7 @@ import multiprocessing as mp import ctypes from bigchaindb import Bigchain +from bigchaindb.monitor import Monitor logger = logging.getLogger(__name__) @@ -56,6 +57,9 @@ class Voter(object): Initialize with a queue where new blocks added to the bigchain will be put """ + + self.monitor = Monitor() + self.q_new_block = q_new_block self.q_blocks_to_validate = mp.Queue() self.q_validated_block = mp.Queue() @@ -102,7 +106,9 @@ class Voter(object): logger.info('new_block arrived to voter') block_number = self.v_previous_block_number.value + 1 - validity = b.is_valid_block(new_block) + + with self.monitor.timer('validate_block'): + validity = b.is_valid_block(new_block) self.q_validated_block.put((new_block, self.v_previous_block_id.value.decode(), diff --git a/bigchaindb/web/server.py b/bigchaindb/web/server.py index 3e26a007..e557bbd0 100644 --- a/bigchaindb/web/server.py +++ b/bigchaindb/web/server.py @@ -7,11 +7,12 @@ import copy import multiprocessing from flask import Flask +import gunicorn.app.base from bigchaindb import util from bigchaindb import Bigchain from bigchaindb.web import views -import gunicorn.app.base +from bigchaindb.monitor import Monitor class StandaloneApplication(gunicorn.app.base.BaseApplication): @@ -55,8 +56,12 @@ def create_app(settings): """ app = Flask(__name__) + app.debug = settings.get('debug', False) + app.config['bigchain_pool'] = util.pool(Bigchain, size=settings.get('threads', 4)) + app.config['monitor'] = Monitor() + app.register_blueprint(views.basic_views, url_prefix='/api/v1') return app diff --git a/bigchaindb/web/views.py b/bigchaindb/web/views.py index cf618fff..f7a14078 100644 --- a/bigchaindb/web/views.py +++ b/bigchaindb/web/views.py @@ -7,20 +7,33 @@ For more information please refer to the documentation in Apiary: import flask from flask import current_app, request, Blueprint +import bigchaindb from bigchaindb import util basic_views = Blueprint('basic_views', __name__) +# Unfortunately I cannot find a reference to this decorator. +# This answer on SO is quite useful tho: +# - http://stackoverflow.com/a/13432373/597097 @basic_views.record -def get_bigchain(state): +def record(state): + """This function checks if the blueprint can be initialized + with the provided state.""" + bigchain_pool = state.app.config.get('bigchain_pool') + monitor = state.app.config.get('monitor') if bigchain_pool is None: raise Exception('This blueprint expects you to provide ' 'a pool of Bigchain instances called `bigchain_pool`') + if monitor is None: + raise ValueError('This blueprint expects you to provide ' + 'a monitor instance to record system ' + 'performance.') + @basic_views.route('/transactions/') def get_transaction(tx_id): @@ -49,6 +62,7 @@ def create_transaction(): A JSON string containing the data about the transaction. """ pool = current_app.config['bigchain_pool'] + monitor = current_app.config['monitor'] val = {} @@ -64,7 +78,8 @@ def create_transaction(): if not bigchain.consensus.verify_signature(tx): val['error'] = 'Invalid transaction signature' - val = bigchain.write_transaction(tx) + with monitor.timer('write_transaction', rate=bigchaindb.config['statsd']['rate']): + val = bigchain.write_transaction(tx) return flask.jsonify(**tx) diff --git a/tests/test_util.py b/tests/test_util.py index 22cc0c22..b470053c 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,3 +1,4 @@ +from unittest.mock import patch, call import pytest import queue @@ -121,3 +122,24 @@ def test_pool_raises_empty_exception_when_timeout(mock_queue): with pool() as instance: assert instance == 'hello' + +@patch('multiprocessing.Process') +def test_process_group_instantiates_and_start_processes(mock_process): + from bigchaindb.util import ProcessGroup + + def noop(): + pass + + concurrency = 10 + + pg = ProcessGroup(concurrency=concurrency, group='test_group', target=noop) + pg.start() + + mock_process.assert_has_calls([call(group='test_group', target=noop, + name=None, args=(), kwargs={}, + daemon=None) + for i in range(concurrency)], any_order=True) + + for process in pg.processes: + process.start.assert_called_with() +