mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Merge remote-tracking branch 'origin/master' into link-new-driver-docs
This commit is contained in:
commit
5879cdb97f
@ -19,7 +19,7 @@ ENV BIGCHAINDB_CONFIG_PATH /data/.bigchaindb
|
||||
ENV BIGCHAINDB_SERVER_BIND 0.0.0.0:9984
|
||||
ENV BIGCHAINDB_API_ENDPOINT http://bigchaindb:9984/api/v1
|
||||
|
||||
ENTRYPOINT ["bigchaindb", "--experimental-start-rethinkdb"]
|
||||
ENTRYPOINT ["bigchaindb", "--dev-start-rethinkdb", "--dev-allow-temp-keypair"]
|
||||
|
||||
CMD ["start"]
|
||||
|
||||
|
@ -31,6 +31,7 @@ config = {
|
||||
},
|
||||
'api_endpoint': 'http://localhost:9984/api/v1',
|
||||
'consensus_plugin': 'default',
|
||||
'backlog_reassign_delay': 30
|
||||
}
|
||||
|
||||
# We need to maintain a backup copy of the original config dict in case
|
||||
|
@ -106,6 +106,11 @@ def run_configure(args, skip_if_exists=False):
|
||||
input('Statsd {}? (default `{}`): '.format(key, val)) \
|
||||
or val
|
||||
|
||||
val = conf['backlog_reassign_delay']
|
||||
conf['backlog_reassign_delay'] = \
|
||||
input('Stale transaction reassignment delay (in seconds)? (default `{}`): '.format(val)) \
|
||||
or val
|
||||
|
||||
if config_path != '-':
|
||||
bigchaindb.config_utils.write_config(conf, config_path)
|
||||
else:
|
||||
@ -152,8 +157,20 @@ def run_drop(args):
|
||||
def run_start(args):
|
||||
"""Start the processes to run the node"""
|
||||
logger.info('BigchainDB Version {}'.format(bigchaindb.__version__))
|
||||
|
||||
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
|
||||
|
||||
if args.allow_temp_keypair:
|
||||
if not (bigchaindb.config['keypair']['private'] or
|
||||
bigchaindb.config['keypair']['public']):
|
||||
|
||||
private_key, public_key = crypto.generate_key_pair()
|
||||
bigchaindb.config['keypair']['private'] = private_key
|
||||
bigchaindb.config['keypair']['public'] = public_key
|
||||
else:
|
||||
logger.warning('Keypair found, no need to create one on the fly.')
|
||||
|
||||
|
||||
if args.start_rethinkdb:
|
||||
try:
|
||||
proc = utils.start_rethinkdb()
|
||||
@ -169,7 +186,8 @@ def run_start(args):
|
||||
sys.exit("Can't start BigchainDB, no keypair found. "
|
||||
'Did you run `bigchaindb configure`?')
|
||||
|
||||
logger.info('Starting BigchainDB main process')
|
||||
logger.info('Starting BigchainDB main process with public key %s',
|
||||
bigchaindb.config['keypair']['public'])
|
||||
processes.start()
|
||||
|
||||
|
||||
@ -227,17 +245,21 @@ def run_set_replicas(args):
|
||||
except r.ReqlOpFailedError as e:
|
||||
logger.warn(e)
|
||||
|
||||
|
||||
def main():
|
||||
def create_parser():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Control your BigchainDB node.',
|
||||
parents=[utils.base_parser])
|
||||
|
||||
parser.add_argument('--experimental-start-rethinkdb',
|
||||
parser.add_argument('--dev-start-rethinkdb',
|
||||
dest='start_rethinkdb',
|
||||
action='store_true',
|
||||
help='Run RethinkDB on start')
|
||||
|
||||
parser.add_argument('--dev-allow-temp-keypair',
|
||||
dest='allow_temp_keypair',
|
||||
action='store_true',
|
||||
help='Generate a random keypair on start')
|
||||
|
||||
# all the commands are contained in the subparsers object,
|
||||
# the command selected by the user will be stored in `args.command`
|
||||
# that is used by the `main` function to select which other
|
||||
@ -302,8 +324,8 @@ def main():
|
||||
'is set, the count is distributed equally to all the '
|
||||
'processes')
|
||||
|
||||
utils.start(parser, globals())
|
||||
return parser
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
def main():
|
||||
utils.start(create_parser(), sys.argv[1:], globals())
|
||||
|
@ -18,6 +18,11 @@ def start_rethinkdb():
|
||||
"""Start RethinkDB as a child process and wait for it to be
|
||||
available.
|
||||
|
||||
Args:
|
||||
wait_for_db (bool): wait for the database to be ready
|
||||
extra_opts (list): a list of extra options to be used when
|
||||
starting the db
|
||||
|
||||
Raises:
|
||||
``bigchaindb.exceptions.StartupError`` if RethinkDB cannot
|
||||
be started.
|
||||
@ -33,11 +38,11 @@ def start_rethinkdb():
|
||||
|
||||
for line in proc.stdout:
|
||||
if line.startswith('Server ready'):
|
||||
|
||||
# FIXME: seems like tables are not ready when the server is ready,
|
||||
# that's why we need to query RethinkDB to know the state
|
||||
# of the database. This code assumes the tables are ready
|
||||
# when the database is ready. This seems a valid assumption.
|
||||
|
||||
try:
|
||||
conn = db.get_conn()
|
||||
# Before checking if the db is ready, we need to query
|
||||
@ -47,7 +52,6 @@ def start_rethinkdb():
|
||||
except (r.ReqlOpFailedError, r.ReqlDriverError) as exc:
|
||||
raise StartupError('Error waiting for the database `{}` '
|
||||
'to be ready'.format(dbname)) from exc
|
||||
|
||||
return proc
|
||||
|
||||
# We are here when we exhaust the stdout of the process.
|
||||
@ -55,7 +59,7 @@ def start_rethinkdb():
|
||||
raise StartupError(line)
|
||||
|
||||
|
||||
def start(parser, scope):
|
||||
def start(parser, argv, scope):
|
||||
"""Utility function to execute a subcommand.
|
||||
|
||||
The function will look up in the ``scope``
|
||||
@ -64,17 +68,18 @@ def start(parser, scope):
|
||||
|
||||
Args:
|
||||
parser: an ArgumentParser instance.
|
||||
argv: the list of command line arguments without the script name.
|
||||
scope (dict): map containing (eventually) the functions to be called.
|
||||
|
||||
Raises:
|
||||
NotImplementedError: if ``scope`` doesn't contain a function called
|
||||
``run_<parser.args.command>``.
|
||||
"""
|
||||
args = parser.parse_args()
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
return
|
||||
raise SystemExit()
|
||||
|
||||
# look up in the current scope for a function called 'run_<command>'
|
||||
# replacing all the dashes '-' with the lowercase character '_'
|
||||
@ -92,7 +97,7 @@ def start(parser, scope):
|
||||
elif args.multiprocess is None:
|
||||
args.multiprocess = mp.cpu_count()
|
||||
|
||||
func(args)
|
||||
return func(args)
|
||||
|
||||
|
||||
base_parser = argparse.ArgumentParser(add_help=False, prog='bigchaindb')
|
||||
|
@ -2,12 +2,14 @@ import random
|
||||
import math
|
||||
import collections
|
||||
from copy import deepcopy
|
||||
from time import time
|
||||
|
||||
from itertools import compress
|
||||
import rethinkdb as r
|
||||
import rapidjson
|
||||
|
||||
import bigchaindb
|
||||
from bigchaindb.db.utils import Connection
|
||||
from bigchaindb import config_utils, crypto, exceptions, util
|
||||
|
||||
|
||||
@ -28,7 +30,7 @@ class Bigchain(object):
|
||||
|
||||
def __init__(self, host=None, port=None, dbname=None,
|
||||
public_key=None, private_key=None, keyring=[],
|
||||
consensus_plugin=None):
|
||||
consensus_plugin=None, backlog_reassign_delay=None):
|
||||
"""Initialize the Bigchain instance
|
||||
|
||||
A Bigchain instance has several configuration parameters (e.g. host).
|
||||
@ -56,6 +58,7 @@ class Bigchain(object):
|
||||
self.me = public_key or bigchaindb.config['keypair']['public']
|
||||
self.me_private = private_key or bigchaindb.config['keypair']['private']
|
||||
self.nodes_except_me = keyring or bigchaindb.config['keyring']
|
||||
self.backlog_reassign_delay = backlog_reassign_delay or bigchaindb.config['backlog_reassign_delay']
|
||||
self.consensus = config_utils.load_consensus_plugin(consensus_plugin)
|
||||
# change RethinkDB read mode to majority. This ensures consistency in query results
|
||||
self.read_mode = 'majority'
|
||||
@ -64,6 +67,7 @@ class Bigchain(object):
|
||||
raise exceptions.KeypairNotFoundException()
|
||||
|
||||
self._conn = None
|
||||
self.connection = Connection(host=self.host, port=self.port, db=self.dbname)
|
||||
|
||||
@property
|
||||
def conn(self):
|
||||
@ -136,11 +140,56 @@ class Bigchain(object):
|
||||
signed_transaction = deepcopy(signed_transaction)
|
||||
# update the transaction
|
||||
signed_transaction.update({'assignee': assignee})
|
||||
signed_transaction.update({'assignment_timestamp': time()})
|
||||
|
||||
# write to the backlog
|
||||
response = r.table('backlog').insert(signed_transaction, durability=durability).run(self.conn)
|
||||
response = self.connection.run(
|
||||
r.table('backlog')
|
||||
.insert(signed_transaction, durability=durability))
|
||||
return response
|
||||
|
||||
def reassign_transaction(self, transaction, durability='hard'):
|
||||
"""Assign a transaction to a new node
|
||||
|
||||
Args:
|
||||
transaction (dict): assigned transaction
|
||||
|
||||
Returns:
|
||||
dict: database response or None if no reassignment is possible
|
||||
"""
|
||||
|
||||
if self.nodes_except_me:
|
||||
try:
|
||||
federation_nodes = self.nodes_except_me + [self.me]
|
||||
index_current_assignee = federation_nodes.index(transaction['assignee'])
|
||||
new_assignee = random.choice(federation_nodes[:index_current_assignee] +
|
||||
federation_nodes[index_current_assignee + 1:])
|
||||
except ValueError:
|
||||
# current assignee not in federation
|
||||
new_assignee = random.choice(self.nodes_except_me)
|
||||
|
||||
else:
|
||||
# There is no other node to assign to
|
||||
new_assignee = self.me
|
||||
|
||||
response = r.table('backlog')\
|
||||
.get(transaction['id'])\
|
||||
.update({'assignee': new_assignee,
|
||||
'assignment_timestamp': time()},
|
||||
durability=durability).run(self.conn)
|
||||
return response
|
||||
|
||||
def get_stale_transactions(self):
|
||||
"""Get a RethinkDB cursor of stale transactions
|
||||
|
||||
Transactions are considered stale if they have been assigned a node, but are still in the
|
||||
backlog after some amount of time specified in the configuration
|
||||
"""
|
||||
|
||||
return r.table('backlog')\
|
||||
.filter(lambda tx: time() - tx['assignment_timestamp'] >
|
||||
self.backlog_reassign_delay).run(self.conn)
|
||||
|
||||
def get_transaction(self, txid, include_status=False):
|
||||
"""Retrieve a transaction with `txid` from bigchain.
|
||||
|
||||
@ -179,13 +228,16 @@ class Bigchain(object):
|
||||
break
|
||||
|
||||
# Query the transaction in the target block and return
|
||||
response = r.table('bigchain', read_mode=self.read_mode).get(target_block_id)\
|
||||
.get_field('block').get_field('transactions')\
|
||||
.filter(lambda tx: tx['id'] == txid).run(self.conn)[0]
|
||||
response = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.get(target_block_id)
|
||||
.get_field('block')
|
||||
.get_field('transactions')
|
||||
.filter(lambda tx: tx['id'] == txid))[0]
|
||||
|
||||
else:
|
||||
# Otherwise, check the backlog
|
||||
response = r.table('backlog').get(txid).run(self.conn)
|
||||
response = self.connection.run(r.table('backlog').get(txid))
|
||||
if response:
|
||||
tx_status = self.TX_IN_BACKLOG
|
||||
|
||||
@ -219,8 +271,10 @@ class Bigchain(object):
|
||||
A list of blocks with with only election information
|
||||
"""
|
||||
# First, get information on all blocks which contain this transaction
|
||||
response = r.table('bigchain', read_mode=self.read_mode).get_all(value, index=index)\
|
||||
.pluck('votes', 'id', {'block': ['voters']}).run(self.conn)
|
||||
response = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.get_all(value, index=index)
|
||||
.pluck('votes', 'id', {'block': ['voters']}))
|
||||
|
||||
return list(response)
|
||||
|
||||
@ -262,8 +316,8 @@ class Bigchain(object):
|
||||
When creating a transaction one of the optional arguments is the `payload`. The payload is a generic
|
||||
dict that contains information about the digital asset.
|
||||
|
||||
To make it easy to query the bigchain for that digital asset we create a UUID for the payload and
|
||||
store it with the transaction. This makes it easy for developers to keep track of their digital
|
||||
To make it easy to query the bigchain for that digital asset we create a UUID for the payload and
|
||||
store it with the transaction. This makes it easy for developers to keep track of their digital
|
||||
assets in bigchain.
|
||||
|
||||
Args:
|
||||
@ -273,11 +327,11 @@ class Bigchain(object):
|
||||
A list of transactions containing that payload. If no transaction exists with that payload it
|
||||
returns an empty list `[]`
|
||||
"""
|
||||
cursor = r.table('bigchain', read_mode=self.read_mode) \
|
||||
.get_all(payload_uuid, index='payload_uuid') \
|
||||
.concat_map(lambda block: block['block']['transactions']) \
|
||||
.filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid) \
|
||||
.run(self.conn)
|
||||
cursor = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.get_all(payload_uuid, index='payload_uuid')
|
||||
.concat_map(lambda block: block['block']['transactions'])
|
||||
.filter(lambda transaction: transaction['transaction']['data']['uuid'] == payload_uuid))
|
||||
|
||||
transactions = list(cursor)
|
||||
return transactions
|
||||
@ -296,11 +350,11 @@ class Bigchain(object):
|
||||
"""
|
||||
# checks if an input was already spent
|
||||
# checks if the bigchain has any transaction with input {'txid': ..., 'cid': ...}
|
||||
response = r.table('bigchain', read_mode=self.read_mode)\
|
||||
.concat_map(lambda doc: doc['block']['transactions'])\
|
||||
.filter(lambda transaction: transaction['transaction']['fulfillments']
|
||||
.contains(lambda fulfillment: fulfillment['input'] == tx_input))\
|
||||
.run(self.conn)
|
||||
response = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.concat_map(lambda doc: doc['block']['transactions'])
|
||||
.filter(lambda transaction: transaction['transaction']['fulfillments']
|
||||
.contains(lambda fulfillment: fulfillment['input'] == tx_input)))
|
||||
|
||||
transactions = list(response)
|
||||
|
||||
@ -335,12 +389,12 @@ class Bigchain(object):
|
||||
"""
|
||||
|
||||
# get all transactions in which owner is in the `owners_after` list
|
||||
response = r.table('bigchain', read_mode=self.read_mode) \
|
||||
.concat_map(lambda doc: doc['block']['transactions']) \
|
||||
.filter(lambda tx: tx['transaction']['conditions']
|
||||
response = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.concat_map(lambda doc: doc['block']['transactions'])
|
||||
.filter(lambda tx: tx['transaction']['conditions']
|
||||
.contains(lambda c: c['owners_after']
|
||||
.contains(owner))) \
|
||||
.run(self.conn)
|
||||
.contains(owner))))
|
||||
owned = []
|
||||
|
||||
for tx in response:
|
||||
@ -484,8 +538,9 @@ class Bigchain(object):
|
||||
but the vote is invalid.
|
||||
|
||||
"""
|
||||
votes = list(r.table('votes', read_mode=self.read_mode)\
|
||||
.get_all([block['id'], self.me], index='block_and_voter').run(self.conn))
|
||||
votes = list(self.connection.run(
|
||||
r.table('votes', read_mode=self.read_mode)
|
||||
.get_all([block['id'], self.me], index='block_and_voter')))
|
||||
|
||||
if len(votes) > 1:
|
||||
raise exceptions.MultipleVotesError('Block {block_id} has {n_votes} votes from public key {me}'
|
||||
@ -526,13 +581,15 @@ class Bigchain(object):
|
||||
"""
|
||||
|
||||
block_serialized = rapidjson.dumps(block)
|
||||
r.table('bigchain').insert(r.json(block_serialized), durability=durability).run(self.conn)
|
||||
self.connection.run(
|
||||
r.table('bigchain')
|
||||
.insert(r.json(block_serialized), durability=durability))
|
||||
|
||||
# TODO: Decide if we need this method
|
||||
def transaction_exists(self, transaction_id):
|
||||
response = r.table('bigchain', read_mode=self.read_mode)\
|
||||
.get_all(transaction_id, index='transaction_id').run(self.conn)
|
||||
return True if len(response.items) > 0 else False
|
||||
response = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)\
|
||||
.get_all(transaction_id, index='transaction_id'))
|
||||
return len(response.items) > 0
|
||||
|
||||
def prepare_genesis_block(self):
|
||||
"""Prepare a genesis block."""
|
||||
@ -558,7 +615,9 @@ class Bigchain(object):
|
||||
# 2. create the block with one transaction
|
||||
# 3. write the block to the bigchain
|
||||
|
||||
blocks_count = r.table('bigchain', read_mode=self.read_mode).count().run(self.conn)
|
||||
blocks_count = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.count())
|
||||
|
||||
if blocks_count:
|
||||
raise exceptions.GenesisBlockAlreadyExistsError('Cannot create the Genesis block')
|
||||
@ -604,30 +663,30 @@ class Bigchain(object):
|
||||
def write_vote(self, vote):
|
||||
"""Write the vote to the database."""
|
||||
|
||||
r.table('votes') \
|
||||
.insert(vote) \
|
||||
.run(self.conn)
|
||||
self.connection.run(
|
||||
r.table('votes')
|
||||
.insert(vote))
|
||||
|
||||
def get_last_voted_block(self):
|
||||
"""Returns the last block that this node voted on."""
|
||||
|
||||
try:
|
||||
# get the latest value for the vote timestamp (over all votes)
|
||||
max_timestamp = r.table('votes', read_mode=self.read_mode) \
|
||||
.filter(r.row['node_pubkey'] == self.me) \
|
||||
.max(r.row['vote']['timestamp']) \
|
||||
.run(self.conn)['vote']['timestamp']
|
||||
max_timestamp = self.connection.run(
|
||||
r.table('votes', read_mode=self.read_mode)
|
||||
.filter(r.row['node_pubkey'] == self.me)
|
||||
.max(r.row['vote']['timestamp']))['vote']['timestamp']
|
||||
|
||||
last_voted = list(r.table('votes', read_mode=self.read_mode) \
|
||||
.filter(r.row['vote']['timestamp'] == max_timestamp) \
|
||||
.filter(r.row['node_pubkey'] == self.me) \
|
||||
.run(self.conn))
|
||||
last_voted = list(self.connection.run(
|
||||
r.table('votes', read_mode=self.read_mode)
|
||||
.filter(r.row['vote']['timestamp'] == max_timestamp)
|
||||
.filter(r.row['node_pubkey'] == self.me)))
|
||||
|
||||
except r.ReqlNonExistenceError:
|
||||
# return last vote if last vote exists else return Genesis block
|
||||
return list(r.table('bigchain', read_mode=self.read_mode)
|
||||
.filter(util.is_genesis_block)
|
||||
.run(self.conn))[0]
|
||||
return list(self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.filter(util.is_genesis_block)))[0]
|
||||
|
||||
# Now the fun starts. Since the resolution of timestamp is a second,
|
||||
# we might have more than one vote per timestamp. If this is the case
|
||||
@ -659,19 +718,21 @@ class Bigchain(object):
|
||||
except KeyError:
|
||||
break
|
||||
|
||||
res = r.table('bigchain', read_mode=self.read_mode).get(last_block_id).run(self.conn)
|
||||
res = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.get(last_block_id))
|
||||
|
||||
return res
|
||||
|
||||
def get_unvoted_blocks(self):
|
||||
"""Return all the blocks that has not been voted by this node."""
|
||||
|
||||
unvoted = r.table('bigchain', read_mode=self.read_mode) \
|
||||
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
|
||||
unvoted = self.connection.run(
|
||||
r.table('bigchain', read_mode=self.read_mode)
|
||||
.filter(lambda block: r.table('votes', read_mode=self.read_mode)
|
||||
.get_all([block['id'], self.me], index='block_and_voter')
|
||||
.is_empty()) \
|
||||
.order_by(r.asc(r.row['block']['timestamp'])) \
|
||||
.run(self.conn)
|
||||
.is_empty())
|
||||
.order_by(r.asc(r.row['block']['timestamp'])))
|
||||
|
||||
# FIXME: I (@vrde) don't like this solution. Filtering should be done at a
|
||||
# database level. Solving issue #444 can help untangling the situation
|
||||
@ -682,9 +743,8 @@ class Bigchain(object):
|
||||
def block_election_status(self, block):
|
||||
"""Tally the votes on a block, and return the status: valid, invalid, or undecided."""
|
||||
|
||||
votes = r.table('votes', read_mode=self.read_mode) \
|
||||
.between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter') \
|
||||
.run(self.conn)
|
||||
votes = self.connection.run(r.table('votes', read_mode=self.read_mode)
|
||||
.between([block['id'], r.minval], [block['id'], r.maxval], index='block_and_voter'))
|
||||
|
||||
votes = list(votes)
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""Utils to initialize and drop the database."""
|
||||
|
||||
import time
|
||||
import logging
|
||||
|
||||
import rethinkdb as r
|
||||
@ -11,6 +12,61 @@ from bigchaindb import exceptions
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Connection:
|
||||
"""This class is a proxy to run queries against the database,
|
||||
it is:
|
||||
- lazy, since it creates a connection only when needed
|
||||
- resilient, because before raising exceptions it tries
|
||||
more times to run the query or open a connection.
|
||||
"""
|
||||
|
||||
def __init__(self, host=None, port=None, db=None, max_tries=3):
|
||||
"""Create a new Connection instance.
|
||||
|
||||
Args:
|
||||
host (str, optional): the host to connect to.
|
||||
port (int, optional): the port to connect to.
|
||||
db (str, optional): the database to use.
|
||||
max_tries (int, optional): how many tries before giving up.
|
||||
"""
|
||||
|
||||
self.host = host or bigchaindb.config['database']['host']
|
||||
self.port = port or bigchaindb.config['database']['port']
|
||||
self.db = db or bigchaindb.config['database']['name']
|
||||
self.max_tries = max_tries
|
||||
self.conn = None
|
||||
|
||||
def run(self, query):
|
||||
"""Run a query.
|
||||
|
||||
Args:
|
||||
query: the RethinkDB query.
|
||||
"""
|
||||
|
||||
if self.conn is None:
|
||||
self._connect()
|
||||
|
||||
for i in range(self.max_tries):
|
||||
try:
|
||||
return query.run(self.conn)
|
||||
except r.ReqlDriverError as exc:
|
||||
if i + 1 == self.max_tries:
|
||||
raise
|
||||
else:
|
||||
self._connect()
|
||||
|
||||
def _connect(self):
|
||||
for i in range(self.max_tries):
|
||||
try:
|
||||
self.conn = r.connect(host=self.host, port=self.port,
|
||||
db=self.db)
|
||||
except r.ReqlDriverError as exc:
|
||||
if i + 1 == self.max_tries:
|
||||
raise
|
||||
else:
|
||||
time.sleep(2**i)
|
||||
|
||||
|
||||
def get_conn():
|
||||
'''Get the connection to the database.'''
|
||||
|
||||
|
@ -42,36 +42,46 @@ class Block:
|
||||
|
||||
if tx['assignee'] == self.bigchain.me:
|
||||
tx.pop('assignee')
|
||||
tx.pop('assignment_timestamp')
|
||||
return tx
|
||||
|
||||
def delete_tx(self, tx):
|
||||
"""Delete a transaction.
|
||||
|
||||
Args:
|
||||
tx (dict): the transaction to delete.
|
||||
|
||||
Returns:
|
||||
The transaction.
|
||||
"""
|
||||
r.table('backlog')\
|
||||
.get(tx['id'])\
|
||||
.delete(durability='hard')\
|
||||
.run(self.bigchain.conn)
|
||||
|
||||
return tx
|
||||
|
||||
def validate_tx(self, tx):
|
||||
"""Validate a transaction.
|
||||
|
||||
Also checks if the transaction already exists in the blockchain. If it
|
||||
does, or it's invalid, it's deleted from the backlog immediately.
|
||||
|
||||
Args:
|
||||
tx (dict): the transaction to validate.
|
||||
|
||||
Returns:
|
||||
The transaction if valid, ``None`` otherwise.
|
||||
"""
|
||||
tx = self.bigchain.is_valid_transaction(tx)
|
||||
if tx:
|
||||
if self.bigchain.transaction_exists(tx['id']):
|
||||
# if the transaction already exists, we must check whether
|
||||
# it's in a valid or undecided block
|
||||
tx, status = self.bigchain.get_transaction(tx['id'],
|
||||
include_status=True)
|
||||
if status == self.bigchain.TX_VALID \
|
||||
or status == self.bigchain.TX_UNDECIDED:
|
||||
# if the tx is already in a valid or undecided block,
|
||||
# then it no longer should be in the backlog, or added
|
||||
# to a new block. We can delete and drop it.
|
||||
r.table('backlog').get(tx['id']) \
|
||||
.delete(durability='hard') \
|
||||
.run(self.bigchain.conn)
|
||||
return None
|
||||
|
||||
tx_validated = self.bigchain.is_valid_transaction(tx)
|
||||
if tx_validated:
|
||||
return tx
|
||||
else:
|
||||
# if the transaction is not valid, remove it from the
|
||||
# backlog
|
||||
r.table('backlog').get(tx['id']) \
|
||||
.delete(durability='hard') \
|
||||
.run(self.bigchain.conn)
|
||||
return None
|
||||
|
||||
def create(self, tx, timeout=False):
|
||||
"""Create a block.
|
||||
@ -112,25 +122,44 @@ class Block:
|
||||
self.bigchain.write_block(block)
|
||||
return block
|
||||
|
||||
def delete_tx(self, block):
|
||||
"""Delete transactions.
|
||||
|
||||
Args:
|
||||
block (dict): the block containg the transactions to delete.
|
||||
|
||||
Returns:
|
||||
The block.
|
||||
"""
|
||||
r.table('backlog')\
|
||||
.get_all(*[tx['id'] for tx in block['block']['transactions']])\
|
||||
.delete(durability='hard')\
|
||||
.run(self.bigchain.conn)
|
||||
|
||||
return block
|
||||
|
||||
|
||||
def initial():
|
||||
"""Return old transactions from the backlog."""
|
||||
|
||||
b = Bigchain()
|
||||
|
||||
rs = r.table('backlog')\
|
||||
.between([b.me, r.minval],
|
||||
[b.me, r.maxval],
|
||||
index='assignee__transaction_timestamp')\
|
||||
.order_by(index=r.asc('assignee__transaction_timestamp'))\
|
||||
.run(b.conn)
|
||||
rs = b.connection.run(
|
||||
r.table('backlog')
|
||||
.between(
|
||||
[b.me, r.minval],
|
||||
[b.me, r.maxval],
|
||||
index='assignee__transaction_timestamp')
|
||||
.order_by(index=r.asc('assignee__transaction_timestamp')))
|
||||
|
||||
return rs
|
||||
|
||||
|
||||
def get_changefeed():
|
||||
"""Create and return the changefeed for the backlog."""
|
||||
|
||||
return ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=initial())
|
||||
return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE,
|
||||
prefeed=initial())
|
||||
|
||||
|
||||
def create_pipeline():
|
||||
@ -141,10 +170,10 @@ def create_pipeline():
|
||||
|
||||
block_pipeline = Pipeline([
|
||||
Node(block.filter_tx),
|
||||
Node(block.delete_tx),
|
||||
Node(block.validate_tx, fraction_of_cores=1),
|
||||
Node(block.create, timeout=1),
|
||||
Node(block.write),
|
||||
Node(block.delete_tx),
|
||||
])
|
||||
|
||||
return block_pipeline
|
||||
|
@ -25,9 +25,10 @@ class Election:
|
||||
"""
|
||||
Checks if block has enough invalid votes to make a decision
|
||||
"""
|
||||
next_block = r.table('bigchain')\
|
||||
.get(next_vote['vote']['voting_for_block'])\
|
||||
.run(self.bigchain.conn)
|
||||
next_block = self.bigchain.connection.run(
|
||||
r.table('bigchain')
|
||||
.get(next_vote['vote']['voting_for_block']))
|
||||
|
||||
if self.bigchain.block_election_status(next_block) == self.bigchain.BLOCK_INVALID:
|
||||
return next_block
|
||||
|
||||
|
76
bigchaindb/pipelines/stale.py
Normal file
76
bigchaindb/pipelines/stale.py
Normal file
@ -0,0 +1,76 @@
|
||||
"""This module monitors for stale transactions.
|
||||
|
||||
It reassigns transactions which have been assigned a node but
|
||||
remain in the backlog past a certain amount of time.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from multipipes import Pipeline, Node
|
||||
from bigchaindb import Bigchain
|
||||
from time import sleep
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StaleTransactionMonitor:
|
||||
"""This class encapsulates the logic for re-assigning stale transactions.
|
||||
|
||||
Note:
|
||||
Methods of this class will be executed in different processes.
|
||||
"""
|
||||
|
||||
def __init__(self, timeout=5, backlog_reassign_delay=None):
|
||||
"""Initialize StaleTransaction monitor
|
||||
|
||||
Args:
|
||||
timeout: how often to check for stale tx (in sec)
|
||||
backlog_reassign_delay: How stale a transaction should
|
||||
be before reassignment (in sec). If supplied, overrides the
|
||||
Bigchain default value.
|
||||
"""
|
||||
self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay)
|
||||
self.timeout = timeout
|
||||
|
||||
def check_transactions(self):
|
||||
"""Poll backlog for stale transactions
|
||||
|
||||
Returns:
|
||||
txs (list): txs to be re assigned
|
||||
"""
|
||||
sleep(self.timeout)
|
||||
for tx in self.bigchain.get_stale_transactions():
|
||||
yield tx
|
||||
|
||||
def reassign_transactions(self, tx):
|
||||
"""Put tx back in backlog with new assignee
|
||||
|
||||
Returns:
|
||||
transaction
|
||||
"""
|
||||
self.bigchain.reassign_transaction(tx)
|
||||
return tx
|
||||
|
||||
|
||||
def create_pipeline(timeout=5, backlog_reassign_delay=5):
|
||||
"""Create and return the pipeline of operations to be distributed
|
||||
on different processes."""
|
||||
|
||||
stm = StaleTransactionMonitor(timeout=timeout,
|
||||
backlog_reassign_delay=backlog_reassign_delay)
|
||||
|
||||
monitor_pipeline = Pipeline([
|
||||
Node(stm.check_transactions),
|
||||
Node(stm.reassign_transactions)
|
||||
])
|
||||
|
||||
return monitor_pipeline
|
||||
|
||||
|
||||
def start(timeout=5, backlog_reassign_delay=5):
|
||||
"""Create, start, and return the block pipeline."""
|
||||
pipeline = create_pipeline(timeout=timeout,
|
||||
backlog_reassign_delay=backlog_reassign_delay)
|
||||
pipeline.setup()
|
||||
pipeline.start()
|
||||
return pipeline
|
@ -1,12 +1,17 @@
|
||||
"""Utility classes and functions to work with the pipelines."""
|
||||
|
||||
|
||||
import time
|
||||
import rethinkdb as r
|
||||
import logging
|
||||
from multipipes import Node
|
||||
|
||||
from bigchaindb import Bigchain
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChangeFeed(Node):
|
||||
"""This class wraps a RethinkDB changefeed adding a `prefeed`.
|
||||
|
||||
@ -24,7 +29,7 @@ class ChangeFeed(Node):
|
||||
DELETE = 2
|
||||
UPDATE = 4
|
||||
|
||||
def __init__(self, table, operation, prefeed=None):
|
||||
def __init__(self, table, operation, prefeed=None, bigchain=None):
|
||||
"""Create a new RethinkDB ChangeFeed.
|
||||
|
||||
Args:
|
||||
@ -35,20 +40,29 @@ class ChangeFeed(Node):
|
||||
(e.g. ``ChangeFeed.INSERT | ChangeFeed.UPDATE``)
|
||||
prefeed (iterable): whatever set of data you want to be published
|
||||
first.
|
||||
bigchain (``Bigchain``): the bigchain instance to use (can be None).
|
||||
"""
|
||||
|
||||
super().__init__(name='changefeed')
|
||||
self.prefeed = prefeed if prefeed else []
|
||||
self.table = table
|
||||
self.operation = operation
|
||||
self.bigchain = Bigchain()
|
||||
self.bigchain = bigchain or Bigchain()
|
||||
|
||||
def run_forever(self):
|
||||
for element in self.prefeed:
|
||||
self.outqueue.put(element)
|
||||
|
||||
for change in r.table(self.table).changes().run(self.bigchain.conn):
|
||||
while True:
|
||||
try:
|
||||
self.run_changefeed()
|
||||
break
|
||||
except (r.ReqlDriverError, r.ReqlOpFailedError) as exc:
|
||||
logger.exception(exc)
|
||||
time.sleep(1)
|
||||
|
||||
def run_changefeed(self):
|
||||
for change in self.bigchain.connection.run(r.table(self.table).changes()):
|
||||
is_insert = change['old_val'] is None
|
||||
is_delete = change['new_val'] is None
|
||||
is_update = not is_insert and not is_delete
|
||||
@ -58,5 +72,5 @@ class ChangeFeed(Node):
|
||||
elif is_delete and (self.operation & ChangeFeed.DELETE):
|
||||
self.outqueue.put(change['old_val'])
|
||||
elif is_update and (self.operation & ChangeFeed.UPDATE):
|
||||
self.outqueue.put(change)
|
||||
self.outqueue.put(change['new_val'])
|
||||
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
import multiprocessing as mp
|
||||
|
||||
import bigchaindb
|
||||
from bigchaindb.pipelines import vote, block, election
|
||||
from bigchaindb.pipelines import vote, block, election, stale
|
||||
from bigchaindb.web import server
|
||||
|
||||
|
||||
@ -31,6 +31,9 @@ def start():
|
||||
logger.info('Starting voter')
|
||||
vote.start()
|
||||
|
||||
logger.info('Starting stale transaction monitor')
|
||||
stale.start()
|
||||
|
||||
logger.info('Starting election')
|
||||
election.start()
|
||||
|
||||
|
@ -58,8 +58,9 @@ Drop (erase) the RethinkDB database. You will be prompted to make sure. If you w
|
||||
## bigchaindb start
|
||||
|
||||
Start BigchainDB. It always begins by trying a `bigchaindb init` first. See the note in the documentation for `bigchaindb init`.
|
||||
You can also use the `--experimental-start-rethinkdb` command line option to automatically start rethinkdb with bigchaindb if rethinkdb is not already running,
|
||||
e.g. `bigchaindb --experimental-start-rethinkdb start`. Note that this will also shutdown rethinkdb when the bigchaindb process stops.
|
||||
You can also use the `--dev-start-rethinkdb` command line option to automatically start rethinkdb with bigchaindb if rethinkdb is not already running,
|
||||
e.g. `bigchaindb --dev-start-rethinkdb start`. Note that this will also shutdown rethinkdb when the bigchaindb process stops.
|
||||
The option `--dev-allow-temp-keypair` will generate a keypair on the fly if no keypair is found, this is useful when you want to run a temporary instance of BigchainDB in a Docker container, for example.
|
||||
|
||||
|
||||
## bigchaindb load
|
||||
|
@ -132,6 +132,7 @@ class TestBigchainApi(object):
|
||||
|
||||
response, status = b.get_transaction(tx_signed["id"], include_status=True)
|
||||
response.pop('assignee')
|
||||
response.pop('assignment_timestamp')
|
||||
# add validity information, which will be returned
|
||||
assert util.serialize(tx_signed) == util.serialize(response)
|
||||
assert status == b.TX_IN_BACKLOG
|
||||
|
@ -4,7 +4,6 @@ import pytest
|
||||
import rethinkdb as r
|
||||
|
||||
import bigchaindb
|
||||
from bigchaindb import util
|
||||
from bigchaindb.db import utils
|
||||
from .conftest import setup_database as _setup_database
|
||||
|
||||
|
@ -14,9 +14,14 @@ def test_filter_by_assignee(b, user_vk):
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
tx['assignee'] = b.me
|
||||
tx['assignment_timestamp'] = 111
|
||||
|
||||
# filter_tx has side effects on the `tx` instance by popping 'assignee'
|
||||
assert block_maker.filter_tx(tx) == tx
|
||||
# and 'assignment_timestamp'
|
||||
filtered_tx = block_maker.filter_tx(tx)
|
||||
assert filtered_tx == tx
|
||||
assert 'assignee' not in filtered_tx
|
||||
assert 'assignment_timestamp' not in filtered_tx
|
||||
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
@ -69,21 +74,59 @@ def test_write_block(b, user_vk):
|
||||
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
|
||||
|
||||
|
||||
def test_duplicate_transaction(b, user_vk):
|
||||
block_maker = block.Block()
|
||||
|
||||
txs = []
|
||||
for i in range(10):
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
txs.append(tx)
|
||||
|
||||
block_doc = b.create_block(txs)
|
||||
block_maker.write(block_doc)
|
||||
|
||||
# block is in bigchain
|
||||
assert r.table('bigchain').get(block_doc['id']).run(b.conn) == block_doc
|
||||
|
||||
b.write_transaction(txs[0])
|
||||
|
||||
# verify tx is in the backlog
|
||||
assert r.table('backlog').get(txs[0]['id']).run(b.conn) is not None
|
||||
|
||||
# try to validate a transaction that's already in the chain; should not
|
||||
# work
|
||||
assert block_maker.validate_tx(txs[0]) is None
|
||||
|
||||
# duplicate tx should be removed from backlog
|
||||
assert r.table('backlog').get(txs[0]['id']).run(b.conn) is None
|
||||
|
||||
|
||||
def test_delete_tx(b, user_vk):
|
||||
block_maker = block.Block()
|
||||
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
b.write_transaction(tx)
|
||||
for i in range(100):
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
block_maker.create(tx)
|
||||
# make sure the tx appears in the backlog
|
||||
b.write_transaction(tx)
|
||||
|
||||
tx_backlog = r.table('backlog').get(tx['id']).run(b.conn)
|
||||
tx_backlog.pop('assignee')
|
||||
assert tx_backlog == tx
|
||||
# force the output triggering a `timeout`
|
||||
block_doc = block_maker.create(None, timeout=True)
|
||||
|
||||
returned_tx = block_maker.delete_tx(tx)
|
||||
for tx in block_doc['block']['transactions']:
|
||||
returned_tx = r.table('backlog').get(tx['id']).run(b.conn)
|
||||
returned_tx.pop('assignee')
|
||||
returned_tx.pop('assignment_timestamp')
|
||||
assert returned_tx == tx
|
||||
|
||||
assert returned_tx == tx
|
||||
assert r.table('backlog').get(tx['id']).run(b.conn) is None
|
||||
returned_block = block_maker.delete_tx(block_doc)
|
||||
|
||||
assert returned_block == block_doc
|
||||
|
||||
for tx in block_doc['block']['transactions']:
|
||||
assert r.table('backlog').get(tx['id']).run(b.conn) is None
|
||||
|
||||
|
||||
def test_prefeed(b, user_vk):
|
||||
@ -115,6 +158,7 @@ def test_full_pipeline(b, user_vk):
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
assignee = random.choice([b.me, 'aaa', 'bbb', 'ccc'])
|
||||
tx['assignee'] = assignee
|
||||
tx['assignment_timestamp'] = time.time()
|
||||
if assignee == b.me:
|
||||
count_assigned_to_me += 1
|
||||
r.table('backlog').insert(tx, durability='hard').run(b.conn)
|
||||
|
@ -99,6 +99,7 @@ def test_check_requeue_transaction(b, user_vk):
|
||||
e.requeue_transactions(test_block)
|
||||
tx_backlog = r.table('backlog').get(tx1['id']).run(b.conn)
|
||||
tx_backlog.pop('assignee')
|
||||
tx_backlog.pop('assignment_timestamp')
|
||||
|
||||
assert tx_backlog == tx1
|
||||
|
||||
|
116
tests/pipelines/test_stale_monitor.py
Normal file
116
tests/pipelines/test_stale_monitor.py
Normal file
@ -0,0 +1,116 @@
|
||||
import rethinkdb as r
|
||||
from bigchaindb import Bigchain
|
||||
from bigchaindb.pipelines import stale
|
||||
from multipipes import Pipe, Pipeline
|
||||
from unittest.mock import patch
|
||||
from bigchaindb import config_utils
|
||||
import time
|
||||
import os
|
||||
|
||||
|
||||
def test_get_stale(b, user_vk):
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
b.write_transaction(tx, durability='hard')
|
||||
|
||||
stm = stale.StaleTransactionMonitor(timeout=0.001,
|
||||
backlog_reassign_delay=0.001)
|
||||
tx_stale = stm.check_transactions()
|
||||
|
||||
for _tx in tx_stale:
|
||||
_tx.pop('assignee')
|
||||
_tx.pop('assignment_timestamp')
|
||||
assert tx == _tx
|
||||
|
||||
|
||||
def test_reassign_transactions(b, user_vk):
|
||||
# test with single node
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
b.write_transaction(tx, durability='hard')
|
||||
|
||||
stm = stale.StaleTransactionMonitor(timeout=0.001,
|
||||
backlog_reassign_delay=0.001)
|
||||
stm.reassign_transactions(tx)
|
||||
|
||||
# test with federation
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
b.write_transaction(tx, durability='hard')
|
||||
|
||||
stm = stale.StaleTransactionMonitor(timeout=0.001,
|
||||
backlog_reassign_delay=0.001)
|
||||
stm.bigchain.nodes_except_me = ['aaa', 'bbb', 'ccc']
|
||||
tx = list(r.table('backlog').run(b.conn))[0]
|
||||
stm.reassign_transactions(tx)
|
||||
|
||||
reassigned_tx = r.table('backlog').get(tx['id']).run(b.conn)
|
||||
assert reassigned_tx['assignment_timestamp'] > tx['assignment_timestamp']
|
||||
assert reassigned_tx['assignee'] != tx['assignee']
|
||||
|
||||
# test with node not in federation
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
tx.update({'assignee': 'lol'})
|
||||
tx.update({'assignment_timestamp': time.time()})
|
||||
r.table('backlog').insert(tx, durability='hard').run(b.conn)
|
||||
|
||||
tx = list(r.table('backlog').run(b.conn))[0]
|
||||
stm.reassign_transactions(tx)
|
||||
assert r.table('backlog').get(tx['id']).run(b.conn)['assignee'] != 'lol'
|
||||
|
||||
|
||||
def test_full_pipeline(user_vk):
|
||||
CONFIG = {
|
||||
'database': {
|
||||
'name': 'bigchain_test_{}'.format(os.getpid())
|
||||
},
|
||||
'keypair': {
|
||||
'private': '31Lb1ZGKTyHnmVK3LUMrAUrPNfd4sE2YyBt3UA4A25aA',
|
||||
'public': '4XYfCbabAWVUCbjTmRTFEu2sc3dFEdkse4r6X498B1s8'
|
||||
},
|
||||
'keyring': ['aaa', 'bbb'],
|
||||
'backlog_reassign_delay': 0.01
|
||||
}
|
||||
config_utils.set_config(CONFIG)
|
||||
b = Bigchain()
|
||||
outpipe = Pipe()
|
||||
|
||||
original_txs = {}
|
||||
|
||||
for i in range(100):
|
||||
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
|
||||
tx = b.sign_transaction(tx, b.me_private)
|
||||
|
||||
b.write_transaction(tx)
|
||||
original_txs[tx['id']] = r.table('backlog').get(tx['id']).run(b.conn)
|
||||
|
||||
assert r.table('backlog').count().run(b.conn) == 100
|
||||
|
||||
pipeline = stale.create_pipeline(backlog_reassign_delay=1,
|
||||
timeout=1)
|
||||
pipeline.setup(outdata=outpipe)
|
||||
pipeline.start()
|
||||
|
||||
# timing should be careful -- test will fail if reassignment happens multiple times
|
||||
time.sleep(2)
|
||||
pipeline.terminate()
|
||||
|
||||
# to terminate
|
||||
outpipe.get()
|
||||
|
||||
assert r.table('backlog').count().run(b.conn) == 100
|
||||
reassigned_txs = list(r.table('backlog').run(b.conn))
|
||||
|
||||
# check that every assignment timestamp has increased, and every tx has a new assignee
|
||||
for reassigned_tx in reassigned_txs:
|
||||
assert reassigned_tx['assignment_timestamp'] > original_txs[reassigned_tx['id']]['assignment_timestamp']
|
||||
assert reassigned_tx['assignee'] != original_txs[reassigned_tx['id']]['assignee']
|
||||
|
||||
@patch.object(Pipeline, 'start')
|
||||
def test_start(mock_start):
|
||||
# TODO: `sta,e.start` is just a wrapper around `block.create_pipeline`,
|
||||
# that is tested by `test_full_pipeline`.
|
||||
# If anyone has better ideas on how to test this, please do a PR :)
|
||||
stale.start()
|
||||
mock_start.assert_called_with()
|
@ -1,8 +1,7 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
import rethinkdb
|
||||
|
||||
from multipipes import Pipe
|
||||
from bigchaindb.db.utils import Connection
|
||||
from bigchaindb.pipelines.utils import ChangeFeed
|
||||
|
||||
|
||||
@ -18,7 +17,7 @@ MOCK_CHANGEFEED_DATA = [{
|
||||
}]
|
||||
|
||||
|
||||
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
def test_changefeed_insert(mock_run):
|
||||
outpipe = Pipe()
|
||||
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT)
|
||||
@ -28,7 +27,7 @@ def test_changefeed_insert(mock_run):
|
||||
assert outpipe.qsize() == 0
|
||||
|
||||
|
||||
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
def test_changefeed_delete(mock_run):
|
||||
outpipe = Pipe()
|
||||
changefeed = ChangeFeed('backlog', ChangeFeed.DELETE)
|
||||
@ -38,30 +37,28 @@ def test_changefeed_delete(mock_run):
|
||||
assert outpipe.qsize() == 0
|
||||
|
||||
|
||||
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
def test_changefeed_update(mock_run):
|
||||
outpipe = Pipe()
|
||||
changefeed = ChangeFeed('backlog', ChangeFeed.UPDATE)
|
||||
changefeed.outqueue = outpipe
|
||||
changefeed.run_forever()
|
||||
assert outpipe.get() == {'new_val': 'seems like we have an update here',
|
||||
'old_val': 'seems like we have an update here'}
|
||||
assert outpipe.get() == 'seems like we have an update here'
|
||||
assert outpipe.qsize() == 0
|
||||
|
||||
|
||||
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
def test_changefeed_multiple_operations(mock_run):
|
||||
outpipe = Pipe()
|
||||
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE)
|
||||
changefeed.outqueue = outpipe
|
||||
changefeed.run_forever()
|
||||
assert outpipe.get() == 'seems like we have an insert here'
|
||||
assert outpipe.get() == {'new_val': 'seems like we have an update here',
|
||||
'old_val': 'seems like we have an update here'}
|
||||
assert outpipe.get() == 'seems like we have an update here'
|
||||
assert outpipe.qsize() == 0
|
||||
|
||||
|
||||
@patch.object(rethinkdb.ast.RqlQuery, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
@patch.object(Connection, 'run', return_value=MOCK_CHANGEFEED_DATA)
|
||||
def test_changefeed_prefeed(mock_run):
|
||||
outpipe = Pipe()
|
||||
changefeed = ChangeFeed('backlog', ChangeFeed.INSERT, prefeed=[1, 2, 3])
|
||||
|
@ -57,23 +57,85 @@ def mock_bigchaindb_backup_config(monkeypatch):
|
||||
'keypair': {},
|
||||
'database': {'host': 'host', 'port': 12345, 'name': 'adbname'},
|
||||
'statsd': {'host': 'host', 'port': 12345, 'rate': 0.1},
|
||||
'backlog_reassign_delay': 5
|
||||
}
|
||||
monkeypatch.setattr('bigchaindb._config', config)
|
||||
|
||||
|
||||
def test_make_sure_we_dont_remove_any_command():
|
||||
# thanks to: http://stackoverflow.com/a/18161115/597097
|
||||
from bigchaindb.commands.bigchain import create_parser
|
||||
|
||||
parser = create_parser()
|
||||
|
||||
assert parser.parse_args(['configure']).command
|
||||
assert parser.parse_args(['show-config']).command
|
||||
assert parser.parse_args(['export-my-pubkey']).command
|
||||
assert parser.parse_args(['init']).command
|
||||
assert parser.parse_args(['drop']).command
|
||||
assert parser.parse_args(['start']).command
|
||||
assert parser.parse_args(['set-shards', '1']).command
|
||||
assert parser.parse_args(['set-replicas', '1']).command
|
||||
assert parser.parse_args(['load']).command
|
||||
|
||||
|
||||
def test_start_raises_if_command_not_implemented():
|
||||
from bigchaindb.commands.bigchain import utils
|
||||
from bigchaindb.commands.bigchain import create_parser
|
||||
|
||||
parser = create_parser()
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
# Will raise because `scope`, the third parameter,
|
||||
# doesn't contain the function `run_configure`
|
||||
utils.start(parser, ['configure'], {})
|
||||
|
||||
|
||||
def test_start_raises_if_no_arguments_given():
|
||||
from bigchaindb.commands.bigchain import utils
|
||||
from bigchaindb.commands.bigchain import create_parser
|
||||
|
||||
parser = create_parser()
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
utils.start(parser, [], {})
|
||||
|
||||
|
||||
@patch('multiprocessing.cpu_count', return_value=42)
|
||||
def test_start_sets_multiprocess_var_based_on_cli_args(mock_cpu_count):
|
||||
from bigchaindb.commands.bigchain import utils
|
||||
from bigchaindb.commands.bigchain import create_parser
|
||||
|
||||
def run_load(args):
|
||||
return args
|
||||
|
||||
parser = create_parser()
|
||||
|
||||
assert utils.start(parser, ['load'], {'run_load': run_load}).multiprocess == 1
|
||||
assert utils.start(parser, ['load', '--multiprocess'], {'run_load': run_load}).multiprocess == 42
|
||||
|
||||
|
||||
@patch('bigchaindb.commands.utils.start')
|
||||
def test_main_entrypoint(mock_start):
|
||||
from bigchaindb.commands.bigchain import main
|
||||
main()
|
||||
|
||||
assert mock_start.called
|
||||
|
||||
|
||||
def test_bigchain_run_start(mock_run_configure, mock_processes_start, mock_db_init_with_existing_db):
|
||||
from bigchaindb.commands.bigchain import run_start
|
||||
args = Namespace(start_rethinkdb=False, config=None, yes=True)
|
||||
args = Namespace(start_rethinkdb=False, allow_temp_keypair=False, config=None, yes=True)
|
||||
run_start(args)
|
||||
|
||||
|
||||
@patch('bigchaindb.commands.utils.start_rethinkdb')
|
||||
@patch('bigchaindb.commands.utils.start_rethinkdb', return_value=Mock())
|
||||
def test_bigchain_run_start_with_rethinkdb(mock_start_rethinkdb,
|
||||
mock_run_configure,
|
||||
mock_processes_start,
|
||||
mock_db_init_with_existing_db):
|
||||
from bigchaindb.commands.bigchain import run_start
|
||||
args = Namespace(start_rethinkdb=True, config=None, yes=True)
|
||||
args = Namespace(start_rethinkdb=True, allow_temp_keypair=False, config=None, yes=True)
|
||||
run_start(args)
|
||||
|
||||
mock_start_rethinkdb.assert_called_with()
|
||||
@ -213,7 +275,9 @@ def test_run_configure_when_config_does_exist(monkeypatch,
|
||||
@patch('subprocess.Popen')
|
||||
def test_start_rethinkdb_returns_a_process_when_successful(mock_popen):
|
||||
from bigchaindb.commands import utils
|
||||
mock_popen.return_value = Mock(stdout=['Server ready'])
|
||||
mock_popen.return_value = Mock(stdout=[
|
||||
'Listening for client driver 1234',
|
||||
'Server ready'])
|
||||
assert utils.start_rethinkdb() is mock_popen.return_value
|
||||
|
||||
|
||||
@ -226,6 +290,45 @@ def test_start_rethinkdb_exits_when_cannot_start(mock_popen):
|
||||
utils.start_rethinkdb()
|
||||
|
||||
|
||||
@patch('bigchaindb.crypto.generate_key_pair', return_value=('private_key',
|
||||
'public_key'))
|
||||
def test_allow_temp_keypair_generates_one_on_the_fly(mock_gen_keypair,
|
||||
mock_processes_start,
|
||||
mock_db_init_with_existing_db):
|
||||
import bigchaindb
|
||||
from bigchaindb.commands.bigchain import run_start
|
||||
|
||||
bigchaindb.config['keypair'] = { 'private': None, 'public': None }
|
||||
|
||||
args = Namespace(allow_temp_keypair=True, start_rethinkdb=False, config=None, yes=True)
|
||||
run_start(args)
|
||||
|
||||
assert bigchaindb.config['keypair']['private'] == 'private_key'
|
||||
assert bigchaindb.config['keypair']['public'] == 'public_key'
|
||||
|
||||
|
||||
@patch('bigchaindb.crypto.generate_key_pair', return_value=('private_key',
|
||||
'public_key'))
|
||||
def test_allow_temp_keypair_doesnt_override_if_keypair_found(mock_gen_keypair,
|
||||
mock_processes_start,
|
||||
mock_db_init_with_existing_db):
|
||||
import bigchaindb
|
||||
from bigchaindb.commands.bigchain import run_start
|
||||
|
||||
# Preconditions for the test
|
||||
original_private_key = bigchaindb.config['keypair']['private']
|
||||
original_public_key = bigchaindb.config['keypair']['public']
|
||||
|
||||
assert isinstance(original_public_key, str)
|
||||
assert isinstance(original_private_key, str)
|
||||
|
||||
args = Namespace(allow_temp_keypair=True, start_rethinkdb=False, config=None, yes=True)
|
||||
run_start(args)
|
||||
|
||||
assert bigchaindb.config['keypair']['private'] == original_private_key
|
||||
assert bigchaindb.config['keypair']['public'] == original_public_key
|
||||
|
||||
|
||||
@patch('rethinkdb.ast.Table.reconfigure')
|
||||
def test_set_shards(mock_reconfigure, monkeypatch, b):
|
||||
from bigchaindb.commands.bigchain import run_set_shards
|
||||
|
@ -145,7 +145,8 @@ def test_env_config(monkeypatch):
|
||||
|
||||
def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
|
||||
file_config = {
|
||||
'database': {'host': 'test-host'}
|
||||
'database': {'host': 'test-host'},
|
||||
'backlog_reassign_delay': 5
|
||||
}
|
||||
monkeypatch.setattr('bigchaindb.config_utils.file_config', lambda *args, **kwargs: file_config)
|
||||
monkeypatch.setattr('os.environ', {'BIGCHAINDB_DATABASE_NAME': 'test-dbname',
|
||||
@ -180,6 +181,7 @@ def test_autoconfigure_read_both_from_file_and_env(monkeypatch):
|
||||
},
|
||||
'api_endpoint': 'http://localhost:9984/api/v1',
|
||||
'consensus_plugin': 'default',
|
||||
'backlog_reassign_delay': 5
|
||||
}
|
||||
|
||||
|
||||
|
@ -19,7 +19,8 @@ def config(request, monkeypatch):
|
||||
},
|
||||
'keyring': [],
|
||||
'CONFIGURED': True,
|
||||
'consensus_plugin': 'default'
|
||||
'consensus_plugin': 'default',
|
||||
'backlog_reassign_delay': 30
|
||||
}
|
||||
|
||||
monkeypatch.setattr('bigchaindb.config', config)
|
||||
|
23
tests/test_processes.py
Normal file
23
tests/test_processes.py
Normal file
@ -0,0 +1,23 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from multiprocessing import Process
|
||||
from bigchaindb.pipelines import vote, block, election, stale
|
||||
|
||||
|
||||
@patch.object(stale, 'start')
|
||||
@patch.object(election, 'start')
|
||||
@patch.object(block, 'start')
|
||||
@patch.object(vote, 'start')
|
||||
@patch.object(Process, 'start')
|
||||
def test_processes_start(mock_vote, mock_block, mock_election, mock_stale,
|
||||
mock_process):
|
||||
from bigchaindb import processes
|
||||
|
||||
processes.start()
|
||||
|
||||
mock_vote.assert_called_with()
|
||||
mock_block.assert_called_with()
|
||||
mock_election.assert_called_with()
|
||||
mock_stale.assert_called_with()
|
||||
mock_process.assert_called_with()
|
||||
|
105
tests/test_run_query_util.py
Normal file
105
tests/test_run_query_util.py
Normal file
@ -0,0 +1,105 @@
|
||||
from threading import Thread
|
||||
import pytest
|
||||
|
||||
import rethinkdb as r
|
||||
|
||||
from bigchaindb.db.utils import Connection
|
||||
|
||||
|
||||
def test_run_a_simple_query():
|
||||
conn = Connection()
|
||||
query = r.expr('1')
|
||||
assert conn.run(query) == '1'
|
||||
|
||||
|
||||
def test_raise_exception_when_max_tries():
|
||||
class MockQuery:
|
||||
def run(self, conn):
|
||||
raise r.ReqlDriverError('mock')
|
||||
|
||||
conn = Connection()
|
||||
|
||||
with pytest.raises(r.ReqlDriverError):
|
||||
conn.run(MockQuery())
|
||||
|
||||
|
||||
def test_reconnect_when_connection_lost():
|
||||
import time
|
||||
import rethinkdb as r
|
||||
|
||||
def raise_exception(*args, **kwargs):
|
||||
raise r.ReqlDriverError('mock')
|
||||
|
||||
conn = Connection()
|
||||
original_connect = r.connect
|
||||
r.connect = raise_exception
|
||||
|
||||
def delayed_start():
|
||||
time.sleep(1)
|
||||
r.connect = original_connect
|
||||
|
||||
thread = Thread(target=delayed_start)
|
||||
query = r.expr('1')
|
||||
thread.start()
|
||||
assert conn.run(query) == '1'
|
||||
|
||||
|
||||
def test_changefeed_reconnects_when_connection_lost(monkeypatch):
|
||||
import time
|
||||
import multiprocessing as mp
|
||||
|
||||
from bigchaindb import Bigchain
|
||||
from bigchaindb.pipelines.utils import ChangeFeed
|
||||
|
||||
class MockConnection:
|
||||
tries = 0
|
||||
|
||||
def run(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
self.tries += 1
|
||||
if self.tries == 1:
|
||||
raise r.ReqlDriverError('mock')
|
||||
elif self.tries == 2:
|
||||
return { 'new_val': { 'fact': 'A group of cats is called a clowder.' },
|
||||
'old_val': None }
|
||||
if self.tries == 3:
|
||||
raise r.ReqlDriverError('mock')
|
||||
elif self.tries == 4:
|
||||
return { 'new_val': {'fact': 'Cats sleep 70% of their lives.' },
|
||||
'old_val': None }
|
||||
else:
|
||||
time.sleep(10)
|
||||
|
||||
|
||||
bigchain = Bigchain()
|
||||
bigchain.connection = MockConnection()
|
||||
changefeed = ChangeFeed('cat_facts', ChangeFeed.INSERT,
|
||||
bigchain=bigchain)
|
||||
changefeed.outqueue = mp.Queue()
|
||||
t_changefeed = Thread(target=changefeed.run_forever, daemon=True)
|
||||
|
||||
t_changefeed.start()
|
||||
time.sleep(1)
|
||||
# try 1: MockConnection raises an error that will stop the
|
||||
# ChangeFeed instance from iterating for 1 second.
|
||||
|
||||
# try 2: MockConnection releases a new record. The new record
|
||||
# will be put in the outqueue of the ChangeFeed instance.
|
||||
fact = changefeed.outqueue.get()['fact']
|
||||
assert fact == 'A group of cats is called a clowder.'
|
||||
|
||||
# try 3: MockConnection raises an error that will stop the
|
||||
# ChangeFeed instance from iterating for 1 second.
|
||||
assert t_changefeed.is_alive() is True
|
||||
|
||||
time.sleep(2)
|
||||
# try 4: MockConnection releases a new record. The new record
|
||||
# will be put in the outqueue of the ChangeFeed instance.
|
||||
|
||||
fact = changefeed.outqueue.get()['fact']
|
||||
assert fact == 'Cats sleep 70% of their lives.'
|
Loading…
x
Reference in New Issue
Block a user