feat: Support Add/Remove notifications on containers

This commit is contained in:
Joachim Van Herwegen
2023-02-07 13:06:19 +01:00
parent 9e1e65cdb9
commit 134237a80f
17 changed files with 326 additions and 83 deletions

View File

@@ -1,3 +1,4 @@
import type { RepresentationMetadata } from '../../http/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import type { GenericEventEmitter } from '../../util/GenericEventEmitter';
import { createGenericEventEmitterClass } from '../../util/GenericEventEmitter';
@@ -8,8 +9,11 @@ import type { AS, VocabularyTerm, VocabularyValue } from '../../util/Vocabularie
* Both generic `change` events and ActivityStream-specific events are emitted.
*/
export type ActivityEmitter =
GenericEventEmitter<'changed', (target: ResourceIdentifier, activity: VocabularyTerm<typeof AS>) => void> &
GenericEventEmitter<VocabularyValue<typeof AS>, (target: ResourceIdentifier) => void>;
GenericEventEmitter<'changed',
(target: ResourceIdentifier, activity: VocabularyTerm<typeof AS>, metadata: RepresentationMetadata) => void>
&
GenericEventEmitter<VocabularyValue<typeof AS>,
(target: ResourceIdentifier, metadata: RepresentationMetadata) => void>;
/**
* A class implementation of {@link ActivityEmitter}.

View File

@@ -1,3 +1,4 @@
import type { RepresentationMetadata } from '../../http/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import { getLoggerFor } from '../../logging/LogUtil';
import { createErrorMessage } from '../../util/errors/ErrorUtil';
@@ -28,14 +29,15 @@ export class ListeningActivityHandler extends StaticHandler {
this.storage = storage;
this.handler = handler;
emitter.on('changed', (topic, activity): void => {
this.emit(topic, activity).catch((error): void => {
emitter.on('changed', (topic, activity, metadata): void => {
this.emit(topic, activity, metadata).catch((error): void => {
this.logger.error(`Something went wrong emitting notifications: ${createErrorMessage(error)}`);
});
});
}
private async emit(topic: ResourceIdentifier, activity: VocabularyTerm<typeof AS>): Promise<void> {
private async emit(topic: ResourceIdentifier, activity: VocabularyTerm<typeof AS>,
metadata: RepresentationMetadata): Promise<void> {
const channelIds = await this.storage.getAll(topic);
for (const id of channelIds) {
@@ -57,7 +59,7 @@ export class ListeningActivityHandler extends StaticHandler {
// No need to wait on this to resolve before going to the next channel.
// Prevent failed notification from blocking other notifications.
this.handler.handleSafe({ channel, activity, topic })
this.handler.handleSafe({ channel, activity, topic, metadata })
.then((): Promise<void> => {
// Update the `lastEmit` value if the channel has a rate limit
if (channel.rate) {

View File

@@ -1,3 +1,4 @@
import type { RepresentationMetadata } from '../../http/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import { AsyncHandler } from '../../util/handlers/AsyncHandler';
import type { AS, VocabularyTerm } from '../../util/Vocabularies';
@@ -7,6 +8,7 @@ export interface NotificationHandlerInput {
topic: ResourceIdentifier;
channel: NotificationChannel;
activity?: VocabularyTerm<typeof AS>;
metadata?: RepresentationMetadata;
}
/**

View File

@@ -0,0 +1,56 @@
import { getETag } from '../../../storage/Conditions';
import type { ResourceStore } from '../../../storage/ResourceStore';
import { InternalServerError } from '../../../util/errors/InternalServerError';
import { NotImplementedHttpError } from '../../../util/errors/NotImplementedHttpError';
import { AS } from '../../../util/Vocabularies';
import type { Notification } from '../Notification';
import { CONTEXT_ACTIVITYSTREAMS, CONTEXT_NOTIFICATION } from '../Notification';
import type { NotificationHandlerInput } from '../NotificationHandler';
import { NotificationGenerator } from './NotificationGenerator';
/**
* A {@link NotificationGenerator} specifically for Add/Remove notifications.
* Creates the notification so the `target` is set to input topic,
* and the `object` value is extracted from the provided metadata.
*/
export class AddRemoveNotificationGenerator extends NotificationGenerator {
private readonly store: ResourceStore;
public constructor(store: ResourceStore) {
super();
this.store = store;
}
public async canHandle({ activity }: NotificationHandlerInput): Promise<void> {
if (!activity || (!activity.equals(AS.terms.Add) && !activity.equals(AS.terms.Remove))) {
throw new NotImplementedHttpError(`Only Add/Remove activity updates are supported.`);
}
}
public async handle({ activity, topic, metadata }: NotificationHandlerInput): Promise<Notification> {
const representation = await this.store.getRepresentation(topic, {});
representation.data.destroy();
const state = getETag(representation.metadata);
const objects = metadata?.getAll(AS.terms.object);
if (!objects || objects.length === 0) {
throw new InternalServerError(`Missing as:object metadata for ${activity?.value} activity on ${topic.path}`);
}
if (objects.length > 1) {
throw new InternalServerError(`Found more than one as:object for ${activity?.value} activity on ${topic.path}`);
}
return {
'@context': [
CONTEXT_ACTIVITYSTREAMS,
CONTEXT_NOTIFICATION,
],
id: `urn:${Date.now()}:${topic.path}`,
type: activity!.value.slice(AS.namespace.length),
object: objects[0].value,
target: topic.path,
state,
published: new Date().toISOString(),
};
}
}