diff --git a/planetmint/web/websocket_server.py b/planetmint/web/websocket_server.py index 0aceff8..8890383 100644 --- a/planetmint/web/websocket_server.py +++ b/planetmint/web/websocket_server.py @@ -107,7 +107,7 @@ async def websocket_blk_handler(request): return websocket -def init_app(tx_source, blk_source, *, loop=None): +def init_app(tx_source, blk_source, loop=None): """Init the application server. Return: @@ -117,11 +117,10 @@ def init_app(tx_source, blk_source, *, loop=None): blk_dispatcher = Dispatcher(blk_source, "blk") tx_dispatcher = Dispatcher(tx_source, "tx") - # Schedule the dispatcher - loop.create_task(blk_dispatcher.publish(), name="blk") - loop.create_task(tx_dispatcher.publish(), name="tx") + asyncio.get_running_loop().create_task(blk_dispatcher.publish(), name="blk") + asyncio.get_running_loop().create_task(tx_dispatcher.publish(), name="tx") - app = aiohttp.web.Application(loop=loop) + app = aiohttp.web.Application() app["tx_dispatcher"] = tx_dispatcher app["blk_dispatcher"] = blk_dispatcher app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler) @@ -133,17 +132,17 @@ def start(sync_event_source, loop=None): """Create and start the WebSocket server.""" if not loop: - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() - tx_source = asyncio.Queue(loop=loop) - blk_source = asyncio.Queue(loop=loop) + tx_source = asyncio.Queue() + blk_source = asyncio.Queue() bridge = threading.Thread( target=_multiprocessing_to_asyncio, args=(sync_event_source, tx_source, blk_source, loop), daemon=True ) bridge.start() - app = init_app(tx_source, blk_source, loop=loop) + app = init_app(tx_source, blk_source, loop) aiohttp.web.run_app( app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"], loop=loop ) diff --git a/tests/web/test_websocket_server.py b/tests/web/test_websocket_server.py index 26f9323..9806f86 100644 --- a/tests/web/test_websocket_server.py +++ b/tests/web/test_websocket_server.py @@ -80,8 +80,8 @@ async def test_bridge_sync_async_queue(event_loop): from planetmint.web.websocket_server import _multiprocessing_to_asyncio sync_queue = queue.Queue() - async_queue = asyncio.Queue(loop=event_loop) - async_queue2 = asyncio.Queue(loop=event_loop) + async_queue = asyncio.Queue() + async_queue2 = asyncio.Queue() bridge = threading.Thread( target=_multiprocessing_to_asyncio, args=(sync_queue, async_queue, async_queue2, event_loop), daemon=True @@ -142,8 +142,8 @@ async def test_websocket_block_event(aiohttp_client, event_loop): tx = Create.generate([user_pub], [([user_pub], 1)]) tx = tx.sign([user_priv]) - blk_source = asyncio.Queue(loop=event_loop) - tx_source = asyncio.Queue(loop=event_loop) + blk_source = asyncio.Queue() + tx_source = asyncio.Queue() app = init_app(tx_source, blk_source, loop=event_loop) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT_BLOCKS) @@ -172,8 +172,8 @@ async def test_websocket_transaction_event(aiohttp_client, event_loop): tx = Create.generate([user_pub], [([user_pub], 1)]) tx = tx.sign([user_priv]) - blk_source = asyncio.Queue(loop=event_loop) - tx_source = asyncio.Queue(loop=event_loop) + blk_source = asyncio.Queue() + tx_source = asyncio.Queue() app = init_app(tx_source, blk_source, loop=event_loop) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT) @@ -198,8 +198,8 @@ async def test_websocket_string_event(aiohttp_client, event_loop): from planetmint.ipc.events import POISON_PILL from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT - blk_source = asyncio.Queue(loop=event_loop) - tx_source = asyncio.Queue(loop=event_loop) + blk_source = asyncio.Queue() + tx_source = asyncio.Queue() app = init_app(tx_source, blk_source, loop=event_loop) client = await aiohttp_client(app) ws = await client.ws_connect(EVENTS_ENDPOINT)