Merge pull request #181 from bigchaindb/feat/156/refactor-monitoring-code

Move monitoring code from the main class to processes
This commit is contained in:
Ryan Henderson 2016-04-21 16:22:07 +02:00
commit 1d50c8965d
10 changed files with 79 additions and 32 deletions

View File

@ -11,8 +11,6 @@ from bigchaindb.monitor import Monitor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
monitor = Monitor()
class Block(object): class Block(object):
@ -27,6 +25,7 @@ class Block(object):
self.q_tx_delete = mp.Queue() self.q_tx_delete = mp.Queue()
self.q_block = mp.Queue() self.q_block = mp.Queue()
self.initialized = mp.Event() self.initialized = mp.Event()
self.monitor = Monitor()
def filter_by_assignee(self): def filter_by_assignee(self):
""" """
@ -57,7 +56,9 @@ class Block(object):
b = Bigchain() b = Bigchain()
while True: while True:
monitor.gauge('tx_queue_gauge', self.q_tx_to_validate.qsize(), rate=bigchaindb.config['statsd']['rate']) self.monitor.gauge('tx_queue_gauge',
self.q_tx_to_validate.qsize(),
rate=bigchaindb.config['statsd']['rate'])
tx = self.q_tx_to_validate.get() tx = self.q_tx_to_validate.get()
# poison pill # poison pill
@ -67,7 +68,11 @@ class Block(object):
return return
self.q_tx_delete.put(tx['id']) self.q_tx_delete.put(tx['id'])
if b.is_valid_transaction(tx):
with self.monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate']):
is_valid_transaction = b.is_valid_transaction(tx)
if is_valid_transaction:
self.q_tx_validated.put(tx) self.q_tx_validated.put(tx)
def create_blocks(self): def create_blocks(self):
@ -122,6 +127,7 @@ class Block(object):
if block == 'stop': if block == 'stop':
return return
with self.monitor.timer('write_block'):
b.write_block(block) b.write_block(block)
def delete_transactions(self): def delete_transactions(self):

View File

@ -1,7 +1,6 @@
import requests import requests
import bigchaindb import bigchaindb
from bigchaindb import util
from bigchaindb import config_utils from bigchaindb import config_utils
from bigchaindb import exceptions from bigchaindb import exceptions
from bigchaindb import crypto from bigchaindb import crypto
@ -112,5 +111,5 @@ def temp_client():
""" """
private_key, public_key = crypto.generate_key_pair() private_key, public_key = crypto.generate_key_pair()
return Client(private_key=private_key, public_key=public_key, api_endpoint='http://localhost:5000/api/v1') return Client(private_key=private_key, public_key=public_key, api_endpoint=bigchaindb.config['api_endpoint'])

View File

@ -8,23 +8,21 @@ import logstats
import bigchaindb import bigchaindb
import bigchaindb.config_utils import bigchaindb.config_utils
from bigchaindb.util import ProcessGroup from bigchaindb.util import ProcessGroup
from bigchaindb.client import temp_client
from bigchaindb.commands.utils import base_parser, start from bigchaindb.commands.utils import base_parser, start
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
USER_PUBLIC_KEY = 'qZAN9Ngs1v4qP1T5UBYw75M5f2ej7mAJx8gBMF4BBWtZ'
def _run_load(tx_left, stats): def _run_load(tx_left, stats):
logstats.thread.start(stats) logstats.thread.start(stats)
b = bigchaindb.Bigchain() client = temp_client()
# b = bigchaindb.Bigchain()
while True: while True:
tx = b.create_transaction(b.me, USER_PUBLIC_KEY, None, 'CREATE') tx = client.create()
tx_signed = b.sign_transaction(tx, b.me_private)
b.write_transaction(tx_signed)
stats['transactions'] += 1 stats['transactions'] += 1

View File

@ -22,6 +22,8 @@ from pkg_resources import iter_entry_points, ResolutionError
import bigchaindb import bigchaindb
from bigchaindb.consensus import AbstractConsensusRules from bigchaindb.consensus import AbstractConsensusRules
# TODO: move this to a proper configuration file for logging
logging.getLogger('requests').setLevel(logging.WARNING)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
CONFIG_DEFAULT_PATH = os.environ.setdefault( CONFIG_DEFAULT_PATH = os.environ.setdefault(

View File

@ -1,17 +1,13 @@
import rethinkdb as r
import random import random
import rapidjson
import rethinkdb as r
import rapidjson
import bigchaindb import bigchaindb
from bigchaindb import util from bigchaindb import util
from bigchaindb import config_utils from bigchaindb import config_utils
from bigchaindb import exceptions from bigchaindb import exceptions
from bigchaindb import crypto from bigchaindb import crypto
from bigchaindb.monitor import Monitor
monitor = Monitor()
class GenesisBlockAlreadyExistsError(Exception): class GenesisBlockAlreadyExistsError(Exception):
@ -68,7 +64,6 @@ class Bigchain(object):
def reconnect(self): def reconnect(self):
return r.connect(host=self.host, port=self.port, db=self.dbname) return r.connect(host=self.host, port=self.port, db=self.dbname)
@monitor.timer('create_transaction', rate=bigchaindb.config['statsd']['rate'])
def create_transaction(self, *args, **kwargs): def create_transaction(self, *args, **kwargs):
"""Create a new transaction """Create a new transaction
@ -104,7 +99,6 @@ class Bigchain(object):
return self.consensus.verify_signature( return self.consensus.verify_signature(
signed_transaction, *args, **kwargs) signed_transaction, *args, **kwargs)
@monitor.timer('write_transaction', rate=bigchaindb.config['statsd']['rate'])
def write_transaction(self, signed_transaction, durability='soft'): def write_transaction(self, signed_transaction, durability='soft'):
"""Write the transaction to bigchain. """Write the transaction to bigchain.
@ -239,7 +233,6 @@ class Bigchain(object):
return owned return owned
@monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate'])
def validate_transaction(self, transaction): def validate_transaction(self, transaction):
"""Validate a transaction. """Validate a transaction.
@ -308,7 +301,6 @@ class Bigchain(object):
return block return block
@monitor.timer('validate_block')
# TODO: check that the votings structure is correctly constructed # TODO: check that the votings structure is correctly constructed
def validate_block(self, block): def validate_block(self, block):
"""Validate a block. """Validate a block.
@ -351,7 +343,6 @@ class Bigchain(object):
except Exception: except Exception:
return False return False
@monitor.timer('write_block')
def write_block(self, block, durability='soft'): def write_block(self, block, durability='soft'):
"""Write a block to bigchain. """Write a block to bigchain.

View File

@ -1,13 +1,14 @@
import statsd
from platform import node from platform import node
import statsd
import bigchaindb import bigchaindb
from bigchaindb import config_utils from bigchaindb import config_utils
class Monitor(statsd.StatsClient):
"""Set up statsd monitoring
""" class Monitor(statsd.StatsClient):
"""Set up statsd monitoring."""
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
"""Overrides statsd client, fixing prefix to messages and loading configuration """Overrides statsd client, fixing prefix to messages and loading configuration
@ -15,6 +16,7 @@ class Monitor(statsd.StatsClient):
*args: arguments (identical to Statsclient) *args: arguments (identical to Statsclient)
**kwargs: keyword arguments (identical to Statsclient) **kwargs: keyword arguments (identical to Statsclient)
""" """
config_utils.autoconfigure() config_utils.autoconfigure()
if not kwargs: if not kwargs:
@ -28,3 +30,4 @@ class Monitor(statsd.StatsClient):
if 'port' not in kwargs: if 'port' not in kwargs:
kwargs['port'] = bigchaindb.config['statsd']['port'] kwargs['port'] = bigchaindb.config['statsd']['port']
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)

View File

@ -3,6 +3,7 @@ import multiprocessing as mp
import ctypes import ctypes
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb.monitor import Monitor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -56,6 +57,9 @@ class Voter(object):
Initialize with a queue where new blocks added to the bigchain will be put Initialize with a queue where new blocks added to the bigchain will be put
""" """
self.monitor = Monitor()
self.q_new_block = q_new_block self.q_new_block = q_new_block
self.q_blocks_to_validate = mp.Queue() self.q_blocks_to_validate = mp.Queue()
self.q_validated_block = mp.Queue() self.q_validated_block = mp.Queue()
@ -102,6 +106,8 @@ class Voter(object):
logger.info('new_block arrived to voter') logger.info('new_block arrived to voter')
block_number = self.v_previous_block_number.value + 1 block_number = self.v_previous_block_number.value + 1
with self.monitor.timer('validate_block'):
validity = b.is_valid_block(new_block) validity = b.is_valid_block(new_block)
self.q_validated_block.put((new_block, self.q_validated_block.put((new_block,

View File

@ -7,11 +7,12 @@ import copy
import multiprocessing import multiprocessing
from flask import Flask from flask import Flask
import gunicorn.app.base
from bigchaindb import util from bigchaindb import util
from bigchaindb import Bigchain from bigchaindb import Bigchain
from bigchaindb.web import views from bigchaindb.web import views
import gunicorn.app.base from bigchaindb.monitor import Monitor
class StandaloneApplication(gunicorn.app.base.BaseApplication): class StandaloneApplication(gunicorn.app.base.BaseApplication):
@ -55,8 +56,12 @@ def create_app(settings):
""" """
app = Flask(__name__) app = Flask(__name__)
app.debug = settings.get('debug', False) app.debug = settings.get('debug', False)
app.config['bigchain_pool'] = util.pool(Bigchain, size=settings.get('threads', 4)) app.config['bigchain_pool'] = util.pool(Bigchain, size=settings.get('threads', 4))
app.config['monitor'] = Monitor()
app.register_blueprint(views.basic_views, url_prefix='/api/v1') app.register_blueprint(views.basic_views, url_prefix='/api/v1')
return app return app

View File

@ -7,20 +7,33 @@ For more information please refer to the documentation in Apiary:
import flask import flask
from flask import current_app, request, Blueprint from flask import current_app, request, Blueprint
import bigchaindb
from bigchaindb import util from bigchaindb import util
basic_views = Blueprint('basic_views', __name__) basic_views = Blueprint('basic_views', __name__)
# Unfortunately I cannot find a reference to this decorator.
# This answer on SO is quite useful tho:
# - http://stackoverflow.com/a/13432373/597097
@basic_views.record @basic_views.record
def get_bigchain(state): def record(state):
"""This function checks if the blueprint can be initialized
with the provided state."""
bigchain_pool = state.app.config.get('bigchain_pool') bigchain_pool = state.app.config.get('bigchain_pool')
monitor = state.app.config.get('monitor')
if bigchain_pool is None: if bigchain_pool is None:
raise Exception('This blueprint expects you to provide ' raise Exception('This blueprint expects you to provide '
'a pool of Bigchain instances called `bigchain_pool`') 'a pool of Bigchain instances called `bigchain_pool`')
if monitor is None:
raise ValueError('This blueprint expects you to provide '
'a monitor instance to record system '
'performance.')
@basic_views.route('/transactions/<tx_id>') @basic_views.route('/transactions/<tx_id>')
def get_transaction(tx_id): def get_transaction(tx_id):
@ -49,6 +62,7 @@ def create_transaction():
A JSON string containing the data about the transaction. A JSON string containing the data about the transaction.
""" """
pool = current_app.config['bigchain_pool'] pool = current_app.config['bigchain_pool']
monitor = current_app.config['monitor']
val = {} val = {}
@ -64,6 +78,7 @@ def create_transaction():
if not bigchain.consensus.verify_signature(tx): if not bigchain.consensus.verify_signature(tx):
val['error'] = 'Invalid transaction signature' val['error'] = 'Invalid transaction signature'
with monitor.timer('write_transaction', rate=bigchaindb.config['statsd']['rate']):
val = bigchain.write_transaction(tx) val = bigchain.write_transaction(tx)
return flask.jsonify(**tx) return flask.jsonify(**tx)

View File

@ -1,3 +1,4 @@
from unittest.mock import patch, call
import pytest import pytest
import queue import queue
@ -121,3 +122,24 @@ def test_pool_raises_empty_exception_when_timeout(mock_queue):
with pool() as instance: with pool() as instance:
assert instance == 'hello' assert instance == 'hello'
@patch('multiprocessing.Process')
def test_process_group_instantiates_and_start_processes(mock_process):
from bigchaindb.util import ProcessGroup
def noop():
pass
concurrency = 10
pg = ProcessGroup(concurrency=concurrency, group='test_group', target=noop)
pg.start()
mock_process.assert_has_calls([call(group='test_group', target=noop,
name=None, args=(), kwargs={},
daemon=None)
for i in range(concurrency)], any_order=True)
for process in pg.processes:
process.start.assert_called_with()