mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
feat: Generalize and extend notification channel type behaviour
This commit is contained in:
241
src/server/notifications/BaseChannelType.ts
Normal file
241
src/server/notifications/BaseChannelType.ts
Normal file
@@ -0,0 +1,241 @@
|
||||
import { Readable } from 'stream';
|
||||
import { KeysRdfParseJsonLd } from '@comunica/context-entries';
|
||||
import { parse, toSeconds } from 'iso8601-duration';
|
||||
import type { Store } from 'n3';
|
||||
import type { NamedNode, Term } from 'rdf-js';
|
||||
import rdfParser from 'rdf-parse';
|
||||
import SHACLValidator from 'rdf-validate-shacl';
|
||||
import { v4 } from 'uuid';
|
||||
import type { Credentials } from '../../authentication/Credentials';
|
||||
import type { AccessMap } from '../../authorization/permissions/Permissions';
|
||||
import { AccessMode } from '../../authorization/permissions/Permissions';
|
||||
import { ContextDocumentLoader } from '../../storage/conversion/ConversionUtil';
|
||||
import { UnprocessableEntityHttpError } from '../../util/errors/UnprocessableEntityHttpError';
|
||||
import { IdentifierSetMultiMap } from '../../util/map/IdentifierMap';
|
||||
import { readableToQuads } from '../../util/StreamUtil';
|
||||
import { msToDuration } from '../../util/StringUtil';
|
||||
import { NOTIFY, RDF, XSD } from '../../util/Vocabularies';
|
||||
import { CONTEXT_NOTIFICATION } from './Notification';
|
||||
import type { NotificationChannel } from './NotificationChannel';
|
||||
import type { NotificationChannelType } from './NotificationChannelType';
|
||||
import { DEFAULT_NOTIFICATION_FEATURES } from './NotificationDescriber';
|
||||
|
||||
/**
|
||||
* Helper type used to store information about the default features.
|
||||
*/
|
||||
type Feature = {
|
||||
predicate: NamedNode;
|
||||
key: keyof NotificationChannel;
|
||||
dataType: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* All the necessary fields of the default features that are possible for all Notification Channels.
|
||||
*/
|
||||
const featureDefinitions: Feature[] = [
|
||||
{ predicate: NOTIFY.terms.accept, key: 'accept', dataType: XSD.string },
|
||||
{ predicate: NOTIFY.terms.endAt, key: 'endAt', dataType: XSD.dateTime },
|
||||
{ predicate: NOTIFY.terms.rate, key: 'rate', dataType: XSD.duration },
|
||||
{ predicate: NOTIFY.terms.startAt, key: 'startAt', dataType: XSD.dateTime },
|
||||
{ predicate: NOTIFY.terms.state, key: 'state', dataType: XSD.string },
|
||||
];
|
||||
|
||||
// This context is slightly outdated but seems to be the only "official" source for a SHACL context.
|
||||
const CONTEXT_SHACL = 'https://w3c.github.io/shacl/shacl-jsonld-context/shacl.context.ld.json';
|
||||
/**
|
||||
* The SHACL shape for the minimum requirements on a notification channel subscription request.
|
||||
*/
|
||||
export const DEFAULT_SUBSCRIPTION_SHACL = {
|
||||
'@context': [ CONTEXT_SHACL ],
|
||||
'@type': 'sh:NodeShape',
|
||||
// Use the topic predicate to find the focus node
|
||||
targetSubjectsOf: NOTIFY.topic,
|
||||
closed: true,
|
||||
property: [
|
||||
{ path: RDF.type, minCount: 1, maxCount: 1, nodeKind: 'sh:IRI' },
|
||||
{ path: NOTIFY.topic, minCount: 1, maxCount: 1, nodeKind: 'sh:IRI' },
|
||||
...featureDefinitions.map((feat): unknown =>
|
||||
({ path: feat.predicate.value, maxCount: 1, datatype: feat.dataType })),
|
||||
],
|
||||
} as const;
|
||||
|
||||
/**
|
||||
* A {@link NotificationChannelType} that handles the base case of parsing and serializing a notification channel.
|
||||
* Note that the `extractModes` call always requires Read permissions on the target resource.
|
||||
*
|
||||
* Uses SHACL to validate the incoming data in `initChannel`.
|
||||
* Classes extending this can pass extra SHACL properties in the constructor to extend the validation check.
|
||||
*
|
||||
* The `completeChannel` implementation is an empty function.
|
||||
*/
|
||||
export abstract class BaseChannelType implements NotificationChannelType {
|
||||
protected readonly type: NamedNode;
|
||||
protected readonly shacl: unknown;
|
||||
protected shaclQuads?: Store;
|
||||
|
||||
/**
|
||||
* @param type - The URI of the notification channel type.
|
||||
* This will be added to the SHACL shape to validate incoming subscription data.
|
||||
* @param additionalShaclProperties - Any additional properties that need to be added to the default SHACL shape.
|
||||
*/
|
||||
protected constructor(type: NamedNode, additionalShaclProperties: unknown[] = []) {
|
||||
this.type = type;
|
||||
|
||||
// Inject requested properties into default SHACL shape
|
||||
this.shacl = {
|
||||
...DEFAULT_SUBSCRIPTION_SHACL,
|
||||
property: [
|
||||
...DEFAULT_SUBSCRIPTION_SHACL.property,
|
||||
// Add type check
|
||||
{ path: RDF.type, hasValue: { '@id': type.value }},
|
||||
...additionalShaclProperties,
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiates the channel by first calling {@link validateSubscription} followed by {@link quadsToChannel}.
|
||||
* Subclasses can override either function safely to impact the result of the function.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
public async initChannel(data: Store, credentials: Credentials): Promise<NotificationChannel> {
|
||||
const subject = await this.validateSubscription(data);
|
||||
return this.quadsToChannel(data, subject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an N3.js {@link Store} containing quads corresponding to the stored SHACL representation.
|
||||
* Caches this result so the conversion from JSON-LD to quads only has to happen once.
|
||||
*/
|
||||
protected async getShaclQuads(): Promise<Store> {
|
||||
if (!this.shaclQuads) {
|
||||
const shaclStream = rdfParser.parse(
|
||||
Readable.from(JSON.stringify(this.shacl)),
|
||||
{
|
||||
contentType: 'application/ld+json',
|
||||
// Make sure our internal version of the context gets used
|
||||
[KeysRdfParseJsonLd.documentLoader.name]: new ContextDocumentLoader({
|
||||
[CONTEXT_SHACL]: '@css:templates/contexts/shacl.jsonld',
|
||||
}),
|
||||
},
|
||||
);
|
||||
this.shaclQuads = await readableToQuads(shaclStream);
|
||||
}
|
||||
return this.shaclQuads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates whether the given data conforms to the stored SHACL shape.
|
||||
* Will throw an {@link UnprocessableEntityHttpError} if validation fails.
|
||||
* Along with the SHACL check, this also makes sure there is only one matching entry in the dataset.
|
||||
*
|
||||
* @param data - The data to validate.
|
||||
*
|
||||
* @returns The focus node that corresponds to the subject of the found notification channel description.
|
||||
*/
|
||||
protected async validateSubscription(data: Store): Promise<Term> {
|
||||
// Need to make sure there is exactly one matching entry, which can't be done with SHACL.
|
||||
// The predicate used here must be the same as is used for `targetSubjectsOf` in the SHACL shape.
|
||||
const focusNodes = data.getSubjects(NOTIFY.terms.topic, null, null);
|
||||
if (focusNodes.length === 0) {
|
||||
throw new UnprocessableEntityHttpError('Missing topic value.');
|
||||
}
|
||||
if (focusNodes.length > 1) {
|
||||
throw new UnprocessableEntityHttpError('Only one subscription can be done at the same time.');
|
||||
}
|
||||
|
||||
const validator = new SHACLValidator(await this.getShaclQuads());
|
||||
const report = validator.validate(data);
|
||||
|
||||
if (!report.conforms) {
|
||||
// Use the first error to generate error message
|
||||
const result = report.results[0];
|
||||
const message = result.message[0];
|
||||
throw new UnprocessableEntityHttpError(`${message.value} - ${result.path?.value}`);
|
||||
}
|
||||
|
||||
// From this point on, we can assume the subject corresponds to a valid subscription request
|
||||
return focusNodes[0] as NamedNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a set of quads to a {@link NotificationChannel}.
|
||||
* Assumes the data is valid, so this should be called after {@link validateSubscription}
|
||||
*
|
||||
* The values of the default features will be added to the resulting channel,
|
||||
* subclasses with additional features that need to be added are responsible for parsing those quads.
|
||||
*
|
||||
* @param data - Data to convert.
|
||||
* @param subject - The identifier of the notification channel description in the dataset.
|
||||
*
|
||||
* @returns The generated {@link NotificationChannel}.
|
||||
*/
|
||||
protected async quadsToChannel(data: Store, subject: Term): Promise<NotificationChannel> {
|
||||
const topic = data.getObjects(subject, NOTIFY.terms.topic, null)[0] as NamedNode;
|
||||
const type = data.getObjects(subject, RDF.terms.type, null)[0] as NamedNode;
|
||||
|
||||
const channel: NotificationChannel = {
|
||||
id: `${v4()}:${topic.value}`,
|
||||
type: type.value,
|
||||
topic: topic.value,
|
||||
};
|
||||
|
||||
// Apply the values for all present features that are enabled
|
||||
for (const feature of DEFAULT_NOTIFICATION_FEATURES) {
|
||||
const objects = data.getObjects(subject, feature, null);
|
||||
if (objects.length === 1) {
|
||||
// Will always succeed since we are iterating over a list which was built using `featureDefinitions`
|
||||
const { dataType, key } = featureDefinitions.find((feat): boolean => feat.predicate.value === feature)!;
|
||||
let val: string | number = objects[0].value;
|
||||
if (dataType === XSD.dateTime) {
|
||||
val = Date.parse(val);
|
||||
} else if (dataType === XSD.duration) {
|
||||
val = toSeconds(parse(val)) * 1000;
|
||||
}
|
||||
// Need to convince TS that we can assign `string | number` to this key
|
||||
(channel as Record<typeof key, string | number>)[key] = val;
|
||||
}
|
||||
}
|
||||
|
||||
return channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the given channel to a JSON-LD description.
|
||||
* All fields found in the channel, except `lastEmit`, will be part of the result subject,
|
||||
* so subclasses should remove any fields that should not be exposed.
|
||||
*/
|
||||
public async toJsonLd(channel: NotificationChannel): Promise<Record<string, unknown>> {
|
||||
const result: Record<string, unknown> = {
|
||||
'@context': [
|
||||
CONTEXT_NOTIFICATION,
|
||||
],
|
||||
...channel,
|
||||
};
|
||||
// No need to expose this field
|
||||
delete result.lastEmit;
|
||||
|
||||
// Convert all the epoch values back to the expected date/rate format
|
||||
for (const { key, dataType } of featureDefinitions) {
|
||||
const value = channel[key];
|
||||
if (value) {
|
||||
if (dataType === XSD.dateTime) {
|
||||
result[key] = new Date(value).toISOString();
|
||||
} else if (dataType === XSD.duration) {
|
||||
result[key] = msToDuration(value as number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public async extractModes(channel: NotificationChannel): Promise<AccessMap> {
|
||||
return new IdentifierSetMultiMap<AccessMode>([[{ path: channel.topic }, AccessMode.read ]]);
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
public async completeChannel(channel: NotificationChannel): Promise<void> {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,9 @@
|
||||
import { v4 } from 'uuid';
|
||||
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
|
||||
import { getLoggerFor } from '../../logging/LogUtil';
|
||||
import type { KeyValueStorage } from '../../storage/keyvalue/KeyValueStorage';
|
||||
import { InternalServerError } from '../../util/errors/InternalServerError';
|
||||
import type { ReadWriteLocker } from '../../util/locking/ReadWriteLocker';
|
||||
import type { NotificationChannel, NotificationChannelJson } from './NotificationChannel';
|
||||
import type { NotificationChannel } from './NotificationChannel';
|
||||
import type { NotificationChannelStorage } from './NotificationChannelStorage';
|
||||
|
||||
type StorageValue = string | string[] | NotificationChannel;
|
||||
@@ -25,20 +24,6 @@ export class KeyValueChannelStorage implements NotificationChannelStorage {
|
||||
this.locker = locker;
|
||||
}
|
||||
|
||||
public create(channel: NotificationChannelJson, features: Record<string, unknown>): NotificationChannel {
|
||||
return {
|
||||
id: `${channel.type}:${v4()}:${channel.topic}`,
|
||||
topic: channel.topic,
|
||||
type: channel.type,
|
||||
startAt: channel.startAt,
|
||||
endAt: channel.endAt,
|
||||
accept: channel.accept,
|
||||
rate: channel.rate,
|
||||
state: channel.state,
|
||||
...features,
|
||||
};
|
||||
}
|
||||
|
||||
public async get(id: string): Promise<NotificationChannel | undefined> {
|
||||
const channel = await this.storage.get(id);
|
||||
if (channel && this.isChannel(channel)) {
|
||||
|
||||
@@ -1,46 +1,59 @@
|
||||
import { parse, toSeconds } from 'iso8601-duration';
|
||||
import type { InferType } from 'yup';
|
||||
import { array, number, object, string } from 'yup';
|
||||
import { CONTEXT_NOTIFICATION } from './Notification';
|
||||
|
||||
/**
|
||||
* A JSON parsing schema that can be used to parse a notification channel sent during subscription.
|
||||
* Specific notification channels can extend this schema with their own custom keys.
|
||||
* Internal representation of a notification channel.
|
||||
* Most of the fields are those defined in
|
||||
* https://solid.github.io/notifications/protocol#notification-channel-data-model
|
||||
*
|
||||
* We only support notification channels with a single topic.
|
||||
*/
|
||||
export const NOTIFICATION_CHANNEL_SCHEMA = object({
|
||||
'@context': array(string()).ensure().required().test({
|
||||
name: 'RequireNotificationContext',
|
||||
message: `The ${CONTEXT_NOTIFICATION} context is required in the notification channel JSON-LD body.`,
|
||||
test: (context): boolean => Boolean(context?.includes(CONTEXT_NOTIFICATION)),
|
||||
}),
|
||||
type: string().required(),
|
||||
topic: string().required(),
|
||||
state: string().optional(),
|
||||
startAt: number().transform((value, original): number | undefined =>
|
||||
// Convert the date string to milliseconds
|
||||
Date.parse(original)).optional(),
|
||||
endAt: number().transform((value, original): number | undefined =>
|
||||
// Convert the date string to milliseconds
|
||||
Date.parse(original)).optional(),
|
||||
rate: number().transform((value, original): number | undefined =>
|
||||
// Convert the rate string to milliseconds
|
||||
toSeconds(parse(original)) * 1000).optional(),
|
||||
accept: string().optional(),
|
||||
});
|
||||
export type NotificationChannelJson = InferType<typeof NOTIFICATION_CHANNEL_SCHEMA>;
|
||||
|
||||
/**
|
||||
* The info provided for a notification channel during a subscription.
|
||||
* `features` can contain custom values relevant for a specific channel type.
|
||||
*/
|
||||
export type NotificationChannel = {
|
||||
export interface NotificationChannel {
|
||||
/**
|
||||
* The unique identifier of the channel.
|
||||
*/
|
||||
id: string;
|
||||
topic: string;
|
||||
/**
|
||||
* The channel type.
|
||||
*/
|
||||
type: string;
|
||||
startAt?: number;
|
||||
endAt?: number;
|
||||
accept?: string;
|
||||
rate?: number;
|
||||
/**
|
||||
* The resource this channel sends notifications about.
|
||||
*/
|
||||
topic: string;
|
||||
/**
|
||||
* The state parameter sent by the receiver.
|
||||
* This is used to send a notification when the channel is established and the topic resource has a different state.
|
||||
*/
|
||||
state?: string;
|
||||
/**
|
||||
* When the channel should start sending notifications, in milliseconds since epoch.
|
||||
*/
|
||||
startAt?: number;
|
||||
/**
|
||||
* When the channel should stop existing, in milliseconds since epoch.
|
||||
*/
|
||||
endAt?: number;
|
||||
/**
|
||||
* The minimal time required between notifications, in milliseconds.
|
||||
*/
|
||||
rate?: number;
|
||||
/**
|
||||
* The media type in which the receiver expects the notifications.
|
||||
*/
|
||||
accept?: string;
|
||||
/**
|
||||
* The resource receivers can use to establish a connection and receive notifications.
|
||||
*/
|
||||
receiveFrom?: string;
|
||||
/**
|
||||
* The resource on the receiver where notifications can be sent.
|
||||
*/
|
||||
sendTo?: string;
|
||||
/**
|
||||
* Can be used to identify the sender.
|
||||
*/
|
||||
sender?: string;
|
||||
|
||||
/**
|
||||
* Internal value that we use to track when this channel last sent a notification.
|
||||
*/
|
||||
lastEmit?: number;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
|
||||
import type { NotificationChannel, NotificationChannelJson } from './NotificationChannel';
|
||||
import type { NotificationChannel } from './NotificationChannel';
|
||||
|
||||
/**
|
||||
* Stores all the information necessary to keep track of notification channels.
|
||||
@@ -9,15 +9,7 @@ import type { NotificationChannel, NotificationChannelJson } from './Notificatio
|
||||
*/
|
||||
export interface NotificationChannelStorage {
|
||||
/**
|
||||
* Creates channel corresponding to the given channel and features.
|
||||
* This does not store the generated channel in the storage.
|
||||
* @param channel - Notification channel to generate channel of.
|
||||
* @param features - Features to add to the channel
|
||||
*/
|
||||
create: (channel: NotificationChannelJson, features: Record<string, unknown>) => NotificationChannel;
|
||||
|
||||
/**
|
||||
* Returns the channel for the requested notification channel.
|
||||
* Returns the requested channel.
|
||||
* `undefined` if no match was found or if the notification channel expired.
|
||||
* @param id - The identifier of the notification channel.
|
||||
*/
|
||||
|
||||
@@ -1,42 +1,41 @@
|
||||
import type { InferType } from 'yup';
|
||||
import type { Store } from 'n3';
|
||||
import type { Credentials } from '../../authentication/Credentials';
|
||||
import type { AccessMap } from '../../authorization/permissions/Permissions';
|
||||
import type { Representation } from '../../http/representation/Representation';
|
||||
import type { NOTIFICATION_CHANNEL_SCHEMA, NotificationChannel } from './NotificationChannel';
|
||||
|
||||
export interface NotificationChannelResponse {
|
||||
response: Representation;
|
||||
channel: NotificationChannel;
|
||||
}
|
||||
import type { NotificationChannel } from './NotificationChannel';
|
||||
|
||||
/**
|
||||
* A specific channel type as defined at
|
||||
* https://solidproject.org/TR/2022/notifications-protocol-20221231#notification-channel-types.
|
||||
*
|
||||
* All functions that take a {@link NotificationChannel} as input
|
||||
* only need to support channels generated by an `initChannel` on the same class.
|
||||
*/
|
||||
export interface NotificationChannelType<
|
||||
TSub extends typeof NOTIFICATION_CHANNEL_SCHEMA = typeof NOTIFICATION_CHANNEL_SCHEMA> {
|
||||
export interface NotificationChannelType {
|
||||
/**
|
||||
* The expected type value in the JSON-LD body of requests subscribing for this notification channel type.
|
||||
* Validate and convert the input quads into a {@link NotificationChannel}.
|
||||
* @param data - The input quads.
|
||||
* @param credentials - The credentials of the agent doing the request.
|
||||
*/
|
||||
readonly type: string;
|
||||
initChannel: (data: Store, credentials: Credentials) => Promise<NotificationChannel>;
|
||||
|
||||
/**
|
||||
* An extension of {@link NOTIFICATION_CHANNEL_SCHEMA}
|
||||
* that can be used to parse and validate an incoming subscription request with a notification channel body.
|
||||
* Converts a {@link NotificationChannel} to a serialized JSON-LD representation.
|
||||
* @param channel - The notification channel to serialize.
|
||||
*/
|
||||
readonly schema: TSub;
|
||||
toJsonLd: (channel: NotificationChannel) => Promise<Record<string, unknown>>;
|
||||
|
||||
/**
|
||||
* Determines which modes are required to allow the given notification channel.
|
||||
* @param channel - The notification channel to verify.
|
||||
*
|
||||
* @returns The required modes.
|
||||
*/
|
||||
extractModes: (json: InferType<TSub>) => Promise<AccessMap>;
|
||||
extractModes: (channel: NotificationChannel) => Promise<AccessMap>;
|
||||
|
||||
/**
|
||||
* Registers the given notification channel.
|
||||
* @param channel - The notification channel to register.
|
||||
* @param credentials - The credentials of the client trying to subscribe.
|
||||
*
|
||||
* @returns A {@link Representation} to return as a response and the generated {@link NotificationChannel}.
|
||||
* This function will be called after the serialized channel is sent back as a response,
|
||||
* allowing for any final actions that need to happen.
|
||||
* @param channel - The notification channel that is completed.
|
||||
*/
|
||||
subscribe: (json: InferType<TSub>, credentials: Credentials) => Promise<NotificationChannelResponse>;
|
||||
completeChannel: (channel: NotificationChannel) => Promise<void>;
|
||||
}
|
||||
|
||||
@@ -4,16 +4,17 @@ import type { Authorizer } from '../../authorization/Authorizer';
|
||||
import type { PermissionReader } from '../../authorization/PermissionReader';
|
||||
import { OkResponseDescription } from '../../http/output/response/OkResponseDescription';
|
||||
import type { ResponseDescription } from '../../http/output/response/ResponseDescription';
|
||||
import { BasicRepresentation } from '../../http/representation/BasicRepresentation';
|
||||
import { getLoggerFor } from '../../logging/LogUtil';
|
||||
import { APPLICATION_LD_JSON } from '../../util/ContentTypes';
|
||||
import type { RepresentationConverter } from '../../storage/conversion/RepresentationConverter';
|
||||
import { APPLICATION_LD_JSON, INTERNAL_QUADS } from '../../util/ContentTypes';
|
||||
import { createErrorMessage } from '../../util/errors/ErrorUtil';
|
||||
import { UnprocessableEntityHttpError } from '../../util/errors/UnprocessableEntityHttpError';
|
||||
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
|
||||
import { readableToString } from '../../util/StreamUtil';
|
||||
import type { HttpRequest } from '../HttpRequest';
|
||||
import { endOfStream, readableToQuads } from '../../util/StreamUtil';
|
||||
import type { OperationHttpHandlerInput } from '../OperationHttpHandler';
|
||||
import { OperationHttpHandler } from '../OperationHttpHandler';
|
||||
import type { NotificationChannelJson } from './NotificationChannel';
|
||||
import type { NotificationChannel } from './NotificationChannel';
|
||||
import type { NotificationChannelStorage } from './NotificationChannelStorage';
|
||||
import type { NotificationChannelType } from './NotificationChannelType';
|
||||
|
||||
export interface NotificationSubscriberArgs {
|
||||
@@ -21,6 +22,10 @@ export interface NotificationSubscriberArgs {
|
||||
* The {@link NotificationChannelType} with all the necessary information.
|
||||
*/
|
||||
channelType: NotificationChannelType;
|
||||
/**
|
||||
* {@link RepresentationConverter} used to convert input data into RDF.
|
||||
*/
|
||||
converter: RepresentationConverter;
|
||||
/**
|
||||
* Used to extract the credentials from the request.
|
||||
*/
|
||||
@@ -33,6 +38,10 @@ export interface NotificationSubscriberArgs {
|
||||
* Used to determine if the request has the necessary permissions.
|
||||
*/
|
||||
authorizer: Authorizer;
|
||||
/**
|
||||
* Storage used to store the channels.
|
||||
*/
|
||||
storage: NotificationChannelStorage;
|
||||
/**
|
||||
* Overrides the expiration feature of channels, by making sure they always expire after the `maxDuration` value.
|
||||
* If the expiration of the channel is shorter than `maxDuration`, the original value will be kept.
|
||||
@@ -46,34 +55,42 @@ export interface NotificationSubscriberArgs {
|
||||
*
|
||||
* Uses the information from the provided {@link NotificationChannelType} to validate the input
|
||||
* and verify the request has the required permissions available.
|
||||
* If successful the generated channel will be stored in a {@link NotificationChannelStorage}.
|
||||
*/
|
||||
export class NotificationSubscriber extends OperationHttpHandler {
|
||||
protected logger = getLoggerFor(this);
|
||||
|
||||
private readonly channelType: NotificationChannelType;
|
||||
private readonly converter: RepresentationConverter;
|
||||
private readonly credentialsExtractor: CredentialsExtractor;
|
||||
private readonly permissionReader: PermissionReader;
|
||||
private readonly authorizer: Authorizer;
|
||||
private readonly storage: NotificationChannelStorage;
|
||||
private readonly maxDuration: number;
|
||||
|
||||
public constructor(args: NotificationSubscriberArgs) {
|
||||
super();
|
||||
this.channelType = args.channelType;
|
||||
this.converter = args.converter;
|
||||
this.credentialsExtractor = args.credentialsExtractor;
|
||||
this.permissionReader = args.permissionReader;
|
||||
this.authorizer = args.authorizer;
|
||||
this.storage = args.storage;
|
||||
this.maxDuration = (args.maxDuration ?? 0) * 60 * 1000;
|
||||
}
|
||||
|
||||
public async handle({ operation, request }: OperationHttpHandlerInput): Promise<ResponseDescription> {
|
||||
if (operation.body.metadata.contentType !== APPLICATION_LD_JSON) {
|
||||
throw new UnsupportedMediaTypeHttpError('Subscribe bodies need to be application/ld+json.');
|
||||
}
|
||||
const credentials = await this.credentialsExtractor.handleSafe(request);
|
||||
this.logger.debug(`Extracted credentials: ${JSON.stringify(credentials)}`);
|
||||
|
||||
let channel: NotificationChannelJson;
|
||||
let channel: NotificationChannel;
|
||||
try {
|
||||
const json = JSON.parse(await readableToString(operation.body.data));
|
||||
channel = await this.channelType.schema.validate(json);
|
||||
const quadStream = await this.converter.handleSafe({
|
||||
identifier: operation.target,
|
||||
representation: operation.body,
|
||||
preferences: { type: { [INTERNAL_QUADS]: 1 }},
|
||||
});
|
||||
channel = await this.channelType.initChannel(await readableToQuads(quadStream.data), credentials);
|
||||
} catch (error: unknown) {
|
||||
throw new UnprocessableEntityHttpError(`Unable to process notification channel: ${createErrorMessage(error)}`);
|
||||
}
|
||||
@@ -86,17 +103,27 @@ export class NotificationSubscriber extends OperationHttpHandler {
|
||||
}
|
||||
|
||||
// Verify if the client is allowed to subscribe
|
||||
const credentials = await this.authorize(request, channel);
|
||||
await this.authorize(credentials, channel);
|
||||
|
||||
const { response } = await this.channelType.subscribe(channel, credentials);
|
||||
// Store the channel once it has been authorized
|
||||
await this.storage.add(channel);
|
||||
|
||||
// Generate the response JSON-LD
|
||||
const jsonld = await this.channelType.toJsonLd(channel);
|
||||
const response = new BasicRepresentation(JSON.stringify(jsonld), APPLICATION_LD_JSON);
|
||||
|
||||
// Complete the channel once the response has been sent out
|
||||
endOfStream(response.data)
|
||||
.then((): Promise<void> => this.channelType.completeChannel(channel))
|
||||
.catch((error): void => {
|
||||
this.logger.error(`There was an issue completing notification channel ${channel.id}: ${
|
||||
createErrorMessage(error)}`);
|
||||
});
|
||||
|
||||
return new OkResponseDescription(response.metadata, response.data);
|
||||
}
|
||||
|
||||
private async authorize(request: HttpRequest, channel: NotificationChannelJson): Promise<Credentials> {
|
||||
const credentials = await this.credentialsExtractor.handleSafe(request);
|
||||
this.logger.debug(`Extracted credentials: ${JSON.stringify(credentials)}`);
|
||||
|
||||
private async authorize(credentials: Credentials, channel: NotificationChannel): Promise<void> {
|
||||
const requestedModes = await this.channelType.extractModes(channel);
|
||||
this.logger.debug(`Retrieved required modes: ${[ ...requestedModes.entrySets() ]}`);
|
||||
|
||||
@@ -104,8 +131,6 @@ export class NotificationSubscriber extends OperationHttpHandler {
|
||||
this.logger.debug(`Available permissions are ${[ ...availablePermissions.entries() ]}`);
|
||||
|
||||
await this.authorizer.handleSafe({ credentials, requestedModes, availablePermissions });
|
||||
this.logger.verbose(`Authorization succeeded, creating notification channel`);
|
||||
|
||||
return credentials;
|
||||
this.logger.debug(`Authorization succeeded, creating notification channel`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,31 +1,15 @@
|
||||
import type { InferType } from 'yup';
|
||||
import { string } from 'yup';
|
||||
import type { Store } from 'n3';
|
||||
import type { Credentials } from '../../../authentication/Credentials';
|
||||
import type { AccessMap } from '../../../authorization/permissions/Permissions';
|
||||
import { AccessMode } from '../../../authorization/permissions/Permissions';
|
||||
import { BasicRepresentation } from '../../../http/representation/BasicRepresentation';
|
||||
import type { InteractionRoute } from '../../../identity/interaction/routing/InteractionRoute';
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import { APPLICATION_LD_JSON } from '../../../util/ContentTypes';
|
||||
import { BadRequestHttpError } from '../../../util/errors/BadRequestHttpError';
|
||||
import { createErrorMessage } from '../../../util/errors/ErrorUtil';
|
||||
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
|
||||
import { endOfStream } from '../../../util/StreamUtil';
|
||||
import { CONTEXT_NOTIFICATION } from '../Notification';
|
||||
import { NOTIFY } from '../../../util/Vocabularies';
|
||||
import { BaseChannelType } from '../BaseChannelType';
|
||||
import type { NotificationChannel } from '../NotificationChannel';
|
||||
import { NOTIFICATION_CHANNEL_SCHEMA } from '../NotificationChannel';
|
||||
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
|
||||
import type { NotificationChannelResponse, NotificationChannelType } from '../NotificationChannelType';
|
||||
import type { StateHandler } from '../StateHandler';
|
||||
import { generateWebHookUnsubscribeUrl } from './WebHook2021Util';
|
||||
|
||||
const type = 'WebHookSubscription2021';
|
||||
const schema = NOTIFICATION_CHANNEL_SCHEMA.shape({
|
||||
type: string().required().oneOf([ type ]),
|
||||
// Not using `.url()` validator since it does not support localhost URLs
|
||||
target: string().required(),
|
||||
});
|
||||
|
||||
/**
|
||||
* A {@link NotificationChannel} containing the necessary fields for a WebHookSubscription2021 channel.
|
||||
*/
|
||||
@@ -33,7 +17,7 @@ export interface WebHookSubscription2021Channel extends NotificationChannel {
|
||||
/**
|
||||
* The "WebHookSubscription2021" type.
|
||||
*/
|
||||
type: typeof type;
|
||||
type: typeof NOTIFY.WebHookSubscription2021;
|
||||
/**
|
||||
* Where the notifications have to be sent.
|
||||
*/
|
||||
@@ -50,7 +34,7 @@ export interface WebHookSubscription2021Channel extends NotificationChannel {
|
||||
}
|
||||
|
||||
export function isWebHook2021Channel(channel: NotificationChannel): channel is WebHookSubscription2021Channel {
|
||||
return channel.type === type;
|
||||
return channel.type === NOTIFY.WebHookSubscription2021;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -61,29 +45,27 @@ export function isWebHook2021Channel(channel: NotificationChannel): channel is W
|
||||
*
|
||||
* Also handles the `state` feature if present.
|
||||
*/
|
||||
export class WebHookSubscription2021 implements NotificationChannelType<typeof schema> {
|
||||
export class WebHookSubscription2021 extends BaseChannelType {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: NotificationChannelStorage;
|
||||
private readonly unsubscribePath: string;
|
||||
private readonly stateHandler: StateHandler;
|
||||
|
||||
public readonly type = type;
|
||||
public readonly schema = schema;
|
||||
|
||||
public constructor(storage: NotificationChannelStorage, unsubscribeRoute: InteractionRoute,
|
||||
stateHandler: StateHandler) {
|
||||
this.storage = storage;
|
||||
public constructor(unsubscribeRoute: InteractionRoute, stateHandler: StateHandler) {
|
||||
super(NOTIFY.terms.WebHookSubscription2021,
|
||||
// Need to remember to remove `target` from the vocabulary again once this is updated to webhooks 2023,
|
||||
// as it is not actually part of the vocabulary.
|
||||
// Technically we should also require that this node is a named node,
|
||||
// but that would require clients to send `target: { '@id': 'http://example.com/target' }`,
|
||||
// which would make this more annoying so we are lenient here.
|
||||
// Could change in the future once this field is updated and part of the context.
|
||||
[{ path: NOTIFY.target, minCount: 1, maxCount: 1 }]);
|
||||
this.unsubscribePath = unsubscribeRoute.getPath();
|
||||
this.stateHandler = stateHandler;
|
||||
}
|
||||
|
||||
public async extractModes(json: InferType<typeof schema>): Promise<AccessMap> {
|
||||
return new IdentifierSetMultiMap<AccessMode>([[{ path: json.topic }, AccessMode.read ]]);
|
||||
}
|
||||
|
||||
public async subscribe(json: InferType<typeof schema>, credentials: Credentials):
|
||||
Promise<NotificationChannelResponse> {
|
||||
public async initChannel(data: Store, credentials: Credentials): Promise<WebHookSubscription2021Channel> {
|
||||
// The WebID is used to verify who can unsubscribe
|
||||
const webId = credentials.agent?.webId;
|
||||
|
||||
if (!webId) {
|
||||
@@ -92,27 +74,36 @@ export class WebHookSubscription2021 implements NotificationChannelType<typeof s
|
||||
);
|
||||
}
|
||||
|
||||
const channel = this.storage.create(json, { target: json.target, webId });
|
||||
await this.storage.add(channel);
|
||||
const subject = await this.validateSubscription(data);
|
||||
const channel = await this.quadsToChannel(data, subject);
|
||||
const target = data.getObjects(subject, NOTIFY.terms.target, null)[0];
|
||||
|
||||
const jsonld = {
|
||||
'@context': [ CONTEXT_NOTIFICATION ],
|
||||
type: this.type,
|
||||
target: json.target,
|
||||
return {
|
||||
...channel,
|
||||
type: NOTIFY.WebHookSubscription2021,
|
||||
webId,
|
||||
target: target.value,
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
unsubscribe_endpoint: generateWebHookUnsubscribeUrl(this.unsubscribePath, channel.id),
|
||||
};
|
||||
const response = new BasicRepresentation(JSON.stringify(jsonld), APPLICATION_LD_JSON);
|
||||
}
|
||||
|
||||
// We want to send the state notification, if there is one,
|
||||
// right after we send the response for subscribing.
|
||||
// We do this by waiting for the response to be closed.
|
||||
endOfStream(response.data)
|
||||
.then((): Promise<void> => this.stateHandler.handleSafe({ channel }))
|
||||
.catch((error): void => {
|
||||
this.logger.error(`Error emitting state notification: ${createErrorMessage(error)}`);
|
||||
});
|
||||
public async toJsonLd(channel: NotificationChannel): Promise<Record<string, unknown>> {
|
||||
const json = await super.toJsonLd(channel);
|
||||
|
||||
return { response, channel };
|
||||
// We don't want to expose the WebID that initialized the notification channel.
|
||||
// This is not really specified either way in the spec so this might change in the future.
|
||||
delete json.webId;
|
||||
|
||||
return json;
|
||||
}
|
||||
|
||||
public async completeChannel(channel: NotificationChannel): Promise<void> {
|
||||
try {
|
||||
// Send the state notification, if there is one
|
||||
await this.stateHandler.handleSafe({ channel });
|
||||
} catch (error: unknown) {
|
||||
this.logger.error(`Error emitting state notification: ${createErrorMessage(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,22 +1,29 @@
|
||||
import { string } from 'yup';
|
||||
import type { AccessMap } from '../../../authorization/permissions/Permissions';
|
||||
import { AccessMode } from '../../../authorization/permissions/Permissions';
|
||||
import { BasicRepresentation } from '../../../http/representation/BasicRepresentation';
|
||||
import type { Store } from 'n3';
|
||||
import type { Credentials } from '../../../authentication/Credentials';
|
||||
import type { InteractionRoute } from '../../../identity/interaction/routing/InteractionRoute';
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import { APPLICATION_LD_JSON } from '../../../util/ContentTypes';
|
||||
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
|
||||
import { CONTEXT_NOTIFICATION } from '../Notification';
|
||||
import type { NotificationChannelJson } from '../NotificationChannel';
|
||||
import { NOTIFICATION_CHANNEL_SCHEMA } from '../NotificationChannel';
|
||||
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
|
||||
import type { NotificationChannelResponse, NotificationChannelType } from '../NotificationChannelType';
|
||||
import { NOTIFY } from '../../../util/Vocabularies';
|
||||
import { BaseChannelType } from '../BaseChannelType';
|
||||
import type { NotificationChannel } from '../NotificationChannel';
|
||||
import { generateWebSocketUrl } from './WebSocket2021Util';
|
||||
|
||||
const type = 'WebSocketSubscription2021';
|
||||
const schema = NOTIFICATION_CHANNEL_SCHEMA.shape({
|
||||
type: string().required().oneOf([ type ]),
|
||||
});
|
||||
/**
|
||||
* A {@link NotificationChannel} containing the necessary fields for a WebSocketSubscription2021 channel.
|
||||
*/
|
||||
export interface WebSocketSubscription2021Channel extends NotificationChannel {
|
||||
/**
|
||||
* The "notify:WebSocketSubscription2021" type.
|
||||
*/
|
||||
type: typeof NOTIFY.WebSocketSubscription2021;
|
||||
/**
|
||||
* The WebSocket through which the channel will send notifications.
|
||||
*/
|
||||
source: string;
|
||||
}
|
||||
|
||||
export function isWebSocket2021Channel(channel: NotificationChannel): channel is WebSocketSubscription2021Channel {
|
||||
return channel.type === NOTIFY.WebSocketSubscription2021;
|
||||
}
|
||||
|
||||
/**
|
||||
* The notification channel type WebSocketSubscription2021 as described in
|
||||
@@ -24,35 +31,22 @@ const schema = NOTIFICATION_CHANNEL_SCHEMA.shape({
|
||||
*
|
||||
* Requires read permissions on a resource to be able to receive notifications.
|
||||
*/
|
||||
export class WebSocketSubscription2021 implements NotificationChannelType<typeof schema> {
|
||||
export class WebSocketSubscription2021 extends BaseChannelType {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
private readonly storage: NotificationChannelStorage;
|
||||
private readonly path: string;
|
||||
|
||||
public readonly type = type;
|
||||
public readonly schema = schema;
|
||||
|
||||
public constructor(storage: NotificationChannelStorage, route: InteractionRoute) {
|
||||
this.storage = storage;
|
||||
public constructor(route: InteractionRoute) {
|
||||
super(NOTIFY.terms.WebSocketSubscription2021);
|
||||
this.path = route.getPath();
|
||||
}
|
||||
|
||||
public async extractModes(json: NotificationChannelJson): Promise<AccessMap> {
|
||||
return new IdentifierSetMultiMap<AccessMode>([[{ path: json.topic }, AccessMode.read ]]);
|
||||
}
|
||||
|
||||
public async subscribe(json: NotificationChannelJson): Promise<NotificationChannelResponse> {
|
||||
const channel = this.storage.create(json, {});
|
||||
await this.storage.add(channel);
|
||||
|
||||
const jsonld = {
|
||||
'@context': [ CONTEXT_NOTIFICATION ],
|
||||
type: this.type,
|
||||
public async initChannel(data: Store, credentials: Credentials): Promise<WebSocketSubscription2021Channel> {
|
||||
const channel = await super.initChannel(data, credentials);
|
||||
return {
|
||||
...channel,
|
||||
type: NOTIFY.WebSocketSubscription2021,
|
||||
source: generateWebSocketUrl(this.path, channel.id),
|
||||
};
|
||||
const response = new BasicRepresentation(JSON.stringify(jsonld), APPLICATION_LD_JSON);
|
||||
|
||||
return { response, channel };
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user