cleaned up the code

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2023-03-31 14:31:40 +02:00
parent cea90dffd4
commit 9fc3e597a5
No known key found for this signature in database
2 changed files with 1 additions and 152 deletions

View File

@ -85,7 +85,6 @@ class Dispatcher:
async def publish(self, app): async def publish(self, app):
"""Publish new events to the subscribers.""" """Publish new events to the subscribers."""
logger.debug(f"DISPATCHER CALLED : {self.type}") logger.debug(f"DISPATCHER CALLED : {self.type}")
#try:
while True: while True:
if self.type == "tx": if self.type == "tx":
event = await Dispatcher.get_queue_on_demand( app, "tx_source").get() event = await Dispatcher.get_queue_on_demand( app, "tx_source").get()
@ -93,9 +92,6 @@ class Dispatcher:
event = await Dispatcher.get_queue_on_demand( app, "blk_source").get() event = await Dispatcher.get_queue_on_demand( app, "blk_source").get()
str_buffer = [] str_buffer = []
if not event:
continue
if event == POISON_PILL: if event == POISON_PILL:
return return
logger.debug(f"DISPATCHER ELEMENT : {event}") logger.debug(f"DISPATCHER ELEMENT : {event}")
@ -112,6 +108,3 @@ class Dispatcher:
for str_item in str_buffer: for str_item in str_buffer:
for _, websocket in self.subscribers.items(): for _, websocket in self.subscribers.items():
await websocket.send_str(str_item) await websocket.send_str(str_item)
#except Exception as e:
# logger.debug(f"Dispatcher Exception: {e}")
# pass

View File

@ -18,15 +18,12 @@
import asyncio import asyncio
import logging import logging
#import threading
import aiohttp import aiohttp
import multiprocessing
from uuid import uuid4 from uuid import uuid4
from concurrent.futures import CancelledError from concurrent.futures import CancelledError
from planetmint.config import Config from planetmint.config import Config
from planetmint.web.websocket_dispatcher import Dispatcher from planetmint.web.websocket_dispatcher import Dispatcher
#from asyncer import asyncify
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
EVENTS_ENDPOINT = "/api/v1/streams/valid_transactions" EVENTS_ENDPOINT = "/api/v1/streams/valid_transactions"
@ -34,7 +31,6 @@ EVENTS_ENDPOINT_BLOCKS = "/api/v1/streams/valid_blocks"
async def access_queue(app): async def access_queue(app):
#global app
in_queue = app["event_source"] in_queue = app["event_source"]
tx_source = Dispatcher.get_queue_on_demand(app,"tx_source" ) tx_source = Dispatcher.get_queue_on_demand(app,"tx_source" )
blk_source = Dispatcher.get_queue_on_demand(app,"blk_source" ) blk_source = Dispatcher.get_queue_on_demand(app,"blk_source" )
@ -55,52 +51,6 @@ async def access_queue(app):
except asyncio.CancelledError as e: except asyncio.CancelledError as e:
logger.debug(f"REROUTING Cancelled : {e}") logger.debug(f"REROUTING Cancelled : {e}")
pass pass
#except Exception as e:
# logger.debug(f"REROUTING Exception: {e}")
#finally:
# return
#async def reroute_message(app):
# """Bridge between a synchronous multiprocessing queue
# and an asynchronous asyncio queue.
#
# Args:
# in_queue (multiprocessing.Queue): input queue
# out_queue (asyncio.Queue): output queue
# """
# in_queue = app["event_source"]
# tx_source = app["tx_source"]
# blk_source = app["blk_source"]
# #loop = app["loop"]
# i = 0
# while True:
# i = i + 1
# #value = await asyncify( in_queue.get() )
# #logger.debug(f"REROUTING: {i}")
# #tx_source.put( value)
# #blk_source.put( value)
# #loop.call_soon_threadsafe(tx_source.put_nowait, value)
# #loop.call_soon_threadsafe(blk_source.put_nowait, value)
def _multiprocessing_to_asyncio(in_queue, out_queue1, out_queue2, loop):
"""Bridge between a synchronous multiprocessing queue
and an asynchronous asyncio queue.
Args:
in_queue (multiprocessing.Queue): input queue
out_queue (asyncio.Queue): output queue
"""
while True:
value = in_queue.get_nowait()
logger.debug(f"REROUTING: {value}")
logger.debug(f"REROUTING: {loop}")
loop.call_soon_threadsafe(out_queue1.put_nowait, value)
loop.call_soon_threadsafe(out_queue2.put_nowait, value)
async def websocket_tx_handler(request): async def websocket_tx_handler(request):
"""Handle a new socket connection.""" """Handle a new socket connection."""
@ -164,7 +114,6 @@ async def websocket_blk_handler(request):
return websocket return websocket
async def start_background_tasks(app): async def start_background_tasks(app):
blk_dispatcher = app["blk_dispatcher"] blk_dispatcher = app["blk_dispatcher"]
app["task1"] = asyncio.create_task(blk_dispatcher.publish(app), name="blk") app["task1"] = asyncio.create_task(blk_dispatcher.publish(app), name="blk")
@ -175,78 +124,9 @@ async def start_background_tasks(app):
app["task3"] = asyncio.create_task(access_queue(app), name="router") app["task3"] = asyncio.create_task(access_queue(app), name="router")
async def cleanup_background_tasks(app):
app["task1"].cancel()
app["task2"].cancel()
app["task3"].cancel()
await app["task3"]
await app["task1"]
await app["task2"]
async def start_cleanup_all_background_tasks(app):
blk_dispatcher = app["blk_dispatcher"]
app["task1"] = asyncio.create_task(blk_dispatcher.publish(app), name="blk")
tx_dispatcher = app["tx_dispatcher"]
app["task2"] = asyncio.create_task(tx_dispatcher.publish(app), name="tx")
app["task3"] = asyncio.create_task(access_queue(app), name="router")
#yield
app["task1"].cancel()
app["task2"].cancel()
app["task3"].cancel()
await app["task3"]
await app["task1"]
await app["task2"]
#await asyncio.gather(app["task1"], app["task2"],
# app["task3"],
# return_exceptions=True)
#async def background_task_blk_dispatcher(app):
# blk_dispatcher = app["blk_dispatcher"]
# app["blk_bkg_task"] = asyncio.create_task(blk_dispatcher.publish(), name="blk")
#
# yield
#
# app["blk_bkg_task"].cancel()
# await app["blk_bkg_task"]
#
#async def background_task_tx_dispatcher(app):
# tx_dispatcher = app["tx_dispatcher"]
# app["tx_bkg_task"] = asyncio.create_task(tx_dispatcher.publish(), name="tx")
#
# yield
#
# app["tx_bkg_task"].cancel()
# await app["tx_bkg_task"]
#
#async def background_task_route_dispatcher(app):
# app["route_bkg_task"] = asyncio.create_task(access_queue(), name="router")
#
# yield
#
# app["route_bkg_task"].cancel()
# await app["route_bkg_task"]
global app
def init_app(sync_event_source): def init_app(sync_event_source):
"""Create and start the WebSocket server.""" """Create and start the WebSocket server."""
global app
#global event_src
app = aiohttp.web.Application() app = aiohttp.web.Application()
# queue definition
#tx_source = asyncio.Queue()
#blk_source = asyncio.Queue()
#app["tx_source"] = tx_source
#app["blk_source"] = blk_source
app["event_source"] = sync_event_source app["event_source"] = sync_event_source
#dispatchers #dispatchers
@ -257,33 +137,9 @@ def init_app(sync_event_source):
app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler) app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler)
app.router.add_get(EVENTS_ENDPOINT_BLOCKS, websocket_blk_handler) app.router.add_get(EVENTS_ENDPOINT_BLOCKS, websocket_blk_handler)
#app.on_startup.append(start_background_tasks)
app.on_startup.append( start_background_tasks ) app.on_startup.append( start_background_tasks )
#app.cleanup_ctx.append(cleanup_background_tasks)
#app.cleanup_ctx.append(background_task_route_dispatcher)
#app.cleanup_ctx.append(background_task_tx_dispatcher)
#app.cleanup_ctx.append(background_task_blk_dispatcher)
#app = init_app(app, tx_source, blk_source)
#bridge = threading.Thread(
# target=_multiprocessing_to_asyncio,
# args=(sync_event_source, tx_source, blk_source, asyncio.get_event_loop()),
# daemon=True,
#)
#bridge.start()
return app return app
global event_src
def start(sync_event_source): def start(sync_event_source):
#global event_src
#event_src=sync_event_source
#app = asyncio.run(init_app(sync_event_source))
app = init_app(sync_event_source) app = init_app(sync_event_source)
aiohttp.web.run_app(app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"]) aiohttp.web.run_app(app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"])