mirror of
https://github.com/planetmint/planetmint.git
synced 2025-11-24 22:45:44 +00:00
Merge pull request #7 from LaurentDeMontBlanc/planetmint-tarantool
Planetmint tarantool
This commit is contained in:
commit
32b27015a1
@ -36,3 +36,4 @@ sphinxcontrib-serializinghtml==1.1.5
|
||||
urllib3==1.26.9
|
||||
wget==3.2
|
||||
zipp==3.8.0
|
||||
nest-asyncio==1.5.5
|
||||
@ -4,9 +4,10 @@ Content-Type: application/json
|
||||
{
|
||||
"assets": "/assets/",
|
||||
"blocks": "/blocks/",
|
||||
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.2/http-client-server-api.html",
|
||||
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.3/http-client-server-api.html",
|
||||
"metadata": "/metadata/",
|
||||
"outputs": "/outputs/",
|
||||
"streamedblocks": "ws://localhost:9985/api/v1/streams/valid_blocks",
|
||||
"streams": "ws://localhost:9985/api/v1/streams/valid_transactions",
|
||||
"transactions": "/transactions/",
|
||||
"validators": "/validators"
|
||||
|
||||
@ -6,15 +6,16 @@ Content-Type: application/json
|
||||
"v1": {
|
||||
"assets": "/api/v1/assets/",
|
||||
"blocks": "/api/v1/blocks/",
|
||||
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.2/http-client-server-api.html",
|
||||
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.3/http-client-server-api.html",
|
||||
"metadata": "/api/v1/metadata/",
|
||||
"outputs": "/api/v1/outputs/",
|
||||
"streamedblocks": "ws://localhost:9985/api/v1/streams/valid_blocks",
|
||||
"streams": "ws://localhost:9985/api/v1/streams/valid_transactions",
|
||||
"transactions": "/api/v1/transactions/",
|
||||
"validators": "/api/v1/validators"
|
||||
}
|
||||
},
|
||||
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.2/",
|
||||
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.3/",
|
||||
"software": "Planetmint",
|
||||
"version": "0.9.2"
|
||||
"version": "0.9.3"
|
||||
}
|
||||
|
||||
@ -12,13 +12,8 @@ Planetmint는 모든 종류의 데이터를 저장할 수 있지만 자산 등
|
||||
* CREATE 트랜잭션은 임의의 메타 데이터와 함께 모든 종류의 자산 (나눌 수 없거나 분할 할 수없는)을 등록하는 데 사용할 수 있습니다.
|
||||
* 저작물에는 0 명, 1 명 또는 여러 명의 소유자가있을 수 있습니다.
|
||||
* 자산 소유자는 자산을 신규 소유자에게 양도하려는 사람이 만족해야하는 조건을 지정할 수 있습니다. 예를 들어 5 명의 현재 소유자 중 최소 3 명이 TRANSFER 트랜잭션에 암호를 사용해야합니다.
|
||||
<<<<<<< HEAD
|
||||
* BigchainDB는 TRANSFER 트랜잭션의 유효성을 검사하는 과정에서 조건이 충족되었는지 확인합니다. (또한 누구나 만족하는지 확인할 수 있습니다.)
|
||||
* BigchainDB는 자산의 이중 지출을 방지합니다.
|
||||
=======
|
||||
* Planetmint는 TRANSFER 트랜잭션의 유효성을 검사하는 과정에서 조건이 충족되었는지 확인합니다. (또한 누구나 만족하는지 확인할 수 있습니다.)
|
||||
* Planetmint는 자산의 이중 지출을 방지합니다.
|
||||
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
|
||||
* 유효성이 검증 된 트랜잭션은 [변경불가능](https://github.com/planetmint/planetmint/blob/master/docs/root/source/korean/immutable-ko.md) 입니다.
|
||||
|
||||
Note
|
||||
|
||||
@ -7,12 +7,7 @@ Code is Apache-2.0 and docs are CC-BY-4.0
|
||||
|
||||
# Planetmint와 Byzantine Fault Tolerance
|
||||
|
||||
<<<<<<< HEAD
|
||||
[Planetmint Server](https://docs.planetmint.com/projects/server/en/latest/index.html)
|
||||
는 블록체인 합의와 트랜잭션 복제에 [Tendermint](https://tendermint.com/)를 사용합니다.
|
||||
=======
|
||||
[Planetmint Server](https://docs.planetmint.io/projects/server/en/latest/index.html)
|
||||
는 블록체인 합의와 트랜잭션 복제에 [Tendermint](https://tendermint.io/)를 사용합니다.
|
||||
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
|
||||
|
||||
그리고 Tendermint 는 [Byzantine Fault Tolerant (BFT)](https://en.wikipedia.org/wiki/Byzantine_fault_tolerance).
|
||||
|
||||
@ -53,11 +53,7 @@ SQL을 이용해 mongoDB 데이터베이스를 쿼리할 수 있습니다. 예
|
||||
...
|
||||
> show dbs
|
||||
admin 0.000GB
|
||||
<<<<<<< HEAD
|
||||
planet 0.000GB
|
||||
=======
|
||||
planetmint 0.000GB
|
||||
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
|
||||
config 0.000GB
|
||||
local 0.000GB
|
||||
> use planetmint
|
||||
@ -166,11 +162,7 @@ metadata 컬렉션의 문서는 MongoDB가 추가한 `"_id"`필드와 거래에
|
||||
각 노드 operator는 외부 사용자가 자신의 로컬 MongoDB 데이터베이스에서 정보를 얻는 방법을 결정할 수 있습니다. 그들은 다음과 같은 것들을 보낼 수 있습니다:
|
||||
|
||||
- 외부유저를 쿼리 처리하는 로컬 MongoDB 데이터베이스 한된 제한된 권한을 가진 역할을 가진 MongoDB 사용자 예) read-only
|
||||
<<<<<<< HEAD
|
||||
- 제한된 미리 정의된 쿼리 집합을 허용하는 제한된 HTTP API, [Planetmint 서버에서 제공하는 HTTP API](http://planetmint.com/http-api), 혹은Django, Express, Ruby on Rails, or ASP.NET.를 이용해 구현된 커스텀 HTTP API
|
||||
=======
|
||||
- 제한된 미리 정의된 쿼리 집합을 허용하는 제한된 HTTP API, [Planetmint 서버에서 제공하는 HTTP API](http://planetmint.io/http-api), 혹은Django, Express, Ruby on Rails, or ASP.NET.를 이용해 구현된 커스텀 HTTP API
|
||||
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
|
||||
- 다른 API(예: GraphQL API) 제3자의 사용자 정의 코드 또는 코드를 사용하여 수행할 수 있습니다..
|
||||
|
||||
각 노드 operator는 로컬 MongoDB 데이터베이스에 대한 다른 레벨 또는 유형의 액세스를 노출할 수 있습니다.
|
||||
|
||||
@ -57,9 +57,5 @@ Each [Planetmint Transactions Spec](https://github.com/planetmint/BEPs/tree/mast
|
||||
|
||||
## 트랜잭션 예시
|
||||
|
||||
<<<<<<< HEAD
|
||||
아래의 [HTTP API 문서](https://docs.planetmint.com/projects/server/en/latest/http-client-server-api.html)와 [the Python 드라이버 문서](https://docs.planetmint.com/projects/py-driver/en/latest/usage.html)에는 예제 Planetmint 트랜잭션이 있습니다.
|
||||
=======
|
||||
아래의 [HTTP API 문서](https://docs.planetmint.io/projects/server/en/latest/http-client-server-api.html)와 [the Python 드라이버 문서](https://docs.planetmint.io/projects/py-driver/en/latest/usage.html)에는 예제 Planetmint 트랜잭션이 있습니다.
|
||||
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
|
||||
.
|
||||
|
||||
@ -13,4 +13,4 @@ configuration or the ``PLANETMINT_DATABASE_BACKEND`` environment variable.
|
||||
|
||||
# Include the backend interfaces
|
||||
from planetmint.backend import schema, query # noqa
|
||||
from planetmint.backend.connection import Connection
|
||||
from planetmint.backend.connection import connect, Connection
|
||||
|
||||
@ -13,14 +13,14 @@ from planetmint.backend.exceptions import ConnectionError
|
||||
from planetmint.transactions.common.exceptions import ConfigurationError
|
||||
|
||||
BACKENDS = { # This is path to MongoDBClass
|
||||
'tarantool_db': 'planetmint.backend.tarantool.connection.TarantoolDB',
|
||||
'tarantool_db': 'planetmint.backend.tarantool.connection.TarantoolDBConnection',
|
||||
'localmongodb': 'planetmint.backend.localmongodb.connection.LocalMongoDBConnection'
|
||||
}
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def Connection(host: str = None, port: int = None, login: str = None, password: str = None, backend: str = None,
|
||||
def connect(host: str = None, port: int = None, login: str = None, password: str = None, backend: str = None,
|
||||
**kwargs):
|
||||
backend = backend
|
||||
if not backend and kwargs and kwargs.get("backend"):
|
||||
@ -81,3 +81,80 @@ def _kwargs_parser(key, kwargs):
|
||||
if kwargs.get(key):
|
||||
return kwargs[key]
|
||||
return None
|
||||
|
||||
class Connection:
|
||||
"""Connection class interface.
|
||||
All backend implementations should provide a connection class that inherits
|
||||
from and implements this class.
|
||||
"""
|
||||
|
||||
def __init__(self, host=None, port=None, dbname=None,
|
||||
connection_timeout=None, max_tries=None,
|
||||
**kwargs):
|
||||
"""Create a new :class:`~.Connection` instance.
|
||||
Args:
|
||||
host (str): the host to connect to.
|
||||
port (int): the port to connect to.
|
||||
dbname (str): the name of the database to use.
|
||||
connection_timeout (int, optional): the milliseconds to wait
|
||||
until timing out the database connection attempt.
|
||||
Defaults to 5000ms.
|
||||
max_tries (int, optional): how many tries before giving up,
|
||||
if 0 then try forever. Defaults to 3.
|
||||
**kwargs: arbitrary keyword arguments provided by the
|
||||
configuration's ``database`` settings
|
||||
"""
|
||||
|
||||
dbconf = Config().get()['database']
|
||||
|
||||
self.host = host or dbconf['host']
|
||||
self.port = port or dbconf['port']
|
||||
self.dbname = dbname or dbconf['name']
|
||||
self.connection_timeout = connection_timeout if connection_timeout is not None \
|
||||
else dbconf['connection_timeout']
|
||||
self.max_tries = max_tries if max_tries is not None else dbconf['max_tries']
|
||||
self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0)
|
||||
self._conn = None
|
||||
|
||||
@property
|
||||
def conn(self):
|
||||
if self._conn is None:
|
||||
self.connect()
|
||||
return self._conn
|
||||
|
||||
def run(self, query):
|
||||
"""Run a query.
|
||||
Args:
|
||||
query: the query to run
|
||||
Raises:
|
||||
:exc:`~DuplicateKeyError`: If the query fails because of a
|
||||
duplicate key constraint.
|
||||
:exc:`~OperationFailure`: If the query fails for any other
|
||||
reason.
|
||||
:exc:`~ConnectionError`: If the connection to the database
|
||||
fails.
|
||||
"""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
def connect(self):
|
||||
"""Try to connect to the database.
|
||||
Raises:
|
||||
:exc:`~ConnectionError`: If the connection to the database
|
||||
fails.
|
||||
"""
|
||||
|
||||
attempt = 0
|
||||
for i in self.max_tries_counter:
|
||||
attempt += 1
|
||||
try:
|
||||
self._conn = self._connect()
|
||||
except ConnectionError as exc:
|
||||
logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.',
|
||||
attempt, self.max_tries if self.max_tries != 0 else '∞',
|
||||
self.host, self.port, self.connection_timeout)
|
||||
if attempt == self.max_tries:
|
||||
logger.critical('Cannot connect to the Database. Giving up.')
|
||||
raise ConnectionError() from exc
|
||||
else:
|
||||
break
|
||||
|
||||
@ -1,176 +0,0 @@
|
||||
# Copyright © 2020 Interplanetary Database Association e.V.,
|
||||
# Planetmint and IPDB software contributors.
|
||||
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
|
||||
# Code is Apache-2.0 and docs are CC-BY-4.0
|
||||
|
||||
import logging
|
||||
from importlib import import_module
|
||||
from itertools import repeat
|
||||
|
||||
import tarantool
|
||||
import planetmint
|
||||
import os
|
||||
|
||||
from planetmint.backend.exceptions import ConnectionError
|
||||
from planetmint.backend.utils import get_planetmint_config_value, get_planetmint_config_value_or_key_error
|
||||
from planetmint.transactions.common.exceptions import ConfigurationError
|
||||
|
||||
BACKENDS = { # This is path to MongoDBClass
|
||||
'tarantool': 'planetmint.backend.connection_tarantool.TarantoolDB',
|
||||
}
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TarantoolDB:
|
||||
init_config = {
|
||||
"init_file": "init_db.txt",
|
||||
"relative_path": os.path.dirname(os.path.abspath(__file__)) + "/backend/tarantool/"
|
||||
}
|
||||
|
||||
drop_config = {
|
||||
"drop_file": "drop_db.txt", # planetmint/backend/tarantool/init_db.txt
|
||||
"relative_path": os.path.dirname(os.path.abspath(__file__)) + "/backend/tarantool/"
|
||||
}
|
||||
|
||||
def __init__(self, host: str, port: int, user: str, password: str, reset_database: bool = False):
|
||||
self.db_connect = tarantool.connect(host=host, port=port, user=user, password=password)
|
||||
if reset_database:
|
||||
self.drop_database()
|
||||
self.init_database()
|
||||
|
||||
def get_connection(self, space_name: str = None):
|
||||
return self.db_connect if space_name is None else self.db_connect.space(space_name)
|
||||
|
||||
def __read_commands(self, file_path):
|
||||
with open(file_path, "r") as cmd_file:
|
||||
commands = [line.strip() for line in cmd_file.readlines() if len(str(line)) > 1]
|
||||
cmd_file.close()
|
||||
return commands
|
||||
|
||||
def drop_database(self):
|
||||
from planetmint.backend.tarantool.utils import run
|
||||
# config = get_planetmint_config_value_or_key_error("ctl_config")
|
||||
# drop_config = config["drop_config"]
|
||||
f_path = "%s%s" % (self.drop_config["relative_path"], self.drop_config["drop_file"])
|
||||
commands = self.__read_commands(file_path=f_path)
|
||||
run(commands=commands, config=config)
|
||||
|
||||
def init_database(self):
|
||||
from planetmint.backend.tarantool.utils import run
|
||||
# config = get_planetmint_config_value_or_key_error("ctl_config")
|
||||
# init_config = config["init_config"]
|
||||
f_path = "%s%s" % (self.init_config["relative_path"], self.init_config["init_file"])
|
||||
commands = self.__read_commands(file_path=f_path)
|
||||
run(commands=commands, config=config)
|
||||
|
||||
|
||||
|
||||
def connect(host: str = None, port: int = None, username: str = None, password: str = None,
|
||||
backend: str = None, reset_database: bool = False, name=None, max_tries=None,
|
||||
connection_timeout=None, replicaset=None, ssl=None, login: str = None, ctl_config=None,
|
||||
ca_cert=None, certfile=None, keyfile=None, keyfile_passphrase=None, reconnect_delay=None,
|
||||
crlfile=None, connect_now=True, encoding=None):
|
||||
backend = backend or get_planetmint_config_value_or_key_error('backend') # TODO Rewrite Configs
|
||||
host = host or get_planetmint_config_value_or_key_error('host')
|
||||
port = port or get_planetmint_config_value_or_key_error('port')
|
||||
username = username or login or get_planetmint_config_value('login')
|
||||
password = password or get_planetmint_config_value('password')
|
||||
|
||||
try: # Here we get class using getattr function
|
||||
module_name, _, class_name = BACKENDS[backend].rpartition('.')
|
||||
Class = getattr(import_module(module_name), class_name)
|
||||
except KeyError:
|
||||
raise ConfigurationError('Backend `{}` is not supported. '
|
||||
'Planetmint currently supports {}'.format(backend, BACKENDS.keys()))
|
||||
except (ImportError, AttributeError) as exc:
|
||||
raise ConfigurationError('Error loading backend `{}`'.format(backend)) from exc
|
||||
print(host)
|
||||
print(port)
|
||||
print(username)
|
||||
|
||||
logger.debug('Connection: {}'.format(Class))
|
||||
return Class(host=host, port=port, user=username, password=password, reset_database=reset_database)
|
||||
|
||||
|
||||
class Connection:
|
||||
"""Connection class interface.
|
||||
|
||||
All backend implementations should provide a connection class that inherits
|
||||
from and implements this class.
|
||||
"""
|
||||
|
||||
def __init__(self, host=None, port=None, connection_timeout=None, max_tries=None,
|
||||
**kwargs):
|
||||
"""Create a new :class:`~.Connection` instance.
|
||||
|
||||
Args:
|
||||
host (str): the host to connect to.
|
||||
port (int): the port to connect to.
|
||||
dbname (str): the name of the database to use.
|
||||
connection_timeout (int, optional): the milliseconds to wait
|
||||
until timing out the database connection attempt.
|
||||
Defaults to 5000ms.
|
||||
max_tries (int, optional): how many tries before giving up,
|
||||
if 0 then try forever. Defaults to 3.
|
||||
**kwargs: arbitrary keyword arguments provided by the
|
||||
configuration's ``database`` settings
|
||||
"""
|
||||
|
||||
dbconf = planetmint.config['database']
|
||||
|
||||
self.host = host or dbconf['host']
|
||||
self.port = port or dbconf['port']
|
||||
self.connection_timeout = connection_timeout if connection_timeout is not None \
|
||||
else dbconf['connection_timeout']
|
||||
self.max_tries = max_tries if max_tries is not None else dbconf['max_tries']
|
||||
self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0)
|
||||
self._conn = None
|
||||
|
||||
@property
|
||||
def conn(self):
|
||||
pass
|
||||
if self._conn is None:
|
||||
self.connect()
|
||||
return self._conn
|
||||
|
||||
def run(self, query):
|
||||
pass
|
||||
"""Run a query.
|
||||
|
||||
Args:
|
||||
query: the query to run
|
||||
Raises:
|
||||
:exc:`~DuplicateKeyError`: If the query fails because of a
|
||||
duplicate key constraint.
|
||||
:exc:`~OperationFailure`: If the query fails for any other
|
||||
reason.
|
||||
:exc:`~ConnectionError`: If the connection to the database
|
||||
fails.
|
||||
"""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
def connect(self):
|
||||
pass
|
||||
"""Try to connect to the database.
|
||||
|
||||
Raises:
|
||||
:exc:`~ConnectionError`: If the connection to the database
|
||||
fails.
|
||||
"""
|
||||
|
||||
attempt = 0
|
||||
for i in self.max_tries_counter:
|
||||
attempt += 1
|
||||
try:
|
||||
self._conn = self._connect()
|
||||
except ConnectionError as exc:
|
||||
logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.',
|
||||
attempt, self.max_tries if self.max_tries != 0 else '∞',
|
||||
self.host, self.port, self.connection_timeout)
|
||||
if attempt == self.max_tries:
|
||||
logger.critical('Cannot connect to the Database. Giving up.')
|
||||
raise ConnectionError() from exc
|
||||
else:
|
||||
break
|
||||
@ -13,88 +13,10 @@ from planetmint.backend.exceptions import (DuplicateKeyError,
|
||||
ConnectionError)
|
||||
from planetmint.transactions.common.exceptions import ConfigurationError
|
||||
from planetmint.utils import Lazy
|
||||
from planetmint.backend.connection import Connection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Connection:
|
||||
"""Connection class interface.
|
||||
All backend implementations should provide a connection class that inherits
|
||||
from and implements this class.
|
||||
"""
|
||||
|
||||
def __init__(self, host=None, port=None, dbname=None,
|
||||
connection_timeout=None, max_tries=None,
|
||||
**kwargs):
|
||||
"""Create a new :class:`~.Connection` instance.
|
||||
Args:
|
||||
host (str): the host to connect to.
|
||||
port (int): the port to connect to.
|
||||
dbname (str): the name of the database to use.
|
||||
connection_timeout (int, optional): the milliseconds to wait
|
||||
until timing out the database connection attempt.
|
||||
Defaults to 5000ms.
|
||||
max_tries (int, optional): how many tries before giving up,
|
||||
if 0 then try forever. Defaults to 3.
|
||||
**kwargs: arbitrary keyword arguments provided by the
|
||||
configuration's ``database`` settings
|
||||
"""
|
||||
|
||||
dbconf = Config().get()['database']
|
||||
|
||||
self.host = host or dbconf['host']
|
||||
self.port = port or dbconf['port']
|
||||
self.dbname = dbname or dbconf['name']
|
||||
self.connection_timeout = connection_timeout if connection_timeout is not None \
|
||||
else dbconf['connection_timeout']
|
||||
self.max_tries = max_tries if max_tries is not None else dbconf['max_tries']
|
||||
self.max_tries_counter = range(self.max_tries) if self.max_tries != 0 else repeat(0)
|
||||
self._conn = None
|
||||
|
||||
@property
|
||||
def conn(self):
|
||||
if self._conn is None:
|
||||
self.connect()
|
||||
return self._conn
|
||||
|
||||
def run(self, query):
|
||||
"""Run a query.
|
||||
Args:
|
||||
query: the query to run
|
||||
Raises:
|
||||
:exc:`~DuplicateKeyError`: If the query fails because of a
|
||||
duplicate key constraint.
|
||||
:exc:`~OperationFailure`: If the query fails for any other
|
||||
reason.
|
||||
:exc:`~ConnectionError`: If the connection to the database
|
||||
fails.
|
||||
"""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
def connect(self):
|
||||
"""Try to connect to the database.
|
||||
Raises:
|
||||
:exc:`~ConnectionError`: If the connection to the database
|
||||
fails.
|
||||
"""
|
||||
|
||||
attempt = 0
|
||||
for i in self.max_tries_counter:
|
||||
attempt += 1
|
||||
try:
|
||||
self._conn = self._connect()
|
||||
except ConnectionError as exc:
|
||||
logger.warning('Attempt %s/%s. Connection to %s:%s failed after %sms.',
|
||||
attempt, self.max_tries if self.max_tries != 0 else '∞',
|
||||
self.host, self.port, self.connection_timeout)
|
||||
if attempt == self.max_tries:
|
||||
logger.critical('Cannot connect to the Database. Giving up.')
|
||||
raise ConnectionError() from exc
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
class LocalMongoDBConnection(Connection):
|
||||
|
||||
def __init__(self, replicaset=None, ssl=None, login=None, password=None,
|
||||
|
||||
@ -9,7 +9,7 @@ from functools import singledispatch
|
||||
import logging
|
||||
|
||||
from planetmint.config import Config
|
||||
from planetmint.backend.connection import Connection
|
||||
from planetmint.backend.connection import connect
|
||||
from planetmint.transactions.common.exceptions import ValidationError
|
||||
from planetmint.transactions.common.utils import (
|
||||
validate_all_values_for_key_in_obj, validate_all_values_for_key_in_list)
|
||||
@ -83,7 +83,7 @@ def init_database(connection=None, dbname=None):
|
||||
configuration.
|
||||
"""
|
||||
|
||||
connection = connection or Connection()
|
||||
connection = connection or connect()
|
||||
dbname = dbname or Config().get()['database']['name']
|
||||
|
||||
create_database(connection, dbname)
|
||||
|
||||
@ -8,11 +8,12 @@ import tarantool
|
||||
|
||||
from planetmint.config import Config
|
||||
from planetmint.transactions.common.exceptions import ConfigurationError
|
||||
from planetmint.backend.connection import Connection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TarantoolDB:
|
||||
class TarantoolDBConnection(Connection):
|
||||
def __init__(self, host: str = "localhost", port: int = 3303, user: str = None, password: str = None, **kwargs):
|
||||
try:
|
||||
self.host = host
|
||||
|
||||
@ -12,14 +12,14 @@ import tarantool.error
|
||||
|
||||
from planetmint.backend import query
|
||||
from planetmint.backend.utils import module_dispatch_registrar
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
from planetmint.backend.tarantool.transaction.tools import TransactionCompose, TransactionDecompose
|
||||
from json import dumps, loads
|
||||
|
||||
register_query = module_dispatch_registrar(query)
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def _group_transaction_by_ids(connection, txids: list):
|
||||
txspace = connection.space("transactions")
|
||||
inxspace = connection.space("inputs")
|
||||
@ -55,7 +55,7 @@ def _group_transaction_by_ids(connection, txids: list):
|
||||
return _transactions
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_transactions(connection, signed_transactions: list):
|
||||
txspace = connection.space("transactions")
|
||||
inxspace = connection.space("inputs")
|
||||
@ -88,26 +88,26 @@ def store_transactions(connection, signed_transactions: list):
|
||||
assetsxspace.insert(txtuples["asset"])
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_transaction(connection, transaction_id: str):
|
||||
_transactions = _group_transaction_by_ids(txids=[transaction_id], connection=connection)
|
||||
return next(iter(_transactions), None)
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_transactions(connection, transactions_ids: list):
|
||||
_transactions = _group_transaction_by_ids(txids=transactions_ids, connection=connection)
|
||||
return _transactions
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_metadatas(connection, metadata: list):
|
||||
space = connection.space("meta_data")
|
||||
for meta in metadata:
|
||||
space.insert((meta["id"], meta["data"] if not "metadata" in meta else meta["metadata"]))
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_metadata(connection, transaction_ids: list):
|
||||
_returned_data = []
|
||||
space = connection.space("meta_data")
|
||||
@ -118,7 +118,7 @@ def get_metadata(connection, transaction_ids: list):
|
||||
return _returned_data if len(_returned_data) > 0 else None
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_asset(connection, asset):
|
||||
space = connection.space("assets")
|
||||
convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"])
|
||||
@ -128,7 +128,7 @@ def store_asset(connection, asset):
|
||||
print("DUPLICATE ERROR")
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_assets(connection, assets: list):
|
||||
space = connection.space("assets")
|
||||
convert = lambda obj: obj if isinstance(obj, tuple) else (obj, obj["id"], obj["id"])
|
||||
@ -139,7 +139,7 @@ def store_assets(connection, assets: list):
|
||||
print(f"EXCEPTION : {ex} ")
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_asset(connection, asset_id: str):
|
||||
space = connection.space("assets")
|
||||
_data = space.select(asset_id, index="txid_search")
|
||||
@ -147,7 +147,7 @@ def get_asset(connection, asset_id: str):
|
||||
return _data[0][0] if len(_data) > 0 else []
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_assets(connection, assets_ids: list) -> list:
|
||||
_returned_data = []
|
||||
for _id in list(set(assets_ids)):
|
||||
@ -156,7 +156,7 @@ def get_assets(connection, assets_ids: list) -> list:
|
||||
return sorted(_returned_data, key=lambda k: k["id"], reverse=False)
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str):
|
||||
space = connection.space("inputs")
|
||||
_inputs = space.select([fullfil_transaction_id, str(fullfil_output_index)], index="spent_search")
|
||||
@ -165,7 +165,7 @@ def get_spent(connection, fullfil_transaction_id: str, fullfil_output_index: str
|
||||
return _transactions
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR
|
||||
try:
|
||||
space = connection.space("blocks")
|
||||
@ -190,7 +190,7 @@ def get_latest_block(connection): # TODO Here is used DESCENDING OPERATOR
|
||||
raise err
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_block(connection, block: dict):
|
||||
space = connection.space("blocks")
|
||||
block_unique_id = token_hex(8)
|
||||
@ -202,7 +202,7 @@ def store_block(connection, block: dict):
|
||||
space.insert((txid, block_unique_id))
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_txids_filtered(connection, asset_id: str, operation: str = None,
|
||||
last_tx: any = None): # TODO here is used 'OR' operator
|
||||
actions = {
|
||||
@ -263,7 +263,7 @@ def _remove_text_score(asset):
|
||||
return asset
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_owned_ids(connection, owner: str):
|
||||
space = connection.space("keys")
|
||||
_keys = space.select(owner, index="keys_search")
|
||||
@ -274,7 +274,7 @@ def get_owned_ids(connection, owner: str):
|
||||
return _transactions
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_spending_transactions(connection, inputs):
|
||||
_transactions = []
|
||||
|
||||
@ -287,7 +287,7 @@ def get_spending_transactions(connection, inputs):
|
||||
return _transactions
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_block(connection, block_id=[]):
|
||||
space = connection.space("blocks")
|
||||
_block = space.select(block_id, index="block_search", limit=1)
|
||||
@ -302,7 +302,7 @@ def get_block(connection, block_id=[]):
|
||||
return {"app_hash": _block[0], "height": _block[1], "transactions": [_tx[0] for _tx in _txblock]}
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_block_with_transaction(connection, txid: str):
|
||||
space = connection.space("blocks_tx")
|
||||
_all_blocks_tx = space.select(txid, index="id_search")
|
||||
@ -314,7 +314,7 @@ def get_block_with_transaction(connection, txid: str):
|
||||
return [{"height": _height[1]} for _height in _block.data]
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def delete_transactions(connection, txn_ids: list):
|
||||
tx_space = connection.space("transactions")
|
||||
for _id in txn_ids:
|
||||
@ -342,7 +342,7 @@ def delete_transactions(connection, txn_ids: list):
|
||||
assets_space.delete(_id, index="txid_search")
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_unspent_outputs(connection, *unspent_outputs: list):
|
||||
space = connection.space('utxos')
|
||||
result = []
|
||||
@ -352,7 +352,7 @@ def store_unspent_outputs(connection, *unspent_outputs: list):
|
||||
result.append(output.data)
|
||||
return result
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def delete_unspent_outputs(connection, *unspent_outputs: list):
|
||||
space = connection.space('utxos')
|
||||
result = []
|
||||
@ -363,14 +363,14 @@ def delete_unspent_outputs(connection, *unspent_outputs: list):
|
||||
return result
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_unspent_outputs(connection, query=None): # for now we don't have implementation for 'query'.
|
||||
space = connection.space('utxos')
|
||||
_utxos = space.select([]).data
|
||||
return [loads(utx[2]) for utx in _utxos]
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_pre_commit_state(connection, state: dict):
|
||||
space = connection.space("pre_commits")
|
||||
_precommit = space.select(state["height"], index="height_search", limit=1)
|
||||
@ -382,7 +382,7 @@ def store_pre_commit_state(connection, state: dict):
|
||||
limit=1)
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_pre_commit_state(connection):
|
||||
try:
|
||||
space = connection.space("pre_commits")
|
||||
@ -397,7 +397,7 @@ def get_pre_commit_state(connection):
|
||||
raise err
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_validator_set(conn, validators_update: dict):
|
||||
space = conn.space("validators")
|
||||
_validator = space.select(validators_update["height"], index="height_search", limit=1)
|
||||
@ -409,7 +409,7 @@ def store_validator_set(conn, validators_update: dict):
|
||||
limit=1)
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def delete_validator_set(connection, height: int):
|
||||
space = connection.space("validators")
|
||||
_validators = space.select(height, index="height_search")
|
||||
@ -417,7 +417,7 @@ def delete_validator_set(connection, height: int):
|
||||
space.delete(_valid[0])
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_election(connection, election_id: str, height: int, is_concluded: bool):
|
||||
space = connection.space("elections")
|
||||
space.upsert((election_id, height, is_concluded),
|
||||
@ -427,7 +427,7 @@ def store_election(connection, election_id: str, height: int, is_concluded: bool
|
||||
limit=1)
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_elections(connection, elections: list):
|
||||
space = connection.space("elections")
|
||||
for election in elections:
|
||||
@ -436,7 +436,7 @@ def store_elections(connection, elections: list):
|
||||
election["is_concluded"]))
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def delete_elections(connection, height: int):
|
||||
space = connection.space("elections")
|
||||
_elections = space.select(height, index="height_search")
|
||||
@ -444,7 +444,7 @@ def delete_elections(connection, height: int):
|
||||
space.delete(_elec[0])
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_validator_set(connection, height: int = None):
|
||||
try:
|
||||
space = connection.space("validators")
|
||||
@ -463,7 +463,7 @@ def get_validator_set(connection, height: int = None):
|
||||
raise err
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_election(connection, election_id: str):
|
||||
try:
|
||||
space = connection.space("elections")
|
||||
@ -479,7 +479,7 @@ def get_election(connection, election_id: str):
|
||||
raise err
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str):
|
||||
space = connection.space("keys")
|
||||
# _keys = space.select([public_key], index="keys_search")
|
||||
@ -491,7 +491,7 @@ def get_asset_tokens_for_public_key(connection, asset_id: str, public_key: str):
|
||||
return _grouped_transactions
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = True):
|
||||
space = connection.space("abci_chains")
|
||||
space.upsert((height, is_synced, chain_id),
|
||||
@ -501,7 +501,7 @@ def store_abci_chain(connection, height: int, chain_id: str, is_synced: bool = T
|
||||
limit=1)
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def delete_abci_chain(connection, height: int):
|
||||
space = connection.space("abci_chains")
|
||||
_chains = space.select(height, index="height_search")
|
||||
@ -509,7 +509,7 @@ def delete_abci_chain(connection, height: int):
|
||||
space.delete(_chain[2])
|
||||
|
||||
|
||||
@register_query(TarantoolDB)
|
||||
@register_query(TarantoolDBConnection)
|
||||
def get_latest_abci_chain(connection):
|
||||
try:
|
||||
space = connection.space("abci_chains")
|
||||
|
||||
@ -3,7 +3,7 @@ import warnings
|
||||
import tarantool
|
||||
from planetmint.backend.utils import module_dispatch_registrar
|
||||
from planetmint import backend
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
|
||||
register_schema = module_dispatch_registrar(backend.schema)
|
||||
|
||||
@ -145,7 +145,7 @@ SCHEMA_DROP_COMMANDS = {
|
||||
}
|
||||
|
||||
|
||||
@register_schema(TarantoolDB)
|
||||
@register_schema(TarantoolDBConnection)
|
||||
def drop_database(connection, not_used=None):
|
||||
for _space in SPACE_NAMES:
|
||||
try:
|
||||
@ -161,7 +161,7 @@ def drop_database(connection, not_used=None):
|
||||
# connection.drop_database()
|
||||
|
||||
|
||||
@register_schema(TarantoolDB)
|
||||
@register_schema(TarantoolDBConnection)
|
||||
def create_database(connection, not_used=None):
|
||||
'''
|
||||
|
||||
@ -182,7 +182,7 @@ def run_command_with_output(command):
|
||||
return output
|
||||
|
||||
|
||||
@register_schema(TarantoolDB)
|
||||
@register_schema(TarantoolDBConnection)
|
||||
def create_tables(connection, dbname):
|
||||
for _space in SPACE_NAMES:
|
||||
try:
|
||||
|
||||
@ -13,7 +13,7 @@ import argparse
|
||||
import copy
|
||||
import json
|
||||
import sys
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
|
||||
from planetmint.core import rollback
|
||||
from planetmint.migrations.chain_migration_election import ChainMigrationElection
|
||||
@ -265,9 +265,9 @@ def run_drop(args):
|
||||
if response != 'y':
|
||||
return
|
||||
|
||||
from planetmint.backend.connection import Connection
|
||||
from planetmint.backend.connection import connect
|
||||
from planetmint.backend import schema
|
||||
conn = Connection()
|
||||
conn = connect()
|
||||
try:
|
||||
schema.drop_database(conn)
|
||||
except DatabaseDoesNotExist:
|
||||
|
||||
@ -245,6 +245,7 @@ class App(BaseApplication):
|
||||
if self.events_queue:
|
||||
event = Event(EventTypes.BLOCK_VALID, {
|
||||
'height': self.new_height,
|
||||
'hash': self.block_txn_hash,
|
||||
'transactions': self.block_transactions
|
||||
})
|
||||
self.events_queue.put(event)
|
||||
|
||||
@ -73,9 +73,7 @@ class Planetmint(object):
|
||||
self.validation = config_utils.load_validation_plugin(validationPlugin)
|
||||
else:
|
||||
self.validation = BaseValidationRules
|
||||
# planetmint.backend.tarantool.connection_tarantool.connect(**Config().get()['database'])
|
||||
self.connection = connection if connection is not None else planetmint.backend.Connection()
|
||||
print(f"PLANETMINT self.connection {self.connection} !!!!")
|
||||
self.connection = connection if connection is not None else planetmint.backend.connect()
|
||||
|
||||
def post_transaction(self, transaction, mode):
|
||||
"""Submit a valid transaction to the mempool."""
|
||||
|
||||
@ -11,6 +11,7 @@ from .utils import _fulfillment_to_details, _fulfillment_from_details
|
||||
from .output import Output
|
||||
from .transaction_link import TransactionLink
|
||||
|
||||
|
||||
class Input(object):
|
||||
"""A Input is used to spend assets locked by an Output.
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ from cryptoconditions import Fulfillment, ThresholdSha256, Ed25519Sha256
|
||||
from planetmint.transactions.common.exceptions import AmountError
|
||||
from .utils import _fulfillment_to_details, _fulfillment_from_details
|
||||
|
||||
|
||||
class Output(object):
|
||||
"""An Output is used to lock an asset.
|
||||
|
||||
|
||||
@ -48,6 +48,7 @@ UnspentOutput = namedtuple(
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class Transaction(object):
|
||||
"""A Transaction is used to create and transfer assets.
|
||||
|
||||
|
||||
@ -168,6 +168,7 @@ def validate_key(obj_name, key):
|
||||
'".", "$" or null characters').format(key, obj_name)
|
||||
raise ValidationError(error_str)
|
||||
|
||||
|
||||
def _fulfillment_to_details(fulfillment):
|
||||
"""Encode a fulfillment as a details dictionary
|
||||
|
||||
|
||||
@ -7,6 +7,7 @@ from planetmint.models import Transaction
|
||||
from planetmint.transactions.common.input import Input
|
||||
from planetmint.transactions.common.output import Output
|
||||
|
||||
|
||||
class Create(Transaction):
|
||||
|
||||
OPERATION = 'CREATE'
|
||||
|
||||
@ -7,6 +7,7 @@ from planetmint.models import Transaction
|
||||
from planetmint.transactions.common.output import Output
|
||||
from copy import deepcopy
|
||||
|
||||
|
||||
class Transfer(Transaction):
|
||||
|
||||
OPERATION = 'TRANSFER'
|
||||
|
||||
@ -6,6 +6,7 @@ from tendermint.abci import types_pb2
|
||||
from tendermint.crypto import keys_pb2
|
||||
from planetmint.transactions.common.exceptions import InvalidPublicKey
|
||||
|
||||
|
||||
def encode_validator(v):
|
||||
ed25519_public_key = v['public_key']['value']
|
||||
pub_key = keys_pb2.PublicKey(ed25519=bytes.fromhex(ed25519_public_key))
|
||||
|
||||
@ -3,8 +3,8 @@
|
||||
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
|
||||
# Code is Apache-2.0 and docs are CC-BY-4.0
|
||||
|
||||
__version__ = '0.9.2'
|
||||
__version__ = '0.9.3'
|
||||
__short_version__ = '0.9'
|
||||
|
||||
# Supported Tendermint versions
|
||||
__tm_supported_versions__ = ["0.34.15"]
|
||||
__tm_supported_versions__ = ['0.34.15']
|
||||
|
||||
@ -10,7 +10,7 @@ from flask_restful import Resource
|
||||
|
||||
from planetmint.web.views.base import base_ws_uri
|
||||
from planetmint import version
|
||||
from planetmint.web.websocket_server import EVENTS_ENDPOINT
|
||||
from planetmint.web.websocket_server import EVENTS_ENDPOINT, EVENTS_ENDPOINT_BLOCKS
|
||||
|
||||
|
||||
class RootIndex(Resource):
|
||||
@ -38,7 +38,8 @@ def get_api_v1_info(api_prefix):
|
||||
"""Return a dict with all the information specific for the v1 of the
|
||||
api.
|
||||
"""
|
||||
websocket_root = base_ws_uri() + EVENTS_ENDPOINT
|
||||
websocket_root_tx = base_ws_uri() + EVENTS_ENDPOINT
|
||||
websocket_root_block = base_ws_uri() + EVENTS_ENDPOINT_BLOCKS
|
||||
docs_url = [
|
||||
'https://docs.planetmint.com/projects/server/en/v',
|
||||
version.__version__,
|
||||
@ -51,7 +52,8 @@ def get_api_v1_info(api_prefix):
|
||||
'blocks': '{}blocks/'.format(api_prefix),
|
||||
'assets': '{}assets/'.format(api_prefix),
|
||||
'outputs': '{}outputs/'.format(api_prefix),
|
||||
'streams': websocket_root,
|
||||
'streams': websocket_root_tx,
|
||||
'streamedblocks': websocket_root_block,
|
||||
'metadata': '{}metadata/'.format(api_prefix),
|
||||
'validators': '{}validators'.format(api_prefix),
|
||||
}
|
||||
|
||||
@ -26,6 +26,6 @@ class OutputListApi(Resource):
|
||||
pool = current_app.config['bigchain_pool']
|
||||
with pool() as planet:
|
||||
outputs = planet.get_outputs_filtered(args['public_key'],
|
||||
args['spent'])
|
||||
args['spent'])
|
||||
return [{'transaction_id': output.txid, 'output_index': output.output}
|
||||
for output in outputs]
|
||||
|
||||
@ -8,6 +8,7 @@ import re
|
||||
from planetmint.transactions.common.transaction_mode_types import (
|
||||
BROADCAST_TX_COMMIT, BROADCAST_TX_ASYNC, BROADCAST_TX_SYNC)
|
||||
|
||||
|
||||
def valid_txid(txid):
|
||||
if re.match('^[a-fA-F0-9]{64}$', txid):
|
||||
return txid.lower()
|
||||
|
||||
89
planetmint/web/websocket_dispatcher.py
Normal file
89
planetmint/web/websocket_dispatcher.py
Normal file
@ -0,0 +1,89 @@
|
||||
# Copyright © 2020 Interplanetary Database Association e.V.,
|
||||
# Planetmint and IPDB software contributors.
|
||||
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
|
||||
# Code is Apache-2.0 and docs are CC-BY-4.0
|
||||
|
||||
|
||||
import json
|
||||
from planetmint.events import EventTypes
|
||||
from planetmint.events import POISON_PILL
|
||||
|
||||
|
||||
class Dispatcher:
|
||||
"""Dispatch events to websockets.
|
||||
|
||||
This class implements a simple publish/subscribe pattern.
|
||||
"""
|
||||
|
||||
def __init__(self, event_source, type='tx'):
|
||||
"""Create a new instance.
|
||||
|
||||
Args:
|
||||
event_source: a source of events. Elements in the queue
|
||||
should be strings.
|
||||
"""
|
||||
|
||||
self.event_source = event_source
|
||||
self.subscribers = {}
|
||||
self.type = type
|
||||
|
||||
def subscribe(self, uuid, websocket):
|
||||
"""Add a websocket to the list of subscribers.
|
||||
|
||||
Args:
|
||||
uuid (str): a unique identifier for the websocket.
|
||||
websocket: the websocket to publish information.
|
||||
"""
|
||||
|
||||
self.subscribers[uuid] = websocket
|
||||
|
||||
def unsubscribe(self, uuid):
|
||||
"""Remove a websocket from the list of subscribers.
|
||||
|
||||
Args:
|
||||
uuid (str): a unique identifier for the websocket.
|
||||
"""
|
||||
|
||||
del self.subscribers[uuid]
|
||||
|
||||
@staticmethod
|
||||
def simplified_block(block):
|
||||
txids = []
|
||||
for tx in block['transactions']:
|
||||
txids.append(tx.id)
|
||||
return {'height': block['height'], 'hash': block['hash'], 'transaction_ids': txids}
|
||||
|
||||
@staticmethod
|
||||
def eventify_block(block):
|
||||
for tx in block['transactions']:
|
||||
if tx.asset:
|
||||
asset_id = tx.asset.get('id', tx.id)
|
||||
else:
|
||||
asset_id = tx.id
|
||||
yield {'height': block['height'],
|
||||
'asset_id': asset_id,
|
||||
'transaction_id': tx.id}
|
||||
|
||||
async def publish(self):
|
||||
"""Publish new events to the subscribers."""
|
||||
|
||||
while True:
|
||||
event = await self.event_source.get()
|
||||
str_buffer = []
|
||||
|
||||
if event == POISON_PILL:
|
||||
return
|
||||
|
||||
if isinstance(event, str):
|
||||
str_buffer.append(event)
|
||||
elif event.type == EventTypes.BLOCK_VALID:
|
||||
if self.type == 'tx':
|
||||
str_buffer = map(json.dumps, self.eventify_block(event.data))
|
||||
elif self.type == 'blk':
|
||||
str_buffer = [json.dumps(self.simplified_block(event.data))]
|
||||
else:
|
||||
return
|
||||
|
||||
for str_item in str_buffer:
|
||||
for _, websocket in self.subscribers.items():
|
||||
await websocket.send_str(str_item)
|
||||
@ -16,26 +16,24 @@
|
||||
# things in a better way.
|
||||
|
||||
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
import threading
|
||||
import aiohttp
|
||||
|
||||
|
||||
from uuid import uuid4
|
||||
from concurrent.futures import CancelledError
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import web
|
||||
|
||||
from planetmint.config import Config
|
||||
from planetmint.events import EventTypes
|
||||
from planetmint import config
|
||||
from planetmint.web.websocket_dispatcher import Dispatcher
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
POISON_PILL = 'POISON_PILL'
|
||||
EVENTS_ENDPOINT = '/api/v1/streams/valid_transactions'
|
||||
EVENTS_ENDPOINT_BLOCKS = '/api/v1/streams/valid_blocks'
|
||||
|
||||
|
||||
def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
|
||||
def _multiprocessing_to_asyncio(in_queue, out_queue1, out_queue2, loop):
|
||||
"""Bridge between a synchronous multiprocessing queue
|
||||
and an asynchronous asyncio queue.
|
||||
|
||||
@ -46,85 +44,18 @@ def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
|
||||
|
||||
while True:
|
||||
value = in_queue.get()
|
||||
loop.call_soon_threadsafe(out_queue.put_nowait, value)
|
||||
loop.call_soon_threadsafe(out_queue1.put_nowait, value)
|
||||
loop.call_soon_threadsafe(out_queue2.put_nowait, value)
|
||||
|
||||
|
||||
def eventify_block(block):
|
||||
for tx in block['transactions']:
|
||||
if tx.asset:
|
||||
asset_id = tx.asset.get('id', tx.id)
|
||||
else:
|
||||
asset_id = tx.id
|
||||
yield {'height': block['height'],
|
||||
'asset_id': asset_id,
|
||||
'transaction_id': tx.id}
|
||||
|
||||
|
||||
class Dispatcher:
|
||||
"""Dispatch events to websockets.
|
||||
|
||||
This class implements a simple publish/subscribe pattern.
|
||||
"""
|
||||
|
||||
def __init__(self, event_source):
|
||||
"""Create a new instance.
|
||||
|
||||
Args:
|
||||
event_source: a source of events. Elements in the queue
|
||||
should be strings.
|
||||
"""
|
||||
|
||||
self.event_source = event_source
|
||||
self.subscribers = {}
|
||||
|
||||
def subscribe(self, uuid, websocket):
|
||||
"""Add a websocket to the list of subscribers.
|
||||
|
||||
Args:
|
||||
uuid (str): a unique identifier for the websocket.
|
||||
websocket: the websocket to publish information.
|
||||
"""
|
||||
|
||||
self.subscribers[uuid] = websocket
|
||||
|
||||
def unsubscribe(self, uuid):
|
||||
"""Remove a websocket from the list of subscribers.
|
||||
|
||||
Args:
|
||||
uuid (str): a unique identifier for the websocket.
|
||||
"""
|
||||
|
||||
del self.subscribers[uuid]
|
||||
|
||||
async def publish(self):
|
||||
"""Publish new events to the subscribers."""
|
||||
|
||||
while True:
|
||||
event = await self.event_source.get()
|
||||
str_buffer = []
|
||||
|
||||
if event == POISON_PILL:
|
||||
return
|
||||
|
||||
if isinstance(event, str):
|
||||
str_buffer.append(event)
|
||||
|
||||
elif event.type == EventTypes.BLOCK_VALID:
|
||||
str_buffer = map(json.dumps, eventify_block(event.data))
|
||||
|
||||
for str_item in str_buffer:
|
||||
for _, websocket in self.subscribers.items():
|
||||
await websocket.send_str(str_item)
|
||||
|
||||
|
||||
async def websocket_handler(request):
|
||||
async def websocket_tx_handler(request):
|
||||
"""Handle a new socket connection."""
|
||||
|
||||
logger.debug('New websocket connection.')
|
||||
websocket = web.WebSocketResponse()
|
||||
logger.debug('New TX websocket connection.')
|
||||
websocket = aiohttp.web.WebSocketResponse()
|
||||
await websocket.prepare(request)
|
||||
uuid = uuid4()
|
||||
request.app['dispatcher'].subscribe(uuid, websocket)
|
||||
request.app['tx_dispatcher'].subscribe(uuid, websocket)
|
||||
|
||||
while True:
|
||||
# Consume input buffer
|
||||
@ -143,25 +74,59 @@ async def websocket_handler(request):
|
||||
logger.debug('Websocket exception: %s', websocket.exception())
|
||||
break
|
||||
|
||||
request.app['dispatcher'].unsubscribe(uuid)
|
||||
request.app['tx_dispatcher'].unsubscribe(uuid)
|
||||
return websocket
|
||||
|
||||
|
||||
def init_app(event_source, *, loop=None):
|
||||
async def websocket_blk_handler(request):
|
||||
"""Handle a new socket connection."""
|
||||
|
||||
logger.debug('New BLK websocket connection.')
|
||||
websocket = aiohttp.web.WebSocketResponse()
|
||||
await websocket.prepare(request)
|
||||
uuid = uuid4()
|
||||
request.app['blk_dispatcher'].subscribe(uuid, websocket)
|
||||
|
||||
while True:
|
||||
# Consume input buffer
|
||||
try:
|
||||
msg = await websocket.receive()
|
||||
except RuntimeError as e:
|
||||
logger.debug('Websocket exception: %s', str(e))
|
||||
break
|
||||
except CancelledError:
|
||||
logger.debug('Websocket closed')
|
||||
break
|
||||
if msg.type == aiohttp.WSMsgType.CLOSED:
|
||||
logger.debug('Websocket closed')
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||
logger.debug('Websocket exception: %s', websocket.exception())
|
||||
break
|
||||
|
||||
request.app['blk_dispatcher'].unsubscribe(uuid)
|
||||
return websocket
|
||||
|
||||
|
||||
def init_app(tx_source, blk_source, *, loop=None):
|
||||
"""Init the application server.
|
||||
|
||||
Return:
|
||||
An aiohttp application.
|
||||
"""
|
||||
|
||||
dispatcher = Dispatcher(event_source)
|
||||
blk_dispatcher = Dispatcher(blk_source, 'blk')
|
||||
tx_dispatcher = Dispatcher(tx_source, 'tx')
|
||||
|
||||
# Schedule the dispatcher
|
||||
loop.create_task(dispatcher.publish())
|
||||
loop.create_task(blk_dispatcher.publish(), name='blk')
|
||||
loop.create_task(tx_dispatcher.publish(), name='tx')
|
||||
|
||||
app = web.Application(loop=loop)
|
||||
app['dispatcher'] = dispatcher
|
||||
app.router.add_get(EVENTS_ENDPOINT, websocket_handler)
|
||||
app = aiohttp.web.Application(loop=loop)
|
||||
app['tx_dispatcher'] = tx_dispatcher
|
||||
app['blk_dispatcher'] = blk_dispatcher
|
||||
app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler)
|
||||
app.router.add_get(EVENTS_ENDPOINT_BLOCKS, websocket_blk_handler)
|
||||
return app
|
||||
|
||||
|
||||
@ -171,14 +136,16 @@ def start(sync_event_source, loop=None):
|
||||
if not loop:
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
event_source = asyncio.Queue(loop=loop)
|
||||
tx_source = asyncio.Queue(loop=loop)
|
||||
blk_source = asyncio.Queue(loop=loop)
|
||||
|
||||
bridge = threading.Thread(target=_multiprocessing_to_asyncio,
|
||||
args=(sync_event_source, event_source, loop),
|
||||
args=(sync_event_source, tx_source, blk_source, loop),
|
||||
daemon=True)
|
||||
bridge.start()
|
||||
|
||||
app = init_app(event_source, loop=loop)
|
||||
app = init_app(tx_source, blk_source, loop=loop)
|
||||
aiohttp.web.run_app(app,
|
||||
host=Config().get()['wsserver']['host'],
|
||||
port=Config().get()['wsserver']['port'])
|
||||
host=config['wsserver']['host'],
|
||||
port=config['wsserver']['port'],
|
||||
loop=loop)
|
||||
|
||||
4
setup.py
4
setup.py
@ -93,7 +93,9 @@ install_requires = [
|
||||
'pyyaml==5.4.1',
|
||||
'requests==2.25.1',
|
||||
'setproctitle==1.2.2',
|
||||
'werkzeug==2.0.3'
|
||||
'werkzeug==2.0.3',
|
||||
'nest-asyncio==1.5.5'
|
||||
|
||||
]
|
||||
|
||||
if sys.version_info < (3, 9):
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
from planetmint.backend.connection import Connection
|
||||
from planetmint.backend.connection import connect
|
||||
|
||||
|
||||
#
|
||||
@ -27,5 +27,5 @@ from planetmint.backend.connection import Connection
|
||||
|
||||
@pytest.fixture
|
||||
def db_conn():
|
||||
conn = Connection()
|
||||
conn = connect()
|
||||
return conn
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
|
||||
# Code is Apache-2.0 and docs are CC-BY-4.0
|
||||
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
|
||||
|
||||
def _check_spaces_by_list(conn, space_names):
|
||||
|
||||
@ -8,9 +8,9 @@ import pytest
|
||||
|
||||
def test_get_connection_raises_a_configuration_error(monkeypatch):
|
||||
from planetmint.transactions.common.exceptions import ConfigurationError
|
||||
from planetmint.backend.connection import Connection
|
||||
from planetmint.backend.connection import connect
|
||||
with pytest.raises(ConfigurationError):
|
||||
Connection('msaccess', 'localhost', '1337', 'mydb')
|
||||
connect('msaccess', 'localhost', '1337', 'mydb')
|
||||
|
||||
with pytest.raises(ConfigurationError):
|
||||
# We need to force a misconfiguration here
|
||||
@ -18,4 +18,4 @@ def test_get_connection_raises_a_configuration_error(monkeypatch):
|
||||
{'catsandra':
|
||||
'planetmint.backend.meowmeow.Catsandra'})
|
||||
|
||||
Connection('catsandra', 'localhost', '1337', 'mydb')
|
||||
connect('catsandra', 'localhost', '1337', 'mydb')
|
||||
|
||||
@ -93,9 +93,9 @@ def test__run_init(mocker):
|
||||
init_db_mock = mocker.patch(
|
||||
'planetmint.backend.tarantool.connection.TarantoolDB.init_database')
|
||||
|
||||
from planetmint.backend.connection import Connection
|
||||
from planetmint.backend.connection import connect
|
||||
|
||||
conn = Connection()
|
||||
conn = connect()
|
||||
conn.init_database()
|
||||
|
||||
init_db_mock.assert_called_once_with()
|
||||
|
||||
@ -18,8 +18,8 @@ import codecs
|
||||
from collections import namedtuple
|
||||
from logging import getLogger
|
||||
from logging.config import dictConfig
|
||||
from planetmint.backend.connection import Connection
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.connection import connect
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
|
||||
import pytest
|
||||
# from pymongo import MongoClient
|
||||
@ -391,8 +391,8 @@ def db_name(db_config):
|
||||
|
||||
@pytest.fixture
|
||||
def db_conn():
|
||||
from planetmint.backend import Connection
|
||||
return Connection()
|
||||
from planetmint.backend import connect
|
||||
return connect()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -541,7 +541,7 @@ def unspent_outputs(unspent_output_0, unspent_output_1, unspent_output_2):
|
||||
|
||||
@pytest.fixture
|
||||
def tarantool_client(db_context): # TODO Here add TarantoolConnectionClass
|
||||
return TarantoolDB(host=db_context.host, port=db_context.port)
|
||||
return TarantoolDBConnection(host=db_context.host, port=db_context.port)
|
||||
|
||||
|
||||
# @pytest.fixture
|
||||
|
||||
@ -47,13 +47,13 @@ class TestBigchainApi(object):
|
||||
def test_double_inclusion(self, b, alice):
|
||||
from planetmint.backend.exceptions import OperationError
|
||||
from tarantool.error import DatabaseError
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
|
||||
tx = Create.generate([alice.public_key], [([alice.public_key], 1)])
|
||||
tx = tx.sign([alice.private_key])
|
||||
|
||||
b.store_bulk_transactions([tx])
|
||||
if isinstance(b.connection, TarantoolDB):
|
||||
if isinstance(b.connection, TarantoolDBConnection):
|
||||
with pytest.raises(DatabaseError):
|
||||
b.store_bulk_transactions([tx])
|
||||
else:
|
||||
@ -61,9 +61,9 @@ class TestBigchainApi(object):
|
||||
b.store_bulk_transactions([tx])
|
||||
|
||||
def test_text_search(self, b, alice):
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
|
||||
if isinstance(b.connection, TarantoolDB):
|
||||
if isinstance(b.connection, TarantoolDBConnection):
|
||||
warnings.warn(" :::::: This function is used only with :::::: ")
|
||||
return
|
||||
|
||||
|
||||
@ -93,7 +93,7 @@ def test_filter_unspent_outputs(b, user_pk, user_sk):
|
||||
|
||||
def test_outputs_query_key_order(b, user_pk, user_sk, user2_pk, user2_sk):
|
||||
from planetmint import backend
|
||||
from planetmint.backend.connection import Connection
|
||||
from planetmint.backend.connection import connect
|
||||
from planetmint.backend import query
|
||||
|
||||
tx1 = Create.generate([user_pk],
|
||||
@ -119,7 +119,7 @@ def test_outputs_query_key_order(b, user_pk, user_sk, user2_pk, user2_sk):
|
||||
|
||||
# clean the transaction, metdata and asset collection
|
||||
# conn = connect()
|
||||
connection = Connection()
|
||||
connection = connect()
|
||||
# conn.run(conn.collection('transactions').delete_many({}))
|
||||
# conn.run(conn.collection('metadata').delete_many({}))
|
||||
# conn.run(conn.collection('assets').delete_many({}))
|
||||
|
||||
@ -28,10 +28,10 @@ from planetmint.lib import Block
|
||||
def test_asset_is_separated_from_transaciton(b):
|
||||
import copy
|
||||
from planetmint.transactions.common.crypto import generate_key_pair
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
|
||||
|
||||
if isinstance(b.connection, TarantoolDB):
|
||||
if isinstance(b.connection, TarantoolDBConnection):
|
||||
pytest.skip("This specific function is skipped because, assets are stored differently if using Tarantool")
|
||||
|
||||
|
||||
@ -172,14 +172,14 @@ def test_update_utxoset(b, signed_create_tx, signed_transfer_tx, db_conn):
|
||||
@pytest.mark.bdb
|
||||
def test_store_transaction(mocker, b, signed_create_tx,
|
||||
signed_transfer_tx, db_context):
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
mocked_store_asset = mocker.patch('planetmint.backend.query.store_assets')
|
||||
mocked_store_metadata = mocker.patch(
|
||||
'planetmint.backend.query.store_metadatas')
|
||||
mocked_store_transaction = mocker.patch(
|
||||
'planetmint.backend.query.store_transactions')
|
||||
b.store_bulk_transactions([signed_create_tx])
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
mongo_client = MongoClient(host=db_context.host, port=db_context.port)
|
||||
utxoset = mongo_client[db_context.name]['utxos']
|
||||
assert utxoset.count_documents({}) == 1
|
||||
@ -209,7 +209,7 @@ def test_store_transaction(mocker, b, signed_create_tx,
|
||||
mocked_store_metadata.reset_mock()
|
||||
mocked_store_transaction.reset_mock()
|
||||
b.store_bulk_transactions([signed_transfer_tx])
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
assert utxoset.count_documents({}) == 1
|
||||
utxo = utxoset.find_one()
|
||||
assert utxo['transaction_id'] == signed_transfer_tx.id
|
||||
@ -219,7 +219,7 @@ def test_store_transaction(mocker, b, signed_create_tx,
|
||||
b.connection,
|
||||
[{'id': signed_transfer_tx.id, 'metadata': signed_transfer_tx.metadata}],
|
||||
)
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
mocked_store_transaction.assert_called_once_with(
|
||||
b.connection,
|
||||
[{k: v for k, v in signed_transfer_tx.to_dict().items()
|
||||
@ -230,7 +230,7 @@ def test_store_transaction(mocker, b, signed_create_tx,
|
||||
@pytest.mark.bdb
|
||||
def test_store_bulk_transaction(mocker, b, signed_create_tx,
|
||||
signed_transfer_tx, db_context):
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
mocked_store_assets = mocker.patch(
|
||||
'planetmint.backend.query.store_assets')
|
||||
mocked_store_metadata = mocker.patch(
|
||||
@ -238,14 +238,14 @@ def test_store_bulk_transaction(mocker, b, signed_create_tx,
|
||||
mocked_store_transactions = mocker.patch(
|
||||
'planetmint.backend.query.store_transactions')
|
||||
b.store_bulk_transactions((signed_create_tx,))
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
mongo_client = MongoClient(host=db_context.host, port=db_context.port)
|
||||
utxoset = mongo_client[db_context.name]['utxos']
|
||||
assert utxoset.count_documents({}) == 1
|
||||
utxo = utxoset.find_one()
|
||||
assert utxo['transaction_id'] == signed_create_tx.id
|
||||
assert utxo['output_index'] == 0
|
||||
if isinstance(b.connection, TarantoolDB):
|
||||
if isinstance(b.connection, TarantoolDBConnection):
|
||||
mocked_store_assets.assert_called_once_with(
|
||||
b.connection, # signed_create_tx.asset['data'] this was before
|
||||
[(signed_create_tx.asset, signed_create_tx.id, signed_create_tx.id)],
|
||||
@ -268,7 +268,7 @@ def test_store_bulk_transaction(mocker, b, signed_create_tx,
|
||||
mocked_store_metadata.reset_mock()
|
||||
mocked_store_transactions.reset_mock()
|
||||
b.store_bulk_transactions((signed_transfer_tx,))
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
assert utxoset.count_documents({}) == 1
|
||||
utxo = utxoset.find_one()
|
||||
assert utxo['transaction_id'] == signed_transfer_tx.id
|
||||
@ -279,7 +279,7 @@ def test_store_bulk_transaction(mocker, b, signed_create_tx,
|
||||
[{'id': signed_transfer_tx.id,
|
||||
'metadata': signed_transfer_tx.metadata}],
|
||||
)
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
mocked_store_transactions.assert_called_once_with(
|
||||
b.connection,
|
||||
[{k: v for k, v in signed_transfer_tx.to_dict().items()
|
||||
@ -304,10 +304,10 @@ def test_delete_zero_unspent_outputs(b, utxoset):
|
||||
|
||||
@pytest.mark.bdb
|
||||
def test_delete_one_unspent_outputs(b, utxoset):
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
unspent_outputs, utxo_collection = utxoset
|
||||
delete_res = b.delete_unspent_outputs(unspent_outputs[0])
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
assert len(list(delete_res)) == 1
|
||||
assert utxo_collection.count_documents(
|
||||
{'$or': [
|
||||
@ -328,10 +328,10 @@ def test_delete_one_unspent_outputs(b, utxoset):
|
||||
|
||||
@pytest.mark.bdb
|
||||
def test_delete_many_unspent_outputs(b, utxoset):
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
unspent_outputs, utxo_collection = utxoset
|
||||
delete_res = b.delete_unspent_outputs(*unspent_outputs[::2])
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
assert len(list(delete_res)) == 2
|
||||
assert utxo_collection.count_documents(
|
||||
{'$or': [
|
||||
@ -360,9 +360,9 @@ def test_store_zero_unspent_output(b, utxo_collection):
|
||||
|
||||
@pytest.mark.bdb
|
||||
def test_store_one_unspent_output(b, unspent_output_1, utxo_collection):
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
res = b.store_unspent_outputs(unspent_output_1)
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
assert res.acknowledged
|
||||
assert len(list(res)) == 1
|
||||
assert utxo_collection.count_documents(
|
||||
@ -378,9 +378,9 @@ def test_store_one_unspent_output(b, unspent_output_1, utxo_collection):
|
||||
|
||||
@pytest.mark.bdb
|
||||
def test_store_many_unspent_outputs(b, unspent_outputs, utxo_collection):
|
||||
from planetmint.backend.tarantool.connection import TarantoolDB
|
||||
from planetmint.backend.tarantool.connection import TarantoolDBConnection
|
||||
res = b.store_unspent_outputs(*unspent_outputs)
|
||||
if not isinstance(b.connection, TarantoolDB):
|
||||
if not isinstance(b.connection, TarantoolDBConnection):
|
||||
assert res.acknowledged
|
||||
assert len(list(res)) == 3
|
||||
assert utxo_collection.count_documents(
|
||||
|
||||
@ -65,7 +65,7 @@ def test_bigchain_class_default_initialization(config):
|
||||
|
||||
def test_bigchain_class_initialization_with_parameters():
|
||||
from planetmint import Planetmint
|
||||
from planetmint.backend import Connection
|
||||
from planetmint.backend import connect
|
||||
from planetmint.validation import BaseValidationRules
|
||||
init_db_kwargs = {
|
||||
'backend': 'localmongodb',
|
||||
@ -73,7 +73,7 @@ def test_bigchain_class_initialization_with_parameters():
|
||||
'port': 12345,
|
||||
'name': 'this_is_the_db_name',
|
||||
}
|
||||
connection = Connection(**init_db_kwargs)
|
||||
connection = connect(**init_db_kwargs)
|
||||
planet = Planetmint(connection=connection)
|
||||
assert planet.connection == connection
|
||||
assert planet.connection.host == init_db_kwargs['host']
|
||||
|
||||
@ -22,6 +22,8 @@ def test_api_root_endpoint(client, wsserver_base_url):
|
||||
'outputs': '/api/v1/outputs/',
|
||||
'streams': '{}/api/v1/streams/valid_transactions'.format(
|
||||
wsserver_base_url),
|
||||
'streamedblocks': '{}/api/v1/streams/valid_blocks'.format(
|
||||
wsserver_base_url),
|
||||
'metadata': '/api/v1/metadata/',
|
||||
'validators': '/api/v1/validators',
|
||||
}
|
||||
@ -45,6 +47,8 @@ def test_api_v1_endpoint(client, wsserver_base_url):
|
||||
'outputs': '/outputs/',
|
||||
'streams': '{}/api/v1/streams/valid_transactions'.format(
|
||||
wsserver_base_url),
|
||||
'streamedblocks': '{}/api/v1/streams/valid_blocks'.format(
|
||||
wsserver_base_url),
|
||||
'metadata': '/metadata/',
|
||||
'validators': '/validators'
|
||||
}
|
||||
|
||||
@ -7,13 +7,12 @@ import asyncio
|
||||
import json
|
||||
import queue
|
||||
import threading
|
||||
from unittest.mock import patch
|
||||
# from unittest.mock import patch
|
||||
from planetmint.transactions.types.assets.create import Create
|
||||
from planetmint.transactions.types.assets.transfer import Transfer
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class MockWebSocket:
|
||||
def __init__(self):
|
||||
self.received = []
|
||||
@ -23,7 +22,7 @@ class MockWebSocket:
|
||||
|
||||
|
||||
def test_eventify_block_works_with_any_transaction():
|
||||
from planetmint.web.websocket_server import eventify_block
|
||||
from planetmint.web.websocket_dispatcher import Dispatcher
|
||||
from planetmint.transactions.common.crypto import generate_key_pair
|
||||
|
||||
alice = generate_key_pair()
|
||||
@ -51,18 +50,42 @@ def test_eventify_block_works_with_any_transaction():
|
||||
'transaction_id': tx_transfer.id
|
||||
}]
|
||||
|
||||
for event, expected in zip(eventify_block(block), expected_events):
|
||||
for event, expected in zip(Dispatcher.eventify_block(block), expected_events):
|
||||
assert event == expected
|
||||
|
||||
def test_simplified_block_works():
|
||||
from planetmint.web.websocket_dispatcher import Dispatcher
|
||||
from planetmint.transactions.common.crypto import generate_key_pair
|
||||
|
||||
async def test_bridge_sync_async_queue(loop):
|
||||
alice = generate_key_pair()
|
||||
|
||||
tx = Create.generate([alice.public_key],
|
||||
[([alice.public_key], 1)])\
|
||||
.sign([alice.private_key])
|
||||
tx_transfer = Transfer.generate(tx.to_inputs(),
|
||||
[([alice.public_key], 1)],
|
||||
asset_id=tx.id)\
|
||||
.sign([alice.private_key])
|
||||
|
||||
block = {'height': 1, 'hash': '27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09',
|
||||
'transactions': [tx, tx_transfer]}
|
||||
|
||||
expected_event = {'height': 1, 'hash': '27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09',
|
||||
'transaction_ids': [tx.id, tx_transfer.id]}
|
||||
|
||||
blk_event = Dispatcher.simplified_block(block)
|
||||
assert blk_event == expected_event
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bridge_sync_async_queue(event_loop):
|
||||
from planetmint.web.websocket_server import _multiprocessing_to_asyncio
|
||||
|
||||
sync_queue = queue.Queue()
|
||||
async_queue = asyncio.Queue(loop=loop)
|
||||
async_queue = asyncio.Queue(loop=event_loop)
|
||||
async_queue2 = asyncio.Queue(loop=event_loop)
|
||||
|
||||
bridge = threading.Thread(target=_multiprocessing_to_asyncio,
|
||||
args=(sync_queue, async_queue, loop),
|
||||
args=(sync_queue, async_queue, async_queue2, event_loop),
|
||||
daemon=True)
|
||||
bridge.start()
|
||||
|
||||
@ -86,44 +109,107 @@ async def test_bridge_sync_async_queue(loop):
|
||||
print(f" queue ({async_queue.qsize()}): {async_queue} ")
|
||||
assert async_queue.qsize() == 0
|
||||
|
||||
# TODO: fix the test and uncomment it
|
||||
# @patch('threading.Thread')
|
||||
# @patch('aiohttp.web.run_app')
|
||||
# @patch('planetmint.web.websocket_server.init_app')
|
||||
# @patch('asyncio.get_event_loop', return_value='event-loop')
|
||||
# @patch('asyncio.Queue', return_value='event-queue')
|
||||
# def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock,
|
||||
# init_app_mock, run_app_mock,
|
||||
# thread_mock):
|
||||
# from planetmint import config
|
||||
# from planetmint.web.websocket_server import start, _multiprocessing_to_asyncio
|
||||
#
|
||||
# start(None)
|
||||
# #thread_mock.assert_called_once_with(
|
||||
# # target=_multiprocessing_to_asyncio,
|
||||
# # args=(None, queue_mock.return_value, queue_mock.return_value, get_event_loop_mock.return_value),
|
||||
# # daemon=True,
|
||||
# #)
|
||||
# thread_mock.return_value.start.assert_called_once_with()
|
||||
# init_app_mock.assert_called_with('event-queue', 'event-queue', loop='event-loop')
|
||||
# run_app_mock.assert_called_once_with(
|
||||
# init_app_mock.return_value,
|
||||
# host=config['wsserver']['host'],
|
||||
# port=config['wsserver']['port'],
|
||||
# )
|
||||
|
||||
@patch('threading.Thread')
|
||||
@patch('aiohttp.web.run_app')
|
||||
@patch('planetmint.web.websocket_server.init_app')
|
||||
@patch('asyncio.get_event_loop', return_value='event-loop')
|
||||
@patch('asyncio.Queue', return_value='event-queue')
|
||||
def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock,
|
||||
init_app_mock, run_app_mock,
|
||||
thread_mock):
|
||||
from planetmint.config import Config
|
||||
from planetmint.web.websocket_server import start, _multiprocessing_to_asyncio
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_block_event(aiohttp_client, event_loop):
|
||||
from planetmint import events
|
||||
from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT_BLOCKS
|
||||
from planetmint.transactions.common import crypto
|
||||
|
||||
start(None)
|
||||
thread_mock.assert_called_once_with(
|
||||
target=_multiprocessing_to_asyncio,
|
||||
args=(None, queue_mock.return_value, get_event_loop_mock.return_value),
|
||||
daemon=True,
|
||||
)
|
||||
thread_mock.return_value.start.assert_called_once_with()
|
||||
init_app_mock.assert_called_with('event-queue', loop='event-loop')
|
||||
run_app_mock.assert_called_once_with(
|
||||
init_app_mock.return_value,
|
||||
host=Config().get()['wsserver']['host'],
|
||||
port=Config().get()['wsserver']['port'],
|
||||
)
|
||||
user_priv, user_pub = crypto.generate_key_pair()
|
||||
tx = Create.generate([user_pub], [([user_pub], 1)])
|
||||
tx = tx.sign([user_priv])
|
||||
|
||||
blk_source = asyncio.Queue(loop=event_loop)
|
||||
tx_source = asyncio.Queue(loop=event_loop)
|
||||
app = init_app(tx_source, blk_source, loop=event_loop)
|
||||
client = await aiohttp_client(app)
|
||||
ws = await client.ws_connect(EVENTS_ENDPOINT_BLOCKS)
|
||||
block = {'height': 1, 'hash': '27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09',
|
||||
'transactions': [tx]}
|
||||
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
|
||||
|
||||
await blk_source.put(block_event)
|
||||
|
||||
result = await ws.receive()
|
||||
json_result = json.loads(result.data)
|
||||
assert json_result['height'] == block['height']
|
||||
assert json_result['hash'] == block['hash']
|
||||
assert len(json_result['transaction_ids']) == 1
|
||||
assert json_result['transaction_ids'][0] == tx.id
|
||||
|
||||
await blk_source.put(events.POISON_PILL)
|
||||
|
||||
|
||||
async def test_websocket_string_event(test_client, loop):
|
||||
from planetmint.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_transaction_event(aiohttp_client, event_loop):
|
||||
from planetmint import events
|
||||
from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT
|
||||
from planetmint.transactions.common import crypto
|
||||
|
||||
event_source = asyncio.Queue(loop=loop)
|
||||
app = init_app(event_source, loop=loop)
|
||||
client = await test_client(app)
|
||||
user_priv, user_pub = crypto.generate_key_pair()
|
||||
tx = Create.generate([user_pub], [([user_pub], 1)])
|
||||
tx = tx.sign([user_priv])
|
||||
|
||||
blk_source = asyncio.Queue(loop=event_loop)
|
||||
tx_source = asyncio.Queue(loop=event_loop)
|
||||
app = init_app(tx_source, blk_source, loop=event_loop)
|
||||
client = await aiohttp_client(app)
|
||||
ws = await client.ws_connect(EVENTS_ENDPOINT)
|
||||
block = {'height': 1, 'transactions': [tx]}
|
||||
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
|
||||
|
||||
await tx_source.put(block_event)
|
||||
|
||||
for tx in block['transactions']:
|
||||
result = await ws.receive()
|
||||
json_result = json.loads(result.data)
|
||||
assert json_result['transaction_id'] == tx.id
|
||||
# Since the transactions are all CREATEs, asset id == transaction id
|
||||
assert json_result['asset_id'] == tx.id
|
||||
assert json_result['height'] == block['height']
|
||||
|
||||
await tx_source.put(events.POISON_PILL)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websocket_string_event(aiohttp_client, event_loop):
|
||||
from planetmint.events import POISON_PILL
|
||||
from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT
|
||||
|
||||
blk_source = asyncio.Queue(loop=event_loop)
|
||||
tx_source = asyncio.Queue(loop=event_loop)
|
||||
app = init_app(tx_source, blk_source, loop=event_loop)
|
||||
client = await aiohttp_client(app)
|
||||
ws = await client.ws_connect(EVENTS_ENDPOINT)
|
||||
|
||||
await event_source.put('hack')
|
||||
await event_source.put('the')
|
||||
await event_source.put('planet!')
|
||||
await tx_source.put('hack')
|
||||
await tx_source.put('the')
|
||||
await tx_source.put('planet!')
|
||||
|
||||
result = await ws.receive()
|
||||
assert result.data == 'hack'
|
||||
@ -134,36 +220,7 @@ async def test_websocket_string_event(test_client, loop):
|
||||
result = await ws.receive()
|
||||
assert result.data == 'planet!'
|
||||
|
||||
await event_source.put(POISON_PILL)
|
||||
|
||||
|
||||
async def test_websocket_block_event( test_client, loop):
|
||||
from planetmint import events
|
||||
from planetmint.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
|
||||
from planetmint.transactions.common import crypto
|
||||
|
||||
user_priv, user_pub = crypto.generate_key_pair()
|
||||
tx = Create.generate([user_pub], [([user_pub], 1)])
|
||||
tx = tx.sign([user_priv])
|
||||
|
||||
event_source = asyncio.Queue(loop=loop)
|
||||
app = init_app(event_source, loop=loop)
|
||||
client = await test_client(app)
|
||||
ws = await client.ws_connect(EVENTS_ENDPOINT)
|
||||
block = {'height': 1, 'transactions': [tx]}
|
||||
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
|
||||
|
||||
await event_source.put(block_event)
|
||||
|
||||
for tx in block['transactions']:
|
||||
result = await ws.receive()
|
||||
json_result = json.loads(result.data)
|
||||
assert json_result['transaction_id'] == tx.id
|
||||
# Since the transactions are all CREATEs, asset id == transaction id
|
||||
assert json_result['asset_id'] == tx.id
|
||||
assert json_result['height'] == block['height']
|
||||
|
||||
await event_source.put(POISON_PILL)
|
||||
await tx_source.put(POISON_PILL)
|
||||
|
||||
|
||||
@pytest.mark.skip('Processes are not stopping properly, and the whole test suite would hang')
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user