mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
fix: Ensure streaming HTTP streams the whole notification in a single chunk
This commit is contained in:
committed by
Joachim Van Herwegen
parent
576eefede6
commit
3dd8602acc
@@ -1,6 +1,7 @@
|
||||
import { getLoggerFor } from '../../../logging/LogUtil';
|
||||
import type { Representation } from '../../../http/representation/Representation';
|
||||
import { AsyncHandler } from '../../../util/handlers/AsyncHandler';
|
||||
import { readableToString } from '../../../util/StreamUtil';
|
||||
import type { NotificationChannel } from '../NotificationChannel';
|
||||
import type { StreamingHttpMap } from './StreamingHttpMap';
|
||||
|
||||
@@ -27,8 +28,10 @@ export class StreamingHttp2023Emitter extends AsyncHandler<StreamingHttpEmitterI
|
||||
// Called as a NotificationEmitter: emit the notification
|
||||
const streams = this.streamMap.get(channel.topic);
|
||||
if (streams) {
|
||||
// Ensure that the whole notification gets sent in a single chunk
|
||||
const chunk = await readableToString(representation.data);
|
||||
for (const stream of streams) {
|
||||
representation.data.pipe(stream, { end: false });
|
||||
stream.write(chunk);
|
||||
}
|
||||
} else {
|
||||
representation.data.destroy();
|
||||
|
||||
@@ -15,6 +15,7 @@ import { IdentifierSetMultiMap } from '../../../util/map/IdentifierMap';
|
||||
import { createErrorMessage } from '../../../util/errors/ErrorUtil';
|
||||
import type { NotificationGenerator } from '../generate/NotificationGenerator';
|
||||
import type { NotificationSerializer } from '../serialize/NotificationSerializer';
|
||||
import { readableToString } from '../../../util/StreamUtil';
|
||||
import type { StreamingHttpMap } from './StreamingHttpMap';
|
||||
import { generateChannel } from './StreamingHttp2023Util';
|
||||
|
||||
@@ -54,7 +55,9 @@ export class StreamingHttpRequestHandler extends OperationHttpHandler {
|
||||
try {
|
||||
const notification = await this.generator.handle({ channel, topic: { path: topic }});
|
||||
const representation = await this.serializer.handleSafe({ channel, notification });
|
||||
representation.data.pipe(stream, { end: false });
|
||||
// Ensure that the whole notification gets sent in a single chunk
|
||||
const chunk = await readableToString(representation.data);
|
||||
stream.write(chunk);
|
||||
} catch (error: unknown) {
|
||||
this.logger.error(`Problem emitting initial notification: ${createErrorMessage(error)}`);
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import {
|
||||
StreamingHttp2023Emitter,
|
||||
} from '../../../../../src/server/notifications/StreamingHttpChannel2023/StreamingHttp2023Emitter';
|
||||
import { WrappedSetMultiMap } from '../../../../../src/util/map/WrappedSetMultiMap';
|
||||
import type { StreamingHttpMap } from '../../../../../src';
|
||||
import type { Representation, StreamingHttpMap } from '../../../../../src';
|
||||
|
||||
describe('A StreamingHttp2023Emitter', (): void => {
|
||||
const channel: NotificationChannel = {
|
||||
@@ -13,32 +13,31 @@ describe('A StreamingHttp2023Emitter', (): void => {
|
||||
topic: 'http://example.com/foo',
|
||||
type: 'type',
|
||||
};
|
||||
const chunk = 'notification';
|
||||
|
||||
let stream: jest.Mocked<PassThrough>;
|
||||
let streamMap: StreamingHttpMap;
|
||||
let emitter: StreamingHttp2023Emitter;
|
||||
let representation: BasicRepresentation;
|
||||
|
||||
beforeEach(async(): Promise<void> => {
|
||||
stream = jest.mocked(new PassThrough());
|
||||
|
||||
streamMap = new WrappedSetMultiMap();
|
||||
|
||||
emitter = new StreamingHttp2023Emitter(streamMap);
|
||||
representation = new BasicRepresentation(chunk, 'text/plain');
|
||||
});
|
||||
|
||||
it('emits notifications to the stored Streams.', async(): Promise<void> => {
|
||||
streamMap.add(channel.topic, stream);
|
||||
|
||||
const representation = new BasicRepresentation('notification', 'text/plain');
|
||||
const spy = jest.spyOn(representation.data, 'pipe');
|
||||
const spy = jest.spyOn(stream, 'write');
|
||||
await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined();
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
expect(spy).toHaveBeenLastCalledWith(stream, { end: false });
|
||||
expect(spy).toHaveBeenLastCalledWith(chunk);
|
||||
});
|
||||
|
||||
it('destroys the representation if there is no matching Stream.', async(): Promise<void> => {
|
||||
const representation = new BasicRepresentation('notification', 'text/plain');
|
||||
const spy = jest.spyOn(representation.data, 'pipe');
|
||||
const spy = jest.spyOn(stream, 'write');
|
||||
await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined();
|
||||
expect(spy).toHaveBeenCalledTimes(0);
|
||||
expect(representation.data.destroyed).toBe(true);
|
||||
@@ -50,12 +49,13 @@ describe('A StreamingHttp2023Emitter', (): void => {
|
||||
streamMap.add(channel.topic, stream);
|
||||
streamMap.add(channel.topic, stream2);
|
||||
|
||||
const representation = new BasicRepresentation('notification', 'text/plain');
|
||||
const spy = jest.spyOn(representation.data, 'pipe');
|
||||
const spy = jest.spyOn(stream, 'write');
|
||||
const spy2 = jest.spyOn(stream2, 'write');
|
||||
await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined();
|
||||
expect(spy).toHaveBeenCalledTimes(2);
|
||||
expect(spy).toHaveBeenCalledWith(stream, { end: false });
|
||||
expect(spy).toHaveBeenLastCalledWith(stream2, { end: false });
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
expect(spy).toHaveBeenCalledWith(chunk);
|
||||
expect(spy2).toHaveBeenCalledTimes(1);
|
||||
expect(spy2).toHaveBeenCalledWith(chunk);
|
||||
});
|
||||
|
||||
it('only writes to the matching topic Streams.', async(): Promise<void> => {
|
||||
@@ -69,10 +69,27 @@ describe('A StreamingHttp2023Emitter', (): void => {
|
||||
streamMap.add(channel.topic, stream);
|
||||
streamMap.add(channel2.topic, stream2);
|
||||
|
||||
const representation = new BasicRepresentation('notification', 'text/plain');
|
||||
const spy = jest.spyOn(representation.data, 'pipe');
|
||||
const spy = jest.spyOn(stream, 'write');
|
||||
const spy2 = jest.spyOn(stream2, 'write');
|
||||
await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined();
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
expect(spy).toHaveBeenLastCalledWith(stream, { end: false });
|
||||
expect(spy).toHaveBeenLastCalledWith(chunk);
|
||||
expect(spy2).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('emits notifications in a single chunk.', async(): Promise<void> => {
|
||||
streamMap.add(channel.topic, stream);
|
||||
const serializationStream = new PassThrough();
|
||||
// Use two chunks for the serialization stream
|
||||
serializationStream.write('foo');
|
||||
serializationStream.end('bar');
|
||||
representation = {
|
||||
data: serializationStream,
|
||||
} as unknown as Representation;
|
||||
|
||||
const spy = jest.spyOn(stream, 'write');
|
||||
await expect(emitter.handle({ channel, representation })).resolves.toBeUndefined();
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
expect(spy).toHaveBeenLastCalledWith('foobar');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { PassThrough } from 'node:stream';
|
||||
import type { CredentialsExtractor } from '../../../../../src/authentication/CredentialsExtractor';
|
||||
import type { Authorizer } from '../../../../../src/authorization/Authorizer';
|
||||
import type { PermissionReader } from '../../../../../src/authorization/PermissionReader';
|
||||
@@ -20,6 +21,8 @@ import { StreamingHttpMap } from '../../../../../src';
|
||||
import type { Notification } from '../../../../../src/server/notifications/Notification';
|
||||
import { flushPromises } from '../../../../util/Util';
|
||||
|
||||
import * as GuardedStream from '../../../../../src/util/GuardedStream';
|
||||
|
||||
jest.mock('../../../../../src/logging/LogUtil', (): any => {
|
||||
const logger: Logger = { error: jest.fn(), debug: jest.fn() } as any;
|
||||
return { getLoggerFor: (): Logger => logger };
|
||||
@@ -45,9 +48,10 @@ describe('A StreamingHttpRequestHandler', (): void => {
|
||||
published: '123',
|
||||
state: '"123456-text/turtle"',
|
||||
};
|
||||
const representation = new BasicRepresentation();
|
||||
const chunk = 'notification';
|
||||
const request: HttpRequest = {} as any;
|
||||
const response: HttpResponse = {} as any;
|
||||
let representation: BasicRepresentation;
|
||||
let streamMap: StreamingHttpMap;
|
||||
let operation: Operation;
|
||||
let generator: jest.Mocked<NotificationGenerator>;
|
||||
@@ -64,6 +68,7 @@ describe('A StreamingHttpRequestHandler', (): void => {
|
||||
body: new BasicRepresentation(),
|
||||
preferences: {},
|
||||
};
|
||||
representation = new BasicRepresentation(chunk, 'text/plain');
|
||||
|
||||
streamMap = new StreamingHttpMap();
|
||||
|
||||
@@ -129,10 +134,33 @@ describe('A StreamingHttpRequestHandler', (): void => {
|
||||
expect(description.data).toBeDefined();
|
||||
});
|
||||
|
||||
it('sends initial notification.', async(): Promise<void> => {
|
||||
const spy = jest.spyOn(representation.data, 'pipe');
|
||||
it('sends initial notification in a single chunk.', async(): Promise<void> => {
|
||||
const mockStream = {
|
||||
write: jest.fn(),
|
||||
on: jest.fn(),
|
||||
} as unknown as GuardedStream.Guarded<PassThrough>;
|
||||
jest.spyOn(GuardedStream, 'guardStream').mockReturnValueOnce(mockStream);
|
||||
const serializationStream = new PassThrough();
|
||||
// Use two chunks for the serialization stream
|
||||
serializationStream.write('foo');
|
||||
serializationStream.end('bar');
|
||||
serializer = {
|
||||
handleSafe: jest.fn().mockResolvedValue({
|
||||
data: serializationStream,
|
||||
}),
|
||||
} as any;
|
||||
handler = new StreamingHttpRequestHandler(
|
||||
streamMap,
|
||||
pathPrefix,
|
||||
generator,
|
||||
serializer,
|
||||
credentialsExtractor,
|
||||
permissionReader,
|
||||
authorizer,
|
||||
);
|
||||
await handler.handle({ operation, request, response });
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
expect(mockStream.write).toHaveBeenCalledTimes(1);
|
||||
expect(mockStream.write).toHaveBeenCalledWith('foobar');
|
||||
});
|
||||
|
||||
it('logs an error if sending initial notification fails.', async(): Promise<void> => {
|
||||
|
||||
Reference in New Issue
Block a user