Add Connection class to manage connections

vrde 2016-09-06 23:51:12 +02:00
parent 8e6f0804e2
commit d06e8b91d8
7 changed files with 195 additions and 18 deletions

View File

@@ -156,7 +156,7 @@ def run_start(args):
    if args.start_rethinkdb:
        try:
            proc = utils.start_rethinkdb()
            proc, port = utils.start_rethinkdb()
        except StartupError as e:
            sys.exit('Error starting RethinkDB, reason is: {}'.format(e))
        logger.info('RethinkDB started with PID %s' % proc.pid)
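With this change `start_rethinkdb()` returns a `(process, port)` pair instead of the bare process, so callers can discover which driver port the server actually bound; that matters once a random port can be requested (see `start_temp_rethinkdb` below). A minimal sketch of the new contract, assuming RethinkDB is installed and the default configuration applies:

from bigchaindb.commands.utils import start_rethinkdb

proc, port = start_rethinkdb()  # blocks until RethinkDB prints 'Server ready'
print('RethinkDB started with PID %s on port %s' % (proc.pid, port))

proc.terminate()
proc.wait()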

View File

@@ -5,6 +5,7 @@ for ``argparse.ArgumentParser``.
import argparse
import multiprocessing as mp
import subprocess
import tempfile
import rethinkdb as r
@@ -14,41 +15,63 @@ from bigchaindb import db
from bigchaindb.version import __version__


def start_rethinkdb():
def start_temp_rethinkdb(port=0, directory=None):
    directory = directory or tempfile.mkdtemp()

    extra_opts = ['--cluster-port', '0',
                  '--driver-port', str(port),
                  '--no-http-admin',
                  '--directory', directory]

    return start_rethinkdb(wait_for_db=False, extra_opts=extra_opts)


def start_rethinkdb(wait_for_db=True, extra_opts=None):
    """Start RethinkDB as a child process and wait for it to be
    available.

    Args:
        wait_for_db (bool): wait for the database to be ready
        extra_opts (list): a list of extra options to be used when
            starting the db

    Raises:
        ``bigchaindb.exceptions.StartupError`` if RethinkDB cannot
        be started.
    """
    proc = subprocess.Popen(['rethinkdb', '--bind', 'all'],
    if not extra_opts:
        extra_opts = []

    proc = subprocess.Popen(['rethinkdb', '--bind', 'all'] + extra_opts,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)

    dbname = bigchaindb.config['database']['name']
    line = ''
    port = None

    for line in proc.stdout:
        if line.startswith('Listening for client driver'):
            port = int(line.split()[-1])

        if line.startswith('Server ready'):
            # FIXME: seems like tables are not ready when the server is ready,
            #        that's why we need to query RethinkDB to know the state
            #        of the database. This code assumes the tables are ready
            #        when the database is ready. This seems a valid assumption.
            try:
                conn = db.get_conn()
                # Before checking if the db is ready, we need to query
                # the server to check if it contains that db
                if r.db_list().contains(dbname).run(conn):
                    r.db(dbname).wait().run(conn)
            except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
                raise StartupError('Error waiting for the database `{}` '
                                   'to be ready'.format(dbname)) from exc
            return proc
            if wait_for_db:
                try:
                    conn = db.get_conn()
                    # Before checking if the db is ready, we need to query
                    # the server to check if it contains that db
                    if r.db_list().contains(dbname).run(conn):
                        r.db(dbname).wait().run(conn)
                except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
                    raise StartupError('Error waiting for the database `{}` '
                                       'to be ready'.format(dbname)) from exc

            return proc, port

    # We are here when we exhaust the stdout of the process.
    # The last `line` contains info about the error.
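`start_temp_rethinkdb` builds on this: it points the server at a throwaway data directory and, by passing `--driver-port 0`, lets the OS pick a free port, which `start_rethinkdb` then parses from the server's stdout. A sketch of spinning up and tearing down an ephemeral instance (assuming the `rethinkdb` binary is on the PATH):

import rethinkdb as r

from bigchaindb.commands.utils import start_temp_rethinkdb

proc, port = start_temp_rethinkdb()  # port=0 means 'pick a free port'
try:
    conn = r.connect(port=port)
    assert r.expr(1).run(conn) == 1
finally:
    proc.terminate()
    proc.wait()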

View File

@@ -8,6 +8,7 @@ import rethinkdb as r
import rapidjson
import bigchaindb
from bigchaindb.db.utils import Connection
from bigchaindb import config_utils, crypto, exceptions, util
@@ -64,6 +65,7 @@ class Bigchain(object):
            raise exceptions.KeypairNotFoundException()

        self._conn = None
        self.connection = Connection(host=self.host, port=self.port, db=self.dbname)

    @property
    def conn(self):
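A `Bigchain` instance now carries a resilient `connection` next to the raw `conn` property. A sketch of the difference for callers, assuming a configured keypair and the default `bigchain` table (neither is shown in this diff):

import rethinkdb as r

from bigchaindb import Bigchain

b = Bigchain()
# old style: a bare connection, no retries on driver errors
count = r.table('bigchain').count().run(b.conn)
# new style: the query is rerun on a fresh connection if the driver fails
count = b.connection.run(r.table('bigchain').count())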
@@ -262,8 +264,8 @@ class Bigchain(object):
        When creating a transaction one of the optional arguments is the `payload`. The payload is a generic
        dict that contains information about the digital asset.

        To make it easy to query the bigchain for that digital asset we create a UUID for the payload and
        store it with the transaction. This makes it easy for developers to keep track of their digital
        assets in bigchain.

        Args:

View File

@@ -1,5 +1,6 @@
"""Utils to initialize and drop the database."""
import time
import logging
import rethinkdb as r
@@ -11,6 +12,40 @@ from bigchaindb import exceptions
logger = logging.getLogger(__name__)
class Connection:
    """Lazy, resilient wrapper around a RethinkDB connection: the
    connection is opened on first use, and failed queries are retried
    after reconnecting."""

    def __init__(self, host=None, port=None, db=None, max_tries=3):
        self.host = host or bigchaindb.config['database']['host']
        self.port = port or bigchaindb.config['database']['port']
        self.db = db or bigchaindb.config['database']['name']
        self.max_tries = max_tries
        self.conn = None

    def run(self, query):
        if self.conn is None:
            self._connect()

        for i in range(self.max_tries):
            try:
                return query.run(self.conn)
            except r.ReqlDriverError:
                if i + 1 == self.max_tries:
                    raise
                # the connection was probably lost; reopen it and
                # retry the query
                self._connect()

    def _connect(self):
        for i in range(self.max_tries):
            try:
                self.conn = r.connect(host=self.host, port=self.port,
                                      db=self.db)
            except r.ReqlDriverError:
                if i + 1 == self.max_tries:
                    raise
                # exponential backoff: wait 1s, then 2s, then 4s, ...
                time.sleep(2**i)
            else:
                # connected successfully, stop retrying
                break
def get_conn():
    '''Get the connection to the database.'''
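`Connection.run` takes an unevaluated ReQL query and calls its `.run` with the managed connection, which is what makes the reconnect-and-retry transparent to callers; with the default `max_tries=3`, `_connect` waits 1s and then 2s between attempts before giving up. A usage sketch (the table name is an assumption):

import rethinkdb as r

from bigchaindb.db.utils import Connection

conn = Connection()  # host, port and db default to bigchaindb.config
result = conn.run(r.table('bigchain').insert({'fact': 'hello'}))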

View File

@@ -1,12 +1,17 @@
"""Utility classes and functions to work with the pipelines."""
import time
import rethinkdb as r
import logging
from multipipes import Node
from bigchaindb import Bigchain
logger = logging.getLogger(__name__)
class ChangeFeed(Node):
"""This class wraps a RethinkDB changefeed adding a `prefeed`.
@@ -47,8 +52,15 @@ class ChangeFeed(Node):
        for element in self.prefeed:
            self.outqueue.put(element)

        for change in r.table(self.table).changes().run(self.bigchain.conn):
        while True:
            try:
                self.run_changefeed()
            except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
                logger.exception(exc)
                time.sleep(1)

    def run_changefeed(self):
        for change in self.bigchain.connection.run(r.table(self.table).changes()):
            is_insert = change['old_val'] is None
            is_delete = change['new_val'] is None
            is_update = not is_insert and not is_delete
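`run_forever` now wraps the feed in a retry loop: when the driver connection drops, the exception is logged, the node sleeps for a second, and `run_changefeed` re-establishes the cursor through the resilient `Connection`, while the `prefeed` is still emitted only once. A sketch of consuming a feed by hand, assuming BigchainDB is configured and using a hypothetical `bigchain` table (the queue wiring is normally done by the multipipes framework):

import multiprocessing as mp
from threading import Thread

from bigchaindb.pipelines.utils import ChangeFeed

feed = ChangeFeed('bigchain', ChangeFeed.INSERT)
feed.outqueue = mp.Queue()

Thread(target=feed.run_forever, daemon=True).start()
print(feed.outqueue.get())  # blocks until the first insert event arrives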

View File

@@ -4,7 +4,6 @@ import pytest
import rethinkdb as r
import bigchaindb
from bigchaindb import util
from bigchaindb.db import utils
from .conftest import setup_database as _setup_database

View File

@@ -0,0 +1,106 @@
from threading import Thread
import pytest
import rethinkdb as r
from bigchaindb.commands.utils import start_temp_rethinkdb
from bigchaindb.db.utils import Connection
def test_run_a_simple_query():
    conn = Connection()
    query = r.expr('1')
    assert conn.run(query) == '1'
def test_raise_exception_when_max_tries():
    class MockQuery:
        def run(self, conn):
            raise r.ReqlDriverError('mock')

    conn = Connection()

    with pytest.raises(r.ReqlDriverError):
        conn.run(MockQuery())
def test_reconnect_when_connection_lost():
    import time

    proc = None

    def delayed_start():
        nonlocal proc
        time.sleep(1)
        proc, _ = start_temp_rethinkdb(38015)

    thread = Thread(target=delayed_start)
    conn = Connection(port=38015)
    query = r.expr('1')
    thread.start()

    assert conn.run(query) == '1'

    # join the thread first so `proc` is guaranteed to be assigned
    thread.join()
    proc.terminate()
    proc.wait()
def test_changefeed_reconnects_when_connection_lost(monkeypatch):
    import os
    import time
    import tempfile
    import multiprocessing as mp

    import bigchaindb
    from bigchaindb.pipelines.utils import ChangeFeed

    dbport = 38015
    dbname = 'test_' + str(os.getpid())
    directory = tempfile.mkdtemp()

    monkeypatch.setitem(bigchaindb.config, 'database', {
        'host': 'localhost',
        'port': dbport,
        'name': dbname
    })

    proc, _ = start_temp_rethinkdb(dbport, directory=directory)

    # prepare DB and table
    conn = r.connect(port=dbport)
    r.db_create(dbname).run(conn)
    r.db(dbname).table_create('cat_facts').run(conn)

    # initialize ChangeFeed and put it in a thread
    changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT)
    changefeed.outqueue = mp.Queue()
    t_changefeed = Thread(target=changefeed.run_forever)
    t_changefeed.start()

    time.sleep(1)

    # insert some records in the table to start generating
    # events that changefeed will put in `outqueue`
    r.db(dbname).table('cat_facts').insert({
        'fact': 'A group of cats is called a clowder.'
    }).run(conn)

    # the event should be in the outqueue
    fact = changefeed.outqueue.get()['fact']
    assert fact == 'A group of cats is called a clowder.'

    # stop the DB process
    proc.terminate()
    proc.wait()

    # the changefeed thread should survive the outage and keep retrying
    assert t_changefeed.is_alive() is True

    # restart the server on the same port and data directory
    proc, _ = start_temp_rethinkdb(dbport, directory=directory)
    time.sleep(2)

    conn = r.connect(port=dbport)
    r.db(dbname).table('cat_facts').insert({
        'fact': 'Cats sleep 70% of their lives.'
    }).run(conn)

    fact = changefeed.outqueue.get()['fact']
    assert fact == 'Cats sleep 70% of their lives.'
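The test stops here and leaves the restarted server and the changefeed thread running. A hypothetical teardown for it could look like the following; since `run_forever` loops forever, the thread can only be joined with a timeout (or started as a daemon in the first place):

proc.terminate()
proc.wait()
t_changefeed.join(timeout=5)  # the loop never returns, so don't wait forever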