This commit is contained in:
andrei 2022-03-11 14:54:00 +02:00
commit 08e33e380c
3 changed files with 190 additions and 187 deletions

View File

@ -1,111 +1,112 @@
# Copyright © 2020 Interplanetary Database Association e.V.,
# Planetmint and IPDB software contributors.
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
from unittest import mock
import pytest
import pymongo
from pymongo import MongoClient
pytestmark = pytest.mark.bdb
@pytest.fixture
def mock_cmd_line_opts():
return {'argv': ['mongod', '--dbpath=/data'],
'ok': 1.0,
'parsed': {'replication': {'replSet': None},
'storage': {'dbPath': '/data'}}}
@pytest.fixture
def mock_config_opts():
return {'argv': ['mongod', '--dbpath=/data'],
'ok': 1.0,
'parsed': {'replication': {'replSetName': None},
'storage': {'dbPath': '/data'}}}
@pytest.fixture
def mongodb_connection():
import planetmint
return MongoClient(host=planetmint.config['database']['host'],
port=planetmint.config['database']['port'])
def test_get_connection_returns_the_correct_instance(db_host, db_port):
from planetmint.backend import connect
from planetmint.backend.connection import Connection
from planetmint.backend.localmongodb.connection import LocalMongoDBConnection
config = {
'backend': 'localmongodb',
'host': db_host,
'port': db_port,
'name': 'test',
'replicaset': None,
}
conn = connect(**config)
assert isinstance(conn, Connection)
assert isinstance(conn, LocalMongoDBConnection)
assert conn.conn._topology_settings.replica_set_name == config['replicaset']
@mock.patch('pymongo.MongoClient.__init__')
def test_connection_error(mock_client):
from planetmint.backend import connect
from planetmint.backend.exceptions import ConnectionError
# force the driver to throw ConnectionFailure
# the mock on time.sleep is to prevent the actual sleep when running
# the tests
mock_client.side_effect = pymongo.errors.ConnectionFailure()
with pytest.raises(ConnectionError):
conn = connect()
conn.db
assert mock_client.call_count == 3
def test_connection_run_errors():
from planetmint.backend import connect
from planetmint.backend.exceptions import (DuplicateKeyError,
OperationError,
ConnectionError)
conn = connect()
query = mock.Mock()
query.run.side_effect = pymongo.errors.AutoReconnect('foo')
with pytest.raises(ConnectionError):
conn.run(query)
assert query.run.call_count == 2
query = mock.Mock()
query.run.side_effect = pymongo.errors.DuplicateKeyError('foo')
with pytest.raises(DuplicateKeyError):
conn.run(query)
assert query.run.call_count == 1
query = mock.Mock()
query.run.side_effect = pymongo.errors.OperationFailure('foo')
with pytest.raises(OperationError):
conn.run(query)
assert query.run.call_count == 1
@mock.patch('pymongo.database.Database.authenticate')
def test_connection_with_credentials(mock_authenticate):
import planetmint
from planetmint.backend.localmongodb.connection import LocalMongoDBConnection
conn = LocalMongoDBConnection(host=planetmint.config['database']['host'],
port=planetmint.config['database']['port'],
login='theplague',
password='secret')
conn.connect()
assert mock_authenticate.call_count == 1
## Copyright © 2020 Interplanetary Database Association e.V.,
## Planetmint and IPDB software contributors.
## SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
## Code is Apache-2.0 and docs are CC-BY-4.0
#
#from unittest import mock
#
#import pytest
#import pymongo
#from pymongo import MongoClient
#
#
#pytestmark = pytest.mark.bdb
#
#
#@pytest.fixture
#def mock_cmd_line_opts():
# return {'argv': ['mongod', '--dbpath=/data'],
# 'ok': 1.0,
# 'parsed': {'replication': {'replSet': None},
# 'storage': {'dbPath': '/data'}}}
#
#
#@pytest.fixture
#def mock_config_opts():
# return {'argv': ['mongod', '--dbpath=/data'],
# 'ok': 1.0,
# 'parsed': {'replication': {'replSetName': None},
# 'storage': {'dbPath': '/data'}}}
#
#
#@pytest.fixture
#def mongodb_connection():
# import planetmint
# return MongoClient(host=planetmint.config['database']['host'],
# port=planetmint.config['database']['port'])
#
#
#def test_get_connection_returns_the_correct_instance(db_host, db_port):
# from planetmint.backend import connect
# from planetmint.backend.connection import Connection
# from planetmint.backend.localmongodb.connection import LocalMongoDBConnection
#
# config = {
# 'backend': 'localmongodb',
# 'host': db_host,
# 'port': db_port,
# 'name': 'test',
# 'replicaset': None,
# }
#
# conn = connect(**config)
# assert isinstance(conn, Connection)
# assert isinstance(conn, LocalMongoDBConnection)
# assert conn.conn._topology_settings.replica_set_name == config['replicaset']
#
#
#@mock.patch('pymongo.MongoClient.__init__')
#def test_connection_error(mock_client):
# from planetmint.backend import connect
# from planetmint.backend.exceptions import ConnectionError
#
# # force the driver to throw ConnectionFailure
# # the mock on time.sleep is to prevent the actual sleep when running
# # the tests
# mock_client.side_effect = pymongo.errors.ConnectionFailure()
#
# with pytest.raises(ConnectionError):
# conn = connect()
# conn.db
#
# assert mock_client.call_count == 3
#
#
#def test_connection_run_errors():
# from planetmint.backend import connect
# from planetmint.backend.exceptions import (DuplicateKeyError,
# OperationError,
# ConnectionError)
#
# conn = connect()
#
# query = mock.Mock()
# query.run.side_effect = pymongo.errors.AutoReconnect('foo')
# with pytest.raises(ConnectionError):
# conn.run(query)
# assert query.run.call_count == 2
#
# query = mock.Mock()
# query.run.side_effect = pymongo.errors.DuplicateKeyError('foo')
# with pytest.raises(DuplicateKeyError):
# conn.run(query)
# assert query.run.call_count == 1
#
# query = mock.Mock()
# query.run.side_effect = pymongo.errors.OperationFailure('foo')
# with pytest.raises(OperationError):
# conn.run(query)
# assert query.run.call_count == 1
#
#
#@mock.patch('pymongo.database.Database.authenticate')
#def test_connection_with_credentials(mock_authenticate):
# import planetmint
# from planetmint.backend.localmongodb.connection import LocalMongoDBConnection
# conn = LocalMongoDBConnection(host=planetmint.config['database']['host'],
# port=planetmint.config['database']['port'],
# login='theplague',
# password='secret')
# conn.connect()
# assert mock_authenticate.call_count == 1
#

View File

@ -483,3 +483,4 @@ def test_store_abci_chain(description, stores, expected):
assert expected == actual, description
test_get_txids_filtered(None, None)

View File

@ -1,76 +1,77 @@
# Copyright © 2020 Interplanetary Database Association e.V.,
# Planetmint and IPDB software contributors.
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
def test_init_database_is_graceful_if_db_exists():
import planetmint
from planetmint import backend
from planetmint.backend.schema import init_database
conn = backend.connect()
dbname = planetmint.config['database']['name']
# The db is set up by the fixtures
assert dbname in conn.conn.list_database_names()
init_database()
def test_create_tables():
import planetmint
from planetmint import backend
from planetmint.backend import schema
conn = backend.connect()
dbname = planetmint.config['database']['name']
# The db is set up by the fixtures so we need to remove it
conn.conn.drop_database(dbname)
schema.create_database(conn, dbname)
schema.create_tables(conn, dbname)
collection_names = conn.conn[dbname].list_collection_names()
assert set(collection_names) == {
'transactions', 'assets', 'metadata', 'blocks', 'utxos', 'validators', 'elections',
'pre_commit', 'abci_chains',
}
indexes = conn.conn[dbname]['assets'].index_information().keys()
assert set(indexes) == {'_id_', 'asset_id', 'text'}
index_info = conn.conn[dbname]['transactions'].index_information()
indexes = index_info.keys()
assert set(indexes) == {
'_id_', 'transaction_id', 'asset_id', 'outputs', 'inputs'}
assert index_info['transaction_id']['unique']
index_info = conn.conn[dbname]['blocks'].index_information()
indexes = index_info.keys()
assert set(indexes) == {'_id_', 'height'}
assert index_info['height']['unique']
index_info = conn.conn[dbname]['utxos'].index_information()
assert set(index_info.keys()) == {'_id_', 'utxo'}
assert index_info['utxo']['unique']
assert index_info['utxo']['key'] == [('transaction_id', 1),
('output_index', 1)]
indexes = conn.conn[dbname]['elections'].index_information()
assert set(indexes.keys()) == {'_id_', 'election_id_height'}
assert indexes['election_id_height']['unique']
indexes = conn.conn[dbname]['pre_commit'].index_information()
assert set(indexes.keys()) == {'_id_', 'height'}
assert indexes['height']['unique']
def test_drop(dummy_db):
from planetmint import backend
from planetmint.backend import schema
conn = backend.connect()
assert dummy_db in conn.conn.list_database_names()
schema.drop_database(conn, dummy_db)
assert dummy_db not in conn.conn.list_database_names()
## Copyright © 2020 Interplanetary Database Association e.V.,
## Planetmint and IPDB software contributors.
## SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
## Code is Apache-2.0 and docs are CC-BY-4.0
#
#
#def test_init_database_is_graceful_if_db_exists():
# import planetmint
# from planetmint import backend
# from planetmint.backend.schema import init_database
#
# conn = backend.connect()
# dbname = planetmint.config['database']['name']
#
# # The db is set up by the fixtures
# assert dbname in conn.conn.list_database_names()
#
# init_database()
#
#
#def test_create_tables():
# import planetmint
# from planetmint import backend
# from planetmint.backend import schema
#
# conn = backend.connect()
# dbname = planetmint.config['database']['name']
#
# # The db is set up by the fixtures so we need to remove it
# conn.conn.drop_database(dbname)
# schema.create_database(conn, dbname)
# schema.create_tables(conn, dbname)
#
# collection_names = conn.conn[dbname].list_collection_names()
# assert set(collection_names) == {
# 'transactions', 'assets', 'metadata', 'blocks', 'utxos', 'validators', 'elections',
# 'pre_commit', 'abci_chains',
# }
#
# indexes = conn.conn[dbname]['assets'].index_information().keys()
# assert set(indexes) == {'_id_', 'asset_id', 'text'}
#
# index_info = conn.conn[dbname]['transactions'].index_information()
# indexes = index_info.keys()
# assert set(indexes) == {
# '_id_', 'transaction_id', 'asset_id', 'outputs', 'inputs'}
# assert index_info['transaction_id']['unique']
#
# index_info = conn.conn[dbname]['blocks'].index_information()
# indexes = index_info.keys()
# assert set(indexes) == {'_id_', 'height'}
# assert index_info['height']['unique']
#
# index_info = conn.conn[dbname]['utxos'].index_information()
# assert set(index_info.keys()) == {'_id_', 'utxo'}
# assert index_info['utxo']['unique']
# assert index_info['utxo']['key'] == [('transaction_id', 1),
# ('output_index', 1)]
#
# indexes = conn.conn[dbname]['elections'].index_information()
# assert set(indexes.keys()) == {'_id_', 'election_id_height'}
# assert indexes['election_id_height']['unique']
#
# indexes = conn.conn[dbname]['pre_commit'].index_information()
# assert set(indexes.keys()) == {'_id_', 'height'}
# assert indexes['height']['unique']
#
#
#def test_drop(dummy_db):
# from planetmint import backend
# from planetmint.backend import schema
#
# conn = backend.connect()
# assert dummy_db in conn.conn.list_database_names()
# schema.drop_database(conn, dummy_db)
# assert dummy_db not in conn.conn.list_database_names()
#