From d06e8b91d83c386d400b0bc61f09a8b3207e02ef Mon Sep 17 00:00:00 2001
From: vrde
Date: Tue, 6 Sep 2016 23:51:12 +0200
Subject: [PATCH 01/18] Add Connection class to manage connections

---
 bigchaindb/commands/bigchain.py |   2 +-
 bigchaindb/commands/utils.py    |  49 +++++++++++----
 bigchaindb/core.py              |   6 +-
 bigchaindb/db/utils.py          |  35 +++++++++++
 bigchaindb/pipelines/utils.py   |  14 ++++-
 tests/db/test_utils.py          |   1 -
 tests/test_run_query_util.py    | 106 ++++++++++++++++++++++++++++++++
 7 files changed, 195 insertions(+), 18 deletions(-)
 create mode 100644 tests/test_run_query_util.py

diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py
index 1943167b..a4375c8e 100644
--- a/bigchaindb/commands/bigchain.py
+++ b/bigchaindb/commands/bigchain.py
@@ -156,7 +156,7 @@ def run_start(args):

     if args.start_rethinkdb:
         try:
-            proc = utils.start_rethinkdb()
+            proc, port = utils.start_rethinkdb()
         except StartupError as e:
             sys.exit('Error starting RethinkDB, reason is: {}'.format(e))
         logger.info('RethinkDB started with PID %s' % proc.pid)
diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py
index dc035de6..02f2a997 100644
--- a/bigchaindb/commands/utils.py
+++ b/bigchaindb/commands/utils.py
@@ -5,6 +5,7 @@ for ``argparse.ArgumentParser``.
 import argparse
 import multiprocessing as mp
 import subprocess
+import tempfile

 import rethinkdb as r

@@ -14,41 +15,63 @@ from bigchaindb import db
 from bigchaindb.version import __version__


-def start_rethinkdb():
+def start_temp_rethinkdb(port=0, directory=None):
+    directory = directory or tempfile.mkdtemp()
+
+    extra_opts = ['--cluster-port', '0',
+                  '--driver-port', str(port),
+                  '--no-http-admin',
+                  '--directory', directory]
+
+    return start_rethinkdb(wait_for_db=False, extra_opts=extra_opts)
+
+
+def start_rethinkdb(wait_for_db=True, extra_opts=None):
     """Start RethinkDB as a child process and wait for it to be
     available.

+    Args:
+        wait_for_db (bool): wait for the database to be ready
+        extra_opts (list): a list of extra options to be used when
+            starting the db
+
     Raises:
         ``bigchaindb.exceptions.StartupError`` if RethinkDB cannot
         be started.
     """

+    if not extra_opts:
+        extra_opts = []
+
-    proc = subprocess.Popen(['rethinkdb', '--bind', 'all'],
+    proc = subprocess.Popen(['rethinkdb', '--bind', 'all'] + extra_opts,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             universal_newlines=True)

     dbname = bigchaindb.config['database']['name']
     line = ''
+    port = None

     for line in proc.stdout:
+        if line.startswith('Listening for client driver'):
+            port = int(line.split()[-1])
         if line.startswith('Server ready'):

             # FIXME: seems like tables are not ready when the server is ready,
             # that's why we need to query RethinkDB to know the state
             # of the database. This code assumes the tables are ready
             # when the database is ready. This seems a valid assumption.

-            try:
-                conn = db.get_conn()
-                # Before checking if the db is ready, we need to query
-                # the server to check if it contains that db
-                if r.db_list().contains(dbname).run(conn):
-                    r.db(dbname).wait().run(conn)
-            except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
-                raise StartupError('Error waiting for the database `{}` '
-                                   'to be ready'.format(dbname)) from exc
-
-            return proc
+            if wait_for_db:
+                try:
+                    conn = db.get_conn()
+                    # Before checking if the db is ready, we need to query
+                    # the server to check if it contains that db
+                    if r.db_list().contains(dbname).run(conn):
+                        r.db(dbname).wait().run(conn)
+                except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
+                    raise StartupError('Error waiting for the database `{}` '
+                                       'to be ready'.format(dbname)) from exc
+            return proc, port

     # We are here when we exhaust the stdout of the process.
     # The last `line` contains info about the error.
diff --git a/bigchaindb/core.py b/bigchaindb/core.py
index 95cccfc7..3e344beb 100644
--- a/bigchaindb/core.py
+++ b/bigchaindb/core.py
@@ -8,6 +8,7 @@ import rethinkdb as r
 import rapidjson

 import bigchaindb
+from bigchaindb.db.utils import Connection
 from bigchaindb import config_utils, crypto, exceptions, util


@@ -64,6 +65,7 @@ class Bigchain(object):
             raise exceptions.KeypairNotFoundException()

         self._conn = None
+        self.connection = Connection(host=self.host, port=self.port, db=self.dbname)

     @property
     def conn(self):
@@ -262,8 +264,8 @@ class Bigchain(object):

         When creating a transaction one of the optional arguments is the `payload`. The payload is a generic
         dict that contains information about the digital asset.

-        To make it easy to query the bigchain for that digital asset we create a UUID for the payload and
-        store it with the transaction. This makes it easy for developers to keep track of their digital
+        To make it easy to query the bigchain for that digital asset we create a UUID for the payload and
+        store it with the transaction. This makes it easy for developers to keep track of their digital
         assets in bigchain.

         Args:
diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py
index 603f143c..627782e9 100644
--- a/bigchaindb/db/utils.py
+++ b/bigchaindb/db/utils.py
@@ -1,5 +1,6 @@
 """Utils to initialize and drop the database."""

+import time
 import logging

 import rethinkdb as r
@@ -11,6 +12,40 @@ from bigchaindb import exceptions
 logger = logging.getLogger(__name__)


+class Connection:
+
+    def __init__(self, host=None, port=None, db=None, max_tries=3):
+        self.host = host or bigchaindb.config['database']['host']
+        self.port = port or bigchaindb.config['database']['port']
+        self.db = db or bigchaindb.config['database']['name']
+        self.max_tries = max_tries
+        self.conn = None
+
+    def run(self, query):
+        if self.conn is None:
+            self._connect()
+
+        for i in range(self.max_tries):
+            try:
+                return query.run(self.conn)
+            except r.ReqlDriverError as exc:
+                if i + 1 == self.max_tries:
+                    raise
+                else:
+                    self._connect()
+
+    def _connect(self):
+        for i in range(self.max_tries):
+            try:
+                self.conn = r.connect(host=self.host, port=self.port,
+                                      db=self.db)
+            except r.ReqlDriverError as exc:
+                if i + 1 == self.max_tries:
+                    raise
+                else:
+                    time.sleep(2**i)
+
+
 def get_conn():
     '''Get the connection to the database.'''

diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py
index 22a5f9bc..ffa8fc7b 100644
--- a/bigchaindb/pipelines/utils.py
+++ b/bigchaindb/pipelines/utils.py
@@ -1,12 +1,17 @@
 """Utility classes and functions to work with the pipelines."""

+import time

 import rethinkdb as r
+import logging

 from multipipes import Node
 from bigchaindb import Bigchain


+logger = logging.getLogger(__name__)
+
+
 class ChangeFeed(Node):
     """This class wraps a RethinkDB changefeed adding a `prefeed`.

@@ -47,8 +52,15 @@ class ChangeFeed(Node):
         for element in self.prefeed:
             self.outqueue.put(element)

-        for change in r.table(self.table).changes().run(self.bigchain.conn):
+        while True:
+            try:
+                self.run_changefeed()
+            except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
+                logger.exception(exc)
+                time.sleep(1)

+    def run_changefeed(self):
+        for change in self.bigchain.connection.run(r.table(self.table).changes()):
             is_insert = change['old_val'] is None
             is_delete = change['new_val'] is None
             is_update = not is_insert and not is_delete
diff --git a/tests/db/test_utils.py b/tests/db/test_utils.py
index 0299224c..957373f9 100644
--- a/tests/db/test_utils.py
+++ b/tests/db/test_utils.py
@@ -4,7 +4,6 @@ import pytest
 import rethinkdb as r

 import bigchaindb
-from bigchaindb import util
 from bigchaindb.db import utils
 from .conftest import setup_database as _setup_database

diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py
new file mode 100644
index 00000000..f2f33dac
--- /dev/null
+++ b/tests/test_run_query_util.py
@@ -0,0 +1,106 @@
+from threading import Thread
+import pytest
+
+import rethinkdb as r
+
+from bigchaindb.commands.utils import start_temp_rethinkdb
+from bigchaindb.db.utils import Connection
+
+
+def test_run_a_simple_query():
+    conn = Connection()
+    query = r.expr('1')
+    assert conn.run(query) == '1'
+
+
+def test_raise_exception_when_max_tries():
+    class MockQuery:
+        def run(self, conn):
+            raise r.ReqlDriverError('mock')
+
+    conn = Connection()
+
+    with pytest.raises(r.ReqlDriverError):
+        conn.run(MockQuery())
+
+
+def test_reconnect_when_connection_lost():
+    import time
+
+    proc = None
+
+    def delayed_start():
+        nonlocal proc
+        time.sleep(1)
+        proc, _ = start_temp_rethinkdb(38015)
+
+    thread = Thread(target=delayed_start)
+    conn = Connection(port=38015)
+    query = r.expr('1')
+
+    thread.start()
+    assert conn.run(query) == '1'
+    proc.terminate()
+    proc.wait()
+
+
+def test_changefeed_reconnects_when_connection_lost(monkeypatch):
+    import os
+    import time
+    import tempfile
+    import multiprocessing as mp
+
+    import bigchaindb
+    from bigchaindb.pipelines.utils import ChangeFeed
+
+    dbport = 38015
+    dbname = 'test_' + str(os.getpid())
+    directory = tempfile.mkdtemp()
+
+    monkeypatch.setitem(bigchaindb.config, 'database', {
+        'host': 'localhost',
+        'port': dbport,
+        'name': dbname
+    })
+
+    proc, _ = start_temp_rethinkdb(dbport, directory=directory)
+
+    # prepare DB and table
+    conn = r.connect(port=dbport)
+    r.db_create(dbname).run(conn)
+    r.db(dbname).table_create('cat_facts').run(conn)
+
+    # initialize ChangeFeed and put it in a thread
+    changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT)
+    changefeed.outqueue = mp.Queue()
+    t_changefeed = Thread(target=changefeed.run_forever)
+
+    t_changefeed.start()
+    time.sleep(1)
+
+    # insert some records in the table to start generating
+    # events that changefeed will put in `outqueue`
+    r.db(dbname).table('cat_facts').insert({
+        'fact': 'A group of cats is called a clowder.'
+    }).run(conn)
+
+    # the event should be in the outqueue
+    fact = changefeed.outqueue.get()['fact']
+    assert fact == 'A group of cats is called a clowder.'
+
+    # stop the DB process
+    proc.terminate()
+    proc.wait()
+
+    assert t_changefeed.is_alive() is True
+
+    proc, _ = start_temp_rethinkdb(dbport, directory=directory)
+
+    time.sleep(2)
+
+    conn = r.connect(port=dbport)
+    r.db(dbname).table('cat_facts').insert({
+        'fact': 'Cats sleep 70% of their lives.'
+    }).run(conn)
+
+    fact = changefeed.outqueue.get()['fact']
+    assert fact == 'Cats sleep 70% of their lives.'
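The `Connection` class introduced above is the primitive the rest of the series builds on: it opens a socket lazily on the first query, and on a driver error it reconnects and retries, backing off exponentially (2**i seconds) between connection attempts. A minimal usage sketch, assuming a RethinkDB server reachable on localhost at the default driver port 28015 with a database named `bigchain` (the host, port and db values here are illustrative, not taken from the patch):

    import rethinkdb as r
    from bigchaindb.db.utils import Connection

    # Lazy: no socket is opened until the first run().
    conn = Connection(host='localhost', port=28015, db='bigchain')

    # Any object with a .run(conn) method works; every rethinkdb AST
    # node qualifies. On r.ReqlDriverError the proxy reconnects and
    # retries the query, up to max_tries attempts in total.
    print(conn.run(r.expr(1) + 1))  # -> 2
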
From 1d073ee7061c32cb67cc676a0e388084635319e9 Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 7 Sep 2016 00:40:17 +0200
Subject: [PATCH 02/18] Use new Connection class to run queries

---
 bigchaindb/core.py            | 116 +++++++++++++++++++---------------
 bigchaindb/pipelines/utils.py |   1 +
 tests/pipelines/test_utils.py |  13 ++--
 tests/test_commands.py        |   8 ++-
 4 files changed, 77 insertions(+), 61 deletions(-)

diff --git a/bigchaindb/core.py b/bigchaindb/core.py
index 3e344beb..7dd9ba1e 100644
--- a/bigchaindb/core.py
+++ b/bigchaindb/core.py
@@ -140,7 +140,9 @@ class Bigchain(object):
         signed_transaction.update({'assignee': assignee})

         # write to the backlog
-        response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn)
+        response = self.connection.run(
+                r.table('backlog')
+                .insert(signed_transaction, durability=durability))
         return response

     def get_transaction(self, txid, include_status=False):
@@ -181,13 +183,16 @@ class Bigchain(object):
                     break

             # Query the transaction in the target block and return
-            response = r.table('bigchain', read_mode=self.read_mode).get(target_block_id)\
-                    .get_field('block').get_field('transactions')\
-                    .filter(lambda tx: tx['id'] == txid).run(self.conn)[0]
+            response = self.connection.run(
+                    r.table('bigchain', read_mode=self.read_mode)
+                    .get(target_block_id)
+                    .get_field('block')
+                    .get_field('transactions')
+                    .filter(lambda tx: tx['id'] == txid))[0]

         else:
             # Otherwise, check the backlog
-            response = r.table('backlog').get(txid).run(self.conn)
+            response = self.connection.run(r.table('backlog').get(txid))
             if response:
                 tx_status = self.TX_IN_BACKLOG

@@ -221,8 +226,10 @@ class Bigchain(object):
             A list of blocks with only election information
         """
         # First, get information on all blocks which contain this transaction
-        response = r.table('bigchain', read_mode=self.read_mode).get_all(value, index=index)\
-                .pluck('votes', 'id', {'block': ['voters']}).run(self.conn)
+        response = self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .get_all(value, index=index)\
+                .pluck('votes', 'id', {'block': ['voters']}))

         return list(response)

@@ -275,11 +282,11 @@ class Bigchain(object):
             A list of transactions containing that payload. If no transaction exists with that payload it
             returns an empty list `[]`
         """
-        cursor = r.table('bigchain', read_mode=self.read_mode) \
-                .get_all(payload_uuid, index='payload_uuid') \
-                .concat_map(lambda block: block['block']['transactions']) \
-                .filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid) \
-                .run(self.conn)
+        cursor = self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .get_all(payload_uuid, index='payload_uuid')
+                .concat_map(lambda block: block['block']['transactions'])
+                .filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid))

         transactions = list(cursor)
         return transactions
@@ -298,11 +305,11 @@ class Bigchain(object):
         """
         # checks if an input was already spent
         # checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...}
-        response = r.table('bigchain', read_mode=self.read_mode)\
-                .concat_map(lambda doc: doc['block']['transactions'])\
-                .filter(lambda transaction: transaction['transaction']['fulfillments']
-                    .contains(lambda fulfillment: fulfillment['input'] == tx_input))\
-                .run(self.conn)
+        response = self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .concat_map(lambda doc: doc['block']['transactions'])
+                .filter(lambda transaction: transaction['transaction']['fulfillments']
+                    .contains(lambda fulfillment: fulfillment['input'] == tx_input)))

         transactions = list(response)

@@ -337,12 +344,12 @@ class Bigchain(object):
         """

         # get all transactions in which owner is in the `owners_after` list
-        response = r.table('bigchain', read_mode=self.read_mode) \
-                .concat_map(lambda doc: doc['block']['transactions']) \
-                .filter(lambda tx: tx['transaction']['conditions']
-                    .contains(lambda c: c['owners_after']
-                        .contains(owner))) \
-                .run(self.conn)
+        response = self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .concat_map(lambda doc: doc['block']['transactions'])
+                .filter(lambda tx: tx['transaction']['conditions']
+                    .contains(lambda c: c['owners_after']
+                        .contains(owner))))
         owned = []

         for tx in response:
@@ -486,8 +493,9 @@ class Bigchain(object):
             but the vote is invalid.

         """
-        votes = list(r.table('votes', read_mode=self.read_mode)\
-            .get_all([block['id'], self.me], index='block_and_voter').run(self.conn))
+        votes = list(self.connection.run(
+            r.table('votes', read_mode=self.read_mode)
+            .get_all([block['id'], self.me], index='block_and_voter')))

         if len(votes) > 1:
             raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}'
@@ -528,12 +536,15 @@ class Bigchain(object):
         """

         block_serialized = rapidjson.dumps(block)
-        r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn)
+        self.connection.run(
+                r.table('bigchain')
+                .insert(r.json(block_serialized), durability=durability))

     # TODO: Decide if we need this method
     def transaction_exists(self, transaction_id):
-        response = r.table('bigchain', read_mode=self.read_mode)\
-            .get_all(transaction_id, index='transaction_id').run(self.conn)
+        response = self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .get_all(transaction_id, index='transaction_id'))
         return True if len(response.items) > 0 else False

     def prepare_genesis_block(self):
@@ -560,7 +571,9 @@ class Bigchain(object):
         # 2. create the block with one transaction
         # 3. write the block to the bigchain

-        blocks_count = r.table('bigchain', read_mode=self.read_mode).count().run(self.conn)
+        blocks_count = self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .count())
         if blocks_count:
             raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block')
@@ -606,30 +619,30 @@ class Bigchain(object):
     def write_vote(self, vote):
         """Write the vote to the database."""

-        r.table('votes') \
-            .insert(vote) \
-            .run(self.conn)
+        self.connection.run(
+                r.table('votes')
+                .insert(vote))

     def get_last_voted_block(self):
         """Returns the last block that this node voted on."""

         try:
             # get the latest value for the vote timestamp (over all votes)
-            max_timestamp = r.table('votes', read_mode=self.read_mode) \
-                .filter(r.row['node_pubkey'] == self.me) \
-                .max(r.row['vote']['timestamp']) \
-                .run(self.conn)['vote']['timestamp']
+            max_timestamp = self.connection.run(
+                    r.table('votes', read_mode=self.read_mode)
+                    .filter(r.row['node_pubkey'] == self.me)
+                    .max(r.row['vote']['timestamp']))['vote']['timestamp']

-            last_voted = list(r.table('votes', read_mode=self.read_mode) \
-                .filter(r.row['vote']['timestamp'] == max_timestamp) \
-                .filter(r.row['node_pubkey'] == self.me) \
-                .run(self.conn))
+            last_voted = list(self.connection.run(
+                r.table('votes', read_mode=self.read_mode)
+                .filter(r.row['vote']['timestamp'] == max_timestamp)
+                .filter(r.row['node_pubkey'] == self.me)))

         except r.ReqlNonExistenceError:
             # return last vote if last vote exists else return Genesis block
-            return list(r.table('bigchain', read_mode=self.read_mode)
-                    .filter(util.is_genesis_block)
-                    .run(self.conn))[0]
+            return list(self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .filter(util.is_genesis_block)))[0]

         # Now the fun starts. Since the resolution of timestamp is a second,
         # we might have more than one vote per timestamp. If this is the case
@@ -661,19 +674,21 @@ class Bigchain(object):
             except KeyError:
                 break

-        res = r.table('bigchain', read_mode=self.read_mode).get(last_block_id).run(self.conn)
+        res = self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .get(last_block_id))

         return res

     def get_unvoted_blocks(self):
         """Return all the blocks that have not been voted on by this node."""

-        unvoted = r.table('bigchain', read_mode=self.read_mode) \
-            .filter(lambda block: r.table('votes', read_mode=self.read_mode)
-                .get_all([block['id'], self.me], index='block_and_voter')
-                .is_empty()) \
-            .order_by(r.asc(r.row['block']['timestamp'])) \
-            .run(self.conn)
+        unvoted = self.connection.run(
+                r.table('bigchain', read_mode=self.read_mode)
+                .filter(lambda block: r.table('votes', read_mode=self.read_mode)
+                    .get_all([block['id'], self.me], index='block_and_voter')
+                    .is_empty())
+                .order_by(r.asc(r.row['block']['timestamp'])))

         # FIXME: I (@vrde) don't like this solution. Filtering should be done at a
         #        database level. Solving issue #444 can help untangling the situation
@@ -684,9 +699,8 @@ class Bigchain(object):
     def block_election_status(self, block):
         """Tally the votes on a block, and return the status: valid, invalid, or undecided."""

-        votes = r.table('votes', read_mode=self.read_mode) \
-            .between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter') \
-            .run(self.conn)
+        votes = self.connection.run(r.table('votes', read_mode=self.read_mode)
+                .between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter'))

         votes = list(votes)

diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py
index ffa8fc7b..a6b74dfc 100644
--- a/bigchaindb/pipelines/utils.py
+++ b/bigchaindb/pipelines/utils.py
@@ -55,6 +55,7 @@ class ChangeFeed(Node):
         while True:
             try:
                 self.run_changefeed()
+                break
             except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
                 logger.exception(exc)
                 time.sleep(1)

diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py
index ebef38c4..459ae7f0 100644
--- a/tests/pipelines/test_utils.py
+++ b/tests/pipelines/test_utils.py
@@ -1,8 +1,7 @@
 from unittest.mock import patch

-import rethinkdb
-
 from multipipes import Pipe
+from bigchaindb.db.utils import Connection
 from bigchaindb.pipelines.utils import ChangeFeed


@@ -18,7 +17,7 @@ MOCK_CHANGEFEED_DATA = [{
 }]


-@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
+@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
 def test_changefeed_insert(mock_run):
     outpipe = Pipe()
     changefeed = ChangeFeed('backlog', ChangeFeed.INSERT)
@@ -28,7 +27,7 @@ def test_changefeed_insert(mock_run):
     assert outpipe.qsize() == 0


-@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
+@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
 def test_changefeed_delete(mock_run):
     outpipe = Pipe()
     changefeed = ChangeFeed('backlog', ChangeFeed.DELETE)
@@ -38,7 +37,7 @@ def test_changefeed_delete(mock_run):
     assert outpipe.qsize() == 0


-@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
+@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
 def test_changefeed_update(mock_run):
     outpipe = Pipe()
     changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE)
@@ -49,7 +48,7 @@ def test_changefeed_update(mock_run):
     assert outpipe.qsize() == 0


-@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
+@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
 def test_changefeed_multiple_operations(mock_run):
     outpipe = Pipe()
     changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE)
@@ -61,7 +60,7 @@ def test_changefeed_multiple_operations(mock_run):
     assert outpipe.qsize() == 0


-@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
+@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
 def test_changefeed_prefeed(mock_run):
     outpipe = Pipe()
     changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3])
diff --git a/tests/test_commands.py b/tests/test_commands.py
index c515479f..8a22c160 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -67,7 +67,7 @@ def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_init,
     run_start(args)


-@patch('bigchaindb.commands.utils.start_rethinkdb')
+@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=(Mock(), 2))
 def test_bigchain_run_start_with_rethinkdb(mock_start_rethinkdb,
                                            mock_run_configure,
                                            mock_processes_start,
@@ -213,8 +213,10 @@ def test_run_configure_when_config_does_exist(monkeypatch,
 @patch('subprocess.Popen')
 def test_start_rethinkdb_returns_a_process_when_successful(mock_popen):
     from bigchaindb.commands import utils
-    mock_popen.return_value = Mock(stdout=['Server ready'])
-    assert utils.start_rethinkdb() is mock_popen.return_value
+    mock_popen.return_value = Mock(stdout=[
+        'Listening for client driver 1234',
+        'Server ready'])
+    assert utils.start_rethinkdb() == (mock_popen.return_value, 1234)


 @patch('subprocess.Popen')

From 7097efaa33299fa85bde1ae77d062ba6782233d6 Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 7 Sep 2016 00:47:22 +0200
Subject: [PATCH 03/18] Daemonize thread to make sure test exits

---
 tests/test_run_query_util.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py
index f2f33dac..3c37e140 100644
--- a/tests/test_run_query_util.py
+++ b/tests/test_run_query_util.py
@@ -72,7 +72,7 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):
     # initialize ChangeFeed and put it in a thread
     changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT)
     changefeed.outqueue = mp.Queue()
-    t_changefeed = Thread(target=changefeed.run_forever)
+    t_changefeed = Thread(target=changefeed.run_forever, daemon=True)

     t_changefeed.start()
     time.sleep(1)
@@ -104,3 +104,7 @@ def test_changefeed_reconnects_when_connection_lost(monkeypatch):

     fact = changefeed.outqueue.get()['fact']
     assert fact == 'Cats sleep 70% of their lives.'
+
+    # stop the DB process
+    proc.terminate()
+    proc.wait()
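The retry loop that `run_forever` now implements is a small, reusable pattern: run the feed, and only a connection-level error sends control back around the loop; the `break` added in patch 02 makes a clean return terminate instead of reconnecting forever. A generic sketch of the same pattern, with a hypothetical `consume` callable standing in for `run_changefeed` (not part of the patched code):

    import time
    import logging

    logger = logging.getLogger(__name__)

    def run_with_retries(consume, recoverable, delay=1):
        """Run `consume` until it returns; on a recoverable error,
        log it, wait, and start over from scratch."""
        while True:
            try:
                consume()
                break  # a clean return means done, not disconnected
            except recoverable as exc:
                logger.exception(exc)
                time.sleep(delay)

With `consume=self.run_changefeed` and `recoverable=(r.ReqlDriverError, r.ReqlOpFailedError)` this is exactly the loop above; the `Connection` proxy then re-establishes the socket on the next query.
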
""" - r.table('backlog')\ - .get(tx['id'])\ - .delete(durability='hard')\ - .run(self.bigchain.conn) + self.bigchain.connection.run( + r.table('backlog') + .get(tx['id']) + .delete(durability='hard')) return tx @@ -118,12 +118,14 @@ def initial(): b = Bigchain() - rs = r.table('backlog')\ - .between([b.me, r.minval], - [b.me, r.maxval], - index='assignee__transaction_timestamp')\ - .order_by(index=r.asc('assignee__transaction_timestamp'))\ - .run(b.conn) + rs = b.connection.run( + r.table('backlog') + .between( + [b.me, r.minval], + [b.me, r.maxval], + index='assignee__transaction_timestamp') + .order_by(index=r.asc('assignee__transaction_timestamp'))) + return rs diff --git a/bigchaindb/pipelines/election.py b/bigchaindb/pipelines/election.py index 7a0e114c..cf464e5c 100644 --- a/bigchaindb/pipelines/election.py +++ b/bigchaindb/pipelines/election.py @@ -25,9 +25,10 @@ class Election: """ Checks if block has enough invalid votes to make a decision """ - next_block = r.table('bigchain')\ - .get(next_vote['vote']['voting_for_block'])\ - .run(self.bigchain.conn) + next_block = self.bigchain.connection.run( + r.table('bigchain') + .get(next_vote['vote']['voting_for_block'])) + if self.bigchain.block_election_status(next_block) == self.bigchain.BLOCK_INVALID: return next_block From 65bc86f06e05f28f5239e8b806286975d57011aa Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 01:01:58 +0200 Subject: [PATCH 05/18] Add docstring to Connection class --- bigchaindb/db/utils.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/bigchaindb/db/utils.py b/bigchaindb/db/utils.py index 627782e9..84aabdd4 100644 --- a/bigchaindb/db/utils.py +++ b/bigchaindb/db/utils.py @@ -13,8 +13,23 @@ logger = logging.getLogger(__name__) class Connection: + """This class is a proxy to run queries against the database, + it is: + - lazy, since it creates a connection only when needed + - resilient, because before raising exceptions it tries + more times to run the query or open a connection. + """ def __init__(self, host=None, port=None, db=None, max_tries=3): + """Create a new Connection instance. + + Args: + host (str, optional): the host to connect to. + port (int, optional): the port to connect to. + db (str, optional): the database to use. + max_tries (int, optional): how many tries before giving up. + """ + self.host = host or bigchaindb.config['database']['host'] self.port = port or bigchaindb.config['database']['port'] self.db = db or bigchaindb.config['database']['name'] @@ -22,6 +37,12 @@ class Connection: self.conn = None def run(self, query): + """Run a query. + + Args: + query: the RethinkDB query. + """ + if self.conn is None: self._connect() From 39228be454325841458c532220c27edabd358927 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 01:51:18 +0200 Subject: [PATCH 06/18] Remove dependency on external RethinkDB instance --- bigchaindb/pipelines/utils.py | 5 +- tests/test_run_query_util.py | 89 +++++++++++++++++------------------ 2 files changed, 46 insertions(+), 48 deletions(-) diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py index a6b74dfc..8f4c3b99 100644 --- a/bigchaindb/pipelines/utils.py +++ b/bigchaindb/pipelines/utils.py @@ -29,7 +29,7 @@ class ChangeFeed(Node): DELETE = 2 UPDATE = 4 - def __init__(self, table, operation, prefeed=None): + def __init__(self, table, operation, prefeed=None, bigchain=None): """Create a new RethinkDB ChangeFeed. Args: @@ -40,13 +40,14 @@ class ChangeFeed(Node): (e.g. 
``ChangeFeed.INSERT | ChangeFeed.UPDATE``) prefeed (iterable): whatever set of data you want to be published first. + bigchain (``Bigchain``): the bigchain instance to use (can be None). """ super().__init__(name='changefeed') self.prefeed = prefeed if prefeed else [] self.table = table self.operation = operation - self.bigchain = Bigchain() + self.bigchain = bigchain or Bigchain() def run_forever(self): for element in self.prefeed: diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py index 3c37e140..bac63048 100644 --- a/tests/test_run_query_util.py +++ b/tests/test_run_query_util.py @@ -1,3 +1,4 @@ +from unittest.mock import patch from threading import Thread import pytest @@ -26,85 +27,81 @@ def test_raise_exception_when_max_tries(): def test_reconnect_when_connection_lost(): import time + import rethinkdb as r - proc = None + def raise_exception(*args, **kwargs): + raise r.ReqlDriverError('mock') + + conn = Connection() + original_connect = r.connect + r.connect = raise_exception def delayed_start(): - nonlocal proc time.sleep(1) - proc, _ = start_temp_rethinkdb(38015) + r.connect = original_connect thread = Thread(target=delayed_start) - conn = Connection(port=38015) query = r.expr('1') thread.start() assert conn.run(query) == '1' - proc.terminate() - proc.wait() def test_changefeed_reconnects_when_connection_lost(monkeypatch): - import os import time - import tempfile import multiprocessing as mp - import bigchaindb + from bigchaindb import Bigchain from bigchaindb.pipelines.utils import ChangeFeed - dbport = 38015 - dbname = 'test_' + str(os.getpid()) - directory = tempfile.mkdtemp() + class MockConnection: + tries = 0 - monkeypatch.setitem(bigchaindb.config, 'database', { - 'host': 'localhost', - 'port': dbport, - 'name': dbname - }) + def run(self, *args, **kwargs): + return self - proc, _ = start_temp_rethinkdb(dbport, directory=directory) + def __iter__(self): + return self - # prepare DB and table - conn = r.connect(port=dbport) - r.db_create(dbname).run(conn) - r.db(dbname).table_create('cat_facts').run(conn) + def __next__(self): + self.tries += 1 + if self.tries == 1: + raise r.ReqlDriverError('mock') + elif self.tries == 2: + return { 'new_val': { 'fact': 'A group of cats is called a clowder.' }, + 'old_val': None } + if self.tries == 3: + raise r.ReqlDriverError('mock') + elif self.tries == 4: + return { 'new_val': {'fact': 'Cats sleep 70% of their lives.' }, + 'old_val': None } + else: + time.sleep(10) - # initialize ChangeFeed and put it in a thread - changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT) + + bigchain = Bigchain() + bigchain.connection = MockConnection() + changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT, + bigchain=bigchain) changefeed.outqueue = mp.Queue() t_changefeed = Thread(target=changefeed.run_forever, daemon=True) t_changefeed.start() time.sleep(1) + # try 1: MockConnection raises an error that will stop the + # ChangeFeed instance from iterating for 1 second. - # insert some records in the table to start generating - # events that changefeed will put in `outqueue` - r.db(dbname).table('cat_facts').insert({ - 'fact': 'A group of cats is called a clowder.' - }).run(conn) - - # the event should be in the outqueue + # try 2: MockConnection releases a new record. The new record + # will be put in the outqueue of the ChangeFeed instance. fact = changefeed.outqueue.get()['fact'] assert fact == 'A group of cats is called a clowder.' 
- # stop the DB process - proc.terminate() - proc.wait() - + # try 3: MockConnection raises an error that will stop the + # ChangeFeed instance from iterating for 1 second. assert t_changefeed.is_alive() is True - proc, _ = start_temp_rethinkdb(dbport, directory=directory) - time.sleep(2) - - conn = r.connect(port=dbport) - r.db(dbname).table('cat_facts').insert({ - 'fact': 'Cats sleep 70% of their lives.' - }).run(conn) + # try 4: MockConnection releases a new record. The new record + # will be put in the outqueue of the ChangeFeed instance. fact = changefeed.outqueue.get()['fact'] assert fact == 'Cats sleep 70% of their lives.' - - # stop the DB process - proc.terminate() - proc.wait() From b8e6b0b830c0674deb322734971d34f772edd6a1 Mon Sep 17 00:00:00 2001 From: vrde Date: Wed, 7 Sep 2016 02:06:52 +0200 Subject: [PATCH 07/18] Add code coverage for start_temp_rethinkdb --- tests/test_commands.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_commands.py b/tests/test_commands.py index 8a22c160..d7cce77f 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -228,6 +228,15 @@ def test_start_rethinkdb_exits_when_cannot_start(mock_popen): utils.start_rethinkdb() +@patch('subprocess.Popen') +def test_start_temp_rethinkdb_returns_a_process_when_successful(mock_popen): + from bigchaindb.commands import utils + mock_popen.return_value = Mock(stdout=[ + 'Listening for client driver 1234', + 'Server ready']) + assert utils.start_temp_rethinkdb() == (mock_popen.return_value, 1234) + + @patch('rethinkdb.ast.Table.reconfigure') def test_set_shards(mock_reconfigure, monkeypatch, b): from bigchaindb.commands.bigchain import run_set_shards From 92981e003dbe37d1e54144950ae0e3c009ccb60d Mon Sep 17 00:00:00 2001 From: Ryan Henderson Date: Wed, 7 Sep 2016 16:26:41 +0200 Subject: [PATCH 08/18] Core/198/handle stale transactions (#359) * add timestamp to transaction assignment * add reassignment delay to configuration * refactor to multipipes * # This is a combination of 7 commits. 
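Throughout these patches the `ChangeFeed` operation argument is a bitmask: the class constants are distinct powers of two, so they can be OR-ed together and each incoming change is tested with a bitwise AND. A short self-contained sketch of that dispatch logic, using the same constants and the `old_val`/`new_val` convention RethinkDB uses in change documents (the `classify` helper is illustrative, not part of the codebase):

    INSERT, DELETE, UPDATE = 1, 2, 4

    def classify(change):
        """Map a RethinkDB change document to one of the flags above."""
        if change['old_val'] is None:
            return INSERT
        if change['new_val'] is None:
            return DELETE
        return UPDATE

    operation = INSERT | UPDATE  # listen for inserts and updates only

    change = {'old_val': {'x': 1}, 'new_val': {'x': 2}}
    if classify(change) & operation:
        print('dispatch', change['new_val'])
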
From 92981e003dbe37d1e54144950ae0e3c009ccb60d Mon Sep 17 00:00:00 2001
From: Ryan Henderson
Date: Wed, 7 Sep 2016 16:26:41 +0200
Subject: [PATCH 08/18] Core/198/handle stale transactions (#359)

* add timestamp to transaction assignment

* add reassignment delay to configuration

* refactor to multipipes

* # This is a combination of 7 commits.
# The first commit's message is:
stale transaction monitor and tests

# The 2nd commit message will be skipped:
#	simplify logic

# The 3rd commit message will be skipped:
#	node will assign to self

# The 4th commit message will be skipped:
#	block listens for insert and update

# The 5th commit message will be skipped:
#	more test coverage

# The 6th commit message will be skipped:
#	test coverage

# The 7th commit message will be skipped:
#	test coverage

* stale transaction monitor and tests

* update operation only returns new value

---
 bigchaindb/__init__.py                 |   1 +
 bigchaindb/commands/bigchain.py        |   5 ++
 bigchaindb/core.py                     |  47 +++++++++-
 bigchaindb/pipelines/block.py          |   3 +-
 bigchaindb/pipelines/stale.py          |  76 ++++++++++++++++
 bigchaindb/pipelines/utils.py          |   2 +-
 bigchaindb/processes.py                |   5 +-
 tests/db/test_bigchain_api.py          |   1 +
 tests/pipelines/test_block_creation.py |   1 +
 tests/pipelines/test_election.py       |   1 +
 tests/pipelines/test_stale_monitor.py  | 116 +++++++++++++++++++++++++
 tests/pipelines/test_utils.py          |   6 +-
 tests/test_commands.py                 |   1 +
 tests/test_config_utils.py             |   4 +-
 tests/test_core.py                     |   3 +-
 tests/test_processes.py                |  23 +++++
 16 files changed, 285 insertions(+), 10 deletions(-)
 create mode 100644 bigchaindb/pipelines/stale.py
 create mode 100644 tests/pipelines/test_stale_monitor.py
 create mode 100644 tests/test_processes.py

diff --git a/bigchaindb/__init__.py b/bigchaindb/__init__.py
index ee864ead..d5fe15e6 100644
--- a/bigchaindb/__init__.py
+++ b/bigchaindb/__init__.py
@@ -31,6 +31,7 @@ config = {
     },
     'api_endpoint': 'http://localhost:9984/api/v1',
     'consensus_plugin': 'default',
+    'backlog_reassign_delay': 30
 }

 # We need to maintain a backup copy of the original config dict in case
diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py
index 1943167b..5edca2c4 100644
--- a/bigchaindb/commands/bigchain.py
+++ b/bigchaindb/commands/bigchain.py
@@ -106,6 +106,11 @@ def run_configure(args, skip_if_exists=False):
             input('Statsd {}? (default `{}`): '.format(key, val)) \
             or val

+    val = conf['backlog_reassign_delay']
+    conf['backlog_reassign_delay'] = \
+        input('Stale transaction reassignment delay (in seconds)? (default `{}`): '.format(val)) \
+        or val
+
     if config_path != '-':
         bigchaindb.config_utils.write_config(conf, config_path)
     else:
diff --git a/bigchaindb/core.py b/bigchaindb/core.py
index 95cccfc7..cc753a42 100644
--- a/bigchaindb/core.py
+++ b/bigchaindb/core.py
@@ -2,6 +2,7 @@ import random
 import math
 import collections
 from copy import deepcopy
+from time import time

 from itertools import compress
 import rethinkdb as r
@@ -28,7 +29,7 @@ class Bigchain(object):

     def __init__(self, host=None, port=None, dbname=None,
                  public_key=None, private_key=None, keyring=[],
-                 consensus_plugin=None):
+                 consensus_plugin=None, backlog_reassign_delay=None):
         """Initialize the Bigchain instance

         A Bigchain instance has several configuration parameters (e.g. host).
@@ -56,6 +57,7 @@ class Bigchain(object):
         self.me = public_key or bigchaindb.config['keypair']['public']
         self.me_private = private_key or bigchaindb.config['keypair']['private']
         self.nodes_except_me = keyring or bigchaindb.config['keyring']
+        self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay']
         self.consensus = config_utils.load_consensus_plugin(consensus_plugin)
         # change RethinkDB read mode to majority. This ensures consistency in query results
         self.read_mode = 'majority'
@@ -136,11 +138,54 @@ class Bigchain(object):
         signed_transaction = deepcopy(signed_transaction)

         # update the transaction
         signed_transaction.update({'assignee': assignee})
+        signed_transaction.update({'assignment_timestamp': time()})

         # write to the backlog
         response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn)
         return response

+    def reassign_transaction(self, transaction, durability='hard'):
+        """Assign a transaction to a new node
+
+        Args:
+            transaction (dict): assigned transaction
+
+        Returns:
+            dict: database response or None if no reassignment is possible
+        """
+
+        if self.nodes_except_me:
+            try:
+                federation_nodes = self.nodes_except_me + [self.me]
+                index_current_assignee = federation_nodes.index(transaction['assignee'])
+                new_assignee = random.choice(federation_nodes[:index_current_assignee] +
+                                             federation_nodes[index_current_assignee + 1:])
+            except ValueError:
+                # current assignee not in federation
+                new_assignee = random.choice(self.nodes_except_me)
+
+        else:
+            # There is no other node to assign to
+            new_assignee = self.me
+
+        response = r.table('backlog')\
+                .get(transaction['id'])\
+                .update({'assignee': new_assignee,
+                         'assignment_timestamp': time()},
+                        durability=durability).run(self.conn)
+        return response
+
+    def get_stale_transactions(self):
+        """Get a RethinkDB cursor of stale transactions

+        Transactions are considered stale if they have been assigned a node, but are still in the
+        backlog after some amount of time specified in the configuration
+        """
+
+        return r.table('backlog')\
+                .filter(lambda tx: time() - tx['assignment_timestamp'] >
+                        self.backlog_reassign_delay).run(self.conn)
+
     def get_transaction(self, txid, include_status=False):
         """Retrieve a transaction with `txid` from bigchain.
diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py
index 59375c57..47fbd008 100644
--- a/bigchaindb/pipelines/block.py
+++ b/bigchaindb/pipelines/block.py
@@ -130,7 +130,8 @@ def initial():

 def get_changefeed():
     """Create and return the changefeed for the backlog."""

-    return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial())
+    return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE,
+                      prefeed=initial())


 def create_pipeline():
diff --git a/bigchaindb/pipelines/stale.py b/bigchaindb/pipelines/stale.py
new file mode 100644
index 00000000..e1e14e21
--- /dev/null
+++ b/bigchaindb/pipelines/stale.py
@@ -0,0 +1,76 @@
+"""This module monitors for stale transactions.
+
+It reassigns transactions which have been assigned a node but
+remain in the backlog past a certain amount of time.
+"""
+
+import logging
+from multipipes import Pipeline, Node
+from bigchaindb import Bigchain
+from time import sleep
+
+
+logger = logging.getLogger(__name__)
+
+
+class StaleTransactionMonitor:
+    """This class encapsulates the logic for re-assigning stale transactions.
+
+    Note:
+        Methods of this class will be executed in different processes.
+    """
+
+    def __init__(self, timeout=5, backlog_reassign_delay=None):
+        """Initialize the StaleTransactionMonitor.
+
+        Args:
+            timeout: how often to check for stale tx (in sec)
+            backlog_reassign_delay: how stale a transaction should
+                be before reassignment (in sec). If supplied, overrides the
+                Bigchain default value.
+        """
+        self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay)
+        self.timeout = timeout
+
+    def check_transactions(self):
+        """Poll backlog for stale transactions
+
+        Returns:
+            txs (list): txs to be reassigned
+        """
+        sleep(self.timeout)
+        for tx in self.bigchain.get_stale_transactions():
+            yield tx
+
+    def reassign_transactions(self, tx):
+        """Put tx back in backlog with new assignee
+
+        Returns:
+            transaction
+        """
+        self.bigchain.reassign_transaction(tx)
+        return tx
+
+
+def create_pipeline(timeout=5, backlog_reassign_delay=5):
+    """Create and return the pipeline of operations to be distributed
+    on different processes."""
+
+    stm = StaleTransactionMonitor(timeout=timeout,
+                                  backlog_reassign_delay=backlog_reassign_delay)
+
+    monitor_pipeline = Pipeline([
+        Node(stm.check_transactions),
+        Node(stm.reassign_transactions)
+    ])
+
+    return monitor_pipeline
+
+
+def start(timeout=5, backlog_reassign_delay=5):
+    """Create, start, and return the block pipeline."""
+    pipeline = create_pipeline(timeout=timeout,
+                               backlog_reassign_delay=backlog_reassign_delay)
+    pipeline.setup()
+    pipeline.start()
+    return pipeline
diff --git a/bigchaindb/pipelines/utils.py b/bigchaindb/pipelines/utils.py
index 22a5f9bc..9c28907e 100644
--- a/bigchaindb/pipelines/utils.py
+++ b/bigchaindb/pipelines/utils.py
@@ -58,5 +58,5 @@ class ChangeFeed(Node):
             elif is_delete and (self.operation & ChangeFeed.DELETE):
                 self.outqueue.put(change['old_val'])
             elif is_update and (self.operation & ChangeFeed.UPDATE):
-                self.outqueue.put(change)
+                self.outqueue.put(change['new_val'])

diff --git a/bigchaindb/processes.py b/bigchaindb/processes.py
index 4b8aa0eb..c3625fdc 100644
--- a/bigchaindb/processes.py
+++ b/bigchaindb/processes.py
@@ -2,7 +2,7 @@ import logging
 import multiprocessing as mp

 import bigchaindb
-from bigchaindb.pipelines import vote, block, election
+from bigchaindb.pipelines import vote, block, election, stale
 from bigchaindb.web import server


@@ -31,6 +31,9 @@ def start():
     logger.info('Starting voter')
     vote.start()

+    logger.info('Starting stale transaction monitor')
+    stale.start()
+
     logger.info('Starting election')
     election.start()

diff --git a/tests/db/test_bigchain_api.py b/tests/db/test_bigchain_api.py
index 7242402a..166e4929 100644
--- a/tests/db/test_bigchain_api.py
+++ b/tests/db/test_bigchain_api.py
@@ -132,6 +132,7 @@ class TestBigchainApi(object):

         response, status = b.get_transaction(tx_signed["id"], include_status=True)
         response.pop('assignee')
+        response.pop('assignment_timestamp')
         # add validity information, which will be returned
         assert util.serialize(tx_signed) == util.serialize(response)
         assert status == b.TX_IN_BACKLOG
diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py
index ce300730..4abf72cc 100644
--- a/tests/pipelines/test_block_creation.py
+++ b/tests/pipelines/test_block_creation.py
@@ -78,6 +78,7 @@ def test_delete_tx(b, user_vk):

     tx_backlog = r.table('backlog').get(tx['id']).run(b.conn)
     tx_backlog.pop('assignee')
+    tx_backlog.pop('assignment_timestamp')
     assert tx_backlog == tx

     returned_tx = block_maker.delete_tx(tx)
diff --git a/tests/pipelines/test_election.py b/tests/pipelines/test_election.py
index a71714df..02a0e39d 100644
--- a/tests/pipelines/test_election.py
+++ b/tests/pipelines/test_election.py
@@ -99,6 +99,7 @@ def test_check_requeue_transaction(b, user_vk):
     e.requeue_transactions(test_block)

     tx_backlog = r.table('backlog').get(tx1['id']).run(b.conn)
     tx_backlog.pop('assignee')
+    tx_backlog.pop('assignment_timestamp')
     assert tx_backlog == tx1

diff --git a/tests/pipelines/test_stale_monitor.py b/tests/pipelines/test_stale_monitor.py
new file mode 100644
index 00000000..f6cb4a0b
--- /dev/null
+++ b/tests/pipelines/test_stale_monitor.py
@@ -0,0 +1,116 @@
+import rethinkdb as r
+from bigchaindb import Bigchain
+from bigchaindb.pipelines import stale
+from multipipes import Pipe, Pipeline
+from unittest.mock import patch
+from bigchaindb import config_utils
+import time
+import os
+
+
+def test_get_stale(b, user_vk):
+    tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
+    tx = b.sign_transaction(tx, b.me_private)
+    b.write_transaction(tx, durability='hard')
+
+    stm = stale.StaleTransactionMonitor(timeout=0.001,
+                                        backlog_reassign_delay=0.001)
+    tx_stale = stm.check_transactions()
+
+    for _tx in tx_stale:
+        _tx.pop('assignee')
+        _tx.pop('assignment_timestamp')
+        assert tx == _tx
+
+
+def test_reassign_transactions(b, user_vk):
+    # test with single node
+    tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
+    tx = b.sign_transaction(tx, b.me_private)
+    b.write_transaction(tx, durability='hard')
+
+    stm = stale.StaleTransactionMonitor(timeout=0.001,
+                                        backlog_reassign_delay=0.001)
+    stm.reassign_transactions(tx)
+
+    # test with federation
+    tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
+    tx = b.sign_transaction(tx, b.me_private)
+    b.write_transaction(tx, durability='hard')
+
+    stm = stale.StaleTransactionMonitor(timeout=0.001,
+                                        backlog_reassign_delay=0.001)
+    stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc']
+    tx = list(r.table('backlog').run(b.conn))[0]
+    stm.reassign_transactions(tx)
+
+    reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn)
+    assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp']
+    assert reassigned_tx['assignee'] != tx['assignee']
+
+    # test with node not in federation
+    tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
+    tx = b.sign_transaction(tx, b.me_private)
+    tx.update({'assignee': 'lol'})
+    tx.update({'assignment_timestamp': time.time()})
+    r.table('backlog').insert(tx, durability='hard').run(b.conn)
+
+    tx = list(r.table('backlog').run(b.conn))[0]
+    stm.reassign_transactions(tx)
+    assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol'
+
+
+def test_full_pipeline(user_vk):
+    CONFIG = {
+        'database': {
+            'name': 'bigchain_test_{}'.format(os.getpid())
+        },
+        'keypair': {
+            'private': '31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA',
+            'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8'
+        },
+        'keyring': ['aaa', 'bbb'],
+        'backlog_reassign_delay': 0.01
+    }
+    config_utils.set_config(CONFIG)
+    b = Bigchain()
+    outpipe = Pipe()
+
+    original_txs = {}
+
+    for i in range(100):
+        tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
+        tx = b.sign_transaction(tx, b.me_private)
+
+        b.write_transaction(tx)
+        original_txs[tx['id']] = r.table('backlog').get(tx['id']).run(b.conn)
+
+    assert r.table('backlog').count().run(b.conn) == 100
+
+    pipeline = stale.create_pipeline(backlog_reassign_delay=1,
+                                     timeout=1)
+    pipeline.setup(outdata=outpipe)
+    pipeline.start()
+
+    # timing should be careful -- test will fail if reassignment happens multiple times
+    time.sleep(2)
+    pipeline.terminate()
+
+    # to terminate
+    outpipe.get()
+
+    assert r.table('backlog').count().run(b.conn) == 100
+    reassigned_txs = list(r.table('backlog').run(b.conn))
+
+    # check that every assignment timestamp has increased, and every tx has a new assignee
+    for reassigned_tx in reassigned_txs:
+        assert reassigned_tx['assignment_timestamp'] > \
+            original_txs[reassigned_tx['id']]['assignment_timestamp']
+        assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee']
+
+
+@patch.object(Pipeline, 'start')
+def test_start(mock_start):
+    # TODO: `stale.start` is just a wrapper around `stale.create_pipeline`,
+    #       which is tested by `test_full_pipeline`.
+    #       If anyone has better ideas on how to test this, please do a PR :)
+    stale.start()
+    mock_start.assert_called_with()
diff --git a/tests/pipelines/test_utils.py b/tests/pipelines/test_utils.py
index ebef38c4..f46c1a9d 100644
--- a/tests/pipelines/test_utils.py
+++ b/tests/pipelines/test_utils.py
@@ -44,8 +44,7 @@ def test_changefeed_update(mock_run):
     changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE)
     changefeed.outqueue = outpipe
     changefeed.run_forever()
-    assert outpipe.get() == {'new_val': 'seems like we have an update here',
-                             'old_val': 'seems like we have an update here'}
+    assert outpipe.get() == 'seems like we have an update here'
     assert outpipe.qsize() == 0


@@ -56,8 +55,7 @@ def test_changefeed_multiple_operations(mock_run):
     changefeed.outqueue = outpipe
     changefeed.run_forever()
     assert outpipe.get() == 'seems like we have an insert here'
-    assert outpipe.get() == {'new_val': 'seems like we have an update here',
-                             'old_val': 'seems like we have an update here'}
+    assert outpipe.get() == 'seems like we have an update here'
     assert outpipe.qsize() == 0

diff --git a/tests/test_commands.py b/tests/test_commands.py
index c515479f..0b8df348 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -57,6 +57,7 @@ def mock_bigchaindb_backup_config(monkeypatch):
         'keypair': {},
         'database': {'host': 'host', 'port': 12345, 'name': 'adbname'},
         'statsd': {'host': 'host', 'port': 12345, 'rate': 0.1},
+        'backlog_reassign_delay': 5
     }
     monkeypatch.setattr('bigchaindb._config', config)

diff --git a/tests/test_config_utils.py b/tests/test_config_utils.py
index bce2c9a8..4d2b3a1e 100644
--- a/tests/test_config_utils.py
+++ b/tests/test_config_utils.py
@@ -145,7 +145,8 @@ def test_env_config(monkeypatch):

 def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
     file_config = {
-        'database': {'host': 'test-host'}
+        'database': {'host': 'test-host'},
+        'backlog_reassign_delay': 5
     }
     monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, **kwargs: file_config)
     monkeypatch.setattr('os.environ', {'BIGCHAINDB_DATABASE_NAME': 'test-dbname',
@@ -180,6 +181,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
         },
         'api_endpoint': 'http://localhost:9984/api/v1',
         'consensus_plugin': 'default',
+        'backlog_reassign_delay': 5
     }

diff --git a/tests/test_core.py b/tests/test_core.py
index 2650ff37..397158d0 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -19,7 +19,8 @@ def config(request, monkeypatch):
         },
         'keyring': [],
         'CONFIGURED': True,
-        'consensus_plugin': 'default'
+        'consensus_plugin': 'default',
+        'backlog_reassign_delay': 30
     }

     monkeypatch.setattr('bigchaindb.config', config)
diff --git a/tests/test_processes.py b/tests/test_processes.py
new file mode 100644
index 00000000..7013dc03
--- /dev/null
+++ b/tests/test_processes.py
@@ -0,0 +1,23 @@
+from unittest.mock import patch
+
+from multiprocessing import Process
+from bigchaindb.pipelines import vote, block, election, stale
+
+
+@patch.object(stale, 'start')
+@patch.object(election, 'start')
+@patch.object(block, 'start')
+@patch.object(vote, 'start')
+@patch.object(Process, 'start')
+def test_processes_start(mock_process, mock_vote, mock_block, mock_election,
+                         mock_stale):
+    from bigchaindb import processes
+
+    processes.start()
+
+    mock_vote.assert_called_with()
+    mock_block.assert_called_with()
+    mock_election.assert_called_with()
+    mock_stale.assert_called_with()
+    mock_process.assert_called_with()
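The reassignment policy in `reassign_transaction` is worth spelling out: a stale transaction is handed to a federation member other than its current assignee, falling back to this node when it runs alone. A self-contained sketch of that selection logic, expressed with a comprehension instead of the index arithmetic used in the patch; `choose_new_assignee` and its arguments are hypothetical placeholders:

    import random

    def choose_new_assignee(current, me, others):
        """Pick a node, other than `current`, to take over a stale tx."""
        if not others:
            return me  # no other nodes: assign to ourselves
        federation = others + [me]
        if current in federation:
            candidates = [node for node in federation if node != current]
        else:
            # current assignee is not a federation member
            candidates = others
        return random.choice(candidates)

    print(choose_new_assignee('aaa', 'me', ['aaa', 'bbb', 'ccc']))
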
From b9fba73c143deb40c3f554906024f90be3f913dd Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 7 Sep 2016 17:32:34 +0200
Subject: [PATCH 09/18] Remove unused code

---
 bigchaindb/commands/bigchain.py |  2 +-
 bigchaindb/commands/utils.py    | 45 ++++++++++-----------------------
 bigchaindb/core.py              |  2 +-
 tests/test_commands.py          | 13 ++---------
 tests/test_run_query_util.py    |  2 --
 5 files changed, 17 insertions(+), 47 deletions(-)

diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py
index a4375c8e..1943167b 100644
--- a/bigchaindb/commands/bigchain.py
+++ b/bigchaindb/commands/bigchain.py
@@ -156,7 +156,7 @@ def run_start(args):

     if args.start_rethinkdb:
         try:
-            proc, port = utils.start_rethinkdb()
+            proc = utils.start_rethinkdb()
         except StartupError as e:
             sys.exit('Error starting RethinkDB, reason is: {}'.format(e))
         logger.info('RethinkDB started with PID %s' % proc.pid)
diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py
index 02f2a997..ac2c4d36 100644
--- a/bigchaindb/commands/utils.py
+++ b/bigchaindb/commands/utils.py
@@ -5,7 +5,6 @@ for ``argparse.ArgumentParser``.
 import argparse
 import multiprocessing as mp
 import subprocess
-import tempfile

 import rethinkdb as r

@@ -15,18 +14,7 @@ from bigchaindb import db
 from bigchaindb.version import __version__


-def start_temp_rethinkdb(port=0, directory=None):
-    directory = directory or tempfile.mkdtemp()
-
-    extra_opts = ['--cluster-port', '0',
-                  '--driver-port', str(port),
-                  '--no-http-admin',
-                  '--directory', directory]
-
-    return start_rethinkdb(wait_for_db=False, extra_opts=extra_opts)
-
-
-def start_rethinkdb(wait_for_db=True, extra_opts=None):
+def start_rethinkdb():
     """Start RethinkDB as a child process and wait for it to be
     available.

-    Args:
-        wait_for_db (bool): wait for the database to be ready
-        extra_opts (list): a list of extra options to be used when
-            starting the db
-
     Raises:
         ``bigchaindb.exceptions.StartupError`` if RethinkDB cannot
         be started.
     """

-    if not extra_opts:
-        extra_opts = []
-
-    proc = subprocess.Popen(['rethinkdb', '--bind', 'all'] + extra_opts,
+    proc = subprocess.Popen(['rethinkdb', '--bind', 'all'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             universal_newlines=True)

     dbname = bigchaindb.config['database']['name']
     line = ''
-    port = None

     for line in proc.stdout:
-        if line.startswith('Listening for client driver'):
-            port = int(line.split()[-1])
         if line.startswith('Server ready'):
+
             # FIXME: seems like tables are not ready when the server is ready,
             # that's why we need to query RethinkDB to know the state
             # of the database. This code assumes the tables are ready
             # when the database is ready. This seems a valid assumption.

-            if wait_for_db:
-                try:
-                    conn = db.get_conn()
-                    # Before checking if the db is ready, we need to query
-                    # the server to check if it contains that db
-                    if r.db_list().contains(dbname).run(conn):
-                        r.db(dbname).wait().run(conn)
-                except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
-                    raise StartupError('Error waiting for the database `{}` '
-                                       'to be ready'.format(dbname)) from exc
-            return proc, port
+            try:
+                conn = db.get_conn()
+                # Before checking if the db is ready, we need to query
+                # the server to check if it contains that db
+                if r.db_list().contains(dbname).run(conn):
+                    r.db(dbname).wait().run(conn)
+            except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
+                raise StartupError('Error waiting for the database `{}` '
+                                   'to be ready'.format(dbname)) from exc
+            return proc

     # We are here when we exhaust the stdout of the process.
     # The last `line` contains info about the error.
diff --git a/bigchaindb/core.py b/bigchaindb/core.py
index 7dd9ba1e..2248be5e 100644
--- a/bigchaindb/core.py
+++ b/bigchaindb/core.py
@@ -228,7 +228,7 @@ class Bigchain(object):
         # First, get information on all blocks which contain this transaction
         response = self.connection.run(
                 r.table('bigchain', read_mode=self.read_mode)
-                .get_all(value, index=index)\
+                .get_all(value, index=index)
                 .pluck('votes', 'id', {'block': ['voters']}))

         return list(response)
diff --git a/tests/test_commands.py b/tests/test_commands.py
index d7cce77f..87b05b89 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -67,7 +67,7 @@ def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_init,
     run_start(args)


-@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=(Mock(), 2))
+@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=Mock())
 def test_bigchain_run_start_with_rethinkdb(mock_start_rethinkdb,
                                            mock_run_configure,
                                            mock_processes_start,
@@ -216,7 +216,7 @@ def test_start_rethinkdb_returns_a_process_when_successful(mock_popen):
     mock_popen.return_value = Mock(stdout=[
         'Listening for client driver 1234',
         'Server ready'])
-    assert utils.start_rethinkdb() == (mock_popen.return_value, 1234)
+    assert utils.start_rethinkdb() is mock_popen.return_value


 @patch('subprocess.Popen')
@@ -228,15 +228,6 @@ def test_start_rethinkdb_exits_when_cannot_start(mock_popen):
         utils.start_rethinkdb()


-@patch('subprocess.Popen')
-def test_start_temp_rethinkdb_returns_a_process_when_successful(mock_popen):
-    from bigchaindb.commands import utils
-    mock_popen.return_value = Mock(stdout=[
-        'Listening for client driver 1234',
-        'Server ready'])
-    assert utils.start_temp_rethinkdb() == (mock_popen.return_value, 1234)
-
-
 @patch('rethinkdb.ast.Table.reconfigure')
 def test_set_shards(mock_reconfigure, monkeypatch, b):
     from bigchaindb.commands.bigchain import run_set_shards
diff --git a/tests/test_run_query_util.py b/tests/test_run_query_util.py
index bac63048..f81bd232 100644
--- a/tests/test_run_query_util.py
+++ b/tests/test_run_query_util.py
@@ -1,10 +1,8 @@
-from unittest.mock import patch
 from threading import Thread
 import pytest

 import rethinkdb as r

-from bigchaindb.commands.utils import start_temp_rethinkdb
 from bigchaindb.db.utils import Connection
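What survives patch 09 is the readiness protocol itself: `start_rethinkdb` decides the server is up by scanning the child's stdout for a line starting with `Server ready`, then double-checks the database with a `wait()` query. A stripped-down sketch of that scan, under the assumption that the spawned program prints such a banner; `wait_until_ready` is a hypothetical helper, not part of the codebase:

    import subprocess

    def wait_until_ready(cmd=('rethinkdb', '--bind', 'all')):
        """Spawn a process and block until it prints a readiness banner."""
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                universal_newlines=True)
        line = ''
        for line in proc.stdout:
            if line.startswith('Server ready'):
                return proc
        # stdout exhausted without the banner: startup failed,
        # and the last line read usually explains why
        raise RuntimeError('process exited before becoming ready: ' + line)
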
index 47fbd008..1d42423f 100644
--- a/bigchaindb/pipelines/block.py
+++ b/bigchaindb/pipelines/block.py
@@ -42,6 +42,7 @@ class Block:
         if tx['assignee'] == self.bigchain.me:
             tx.pop('assignee')
+            tx.pop('assignment_timestamp')
         return tx
 
     def delete_tx(self, tx):
diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py
index 4abf72cc..a41f65b5 100644
--- a/tests/pipelines/test_block_creation.py
+++ b/tests/pipelines/test_block_creation.py
@@ -14,9 +14,14 @@ def test_filter_by_assignee(b, user_vk):
     tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
     tx = b.sign_transaction(tx, b.me_private)
     tx['assignee'] = b.me
+    tx['assignment_timestamp'] = 111
 
     # filter_tx has side effects on the `tx` instance by popping 'assignee'
-    assert block_maker.filter_tx(tx) == tx
+    # and 'assignment_timestamp'
+    filtered_tx = block_maker.filter_tx(tx)
+    assert filtered_tx == tx
+    assert 'assignee' not in filtered_tx
+    assert 'assignment_timestamp' not in filtered_tx
 
     tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
     tx = b.sign_transaction(tx, b.me_private)
@@ -116,6 +121,7 @@ def test_full_pipeline(b, user_vk):
         tx = b.sign_transaction(tx, b.me_private)
         assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc'])
         tx['assignee'] = assignee
+        tx['assignment_timestamp'] = time.time()
         if assignee == b.me:
             count_assigned_to_me += 1
         r.table('backlog').insert(tx, durability='hard').run(b.conn)
From 404f3a1c4582607e95b0d8a567d6f9b77f19ec2b Mon Sep 17 00:00:00 2001
From: Ryan Henderson
Date: Thu, 8 Sep 2016 11:45:48 +0200
Subject: [PATCH 11/18] Delete transactions after block is written (#609)

* delete transactions after block is written
* cleanup transaction_exists
* check for duplicate transactions
* delete invalid tx from backlog
* test duplicate transaction
---
 bigchaindb/core.py | 3 +-
 bigchaindb/pipelines/block.py | 63 ++++++++++++++++++--------
 tests/pipelines/test_block_creation.py | 57 +++++++++++++++++++----
 3 files changed, 92 insertions(+), 31 deletions(-)

diff --git a/bigchaindb/core.py b/bigchaindb/core.py
index cc753a42..132a9ce7 100644
--- a/bigchaindb/core.py
+++ b/bigchaindb/core.py
@@ -573,11 +573,10 @@ class Bigchain(object):
         block_serialized = rapidjson.dumps(block)
         r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn)
 
-    # TODO: Decide if we need this method
     def transaction_exists(self, transaction_id):
         response = r.table('bigchain', read_mode=self.read_mode)\
                     .get_all(transaction_id, index='transaction_id').run(self.conn)
-        return True if len(response.items) > 0 else False
+        return len(response.items) > 0
 
     def prepare_genesis_block(self):
         """Prepare a genesis block."""
diff --git a/bigchaindb/pipelines/block.py b/bigchaindb/pipelines/block.py
index 1d42423f..1cd2f6c9 100644
--- a/bigchaindb/pipelines/block.py
+++ b/bigchaindb/pipelines/block.py
@@ -45,34 +45,43 @@ class Block:
             tx.pop('assignment_timestamp')
         return tx
 
-    def delete_tx(self, tx):
-        """Delete a transaction.
-
-        Args:
-            tx (dict): the transaction to delete.
-
-        Returns:
-            The transaction.
-        """
-        r.table('backlog')\
-            .get(tx['id'])\
-            .delete(durability='hard')\
-            .run(self.bigchain.conn)
-
-        return tx
-
     def validate_tx(self, tx):
         """Validate a transaction.
 
+        Also checks if the transaction already exists in the blockchain. If it
+        does, or if it's invalid, it's deleted from the backlog immediately.
+
         Args:
             tx (dict): the transaction to validate.
 
         Returns:
            The transaction if valid, ``None`` otherwise.
""" - tx = self.bigchain.is_valid_transaction(tx) - if tx: + if self.bigchain.transaction_exists(tx['id']): + # if the transaction already exists, we must check whether + # it's in a valid or undecided block + tx, status = self.bigchain.get_transaction(tx['id'], + include_status=True) + if status == self.bigchain.TX_VALID \ + or status == self.bigchain.TX_UNDECIDED: + # if the tx is already in a valid or undecided block, + # then it no longer should be in the backlog, or added + # to a new block. We can delete and drop it. + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None + + tx_validated = self.bigchain.is_valid_transaction(tx) + if tx_validated: return tx + else: + # if the transaction is not valid, remove it from the + # backlog + r.table('backlog').get(tx['id']) \ + .delete(durability='hard') \ + .run(self.bigchain.conn) + return None def create(self, tx, timeout=False): """Create a block. @@ -113,6 +122,22 @@ class Block: self.bigchain.write_block(block) return block + def delete_tx(self, block): + """Delete transactions. + + Args: + block (dict): the block containg the transactions to delete. + + Returns: + The block. + """ + r.table('backlog')\ + .get_all(*[tx['id'] for tx in block['block']['transactions']])\ + .delete(durability='hard')\ + .run(self.bigchain.conn) + + return block + def initial(): """Return old transactions from the backlog.""" @@ -143,10 +168,10 @@ def create_pipeline(): block_pipeline = Pipeline([ Node(block.filter_tx), - Node(block.delete_tx), Node(block.validate_tx, fraction_of_cores=1), Node(block.create, timeout=1), Node(block.write), + Node(block.delete_tx), ]) return block_pipeline diff --git a/tests/pipelines/test_block_creation.py b/tests/pipelines/test_block_creation.py index a41f65b5..a1ab6a19 100644 --- a/tests/pipelines/test_block_creation.py +++ b/tests/pipelines/test_block_creation.py @@ -74,22 +74,59 @@ def test_write_block(b, user_vk): assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc +def test_duplicate_transaction(b, user_vk): + block_maker = block.Block() + + txs = [] + for i in range(10): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + txs.append(tx) + + block_doc = b.create_block(txs) + block_maker.write(block_doc) + + # block is in bigchain + assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc + + b.write_transaction(txs[0]) + + # verify tx is in the backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is not None + + # try to validate a transaction that's already in the chain; should not + # work + assert block_maker.validate_tx(txs[0]) is None + + # duplicate tx should be removed from backlog + assert r.table('backlog').get(txs[0]['id']).run(b.conn) is None + + def test_delete_tx(b, user_vk): block_maker = block.Block() - tx = b.create_transaction(b.me, user_vk, None, 'CREATE') - tx = b.sign_transaction(tx, b.me_private) - b.write_transaction(tx) + for i in range(100): + tx = b.create_transaction(b.me, user_vk, None, 'CREATE') + tx = b.sign_transaction(tx, b.me_private) + block_maker.create(tx) + # make sure the tx appears in the backlog + b.write_transaction(tx) - tx_backlog = r.table('backlog').get(tx['id']).run(b.conn) - tx_backlog.pop('assignee') - tx_backlog.pop('assignment_timestamp') - assert tx_backlog == tx + # force the output triggering a `timeout` + block_doc = block_maker.create(None, timeout=True) - returned_tx = block_maker.delete_tx(tx) + for tx in 
+        returned_tx = r.table('backlog').get(tx['id']).run(b.conn)
+        returned_tx.pop('assignee')
+        returned_tx.pop('assignment_timestamp')
+        assert returned_tx == tx
 
-    assert returned_tx == tx
-    assert r.table('backlog').get(tx['id']).run(b.conn) is None
+    returned_block = block_maker.delete_tx(block_doc)
+
+    assert returned_block == block_doc
+
+    for tx in block_doc['block']['transactions']:
+        assert r.table('backlog').get(tx['id']).run(b.conn) is None
 
 
 def test_prefeed(b, user_vk):
From 7944e0cd98ffa57a84efd713ea79da4a1acaebe2 Mon Sep 17 00:00:00 2001
From: vrde
Date: Tue, 20 Sep 2016 16:49:56 +0200
Subject: [PATCH 12/18] Allow temporary keypair if no conf file found

Closes #482, closes #559
---
 Dockerfile | 2 +-
 bigchaindb/commands/bigchain.py | 22 +++++++++-
 .../source/server-reference/bigchaindb-cli.md | 5 ++-
 tests/test_commands.py | 40 ++++++++++++++++++-
 4 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 0fcac07f..089c3ffe 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,7 +19,7 @@ ENV BIGCHAINDB_CONFIG_PATH /data/.bigchaindb
 ENV BIGCHAINDB_SERVER_BIND 0.0.0.0:9984
 ENV BIGCHAINDB_API_ENDPOINT http://bigchaindb:9984/api/v1
 
-ENTRYPOINT ["bigchaindb", "--experimental-start-rethinkdb"]
+ENTRYPOINT ["bigchaindb", "--dev-start-rethinkdb", "--dev-allow-temp-keypair"]
 
 CMD ["start"]
diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py
index 5edca2c4..55665fc1 100644
--- a/bigchaindb/commands/bigchain.py
+++ b/bigchaindb/commands/bigchain.py
@@ -157,8 +157,20 @@ def run_drop(args):
 def run_start(args):
     """Start the processes to run the node"""
     logger.info('BigchainDB Version {}'.format(bigchaindb.__version__))
     bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
+
+    if args.allow_temp_keypair:
+        if not (bigchaindb.config['keypair']['private'] or
+                bigchaindb.config['keypair']['public']):
+
+            private_key, public_key = crypto.generate_key_pair()
+            bigchaindb.config['keypair']['private'] = private_key
+            bigchaindb.config['keypair']['public'] = public_key
+        else:
+            logger.warning('Keypair found, no need to create one on the fly.')
+
     if args.start_rethinkdb:
         try:
             proc = utils.start_rethinkdb()
@@ -174,7 +186,8 @@ def run_start(args):
         sys.exit("Can't start BigchainDB, no keypair found. "
                  'Did you run `bigchaindb configure`?')
 
-    logger.info('Starting BigchainDB main process')
+    logger.info('Starting BigchainDB main process with public key %s',
+                bigchaindb.config['keypair']['public'])
 
     processes.start()
@@ -238,11 +251,16 @@ def main():
         description='Control your BigchainDB node.',
         parents=[utils.base_parser])
 
-    parser.add_argument('--experimental-start-rethinkdb',
+    parser.add_argument('--dev-start-rethinkdb',
                         dest='start_rethinkdb',
                         action='store_true',
                         help='Run RethinkDB on start')
 
+    parser.add_argument('--dev-allow-temp-keypair',
+                        dest='allow_temp_keypair',
+                        action='store_true',
+                        help='Generate a random keypair on start')
+
     # all the commands are contained in the subparsers object,
     # the command selected by the user will be stored in `args.command`
     # that is used by the `main` function to select which other
diff --git a/docs/source/server-reference/bigchaindb-cli.md b/docs/source/server-reference/bigchaindb-cli.md
index 15dd4475..3e11446a 100644
--- a/docs/source/server-reference/bigchaindb-cli.md
+++ b/docs/source/server-reference/bigchaindb-cli.md
@@ -58,8 +58,9 @@
 Drop (erase) the RethinkDB database. You will be prompted to make sure. If you w
 
 ## bigchaindb start
 Start BigchainDB. It always begins by trying a `bigchaindb init` first. See the note in the documentation for `bigchaindb init`.
-You can also use the `--experimental-start-rethinkdb` command line option to automatically start rethinkdb with bigchaindb if rethinkdb is not already running,
-e.g. `bigchaindb --experimental-start-rethinkdb start`. Note that this will also shutdown rethinkdb when the bigchaindb process stops.
+You can also use the `--dev-start-rethinkdb` command line option to automatically start rethinkdb with bigchaindb if rethinkdb is not already running,
+e.g. `bigchaindb --dev-start-rethinkdb start`. Note that this will also shut down rethinkdb when the bigchaindb process stops.
+The option `--dev-allow-temp-keypair` will generate a keypair on the fly if no keypair is found. This is useful when you want to run a temporary instance of BigchainDB in a Docker container, for example.
 
 ## bigchaindb load
 
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 587d38f8..215649f8 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -64,7 +64,7 @@ def mock_bigchaindb_backup_config(monkeypatch):
 def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_init_with_existing_db):
     from bigchaindb.commands.bigchain import run_start
-    args = Namespace(start_rethinkdb=False, config=None, yes=True)
+    args = Namespace(start_rethinkdb=False, allow_temp_keypair=False, config=None, yes=True)
     run_start(args)
 
@@ -74,7 +74,7 @@ def test_bigchain_run_start_with_rethinkdb(mock_start_rethinkdb,
                                            mock_processes_start,
                                            mock_db_init_with_existing_db):
     from bigchaindb.commands.bigchain import run_start
-    args = Namespace(start_rethinkdb=True, config=None, yes=True)
+    args = Namespace(start_rethinkdb=True, allow_temp_keypair=False, config=None, yes=True)
     run_start(args)
 
     mock_start_rethinkdb.assert_called_with()
@@ -229,6 +229,42 @@ def test_start_rethinkdb_exits_when_cannot_start(mock_popen):
         utils.start_rethinkdb()
 
 
+@patch('bigchaindb.crypto.generate_key_pair', return_value=('private_key',
+                                                            'public_key'))
+def test_allow_temp_keypair_generates_one_on_the_fly(mock_gen_keypair,
+                                                     mock_processes_start,
+                                                     mock_db_init_with_existing_db):
+    import bigchaindb
+    from bigchaindb.commands.bigchain import run_start
+
+    bigchaindb.config['keypair'] = { 'private': None, 'public': None }
+
+    args = Namespace(allow_temp_keypair=True, start_rethinkdb=False, config=None, yes=True)
+    run_start(args)
+
+    assert bigchaindb.config['keypair']['private'] == 'private_key'
+    assert bigchaindb.config['keypair']['public'] == 'public_key'
+
+
+@patch('bigchaindb.crypto.generate_key_pair', return_value=('private_key',
+                                                            'public_key'))
+def test_allow_temp_keypair_doesnt_override_if_keypair_found(mock_gen_keypair,
+                                                             mock_processes_start,
+                                                             mock_db_init_with_existing_db):
+    import bigchaindb
+    from bigchaindb.commands.bigchain import run_start
+
+    # Preconditions for the test
+    assert isinstance(bigchaindb.config['keypair']['private'], str)
+    assert isinstance(bigchaindb.config['keypair']['public'], str)
+
+    args = Namespace(allow_temp_keypair=True, start_rethinkdb=False, config=None, yes=True)
+    run_start(args)
+
+    assert bigchaindb.config['keypair']['private'] != 'private_key'
+    assert bigchaindb.config['keypair']['public'] != 'public_key'
+
+
 @patch('rethinkdb.ast.Table.reconfigure')
 def test_set_shards(mock_reconfigure, monkeypatch, b):
     from bigchaindb.commands.bigchain import run_set_shards
From 9b709b7f98e94f2b65e62556f207791defd7bfa3 Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 21 Sep 2016 00:46:48 +0200
Subject: [PATCH 13/18] Add tests for argparse
---
 bigchaindb/commands/bigchain.py | 9 ++++-----
 bigchaindb/commands/utils.py | 8 ++++----
 tests/test_commands.py | 28 ++++++++++++++++++++++++++++
 3 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/bigchaindb/commands/bigchain.py b/bigchaindb/commands/bigchain.py
index 55665fc1..46622867 100644
--- a/bigchaindb/commands/bigchain.py
+++ b/bigchaindb/commands/bigchain.py
@@ -245,8 +245,7 @@ def run_set_replicas(args):
     except r.ReqlOpFailedError as e:
         logger.warn(e)
 
-
-def main():
+def create_parser():
     parser = argparse.ArgumentParser(
         description='Control your BigchainDB node.',
         parents=[utils.base_parser])
@@ -325,8 +324,8 @@ def main():
                         'is set, the count is distributed equally to all the '
                         'processes')
 
-    utils.start(parser, globals())
+    return parser
 
-if __name__ == '__main__':
-    main()
+def main():
+    utils.start(create_parser(), sys.argv[1:], globals())
diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py
index ac2c4d36..b53938d6 100644
--- a/bigchaindb/commands/utils.py
+++ b/bigchaindb/commands/utils.py
@@ -59,7 +59,7 @@ def start_rethinkdb():
     raise StartupError(line)
 
 
-def start(parser, scope):
+def start(parser, argv, scope):
     """Utility function to execute a subcommand.
 
     The function will look up in the ``scope``
@@ -74,11 +74,11 @@ def start(parser, argv, scope):
         NotImplementedError: if ``scope`` doesn't contain a function called
                              ``run_<command>``.
     """
-    args = parser.parse_args()
+    args = parser.parse_args(argv)
 
     if not args.command:
         parser.print_help()
-        return
+        raise SystemExit()
 
     # look up in the current scope for a function called 'run_<command>'
     # replacing all the dashes '-' with the lowercase character '_'
@@ -96,7 +96,7 @@
     elif args.multiprocess is None:
         args.multiprocess = mp.cpu_count()
 
-    func(args)
+    return func(args)
 
 
 base_parser = argparse.ArgumentParser(add_help=False, prog='bigchaindb')
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 215649f8..6b7d5f53 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -62,6 +62,34 @@ def mock_bigchaindb_backup_config(monkeypatch):
     monkeypatch.setattr('bigchaindb._config', config)
 
 
+def test_make_sure_we_dont_remove_any_command():
+    # thanks to: http://stackoverflow.com/a/18161115/597097
+    from bigchaindb.commands.bigchain import utils
+    from bigchaindb.commands.bigchain import create_parser
+    parser = create_parser()
+
+    with pytest.raises(SystemExit):
+        utils.start(parser, [], {})
+
+    assert parser.parse_args(['configure']).command
+    assert parser.parse_args(['show-config']).command
+    assert parser.parse_args(['export-my-pubkey']).command
+    assert parser.parse_args(['init']).command
+    assert parser.parse_args(['drop']).command
+    assert parser.parse_args(['start']).command
+    assert parser.parse_args(['set-shards', '1']).command
+    assert parser.parse_args(['set-replicas', '1']).command
+    assert parser.parse_args(['load']).command
+
+
+@patch('bigchaindb.commands.utils.start')
+def test_main_entrypoint(mock_start):
+    from bigchaindb.commands.bigchain import main
+    main()
+
+    assert mock_start.called
+
+
 def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_init_with_existing_db):
     from bigchaindb.commands.bigchain import run_start
     args = Namespace(start_rethinkdb=False, allow_temp_keypair=False, config=None, yes=True)
From 5f603f52ef93b804cf42ef705cc15ea908631893 Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 21 Sep 2016 14:10:04 +0200
Subject: [PATCH 14/18] Add more test coverage for commands
---
 tests/test_commands.py | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/tests/test_commands.py b/tests/test_commands.py
index 6b7d5f53..41736770 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -66,6 +66,7 @@ def test_make_sure_we_dont_remove_any_command():
     # thanks to: http://stackoverflow.com/a/18161115/597097
     from bigchaindb.commands.bigchain import utils
     from bigchaindb.commands.bigchain import create_parser
+
     parser = create_parser()
@@ -82,6 +83,32 @@ def test_make_sure_we_dont_remove_any_command():
     assert parser.parse_args(['load']).command
+
+def test_start_raises_if_command_not_implemented():
+    from bigchaindb.commands.bigchain import utils
+    from bigchaindb.commands.bigchain import create_parser
+
+    parser = create_parser()
+
+    with pytest.raises(NotImplementedError):
+        # Will raise because `scope`, the third parameter,
+        # doesn't contain the function `run_configure`
+        utils.start(parser, ['configure'], {})
+
+
+@patch('multiprocessing.cpu_count', return_value=42)
+def test_start_sets_multiprocess_var_based_on_cli_args(mock_cpu_count):
+    def run_load(args):
+        return args
+
+    from bigchaindb.commands.bigchain import utils
+    from bigchaindb.commands.bigchain import create_parser
+
+    parser = create_parser()
+
+    assert utils.start(parser, ['load'], {'run_load': run_load}).multiprocess == 1
+    assert utils.start(parser, ['load', '--multiprocess'], {'run_load': run_load}).multiprocess == 42
+
+
 @patch('bigchaindb.commands.utils.start')
 def test_main_entrypoint(mock_start):
     from bigchaindb.commands.bigchain import main
From b2ac60ba3e808f608aacbd7625bb7592b79c4dd8 Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 21 Sep 2016 14:57:23 +0200
Subject: [PATCH 15/18] Add documentation for missing parameter
---
 bigchaindb/commands/utils.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/bigchaindb/commands/utils.py b/bigchaindb/commands/utils.py
index b53938d6..36b78653 100644
--- a/bigchaindb/commands/utils.py
+++ b/bigchaindb/commands/utils.py
@@ -68,6 +68,7 @@ def start(parser, argv, scope):
 
     Args:
         parser: an ArgumentParser instance.
+        argv: the list of command line arguments without the script name.
         scope (dict): map containing (eventually) the functions to be called.
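 
     Example:
         ``start(create_parser(), ['show-config'], globals())`` parses the
         arguments and looks up a function named ``run_show_config`` in the
         given scope.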
 
     Raises:
From dea2df9db0abac7d5a2e6c12bb98039a150be888 Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 21 Sep 2016 15:00:13 +0200
Subject: [PATCH 16/18] Separate test for empty args
---
 tests/test_commands.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/tests/test_commands.py b/tests/test_commands.py
index 41736770..4e16441e 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -64,14 +64,10 @@ def mock_bigchaindb_backup_config(monkeypatch):
 
 def test_make_sure_we_dont_remove_any_command():
     # thanks to: http://stackoverflow.com/a/18161115/597097
-    from bigchaindb.commands.bigchain import utils
     from bigchaindb.commands.bigchain import create_parser
 
     parser = create_parser()
 
-    with pytest.raises(SystemExit):
-        utils.start(parser, [], {})
-
     assert parser.parse_args(['configure']).command
     assert parser.parse_args(['show-config']).command
     assert parser.parse_args(['export-my-pubkey']).command
@@ -95,6 +91,16 @@ def test_start_raises_if_command_not_implemented():
         utils.start(parser, ['configure'], {})
 
 
+def test_start_raises_if_no_arguments_given():
+    from bigchaindb.commands.bigchain import utils
+    from bigchaindb.commands.bigchain import create_parser
+
+    parser = create_parser()
+
+    with pytest.raises(SystemExit):
+        utils.start(parser, [], {})
+
+
 @patch('multiprocessing.cpu_count', return_value=42)
 def test_start_sets_multiprocess_var_based_on_cli_args(mock_cpu_count):
     def run_load(args):
         return args
From 989a943fea30d9b042bfe0a731b74aa2cc144594 Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 21 Sep 2016 15:01:42 +0200
Subject: [PATCH 17/18] Move func definition after imports
---
 tests/test_commands.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/test_commands.py b/tests/test_commands.py
index 4e16441e..8ee79008 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -103,12 +103,12 @@ def test_start_raises_if_no_arguments_given():
 
 @patch('multiprocessing.cpu_count', return_value=42)
 def test_start_sets_multiprocess_var_based_on_cli_args(mock_cpu_count):
-    def run_load(args):
-        return args
-
     from bigchaindb.commands.bigchain import utils
     from bigchaindb.commands.bigchain import create_parser
 
+    def run_load(args):
+        return args
+
     parser = create_parser()
 
     assert utils.start(parser, ['load'], {'run_load': run_load}).multiprocess == 1
From 27f585f39d26ad39e36743901a0ed2fc737e755a Mon Sep 17 00:00:00 2001
From: vrde
Date: Wed, 21 Sep 2016 15:09:54 +0200
Subject: [PATCH 18/18] Improve test to check if original vals are kept
---
 tests/test_commands.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/tests/test_commands.py b/tests/test_commands.py
index 8ee79008..4efa8f96 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -316,14 +316,17 @@ def test_allow_temp_keypair_doesnt_override_if_keypair_found(mock_gen_keypair,
     from bigchaindb.commands.bigchain import run_start
 
     # Preconditions for the test
-    assert isinstance(bigchaindb.config['keypair']['private'], str)
-    assert isinstance(bigchaindb.config['keypair']['public'], str)
+    original_private_key = bigchaindb.config['keypair']['private']
+    original_public_key = bigchaindb.config['keypair']['public']
+
+    assert isinstance(original_public_key, str)
+    assert isinstance(original_private_key, str)
 
     args = Namespace(allow_temp_keypair=True, start_rethinkdb=False, config=None, yes=True)
     run_start(args)
 
-    assert bigchaindb.config['keypair']['private'] != 'private_key'
-    assert bigchaindb.config['keypair']['public'] != 'public_key'
+    assert bigchaindb.config['keypair']['private'] == original_private_key
+    assert bigchaindb.config['keypair']['public'] == original_public_key
 
 
 @patch('rethinkdb.ast.Table.reconfigure')