From 3dd8602acce892b36d1ecaf584c938032e754213 Mon Sep 17 00:00:00 2001 From: elf Pavlik Date: Fri, 2 Aug 2024 15:50:07 -0600 Subject: [PATCH] fix: Ensure streaming HTTP streams the whole notification in a single chunk --- .../StreamingHttp2023Emitter.ts | 5 +- .../StreamingHttpRequestHandler.ts | 5 +- .../StreamingHttp2023Emitter.test.ts | 49 +++++++++++++------ .../StreamingHttpRequestHandler.test.ts | 36 ++++++++++++-- 4 files changed, 73 insertions(+), 22 deletions(-) diff --git a/src/server/notifications/StreamingHttpChannel2023/StreamingHttp2023Emitter.ts b/src/server/notifications/StreamingHttpChannel2023/StreamingHttp2023Emitter.ts index 873bd3e82..8b0c541ca 100644 --- a/src/server/notifications/StreamingHttpChannel2023/StreamingHttp2023Emitter.ts +++ b/src/server/notifications/StreamingHttpChannel2023/StreamingHttp2023Emitter.ts @@ -1,6 +1,7 @@ import { getLoggerFor } from '../../../logging/LogUtil'; import type { Representation } from '../../../http/representation/Representation'; import { AsyncHandler } from '../../../util/handlers/AsyncHandler'; +import { readableToString } from '../../../util/StreamUtil'; import type { NotificationChannel } from '../NotificationChannel'; import type { StreamingHttpMap } from './StreamingHttpMap'; @@ -27,8 +28,10 @@ export class StreamingHttp2023Emitter extends AsyncHandler { const channel: NotificationChannel = { @@ -13,32 +13,31 @@ describe('A StreamingHttp2023Emitter', (): void => { topic: 'http://example.com/foo', type: 'type', }; + const chunk = 'notification'; let stream: jest.Mocked; let streamMap: StreamingHttpMap; let emitter: StreamingHttp2023Emitter; + let representation: BasicRepresentation; beforeEach(async(): Promise => { stream = jest.mocked(new PassThrough()); - streamMap = new WrappedSetMultiMap(); - emitter = new StreamingHttp2023Emitter(streamMap); + representation = new BasicRepresentation(chunk, 'text/plain'); }); it('emits notifications to the stored Streams.', async(): Promise => { streamMap.add(channel.topic, stream); - const representation = new BasicRepresentation('notification', 'text/plain'); - const spy = jest.spyOn(representation.data, 'pipe'); + const spy = jest.spyOn(stream, 'write'); await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined(); expect(spy).toHaveBeenCalledTimes(1); - expect(spy).toHaveBeenLastCalledWith(stream, { end: false }); + expect(spy).toHaveBeenLastCalledWith(chunk); }); it('destroys the representation if there is no matching Stream.', async(): Promise => { - const representation = new BasicRepresentation('notification', 'text/plain'); - const spy = jest.spyOn(representation.data, 'pipe'); + const spy = jest.spyOn(stream, 'write'); await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined(); expect(spy).toHaveBeenCalledTimes(0); expect(representation.data.destroyed).toBe(true); @@ -50,12 +49,13 @@ describe('A StreamingHttp2023Emitter', (): void => { streamMap.add(channel.topic, stream); streamMap.add(channel.topic, stream2); - const representation = new BasicRepresentation('notification', 'text/plain'); - const spy = jest.spyOn(representation.data, 'pipe'); + const spy = jest.spyOn(stream, 'write'); + const spy2 = jest.spyOn(stream2, 'write'); await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined(); - expect(spy).toHaveBeenCalledTimes(2); - expect(spy).toHaveBeenCalledWith(stream, { end: false }); - expect(spy).toHaveBeenLastCalledWith(stream2, { end: false }); + expect(spy).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledWith(chunk); + expect(spy2).toHaveBeenCalledTimes(1); + expect(spy2).toHaveBeenCalledWith(chunk); }); it('only writes to the matching topic Streams.', async(): Promise => { @@ -69,10 +69,27 @@ describe('A StreamingHttp2023Emitter', (): void => { streamMap.add(channel.topic, stream); streamMap.add(channel2.topic, stream2); - const representation = new BasicRepresentation('notification', 'text/plain'); - const spy = jest.spyOn(representation.data, 'pipe'); + const spy = jest.spyOn(stream, 'write'); + const spy2 = jest.spyOn(stream2, 'write'); await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined(); expect(spy).toHaveBeenCalledTimes(1); - expect(spy).toHaveBeenLastCalledWith(stream, { end: false }); + expect(spy).toHaveBeenLastCalledWith(chunk); + expect(spy2).not.toHaveBeenCalled(); + }); + + it('emits notifications in a single chunk.', async(): Promise => { + streamMap.add(channel.topic, stream); + const serializationStream = new PassThrough(); + // Use two chunks for the serialization stream + serializationStream.write('foo'); + serializationStream.end('bar'); + representation = { + data: serializationStream, + } as unknown as Representation; + + const spy = jest.spyOn(stream, 'write'); + await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined(); + expect(spy).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenLastCalledWith('foobar'); }); }); diff --git a/test/unit/server/notifications/StreamingHttpChannel2023/StreamingHttpRequestHandler.test.ts b/test/unit/server/notifications/StreamingHttpChannel2023/StreamingHttpRequestHandler.test.ts index 246cb6970..756297008 100644 --- a/test/unit/server/notifications/StreamingHttpChannel2023/StreamingHttpRequestHandler.test.ts +++ b/test/unit/server/notifications/StreamingHttpChannel2023/StreamingHttpRequestHandler.test.ts @@ -1,3 +1,4 @@ +import { PassThrough } from 'node:stream'; import type { CredentialsExtractor } from '../../../../../src/authentication/CredentialsExtractor'; import type { Authorizer } from '../../../../../src/authorization/Authorizer'; import type { PermissionReader } from '../../../../../src/authorization/PermissionReader'; @@ -20,6 +21,8 @@ import { StreamingHttpMap } from '../../../../../src'; import type { Notification } from '../../../../../src/server/notifications/Notification'; import { flushPromises } from '../../../../util/Util'; +import * as GuardedStream from '../../../../../src/util/GuardedStream'; + jest.mock('../../../../../src/logging/LogUtil', (): any => { const logger: Logger = { error: jest.fn(), debug: jest.fn() } as any; return { getLoggerFor: (): Logger => logger }; @@ -45,9 +48,10 @@ describe('A StreamingHttpRequestHandler', (): void => { published: '123', state: '"123456-text/turtle"', }; - const representation = new BasicRepresentation(); + const chunk = 'notification'; const request: HttpRequest = {} as any; const response: HttpResponse = {} as any; + let representation: BasicRepresentation; let streamMap: StreamingHttpMap; let operation: Operation; let generator: jest.Mocked; @@ -64,6 +68,7 @@ describe('A StreamingHttpRequestHandler', (): void => { body: new BasicRepresentation(), preferences: {}, }; + representation = new BasicRepresentation(chunk, 'text/plain'); streamMap = new StreamingHttpMap(); @@ -129,10 +134,33 @@ describe('A StreamingHttpRequestHandler', (): void => { expect(description.data).toBeDefined(); }); - it('sends initial notification.', async(): Promise => { - const spy = jest.spyOn(representation.data, 'pipe'); + it('sends initial notification in a single chunk.', async(): Promise => { + const mockStream = { + write: jest.fn(), + on: jest.fn(), + } as unknown as GuardedStream.Guarded; + jest.spyOn(GuardedStream, 'guardStream').mockReturnValueOnce(mockStream); + const serializationStream = new PassThrough(); + // Use two chunks for the serialization stream + serializationStream.write('foo'); + serializationStream.end('bar'); + serializer = { + handleSafe: jest.fn().mockResolvedValue({ + data: serializationStream, + }), + } as any; + handler = new StreamingHttpRequestHandler( + streamMap, + pathPrefix, + generator, + serializer, + credentialsExtractor, + permissionReader, + authorizer, + ); await handler.handle({ operation, request, response }); - expect(spy).toHaveBeenCalledTimes(1); + expect(mockStream.write).toHaveBeenCalledTimes(1); + expect(mockStream.write).toHaveBeenCalledWith('foobar'); }); it('logs an error if sending initial notification fails.', async(): Promise => {