Merge branch 'feat/116/more-solid-webserver' into feat/156/refactor-monitoring-code

This commit is contained in:
vrde 2016-04-15 16:21:14 +02:00
commit 203f1bf70a
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
5 changed files with 189 additions and 23 deletions

View File

@ -1,6 +1,9 @@
import json
import time
import contextlib
import threading
import queue
import multiprocessing as mp
from datetime import datetime
@ -31,6 +34,53 @@ class ProcessGroup(object):
self.processes.append(proc)
# Inspired by:
# - http://stackoverflow.com/a/24741694/597097
def pool(builder, size, timeout=None):
"""Create a pool that imposes a limit on the number of stored
instances.
Args:
builder: a function to build an instance.
size: the size of the pool.
Returns:
A context manager that can be used with the ``with``
statement.
"""
lock = threading.Lock()
local_pool = queue.Queue()
current_size = 0
@contextlib.contextmanager
def pooled():
nonlocal current_size
instance = None
# If we still have free slots, then we have room to create new
# instances.
if current_size < size:
with lock:
# We need to check again if we have slots available, since
# the situation might be different after acquiring the lock
if current_size < size:
current_size += 1
instance = builder()
# Watchout: current_size can be equal to size if the previous part of
# the function has been executed, that's why we need to check if the
# instance is None.
if instance is None and current_size == size:
instance = local_pool.get(timeout=timeout)
yield instance
local_pool.put(instance)
return pooled
def serialize(data):
"""Serialize a dict into a JSON formatted string.

View File

@ -9,6 +9,7 @@ import multiprocessing
from flask import Flask
import gunicorn.app.base
from bigchaindb import util
from bigchaindb import Bigchain
from bigchaindb.web import views
from bigchaindb.monitor import Monitor
@ -46,7 +47,7 @@ class StandaloneApplication(gunicorn.app.base.BaseApplication):
return self.application
def create_app(debug=False):
def create_app(settings):
"""Return an instance of the Flask application.
Args:
@ -55,9 +56,12 @@ def create_app(debug=False):
"""
app = Flask(__name__)
app.debug = debug
app.config['bigchain'] = Bigchain()
app.debug = settings.get('debug', False)
app.config['bigchain_pool'] = util.pool(Bigchain, size=settings.get('threads', 4))
app.config['monitor'] = Monitor()
app.register_blueprint(views.basic_views, url_prefix='/api/v1')
return app
@ -81,8 +85,8 @@ def create_server(settings):
if not settings.get('threads'):
settings['threads'] = (multiprocessing.cpu_count() * 2) + 1
debug = settings.pop('debug', False)
app = create_app(debug)
app = create_app(settings)
settings.pop('debug', False)
standalone = StandaloneApplication(app, settings)
return standalone

View File

@ -22,12 +22,12 @@ def record(state):
"""This function checks if the blueprint can be initialized
with the provided state."""
bigchain = state.app.config.get('bigchain')
bigchain_pool = state.app.config.get('bigchain_pool')
monitor = state.app.config.get('monitor')
if bigchain is None:
raise ValueError('This blueprint expects you to provide '
'database access through `bigchain`.')
if bigchain_pool is None:
raise Exception('This blueprint expects you to provide '
'a pool of Bigchain instances called `bigchain_pool`')
if monitor is None:
raise ValueError('This blueprint expects you to provide '
@ -46,9 +46,11 @@ def get_transaction(tx_id):
A JSON string containing the data about the transaction.
"""
bigchain = current_app.config['bigchain']
pool = current_app.config['bigchain_pool']
with pool() as bigchain:
tx = bigchain.get_transaction(tx_id)
return flask.jsonify(**tx)
@ -59,7 +61,7 @@ def create_transaction():
Return:
A JSON string containing the data about the transaction.
"""
bigchain = current_app.config['bigchain']
pool = current_app.config['bigchain_pool']
monitor = current_app.config['monitor']
val = {}
@ -68,16 +70,15 @@ def create_transaction():
# set to `application/json`
tx = request.get_json(force=True)
with pool() as bigchain:
if tx['transaction']['operation'] == 'CREATE':
tx = util.transform_create(tx)
tx = bigchain.consensus.sign_transaction(
tx, private_key=bigchain.me_private)
tx = bigchain.consensus.sign_transaction(tx, private_key=bigchain.me_private)
if not bigchain.consensus.verify_signature(tx):
val['error'] = 'Invalid transaction signature'
with monitor.timer('write_transaction',
rate=bigchaindb.config['statsd']['rate']):
with monitor.timer('write_transaction', rate=bigchaindb.config['statsd']['rate']):
val = bigchain.write_transaction(tx)
return flask.jsonify(**tx)

View File

@ -1,7 +1,33 @@
from bigchaindb import util
import pytest
import queue
@pytest.fixture
def mock_queue(monkeypatch):
class MockQueue:
items = []
def get(self, timeout=None):
try:
return self.items.pop()
except IndexError:
if timeout:
raise queue.Empty()
raise
def put(self, item):
self.items.append(item)
mockqueue = MockQueue()
monkeypatch.setattr('queue.Queue', lambda: mockqueue)
return mockqueue
def test_transform_create(b, user_private_key, user_public_key):
from bigchaindb import util
tx = util.create_tx(user_public_key, user_public_key, None, 'CREATE')
tx = util.transform_create(tx)
tx = util.sign_tx(tx, b.me_private)
@ -10,3 +36,88 @@ def test_transform_create(b, user_private_key, user_public_key):
assert tx['transaction']['new_owner'] == user_public_key
assert util.verify_signature(tx)
def test_empty_pool_is_populated_with_instances(mock_queue):
from bigchaindb import util
pool = util.pool(lambda: 'hello', 4)
assert len(mock_queue.items) == 0
with pool() as instance:
assert instance == 'hello'
assert len(mock_queue.items) == 1
with pool() as instance:
assert instance == 'hello'
assert len(mock_queue.items) == 2
with pool() as instance:
assert instance == 'hello'
assert len(mock_queue.items) == 3
with pool() as instance:
assert instance == 'hello'
assert len(mock_queue.items) == 4
with pool() as instance:
assert instance == 'hello'
assert len(mock_queue.items) == 4
def test_pool_blocks_if_no_instances_available(mock_queue):
from bigchaindb import util
pool = util.pool(lambda: 'hello', 4)
assert len(mock_queue.items) == 0
# We need to manually trigger the `__enter__` method so the context
# manager will "hang" and not return the resource to the pool
assert pool().__enter__() == 'hello'
assert len(mock_queue.items) == 0
assert pool().__enter__() == 'hello'
assert len(mock_queue.items) == 0
assert pool().__enter__() == 'hello'
assert len(mock_queue.items) == 0
# We need to keep a reference of the last context manager so we can
# manually release the resource
last = pool()
assert last.__enter__() == 'hello'
assert len(mock_queue.items) == 0
# This would block using `queue.Queue` but since we mocked it it will
# just raise a IndexError because it's trying to pop from an empty list.
with pytest.raises(IndexError):
assert pool().__enter__() == 'hello'
assert len(mock_queue.items) == 0
# Release the last resource
last.__exit__(None, None, None)
assert len(mock_queue.items) == 1
assert pool().__enter__() == 'hello'
assert len(mock_queue.items) == 0
def test_pool_raises_empty_exception_when_timeout(mock_queue):
from bigchaindb import util
pool = util.pool(lambda: 'hello', 1, timeout=1)
assert len(mock_queue.items) == 0
with pool() as instance:
assert instance == 'hello'
assert len(mock_queue.items) == 1
# take the only resource available
assert pool().__enter__() == 'hello'
with pytest.raises(queue.Empty):
with pool() as instance:
assert instance == 'hello'

View File

@ -25,7 +25,7 @@ def app(request, node_config):
restore_config(request, node_config)
from bigchaindb.web import server
app = server.create_app(debug=True)
app = server.create_app({'debug': True})
return app