diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py
index 1943167b..a4375c8e 100644
--- a/bigchaindb/commands/bigchain.py
+++ b/bigchaindb/commands/bigchain.py
@@ -156,7 +156,8 @@ def run_start(args):
 
     if args.start_rethinkdb:
         try:
-            proc = utils.start_rethinkdb()
+            # start_rethinkdb returns (proc, port); only proc is used here
+            proc, _ = utils.start_rethinkdb()
         except StartupError as e:
             sys.exit('Error starting RethinkDB, reason is: {}'.format(e))
         logger.info('RethinkDB started with PID %s' % proc.pid)
diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py
index dc035de6..02f2a997 100644
--- a/bigchaindb/commands/utils.py
+++ b/bigchaindb/commands/utils.py
@@ -5,6 +5,7 @@ for ``argparse.ArgumentParser``.
 import argparse
 import multiprocessing as mp
 import subprocess
+import tempfile
 
 import rethinkdb as r
 
@@ -14,41 +15,83 @@ from bigchaindb import db
 from bigchaindb.version import __version__
 
 
-def start_rethinkdb():
+def start_temp_rethinkdb(port=0, directory=None):
+    """Start a RethinkDB instance on a private data directory.
+
+    The instance is started with ``--no-http-admin`` and
+    ``--cluster-port 0``, and the call returns without waiting for the
+    BigchainDB database to be ready.
+
+    Args:
+        port (int): value passed to ``--driver-port``. Defaults to ``0``.
+        directory (str): value passed to ``--directory``. When omitted, a
+            new directory is created with ``tempfile.mkdtemp()``.
+
+    Returns:
+        tuple: ``(proc, port)`` as returned by ``start_rethinkdb``.
+    """
+    directory = directory or tempfile.mkdtemp()
+
+    extra_opts = ['--cluster-port', '0',
+                  '--driver-port', str(port),
+                  '--no-http-admin',
+                  '--directory', directory]
+
+    return start_rethinkdb(wait_for_db=False, extra_opts=extra_opts)
+
+
+def start_rethinkdb(wait_for_db=True, extra_opts=None):
     """Start RethinkDB as a child process and wait for it to be
     available.
 
+    Args:
+        wait_for_db (bool): wait for the database to be ready
+        extra_opts (list): a list of extra options to be used when
+            starting the db
+
+    Returns:
+        tuple: ``(proc, port)`` where ``proc`` is the ``subprocess.Popen``
+            object of the server and ``port`` is the client driver port
+            parsed from the server output (``None`` if it was not
+            reported before the server became ready).
+
     Raises:
         ``bigchaindb.exceptions.StartupError`` if RethinkDB cannot
             be started.
     """
 
-    proc = subprocess.Popen(['rethinkdb', '--bind', 'all'],
+    # Accept any iterable (or ``None``) without mutating the caller's list.
+    command = ['rethinkdb', '--bind', 'all'] + list(extra_opts or [])
+
+    proc = subprocess.Popen(command,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             universal_newlines=True)
 
     dbname = bigchaindb.config['database']['name']
     line = ''
 
+    port = None
     for line in proc.stdout:
+        if line.startswith('Listening for client driver'):
+            port = int(line.split()[-1])
         if line.startswith('Server ready'):
             # FIXME: seems like tables are not ready when the server is ready,
             #        that's why we need to query RethinkDB to know the state
             #        of the database. This code assumes the tables are ready
             #        when the database is ready. This seems a valid assumption.
 
-            try:
-                conn = db.get_conn()
-                # Before checking if the db is ready, we need to query
-                # the server to check if it contains that db
-                if r.db_list().contains(dbname).run(conn):
-                    r.db(dbname).wait().run(conn)
-            except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
-                raise StartupError('Error waiting for the database `{}` '
-                                   'to be ready'.format(dbname)) from exc
-
-            return proc
+            if wait_for_db:
+                try:
+                    conn = db.get_conn()
+                    # Before checking if the db is ready, we need to query
+                    # the server to check if it contains that db
+                    if r.db_list().contains(dbname).run(conn):
+                        r.db(dbname).wait().run(conn)
+                except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
+                    raise StartupError('Error waiting for the database `{}` '
+                                       'to be ready'.format(dbname)) from exc
+            return proc, port
 
     # We are here when we exhaust the stdout of the process.
     # The last `line` contains info about the error.
diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 95cccfc7..3e344beb 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -8,6 +8,7 @@ import rethinkdb as r import rapidjson import bigchaindb +from bigchaindb.db.utils import Connection from bigchaindb import config_utils, crypto, exceptions, util @@ -64,6 +65,7 @@ class Bigchain(object): raise exceptions.KeypairNotFoundException() self._conn = None + self.connection = Connection(host=self.host, port=self.port, db=self.dbname) @property def conn(self): @@ -262,8 +264,8 @@ class Bigchain(object): When creating a transaction one of the optional arguments is the `payload`. The payload is a generic dict that contains information about the digital asset. - To make it easy to query the bigchain for that digital asset we create a UUID for the payload and - store it with the transaction. This makes it easy for developers to keep track of their digital + To make it easy to query the bigchain for that digital asset we create a UUID for the payload and + store it with the transaction. This makes it easy for developers to keep track of their digital assets in bigchain. 
Args:
diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py
index 603f143c..627782e9 100644
--- a/bigchaindb/db/utils.py
+++ b/bigchaindb/db/utils.py
@@ -1,5 +1,6 @@
 """Utils to initialize and drop the database."""
 
+import time
 import logging
 
 import rethinkdb as r
@@ -11,6 +12,74 @@ from bigchaindb import exceptions
 logger = logging.getLogger(__name__)
 
 
+class Connection:
+    """Run ReQL queries on a connection that is re-opened on failure.
+
+    The connection is opened lazily on the first call to ``run`` and
+    opened again before a query that failed with ``r.ReqlDriverError``
+    is retried.
+
+    Args:
+        host (str): database host. Defaults to
+            ``bigchaindb.config['database']['host']``.
+        port (int): database port. Defaults to
+            ``bigchaindb.config['database']['port']``.
+        db (str): database name. Defaults to
+            ``bigchaindb.config['database']['name']``.
+        max_tries (int): how many times a query, and each attempt to
+            connect, is tried before the error is propagated.
+    """
+
+    def __init__(self, host=None, port=None, db=None, max_tries=3):
+        self.host = host or bigchaindb.config['database']['host']
+        self.port = port or bigchaindb.config['database']['port']
+        self.db = db or bigchaindb.config['database']['name']
+        self.max_tries = max_tries
+        self.conn = None
+
+    def run(self, query):
+        """Run ``query`` and return its result.
+
+        Args:
+            query: an object with a ``run(conn)`` method, e.g. a ReQL
+                query.
+
+        Raises:
+            ``r.ReqlDriverError`` if the query still fails on the last
+                of the ``max_tries`` attempts, or if reconnecting fails.
+        """
+        if self.conn is None:
+            self._connect()
+
+        for i in range(self.max_tries):
+            try:
+                return query.run(self.conn)
+            except r.ReqlDriverError:
+                if i + 1 == self.max_tries:
+                    raise
+                self._connect()
+
+    def _connect(self):
+        """Open the connection, sleeping ``2**i`` seconds between tries.
+
+        Raises:
+            ``r.ReqlDriverError`` if the last of the ``max_tries``
+                attempts fails.
+        """
+        for i in range(self.max_tries):
+            try:
+                self.conn = r.connect(host=self.host, port=self.port,
+                                      db=self.db)
+            except r.ReqlDriverError:
+                if i + 1 == self.max_tries:
+                    raise
+                time.sleep(2**i)
+            else:
+                # Connected: stop retrying. Falling through to the next
+                # iteration would open another connection and drop this
+                # one without closing it.
+                return
+
+
 def get_conn():
     '''Get the connection to the database.'''
 
diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py
index 22a5f9bc..ffa8fc7b 100644
--- a/bigchaindb/pipelines/utils.py
+++ b/bigchaindb/pipelines/utils.py
@@ -1,12 +1,17 @@
 """Utility classes and functions to work with the pipelines."""
 
 
+import time
 import rethinkdb as r
+import logging
 from multipipes import Node
 
 from bigchaindb import Bigchain
 
 
+logger = logging.getLogger(__name__)
+
+
 class ChangeFeed(Node):
     """This class wraps a RethinkDB changefeed adding a `prefeed`.
 
@@ -47,8 +52,15 @@ class ChangeFeed(Node):
         for element in self.prefeed:
             self.outqueue.put(element)
 
-        for change in r.table(self.table).changes().run(self.bigchain.conn):
+        while True:
+            try:
+                self.run_changefeed()
+            except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
+                logger.exception(exc)
+                time.sleep(1)
 
+    def run_changefeed(self):
+        for change in self.bigchain.connection.run(r.table(self.table).changes()):
             is_insert = change['old_val'] is None
             is_delete = change['new_val'] is None
             is_update = not is_insert and not is_delete
diff --git a/tests/db/test_utils.py b/tests/db/test_utils.py
index 0299224c..957373f9 100644
--- a/tests/db/test_utils.py
+++ b/tests/db/test_utils.py
@@ -4,7 +4,6 @@ import pytest
 import rethinkdb as r
 
 import bigchaindb
-from bigchaindb import util
 from bigchaindb.db import utils
 from .conftest import setup_database as _setup_database
 
diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py
new file mode 100644
index 00000000..f2f33dac
--- /dev/null
+++ b/tests/test_run_query_util.py
@@ -0,0 +1,122 @@
+from threading import Thread
+import pytest
+
+import rethinkdb as r
+
+from bigchaindb.commands.utils import start_temp_rethinkdb
+from bigchaindb.db.utils import Connection
+
+
+def test_run_a_simple_query():
+    conn = Connection()
+    query = r.expr('1')
+    assert conn.run(query) == '1'
+
+
+def test_raise_exception_when_max_tries():
+    class MockQuery:
+        def run(self, conn):
+            raise r.ReqlDriverError('mock')
+
+    conn = Connection()
+
+    with pytest.raises(r.ReqlDriverError):
+        conn.run(MockQuery())
+
+
+def test_reconnect_when_connection_lost():
+    import time
+
+    proc = None
+
+    def delayed_start():
+        nonlocal proc
+        time.sleep(1)
+        proc, _ = start_temp_rethinkdb(38015)
+
+    thread = Thread(target=delayed_start)
+    conn = Connection(port=38015)
+    query = r.expr('1')
+    thread.start()
+    try:
+        assert conn.run(query) == '1'
+    finally:
+        # `delayed_start` may not have assigned `proc` yet when the query
+        # returns, so wait for the thread before touching `proc`.
+        thread.join()
+        if proc is not None:
+            proc.terminate()
+            proc.wait()
+
+
+def test_changefeed_reconnects_when_connection_lost(monkeypatch):
+    import os
+    import shutil
+    import time
+    import tempfile
+    import multiprocessing as mp
+
+    import bigchaindb
+    from bigchaindb.pipelines.utils import ChangeFeed
+
+    dbport = 38015
+    dbname = 'test_' + str(os.getpid())
+    directory = tempfile.mkdtemp()
+
+    monkeypatch.setitem(bigchaindb.config, 'database', {
+        'host': 'localhost',
+        'port': dbport,
+        'name': dbname
+    })
+
+    proc, _ = start_temp_rethinkdb(dbport, directory=directory)
+
+    try:
+        # prepare DB and table
+        conn = r.connect(port=dbport)
+        r.db_create(dbname).run(conn)
+        r.db(dbname).table_create('cat_facts').run(conn)
+
+        # initialize ChangeFeed and put it in a thread; the thread is a
+        # daemon because `run_forever` loops indefinitely and would
+        # otherwise keep the test process alive
+        changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT)
+        changefeed.outqueue = mp.Queue()
+        t_changefeed = Thread(target=changefeed.run_forever, daemon=True)
+
+        t_changefeed.start()
+        time.sleep(1)
+
+        # insert some records in the table to start generating
+        # events that changefeed will put in `outqueue`
+        r.db(dbname).table('cat_facts').insert({
+            'fact': 'A group of cats is called a clowder.'
+        }).run(conn)
+
+        # the event should be in the outqueue
+        fact = changefeed.outqueue.get()['fact']
+        assert fact == 'A group of cats is called a clowder.'
+
+        # stop the DB process
+        proc.terminate()
+        proc.wait()
+
+        assert t_changefeed.is_alive() is True
+
+        proc, _ = start_temp_rethinkdb(dbport, directory=directory)
+
+        time.sleep(2)
+
+        conn = r.connect(port=dbport)
+        r.db(dbname).table('cat_facts').insert({
+            'fact': 'Cats sleep 70% of their lives.'
+        }).run(conn)
+
+        fact = changefeed.outqueue.get()['fact']
+        assert fact == 'Cats sleep 70% of their lives.'
+    finally:
+        # stop the DB process that is still running and remove its data
+        if proc.poll() is None:
+            proc.terminate()
+        proc.wait()
+        shutil.rmtree(directory, ignore_errors=True)