mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-10-13 13:34:05 +00:00
Merge pull request #1100 from bigchaindb/implement-connection-run-for-mongodb
[WIP] Add run interface to MongoDB connection (aka: lazy queries)
This commit is contained in:
commit
e6d99d74c0
@ -32,6 +32,7 @@ def connect(backend=None, host=None, port=None, name=None, replicaset=None):
|
||||
based on the given (or defaulted) :attr:`backend`.
|
||||
|
||||
Raises:
|
||||
:exc:`~ConnectionError`: If the connection to the database fails.
|
||||
:exc:`~ConfigurationError`: If the given (or defaulted) :attr:`backend`
|
||||
is not supported or could not be loaded.
|
||||
"""
|
||||
@ -83,6 +84,13 @@ class Connection:
|
||||
|
||||
Args:
|
||||
query: the query to run
|
||||
Raises:
|
||||
:exc:`~DuplicateKeyError`: If the query fails because of a
|
||||
duplicate key constraint.
|
||||
:exc:`~OperationFailure`: If the query fails for any other
|
||||
reason.
|
||||
:exc:`~ConnectionError`: If the connection to the database
|
||||
fails.
|
||||
"""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
@ -1,5 +1,17 @@
|
||||
from bigchaindb.exceptions import BigchainDBError
|
||||
|
||||
|
||||
class DatabaseOpFailedError(BigchainDBError):
|
||||
"""Exception for database operation errors."""
|
||||
class BackendError(BigchainDBError):
|
||||
"""Top level exception for any backend exception."""
|
||||
|
||||
|
||||
class ConnectionError(BackendError):
|
||||
"""Exception raised when the connection to the backend fails."""
|
||||
|
||||
|
||||
class OperationError(BackendError):
|
||||
"""Exception raised when a backend operation fails."""
|
||||
|
||||
|
||||
class DuplicateKeyError(OperationError):
|
||||
"""Exception raised when an insert fails because the key is not unique"""
|
||||
|
@ -5,7 +5,7 @@ from pymongo.errors import OperationFailure
|
||||
|
||||
from bigchaindb.backend import admin
|
||||
from bigchaindb.backend.utils import module_dispatch_registrar
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
from bigchaindb.backend.mongodb.connection import MongoDBConnection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -24,7 +24,7 @@ def add_replicas(connection, replicas):
|
||||
form "hostname:port".
|
||||
|
||||
Raises:
|
||||
DatabaseOpFailedError: If the reconfiguration fails due to a MongoDB
|
||||
OperationError: If the reconfiguration fails due to a MongoDB
|
||||
:exc:`OperationFailure`
|
||||
"""
|
||||
# get current configuration
|
||||
@ -50,7 +50,7 @@ def add_replicas(connection, replicas):
|
||||
try:
|
||||
connection.conn.admin.command('replSetReconfig', conf['config'])
|
||||
except OperationFailure as exc:
|
||||
raise DatabaseOpFailedError(exc.details['errmsg'])
|
||||
raise OperationError(exc.details['errmsg'])
|
||||
|
||||
|
||||
@register_admin(MongoDBConnection)
|
||||
@ -64,7 +64,7 @@ def remove_replicas(connection, replicas):
|
||||
form "hostname:port".
|
||||
|
||||
Raises:
|
||||
DatabaseOpFailedError: If the reconfiguration fails due to a MongoDB
|
||||
OperationError: If the reconfiguration fails due to a MongoDB
|
||||
:exc:`OperationFailure`
|
||||
"""
|
||||
# get the current configuration
|
||||
@ -83,4 +83,4 @@ def remove_replicas(connection, replicas):
|
||||
try:
|
||||
connection.conn.admin.command('replSetReconfig', conf['config'])
|
||||
except OperationFailure as exc:
|
||||
raise DatabaseOpFailedError(exc.details['errmsg'])
|
||||
raise OperationError(exc.details['errmsg'])
|
||||
|
@ -2,12 +2,12 @@ import logging
|
||||
import time
|
||||
|
||||
import pymongo
|
||||
from pymongo import errors
|
||||
|
||||
from bigchaindb import backend
|
||||
from bigchaindb.backend.changefeed import ChangeFeed
|
||||
from bigchaindb.backend.utils import module_dispatch_registrar
|
||||
from bigchaindb.backend.mongodb.connection import MongoDBConnection
|
||||
from bigchaindb.backend.exceptions import BackendError
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -27,12 +27,16 @@ class MongoDBChangeFeed(ChangeFeed):
|
||||
|
||||
while True:
|
||||
try:
|
||||
# XXX: hack to force reconnection. Why? Because the cursor
|
||||
# in `run_changefeed` does not run in the context of a
|
||||
# Connection object, so if the connection is lost we need
|
||||
# to manually reset the connection to None.
|
||||
# See #1154
|
||||
self.connection.connection = None
|
||||
self.run_changefeed()
|
||||
break
|
||||
except (errors.ConnectionFailure, errors.OperationFailure,
|
||||
errors.AutoReconnect,
|
||||
errors.ServerSelectionTimeoutError) as exc:
|
||||
logger.exception(exc)
|
||||
except (BackendError, pymongo.errors.ConnectionFailure):
|
||||
logger.exception('Error connecting to the database, retrying')
|
||||
time.sleep(1)
|
||||
|
||||
def run_changefeed(self):
|
||||
@ -41,17 +45,19 @@ class MongoDBChangeFeed(ChangeFeed):
|
||||
namespace = '{}.{}'.format(dbname, table)
|
||||
# last timestamp in the oplog. We only care for operations happening
|
||||
# in the future.
|
||||
last_ts = self.connection.conn.local.oplog.rs.find()\
|
||||
.sort('$natural', pymongo.DESCENDING).limit(1)\
|
||||
.next()['ts']
|
||||
last_ts = self.connection.run(
|
||||
self.connection.query().local.oplog.rs.find()
|
||||
.sort('$natural', pymongo.DESCENDING).limit(1)
|
||||
.next()['ts'])
|
||||
# tailable cursor. A tailable cursor will remain open even after the
|
||||
# last result was returned. ``TAILABLE_AWAIT`` will block for some
|
||||
# timeout after the last result was returned. If no result is received
|
||||
# in the meantime it will raise a StopIteration excetiption.
|
||||
cursor = self.connection.conn.local.oplog.rs.find(
|
||||
{'ns': namespace, 'ts': {'$gt': last_ts}},
|
||||
cursor_type=pymongo.CursorType.TAILABLE_AWAIT
|
||||
)
|
||||
cursor = self.connection.run(
|
||||
self.connection.query().local.oplog.rs.find(
|
||||
{'ns': namespace, 'ts': {'$gt': last_ts}},
|
||||
cursor_type=pymongo.CursorType.TAILABLE_AWAIT
|
||||
))
|
||||
|
||||
while cursor.alive:
|
||||
try:
|
||||
|
@ -1,27 +1,42 @@
|
||||
import time
|
||||
import logging
|
||||
from itertools import repeat
|
||||
|
||||
from pymongo import MongoClient
|
||||
from pymongo import errors
|
||||
import pymongo
|
||||
|
||||
import bigchaindb
|
||||
from bigchaindb.common import exceptions
|
||||
from bigchaindb.utils import Lazy
|
||||
from bigchaindb.common.exceptions import ConfigurationError
|
||||
from bigchaindb.backend.exceptions import (DuplicateKeyError,
|
||||
OperationError,
|
||||
ConnectionError)
|
||||
from bigchaindb.backend.connection import Connection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# TODO: waiting for #1082 to be merged
|
||||
# to move this constants in the configuration.
|
||||
|
||||
CONNECTION_TIMEOUT = 4000 # in milliseconds
|
||||
MAX_RETRIES = 3 # number of tries before giving up, if 0 then try forever
|
||||
|
||||
|
||||
class MongoDBConnection(Connection):
|
||||
|
||||
def __init__(self, host=None, port=None, dbname=None, max_tries=3,
|
||||
replicaset=None, **kwargs):
|
||||
def __init__(self, host=None, port=None, dbname=None,
|
||||
connection_timeout=None, max_tries=None,
|
||||
replicaset=None):
|
||||
"""Create a new Connection instance.
|
||||
|
||||
Args:
|
||||
host (str, optional): the host to connect to.
|
||||
port (int, optional): the port to connect to.
|
||||
dbname (str, optional): the database to use.
|
||||
max_tries (int, optional): how many tries before giving up.
|
||||
connection_timeout (int, optional): the milliseconds to wait
|
||||
until timing out the database connection attempt.
|
||||
max_tries (int, optional): how many tries before giving up,
|
||||
if 0 then try forever.
|
||||
replicaset (str, optional): the name of the replica set to
|
||||
connect to.
|
||||
"""
|
||||
@ -30,7 +45,9 @@ class MongoDBConnection(Connection):
|
||||
self.port = port or bigchaindb.config['database']['port']
|
||||
self.replicaset = replicaset or bigchaindb.config['database']['replicaset']
|
||||
self.dbname = dbname or bigchaindb.config['database']['name']
|
||||
self.max_tries = max_tries
|
||||
self.connection_timeout = connection_timeout if connection_timeout is not None else CONNECTION_TIMEOUT
|
||||
self.max_tries = max_tries if max_tries is not None else MAX_RETRIES
|
||||
self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0)
|
||||
self.connection = None
|
||||
|
||||
@property
|
||||
@ -43,32 +60,80 @@ class MongoDBConnection(Connection):
|
||||
def db(self):
|
||||
return self.conn[self.dbname]
|
||||
|
||||
def _connect(self):
|
||||
# we should only return a connection if the replica set is
|
||||
# initialized. initialize_replica_set will check if the
|
||||
# replica set is initialized else it will initialize it.
|
||||
initialize_replica_set()
|
||||
def query(self):
|
||||
return Lazy()
|
||||
|
||||
for i in range(self.max_tries):
|
||||
def collection(self, name):
|
||||
"""Return a lazy object that can be used to compose a query.
|
||||
|
||||
Args:
|
||||
name (str): the name of the collection to query.
|
||||
"""
|
||||
return self.query()[self.dbname][name]
|
||||
|
||||
def run(self, query):
|
||||
try:
|
||||
try:
|
||||
self.connection = MongoClient(self.host, self.port,
|
||||
replicaset=self.replicaset)
|
||||
except errors.ConnectionFailure:
|
||||
if i + 1 == self.max_tries:
|
||||
raise
|
||||
else:
|
||||
time.sleep(2**i)
|
||||
return query.run(self.conn)
|
||||
except pymongo.errors.AutoReconnect as exc:
|
||||
logger.warning('Lost connection to the database, '
|
||||
'retrying query.')
|
||||
return query.run(self.conn)
|
||||
except pymongo.errors.AutoReconnect as exc:
|
||||
raise ConnectionError from exc
|
||||
except pymongo.errors.DuplicateKeyError as exc:
|
||||
raise DuplicateKeyError from exc
|
||||
except pymongo.errors.OperationFailure as exc:
|
||||
raise OperationError from exc
|
||||
|
||||
def _connect(self):
|
||||
"""Try to connect to the database.
|
||||
|
||||
Raises:
|
||||
:exc:`~ConnectionError`: If the connection to the database
|
||||
fails.
|
||||
"""
|
||||
|
||||
attempt = 0
|
||||
for i in self.max_tries_counter:
|
||||
attempt += 1
|
||||
|
||||
try:
|
||||
# we should only return a connection if the replica set is
|
||||
# initialized. initialize_replica_set will check if the
|
||||
# replica set is initialized else it will initialize it.
|
||||
initialize_replica_set(self.host, self.port, self.connection_timeout)
|
||||
|
||||
# FYI: this might raise a `ServerSelectionTimeoutError`,
|
||||
# that is a subclass of `ConnectionFailure`.
|
||||
self.connection = pymongo.MongoClient(self.host,
|
||||
self.port,
|
||||
replicaset=self.replicaset,
|
||||
serverselectiontimeoutms=self.connection_timeout)
|
||||
|
||||
# `initialize_replica_set` might raise `ConnectionFailure` or `OperationFailure`.
|
||||
except (pymongo.errors.ConnectionFailure,
|
||||
pymongo.errors.OperationFailure) as exc:
|
||||
logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.',
|
||||
attempt, self.max_tries if self.max_tries != 0 else '∞',
|
||||
self.host, self.port, self.connection_timeout)
|
||||
if attempt == self.max_tries:
|
||||
logger.critical('Cannot connect to the Database. Giving up.')
|
||||
raise ConnectionError() from exc
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
def initialize_replica_set():
|
||||
def initialize_replica_set(host, port, connection_timeout):
|
||||
"""Initialize a replica set. If already initialized skip."""
|
||||
|
||||
# Setup a MongoDB connection
|
||||
# The reason we do this instead of `backend.connect` is that
|
||||
# `backend.connect` will connect you to a replica set but this fails if
|
||||
# you try to connect to a replica set that is not yet initialized
|
||||
conn = MongoClient(host=bigchaindb.config['database']['host'],
|
||||
port=bigchaindb.config['database']['port'])
|
||||
conn = pymongo.MongoClient(host=host,
|
||||
port=port,
|
||||
serverselectiontimeoutms=connection_timeout)
|
||||
_check_replica_set(conn)
|
||||
host = '{}:{}'.format(bigchaindb.config['database']['host'],
|
||||
bigchaindb.config['database']['port'])
|
||||
@ -77,7 +142,7 @@ def initialize_replica_set():
|
||||
|
||||
try:
|
||||
conn.admin.command('replSetInitiate', config)
|
||||
except errors.OperationFailure as exc_info:
|
||||
except pymongo.errors.OperationFailure as exc_info:
|
||||
if exc_info.details['codeName'] == 'AlreadyInitialized':
|
||||
return
|
||||
raise
|
||||
@ -105,17 +170,16 @@ def _check_replica_set(conn):
|
||||
repl_opts = options['parsed']['replication']
|
||||
repl_set_name = repl_opts.get('replSetName', None) or repl_opts['replSet']
|
||||
except KeyError:
|
||||
raise exceptions.ConfigurationError('mongod was not started with'
|
||||
' the replSet option.')
|
||||
raise ConfigurationError('mongod was not started with'
|
||||
' the replSet option.')
|
||||
|
||||
bdb_repl_set_name = bigchaindb.config['database']['replicaset']
|
||||
if repl_set_name != bdb_repl_set_name:
|
||||
raise exceptions.ConfigurationError('The replicaset configuration of '
|
||||
'bigchaindb (`{}`) needs to match '
|
||||
'the replica set name from MongoDB'
|
||||
' (`{}`)'
|
||||
.format(bdb_repl_set_name,
|
||||
repl_set_name))
|
||||
raise ConfigurationError('The replicaset configuration of '
|
||||
'bigchaindb (`{}`) needs to match '
|
||||
'the replica set name from MongoDB'
|
||||
' (`{}`)'.format(bdb_repl_set_name,
|
||||
repl_set_name))
|
||||
|
||||
|
||||
def _wait_for_replica_set_initialization(conn):
|
||||
|
@ -3,11 +3,11 @@
|
||||
from time import time
|
||||
|
||||
from pymongo import ReturnDocument
|
||||
from pymongo import errors
|
||||
|
||||
from bigchaindb import backend
|
||||
from bigchaindb.common.exceptions import CyclicBlockchainError
|
||||
from bigchaindb.common.transaction import Transaction
|
||||
from bigchaindb.backend.exceptions import DuplicateKeyError
|
||||
from bigchaindb.backend.utils import module_dispatch_registrar
|
||||
from bigchaindb.backend.mongodb.connection import MongoDBConnection
|
||||
|
||||
@ -18,8 +18,10 @@ register_query = module_dispatch_registrar(backend.query)
|
||||
@register_query(MongoDBConnection)
|
||||
def write_transaction(conn, signed_transaction):
|
||||
try:
|
||||
return conn.db['backlog'].insert_one(signed_transaction)
|
||||
except errors.DuplicateKeyError:
|
||||
return conn.run(
|
||||
conn.collection('backlog')
|
||||
.insert_one(signed_transaction))
|
||||
except DuplicateKeyError:
|
||||
return
|
||||
|
||||
|
||||
@ -27,40 +29,49 @@ def write_transaction(conn, signed_transaction):
|
||||
def update_transaction(conn, transaction_id, doc):
|
||||
# with mongodb we need to add update operators to the doc
|
||||
doc = {'$set': doc}
|
||||
return conn.db['backlog']\
|
||||
.find_one_and_update({'id': transaction_id},
|
||||
doc,
|
||||
return_document=ReturnDocument.AFTER)
|
||||
return conn.run(
|
||||
conn.collection('backlog')
|
||||
.find_one_and_update(
|
||||
{'id': transaction_id},
|
||||
doc,
|
||||
return_document=ReturnDocument.AFTER))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def delete_transaction(conn, *transaction_id):
|
||||
return conn.db['backlog'].delete_many({'id': {'$in': transaction_id}})
|
||||
return conn.run(
|
||||
conn.collection('backlog')
|
||||
.delete_many({'id': {'$in': transaction_id}}))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_stale_transactions(conn, reassign_delay):
|
||||
return conn.db['backlog']\
|
||||
.find({'assignment_timestamp': {'$lt': time() - reassign_delay}},
|
||||
projection={'_id': False})
|
||||
return conn.run(
|
||||
conn.collection('backlog')
|
||||
.find({'assignment_timestamp': {'$lt': time() - reassign_delay}},
|
||||
projection={'_id': False}))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_transaction_from_block(conn, transaction_id, block_id):
|
||||
try:
|
||||
return conn.db['bigchain'].aggregate([
|
||||
{'$match': {'id': block_id}},
|
||||
{'$project': {
|
||||
'block.transactions': {
|
||||
'$filter': {
|
||||
'input': '$block.transactions',
|
||||
'as': 'transaction',
|
||||
'cond': {
|
||||
'$eq': ['$$transaction.id', transaction_id]
|
||||
return conn.run(
|
||||
conn.collection('bigchain')
|
||||
.aggregate([
|
||||
{'$match': {'id': block_id}},
|
||||
{'$project': {
|
||||
'block.transactions': {
|
||||
'$filter': {
|
||||
'input': '$block.transactions',
|
||||
'as': 'transaction',
|
||||
'cond': {
|
||||
'$eq': ['$$transaction.id', transaction_id]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}}]).next()['block']['transactions'].pop()
|
||||
}}])
|
||||
.next()['block']['transactions']
|
||||
.pop())
|
||||
except (StopIteration, IndexError):
|
||||
# StopIteration is raised if the block was not found
|
||||
# IndexError is returned if the block is found but no transactions
|
||||
@ -70,17 +81,20 @@ def get_transaction_from_block(conn, transaction_id, block_id):
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_transaction_from_backlog(conn, transaction_id):
|
||||
return conn.db['backlog']\
|
||||
.find_one({'id': transaction_id},
|
||||
projection={'_id': False, 'assignee': False,
|
||||
'assignment_timestamp': False})
|
||||
return conn.run(
|
||||
conn.collection('backlog')
|
||||
.find_one({'id': transaction_id},
|
||||
projection={'_id': False,
|
||||
'assignee': False,
|
||||
'assignment_timestamp': False}))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_blocks_status_from_transaction(conn, transaction_id):
|
||||
return conn.db['bigchain']\
|
||||
.find({'block.transactions.id': transaction_id},
|
||||
projection=['id', 'block.voters'])
|
||||
return conn.run(
|
||||
conn.collection('bigchain')
|
||||
.find({'block.transactions.id': transaction_id},
|
||||
projection=['id', 'block.voters']))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
@ -107,24 +121,28 @@ def get_txids_filtered(conn, asset_id, operation=None):
|
||||
{'$match': match},
|
||||
{'$project': {'block.transactions.id': True}}
|
||||
]
|
||||
cursor = conn.db['bigchain'].aggregate(pipeline)
|
||||
cursor = conn.run(
|
||||
conn.collection('bigchain')
|
||||
.aggregate(pipeline))
|
||||
return (elem['block']['transactions']['id'] for elem in cursor)
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_asset_by_id(conn, asset_id):
|
||||
cursor = conn.db['bigchain'].aggregate([
|
||||
{'$match': {
|
||||
'block.transactions.id': asset_id,
|
||||
'block.transactions.operation': 'CREATE'
|
||||
}},
|
||||
{'$unwind': '$block.transactions'},
|
||||
{'$match': {
|
||||
'block.transactions.id': asset_id,
|
||||
'block.transactions.operation': 'CREATE'
|
||||
}},
|
||||
{'$project': {'block.transactions.asset': True}}
|
||||
])
|
||||
cursor = conn.run(
|
||||
conn.collection('bigchain')
|
||||
.aggregate([
|
||||
{'$match': {
|
||||
'block.transactions.id': asset_id,
|
||||
'block.transactions.operation': 'CREATE'
|
||||
}},
|
||||
{'$unwind': '$block.transactions'},
|
||||
{'$match': {
|
||||
'block.transactions.id': asset_id,
|
||||
'block.transactions.operation': 'CREATE'
|
||||
}},
|
||||
{'$project': {'block.transactions.asset': True}}
|
||||
]))
|
||||
# we need to access some nested fields before returning so lets use a
|
||||
# generator to avoid having to read all records on the cursor at this point
|
||||
return (elem['block']['transactions'] for elem in cursor)
|
||||
@ -132,17 +150,18 @@ def get_asset_by_id(conn, asset_id):
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_spent(conn, transaction_id, output):
|
||||
cursor = conn.db['bigchain'].aggregate([
|
||||
{'$match': {
|
||||
'block.transactions.inputs.fulfills.txid': transaction_id,
|
||||
'block.transactions.inputs.fulfills.output': output
|
||||
}},
|
||||
{'$unwind': '$block.transactions'},
|
||||
{'$match': {
|
||||
'block.transactions.inputs.fulfills.txid': transaction_id,
|
||||
'block.transactions.inputs.fulfills.output': output
|
||||
}}
|
||||
])
|
||||
cursor = conn.run(
|
||||
conn.collection('bigchain').aggregate([
|
||||
{'$match': {
|
||||
'block.transactions.inputs.fulfills.txid': transaction_id,
|
||||
'block.transactions.inputs.fulfills.output': output
|
||||
}},
|
||||
{'$unwind': '$block.transactions'},
|
||||
{'$match': {
|
||||
'block.transactions.inputs.fulfills.txid': transaction_id,
|
||||
'block.transactions.inputs.fulfills.output': output
|
||||
}}
|
||||
]))
|
||||
# we need to access some nested fields before returning so lets use a
|
||||
# generator to avoid having to read all records on the cursor at this point
|
||||
return (elem['block']['transactions'] for elem in cursor)
|
||||
@ -150,11 +169,12 @@ def get_spent(conn, transaction_id, output):
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_owned_ids(conn, owner):
|
||||
cursor = conn.db['bigchain'].aggregate([
|
||||
{'$match': {'block.transactions.outputs.public_keys': owner}},
|
||||
{'$unwind': '$block.transactions'},
|
||||
{'$match': {'block.transactions.outputs.public_keys': owner}}
|
||||
])
|
||||
cursor = conn.run(
|
||||
conn.collection('bigchain').aggregate([
|
||||
{'$match': {'block.transactions.outputs.public_keys': owner}},
|
||||
{'$unwind': '$block.transactions'},
|
||||
{'$match': {'block.transactions.outputs.public_keys': owner}}
|
||||
]))
|
||||
# we need to access some nested fields before returning so lets use a
|
||||
# generator to avoid having to read all records on the cursor at this point
|
||||
return (elem['block']['transactions'] for elem in cursor)
|
||||
@ -162,66 +182,80 @@ def get_owned_ids(conn, owner):
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_votes_by_block_id(conn, block_id):
|
||||
return conn.db['votes']\
|
||||
.find({'vote.voting_for_block': block_id},
|
||||
projection={'_id': False})
|
||||
return conn.run(
|
||||
conn.collection('votes')
|
||||
.find({'vote.voting_for_block': block_id},
|
||||
projection={'_id': False}))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_votes_by_block_id_and_voter(conn, block_id, node_pubkey):
|
||||
return conn.db['votes']\
|
||||
.find({'vote.voting_for_block': block_id,
|
||||
'node_pubkey': node_pubkey},
|
||||
projection={'_id': False})
|
||||
return conn.run(
|
||||
conn.collection('votes')
|
||||
.find({'vote.voting_for_block': block_id,
|
||||
'node_pubkey': node_pubkey},
|
||||
projection={'_id': False}))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def write_block(conn, block):
|
||||
return conn.db['bigchain'].insert_one(block.to_dict())
|
||||
return conn.run(
|
||||
conn.collection('bigchain')
|
||||
.insert_one(block.to_dict()))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_block(conn, block_id):
|
||||
return conn.db['bigchain'].find_one({'id': block_id},
|
||||
projection={'_id': False})
|
||||
return conn.run(
|
||||
conn.collection('bigchain')
|
||||
.find_one({'id': block_id},
|
||||
projection={'_id': False}))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def has_transaction(conn, transaction_id):
|
||||
return bool(conn.db['bigchain']
|
||||
.find_one({'block.transactions.id': transaction_id}))
|
||||
return bool(conn.run(
|
||||
conn.collection('bigchain')
|
||||
.find_one({'block.transactions.id': transaction_id})))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def count_blocks(conn):
|
||||
return conn.db['bigchain'].count()
|
||||
return conn.run(
|
||||
conn.collection('bigchain')
|
||||
.count())
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def count_backlog(conn):
|
||||
return conn.db['backlog'].count()
|
||||
return conn.run(
|
||||
conn.collection('backlog')
|
||||
.count())
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def write_vote(conn, vote):
|
||||
conn.db['votes'].insert_one(vote)
|
||||
conn.run(conn.collection('votes').insert_one(vote))
|
||||
vote.pop('_id')
|
||||
return vote
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_genesis_block(conn):
|
||||
return conn.db['bigchain'].find_one(
|
||||
{'block.transactions.0.operation': 'GENESIS'},
|
||||
{'_id': False}
|
||||
)
|
||||
return conn.run(
|
||||
conn.collection('bigchain')
|
||||
.find_one(
|
||||
{'block.transactions.0.operation': 'GENESIS'},
|
||||
{'_id': False}
|
||||
))
|
||||
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_last_voted_block(conn, node_pubkey):
|
||||
last_voted = conn.db['votes']\
|
||||
.find({'node_pubkey': node_pubkey},
|
||||
sort=[('vote.timestamp', -1)])
|
||||
last_voted = conn.run(
|
||||
conn.collection('votes')
|
||||
.find({'node_pubkey': node_pubkey},
|
||||
sort=[('vote.timestamp', -1)]))
|
||||
|
||||
# pymongo seems to return a cursor even if there are no results
|
||||
# so we actually need to check the count
|
||||
@ -249,18 +283,20 @@ def get_last_voted_block(conn, node_pubkey):
|
||||
|
||||
@register_query(MongoDBConnection)
|
||||
def get_unvoted_blocks(conn, node_pubkey):
|
||||
return conn.db['bigchain'].aggregate([
|
||||
{'$lookup': {
|
||||
'from': 'votes',
|
||||
'localField': 'id',
|
||||
'foreignField': 'vote.voting_for_block',
|
||||
'as': 'votes'
|
||||
}},
|
||||
{'$match': {
|
||||
'votes.node_pubkey': {'$ne': node_pubkey},
|
||||
'block.transactions.operation': {'$ne': 'GENESIS'}
|
||||
}},
|
||||
{'$project': {
|
||||
'votes': False, '_id': False
|
||||
}}
|
||||
])
|
||||
return conn.run(
|
||||
conn.collection('bigchain')
|
||||
.aggregate([
|
||||
{'$lookup': {
|
||||
'from': 'votes',
|
||||
'localField': 'id',
|
||||
'foreignField': 'vote.voting_for_block',
|
||||
'as': 'votes'
|
||||
}},
|
||||
{'$match': {
|
||||
'votes.node_pubkey': {'$ne': node_pubkey},
|
||||
'block.transactions.operation': {'$ne': 'GENESIS'}
|
||||
}},
|
||||
{'$project': {
|
||||
'votes': False, '_id': False
|
||||
}}
|
||||
]))
|
||||
|
@ -5,7 +5,7 @@ import rethinkdb as r
|
||||
|
||||
from bigchaindb.backend import admin
|
||||
from bigchaindb.backend.schema import TABLES
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
from bigchaindb.backend.utils import module_dispatch_registrar
|
||||
from bigchaindb.backend.rethinkdb.connection import RethinkDBConnection
|
||||
|
||||
@ -78,7 +78,7 @@ def reconfigure(connection, *, table, shards, replicas,
|
||||
<https://rethinkdb.com/api/python/reconfigure/>`_.
|
||||
|
||||
Raises:
|
||||
DatabaseOpFailedError: If the reconfiguration fails due to a
|
||||
OperationError: If the reconfiguration fails due to a
|
||||
RethinkDB :exc:`ReqlOpFailedError` or
|
||||
:exc:`ReqlQueryLogicError`.
|
||||
|
||||
@ -96,7 +96,7 @@ def reconfigure(connection, *, table, shards, replicas,
|
||||
try:
|
||||
return connection.run(r.table(table).reconfigure(**params))
|
||||
except (r.ReqlOpFailedError, r.ReqlQueryLogicError) as e:
|
||||
raise DatabaseOpFailedError from e
|
||||
raise OperationError from e
|
||||
|
||||
|
||||
@register_admin(RethinkDBConnection)
|
||||
|
@ -24,7 +24,7 @@ from bigchaindb import backend
|
||||
from bigchaindb.backend import schema
|
||||
from bigchaindb.backend.admin import (set_replicas, set_shards, add_replicas,
|
||||
remove_replicas)
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
from bigchaindb.commands import utils
|
||||
from bigchaindb import processes
|
||||
|
||||
@ -247,7 +247,7 @@ def run_set_shards(args):
|
||||
conn = backend.connect()
|
||||
try:
|
||||
set_shards(conn, shards=args.num_shards)
|
||||
except DatabaseOpFailedError as e:
|
||||
except OperationError as e:
|
||||
logger.warn(e)
|
||||
|
||||
|
||||
@ -255,7 +255,7 @@ def run_set_replicas(args):
|
||||
conn = backend.connect()
|
||||
try:
|
||||
set_replicas(conn, replicas=args.num_replicas)
|
||||
except DatabaseOpFailedError as e:
|
||||
except OperationError as e:
|
||||
logger.warn(e)
|
||||
|
||||
|
||||
@ -266,7 +266,7 @@ def run_add_replicas(args):
|
||||
|
||||
try:
|
||||
add_replicas(conn, args.replicas)
|
||||
except (DatabaseOpFailedError, NotImplementedError) as e:
|
||||
except (OperationError, NotImplementedError) as e:
|
||||
logger.warn(e)
|
||||
else:
|
||||
logger.info('Added {} to the replicaset.'.format(args.replicas))
|
||||
@ -279,7 +279,7 @@ def run_remove_replicas(args):
|
||||
|
||||
try:
|
||||
remove_replicas(conn, args.replicas)
|
||||
except (DatabaseOpFailedError, NotImplementedError) as e:
|
||||
except (OperationError, NotImplementedError) as e:
|
||||
logger.warn(e)
|
||||
else:
|
||||
logger.info('Removed {} from the replicaset.'.format(args.replicas))
|
||||
|
@ -157,3 +157,49 @@ def is_genesis_block(block):
|
||||
return block.transactions[0].operation == 'GENESIS'
|
||||
except AttributeError:
|
||||
return block['block']['transactions'][0]['operation'] == 'GENESIS'
|
||||
|
||||
|
||||
class Lazy:
|
||||
"""Lazy objects are useful to create chains of methods to
|
||||
execute later.
|
||||
|
||||
A lazy object records the methods that has been called, and
|
||||
replay them when the :py:meth:`run` method is called. Note that
|
||||
:py:meth:`run` needs an object `instance` to replay all the
|
||||
methods that have been recorded.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Instantiate a new Lazy object."""
|
||||
self.stack = []
|
||||
|
||||
def __getattr__(self, name):
|
||||
self.stack.append(name)
|
||||
return self
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
self.stack.append((args, kwargs))
|
||||
return self
|
||||
|
||||
def __getitem__(self, key):
|
||||
self.stack.append('__getitem__')
|
||||
self.stack.append(([key], {}))
|
||||
return self
|
||||
|
||||
def run(self, instance):
|
||||
"""Run the recorded chain of methods on `instance`.
|
||||
|
||||
Args:
|
||||
instance: an object.
|
||||
"""
|
||||
|
||||
last = instance
|
||||
|
||||
for item in self.stack:
|
||||
if isinstance(item, str):
|
||||
last = getattr(last, item)
|
||||
else:
|
||||
last = last(*item[0], **item[1])
|
||||
|
||||
self.stack = []
|
||||
return last
|
||||
|
@ -64,14 +64,14 @@ def test_add_replicas(mock_replicaset_config, connection):
|
||||
|
||||
def test_add_replicas_raises(mock_replicaset_config, connection):
|
||||
from bigchaindb.backend.admin import add_replicas
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
|
||||
with mock.patch.object(Database, 'command') as mock_command:
|
||||
mock_command.side_effect = [
|
||||
mock_replicaset_config,
|
||||
OperationFailure(error=1, details={'errmsg': ''})
|
||||
]
|
||||
with pytest.raises(DatabaseOpFailedError):
|
||||
with pytest.raises(OperationError):
|
||||
add_replicas(connection, ['localhost:27018'])
|
||||
|
||||
|
||||
@ -97,12 +97,12 @@ def test_remove_replicas(mock_replicaset_config, connection):
|
||||
|
||||
def test_remove_replicas_raises(mock_replicaset_config, connection):
|
||||
from bigchaindb.backend.admin import remove_replicas
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
|
||||
with mock.patch.object(Database, 'command') as mock_command:
|
||||
mock_command.side_effect = [
|
||||
mock_replicaset_config,
|
||||
OperationFailure(error=1, details={'errmsg': ''})
|
||||
]
|
||||
with pytest.raises(DatabaseOpFailedError):
|
||||
with pytest.raises(OperationError):
|
||||
remove_replicas(connection, ['localhost:27018'])
|
||||
|
@ -1,7 +1,6 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from pymongo.errors import ConnectionFailure
|
||||
|
||||
from multipipes import Pipe
|
||||
|
||||
@ -151,15 +150,15 @@ def test_changefeed_prefeed(mock_cursor_next, mock_cursor_alive,
|
||||
|
||||
|
||||
@pytest.mark.bdb
|
||||
@mock.patch('pymongo.cursor.Cursor.alive', new_callable=mock.PropertyMock)
|
||||
@mock.patch('bigchaindb.backend.mongodb.changefeed.MongoDBChangeFeed.run_changefeed') # noqa
|
||||
def test_connection_failure(mock_run_changefeed, mock_cursor_alive):
|
||||
def test_connection_failure(mock_run_changefeed):
|
||||
from bigchaindb.backend import get_changefeed, connect
|
||||
from bigchaindb.backend.exceptions import ConnectionError
|
||||
from bigchaindb.backend.changefeed import ChangeFeed
|
||||
|
||||
conn = connect()
|
||||
mock_cursor_alive.return_value = False
|
||||
mock_run_changefeed.side_effect = [ConnectionFailure(), mock.DEFAULT]
|
||||
mock_run_changefeed.side_effect = [ConnectionError(),
|
||||
mock.DEFAULT]
|
||||
|
||||
changefeed = get_changefeed(conn, 'backlog', ChangeFeed.INSERT)
|
||||
changefeed.run_forever()
|
||||
|
@ -1,9 +1,9 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
import pymongo
|
||||
from pymongo import MongoClient
|
||||
from pymongo.database import Database
|
||||
from pymongo.errors import ConnectionFailure, OperationFailure
|
||||
|
||||
|
||||
pytestmark = pytest.mark.bdb
|
||||
@ -56,19 +56,49 @@ def test_get_connection_returns_the_correct_instance():
|
||||
@mock.patch('time.sleep')
|
||||
def test_connection_error(mock_sleep, mock_client, mock_init_repl_set):
|
||||
from bigchaindb.backend import connect
|
||||
from bigchaindb.backend.exceptions import ConnectionError
|
||||
|
||||
# force the driver to trow ConnectionFailure
|
||||
# force the driver to throw ConnectionFailure
|
||||
# the mock on time.sleep is to prevent the actual sleep when running
|
||||
# the tests
|
||||
mock_client.side_effect = ConnectionFailure()
|
||||
mock_client.side_effect = pymongo.errors.ConnectionFailure()
|
||||
|
||||
with pytest.raises(ConnectionFailure):
|
||||
with pytest.raises(ConnectionError):
|
||||
conn = connect()
|
||||
conn.db
|
||||
|
||||
assert mock_client.call_count == 3
|
||||
|
||||
|
||||
@mock.patch('bigchaindb.backend.mongodb.connection.initialize_replica_set')
|
||||
@mock.patch('pymongo.MongoClient')
|
||||
def test_connection_run_errors(mock_client, mock_init_repl_set):
|
||||
from bigchaindb.backend import connect
|
||||
from bigchaindb.backend.exceptions import (DuplicateKeyError,
|
||||
OperationError,
|
||||
ConnectionError)
|
||||
|
||||
conn = connect()
|
||||
|
||||
query = mock.Mock()
|
||||
query.run.side_effect = pymongo.errors.AutoReconnect('foo')
|
||||
with pytest.raises(ConnectionError):
|
||||
conn.run(query)
|
||||
assert query.run.call_count == 2
|
||||
|
||||
query = mock.Mock()
|
||||
query.run.side_effect = pymongo.errors.DuplicateKeyError('foo')
|
||||
with pytest.raises(DuplicateKeyError):
|
||||
conn.run(query)
|
||||
assert query.run.call_count == 1
|
||||
|
||||
query = mock.Mock()
|
||||
query.run.side_effect = pymongo.errors.OperationFailure('foo')
|
||||
with pytest.raises(OperationError):
|
||||
conn.run(query)
|
||||
assert query.run.call_count == 1
|
||||
|
||||
|
||||
def test_check_replica_set_not_enabled(mongodb_connection):
|
||||
from bigchaindb.backend.mongodb.connection import _check_replica_set
|
||||
from bigchaindb.common.exceptions import ConfigurationError
|
||||
@ -138,14 +168,14 @@ def test_initialize_replica_set(mock_cmd_line_opts):
|
||||
]
|
||||
|
||||
# check that it returns
|
||||
assert initialize_replica_set() is None
|
||||
assert initialize_replica_set('host', 1337, 1000) is None
|
||||
|
||||
# test it raises OperationError if anything wrong
|
||||
with mock.patch.object(Database, 'command') as mock_command:
|
||||
mock_command.side_effect = [
|
||||
mock_cmd_line_opts,
|
||||
OperationFailure(None, details={'codeName': ''})
|
||||
pymongo.errors.OperationFailure(None, details={'codeName': ''})
|
||||
]
|
||||
|
||||
with pytest.raises(OperationFailure):
|
||||
initialize_replica_set()
|
||||
with pytest.raises(pymongo.errors.OperationFailure):
|
||||
initialize_replica_set('host', 1337, 1000)
|
||||
|
@ -4,6 +4,7 @@ from unittest.mock import MagicMock
|
||||
pytestmark = pytest.mark.bdb
|
||||
|
||||
|
||||
@pytest.mark.skipif(reason='Will be handled by #1126')
|
||||
def test_asset_id_index():
|
||||
from bigchaindb.backend.mongodb.query import get_txids_filtered
|
||||
from bigchaindb.backend import connect
|
||||
|
@ -177,8 +177,8 @@ def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn,
|
||||
db_name,
|
||||
db_conn):
|
||||
from bigchaindb.backend.rethinkdb.admin import reconfigure
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
with pytest.raises(DatabaseOpFailedError) as exc:
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
with pytest.raises(OperationError) as exc:
|
||||
reconfigure(db_conn, table='backlog', shards=1,
|
||||
replicas={'default': 1}, primary_replica_tag='default')
|
||||
assert isinstance(exc.value.__cause__, r.ReqlQueryLogicError)
|
||||
@ -187,8 +187,8 @@ def test_reconfigure_replicas_without_nonvoting_replica_tags(rdb_conn,
|
||||
@pytest.mark.bdb
|
||||
def test_reconfigure_too_many_replicas(rdb_conn, db_name, db_conn):
|
||||
from bigchaindb.backend.rethinkdb.admin import reconfigure
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
replicas = _count_rethinkdb_servers() + 1
|
||||
with pytest.raises(DatabaseOpFailedError) as exc:
|
||||
with pytest.raises(OperationError) as exc:
|
||||
reconfigure(db_conn, table='backlog', shards=1, replicas=replicas)
|
||||
assert isinstance(exc.value.__cause__, r.ReqlOpFailedError)
|
||||
|
@ -384,7 +384,7 @@ def test_calling_main(start_mock, base_parser_mock, parse_args_mock,
|
||||
@patch('bigchaindb.commands.bigchain.add_replicas')
|
||||
def test_run_add_replicas(mock_add_replicas):
|
||||
from bigchaindb.commands.bigchain import run_add_replicas
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
|
||||
args = Namespace(config=None, replicas=['localhost:27017'])
|
||||
|
||||
@ -394,8 +394,8 @@ def test_run_add_replicas(mock_add_replicas):
|
||||
assert mock_add_replicas.call_count == 1
|
||||
mock_add_replicas.reset_mock()
|
||||
|
||||
# test add_replicas with `DatabaseOpFailedError`
|
||||
mock_add_replicas.side_effect = DatabaseOpFailedError()
|
||||
# test add_replicas with `OperationError`
|
||||
mock_add_replicas.side_effect = OperationError()
|
||||
assert run_add_replicas(args) is None
|
||||
assert mock_add_replicas.call_count == 1
|
||||
mock_add_replicas.reset_mock()
|
||||
@ -411,7 +411,7 @@ def test_run_add_replicas(mock_add_replicas):
|
||||
@patch('bigchaindb.commands.bigchain.remove_replicas')
|
||||
def test_run_remove_replicas(mock_remove_replicas):
|
||||
from bigchaindb.commands.bigchain import run_remove_replicas
|
||||
from bigchaindb.backend.exceptions import DatabaseOpFailedError
|
||||
from bigchaindb.backend.exceptions import OperationError
|
||||
|
||||
args = Namespace(config=None, replicas=['localhost:27017'])
|
||||
|
||||
@ -421,8 +421,8 @@ def test_run_remove_replicas(mock_remove_replicas):
|
||||
assert mock_remove_replicas.call_count == 1
|
||||
mock_remove_replicas.reset_mock()
|
||||
|
||||
# test add_replicas with `DatabaseOpFailedError`
|
||||
mock_remove_replicas.side_effect = DatabaseOpFailedError()
|
||||
# test add_replicas with `OperationError`
|
||||
mock_remove_replicas.side_effect = OperationError()
|
||||
assert run_remove_replicas(args) is None
|
||||
assert mock_remove_replicas.call_count == 1
|
||||
mock_remove_replicas.reset_mock()
|
||||
|
@ -137,3 +137,23 @@ def test_is_genesis_block_returns_true_if_genesis(b):
|
||||
from bigchaindb.utils import is_genesis_block
|
||||
genesis_block = b.prepare_genesis_block()
|
||||
assert is_genesis_block(genesis_block)
|
||||
|
||||
|
||||
def test_lazy_execution():
|
||||
from bigchaindb.utils import Lazy
|
||||
|
||||
l = Lazy()
|
||||
l.split(',')[1].split(' ').pop(1).strip()
|
||||
result = l.run('Like humans, cats tend to favor one paw over another')
|
||||
assert result == 'cats'
|
||||
|
||||
class Cat:
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
cat = Cat('Shmui')
|
||||
|
||||
l = Lazy()
|
||||
l.name.upper()
|
||||
result = l.run(cat)
|
||||
assert result == 'SHMUI'
|
||||
|
Loading…
x
Reference in New Issue
Block a user