Add query and aggregation features.

This commit is contained in:
maoudia 2020-05-04 18:01:26 -04:00
parent e962236ba3
commit ad9f27d0d3
No known key found for this signature in database
GPG Key ID: 064B92C4B20397C6
8 changed files with 434 additions and 2 deletions

View File

@ -137,6 +137,23 @@ def get_txids_filtered(conn, asset_id, operation=None, last_tx=None):
return (elem['id'] for elem in cursor)
@register_query(LocalMongoDBConnection)
def query(conn, json_query, *, limit=0, table='assets'):
cursor = conn.run(
conn.collection(table)
.find(json_query).limit(limit))
return (obj for obj in cursor)
@register_query(LocalMongoDBConnection)
def aggregate(conn, aggregation_functions, *, table='assets'):
cursor = conn.run(
conn.collection(table)
.aggregate(aggregation_functions))
return (obj for obj in cursor)
@register_query(LocalMongoDBConnection)
def text_search(conn, search, *, language='english', case_sensitive=False,
diacritic_sensitive=False, text_score=False, limit=0, table='assets'):

View File

@ -214,6 +214,48 @@ def get_txids_filtered(connection, asset_id, operation=None):
raise NotImplementedError
@singledispatch
def query(conn, json_query, *, limit=0, table='assets'):
"""Return all the assets that match the text search.
The results are sorted by text score.
For more information about the behavior of text search on MongoDB see
https://docs.mongodb.com/manual/reference/operator/query/text/#behavior
Args:
json_query (str): A MongoDB json search query.
limit (int, optional): Limit the number of returned documents.
Returns:
:obj:`list` of :obj:`dict`: a list of assets
Raises:
OperationError: If the backend does not support json queries.
"""
raise OperationError('This query is only supported when running '
'BigchainDB with MongoDB as the backend.')
@singledispatch
def aggregate(conn, aggregation_functions, *, limit=0, table='assets'):
"""Return an iterator of aggregation results.
Args:
aggregation_functions (str): A MongoDB list of aggregation functions
in json format.
limit (int, optional): Limit the number of returned documents.
Returns:
iter: An iterator of aggregation results.
Raises:
OperationError: If the backend does not support aggregations.
"""
raise OperationError('This query is only supported when running '
'BigchainDB with MongoDB as the backend.')
@singledispatch
def text_search(conn, search, *, language='english', case_sensitive=False,
diacritic_sensitive=False, text_score=False, limit=0, table=None):

View File

@ -398,6 +398,33 @@ class BigchainDB(object):
logger.warning('Invalid transaction (%s): %s', type(e).__name__, e)
return False
def query(self, json_query, *, limit=0, table='assets'):
"""Return an iterator of assets that match the json query
Args:
json_query (str): A MongoDB json search query.
limit (int, optional): Limit the number of returned documents.
Returns:
iter: An iterator of assets that match the query.
"""
return backend.query.query(self.connection, json_query,
limit=limit, table=table)
def aggregate(self, aggregation_functions, *, table='assets'):
"""Return an iterator of aggregation results.
Args:
aggregation_functions (str): A MongoDB list of aggregation functions
in json format.
limit (int, optional): Limit the number of returned documents.
Returns:
iter: An iterator of aggregation results.
"""
return backend.query.aggregate(self.connection, aggregation_functions,
table=table)
def text_search(self, search, *, limit=0, table='assets'):
"""Return an iterator of assets that match the text search

View File

@ -38,6 +38,8 @@ ROUTES_API_V1 = [
r('transactions/<string:tx_id>', tx.TransactionApi),
r('transactions', tx.TransactionListApi),
r('outputs/', outputs.OutputListApi),
r('query-assets/', assets.AssetQueryApi),
r('aggregate-assets/', assets.AssetAggregateApi),
r('validators/', validators.ValidatorsApi),
]

View File

@ -7,10 +7,12 @@
For more information please refer to the documentation: http://bigchaindb.com/http-api
"""
import json
from json import JSONEncoder
import logging
from bson import ObjectId
from flask_restful import reqparse, Resource
from flask import current_app
from flask import current_app, request, jsonify
from bigchaindb.backend.exceptions import OperationError
from bigchaindb.web.views.base import make_error
@ -18,6 +20,13 @@ from bigchaindb.web.views.base import make_error
logger = logging.getLogger(__name__)
class ResultsJSONEncoder(JSONEncoder):
def default(self, o):
if isinstance(o, ObjectId):
return str(o)
return json.JSONEncoder.default(self, o)
class AssetListApi(Resource):
def get(self):
"""API endpoint to perform a text search on the assets.
@ -53,3 +62,87 @@ class AssetListApi(Resource):
400,
'({}): {}'.format(type(e).__name__, e)
)
class AssetQueryApi(Resource):
def get(self):
"""API endpoint to perform a json query on the assets.
Args:
json_query (str): A MongoDB json search query.
limit (int, optional): Limit the number of returned documents.
Return:
A list of assets that match the query.
"""
parser = reqparse.RequestParser()
parser.add_argument('query', type=str)
parser.add_argument('limit', type=int)
args = parser.parse_args()
if not args['limit']:
# if the limit is not specified do not pass None to `text_search`
del args['limit']
if not args['query']:
return make_error(
400,
'No query specified'
)
query = json.loads(args['query'])
del args['query']
print(f"QUERY:{query}")
pool = current_app.config['bigchain_pool']
with pool() as bigchain:
assets = list(bigchain.query(json_query=query, **args))
try:
return json.dumps(assets, cls=ResultsJSONEncoder)
except OperationError as e:
return make_error(
400,
'({}): {}'.format(type(e).__name__, e)
)
class AssetAggregateApi(Resource):
def get(self):
"""API endpoint to perform a json query on the assets.
Args:
json_query (str): A MongoDB json search query.
limit (int, optional): Limit the number of returned documents.
Return:
A list of assets that match the query.
"""
parser = reqparse.RequestParser()
parser.add_argument('aggregation_functions', type=str)
args = parser.parse_args()
if not args['aggregation_functions']:
return make_error(
400,
'No aggregation_functions specified'
)
aggregation_functions = json.loads(args['aggregation_functions'])
if 'function_list' not in aggregation_functions.keys():
return make_error(
400,
'Invalid json format'
)
function_list = aggregation_functions['function_list']
del args['aggregation_functions']
pool = current_app.config['bigchain_pool']
with pool() as bigchain:
results = list(bigchain.aggregate(
aggregation_functions=function_list,
**args))
try:
return json.dumps({"results": results}, cls=ResultsJSONEncoder)
except OperationError as e:
return make_error(
400,
'({}): {}'.format(type(e).__name__, e)
)

View File

@ -78,6 +78,111 @@ class TestBigchainApi(object):
assets = list(b.text_search('bigchaindb'))
assert len(assets) == 3
def test_query(self, b, alice):
from bigchaindb.models import Transaction
# define the assets
asset1 = {'msg': 'BigchainDB 1', 'complex_key': {'complex_sub_key': 'value_1'}}
asset2 = {'msg': 'BigchainDB 2', 'complex_key': {'complex_sub_key': 'value_2'}}
asset3 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_3'}}
# create the transactions
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
tx3 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset3).sign([alice.private_key])
# write the transactions to the DB
b.store_bulk_transactions([tx1, tx2, tx3])
# get the assets through text search
assets = list(b.query(json_query={
'data.complex_key.complex_sub_key': {
'$in': ['value_3', 'value_2', 'value_1']}}))
assert len(assets) == 3
def test_query_limit(self, b, alice):
from bigchaindb.models import Transaction
# define the assets
asset1 = {'msg': 'BigchainDB 1', 'complex_key': {'complex_sub_key': 'value_1'}}
asset2 = {'msg': 'BigchainDB 2', 'complex_key': {'complex_sub_key': 'value_2'}}
asset3 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_3'}}
# create the transactions
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
tx3 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset3).sign([alice.private_key])
# write the transactions to the DB
b.store_bulk_transactions([tx1, tx2, tx3])
# get the assets through text search
assets = list(b.query(json_query={
'data.complex_key.complex_sub_key': {
'$in': ['value_3', 'value_2', 'value_1']}},
limit=2))
assert len(assets) == 2
def test_query_not_found(self, b, alice):
from bigchaindb.models import Transaction
# define the assets
asset1 = {'msg': 'BigchainDB 1', 'complex_key': {'complex_sub_key': 'value_1'}}
asset2 = {'msg': 'BigchainDB 2', 'complex_key': {'complex_sub_key': 'value_2'}}
asset3 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_3'}}
# create the transactions
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
tx3 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset3).sign([alice.private_key])
# write the transactions to the DB
b.store_bulk_transactions([tx1, tx2, tx3])
# get the assets through text search
assets = list(b.query(json_query={'data.complex_key.complex_sub_key':'value_4'}))
assert len(assets) == 0
def test_aggregate(self, b, alice):
from bigchaindb.models import Transaction
# define the assets
asset1 = {'msg': 'BigchainDB 1', 'complex_key': {'complex_sub_key': 'value_1', 'aggregation_key': 'ak_1'}}
asset2 = {'msg': 'BigchainDB 2', 'complex_key': {'complex_sub_key': 'value_2', 'aggregation_key': 'ak_1'}}
asset3 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_3', 'aggregation_key': 'ak_2'}}
asset4 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_4', 'aggregation_key': 'ak_3'}}
asset5 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_5', 'aggregation_key': 'ak_3'}}
asset6 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_6', 'aggregation_key': 'ak_3'}}
# create the transactions
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
tx3 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset3).sign([alice.private_key])
tx4 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset4).sign([alice.private_key])
tx5 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset5).sign([alice.private_key])
tx6 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset6).sign([alice.private_key])
# write the transactions to the DB
b.store_bulk_transactions([tx1, tx2, tx3, tx4, tx5, tx6])
# get the assets through text search
aggregation_query = [{'$group': {'_id': "$data.complex_key.aggregation_key", 'count': {'$sum': 1}}}]
assets = list(b.aggregate(aggregation_functions=aggregation_query))
assert assets == [{'_id': 'ak_3', 'count': 3},
{'_id': 'ak_2', 'count': 1},
{'_id': 'ak_1', 'count': 2}]
@pytest.mark.usefixtures('inputs')
def test_non_create_input_not_found(self, b, user_pk):
from cryptoconditions import Ed25519Sha256

View File

@ -0,0 +1,54 @@
# Copyright BigchainDB GmbH and BigchainDB contributors
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import pytest
import json
import urllib.parse
AGGREGATE_ASSETS_ENDPOINT = '/api/v1/aggregate-assets/'
def insert_aggregation_assets(b, alice):
from bigchaindb.models import Transaction
asset1 = {'msg': 'BigchainDB 1', 'complex_key': {'complex_sub_key': 'value_1', 'aggregation_key': 'ak_1'}}
asset2 = {'msg': 'BigchainDB 2', 'complex_key': {'complex_sub_key': 'value_2', 'aggregation_key': 'ak_1'}}
asset3 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_3', 'aggregation_key': 'ak_2'}}
asset4 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_4', 'aggregation_key': 'ak_3'}}
asset5 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_5', 'aggregation_key': 'ak_3'}}
asset6 = {'msg': 'BigchainDB 3', 'complex_key': {'complex_sub_key': 'value_6', 'aggregation_key': 'ak_3'}}
# create the transactions
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
tx3 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset3).sign([alice.private_key])
tx4 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset4).sign([alice.private_key])
tx5 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset5).sign([alice.private_key])
tx6 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset6).sign([alice.private_key])
# write the transactions to the DB
b.store_bulk_transactions([tx1, tx2, tx3, tx4, tx5, tx6])
@pytest.mark.bdb
def test_aggregate_assets_with_query(client, b, alice):
insert_aggregation_assets(b, alice)
aggregation_functions = {'function_list': [{'$group': {'_id': "$data.complex_key.aggregation_key", 'count': {'$sum': 1}}}]}
res = client.get(AGGREGATE_ASSETS_ENDPOINT + f"?aggregation_functions={urllib.parse.quote(json.dumps(aggregation_functions))}")
assert res.json == json.dumps({"results": [{'_id': 'ak_3', 'count': 3},
{'_id': 'ak_2', 'count': 1},
{'_id': 'ak_1', 'count': 2}]})
assert res.status_code == 200
@pytest.mark.bdb
def test_aggregate_assets_with_invalid_query(client, b, alice):
insert_aggregation_assets(b, alice)
aggregation_functions = {'something': [{'$group': {'_id': "$data.complex_key.aggregation_key", 'count': {'$sum': 1}}}]}
res = client.get(AGGREGATE_ASSETS_ENDPOINT + f"?aggregation_functions={urllib.parse.quote(json.dumps(aggregation_functions))}")
assert res.status_code == 400

View File

@ -0,0 +1,92 @@
# Copyright BigchainDB GmbH and BigchainDB contributors
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import pytest
import json
import urllib.parse
QUERY_ASSETS_ENDPOINT = '/api/v1/query-assets/'
@pytest.mark.bdb
def test_query_assets_with_query(client, b, alice):
from bigchaindb.models import Transaction
# create two assets
asset1 = {'msg': 'abc 1'}
asset2 = {'msg': 'abc 2'}
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
b.store_bulk_transactions([tx1])
b.store_bulk_transactions([tx2])
res = client.get(QUERY_ASSETS_ENDPOINT + f"?query={urllib.parse.quote(json.dumps({'data.msg': 'abc 1'}))}")
print(res)
assert json.loads(res.json)[0]["data"] == {'msg': 'abc 1'}
assert res.status_code == 200
@pytest.mark.bdb
def test_query_assets_with_empty_query(client, b, alice):
from bigchaindb.models import Transaction
# create two assets
asset1 = {'msg': 'abc 1'}
asset2 = {'msg': 'abc 2'}
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
b.store_bulk_transactions([tx1])
b.store_bulk_transactions([tx2])
res = client.get(QUERY_ASSETS_ENDPOINT + f"?query={urllib.parse.quote(json.dumps({}))}")
print(res)
assert json.loads(res.json)[0]["data"] == {'msg': 'abc 1'}
assert json.loads(res.json)[1]["data"] == {'msg': 'abc 2'}
assert res.status_code == 200
@pytest.mark.bdb
def test_query_assets_with_empty_query_limit(client, b, alice):
from bigchaindb.models import Transaction
# create two assets
asset1 = {'msg': 'abc 1'}
asset2 = {'msg': 'abc 2'}
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
b.store_bulk_transactions([tx1])
b.store_bulk_transactions([tx2])
res = client.get(QUERY_ASSETS_ENDPOINT + '?limit=1' + f"&query={urllib.parse.quote(json.dumps({}))}")
print(res.json)
assert len(json.loads(res.json)) == 1
assert json.loads(res.json)[0]["data"] == {'msg': 'abc 1'}
assert res.status_code == 200
@pytest.mark.bdb
def test_query_assets_with_empty_query_limit_0(client, b, alice):
from bigchaindb.models import Transaction
# create two assets
asset1 = {'msg': 'abc 1'}
asset2 = {'msg': 'abc 2'}
tx1 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset1).sign([alice.private_key])
tx2 = Transaction.create([alice.public_key], [([alice.public_key], 1)],
asset=asset2).sign([alice.private_key])
b.store_bulk_transactions([tx1])
b.store_bulk_transactions([tx2])
res = client.get(QUERY_ASSETS_ENDPOINT + '?limit=0'+ f"&query={urllib.parse.quote(json.dumps({}))}")
print(res.json)
assert len(json.loads(res.json)) == 2
assert json.loads(res.json)[0]["data"] == {'msg': 'abc 1'}
assert json.loads(res.json)[1]["data"] == {'msg': 'abc 2'}
assert res.status_code == 200