Merge remote-tracking branch 'origin/master' into k8s-node

This commit is contained in:
Troy McConaghy 2017-03-05 17:20:33 +01:00
commit 6cda75b44b
14 changed files with 337 additions and 132 deletions

View File

@ -10,6 +10,8 @@ _database_rethinkdb = {
'host': os.environ.get('BIGCHAINDB_DATABASE_HOST', 'localhost'),
'port': int(os.environ.get('BIGCHAINDB_DATABASE_PORT', 28015)),
'name': os.environ.get('BIGCHAINDB_DATABASE_NAME', 'bigchain'),
'connection_timeout': 5000,
'max_tries': 3,
}
_database_mongodb = {
@ -18,6 +20,8 @@ _database_mongodb = {
'port': int(os.environ.get('BIGCHAINDB_DATABASE_PORT', 27017)),
'name': os.environ.get('BIGCHAINDB_DATABASE_NAME', 'bigchain'),
'replicaset': os.environ.get('BIGCHAINDB_DATABASE_REPLICASET', 'bigchain-rs'),
'connection_timeout': 5000,
'max_tries': 3,
}
_database_map = {
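Both backend configs now carry the same two retry settings. Note that, unlike `host`, `port` and `name`, they are plain defaults here and not read from `BIGCHAINDB_*` environment variables. A quick sanity check — a minimal sketch, assuming `bigchaindb` is importable and exposes its module-level `config` dict:

```python
import bigchaindb

# The 'database' section of the default configuration now includes
# the two settings added above.
db_conf = bigchaindb.config['database']
assert db_conf['connection_timeout'] == 5000  # milliseconds per attempt
assert db_conf['max_tries'] == 3              # 0 would mean retry forever
```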

View File

@ -1,8 +1,10 @@
from itertools import repeat
from importlib import import_module
import logging
import bigchaindb
from bigchaindb.common.exceptions import ConfigurationError
from bigchaindb.backend.exceptions import ConnectionError
BACKENDS = {
@ -13,7 +15,8 @@ BACKENDS = {
logger = logging.getLogger(__name__)
def connect(backend=None, host=None, port=None, name=None, replicaset=None):
def connect(backend=None, host=None, port=None, name=None, max_tries=None,
connection_timeout=None, replicaset=None):
"""Create a new connection to the database backend.
All arguments default to the current configuration's values if not
@ -58,7 +61,9 @@ def connect(backend=None, host=None, port=None, name=None, replicaset=None):
raise ConfigurationError('Error loading backend `{}`'.format(backend)) from exc
logger.debug('Connection: {}'.format(Class))
return Class(host, port, dbname, replicaset=replicaset)
return Class(host=host, port=port, dbname=dbname,
max_tries=max_tries, connection_timeout=connection_timeout,
replicaset=replicaset)
class Connection:
@ -68,17 +73,41 @@ class Connection:
from and implements this class.
"""
def __init__(self, host=None, port=None, dbname=None, *args, **kwargs):
def __init__(self, host=None, port=None, dbname=None,
connection_timeout=None, max_tries=None,
**kwargs):
"""Create a new :class:`~.Connection` instance.
Args:
host (str): the host to connect to.
port (int): the port to connect to.
dbname (str): the name of the database to use.
connection_timeout (int, optional): the milliseconds to wait
until timing out the database connection attempt.
Defaults to 5000ms.
max_tries (int, optional): how many tries before giving up,
if 0 then try forever. Defaults to 3.
**kwargs: arbitrary keyword arguments provided by the
configuration's ``database`` settings
"""
dbconf = bigchaindb.config['database']
self.host = host or dbconf['host']
self.port = port or dbconf['port']
self.dbname = dbname or dbconf['name']
self.connection_timeout = connection_timeout if connection_timeout is not None\
else dbconf['connection_timeout']
self.max_tries = max_tries if max_tries is not None else dbconf['max_tries']
self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0)
self._conn = None
@property
def conn(self):
if self._conn is None:
self.connect()
return self._conn
def run(self, query):
"""Run a query.
@ -94,3 +123,26 @@ class Connection:
"""
raise NotImplementedError()
def connect(self):
"""Try to connect to the database.
Raises:
:exc:`~ConnectionError`: If the connection to the database
fails.
"""
attempt = 0
for i in self.max_tries_counter:
attempt += 1
try:
self._conn = self._connect()
except ConnectionError as exc:
logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.',
attempt, self.max_tries if self.max_tries != 0 else '',
self.host, self.port, self.connection_timeout)
if attempt == self.max_tries:
logger.critical('Cannot connect to the Database. Giving up.')
raise ConnectionError() from exc
else:
break
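With the retry loop in the base class, every backend inherits the same behavior. A minimal usage sketch, assuming a reachable, already-configured backend:

```python
from bigchaindb.backend import connect
from bigchaindb.backend.exceptions import ConnectionError

# connect() only builds the Connection object; the retried connection
# attempt happens lazily, in connect() or on first access to .conn.
conn = connect(max_tries=3, connection_timeout=5000)
try:
    conn.connect()
except ConnectionError:
    # Raised once all max_tries attempts have failed
    # (max_tries=0 would keep retrying forever).
    raise
```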

View File

@ -1,6 +1,5 @@
import time
import logging
from itertools import repeat
import pymongo
@ -15,46 +14,20 @@ from bigchaindb.backend.connection import Connection
logger = logging.getLogger(__name__)
# TODO: waiting for #1082 to be merged
# to move these constants into the configuration.
CONNECTION_TIMEOUT = 4000 # in milliseconds
MAX_RETRIES = 3 # number of tries before giving up, if 0 then try forever
class MongoDBConnection(Connection):
def __init__(self, host=None, port=None, dbname=None,
connection_timeout=None, max_tries=None,
replicaset=None):
def __init__(self, replicaset=None, **kwargs):
"""Create a new Connection instance.
Args:
host (str, optional): the host to connect to.
port (int, optional): the port to connect to.
dbname (str, optional): the database to use.
connection_timeout (int, optional): the milliseconds to wait
until timing out the database connection attempt.
max_tries (int, optional): how many tries before giving up,
if 0 then try forever.
replicaset (str, optional): the name of the replica set to
connect to.
**kwargs: arbitrary keyword arguments provided by the
configuration's ``database`` settings
"""
self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port']
super().__init__(**kwargs)
self.replicaset = replicaset or bigchaindb.config['database']['replicaset']
self.dbname = dbname or bigchaindb.config['database']['name']
self.connection_timeout = connection_timeout if connection_timeout is not None else CONNECTION_TIMEOUT
self.max_tries = max_tries if max_tries is not None else MAX_RETRIES
self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0)
self.connection = None
@property
def conn(self):
if self.connection is None:
self._connect()
return self.connection
@property
def db(self):
@ -94,34 +67,23 @@ class MongoDBConnection(Connection):
fails.
"""
attempt = 0
for i in self.max_tries_counter:
attempt += 1
try:
# we should only return a connection if the replica set is
# initialized. initialize_replica_set will check whether the
# replica set is initialized and, if not, initialize it.
initialize_replica_set(self.host, self.port, self.connection_timeout)
try:
# we should only return a connection if the replica set is
# initialized. initialize_replica_set will check whether the
# replica set is initialized and, if not, initialize it.
initialize_replica_set(self.host, self.port, self.connection_timeout)
# FYI: this might raise a `ServerSelectionTimeoutError`,
# that is a subclass of `ConnectionFailure`.
return pymongo.MongoClient(self.host,
self.port,
replicaset=self.replicaset,
serverselectiontimeoutms=self.connection_timeout)
# FYI: this might raise a `ServerSelectionTimeoutError`,
# that is a subclass of `ConnectionFailure`.
self.connection = pymongo.MongoClient(self.host,
self.port,
replicaset=self.replicaset,
serverselectiontimeoutms=self.connection_timeout)
# `initialize_replica_set` might raise `ConnectionFailure` or `OperationFailure`.
except (pymongo.errors.ConnectionFailure,
pymongo.errors.OperationFailure) as exc:
logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.',
attempt, self.max_tries if self.max_tries != 0 else '',
self.host, self.port, self.connection_timeout)
if attempt == self.max_tries:
logger.critical('Cannot connect to the Database. Giving up.')
raise ConnectionError() from exc
else:
break
# `initialize_replica_set` might raise `ConnectionFailure` or `OperationFailure`.
except (pymongo.errors.ConnectionFailure,
pymongo.errors.OperationFailure) as exc:
raise ConnectionError() from exc
def initialize_replica_set(host, port, connection_timeout):
@ -166,9 +128,10 @@ def _check_replica_set(conn):
replSet option.
"""
options = conn.admin.command('getCmdLineOpts')
print(options)
try:
repl_opts = options['parsed']['replication']
repl_set_name = repl_opts.get('replSetName', None) or repl_opts['replSet']
repl_set_name = repl_opts.get('replSetName', repl_opts.get('replSet'))
except KeyError:
raise ConfigurationError('mongod was not started with'
' the replSet option.')
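The net effect is a simple contract for drivers: implement `_connect()` to return a raw client or raise `ConnectionError`, and inherit the retry loop from the base class. A hypothetical minimal driver, purely to illustrate the contract (not part of the codebase):

```python
from bigchaindb.backend.connection import Connection
from bigchaindb.backend.exceptions import ConnectionError


class DummyConnection(Connection):
    """Illustrative only: everything except _connect() is inherited."""

    def _connect(self):
        try:
            return object()  # stand-in for a real client handle
        except Exception as exc:
            # Translate driver-specific failures into ConnectionError
            # so the base class's connect() retry loop can handle them.
            raise ConnectionError() from exc

    def run(self, query):
        conn = self.conn  # first access triggers connect(), with retries
        return query(conn)
```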

View File

@ -3,6 +3,7 @@ import logging
import rethinkdb as r
from bigchaindb import backend
from bigchaindb.backend.exceptions import BackendError
from bigchaindb.backend.changefeed import ChangeFeed
from bigchaindb.backend.utils import module_dispatch_registrar
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
@ -23,8 +24,8 @@ class RethinkDBChangeFeed(ChangeFeed):
try:
self.run_changefeed()
break
except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
logger.exception(exc)
except (BackendError, r.ReqlDriverError) as exc:
logger.exception('Error connecting to the database, retrying')
time.sleep(1)
def run_changefeed(self):

View File

@ -1,11 +1,7 @@
import time
import logging
import rethinkdb as r
from bigchaindb.backend.connection import Connection
logger = logging.getLogger(__name__)
from bigchaindb.backend.exceptions import ConnectionError, OperationError
class RethinkDBConnection(Connection):
@ -17,23 +13,6 @@ class RethinkDBConnection(Connection):
more times to run the query or open a connection.
"""
def __init__(self, host, port, dbname, max_tries=3, **kwargs):
"""Create a new :class:`~.RethinkDBConnection` instance.
See :meth:`.Connection.__init__` for
:attr:`host`, :attr:`port`, and :attr:`dbname`.
Args:
max_tries (int, optional): how many tries before giving up.
Defaults to 3.
"""
self.host = host
self.port = port
self.dbname = dbname
self.max_tries = max_tries
self.conn = None
def run(self, query):
"""Run a RethinkDB query.
@ -45,16 +24,10 @@ class RethinkDBConnection(Connection):
:attr:`~.RethinkDBConnection.max_tries`.
"""
if self.conn is None:
self._connect()
for i in range(self.max_tries):
try:
return query.run(self.conn)
except r.ReqlDriverError:
if i + 1 == self.max_tries:
raise
self._connect()
try:
return query.run(self.conn)
except r.ReqlDriverError as exc:
raise OperationError from exc
def _connect(self):
"""Set a connection to RethinkDB.
@ -66,16 +39,7 @@ class RethinkDBConnection(Connection):
:attr:`~.RethinkDBConnection.max_tries`.
"""
for i in range(1, self.max_tries + 1):
logging.debug('Connecting to database %s:%s/%s. (Attempt %s/%s)',
self.host, self.port, self.dbname, i, self.max_tries)
try:
self.conn = r.connect(host=self.host, port=self.port, db=self.dbname)
except r.ReqlDriverError:
if i == self.max_tries:
raise
wait_time = 2**i
logging.debug('Error connecting to database, waiting %ss', wait_time)
time.sleep(wait_time)
else:
break
try:
return r.connect(host=self.host, port=self.port, db=self.dbname)
except r.ReqlDriverError as exc:
raise ConnectionError from exc
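Callers of `run()` now see backend-agnostic exceptions instead of raw driver errors. A sketch of the resulting calling pattern, mirroring the updated tests below:

```python
import rethinkdb as r

from bigchaindb.backend import connect
from bigchaindb.backend.exceptions import OperationError

conn = connect(backend='rethinkdb')
try:
    # .conn inside run() connects lazily (with retries); a driver
    # error while executing the query surfaces as OperationError.
    result = conn.run(r.expr('1'))
except OperationError:
    raise  # or handle/back off as appropriate
```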

View File

@ -430,14 +430,13 @@ class Bigchain(object):
# check if the owner is in the condition `owners_after`
if len(output['public_keys']) == 1:
if output['condition']['details']['public_key'] == owner:
tx_link = TransactionLink(tx['id'], index)
links.append(TransactionLink(tx['id'], index))
else:
# for transactions with multiple `public_keys` there will be several subfulfillments nested
# in the condition. We need to iterate the subfulfillments to make sure there is a
# subfulfillment for `owner`
if utils.condition_details_has_owner(output['condition']['details'], owner):
tx_link = TransactionLink(tx['id'], index)
links.append(tx_link)
links.append(TransactionLink(tx['id'], index))
return links
def get_owned_ids(self, owner):

View File

@ -75,6 +75,8 @@ docker run \
--name=rethinkdb \
--publish=172.17.0.1:28015:28015 \
--publish=172.17.0.1:58080:8080 \
--restart=always \
--volume "$HOME/bigchaindb_docker:/data" \
rethinkdb:2.3
```
@ -85,11 +87,25 @@ You can also access the RethinkDB dashboard at
#### For MongoDB
Note: MongoDB runs as user `mongodb`, which has UID `999` and GID `999`
inside the container. For the volume to be mounted properly, your host
should have a `mongodb` user with UID and GID `999`.
If another user on the host already has UID `999`, the mapped files will
appear to be owned by that user on the host.
If there is no user with UID 999, you can create the corresponding user
and group:
```text
groupadd -r --gid 999 mongodb && useradd -r --uid 999 -g mongodb mongodb
```
```text
docker run \
--detach \
--name=mongodb \
--publish=172.17.0.1:27017:27017 \
--restart=always \
--volume=/tmp/mongodb_docker/db:/data/db \
--volume=/tmp/mongodb_docker/configdb:/data/configdb \
mongo:3.4.1 --replSet=bigchain-rs
```
@ -100,6 +116,7 @@ docker run \
--detach \
--name=bigchaindb \
--publish=59984:9984 \
--restart=always \
--volume=$HOME/bigchaindb_docker:/data \
bigchaindb/bigchaindb \
start

View File

@ -21,7 +21,7 @@ Step 2: Configure kubectl
The default location of the kubectl configuration file is ``~/.kube/config``.
If you don't have that file, then you need to get it.
If you deployed your Kubernetes cluster on Azure
**Azure.** If you deployed your Kubernetes cluster on Azure
using the Azure CLI 2.0 (as per :doc:`our template <template-kubernetes-azure>`),
then you can get the ``~/.kube/config`` file using:
@ -32,15 +32,128 @@ then you can get the ``~/.kube/config`` file using:
--name <ACS cluster name>
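Only the tail of that command is visible in this hunk; the full invocation is presumably along these lines, with the resource group and cluster name from your own deployment:

.. code:: bash

$ az acs kubernetes get-credentials \
--resource-group <resource group name> \
--name <ACS cluster name>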
Step 3: Run a MongoDB Container
-------------------------------
Step 3: Create a StorageClass
-----------------------------
To start a MongoDB Docker container in a pod on one of the cluster nodes:
MongoDB needs somewhere to store its data persistently,
outside the container where MongoDB is running.
Explaining how Kubernetes handles persistent volumes,
and the associated terminology,
is beyond the scope of this documentation;
see `the Kubernetes docs about persistent volumes
<https://kubernetes.io/docs/user-guide/persistent-volumes>`_.
The first thing to do is create a Kubernetes StorageClass.
**Azure.** First, you need an Azure storage account.
If you deployed your Kubernetes cluster on Azure
using the Azure CLI 2.0
(as per :doc:`our template <template-kubernetes-azure>`),
then the `az acs create` command already created two
storage accounts in the same location and resource group
as your Kubernetes cluster.
Both should have the same "storage account SKU": ``Standard_LRS``.
Standard storage is lower-cost and lower-performance.
It uses hard disk drives (HDD).
LRS means locally-redundant storage: three replicas
in the same data center.
Premium storage is higher-cost and higher-performance.
It uses solid state drives (SSD).
At the time of writing,
when we created a storage account with SKU ``Premium_LRS``
and tried to use that,
the PersistentVolumeClaim would get stuck in a "Pending" state.
For future reference, the command to create a storage account is
`az storage account create <https://docs.microsoft.com/en-us/cli/azure/storage/account#create>`_.
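For example, a sketch of creating such an account (placeholder names; flags per the linked reference):

.. code:: bash

$ az storage account create --name <storage account name> \
--resource-group <resource group name> \
--location <your cluster's location> \
--sku Standard_LRS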
Create a Kubernetes StorageClass named ``slow``
by writing a file named ``azureStorageClass.yml`` containing:
.. code:: yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1beta1
metadata:
name: slow
provisioner: kubernetes.io/azure-disk
parameters:
skuName: Standard_LRS
location: <region where your cluster is located>
and then:
.. code:: bash
$ kubectl ?????
$ kubectl apply -f azureStorageClass.yml
You can check if it worked using ``kubectl get storageclasses``.
Note that there is no line of the form
``storageAccount: <azure storage account name>``
under ``parameters:``. When we included one
and then created a PersistentVolumeClaim based on it,
the PersistentVolumeClaim would get stuck
in a "Pending" state.
Kubernetes just looks for a storageAccount
with the specified skuName and location.
Note: The BigchainDB Dashboard can be deployed
as a Docker container, like everything else.
Step 4: Create a PersistentVolumeClaim
--------------------------------------
Next, you'll create a PersistentVolumeClaim named ``mongoclaim``.
Create a file named ``mongoclaim.yml``
with the following contents:
.. code:: yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: mongoclaim
annotations:
volume.beta.kubernetes.io/storage-class: slow
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 20Gi
Note how there's no explicit mention of Azure, AWS, or any other particular cloud provider.
``ReadWriteOnce`` (RWO) means the volume can be mounted as
read-write by a single Kubernetes node.
(``ReadWriteOnce`` is the *only* access mode supported
by AzureDisk.)
``storage: 20Gi`` means the volume has a size of 20
`gibibytes <https://en.wikipedia.org/wiki/Gibibyte>`_.
(You can change that if you like.)
Create ``mongoclaim`` in your Kubernetes cluster:
.. code:: bash
$ kubectl apply -f mongoclaim.yml
You can check its status using:
.. code:: bash
$ kubectl get pvc
Initially, the status of ``mongoclaim`` might be "Pending"
but it should become "Bound" fairly quickly.
.. code:: bash
$ kubectl describe pvc
Name: mongoclaim
Namespace: default
StorageClass: slow
Status: Bound
Volume: pvc-ebed81f1-fdca-11e6-abf0-000d3a27ab21
Labels: <none>
Capacity: 20Gi
Access Modes: RWO
No events.

View File

@ -45,6 +45,12 @@ on most common operating systems
<https://docs.microsoft.com/en-us/cli/azure/install-az-cli2>`_.
Do that.
First, update the Azure CLI to the latest version:
.. code:: bash
$ az component update
Next, login to your account using:
.. code:: bash

View File

@ -40,7 +40,7 @@ def connection():
# executed to make sure that the replica set is correctly initialized.
# Here we force the connection setup so that all required
# `Database.command` calls are executed before we mock them in the tests.
connection._connect()
connection.connect()
return connection

View File

@ -34,6 +34,7 @@ def test_run_a_simple_query():
def test_raise_exception_when_max_tries():
from bigchaindb.backend import connect
from bigchaindb.backend.exceptions import OperationError
class MockQuery:
def run(self, conn):
@ -41,28 +42,41 @@ def test_raise_exception_when_max_tries():
conn = connect()
with pytest.raises(r.ReqlDriverError):
with pytest.raises(OperationError):
conn.run(MockQuery())
def test_reconnect_when_connection_lost():
from bigchaindb.backend import connect
def raise_exception(*args, **kwargs):
raise r.ReqlDriverError('mock')
conn = connect()
original_connect = r.connect
r.connect = raise_exception
def delayed_start():
time.sleep(1)
r.connect = original_connect
with patch('rethinkdb.connect') as mock_connect:
mock_connect.side_effect = [
r.ReqlDriverError('mock'),
original_connect()
]
thread = Thread(target=delayed_start)
query = r.expr('1')
thread.start()
assert conn.run(query) == '1'
conn = connect()
query = r.expr('1')
assert conn.run(query) == '1'
def test_reconnect_when_connection_lost_tries_n_times():
from bigchaindb.backend import connect
from bigchaindb.backend.exceptions import ConnectionError
with patch('rethinkdb.connect') as mock_connect:
mock_connect.side_effect = [
r.ReqlDriverError('mock'),
r.ReqlDriverError('mock'),
r.ReqlDriverError('mock')
]
conn = connect(max_tries=3)
query = r.expr('1')
with pytest.raises(ConnectionError):
assert conn.run(query) == '1'
def test_changefeed_reconnects_when_connection_lost(monkeypatch):

View File

@ -169,12 +169,17 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch, request):
'host': DATABASE_HOST,
'port': DATABASE_PORT,
'name': DATABASE_NAME,
'connection_timeout': 5000,
'max_tries': 3
}
database_mongodb = {
'backend': 'mongodb',
'host': DATABASE_HOST,
'port': DATABASE_PORT,
'name': DATABASE_NAME,
'connection_timeout': 5000,
'max_tries': 3,
'replicaset': 'bigchain-rs',
}

View File

@ -10,6 +10,8 @@ def config(request, monkeypatch):
'port': 28015,
'name': 'bigchain',
'replicaset': 'bigchain-rs',
'connection_timeout': 5000,
'max_tries': 3
},
'keypair': {
'public': 'pubkey',

View File

@ -47,3 +47,68 @@ def test_get_outputs_endpoint_with_invalid_unspent(client, user_pk):
res = client.get(OUTPUTS_ENDPOINT + params)
assert expected == res.json
assert res.status_code == 400
@pytest.mark.bdb
@pytest.mark.usefixtures('inputs')
def test_get_divisble_transactions_returns_500(b, client):
from bigchaindb.models import Transaction
from bigchaindb.common import crypto
import json
TX_ENDPOINT = '/api/v1/transactions'
def mine(tx_list):
block = b.create_block(tx_list)
b.write_block(block)
# vote the block valid
vote = b.vote(block.id, b.get_last_voted_block().id, True)
b.write_vote(vote)
alice_priv, alice_pub = crypto.generate_key_pair()
bob_priv, bob_pub = crypto.generate_key_pair()
carly_priv, carly_pub = crypto.generate_key_pair()
create_tx = Transaction.create([alice_pub], [([alice_pub], 4)])
create_tx.sign([alice_priv])
res = client.post(TX_ENDPOINT, data=json.dumps(create_tx.to_dict()))
assert res.status_code == 202
mine([create_tx])
transfer_tx = Transaction.transfer(create_tx.to_inputs(),
[([alice_pub], 3), ([bob_pub], 1)],
asset_id=create_tx.id)
transfer_tx.sign([alice_priv])
res = client.post(TX_ENDPOINT, data=json.dumps(transfer_tx.to_dict()))
assert res.status_code == 202
mine([transfer_tx])
transfer_tx_carly = Transaction.transfer([transfer_tx.to_inputs()[1]],
[([carly_pub], 1)],
asset_id=create_tx.id)
transfer_tx_carly.sign([bob_priv])
res = client.post(TX_ENDPOINT, data=json.dumps(transfer_tx_carly.to_dict()))
assert res.status_code == 202
mine([transfer_tx_carly])
asset_id = create_tx.id
url = TX_ENDPOINT + "?asset_id=" + asset_id
assert client.get(url).status_code == 200
assert len(client.get(url).json) == 3
url = OUTPUTS_ENDPOINT + '?public_key=' + alice_pub
assert client.get(url).status_code == 200
url = OUTPUTS_ENDPOINT + '?public_key=' + bob_pub
assert client.get(url).status_code == 200
url = OUTPUTS_ENDPOINT + '?public_key=' + carly_pub
assert client.get(url).status_code == 200