mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
refactor: Rename subscription to notification channel
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
"IdentifierMap",
|
||||
"IdentifierSetMultiMap",
|
||||
"NodeJS.Dict",
|
||||
"NotificationChannelType",
|
||||
"PermissionMap",
|
||||
"Promise",
|
||||
"Readonly",
|
||||
@@ -26,7 +27,6 @@
|
||||
"Server",
|
||||
"SetMultiMap",
|
||||
"Shorthand",
|
||||
"SubscriptionType",
|
||||
"Template",
|
||||
"TemplateEngine",
|
||||
"ValuePreferencesArg",
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
{
|
||||
"comment": "Storage to be used to keep track of subscriptions.",
|
||||
"@id": "urn:solid-server:default:SubscriptionStorage",
|
||||
"@type": "KeyValueSubscriptionStorage",
|
||||
"@type": "KeyValueChannelStorage",
|
||||
"locker": { "@id": "urn:solid-server:default:ResourceLocker" },
|
||||
"storage": {
|
||||
"@type": "EncodingPathStorage",
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
"allowedPathNames": [ "/WebHookSubscription2021/$" ],
|
||||
"handler": {
|
||||
"@type": "NotificationSubscriber",
|
||||
"subscriptionType": { "@id": "urn:solid-server:default:WebHookSubscription2021" },
|
||||
"channelType": { "@id": "urn:solid-server:default:WebHookSubscription2021" },
|
||||
"credentialsExtractor": { "@id": "urn:solid-server:default:CredentialsExtractor" },
|
||||
"permissionReader": { "@id": "urn:solid-server:default:PermissionReader" },
|
||||
"authorizer": { "@id": "urn:solid-server:default:Authorizer" }
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
"allowedPathNames": [ "/WebSocketSubscription2021/" ],
|
||||
"handler": {
|
||||
"@type": "NotificationSubscriber",
|
||||
"subscriptionType": { "@id": "urn:solid-server:default:WebSocketSubscription2021" },
|
||||
"channelType": { "@id": "urn:solid-server:default:WebSocketSubscription2021" },
|
||||
"credentialsExtractor": { "@id": "urn:solid-server:default:CredentialsExtractor" },
|
||||
"permissionReader": { "@id": "urn:solid-server:default:PermissionReader" },
|
||||
"authorizer": { "@id": "urn:solid-server:default:Authorizer" }
|
||||
|
||||
@@ -38,7 +38,7 @@ that contains all the necessary presets to describe a notification subscription
|
||||
When adding a new subscription type,
|
||||
a new instance of such a class should be added to the `urn:solid-server:default:StorageDescriber`.
|
||||
|
||||
## Subscription
|
||||
## NotificationChannel
|
||||
|
||||
To subscribe, a client has to send a specific JSON-LD request to the URl found during discovery.
|
||||
|
||||
@@ -61,7 +61,7 @@ which in our configs is set to `/.notifications/`.
|
||||
For every type there is then a `OperationRouterHandler` that accepts requests to that specific URL,
|
||||
after which a `NotificationSubscriber` handles all checks related to subscribing,
|
||||
for which it uses a `SubscriptionType` that contains all the information necessary for a specific type.
|
||||
If the subscription is valid and has authorization, the results will be saved in a `SubscriptionStorage`.
|
||||
If the subscription is valid and has authorization, the results will be saved in a `NotificationChannelStorage`.
|
||||
|
||||
## Activity
|
||||
|
||||
@@ -71,7 +71,7 @@ flowchart TB
|
||||
ListeningActivityHandler --> ListeningActivityHandlerArgs
|
||||
|
||||
subgraph ListeningActivityHandlerArgs[" "]
|
||||
SubscriptionStorage("<strong>SubscriptionStorage</strong><br><i>SubscriptionStorage</i>")
|
||||
NotificationChannelStorage("<strong>NotificationChannelStorage</strong><br><i>NotificationChannelStorage</i>")
|
||||
ResourceStore("<strong>ResourceStore</strong><br><i>ActivityEmitter</i>")
|
||||
NotificationHandler("<strong>NotificationHandler</strong><br>WaterfallHandler")
|
||||
end
|
||||
@@ -149,7 +149,7 @@ flowchart TB
|
||||
|
||||
subgraph WebSocket2021ListenerArgs[" "]
|
||||
direction LR
|
||||
SubscriptionStorage("<strong>SubscriptionStorage</strong><br>SubscriptionStorage")
|
||||
NotificationChannelStorage("<strong>NotificationChannelStorage</strong><br>NotificationChannelStorage")
|
||||
SequenceHandler("<br>SequenceHandler")
|
||||
end
|
||||
|
||||
|
||||
@@ -340,16 +340,16 @@ export * from './server/notifications/WebSocketSubscription2021/WebSocketSubscri
|
||||
export * from './server/notifications/ActivityEmitter';
|
||||
export * from './server/notifications/BaseStateHandler';
|
||||
export * from './server/notifications/ComposedNotificationHandler';
|
||||
export * from './server/notifications/KeyValueSubscriptionStorage';
|
||||
export * from './server/notifications/KeyValueChannelStorage';
|
||||
export * from './server/notifications/ListeningActivityHandler';
|
||||
export * from './server/notifications/NotificationChannel';
|
||||
export * from './server/notifications/NotificationChannelStorage';
|
||||
export * from './server/notifications/NotificationChannelType';
|
||||
export * from './server/notifications/NotificationDescriber';
|
||||
export * from './server/notifications/NotificationEmitter';
|
||||
export * from './server/notifications/NotificationHandler';
|
||||
export * from './server/notifications/NotificationSubscriber';
|
||||
export * from './server/notifications/StateHandler';
|
||||
export * from './server/notifications/Subscription';
|
||||
export * from './server/notifications/SubscriptionStorage';
|
||||
export * from './server/notifications/SubscriptionType';
|
||||
export * from './server/notifications/TypedNotificationHandler';
|
||||
|
||||
// Server/Util
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import { getLoggerFor } from '../../logging/LogUtil';
|
||||
import { createErrorMessage } from '../../util/errors/ErrorUtil';
|
||||
import type { NotificationChannelInfo, NotificationChannelStorage } from './NotificationChannelStorage';
|
||||
import type { NotificationHandler } from './NotificationHandler';
|
||||
import { StateHandler } from './StateHandler';
|
||||
import type { SubscriptionInfo, SubscriptionStorage } from './SubscriptionStorage';
|
||||
|
||||
/**
|
||||
* Handles the `state` feature by calling a {@link NotificationHandler}
|
||||
* in case the {@link SubscriptionInfo} has a `state` value.
|
||||
* in case the {@link NotificationChannelInfo} has a `state` value.
|
||||
*
|
||||
* Deletes the `state` parameter from the info afterwards.
|
||||
*/
|
||||
@@ -14,15 +14,15 @@ export class BaseStateHandler extends StateHandler {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly handler: NotificationHandler;
|
||||
private readonly storage: SubscriptionStorage;
|
||||
private readonly storage: NotificationChannelStorage;
|
||||
|
||||
public constructor(handler: NotificationHandler, storage: SubscriptionStorage) {
|
||||
public constructor(handler: NotificationHandler, storage: NotificationChannelStorage) {
|
||||
super();
|
||||
this.handler = handler;
|
||||
this.storage = storage;
|
||||
}
|
||||
|
||||
public async handle({ info }: { info: SubscriptionInfo }): Promise<void> {
|
||||
public async handle({ info }: { info: NotificationChannelInfo }): Promise<void> {
|
||||
if (info.state) {
|
||||
const topic = { path: info.topic };
|
||||
try {
|
||||
|
||||
@@ -14,7 +14,7 @@ export interface ComposedNotificationHandlerArgs {
|
||||
* Generates, serializes and emits a {@link Notification} using a {@link NotificationGenerator},
|
||||
* {@link NotificationSerializer} and {@link NotificationEmitter}.
|
||||
*
|
||||
* Will not emit an event in case it has the same state as the subscription info.
|
||||
* Will not emit an event in case it has the same state as the notification channel info.
|
||||
*/
|
||||
export class ComposedNotificationHandler extends NotificationHandler {
|
||||
private readonly generator: NotificationGenerator;
|
||||
|
||||
@@ -4,17 +4,17 @@ import { getLoggerFor } from '../../logging/LogUtil';
|
||||
import type { KeyValueStorage } from '../../storage/keyvalue/KeyValueStorage';
|
||||
import { InternalServerError } from '../../util/errors/InternalServerError';
|
||||
import type { ReadWriteLocker } from '../../util/locking/ReadWriteLocker';
|
||||
import type { Subscription } from './Subscription';
|
||||
import type { SubscriptionInfo, SubscriptionStorage } from './SubscriptionStorage';
|
||||
import type { NotificationChannel } from './NotificationChannel';
|
||||
import type { NotificationChannelInfo, NotificationChannelStorage } from './NotificationChannelStorage';
|
||||
|
||||
type StorageValue<T> = string | string[] | SubscriptionInfo<T>;
|
||||
type StorageValue<T> = string | string[] | NotificationChannelInfo<T>;
|
||||
|
||||
/**
|
||||
* Stores all the {@link SubscriptionInfo} in a {@link KeyValueStorage}.
|
||||
* Stores all the {@link NotificationChannelInfo} in a {@link KeyValueStorage}.
|
||||
*
|
||||
* Uses a {@link ReadWriteLocker} to prevent internal race conditions.
|
||||
*/
|
||||
export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> implements SubscriptionStorage<T> {
|
||||
export class KeyValueChannelStorage<T extends Record<string, unknown>> implements NotificationChannelStorage<T> {
|
||||
protected logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: KeyValueStorage<string, StorageValue<T>>;
|
||||
@@ -25,26 +25,26 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
|
||||
this.locker = locker;
|
||||
}
|
||||
|
||||
public create(subscription: Subscription, features: T): SubscriptionInfo<T> {
|
||||
public create(channel: NotificationChannel, features: T): NotificationChannelInfo<T> {
|
||||
return {
|
||||
id: `${subscription.type}:${v4()}:${subscription.topic}`,
|
||||
topic: subscription.topic,
|
||||
type: subscription.type,
|
||||
id: `${channel.type}:${v4()}:${channel.topic}`,
|
||||
topic: channel.topic,
|
||||
type: channel.type,
|
||||
lastEmit: 0,
|
||||
startAt: subscription.startAt,
|
||||
endAt: subscription.endAt,
|
||||
accept: subscription.accept,
|
||||
rate: subscription.rate,
|
||||
state: subscription.state,
|
||||
startAt: channel.startAt,
|
||||
endAt: channel.endAt,
|
||||
accept: channel.accept,
|
||||
rate: channel.rate,
|
||||
state: channel.state,
|
||||
features,
|
||||
};
|
||||
}
|
||||
|
||||
public async get(id: string): Promise<SubscriptionInfo<T> | undefined> {
|
||||
public async get(id: string): Promise<NotificationChannelInfo<T> | undefined> {
|
||||
const info = await this.storage.get(id);
|
||||
if (info && this.isSubscriptionInfo(info)) {
|
||||
if (info && this.isChannelInfo(info)) {
|
||||
if (typeof info.endAt === 'number' && info.endAt < Date.now()) {
|
||||
this.logger.info(`Subscription ${id} has expired.`);
|
||||
this.logger.info(`Notification channel ${id} has expired.`);
|
||||
await this.locker.withWriteLock(this.getLockKey(id), async(): Promise<void> => {
|
||||
await this.deleteInfo(info);
|
||||
});
|
||||
@@ -63,7 +63,7 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
|
||||
return [];
|
||||
}
|
||||
|
||||
public async add(info: SubscriptionInfo<T>): Promise<void> {
|
||||
public async add(info: NotificationChannelInfo<T>): Promise<void> {
|
||||
const target = { path: info.topic };
|
||||
return this.locker.withWriteLock(this.getLockKey(target), async(): Promise<void> => {
|
||||
const infos = await this.getAll(target);
|
||||
@@ -73,16 +73,16 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
|
||||
});
|
||||
}
|
||||
|
||||
public async update(info: SubscriptionInfo<T>): Promise<void> {
|
||||
public async update(info: NotificationChannelInfo<T>): Promise<void> {
|
||||
return this.locker.withWriteLock(this.getLockKey(info.id), async(): Promise<void> => {
|
||||
const oldInfo = await this.storage.get(info.id);
|
||||
|
||||
if (oldInfo) {
|
||||
if (!this.isSubscriptionInfo(oldInfo)) {
|
||||
throw new InternalServerError(`Trying to update ${info.id} which is not a SubscriptionInfo.`);
|
||||
if (!this.isChannelInfo(oldInfo)) {
|
||||
throw new InternalServerError(`Trying to update ${info.id} which is not a NotificationChannelInfo.`);
|
||||
}
|
||||
if (info.topic !== oldInfo.topic) {
|
||||
throw new InternalServerError(`Trying to change the topic of subscription ${info.id}`);
|
||||
throw new InternalServerError(`Trying to change the topic of a notification channel ${info.id}`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,16 +101,16 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility function for deleting a specific {@link SubscriptionInfo} object.
|
||||
* Does not create a lock on the subscription ID so should be wrapped in such a lock.
|
||||
* Utility function for deleting a specific {@link NotificationChannelInfo} object.
|
||||
* Does not create a lock on the info ID so should be wrapped in such a lock.
|
||||
*/
|
||||
private async deleteInfo(info: SubscriptionInfo): Promise<void> {
|
||||
private async deleteInfo(info: NotificationChannelInfo): Promise<void> {
|
||||
await this.locker.withWriteLock(this.getLockKey(info.topic), async(): Promise<void> => {
|
||||
const infos = await this.getAll({ path: info.topic });
|
||||
const idx = infos.indexOf(info.id);
|
||||
// If idx < 0 we have an inconsistency
|
||||
if (idx < 0) {
|
||||
this.logger.error(`Subscription info ${info.id} was not found in the list of info targeting ${info.topic}.`);
|
||||
this.logger.error(`Channel info ${info.id} was not found in the list of info targeting ${info.topic}.`);
|
||||
this.logger.error('This should not happen and indicates a data consistency issue.');
|
||||
} else {
|
||||
infos.splice(idx, 1);
|
||||
@@ -124,8 +124,8 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
|
||||
});
|
||||
}
|
||||
|
||||
private isSubscriptionInfo(value: StorageValue<T>): value is SubscriptionInfo<T> {
|
||||
return Boolean((value as SubscriptionInfo).id);
|
||||
private isChannelInfo(value: StorageValue<T>): value is NotificationChannelInfo<T> {
|
||||
return Boolean((value as NotificationChannelInfo).id);
|
||||
}
|
||||
|
||||
private getLockKey(identifier: ResourceIdentifier | string): ResourceIdentifier {
|
||||
@@ -4,14 +4,14 @@ import { createErrorMessage } from '../../util/errors/ErrorUtil';
|
||||
import { StaticHandler } from '../../util/handlers/StaticHandler';
|
||||
import type { AS, VocabularyTerm } from '../../util/Vocabularies';
|
||||
import type { ActivityEmitter } from './ActivityEmitter';
|
||||
import type { NotificationChannelStorage } from './NotificationChannelStorage';
|
||||
import type { NotificationHandler } from './NotificationHandler';
|
||||
import type { SubscriptionStorage } from './SubscriptionStorage';
|
||||
|
||||
/**
|
||||
* Listens to an {@link ActivityEmitter} and calls the stored {@link NotificationHandler}s in case of an event
|
||||
* for every matching Subscription found.
|
||||
* for every matching notification channel found.
|
||||
*
|
||||
* Takes the `rate` feature into account so only subscriptions that want a new notification will receive one.
|
||||
* Takes the `rate` feature into account so only channels that want a new notification will receive one.
|
||||
*
|
||||
* Extends {@link StaticHandler} so it can be more easily injected into a Components.js configuration.
|
||||
* No class takes this one as input, so to make sure Components.js instantiates it,
|
||||
@@ -20,10 +20,10 @@ import type { SubscriptionStorage } from './SubscriptionStorage';
|
||||
export class ListeningActivityHandler extends StaticHandler {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: SubscriptionStorage;
|
||||
private readonly storage: NotificationChannelStorage;
|
||||
private readonly handler: NotificationHandler;
|
||||
|
||||
public constructor(storage: SubscriptionStorage, emitter: ActivityEmitter, handler: NotificationHandler) {
|
||||
public constructor(storage: NotificationChannelStorage, emitter: ActivityEmitter, handler: NotificationHandler) {
|
||||
super();
|
||||
this.storage = storage;
|
||||
this.handler = handler;
|
||||
@@ -36,12 +36,12 @@ export class ListeningActivityHandler extends StaticHandler {
|
||||
}
|
||||
|
||||
private async emit(topic: ResourceIdentifier, activity: VocabularyTerm<typeof AS>): Promise<void> {
|
||||
const subscriptionIds = await this.storage.getAll(topic);
|
||||
const channelIds = await this.storage.getAll(topic);
|
||||
|
||||
for (const id of subscriptionIds) {
|
||||
for (const id of channelIds) {
|
||||
const info = await this.storage.get(id);
|
||||
if (!info) {
|
||||
// Subscription has expired
|
||||
// Notification channel has expired
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ export class ListeningActivityHandler extends StaticHandler {
|
||||
continue;
|
||||
}
|
||||
|
||||
// No need to wait on this to resolve before going to the next subscription.
|
||||
// No need to wait on this to resolve before going to the next channel.
|
||||
// Prevent failed notification from blocking other notifications.
|
||||
this.handler.handleSafe({ info, activity, topic }).catch((error): void => {
|
||||
this.logger.error(`Error trying to handle notification for ${id}: ${createErrorMessage(error)}`);
|
||||
|
||||
@@ -4,13 +4,13 @@ import { array, number, object, string } from 'yup';
|
||||
import { CONTEXT_NOTIFICATION } from './Notification';
|
||||
|
||||
/**
|
||||
* A JSON parsing schema that can be used to parse subscription input.
|
||||
* Specific subscription types can extend this schema with their own custom keys.
|
||||
* A JSON parsing schema that can be used to parse a notification channel sent during subscription.
|
||||
* Specific notification channels can extend this schema with their own custom keys.
|
||||
*/
|
||||
export const SUBSCRIBE_SCHEMA = object({
|
||||
export const NOTIFICATION_CHANNEL_SCHEMA = object({
|
||||
'@context': array(string()).ensure().required().test({
|
||||
name: 'RequireNotificationContext',
|
||||
message: `The ${CONTEXT_NOTIFICATION} context is required in the subscription JSON-LD body.`,
|
||||
message: `The ${CONTEXT_NOTIFICATION} context is required in the notification channel JSON-LD body.`,
|
||||
test: (context): boolean => Boolean(context?.includes(CONTEXT_NOTIFICATION)),
|
||||
}),
|
||||
type: string().required(),
|
||||
@@ -27,4 +27,4 @@ export const SUBSCRIBE_SCHEMA = object({
|
||||
toSeconds(parse(original)) * 1000).optional(),
|
||||
accept: string().optional(),
|
||||
});
|
||||
export type Subscription = InferType<typeof SUBSCRIBE_SCHEMA>;
|
||||
export type NotificationChannel = InferType<typeof NOTIFICATION_CHANNEL_SCHEMA>;
|
||||
68
src/server/notifications/NotificationChannelStorage.ts
Normal file
68
src/server/notifications/NotificationChannelStorage.ts
Normal file
@@ -0,0 +1,68 @@
|
||||
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
|
||||
import type { NotificationChannel } from './NotificationChannel';
|
||||
|
||||
/**
|
||||
* The info provided for a notification channel during a subscription.
|
||||
* `features` can contain custom values relevant for a specific channel type.
|
||||
*/
|
||||
export type NotificationChannelInfo<T = Record<string, unknown>> = {
|
||||
id: string;
|
||||
topic: string;
|
||||
type: string;
|
||||
startAt?: number;
|
||||
endAt?: number;
|
||||
accept?: string;
|
||||
rate?: number;
|
||||
state?: string;
|
||||
lastEmit: number;
|
||||
features: T;
|
||||
};
|
||||
|
||||
/**
|
||||
* Stores all the information necessary to keep track of notification channels.
|
||||
* Besides the standard channel info it also stores features specific to a certain channel type.
|
||||
*
|
||||
* This storage assumes that a channel can only have a single identifier as its topic.
|
||||
*/
|
||||
export interface NotificationChannelStorage<T extends Record<string, unknown> = Record<string, unknown>> {
|
||||
/**
|
||||
* Creates info corresponding to the given channel and features.
|
||||
* This does not store the generated info in the storage.
|
||||
* @param channel - Notification channel to generate info of.
|
||||
* @param features - Features to add to the info
|
||||
*/
|
||||
create: (channel: NotificationChannel, features: T) => NotificationChannelInfo<T>;
|
||||
|
||||
/**
|
||||
* Returns the info for the requested notification channel.
|
||||
* `undefined` if no match was found or if the notification channel expired.
|
||||
* @param id - The identifier of the notification channel.
|
||||
*/
|
||||
get: (id: string) => Promise<NotificationChannelInfo<T> | undefined>;
|
||||
|
||||
/**
|
||||
* Returns the identifiers of all notification channel entries that have the given identifier as their topic.
|
||||
* The identifiers can potentially correspond to expired channels.
|
||||
* @param topic - The identifier that is the topic.
|
||||
*/
|
||||
getAll: (topic: ResourceIdentifier) => Promise<string[]>;
|
||||
|
||||
/**
|
||||
* Adds the given info to the storage.
|
||||
* @param info - Info to add.
|
||||
*/
|
||||
add: (info: NotificationChannelInfo<T>) => Promise<void>;
|
||||
|
||||
/**
|
||||
* Updates the given notification channel info.
|
||||
* The `id` and the `topic` can not be updated.
|
||||
* @param info - The info to update.
|
||||
*/
|
||||
update: (info: NotificationChannelInfo<T>) => Promise<void>;
|
||||
|
||||
/**
|
||||
* Deletes the given notification channel from the storage.
|
||||
* @param id - The identifier of the notification channel
|
||||
*/
|
||||
delete: (id: string) => Promise<void>;
|
||||
}
|
||||
44
src/server/notifications/NotificationChannelType.ts
Normal file
44
src/server/notifications/NotificationChannelType.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import type { InferType } from 'yup';
|
||||
import type { Credentials } from '../../authentication/Credentials';
|
||||
import type { AccessMap } from '../../authorization/permissions/Permissions';
|
||||
import type { Representation } from '../../http/representation/Representation';
|
||||
import type { NOTIFICATION_CHANNEL_SCHEMA } from './NotificationChannel';
|
||||
import type { NotificationChannelInfo } from './NotificationChannelStorage';
|
||||
|
||||
export interface NotificationChannelResponse<TFeat extends Record<string, unknown> = Record<string, unknown>> {
|
||||
response: Representation;
|
||||
info: NotificationChannelInfo<TFeat>;
|
||||
}
|
||||
|
||||
/**
|
||||
* A specific channel type as defined at
|
||||
* https://solidproject.org/TR/2022/notifications-protocol-20221231#notification-channel-types.
|
||||
*/
|
||||
export interface NotificationChannelType<
|
||||
TSub extends typeof NOTIFICATION_CHANNEL_SCHEMA = typeof NOTIFICATION_CHANNEL_SCHEMA,
|
||||
TFeat extends Record<string, unknown> = Record<string, unknown>> {
|
||||
/**
|
||||
* The expected type value in the JSON-LD body of requests subscribing for this notification channel type.
|
||||
*/
|
||||
readonly type: string;
|
||||
/**
|
||||
* An extension of {@link NOTIFICATION_CHANNEL_SCHEMA}
|
||||
* that can be used to parse and validate an incoming subscription request with a notification channel body.
|
||||
*/
|
||||
readonly schema: TSub;
|
||||
/**
|
||||
* Determines which modes are required to allow the given notification channel.
|
||||
* @param channel - The notification channel to verify.
|
||||
*
|
||||
* @returns The required modes.
|
||||
*/
|
||||
extractModes: (channel: InferType<TSub>) => Promise<AccessMap>;
|
||||
/**
|
||||
* Registers the given notification channel.
|
||||
* @param channel - The notification channel to register.
|
||||
* @param credentials - The credentials of the client trying to subscribe.
|
||||
*
|
||||
* @returns A {@link Representation} to return as a response and the generated {@link NotificationChannelInfo}.
|
||||
*/
|
||||
subscribe: (channel: InferType<TSub>, credentials: Credentials) => Promise<NotificationChannelResponse<TFeat>>;
|
||||
}
|
||||
@@ -1,14 +1,14 @@
|
||||
import type { Representation } from '../../http/representation/Representation';
|
||||
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
|
||||
import type { SubscriptionInfo } from './SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from './NotificationChannelStorage';
|
||||
|
||||
export interface NotificationEmitterInput<T = Record<string, unknown>> {
|
||||
representation: Representation;
|
||||
info: SubscriptionInfo<T>;
|
||||
info: NotificationChannelInfo<T>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a serialized Notification to the subscription defined by the info.
|
||||
* Emits a serialized Notification to the channel defined by the info.
|
||||
*/
|
||||
export abstract class NotificationEmitter<T = Record<string, unknown>>
|
||||
extends AsyncHandler<NotificationEmitterInput<T>> {}
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
|
||||
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
|
||||
import type { AS, VocabularyTerm } from '../../util/Vocabularies';
|
||||
import type { SubscriptionInfo } from './SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from './NotificationChannelStorage';
|
||||
|
||||
export interface NotificationHandlerInput {
|
||||
topic: ResourceIdentifier;
|
||||
info: SubscriptionInfo;
|
||||
info: NotificationChannelInfo;
|
||||
activity?: VocabularyTerm<typeof AS>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes sure an activity gets emitted to the relevant subscription based on the given info.
|
||||
* Makes sure an activity gets emitted to the relevant channel based on the given info.
|
||||
*/
|
||||
export abstract class NotificationHandler extends AsyncHandler<NotificationHandlerInput> {}
|
||||
|
||||
@@ -13,14 +13,14 @@ import { readableToString } from '../../util/StreamUtil';
|
||||
import type { HttpRequest } from '../HttpRequest';
|
||||
import type { OperationHttpHandlerInput } from '../OperationHttpHandler';
|
||||
import { OperationHttpHandler } from '../OperationHttpHandler';
|
||||
import type { Subscription } from './Subscription';
|
||||
import type { SubscriptionType } from './SubscriptionType';
|
||||
import type { NotificationChannel } from './NotificationChannel';
|
||||
import type { NotificationChannelType } from './NotificationChannelType';
|
||||
|
||||
export interface NotificationSubscriberArgs {
|
||||
/**
|
||||
* The {@link SubscriptionType} with all the necessary information.
|
||||
* The {@link NotificationChannelType} with all the necessary information.
|
||||
*/
|
||||
subscriptionType: SubscriptionType;
|
||||
channelType: NotificationChannelType;
|
||||
/**
|
||||
* Used to extract the credentials from the request.
|
||||
*/
|
||||
@@ -34,23 +34,23 @@ export interface NotificationSubscriberArgs {
|
||||
*/
|
||||
authorizer: Authorizer;
|
||||
/**
|
||||
* Overrides the expiration feature of subscriptions by making sure they always expire after the `maxDuration` value.
|
||||
* In case the expiration of the subscription is shorter than `maxDuration` the original value will be kept.
|
||||
* Overrides the expiration feature of channels by making sure they always expire after the `maxDuration` value.
|
||||
* In case the expiration of the channel is shorter than `maxDuration` the original value will be kept.
|
||||
* Value is set in minutes. 0 is infinite.
|
||||
*/
|
||||
maxDuration?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles notification subscriptions.
|
||||
* Handles notification subscriptions by creating a notification channel.
|
||||
*
|
||||
* Uses the information from the provided {@link SubscriptionType} to validate the input
|
||||
* Uses the information from the provided {@link NotificationChannelType} to validate the input
|
||||
* and verify the request has the required permissions available.
|
||||
*/
|
||||
export class NotificationSubscriber extends OperationHttpHandler {
|
||||
protected logger = getLoggerFor(this);
|
||||
|
||||
private readonly subscriptionType: SubscriptionType;
|
||||
private readonly channelType: NotificationChannelType;
|
||||
private readonly credentialsExtractor: CredentialsExtractor;
|
||||
private readonly permissionReader: PermissionReader;
|
||||
private readonly authorizer: Authorizer;
|
||||
@@ -58,7 +58,7 @@ export class NotificationSubscriber extends OperationHttpHandler {
|
||||
|
||||
public constructor(args: NotificationSubscriberArgs) {
|
||||
super();
|
||||
this.subscriptionType = args.subscriptionType;
|
||||
this.channelType = args.channelType;
|
||||
this.credentialsExtractor = args.credentialsExtractor;
|
||||
this.permissionReader = args.permissionReader;
|
||||
this.authorizer = args.authorizer;
|
||||
@@ -70,41 +70,41 @@ export class NotificationSubscriber extends OperationHttpHandler {
|
||||
throw new UnsupportedMediaTypeHttpError('Subscribe bodies need to be application/ld+json.');
|
||||
}
|
||||
|
||||
let subscription: Subscription;
|
||||
let channel: NotificationChannel;
|
||||
try {
|
||||
const json = JSON.parse(await readableToString(operation.body.data));
|
||||
subscription = await this.subscriptionType.schema.validate(json);
|
||||
channel = await this.channelType.schema.validate(json);
|
||||
} catch (error: unknown) {
|
||||
throw new UnprocessableEntityHttpError(`Unable to process subscription: ${createErrorMessage(error)}`);
|
||||
throw new UnprocessableEntityHttpError(`Unable to process notification channel: ${createErrorMessage(error)}`);
|
||||
}
|
||||
|
||||
if (this.maxDuration) {
|
||||
const duration = (subscription.endAt ?? Number.POSITIVE_INFINITY) - Date.now();
|
||||
const duration = (channel.endAt ?? Number.POSITIVE_INFINITY) - Date.now();
|
||||
if (duration > this.maxDuration) {
|
||||
subscription.endAt = Date.now() + this.maxDuration;
|
||||
channel.endAt = Date.now() + this.maxDuration;
|
||||
}
|
||||
}
|
||||
|
||||
// Verify if the client is allowed to subscribe
|
||||
const credentials = await this.authorize(request, subscription);
|
||||
const credentials = await this.authorize(request, channel);
|
||||
|
||||
const { response } = await this.subscriptionType.subscribe(subscription, credentials);
|
||||
const { response } = await this.channelType.subscribe(channel, credentials);
|
||||
|
||||
return new OkResponseDescription(response.metadata, response.data);
|
||||
}
|
||||
|
||||
private async authorize(request: HttpRequest, subscription: Subscription): Promise<Credentials> {
|
||||
private async authorize(request: HttpRequest, channel: NotificationChannel): Promise<Credentials> {
|
||||
const credentials = await this.credentialsExtractor.handleSafe(request);
|
||||
this.logger.debug(`Extracted credentials: ${JSON.stringify(credentials)}`);
|
||||
|
||||
const requestedModes = await this.subscriptionType.extractModes(subscription);
|
||||
const requestedModes = await this.channelType.extractModes(channel);
|
||||
this.logger.debug(`Retrieved required modes: ${[ ...requestedModes.entrySets() ]}`);
|
||||
|
||||
const availablePermissions = await this.permissionReader.handleSafe({ credentials, requestedModes });
|
||||
this.logger.debug(`Available permissions are ${[ ...availablePermissions.entries() ]}`);
|
||||
|
||||
await this.authorizer.handleSafe({ credentials, requestedModes, availablePermissions });
|
||||
this.logger.verbose(`Authorization succeeded, creating subscription`);
|
||||
this.logger.verbose(`Authorization succeeded, creating notification channel`);
|
||||
|
||||
return credentials;
|
||||
}
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
|
||||
import type { SubscriptionInfo } from './SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from './NotificationChannelStorage';
|
||||
|
||||
/**
|
||||
* Handles the `state` feature of notifications.
|
||||
* Every implementation of a specific subscription type should make sure an instance of this class
|
||||
* Every implementation of a specific notification channel type should make sure an instance of this class
|
||||
* gets called when a `state` notification can be sent out.
|
||||
*
|
||||
* Implementations of this class should handle all subscriptions and filter out those that need a `state` notification.
|
||||
* Implementations of this class should handle all channels and filter out those that need a `state` notification.
|
||||
*/
|
||||
export abstract class StateHandler extends AsyncHandler<{ info: SubscriptionInfo }> {}
|
||||
export abstract class StateHandler extends AsyncHandler<{ info: NotificationChannelInfo }> {}
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
|
||||
import type { Subscription } from './Subscription';
|
||||
|
||||
/**
|
||||
* The info provided during a subscription.
|
||||
* `features` can contain custom values relevant for a specific subscription type.
|
||||
*/
|
||||
export type SubscriptionInfo<T = Record<string, unknown>> = {
|
||||
id: string;
|
||||
topic: string;
|
||||
type: string;
|
||||
startAt?: number;
|
||||
endAt?: number;
|
||||
accept?: string;
|
||||
rate?: number;
|
||||
state?: string;
|
||||
lastEmit: number;
|
||||
features: T;
|
||||
};
|
||||
|
||||
/**
|
||||
* Stores all the information necessary to keep track of notification subscriptions.
|
||||
* Besides the standard subscription info it also stores features specific to a certain subscription type.
|
||||
*
|
||||
* This storage assumes that a subscription can only have a single identifier as its topic.
|
||||
*/
|
||||
export interface SubscriptionStorage<T extends Record<string, unknown> = Record<string, unknown>> {
|
||||
/**
|
||||
* Creates info corresponding to the given subscription and features.
|
||||
* This does not store the generated info in the storage.
|
||||
* @param subscription - Subscription to generate info of.
|
||||
* @param features - Features to add to the info
|
||||
*/
|
||||
create: (subscription: Subscription, features: T) => SubscriptionInfo<T>;
|
||||
|
||||
/**
|
||||
* Returns the info for the requested subscription.
|
||||
* `undefined` if no match was found or if the subscription expired.
|
||||
* @param id - The identifier of the subscription.
|
||||
*/
|
||||
get: (id: string) => Promise<SubscriptionInfo<T> | undefined>;
|
||||
|
||||
/**
|
||||
* Returns the identifiers of all subscription entries that have the given identifier as their topic.
|
||||
* @param topic - The identifier that is the topic.
|
||||
*/
|
||||
getAll: (topic: ResourceIdentifier) => Promise<string[]>;
|
||||
|
||||
/**
|
||||
* Adds the given info to the storage.
|
||||
* @param info - Info to add.
|
||||
*/
|
||||
add: (info: SubscriptionInfo<T>) => Promise<void>;
|
||||
|
||||
/**
|
||||
* Updates the given subscription info.
|
||||
* The `id` and the `topic` can not be updated.
|
||||
* @param info - The info to update.
|
||||
*/
|
||||
update: (info: SubscriptionInfo<T>) => Promise<void>;
|
||||
|
||||
/**
|
||||
* Deletes the given subscription from the storage.
|
||||
* @param id - The identifier of the subscription
|
||||
*/
|
||||
delete: (id: string) => Promise<void>;
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
import type { InferType } from 'yup';
|
||||
import type { Credentials } from '../../authentication/Credentials';
|
||||
import type { AccessMap } from '../../authorization/permissions/Permissions';
|
||||
import type { Representation } from '../../http/representation/Representation';
|
||||
import type { SUBSCRIBE_SCHEMA } from './Subscription';
|
||||
import type { SubscriptionInfo } from './SubscriptionStorage';
|
||||
|
||||
export interface SubscriptionResponse<TFeat extends Record<string, unknown> = Record<string, unknown>> {
|
||||
response: Representation;
|
||||
info: SubscriptionInfo<TFeat>;
|
||||
}
|
||||
|
||||
/**
|
||||
* A specific subscription type as defined at https://solidproject.org/TR/notifications-protocol#subscription-types.
|
||||
*/
|
||||
export interface SubscriptionType<TSub extends typeof SUBSCRIBE_SCHEMA = typeof SUBSCRIBE_SCHEMA,
|
||||
TFeat extends Record<string, unknown> = Record<string, unknown>> {
|
||||
/**
|
||||
* The expected type value in the JSON-LD body of requests subscribing for this subscription type.
|
||||
*/
|
||||
readonly type: string;
|
||||
/**
|
||||
* An extension of {@link SUBSCRIBE_SCHEMA} that can be used to parse and valide an incoming subscription request.
|
||||
*/
|
||||
readonly schema: TSub;
|
||||
/**
|
||||
* Determines which modes are required to allow the given subscription.
|
||||
* @param subscription - The subscription to verify.
|
||||
*
|
||||
* @returns The required modes.
|
||||
*/
|
||||
extractModes: (subscription: InferType<TSub>) => Promise<AccessMap>;
|
||||
/**
|
||||
* Registers the given subscription.
|
||||
* @param subscription - The subscription to register.
|
||||
* @param credentials - The credentials of the client trying to subscribe.
|
||||
*
|
||||
* @returns A {@link Representation} to return as a response and the generated {@link SubscriptionInfo}.
|
||||
*/
|
||||
subscribe: (subscription: InferType<TSub>, credentials: Credentials) => Promise<SubscriptionResponse<TFeat>>;
|
||||
}
|
||||
@@ -3,7 +3,7 @@ import type { NotificationHandlerInput } from './NotificationHandler';
|
||||
import { NotificationHandler } from './NotificationHandler';
|
||||
|
||||
/**
|
||||
* A {@link NotificationHandler} that only accepts input for a specific subscription type.
|
||||
* A {@link NotificationHandler} that only accepts input for a specific notification channel type.
|
||||
*/
|
||||
export class TypedNotificationHandler extends NotificationHandler {
|
||||
private readonly type: string;
|
||||
@@ -17,7 +17,7 @@ export class TypedNotificationHandler extends NotificationHandler {
|
||||
|
||||
public async canHandle(input: NotificationHandlerInput): Promise<void> {
|
||||
if (input.info.type !== this.type) {
|
||||
throw new NotImplementedHttpError(`Only ${this.type} subscriptions are supported.`);
|
||||
throw new NotImplementedHttpError(`Only ${this.type} notification channels are supported.`);
|
||||
}
|
||||
await this.source.canHandle(input);
|
||||
}
|
||||
|
||||
@@ -12,14 +12,14 @@ import { createErrorMessage } from '../../../util/errors/ErrorUtil';
|
||||
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
|
||||
import { endOfStream } from '../../../util/StreamUtil';
|
||||
import { CONTEXT_NOTIFICATION } from '../Notification';
|
||||
import { NOTIFICATION_CHANNEL_SCHEMA } from '../NotificationChannel';
|
||||
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
|
||||
import type { NotificationChannelResponse, NotificationChannelType } from '../NotificationChannelType';
|
||||
import type { StateHandler } from '../StateHandler';
|
||||
import { SUBSCRIBE_SCHEMA } from '../Subscription';
|
||||
import type { SubscriptionStorage } from '../SubscriptionStorage';
|
||||
import type { SubscriptionResponse, SubscriptionType } from '../SubscriptionType';
|
||||
import { generateWebHookUnsubscribeUrl } from './WebHook2021Util';
|
||||
|
||||
const type = 'WebHookSubscription2021';
|
||||
const schema = SUBSCRIBE_SCHEMA.shape({
|
||||
const schema = NOTIFICATION_CHANNEL_SCHEMA.shape({
|
||||
type: string().required().oneOf([ type ]),
|
||||
// Not using `.url()` validator since it does not support localhost URLs
|
||||
target: string().required(),
|
||||
@@ -28,36 +28,36 @@ const schema = SUBSCRIBE_SCHEMA.shape({
|
||||
export type WebHookFeatures = { target: string; webId: string };
|
||||
|
||||
/**
|
||||
* The notification subscription type WebHookSubscription2021 as described in
|
||||
* The notification channel type WebHookSubscription2021 as described in
|
||||
* https://github.com/solid/notifications/blob/main/webhook-subscription-2021.md
|
||||
*
|
||||
* Requires read permissions on a resource to be able to receive notifications.
|
||||
*
|
||||
* Also handles the `state` feature if present.
|
||||
*/
|
||||
export class WebHookSubscription2021 implements SubscriptionType<typeof schema, WebHookFeatures> {
|
||||
export class WebHookSubscription2021 implements NotificationChannelType<typeof schema, WebHookFeatures> {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: SubscriptionStorage<WebHookFeatures>;
|
||||
private readonly storage: NotificationChannelStorage<WebHookFeatures>;
|
||||
private readonly unsubscribePath: string;
|
||||
private readonly stateHandler: StateHandler;
|
||||
|
||||
public readonly type = type;
|
||||
public readonly schema = schema;
|
||||
|
||||
public constructor(storage: SubscriptionStorage<WebHookFeatures>, unsubscribeRoute: InteractionRoute,
|
||||
public constructor(storage: NotificationChannelStorage<WebHookFeatures>, unsubscribeRoute: InteractionRoute,
|
||||
stateHandler: StateHandler) {
|
||||
this.storage = storage;
|
||||
this.unsubscribePath = unsubscribeRoute.getPath();
|
||||
this.stateHandler = stateHandler;
|
||||
}
|
||||
|
||||
public async extractModes(subscription: InferType<typeof schema>): Promise<AccessMap> {
|
||||
return new IdentifierSetMultiMap<AccessMode>([[{ path: subscription.topic }, AccessMode.read ]]);
|
||||
public async extractModes(channel: InferType<typeof schema>): Promise<AccessMap> {
|
||||
return new IdentifierSetMultiMap<AccessMode>([[{ path: channel.topic }, AccessMode.read ]]);
|
||||
}
|
||||
|
||||
public async subscribe(subscription: InferType<typeof schema>, credentials: Credentials):
|
||||
Promise<SubscriptionResponse<WebHookFeatures>> {
|
||||
public async subscribe(channel: InferType<typeof schema>, credentials: Credentials):
|
||||
Promise<NotificationChannelResponse<WebHookFeatures>> {
|
||||
const webId = credentials.agent?.webId;
|
||||
|
||||
if (!webId) {
|
||||
@@ -66,13 +66,13 @@ export class WebHookSubscription2021 implements SubscriptionType<typeof schema,
|
||||
);
|
||||
}
|
||||
|
||||
const info = this.storage.create(subscription, { target: subscription.target, webId });
|
||||
const info = this.storage.create(channel, { target: channel.target, webId });
|
||||
await this.storage.add(info);
|
||||
|
||||
const jsonld = {
|
||||
'@context': [ CONTEXT_NOTIFICATION ],
|
||||
type: this.type,
|
||||
target: subscription.target,
|
||||
target: channel.target,
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
unsubscribe_endpoint: generateWebHookUnsubscribeUrl(this.unsubscribePath, info.id),
|
||||
};
|
||||
|
||||
@@ -6,7 +6,7 @@ import { ForbiddenHttpError } from '../../../util/errors/ForbiddenHttpError';
|
||||
import { NotFoundHttpError } from '../../../util/errors/NotFoundHttpError';
|
||||
import type { OperationHttpHandlerInput } from '../../OperationHttpHandler';
|
||||
import { OperationHttpHandler } from '../../OperationHttpHandler';
|
||||
import type { SubscriptionStorage } from '../SubscriptionStorage';
|
||||
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
|
||||
import { parseWebHookUnsubscribeUrl } from './WebHook2021Util';
|
||||
import type { WebHookFeatures } from './WebHookSubscription2021';
|
||||
|
||||
@@ -18,9 +18,9 @@ export class WebHookUnsubscriber extends OperationHttpHandler {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly credentialsExtractor: CredentialsExtractor;
|
||||
private readonly storage: SubscriptionStorage<WebHookFeatures>;
|
||||
private readonly storage: NotificationChannelStorage<WebHookFeatures>;
|
||||
|
||||
public constructor(credentialsExtractor: CredentialsExtractor, storage: SubscriptionStorage<WebHookFeatures>) {
|
||||
public constructor(credentialsExtractor: CredentialsExtractor, storage: NotificationChannelStorage<WebHookFeatures>) {
|
||||
super();
|
||||
this.credentialsExtractor = credentialsExtractor;
|
||||
this.storage = storage;
|
||||
@@ -38,7 +38,7 @@ export class WebHookUnsubscriber extends OperationHttpHandler {
|
||||
throw new ForbiddenHttpError();
|
||||
}
|
||||
|
||||
this.logger.debug(`Deleting WebHook subscription ${id}`);
|
||||
this.logger.debug(`Deleting WebHook notification channel ${id}`);
|
||||
await this.storage.delete(id);
|
||||
|
||||
return new ResetResponseDescription();
|
||||
|
||||
@@ -8,7 +8,7 @@ import type { NotificationEmitterInput } from '../NotificationEmitter';
|
||||
/**
|
||||
* Emits notifications on WebSocketSubscription2021 subscription.
|
||||
* Uses the WebSockets found in the provided map.
|
||||
* The key should be the identifier of the matching subscription.
|
||||
* The key should be the identifier of the matching channel.
|
||||
*/
|
||||
export class WebSocket2021Emitter extends NotificationEmitter {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import type { WebSocket } from 'ws';
|
||||
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
|
||||
import type { SubscriptionInfo } from '../SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from '../NotificationChannelStorage';
|
||||
|
||||
export interface WebSocket2021HandlerInput {
|
||||
info: SubscriptionInfo;
|
||||
info: NotificationChannelInfo;
|
||||
webSocket: WebSocket;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ import type { WebSocket } from 'ws';
|
||||
import type { InteractionRoute } from '../../../identity/interaction/routing/InteractionRoute';
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import { WebSocketServerConfigurator } from '../../WebSocketServerConfigurator';
|
||||
import type { SubscriptionStorage } from '../SubscriptionStorage';
|
||||
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
|
||||
import type { WebSocket2021Handler } from './WebSocket2021Handler';
|
||||
import { parseWebSocketRequest } from './WebSocket2021Util';
|
||||
|
||||
@@ -14,11 +14,11 @@ import { parseWebSocketRequest } from './WebSocket2021Util';
|
||||
export class WebSocket2021Listener extends WebSocketServerConfigurator {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: SubscriptionStorage;
|
||||
private readonly storage: NotificationChannelStorage;
|
||||
private readonly handler: WebSocket2021Handler;
|
||||
private readonly path: string;
|
||||
|
||||
public constructor(storage: SubscriptionStorage, handler: WebSocket2021Handler, route: InteractionRoute) {
|
||||
public constructor(storage: NotificationChannelStorage, handler: WebSocket2021Handler, route: InteractionRoute) {
|
||||
super();
|
||||
this.storage = storage;
|
||||
this.handler = handler;
|
||||
@@ -42,7 +42,7 @@ export class WebSocket2021Listener extends WebSocketServerConfigurator {
|
||||
|
||||
if (!info) {
|
||||
// Info not being there implies it has expired
|
||||
webSocket.send(`Subscription has expired`);
|
||||
webSocket.send(`Notification channel has expired`);
|
||||
return webSocket.close();
|
||||
}
|
||||
|
||||
|
||||
@@ -2,26 +2,27 @@ import type { WebSocket } from 'ws';
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import type { SetMultiMap } from '../../../util/map/SetMultiMap';
|
||||
import { setSafeInterval } from '../../../util/TimerUtil';
|
||||
import type { SubscriptionStorage } from '../SubscriptionStorage';
|
||||
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
|
||||
import type { WebSocket2021HandlerInput } from './WebSocket2021Handler';
|
||||
import { WebSocket2021Handler } from './WebSocket2021Handler';
|
||||
|
||||
/**
|
||||
* Keeps track of the WebSockets that were opened for a WebSocketSubscription2021 subscription.
|
||||
* The WebSockets are stored in the map using the identifier of the matching subscription.
|
||||
* Keeps track of the WebSockets that were opened for a WebSocketSubscription2021 channel.
|
||||
* The WebSockets are stored in the map using the identifier of the matching channel.
|
||||
*
|
||||
* `cleanupTimer` defines in minutes how often the stored WebSockets are closed
|
||||
* if their corresponding subscription has expired.
|
||||
* if their corresponding channel has expired.
|
||||
* Defaults to 60 minutes.
|
||||
* Open WebSockets will not receive notifications if their subscription expired.
|
||||
* Open WebSockets will not receive notifications if their channel expired.
|
||||
*/
|
||||
export class WebSocket2021Storer extends WebSocket2021Handler {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: SubscriptionStorage;
|
||||
private readonly storage: NotificationChannelStorage;
|
||||
private readonly socketMap: SetMultiMap<string, WebSocket>;
|
||||
|
||||
public constructor(storage: SubscriptionStorage, socketMap: SetMultiMap<string, WebSocket>, cleanupTimer = 60) {
|
||||
public constructor(storage: NotificationChannelStorage, socketMap: SetMultiMap<string, WebSocket>,
|
||||
cleanupTimer = 60) {
|
||||
super();
|
||||
this.socketMap = socketMap;
|
||||
this.storage = storage;
|
||||
@@ -40,7 +41,7 @@ export class WebSocket2021Storer extends WebSocket2021Handler {
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all WebSockets that are attached to a subscription that no longer exists.
|
||||
* Close all WebSockets that are attached to a channel that no longer exists.
|
||||
*/
|
||||
private async closeExpiredSockets(): Promise<void> {
|
||||
this.logger.debug('Closing expired WebSockets');
|
||||
|
||||
@@ -7,43 +7,43 @@ import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import { APPLICATION_LD_JSON } from '../../../util/ContentTypes';
|
||||
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
|
||||
import { CONTEXT_NOTIFICATION } from '../Notification';
|
||||
import type { Subscription } from '../Subscription';
|
||||
import { SUBSCRIBE_SCHEMA } from '../Subscription';
|
||||
import type { SubscriptionStorage } from '../SubscriptionStorage';
|
||||
import type { SubscriptionResponse, SubscriptionType } from '../SubscriptionType';
|
||||
import type { NotificationChannel } from '../NotificationChannel';
|
||||
import { NOTIFICATION_CHANNEL_SCHEMA } from '../NotificationChannel';
|
||||
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
|
||||
import type { NotificationChannelResponse, NotificationChannelType } from '../NotificationChannelType';
|
||||
import { generateWebSocketUrl } from './WebSocket2021Util';
|
||||
|
||||
const type = 'WebSocketSubscription2021';
|
||||
const schema = SUBSCRIBE_SCHEMA.shape({
|
||||
const schema = NOTIFICATION_CHANNEL_SCHEMA.shape({
|
||||
type: string().required().oneOf([ type ]),
|
||||
});
|
||||
|
||||
/**
|
||||
* The notification subscription type WebSocketSubscription2021 as described in
|
||||
* The notification channel type WebSocketSubscription2021 as described in
|
||||
* https://solidproject.org/TR/websocket-subscription-2021
|
||||
*
|
||||
* Requires read permissions on a resource to be able to receive notifications.
|
||||
*/
|
||||
export class WebSocketSubscription2021 implements SubscriptionType<typeof schema> {
|
||||
export class WebSocketSubscription2021 implements NotificationChannelType<typeof schema> {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: SubscriptionStorage;
|
||||
private readonly storage: NotificationChannelStorage;
|
||||
private readonly path: string;
|
||||
|
||||
public readonly type = type;
|
||||
public readonly schema = schema;
|
||||
|
||||
public constructor(storage: SubscriptionStorage, route: InteractionRoute) {
|
||||
public constructor(storage: NotificationChannelStorage, route: InteractionRoute) {
|
||||
this.storage = storage;
|
||||
this.path = route.getPath();
|
||||
}
|
||||
|
||||
public async extractModes(subscription: Subscription): Promise<AccessMap> {
|
||||
return new IdentifierSetMultiMap<AccessMode>([[{ path: subscription.topic }, AccessMode.read ]]);
|
||||
public async extractModes(channel: NotificationChannel): Promise<AccessMap> {
|
||||
return new IdentifierSetMultiMap<AccessMode>([[{ path: channel.topic }, AccessMode.read ]]);
|
||||
}
|
||||
|
||||
public async subscribe(subscription: Subscription): Promise<SubscriptionResponse> {
|
||||
const info = this.storage.create(subscription, {});
|
||||
public async subscribe(channel: NotificationChannel): Promise<NotificationChannelResponse> {
|
||||
const info = this.storage.create(channel, {});
|
||||
await this.storage.add(info);
|
||||
|
||||
const jsonld = {
|
||||
|
||||
@@ -6,7 +6,7 @@ import { NotificationGenerator } from './NotificationGenerator';
|
||||
|
||||
/**
|
||||
* Determines the most relevant activity for a {@link Notification} in case none was provided.
|
||||
* This is relevant for the `state` feature where a subscription needs to know the current state of a resource.
|
||||
* This is relevant for the `state` feature where a notification channel needs to know the current state of a resource.
|
||||
*/
|
||||
export class StateNotificationGenerator extends NotificationGenerator {
|
||||
private readonly source: NotificationGenerator;
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
import type { Representation } from '../../../http/representation/Representation';
|
||||
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
|
||||
import type { Notification } from '../Notification';
|
||||
import type { SubscriptionInfo } from '../SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from '../NotificationChannelStorage';
|
||||
|
||||
export interface NotificationSerializerInput {
|
||||
notification: Notification;
|
||||
info: SubscriptionInfo;
|
||||
info: NotificationChannelInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a {@link Notification} into a {@link Representation} that can be transmitted.
|
||||
*
|
||||
* The reason this is a separate class in between a generator and emitter,
|
||||
* is so a specific subscription type can add extra metadata to the Representation if needed.
|
||||
* is so a specific notification channel type can add extra metadata to the Representation if needed.
|
||||
*/
|
||||
export abstract class NotificationSerializer extends AsyncHandler<NotificationSerializerInput, Representation> { }
|
||||
|
||||
@@ -164,7 +164,7 @@ describe.each(stores)('A server supporting WebSocketSubscription2021 using %s',
|
||||
acl:accessTo <./restricted>.`;
|
||||
await store.setRepresentation({ path: `${restricted}.acl` }, new BasicRepresentation(restrictedAcl, 'text/turtle'));
|
||||
|
||||
const subscription = {
|
||||
const channel = {
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
type: 'WebSocketSubscription2021',
|
||||
topic: restricted,
|
||||
@@ -174,7 +174,7 @@ describe.each(stores)('A server supporting WebSocketSubscription2021 using %s',
|
||||
let response = await fetch(subscriptionUrl, {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/ld+json' },
|
||||
body: JSON.stringify(subscription),
|
||||
body: JSON.stringify(channel),
|
||||
});
|
||||
expect(response.status).toBe(401);
|
||||
|
||||
@@ -185,7 +185,7 @@ describe.each(stores)('A server supporting WebSocketSubscription2021 using %s',
|
||||
authorization: `WebID ${webId}`,
|
||||
'content-type': 'application/ld+json',
|
||||
},
|
||||
body: JSON.stringify(subscription),
|
||||
body: JSON.stringify(channel),
|
||||
});
|
||||
expect(response.status).toBe(200);
|
||||
});
|
||||
@@ -211,7 +211,7 @@ describe.each(stores)('A server supporting WebSocketSubscription2021 using %s',
|
||||
expectNotification(notification, topic, 'Update');
|
||||
});
|
||||
|
||||
it('removes expired subscriptions.', async(): Promise<void> => {
|
||||
it('removes expired channels.', async(): Promise<void> => {
|
||||
const { source } = await subscribe(notificationType, webId, subscriptionUrl, topic, { endAt: 1 }) as any;
|
||||
|
||||
const socket = new WebSocket(source);
|
||||
@@ -219,6 +219,6 @@ describe.each(stores)('A server supporting WebSocketSubscription2021 using %s',
|
||||
await new Promise<void>((resolve): any => socket.on('close', resolve));
|
||||
|
||||
const message = (await messagePromise).toString();
|
||||
expect(message).toBe('Subscription has expired');
|
||||
expect(message).toBe('Notification channel has expired');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
import { BaseStateHandler } from '../../../../src/server/notifications/BaseStateHandler';
|
||||
import type {
|
||||
NotificationChannelInfo,
|
||||
NotificationChannelStorage,
|
||||
} from '../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type { NotificationHandler } from '../../../../src/server/notifications/NotificationHandler';
|
||||
import type { SubscriptionInfo, SubscriptionStorage } from '../../../../src/server/notifications/SubscriptionStorage';
|
||||
|
||||
describe('A BaseStateHandler', (): void => {
|
||||
let info: SubscriptionInfo;
|
||||
let info: NotificationChannelInfo;
|
||||
let notificationHandler: jest.Mocked<NotificationHandler>;
|
||||
let storage: jest.Mocked<SubscriptionStorage>;
|
||||
let storage: jest.Mocked<NotificationChannelStorage>;
|
||||
let handler: BaseStateHandler;
|
||||
|
||||
beforeEach(async(): Promise<void> => {
|
||||
|
||||
@@ -3,9 +3,9 @@ import type { ResourceIdentifier } from '../../../../src/http/representation/Res
|
||||
import { ComposedNotificationHandler } from '../../../../src/server/notifications/ComposedNotificationHandler';
|
||||
import type { NotificationGenerator } from '../../../../src/server/notifications/generate/NotificationGenerator';
|
||||
import type { Notification } from '../../../../src/server/notifications/Notification';
|
||||
import type { NotificationChannelInfo } from '../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type { NotificationEmitter } from '../../../../src/server/notifications/NotificationEmitter';
|
||||
import type { NotificationSerializer } from '../../../../src/server/notifications/serialize/NotificationSerializer';
|
||||
import type { SubscriptionInfo } from '../../../../src/server/notifications/SubscriptionStorage';
|
||||
|
||||
describe('A ComposedNotificationHandler', (): void => {
|
||||
const topic: ResourceIdentifier = { path: 'http://example.com/foo' };
|
||||
@@ -20,7 +20,7 @@ describe('A ComposedNotificationHandler', (): void => {
|
||||
published: '123',
|
||||
state: '123',
|
||||
};
|
||||
let info: SubscriptionInfo;
|
||||
let info: NotificationChannelInfo;
|
||||
const representation = new BasicRepresentation();
|
||||
let generator: jest.Mocked<NotificationGenerator>;
|
||||
let serializer: jest.Mocked<NotificationSerializer>;
|
||||
|
||||
@@ -2,9 +2,9 @@ import { v4 } from 'uuid';
|
||||
import type { ResourceIdentifier } from '../../../../src/http/representation/ResourceIdentifier';
|
||||
import type { Logger } from '../../../../src/logging/Logger';
|
||||
import { getLoggerFor } from '../../../../src/logging/LogUtil';
|
||||
import { KeyValueSubscriptionStorage } from '../../../../src/server/notifications/KeyValueSubscriptionStorage';
|
||||
import type { Subscription } from '../../../../src/server/notifications/Subscription';
|
||||
import type { SubscriptionInfo } from '../../../../src/server/notifications/SubscriptionStorage';
|
||||
import { KeyValueChannelStorage } from '../../../../src/server/notifications/KeyValueChannelStorage';
|
||||
import type { NotificationChannel } from '../../../../src/server/notifications/NotificationChannel';
|
||||
import type { NotificationChannelInfo } from '../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type { KeyValueStorage } from '../../../../src/storage/keyvalue/KeyValueStorage';
|
||||
import type { ReadWriteLocker } from '../../../../src/util/locking/ReadWriteLocker';
|
||||
import resetAllMocks = jest.resetAllMocks;
|
||||
@@ -15,21 +15,21 @@ jest.mock('../../../../src/logging/LogUtil', (): any => {
|
||||
return { getLoggerFor: (): Logger => logger };
|
||||
});
|
||||
|
||||
describe('A KeyValueSubscriptionStorage', (): void => {
|
||||
describe('A KeyValueChannelStorage', (): void => {
|
||||
const logger = getLoggerFor('mock');
|
||||
const topic = 'http://example.com/foo';
|
||||
const identifier = { path: topic };
|
||||
const subscription = {
|
||||
const channel = {
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
type: 'WebSocketSubscription2021',
|
||||
topic,
|
||||
} as Subscription;
|
||||
} as NotificationChannel;
|
||||
const features = { aa: 'bb' };
|
||||
let info: SubscriptionInfo<Record<string, string>>;
|
||||
let info: NotificationChannelInfo<Record<string, string>>;
|
||||
let internalMap: Map<string, any>;
|
||||
let internalStorage: KeyValueStorage<string, any>;
|
||||
let locker: ReadWriteLocker;
|
||||
let storage: KeyValueSubscriptionStorage<Record<string, string>>;
|
||||
let storage: KeyValueChannelStorage<Record<string, string>>;
|
||||
|
||||
beforeEach(async(): Promise<void> => {
|
||||
resetAllMocks();
|
||||
@@ -50,12 +50,12 @@ describe('A KeyValueSubscriptionStorage', (): void => {
|
||||
withReadLock: jest.fn(),
|
||||
};
|
||||
|
||||
storage = new KeyValueSubscriptionStorage(internalStorage, locker);
|
||||
storage = new KeyValueChannelStorage(internalStorage, locker);
|
||||
});
|
||||
|
||||
describe('#create', (): void => {
|
||||
it('creates info based on a subscription.', async(): Promise<void> => {
|
||||
expect(storage.create(subscription, features)).toEqual(info);
|
||||
it('creates info based on a notification channel.', async(): Promise<void> => {
|
||||
expect(storage.create(channel, features)).toEqual(info);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -119,7 +119,8 @@ describe('A KeyValueSubscriptionStorage', (): void => {
|
||||
...info,
|
||||
topic: 'http://example.com/other',
|
||||
};
|
||||
await expect(storage.update(newInfo)).rejects.toThrow(`Trying to change the topic of subscription ${info.id}`);
|
||||
await expect(storage.update(newInfo)).rejects
|
||||
.toThrow(`Trying to change the topic of a notification channel ${info.id}`);
|
||||
});
|
||||
|
||||
it('rejects update request targeting a non-info value.', async(): Promise<void> => {
|
||||
@@ -130,7 +131,8 @@ describe('A KeyValueSubscriptionStorage', (): void => {
|
||||
...info,
|
||||
id,
|
||||
};
|
||||
await expect(storage.update(newInfo)).rejects.toThrow(`Trying to update ${id} which is not a SubscriptionInfo.`);
|
||||
await expect(storage.update(newInfo)).rejects
|
||||
.toThrow(`Trying to update ${id} which is not a NotificationChannelInfo.`);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,8 +4,11 @@ import type { Logger } from '../../../../src/logging/Logger';
|
||||
import { getLoggerFor } from '../../../../src/logging/LogUtil';
|
||||
import type { ActivityEmitter } from '../../../../src/server/notifications/ActivityEmitter';
|
||||
import { ListeningActivityHandler } from '../../../../src/server/notifications/ListeningActivityHandler';
|
||||
import type {
|
||||
NotificationChannelInfo,
|
||||
NotificationChannelStorage,
|
||||
} from '../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type { NotificationHandler } from '../../../../src/server/notifications/NotificationHandler';
|
||||
import type { SubscriptionInfo, SubscriptionStorage } from '../../../../src/server/notifications/SubscriptionStorage';
|
||||
import { AS } from '../../../../src/util/Vocabularies';
|
||||
import { flushPromises } from '../../../util/Util';
|
||||
|
||||
@@ -18,8 +21,8 @@ describe('A ListeningActivityHandler', (): void => {
|
||||
const logger: jest.Mocked<Logger> = getLoggerFor('mock') as any;
|
||||
const topic: ResourceIdentifier = { path: 'http://example.com/foo' };
|
||||
const activity = AS.terms.Update;
|
||||
let info: SubscriptionInfo;
|
||||
let storage: jest.Mocked<SubscriptionStorage>;
|
||||
let info: NotificationChannelInfo;
|
||||
let storage: jest.Mocked<NotificationChannelStorage>;
|
||||
let emitter: ActivityEmitter;
|
||||
let notificationHandler: jest.Mocked<NotificationHandler>;
|
||||
let handler: ListeningActivityHandler;
|
||||
@@ -59,7 +62,7 @@ describe('A ListeningActivityHandler', (): void => {
|
||||
expect(logger.error).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
it('does not emit an event on subscriptions if their rate does not yet allow it.', async(): Promise<void> => {
|
||||
it('does not emit an event on channels if their rate does not yet allow it.', async(): Promise<void> => {
|
||||
info.rate = 100000;
|
||||
info.lastEmit = Date.now();
|
||||
|
||||
@@ -71,7 +74,7 @@ describe('A ListeningActivityHandler', (): void => {
|
||||
expect(logger.error).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
it('does not emit an event on subscriptions if their start time has not been reached.', async(): Promise<void> => {
|
||||
it('does not emit an event on channels if their start time has not been reached.', async(): Promise<void> => {
|
||||
info.startAt = Date.now() + 100000;
|
||||
|
||||
emitter.emit('changed', topic, activity);
|
||||
@@ -82,7 +85,7 @@ describe('A ListeningActivityHandler', (): void => {
|
||||
expect(logger.error).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
it('does not stop if one subscription causes an error.', async(): Promise<void> => {
|
||||
it('does not stop if one channel causes an error.', async(): Promise<void> => {
|
||||
storage.getAll.mockResolvedValue([ info.id, info.id ]);
|
||||
notificationHandler.handleSafe.mockRejectedValueOnce(new Error('bad input'));
|
||||
|
||||
@@ -106,7 +109,7 @@ describe('A ListeningActivityHandler', (): void => {
|
||||
expect(logger.error).toHaveBeenLastCalledWith(`Something went wrong emitting notifications: bad event`);
|
||||
});
|
||||
|
||||
it('ignores undefined subscriptions.', async(): Promise<void> => {
|
||||
it('ignores undefined channels.', async(): Promise<void> => {
|
||||
storage.get.mockResolvedValue(undefined);
|
||||
|
||||
emitter.emit('changed', topic, activity);
|
||||
|
||||
78
test/unit/server/notifications/NotificationChannel.test.ts
Normal file
78
test/unit/server/notifications/NotificationChannel.test.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
import { NOTIFICATION_CHANNEL_SCHEMA } from '../../../../src/server/notifications/NotificationChannel';
|
||||
|
||||
describe('A NotificationChannel', (): void => {
|
||||
const validChannel = {
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
type: 'NotificationChannelType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
|
||||
it('requires a minimal set of values.', async(): Promise<void> => {
|
||||
await expect(NOTIFICATION_CHANNEL_SCHEMA.isValid(validChannel)).resolves.toBe(true);
|
||||
});
|
||||
|
||||
it('requires the notification context header to be present.', async(): Promise<void> => {
|
||||
let channel: unknown = {
|
||||
type: 'NotificationChannelType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
await expect(NOTIFICATION_CHANNEL_SCHEMA.isValid(channel)).resolves.toBe(false);
|
||||
|
||||
channel = {
|
||||
'@context': [ 'wrongContext' ],
|
||||
type: 'NotificationChannelType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
await expect(NOTIFICATION_CHANNEL_SCHEMA.isValid(channel)).resolves.toBe(false);
|
||||
|
||||
channel = {
|
||||
'@context': [ 'contextA', 'https://www.w3.org/ns/solid/notification/v1', 'contextB' ],
|
||||
type: 'NotificationChannelType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
await expect(NOTIFICATION_CHANNEL_SCHEMA.isValid(channel)).resolves.toBe(true);
|
||||
|
||||
channel = {
|
||||
'@context': 'https://www.w3.org/ns/solid/notification/v1',
|
||||
type: 'NotificationChannelType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
await expect(NOTIFICATION_CHANNEL_SCHEMA.isValid(channel)).resolves.toBe(true);
|
||||
});
|
||||
|
||||
it('converts the start date to a number.', async(): Promise<void> => {
|
||||
const date = '1988-03-09T14:48:00.000Z';
|
||||
const ms = Date.parse(date);
|
||||
|
||||
const channel: unknown = {
|
||||
...validChannel,
|
||||
startAt: date,
|
||||
};
|
||||
await expect(NOTIFICATION_CHANNEL_SCHEMA.validate(channel)).resolves.toEqual(expect.objectContaining({
|
||||
startAt: ms,
|
||||
}));
|
||||
});
|
||||
|
||||
it('converts the end date to a number.', async(): Promise<void> => {
|
||||
const date = '1988-03-09T14:48:00.000Z';
|
||||
const ms = Date.parse(date);
|
||||
|
||||
const channel: unknown = {
|
||||
...validChannel,
|
||||
endAt: date,
|
||||
};
|
||||
await expect(NOTIFICATION_CHANNEL_SCHEMA.validate(channel)).resolves.toEqual(expect.objectContaining({
|
||||
endAt: ms,
|
||||
}));
|
||||
});
|
||||
|
||||
it('converts the rate to a number.', async(): Promise<void> => {
|
||||
const channel: unknown = {
|
||||
...validChannel,
|
||||
rate: 'PT10S',
|
||||
};
|
||||
await expect(NOTIFICATION_CHANNEL_SCHEMA.validate(channel)).resolves.toEqual(expect.objectContaining({
|
||||
rate: 10 * 1000,
|
||||
}));
|
||||
});
|
||||
});
|
||||
@@ -8,43 +8,43 @@ import { BasicRepresentation } from '../../../../src/http/representation/BasicRe
|
||||
import type { ResourceIdentifier } from '../../../../src/http/representation/ResourceIdentifier';
|
||||
import type { HttpRequest } from '../../../../src/server/HttpRequest';
|
||||
import type { HttpResponse } from '../../../../src/server/HttpResponse';
|
||||
import { NOTIFICATION_CHANNEL_SCHEMA } from '../../../../src/server/notifications/NotificationChannel';
|
||||
import type { NotificationChannelType } from '../../../../src/server/notifications/NotificationChannelType';
|
||||
import { NotificationSubscriber } from '../../../../src/server/notifications/NotificationSubscriber';
|
||||
import { SUBSCRIBE_SCHEMA } from '../../../../src/server/notifications/Subscription';
|
||||
import type { SubscriptionType } from '../../../../src/server/notifications/SubscriptionType';
|
||||
import { UnprocessableEntityHttpError } from '../../../../src/util/errors/UnprocessableEntityHttpError';
|
||||
import { UnsupportedMediaTypeHttpError } from '../../../../src/util/errors/UnsupportedMediaTypeHttpError';
|
||||
import { IdentifierMap, IdentifierSetMultiMap } from '../../../../src/util/map/IdentifierMap';
|
||||
import { guardedStreamFrom } from '../../../../src/util/StreamUtil';
|
||||
|
||||
describe('A NotificationSubscriber', (): void => {
|
||||
let subscriptionBody: any;
|
||||
let channel: any;
|
||||
const request: HttpRequest = {} as any;
|
||||
const response: HttpResponse = {} as any;
|
||||
let operation: Operation;
|
||||
const topic: ResourceIdentifier = { path: 'http://example.com/foo' };
|
||||
let subscriptionType: jest.Mocked<SubscriptionType>;
|
||||
let channelType: jest.Mocked<NotificationChannelType>;
|
||||
let credentialsExtractor: jest.Mocked<CredentialsExtractor>;
|
||||
let permissionReader: jest.Mocked<PermissionReader>;
|
||||
let authorizer: jest.Mocked<Authorizer>;
|
||||
let subscriber: NotificationSubscriber;
|
||||
|
||||
beforeEach(async(): Promise<void> => {
|
||||
subscriptionBody = {
|
||||
channel = {
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
type: 'SubscriptionType',
|
||||
type: 'NotificationChannelType',
|
||||
topic: topic.path,
|
||||
};
|
||||
|
||||
operation = {
|
||||
method: 'POST',
|
||||
target: { path: 'http://example.com/.notifications/websockets/' },
|
||||
body: new BasicRepresentation(JSON.stringify(subscriptionBody), 'application/ld+json'),
|
||||
body: new BasicRepresentation(JSON.stringify(channel), 'application/ld+json'),
|
||||
preferences: {},
|
||||
};
|
||||
|
||||
subscriptionType = {
|
||||
type: 'SubscriptionType',
|
||||
schema: SUBSCRIBE_SCHEMA,
|
||||
channelType = {
|
||||
type: 'NotificationChannelType',
|
||||
schema: NOTIFICATION_CHANNEL_SCHEMA,
|
||||
extractModes: jest.fn(async(subscription): Promise<AccessMap> =>
|
||||
new IdentifierSetMultiMap([[{ path: subscription.topic }, AccessMode.read ]]) as AccessMap),
|
||||
subscribe: jest.fn().mockResolvedValue({ response: new BasicRepresentation(), info: {}}),
|
||||
@@ -62,7 +62,7 @@ describe('A NotificationSubscriber', (): void => {
|
||||
handleSafe: jest.fn(),
|
||||
} as any;
|
||||
|
||||
subscriber = new NotificationSubscriber({ subscriptionType, credentialsExtractor, permissionReader, authorizer });
|
||||
subscriber = new NotificationSubscriber({ channelType, credentialsExtractor, permissionReader, authorizer });
|
||||
});
|
||||
|
||||
it('requires the request to be JSON-LD.', async(): Promise<void> => {
|
||||
@@ -86,7 +86,7 @@ describe('A NotificationSubscriber', (): void => {
|
||||
it('returns the representation generated by the subscribe call.', async(): Promise<void> => {
|
||||
const description = await subscriber.handle({ operation, request, response });
|
||||
expect(description.statusCode).toBe(200);
|
||||
const subscribeResult = await subscriptionType.subscribe.mock.results[0].value;
|
||||
const subscribeResult = await channelType.subscribe.mock.results[0].value;
|
||||
expect(description.data).toBe(subscribeResult.response.data);
|
||||
expect(description.metadata).toBe(subscribeResult.response.metadata);
|
||||
});
|
||||
@@ -96,12 +96,12 @@ describe('A NotificationSubscriber', (): void => {
|
||||
await expect(subscriber.handle({ operation, request, response })).rejects.toThrow('not allowed');
|
||||
});
|
||||
|
||||
it('updates the subscription expiration if a max is defined.', async(): Promise<void> => {
|
||||
it('updates the channel expiration if a max is defined.', async(): Promise<void> => {
|
||||
jest.useFakeTimers();
|
||||
jest.setSystemTime();
|
||||
|
||||
subscriber = new NotificationSubscriber({
|
||||
subscriptionType,
|
||||
channelType,
|
||||
credentialsExtractor,
|
||||
permissionReader,
|
||||
authorizer,
|
||||
@@ -109,25 +109,25 @@ describe('A NotificationSubscriber', (): void => {
|
||||
});
|
||||
|
||||
await subscriber.handle({ operation, request, response });
|
||||
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
|
||||
expect(channelType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
|
||||
endAt: Date.now() + (60 * 60 * 1000),
|
||||
}), { public: {}});
|
||||
|
||||
operation.body.data = guardedStreamFrom(JSON.stringify({
|
||||
...subscriptionBody,
|
||||
...channel,
|
||||
endAt: new Date(Date.now() + 99999999999999).toISOString(),
|
||||
}));
|
||||
await subscriber.handle({ operation, request, response });
|
||||
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
|
||||
expect(channelType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
|
||||
endAt: Date.now() + (60 * 60 * 1000),
|
||||
}), { public: {}});
|
||||
|
||||
operation.body.data = guardedStreamFrom(JSON.stringify({
|
||||
...subscriptionBody,
|
||||
...channel,
|
||||
endAt: new Date(Date.now() + 5).toISOString(),
|
||||
}));
|
||||
await subscriber.handle({ operation, request, response });
|
||||
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
|
||||
expect(channelType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
|
||||
endAt: Date.now() + 5,
|
||||
}), { public: {}});
|
||||
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
import { SUBSCRIBE_SCHEMA } from '../../../../src/server/notifications/Subscription';
|
||||
|
||||
describe('A Subscription', (): void => {
|
||||
const validSubscription = {
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
type: 'SubscriptionType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
|
||||
it('requires a minimal set of values.', async(): Promise<void> => {
|
||||
await expect(SUBSCRIBE_SCHEMA.isValid(validSubscription)).resolves.toBe(true);
|
||||
});
|
||||
|
||||
it('requires the notification context header to be present.', async(): Promise<void> => {
|
||||
let subscription: unknown = {
|
||||
type: 'SubscriptionType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
await expect(SUBSCRIBE_SCHEMA.isValid(subscription)).resolves.toBe(false);
|
||||
|
||||
subscription = {
|
||||
'@context': [ 'wrongContext' ],
|
||||
type: 'SubscriptionType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
await expect(SUBSCRIBE_SCHEMA.isValid(subscription)).resolves.toBe(false);
|
||||
|
||||
subscription = {
|
||||
'@context': [ 'contextA', 'https://www.w3.org/ns/solid/notification/v1', 'contextB' ],
|
||||
type: 'SubscriptionType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
await expect(SUBSCRIBE_SCHEMA.isValid(subscription)).resolves.toBe(true);
|
||||
|
||||
subscription = {
|
||||
'@context': 'https://www.w3.org/ns/solid/notification/v1',
|
||||
type: 'SubscriptionType',
|
||||
topic: 'http://example.com/foo',
|
||||
};
|
||||
await expect(SUBSCRIBE_SCHEMA.isValid(subscription)).resolves.toBe(true);
|
||||
});
|
||||
|
||||
it('converts the start date to a number.', async(): Promise<void> => {
|
||||
const date = '1988-03-09T14:48:00.000Z';
|
||||
const ms = Date.parse(date);
|
||||
|
||||
const subscription: unknown = {
|
||||
...validSubscription,
|
||||
startAt: date,
|
||||
};
|
||||
await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({
|
||||
startAt: ms,
|
||||
}));
|
||||
});
|
||||
|
||||
it('converts the end date to a number.', async(): Promise<void> => {
|
||||
const date = '1988-03-09T14:48:00.000Z';
|
||||
const ms = Date.parse(date);
|
||||
|
||||
const subscription: unknown = {
|
||||
...validSubscription,
|
||||
endAt: date,
|
||||
};
|
||||
await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({
|
||||
endAt: ms,
|
||||
}));
|
||||
});
|
||||
|
||||
it('converts the rate to a number.', async(): Promise<void> => {
|
||||
const subscription: unknown = {
|
||||
...validSubscription,
|
||||
rate: 'PT10S',
|
||||
};
|
||||
await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({
|
||||
rate: 10 * 1000,
|
||||
}));
|
||||
});
|
||||
});
|
||||
@@ -1,15 +1,15 @@
|
||||
import type { ResourceIdentifier } from '../../../../src/http/representation/ResourceIdentifier';
|
||||
import type { NotificationChannelInfo } from '../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type { NotificationHandler } from '../../../../src/server/notifications/NotificationHandler';
|
||||
import type { SubscriptionInfo } from '../../../../src/server/notifications/SubscriptionStorage';
|
||||
import { TypedNotificationHandler } from '../../../../src/server/notifications/TypedNotificationHandler';
|
||||
import { NotImplementedHttpError } from '../../../../src/util/errors/NotImplementedHttpError';
|
||||
|
||||
describe('A TypedNotificationHandler', (): void => {
|
||||
const topic: ResourceIdentifier = { path: 'http://example.com/foo' };
|
||||
const info: SubscriptionInfo = {
|
||||
const info: NotificationChannelInfo = {
|
||||
id: 'id',
|
||||
topic: topic.path,
|
||||
type: 'SubscriptionType',
|
||||
type: 'NotificationChannelType',
|
||||
features: {},
|
||||
lastEmit: 0,
|
||||
};
|
||||
|
||||
@@ -9,7 +9,7 @@ import {
|
||||
import type { Logger } from '../../../../../src/logging/Logger';
|
||||
import { getLoggerFor } from '../../../../../src/logging/LogUtil';
|
||||
import type { Notification } from '../../../../../src/server/notifications/Notification';
|
||||
import type { SubscriptionInfo } from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import { WebHookEmitter } from '../../../../../src/server/notifications/WebHookSubscription2021/WebHookEmitter';
|
||||
import type {
|
||||
WebHookFeatures,
|
||||
@@ -40,7 +40,7 @@ describe('A WebHookEmitter', (): void => {
|
||||
published: '123',
|
||||
};
|
||||
let representation: Representation;
|
||||
const info: SubscriptionInfo<WebHookFeatures> = {
|
||||
const info: NotificationChannelInfo<WebHookFeatures> = {
|
||||
id: 'id',
|
||||
topic: 'http://example.com/foo',
|
||||
type: 'type',
|
||||
|
||||
@@ -6,11 +6,11 @@ import {
|
||||
} from '../../../../../src/identity/interaction/routing/AbsolutePathInteractionRoute';
|
||||
import type { Logger } from '../../../../../src/logging/Logger';
|
||||
import { getLoggerFor } from '../../../../../src/logging/LogUtil';
|
||||
import type { StateHandler } from '../../../../../src/server/notifications/StateHandler';
|
||||
import type {
|
||||
SubscriptionInfo,
|
||||
SubscriptionStorage,
|
||||
} from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
NotificationChannelInfo,
|
||||
NotificationChannelStorage,
|
||||
} from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type { StateHandler } from '../../../../../src/server/notifications/StateHandler';
|
||||
import type {
|
||||
WebHookFeatures,
|
||||
} from '../../../../../src/server/notifications/WebHookSubscription2021/WebHookSubscription2021';
|
||||
@@ -31,14 +31,14 @@ jest.mock('../../../../../src/logging/LogUtil', (): any => {
|
||||
describe('A WebHookSubscription2021', (): void => {
|
||||
const credentials: Credentials = { agent: { webId: 'http://example.org/alice' }};
|
||||
const target = 'http://example.org/somewhere-else';
|
||||
let subscription: InferType<WebHookSubscription2021['schema']>;
|
||||
let channel: InferType<WebHookSubscription2021['schema']>;
|
||||
const unsubscribeRoute = new AbsolutePathInteractionRoute('http://example.com/unsubscribe');
|
||||
let storage: jest.Mocked<SubscriptionStorage<WebHookFeatures>>;
|
||||
let storage: jest.Mocked<NotificationChannelStorage<WebHookFeatures>>;
|
||||
let stateHandler: jest.Mocked<StateHandler>;
|
||||
let subscriptionType: WebHookSubscription2021;
|
||||
let channelType: WebHookSubscription2021;
|
||||
|
||||
beforeEach(async(): Promise<void> => {
|
||||
subscription = {
|
||||
channel = {
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
type: 'WebHookSubscription2021',
|
||||
topic: 'https://storage.example/resource',
|
||||
@@ -51,7 +51,7 @@ describe('A WebHookSubscription2021', (): void => {
|
||||
};
|
||||
|
||||
storage = {
|
||||
create: jest.fn((features: WebHookFeatures): SubscriptionInfo<WebHookFeatures> => ({
|
||||
create: jest.fn((features: WebHookFeatures): NotificationChannelInfo<WebHookFeatures> => ({
|
||||
id: '123',
|
||||
topic: 'http://example.com/foo',
|
||||
type: 'WebHookSubscription2021',
|
||||
@@ -65,27 +65,27 @@ describe('A WebHookSubscription2021', (): void => {
|
||||
handleSafe: jest.fn(),
|
||||
} as any;
|
||||
|
||||
subscriptionType = new WebHookSubscription2021(storage, unsubscribeRoute, stateHandler);
|
||||
channelType = new WebHookSubscription2021(storage, unsubscribeRoute, stateHandler);
|
||||
});
|
||||
|
||||
it('has the correct type.', async(): Promise<void> => {
|
||||
expect(subscriptionType.type).toBe('WebHookSubscription2021');
|
||||
expect(channelType.type).toBe('WebHookSubscription2021');
|
||||
});
|
||||
|
||||
it('correctly parses subscriptions.', async(): Promise<void> => {
|
||||
await expect(subscriptionType.schema.isValid(subscription)).resolves.toBe(true);
|
||||
it('correctly parses notification channel bodies.', async(): Promise<void> => {
|
||||
await expect(channelType.schema.isValid(channel)).resolves.toBe(true);
|
||||
|
||||
subscription.type = 'something else';
|
||||
await expect(subscriptionType.schema.isValid(subscription)).resolves.toBe(false);
|
||||
channel.type = 'something else';
|
||||
await expect(channelType.schema.isValid(channel)).resolves.toBe(false);
|
||||
});
|
||||
|
||||
it('requires Read permissions on the topic.', async(): Promise<void> => {
|
||||
await expect(subscriptionType.extractModes(subscription)).resolves
|
||||
.toEqual(new IdentifierSetMultiMap([[{ path: subscription.topic }, AccessMode.read ]]));
|
||||
await expect(channelType.extractModes(channel)).resolves
|
||||
.toEqual(new IdentifierSetMultiMap([[{ path: channel.topic }, AccessMode.read ]]));
|
||||
});
|
||||
|
||||
it('stores the info and returns a valid response when subscribing.', async(): Promise<void> => {
|
||||
const { response } = await subscriptionType.subscribe(subscription, credentials);
|
||||
const { response } = await channelType.subscribe(channel, credentials);
|
||||
expect(response.metadata.contentType).toBe('application/ld+json');
|
||||
await expect(readJsonStream(response.data)).resolves.toEqual({
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
@@ -97,12 +97,12 @@ describe('A WebHookSubscription2021', (): void => {
|
||||
});
|
||||
|
||||
it('errors if the credentials do not contain a WebID.', async(): Promise<void> => {
|
||||
await expect(subscriptionType.subscribe(subscription, {})).rejects
|
||||
await expect(channelType.subscribe(channel, {})).rejects
|
||||
.toThrow('A WebHookSubscription2021 subscription request needs to be authenticated with a WebID.');
|
||||
});
|
||||
|
||||
it('calls the state handler once the response has been read.', async(): Promise<void> => {
|
||||
const { response, info } = await subscriptionType.subscribe(subscription, credentials);
|
||||
const { response, info } = await channelType.subscribe(channel, credentials);
|
||||
expect(stateHandler.handleSafe).toHaveBeenCalledTimes(0);
|
||||
|
||||
// Read out data to end stream correctly
|
||||
@@ -116,7 +116,7 @@ describe('A WebHookSubscription2021', (): void => {
|
||||
const logger = getLoggerFor('mock');
|
||||
stateHandler.handleSafe.mockRejectedValue(new Error('notification error'));
|
||||
|
||||
const { response } = await subscriptionType.subscribe(subscription, credentials);
|
||||
const { response } = await channelType.subscribe(channel, credentials);
|
||||
expect(logger.error).toHaveBeenCalledTimes(0);
|
||||
|
||||
// Read out data to end stream correctly
|
||||
|
||||
@@ -4,7 +4,7 @@ import { ResetResponseDescription } from '../../../../../src/http/output/respons
|
||||
import { BasicRepresentation } from '../../../../../src/http/representation/BasicRepresentation';
|
||||
import type { HttpRequest } from '../../../../../src/server/HttpRequest';
|
||||
import type { HttpResponse } from '../../../../../src/server/HttpResponse';
|
||||
import type { SubscriptionStorage } from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
import type { NotificationChannelStorage } from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type {
|
||||
WebHookFeatures,
|
||||
} from '../../../../../src/server/notifications/WebHookSubscription2021/WebHookSubscription2021';
|
||||
@@ -20,7 +20,7 @@ describe('A WebHookUnsubscriber', (): void => {
|
||||
let operation: Operation;
|
||||
const webId = 'http://example.com/alice';
|
||||
let credentialsExtractor: jest.Mocked<CredentialsExtractor>;
|
||||
let storage: jest.Mocked<SubscriptionStorage<WebHookFeatures>>;
|
||||
let storage: jest.Mocked<NotificationChannelStorage<WebHookFeatures>>;
|
||||
let unsubscriber: WebHookUnsubscriber;
|
||||
|
||||
beforeEach(async(): Promise<void> => {
|
||||
|
||||
@@ -2,8 +2,8 @@ import { EventEmitter } from 'events';
|
||||
import type { WebSocket } from 'ws';
|
||||
import { BasicRepresentation } from '../../../../../src/http/representation/BasicRepresentation';
|
||||
import type {
|
||||
SubscriptionInfo,
|
||||
} from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
NotificationChannelInfo,
|
||||
} from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import {
|
||||
WebSocket2021Emitter,
|
||||
} from '../../../../../src/server/notifications/WebSocketSubscription2021/WebSocket2021Emitter';
|
||||
@@ -11,7 +11,7 @@ import type { SetMultiMap } from '../../../../../src/util/map/SetMultiMap';
|
||||
import { WrappedSetMultiMap } from '../../../../../src/util/map/WrappedSetMultiMap';
|
||||
|
||||
describe('A WebSocket2021Emitter', (): void => {
|
||||
const info: SubscriptionInfo = {
|
||||
const info: NotificationChannelInfo = {
|
||||
id: 'id',
|
||||
topic: 'http://example.com/foo',
|
||||
type: 'type',
|
||||
@@ -67,7 +67,7 @@ describe('A WebSocket2021Emitter', (): void => {
|
||||
it('only sends to the matching WebSockets.', async(): Promise<void> => {
|
||||
const webSocket2: jest.Mocked<WebSocket> = new EventEmitter() as any;
|
||||
webSocket2.send = jest.fn();
|
||||
const info2: SubscriptionInfo = {
|
||||
const info2: NotificationChannelInfo = {
|
||||
...info,
|
||||
id: 'other',
|
||||
};
|
||||
|
||||
@@ -6,9 +6,9 @@ import {
|
||||
} from '../../../../../src/identity/interaction/routing/AbsolutePathInteractionRoute';
|
||||
import type { HttpRequest } from '../../../../../src/server/HttpRequest';
|
||||
import type {
|
||||
SubscriptionInfo,
|
||||
SubscriptionStorage,
|
||||
} from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
NotificationChannelInfo,
|
||||
NotificationChannelStorage,
|
||||
} from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type {
|
||||
WebSocket2021Handler,
|
||||
} from '../../../../../src/server/notifications/WebSocketSubscription2021/WebSocket2021Handler';
|
||||
@@ -27,7 +27,7 @@ jest.mock('ws', (): any => ({
|
||||
}));
|
||||
|
||||
describe('A WebSocket2021Listener', (): void => {
|
||||
const info: SubscriptionInfo = {
|
||||
const info: NotificationChannelInfo = {
|
||||
id: 'id',
|
||||
topic: 'http://example.com/foo',
|
||||
type: 'type',
|
||||
@@ -38,7 +38,7 @@ describe('A WebSocket2021Listener', (): void => {
|
||||
let server: Server;
|
||||
let webSocket: WebSocket;
|
||||
let upgradeRequest: HttpRequest;
|
||||
let storage: jest.Mocked<SubscriptionStorage>;
|
||||
let storage: jest.Mocked<NotificationChannelStorage>;
|
||||
let handler: jest.Mocked<WebSocket2021Handler>;
|
||||
const route = new AbsolutePathInteractionRoute('http://example.com/foo');
|
||||
let listener: WebSocket2021Listener;
|
||||
@@ -106,7 +106,7 @@ describe('A WebSocket2021Listener', (): void => {
|
||||
await flushPromises();
|
||||
|
||||
expect(webSocket.send).toHaveBeenCalledTimes(1);
|
||||
expect(webSocket.send).toHaveBeenLastCalledWith(`Subscription has expired`);
|
||||
expect(webSocket.send).toHaveBeenLastCalledWith(`Notification channel has expired`);
|
||||
expect(webSocket.close).toHaveBeenCalledTimes(1);
|
||||
expect(handler.handleSafe).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { EventEmitter } from 'events';
|
||||
import type { WebSocket } from 'ws';
|
||||
import type {
|
||||
SubscriptionInfo,
|
||||
SubscriptionStorage,
|
||||
} from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
NotificationChannelInfo,
|
||||
NotificationChannelStorage,
|
||||
} from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
|
||||
import {
|
||||
WebSocket2021Storer,
|
||||
@@ -13,7 +13,7 @@ import { WrappedSetMultiMap } from '../../../../../src/util/map/WrappedSetMultiM
|
||||
import { flushPromises } from '../../../../util/Util';
|
||||
|
||||
describe('A WebSocket2021Storer', (): void => {
|
||||
const info: SubscriptionInfo = {
|
||||
const info: NotificationChannelInfo = {
|
||||
id: 'id',
|
||||
topic: 'http://example.com/foo',
|
||||
type: 'type',
|
||||
@@ -21,7 +21,7 @@ describe('A WebSocket2021Storer', (): void => {
|
||||
lastEmit: 0,
|
||||
};
|
||||
let webSocket: jest.Mocked<WebSocket>;
|
||||
let storage: jest.Mocked<SubscriptionStorage>;
|
||||
let storage: jest.Mocked<NotificationChannelStorage>;
|
||||
let socketMap: SetMultiMap<string, WebSocket>;
|
||||
let storer: WebSocket2021Storer;
|
||||
|
||||
@@ -68,7 +68,7 @@ describe('A WebSocket2021Storer', (): void => {
|
||||
webSocket2.close = jest.fn();
|
||||
const webSocketOther: jest.Mocked<WebSocket> = new EventEmitter() as any;
|
||||
webSocketOther.close = jest.fn();
|
||||
const infoOther: SubscriptionInfo = {
|
||||
const infoOther: NotificationChannelInfo = {
|
||||
...info,
|
||||
id: 'other',
|
||||
};
|
||||
|
||||
@@ -2,8 +2,8 @@ import { AccessMode } from '../../../../../src/authorization/permissions/Permiss
|
||||
import {
|
||||
AbsolutePathInteractionRoute,
|
||||
} from '../../../../../src/identity/interaction/routing/AbsolutePathInteractionRoute';
|
||||
import type { Subscription } from '../../../../../src/server/notifications/Subscription';
|
||||
import type { SubscriptionStorage } from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
import type { NotificationChannel } from '../../../../../src/server/notifications/NotificationChannel';
|
||||
import type { NotificationChannelStorage } from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import {
|
||||
WebSocketSubscription2021,
|
||||
} from '../../../../../src/server/notifications/WebSocketSubscription2021/WebSocketSubscription2021';
|
||||
@@ -11,13 +11,13 @@ import { IdentifierSetMultiMap } from '../../../../../src/util/map/IdentifierMap
|
||||
import { readJsonStream } from '../../../../../src/util/StreamUtil';
|
||||
|
||||
describe('A WebSocketSubscription2021', (): void => {
|
||||
let subscription: Subscription;
|
||||
let storage: jest.Mocked<SubscriptionStorage>;
|
||||
let channel: NotificationChannel;
|
||||
let storage: jest.Mocked<NotificationChannelStorage>;
|
||||
const route = new AbsolutePathInteractionRoute('http://example.com/foo');
|
||||
let subscriptionType: WebSocketSubscription2021;
|
||||
let channelType: WebSocketSubscription2021;
|
||||
|
||||
beforeEach(async(): Promise<void> => {
|
||||
subscription = {
|
||||
channel = {
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
type: 'WebSocketSubscription2021',
|
||||
topic: 'https://storage.example/resource',
|
||||
@@ -39,27 +39,27 @@ describe('A WebSocketSubscription2021', (): void => {
|
||||
add: jest.fn(),
|
||||
} as any;
|
||||
|
||||
subscriptionType = new WebSocketSubscription2021(storage, route);
|
||||
channelType = new WebSocketSubscription2021(storage, route);
|
||||
});
|
||||
|
||||
it('has the correct type.', async(): Promise<void> => {
|
||||
expect(subscriptionType.type).toBe('WebSocketSubscription2021');
|
||||
expect(channelType.type).toBe('WebSocketSubscription2021');
|
||||
});
|
||||
|
||||
it('correctly parses subscriptions.', async(): Promise<void> => {
|
||||
await expect(subscriptionType.schema.isValid(subscription)).resolves.toBe(true);
|
||||
it('correctly parses notification channel bodies.', async(): Promise<void> => {
|
||||
await expect(channelType.schema.isValid(channel)).resolves.toBe(true);
|
||||
|
||||
subscription.type = 'something else';
|
||||
await expect(subscriptionType.schema.isValid(subscription)).resolves.toBe(false);
|
||||
channel.type = 'something else';
|
||||
await expect(channelType.schema.isValid(channel)).resolves.toBe(false);
|
||||
});
|
||||
|
||||
it('requires Read permissions on the topic.', async(): Promise<void> => {
|
||||
await expect(subscriptionType.extractModes(subscription)).resolves
|
||||
.toEqual(new IdentifierSetMultiMap([[{ path: subscription.topic }, AccessMode.read ]]));
|
||||
await expect(channelType.extractModes(channel)).resolves
|
||||
.toEqual(new IdentifierSetMultiMap([[{ path: channel.topic }, AccessMode.read ]]));
|
||||
});
|
||||
|
||||
it('stores the info and returns a valid response when subscribing.', async(): Promise<void> => {
|
||||
const { response } = await subscriptionType.subscribe(subscription);
|
||||
const { response } = await channelType.subscribe(channel);
|
||||
expect(response.metadata.contentType).toBe('application/ld+json');
|
||||
await expect(readJsonStream(response.data)).resolves.toEqual({
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
|
||||
@@ -4,13 +4,13 @@ import type { ResourceIdentifier } from '../../../../../src/http/representation/
|
||||
import {
|
||||
ActivityNotificationGenerator,
|
||||
} from '../../../../../src/server/notifications/generate/ActivityNotificationGenerator';
|
||||
import type { SubscriptionInfo } from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type { ResourceStore } from '../../../../../src/storage/ResourceStore';
|
||||
import { AS, DC, LDP, RDF } from '../../../../../src/util/Vocabularies';
|
||||
|
||||
describe('An ActivityNotificationGenerator', (): void => {
|
||||
const topic: ResourceIdentifier = { path: 'http://example.com/foo' };
|
||||
const info: SubscriptionInfo = {
|
||||
const info: NotificationChannelInfo = {
|
||||
id: 'id',
|
||||
topic: topic.path,
|
||||
type: 'type',
|
||||
|
||||
@@ -2,12 +2,12 @@ import type { ResourceIdentifier } from '../../../../../src/http/representation/
|
||||
import {
|
||||
DeleteNotificationGenerator,
|
||||
} from '../../../../../src/server/notifications/generate/DeleteNotificationGenerator';
|
||||
import type { SubscriptionInfo } from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import { AS } from '../../../../../src/util/Vocabularies';
|
||||
|
||||
describe('A DeleteNotificationGenerator', (): void => {
|
||||
const topic: ResourceIdentifier = { path: 'http://example.com/foo' };
|
||||
const info: SubscriptionInfo = {
|
||||
const info: NotificationChannelInfo = {
|
||||
id: 'id',
|
||||
topic: topic.path,
|
||||
type: 'type',
|
||||
|
||||
@@ -4,13 +4,13 @@ import {
|
||||
StateNotificationGenerator,
|
||||
} from '../../../../../src/server/notifications/generate/StateNotificationGenerator';
|
||||
import type { Notification } from '../../../../../src/server/notifications/Notification';
|
||||
import type { SubscriptionInfo } from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
import type { NotificationChannelInfo } from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import type { ResourceSet } from '../../../../../src/storage/ResourceSet';
|
||||
import { AS } from '../../../../../src/util/Vocabularies';
|
||||
|
||||
describe('A StateNotificationGenerator', (): void => {
|
||||
const topic: ResourceIdentifier = { path: 'http://example.com/foo' };
|
||||
const info: SubscriptionInfo = {
|
||||
const info: NotificationChannelInfo = {
|
||||
id: 'id',
|
||||
topic: topic.path,
|
||||
type: 'type',
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import { BasicRepresentation } from '../../../../../src/http/representation/BasicRepresentation';
|
||||
import type { Representation } from '../../../../../src/http/representation/Representation';
|
||||
import type { Notification } from '../../../../../src/server/notifications/Notification';
|
||||
import type { NotificationChannelInfo } from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import {
|
||||
ConvertingNotificationSerializer,
|
||||
} from '../../../../../src/server/notifications/serialize/ConvertingNotificationSerializer';
|
||||
import type { NotificationSerializer } from '../../../../../src/server/notifications/serialize/NotificationSerializer';
|
||||
import type { SubscriptionInfo } from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
import type { RepresentationConverter } from '../../../../../src/storage/conversion/RepresentationConverter';
|
||||
|
||||
describe('A ConvertingNotificationSerializer', (): void => {
|
||||
let info: SubscriptionInfo;
|
||||
let info: NotificationChannelInfo;
|
||||
const notification: Notification = {
|
||||
'@context': [
|
||||
'https://www.w3.org/ns/activitystreams',
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import type { Notification } from '../../../../../src/server/notifications/Notification';
|
||||
import type { NotificationChannelInfo } from '../../../../../src/server/notifications/NotificationChannelStorage';
|
||||
import {
|
||||
JsonLdNotificationSerializer,
|
||||
} from '../../../../../src/server/notifications/serialize/JsonLdNotificationSerializer';
|
||||
import type { SubscriptionInfo } from '../../../../../src/server/notifications/SubscriptionStorage';
|
||||
import { readableToString } from '../../../../../src/util/StreamUtil';
|
||||
|
||||
describe('A JsonLdNotificationSerializer', (): void => {
|
||||
const info: SubscriptionInfo = {
|
||||
const info: NotificationChannelInfo = {
|
||||
id: 'id',
|
||||
topic: 'http://example.com/foo',
|
||||
type: 'type',
|
||||
|
||||
@@ -4,13 +4,13 @@ import { fetch } from 'cross-fetch';
|
||||
* Subscribes to a notification channel.
|
||||
* @param type - The type of the notification channel. E.g. "WebSocketSubscription2021".
|
||||
* @param webId - The WebID to spoof in the authorization header. This assumes the config uses the debug auth import.
|
||||
* @param subscriptionUrl - The URL where the subscription request needs to be sent to.
|
||||
* @param subscriptionUrl - The subscription resource URL where the request needs to be sent to.
|
||||
* @param topic - The topic to subscribe to.
|
||||
* @param features - Any extra fields that need to be added to the subscription body.
|
||||
*/
|
||||
export async function subscribe(type: string, webId: string, subscriptionUrl: string, topic: string,
|
||||
features: Record<string, unknown> = {}): Promise<unknown> {
|
||||
const subscription = {
|
||||
const channel = {
|
||||
'@context': [ 'https://www.w3.org/ns/solid/notification/v1' ],
|
||||
type,
|
||||
topic,
|
||||
@@ -20,7 +20,7 @@ export async function subscribe(type: string, webId: string, subscriptionUrl: st
|
||||
const response = await fetch(subscriptionUrl, {
|
||||
method: 'POST',
|
||||
headers: { authorization: `WebID ${webId}`, 'content-type': 'application/ld+json' },
|
||||
body: JSON.stringify(subscription),
|
||||
body: JSON.stringify(channel),
|
||||
});
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.headers.get('content-type')).toBe('application/ld+json');
|
||||
|
||||
Reference in New Issue
Block a user