diff --git a/planetmint/web/websocket_server.py b/planetmint/web/websocket_server.py index 0b3c5ac..f43b814 100644 --- a/planetmint/web/websocket_server.py +++ b/planetmint/web/websocket_server.py @@ -31,6 +31,8 @@ EVENTS_ENDPOINT_BLOCKS = "/api/v1/streams/valid_blocks" async def access_queue(app): + if app["event_source"] == None: + return in_queue = app["event_source"] tx_source = Dispatcher.get_queue_on_demand(app, "tx_source") blk_source = Dispatcher.get_queue_on_demand(app, "blk_source") diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index f2700af..f769ee1 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -16,6 +16,7 @@ from transactions.common import crypto from planetmint.ipc import events from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT, EVENTS_ENDPOINT_BLOCKS from ipld import multihash, marshal +from planetmint.web.websocket_dispatcher import Dispatcher class MockWebSocket: @@ -75,52 +76,13 @@ def test_simplified_block_works(): assert blk_event == expected_event -@pytest.mark.asyncio -async def test_bridge_sync_async_queue(): - from planetmint.web.websocket_server import _multiprocessing_to_asyncio - - sync_queue = queue.Queue() - async_queue = asyncio.Queue() - async_queue2 = asyncio.Queue() - - bridge = threading.Thread( - target=_multiprocessing_to_asyncio, - args=(sync_queue, async_queue, async_queue2, asyncio.get_event_loop()), - daemon=True, - ) - bridge.start() - - sync_queue.put("fahren") - sync_queue.put("auf") - sync_queue.put("der") - sync_queue.put("Autobahn") - - result = await async_queue.get() - assert result == "fahren" - - result = await async_queue.get() - assert result == "auf" - - result = await async_queue.get() - assert result == "der" - - result = await async_queue.get() - assert result == "Autobahn" - - print(f" queue ({async_queue.qsize()}): {async_queue} ") - assert async_queue.qsize() == 0 - - @pytest.mark.asyncio async def test_websocket_block_event(aiohttp_client): user_priv, user_pub = crypto.generate_key_pair() tx = Create.generate([user_pub], [([user_pub], 1)]) tx = tx.sign([user_priv]) - app = aiohttp.web.Application() - blk_source = asyncio.Queue() - tx_source = asyncio.Queue() - app = await init_app(app, tx_source, blk_source) + app = init_app(None) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT_BLOCKS) block = { @@ -129,7 +91,8 @@ async def test_websocket_block_event(aiohttp_client): "transactions": [tx], } block_event = events.Event(events.EventTypes.BLOCK_VALID, block) - + blk_source = Dispatcher.get_queue_on_demand(app, "blk_source") + tx_source = Dispatcher.get_queue_on_demand(app, "tx_source") await blk_source.put(block_event) result = await ws.receive() @@ -140,6 +103,7 @@ async def test_websocket_block_event(aiohttp_client): assert json_result["transaction_ids"][0] == tx.id await blk_source.put(events.POISON_PILL) + await tx_source.put(events.POISON_PILL) @pytest.mark.asyncio @@ -148,13 +112,12 @@ async def test_websocket_transaction_event(aiohttp_client): tx = Create.generate([user_pub], [([user_pub], 1)]) tx = tx.sign([user_priv]) - app = aiohttp.web.Application() - blk_source = asyncio.Queue() - tx_source = asyncio.Queue() - app = await init_app(app, tx_source, blk_source) + app = init_app(None) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT) block = {"height": 1, "transactions": [tx]} + blk_source = Dispatcher.get_queue_on_demand(app, "blk_source") + tx_source = Dispatcher.get_queue_on_demand(app, "tx_source") block_event = events.Event(events.EventTypes.BLOCK_VALID, block) await tx_source.put(block_event) @@ -167,6 +130,7 @@ async def test_websocket_transaction_event(aiohttp_client): assert json_result["asset_ids"] == [tx.id] assert json_result["height"] == block["height"] + await blk_source.put(events.POISON_PILL) await tx_source.put(events.POISON_PILL) @@ -175,13 +139,13 @@ async def test_websocket_string_event(aiohttp_client): from planetmint.ipc.events import POISON_PILL from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT - app = aiohttp.web.Application() - blk_source = asyncio.Queue() - tx_source = asyncio.Queue() - app = await init_app(app, tx_source, blk_source) + app = init_app(None) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT) + blk_source = Dispatcher.get_queue_on_demand(app, "blk_source") + tx_source = Dispatcher.get_queue_on_demand(app, "tx_source") + await tx_source.put("hack") await tx_source.put("the") await tx_source.put("planet!") @@ -195,7 +159,8 @@ async def test_websocket_string_event(aiohttp_client): result = await ws.receive() assert result.data == "planet!" - await tx_source.put(POISON_PILL) + await blk_source.put(events.POISON_PILL) + await tx_source.put(events.POISON_PILL) @pytest.mark.skip("Processes are not stopping properly, and the whole test suite would hang")