refactor: Rename NotificationChannelInfo to NotificationChannel

This commit is contained in:
Joachim Van Herwegen
2023-02-07 14:35:02 +01:00
parent 61f04487a1
commit 8d31233075
41 changed files with 358 additions and 357 deletions

View File

@@ -1,14 +1,15 @@
import { getLoggerFor } from '../../logging/LogUtil';
import { createErrorMessage } from '../../util/errors/ErrorUtil';
import type { NotificationChannelInfo, NotificationChannelStorage } from './NotificationChannelStorage';
import type { NotificationChannel } from './NotificationChannel';
import type { NotificationChannelStorage } from './NotificationChannelStorage';
import type { NotificationHandler } from './NotificationHandler';
import { StateHandler } from './StateHandler';
/**
* Handles the `state` feature by calling a {@link NotificationHandler}
* in case the {@link NotificationChannelInfo} has a `state` value.
* in case the {@link NotificationChannel} has a `state` value.
*
* Deletes the `state` parameter from the info afterwards.
* Deletes the `state` parameter from the channel afterwards.
*/
export class BaseStateHandler extends StateHandler {
protected readonly logger = getLoggerFor(this);
@@ -22,14 +23,14 @@ export class BaseStateHandler extends StateHandler {
this.storage = storage;
}
public async handle({ info }: { info: NotificationChannelInfo }): Promise<void> {
if (info.state) {
const topic = { path: info.topic };
public async handle({ channel }: { channel: NotificationChannel }): Promise<void> {
if (channel.state) {
const topic = { path: channel.topic };
try {
await this.handler.handleSafe({ info, topic });
await this.handler.handleSafe({ channel, topic });
// Remove the state once the relevant notification has been sent
delete info.state;
await this.storage.update(info);
delete channel.state;
await this.storage.update(channel);
} catch (error: unknown) {
this.logger.error(`Problem emitting state notification: ${createErrorMessage(error)}`);
}

View File

@@ -14,7 +14,7 @@ export interface ComposedNotificationHandlerArgs {
* Generates, serializes and emits a {@link Notification} using a {@link NotificationGenerator},
* {@link NotificationSerializer} and {@link NotificationEmitter}.
*
* Will not emit an event in case it has the same state as the notification channel info.
* Will not emit an event when it has the same state as the notification channel.
*/
export class ComposedNotificationHandler extends NotificationHandler {
private readonly generator: NotificationGenerator;
@@ -35,13 +35,13 @@ export class ComposedNotificationHandler extends NotificationHandler {
public async handle(input: NotificationHandlerInput): Promise<void> {
const notification = await this.generator.handle(input);
const { state } = input.info;
const { state } = input.channel;
// In case the state matches there is no need to send the notification
if (typeof state === 'string' && state === notification.state) {
return;
}
const representation = await this.serializer.handleSafe({ info: input.info, notification });
await this.emitter.handleSafe({ info: input.info, representation });
const representation = await this.serializer.handleSafe({ channel: input.channel, notification });
await this.emitter.handleSafe({ channel: input.channel, representation });
}
}

View File

@@ -4,13 +4,13 @@ import { getLoggerFor } from '../../logging/LogUtil';
import type { KeyValueStorage } from '../../storage/keyvalue/KeyValueStorage';
import { InternalServerError } from '../../util/errors/InternalServerError';
import type { ReadWriteLocker } from '../../util/locking/ReadWriteLocker';
import type { NotificationChannel } from './NotificationChannel';
import type { NotificationChannelInfo, NotificationChannelStorage } from './NotificationChannelStorage';
import type { NotificationChannel, NotificationChannelJson } from './NotificationChannel';
import type { NotificationChannelStorage } from './NotificationChannelStorage';
type StorageValue<T> = string | string[] | NotificationChannelInfo<T>;
type StorageValue<T> = string | string[] | NotificationChannel<T>;
/**
* Stores all the {@link NotificationChannelInfo} in a {@link KeyValueStorage}.
* Stores all the {@link NotificationChannel} in a {@link KeyValueStorage}.
*
* Uses a {@link ReadWriteLocker} to prevent internal race conditions.
*/
@@ -25,7 +25,7 @@ export class KeyValueChannelStorage<T extends Record<string, unknown>> implement
this.locker = locker;
}
public create(channel: NotificationChannel, features: T): NotificationChannelInfo<T> {
public create(channel: NotificationChannelJson, features: T): NotificationChannel<T> {
return {
id: `${channel.type}:${v4()}:${channel.topic}`,
topic: channel.topic,
@@ -40,92 +40,92 @@ export class KeyValueChannelStorage<T extends Record<string, unknown>> implement
};
}
public async get(id: string): Promise<NotificationChannelInfo<T> | undefined> {
const info = await this.storage.get(id);
if (info && this.isChannelInfo(info)) {
if (typeof info.endAt === 'number' && info.endAt < Date.now()) {
public async get(id: string): Promise<NotificationChannel<T> | undefined> {
const channel = await this.storage.get(id);
if (channel && this.isChannel(channel)) {
if (typeof channel.endAt === 'number' && channel.endAt < Date.now()) {
this.logger.info(`Notification channel ${id} has expired.`);
await this.locker.withWriteLock(this.getLockKey(id), async(): Promise<void> => {
await this.deleteInfo(info);
await this.deleteChannel(channel);
});
return;
}
return info;
return channel;
}
}
public async getAll(topic: ResourceIdentifier): Promise<string[]> {
const infos = await this.storage.get(topic.path);
if (Array.isArray(infos)) {
return infos;
const channels = await this.storage.get(topic.path);
if (Array.isArray(channels)) {
return channels;
}
return [];
}
public async add(info: NotificationChannelInfo<T>): Promise<void> {
const target = { path: info.topic };
public async add(channel: NotificationChannel<T>): Promise<void> {
const target = { path: channel.topic };
return this.locker.withWriteLock(this.getLockKey(target), async(): Promise<void> => {
const infos = await this.getAll(target);
await this.storage.set(info.id, info);
infos.push(info.id);
await this.storage.set(info.topic, infos);
const channels = await this.getAll(target);
await this.storage.set(channel.id, channel);
channels.push(channel.id);
await this.storage.set(channel.topic, channels);
});
}
public async update(info: NotificationChannelInfo<T>): Promise<void> {
return this.locker.withWriteLock(this.getLockKey(info.id), async(): Promise<void> => {
const oldInfo = await this.storage.get(info.id);
public async update(channel: NotificationChannel<T>): Promise<void> {
return this.locker.withWriteLock(this.getLockKey(channel.id), async(): Promise<void> => {
const oldChannel = await this.storage.get(channel.id);
if (oldInfo) {
if (!this.isChannelInfo(oldInfo)) {
throw new InternalServerError(`Trying to update ${info.id} which is not a NotificationChannelInfo.`);
if (oldChannel) {
if (!this.isChannel(oldChannel)) {
throw new InternalServerError(`Trying to update ${channel.id} which is not a NotificationChannel.`);
}
if (info.topic !== oldInfo.topic) {
throw new InternalServerError(`Trying to change the topic of a notification channel ${info.id}`);
if (channel.topic !== oldChannel.topic) {
throw new InternalServerError(`Trying to change the topic of a notification channel ${channel.id}`);
}
}
await this.storage.set(info.id, info);
await this.storage.set(channel.id, channel);
});
}
public async delete(id: string): Promise<void> {
return this.locker.withWriteLock(this.getLockKey(id), async(): Promise<void> => {
const info = await this.get(id);
if (!info) {
const channel = await this.get(id);
if (!channel) {
return;
}
await this.deleteInfo(info);
await this.deleteChannel(channel);
});
}
/**
* Utility function for deleting a specific {@link NotificationChannelInfo} object.
* Does not create a lock on the info ID so should be wrapped in such a lock.
* Utility function for deleting a specific {@link NotificationChannel} object.
* Does not create a lock on the channel ID so should be wrapped in such a lock.
*/
private async deleteInfo(info: NotificationChannelInfo): Promise<void> {
await this.locker.withWriteLock(this.getLockKey(info.topic), async(): Promise<void> => {
const infos = await this.getAll({ path: info.topic });
const idx = infos.indexOf(info.id);
private async deleteChannel(channel: NotificationChannel): Promise<void> {
await this.locker.withWriteLock(this.getLockKey(channel.topic), async(): Promise<void> => {
const channels = await this.getAll({ path: channel.topic });
const idx = channels.indexOf(channel.id);
// If idx < 0 we have an inconsistency
if (idx < 0) {
this.logger.error(`Channel info ${info.id} was not found in the list of info targeting ${info.topic}.`);
this.logger.error(`Channel ${channel.id} was not found in the list of channels targeting ${channel.topic}.`);
this.logger.error('This should not happen and indicates a data consistency issue.');
} else {
infos.splice(idx, 1);
if (infos.length > 0) {
await this.storage.set(info.topic, infos);
channels.splice(idx, 1);
if (channels.length > 0) {
await this.storage.set(channel.topic, channels);
} else {
await this.storage.delete(info.topic);
await this.storage.delete(channel.topic);
}
}
await this.storage.delete(info.id);
await this.storage.delete(channel.id);
});
}
private isChannelInfo(value: StorageValue<T>): value is NotificationChannelInfo<T> {
return Boolean((value as NotificationChannelInfo).id);
private isChannel(value: StorageValue<T>): value is NotificationChannel<T> {
return Boolean((value as NotificationChannel).id);
}
private getLockKey(identifier: ResourceIdentifier | string): ResourceIdentifier {

View File

@@ -39,25 +39,25 @@ export class ListeningActivityHandler extends StaticHandler {
const channelIds = await this.storage.getAll(topic);
for (const id of channelIds) {
const info = await this.storage.get(id);
if (!info) {
const channel = await this.storage.get(id);
if (!channel) {
// Notification channel has expired
continue;
}
// Don't emit if the previous notification was too recent according to the requested rate
if (info.rate && info.rate > Date.now() - info.lastEmit) {
if (channel.rate && channel.rate > Date.now() - channel.lastEmit) {
continue;
}
// Don't emit if we have not yet reached the requested starting time
if (info.startAt && info.startAt > Date.now()) {
if (channel.startAt && channel.startAt > Date.now()) {
continue;
}
// No need to wait on this to resolve before going to the next channel.
// Prevent failed notification from blocking other notifications.
this.handler.handleSafe({ info, activity, topic }).catch((error): void => {
this.handler.handleSafe({ channel, activity, topic }).catch((error): void => {
this.logger.error(`Error trying to handle notification for ${id}: ${createErrorMessage(error)}`);
});
}

View File

@@ -27,4 +27,21 @@ export const NOTIFICATION_CHANNEL_SCHEMA = object({
toSeconds(parse(original)) * 1000).optional(),
accept: string().optional(),
});
export type NotificationChannel = InferType<typeof NOTIFICATION_CHANNEL_SCHEMA>;
export type NotificationChannelJson = InferType<typeof NOTIFICATION_CHANNEL_SCHEMA>;
/**
* The info provided for a notification channel during a subscription.
* `features` can contain custom values relevant for a specific channel type.
*/
export type NotificationChannel<T = Record<string, unknown>> = {
id: string;
topic: string;
type: string;
startAt?: number;
endAt?: number;
accept?: string;
rate?: number;
state?: string;
lastEmit: number;
features: T;
};

View File

@@ -1,22 +1,5 @@
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import type { NotificationChannel } from './NotificationChannel';
/**
* The info provided for a notification channel during a subscription.
* `features` can contain custom values relevant for a specific channel type.
*/
export type NotificationChannelInfo<T = Record<string, unknown>> = {
id: string;
topic: string;
type: string;
startAt?: number;
endAt?: number;
accept?: string;
rate?: number;
state?: string;
lastEmit: number;
features: T;
};
import type { NotificationChannel, NotificationChannelJson } from './NotificationChannel';
/**
* Stores all the information necessary to keep track of notification channels.
@@ -26,19 +9,19 @@ export type NotificationChannelInfo<T = Record<string, unknown>> = {
*/
export interface NotificationChannelStorage<T extends Record<string, unknown> = Record<string, unknown>> {
/**
* Creates info corresponding to the given channel and features.
* This does not store the generated info in the storage.
* @param channel - Notification channel to generate info of.
* @param features - Features to add to the info
* Creates channel corresponding to the given channel and features.
* This does not store the generated channel in the storage.
* @param channel - Notification channel to generate channel of.
* @param features - Features to add to the channel
*/
create: (channel: NotificationChannel, features: T) => NotificationChannelInfo<T>;
create: (channel: NotificationChannelJson, features: T) => NotificationChannel<T>;
/**
* Returns the info for the requested notification channel.
* Returns the channel for the requested notification channel.
* `undefined` if no match was found or if the notification channel expired.
* @param id - The identifier of the notification channel.
*/
get: (id: string) => Promise<NotificationChannelInfo<T> | undefined>;
get: (id: string) => Promise<NotificationChannel<T> | undefined>;
/**
* Returns the identifiers of all notification channel entries that have the given identifier as their topic.
@@ -48,17 +31,17 @@ export interface NotificationChannelStorage<T extends Record<string, unknown> =
getAll: (topic: ResourceIdentifier) => Promise<string[]>;
/**
* Adds the given info to the storage.
* @param info - Info to add.
* Adds the given channel to the storage.
* @param channel - Channel to add.
*/
add: (info: NotificationChannelInfo<T>) => Promise<void>;
add: (channel: NotificationChannel<T>) => Promise<void>;
/**
* Updates the given notification channel info.
* Updates the given notification channel.
* The `id` and the `topic` can not be updated.
* @param info - The info to update.
* @param channel - The channel to update.
*/
update: (info: NotificationChannelInfo<T>) => Promise<void>;
update: (channel: NotificationChannel<T>) => Promise<void>;
/**
* Deletes the given notification channel from the storage.

View File

@@ -2,12 +2,11 @@ import type { InferType } from 'yup';
import type { Credentials } from '../../authentication/Credentials';
import type { AccessMap } from '../../authorization/permissions/Permissions';
import type { Representation } from '../../http/representation/Representation';
import type { NOTIFICATION_CHANNEL_SCHEMA } from './NotificationChannel';
import type { NotificationChannelInfo } from './NotificationChannelStorage';
import type { NOTIFICATION_CHANNEL_SCHEMA, NotificationChannel } from './NotificationChannel';
export interface NotificationChannelResponse<TFeat extends Record<string, unknown> = Record<string, unknown>> {
response: Representation;
info: NotificationChannelInfo<TFeat>;
channel: NotificationChannel<TFeat>;
}
/**
@@ -32,13 +31,13 @@ export interface NotificationChannelType<
*
* @returns The required modes.
*/
extractModes: (channel: InferType<TSub>) => Promise<AccessMap>;
extractModes: (json: InferType<TSub>) => Promise<AccessMap>;
/**
* Registers the given notification channel.
* @param channel - The notification channel to register.
* @param credentials - The credentials of the client trying to subscribe.
*
* @returns A {@link Representation} to return as a response and the generated {@link NotificationChannelInfo}.
* @returns A {@link Representation} to return as a response and the generated {@link NotificationChannel}.
*/
subscribe: (channel: InferType<TSub>, credentials: Credentials) => Promise<NotificationChannelResponse<TFeat>>;
subscribe: (json: InferType<TSub>, credentials: Credentials) => Promise<NotificationChannelResponse<TFeat>>;
}

View File

@@ -1,14 +1,14 @@
import type { Representation } from '../../http/representation/Representation';
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
import type { NotificationChannelInfo } from './NotificationChannelStorage';
import type { NotificationChannel } from './NotificationChannel';
export interface NotificationEmitterInput<T = Record<string, unknown>> {
representation: Representation;
info: NotificationChannelInfo<T>;
channel: NotificationChannel<T>;
}
/**
* Emits a serialized Notification to the channel defined by the info.
* Emits a serialized Notification to the channel defined by the channel.
*/
export abstract class NotificationEmitter<T = Record<string, unknown>>
extends AsyncHandler<NotificationEmitterInput<T>> {}

View File

@@ -1,15 +1,15 @@
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
import type { AS, VocabularyTerm } from '../../util/Vocabularies';
import type { NotificationChannelInfo } from './NotificationChannelStorage';
import type { NotificationChannel } from './NotificationChannel';
export interface NotificationHandlerInput {
topic: ResourceIdentifier;
info: NotificationChannelInfo;
channel: NotificationChannel;
activity?: VocabularyTerm<typeof AS>;
}
/**
* Makes sure an activity gets emitted to the relevant channel based on the given info.
* Makes sure an activity gets emitted to the relevant channel.
*/
export abstract class NotificationHandler extends AsyncHandler<NotificationHandlerInput> {}

View File

@@ -13,7 +13,7 @@ import { readableToString } from '../../util/StreamUtil';
import type { HttpRequest } from '../HttpRequest';
import type { OperationHttpHandlerInput } from '../OperationHttpHandler';
import { OperationHttpHandler } from '../OperationHttpHandler';
import type { NotificationChannel } from './NotificationChannel';
import type { NotificationChannelJson } from './NotificationChannel';
import type { NotificationChannelType } from './NotificationChannelType';
export interface NotificationSubscriberArgs {
@@ -34,8 +34,8 @@ export interface NotificationSubscriberArgs {
*/
authorizer: Authorizer;
/**
* Overrides the expiration feature of channels by making sure they always expire after the `maxDuration` value.
* In case the expiration of the channel is shorter than `maxDuration` the original value will be kept.
* Overrides the expiration feature of channels, by making sure they always expire after the `maxDuration` value.
* If the expiration of the channel is shorter than `maxDuration`, the original value will be kept.
* Value is set in minutes. 0 is infinite.
*/
maxDuration?: number;
@@ -70,7 +70,7 @@ export class NotificationSubscriber extends OperationHttpHandler {
throw new UnsupportedMediaTypeHttpError('Subscribe bodies need to be application/ld+json.');
}
let channel: NotificationChannel;
let channel: NotificationChannelJson;
try {
const json = JSON.parse(await readableToString(operation.body.data));
channel = await this.channelType.schema.validate(json);
@@ -93,7 +93,7 @@ export class NotificationSubscriber extends OperationHttpHandler {
return new OkResponseDescription(response.metadata, response.data);
}
private async authorize(request: HttpRequest, channel: NotificationChannel): Promise<Credentials> {
private async authorize(request: HttpRequest, channel: NotificationChannelJson): Promise<Credentials> {
const credentials = await this.credentialsExtractor.handleSafe(request);
this.logger.debug(`Extracted credentials: ${JSON.stringify(credentials)}`);

View File

@@ -1,5 +1,5 @@
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
import type { NotificationChannelInfo } from './NotificationChannelStorage';
import type { NotificationChannel } from './NotificationChannel';
/**
* Handles the `state` feature of notifications.
@@ -8,4 +8,4 @@ import type { NotificationChannelInfo } from './NotificationChannelStorage';
*
* Implementations of this class should handle all channels and filter out those that need a `state` notification.
*/
export abstract class StateHandler extends AsyncHandler<{ info: NotificationChannelInfo }> {}
export abstract class StateHandler extends AsyncHandler<{ channel: NotificationChannel }> {}

View File

@@ -16,7 +16,7 @@ export class TypedNotificationHandler extends NotificationHandler {
}
public async canHandle(input: NotificationHandlerInput): Promise<void> {
if (input.info.type !== this.type) {
if (input.channel.type !== this.type) {
throw new NotImplementedHttpError(`Only ${this.type} notification channels are supported.`);
}
await this.source.canHandle(input);

View File

@@ -34,8 +34,8 @@ export class WebHookEmitter extends NotificationEmitter<WebHookFeatures> {
this.expiration = expiration * 60 * 1000;
}
public async handle({ info, representation }: NotificationEmitterInput<WebHookFeatures>): Promise<void> {
this.logger.debug(`Emitting WebHook notification with target ${info.features.target}`);
public async handle({ channel, representation }: NotificationEmitterInput<WebHookFeatures>): Promise<void> {
this.logger.debug(`Emitting WebHook notification with target ${channel.features.target}`);
const privateKey = await this.jwkGenerator.getPrivateKey();
const publicKey = await this.jwkGenerator.getPublicKey();
@@ -66,14 +66,14 @@ export class WebHookEmitter extends NotificationEmitter<WebHookFeatures> {
// https://datatracker.ietf.org/doc/html/draft-ietf-oauth-dpop#section-4.2
const dpopProof = await new SignJWT({
htu: info.features.target,
htu: channel.features.target,
htm: 'POST',
}).setProtectedHeader({ alg: privateKey.alg, jwk: publicKey, typ: 'dpop+jwt' })
.setIssuedAt(time)
.setJti(v4())
.sign(privateKeyObject);
const response = await fetch(info.features.target, {
const response = await fetch(channel.features.target, {
method: 'POST',
headers: {
'content-type': representation.metadata.contentType!,
@@ -83,7 +83,7 @@ export class WebHookEmitter extends NotificationEmitter<WebHookFeatures> {
body: await readableToString(representation.data),
});
if (response.status >= 400) {
this.logger.error(`There was an issue emitting a WebHook notification with target ${info.features.target}: ${
this.logger.error(`There was an issue emitting a WebHook notification with target ${channel.features.target}: ${
await response.text()}`);
}
}

View File

@@ -52,11 +52,11 @@ export class WebHookSubscription2021 implements NotificationChannelType<typeof s
this.stateHandler = stateHandler;
}
public async extractModes(channel: InferType<typeof schema>): Promise<AccessMap> {
return new IdentifierSetMultiMap<AccessMode>([[{ path: channel.topic }, AccessMode.read ]]);
public async extractModes(json: InferType<typeof schema>): Promise<AccessMap> {
return new IdentifierSetMultiMap<AccessMode>([[{ path: json.topic }, AccessMode.read ]]);
}
public async subscribe(channel: InferType<typeof schema>, credentials: Credentials):
public async subscribe(json: InferType<typeof schema>, credentials: Credentials):
Promise<NotificationChannelResponse<WebHookFeatures>> {
const webId = credentials.agent?.webId;
@@ -66,15 +66,15 @@ export class WebHookSubscription2021 implements NotificationChannelType<typeof s
);
}
const info = this.storage.create(channel, { target: channel.target, webId });
await this.storage.add(info);
const channel = this.storage.create(json, { target: json.target, webId });
await this.storage.add(channel);
const jsonld = {
'@context': [ CONTEXT_NOTIFICATION ],
type: this.type,
target: channel.target,
target: json.target,
// eslint-disable-next-line @typescript-eslint/naming-convention
unsubscribe_endpoint: generateWebHookUnsubscribeUrl(this.unsubscribePath, info.id),
unsubscribe_endpoint: generateWebHookUnsubscribeUrl(this.unsubscribePath, channel.id),
};
const response = new BasicRepresentation(JSON.stringify(jsonld), APPLICATION_LD_JSON);
@@ -82,11 +82,11 @@ export class WebHookSubscription2021 implements NotificationChannelType<typeof s
// right after we send the response for subscribing.
// We do this by waiting for the response to be closed.
endOfStream(response.data)
.then((): Promise<void> => this.stateHandler.handleSafe({ info }))
.then((): Promise<void> => this.stateHandler.handleSafe({ channel }))
.catch((error): void => {
this.logger.error(`Error emitting state notification: ${createErrorMessage(error)}`);
});
return { response, info };
return { response, channel };
}
}

View File

@@ -28,13 +28,13 @@ export class WebHookUnsubscriber extends OperationHttpHandler {
public async handle({ operation, request }: OperationHttpHandlerInput): Promise<ResponseDescription> {
const id = parseWebHookUnsubscribeUrl(operation.target.path);
const info = await this.storage.get(id);
if (!info) {
const channel = await this.storage.get(id);
if (!channel) {
throw new NotFoundHttpError();
}
const credentials = await this.credentialsExtractor.handleSafe(request);
if (info.features.webId !== credentials.agent?.webId) {
if (channel.features.webId !== credentials.agent?.webId) {
throw new ForbiddenHttpError();
}

View File

@@ -21,9 +21,9 @@ export class WebSocket2021Emitter extends NotificationEmitter {
this.socketMap = socketMap;
}
public async handle({ info, representation }: NotificationEmitterInput): Promise<void> {
public async handle({ channel, representation }: NotificationEmitterInput): Promise<void> {
// Called as a NotificationEmitter: emit the notification
const webSockets = this.socketMap.get(info.id);
const webSockets = this.socketMap.get(channel.id);
if (webSockets) {
const data = await readableToString(representation.data);
for (const webSocket of webSockets) {

View File

@@ -1,9 +1,9 @@
import type { WebSocket } from 'ws';
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
import type { NotificationChannelInfo } from '../NotificationChannelStorage';
import type { NotificationChannel } from '../NotificationChannel';
export interface WebSocket2021HandlerInput {
info: NotificationChannelInfo;
channel: NotificationChannel;
webSocket: WebSocket;
}

View File

@@ -38,16 +38,16 @@ export class WebSocket2021Listener extends WebSocketServerConfigurator {
return webSocket.close();
}
const info = await this.storage.get(id);
const channel = await this.storage.get(id);
if (!info) {
// Info not being there implies it has expired
if (!channel) {
// Channel not being there implies it has expired
webSocket.send(`Notification channel has expired`);
return webSocket.close();
}
this.logger.info(`Accepted WebSocket connection listening to changes on ${info.topic}`);
this.logger.info(`Accepted WebSocket connection listening to changes on ${channel.topic}`);
await this.handler.handleSafe({ info, webSocket });
await this.handler.handleSafe({ channel, webSocket });
}
}

View File

@@ -34,10 +34,10 @@ export class WebSocket2021Storer extends WebSocket2021Handler {
timer.unref();
}
public async handle({ webSocket, info }: WebSocket2021HandlerInput): Promise<void> {
this.socketMap.add(info.id, webSocket);
webSocket.on('error', (): boolean => this.socketMap.deleteEntry(info.id, webSocket));
webSocket.on('close', (): boolean => this.socketMap.deleteEntry(info.id, webSocket));
public async handle({ webSocket, channel }: WebSocket2021HandlerInput): Promise<void> {
this.socketMap.add(channel.id, webSocket);
webSocket.on('error', (): boolean => this.socketMap.deleteEntry(channel.id, webSocket));
webSocket.on('close', (): boolean => this.socketMap.deleteEntry(channel.id, webSocket));
}
/**

View File

@@ -7,7 +7,7 @@ import { getLoggerFor } from '../../../logging/LogUtil';
import { APPLICATION_LD_JSON } from '../../../util/ContentTypes';
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
import { CONTEXT_NOTIFICATION } from '../Notification';
import type { NotificationChannel } from '../NotificationChannel';
import type { NotificationChannelJson } from '../NotificationChannel';
import { NOTIFICATION_CHANNEL_SCHEMA } from '../NotificationChannel';
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
import type { NotificationChannelResponse, NotificationChannelType } from '../NotificationChannelType';
@@ -38,21 +38,21 @@ export class WebSocketSubscription2021 implements NotificationChannelType<typeof
this.path = route.getPath();
}
public async extractModes(channel: NotificationChannel): Promise<AccessMap> {
return new IdentifierSetMultiMap<AccessMode>([[{ path: channel.topic }, AccessMode.read ]]);
public async extractModes(json: NotificationChannelJson): Promise<AccessMap> {
return new IdentifierSetMultiMap<AccessMode>([[{ path: json.topic }, AccessMode.read ]]);
}
public async subscribe(channel: NotificationChannel): Promise<NotificationChannelResponse> {
const info = this.storage.create(channel, {});
await this.storage.add(info);
public async subscribe(json: NotificationChannelJson): Promise<NotificationChannelResponse> {
const channel = this.storage.create(json, {});
await this.storage.add(channel);
const jsonld = {
'@context': [ CONTEXT_NOTIFICATION ],
type: this.type,
source: generateWebSocketUrl(this.path, info.id),
source: generateWebSocketUrl(this.path, channel.id),
};
const response = new BasicRepresentation(JSON.stringify(jsonld), APPLICATION_LD_JSON);
return { response, info };
return { response, channel };
}
}

View File

@@ -25,7 +25,7 @@ export class ConvertingNotificationSerializer extends NotificationSerializer {
public async handle(input: NotificationSerializerInput): Promise<Representation> {
const representation = await this.source.handle(input);
const type = input.info.accept;
const type = input.channel.accept;
if (!type) {
return representation;

View File

@@ -1,17 +1,17 @@
import type { Representation } from '../../../http/representation/Representation';
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
import type { Notification } from '../Notification';
import type { NotificationChannelInfo } from '../NotificationChannelStorage';
import type { NotificationChannel } from '../NotificationChannel';
export interface NotificationSerializerInput {
notification: Notification;
info: NotificationChannelInfo;
channel: NotificationChannel;
}
/**
* Converts a {@link Notification} into a {@link Representation} that can be transmitted.
*
* The reason this is a separate class in between a generator and emitter,
* is so a specific notification channel type can add extra metadata to the Representation if needed.
* This is a separate class between a generator and emitter,
* so that a specific notification channel type can add extra metadata to the Representation if needed.
*/
export abstract class NotificationSerializer extends AsyncHandler<NotificationSerializerInput, Representation> { }