working websockets

Signed-off-by: Jürgen Eckel <juergen@riddleandcode.com>
This commit is contained in:
Jürgen Eckel 2023-03-31 14:20:31 +02:00
parent 8071f007fa
commit ed51655317
No known key found for this signature in database
2 changed files with 93 additions and 70 deletions

View File

@ -6,6 +6,8 @@
import json
import logging
import asyncio
from planetmint.ipc.events import EventTypes
from planetmint.ipc.events import POISON_PILL
@ -19,15 +21,13 @@ class Dispatcher:
This class implements a simple publish/subscribe pattern.
"""
def __init__(self, event_source, type="tx"):
def __init__(self, type="tx"):
"""Create a new instance.
Args:
event_source: a source of events. Elements in the queue
should be strings.
type: a string identifier.
"""
self.event_source = event_source
self.subscribers = {}
self.type = type
@ -56,6 +56,18 @@ class Dispatcher:
for tx in block["transactions"]:
txids.append(tx.id)
return {"height": block["height"], "hash": block["hash"], "transaction_ids": txids}
@staticmethod
def get_queue_on_demand(app, queue_name:str):
if queue_name not in app:
logging.debug(f"creating queue: {queue_name}")
get_loop = asyncio.get_event_loop()
run_loop = asyncio.get_running_loop()
logging.debug(f"get loop: {get_loop}")
logging.debug(f"run loop: {run_loop}")
app[queue_name] = asyncio.Queue( loop=get_loop)
return app[queue_name]
@staticmethod
def eventify_block(block):
@ -70,29 +82,36 @@ class Dispatcher:
asset_ids = [tx.id]
yield {"height": block["height"], "asset_ids": asset_ids, "transaction_id": tx.id}
async def publish(self):
async def publish(self, app):
"""Publish new events to the subscribers."""
try:
while True:
event = await self.event_source.get()
str_buffer = []
if event == POISON_PILL:
logger.debug(f"DISPATCHER CALLED : {self.type}")
#try:
while True:
if self.type == "tx":
event = await Dispatcher.get_queue_on_demand( app, "tx_source").get()
elif self.type == "blk":
event = await Dispatcher.get_queue_on_demand( app, "blk_source").get()
str_buffer = []
if not event:
continue
if event == POISON_PILL:
return
logger.debug(f"DISPATCHER ELEMENT : {event}")
if isinstance(event, str):
str_buffer.append(event)
elif event.type == EventTypes.BLOCK_VALID:
if self.type == "tx":
str_buffer = map(json.dumps, self.eventify_block(event.data))
elif self.type == "blk":
str_buffer = [json.dumps(self.simplified_block(event.data))]
else:
return
if isinstance(event, str):
str_buffer.append(event)
elif event.type == EventTypes.BLOCK_VALID:
if self.type == "tx":
str_buffer = map(json.dumps, self.eventify_block(event.data))
elif self.type == "blk":
str_buffer = [json.dumps(self.simplified_block(event.data))]
else:
return
for str_item in str_buffer:
for _, websocket in self.subscribers.items():
await websocket.send_str(str_item)
except Exception as e:
logger.debug(f"Dispatcher Exception: {e}")
pass
for str_item in str_buffer:
for _, websocket in self.subscribers.items():
await websocket.send_str(str_item)
#except Exception as e:
# logger.debug(f"Dispatcher Exception: {e}")
# pass

View File

@ -26,36 +26,39 @@ from uuid import uuid4
from concurrent.futures import CancelledError
from planetmint.config import Config
from planetmint.web.websocket_dispatcher import Dispatcher
from asyncer import asyncify
#from asyncer import asyncify
logger = logging.getLogger(__name__)
EVENTS_ENDPOINT = "/api/v1/streams/valid_transactions"
EVENTS_ENDPOINT_BLOCKS = "/api/v1/streams/valid_blocks"
async def access_queue():
global app
async def access_queue(app):
#global app
in_queue = app["event_source"]
tx_source = app["tx_source"]
blk_source = app["blk_source"]
tx_source = Dispatcher.get_queue_on_demand(app,"tx_source" )
blk_source = Dispatcher.get_queue_on_demand(app,"blk_source" )
logger.debug(f"REROUTING CALLED")
try:
while True:
try:
item = in_queue.get_nowait()
logger.debug(f"REROUTING: {item}")
tx_source.put(item)
blk_source.put(item)
except multiprocessing.Queue.Empty:
await asyncio.sleep(1)
except asyncio.CancelledError:
logger.debug(f"REROUTING : {i}")
if not in_queue.empty():
item = in_queue.get_nowait()
logger.debug(f"REROUTING: {item}")
await tx_source.put(item)
await blk_source.put(item)
else:
await asyncio.sleep(1)
except Exception as e:
logger.debug(f"REROUTING wait exception : {e}")
raise e #await asyncio.sleep(1)
except asyncio.CancelledError as e:
logger.debug(f"REROUTING Cancelled : {e}")
pass
except Exception as e:
logger.debug(f"REROUTING Exception: {i}")
finally:
return
#except Exception as e:
# logger.debug(f"REROUTING Exception: {e}")
#finally:
# return
@ -164,12 +167,12 @@ async def websocket_blk_handler(request):
async def start_background_tasks(app):
blk_dispatcher = app["blk_dispatcher"]
app["task1"] = asyncio.create_task(blk_dispatcher.publish(), name="blk")
app["task1"] = asyncio.create_task(blk_dispatcher.publish(app), name="blk")
tx_dispatcher = app["tx_dispatcher"]
app["task2"] = asyncio.create_task(tx_dispatcher.publish(), name="tx")
app["task2"] = asyncio.create_task(tx_dispatcher.publish(app), name="tx")
app["task3"] = asyncio.create_task(access_queue(), name="router")
app["task3"] = asyncio.create_task(access_queue(app), name="router")
async def cleanup_background_tasks(app):
@ -182,18 +185,18 @@ async def cleanup_background_tasks(app):
async def start_cleanup_all_background_tasks(app):
blk_dispatcher = app["blk_dispatcher"]
app["task1"] = asyncio.create_task(blk_dispatcher.publish(), name="blk")
app["task1"] = asyncio.create_task(blk_dispatcher.publish(app), name="blk")
tx_dispatcher = app["tx_dispatcher"]
app["task2"] = asyncio.create_task(tx_dispatcher.publish(), name="tx")
app["task2"] = asyncio.create_task(tx_dispatcher.publish(app), name="tx")
app["task3"] = asyncio.create_task(access_queue(), name="router")
app["task3"] = asyncio.create_task(access_queue(app), name="router")
#yield
#app["task1"].cancel()
#app["task2"].cancel()
#app["task3"].cancel()
app["task1"].cancel()
app["task2"].cancel()
app["task3"].cancel()
await app["task3"]
await app["task1"]
await app["task2"]
@ -231,25 +234,24 @@ async def start_cleanup_all_background_tasks(app):
global app
async def init_app():
def init_app(sync_event_source):
"""Create and start the WebSocket server."""
global app
global event_src
#global event_src
app = aiohttp.web.Application()
# queue definition
tx_source = asyncio.Queue()
blk_source = asyncio.Queue()
app["tx_source"] = tx_source
app["blk_source"] = blk_source
#tx_source = asyncio.Queue()
#blk_source = asyncio.Queue()
#app["tx_source"] = tx_source
#app["blk_source"] = blk_source
app["event_source"] = event_src
app["event_source"] = sync_event_source
#dispatchers
blk_dispatcher = Dispatcher(blk_source, "blk")
tx_dispatcher = Dispatcher(tx_source, "tx")
app["tx_dispatcher"] = tx_dispatcher
app["blk_dispatcher"] = blk_dispatcher
app["tx_dispatcher"] = Dispatcher("tx")
app["blk_dispatcher"] = Dispatcher("blk")
# routes
app.router.add_get(EVENTS_ENDPOINT, websocket_tx_handler)
@ -259,8 +261,9 @@ async def init_app():
#app.on_startup.append(start_background_tasks)
app.on_startup.append( start_background_tasks )
#app.cleanup_ctx.append(cleanup_background_tasks)
app.on_startup.append( start_cleanup_all_background_tasks )
#app.cleanup_ctx.append(background_task_route_dispatcher)
#app.cleanup_ctx.append(background_task_tx_dispatcher)
#app.cleanup_ctx.append(background_task_blk_dispatcher)
@ -279,7 +282,8 @@ async def init_app():
global event_src
def start(sync_event_source):
global event_src
event_src=sync_event_source
app = asyncio.run(init_app())
#global event_src
#event_src=sync_event_source
#app = asyncio.run(init_app(sync_event_source))
app = init_app(sync_event_source)
aiohttp.web.run_app(app, host=Config().get()["wsserver"]["host"], port=Config().get()["wsserver"]["port"])