From d06e8b91d83c386d400b0bc61f09a8b3207e02ef Mon Sep 17 00:00:00 2001 From: vrde Date: Tue, 6 Sep 2016 23:51:12 +0200 Subject: [PATCH 1/8] Add Connection class to manage connections --- bigchaindb/commands/bigchain.py | 2 +- bigchaindb/commands/utils.py | 49 +++++++++++---- bigchaindb/core.py | 6 +- bigchaindb/db/utils.py | 35 +++++++++++ bigchaindb/pipelines/utils.py | 14 ++++- tests/db/test_utils.py | 1 - tests/test_run_query_util.py | 106 ++++++++++++++++++++++++++++++++ 7 files changed, 195 insertions(+), 18 deletions(-) create mode 100644 tests/test_run_query_util.py diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index 1943167b..a4375c8e 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -156,7 +156,7 @@ def run_start(args): if args.start_rethinkdb: try: - proc = utils.start_rethinkdb() + proc, port = utils.start_rethinkdb() except StartupError as e: sys.exit('Error starting RethinkDB, reason is: {}'.format(e)) logger.info('RethinkDB started with PID %s' % proc.pid) diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py index dc035de6..02f2a997 100644 --- a/bigchaindb/commands/utils.py +++ b/bigchaindb/commands/utils.py @@ -5,6 +5,7 @@ for ``argparse.ArgumentParser``. import argparse import multiprocessing as mp import subprocess +import tempfile import rethinkdb as r @@ -14,41 +15,63 @@ from bigchaindb import db from bigchaindb.version import __version__ -def start_rethinkdb(): +def start_temp_rethinkdb(port=0, directory=None): + directory = directory or tempfile.mkdtemp() + + extra_opts = ['--cluster-port', '0', + '--driver-port', str(port), + '--no-http-admin', + '--directory', directory] + + return start_rethinkdb(wait_for_db=False, extra_opts=extra_opts) + + +def start_rethinkdb(wait_for_db=True, extra_opts=None): """Start RethinkDB as a child process and wait for it to be available. + Args: + wait_for_db (bool): wait for the database to be ready + extra_opts (list): a list of extra options to be used when + starting the db + Raises: ``bigchaindb.exceptions.StartupError`` if RethinkDB cannot be started. """ - proc = subprocess.Popen(['rethinkdb', '--bind', 'all'], + if not extra_opts: + extra_opts = [] + + proc = subprocess.Popen(['rethinkdb', '--bind', 'all'] + extra_opts, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) dbname = bigchaindb.config['database']['name'] line = '' + port = None for line in proc.stdout: + if line.startswith('Listening for client driver'): + port = int(line.split()[-1]) if line.startswith('Server ready'): # FIXME: seems like tables are not ready when the server is ready, # that's why we need to query RethinkDB to know the state # of the database. This code assumes the tables are ready # when the database is ready. This seems a valid assumption. - try: - conn = db.get_conn() - # Before checking if the db is ready, we need to query - # the server to check if it contains that db - if r.db_list().contains(dbname).run(conn): - r.db(dbname).wait().run(conn) - except (r.ReqlOpFailedError, r.ReqlDriverError) as exc: - raise StartupError('Error waiting for the database `{}` ' - 'to be ready'.format(dbname)) from exc - - return proc + if wait_for_db: + try: + conn = db.get_conn() + # Before checking if the db is ready, we need to query + # the server to check if it contains that db + if r.db_list().contains(dbname).run(conn): + r.db(dbname).wait().run(conn) + except (r.ReqlOpFailedError, r.ReqlDriverError) as exc: + raise StartupError('Error waiting for the database `{}` ' + 'to be ready'.format(dbname)) from exc + return proc, port # We are here when we exhaust the stdout of the process. # The last `line` contains info about the error. diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 95cccfc7..3e344beb 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -8,6 +8,7 @@ import rethinkdb as r import rapidjson import bigchaindb +from bigchaindb.db.utils import Connection from bigchaindb import config_utils, crypto, exceptions, util @@ -64,6 +65,7 @@ class Bigchain(object): raise exceptions.KeypairNotFoundException() self._conn = None + self.connection = Connection(host=self.host, port=self.port, db=self.dbname) @property def conn(self): @@ -262,8 +264,8 @@ class Bigchain(object): When creating a transaction one of the optional arguments is the `payload`. The payload is a generic dict that contains information about the digital asset. - To make it easy to query the bigchain for that digital asset we create a UUID for the payload and - store it with the transaction. This makes it easy for developers to keep track of their digital + To make it easy to query the bigchain for that digital asset we create a UUID for the payload and + store it with the transaction. This makes it easy for developers to keep track of their digital assets in bigchain. Args: diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py index 603f143c..627782e9 100644 --- a/bigchaindb/db/utils.py +++ b/bigchaindb/db/utils.py @@ -1,5 +1,6 @@ """Utils to initialize and drop the database.""" +import time import logging import rethinkdb as r @@ -11,6 +12,40 @@ from bigchaindb import exceptions logger = logging.getLogger(__name__) +class Connection: + + def __init__(self, host=None, port=None, db=None, max_tries=3): + self.host = host or bigchaindb.config['database']['host'] + self.port = port or bigchaindb.config['database']['port'] + self.db = db or bigchaindb.config['database']['name'] + self.max_tries = max_tries + self.conn = None + + def run(self, query): + if self.conn is None: + self._connect() + + for i in range(self.max_tries): + try: + return query.run(self.conn) + except r.ReqlDriverError as exc: + if i + 1 == self.max_tries: + raise + else: + self._connect() + + def _connect(self): + for i in range(self.max_tries): + try: + self.conn = r.connect(host=self.host, port=self.port, + db=self.db) + except r.ReqlDriverError as exc: + if i + 1 == self.max_tries: + raise + else: + time.sleep(2**i) + + def get_conn(): '''Get the connection to the database.''' diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index 22a5f9bc..ffa8fc7b 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -1,12 +1,17 @@ """Utility classes and functions to work with the pipelines.""" +import time import rethinkdb as r +import logging from multipipes import Node from bigchaindb import Bigchain +logger = logging.getLogger(__name__) + + class ChangeFeed(Node): """This class wraps a RethinkDB changefeed adding a `prefeed`. @@ -47,8 +52,15 @@ class ChangeFeed(Node): for element in self.prefeed: self.outqueue.put(element) - for change in r.table(self.table).changes().run(self.bigchain.conn): + while True: + try: + self.run_changefeed() + except (r.ReqlDriverError, r.ReqlOpFailedError) as exc: + logger.exception(exc) + time.sleep(1) + def run_changefeed(self): + for change in self.bigchain.connection.run(r.table(self.table).changes()): is_insert = change['old_val'] is None is_delete = change['new_val'] is None is_update = not is_insert and not is_delete diff --git a/tests/db/test_utils.py b/tests/db/test_utils.py index 0299224c..957373f9 100644 --- a/tests/db/test_utils.py +++ b/tests/db/test_utils.py @@ -4,7 +4,6 @@ import pytest import rethinkdb as r import bigchaindb -from bigchaindb import util from bigchaindb.db import utils from .conftest import setup_database as _setup_database diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py new file mode 100644 index 00000000..f2f33dac --- /dev/null +++ b/tests/test_run_query_util.py @@ -0,0 +1,106 @@ +from threading import Thread +import pytest + +import rethinkdb as r + +from bigchaindb.commands.utils import start_temp_rethinkdb +from bigchaindb.db.utils import Connection + + +def test_run_a_simple_query(): + conn = Connection() + query = r.expr('1') + assert conn.run(query) == '1' + + +def test_raise_exception_when_max_tries(): + class MockQuery: + def run(self, conn): + raise r.ReqlDriverError('mock') + + conn = Connection() + + with pytest.raises(r.ReqlDriverError): + conn.run(MockQuery()) + + +def test_reconnect_when_connection_lost(): + import time + + proc = None + + def delayed_start(): + nonlocal proc + time.sleep(1) + proc, _ = start_temp_rethinkdb(38015) + + thread = Thread(target=delayed_start) + conn = Connection(port=38015) + query = r.expr('1') + thread.start() + assert conn.run(query) == '1' + proc.terminate() + proc.wait() + + +def test_changefeed_reconnects_when_connection_lost(monkeypatch): + import os + import time + import tempfile + import multiprocessing as mp + + import bigchaindb + from bigchaindb.pipelines.utils import ChangeFeed + + dbport = 38015 + dbname = 'test_' + str(os.getpid()) + directory = tempfile.mkdtemp() + + monkeypatch.setitem(bigchaindb.config, 'database', { + 'host': 'localhost', + 'port': dbport, + 'name': dbname + }) + + proc, _ = start_temp_rethinkdb(dbport, directory=directory) + + # prepare DB and table + conn = r.connect(port=dbport) + r.db_create(dbname).run(conn) + r.db(dbname).table_create('cat_facts').run(conn) + + # initialize ChangeFeed and put it in a thread + changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT) + changefeed.outqueue = mp.Queue() + t_changefeed = Thread(target=changefeed.run_forever) + + t_changefeed.start() + time.sleep(1) + + # insert some records in the table to start generating + # events that changefeed will put in `outqueue` + r.db(dbname).table('cat_facts').insert({ + 'fact': 'A group of cats is called a clowder.' + }).run(conn) + + # the event should be in the outqueue + fact = changefeed.outqueue.get()['fact'] + assert fact == 'A group of cats is called a clowder.' + + # stop the DB process + proc.terminate() + proc.wait() + + assert t_changefeed.is_alive() is True + + proc, _ = start_temp_rethinkdb(dbport, directory=directory) + + time.sleep(2) + + conn = r.connect(port=dbport) + r.db(dbname).table('cat_facts').insert({ + 'fact': 'Cats sleep 70% of their lives.' + }).run(conn) + + fact = changefeed.outqueue.get()['fact'] + assert fact == 'Cats sleep 70% of their lives.' From 1d073ee7061c32cb67cc676a0e388084635319e9 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 00:40:17 +0200 Subject: [PATCH 2/8] Use new Connection class to run queries --- bigchaindb/core.py | 116 +++++++++++++++++++--------------- bigchaindb/pipelines/utils.py | 1 + tests/pipelines/test_utils.py | 13 ++-- tests/test_commands.py | 8 ++- 4 files changed, 77 insertions(+), 61 deletions(-) diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 3e344beb..7dd9ba1e 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -140,7 +140,9 @@ class Bigchain(object): signed_transaction.update({'assignee': assignee}) # write to the backlog - response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn) + response = self.connection.run( + r.table('backlog') + .insert(signed_transaction, durability=durability)) return response def get_transaction(self, txid, include_status=False): @@ -181,13 +183,16 @@ class Bigchain(object): break # Query the transaction in the target block and return - response = r.table('bigchain', read_mode=self.read_mode).get(target_block_id)\ - .get_field('block').get_field('transactions')\ - .filter(lambda tx: tx['id'] == txid).run(self.conn)[0] + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get(target_block_id) + .get_field('block') + .get_field('transactions') + .filter(lambda tx: tx['id'] == txid))[0] else: # Otherwise, check the backlog - response = r.table('backlog').get(txid).run(self.conn) + response = self.connection.run(r.table('backlog').get(txid)) if response: tx_status = self.TX_IN_BACKLOG @@ -221,8 +226,10 @@ class Bigchain(object): A list of blocks with with only election information """ # First, get information on all blocks which contain this transaction - response = r.table('bigchain', read_mode=self.read_mode).get_all(value, index=index)\ - .pluck('votes', 'id', {'block': ['voters']}).run(self.conn) + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(value, index=index)\ + .pluck('votes', 'id', {'block': ['voters']})) return list(response) @@ -275,11 +282,11 @@ class Bigchain(object): A list of transactions containing that payload. If no transaction exists with that payload it returns an empty list `[]` """ - cursor = r.table('bigchain', read_mode=self.read_mode) \ - .get_all(payload_uuid, index='payload_uuid') \ - .concat_map(lambda block: block['block']['transactions']) \ - .filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid) \ - .run(self.conn) + cursor = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(payload_uuid, index='payload_uuid') + .concat_map(lambda block: block['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid)) transactions = list(cursor) return transactions @@ -298,11 +305,11 @@ class Bigchain(object): """ # checks if an input was already spent # checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...} - response = r.table('bigchain', read_mode=self.read_mode)\ - .concat_map(lambda doc: doc['block']['transactions'])\ - .filter(lambda transaction: transaction['transaction']['fulfillments'] - .contains(lambda fulfillment: fulfillment['input'] == tx_input))\ - .run(self.conn) + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda transaction: transaction['transaction']['fulfillments'] + .contains(lambda fulfillment: fulfillment['input'] == tx_input))) transactions = list(response) @@ -337,12 +344,12 @@ class Bigchain(object): """ # get all transactions in which owner is in the `owners_after` list - response = r.table('bigchain', read_mode=self.read_mode) \ - .concat_map(lambda doc: doc['block']['transactions']) \ - .filter(lambda tx: tx['transaction']['conditions'] + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .concat_map(lambda doc: doc['block']['transactions']) + .filter(lambda tx: tx['transaction']['conditions'] .contains(lambda c: c['owners_after'] - .contains(owner))) \ - .run(self.conn) + .contains(owner)))) owned = [] for tx in response: @@ -486,8 +493,9 @@ class Bigchain(object): but the vote is invalid. """ - votes = list(r.table('votes', read_mode=self.read_mode)\ - .get_all([block['id'], self.me], index='block_and_voter').run(self.conn)) + votes = list(self.connection.run( + r.table('votes', read_mode=self.read_mode) + .get_all([block['id'], self.me], index='block_and_voter'))) if len(votes) > 1: raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}' @@ -528,12 +536,15 @@ class Bigchain(object): """ block_serialized = rapidjson.dumps(block) - r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn) + self.connection.run( + r.table('bigchain') + .insert(r.json(block_serialized), durability=durability)) # TODO: Decide if we need this method def transaction_exists(self, transaction_id): - response = r.table('bigchain', read_mode=self.read_mode)\ - .get_all(transaction_id, index='transaction_id').run(self.conn) + response = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get_all(transaction_id, index='transaction_id')) return True if len(response.items) > 0 else False def prepare_genesis_block(self): @@ -560,7 +571,9 @@ class Bigchain(object): # 2. create the block with one transaction # 3. write the block to the bigchain - blocks_count = r.table('bigchain', read_mode=self.read_mode).count().run(self.conn) + blocks_count = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .count()) if blocks_count: raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block') @@ -606,30 +619,30 @@ class Bigchain(object): def write_vote(self, vote): """Write the vote to the database.""" - r.table('votes') \ - .insert(vote) \ - .run(self.conn) + self.connection.run( + r.table('votes') + .insert(vote)) def get_last_voted_block(self): """Returns the last block that this node voted on.""" try: # get the latest value for the vote timestamp (over all votes) - max_timestamp = r.table('votes', read_mode=self.read_mode) \ - .filter(r.row['node_pubkey'] == self.me) \ - .max(r.row['vote']['timestamp']) \ - .run(self.conn)['vote']['timestamp'] + max_timestamp = self.connection.run( + r.table('votes', read_mode=self.read_mode) + .filter(r.row['node_pubkey'] == self.me) + .max(r.row['vote']['timestamp']))['vote']['timestamp'] - last_voted = list(r.table('votes', read_mode=self.read_mode) \ - .filter(r.row['vote']['timestamp'] == max_timestamp) \ - .filter(r.row['node_pubkey'] == self.me) \ - .run(self.conn)) + last_voted = list(self.connection.run( + r.table('votes', read_mode=self.read_mode) + .filter(r.row['vote']['timestamp'] == max_timestamp) + .filter(r.row['node_pubkey'] == self.me))) except r.ReqlNonExistenceError: # return last vote if last vote exists else return Genesis block - return list(r.table('bigchain', read_mode=self.read_mode) - .filter(util.is_genesis_block) - .run(self.conn))[0] + return list(self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .filter(util.is_genesis_block)))[0] # Now the fun starts. Since the resolution of timestamp is a second, # we might have more than one vote per timestamp. If this is the case @@ -661,19 +674,21 @@ class Bigchain(object): except KeyError: break - res = r.table('bigchain', read_mode=self.read_mode).get(last_block_id).run(self.conn) + res = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .get(last_block_id)) return res def get_unvoted_blocks(self): """Return all the blocks that has not been voted by this node.""" - unvoted = r.table('bigchain', read_mode=self.read_mode) \ - .filter(lambda block: r.table('votes', read_mode=self.read_mode) + unvoted = self.connection.run( + r.table('bigchain', read_mode=self.read_mode) + .filter(lambda block: r.table('votes', read_mode=self.read_mode) .get_all([block['id'], self.me], index='block_and_voter') - .is_empty()) \ - .order_by(r.asc(r.row['block']['timestamp'])) \ - .run(self.conn) + .is_empty()) + .order_by(r.asc(r.row['block']['timestamp']))) # FIXME: I (@vrde) don't like this solution. Filtering should be done at a # database level. Solving issue #444 can help untangling the situation @@ -684,9 +699,8 @@ class Bigchain(object): def block_election_status(self, block): """Tally the votes on a block, and return the status: valid, invalid, or undecided.""" - votes = r.table('votes', read_mode=self.read_mode) \ - .between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter') \ - .run(self.conn) + votes = self.connection.run(r.table('votes', read_mode=self.read_mode) + .between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter')) votes = list(votes) diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index ffa8fc7b..a6b74dfc 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -55,6 +55,7 @@ class ChangeFeed(Node): while True: try: self.run_changefeed() + break except (r.ReqlDriverError, r.ReqlOpFailedError) as exc: logger.exception(exc) time.sleep(1) diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py index ebef38c4..459ae7f0 100644 --- a/tests/pipelines/test_utils.py +++ b/tests/pipelines/test_utils.py @@ -1,8 +1,7 @@ from unittest.mock import patch -import rethinkdb - from multipipes import Pipe +from bigchaindb.db.utils import Connection from bigchaindb.pipelines.utils import ChangeFeed @@ -18,7 +17,7 @@ MOCK_CHANGEFEED_DATA = [{ }] -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_insert(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.INSERT) @@ -28,7 +27,7 @@ def test_changefeed_insert(mock_run): assert outpipe.qsize() == 0 -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_delete(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.DELETE) @@ -38,7 +37,7 @@ def test_changefeed_delete(mock_run): assert outpipe.qsize() == 0 -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_update(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE) @@ -49,7 +48,7 @@ def test_changefeed_update(mock_run): assert outpipe.qsize() == 0 -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_multiple_operations(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE) @@ -61,7 +60,7 @@ def test_changefeed_multiple_operations(mock_run): assert outpipe.qsize() == 0 -@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA) +@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA) def test_changefeed_prefeed(mock_run): outpipe = Pipe() changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3]) diff --git a/tests/test_commands.py b/tests/test_commands.py index c515479f..8a22c160 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -67,7 +67,7 @@ def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_in run_start(args) -@patch('bigchaindb.commands.utils.start_rethinkdb') +@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=(Mock(), 2)) def test_bigchain_run_start_with_rethinkdb(mock_start_rethinkdb, mock_run_configure, mock_processes_start, @@ -213,8 +213,10 @@ def test_run_configure_when_config_does_exist(monkeypatch, @patch('subprocess.Popen') def test_start_rethinkdb_returns_a_process_when_successful(mock_popen): from bigchaindb.commands import utils - mock_popen.return_value = Mock(stdout=['Server ready']) - assert utils.start_rethinkdb() is mock_popen.return_value + mock_popen.return_value = Mock(stdout=[ + 'Listening for client driver 1234', + 'Server ready']) + assert utils.start_rethinkdb() == (mock_popen.return_value, 1234) @patch('subprocess.Popen') From 7097efaa33299fa85bde1ae77d062ba6782233d6 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 00:47:22 +0200 Subject: [PATCH 3/8] Daemonize thread to make sure test exits --- tests/test_run_query_util.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index f2f33dac..3c37e140 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -72,7 +72,7 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): # initialize ChangeFeed and put it in a thread changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT) changefeed.outqueue = mp.Queue() - t_changefeed = Thread(target=changefeed.run_forever) + t_changefeed = Thread(target=changefeed.run_forever, daemon=True) t_changefeed.start() time.sleep(1) @@ -104,3 +104,7 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch): fact = changefeed.outqueue.get()['fact'] assert fact == 'Cats sleep 70% of their lives.' + + # stop the DB process + proc.terminate() + proc.wait() From 52243e12715e08db193daa03846b794d6e95fde9 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 00:54:19 +0200 Subject: [PATCH 4/8] Use new connection class in pipeline --- bigchaindb/pipelines/block.py | 22 ++++++++++++---------- bigchaindb/pipelines/election.py | 7 ++++--- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py index 59375c57..0e9f8d04 100644 --- a/bigchaindb/pipelines/block.py +++ b/bigchaindb/pipelines/block.py @@ -53,10 +53,10 @@ class Block: Returns: The transaction. """ - r.table('backlog')\ - .get(tx['id'])\ - .delete(durability='hard')\ - .run(self.bigchain.conn) + self.bigchain.connection.run( + r.table('backlog') + .get(tx['id']) + .delete(durability='hard')) return tx @@ -118,12 +118,14 @@ def initial(): b = Bigchain() - rs = r.table('backlog')\ - .between([b.me, r.minval], - [b.me, r.maxval], - index='assignee__transaction_timestamp')\ - .order_by(index=r.asc('assignee__transaction_timestamp'))\ - .run(b.conn) + rs = b.connection.run( + r.table('backlog') + .between( + [b.me, r.minval], + [b.me, r.maxval], + index='assignee__transaction_timestamp') + .order_by(index=r.asc('assignee__transaction_timestamp'))) + return rs diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 7a0e114c..cf464e5c 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -25,9 +25,10 @@ class Election: """ Checks if block has enough invalid votes to make a decision """ - next_block = r.table('bigchain')\ - .get(next_vote['vote']['voting_for_block'])\ - .run(self.bigchain.conn) + next_block = self.bigchain.connection.run( + r.table('bigchain') + .get(next_vote['vote']['voting_for_block'])) + if self.bigchain.block_election_status(next_block) == self.bigchain.BLOCK_INVALID: return next_block From 65bc86f06e05f28f5239e8b806286975d57011aa Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 01:01:58 +0200 Subject: [PATCH 5/8] Add docstring to Connection class --- bigchaindb/db/utils.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py index 627782e9..84aabdd4 100644 --- a/bigchaindb/db/utils.py +++ b/bigchaindb/db/utils.py @@ -13,8 +13,23 @@ logger = logging.getLogger(__name__) class Connection: + """This class is a proxy to run queries against the database, + it is: + - lazy, since it creates a connection only when needed + - resilient, because before raising exceptions it tries + more times to run the query or open a connection. + """ def __init__(self, host=None, port=None, db=None, max_tries=3): + """Create a new Connection instance. + + Args: + host (str, optional): the host to connect to. + port (int, optional): the port to connect to. + db (str, optional): the database to use. + max_tries (int, optional): how many tries before giving up. + """ + self.host = host or bigchaindb.config['database']['host'] self.port = port or bigchaindb.config['database']['port'] self.db = db or bigchaindb.config['database']['name'] @@ -22,6 +37,12 @@ class Connection: self.conn = None def run(self, query): + """Run a query. + + Args: + query: the RethinkDB query. + """ + if self.conn is None: self._connect() From 39228be454325841458c532220c27edabd358927 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 01:51:18 +0200 Subject: [PATCH 6/8] Remove dependency on external RethinkDB instance --- bigchaindb/pipelines/utils.py | 5 +- tests/test_run_query_util.py | 89 +++++++++++++++++------------------ 2 files changed, 46 insertions(+), 48 deletions(-) diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index a6b74dfc..8f4c3b99 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -29,7 +29,7 @@ class ChangeFeed(Node): DELETE = 2 UPDATE = 4 - def __init__(self, table, operation, prefeed=None): + def __init__(self, table, operation, prefeed=None, bigchain=None): """Create a new RethinkDB ChangeFeed. Args: @@ -40,13 +40,14 @@ class ChangeFeed(Node): (e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``) prefeed (iterable): whatever set of data you want to be published first. + bigchain (``Bigchain``): the bigchain instance to use (can be None). """ super().__init__(name='changefeed') self.prefeed = prefeed if prefeed else [] self.table = table self.operation = operation - self.bigchain = Bigchain() + self.bigchain = bigchain or Bigchain() def run_forever(self): for element in self.prefeed: diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index 3c37e140..bac63048 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -1,3 +1,4 @@ +from unittest.mock import patch from threading import Thread import pytest @@ -26,85 +27,81 @@ def test_raise_exception_when_max_tries(): def test_reconnect_when_connection_lost(): import time + import rethinkdb as r - proc = None + def raise_exception(*args, **kwargs): + raise r.ReqlDriverError('mock') + + conn = Connection() + original_connect = r.connect + r.connect = raise_exception def delayed_start(): - nonlocal proc time.sleep(1) - proc, _ = start_temp_rethinkdb(38015) + r.connect = original_connect thread = Thread(target=delayed_start) - conn = Connection(port=38015) query = r.expr('1') thread.start() assert conn.run(query) == '1' - proc.terminate() - proc.wait() def test_changefeed_reconnects_when_connection_lost(monkeypatch): - import os import time - import tempfile import multiprocessing as mp - import bigchaindb + from bigchaindb import Bigchain from bigchaindb.pipelines.utils import ChangeFeed - dbport = 38015 - dbname = 'test_' + str(os.getpid()) - directory = tempfile.mkdtemp() + class MockConnection: + tries = 0 - monkeypatch.setitem(bigchaindb.config, 'database', { - 'host': 'localhost', - 'port': dbport, - 'name': dbname - }) + def run(self, *args, **kwargs): + return self - proc, _ = start_temp_rethinkdb(dbport, directory=directory) + def __iter__(self): + return self - # prepare DB and table - conn = r.connect(port=dbport) - r.db_create(dbname).run(conn) - r.db(dbname).table_create('cat_facts').run(conn) + def __next__(self): + self.tries += 1 + if self.tries == 1: + raise r.ReqlDriverError('mock') + elif self.tries == 2: + return { 'new_val': { 'fact': 'A group of cats is called a clowder.' }, + 'old_val': None } + if self.tries == 3: + raise r.ReqlDriverError('mock') + elif self.tries == 4: + return { 'new_val': {'fact': 'Cats sleep 70% of their lives.' }, + 'old_val': None } + else: + time.sleep(10) - # initialize ChangeFeed and put it in a thread - changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT) + + bigchain = Bigchain() + bigchain.connection = MockConnection() + changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT, + bigchain=bigchain) changefeed.outqueue = mp.Queue() t_changefeed = Thread(target=changefeed.run_forever, daemon=True) t_changefeed.start() time.sleep(1) + # try 1: MockConnection raises an error that will stop the + # ChangeFeed instance from iterating for 1 second. - # insert some records in the table to start generating - # events that changefeed will put in `outqueue` - r.db(dbname).table('cat_facts').insert({ - 'fact': 'A group of cats is called a clowder.' - }).run(conn) - - # the event should be in the outqueue + # try 2: MockConnection releases a new record. The new record + # will be put in the outqueue of the ChangeFeed instance. fact = changefeed.outqueue.get()['fact'] assert fact == 'A group of cats is called a clowder.' - # stop the DB process - proc.terminate() - proc.wait() - + # try 3: MockConnection raises an error that will stop the + # ChangeFeed instance from iterating for 1 second. assert t_changefeed.is_alive() is True - proc, _ = start_temp_rethinkdb(dbport, directory=directory) - time.sleep(2) - - conn = r.connect(port=dbport) - r.db(dbname).table('cat_facts').insert({ - 'fact': 'Cats sleep 70% of their lives.' - }).run(conn) + # try 4: MockConnection releases a new record. The new record + # will be put in the outqueue of the ChangeFeed instance. fact = changefeed.outqueue.get()['fact'] assert fact == 'Cats sleep 70% of their lives.' - - # stop the DB process - proc.terminate() - proc.wait() From b8e6b0b830c0674deb322734971d34f772edd6a1 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 02:06:52 +0200 Subject: [PATCH 7/8] Add code coverage for start_temp_rethinkdb --- tests/test_commands.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_commands.py b/tests/test_commands.py index 8a22c160..d7cce77f 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -228,6 +228,15 @@ def test_start_rethinkdb_exits_when_cannot_start(mock_popen): utils.start_rethinkdb() +@patch('subprocess.Popen') +def test_start_temp_rethinkdb_returns_a_process_when_successful(mock_popen): + from bigchaindb.commands import utils + mock_popen.return_value = Mock(stdout=[ + 'Listening for client driver 1234', + 'Server ready']) + assert utils.start_temp_rethinkdb() == (mock_popen.return_value, 1234) + + @patch('rethinkdb.ast.Table.reconfigure') def test_set_shards(mock_reconfigure, monkeypatch, b): from bigchaindb.commands.bigchain import run_set_shards From b9fba73c143deb40c3f554906024f90be3f913dd Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 17:32:34 +0200 Subject: [PATCH 8/8] Remove unused code --- bigchaindb/commands/bigchain.py | 2 +- bigchaindb/commands/utils.py | 45 ++++++++++----------------------- bigchaindb/core.py | 2 +- tests/test_commands.py | 13 ++-------- tests/test_run_query_util.py | 2 -- 5 files changed, 17 insertions(+), 47 deletions(-) diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py index a4375c8e..1943167b 100644 --- a/bigchaindb/commands/bigchain.py +++ b/bigchaindb/commands/bigchain.py @@ -156,7 +156,7 @@ def run_start(args): if args.start_rethinkdb: try: - proc, port = utils.start_rethinkdb() + proc = utils.start_rethinkdb() except StartupError as e: sys.exit('Error starting RethinkDB, reason is: {}'.format(e)) logger.info('RethinkDB started with PID %s' % proc.pid) diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py index 02f2a997..ac2c4d36 100644 --- a/bigchaindb/commands/utils.py +++ b/bigchaindb/commands/utils.py @@ -5,7 +5,6 @@ for ``argparse.ArgumentParser``. import argparse import multiprocessing as mp import subprocess -import tempfile import rethinkdb as r @@ -15,18 +14,7 @@ from bigchaindb import db from bigchaindb.version import __version__ -def start_temp_rethinkdb(port=0, directory=None): - directory = directory or tempfile.mkdtemp() - - extra_opts = ['--cluster-port', '0', - '--driver-port', str(port), - '--no-http-admin', - '--directory', directory] - - return start_rethinkdb(wait_for_db=False, extra_opts=extra_opts) - - -def start_rethinkdb(wait_for_db=True, extra_opts=None): +def start_rethinkdb(): """Start RethinkDB as a child process and wait for it to be available. @@ -40,38 +28,31 @@ def start_rethinkdb(wait_for_db=True, extra_opts=None): be started. """ - if not extra_opts: - extra_opts = [] - - proc = subprocess.Popen(['rethinkdb', '--bind', 'all'] + extra_opts, + proc = subprocess.Popen(['rethinkdb', '--bind', 'all'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) dbname = bigchaindb.config['database']['name'] line = '' - port = None for line in proc.stdout: - if line.startswith('Listening for client driver'): - port = int(line.split()[-1]) if line.startswith('Server ready'): + # FIXME: seems like tables are not ready when the server is ready, # that's why we need to query RethinkDB to know the state # of the database. This code assumes the tables are ready # when the database is ready. This seems a valid assumption. - - if wait_for_db: - try: - conn = db.get_conn() - # Before checking if the db is ready, we need to query - # the server to check if it contains that db - if r.db_list().contains(dbname).run(conn): - r.db(dbname).wait().run(conn) - except (r.ReqlOpFailedError, r.ReqlDriverError) as exc: - raise StartupError('Error waiting for the database `{}` ' - 'to be ready'.format(dbname)) from exc - return proc, port + try: + conn = db.get_conn() + # Before checking if the db is ready, we need to query + # the server to check if it contains that db + if r.db_list().contains(dbname).run(conn): + r.db(dbname).wait().run(conn) + except (r.ReqlOpFailedError, r.ReqlDriverError) as exc: + raise StartupError('Error waiting for the database `{}` ' + 'to be ready'.format(dbname)) from exc + return proc # We are here when we exhaust the stdout of the process. # The last `line` contains info about the error. diff --git a/bigchaindb/core.py b/bigchaindb/core.py index 7dd9ba1e..2248be5e 100644 --- a/bigchaindb/core.py +++ b/bigchaindb/core.py @@ -228,7 +228,7 @@ class Bigchain(object): # First, get information on all blocks which contain this transaction response = self.connection.run( r.table('bigchain', read_mode=self.read_mode) - .get_all(value, index=index)\ + .get_all(value, index=index) .pluck('votes', 'id', {'block': ['voters']})) return list(response) diff --git a/tests/test_commands.py b/tests/test_commands.py index d7cce77f..87b05b89 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -67,7 +67,7 @@ def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_in run_start(args) -@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=(Mock(), 2)) +@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=Mock()) def test_bigchain_run_start_with_rethinkdb(mock_start_rethinkdb, mock_run_configure, mock_processes_start, @@ -216,7 +216,7 @@ def test_start_rethinkdb_returns_a_process_when_successful(mock_popen): mock_popen.return_value = Mock(stdout=[ 'Listening for client driver 1234', 'Server ready']) - assert utils.start_rethinkdb() == (mock_popen.return_value, 1234) + assert utils.start_rethinkdb() is mock_popen.return_value @patch('subprocess.Popen') @@ -228,15 +228,6 @@ def test_start_rethinkdb_exits_when_cannot_start(mock_popen): utils.start_rethinkdb() -@patch('subprocess.Popen') -def test_start_temp_rethinkdb_returns_a_process_when_successful(mock_popen): - from bigchaindb.commands import utils - mock_popen.return_value = Mock(stdout=[ - 'Listening for client driver 1234', - 'Server ready']) - assert utils.start_temp_rethinkdb() == (mock_popen.return_value, 1234) - - @patch('rethinkdb.ast.Table.reconfigure') def test_set_shards(mock_reconfigure, monkeypatch, b): from bigchaindb.commands.bigchain import run_set_shards diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index bac63048..f81bd232 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -1,10 +1,8 @@ -from unittest.mock import patch from threading import Thread import pytest import rethinkdb as r -from bigchaindb.commands.utils import start_temp_rethinkdb from bigchaindb.db.utils import Connection