mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
feat: Add support for WebSocketSubscription2021
This commit is contained in:
@@ -2,7 +2,7 @@ export const CONTEXT_ACTIVITYSTREAMS = 'https://www.w3.org/ns/activitystreams';
|
||||
export const CONTEXT_NOTIFICATION = 'https://www.w3.org/ns/solid/notification/v1';
|
||||
|
||||
/**
|
||||
* The minimal expected fields for a Notification
|
||||
* The minimally expected fields for a Notification
|
||||
* as defined in https://solidproject.org/TR/notifications-protocol#notification-data-model.
|
||||
*/
|
||||
export interface Notification {
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
import type { WebSocket } from 'ws';
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import type { SetMultiMap } from '../../../util/map/SetMultiMap';
|
||||
import { readableToString } from '../../../util/StreamUtil';
|
||||
import { NotificationEmitter } from '../NotificationEmitter';
|
||||
import type { NotificationEmitterInput } from '../NotificationEmitter';
|
||||
|
||||
/**
|
||||
* Emits notifications on WebSocketSubscription2021 subscription.
|
||||
* Uses the WebSockets found in the provided map.
|
||||
* The key should be the identifier of the matching subscription.
|
||||
*/
|
||||
export class WebSocket2021Emitter extends NotificationEmitter {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly socketMap: SetMultiMap<string, WebSocket>;
|
||||
|
||||
public constructor(socketMap: SetMultiMap<string, WebSocket>) {
|
||||
super();
|
||||
|
||||
this.socketMap = socketMap;
|
||||
}
|
||||
|
||||
public async handle({ info, representation }: NotificationEmitterInput): Promise<void> {
|
||||
// Called as a NotificationEmitter: emit the notification
|
||||
const webSockets = this.socketMap.get(info.id);
|
||||
if (webSockets) {
|
||||
const data = await readableToString(representation.data);
|
||||
for (const webSocket of webSockets) {
|
||||
webSocket.send(data);
|
||||
}
|
||||
} else {
|
||||
representation.data.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
import type { WebSocket } from 'ws';
|
||||
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
|
||||
import type { SubscriptionInfo } from '../SubscriptionStorage';
|
||||
|
||||
export interface WebSocket2021HandlerInput {
|
||||
info: SubscriptionInfo;
|
||||
webSocket: WebSocket;
|
||||
}
|
||||
|
||||
/**
|
||||
* A handler that is called when a valid WebSocketSubscription2021 connection has been made.
|
||||
*/
|
||||
export abstract class WebSocket2021Handler extends AsyncHandler<WebSocket2021HandlerInput> {}
|
||||
@@ -0,0 +1,56 @@
|
||||
import type { IncomingMessage } from 'http';
|
||||
import type { WebSocket } from 'ws';
|
||||
import type { InteractionRoute } from '../../../identity/interaction/routing/InteractionRoute';
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import { WebSocketServerConfigurator } from '../../WebSocketServerConfigurator';
|
||||
import type { SubscriptionStorage } from '../SubscriptionStorage';
|
||||
import type { WebSocket2021Handler } from './WebSocket2021Handler';
|
||||
|
||||
/**
|
||||
* Listens for WebSocket connections and verifies if they are valid WebSocketSubscription2021 connections,
|
||||
* in which case its {@link WebSocket2021Handler} will be alerted.
|
||||
*/
|
||||
export class WebSocket2021Listener extends WebSocketServerConfigurator {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: SubscriptionStorage;
|
||||
private readonly handler: WebSocket2021Handler;
|
||||
private readonly path: string;
|
||||
|
||||
public constructor(storage: SubscriptionStorage, handler: WebSocket2021Handler, route: InteractionRoute) {
|
||||
super();
|
||||
this.storage = storage;
|
||||
this.handler = handler;
|
||||
this.path = new URL(route.getPath()).pathname;
|
||||
}
|
||||
|
||||
protected async handleConnection(webSocket: WebSocket, upgradeRequest: IncomingMessage): Promise<void> {
|
||||
// Base doesn't matter since we just want the path and query parameter
|
||||
const { pathname, searchParams } = new URL(upgradeRequest.url ?? '', 'http://example.com');
|
||||
|
||||
if (pathname !== this.path) {
|
||||
webSocket.send('Unknown WebSocket target.');
|
||||
return webSocket.close();
|
||||
}
|
||||
|
||||
const auth = searchParams.get('auth');
|
||||
|
||||
if (!auth) {
|
||||
webSocket.send('Missing auth parameter from WebSocket URL.');
|
||||
return webSocket.close();
|
||||
}
|
||||
|
||||
const id = decodeURI(auth);
|
||||
const info = await this.storage.get(id);
|
||||
|
||||
if (!info) {
|
||||
// Info not being there implies it has expired
|
||||
webSocket.send(`Subscription has expired`);
|
||||
return webSocket.close();
|
||||
}
|
||||
|
||||
this.logger.info(`Accepted WebSocket connection listening to changes on ${info.topic}`);
|
||||
|
||||
await this.handler.handleSafe({ info, webSocket });
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
import type { WebSocket } from 'ws';
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import type { SetMultiMap } from '../../../util/map/SetMultiMap';
|
||||
import { setSafeInterval } from '../../../util/TimerUtil';
|
||||
import type { SubscriptionStorage } from '../SubscriptionStorage';
|
||||
import type { WebSocket2021HandlerInput } from './WebSocket2021Handler';
|
||||
import { WebSocket2021Handler } from './WebSocket2021Handler';
|
||||
|
||||
/**
|
||||
* Keeps track of the WebSockets that were opened for a WebSocketSubscription2021 subscription.
|
||||
* The WebSockets are stored in the map using the identifier of the matching subscription.
|
||||
*
|
||||
* `cleanupTimer` defines in minutes how often the stored WebSockets are closed
|
||||
* if their corresponding subscription has expired.
|
||||
* Defaults to 60 minutes.
|
||||
* Open WebSockets will not receive notifications if their subscription expired.
|
||||
*/
|
||||
export class WebSocket2021Storer extends WebSocket2021Handler {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: SubscriptionStorage;
|
||||
private readonly socketMap: SetMultiMap<string, WebSocket>;
|
||||
|
||||
public constructor(storage: SubscriptionStorage, socketMap: SetMultiMap<string, WebSocket>, cleanupTimer = 60) {
|
||||
super();
|
||||
this.socketMap = socketMap;
|
||||
this.storage = storage;
|
||||
|
||||
const timer = setSafeInterval(this.logger,
|
||||
'Failed to remove closed WebSockets',
|
||||
this.closeExpiredSockets.bind(this),
|
||||
cleanupTimer * 60 * 1000);
|
||||
timer.unref();
|
||||
}
|
||||
|
||||
public async handle({ webSocket, info }: WebSocket2021HandlerInput): Promise<void> {
|
||||
this.socketMap.add(info.id, webSocket);
|
||||
webSocket.on('error', (): boolean => this.socketMap.deleteEntry(info.id, webSocket));
|
||||
webSocket.on('close', (): boolean => this.socketMap.deleteEntry(info.id, webSocket));
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all WebSockets that are attached to a subscription that no longer exists.
|
||||
*/
|
||||
private async closeExpiredSockets(): Promise<void> {
|
||||
this.logger.debug('Closing expired WebSockets');
|
||||
for (const [ id, sockets ] of this.socketMap.entrySets()) {
|
||||
const result = await this.storage.get(id);
|
||||
if (!result) {
|
||||
for (const socket of sockets) {
|
||||
// Due to the attached listener this also deletes the entries
|
||||
socket.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
this.logger.debug('Finished closing expired WebSockets');
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
import type { WebSocket } from 'ws';
|
||||
import type { SingleThreaded } from '../../../init/cluster/SingleThreaded';
|
||||
import { WrappedSetMultiMap } from '../../../util/map/WrappedSetMultiMap';
|
||||
|
||||
/**
|
||||
* A {@link SetMultiMap} linking identifiers to a set of WebSockets.
|
||||
* An extension of {@link WrappedSetMultiMap} to make sure Components.js allows us to create this in the config,
|
||||
* as {@link WrappedSetMultiMap} has a constructor not supported.
|
||||
*/
|
||||
export class WebSocketMap extends WrappedSetMultiMap<string, WebSocket> implements SingleThreaded {}
|
||||
@@ -0,0 +1,57 @@
|
||||
import { string } from 'yup';
|
||||
import type { AccessMap } from '../../../authorization/permissions/Permissions';
|
||||
import { AccessMode } from '../../../authorization/permissions/Permissions';
|
||||
import { BasicRepresentation } from '../../../http/representation/BasicRepresentation';
|
||||
import type { InteractionRoute } from '../../../identity/interaction/routing/InteractionRoute';
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import { APPLICATION_LD_JSON } from '../../../util/ContentTypes';
|
||||
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
|
||||
import { CONTEXT_NOTIFICATION } from '../Notification';
|
||||
import type { Subscription } from '../Subscription';
|
||||
import { SUBSCRIBE_SCHEMA } from '../Subscription';
|
||||
import type { SubscriptionStorage } from '../SubscriptionStorage';
|
||||
import type { SubscriptionResponse, SubscriptionType } from '../SubscriptionType';
|
||||
|
||||
const type = 'WebSocketSubscription2021';
|
||||
const schema = SUBSCRIBE_SCHEMA.shape({
|
||||
type: string().required().oneOf([ type ]),
|
||||
});
|
||||
|
||||
/**
|
||||
* The notification subscription type WebSocketSubscription2021 as described in
|
||||
* https://solidproject.org/TR/websocket-subscription-2021
|
||||
*
|
||||
* Requires read permissions on a resource to be able to receive notifications.
|
||||
*/
|
||||
export class WebSocketSubscription2021 implements SubscriptionType<typeof schema> {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: SubscriptionStorage;
|
||||
private readonly path: string;
|
||||
|
||||
public readonly type = type;
|
||||
public readonly schema = schema;
|
||||
|
||||
public constructor(storage: SubscriptionStorage, route: InteractionRoute) {
|
||||
this.storage = storage;
|
||||
this.path = route.getPath();
|
||||
}
|
||||
|
||||
public async extractModes(subscription: Subscription): Promise<AccessMap> {
|
||||
return new IdentifierSetMultiMap<AccessMode>([[{ path: subscription.topic }, AccessMode.read ]]);
|
||||
}
|
||||
|
||||
public async subscribe(subscription: Subscription): Promise<SubscriptionResponse> {
|
||||
const info = this.storage.create(subscription, {});
|
||||
await this.storage.add(info);
|
||||
|
||||
const jsonld = {
|
||||
'@context': [ CONTEXT_NOTIFICATION ],
|
||||
type: this.type,
|
||||
source: `ws${this.path.slice('http'.length)}?auth=${encodeURI(info.id)}`,
|
||||
};
|
||||
const response = new BasicRepresentation(JSON.stringify(jsonld), APPLICATION_LD_JSON);
|
||||
|
||||
return { response, info };
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user