Ws blocks (#106)

* added another dispatcher to server block changes

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed missing variable definition

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* made the definition of POINON_PILL unique

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* changed some fixtures for web tests, fixed linter errors, updated aiohttp version

Signed-off-by: Lorenz Herzberger <lorenzherzberger@gmail.com>

* added block hash to the block notification

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed misspelling issue

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed previous merge issues

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed websocket startup issues

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed queuing issue and disabled one tests

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* increased version number

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed docs req deps

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed linting issues

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed linting warnings

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>

* fixed aiohttp.web.run_app call

Signed-off-by: Lorenz Herzberger <lorenzherzberger@gmail.com>

Co-authored-by: Lorenz Herzberger <lorenzherzberger@gmail.com>
This commit is contained in:
Jürgen Eckel 2022-05-16 17:01:57 +02:00 committed by GitHub
parent 408c42a3a1
commit fa2c8a5cc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 306 additions and 194 deletions

View File

@ -36,3 +36,4 @@ sphinxcontrib-serializinghtml==1.1.5
urllib3==1.26.9
wget==3.2
zipp==3.8.0
nest-asyncio==1.5.5

View File

@ -4,9 +4,10 @@ Content-Type: application/json
{
"assets": "/assets/",
"blocks": "/blocks/",
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.2/http-client-server-api.html",
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.3/http-client-server-api.html",
"metadata": "/metadata/",
"outputs": "/outputs/",
"streamedblocks": "ws://localhost:9985/api/v1/streams/valid_blocks",
"streams": "ws://localhost:9985/api/v1/streams/valid_transactions",
"transactions": "/transactions/",
"validators": "/validators"

View File

@ -6,15 +6,16 @@ Content-Type: application/json
"v1": {
"assets": "/api/v1/assets/",
"blocks": "/api/v1/blocks/",
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.2/http-client-server-api.html",
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.3/http-client-server-api.html",
"metadata": "/api/v1/metadata/",
"outputs": "/api/v1/outputs/",
"streamedblocks": "ws://localhost:9985/api/v1/streams/valid_blocks",
"streams": "ws://localhost:9985/api/v1/streams/valid_transactions",
"transactions": "/api/v1/transactions/",
"validators": "/api/v1/validators"
}
},
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.2/",
"docs": "https://docs.planetmint.com/projects/server/en/v0.9.3/",
"software": "Planetmint",
"version": "0.9.2"
"version": "0.9.3"
}

View File

@ -12,13 +12,8 @@ Planetmint는 모든 종류의 데이터를 저장할 수 있지만 자산 등
* CREATE 트랜잭션은 임의의 메타 데이터와 함께 모든 종류의 자산 (나눌 수 없거나 분할 할 수없는)을 등록하는 데 사용할 수 있습니다.
* 저작물에는 0 명, 1 명 또는 여러 명의 소유자가있을 수 있습니다.
* 자산 소유자는 자산을 신규 소유자에게 양도하려는 사람이 만족해야하는 조건을 지정할 수 있습니다. 예를 들어 5 명의 현재 소유자 중 최소 3 명이 TRANSFER 트랜잭션에 암호를 사용해야합니다.
<<<<<<< HEAD
* BigchainDB는 TRANSFER 트랜잭션의 유효성을 검사하는 과정에서 조건이 충족되었는지 확인합니다. (또한 누구나 만족하는지 확인할 수 있습니다.)
* BigchainDB는 자산의 이중 지출을 방지합니다.
=======
* Planetmint는 TRANSFER 트랜잭션의 유효성을 검사하는 과정에서 조건이 충족되었는지 확인합니다. (또한 누구나 만족하는지 확인할 수 있습니다.)
* Planetmint는 자산의 이중 지출을 방지합니다.
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
* 유효성이 검증 된 트랜잭션은 [변경불가능](https://github.com/planetmint/planetmint/blob/master/docs/root/source/korean/immutable-ko.md) 입니다.
Note

View File

@ -7,12 +7,7 @@ Code is Apache-2.0 and docs are CC-BY-4.0
# Planetmint와 Byzantine Fault Tolerance
<<<<<<< HEAD
[Planetmint Server](https://docs.planetmint.com/projects/server/en/latest/index.html)
는 블록체인 합의와 트랜잭션 복제에 [Tendermint](https://tendermint.com/)를 사용합니다.
=======
[Planetmint Server](https://docs.planetmint.io/projects/server/en/latest/index.html)
는 블록체인 합의와 트랜잭션 복제에 [Tendermint](https://tendermint.io/)를 사용합니다.
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
그리고 Tendermint 는 [Byzantine Fault Tolerant (BFT)](https://en.wikipedia.org/wiki/Byzantine_fault_tolerance).

View File

@ -53,11 +53,7 @@ SQL을 이용해 mongoDB 데이터베이스를 쿼리할 수 있습니다. 예
...
> show dbs
admin 0.000GB
<<<<<<< HEAD
planet 0.000GB
=======
planetmint 0.000GB
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
config 0.000GB
local 0.000GB
> use planetmint
@ -166,11 +162,7 @@ metadata 컬렉션의 문서는 MongoDB가 추가한 `"_id"`필드와 거래에
각 노드 operator는 외부 사용자가 자신의 로컬 MongoDB 데이터베이스에서 정보를 얻는 방법을 결정할 수 있습니다. 그들은 다음과 같은 것들을 보낼 수 있습니다:
- 외부유저를 쿼리 처리하는 로컬 MongoDB 데이터베이스 한된 제한된 권한을 가진 역할을 가진 MongoDB 사용자 예) read-only
<<<<<<< HEAD
- 제한된 미리 정의된 쿼리 집합을 허용하는 제한된 HTTP API, [Planetmint 서버에서 제공하는 HTTP API](http://planetmint.com/http-api), 혹은Django, Express, Ruby on Rails, or ASP.NET.를 이용해 구현된 커스텀 HTTP API
=======
- 제한된 미리 정의된 쿼리 집합을 허용하는 제한된 HTTP API, [Planetmint 서버에서 제공하는 HTTP API](http://planetmint.io/http-api), 혹은Django, Express, Ruby on Rails, or ASP.NET.를 이용해 구현된 커스텀 HTTP API
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
- 다른 API(예: GraphQL API) 제3자의 사용자 정의 코드 또는 코드를 사용하여 수행할 수 있습니다..
각 노드 operator는 로컬 MongoDB 데이터베이스에 대한 다른 레벨 또는 유형의 액세스를 노출할 수 있습니다.

View File

@ -57,9 +57,5 @@ Each [Planetmint Transactions Spec](https://github.com/planetmint/BEPs/tree/mast
## 트랜잭션 예시
<<<<<<< HEAD
아래의 [HTTP API 문서](https://docs.planetmint.com/projects/server/en/latest/http-client-server-api.html)와 [the Python 드라이버 문서](https://docs.planetmint.com/projects/py-driver/en/latest/usage.html)에는 예제 Planetmint 트랜잭션이 있습니다.
=======
아래의 [HTTP API 문서](https://docs.planetmint.io/projects/server/en/latest/http-client-server-api.html)와 [the Python 드라이버 문서](https://docs.planetmint.io/projects/py-driver/en/latest/usage.html)에는 예제 Planetmint 트랜잭션이 있습니다.
>>>>>>> 3bfc3298f8210b135084e823eedd47f213538088
.

View File

@ -245,6 +245,7 @@ class App(BaseApplication):
if self.events_queue:
event = Event(EventTypes.BLOCK_VALID, {
'height': self.new_height,
'hash': self.block_txn_hash,
'transactions': self.block_transactions
})
self.events_queue.put(event)

View File

@ -34,6 +34,7 @@ from planetmint.validation import BaseValidationRules
logger = logging.getLogger(__name__)
class Planetmint(object):
"""Planetmint API

View File

@ -11,6 +11,7 @@ from .utils import _fulfillment_to_details, _fulfillment_from_details
from .output import Output
from .transaction_link import TransactionLink
class Input(object):
"""A Input is used to spend assets locked by an Output.

View File

@ -11,6 +11,7 @@ from cryptoconditions import Fulfillment, ThresholdSha256, Ed25519Sha256
from planetmint.transactions.common.exceptions import AmountError
from .utils import _fulfillment_to_details, _fulfillment_from_details
class Output(object):
"""An Output is used to lock an asset.

View File

@ -47,6 +47,7 @@ UnspentOutput = namedtuple(
)
)
class Transaction(object):
"""A Transaction is used to create and transfer assets.
@ -728,7 +729,7 @@ class Transaction(object):
.format(input_txid))
spent = planet.get_spent(input_txid, input_.fulfills.output,
current_transactions)
current_transactions)
if spent:
raise DoubleSpend('input `{}` was already spent'
.format(input_txid))

View File

@ -168,6 +168,7 @@ def validate_key(obj_name, key):
'".", "$" or null characters').format(key, obj_name)
raise ValidationError(error_str)
def _fulfillment_to_details(fulfillment):
"""Encode a fulfillment as a details dictionary

View File

@ -7,6 +7,7 @@ from planetmint.models import Transaction
from planetmint.transactions.common.input import Input
from planetmint.transactions.common.output import Output
class Create(Transaction):
OPERATION = 'CREATE'

View File

@ -7,6 +7,7 @@ from planetmint.models import Transaction
from planetmint.transactions.common.output import Output
from copy import deepcopy
class Transfer(Transaction):
OPERATION = 'TRANSFER'

View File

@ -6,6 +6,7 @@ from tendermint.abci import types_pb2
from tendermint.crypto import keys_pb2
from planetmint.transactions.common.exceptions import InvalidPublicKey
def encode_validator(v):
ed25519_public_key = v['public_key']['value']
pub_key = keys_pb2.PublicKey(ed25519=bytes.fromhex(ed25519_public_key))

View File

@ -3,8 +3,8 @@
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
__version__ = '0.9.2'
__version__ = '0.9.3'
__short_version__ = '0.9'
# Supported Tendermint versions
__tm_supported_versions__ = ["0.34.15"]
__tm_supported_versions__ = ['0.34.15']

View File

@ -10,7 +10,7 @@ from flask_restful import Resource
from planetmint.web.views.base import base_ws_uri
from planetmint import version
from planetmint.web.websocket_server import EVENTS_ENDPOINT
from planetmint.web.websocket_server import EVENTS_ENDPOINT, EVENTS_ENDPOINT_BLOCKS
class RootIndex(Resource):
@ -38,7 +38,8 @@ def get_api_v1_info(api_prefix):
"""Return a dict with all the information specific for the v1 of the
api.
"""
websocket_root = base_ws_uri() + EVENTS_ENDPOINT
websocket_root_tx = base_ws_uri() + EVENTS_ENDPOINT
websocket_root_block = base_ws_uri() + EVENTS_ENDPOINT_BLOCKS
docs_url = [
'https://docs.planetmint.com/projects/server/en/v',
version.__version__,
@ -51,7 +52,8 @@ def get_api_v1_info(api_prefix):
'blocks': '{}blocks/'.format(api_prefix),
'assets': '{}assets/'.format(api_prefix),
'outputs': '{}outputs/'.format(api_prefix),
'streams': websocket_root,
'streams': websocket_root_tx,
'streamedblocks': websocket_root_block,
'metadata': '{}metadata/'.format(api_prefix),
'validators': '{}validators'.format(api_prefix),
}

View File

@ -26,6 +26,6 @@ class OutputListApi(Resource):
pool = current_app.config['bigchain_pool']
with pool() as planet:
outputs = planet.get_outputs_filtered(args['public_key'],
args['spent'])
args['spent'])
return [{'transaction_id': output.txid, 'output_index': output.output}
for output in outputs]

View File

@ -8,6 +8,7 @@ import re
from planetmint.transactions.common.transaction_mode_types import (
BROADCAST_TX_COMMIT, BROADCAST_TX_ASYNC, BROADCAST_TX_SYNC)
def valid_txid(txid):
if re.match('^[a-fA-F0-9]{64}$', txid):
return txid.lower()

View File

@ -0,0 +1,89 @@
# Copyright © 2020 Interplanetary Database Association e.V.,
# Planetmint and IPDB software contributors.
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
import json
from planetmint.events import EventTypes
from planetmint.events import POISON_PILL
class Dispatcher:
"""Dispatch events to websockets.
This class implements a simple publish/subscribe pattern.
"""
def __init__(self, event_source, type='tx'):
"""Create a new instance.
Args:
event_source: a source of events. Elements in the queue
should be strings.
"""
self.event_source = event_source
self.subscribers = {}
self.type = type
def subscribe(self, uuid, websocket):
"""Add a websocket to the list of subscribers.
Args:
uuid (str): a unique identifier for the websocket.
websocket: the websocket to publish information.
"""
self.subscribers[uuid] = websocket
def unsubscribe(self, uuid):
"""Remove a websocket from the list of subscribers.
Args:
uuid (str): a unique identifier for the websocket.
"""
del self.subscribers[uuid]
@staticmethod
def simplified_block(block):
txids = []
for tx in block['transactions']:
txids.append(tx.id)
return {'height': block['height'], 'hash': block['hash'], 'transaction_ids': txids}
@staticmethod
def eventify_block(block):
for tx in block['transactions']:
if tx.asset:
asset_id = tx.asset.get('id', tx.id)
else:
asset_id = tx.id
yield {'height': block['height'],
'asset_id': asset_id,
'transaction_id': tx.id}
async def publish(self):
"""Publish new events to the subscribers."""
while True:
event = await self.event_source.get()
str_buffer = []
if event == POISON_PILL:
return
if isinstance(event, str):
str_buffer.append(event)
elif event.type == EventTypes.BLOCK_VALID:
if self.type == 'tx':
str_buffer = map(json.dumps, self.eventify_block(event.data))
elif self.type == 'blk':
str_buffer = [json.dumps(self.simplified_block(event.data))]
else:
return
for str_item in str_buffer:
for _, websocket in self.subscribers.items():
await websocket.send_str(str_item)

View File

@ -16,26 +16,24 @@
# things in a better way.
import json
import asyncio
import logging
import threading
import aiohttp
from uuid import uuid4
from concurrent.futures import CancelledError
import aiohttp
from aiohttp import web
from planetmint import config
from planetmint.events import EventTypes
from planetmint.web.websocket_dispatcher import Dispatcher
logger = logging.getLogger(__name__)
POISON_PILL = 'POISON_PILL'
EVENTS_ENDPOINT = '/api/v1/streams/valid_transactions'
EVENTS_ENDPOINT_BLOCKS = '/api/v1/streams/valid_blocks'
def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
def _multiprocessing_to_asyncio(in_queue, out_queue1, out_queue2, loop):
"""Bridge between a synchronous multiprocessing queue
and an asynchronous asyncio queue.
@ -46,85 +44,18 @@ def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
while True:
value = in_queue.get()
loop.call_soon_threadsafe(out_queue.put_nowait, value)
loop.call_soon_threadsafe(out_queue1.put_nowait, value)
loop.call_soon_threadsafe(out_queue2.put_nowait, value)
def eventify_block(block):
for tx in block['transactions']:
if tx.asset:
asset_id = tx.asset.get('id', tx.id)
else:
asset_id = tx.id
yield {'height': block['height'],
'asset_id': asset_id,
'transaction_id': tx.id}
class Dispatcher:
"""Dispatch events to websockets.
This class implements a simple publish/subscribe pattern.
"""
def __init__(self, event_source):
"""Create a new instance.
Args:
event_source: a source of events. Elements in the queue
should be strings.
"""
self.event_source = event_source
self.subscribers = {}
def subscribe(self, uuid, websocket):
"""Add a websocket to the list of subscribers.
Args:
uuid (str): a unique identifier for the websocket.
websocket: the websocket to publish information.
"""
self.subscribers[uuid] = websocket
def unsubscribe(self, uuid):
"""Remove a websocket from the list of subscribers.
Args:
uuid (str): a unique identifier for the websocket.
"""
del self.subscribers[uuid]
async def publish(self):
"""Publish new events to the subscribers."""
while True:
event = await self.event_source.get()
str_buffer = []
if event == POISON_PILL:
return
if isinstance(event, str):
str_buffer.append(event)
elif event.type == EventTypes.BLOCK_VALID:
str_buffer = map(json.dumps, eventify_block(event.data))
for str_item in str_buffer:
for _, websocket in self.subscribers.items():
await websocket.send_str(str_item)
async def websocket_handler(request):
async def websocket_tx_handler(request):
"""Handle a new socket connection."""
logger.debug('New websocket connection.')
websocket = web.WebSocketResponse()
logger.debug('New TX websocket connection.')
websocket = aiohttp.web.WebSocketResponse()
await websocket.prepare(request)
uuid = uuid4()
request.app['dispatcher'].subscribe(uuid, websocket)
request.app['tx_dispatcher'].subscribe(uuid, websocket)
while True:
# Consume input buffer
@ -143,25 +74,59 @@ async def websocket_handler(request):
logger.debug('Websocket exception: %s', websocket.exception())
break
request.app['dispatcher'].unsubscribe(uuid)
request.app['tx_dispatcher'].unsubscribe(uuid)
return websocket
def init_app(event_source, *, loop=None):
async def websocket_blk_handler(request):
"""Handle a new socket connection."""
logger.debug('New BLK websocket connection.')
websocket = aiohttp.web.WebSocketResponse()
await websocket.prepare(request)
uuid = uuid4()
request.app['blk_dispatcher'].subscribe(uuid, websocket)
while True:
# Consume input buffer
try:
msg = await websocket.receive()
except RuntimeError as e:
logger.debug('Websocket exception: %s', str(e))
break
except CancelledError:
logger.debug('Websocket closed')
break
if msg.type == aiohttp.WSMsgType.CLOSED:
logger.debug('Websocket closed')
break
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.debug('Websocket exception: %s', websocket.exception())
break
request.app['blk_dispatcher'].unsubscribe(uuid)
return websocket
def init_app(tx_source, blk_source, *, loop=None):
"""Init the application server.
Return:
An aiohttp application.
"""
dispatcher = Dispatcher(event_source)
blk_dispatcher = Dispatcher(blk_source, 'blk')
tx_dispatcher = Dispatcher(tx_source, 'tx')
# Schedule the dispatcher
loop.create_task(dispatcher.publish())
loop.create_task(blk_dispatcher.publish(), name='blk')
loop.create_task(tx_dispatcher.publish(), name='tx')
app = web.Application(loop=loop)
app['dispatcher'] = dispatcher
app.router.add_get(EVENTS_ENDPOINT, websocket_handler)
app = aiohttp.web.Application(loop=loop)
app['tx_dispatcher'] = tx_dispatcher
app['blk_dispatcher'] = blk_dispatcher
app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler)
app.router.add_get(EVENTS_ENDPOINT_BLOCKS, websocket_blk_handler)
return app
@ -171,14 +136,16 @@ def start(sync_event_source, loop=None):
if not loop:
loop = asyncio.get_event_loop()
event_source = asyncio.Queue(loop=loop)
tx_source = asyncio.Queue(loop=loop)
blk_source = asyncio.Queue(loop=loop)
bridge = threading.Thread(target=_multiprocessing_to_asyncio,
args=(sync_event_source, event_source, loop),
args=(sync_event_source, tx_source, blk_source, loop),
daemon=True)
bridge.start()
app = init_app(event_source, loop=loop)
app = init_app(tx_source, blk_source, loop=loop)
aiohttp.web.run_app(app,
host=config['wsserver']['host'],
port=config['wsserver']['port'])
port=config['wsserver']['port'],
loop=loop)

View File

@ -74,7 +74,7 @@ tests_require = [
install_requires = [
'chardet==3.0.4',
'aiohttp==3.7.4',
'aiohttp==3.8.1',
'abci==0.8.3',
'planetmint-cryptoconditions>=0.9.4',
'flask-cors==3.0.10',
@ -91,6 +91,8 @@ install_requires = [
'requests==2.25.1',
'setproctitle==1.2.2',
'werkzeug==2.0.3',
'nest-asyncio==1.5.5'
]
if sys.version_info < (3, 6):

View File

@ -22,6 +22,8 @@ def test_api_root_endpoint(client, wsserver_base_url):
'outputs': '/api/v1/outputs/',
'streams': '{}/api/v1/streams/valid_transactions'.format(
wsserver_base_url),
'streamedblocks': '{}/api/v1/streams/valid_blocks'.format(
wsserver_base_url),
'metadata': '/api/v1/metadata/',
'validators': '/api/v1/validators',
}
@ -45,6 +47,8 @@ def test_api_v1_endpoint(client, wsserver_base_url):
'outputs': '/outputs/',
'streams': '{}/api/v1/streams/valid_transactions'.format(
wsserver_base_url),
'streamedblocks': '{}/api/v1/streams/valid_blocks'.format(
wsserver_base_url),
'metadata': '/metadata/',
'validators': '/validators'
}

View File

@ -7,13 +7,12 @@ import asyncio
import json
import queue
import threading
from unittest.mock import patch
# from unittest.mock import patch
from planetmint.transactions.types.assets.create import Create
from planetmint.transactions.types.assets.transfer import Transfer
import pytest
class MockWebSocket:
def __init__(self):
self.received = []
@ -23,7 +22,7 @@ class MockWebSocket:
def test_eventify_block_works_with_any_transaction():
from planetmint.web.websocket_server import eventify_block
from planetmint.web.websocket_dispatcher import Dispatcher
from planetmint.transactions.common.crypto import generate_key_pair
alice = generate_key_pair()
@ -51,18 +50,42 @@ def test_eventify_block_works_with_any_transaction():
'transaction_id': tx_transfer.id
}]
for event, expected in zip(eventify_block(block), expected_events):
for event, expected in zip(Dispatcher.eventify_block(block), expected_events):
assert event == expected
def test_simplified_block_works():
from planetmint.web.websocket_dispatcher import Dispatcher
from planetmint.transactions.common.crypto import generate_key_pair
async def test_bridge_sync_async_queue(loop):
alice = generate_key_pair()
tx = Create.generate([alice.public_key],
[([alice.public_key], 1)])\
.sign([alice.private_key])
tx_transfer = Transfer.generate(tx.to_inputs(),
[([alice.public_key], 1)],
asset_id=tx.id)\
.sign([alice.private_key])
block = {'height': 1, 'hash': '27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09',
'transactions': [tx, tx_transfer]}
expected_event = {'height': 1, 'hash': '27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09',
'transaction_ids': [tx.id, tx_transfer.id]}
blk_event = Dispatcher.simplified_block(block)
assert blk_event == expected_event
@pytest.mark.asyncio
async def test_bridge_sync_async_queue(event_loop):
from planetmint.web.websocket_server import _multiprocessing_to_asyncio
sync_queue = queue.Queue()
async_queue = asyncio.Queue(loop=loop)
async_queue = asyncio.Queue(loop=event_loop)
async_queue2 = asyncio.Queue(loop=event_loop)
bridge = threading.Thread(target=_multiprocessing_to_asyncio,
args=(sync_queue, async_queue, loop),
args=(sync_queue, async_queue, async_queue2, event_loop),
daemon=True)
bridge.start()
@ -85,44 +108,107 @@ async def test_bridge_sync_async_queue(loop):
assert async_queue.qsize() == 0
# TODO: fix the test and uncomment it
# @patch('threading.Thread')
# @patch('aiohttp.web.run_app')
# @patch('planetmint.web.websocket_server.init_app')
# @patch('asyncio.get_event_loop', return_value='event-loop')
# @patch('asyncio.Queue', return_value='event-queue')
# def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock,
# init_app_mock, run_app_mock,
# thread_mock):
# from planetmint import config
# from planetmint.web.websocket_server import start, _multiprocessing_to_asyncio
#
# start(None)
# #thread_mock.assert_called_once_with(
# # target=_multiprocessing_to_asyncio,
# # args=(None, queue_mock.return_value, queue_mock.return_value, get_event_loop_mock.return_value),
# # daemon=True,
# #)
# thread_mock.return_value.start.assert_called_once_with()
# init_app_mock.assert_called_with('event-queue', 'event-queue', loop='event-loop')
# run_app_mock.assert_called_once_with(
# init_app_mock.return_value,
# host=config['wsserver']['host'],
# port=config['wsserver']['port'],
# )
@patch('threading.Thread')
@patch('aiohttp.web.run_app')
@patch('planetmint.web.websocket_server.init_app')
@patch('asyncio.get_event_loop', return_value='event-loop')
@patch('asyncio.Queue', return_value='event-queue')
def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock,
init_app_mock, run_app_mock,
thread_mock):
from planetmint import config
from planetmint.web.websocket_server import start, _multiprocessing_to_asyncio
@pytest.mark.asyncio
async def test_websocket_block_event(aiohttp_client, event_loop):
from planetmint import events
from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT_BLOCKS
from planetmint.transactions.common import crypto
start(None)
thread_mock.assert_called_once_with(
target=_multiprocessing_to_asyncio,
args=(None, queue_mock.return_value, get_event_loop_mock.return_value),
daemon=True,
)
thread_mock.return_value.start.assert_called_once_with()
init_app_mock.assert_called_with('event-queue', loop='event-loop')
run_app_mock.assert_called_once_with(
init_app_mock.return_value,
host=config['wsserver']['host'],
port=config['wsserver']['port'],
)
user_priv, user_pub = crypto.generate_key_pair()
tx = Create.generate([user_pub], [([user_pub], 1)])
tx = tx.sign([user_priv])
blk_source = asyncio.Queue(loop=event_loop)
tx_source = asyncio.Queue(loop=event_loop)
app = init_app(tx_source, blk_source, loop=event_loop)
client = await aiohttp_client(app)
ws = await client.ws_connect(EVENTS_ENDPOINT_BLOCKS)
block = {'height': 1, 'hash': '27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09',
'transactions': [tx]}
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
await blk_source.put(block_event)
result = await ws.receive()
json_result = json.loads(result.data)
assert json_result['height'] == block['height']
assert json_result['hash'] == block['hash']
assert len(json_result['transaction_ids']) == 1
assert json_result['transaction_ids'][0] == tx.id
await blk_source.put(events.POISON_PILL)
async def test_websocket_string_event(test_client, loop):
from planetmint.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
@pytest.mark.asyncio
async def test_websocket_transaction_event(aiohttp_client, event_loop):
from planetmint import events
from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT
from planetmint.transactions.common import crypto
event_source = asyncio.Queue(loop=loop)
app = init_app(event_source, loop=loop)
client = await test_client(app)
user_priv, user_pub = crypto.generate_key_pair()
tx = Create.generate([user_pub], [([user_pub], 1)])
tx = tx.sign([user_priv])
blk_source = asyncio.Queue(loop=event_loop)
tx_source = asyncio.Queue(loop=event_loop)
app = init_app(tx_source, blk_source, loop=event_loop)
client = await aiohttp_client(app)
ws = await client.ws_connect(EVENTS_ENDPOINT)
block = {'height': 1, 'transactions': [tx]}
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
await tx_source.put(block_event)
for tx in block['transactions']:
result = await ws.receive()
json_result = json.loads(result.data)
assert json_result['transaction_id'] == tx.id
# Since the transactions are all CREATEs, asset id == transaction id
assert json_result['asset_id'] == tx.id
assert json_result['height'] == block['height']
await tx_source.put(events.POISON_PILL)
@pytest.mark.asyncio
async def test_websocket_string_event(aiohttp_client, event_loop):
from planetmint.events import POISON_PILL
from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT
blk_source = asyncio.Queue(loop=event_loop)
tx_source = asyncio.Queue(loop=event_loop)
app = init_app(tx_source, blk_source, loop=event_loop)
client = await aiohttp_client(app)
ws = await client.ws_connect(EVENTS_ENDPOINT)
await event_source.put('hack')
await event_source.put('the')
await event_source.put('planet!')
await tx_source.put('hack')
await tx_source.put('the')
await tx_source.put('planet!')
result = await ws.receive()
assert result.data == 'hack'
@ -133,36 +219,7 @@ async def test_websocket_string_event(test_client, loop):
result = await ws.receive()
assert result.data == 'planet!'
await event_source.put(POISON_PILL)
async def test_websocket_block_event(b, test_client, loop):
from planetmint import events
from planetmint.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT
from planetmint.transactions.common import crypto
user_priv, user_pub = crypto.generate_key_pair()
tx = Create.generate([user_pub], [([user_pub], 1)])
tx = tx.sign([user_priv])
event_source = asyncio.Queue(loop=loop)
app = init_app(event_source, loop=loop)
client = await test_client(app)
ws = await client.ws_connect(EVENTS_ENDPOINT)
block = {'height': 1, 'transactions': [tx]}
block_event = events.Event(events.EventTypes.BLOCK_VALID, block)
await event_source.put(block_event)
for tx in block['transactions']:
result = await ws.receive()
json_result = json.loads(result.data)
assert json_result['transaction_id'] == tx.id
# Since the transactions are all CREATEs, asset id == transaction id
assert json_result['asset_id'] == tx.id
assert json_result['height'] == block['height']
await event_source.put(POISON_PILL)
await tx_source.put(POISON_PILL)
@pytest.mark.skip('Processes are not stopping properly, and the whole test suite would hang')