mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
feat: Create function to wrap streams to not lose errors
This commit is contained in:
parent
9c78c4f9ad
commit
1a30b51461
97
src/util/GuardedStream.ts
Normal file
97
src/util/GuardedStream.ts
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
import { getLoggerFor } from '../logging/LogUtil';
|
||||||
|
|
||||||
|
// Until Typescript adds Nominal types this is how to do it.
|
||||||
|
// This can be generalized should we need it in the future for other cases.
|
||||||
|
// Also using the guard to store the intermediate variables.
|
||||||
|
// See the following issue:
|
||||||
|
// https://github.com/microsoft/TypeScript/issues/202
|
||||||
|
|
||||||
|
const logger = getLoggerFor('GuardedStream');
|
||||||
|
|
||||||
|
// Using symbols to make sure we don't override existing parameters
|
||||||
|
const guard = Symbol('guard');
|
||||||
|
const errorGuard = Symbol('error');
|
||||||
|
const timeoutGuard = Symbol('timeout');
|
||||||
|
|
||||||
|
// Class used to guard streams
|
||||||
|
class Guard {
|
||||||
|
protected [guard]: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hidden interface for guard-related variables
|
||||||
|
interface StoredErrorStream extends NodeJS.EventEmitter {
|
||||||
|
[errorGuard]?: Error;
|
||||||
|
[timeoutGuard]?: NodeJS.Timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A stream that is guarded.
|
||||||
|
* This means that if this stream emits an error before a listener is attached,
|
||||||
|
* it will store the error and emit it once a listener is added.
|
||||||
|
*/
|
||||||
|
export type Guarded<T extends NodeJS.EventEmitter> = T & Guard;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback that is used when a stream emits an error and no error listener is attached.
|
||||||
|
* Used to store the error and start the logger timer.
|
||||||
|
*/
|
||||||
|
const defaultErrorListener = function(this: StoredErrorStream, err: Error): void {
|
||||||
|
this[errorGuard] = err;
|
||||||
|
this[timeoutGuard] = setTimeout((): void => {
|
||||||
|
logger.error(`No error listener was attached but error was thrown: ${err.message}`);
|
||||||
|
}, 1000);
|
||||||
|
};
|
||||||
|
|
||||||
|
let attachDefaultErrorListener: (this: StoredErrorStream, event: string) => void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback that is used when a new listener is attached to remove the current error-related fallback functions,
|
||||||
|
* or to emit an error if one was thrown in the meantime.
|
||||||
|
*/
|
||||||
|
const removeDefaultErrorListener = function(this: StoredErrorStream, event: string, listener: (err: Error) => void):
|
||||||
|
void {
|
||||||
|
if (event === 'error') {
|
||||||
|
this.removeListener('error', defaultErrorListener);
|
||||||
|
this.removeListener('newListener', removeDefaultErrorListener);
|
||||||
|
this.on('removeListener', attachDefaultErrorListener);
|
||||||
|
if (this[timeoutGuard]) {
|
||||||
|
clearTimeout(this[timeoutGuard]!);
|
||||||
|
}
|
||||||
|
if (this[errorGuard]) {
|
||||||
|
setImmediate((): void => listener(this[errorGuard]!));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback that is used to make sure the error-related fallback functions are re-applied
|
||||||
|
* when all error listeners are removed.
|
||||||
|
*/
|
||||||
|
attachDefaultErrorListener = function(this: StoredErrorStream, event: string): void {
|
||||||
|
if (event === 'error' && this.listenerCount('error') === 0) {
|
||||||
|
this.on('error', defaultErrorListener);
|
||||||
|
this.on('newListener', removeDefaultErrorListener);
|
||||||
|
this.removeListener('removeListener', attachDefaultErrorListener);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Makes sure that listeners always receive the error event of a stream,
|
||||||
|
* even if it was thrown before the listener was attached.
|
||||||
|
* If the input is already guarded nothing will happen.
|
||||||
|
* @param stream - Stream that can potentially throw an error.
|
||||||
|
*
|
||||||
|
* @returns The wrapped stream.
|
||||||
|
*/
|
||||||
|
export const guardStream = <T extends NodeJS.EventEmitter>(stream: T): Guarded<T> => {
|
||||||
|
const guarded = stream as Guarded<T>;
|
||||||
|
if (guarded[guard]) {
|
||||||
|
return guarded;
|
||||||
|
}
|
||||||
|
|
||||||
|
guarded.on('error', defaultErrorListener);
|
||||||
|
guarded.on('newListener', removeDefaultErrorListener);
|
||||||
|
|
||||||
|
guarded[guard] = true;
|
||||||
|
return guarded;
|
||||||
|
};
|
@ -1,6 +1,9 @@
|
|||||||
import type { Readable, Writable } from 'stream';
|
import type { Writable, ReadableOptions } from 'stream';
|
||||||
|
import { Readable } from 'stream';
|
||||||
import arrayifyStream from 'arrayify-stream';
|
import arrayifyStream from 'arrayify-stream';
|
||||||
import { getLoggerFor } from '../logging/LogUtil';
|
import { getLoggerFor } from '../logging/LogUtil';
|
||||||
|
import type { Guarded } from './GuardedStream';
|
||||||
|
import { guardStream } from './GuardedStream';
|
||||||
|
|
||||||
const logger = getLoggerFor('StreamUtil');
|
const logger = getLoggerFor('StreamUtil');
|
||||||
|
|
||||||
@ -15,6 +18,7 @@ export const readableToString = async(stream: Readable): Promise<string> => (awa
|
|||||||
/**
|
/**
|
||||||
* Pipes one stream into another and emits errors of the first stream with the second.
|
* Pipes one stream into another and emits errors of the first stream with the second.
|
||||||
* In case of an error in the first stream the second one will be destroyed with the given error.
|
* In case of an error in the first stream the second one will be destroyed with the given error.
|
||||||
|
* This will also make the stream {@link Guarded}.
|
||||||
* @param readable - Initial readable stream.
|
* @param readable - Initial readable stream.
|
||||||
* @param destination - The destination for writing data.
|
* @param destination - The destination for writing data.
|
||||||
* @param mapError - Optional function that takes the error and converts it to a new error.
|
* @param mapError - Optional function that takes the error and converts it to a new error.
|
||||||
@ -22,7 +26,7 @@ export const readableToString = async(stream: Readable): Promise<string> => (awa
|
|||||||
* @returns The destination stream.
|
* @returns The destination stream.
|
||||||
*/
|
*/
|
||||||
export const pipeSafely = <T extends Writable>(readable: NodeJS.ReadableStream, destination: T,
|
export const pipeSafely = <T extends Writable>(readable: NodeJS.ReadableStream, destination: T,
|
||||||
mapError?: (error: Error) => Error): T => {
|
mapError?: (error: Error) => Error): Guarded<T> => {
|
||||||
// Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error
|
// Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error
|
||||||
readable.pipe(destination);
|
readable.pipe(destination);
|
||||||
readable.on('error', (error): void => {
|
readable.on('error', (error): void => {
|
||||||
@ -34,5 +38,13 @@ export const pipeSafely = <T extends Writable>(readable: NodeJS.ReadableStream,
|
|||||||
// in order to prevent memory leaks."
|
// in order to prevent memory leaks."
|
||||||
destination.destroy(mapError ? mapError(error) : error);
|
destination.destroy(mapError ? mapError(error) : error);
|
||||||
});
|
});
|
||||||
return destination;
|
return guardStream(destination);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts an iterable to a stream and applies an error guard so that it is {@link Guarded}.
|
||||||
|
* @param iterable - Data to stream.
|
||||||
|
* @param options - Options to pass to the Readable constructor. See {@link Readable.from}.
|
||||||
|
*/
|
||||||
|
export const guardedStreamFrom = (iterable: Iterable<any>, options?: ReadableOptions): Guarded<Readable> =>
|
||||||
|
guardStream(Readable.from(iterable, options));
|
||||||
|
101
test/unit/util/GuardedStream.test.ts
Normal file
101
test/unit/util/GuardedStream.test.ts
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
import { Readable } from 'stream';
|
||||||
|
import { guardStream } from '../../../src/util/GuardedStream';
|
||||||
|
import { readableToString } from '../../../src/util/StreamUtil';
|
||||||
|
|
||||||
|
describe('GuardedStream', (): void => {
|
||||||
|
describe('#guardStream', (): void => {
|
||||||
|
it('has no effect if no error is thrown.', async(): Promise<void> => {
|
||||||
|
const stream = guardStream(Readable.from([ 'data' ]));
|
||||||
|
const listen = new Promise((resolve, reject): void => {
|
||||||
|
stream.on('end', resolve);
|
||||||
|
stream.on('error', reject);
|
||||||
|
});
|
||||||
|
await expect(readableToString(stream)).resolves.toBe('data');
|
||||||
|
await expect(listen).resolves.toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns the stream if it is already guarded.', async(): Promise<void> => {
|
||||||
|
const stream = guardStream(guardStream(Readable.from([ 'data' ])));
|
||||||
|
expect(stream.listenerCount('error')).toBe(1);
|
||||||
|
expect(stream.listenerCount('newListener')).toBe(1);
|
||||||
|
expect(stream.listenerCount('removeListener')).toBe(0);
|
||||||
|
|
||||||
|
const listen = new Promise((resolve, reject): void => {
|
||||||
|
stream.on('end', resolve);
|
||||||
|
stream.on('error', reject);
|
||||||
|
});
|
||||||
|
await expect(readableToString(stream)).resolves.toBe('data');
|
||||||
|
await expect(listen).resolves.toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('still emits errors when they happen.', async(): Promise<void> => {
|
||||||
|
let stream = Readable.from([ 'data' ]);
|
||||||
|
stream = guardStream(stream);
|
||||||
|
const listen = new Promise((resolve, reject): void => {
|
||||||
|
stream.on('end', resolve);
|
||||||
|
stream.on('error', reject);
|
||||||
|
});
|
||||||
|
stream.emit('error', new Error('error'));
|
||||||
|
await expect(listen).rejects.toThrow(new Error('error'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits old errors when new listeners are attached.', async(): Promise<void> => {
|
||||||
|
let stream = Readable.from([ 'data' ]);
|
||||||
|
stream = guardStream(stream);
|
||||||
|
stream.emit('error', new Error('error'));
|
||||||
|
const listen = new Promise((resolve, reject): void => {
|
||||||
|
stream.on('end', resolve);
|
||||||
|
stream.on('error', reject);
|
||||||
|
});
|
||||||
|
await expect(listen).rejects.toThrow(new Error('error'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('still works if error listeners get removed and added again.', async(): Promise<void> => {
|
||||||
|
let stream = Readable.from([ 'data' ]);
|
||||||
|
stream = guardStream(stream);
|
||||||
|
|
||||||
|
// Make sure no unneeded listeners stay attached
|
||||||
|
const errorCb = jest.fn();
|
||||||
|
const errorCb2 = jest.fn();
|
||||||
|
stream.on('error', errorCb);
|
||||||
|
stream.on('error', errorCb2);
|
||||||
|
expect(stream.listenerCount('error')).toBe(2);
|
||||||
|
expect(stream.listenerCount('newListener')).toBe(0);
|
||||||
|
expect(stream.listenerCount('removeListener')).toBe(1);
|
||||||
|
stream.removeListener('error', errorCb2);
|
||||||
|
expect(stream.listenerCount('error')).toBe(1);
|
||||||
|
expect(stream.listenerCount('newListener')).toBe(0);
|
||||||
|
expect(stream.listenerCount('removeListener')).toBe(1);
|
||||||
|
stream.removeListener('error', errorCb);
|
||||||
|
expect(stream.listenerCount('error')).toBe(1);
|
||||||
|
expect(stream.listenerCount('newListener')).toBe(1);
|
||||||
|
expect(stream.listenerCount('removeListener')).toBe(0);
|
||||||
|
|
||||||
|
stream.emit('error', new Error('error'));
|
||||||
|
|
||||||
|
const listen = new Promise((resolve, reject): void => {
|
||||||
|
stream.on('end', resolve);
|
||||||
|
stream.on('error', reject);
|
||||||
|
});
|
||||||
|
await expect(listen).rejects.toThrow(new Error('error'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('logs a warning if nobody listens to the error.', async(): Promise<void> => {
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
|
let stream = Readable.from([ 'data' ]);
|
||||||
|
stream = guardStream(stream);
|
||||||
|
stream.emit('error', new Error('error'));
|
||||||
|
|
||||||
|
jest.advanceTimersByTime(1000);
|
||||||
|
|
||||||
|
const listen = new Promise((resolve, reject): void => {
|
||||||
|
stream.on('end', resolve);
|
||||||
|
stream.on('error', reject);
|
||||||
|
});
|
||||||
|
await expect(listen).rejects.toThrow(new Error('error'));
|
||||||
|
|
||||||
|
// No idea how to access the logger with mocks unfortunately
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
@ -1,6 +1,6 @@
|
|||||||
import { PassThrough } from 'stream';
|
import { PassThrough } from 'stream';
|
||||||
import streamifyArray from 'streamify-array';
|
import streamifyArray from 'streamify-array';
|
||||||
import { pipeSafely, readableToString } from '../../../src/util/StreamUtil';
|
import { guardedStreamFrom, pipeSafely, readableToString } from '../../../src/util/StreamUtil';
|
||||||
|
|
||||||
describe('StreamUtil', (): void => {
|
describe('StreamUtil', (): void => {
|
||||||
describe('#readableToString', (): void => {
|
describe('#readableToString', (): void => {
|
||||||
@ -40,4 +40,11 @@ describe('StreamUtil', (): void => {
|
|||||||
await expect(readableToString(piped)).rejects.toThrow(new Error('other error'));
|
await expect(readableToString(piped)).rejects.toThrow(new Error('other error'));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('#guardedStreamFrom', (): void => {
|
||||||
|
it('converts data to a guarded stream.', async(): Promise<void> => {
|
||||||
|
const data = [ 'a', 'b' ];
|
||||||
|
await expect(readableToString(guardedStreamFrom(data))).resolves.toBe('ab');
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
Loading…
x
Reference in New Issue
Block a user