diff --git a/bigchaindb/util.py b/bigchaindb/util.py index ac3bd12a..2da25f73 100644 --- a/bigchaindb/util.py +++ b/bigchaindb/util.py @@ -1,6 +1,9 @@ import json import time +import contextlib +import threading +import queue import multiprocessing as mp from datetime import datetime @@ -31,6 +34,53 @@ class ProcessGroup(object): self.processes.append(proc) +# Inspired by: +# - http://stackoverflow.com/a/24741694/597097 +def pool(builder, size, timeout=None): + """Create a pool that imposes a limit on the number of stored + instances. + + Args: + builder: a function to build an instance. + size: the size of the pool. + + Returns: + A context manager that can be used with the ``with`` + statement. + """ + + lock = threading.Lock() + local_pool = queue.Queue() + current_size = 0 + + @contextlib.contextmanager + def pooled(): + nonlocal current_size + instance = None + + # If we still have free slots, then we have room to create new + # instances. + if current_size < size: + with lock: + # We need to check again if we have slots available, since + # the situation might be different after acquiring the lock + if current_size < size: + current_size += 1 + instance = builder() + + # Watchout: current_size can be equal to size if the previous part of + # the function has been executed, that's why we need to check if the + # instance is None. + if instance is None and current_size == size: + instance = local_pool.get(timeout=timeout) + + yield instance + + local_pool.put(instance) + + return pooled + + def serialize(data): """Serialize a dict into a JSON formatted string. diff --git a/bigchaindb/web/server.py b/bigchaindb/web/server.py index b20ee049..34839bab 100644 --- a/bigchaindb/web/server.py +++ b/bigchaindb/web/server.py @@ -9,6 +9,7 @@ import multiprocessing from flask import Flask import gunicorn.app.base +from bigchaindb import util from bigchaindb import Bigchain from bigchaindb.web import views from bigchaindb.monitor import Monitor @@ -46,7 +47,7 @@ class StandaloneApplication(gunicorn.app.base.BaseApplication): return self.application -def create_app(debug=False): +def create_app(settings): """Return an instance of the Flask application. Args: @@ -55,9 +56,12 @@ def create_app(debug=False): """ app = Flask(__name__) - app.debug = debug - app.config['bigchain'] = Bigchain() + + app.debug = settings.get('debug', False) + + app.config['bigchain_pool'] = util.pool(Bigchain, size=settings.get('threads', 4)) app.config['monitor'] = Monitor() + app.register_blueprint(views.basic_views, url_prefix='/api/v1') return app @@ -81,8 +85,8 @@ def create_server(settings): if not settings.get('threads'): settings['threads'] = (multiprocessing.cpu_count() * 2) + 1 - debug = settings.pop('debug', False) - app = create_app(debug) + app = create_app(settings) + settings.pop('debug', False) standalone = StandaloneApplication(app, settings) return standalone diff --git a/bigchaindb/web/views.py b/bigchaindb/web/views.py index 4a3b3691..f7a14078 100644 --- a/bigchaindb/web/views.py +++ b/bigchaindb/web/views.py @@ -22,12 +22,12 @@ def record(state): """This function checks if the blueprint can be initialized with the provided state.""" - bigchain = state.app.config.get('bigchain') + bigchain_pool = state.app.config.get('bigchain_pool') monitor = state.app.config.get('monitor') - if bigchain is None: - raise ValueError('This blueprint expects you to provide ' - 'database access through `bigchain`.') + if bigchain_pool is None: + raise Exception('This blueprint expects you to provide ' + 'a pool of Bigchain instances called `bigchain_pool`') if monitor is None: raise ValueError('This blueprint expects you to provide ' @@ -46,9 +46,11 @@ def get_transaction(tx_id): A JSON string containing the data about the transaction. """ - bigchain = current_app.config['bigchain'] + pool = current_app.config['bigchain_pool'] + + with pool() as bigchain: + tx = bigchain.get_transaction(tx_id) - tx = bigchain.get_transaction(tx_id) return flask.jsonify(**tx) @@ -59,7 +61,7 @@ def create_transaction(): Return: A JSON string containing the data about the transaction. """ - bigchain = current_app.config['bigchain'] + pool = current_app.config['bigchain_pool'] monitor = current_app.config['monitor'] val = {} @@ -68,17 +70,16 @@ def create_transaction(): # set to `application/json` tx = request.get_json(force=True) - if tx['transaction']['operation'] == 'CREATE': - tx = util.transform_create(tx) - tx = bigchain.consensus.sign_transaction( - tx, private_key=bigchain.me_private) + with pool() as bigchain: + if tx['transaction']['operation'] == 'CREATE': + tx = util.transform_create(tx) + tx = bigchain.consensus.sign_transaction(tx, private_key=bigchain.me_private) - if not bigchain.consensus.verify_signature(tx): - val['error'] = 'Invalid transaction signature' + if not bigchain.consensus.verify_signature(tx): + val['error'] = 'Invalid transaction signature' - with monitor.timer('write_transaction', - rate=bigchaindb.config['statsd']['rate']): - val = bigchain.write_transaction(tx) + with monitor.timer('write_transaction', rate=bigchaindb.config['statsd']['rate']): + val = bigchain.write_transaction(tx) return flask.jsonify(**tx) diff --git a/tests/test_util.py b/tests/test_util.py index f4708b59..22cc0c22 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,7 +1,33 @@ -from bigchaindb import util +import pytest +import queue + + +@pytest.fixture +def mock_queue(monkeypatch): + + class MockQueue: + items = [] + + def get(self, timeout=None): + try: + return self.items.pop() + except IndexError: + if timeout: + raise queue.Empty() + raise + + def put(self, item): + self.items.append(item) + + mockqueue = MockQueue() + + monkeypatch.setattr('queue.Queue', lambda: mockqueue) + return mockqueue def test_transform_create(b, user_private_key, user_public_key): + from bigchaindb import util + tx = util.create_tx(user_public_key, user_public_key, None, 'CREATE') tx = util.transform_create(tx) tx = util.sign_tx(tx, b.me_private) @@ -10,3 +36,88 @@ def test_transform_create(b, user_private_key, user_public_key): assert tx['transaction']['new_owner'] == user_public_key assert util.verify_signature(tx) + +def test_empty_pool_is_populated_with_instances(mock_queue): + from bigchaindb import util + + pool = util.pool(lambda: 'hello', 4) + + assert len(mock_queue.items) == 0 + + with pool() as instance: + assert instance == 'hello' + assert len(mock_queue.items) == 1 + + with pool() as instance: + assert instance == 'hello' + assert len(mock_queue.items) == 2 + + with pool() as instance: + assert instance == 'hello' + assert len(mock_queue.items) == 3 + + with pool() as instance: + assert instance == 'hello' + assert len(mock_queue.items) == 4 + + with pool() as instance: + assert instance == 'hello' + assert len(mock_queue.items) == 4 + + +def test_pool_blocks_if_no_instances_available(mock_queue): + from bigchaindb import util + + pool = util.pool(lambda: 'hello', 4) + + assert len(mock_queue.items) == 0 + + # We need to manually trigger the `__enter__` method so the context + # manager will "hang" and not return the resource to the pool + assert pool().__enter__() == 'hello' + assert len(mock_queue.items) == 0 + + assert pool().__enter__() == 'hello' + assert len(mock_queue.items) == 0 + + assert pool().__enter__() == 'hello' + assert len(mock_queue.items) == 0 + + # We need to keep a reference of the last context manager so we can + # manually release the resource + last = pool() + assert last.__enter__() == 'hello' + assert len(mock_queue.items) == 0 + + # This would block using `queue.Queue` but since we mocked it it will + # just raise a IndexError because it's trying to pop from an empty list. + with pytest.raises(IndexError): + assert pool().__enter__() == 'hello' + assert len(mock_queue.items) == 0 + + # Release the last resource + last.__exit__(None, None, None) + assert len(mock_queue.items) == 1 + + assert pool().__enter__() == 'hello' + assert len(mock_queue.items) == 0 + + +def test_pool_raises_empty_exception_when_timeout(mock_queue): + from bigchaindb import util + + pool = util.pool(lambda: 'hello', 1, timeout=1) + + assert len(mock_queue.items) == 0 + + with pool() as instance: + assert instance == 'hello' + assert len(mock_queue.items) == 1 + + # take the only resource available + assert pool().__enter__() == 'hello' + + with pytest.raises(queue.Empty): + with pool() as instance: + assert instance == 'hello' + diff --git a/tests/web/conftest.py b/tests/web/conftest.py index 099f2fd3..bf1dd697 100644 --- a/tests/web/conftest.py +++ b/tests/web/conftest.py @@ -25,7 +25,7 @@ def app(request, node_config): restore_config(request, node_config) from bigchaindb.web import server - app = server.create_app(debug=True) + app = server.create_app({'debug': True}) return app