diff --git a/src/util/GuardedStream.ts b/src/util/GuardedStream.ts index 6b55376bb..e64fad6f3 100644 --- a/src/util/GuardedStream.ts +++ b/src/util/GuardedStream.ts @@ -1,27 +1,17 @@ import { getLoggerFor } from '../logging/LogUtil'; -// Until Typescript adds Nominal types this is how to do it. -// This can be generalized should we need it in the future for other cases. -// Also using the guard to store the intermediate variables. -// See the following issue: -// https://github.com/microsoft/TypeScript/issues/202 - const logger = getLoggerFor('GuardedStream'); // Using symbols to make sure we don't override existing parameters -const guard = Symbol('guard'); -const errorGuard = Symbol('error'); -const timeoutGuard = Symbol('timeout'); +const guardedErrors = Symbol('guardedErrors'); +const guardedTimeout = Symbol('guardedTimeout'); -// Class used to guard streams +let attachDefaultErrorListener: (this: Guarded, event: string) => void; + +// Private fields for guarded streams class Guard { - protected [guard]: boolean; -} - -// Hidden interface for guard-related variables -interface StoredErrorStream extends NodeJS.EventEmitter { - [errorGuard]?: Error; - [timeoutGuard]?: NodeJS.Timeout; + private [guardedErrors]: Error[]; + private [guardedTimeout]?: NodeJS.Timeout; } /** @@ -29,36 +19,71 @@ interface StoredErrorStream extends NodeJS.EventEmitter { * If an error occurs while no listener is attached, * it will store the error and emit it once a listener is added (or a timeout occurs). */ -export type Guarded = T & Guard; +export type Guarded = T & Guard; + +/** + * Determines whether the stream is guarded from emitting errors. + */ +export const isGuarded = (stream: T): stream is Guarded => guardedErrors in stream; + +/** + * Makes sure that listeners always receive the error event of a stream, + * even if it was thrown before the listener was attached. + * If the input is already guarded nothing will happen. + * @param stream - Stream that can potentially throw an error. + * + * @returns The stream. + */ +export const guardStream = (stream: T): Guarded => { + const guarded = stream as Guarded; + if (!isGuarded(stream)) { + guarded[guardedErrors] = []; + attachDefaultErrorListener.call(guarded, 'error'); + } + return guarded; +}; /** * Callback that is used when a stream emits an error and no error listener is attached. * Used to store the error and start the logger timer. */ -const defaultErrorListener = function(this: StoredErrorStream, err: Error): void { - this[errorGuard] = err; - this[timeoutGuard] = setTimeout((): void => { - logger.error(`No error listener was attached but error was thrown: ${err.message}`); - }, 1000); +const defaultErrorListener = function(this: Guarded, error: Error): void { + this[guardedErrors].push(error); + if (!this[guardedTimeout]) { + this[guardedTimeout] = setTimeout((): void => { + const message = `No error listener was attached but error was thrown: ${error.message}`; + logger.error(message, { error }); + }, 1000); + } }; -let attachDefaultErrorListener: (this: StoredErrorStream, event: string) => void; - /** * Callback that is used when a new listener is attached to remove the current error-related fallback functions, * or to emit an error if one was thrown in the meantime. */ -const removeDefaultErrorListener = function(this: StoredErrorStream, event: string, listener: (err: Error) => void): +const removeDefaultErrorListener = function(this: Guarded, event: string): void { if (event === 'error') { + // Remove default guard listeners (but reattach when all error listeners are removed) this.removeListener('error', defaultErrorListener); this.removeListener('newListener', removeDefaultErrorListener); - this.on('removeListener', attachDefaultErrorListener); - if (this[timeoutGuard]) { - clearTimeout(this[timeoutGuard]!); + this.addListener('removeListener', attachDefaultErrorListener); + + // Cancel an error timeout + if (this[guardedTimeout]) { + clearTimeout(this[guardedTimeout]!); + this[guardedTimeout] = undefined; } - if (this[errorGuard]) { - setImmediate((): void => listener(this[errorGuard]!)); + + // Emit any errors that were guarded + const errors = this[guardedErrors]; + if (errors.length > 0) { + this[guardedErrors] = []; + setImmediate((): void => { + for (const error of errors) { + this.emit('error', error); + } + }); } } }; @@ -67,31 +92,10 @@ void { * Callback that is used to make sure the error-related fallback functions are re-applied * when all error listeners are removed. */ -attachDefaultErrorListener = function(this: StoredErrorStream, event: string): void { +attachDefaultErrorListener = function(this: Guarded, event: string): void { if (event === 'error' && this.listenerCount('error') === 0) { - this.on('error', defaultErrorListener); - this.on('newListener', removeDefaultErrorListener); + this.addListener('error', defaultErrorListener); + this.addListener('newListener', removeDefaultErrorListener); this.removeListener('removeListener', attachDefaultErrorListener); } }; - -/** - * Makes sure that listeners always receive the error event of a stream, - * even if it was thrown before the listener was attached. - * If the input is already guarded nothing will happen. - * @param stream - Stream that can potentially throw an error. - * - * @returns The wrapped stream. - */ -export const guardStream = (stream: T): Guarded => { - const guarded = stream as Guarded; - if (guarded[guard]) { - return guarded; - } - - guarded.on('error', defaultErrorListener); - guarded.on('newListener', removeDefaultErrorListener); - - guarded[guard] = true; - return guarded; -}; diff --git a/test/unit/util/GuardedStream.test.ts b/test/unit/util/GuardedStream.test.ts index 835c0d759..a46f8efc7 100644 --- a/test/unit/util/GuardedStream.test.ts +++ b/test/unit/util/GuardedStream.test.ts @@ -1,11 +1,31 @@ import { Readable } from 'stream'; -import { guardStream } from '../../../src/util/GuardedStream'; +import type { Logger } from '../../../src/logging/Logger'; +import { getLoggerFor } from '../../../src/logging/LogUtil'; +import { guardStream, isGuarded } from '../../../src/util/GuardedStream'; import { readableToString } from '../../../src/util/StreamUtil'; +jest.mock('../../../src/logging/LogUtil', (): any => { + const logger: Logger = { error: jest.fn() } as any; + return { getLoggerFor: (): Logger => logger }; +}); +const logger: jest.Mocked = getLoggerFor('GuardedStream') as any; + +jest.useFakeTimers(); + describe('GuardedStream', (): void => { + beforeEach((): void => { + jest.clearAllMocks(); + }); + describe('#guardStream', (): void => { it('has no effect if no error is thrown.', async(): Promise => { - const stream = guardStream(Readable.from([ 'data' ])); + const stream = Readable.from([ 'data' ]); + expect(isGuarded(stream)).toBe(false); + const guarded = guardStream(stream); + expect(guarded).toBe(stream); + expect(isGuarded(stream)).toBe(true); + expect(isGuarded(guarded)).toBe(true); + const listen = new Promise((resolve, reject): void => { stream.on('end', resolve); stream.on('error', reject); @@ -15,7 +35,16 @@ describe('GuardedStream', (): void => { }); it('returns the stream if it is already guarded.', async(): Promise => { - const stream = guardStream(guardStream(Readable.from([ 'data' ]))); + const stream = Readable.from([ 'data' ]); + expect(isGuarded(stream)).toBe(false); + const guarded = guardStream(stream); + expect(guarded).toBe(stream); + expect(isGuarded(stream)).toBe(true); + expect(isGuarded(guarded)).toBe(true); + expect(guardStream(guarded)).toBe(stream); + expect(isGuarded(stream)).toBe(true); + expect(isGuarded(guarded)).toBe(true); + expect(stream.listenerCount('error')).toBe(1); expect(stream.listenerCount('newListener')).toBe(1); expect(stream.listenerCount('removeListener')).toBe(0); @@ -28,9 +57,8 @@ describe('GuardedStream', (): void => { await expect(listen).resolves.toBeUndefined(); }); - it('still emits errors when they happen.', async(): Promise => { - let stream = Readable.from([ 'data' ]); - stream = guardStream(stream); + it('emits errors when listeners are currently attached.', async(): Promise => { + const stream = guardStream(Readable.from([ 'data' ])); const listen = new Promise((resolve, reject): void => { stream.on('end', resolve); stream.on('error', reject); @@ -39,20 +67,51 @@ describe('GuardedStream', (): void => { await expect(listen).rejects.toThrow(new Error('error')); }); - it('emits old errors when new listeners are attached.', async(): Promise => { - let stream = Readable.from([ 'data' ]); - stream = guardStream(stream); + it('emits guarded errors when new listeners are attached.', async(): Promise => { + const errors = [ new Error('0'), new Error('1') ]; + const stream = guardStream(Readable.from([ 'data' ])); + stream.emit('error', errors[0]); + stream.emit('error', errors[1]); + + const errorListeners = [ jest.fn(), jest.fn(), jest.fn() ]; + stream.addListener('error', errorListeners[0]); + stream.addListener('error', errorListeners[1]); + stream.addListener('error', errorListeners[2]); + const endListener = jest.fn(); + stream.addListener('end', endListener); + + expect(errorListeners[0]).toHaveBeenCalledTimes(0); + expect(errorListeners[1]).toHaveBeenCalledTimes(0); + expect(errorListeners[2]).toHaveBeenCalledTimes(0); + expect(endListener).toHaveBeenCalledTimes(0); + + await new Promise((resolve): any => setImmediate(resolve)); + + expect(errorListeners[0]).toHaveBeenCalledTimes(2); + expect(errorListeners[0]).toHaveBeenNthCalledWith(1, errors[0]); + expect(errorListeners[0]).toHaveBeenNthCalledWith(2, errors[1]); + expect(errorListeners[1]).toHaveBeenCalledTimes(2); + expect(errorListeners[1]).toHaveBeenNthCalledWith(1, errors[0]); + expect(errorListeners[1]).toHaveBeenNthCalledWith(2, errors[1]); + expect(errorListeners[2]).toHaveBeenCalledTimes(2); + expect(errorListeners[1]).toHaveBeenNthCalledWith(1, errors[0]); + expect(errorListeners[1]).toHaveBeenNthCalledWith(2, errors[1]); + expect(endListener).toHaveBeenCalledTimes(0); + }); + + it('does not time out when a listener was already attached.', async(): Promise => { + const stream = Readable.from([ 'data' ]); + stream.addListener('error', jest.fn()); + guardStream(stream); + stream.emit('error', new Error('error')); - const listen = new Promise((resolve, reject): void => { - stream.on('end', resolve); - stream.on('error', reject); - }); - await expect(listen).rejects.toThrow(new Error('error')); + + jest.advanceTimersByTime(1000); + expect(logger.error).toHaveBeenCalledTimes(0); }); it('still works if error listeners get removed and added again.', async(): Promise => { - let stream = Readable.from([ 'data' ]); - stream = guardStream(stream); + const stream = guardStream(Readable.from([ 'data' ])); // Make sure no unneeded listeners stay attached const errorCb = jest.fn(); @@ -81,21 +140,28 @@ describe('GuardedStream', (): void => { }); it('logs a warning if nobody listens to the error.', async(): Promise => { - jest.useFakeTimers(); + const error = new Error('failure'); + const stream = guardStream(Readable.from([ 'data' ])); + stream.emit('error', error); - let stream = Readable.from([ 'data' ]); - stream = guardStream(stream); - stream.emit('error', new Error('error')); + jest.advanceTimersByTime(100); + stream.emit('error', new Error('other')); + stream.emit('error', new Error('other')); + + jest.advanceTimersByTime(900); + expect(logger.error).toHaveBeenCalledTimes(1); + expect(logger.error).toHaveBeenCalledWith( + 'No error listener was attached but error was thrown: failure', { error }, + ); jest.advanceTimersByTime(1000); + expect(logger.error).toHaveBeenCalledTimes(1); const listen = new Promise((resolve, reject): void => { stream.on('end', resolve); stream.on('error', reject); }); - await expect(listen).rejects.toThrow(new Error('error')); - - // No idea how to access the logger with mocks unfortunately + await expect(listen).rejects.toThrow(error); }); }); });