# Copyright © 2020 Interplanetary Database Association e.V., # Planetmint and IPDB software contributors. # SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) # Code is Apache-2.0 and docs are CC-BY-4.0 import aiohttp import asyncio import json import queue import threading import pytest # from unittest.mock import patch from transactions.types.assets.create import Create from transactions.types.assets.transfer import Transfer from transactions.common import crypto from planetmint.ipc import events from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT, EVENTS_ENDPOINT_BLOCKS from ipld import multihash, marshal class MockWebSocket: def __init__(self): self.received = [] def send_str(self, s): self.received.append(s) def test_eventify_block_works_with_any_transaction(): from planetmint.web.websocket_dispatcher import Dispatcher from transactions.common.crypto import generate_key_pair alice = generate_key_pair() tx = Create.generate([alice.public_key], [([alice.public_key], 1)]).sign([alice.private_key]) tx_transfer = Transfer.generate(tx.to_inputs(), [([alice.public_key], 1)], asset_ids=[tx.id]).sign( [alice.private_key] ) block = {"height": 1, "transactions": [tx, tx_transfer]} expected_events = [ {"height": 1, "asset_ids": [tx.id], "transaction_id": tx.id}, {"height": 1, "asset_ids": [tx_transfer.assets[0]["id"]], "transaction_id": tx_transfer.id}, ] for event, expected in zip(Dispatcher.eventify_block(block), expected_events): assert event == expected def test_simplified_block_works(): from planetmint.web.websocket_dispatcher import Dispatcher from transactions.common.crypto import generate_key_pair alice = generate_key_pair() tx = Create.generate([alice.public_key], [([alice.public_key], 1)]).sign([alice.private_key]) tx_transfer = Transfer.generate(tx.to_inputs(), [([alice.public_key], 1)], asset_ids=[tx.id]).sign( [alice.private_key] ) block = { "height": 1, "hash": "27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09", "transactions": [tx, tx_transfer], } expected_event = { "height": 1, "hash": "27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09", "transaction_ids": [tx.id, tx_transfer.id], } blk_event = Dispatcher.simplified_block(block) assert blk_event == expected_event @pytest.mark.asyncio async def test_bridge_sync_async_queue(): from planetmint.web.websocket_server import _multiprocessing_to_asyncio sync_queue = queue.Queue() async_queue = asyncio.Queue() async_queue2 = asyncio.Queue() bridge = threading.Thread( target=_multiprocessing_to_asyncio, args=(sync_queue, async_queue, async_queue2, asyncio.get_event_loop()), daemon=True, ) bridge.start() sync_queue.put("fahren") sync_queue.put("auf") sync_queue.put("der") sync_queue.put("Autobahn") result = await async_queue.get() assert result == "fahren" result = await async_queue.get() assert result == "auf" result = await async_queue.get() assert result == "der" result = await async_queue.get() assert result == "Autobahn" print(f" queue ({async_queue.qsize()}): {async_queue} ") assert async_queue.qsize() == 0 @pytest.mark.asyncio async def test_websocket_block_event(aiohttp_client): user_priv, user_pub = crypto.generate_key_pair() tx = Create.generate([user_pub], [([user_pub], 1)]) tx = tx.sign([user_priv]) app = aiohttp.web.Application() blk_source = asyncio.Queue() tx_source = asyncio.Queue() app = await init_app(app, tx_source, blk_source) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT_BLOCKS) block = { "height": 1, "hash": "27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09", "transactions": [tx], } block_event = events.Event(events.EventTypes.BLOCK_VALID, block) await blk_source.put(block_event) result = await ws.receive() json_result = json.loads(result.data) assert json_result["height"] == block["height"] assert json_result["hash"] == block["hash"] assert len(json_result["transaction_ids"]) == 1 assert json_result["transaction_ids"][0] == tx.id await blk_source.put(events.POISON_PILL) @pytest.mark.asyncio async def test_websocket_transaction_event(aiohttp_client): user_priv, user_pub = crypto.generate_key_pair() tx = Create.generate([user_pub], [([user_pub], 1)]) tx = tx.sign([user_priv]) app = aiohttp.web.Application() blk_source = asyncio.Queue() tx_source = asyncio.Queue() app = await init_app(app, tx_source, blk_source) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT) block = {"height": 1, "transactions": [tx]} block_event = events.Event(events.EventTypes.BLOCK_VALID, block) await tx_source.put(block_event) for tx in block["transactions"]: result = await ws.receive() json_result = json.loads(result.data) assert json_result["transaction_id"] == tx.id # Since the transactions are all CREATEs, asset id == transaction id assert json_result["asset_ids"] == [tx.id] assert json_result["height"] == block["height"] await tx_source.put(events.POISON_PILL) @pytest.mark.asyncio async def test_websocket_string_event(aiohttp_client): from planetmint.ipc.events import POISON_PILL from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT app = aiohttp.web.Application() blk_source = asyncio.Queue() tx_source = asyncio.Queue() app = await init_app(app, tx_source, blk_source) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT) await tx_source.put("hack") await tx_source.put("the") await tx_source.put("planet!") result = await ws.receive() assert result.data == "hack" result = await ws.receive() assert result.data == "the" result = await ws.receive() assert result.data == "planet!" await tx_source.put(POISON_PILL) @pytest.mark.skip("Processes are not stopping properly, and the whole test suite would hang") def test_integration_from_webapi_to_websocket(monkeypatch, client, loop): # XXX: I think that the `pytest-aiohttp` plugin is sparkling too much # magic in the `asyncio` module: running this test without monkey-patching # `asycio.get_event_loop` (and without the `loop` fixture) raises a: # RuntimeError: There is no current event loop in thread 'MainThread'. # # That's pretty weird because this test doesn't use the pytest-aiohttp # plugin explicitely. monkeypatch.setattr("asyncio.get_event_loop", lambda: loop) import json import random import aiohttp # TODO processes does not exist anymore, when reactivating this test it # will fail because of this from planetmint import processes # Start Planetmint processes.start() loop = asyncio.get_event_loop() import time time.sleep(1) ws_url = client.get("http://localhost:9984/api/v1/").json["_links"]["streams_v1"] # Connect to the WebSocket endpoint session = aiohttp.ClientSession() ws = loop.run_until_complete(session.ws_connect(ws_url)) # Create a keypair and generate a new asset user_priv, user_pub = crypto.generate_key_pair() assets = [{"data": multihash(marshal({"random": random.random()}))}] tx = Create.generate([user_pub], [([user_pub], 1)], assets=assets) tx = tx.sign([user_priv]) # Post the transaction to the Planetmint Web API client.post("/api/v1/transactions/", data=json.dumps(tx.to_dict())) result = loop.run_until_complete(ws.receive()) json_result = json.loads(result.data) assert json_result["transaction_id"] == tx.id