blackified code

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2023-03-31 14:32:32 +02:00
parent 9fc3e597a5
commit 358a4ff027
No known key found for this signature in database
4 changed files with 24 additions and 22 deletions

View File

@ -7,6 +7,7 @@ from planetmint.ipc.events import EventTypes, POISON_PILL
logger = logging.getLogger(__name__)
class Exchange:
"""Dispatch events to subscribers."""
@ -76,4 +77,3 @@ class Exchange:
return
except Exception as e:
logger.debug(f"Exchange Exception: {e}")

View File

@ -111,7 +111,7 @@ class TransactionListApi(Resource):
except Exception as e:
logger.error(f"Tendermint RPC connection issue: {e}")
status_code = 500
message = { "detail": "Tendermint RPC connection error"}
message = {"detail": "Tendermint RPC connection error"}
if status_code == 202:
response = jsonify(tx)

View File

@ -56,17 +56,17 @@ class Dispatcher:
for tx in block["transactions"]:
txids.append(tx.id)
return {"height": block["height"], "hash": block["hash"], "transaction_ids": txids}
@staticmethod
def get_queue_on_demand(app, queue_name:str):
def get_queue_on_demand(app, queue_name: str):
if queue_name not in app:
logging.debug(f"creating queue: {queue_name}")
get_loop = asyncio.get_event_loop()
run_loop = asyncio.get_running_loop()
logging.debug(f"get loop: {get_loop}")
logging.debug(f"run loop: {run_loop}")
app[queue_name] = asyncio.Queue( loop=get_loop)
app[queue_name] = asyncio.Queue(loop=get_loop)
return app[queue_name]
@staticmethod
@ -87,11 +87,11 @@ class Dispatcher:
logger.debug(f"DISPATCHER CALLED : {self.type}")
while True:
if self.type == "tx":
event = await Dispatcher.get_queue_on_demand( app, "tx_source").get()
event = await Dispatcher.get_queue_on_demand(app, "tx_source").get()
elif self.type == "blk":
event = await Dispatcher.get_queue_on_demand( app, "blk_source").get()
event = await Dispatcher.get_queue_on_demand(app, "blk_source").get()
str_buffer = []
if event == POISON_PILL:
return
logger.debug(f"DISPATCHER ELEMENT : {event}")

View File

@ -32,8 +32,8 @@ EVENTS_ENDPOINT_BLOCKS = "/api/v1/streams/valid_blocks"
async def access_queue(app):
in_queue = app["event_source"]
tx_source = Dispatcher.get_queue_on_demand(app,"tx_source" )
blk_source = Dispatcher.get_queue_on_demand(app,"blk_source" )
tx_source = Dispatcher.get_queue_on_demand(app, "tx_source")
blk_source = Dispatcher.get_queue_on_demand(app, "blk_source")
logger.debug(f"REROUTING CALLED")
try:
while True:
@ -43,15 +43,16 @@ async def access_queue(app):
logger.debug(f"REROUTING: {item}")
await tx_source.put(item)
await blk_source.put(item)
else:
else:
await asyncio.sleep(1)
except Exception as e:
logger.debug(f"REROUTING wait exception : {e}")
raise e #await asyncio.sleep(1)
raise e # await asyncio.sleep(1)
except asyncio.CancelledError as e:
logger.debug(f"REROUTING Cancelled : {e}")
pass
async def websocket_tx_handler(request):
"""Handle a new socket connection."""
@ -119,27 +120,28 @@ async def start_background_tasks(app):
app["task1"] = asyncio.create_task(blk_dispatcher.publish(app), name="blk")
tx_dispatcher = app["tx_dispatcher"]
app["task2"] = asyncio.create_task(tx_dispatcher.publish(app), name="tx")
app["task3"] = asyncio.create_task(access_queue(app), name="router")
app["task2"] = asyncio.create_task(tx_dispatcher.publish(app), name="tx")
app["task3"] = asyncio.create_task(access_queue(app), name="router")
def init_app(sync_event_source):
"""Create and start the WebSocket server."""
app = aiohttp.web.Application()
app["event_source"] = sync_event_source
#dispatchers
# dispatchers
app["tx_dispatcher"] = Dispatcher("tx")
app["blk_dispatcher"] = Dispatcher("blk")
# routes
app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler)
app.router.add_get(EVENTS_ENDPOINT_BLOCKS, websocket_blk_handler)
app.on_startup.append( start_background_tasks )
app.on_startup.append(start_background_tasks)
return app
def start(sync_event_source):
app = init_app(sync_event_source)
aiohttp.web.run_app(app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"])
aiohttp.web.run_app(app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"])