From 1a30b514610fb9cf351cb42fbd0fefc87948920d Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Tue, 17 Nov 2020 11:44:48 +0100 Subject: [PATCH] feat: Create function to wrap streams to not lose errors --- src/util/GuardedStream.ts | 97 +++++++++++++++++++++++++ src/util/StreamUtil.ts | 18 ++++- test/unit/util/GuardedStream.test.ts | 101 +++++++++++++++++++++++++++ test/unit/util/StreamUtil.test.ts | 9 ++- 4 files changed, 221 insertions(+), 4 deletions(-) create mode 100644 src/util/GuardedStream.ts create mode 100644 test/unit/util/GuardedStream.test.ts diff --git a/src/util/GuardedStream.ts b/src/util/GuardedStream.ts new file mode 100644 index 000000000..4be44e393 --- /dev/null +++ b/src/util/GuardedStream.ts @@ -0,0 +1,97 @@ +import { getLoggerFor } from '../logging/LogUtil'; + +// Until Typescript adds Nominal types this is how to do it. +// This can be generalized should we need it in the future for other cases. +// Also using the guard to store the intermediate variables. +// See the following issue: +// https://github.com/microsoft/TypeScript/issues/202 + +const logger = getLoggerFor('GuardedStream'); + +// Using symbols to make sure we don't override existing parameters +const guard = Symbol('guard'); +const errorGuard = Symbol('error'); +const timeoutGuard = Symbol('timeout'); + +// Class used to guard streams +class Guard { + protected [guard]: boolean; +} + +// Hidden interface for guard-related variables +interface StoredErrorStream extends NodeJS.EventEmitter { + [errorGuard]?: Error; + [timeoutGuard]?: NodeJS.Timeout; +} + +/** + * A stream that is guarded. + * This means that if this stream emits an error before a listener is attached, + * it will store the error and emit it once a listener is added. + */ +export type Guarded = T & Guard; + +/** + * Callback that is used when a stream emits an error and no error listener is attached. + * Used to store the error and start the logger timer. + */ +const defaultErrorListener = function(this: StoredErrorStream, err: Error): void { + this[errorGuard] = err; + this[timeoutGuard] = setTimeout((): void => { + logger.error(`No error listener was attached but error was thrown: ${err.message}`); + }, 1000); +}; + +let attachDefaultErrorListener: (this: StoredErrorStream, event: string) => void; + +/** + * Callback that is used when a new listener is attached to remove the current error-related fallback functions, + * or to emit an error if one was thrown in the meantime. + */ +const removeDefaultErrorListener = function(this: StoredErrorStream, event: string, listener: (err: Error) => void): +void { + if (event === 'error') { + this.removeListener('error', defaultErrorListener); + this.removeListener('newListener', removeDefaultErrorListener); + this.on('removeListener', attachDefaultErrorListener); + if (this[timeoutGuard]) { + clearTimeout(this[timeoutGuard]!); + } + if (this[errorGuard]) { + setImmediate((): void => listener(this[errorGuard]!)); + } + } +}; + +/** + * Callback that is used to make sure the error-related fallback functions are re-applied + * when all error listeners are removed. + */ +attachDefaultErrorListener = function(this: StoredErrorStream, event: string): void { + if (event === 'error' && this.listenerCount('error') === 0) { + this.on('error', defaultErrorListener); + this.on('newListener', removeDefaultErrorListener); + this.removeListener('removeListener', attachDefaultErrorListener); + } +}; + +/** + * Makes sure that listeners always receive the error event of a stream, + * even if it was thrown before the listener was attached. + * If the input is already guarded nothing will happen. + * @param stream - Stream that can potentially throw an error. + * + * @returns The wrapped stream. + */ +export const guardStream = (stream: T): Guarded => { + const guarded = stream as Guarded; + if (guarded[guard]) { + return guarded; + } + + guarded.on('error', defaultErrorListener); + guarded.on('newListener', removeDefaultErrorListener); + + guarded[guard] = true; + return guarded; +}; diff --git a/src/util/StreamUtil.ts b/src/util/StreamUtil.ts index f836e1bc7..e5198cb13 100644 --- a/src/util/StreamUtil.ts +++ b/src/util/StreamUtil.ts @@ -1,6 +1,9 @@ -import type { Readable, Writable } from 'stream'; +import type { Writable, ReadableOptions } from 'stream'; +import { Readable } from 'stream'; import arrayifyStream from 'arrayify-stream'; import { getLoggerFor } from '../logging/LogUtil'; +import type { Guarded } from './GuardedStream'; +import { guardStream } from './GuardedStream'; const logger = getLoggerFor('StreamUtil'); @@ -15,6 +18,7 @@ export const readableToString = async(stream: Readable): Promise => (awa /** * Pipes one stream into another and emits errors of the first stream with the second. * In case of an error in the first stream the second one will be destroyed with the given error. + * This will also make the stream {@link Guarded}. * @param readable - Initial readable stream. * @param destination - The destination for writing data. * @param mapError - Optional function that takes the error and converts it to a new error. @@ -22,7 +26,7 @@ export const readableToString = async(stream: Readable): Promise => (awa * @returns The destination stream. */ export const pipeSafely = (readable: NodeJS.ReadableStream, destination: T, - mapError?: (error: Error) => Error): T => { + mapError?: (error: Error) => Error): Guarded => { // Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error readable.pipe(destination); readable.on('error', (error): void => { @@ -34,5 +38,13 @@ export const pipeSafely = (readable: NodeJS.ReadableStream, // in order to prevent memory leaks." destination.destroy(mapError ? mapError(error) : error); }); - return destination; + return guardStream(destination); }; + +/** + * Converts an iterable to a stream and applies an error guard so that it is {@link Guarded}. + * @param iterable - Data to stream. + * @param options - Options to pass to the Readable constructor. See {@link Readable.from}. + */ +export const guardedStreamFrom = (iterable: Iterable, options?: ReadableOptions): Guarded => + guardStream(Readable.from(iterable, options)); diff --git a/test/unit/util/GuardedStream.test.ts b/test/unit/util/GuardedStream.test.ts new file mode 100644 index 000000000..835c0d759 --- /dev/null +++ b/test/unit/util/GuardedStream.test.ts @@ -0,0 +1,101 @@ +import { Readable } from 'stream'; +import { guardStream } from '../../../src/util/GuardedStream'; +import { readableToString } from '../../../src/util/StreamUtil'; + +describe('GuardedStream', (): void => { + describe('#guardStream', (): void => { + it('has no effect if no error is thrown.', async(): Promise => { + const stream = guardStream(Readable.from([ 'data' ])); + const listen = new Promise((resolve, reject): void => { + stream.on('end', resolve); + stream.on('error', reject); + }); + await expect(readableToString(stream)).resolves.toBe('data'); + await expect(listen).resolves.toBeUndefined(); + }); + + it('returns the stream if it is already guarded.', async(): Promise => { + const stream = guardStream(guardStream(Readable.from([ 'data' ]))); + expect(stream.listenerCount('error')).toBe(1); + expect(stream.listenerCount('newListener')).toBe(1); + expect(stream.listenerCount('removeListener')).toBe(0); + + const listen = new Promise((resolve, reject): void => { + stream.on('end', resolve); + stream.on('error', reject); + }); + await expect(readableToString(stream)).resolves.toBe('data'); + await expect(listen).resolves.toBeUndefined(); + }); + + it('still emits errors when they happen.', async(): Promise => { + let stream = Readable.from([ 'data' ]); + stream = guardStream(stream); + const listen = new Promise((resolve, reject): void => { + stream.on('end', resolve); + stream.on('error', reject); + }); + stream.emit('error', new Error('error')); + await expect(listen).rejects.toThrow(new Error('error')); + }); + + it('emits old errors when new listeners are attached.', async(): Promise => { + let stream = Readable.from([ 'data' ]); + stream = guardStream(stream); + stream.emit('error', new Error('error')); + const listen = new Promise((resolve, reject): void => { + stream.on('end', resolve); + stream.on('error', reject); + }); + await expect(listen).rejects.toThrow(new Error('error')); + }); + + it('still works if error listeners get removed and added again.', async(): Promise => { + let stream = Readable.from([ 'data' ]); + stream = guardStream(stream); + + // Make sure no unneeded listeners stay attached + const errorCb = jest.fn(); + const errorCb2 = jest.fn(); + stream.on('error', errorCb); + stream.on('error', errorCb2); + expect(stream.listenerCount('error')).toBe(2); + expect(stream.listenerCount('newListener')).toBe(0); + expect(stream.listenerCount('removeListener')).toBe(1); + stream.removeListener('error', errorCb2); + expect(stream.listenerCount('error')).toBe(1); + expect(stream.listenerCount('newListener')).toBe(0); + expect(stream.listenerCount('removeListener')).toBe(1); + stream.removeListener('error', errorCb); + expect(stream.listenerCount('error')).toBe(1); + expect(stream.listenerCount('newListener')).toBe(1); + expect(stream.listenerCount('removeListener')).toBe(0); + + stream.emit('error', new Error('error')); + + const listen = new Promise((resolve, reject): void => { + stream.on('end', resolve); + stream.on('error', reject); + }); + await expect(listen).rejects.toThrow(new Error('error')); + }); + + it('logs a warning if nobody listens to the error.', async(): Promise => { + jest.useFakeTimers(); + + let stream = Readable.from([ 'data' ]); + stream = guardStream(stream); + stream.emit('error', new Error('error')); + + jest.advanceTimersByTime(1000); + + const listen = new Promise((resolve, reject): void => { + stream.on('end', resolve); + stream.on('error', reject); + }); + await expect(listen).rejects.toThrow(new Error('error')); + + // No idea how to access the logger with mocks unfortunately + }); + }); +}); diff --git a/test/unit/util/StreamUtil.test.ts b/test/unit/util/StreamUtil.test.ts index e0f0a8baf..42beb5bc8 100644 --- a/test/unit/util/StreamUtil.test.ts +++ b/test/unit/util/StreamUtil.test.ts @@ -1,6 +1,6 @@ import { PassThrough } from 'stream'; import streamifyArray from 'streamify-array'; -import { pipeSafely, readableToString } from '../../../src/util/StreamUtil'; +import { guardedStreamFrom, pipeSafely, readableToString } from '../../../src/util/StreamUtil'; describe('StreamUtil', (): void => { describe('#readableToString', (): void => { @@ -40,4 +40,11 @@ describe('StreamUtil', (): void => { await expect(readableToString(piped)).rejects.toThrow(new Error('other error')); }); }); + + describe('#guardedStreamFrom', (): void => { + it('converts data to a guarded stream.', async(): Promise => { + const data = [ 'a', 'b' ]; + await expect(readableToString(guardedStreamFrom(data))).resolves.toBe('ab'); + }); + }); });