# Copyright © 2020 Interplanetary Database Association e.V., # Planetmint and IPDB software contributors. # SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) # Code is Apache-2.0 and docs are CC-BY-4.0 import asyncio import json import queue import threading import pytest # from unittest.mock import patch from transactions.types.assets.create import Create from transactions.types.assets.transfer import Transfer from transactions.common import crypto from planetmint.ipc import events from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT, EVENTS_ENDPOINT_BLOCKS from ipld import multihash, marshal class MockWebSocket: def __init__(self): self.received = [] def send_str(self, s): self.received.append(s) def test_eventify_block_works_with_any_transaction(): from planetmint.web.websocket_dispatcher import Dispatcher from transactions.common.crypto import generate_key_pair alice = generate_key_pair() tx = Create.generate([alice.public_key], [([alice.public_key], 1)]).sign([alice.private_key]) tx_transfer = Transfer.generate(tx.to_inputs(), [([alice.public_key], 1)], asset_ids=[tx.id]).sign( [alice.private_key] ) block = {"height": 1, "transactions": [tx, tx_transfer]} expected_events = [ {"height": 1, "asset_ids": [tx.id], "transaction_id": tx.id}, {"height": 1, "asset_ids": [tx_transfer.assets[0]["id"]], "transaction_id": tx_transfer.id}, ] for event, expected in zip(Dispatcher.eventify_block(block), expected_events): assert event == expected def test_simplified_block_works(): from planetmint.web.websocket_dispatcher import Dispatcher from transactions.common.crypto import generate_key_pair alice = generate_key_pair() tx = Create.generate([alice.public_key], [([alice.public_key], 1)]).sign([alice.private_key]) tx_transfer = Transfer.generate(tx.to_inputs(), [([alice.public_key], 1)], asset_ids=[tx.id]).sign( [alice.private_key] ) block = { "height": 1, "hash": "27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09", "transactions": [tx, tx_transfer], } expected_event = { "height": 1, "hash": "27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09", "transaction_ids": [tx.id, tx_transfer.id], } blk_event = Dispatcher.simplified_block(block) assert blk_event == expected_event @pytest.mark.asyncio async def test_bridge_sync_async_queue(event_loop): from planetmint.web.websocket_server import _multiprocessing_to_asyncio sync_queue = queue.Queue() async_queue = asyncio.Queue(loop=event_loop) async_queue2 = asyncio.Queue(loop=event_loop) bridge = threading.Thread( target=_multiprocessing_to_asyncio, args=(sync_queue, async_queue, async_queue2, event_loop), daemon=True ) bridge.start() sync_queue.put("fahren") sync_queue.put("auf") sync_queue.put("der") sync_queue.put("Autobahn") result = await async_queue.get() assert result == "fahren" result = await async_queue.get() assert result == "auf" result = await async_queue.get() assert result == "der" result = await async_queue.get() assert result == "Autobahn" print(f" queue ({async_queue.qsize()}): {async_queue} ") assert async_queue.qsize() == 0 # TODO: fix the test and uncomment it # @patch('threading.Thread') # @patch('aiohttp.web.run_app') # @patch('planetmint.web.websocket_server.init_app') # @patch('asyncio.get_event_loop', return_value='event-loop') # @patch('asyncio.Queue', return_value='event-queue') # def test_start_creates_an_event_loop(queue_mock, get_event_loop_mock, # init_app_mock, run_app_mock, # thread_mock): # from planetmint import config # from planetmint.web.websocket_server import start, _multiprocessing_to_asyncio # # start(None) # #thread_mock.assert_called_once_with( # # target=_multiprocessing_to_asyncio, # # args=(None, queue_mock.return_value, queue_mock.return_value, get_event_loop_mock.return_value), # # daemon=True, # #) # thread_mock.return_value.start.assert_called_once_with() # init_app_mock.assert_called_with('event-queue', 'event-queue', loop='event-loop') # run_app_mock.assert_called_once_with( # init_app_mock.return_value, # host=config['wsserver']['host'], # port=config['wsserver']['port'], # ) @pytest.mark.asyncio async def test_websocket_block_event(aiohttp_client, event_loop): user_priv, user_pub = crypto.generate_key_pair() tx = Create.generate([user_pub], [([user_pub], 1)]) tx = tx.sign([user_priv]) blk_source = asyncio.Queue(loop=event_loop) tx_source = asyncio.Queue(loop=event_loop) app = init_app(tx_source, blk_source, loop=event_loop) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT_BLOCKS) block = { "height": 1, "hash": "27E2D48AFA5E4B7FF26AA9C84B5CFCA2A670DBD297740053C0D177EB18962B09", "transactions": [tx], } block_event = events.Event(events.EventTypes.BLOCK_VALID, block) await blk_source.put(block_event) result = await ws.receive() json_result = json.loads(result.data) assert json_result["height"] == block["height"] assert json_result["hash"] == block["hash"] assert len(json_result["transaction_ids"]) == 1 assert json_result["transaction_ids"][0] == tx.id await blk_source.put(events.POISON_PILL) @pytest.mark.asyncio async def test_websocket_transaction_event(aiohttp_client, event_loop): user_priv, user_pub = crypto.generate_key_pair() tx = Create.generate([user_pub], [([user_pub], 1)]) tx = tx.sign([user_priv]) blk_source = asyncio.Queue(loop=event_loop) tx_source = asyncio.Queue(loop=event_loop) app = init_app(tx_source, blk_source, loop=event_loop) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT) block = {"height": 1, "transactions": [tx]} block_event = events.Event(events.EventTypes.BLOCK_VALID, block) await tx_source.put(block_event) for tx in block["transactions"]: result = await ws.receive() json_result = json.loads(result.data) assert json_result["transaction_id"] == tx.id # Since the transactions are all CREATEs, asset id == transaction id assert json_result["asset_ids"] == [tx.id] assert json_result["height"] == block["height"] await tx_source.put(events.POISON_PILL) @pytest.mark.asyncio async def test_websocket_string_event(aiohttp_client, event_loop): from planetmint.ipc.events import POISON_PILL from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT blk_source = asyncio.Queue(loop=event_loop) tx_source = asyncio.Queue(loop=event_loop) app = init_app(tx_source, blk_source, loop=event_loop) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT) await tx_source.put("hack") await tx_source.put("the") await tx_source.put("planet!") result = await ws.receive() assert result.data == "hack" result = await ws.receive() assert result.data == "the" result = await ws.receive() assert result.data == "planet!" await tx_source.put(POISON_PILL) @pytest.mark.skip("Processes are not stopping properly, and the whole test suite would hang") def test_integration_from_webapi_to_websocket(monkeypatch, client, loop): # XXX: I think that the `pytest-aiohttp` plugin is sparkling too much # magic in the `asyncio` module: running this test without monkey-patching # `asycio.get_event_loop` (and without the `loop` fixture) raises a: # RuntimeError: There is no current event loop in thread 'MainThread'. # # That's pretty weird because this test doesn't use the pytest-aiohttp # plugin explicitely. monkeypatch.setattr("asyncio.get_event_loop", lambda: loop) import json import random import aiohttp # TODO processes does not exist anymore, when reactivating this test it # will fail because of this from planetmint import processes # Start Planetmint processes.start() loop = asyncio.get_event_loop() import time time.sleep(1) ws_url = client.get("http://localhost:9984/api/v1/").json["_links"]["streams_v1"] # Connect to the WebSocket endpoint session = aiohttp.ClientSession() ws = loop.run_until_complete(session.ws_connect(ws_url)) # Create a keypair and generate a new asset user_priv, user_pub = crypto.generate_key_pair() assets = [{"data": multihash(marshal({"random": random.random()}))}] tx = Create.generate([user_pub], [([user_pub], 1)], assets=assets) tx = tx.sign([user_priv]) # Post the transaction to the Planetmint Web API client.post("/api/v1/transactions/", data=json.dumps(tx.to_dict())) result = loop.run_until_complete(ws.receive()) json_result = json.loads(result.data) assert json_result["transaction_id"] == tx.id