Mirror of https://github.com/bigchaindb/bigchaindb.git (synced 2024-10-13 13:34:05 +00:00)
Merge branch 'master' into docs/1170/move-http-api-docs-to-top-level-section
This commit is contained in:
commit d7ccc7c00f
@@ -59,6 +59,10 @@ config = {
         'workers': None,  # if None, the value will be cpu_count * 2 + 1
         'threads': None,  # if None, the value will be cpu_count * 2 + 1
     },
+    'wsserver': {
+        'host': os.environ.get('BIGCHAINDB_WSSERVER_HOST') or 'localhost',
+        'port': int(os.environ.get('BIGCHAINDB_WSSERVER_PORT', 9985)),
+    },
     'database': _database_map[
         os.environ.get('BIGCHAINDB_DATABASE_BACKEND', 'rethinkdb')
     ],
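As a quick illustration, here is how the new `wsserver` settings resolve when no environment variables are set; a minimal sketch (not BigchainDB code) using only the defaults visible in the hunk above:

```python
# Minimal sketch: resolving the wsserver defaults from the hunk above.
import os

host = os.environ.get('BIGCHAINDB_WSSERVER_HOST') or 'localhost'
port = int(os.environ.get('BIGCHAINDB_WSSERVER_PORT', 9985))
print('WebSocket server address: {}:{}'.format(host, port))  # localhost:9985
```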
@@ -96,6 +96,10 @@ def run_configure(args, skip_if_exists=False):
         val = conf['server'][key]
         conf['server'][key] = input_on_stderr('API Server {}? (default `{}`): '.format(key, val), val)
 
+    for key in ('host', 'port'):
+        val = conf['wsserver'][key]
+        conf['wsserver'][key] = input_on_stderr('WebSocket Server {}? (default `{}`): '.format(key, val), val)
+
     for key in database_keys:
         val = conf['database'][key]
         conf['database'][key] = input_on_stderr('Database {}? (default `{}`): '.format(key, val), val)
33 bigchaindb/events.py Normal file
@@ -0,0 +1,33 @@
+from enum import Enum
+from multiprocessing import Queue
+
+
+class EventTypes(Enum):
+    BLOCK_VALID = 1
+    BLOCK_INVALID = 2
+
+
+class Event:
+
+    def __init__(self, event_type, event_data):
+        self.type = event_type
+        self.data = event_data
+
+
+class EventHandler:
+
+    def __init__(self, events_queue):
+        self.events_queue = events_queue
+
+    def put_event(self, event, timeout=None):
+        # TODO: handle timeouts
+        self.events_queue.put(event, timeout=timeout)
+
+    def get_event(self, timeout=None):
+        # TODO: handle timeouts
+        return self.events_queue.get(timeout=timeout)
+
+
+def setup_events_queue():
+    # TODO: set bounds to the queue
+    return Queue()
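A minimal usage sketch of the new module; it mirrors tests/test_events.py further down in this diff:

```python
# Assumes only the bigchaindb.events API introduced above.
from bigchaindb.events import Event, EventHandler, EventTypes, setup_events_queue

events_queue = setup_events_queue()        # a plain multiprocessing.Queue
handler = EventHandler(events_queue)

handler.put_event(Event(EventTypes.BLOCK_VALID, {'msg': 'some data'}))
event = handler.get_event()                # blocks until an event is available
assert event.type is EventTypes.BLOCK_VALID
```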
@@ -13,6 +13,7 @@ from bigchaindb import backend
 from bigchaindb.backend.changefeed import ChangeFeed
 from bigchaindb.models import Block
 from bigchaindb import Bigchain
+from bigchaindb.events import EventHandler, Event, EventTypes
 
 
 logger = logging.getLogger(__name__)
@@ -22,8 +23,11 @@ logger_results = logging.getLogger('pipeline.election.results')
 class Election:
     """Election class."""
 
-    def __init__(self):
+    def __init__(self, events_queue=None):
         self.bigchain = Bigchain()
+        self.event_handler = None
+        if events_queue:
+            self.event_handler = EventHandler(events_queue)
 
     def check_for_quorum(self, next_vote):
         """
@@ -42,6 +46,7 @@ class Election:
         next_block = self.bigchain.get_block(block_id)
 
         result = self.bigchain.block_election(next_block)
+        self.handle_block_events(result, block_id)
         if result['status'] == self.bigchain.BLOCK_INVALID:
             return Block.from_dict(next_block)
 
@@ -67,9 +72,21 @@ class Election:
             self.bigchain.write_transaction(tx)
         return invalid_block
 
+    def handle_block_events(self, result, block_id):
+        if self.event_handler:
+            if result['status'] == self.bigchain.BLOCK_UNDECIDED:
+                return
+            elif result['status'] == self.bigchain.BLOCK_INVALID:
+                event_type = EventTypes.BLOCK_INVALID
+            elif result['status'] == self.bigchain.BLOCK_VALID:
+                event_type = EventTypes.BLOCK_VALID
+
+            event = Event(event_type, self.bigchain.get_block(block_id))
+            self.event_handler.put_event(event)
+
 
-def create_pipeline():
-    election = Election()
+def create_pipeline(events_queue=None):
+    election = Election(events_queue=events_queue)
 
     election_pipeline = Pipeline([
         Node(election.check_for_quorum),
@@ -84,8 +101,8 @@ def get_changefeed():
     return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT)
 
 
-def start():
-    pipeline = create_pipeline()
+def start(events_queue=None):
+    pipeline = create_pipeline(events_queue=events_queue)
     pipeline.setup(indata=get_changefeed())
     pipeline.start()
     return pipeline
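Taken together, a hedged sketch of the new election wiring (names from this diff; the consumer side is hypothetical):

```python
from bigchaindb.events import setup_events_queue
from bigchaindb.pipelines import election

events_queue = setup_events_queue()
pipeline = election.start(events_queue=events_queue)

# In some other process: block until the election pipeline emits an event.
event = events_queue.get()  # an Event with type BLOCK_VALID or BLOCK_INVALID
```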
@@ -3,7 +3,8 @@ import multiprocessing as mp
 
 import bigchaindb
 from bigchaindb.pipelines import vote, block, election, stale
-from bigchaindb.web import server
+from bigchaindb.events import setup_events_queue
+from bigchaindb.web import server, websocket_server
 
 
 logger = logging.getLogger(__name__)
@@ -25,6 +26,13 @@ BANNER = """
 def start():
     logger.info('Initializing BigchainDB...')
 
+    # Create the events queue
+    # The events queue needs to be initialized once and shared between
+    # processes. This seems the best way to do it.
+    # At this point only the election process and the event consumer require
+    # this queue.
+    events_queue = setup_events_queue()
+
     # start the processes
     logger.info('Starting block')
     block.start()
@@ -36,12 +44,18 @@ def start():
     stale.start()
 
     logger.info('Starting election')
-    election.start()
+    election.start(events_queue=events_queue)
 
     # start the web api
     app_server = server.create_server(bigchaindb.config['server'])
     p_webapi = mp.Process(name='webapi', target=app_server.run)
     p_webapi.start()
 
+    logger.info('Starting WebSocket server')
+    p_websocket_server = mp.Process(name='ws',
+                                    target=websocket_server.start,
+                                    args=(events_queue,))
+    p_websocket_server.start()
+
     # start message
     logger.info(BANNER.format(bigchaindb.config['server']['bind']))
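The comment above is the crux: a `multiprocessing.Queue` is shared only if it is created in the parent process and handed to each child at fork time. A minimal sketch (not BigchainDB code):

```python
from multiprocessing import Process, Queue

def producer(q):
    q.put('block-valid')   # e.g. the election process emitting an event

def consumer(q):
    print(q.get())         # e.g. the WebSocket server consuming it

if __name__ == '__main__':
    q = Queue()            # created once, in the parent
    Process(target=producer, args=(q,)).start()
    Process(target=consumer, args=(q,)).start()
```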
@@ -5,6 +5,9 @@ import logging
 
 from flask import jsonify, request
 
+from bigchaindb import config
+
+
 logger = logging.getLogger(__name__)
 
 
@@ -21,3 +24,8 @@ def make_error(status_code, message=None):
 def base_url():
     return '%s://%s/' % (request.environ['wsgi.url_scheme'],
                          request.environ['HTTP_HOST'])
+
+
+def base_ws_uri():
+    """Base websocket URI."""
+    return 'ws://{host}:{port}'.format(**config['wsserver'])
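A quick illustration of the `format(**mapping)` idiom in `base_ws_uri()`, assuming the default `wsserver` settings from this commit:

```python
wsserver = {'host': 'localhost', 'port': 9985}
print('ws://{host}:{port}'.format(**wsserver))  # -> ws://localhost:9985
```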
@@ -4,8 +4,9 @@ import flask
 from flask_restful import Resource
 
 import bigchaindb
-from bigchaindb.web.views.base import base_url
+from bigchaindb.web.views.base import base_url, base_ws_uri
 from bigchaindb import version
+from bigchaindb.web.websocket_server import EVENTS_ENDPOINT
 
 
 class RootIndex(Resource):
@@ -30,16 +31,18 @@ class RootIndex(Resource):
 class ApiV1Index(Resource):
     def get(self):
         api_root = base_url() + 'api/v1/'
+        websocket_root = base_ws_uri() + EVENTS_ENDPOINT
         docs_url = [
             'https://docs.bigchaindb.com/projects/server/en/v',
             version.__version__,
             '/http-client-server-api.html',
         ]
-        return {
+        return flask.jsonify({
            '_links': {
                'docs': ''.join(docs_url),
                'self': api_root,
                'statuses': api_root + 'statuses/',
                'transactions': api_root + 'transactions/',
+               'streams_v1': websocket_root,
            },
-        }
+        })
174 bigchaindb/web/websocket_server.py Normal file
@@ -0,0 +1,174 @@
+"""WebSocket server for the BigchainDB Event Stream API."""
+
+# NOTE
+#
+# This module contains some functions and utilities that might belong to other
+# modules. For now, I prefer to keep everything in this module. Why? Because
+# those functions are needed only here.
+#
+# When we extend this part of the project and find that we need those
+# functionalities elsewhere, we can start creating new modules and organizing
+# things in a better way.
+
+
+import json
+import asyncio
+import logging
+import threading
+from uuid import uuid4
+
+import aiohttp
+from aiohttp import web
+
+from bigchaindb import config
+from bigchaindb.events import EventTypes
+
+
+logger = logging.getLogger(__name__)
+POISON_PILL = 'POISON_PILL'
+EVENTS_ENDPOINT = '/api/v1/streams/valid_tx'
+
+
+def _put_into_capped_queue(queue, value):
+    """Put a new item in a capped queue.
+
+    If the queue has reached its limit, remove the oldest item
+    before putting the new one. Note that the oldest item
+    is lost (that's the purpose of a capped queue).
+
+    Args:
+        queue: a queue
+        value: the value to put
+    """
+    while True:
+        try:
+            queue.put_nowait(value)
+        except asyncio.QueueFull:
+            queue.get_nowait()
+        else:
+            return
+
+
+def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
+    """Bridge between a synchronous multiprocessing queue
+    and an asynchronous asyncio queue.
+
+    Args:
+        in_queue (multiprocessing.Queue): input queue
+        out_queue (asyncio.Queue): output queue
+    """
+
+    while True:
+        value = in_queue.get()
+        loop.call_soon_threadsafe(_put_into_capped_queue, out_queue, value)
+
+
+class Dispatcher:
+    """Dispatch events to websockets.
+
+    This class implements a simple publish/subscribe pattern.
+    """
+
+    def __init__(self, event_source):
+        """Create a new instance.
+
+        Args:
+            event_source: a source of events. Elements in the queue
+                should be strings.
+        """
+
+        self.event_source = event_source
+        self.subscribers = {}
+
+    def subscribe(self, uuid, websocket):
+        """Add a websocket to the list of subscribers.
+
+        Args:
+            uuid (str): a unique identifier for the websocket.
+            websocket: the websocket to publish information.
+        """
+
+        self.subscribers[uuid] = websocket
+
+    @asyncio.coroutine
+    def publish(self):
+        """Publish new events to the subscribers."""
+
+        while True:
+            event = yield from self.event_source.get()
+            str_buffer = []
+
+            if event == POISON_PILL:
+                return
+
+            if isinstance(event, str):
+                str_buffer.append(event)
+
+            elif event.type == EventTypes.BLOCK_VALID:
+                block = event.data
+
+                for tx in block['block']['transactions']:
+                    asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id']
+                    data = {'block_id': block['id'],
+                            'asset_id': asset_id,
+                            'tx_id': tx['id']}
+                    str_buffer.append(json.dumps(data))
+
+            for _, websocket in self.subscribers.items():
+                for str_item in str_buffer:
+                    websocket.send_str(str_item)
+
+
+@asyncio.coroutine
+def websocket_handler(request):
+    """Handle a new socket connection."""
+
+    logger.debug('New websocket connection.')
+    websocket = web.WebSocketResponse()
+    yield from websocket.prepare(request)
+    uuid = uuid4()
+    request.app['dispatcher'].subscribe(uuid, websocket)
+
+    while True:
+        # Consume input buffer
+        msg = yield from websocket.receive()
+        if msg.type == aiohttp.WSMsgType.ERROR:
+            logger.debug('Websocket exception: %s', websocket.exception())
+            return
+
+
+def init_app(event_source, *, loop=None):
+    """Init the application server.
+
+    Return:
+        An aiohttp application.
+    """
+
+    dispatcher = Dispatcher(event_source)
+
+    # Schedule the dispatcher
+    loop.create_task(dispatcher.publish())
+
+    app = web.Application(loop=loop)
+    app['dispatcher'] = dispatcher
+    app.router.add_get(EVENTS_ENDPOINT, websocket_handler)
+    return app
+
+
+def start(sync_event_source, loop=None):
+    """Create and start the WebSocket server."""
+
+    if not loop:
+        loop = asyncio.get_event_loop()
+
+    event_source = asyncio.Queue(maxsize=1024, loop=loop)
+
+    bridge = threading.Thread(target=_multiprocessing_to_asyncio,
+                              args=(sync_event_source, event_source, loop),
+                              daemon=True)
+    bridge.start()
+
+    app = init_app(event_source, loop=loop)
+    aiohttp.web.run_app(app,
+                        host=config['wsserver']['host'],
+                        port=config['wsserver']['port'])
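A hedged sketch of running this server standalone (names from this diff; `start()` blocks while serving):

```python
from multiprocessing import Queue

from bigchaindb.web import websocket_server

events_queue = Queue()
# Another process would put Event(EventTypes.BLOCK_VALID, block_dict) objects
# onto events_queue; start() bridges them into asyncio and serves them at
# ws://<wsserver.host>:<wsserver.port>/api/v1/streams/valid_tx
websocket_server.start(events_queue)
```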
@@ -25,9 +25,16 @@ The (single) output of a threshold condition can be used as one of the inputs of
 When one creates a condition, one can calculate its fulfillment length (e.g.
 96). The more complex the condition, the larger its fulfillment length will be.
 A BigchainDB federation can put an upper limit on the complexity of the
-conditions, either directly by setting an allowed maximum fulfillment length,
-or indirectly by setting a maximum allowed transaction size which would limit
+conditions, either directly by setting a maximum allowed fulfillment length,
+or
+`indirectly <https://github.com/bigchaindb/bigchaindb/issues/356#issuecomment-288085251>`_
+by :ref:`setting a maximum allowed transaction size <Enforcing a Max Transaction Size>`
+which would limit
 the overall complexity across all inputs and outputs of a transaction.
+Note: At the time of writing, there was no configuration setting
+to set a maximum allowed fulfillment length,
+so the only real option was to
+:ref:`set a maximum allowed transaction size <Enforcing a Max Transaction Size>`.
 
 If someone tries to make a condition where the output of a threshold condition feeds into the input of another “earlier” threshold condition (i.e. in a closed logical circuit), then their computer will take forever to calculate the (infinite) “condition URI”, at least in theory. In practice, their computer will run out of memory or their client software will timeout after a while.
 
@@ -14,7 +14,6 @@ community projects listed below.
 .. toctree::
    :maxdepth: 1
 
-   websocket-event-stream-api
    The Python Driver <https://docs.bigchaindb.com/projects/py-driver/en/latest/index.html>
    Transaction CLI <https://docs.bigchaindb.com/projects/cli/en/latest/>
 
@@ -12,6 +12,7 @@ BigchainDB Server Documentation
    dev-and-test/index
    server-reference/index
    http-client-server-api
+   websocket-event-stream-api
    drivers-clients/index
    clusters-feds/index
    data-models/index
@@ -8,3 +8,5 @@ Production Nodes
    node-components
    node-requirements
    setup-run-node
+   reverse-proxy-notes
+
72 docs/server/source/production-nodes/reverse-proxy-notes.md Normal file
@@ -0,0 +1,72 @@
+# Using a Reverse Proxy
+
+You may want to:
+
+* rate limit inbound HTTP requests,
+* authenticate/authorize inbound HTTP requests,
+* block requests with an HTTP request body that's too large, or
+* enable HTTPS (TLS) between your users and your node.
+
+While we could have built all that into BigchainDB Server,
+we didn't, because you can do all that (and more)
+using a reverse proxy such as NGINX or HAProxy.
+(You would put it in front of your BigchainDB Server,
+so that all inbound HTTP requests would arrive
+at the reverse proxy before *maybe* being proxied
+onwards to your BigchainDB Server.)
+For detailed instructions, see the documentation
+for your reverse proxy.
+
+Below, we note how a reverse proxy can be used
+to do some BigchainDB-specific things.
+
+You may also be interested in
+[our NGINX configuration file template](https://github.com/bigchaindb/nginx_3scale/blob/master/nginx.conf.template)
+(open source, on GitHub).
+
+
+## Enforcing a Max Transaction Size
+
+The BigchainDB HTTP API has several endpoints,
+but only one of them, the `POST /transactions` endpoint,
+expects a non-empty HTTP request body:
+the transaction (JSON) being submitted by the user.
+
+If you want to enforce a maximum-allowed transaction size
+(discarding any that are larger),
+then you can do so by configuring a maximum request body size
+in your reverse proxy.
+For example, NGINX has the `client_max_body_size`
+configuration setting. You could set it to 15 kB
+with the following line in your NGINX config file:
+
+```text
+client_max_body_size 15k;
+```
+
+For more information, see
+[the NGINX docs about client_max_body_size](https://nginx.org/en/docs/http/ngx_http_core_module.html#client_max_body_size).
+
+Note: By enforcing a maximum transaction size, you
+[indirectly enforce a maximum crypto-conditions complexity](https://github.com/bigchaindb/bigchaindb/issues/356#issuecomment-288085251).
+
+
+**Aside: Why 15 kB?**
+
+Both [RethinkDB](https://rethinkdb.com/limitations/) and
+[MongoDB have a maximum document size of 16 MB](https://docs.mongodb.com/manual/reference/limits/#limit-bson-document-size).
+In BigchainDB, the biggest documents are the blocks.
+A BigchainDB block can contain up to 1000 transactions,
+plus some other data (e.g. the timestamp).
+If we ignore the other data as negligible relative to all the transactions,
+then a block of size 16 MB
+will have an average transaction size of (16 MB)/1000 = 16 kB.
+Therefore by limiting the max transaction size to 15 kB,
+you can be fairly sure that no blocks will ever be
+bigger than 16 MB.
+
+Note: Technically, the documents that MongoDB stores aren't the JSON
+that BigchainDB users think of; they're JSON converted to BSON.
+Moreover, [one can use GridFS with MongoDB to store larger documents](https://docs.mongodb.com/manual/core/gridfs/).
+Therefore the above calculation should be seen as a rough guide,
+not the last word.
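A toy check of the arithmetic above (a rough approximation, as the text itself notes):

```python
max_block_bytes = 16 * 10**6            # treat 16 MB as a round 16,000,000 bytes
txs_per_block = 1000
print(max_block_bytes / txs_per_block)  # 16000.0, i.e. about 16 kB per transaction
```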
@@ -16,7 +16,10 @@ For convenience, here's a list of all the relevant environment variables (docume
 `BIGCHAINDB_DATABASE_PORT`<br>
 `BIGCHAINDB_DATABASE_NAME`<br>
 `BIGCHAINDB_DATABASE_REPLICASET`<br>
+`BIGCHAINDB_DATABASE_CONNECTION_TIMEOUT`<br>
+`BIGCHAINDB_DATABASE_MAX_TRIES`<br>
 `BIGCHAINDB_SERVER_BIND`<br>
+`BIGCHAINDB_SERVER_LOGLEVEL`<br>
 `BIGCHAINDB_SERVER_WORKERS`<br>
 `BIGCHAINDB_SERVER_THREADS`<br>
 `BIGCHAINDB_CONFIG_PATH`<br>
@@ -84,9 +87,18 @@ Note how the keys in the list are separated by colons.
 ```
 
 
-## database.backend, database.host, database.port, database.name & database.replicaset
+## database.*
 
-The database backend to use (`rethinkdb` or `mongodb`) and its hostname, port and name. If the database backend is `mongodb`, then there's a fifth setting: the name of the replica set. If the database backend is `rethinkdb`, you *can* set the name of the replica set, but it won't be used for anything.
+The settings with names of the form `database.*` are for the database backend
+(currently either RethinkDB or MongoDB). They are:
+
+* `database.backend` is either `rethinkdb` or `mongodb`.
+* `database.host` is the hostname (FQDN) of the backend database.
+* `database.port` is self-explanatory.
+* `database.name` is a user-chosen name for the database inside RethinkDB or MongoDB, e.g. `bigchain`.
+* `database.replicaset` is only relevant if using MongoDB; it's the name of the MongoDB replica set, e.g. `bigchain-rs`.
+* `database.connection_timeout` is the maximum number of milliseconds that BigchainDB will wait before giving up on one attempt to connect to the database backend. Note: At the time of writing, this setting was only used by MongoDB; there was an open [issue to make RethinkDB use it as well](https://github.com/bigchaindb/bigchaindb/issues/1337).
+* `database.max_tries` is the maximum number of times that BigchainDB will try to establish a connection with the database backend. If 0, then it will try forever.
 
 **Example using environment variables**
 ```text
@@ -95,6 +107,8 @@ export BIGCHAINDB_DATABASE_HOST=localhost
 export BIGCHAINDB_DATABASE_PORT=27017
 export BIGCHAINDB_DATABASE_NAME=bigchain
 export BIGCHAINDB_DATABASE_REPLICASET=bigchain-rs
+export BIGCHAINDB_DATABASE_CONNECTION_TIMEOUT=5000
+export BIGCHAINDB_DATABASE_MAX_TRIES=3
 ```
 
 **Default values**
@@ -104,8 +118,10 @@ If (no environment variables were set and there's no local config file), or you
 "database": {
     "backend": "rethinkdb",
     "host": "localhost",
-    "port": 28015,
     "name": "bigchain",
+    "port": 28015,
+    "connection_timeout": 5000,
+    "max_tries": 3
 }
 ```
 
@@ -114,24 +130,31 @@ If you used `bigchaindb -y configure mongodb` to create a default local config f
 "database": {
     "backend": "mongodb",
     "host": "localhost",
-    "name": "bigchain",
     "port": 27017,
-    "replicaset": "bigchain-rs"
+    "name": "bigchain",
+    "replicaset": "bigchain-rs",
+    "connection_timeout": 5000,
+    "max_tries": 3
 }
 ```
 
 
-## server.bind, server.workers & server.threads
+## server.bind, server.loglevel, server.workers & server.threads
 
 These settings are for the [Gunicorn HTTP server](http://gunicorn.org/), which is used to serve the [HTTP client-server API](../http-client-server-api.html).
 
 `server.bind` is where to bind the Gunicorn HTTP server socket. It's a string. It can be any valid value for [Gunicorn's bind setting](http://docs.gunicorn.org/en/stable/settings.html#bind). If you want to allow IPv4 connections from anyone, on port 9984, use '0.0.0.0:9984'. In a production setting, we recommend you use Gunicorn behind a reverse proxy server. If Gunicorn and the reverse proxy are running on the same machine, then use 'localhost:PORT' where PORT is _not_ 9984 (because the reverse proxy needs to listen on port 9984). Maybe use PORT=9983 in that case because we know 9983 isn't used. If Gunicorn and the reverse proxy are running on different machines, then use 'A.B.C.D:9984' where A.B.C.D is the IP address of the reverse proxy. There's [more information about deploying behind a reverse proxy in the Gunicorn documentation](http://docs.gunicorn.org/en/stable/deploy.html). (They call it a proxy.)
 
+`server.loglevel` sets the log level of Gunicorn's Error log outputs. See
+[Gunicorn's documentation](http://docs.gunicorn.org/en/latest/settings.html#loglevel)
+for more information.
+
 `server.workers` is [the number of worker processes](http://docs.gunicorn.org/en/stable/settings.html#workers) for handling requests. If `None` (the default), the value will be (cpu_count * 2 + 1). `server.threads` is [the number of threads-per-worker](http://docs.gunicorn.org/en/stable/settings.html#threads) for handling requests. If `None` (the default), the value will be (cpu_count * 2 + 1). The HTTP server will be able to handle `server.workers` * `server.threads` requests simultaneously.
 
 **Example using environment variables**
 ```text
 export BIGCHAINDB_SERVER_BIND=0.0.0.0:9984
+export BIGCHAINDB_SERVER_LOGLEVEL=debug
 export BIGCHAINDB_SERVER_WORKERS=5
 export BIGCHAINDB_SERVER_THREADS=5
 ```
@@ -140,6 +163,7 @@ export BIGCHAINDB_SERVER_THREADS=5
 ```js
 "server": {
     "bind": "0.0.0.0:9984",
+    "loglevel": "debug",
    "workers": 5,
    "threads": 5
 }
@@ -149,6 +173,7 @@ export BIGCHAINDB_SERVER_THREADS=5
 ```js
 "server": {
     "bind": "localhost:9984",
+    "loglevel": "info",
    "workers": null,
    "threads": null
 }
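To make the `server.workers` * `server.threads` capacity concrete, a toy calculation for a hypothetical 4-CPU machine (not from the docs):

```python
cpu_count = 4
workers = threads = cpu_count * 2 + 1  # the default when both are None: 9 each
print(workers * threads)               # 81 requests handled simultaneously
```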
@@ -2,7 +2,9 @@ The WebSocket Event Stream API
 ==============================
 
 .. important::
-    This is currently scheduled to be implemented in BigchainDB Server 0.10.
+    The WebSocket Event Stream runs on a different port than the Web API. The
+    default port for the Web API is `9984`, while the one for the Event Stream
+    is `9985`.
 
 BigchainDB provides real-time event streams over the WebSocket protocol with
 the Event Stream API.
@@ -28,7 +30,7 @@ response contains a ``streams_<version>`` property in ``_links``::
 
     {
         "_links": {
-            "streams_v1": "ws://example.com:9984/api/v1/streams/"
+            "streams_v1": "ws://example.com:9985/api/v1/streams/"
         }
     }
 
@@ -80,9 +82,9 @@ the transaction's ID, associated asset ID, and containing block's ID.
 Example message::
 
     {
-        "txid": "<sha3-256 hash>",
-        "assetid": "<sha3-256 hash>",
-        "blockid": "<sha3-256 hash>"
+        "tx_id": "<sha3-256 hash>",
+        "asset_id": "<sha3-256 hash>",
+        "block_id": "<sha3-256 hash>"
     }
 
 
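A minimal client sketch for this stream (not part of this commit; it assumes the default host/port and the aiohttp dependency added in setup.py below):

```python
import asyncio
import aiohttp

@asyncio.coroutine
def listen():
    session = aiohttp.ClientSession()
    ws = yield from session.ws_connect('ws://localhost:9985/api/v1/streams/valid_tx')
    while True:
        msg = yield from ws.receive()
        print(msg.data)  # JSON with "tx_id", "asset_id", "block_id"

asyncio.get_event_loop().run_until_complete(listen())
```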
2 setup.py
@@ -54,6 +54,7 @@ tests_require = [
     'pytest-mock',
     'pytest-xdist',
     'pytest-flask',
+    'pytest-aiohttp',
     'tox',
 ] + docs_require
 
@@ -76,6 +77,7 @@ install_requires = [
     'multipipes~=0.1.0',
     'jsonschema~=2.5.1',
     'pyyaml~=3.12',
+    'aiohttp~=2.0',
 ]
 
 setup(
|
||||
"This node's public key wasn't set anywhere so it can't be exported"
|
||||
|
||||
|
||||
def test_bigchain_run_init_when_db_exists(mock_db_init_with_existing_db):
|
||||
def test_bigchain_run_init_when_db_exists(mocker, capsys):
|
||||
from bigchaindb.commands.bigchaindb import run_init
|
||||
from bigchaindb.common.exceptions import DatabaseAlreadyExists
|
||||
init_db_mock = mocker.patch(
|
||||
'bigchaindb.commands.bigchaindb.schema.init_database',
|
||||
autospec=True,
|
||||
spec_set=True,
|
||||
)
|
||||
init_db_mock.side_effect = DatabaseAlreadyExists
|
||||
args = Namespace(config=None)
|
||||
run_init(args)
|
||||
output_message = capsys.readouterr()[1]
|
||||
print(output_message)
|
||||
assert output_message == (
|
||||
'The database already exists.\n'
|
||||
'If you wish to re-initialize it, first drop it.\n'
|
||||
)
|
||||
|
||||
|
||||
def test__run_init(mocker):
|
||||
from bigchaindb.commands.bigchaindb import _run_init
|
||||
bigchain_mock = mocker.patch(
|
||||
'bigchaindb.commands.bigchaindb.bigchaindb.Bigchain')
|
||||
init_db_mock = mocker.patch(
|
||||
'bigchaindb.commands.bigchaindb.schema.init_database',
|
||||
autospec=True,
|
||||
spec_set=True,
|
||||
)
|
||||
_run_init()
|
||||
bigchain_mock.assert_called_once_with()
|
||||
init_db_mock.assert_called_once_with(
|
||||
connection=bigchain_mock.return_value.connection)
|
||||
bigchain_mock.return_value.create_genesis_block.assert_called_once_with()
|
||||
|
||||
|
||||
@patch('bigchaindb.backend.schema.drop_database')
|
||||
|
@@ -199,3 +199,27 @@ def test_full_pipeline(b, user_pk):
     tx_from_block = set([tx.id for tx in invalid_block.transactions])
     tx_from_backlog = set([tx['id'] for tx in list(query.get_stale_transactions(b.connection, 0))])
     assert tx_from_block == tx_from_backlog
+
+
+def test_handle_block_events():
+    from bigchaindb.events import setup_events_queue, EventTypes
+
+    events_queue = setup_events_queue()
+    e = election.Election(events_queue=events_queue)
+    block_id = 'a' * 64
+
+    assert events_queue.qsize() == 0
+
+    # no event should be emitted in case a block is undecided
+    e.handle_block_events({'status': Bigchain.BLOCK_UNDECIDED}, block_id)
+    assert events_queue.qsize() == 0
+
+    # put an invalid block event in the queue
+    e.handle_block_events({'status': Bigchain.BLOCK_INVALID}, block_id)
+    event = e.event_handler.get_event()
+    assert event.type == EventTypes.BLOCK_INVALID
+
+    # put a valid block event in the queue
+    e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id)
+    event = e.event_handler.get_event()
+    assert event.type == EventTypes.BLOCK_VALID
@@ -144,6 +144,8 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request):
     DATABASE_PORT = 4242
     DATABASE_BACKEND = request.config.getoption('--database-backend')
     SERVER_BIND = '1.2.3.4:56'
+    WSSERVER_HOST = '1.2.3.4'
+    WSSERVER_PORT = 57
     KEYRING = 'pubkey_0:pubkey_1:pubkey_2'
 
     file_config = {
@@ -157,6 +159,8 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request):
         'BIGCHAINDB_DATABASE_PORT': str(DATABASE_PORT),
         'BIGCHAINDB_DATABASE_BACKEND': DATABASE_BACKEND,
         'BIGCHAINDB_SERVER_BIND': SERVER_BIND,
+        'BIGCHAINDB_WSSERVER_HOST': WSSERVER_HOST,
+        'BIGCHAINDB_WSSERVER_PORT': WSSERVER_PORT,
         'BIGCHAINDB_KEYRING': KEYRING})
 
     import bigchaindb
@@ -198,6 +202,10 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request):
         'workers': None,
         'threads': None,
     },
+    'wsserver': {
+        'host': WSSERVER_HOST,
+        'port': WSSERVER_PORT,
+    },
     'database': database,
     'keypair': {
         'public': None,
tests/test_events.py
Normal file
21
tests/test_events.py
Normal file
@ -0,0 +1,21 @@
|
||||
def tests_event_handler():
|
||||
from bigchaindb.events import (EventTypes, Event, EventHandler,
|
||||
setup_events_queue)
|
||||
|
||||
# create and event
|
||||
event_data = {'msg': 'some data'}
|
||||
event = Event(EventTypes.BLOCK_VALID, event_data)
|
||||
# create the events queue
|
||||
events_queue = setup_events_queue()
|
||||
|
||||
# create event handler
|
||||
event_handler = EventHandler(events_queue)
|
||||
|
||||
# push and event to the queue
|
||||
event_handler.put_event(event)
|
||||
|
||||
# get the event from the queue
|
||||
event_from_queue = event_handler.get_event()
|
||||
|
||||
assert event_from_queue.type == event.type
|
||||
assert event_from_queue.data == event.data
|
@@ -9,14 +9,16 @@ from bigchaindb.pipelines import vote, block, election, stale
 @patch.object(block, 'start')
 @patch.object(vote, 'start')
 @patch.object(Process, 'start')
-def test_processes_start(mock_vote, mock_block, mock_election, mock_stale,
-                         mock_process):
+@patch('bigchaindb.events.setup_events_queue', spec_set=True, autospec=True)
+def test_processes_start(mock_setup_events_queue, mock_process, mock_vote,
+                         mock_block, mock_election, mock_stale):
     from bigchaindb import processes
 
     processes.start()
 
     mock_vote.assert_called_with()
     mock_block.assert_called_with()
-    mock_election.assert_called_with()
     mock_stale.assert_called_with()
     mock_process.assert_called_with()
+    mock_election.assert_called_once_with(
+        events_queue=mock_setup_events_queue.return_value)
@@ -31,5 +31,6 @@ def test_api_v1_endpoint(client):
         'self': 'http://localhost/api/v1/',
         'statuses': 'http://localhost/api/v1/statuses/',
         'transactions': 'http://localhost/api/v1/transactions/',
+        'streams_v1': 'ws://localhost:9985/api/v1/streams/valid_tx',
     }
 }
238 tests/web/test_websocket_server.py Normal file
@@ -0,0 +1,238 @@
+import asyncio
+import json
+import queue
+import random
+import threading
+import time
+from unittest.mock import patch
+
+import pytest
+
+
+@pytest.fixture
+def _block(b, request):
+    from bigchaindb.models import Transaction
+    total = getattr(request, 'param', 1)
+    transactions = [
+        Transaction.create(
+            [b.me],
+            [([b.me], 1)],
+            metadata={'msg': random.random()},
+        ).sign([b.me_private])
+        for _ in range(total)
+    ]
+    return b.create_block(transactions)
+
+
+class MockWebSocket:
+    def __init__(self):
+        self.received = []
+
+    def send_str(self, s):
+        self.received.append(s)
+
+
+@asyncio.coroutine
+def test_bridge_sync_async_queue(loop):
+    from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio
+
+    sync_queue = queue.Queue()
+    async_queue = asyncio.Queue(loop=loop)
+
+    bridge = threading.Thread(target=_multiprocessing_to_asyncio,
+                              args=(sync_queue, async_queue, loop),
+                              daemon=True)
+    bridge.start()
+
+    sync_queue.put('fahren')
+    sync_queue.put('auf')
+    sync_queue.put('der')
+    sync_queue.put('Autobahn')
+
+    result = yield from async_queue.get()
+    assert result == 'fahren'
+
+    result = yield from async_queue.get()
+    assert result == 'auf'
+
+    result = yield from async_queue.get()
+    assert result == 'der'
+
+    result = yield from async_queue.get()
+    assert result == 'Autobahn'
+
+    assert async_queue.qsize() == 0
+
+
+@asyncio.coroutine
+def test_put_into_capped_queue(loop):
+    from bigchaindb.web.websocket_server import _put_into_capped_queue
+    q = asyncio.Queue(maxsize=2, loop=loop)
+
+    _put_into_capped_queue(q, 'Friday')
+    assert q._queue[0] == 'Friday'
+
+    _put_into_capped_queue(q, "I'm")
+    assert q._queue[0] == 'Friday'
+    assert q._queue[1] == "I'm"
+
+    _put_into_capped_queue(q, 'in')
+    assert q._queue[0] == "I'm"
+    assert q._queue[1] == 'in'
+
+    _put_into_capped_queue(q, 'love')
+    assert q._queue[0] == 'in'
+    assert q._queue[1] == 'love'
+
+
+@asyncio.coroutine
+def test_capped_queue(loop):
+    from bigchaindb.web.websocket_server import _multiprocessing_to_asyncio
+
+    sync_queue = queue.Queue()
+    async_queue = asyncio.Queue(maxsize=2, loop=loop)
+
+    bridge = threading.Thread(target=_multiprocessing_to_asyncio,
+                              args=(sync_queue, async_queue, loop),
+                              daemon=True)
+    bridge.start()
+
+    sync_queue.put('we')
+    sync_queue.put('are')
+    sync_queue.put('the')
+    sync_queue.put('robots')
+
+    # Wait until the thread processes all the items
+    time.sleep(1)
+
+    result = yield from async_queue.get()
+    assert result == 'the'
+
+    result = yield from async_queue.get()
+    assert result == 'robots'
+
+    assert async_queue.qsize() == 0
+
+
+@patch('threading.Thread')
+@patch('aiohttp.web.run_app')
+@patch('bigchaindb.web.websocket_server.init_app')
+@patch('asyncio.get_event_loop', return_value='event-loop')
+@patch('asyncio.Queue', return_value='event-queue')
+def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock,
+                                     init_app_mock, run_app_mock,
+                                     thread_mock):
+    from bigchaindb import config
+    from bigchaindb.web.websocket_server import start, _multiprocessing_to_asyncio
+
+    start(None)
+    thread_mock.assert_called_once_with(
+        target=_multiprocessing_to_asyncio,
+        args=(None, queue_mock.return_value, get_event_loop_mock.return_value),
+        daemon=True,
+    )
+    thread_mock.return_value.start.assert_called_once_with()
+    init_app_mock.assert_called_with('event-queue', loop='event-loop')
+    run_app_mock.assert_called_once_with(
+        init_app_mock.return_value,
+        host=config['wsserver']['host'],
+        port=config['wsserver']['port'],
+    )
+
+
+@asyncio.coroutine
+def test_websocket_string_event(test_client, loop):
+    from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
+
+    event_source = asyncio.Queue(loop=loop)
+    app = init_app(event_source, loop=loop)
+    client = yield from test_client(app)
+    ws = yield from client.ws_connect(EVENTS_ENDPOINT)
+
+    yield from event_source.put('hack')
+    yield from event_source.put('the')
+    yield from event_source.put('planet!')
+
+    result = yield from ws.receive()
+    assert result.data == 'hack'
+
+    result = yield from ws.receive()
+    assert result.data == 'the'
+
+    result = yield from ws.receive()
+    assert result.data == 'planet!'
+
+    yield from event_source.put(POISON_PILL)
+
+
+@asyncio.coroutine
+@pytest.mark.parametrize('_block', (10,), indirect=('_block',), ids=('block',))
+def test_websocket_block_event(b, _block, test_client, loop):
+    from bigchaindb import events
+    from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
+
+    event_source = asyncio.Queue(loop=loop)
+    app = init_app(event_source, loop=loop)
+    client = yield from test_client(app)
+    ws = yield from client.ws_connect(EVENTS_ENDPOINT)
+    block = _block.to_dict()
+    block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
+
+    yield from event_source.put(block_event)
+
+    for tx in block['block']['transactions']:
+        result = yield from ws.receive()
+        json_result = json.loads(result.data)
+        assert json_result['tx_id'] == tx['id']
+        # Since the transactions are all CREATEs, asset id == transaction id
+        assert json_result['asset_id'] == tx['id']
+        assert json_result['block_id'] == block['id']
+
+    yield from event_source.put(POISON_PILL)
+
+
+@pytest.mark.skip('Processes are not stopping properly, and the whole test suite would hang')
+@pytest.mark.genesis
+def test_integration_from_webapi_to_websocket(monkeypatch, client, loop):
+    # XXX: I think that the `pytest-aiohttp` plugin is sprinkling too much
+    # magic in the `asyncio` module: running this test without monkey-patching
+    # `asyncio.get_event_loop` (and without the `loop` fixture) raises a:
+    # RuntimeError: There is no current event loop in thread 'MainThread'.
+    #
+    # That's pretty weird because this test doesn't use the pytest-aiohttp
+    # plugin explicitly.
+    monkeypatch.setattr('asyncio.get_event_loop', lambda: loop)
+
+    import json
+    import random
+    import aiohttp
+
+    from bigchaindb.common import crypto
+    from bigchaindb import processes
+    from bigchaindb.models import Transaction
+
+    # Start BigchainDB
+    processes.start()
+
+    loop = asyncio.get_event_loop()
+
+    import time
+    time.sleep(1)
+
+    ws_url = client.get('http://localhost:9984/api/v1/').json['_links']['streams_v1']
+
+    # Connect to the WebSocket endpoint
+    session = aiohttp.ClientSession()
+    ws = loop.run_until_complete(session.ws_connect(ws_url))
+
+    # Create a keypair and generate a new asset
+    user_priv, user_pub = crypto.generate_key_pair()
+    asset = {'random': random.random()}
+    tx = Transaction.create([user_pub], [([user_pub], 1)], asset=asset)
+    tx = tx.sign([user_priv])
+    # Post the transaction to the BigchainDB Web API
+    client.post('/api/v1/transactions/', data=json.dumps(tx.to_dict()))
+
+    result = loop.run_until_complete(ws.receive())
+    json_result = json.loads(result.data)
+    assert json_result['tx_id'] == tx.id