Temorray commit for removing loop value

This commit is contained in:
marjan mastaliparsa 2023-03-23 14:14:19 +01:00
parent 08ce10ab1f
commit 02a8fa214c
2 changed files with 16 additions and 17 deletions

View File

@ -107,7 +107,7 @@ async def websocket_blk_handler(request):
return websocket return websocket
def init_app(tx_source, blk_source, *, loop=None): def init_app(tx_source, blk_source, loop=None):
"""Init the application server. """Init the application server.
Return: Return:
@ -117,11 +117,10 @@ def init_app(tx_source, blk_source, *, loop=None):
blk_dispatcher = Dispatcher(blk_source, "blk") blk_dispatcher = Dispatcher(blk_source, "blk")
tx_dispatcher = Dispatcher(tx_source, "tx") tx_dispatcher = Dispatcher(tx_source, "tx")
# Schedule the dispatcher asyncio.get_running_loop().create_task(blk_dispatcher.publish(), name="blk")
loop.create_task(blk_dispatcher.publish(), name="blk") asyncio.get_running_loop().create_task(tx_dispatcher.publish(), name="tx")
loop.create_task(tx_dispatcher.publish(), name="tx")
app = aiohttp.web.Application(loop=loop) app = aiohttp.web.Application()
app["tx_dispatcher"] = tx_dispatcher app["tx_dispatcher"] = tx_dispatcher
app["blk_dispatcher"] = blk_dispatcher app["blk_dispatcher"] = blk_dispatcher
app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler) app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler)
@ -133,17 +132,17 @@ def start(sync_event_source, loop=None):
"""Create and start the WebSocket server.""" """Create and start the WebSocket server."""
if not loop: if not loop:
loop = asyncio.get_event_loop() loop = asyncio.new_event_loop()
tx_source = asyncio.Queue(loop=loop) tx_source = asyncio.Queue()
blk_source = asyncio.Queue(loop=loop) blk_source = asyncio.Queue()
bridge = threading.Thread( bridge = threading.Thread(
target=_multiprocessing_to_asyncio, args=(sync_event_source, tx_source, blk_source, loop), daemon=True target=_multiprocessing_to_asyncio, args=(sync_event_source, tx_source, blk_source, loop), daemon=True
) )
bridge.start() bridge.start()
app = init_app(tx_source, blk_source, loop=loop) app = init_app(tx_source, blk_source, loop)
aiohttp.web.run_app( aiohttp.web.run_app(
app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"], loop=loop app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"], loop=loop
) )

View File

@ -80,8 +80,8 @@ async def test_bridge_sync_async_queue(event_loop):
from planetmint.web.websocket_server import _multiprocessing_to_asyncio from planetmint.web.websocket_server import _multiprocessing_to_asyncio
sync_queue = queue.Queue() sync_queue = queue.Queue()
async_queue = asyncio.Queue(loop=event_loop) async_queue = asyncio.Queue()
async_queue2 = asyncio.Queue(loop=event_loop) async_queue2 = asyncio.Queue()
bridge = threading.Thread( bridge = threading.Thread(
target=_multiprocessing_to_asyncio, args=(sync_queue, async_queue, async_queue2, event_loop), daemon=True target=_multiprocessing_to_asyncio, args=(sync_queue, async_queue, async_queue2, event_loop), daemon=True
@ -142,8 +142,8 @@ async def test_websocket_block_event(aiohttp_client, event_loop):
tx = Create.generate([user_pub], [([user_pub], 1)]) tx = Create.generate([user_pub], [([user_pub], 1)])
tx = tx.sign([user_priv]) tx = tx.sign([user_priv])
blk_source = asyncio.Queue(loop=event_loop) blk_source = asyncio.Queue()
tx_source = asyncio.Queue(loop=event_loop) tx_source = asyncio.Queue()
app = init_app(tx_source, blk_source, loop=event_loop) app = init_app(tx_source, blk_source, loop=event_loop)
client = await aiohttp_client(app) client = await aiohttp_client(app)
ws = await client.ws_connect(EVENTS_ENDPOINT_BLOCKS) ws = await client.ws_connect(EVENTS_ENDPOINT_BLOCKS)
@ -172,8 +172,8 @@ async def test_websocket_transaction_event(aiohttp_client, event_loop):
tx = Create.generate([user_pub], [([user_pub], 1)]) tx = Create.generate([user_pub], [([user_pub], 1)])
tx = tx.sign([user_priv]) tx = tx.sign([user_priv])
blk_source = asyncio.Queue(loop=event_loop) blk_source = asyncio.Queue()
tx_source = asyncio.Queue(loop=event_loop) tx_source = asyncio.Queue()
app = init_app(tx_source, blk_source, loop=event_loop) app = init_app(tx_source, blk_source, loop=event_loop)
client = await aiohttp_client(app) client = await aiohttp_client(app)
ws = await client.ws_connect(EVENTS_ENDPOINT) ws = await client.ws_connect(EVENTS_ENDPOINT)
@ -198,8 +198,8 @@ async def test_websocket_string_event(aiohttp_client, event_loop):
from planetmint.ipc.events import POISON_PILL from planetmint.ipc.events import POISON_PILL
from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT from planetmint.web.websocket_server import init_app, EVENTS_ENDPOINT
blk_source = asyncio.Queue(loop=event_loop) blk_source = asyncio.Queue()
tx_source = asyncio.Queue(loop=event_loop) tx_source = asyncio.Queue()
app = init_app(tx_source, blk_source, loop=event_loop) app = init_app(tx_source, blk_source, loop=event_loop)
client = await aiohttp_client(app) client = await aiohttp_client(app)
ws = await client.ws_connect(EVENTS_ENDPOINT) ws = await client.ws_connect(EVENTS_ENDPOINT)