Merge branch 'make-db-connection-more-robust'

vrde 2016-09-19 17:20:05 +02:00
commit 616d170e9a
No known key found for this signature in database
GPG Key ID: 6581C7C39B3D397D
10 changed files with 277 additions and 79 deletions

View File

@@ -18,6 +18,11 @@ def start_rethinkdb():
"""Start RethinkDB as a child process and wait for it to be
available.
Args:
wait_for_db (bool): wait for the database to be ready
extra_opts (list): a list of extra options to be used when
starting the db
Raises:
``bigchaindb.exceptions.StartupError`` if RethinkDB cannot
be started.
@@ -33,11 +38,11 @@ def start_rethinkdb():
for line in proc.stdout:
if line.startswith('Server ready'):
# FIXME: the tables do not seem to be ready when the server reports
# 'Server ready', so we query RethinkDB to check the state of the
# database. This code assumes the tables are ready when the database
# is ready, which seems to be a valid assumption.
try:
conn = db.get_conn()
# Before checking if the db is ready, we need to query
@@ -47,7 +52,6 @@ def start_rethinkdb():
except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
raise StartupError('Error waiting for the database `{}` '
'to be ready'.format(dbname)) from exc
return proc
# We reach this point only if the process's stdout is exhausted without the server becoming ready.
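A minimal usage sketch of the updated helper, assuming the new keyword arguments match the docstring above (the changed signature itself is not part of this hunk, so names and defaults may differ):

    from bigchaindb.commands import utils

    # Hypothetical call: `wait_for_db` and `extra_opts` come from the
    # docstring; '--cache-size' is just an example of an extra option.
    proc = utils.start_rethinkdb(wait_for_db=True,
                                 extra_opts=['--cache-size', '2048'])
    try:
        pass  # interact with the database here
    finally:
        proc.terminate()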

View File

@@ -9,6 +9,7 @@ import rethinkdb as r
import rapidjson
import bigchaindb
from bigchaindb.db.utils import Connection
from bigchaindb import config_utils, crypto, exceptions, util
@@ -66,6 +67,7 @@ class Bigchain(object):
raise exceptions.KeypairNotFoundException()
self._conn = None
self.connection = Connection(host=self.host, port=self.port, db=self.dbname)
@property
def conn(self):
@@ -141,7 +143,9 @@ class Bigchain(object):
signed_transaction.update({'assignment_timestamp': time()})
# write to the backlog
response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn)
response = self.connection.run(
r.table('backlog')
.insert(signed_transaction, durability=durability))
return response
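The change above is representative of every query migration in this file: the ReQL query is built exactly as before, but instead of calling .run(self.conn) on it, it is handed to the Connection proxy, which owns the socket and the retry logic. A minimal sketch of the two styles, assuming a reachable RethinkDB instance with the default configuration:

    import rethinkdb as r
    from bigchaindb.db.utils import Connection

    connection = Connection()  # lazy: no socket is opened yet
    query = r.table('backlog').get('some-transaction-id')

    # Old style: the caller owns the connection and the error handling.
    #     result = query.run(raw_conn)

    # New style: the proxy opens the connection on first use and retries
    # transient r.ReqlDriverError failures before re-raising them.
    result = connection.run(query)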
def reassign_transaction(self, transaction, durability='hard'):
@@ -224,13 +228,16 @@ class Bigchain(object):
break
# Query the transaction in the target block and return
response = r.table('bigchain', read_mode=self.read_mode).get(target_block_id)\
.get_field('block').get_field('transactions')\
.filter(lambda tx: tx['id'] == txid).run(self.conn)[0]
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(target_block_id)
.get_field('block')
.get_field('transactions')
.filter(lambda tx: tx['id'] == txid))[0]
else:
# Otherwise, check the backlog
response = r.table('backlog').get(txid).run(self.conn)
response = self.connection.run(r.table('backlog').get(txid))
if response:
tx_status = self.TX_IN_BACKLOG
@@ -264,8 +271,10 @@ class Bigchain(object):
A list of blocks with only election information
"""
# First, get information on all blocks which contain this transaction
response = r.table('bigchain', read_mode=self.read_mode).get_all(value, index=index)\
.pluck('votes', 'id', {'block': ['voters']}).run(self.conn)
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(value, index=index)
.pluck('votes', 'id', {'block': ['voters']}))
return list(response)
@@ -318,11 +327,11 @@ class Bigchain(object):
A list of transactions containing that payload. If no transaction exists with that payload, it
returns an empty list `[]`.
"""
cursor = r.table('bigchain', read_mode=self.read_mode) \
.get_all(payload_uuid, index='payload_uuid') \
.concat_map(lambda block: block['block']['transactions']) \
.filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid) \
.run(self.conn)
cursor = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(payload_uuid, index='payload_uuid')
.concat_map(lambda block: block['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid))
transactions = list(cursor)
return transactions
@@ -341,11 +350,11 @@ class Bigchain(object):
"""
# checks if an input was already spent
# checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...}
response = r.table('bigchain', read_mode=self.read_mode)\
.concat_map(lambda doc: doc['block']['transactions'])\
.filter(lambda transaction: transaction['transaction']['fulfillments']
.contains(lambda fulfillment: fulfillment['input'] == tx_input))\
.run(self.conn)
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda transaction: transaction['transaction']['fulfillments']
.contains(lambda fulfillment: fulfillment['input'] == tx_input)))
transactions = list(response)
@@ -380,12 +389,12 @@ class Bigchain(object):
"""
# get all transactions in which owner is in the `owners_after` list
response = r.table('bigchain', read_mode=self.read_mode) \
.concat_map(lambda doc: doc['block']['transactions']) \
.filter(lambda tx: tx['transaction']['conditions']
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.concat_map(lambda doc: doc['block']['transactions'])
.filter(lambda tx: tx['transaction']['conditions']
.contains(lambda c: c['owners_after']
.contains(owner))) \
.run(self.conn)
.contains(owner))))
owned = []
for tx in response:
@@ -529,8 +538,9 @@ class Bigchain(object):
but the vote is invalid.
"""
votes = list(r.table('votes', read_mode=self.read_mode)\
.get_all([block['id'], self.me], index='block_and_voter').run(self.conn))
votes = list(self.connection.run(
r.table('votes', read_mode=self.read_mode)
.get_all([block['id'], self.me], index='block_and_voter')))
if len(votes) > 1:
raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}'
@@ -571,11 +581,14 @@ class Bigchain(object):
"""
block_serialized = rapidjson.dumps(block)
r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn)
self.connection.run(
r.table('bigchain')
.insert(r.json(block_serialized), durability=durability))
def transaction_exists(self, transaction_id):
response = r.table('bigchain', read_mode=self.read_mode)\
.get_all(transaction_id, index='transaction_id').run(self.conn)
response = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get_all(transaction_id, index='transaction_id'))
return len(response.items) > 0
def prepare_genesis_block(self):
@@ -602,7 +615,9 @@ class Bigchain(object):
# 2. create the block with one transaction
# 3. write the block to the bigchain
blocks_count = r.table('bigchain', read_mode=self.read_mode).count().run(self.conn)
blocks_count = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.count())
if blocks_count:
raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block')
@@ -648,30 +663,30 @@ class Bigchain(object):
def write_vote(self, vote):
"""Write the vote to the database."""
r.table('votes') \
.insert(vote) \
.run(self.conn)
self.connection.run(
r.table('votes')
.insert(vote))
def get_last_voted_block(self):
"""Returns the last block that this node voted on."""
try:
# get the latest value for the vote timestamp (over all votes)
max_timestamp = r.table('votes', read_mode=self.read_mode) \
.filter(r.row['node_pubkey'] == self.me) \
.max(r.row['vote']['timestamp']) \
.run(self.conn)['vote']['timestamp']
max_timestamp = self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['node_pubkey'] == self.me)
.max(r.row['vote']['timestamp']))['vote']['timestamp']
last_voted = list(r.table('votes', read_mode=self.read_mode) \
.filter(r.row['vote']['timestamp'] == max_timestamp) \
.filter(r.row['node_pubkey'] == self.me) \
.run(self.conn))
last_voted = list(self.connection.run(
r.table('votes', read_mode=self.read_mode)
.filter(r.row['vote']['timestamp'] == max_timestamp)
.filter(r.row['node_pubkey'] == self.me)))
except r.ReqlNonExistenceError:
# no vote by this node exists yet, so fall back to the Genesis block
return list(r.table('bigchain', read_mode=self.read_mode)
.filter(util.is_genesis_block)
.run(self.conn))[0]
return list(self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(util.is_genesis_block)))[0]
# Now the fun starts. Since the resolution of timestamp is a second,
# we might have more than one vote per timestamp. If this is the case
@@ -703,19 +718,21 @@ class Bigchain(object):
except KeyError:
break
res = r.table('bigchain', read_mode=self.read_mode).get(last_block_id).run(self.conn)
res = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.get(last_block_id))
return res
def get_unvoted_blocks(self):
"""Return all the blocks that has not been voted by this node."""
unvoted = r.table('bigchain', read_mode=self.read_mode) \
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
unvoted = self.connection.run(
r.table('bigchain', read_mode=self.read_mode)
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
.get_all([block['id'], self.me], index='block_and_voter')
.is_empty()) \
.order_by(r.asc(r.row['block']['timestamp'])) \
.run(self.conn)
.is_empty())
.order_by(r.asc(r.row['block']['timestamp'])))
# FIXME: I (@vrde) don't like this solution. Filtering should be done at a
# database level. Solving issue #444 can help untangle the situation.
@@ -726,9 +743,8 @@ class Bigchain(object):
def block_election_status(self, block):
"""Tally the votes on a block, and return the status: valid, invalid, or undecided."""
votes = r.table('votes', read_mode=self.read_mode) \
.between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter') \
.run(self.conn)
votes = self.connection.run(r.table('votes', read_mode=self.read_mode)
.between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter'))
votes = list(votes)

View File

@@ -1,5 +1,6 @@
"""Utils to initialize and drop the database."""
import time
import logging
import rethinkdb as r
@@ -11,6 +12,61 @@ from bigchaindb import exceptions
logger = logging.getLogger(__name__)
class Connection:
"""This class is a proxy to run queries against the database,
it is:
- lazy, since it creates a connection only when needed
- resilient, because before raising exceptions it tries
more times to run the query or open a connection.
"""
def __init__(self, host=None, port=None, db=None, max_tries=3):
"""Create a new Connection instance.
Args:
host (str, optional): the host to connect to.
port (int, optional): the port to connect to.
db (str, optional): the database to use.
max_tries (int, optional): how many tries before giving up.
"""
self.host = host or bigchaindb.config['database']['host']
self.port = port or bigchaindb.config['database']['port']
self.db = db or bigchaindb.config['database']['name']
self.max_tries = max_tries
self.conn = None
def run(self, query):
"""Run a query.
Args:
query: the RethinkDB query.
"""
if self.conn is None:
self._connect()
for i in range(self.max_tries):
try:
return query.run(self.conn)
except r.ReqlDriverError as exc:
# The connection was lost: reconnect and retry the query,
# re-raising the error only on the last attempt.
if i + 1 == self.max_tries:
raise
else:
self._connect()
def _connect(self):
# Open a connection, waiting 2**i seconds between failed attempts
# and re-raising the driver error after the last one.
for i in range(self.max_tries):
try:
self.conn = r.connect(host=self.host, port=self.port,
db=self.db)
except r.ReqlDriverError as exc:
if i + 1 == self.max_tries:
raise
else:
time.sleep(2**i)
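A hedged usage sketch of the class above: omitted arguments fall back to bigchaindb.config, run() opens the connection lazily on first use, and _connect() waits 2**i seconds between failed attempts before giving up after max_tries tries:

    import rethinkdb as r
    from bigchaindb.db.utils import Connection

    # All arguments are optional; the values shown are illustrative only.
    conn = Connection(host='localhost', port=28015, db='bigchain', max_tries=3)

    # The first run() call opens the underlying socket.
    n_blocks = conn.run(r.table('bigchain').count())

    # If the socket drops, run() catches r.ReqlDriverError, reconnects and
    # retries the query, re-raising only after max_tries attempts.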
def get_conn():
'''Get the connection to the database.'''

View File

@@ -144,12 +144,14 @@ def initial():
b = Bigchain()
rs = r.table('backlog')\
.between([b.me, r.minval],
[b.me, r.maxval],
index='assignee__transaction_timestamp')\
.order_by(index=r.asc('assignee__transaction_timestamp'))\
.run(b.conn)
rs = b.connection.run(
r.table('backlog')
.between(
[b.me, r.minval],
[b.me, r.maxval],
index='assignee__transaction_timestamp')
.order_by(index=r.asc('assignee__transaction_timestamp')))
return rs

View File

@@ -25,9 +25,10 @@ class Election:
"""
Checks if the block has enough invalid votes to make a decision.
"""
next_block = r.table('bigchain')\
.get(next_vote['vote']['voting_for_block'])\
.run(self.bigchain.conn)
next_block = self.bigchain.connection.run(
r.table('bigchain')
.get(next_vote['vote']['voting_for_block']))
if self.bigchain.block_election_status(next_block) == self.bigchain.BLOCK_INVALID:
return next_block

View File

@@ -1,12 +1,17 @@
"""Utility classes and functions to work with the pipelines."""
import time
import rethinkdb as r
import logging
from multipipes import Node
from bigchaindb import Bigchain
logger = logging.getLogger(__name__)
class ChangeFeed(Node):
"""This class wraps a RethinkDB changefeed adding a `prefeed`.
@@ -24,7 +29,7 @@ class ChangeFeed(Node):
DELETE = 2
UPDATE = 4
def __init__(self, table, operation, prefeed=None):
def __init__(self, table, operation, prefeed=None, bigchain=None):
"""Create a new RethinkDB ChangeFeed.
Args:
@@ -35,20 +40,29 @@ class ChangeFeed(Node):
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
prefeed (iterable): an iterable of data to be published before
the changefeed starts.
bigchain (``Bigchain``): the bigchain instance to use (can be None).
"""
super().__init__(name='changefeed')
self.prefeed = prefeed if prefeed else []
self.table = table
self.operation = operation
self.bigchain = Bigchain()
self.bigchain = bigchain or Bigchain()
def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)
for change in r.table(self.table).changes().run(self.bigchain.conn):
while True:
try:
self.run_changefeed()
break
except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
logger.exception(exc)
time.sleep(1)
def run_changefeed(self):
for change in self.bigchain.connection.run(r.table(self.table).changes()):
is_insert = change['old_val'] is None
is_delete = change['new_val'] is None
is_update = not is_insert and not is_delete
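With the optional ``bigchain`` argument and the reconnect loop above, a changefeed can be driven by any Bigchain instance (or a test double, as in the new tests below) and survives dropped connections. A small illustrative sketch, assuming a running RethinkDB instance:

    from multipipes import Pipe
    from bigchaindb import Bigchain
    from bigchaindb.pipelines.utils import ChangeFeed

    # Publish backlog inserts into an output pipe; the prefeed items are
    # emitted first. The wiring shown here is illustrative.
    changefeed = ChangeFeed('backlog', ChangeFeed.INSERT,
                            prefeed=[{'id': 'bootstrap'}],
                            bigchain=Bigchain())
    changefeed.outqueue = Pipe()

    # run_forever() blocks; on r.ReqlDriverError or r.ReqlOpFailedError it
    # logs the exception, sleeps for a second and starts the feed again.
    # changefeed.run_forever()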

View File

@@ -4,7 +4,6 @@ import pytest
import rethinkdb as r
import bigchaindb
from bigchaindb import util
from bigchaindb.db import utils
from .conftest import setup_database as _setup_database

View File

@@ -1,8 +1,7 @@
from unittest.mock import patch
import rethinkdb
from multipipes import Pipe
from bigchaindb.db.utils import Connection
from bigchaindb.pipelines.utils import ChangeFeed
@@ -18,7 +17,7 @@ MOCK_CHANGEFEED_DATA = [{
}]
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_insert(mock_run):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT)
@@ -28,7 +27,7 @@ def test_changefeed_insert(mock_run):
assert outpipe.qsize() == 0
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_delete(mock_run):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.DELETE)
@@ -38,7 +37,7 @@ def test_changefeed_delete(mock_run):
assert outpipe.qsize() == 0
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_update(mock_run):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE)
@@ -48,7 +47,7 @@ def test_changefeed_update(mock_run):
assert outpipe.qsize() == 0
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_multiple_operations(mock_run):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE)
@@ -59,7 +58,7 @@ def test_changefeed_multiple_operations(mock_run):
assert outpipe.qsize() == 0
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
def test_changefeed_prefeed(mock_run):
outpipe = Pipe()
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3])

View File

@@ -68,7 +68,7 @@ def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_init
run_start(args)
@patch('bigchaindb.commands.utils.start_rethinkdb')
@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=Mock())
def test_bigchain_run_start_with_rethinkdb(mock_start_rethinkdb,
mock_run_configure,
mock_processes_start,
@@ -214,7 +214,9 @@ def test_run_configure_when_config_does_exist(monkeypatch,
@patch('subprocess.Popen')
def test_start_rethinkdb_returns_a_process_when_successful(mock_popen):
from bigchaindb.commands import utils
mock_popen.return_value = Mock(stdout=['Server ready'])
mock_popen.return_value = Mock(stdout=[
'Listening for client driver 1234',
'Server ready'])
assert utils.start_rethinkdb() is mock_popen.return_value

View File

@@ -0,0 +1,105 @@
from threading import Thread
import pytest
import rethinkdb as r
from bigchaindb.db.utils import Connection
def test_run_a_simple_query():
conn = Connection()
query = r.expr('1')
assert conn.run(query) == '1'
def test_raise_exception_when_max_tries():
class MockQuery:
def run(self, conn):
raise r.ReqlDriverError('mock')
conn = Connection()
with pytest.raises(r.ReqlDriverError):
conn.run(MockQuery())
def test_reconnect_when_connection_lost():
import time
import rethinkdb as r
def raise_exception(*args, **kwargs):
raise r.ReqlDriverError('mock')
conn = Connection()
original_connect = r.connect
r.connect = raise_exception
def delayed_start():
time.sleep(1)
r.connect = original_connect
thread = Thread(target=delayed_start)
query = r.expr('1')
thread.start()
assert conn.run(query) == '1'
def test_changefeed_reconnects_when_connection_lost(monkeypatch):
import time
import multiprocessing as mp
from bigchaindb import Bigchain
from bigchaindb.pipelines.utils import ChangeFeed
class MockConnection:
tries = 0
def run(self, *args, **kwargs):
return self
def __iter__(self):
return self
def __next__(self):
self.tries += 1
if self.tries == 1:
raise r.ReqlDriverError('mock')
elif self.tries == 2:
return {'new_val': {'fact': 'A group of cats is called a clowder.'},
'old_val': None}
elif self.tries == 3:
raise r.ReqlDriverError('mock')
elif self.tries == 4:
return {'new_val': {'fact': 'Cats sleep 70% of their lives.'},
'old_val': None}
else:
time.sleep(10)
bigchain = Bigchain()
bigchain.connection = MockConnection()
changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT,
bigchain=bigchain)
changefeed.outqueue = mp.Queue()
t_changefeed = Thread(target=changefeed.run_forever, daemon=True)
t_changefeed.start()
time.sleep(1)
# try 1: MockConnection raises an error that will stop the
# ChangeFeed instance from iterating for 1 second.
# try 2: MockConnection releases a new record. The new record
# will be put in the outqueue of the ChangeFeed instance.
fact = changefeed.outqueue.get()['fact']
assert fact == 'A group of cats is called a clowder.'
# try 3: MockConnection raises an error that will stop the
# ChangeFeed instance from iterating for 1 second.
assert t_changefeed.is_alive() is True
time.sleep(2)
# try 4: MockConnection releases a new record. The new record
# will be put in the outqueue of the ChangeFeed instance.
fact = changefeed.outqueue.get()['fact']
assert fact == 'Cats sleep 70% of their lives.'