fix: Always keep guarded error listener attached

This commit is contained in:
Joachim Van Herwegen 2021-01-12 10:28:29 +01:00
parent 0f3680db8d
commit 27cc1ec15e
2 changed files with 77 additions and 92 deletions

View File

@ -6,8 +6,6 @@ const logger = getLoggerFor('GuardedStream');
const guardedErrors = Symbol('guardedErrors'); const guardedErrors = Symbol('guardedErrors');
const guardedTimeout = Symbol('guardedTimeout'); const guardedTimeout = Symbol('guardedTimeout');
let attachDefaultErrorListener: (this: Guarded, event: string) => void;
// Private fields for guarded streams // Private fields for guarded streams
class Guard { class Guard {
private [guardedErrors]: Error[]; private [guardedErrors]: Error[];
@ -29,47 +27,31 @@ export function isGuarded<T extends NodeJS.EventEmitter>(stream: T): stream is G
} }
/** /**
* Makes sure that listeners always receive the error event of a stream, * Callback that is used when a stream emits an error and no other error listener is attached.
* even if it was thrown before the listener was attached.
* If the input is already guarded nothing will happen.
* @param stream - Stream that can potentially throw an error.
*
* @returns The stream.
*/
export function guardStream<T extends NodeJS.EventEmitter>(stream: T): Guarded<T> {
const guarded = stream as Guarded<T>;
if (!isGuarded(stream)) {
guarded[guardedErrors] = [];
attachDefaultErrorListener.call(guarded, 'error');
}
return guarded;
}
/**
* Callback that is used when a stream emits an error and no error listener is attached.
* Used to store the error and start the logger timer. * Used to store the error and start the logger timer.
*
* It is important that this listener always remains attached for edge cases where an error listener gets removed
* and the number of error listeners is checked immediately afterwards.
* See https://github.com/solid/community-server/pull/462#issuecomment-758013492 .
*/ */
function defaultErrorListener(this: Guarded, error: Error): void { function guardingErrorListener(this: Guarded, error: Error): void {
this[guardedErrors].push(error); // Only fall back to this if no other listeners are attached
if (!this[guardedTimeout]) { if (this.listenerCount('error') === 1) {
this[guardedTimeout] = setTimeout((): void => { this[guardedErrors].push(error);
const message = `No error listener was attached but error was thrown: ${error.message}`; if (!this[guardedTimeout]) {
logger.error(message, { error }); this[guardedTimeout] = setTimeout((): void => {
}, 1000); const message = `No error listener was attached but error was thrown: ${error.message}`;
logger.error(message, { error });
}, 1000);
}
} }
} }
/** /**
* Callback that is used when a new listener is attached to remove the current error-related fallback functions, * Callback that is used when a new listener is attached and there are errors that were not emitted yet.
* or to emit an error if one was thrown in the meantime.
*/ */
function removeDefaultErrorListener(this: Guarded, event: string): void { function emitStoredErrors(this: Guarded, event: string): void {
if (event === 'error') { if (event === 'error') {
// Remove default guard listeners (but reattach when all error listeners are removed)
this.removeListener('error', defaultErrorListener);
this.removeListener('newListener', removeDefaultErrorListener);
this.addListener('removeListener', attachDefaultErrorListener);
// Cancel an error timeout // Cancel an error timeout
if (this[guardedTimeout]) { if (this[guardedTimeout]) {
clearTimeout(this[guardedTimeout]!); clearTimeout(this[guardedTimeout]!);
@ -90,13 +72,19 @@ function removeDefaultErrorListener(this: Guarded, event: string): void {
} }
/** /**
* Callback that is used to make sure the error-related fallback functions are re-applied * Makes sure that listeners always receive the error event of a stream,
* when all error listeners are removed. * even if it was thrown before the listener was attached.
* If the input is already guarded nothing will happen.
* @param stream - Stream that can potentially throw an error.
*
* @returns The stream.
*/ */
attachDefaultErrorListener = function(this: Guarded, event: string): void { export function guardStream<T extends NodeJS.EventEmitter>(stream: T): Guarded<T> {
if (event === 'error' && this.listenerCount('error') === 0) { const guarded = stream as Guarded<T>;
this.addListener('error', defaultErrorListener); if (!isGuarded(stream)) {
this.addListener('newListener', removeDefaultErrorListener); guarded[guardedErrors] = [];
this.removeListener('removeListener', attachDefaultErrorListener); guarded.on('error', guardingErrorListener);
guarded.on('newListener', emitStoredErrors);
} }
}; return guarded;
}

View File

@ -26,12 +26,7 @@ describe('GuardedStream', (): void => {
expect(isGuarded(stream)).toBe(true); expect(isGuarded(stream)).toBe(true);
expect(isGuarded(guarded)).toBe(true); expect(isGuarded(guarded)).toBe(true);
const listen = new Promise((resolve, reject): void => { await expect(readableToString(guarded)).resolves.toBe('data');
stream.on('end', resolve);
stream.on('error', reject);
});
await expect(readableToString(stream)).resolves.toBe('data');
await expect(listen).resolves.toBeUndefined();
}); });
it('returns the stream if it is already guarded.', async(): Promise<void> => { it('returns the stream if it is already guarded.', async(): Promise<void> => {
@ -45,26 +40,21 @@ describe('GuardedStream', (): void => {
expect(isGuarded(stream)).toBe(true); expect(isGuarded(stream)).toBe(true);
expect(isGuarded(guarded)).toBe(true); expect(isGuarded(guarded)).toBe(true);
expect(stream.listenerCount('error')).toBe(1); expect(guarded.listenerCount('error')).toBe(1);
expect(stream.listenerCount('newListener')).toBe(1); expect(guarded.listenerCount('newListener')).toBe(1);
expect(stream.listenerCount('removeListener')).toBe(0); expect(guarded.listenerCount('removeListener')).toBe(0);
const listen = new Promise((resolve, reject): void => { await expect(readableToString(guarded)).resolves.toBe('data');
stream.on('end', resolve);
stream.on('error', reject);
});
await expect(readableToString(stream)).resolves.toBe('data');
await expect(listen).resolves.toBeUndefined();
}); });
it('emits errors when listeners are currently attached.', async(): Promise<void> => { it('emits errors when listeners are currently attached.', async(): Promise<void> => {
const stream = guardStream(Readable.from([ 'data' ])); const stream = guardStream(Readable.from([ 'data' ]));
const listen = new Promise((resolve, reject): void => { const listener = jest.fn();
stream.on('end', resolve); stream.on('error', listener);
stream.on('error', reject); const error = new Error('error');
}); stream.emit('error', error);
stream.emit('error', new Error('error')); expect(listener).toHaveBeenCalledTimes(1);
await expect(listen).rejects.toThrow(new Error('error')); expect(listener).toHaveBeenLastCalledWith(error);
}); });
it('emits guarded errors when new listeners are attached.', async(): Promise<void> => { it('emits guarded errors when new listeners are attached.', async(): Promise<void> => {
@ -118,50 +108,57 @@ describe('GuardedStream', (): void => {
const errorCb2 = jest.fn(); const errorCb2 = jest.fn();
stream.on('error', errorCb); stream.on('error', errorCb);
stream.on('error', errorCb2); stream.on('error', errorCb2);
expect(stream.listenerCount('error')).toBe(2); expect(stream.listenerCount('error')).toBe(3);
expect(stream.listenerCount('newListener')).toBe(0); expect(stream.listenerCount('newListener')).toBe(1);
expect(stream.listenerCount('removeListener')).toBe(1);
stream.removeListener('error', errorCb2); stream.removeListener('error', errorCb2);
expect(stream.listenerCount('error')).toBe(1); expect(stream.listenerCount('error')).toBe(2);
expect(stream.listenerCount('newListener')).toBe(0); expect(stream.listenerCount('newListener')).toBe(1);
expect(stream.listenerCount('removeListener')).toBe(1);
stream.removeListener('error', errorCb); stream.removeListener('error', errorCb);
expect(stream.listenerCount('error')).toBe(1); expect(stream.listenerCount('error')).toBe(1);
expect(stream.listenerCount('newListener')).toBe(1); expect(stream.listenerCount('newListener')).toBe(1);
expect(stream.listenerCount('removeListener')).toBe(0);
stream.emit('error', new Error('error')); const error = new Error('error');
const listen = new Promise((resolve, reject): void => {
stream.on('end', resolve);
stream.on('error', reject);
});
await expect(listen).rejects.toThrow(new Error('error'));
});
it('logs a warning if nobody listens to the error.', async(): Promise<void> => {
const error = new Error('failure');
const stream = guardStream(Readable.from([ 'data' ]));
stream.emit('error', error); stream.emit('error', error);
jest.advanceTimersByTime(100); const errorCb3 = jest.fn();
stream.emit('error', new Error('other')); stream.on('error', errorCb3);
stream.emit('error', new Error('other'));
await new Promise((resolve): any => setImmediate(resolve));
expect(errorCb).toHaveBeenCalledTimes(0);
expect(errorCb2).toHaveBeenCalledTimes(0);
expect(errorCb3).toHaveBeenCalledTimes(1);
expect(errorCb3).toHaveBeenLastCalledWith(error);
});
it('logs an error if nobody listens to the error.', async(): Promise<void> => {
const errors = [ new Error('0'), new Error('1'), new Error('2') ];
const stream = guardStream(Readable.from([ 'data' ]));
stream.emit('error', errors[0]);
jest.advanceTimersByTime(100);
stream.emit('error', errors[1]);
stream.emit('error', errors[2]);
// Only the first error gets logged
jest.advanceTimersByTime(900); jest.advanceTimersByTime(900);
expect(logger.error).toHaveBeenCalledTimes(1); expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith( expect(logger.error).toHaveBeenCalledWith(
'No error listener was attached but error was thrown: failure', { error }, 'No error listener was attached but error was thrown: 0', { error: errors[0] },
); );
jest.advanceTimersByTime(1000); jest.advanceTimersByTime(1000);
expect(logger.error).toHaveBeenCalledTimes(1); expect(logger.error).toHaveBeenCalledTimes(1);
const listen = new Promise((resolve, reject): void => { const errorCb = jest.fn();
stream.on('end', resolve); stream.on('error', errorCb);
stream.on('error', reject);
}); await new Promise((resolve): any => setImmediate(resolve));
await expect(listen).rejects.toThrow(error);
expect(errorCb).toHaveBeenCalledTimes(3);
expect(errorCb).toHaveBeenNthCalledWith(1, errors[0]);
expect(errorCb).toHaveBeenNthCalledWith(2, errors[1]);
expect(errorCb).toHaveBeenNthCalledWith(3, errors[2]);
}); });
}); });
}); });