From 27cc1ec15ee01cddc25d57b1e1f8a7537666927c Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Tue, 12 Jan 2021 10:28:29 +0100 Subject: [PATCH] fix: Always keep guarded error listener attached --- src/util/GuardedStream.ts | 74 +++++++++------------- test/unit/util/GuardedStream.test.ts | 95 ++++++++++++++-------------- 2 files changed, 77 insertions(+), 92 deletions(-) diff --git a/src/util/GuardedStream.ts b/src/util/GuardedStream.ts index aacd48a43..a5fd0a670 100644 --- a/src/util/GuardedStream.ts +++ b/src/util/GuardedStream.ts @@ -6,8 +6,6 @@ const logger = getLoggerFor('GuardedStream'); const guardedErrors = Symbol('guardedErrors'); const guardedTimeout = Symbol('guardedTimeout'); -let attachDefaultErrorListener: (this: Guarded, event: string) => void; - // Private fields for guarded streams class Guard { private [guardedErrors]: Error[]; @@ -29,47 +27,31 @@ export function isGuarded(stream: T): stream is G } /** - * Makes sure that listeners always receive the error event of a stream, - * even if it was thrown before the listener was attached. - * If the input is already guarded nothing will happen. - * @param stream - Stream that can potentially throw an error. - * - * @returns The stream. - */ -export function guardStream(stream: T): Guarded { - const guarded = stream as Guarded; - if (!isGuarded(stream)) { - guarded[guardedErrors] = []; - attachDefaultErrorListener.call(guarded, 'error'); - } - return guarded; -} - -/** - * Callback that is used when a stream emits an error and no error listener is attached. + * Callback that is used when a stream emits an error and no other error listener is attached. * Used to store the error and start the logger timer. + * + * It is important that this listener always remains attached for edge cases where an error listener gets removed + * and the number of error listeners is checked immediately afterwards. + * See https://github.com/solid/community-server/pull/462#issuecomment-758013492 . */ -function defaultErrorListener(this: Guarded, error: Error): void { - this[guardedErrors].push(error); - if (!this[guardedTimeout]) { - this[guardedTimeout] = setTimeout((): void => { - const message = `No error listener was attached but error was thrown: ${error.message}`; - logger.error(message, { error }); - }, 1000); +function guardingErrorListener(this: Guarded, error: Error): void { + // Only fall back to this if no other listeners are attached + if (this.listenerCount('error') === 1) { + this[guardedErrors].push(error); + if (!this[guardedTimeout]) { + this[guardedTimeout] = setTimeout((): void => { + const message = `No error listener was attached but error was thrown: ${error.message}`; + logger.error(message, { error }); + }, 1000); + } } } /** - * Callback that is used when a new listener is attached to remove the current error-related fallback functions, - * or to emit an error if one was thrown in the meantime. + * Callback that is used when a new listener is attached and there are errors that were not emitted yet. */ -function removeDefaultErrorListener(this: Guarded, event: string): void { +function emitStoredErrors(this: Guarded, event: string): void { if (event === 'error') { - // Remove default guard listeners (but reattach when all error listeners are removed) - this.removeListener('error', defaultErrorListener); - this.removeListener('newListener', removeDefaultErrorListener); - this.addListener('removeListener', attachDefaultErrorListener); - // Cancel an error timeout if (this[guardedTimeout]) { clearTimeout(this[guardedTimeout]!); @@ -90,13 +72,19 @@ function removeDefaultErrorListener(this: Guarded, event: string): void { } /** - * Callback that is used to make sure the error-related fallback functions are re-applied - * when all error listeners are removed. + * Makes sure that listeners always receive the error event of a stream, + * even if it was thrown before the listener was attached. + * If the input is already guarded nothing will happen. + * @param stream - Stream that can potentially throw an error. + * + * @returns The stream. */ -attachDefaultErrorListener = function(this: Guarded, event: string): void { - if (event === 'error' && this.listenerCount('error') === 0) { - this.addListener('error', defaultErrorListener); - this.addListener('newListener', removeDefaultErrorListener); - this.removeListener('removeListener', attachDefaultErrorListener); +export function guardStream(stream: T): Guarded { + const guarded = stream as Guarded; + if (!isGuarded(stream)) { + guarded[guardedErrors] = []; + guarded.on('error', guardingErrorListener); + guarded.on('newListener', emitStoredErrors); } -}; + return guarded; +} diff --git a/test/unit/util/GuardedStream.test.ts b/test/unit/util/GuardedStream.test.ts index a46f8efc7..c3865eddc 100644 --- a/test/unit/util/GuardedStream.test.ts +++ b/test/unit/util/GuardedStream.test.ts @@ -26,12 +26,7 @@ describe('GuardedStream', (): void => { expect(isGuarded(stream)).toBe(true); expect(isGuarded(guarded)).toBe(true); - const listen = new Promise((resolve, reject): void => { - stream.on('end', resolve); - stream.on('error', reject); - }); - await expect(readableToString(stream)).resolves.toBe('data'); - await expect(listen).resolves.toBeUndefined(); + await expect(readableToString(guarded)).resolves.toBe('data'); }); it('returns the stream if it is already guarded.', async(): Promise => { @@ -45,26 +40,21 @@ describe('GuardedStream', (): void => { expect(isGuarded(stream)).toBe(true); expect(isGuarded(guarded)).toBe(true); - expect(stream.listenerCount('error')).toBe(1); - expect(stream.listenerCount('newListener')).toBe(1); - expect(stream.listenerCount('removeListener')).toBe(0); + expect(guarded.listenerCount('error')).toBe(1); + expect(guarded.listenerCount('newListener')).toBe(1); + expect(guarded.listenerCount('removeListener')).toBe(0); - const listen = new Promise((resolve, reject): void => { - stream.on('end', resolve); - stream.on('error', reject); - }); - await expect(readableToString(stream)).resolves.toBe('data'); - await expect(listen).resolves.toBeUndefined(); + await expect(readableToString(guarded)).resolves.toBe('data'); }); it('emits errors when listeners are currently attached.', async(): Promise => { const stream = guardStream(Readable.from([ 'data' ])); - const listen = new Promise((resolve, reject): void => { - stream.on('end', resolve); - stream.on('error', reject); - }); - stream.emit('error', new Error('error')); - await expect(listen).rejects.toThrow(new Error('error')); + const listener = jest.fn(); + stream.on('error', listener); + const error = new Error('error'); + stream.emit('error', error); + expect(listener).toHaveBeenCalledTimes(1); + expect(listener).toHaveBeenLastCalledWith(error); }); it('emits guarded errors when new listeners are attached.', async(): Promise => { @@ -118,50 +108,57 @@ describe('GuardedStream', (): void => { const errorCb2 = jest.fn(); stream.on('error', errorCb); stream.on('error', errorCb2); - expect(stream.listenerCount('error')).toBe(2); - expect(stream.listenerCount('newListener')).toBe(0); - expect(stream.listenerCount('removeListener')).toBe(1); + expect(stream.listenerCount('error')).toBe(3); + expect(stream.listenerCount('newListener')).toBe(1); stream.removeListener('error', errorCb2); - expect(stream.listenerCount('error')).toBe(1); - expect(stream.listenerCount('newListener')).toBe(0); - expect(stream.listenerCount('removeListener')).toBe(1); + expect(stream.listenerCount('error')).toBe(2); + expect(stream.listenerCount('newListener')).toBe(1); stream.removeListener('error', errorCb); expect(stream.listenerCount('error')).toBe(1); expect(stream.listenerCount('newListener')).toBe(1); - expect(stream.listenerCount('removeListener')).toBe(0); - stream.emit('error', new Error('error')); - - const listen = new Promise((resolve, reject): void => { - stream.on('end', resolve); - stream.on('error', reject); - }); - await expect(listen).rejects.toThrow(new Error('error')); - }); - - it('logs a warning if nobody listens to the error.', async(): Promise => { - const error = new Error('failure'); - const stream = guardStream(Readable.from([ 'data' ])); + const error = new Error('error'); stream.emit('error', error); - jest.advanceTimersByTime(100); - stream.emit('error', new Error('other')); - stream.emit('error', new Error('other')); + const errorCb3 = jest.fn(); + stream.on('error', errorCb3); + await new Promise((resolve): any => setImmediate(resolve)); + + expect(errorCb).toHaveBeenCalledTimes(0); + expect(errorCb2).toHaveBeenCalledTimes(0); + expect(errorCb3).toHaveBeenCalledTimes(1); + expect(errorCb3).toHaveBeenLastCalledWith(error); + }); + + it('logs an error if nobody listens to the error.', async(): Promise => { + const errors = [ new Error('0'), new Error('1'), new Error('2') ]; + const stream = guardStream(Readable.from([ 'data' ])); + stream.emit('error', errors[0]); + + jest.advanceTimersByTime(100); + stream.emit('error', errors[1]); + stream.emit('error', errors[2]); + + // Only the first error gets logged jest.advanceTimersByTime(900); expect(logger.error).toHaveBeenCalledTimes(1); expect(logger.error).toHaveBeenCalledWith( - 'No error listener was attached but error was thrown: failure', { error }, + 'No error listener was attached but error was thrown: 0', { error: errors[0] }, ); jest.advanceTimersByTime(1000); expect(logger.error).toHaveBeenCalledTimes(1); - const listen = new Promise((resolve, reject): void => { - stream.on('end', resolve); - stream.on('error', reject); - }); - await expect(listen).rejects.toThrow(error); + const errorCb = jest.fn(); + stream.on('error', errorCb); + + await new Promise((resolve): any => setImmediate(resolve)); + + expect(errorCb).toHaveBeenCalledTimes(3); + expect(errorCb).toHaveBeenNthCalledWith(1, errors[0]); + expect(errorCb).toHaveBeenNthCalledWith(2, errors[1]); + expect(errorCb).toHaveBeenNthCalledWith(3, errors[2]); }); }); });