feat: Replace expiration feature with startAt and endAt

This commit is contained in:
Joachim Van Herwegen 2023-01-24 13:44:49 +01:00
parent 10980e90a3
commit caee563dd6
11 changed files with 54 additions and 17 deletions

View File

@ -31,7 +31,8 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
topic: subscription.topic, topic: subscription.topic,
type: subscription.type, type: subscription.type,
lastEmit: 0, lastEmit: 0,
expiration: subscription.expiration, startAt: subscription.startAt,
endAt: subscription.endAt,
accept: subscription.accept, accept: subscription.accept,
rate: subscription.rate, rate: subscription.rate,
state: subscription.state, state: subscription.state,
@ -42,7 +43,7 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
public async get(id: string): Promise<SubscriptionInfo<T> | undefined> { public async get(id: string): Promise<SubscriptionInfo<T> | undefined> {
const info = await this.storage.get(id); const info = await this.storage.get(id);
if (info && this.isSubscriptionInfo(info)) { if (info && this.isSubscriptionInfo(info)) {
if (typeof info.expiration === 'number' && info.expiration < Date.now()) { if (typeof info.endAt === 'number' && info.endAt < Date.now()) {
this.logger.info(`Subscription ${id} has expired.`); this.logger.info(`Subscription ${id} has expired.`);
await this.locker.withWriteLock(this.getLockKey(id), async(): Promise<void> => { await this.locker.withWriteLock(this.getLockKey(id), async(): Promise<void> => {
await this.deleteInfo(info); await this.deleteInfo(info);

View File

@ -45,10 +45,16 @@ export class ListeningActivityHandler extends StaticHandler {
continue; continue;
} }
// Don't emit if the previous notification was too recent according to the requested rate
if (info.rate && info.rate > Date.now() - info.lastEmit) { if (info.rate && info.rate > Date.now() - info.lastEmit) {
continue; continue;
} }
// Don't emit if we have not yet reached the requested starting time
if (info.startAt && info.startAt > Date.now()) {
continue;
}
// No need to wait on this to resolve before going to the next subscription. // No need to wait on this to resolve before going to the next subscription.
// Prevent failed notification from blocking other notifications. // Prevent failed notification from blocking other notifications.
this.handler.handleSafe({ info, activity, topic }).catch((error): void => { this.handler.handleSafe({ info, activity, topic }).catch((error): void => {

View File

@ -79,9 +79,9 @@ export class NotificationSubscriber extends OperationHttpHandler {
} }
if (this.maxDuration) { if (this.maxDuration) {
const duration = (subscription.expiration ?? Number.POSITIVE_INFINITY) - Date.now(); const duration = (subscription.endAt ?? Number.POSITIVE_INFINITY) - Date.now();
if (duration > this.maxDuration) { if (duration > this.maxDuration) {
subscription.expiration = Date.now() + this.maxDuration; subscription.endAt = Date.now() + this.maxDuration;
} }
} }

View File

@ -16,7 +16,10 @@ export const SUBSCRIBE_SCHEMA = object({
type: string().required(), type: string().required(),
topic: string().required(), topic: string().required(),
state: string().optional(), state: string().optional(),
expiration: number().transform((value, original): number | undefined => startAt: number().transform((value, original): number | undefined =>
// Convert the date string to milliseconds
Date.parse(original)).optional(),
endAt: number().transform((value, original): number | undefined =>
// Convert the date string to milliseconds // Convert the date string to milliseconds
Date.parse(original)).optional(), Date.parse(original)).optional(),
rate: number().transform((value, original): number | undefined => rate: number().transform((value, original): number | undefined =>

View File

@ -9,7 +9,8 @@ export type SubscriptionInfo<T = Record<string, unknown>> = {
id: string; id: string;
topic: string; topic: string;
type: string; type: string;
expiration?: number; startAt?: number;
endAt?: number;
accept?: string; accept?: string;
rate?: number; rate?: number;
state?: string; state?: string;

View File

@ -70,7 +70,7 @@ describe('A KeyValueSubscriptionStorage', (): void => {
}); });
it('deletes expired info.', async(): Promise<void> => { it('deletes expired info.', async(): Promise<void> => {
info.expiration = 0; info.endAt = 0;
await storage.add(info); await storage.add(info);
await expect(storage.get(info.id)).resolves.toBeUndefined(); await expect(storage.get(info.id)).resolves.toBeUndefined();
expect(internalMap.size).toBe(0); expect(internalMap.size).toBe(0);

View File

@ -71,6 +71,17 @@ describe('A ListeningActivityHandler', (): void => {
expect(logger.error).toHaveBeenCalledTimes(0); expect(logger.error).toHaveBeenCalledTimes(0);
}); });
it('does not emit an event on subscriptions if their start time has not been reached.', async(): Promise<void> => {
info.startAt = Date.now() + 100000;
emitter.emit('changed', topic, activity);
await flushPromises();
expect(notificationHandler.handleSafe).toHaveBeenCalledTimes(0);
expect(logger.error).toHaveBeenCalledTimes(0);
});
it('does not stop if one subscription causes an error.', async(): Promise<void> => { it('does not stop if one subscription causes an error.', async(): Promise<void> => {
storage.getAll.mockResolvedValue([ info.id, info.id ]); storage.getAll.mockResolvedValue([ info.id, info.id ]);
notificationHandler.handleSafe.mockRejectedValueOnce(new Error('bad input')); notificationHandler.handleSafe.mockRejectedValueOnce(new Error('bad input'));

View File

@ -110,25 +110,25 @@ describe('A NotificationSubscriber', (): void => {
await subscriber.handle({ operation, request, response }); await subscriber.handle({ operation, request, response });
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({ expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
expiration: Date.now() + (60 * 60 * 1000), endAt: Date.now() + (60 * 60 * 1000),
}), { public: {}}); }), { public: {}});
operation.body.data = guardedStreamFrom(JSON.stringify({ operation.body.data = guardedStreamFrom(JSON.stringify({
...subscriptionBody, ...subscriptionBody,
expiration: new Date(Date.now() + 99999999999999).toISOString(), endAt: new Date(Date.now() + 99999999999999).toISOString(),
})); }));
await subscriber.handle({ operation, request, response }); await subscriber.handle({ operation, request, response });
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({ expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
expiration: Date.now() + (60 * 60 * 1000), endAt: Date.now() + (60 * 60 * 1000),
}), { public: {}}); }), { public: {}});
operation.body.data = guardedStreamFrom(JSON.stringify({ operation.body.data = guardedStreamFrom(JSON.stringify({
...subscriptionBody, ...subscriptionBody,
expiration: new Date(Date.now() + 5).toISOString(), endAt: new Date(Date.now() + 5).toISOString(),
})); }));
await subscriber.handle({ operation, request, response }); await subscriber.handle({ operation, request, response });
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({ expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
expiration: Date.now() + 5, endAt: Date.now() + 5,
}), { public: {}}); }), { public: {}});
jest.useRealTimers(); jest.useRealTimers();

View File

@ -40,16 +40,29 @@ describe('A Subscription', (): void => {
await expect(SUBSCRIBE_SCHEMA.isValid(subscription)).resolves.toBe(true); await expect(SUBSCRIBE_SCHEMA.isValid(subscription)).resolves.toBe(true);
}); });
it('converts the expiration date to a number.', async(): Promise<void> => { it('converts the start date to a number.', async(): Promise<void> => {
const date = '1988-03-09T14:48:00.000Z'; const date = '1988-03-09T14:48:00.000Z';
const ms = Date.parse(date); const ms = Date.parse(date);
const subscription: unknown = { const subscription: unknown = {
...validSubscription, ...validSubscription,
expiration: date, startAt: date,
}; };
await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({ await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({
expiration: ms, startAt: ms,
}));
});
it('converts the end date to a number.', async(): Promise<void> => {
const date = '1988-03-09T14:48:00.000Z';
const ms = Date.parse(date);
const subscription: unknown = {
...validSubscription,
endAt: date,
};
await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({
endAt: ms,
})); }));
}); });

View File

@ -44,7 +44,8 @@ describe('A WebHookSubscription2021', (): void => {
topic: 'https://storage.example/resource', topic: 'https://storage.example/resource',
target, target,
state: undefined, state: undefined,
expiration: undefined, startAt: undefined,
endAt: undefined,
accept: undefined, accept: undefined,
rate: undefined, rate: undefined,
}; };

View File

@ -22,7 +22,8 @@ describe('A WebSocketSubscription2021', (): void => {
type: 'WebSocketSubscription2021', type: 'WebSocketSubscription2021',
topic: 'https://storage.example/resource', topic: 'https://storage.example/resource',
state: undefined, state: undefined,
expiration: undefined, startAt: undefined,
endAt: undefined,
accept: undefined, accept: undefined,
rate: undefined, rate: undefined,
}; };