diff --git a/planetmint/web/websocket_server.py b/planetmint/web/websocket_server.py index 4314872..b195a92 100644 --- a/planetmint/web/websocket_server.py +++ b/planetmint/web/websocket_server.py @@ -18,8 +18,9 @@ import asyncio import logging -import threading +#import threading import aiohttp +import multiprocessing from uuid import uuid4 from concurrent.futures import CancelledError @@ -32,24 +33,53 @@ EVENTS_ENDPOINT = "/api/v1/streams/valid_transactions" EVENTS_ENDPOINT_BLOCKS = "/api/v1/streams/valid_blocks" -def reroute_message(app): - """Bridge between a synchronous multiprocessing queue - and an asynchronous asyncio queue. - Args: - in_queue (multiprocessing.Queue): input queue - out_queue (asyncio.Queue): output queue - """ +async def access_queue(): + global app in_queue = app["event_source"] tx_source = app["tx_source"] blk_source = app["blk_source"] - loop = app["loop"] - while True: - value = in_queue.get() - # tx_source.put( value) - # blk_source.put( value) - loop.call_soon_threadsafe(tx_source.put_nowait, value) - loop.call_soon_threadsafe(blk_source.put_nowait, value) + try: + while True: + try: + item = in_queue.get_nowait() + logger.debug(f"REROUTING: {item}") + tx_source.put(item) + blk_source.put(item) + + except multiprocessing.Queue.Empty: + await asyncio.sleep(1) + except asyncio.CancelledError: + logger.debug(f"REROUTING : {i}") + pass + except Exception as e: + logger.debug(f"REROUTING Exception: {i}") + finally: + return + + + +#async def reroute_message(app): +# """Bridge between a synchronous multiprocessing queue +# and an asynchronous asyncio queue. +# +# Args: +# in_queue (multiprocessing.Queue): input queue +# out_queue (asyncio.Queue): output queue +# """ +# in_queue = app["event_source"] +# tx_source = app["tx_source"] +# blk_source = app["blk_source"] +# #loop = app["loop"] +# i = 0 +# while True: +# i = i + 1 +# #value = await asyncify( in_queue.get() ) +# #logger.debug(f"REROUTING: {i}") +# #tx_source.put( value) +# #blk_source.put( value) +# #loop.call_soon_threadsafe(tx_source.put_nowait, value) +# #loop.call_soon_threadsafe(blk_source.put_nowait, value) def _multiprocessing_to_asyncio(in_queue, out_queue1, out_queue2, loop): @@ -62,10 +92,11 @@ def _multiprocessing_to_asyncio(in_queue, out_queue1, out_queue2, loop): """ while True: - value = in_queue.get() + value = in_queue.get_nowait() logger.debug(f"REROUTING: {value}") - asyncio.call_soon_threadsafe(out_queue1.put_nowait, value) - asyncio.call_soon_threadsafe(out_queue2.put_nowait, value) + logger.debug(f"REROUTING: {loop}") + loop.call_soon_threadsafe(out_queue1.put_nowait, value) + loop.call_soon_threadsafe(out_queue2.put_nowait, value) async def websocket_tx_handler(request): @@ -130,47 +161,125 @@ async def websocket_blk_handler(request): return websocket -async def init_app(app, tx_source, blk_source): - """Init the application server. - Return: - An aiohttp application. - """ +async def start_background_tasks(app): + blk_dispatcher = app["blk_dispatcher"] + app["task1"] = asyncio.create_task(blk_dispatcher.publish(), name="blk") + tx_dispatcher = app["tx_dispatcher"] + app["task2"] = asyncio.create_task(tx_dispatcher.publish(), name="tx") + + app["task3"] = asyncio.create_task(access_queue(), name="router") + + +async def cleanup_background_tasks(app): + app["task1"].cancel() + app["task2"].cancel() + app["task3"].cancel() + await app["task3"] + await app["task1"] + await app["task2"] + +async def start_cleanup_all_background_tasks(app): + blk_dispatcher = app["blk_dispatcher"] + app["task1"] = asyncio.create_task(blk_dispatcher.publish(), name="blk") + + tx_dispatcher = app["tx_dispatcher"] + app["task2"] = asyncio.create_task(tx_dispatcher.publish(), name="tx") + + app["task3"] = asyncio.create_task(access_queue(), name="router") + + #yield + + #app["task1"].cancel() + #app["task2"].cancel() + #app["task3"].cancel() + await app["task3"] + await app["task1"] + await app["task2"] + + + #await asyncio.gather(app["task1"], app["task2"], + # app["task3"], + # return_exceptions=True) + +#async def background_task_blk_dispatcher(app): +# blk_dispatcher = app["blk_dispatcher"] +# app["blk_bkg_task"] = asyncio.create_task(blk_dispatcher.publish(), name="blk") +# +# yield +# +# app["blk_bkg_task"].cancel() +# await app["blk_bkg_task"] +# +#async def background_task_tx_dispatcher(app): +# tx_dispatcher = app["tx_dispatcher"] +# app["tx_bkg_task"] = asyncio.create_task(tx_dispatcher.publish(), name="tx") +# +# yield +# +# app["tx_bkg_task"].cancel() +# await app["tx_bkg_task"] +# +#async def background_task_route_dispatcher(app): +# app["route_bkg_task"] = asyncio.create_task(access_queue(), name="router") +# +# yield +# +# app["route_bkg_task"].cancel() +# await app["route_bkg_task"] + +global app + +async def init_app(): + """Create and start the WebSocket server.""" + global app + global event_src + app = aiohttp.web.Application() + + # queue definition + tx_source = asyncio.Queue() + blk_source = asyncio.Queue() + app["tx_source"] = tx_source + app["blk_source"] = blk_source + + app["event_source"] = event_src + + #dispatchers blk_dispatcher = Dispatcher(blk_source, "blk") tx_dispatcher = Dispatcher(tx_source, "tx") - - # Schedule the dispatcher - asyncio.create_task(blk_dispatcher.publish(), name="blk") - asyncio.create_task(tx_dispatcher.publish(), name="tx") - app["tx_dispatcher"] = tx_dispatcher app["blk_dispatcher"] = blk_dispatcher + + # routes app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler) app.router.add_get(EVENTS_ENDPOINT_BLOCKS, websocket_blk_handler) - app["tx_source"] = tx_source - app["blk_source"] = blk_source - # app["loop"]=loop + + + #app.on_startup.append(start_background_tasks) + #app.cleanup_ctx.append(cleanup_background_tasks) + app.on_startup.append( start_cleanup_all_background_tasks ) + #app.cleanup_ctx.append(background_task_route_dispatcher) + #app.cleanup_ctx.append(background_task_tx_dispatcher) + #app.cleanup_ctx.append(background_task_blk_dispatcher) + + #app = init_app(app, tx_source, blk_source) + + #bridge = threading.Thread( + # target=_multiprocessing_to_asyncio, + # args=(sync_event_source, tx_source, blk_source, asyncio.get_event_loop()), + # daemon=True, + #) + #bridge.start() return app - + + +global event_src def start(sync_event_source): - """Create and start the WebSocket server.""" - - tx_source = asyncio.Queue() - blk_source = asyncio.Queue() - - app = aiohttp.web.Application() - app["event_source"] = sync_event_source - app = init_app(app, tx_source, blk_source) - - bridge = threading.Thread( - target=_multiprocessing_to_asyncio, - args=(sync_event_source, tx_source, blk_source, asyncio.get_event_loop()), - daemon=True, - ) - bridge.start() - - aiohttp.web.run_app(app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"]) + global event_src + event_src=sync_event_source + app = asyncio.run(init_app()) + aiohttp.web.run_app(app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"]) \ No newline at end of file