feat: Add support for StreamingHTTPChannel2023 notifications

* feat: initial StremingHTTPChannel2023 notifications

Co-authored-by: Maciej Samoraj <maciej.samoraj@gmail.com>

* test: unit for StremingHTTPChannel2023 notifications

Co-authored-by: Maciej Samoraj <maciej.samoraj@gmail.com>

* test: integration for StremingHTTPChannel2023 notifications

Co-authored-by: Maciej Samoraj <maciej.samoraj@gmail.com>

* emit initial notification on streaming http channel

* fix linting erros

* ensure canceling fetch body in integration tests

* extract defaultChannel for topic into util

* add documentation

* Apply suggestions from code review

Co-authored-by: Ted Thibodeau Jr <tthibodeau@openlinksw.com>

* only generate notifications when needed

Co-authored-by: Maciej Samoraj <maciej.samoraj@gmail.com>

* test: set body timeout to pass on node >21

Co-authored-by: Maciej Samoraj <maciej.samoraj@gmail.com>

* address review feedback

* remove node 21 workaround

* add architecture documentation

* Apply suggestions from code review

Co-authored-by: Joachim Van Herwegen <joachimvh@gmail.com>

---------

Co-authored-by: Maciej Samoraj <maciej.samoraj@gmail.com>
Co-authored-by: Ted Thibodeau Jr <tthibodeau@openlinksw.com>
Co-authored-by: Joachim Van Herwegen <joachimvh@gmail.com>
This commit is contained in:
elf Pavlik
2024-05-22 00:58:26 -06:00
committed by GitHub
parent 203f80020c
commit cb38613b4c
22 changed files with 1121 additions and 1 deletions

View File

@@ -404,6 +404,14 @@ export * from './server/notifications/WebSocketChannel2023/WebSocket2023Util';
export * from './server/notifications/WebSocketChannel2023/WebSocketMap';
export * from './server/notifications/WebSocketChannel2023/WebSocketChannel2023Type';
// Server/Notifications/StreamingHTTPChannel2023
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttp2023Emitter';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttp2023Util';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttpListeningActivityHandler';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttpMap';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttpMetadataWriter';
export * from './server/notifications/StreamingHttpChannel2023/StreamingHttpRequestHandler';
// Server/Notifications
export * from './server/notifications/ActivityEmitter';
export * from './server/notifications/BaseChannelType';

View File

@@ -0,0 +1,37 @@
import { getLoggerFor } from '../../../logging/LogUtil';
import type { Representation } from '../../../http/representation/Representation';
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
import type { NotificationChannel } from '../NotificationChannel';
import type { StreamingHttpMap } from './StreamingHttpMap';
export interface StreamingHttpEmitterInput {
channel: NotificationChannel;
representation: Representation;
}
/**
* Emits notifications on StreamingHTTPChannel2023 streams.
* Uses the response streams found in the provided map.
* The key should be the identifier of the topic resource.
*/
export class StreamingHttp2023Emitter extends AsyncHandler<StreamingHttpEmitterInput> {
protected readonly logger = getLoggerFor(this);
public constructor(
private readonly streamMap: StreamingHttpMap,
) {
super();
}
public async handle({ channel, representation }: StreamingHttpEmitterInput): Promise<void> {
// Called as a NotificationEmitter: emit the notification
const streams = this.streamMap.get(channel.topic);
if (streams) {
for (const stream of streams) {
representation.data.pipe(stream, { end: false });
}
} else {
representation.data.destroy();
}
}
}

View File

@@ -0,0 +1,17 @@
import type { ResourceIdentifier } from '../../../http/representation/ResourceIdentifier';
import { NOTIFY } from '../../../util/Vocabularies';
import type { NotificationChannel } from '../NotificationChannel';
/**
* Default StreamingHTTPChanel2023 for a topic.
* Currently channel description is only used internally and never sent to the client.
* The default channel uses Turtle.
*/
export function generateChannel(topic: ResourceIdentifier): NotificationChannel {
return {
id: `${topic.path}.channel`,
type: NOTIFY.StreamingHTTPChannel2023,
topic: topic.path,
accept: 'text/turtle',
};
}

View File

@@ -0,0 +1,49 @@
import type { RepresentationMetadata } from '../../../http/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../../http/representation/ResourceIdentifier';
import { getLoggerFor } from '../../../logging/LogUtil';
import { createErrorMessage } from '../../../util/errors/ErrorUtil';
import { StaticHandler } from '../../../util/handlers/StaticHandler';
import type { AS, VocabularyTerm } from '../../../util/Vocabularies';
import type { ActivityEmitter } from '../ActivityEmitter';
import type { NotificationHandler } from '../NotificationHandler';
import { generateChannel } from './StreamingHttp2023Util';
import type { StreamingHttpMap } from './StreamingHttpMap';
/**
* Listens to an {@link ActivityEmitter} and calls the stored {@link NotificationHandler}s in case of an event
* for every matching notification channel found.
*
* Extends {@link StaticHandler} so it can be more easily injected into a Components.js configuration.
* No class takes this one as input, so to make sure Components.js instantiates it,
* it needs to be added somewhere where its presence has no impact, such as the list of initializers.
*/
export class StreamingHttpListeningActivityHandler extends StaticHandler {
protected readonly logger = getLoggerFor(this);
public constructor(
emitter: ActivityEmitter,
private readonly streamMap: StreamingHttpMap,
private readonly source: NotificationHandler,
) {
super();
emitter.on('changed', (topic, activity, metadata): void => {
if (this.streamMap.has(topic.path)) {
this.emit(topic, activity, metadata).catch(
(error): void => {
this.logger.error(`Error trying to handle notification for ${topic.path}: ${createErrorMessage(error)}`);
},
);
}
});
}
private async emit(
topic: ResourceIdentifier,
activity: VocabularyTerm<typeof AS>,
metadata: RepresentationMetadata,
): Promise<void> {
const channel = generateChannel(topic);
return this.source.handleSafe({ channel, activity, topic, metadata });
}
}

View File

@@ -0,0 +1,10 @@
import type { PassThrough } from 'node:stream';
import type { SingleThreaded } from '../../../init/cluster/SingleThreaded';
import { WrappedSetMultiMap } from '../../../util/map/WrappedSetMultiMap';
/**
* A {@link SetMultiMap} linking identifiers to a set of Streaming HTTP streams.
* An extension of {@link WrappedSetMultiMap} to make sure Components.js allows us to create this in the config,
* as {@link WrappedSetMultiMap} has a constructor not supported.
*/
export class StreamingHttpMap extends WrappedSetMultiMap<string, PassThrough> implements SingleThreaded {}

View File

@@ -0,0 +1,28 @@
import { getLoggerFor } from '../../../logging/LogUtil';
import type { HttpResponse } from '../../HttpResponse';
import { addHeader } from '../../../util/HeaderUtil';
import type { RepresentationMetadata } from '../../../http/representation/RepresentationMetadata';
import { MetadataWriter } from '../../../http/output/metadata/MetadataWriter';
/**
* A {@link MetadataWriter} that adds a link to the receiveFrom endpoint
* of the corresponding Streaming HTTP notifications channel
*/
export class StreamingHttpMetadataWriter extends MetadataWriter {
protected readonly logger = getLoggerFor(this);
public constructor(
private readonly baseUrl: string,
private readonly pathPrefix: string,
) {
super();
}
public async handle(input: { response: HttpResponse; metadata: RepresentationMetadata }): Promise<void> {
const resourcePath = input.metadata.identifier.value.replace(this.baseUrl, '');
const receiveFrom = `${this.baseUrl}${this.pathPrefix}${resourcePath}`;
const link = `<${receiveFrom}>; rel="http://www.w3.org/ns/solid/terms#updatesViaStreamingHttp2023"`;
this.logger.debug('Adding updatesViaStreamingHttp2023 to the Link header');
addHeader(input.response, 'Link', link);
}
}

View File

@@ -0,0 +1,79 @@
import { PassThrough } from 'node:stream';
import type { Credentials } from '../../../authentication/Credentials';
import type { CredentialsExtractor } from '../../../authentication/CredentialsExtractor';
import type { Authorizer } from '../../../authorization/Authorizer';
import type { PermissionReader } from '../../../authorization/PermissionReader';
import { AccessMode } from '../../../authorization/permissions/Permissions';
import { OkResponseDescription } from '../../../http/output/response/OkResponseDescription';
import type { ResponseDescription } from '../../../http/output/response/ResponseDescription';
import { BasicRepresentation } from '../../../http/representation/BasicRepresentation';
import { getLoggerFor } from '../../../logging/LogUtil';
import type { OperationHttpHandlerInput } from '../../OperationHttpHandler';
import { OperationHttpHandler } from '../../OperationHttpHandler';
import { guardStream } from '../../../util/GuardedStream';
import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
import { createErrorMessage } from '../../../util/errors/ErrorUtil';
import type { NotificationGenerator } from '../generate/NotificationGenerator';
import type { NotificationSerializer } from '../serialize/NotificationSerializer';
import type { StreamingHttpMap } from './StreamingHttpMap';
import { generateChannel } from './StreamingHttp2023Util';
/**
* Handles request to Streaming HTTP receiveFrom endopints.
* All allowed requests are stored in the {@link StreamingHttpMap}
*/
export class StreamingHttpRequestHandler extends OperationHttpHandler {
protected logger = getLoggerFor(this);
public constructor(
private readonly streamMap: StreamingHttpMap,
private readonly pathPrefix: string,
private readonly generator: NotificationGenerator,
private readonly serializer: NotificationSerializer,
private readonly credentialsExtractor: CredentialsExtractor,
private readonly permissionReader: PermissionReader,
private readonly authorizer: Authorizer,
) {
super();
}
public async handle({ operation, request }: OperationHttpHandlerInput): Promise<ResponseDescription> {
const topic = operation.target.path.replace(this.pathPrefix, '');
// Verify if the client is allowed to connect
const credentials = await this.credentialsExtractor.handleSafe(request);
await this.authorize(credentials, topic);
const stream = guardStream(new PassThrough());
this.streamMap.add(topic, stream);
stream.on('error', (): boolean => this.streamMap.deleteEntry(topic, stream));
stream.on('close', (): boolean => this.streamMap.deleteEntry(topic, stream));
const channel = generateChannel({ path: topic });
// Send initial notification
try {
const notification = await this.generator.handle({ channel, topic: { path: topic }});
const representation = await this.serializer.handleSafe({ channel, notification });
representation.data.pipe(stream, { end: false });
} catch (error: unknown) {
this.logger.error(`Problem emitting initial notification: ${createErrorMessage(error)}`);
}
// Pre-established channels use Turtle
const representation = new BasicRepresentation(topic, operation.target, channel.accept);
return new OkResponseDescription(
representation.metadata,
stream,
);
}
private async authorize(credentials: Credentials, topic: string): Promise<void> {
const requestedModes = new IdentifierSetMultiMap<AccessMode>([[{ path: topic }, AccessMode.read ]]);
this.logger.debug(`Retrieved required modes: ${[ ...requestedModes.entrySets() ].join(',')}`);
const availablePermissions = await this.permissionReader.handleSafe({ credentials, requestedModes });
this.logger.debug(`Available permissions are ${[ ...availablePermissions.entries() ].join(',')}`);
await this.authorizer.handleSafe({ credentials, requestedModes, availablePermissions });
this.logger.debug(`Authorization succeeded, creating notification channel`);
}
}

View File

@@ -153,6 +153,7 @@ export const ACP = createVocabulary(
export const AS = createVocabulary(
'https://www.w3.org/ns/activitystreams#',
'object',
'target',
'Add',
'Create',
@@ -231,6 +232,7 @@ export const NOTIFY = createVocabulary(
'WebhookChannel2023',
'WebSocketChannel2023',
'StreamingHTTPChannel2023',
);
export const OIDC = createVocabulary(