planetmint/tests/backend/localmongodb/test_connection.py
Arpit Shukla ef8dbff7a5
Fixes lint issues (#176)
* Resolved lint issues

* resolved other lint issues
2022-07-05 10:36:54 +02:00

112 lines
3.6 KiB
Python

# # Copyright © 2020 Interplanetary Database Association e.V.,
# # Planetmint and IPDB software contributors.
# # SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# # Code is Apache-2.0 and docs are CC-BY-4.0
#
# from unittest import mock
#
# import pytest
# import pymongo
# from pymongo import MongoClient
#
#
# pytestmark = pytest.mark.bdb
#
#
# @pytest.fixture
# def mock_cmd_line_opts():
# return {'argv': ['mongod', '--dbpath=/data'],
# 'ok': 1.0,
# 'parsed': {'replication': {'replSet': None},
# 'storage': {'dbPath': '/data'}}}
#
#
# @pytest.fixture
# def mock_config_opts():
# return {'argv': ['mongod', '--dbpath=/data'],
# 'ok': 1.0,
# 'parsed': {'replication': {'replSetName': None},
# 'storage': {'dbPath': '/data'}}}
#
#
# @pytest.fixture
# def mongodb_connection():
# import planetmint
# return MongoClient(host=planetmint.config['database']['host'],
# port=planetmint.config['database']['port'])
#
#
# def test_get_connection_returns_the_correct_instance(db_host, db_port):
# from planetmint.backend import connect
# from planetmint.backend.connection import Connection
# from planetmint.backend.localmongodb.connection import LocalMongoDBConnection
#
# config = {
# 'backend': 'localmongodb',
# 'host': db_host,
# 'port': db_port,
# 'name': 'test',
# 'replicaset': None,
# }
#
# conn = connect(**config)
# assert isinstance(conn, Connection)
# assert isinstance(conn, LocalMongoDBConnection)
# assert conn.conn._topology_settings.replica_set_name == config['replicaset']
#
#
# @mock.patch('pymongo.MongoClient.__init__')
# def test_connection_error(mock_client):
# from planetmint.backend import connect
# from planetmint.backend.exceptions import ConnectionError
#
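# # Force the driver to raise ConnectionFailure on every attempt: the
# # patched MongoClient.__init__ raises, and the assertion below expects
# # three attempts. NOTE(review): this comment originally said a mock on
# # time.sleep prevents the actual sleep when running the tests, but no
# # such mock is visible on this test -- confirm whether one is needed.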
# mock_client.side_effect = pymongo.errors.ConnectionFailure()
#
# with pytest.raises(ConnectionError):
# conn = connect()
# conn.db
#
# assert mock_client.call_count == 3
#
#
# def test_connection_run_errors():
# from planetmint.backend import connect
# from planetmint.backend.exceptions import (DuplicateKeyError,
# OperationError,
# ConnectionError)
#
# conn = connect()
#
# query = mock.Mock()
# query.run.side_effect = pymongo.errors.AutoReconnect('foo')
# with pytest.raises(ConnectionError):
# conn.run(query)
# assert query.run.call_count == 2
#
# query = mock.Mock()
# query.run.side_effect = pymongo.errors.DuplicateKeyError('foo')
# with pytest.raises(DuplicateKeyError):
# conn.run(query)
# assert query.run.call_count == 1
#
# query = mock.Mock()
# query.run.side_effect = pymongo.errors.OperationFailure('foo')
# with pytest.raises(OperationError):
# conn.run(query)
# assert query.run.call_count == 1
#
#
# @mock.patch('pymongo.database.Database.authenticate')
# def test_connection_with_credentials(mock_authenticate):
# import planetmint
# from planetmint.backend.localmongodb.connection import LocalMongoDBConnection
# conn = LocalMongoDBConnection(host=planetmint.config['database']['host'],
# port=planetmint.config['database']['port'],
# login='theplague',
# password='secret')
# conn.connect()
# assert mock_authenticate.call_count == 1