feat: Add support for the Notification specification

This commit is contained in:
Joachim Van Herwegen
2022-09-30 10:20:36 +02:00
parent be7af277bb
commit cbc07c6ef3
48 changed files with 2164 additions and 19 deletions

View File

@@ -0,0 +1,38 @@
import { getLoggerFor } from '../../logging/LogUtil';
import { createErrorMessage } from '../../util/errors/ErrorUtil';
import type { NotificationHandler } from './NotificationHandler';
import { StateHandler } from './StateHandler';
import type { SubscriptionInfo, SubscriptionStorage } from './SubscriptionStorage';
/**
* Handles the `state` feature by calling a {@link NotificationHandler}
* in case the {@link SubscriptionInfo} has a `state` value.
*
* Deletes the `state` parameter from the info afterwards.
*/
export class BaseStateHandler extends StateHandler {
protected readonly logger = getLoggerFor(this);
private readonly handler: NotificationHandler;
private readonly storage: SubscriptionStorage;
public constructor(handler: NotificationHandler, storage: SubscriptionStorage) {
super();
this.handler = handler;
this.storage = storage;
}
public async handle({ info }: { info: SubscriptionInfo }): Promise<void> {
if (info.state) {
const topic = { path: info.topic };
try {
await this.handler.handleSafe({ info, topic });
// Remove the state once the relevant notification has been sent
delete info.state;
await this.storage.update(info);
} catch (error: unknown) {
this.logger.error(`Problem emitting state notification: ${createErrorMessage(error)}`);
}
}
}
}

View File

@@ -0,0 +1,47 @@
import type { NotificationGenerator } from './generate/NotificationGenerator';
import type { NotificationEmitter } from './NotificationEmitter';
import type { NotificationHandlerInput } from './NotificationHandler';
import { NotificationHandler } from './NotificationHandler';
import type { NotificationSerializer } from './serialize/NotificationSerializer';
export interface ComposedNotificationHandlerArgs {
generator: NotificationGenerator;
serializer: NotificationSerializer;
emitter: NotificationEmitter;
}
/**
* Generates, serializes and emits a {@link Notification} using a {@link NotificationGenerator},
* {@link NotificationSerializer} and {@link NotificationEmitter}.
*
* Will not emit an event in case it has the same state as the subscription info.
*/
export class ComposedNotificationHandler extends NotificationHandler {
private readonly generator: NotificationGenerator;
private readonly serializer: NotificationSerializer;
private readonly emitter: NotificationEmitter;
public constructor(args: ComposedNotificationHandlerArgs) {
super();
this.generator = args.generator;
this.serializer = args.serializer;
this.emitter = args.emitter;
}
public async canHandle(input: NotificationHandlerInput): Promise<void> {
await this.generator.canHandle(input);
}
public async handle(input: NotificationHandlerInput): Promise<void> {
const notification = await this.generator.handle(input);
const { state } = input.info;
// In case the state matches there is no need to send the notification
if (typeof state === 'string' && state === notification.state) {
return;
}
const representation = await this.serializer.handleSafe({ info: input.info, notification });
await this.emitter.handleSafe({ info: input.info, representation });
}
}

View File

@@ -0,0 +1,133 @@
import { v4 } from 'uuid';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import { getLoggerFor } from '../../logging/LogUtil';
import type { KeyValueStorage } from '../../storage/keyvalue/KeyValueStorage';
import { InternalServerError } from '../../util/errors/InternalServerError';
import type { ReadWriteLocker } from '../../util/locking/ReadWriteLocker';
import type { Subscription } from './Subscription';
import type { SubscriptionInfo, SubscriptionStorage } from './SubscriptionStorage';
type StorageValue<T> = string | string[] | SubscriptionInfo<T>;
/**
* Stores all the {@link SubscriptionInfo} in a {@link KeyValueStorage}.
*
* Uses a {@link ReadWriteLocker} to prevent internal race conditions.
*/
export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> implements SubscriptionStorage<T> {
protected logger = getLoggerFor(this);
private readonly storage: KeyValueStorage<string, StorageValue<T>>;
private readonly locker: ReadWriteLocker;
public constructor(storage: KeyValueStorage<string, StorageValue<T>>, locker: ReadWriteLocker) {
this.storage = storage;
this.locker = locker;
}
public create(subscription: Subscription, features: T): SubscriptionInfo<T> {
return {
id: `${subscription.type}:${v4()}:${subscription.topic}`,
topic: subscription.topic,
type: subscription.type,
lastEmit: 0,
expiration: subscription.expiration,
accept: subscription.accept,
rate: subscription.rate,
state: subscription.state,
features,
};
}
public async get(id: string): Promise<SubscriptionInfo<T> | undefined> {
const info = await this.storage.get(id);
if (info && this.isSubscriptionInfo(info)) {
if (typeof info.expiration === 'number' && info.expiration < Date.now()) {
this.logger.info(`Subscription ${id} has expired.`);
await this.locker.withWriteLock(this.getLockKey(id), async(): Promise<void> => {
await this.deleteInfo(info);
});
return;
}
return info;
}
}
public async getAll(topic: ResourceIdentifier): Promise<string[]> {
const infos = await this.storage.get(topic.path);
if (Array.isArray(infos)) {
return infos;
}
return [];
}
public async add(info: SubscriptionInfo<T>): Promise<void> {
const target = { path: info.topic };
return this.locker.withWriteLock(this.getLockKey(target), async(): Promise<void> => {
const infos = await this.getAll(target);
await this.storage.set(info.id, info);
infos.push(info.id);
await this.storage.set(info.topic, infos);
});
}
public async update(info: SubscriptionInfo<T>): Promise<void> {
return this.locker.withWriteLock(this.getLockKey(info.id), async(): Promise<void> => {
const oldInfo = await this.storage.get(info.id);
if (oldInfo) {
if (!this.isSubscriptionInfo(oldInfo)) {
throw new InternalServerError(`Trying to update ${info.id} which is not a SubscriptionInfo.`);
}
if (info.topic !== oldInfo.topic) {
throw new InternalServerError(`Trying to change the topic of subscription ${info.id}`);
}
}
await this.storage.set(info.id, info);
});
}
public async delete(id: string): Promise<void> {
return this.locker.withWriteLock(this.getLockKey(id), async(): Promise<void> => {
const info = await this.get(id);
if (!info) {
return;
}
await this.deleteInfo(info);
});
}
/**
* Utility function for deleting a specific {@link SubscriptionInfo} object.
* Does not create a lock on the subscription ID so should be wrapped in such a lock.
*/
private async deleteInfo(info: SubscriptionInfo): Promise<void> {
await this.locker.withWriteLock(this.getLockKey(info.topic), async(): Promise<void> => {
const infos = await this.getAll({ path: info.topic });
const idx = infos.indexOf(info.id);
// If idx < 0 we have an inconsistency
if (idx < 0) {
this.logger.error(`Subscription info ${info.id} was not found in the list of info targeting ${info.topic}.`);
this.logger.error('This should not happen and indicates a data consistency issue.');
} else {
infos.splice(idx, 1);
if (infos.length > 0) {
await this.storage.set(info.topic, infos);
} else {
await this.storage.delete(info.topic);
}
}
await this.storage.delete(info.id);
});
}
private isSubscriptionInfo(value: StorageValue<T>): value is SubscriptionInfo<T> {
return Boolean((value as SubscriptionInfo).id);
}
private getLockKey(identifier: ResourceIdentifier | string): ResourceIdentifier {
return { path: `${typeof identifier === 'string' ? identifier : identifier.path}.notification-storage` };
}
}

View File

@@ -0,0 +1,59 @@
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import { getLoggerFor } from '../../logging/LogUtil';
import { createErrorMessage } from '../../util/errors/ErrorUtil';
import { StaticHandler } from '../../util/handlers/StaticHandler';
import type { AS, VocabularyTerm } from '../../util/Vocabularies';
import type { ActivityEmitter } from './ActivityEmitter';
import type { NotificationHandler } from './NotificationHandler';
import type { SubscriptionStorage } from './SubscriptionStorage';
/**
* Listens to an {@link ActivityEmitter} and calls the stored {@link NotificationHandler}s in case of an event
* for every matching Subscription found.
*
* Takes the `rate` feature into account so only subscriptions that want a new notification will receive one.
*
* Extends {@link StaticHandler} so it can be more easily injected into a Components.js configuration.
* No class takes this one as input, so to make sure Components.js instantiates it,
* it needs to be added somewhere where its presence has no impact, such as the list of initializers.
*/
export class ListeningActivityHandler extends StaticHandler {
protected readonly logger = getLoggerFor(this);
private readonly storage: SubscriptionStorage;
private readonly handler: NotificationHandler;
public constructor(storage: SubscriptionStorage, emitter: ActivityEmitter, handler: NotificationHandler) {
super();
this.storage = storage;
this.handler = handler;
emitter.on('changed', (topic, activity): void => {
this.emit(topic, activity).catch((error): void => {
this.logger.error(`Something went wrong emitting notifications: ${createErrorMessage(error)}`);
});
});
}
private async emit(topic: ResourceIdentifier, activity: VocabularyTerm<typeof AS>): Promise<void> {
const subscriptionIds = await this.storage.getAll(topic);
for (const id of subscriptionIds) {
const info = await this.storage.get(id);
if (!info) {
// Subscription has expired
continue;
}
if (info.rate && info.rate > Date.now() - info.lastEmit) {
continue;
}
// No need to wait on this to resolve before going to the next subscription.
// Prevent failed notification from blocking other notifications.
this.handler.handleSafe({ info, activity, topic }).catch((error): void => {
this.logger.error(`Error trying to handle notification for ${id}: ${createErrorMessage(error)}`);
});
}
}
}

View File

@@ -0,0 +1,22 @@
export const CONTEXT_ACTIVITYSTREAMS = 'https://www.w3.org/ns/activitystreams';
export const CONTEXT_NOTIFICATION = 'https://www.w3.org/ns/solid/notification/v1';
/**
* The minimal expected fields for a Notification
* as defined in https://solidproject.org/TR/notifications-protocol#notification-data-model.
*/
export interface Notification {
'@context': [
typeof CONTEXT_ACTIVITYSTREAMS,
typeof CONTEXT_NOTIFICATION,
...string[],
];
id: string;
type: string[];
object: {
id: string;
type: string[];
};
state?: string;
published: string;
}

View File

@@ -0,0 +1,52 @@
import type { NamedNode, Quad } from '@rdfjs/types';
import { DataFactory } from 'n3';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import type { InteractionRoute } from '../../identity/interaction/routing/InteractionRoute';
import { NOTIFY, RDF } from '../../util/Vocabularies';
import { StorageDescriber } from '../description/StorageDescriber';
const { namedNode, quad } = DataFactory;
const DEFAULT_FEATURES = [
NOTIFY.accept,
NOTIFY.expiration,
NOTIFY.rate,
NOTIFY.state,
];
/**
* Outputs quads describing how to access a specific Notificaion Subscription type and its features,
* as described in https://solidproject.org/TR/notifications-protocol#discovery.
*/
export class NotificationDescriber extends StorageDescriber {
private readonly path: NamedNode;
private readonly relative: string;
private readonly type: NamedNode;
private readonly features: NamedNode[];
/**
* @param route - The route describing where the subscription target is.
* @param relative - Will be appended to the input path to generate a named node corresponding to the description.
* E.g., "#websocketNotification".
* @param type - The rdf:type of the subscription type.
* @param features - Which features are enabled for this subscription type. Defaults to accept/expiration/rate/state.
*/
public constructor(route: InteractionRoute, relative: string, type: string, features: string[] = DEFAULT_FEATURES) {
super();
this.path = namedNode(route.getPath());
this.relative = relative;
this.type = namedNode(type);
this.features = features.map(namedNode);
}
public async handle(input: ResourceIdentifier): Promise<Quad[]> {
const subject = namedNode(input.path);
const subscription = namedNode(`${input.path}${this.relative}`);
return [
quad(subject, NOTIFY.terms.notificationChannel, subscription),
quad(subscription, RDF.terms.type, this.type),
quad(subscription, NOTIFY.terms.subscription, this.path),
...this.features.map((feature): Quad => quad(subscription, NOTIFY.terms.feature, feature)),
];
}
}

View File

@@ -0,0 +1,13 @@
import type { Representation } from '../../http/representation/Representation';
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
import type { SubscriptionInfo } from './SubscriptionStorage';
export interface NotificationEmitterInput {
representation: Representation;
info: SubscriptionInfo;
}
/**
* Emits a serialized Notification to the subscription defined by the info.
*/
export abstract class NotificationEmitter extends AsyncHandler<NotificationEmitterInput> {}

View File

@@ -0,0 +1,15 @@
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
import type { AS, VocabularyTerm } from '../../util/Vocabularies';
import type { SubscriptionInfo } from './SubscriptionStorage';
export interface NotificationHandlerInput {
topic: ResourceIdentifier;
info: SubscriptionInfo;
activity?: VocabularyTerm<typeof AS>;
}
/**
* Makes sure an activity gets emitted to the relevant subscription based on the given info.
*/
export abstract class NotificationHandler extends AsyncHandler<NotificationHandlerInput> {}

View File

@@ -0,0 +1,108 @@
import type { CredentialsExtractor } from '../../authentication/CredentialsExtractor';
import type { Authorizer } from '../../authorization/Authorizer';
import type { PermissionReader } from '../../authorization/PermissionReader';
import { OkResponseDescription } from '../../http/output/response/OkResponseDescription';
import type { ResponseDescription } from '../../http/output/response/ResponseDescription';
import { getLoggerFor } from '../../logging/LogUtil';
import { APPLICATION_LD_JSON } from '../../util/ContentTypes';
import { createErrorMessage } from '../../util/errors/ErrorUtil';
import { UnprocessableEntityHttpError } from '../../util/errors/UnprocessableEntityHttpError';
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
import { readableToString } from '../../util/StreamUtil';
import type { HttpRequest } from '../HttpRequest';
import type { OperationHttpHandlerInput } from '../OperationHttpHandler';
import { OperationHttpHandler } from '../OperationHttpHandler';
import type { Subscription } from './Subscription';
import type { SubscriptionType } from './SubscriptionType';
export interface NotificationSubscriberArgs {
/**
* The {@link SubscriptionType} with all the necessary information.
*/
subscriptionType: SubscriptionType;
/**
* Used to extract the credentials from the request.
*/
credentialsExtractor: CredentialsExtractor;
/**
* Used to determine which permissions the found credentials have.
*/
permissionReader: PermissionReader;
/**
* Used to determine if the request has the necessary permissions.
*/
authorizer: Authorizer;
/**
* Overrides the expiration feature of subscriptions by making sure they always expire after the `maxDuration` value.
* In case the expiration of the subscription is shorter than `maxDuration` the original value will be kept.
* Value is set in minutes. 0 is infinite.
*/
maxDuration?: number;
}
/**
* Handles notification subscriptions.
*
* Uses the information from the provided {@link SubscriptionType} to validate the input
* and verify the request has the required permissions available.
*/
export class NotificationSubscriber extends OperationHttpHandler {
protected logger = getLoggerFor(this);
private readonly subscriptionType: SubscriptionType;
private readonly credentialsExtractor: CredentialsExtractor;
private readonly permissionReader: PermissionReader;
private readonly authorizer: Authorizer;
private readonly maxDuration: number;
public constructor(args: NotificationSubscriberArgs) {
super();
this.subscriptionType = args.subscriptionType;
this.credentialsExtractor = args.credentialsExtractor;
this.permissionReader = args.permissionReader;
this.authorizer = args.authorizer;
this.maxDuration = (args.maxDuration ?? 0) * 60 * 1000;
}
public async handle({ operation, request }: OperationHttpHandlerInput): Promise<ResponseDescription> {
if (operation.body.metadata.contentType !== APPLICATION_LD_JSON) {
throw new UnsupportedMediaTypeHttpError('Subscribe bodies need to be application/ld+json.');
}
let subscription: Subscription;
try {
const json = JSON.parse(await readableToString(operation.body.data));
subscription = await this.subscriptionType.schema.validate(json);
} catch (error: unknown) {
throw new UnprocessableEntityHttpError(`Unable to process subscription: ${createErrorMessage(error)}`);
}
if (this.maxDuration) {
const duration = (subscription.expiration ?? Number.POSITIVE_INFINITY) - Date.now();
if (duration > this.maxDuration) {
subscription.expiration = Date.now() + this.maxDuration;
}
}
// Verify if the client is allowed to subscribe
await this.authorize(request, subscription);
const { response } = await this.subscriptionType.subscribe(subscription);
return new OkResponseDescription(response.metadata, response.data);
}
private async authorize(request: HttpRequest, subscription: Subscription): Promise<void> {
const credentials = await this.credentialsExtractor.handleSafe(request);
this.logger.debug(`Extracted credentials: ${JSON.stringify(credentials)}`);
const requestedModes = await this.subscriptionType.extractModes(subscription);
this.logger.debug(`Retrieved required modes: ${[ ...requestedModes.entrySets() ]}`);
const availablePermissions = await this.permissionReader.handleSafe({ credentials, requestedModes });
this.logger.debug(`Available permissions are ${[ ...availablePermissions.entries() ]}`);
await this.authorizer.handleSafe({ credentials, requestedModes, availablePermissions });
this.logger.verbose(`Authorization succeeded, creating subscription`);
}
}

View File

@@ -0,0 +1,11 @@
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
import type { SubscriptionInfo } from './SubscriptionStorage';
/**
* Handles the `state` feature of notifications.
* Every implementation of a specific subscription type should make sure an instance of this class
* gets called when a `state` notification can be sent out.
*
* Implementations of this class should handle all subscriptions and filter out those that need a `state` notification.
*/
export abstract class StateHandler extends AsyncHandler<{ info: SubscriptionInfo }> {}

View File

@@ -0,0 +1,27 @@
import { parse, toSeconds } from 'iso8601-duration';
import type { InferType } from 'yup';
import { array, number, object, string } from 'yup';
import { CONTEXT_NOTIFICATION } from './Notification';
/**
* A JSON parsing schema that can be used to parse subscription input.
* Specific subscription types can extend this schema with their own custom keys.
*/
export const SUBSCRIBE_SCHEMA = object({
'@context': array(string()).ensure().required().test({
name: 'RequireNotificationContext',
message: `The ${CONTEXT_NOTIFICATION} context is required in the subscription JSON-LD body.`,
test: (context): boolean => Boolean(context?.includes(CONTEXT_NOTIFICATION)),
}),
type: string().required(),
topic: string().required(),
state: string().optional(),
expiration: number().transform((value, original): number | undefined =>
// Convert the date string to milliseconds
Date.parse(original)).optional(),
rate: number().transform((value, original): number | undefined =>
// Convert the rate string to milliseconds
toSeconds(parse(original)) * 1000).optional(),
accept: string().optional(),
});
export type Subscription = InferType<typeof SUBSCRIBE_SCHEMA>;

View File

@@ -0,0 +1,66 @@
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import type { Subscription } from './Subscription';
/**
* The info provided during a subscription.
* `features` can contain custom values relevant for a specific subscription type.
*/
export type SubscriptionInfo<T = Record<string, unknown>> = {
id: string;
topic: string;
type: string;
expiration?: number;
accept?: string;
rate?: number;
state?: string;
lastEmit: number;
features: T;
};
/**
* Stores all the information necessary to keep track of notification subscriptions.
* Besides the standard subscription info it also stores features specific to a certain subscription type.
*
* This storage assumes that a subscription can only have a single identifier as its topic.
*/
export interface SubscriptionStorage<T extends Record<string, unknown> = Record<string, unknown>> {
/**
* Creates info corresponding to the given subscription and features.
* This does not store the generated info in the storage.
* @param subscription - Subscription to generate info of.
* @param features - Features to add to the info
*/
create: (subscription: Subscription, features: T) => SubscriptionInfo<T>;
/**
* Returns the info for the requested subscription.
* `undefined` if no match was found or if the subscription expired.
* @param id - The identifier of the subscription.
*/
get: (id: string) => Promise<SubscriptionInfo<T> | undefined>;
/**
* Returns the identifiers of all subscription entries that have the given identifier as their topic.
* @param topic - The identifier that is the topic.
*/
getAll: (topic: ResourceIdentifier) => Promise<string[]>;
/**
* Adds the given info to the storage.
* @param info - Info to add.
*/
add: (info: SubscriptionInfo<T>) => Promise<void>;
/**
* Updates the given subscription info.
* The `id` and the `topic` can not be updated.
* @param info - The info to update.
*/
update: (info: SubscriptionInfo<T>) => Promise<void>;
/**
* Deletes the given subscription from the storage.
* @param id - The identifier of the subscription
*/
delete: (id: string) => Promise<void>;
}

View File

@@ -0,0 +1,39 @@
import type { InferType } from 'yup';
import type { AccessMap } from '../../authorization/permissions/Permissions';
import type { Representation } from '../../http/representation/Representation';
import type { SUBSCRIBE_SCHEMA } from './Subscription';
import type { SubscriptionInfo } from './SubscriptionStorage';
export interface SubscriptionResponse<TFeat extends Record<string, unknown> = Record<string, unknown>> {
response: Representation;
info: SubscriptionInfo<TFeat>;
}
/**
* A specific subscription type as defined at https://solidproject.org/TR/notifications-protocol#subscription-types.
*/
export interface SubscriptionType<TSub extends typeof SUBSCRIBE_SCHEMA = typeof SUBSCRIBE_SCHEMA,
TFeat extends Record<string, unknown> = Record<string, unknown>> {
/**
* The expected type value in the JSON-LD body of requests subscribing for this subscription type.
*/
readonly type: string;
/**
* An extension of {@link SUBSCRIBE_SCHEMA} that can be used to parse and valide an incoming subscription request.
*/
readonly schema: TSub;
/**
* Determines which modes are required to allow the given subscription.
* @param subscription - The subscription to verify.
*
* @returns The required modes.
*/
extractModes: (subscription: InferType<TSub>) => Promise<AccessMap>;
/**
* Registers the given subscription.
* @param subscription - The subscription to register.
*
* @returns A {@link Representation} to return as a response and the generated {@link SubscriptionInfo}.
*/
subscribe: (subscription: InferType<TSub>) => Promise<SubscriptionResponse<TFeat>>;
}

View File

@@ -0,0 +1,28 @@
import { NotImplementedHttpError } from '../../util/errors/NotImplementedHttpError';
import type { NotificationHandlerInput } from './NotificationHandler';
import { NotificationHandler } from './NotificationHandler';
/**
* A {@link NotificationHandler} that only accepts input for a specific subscription type.
*/
export class TypedNotificationHandler extends NotificationHandler {
private readonly type: string;
private readonly source: NotificationHandler;
public constructor(type: string, source: NotificationHandler) {
super();
this.type = type;
this.source = source;
}
public async canHandle(input: NotificationHandlerInput): Promise<void> {
if (input.info.type !== this.type) {
throw new NotImplementedHttpError(`Only ${this.type} subscriptions are supported.`);
}
await this.source.canHandle(input);
}
public async handle(input: NotificationHandlerInput): Promise<void> {
await this.source.handle(input);
}
}

View File

@@ -0,0 +1,49 @@
import { getETag } from '../../../storage/Conditions';
import type { ResourceStore } from '../../../storage/ResourceStore';
import { NotImplementedHttpError } from '../../../util/errors/NotImplementedHttpError';
import { AS, RDF } from '../../../util/Vocabularies';
import type { Notification } from '../Notification';
import { CONTEXT_ACTIVITYSTREAMS, CONTEXT_NOTIFICATION } from '../Notification';
import type { NotificationHandlerInput } from '../NotificationHandler';
import { NotificationGenerator } from './NotificationGenerator';
/**
* A {@link NotificationGenerator} that creates a {@link Notification} by using the provided activity as type.
* Requests metadata of the topic from the {@link ResourceStore} to fill in the details.
*/
export class ActivityNotificationGenerator extends NotificationGenerator {
private readonly store: ResourceStore;
public constructor(store: ResourceStore) {
super();
this.store = store;
}
public async canHandle({ activity }: NotificationHandlerInput): Promise<void> {
if (!activity) {
throw new NotImplementedHttpError(`Only defined activities are supported.`);
}
}
public async handle({ topic, activity }: NotificationHandlerInput): Promise<Notification> {
const representation = await this.store.getRepresentation(topic, {});
representation.data.destroy();
const state = getETag(representation.metadata);
return {
'@context': [
CONTEXT_ACTIVITYSTREAMS,
CONTEXT_NOTIFICATION,
],
id: `urn:${Date.now()}:${topic.path}`,
type: [ activity!.value.slice(AS.namespace.length) ],
object: {
id: topic.path,
type: representation.metadata.getAll(RDF.terms.type).map((term): string => term.value),
},
state,
published: new Date().toISOString(),
};
}
}

View File

@@ -0,0 +1,35 @@
import { NotImplementedHttpError } from '../../../util/errors/NotImplementedHttpError';
import { AS } from '../../../util/Vocabularies';
import type { Notification } from '../Notification';
import { CONTEXT_ACTIVITYSTREAMS, CONTEXT_NOTIFICATION } from '../Notification';
import type { NotificationHandlerInput } from '../NotificationHandler';
import { NotificationGenerator } from './NotificationGenerator';
/**
* Generates a {@link Notification} for a resource that was deleted.
* This differs from other activity notifications in that there is no state and no resource metadata
* since the resource no longer exists.
*/
export class DeleteNotificationGenerator extends NotificationGenerator {
public async canHandle({ activity }: NotificationHandlerInput): Promise<void> {
if (!activity?.equals(AS.terms.Delete)) {
throw new NotImplementedHttpError(`Only Delete activity updates are supported.`);
}
}
public async handle({ topic }: NotificationHandlerInput): Promise<Notification> {
return {
'@context': [
CONTEXT_ACTIVITYSTREAMS,
CONTEXT_NOTIFICATION,
],
id: `urn:${Date.now()}:${topic.path}`,
type: [ 'Delete' ],
object: {
id: topic.path,
type: [],
},
published: new Date().toISOString(),
};
}
}

View File

@@ -0,0 +1,8 @@
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
import type { Notification } from '../Notification';
import type { NotificationHandlerInput } from '../NotificationHandler';
/**
* Creates a {@link Notification} based on the provided input.
*/
export abstract class NotificationGenerator extends AsyncHandler<NotificationHandlerInput, Notification> { }

View File

@@ -0,0 +1,29 @@
import type { ResourceSet } from '../../../storage/ResourceSet';
import { AS } from '../../../util/Vocabularies';
import type { Notification } from '../Notification';
import type { NotificationHandlerInput } from '../NotificationHandler';
import { NotificationGenerator } from './NotificationGenerator';
/**
* Determines the most relevant activity for a {@link Notification} in case none was provided.
* This is relevant for the `state` feature where a subscription needs to know the current state of a resource.
*/
export class StateNotificationGenerator extends NotificationGenerator {
private readonly source: NotificationGenerator;
private readonly resourceSet: ResourceSet;
public constructor(source: NotificationGenerator, resourceSet: ResourceSet) {
super();
this.source = source;
this.resourceSet = resourceSet;
}
public async handle(input: NotificationHandlerInput): Promise<Notification> {
if (input.activity) {
return this.source.handleSafe(input);
}
const activity = await this.resourceSet.hasResource(input.topic) ? AS.terms.Update : AS.terms.Delete;
return this.source.handleSafe({ ...input, activity });
}
}

View File

@@ -0,0 +1,37 @@
import type { Representation } from '../../../http/representation/Representation';
import type { RepresentationPreferences } from '../../../http/representation/RepresentationPreferences';
import type { RepresentationConverter } from '../../../storage/conversion/RepresentationConverter';
import type { NotificationSerializerInput } from './NotificationSerializer';
import { NotificationSerializer } from './NotificationSerializer';
/**
* Converts a serialization based on the provided `accept` feature value.
* In case none was provided no conversion takes place.
*/
export class ConvertingNotificationSerializer extends NotificationSerializer {
private readonly source: NotificationSerializer;
private readonly converter: RepresentationConverter;
public constructor(source: NotificationSerializer, converter: RepresentationConverter) {
super();
this.source = source;
this.converter = converter;
}
public async canHandle(input: NotificationSerializerInput): Promise<void> {
await this.source.canHandle(input);
}
public async handle(input: NotificationSerializerInput): Promise<Representation> {
const representation = await this.source.handle(input);
const type = input.info.accept;
if (!type) {
return representation;
}
const preferences: RepresentationPreferences = { type: { [type]: 1 }};
return this.converter.handleSafe({ representation, preferences, identifier: { path: input.notification.id }});
}
}

View File

@@ -0,0 +1,14 @@
import { BasicRepresentation } from '../../../http/representation/BasicRepresentation';
import type { Representation } from '../../../http/representation/Representation';
import { APPLICATION_LD_JSON } from '../../../util/ContentTypes';
import type { NotificationSerializerInput } from './NotificationSerializer';
import { NotificationSerializer } from './NotificationSerializer';
/**
* Serializes a Notification into a JSON-LD string.
*/
export class JsonLdNotificationSerializer extends NotificationSerializer {
public async handle({ notification }: NotificationSerializerInput): Promise<Representation> {
return new BasicRepresentation(JSON.stringify(notification), APPLICATION_LD_JSON);
}
}

View File

@@ -0,0 +1,17 @@
import type { Representation } from '../../../http/representation/Representation';
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
import type { Notification } from '../Notification';
import type { SubscriptionInfo } from '../SubscriptionStorage';
export interface NotificationSerializerInput {
notification: Notification;
info: SubscriptionInfo;
}
/**
* Converts a {@link Notification} into a {@link Representation} that can be transmitted.
*
* The reason this is a separate class in between a generator and emitter,
* is so a specific subscription type can add extra metadata to the Representation if needed.
*/
export abstract class NotificationSerializer extends AsyncHandler<NotificationSerializerInput, Representation> { }