mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
fix: Emit all guarded errors to all listeners.
This commit is contained in:
parent
166c4de493
commit
4faf916ece
@ -1,27 +1,17 @@
|
|||||||
import { getLoggerFor } from '../logging/LogUtil';
|
import { getLoggerFor } from '../logging/LogUtil';
|
||||||
|
|
||||||
// Until Typescript adds Nominal types this is how to do it.
|
|
||||||
// This can be generalized should we need it in the future for other cases.
|
|
||||||
// Also using the guard to store the intermediate variables.
|
|
||||||
// See the following issue:
|
|
||||||
// https://github.com/microsoft/TypeScript/issues/202
|
|
||||||
|
|
||||||
const logger = getLoggerFor('GuardedStream');
|
const logger = getLoggerFor('GuardedStream');
|
||||||
|
|
||||||
// Using symbols to make sure we don't override existing parameters
|
// Using symbols to make sure we don't override existing parameters
|
||||||
const guard = Symbol('guard');
|
const guardedErrors = Symbol('guardedErrors');
|
||||||
const errorGuard = Symbol('error');
|
const guardedTimeout = Symbol('guardedTimeout');
|
||||||
const timeoutGuard = Symbol('timeout');
|
|
||||||
|
|
||||||
// Class used to guard streams
|
let attachDefaultErrorListener: (this: Guarded, event: string) => void;
|
||||||
|
|
||||||
|
// Private fields for guarded streams
|
||||||
class Guard {
|
class Guard {
|
||||||
protected [guard]: boolean;
|
private [guardedErrors]: Error[];
|
||||||
}
|
private [guardedTimeout]?: NodeJS.Timeout;
|
||||||
|
|
||||||
// Hidden interface for guard-related variables
|
|
||||||
interface StoredErrorStream extends NodeJS.EventEmitter {
|
|
||||||
[errorGuard]?: Error;
|
|
||||||
[timeoutGuard]?: NodeJS.Timeout;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -29,36 +19,71 @@ interface StoredErrorStream extends NodeJS.EventEmitter {
|
|||||||
* If an error occurs while no listener is attached,
|
* If an error occurs while no listener is attached,
|
||||||
* it will store the error and emit it once a listener is added (or a timeout occurs).
|
* it will store the error and emit it once a listener is added (or a timeout occurs).
|
||||||
*/
|
*/
|
||||||
export type Guarded<T extends NodeJS.EventEmitter> = T & Guard;
|
export type Guarded<T extends NodeJS.EventEmitter = NodeJS.EventEmitter> = T & Guard;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines whether the stream is guarded from emitting errors.
|
||||||
|
*/
|
||||||
|
export const isGuarded = <T extends NodeJS.EventEmitter>(stream: T): stream is Guarded<T> => guardedErrors in stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Makes sure that listeners always receive the error event of a stream,
|
||||||
|
* even if it was thrown before the listener was attached.
|
||||||
|
* If the input is already guarded nothing will happen.
|
||||||
|
* @param stream - Stream that can potentially throw an error.
|
||||||
|
*
|
||||||
|
* @returns The stream.
|
||||||
|
*/
|
||||||
|
export const guardStream = <T extends NodeJS.EventEmitter>(stream: T): Guarded<T> => {
|
||||||
|
const guarded = stream as Guarded<T>;
|
||||||
|
if (!isGuarded(stream)) {
|
||||||
|
guarded[guardedErrors] = [];
|
||||||
|
attachDefaultErrorListener.call(guarded, 'error');
|
||||||
|
}
|
||||||
|
return guarded;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback that is used when a stream emits an error and no error listener is attached.
|
* Callback that is used when a stream emits an error and no error listener is attached.
|
||||||
* Used to store the error and start the logger timer.
|
* Used to store the error and start the logger timer.
|
||||||
*/
|
*/
|
||||||
const defaultErrorListener = function(this: StoredErrorStream, err: Error): void {
|
const defaultErrorListener = function(this: Guarded, error: Error): void {
|
||||||
this[errorGuard] = err;
|
this[guardedErrors].push(error);
|
||||||
this[timeoutGuard] = setTimeout((): void => {
|
if (!this[guardedTimeout]) {
|
||||||
logger.error(`No error listener was attached but error was thrown: ${err.message}`);
|
this[guardedTimeout] = setTimeout((): void => {
|
||||||
}, 1000);
|
const message = `No error listener was attached but error was thrown: ${error.message}`;
|
||||||
|
logger.error(message, { error });
|
||||||
|
}, 1000);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let attachDefaultErrorListener: (this: StoredErrorStream, event: string) => void;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback that is used when a new listener is attached to remove the current error-related fallback functions,
|
* Callback that is used when a new listener is attached to remove the current error-related fallback functions,
|
||||||
* or to emit an error if one was thrown in the meantime.
|
* or to emit an error if one was thrown in the meantime.
|
||||||
*/
|
*/
|
||||||
const removeDefaultErrorListener = function(this: StoredErrorStream, event: string, listener: (err: Error) => void):
|
const removeDefaultErrorListener = function(this: Guarded, event: string):
|
||||||
void {
|
void {
|
||||||
if (event === 'error') {
|
if (event === 'error') {
|
||||||
|
// Remove default guard listeners (but reattach when all error listeners are removed)
|
||||||
this.removeListener('error', defaultErrorListener);
|
this.removeListener('error', defaultErrorListener);
|
||||||
this.removeListener('newListener', removeDefaultErrorListener);
|
this.removeListener('newListener', removeDefaultErrorListener);
|
||||||
this.on('removeListener', attachDefaultErrorListener);
|
this.addListener('removeListener', attachDefaultErrorListener);
|
||||||
if (this[timeoutGuard]) {
|
|
||||||
clearTimeout(this[timeoutGuard]!);
|
// Cancel an error timeout
|
||||||
|
if (this[guardedTimeout]) {
|
||||||
|
clearTimeout(this[guardedTimeout]!);
|
||||||
|
this[guardedTimeout] = undefined;
|
||||||
}
|
}
|
||||||
if (this[errorGuard]) {
|
|
||||||
setImmediate((): void => listener(this[errorGuard]!));
|
// Emit any errors that were guarded
|
||||||
|
const errors = this[guardedErrors];
|
||||||
|
if (errors.length > 0) {
|
||||||
|
this[guardedErrors] = [];
|
||||||
|
setImmediate((): void => {
|
||||||
|
for (const error of errors) {
|
||||||
|
this.emit('error', error);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -67,31 +92,10 @@ void {
|
|||||||
* Callback that is used to make sure the error-related fallback functions are re-applied
|
* Callback that is used to make sure the error-related fallback functions are re-applied
|
||||||
* when all error listeners are removed.
|
* when all error listeners are removed.
|
||||||
*/
|
*/
|
||||||
attachDefaultErrorListener = function(this: StoredErrorStream, event: string): void {
|
attachDefaultErrorListener = function(this: Guarded, event: string): void {
|
||||||
if (event === 'error' && this.listenerCount('error') === 0) {
|
if (event === 'error' && this.listenerCount('error') === 0) {
|
||||||
this.on('error', defaultErrorListener);
|
this.addListener('error', defaultErrorListener);
|
||||||
this.on('newListener', removeDefaultErrorListener);
|
this.addListener('newListener', removeDefaultErrorListener);
|
||||||
this.removeListener('removeListener', attachDefaultErrorListener);
|
this.removeListener('removeListener', attachDefaultErrorListener);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Makes sure that listeners always receive the error event of a stream,
|
|
||||||
* even if it was thrown before the listener was attached.
|
|
||||||
* If the input is already guarded nothing will happen.
|
|
||||||
* @param stream - Stream that can potentially throw an error.
|
|
||||||
*
|
|
||||||
* @returns The wrapped stream.
|
|
||||||
*/
|
|
||||||
export const guardStream = <T extends NodeJS.EventEmitter>(stream: T): Guarded<T> => {
|
|
||||||
const guarded = stream as Guarded<T>;
|
|
||||||
if (guarded[guard]) {
|
|
||||||
return guarded;
|
|
||||||
}
|
|
||||||
|
|
||||||
guarded.on('error', defaultErrorListener);
|
|
||||||
guarded.on('newListener', removeDefaultErrorListener);
|
|
||||||
|
|
||||||
guarded[guard] = true;
|
|
||||||
return guarded;
|
|
||||||
};
|
|
||||||
|
@ -1,11 +1,31 @@
|
|||||||
import { Readable } from 'stream';
|
import { Readable } from 'stream';
|
||||||
import { guardStream } from '../../../src/util/GuardedStream';
|
import type { Logger } from '../../../src/logging/Logger';
|
||||||
|
import { getLoggerFor } from '../../../src/logging/LogUtil';
|
||||||
|
import { guardStream, isGuarded } from '../../../src/util/GuardedStream';
|
||||||
import { readableToString } from '../../../src/util/StreamUtil';
|
import { readableToString } from '../../../src/util/StreamUtil';
|
||||||
|
|
||||||
|
jest.mock('../../../src/logging/LogUtil', (): any => {
|
||||||
|
const logger: Logger = { error: jest.fn() } as any;
|
||||||
|
return { getLoggerFor: (): Logger => logger };
|
||||||
|
});
|
||||||
|
const logger: jest.Mocked<Logger> = getLoggerFor('GuardedStream') as any;
|
||||||
|
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
describe('GuardedStream', (): void => {
|
describe('GuardedStream', (): void => {
|
||||||
|
beforeEach((): void => {
|
||||||
|
jest.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
describe('#guardStream', (): void => {
|
describe('#guardStream', (): void => {
|
||||||
it('has no effect if no error is thrown.', async(): Promise<void> => {
|
it('has no effect if no error is thrown.', async(): Promise<void> => {
|
||||||
const stream = guardStream(Readable.from([ 'data' ]));
|
const stream = Readable.from([ 'data' ]);
|
||||||
|
expect(isGuarded(stream)).toBe(false);
|
||||||
|
const guarded = guardStream(stream);
|
||||||
|
expect(guarded).toBe(stream);
|
||||||
|
expect(isGuarded(stream)).toBe(true);
|
||||||
|
expect(isGuarded(guarded)).toBe(true);
|
||||||
|
|
||||||
const listen = new Promise((resolve, reject): void => {
|
const listen = new Promise((resolve, reject): void => {
|
||||||
stream.on('end', resolve);
|
stream.on('end', resolve);
|
||||||
stream.on('error', reject);
|
stream.on('error', reject);
|
||||||
@ -15,7 +35,16 @@ describe('GuardedStream', (): void => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('returns the stream if it is already guarded.', async(): Promise<void> => {
|
it('returns the stream if it is already guarded.', async(): Promise<void> => {
|
||||||
const stream = guardStream(guardStream(Readable.from([ 'data' ])));
|
const stream = Readable.from([ 'data' ]);
|
||||||
|
expect(isGuarded(stream)).toBe(false);
|
||||||
|
const guarded = guardStream(stream);
|
||||||
|
expect(guarded).toBe(stream);
|
||||||
|
expect(isGuarded(stream)).toBe(true);
|
||||||
|
expect(isGuarded(guarded)).toBe(true);
|
||||||
|
expect(guardStream(guarded)).toBe(stream);
|
||||||
|
expect(isGuarded(stream)).toBe(true);
|
||||||
|
expect(isGuarded(guarded)).toBe(true);
|
||||||
|
|
||||||
expect(stream.listenerCount('error')).toBe(1);
|
expect(stream.listenerCount('error')).toBe(1);
|
||||||
expect(stream.listenerCount('newListener')).toBe(1);
|
expect(stream.listenerCount('newListener')).toBe(1);
|
||||||
expect(stream.listenerCount('removeListener')).toBe(0);
|
expect(stream.listenerCount('removeListener')).toBe(0);
|
||||||
@ -28,9 +57,8 @@ describe('GuardedStream', (): void => {
|
|||||||
await expect(listen).resolves.toBeUndefined();
|
await expect(listen).resolves.toBeUndefined();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('still emits errors when they happen.', async(): Promise<void> => {
|
it('emits errors when listeners are currently attached.', async(): Promise<void> => {
|
||||||
let stream = Readable.from([ 'data' ]);
|
const stream = guardStream(Readable.from([ 'data' ]));
|
||||||
stream = guardStream(stream);
|
|
||||||
const listen = new Promise((resolve, reject): void => {
|
const listen = new Promise((resolve, reject): void => {
|
||||||
stream.on('end', resolve);
|
stream.on('end', resolve);
|
||||||
stream.on('error', reject);
|
stream.on('error', reject);
|
||||||
@ -39,20 +67,51 @@ describe('GuardedStream', (): void => {
|
|||||||
await expect(listen).rejects.toThrow(new Error('error'));
|
await expect(listen).rejects.toThrow(new Error('error'));
|
||||||
});
|
});
|
||||||
|
|
||||||
it('emits old errors when new listeners are attached.', async(): Promise<void> => {
|
it('emits guarded errors when new listeners are attached.', async(): Promise<void> => {
|
||||||
let stream = Readable.from([ 'data' ]);
|
const errors = [ new Error('0'), new Error('1') ];
|
||||||
stream = guardStream(stream);
|
const stream = guardStream(Readable.from([ 'data' ]));
|
||||||
|
stream.emit('error', errors[0]);
|
||||||
|
stream.emit('error', errors[1]);
|
||||||
|
|
||||||
|
const errorListeners = [ jest.fn(), jest.fn(), jest.fn() ];
|
||||||
|
stream.addListener('error', errorListeners[0]);
|
||||||
|
stream.addListener('error', errorListeners[1]);
|
||||||
|
stream.addListener('error', errorListeners[2]);
|
||||||
|
const endListener = jest.fn();
|
||||||
|
stream.addListener('end', endListener);
|
||||||
|
|
||||||
|
expect(errorListeners[0]).toHaveBeenCalledTimes(0);
|
||||||
|
expect(errorListeners[1]).toHaveBeenCalledTimes(0);
|
||||||
|
expect(errorListeners[2]).toHaveBeenCalledTimes(0);
|
||||||
|
expect(endListener).toHaveBeenCalledTimes(0);
|
||||||
|
|
||||||
|
await new Promise((resolve): any => setImmediate(resolve));
|
||||||
|
|
||||||
|
expect(errorListeners[0]).toHaveBeenCalledTimes(2);
|
||||||
|
expect(errorListeners[0]).toHaveBeenNthCalledWith(1, errors[0]);
|
||||||
|
expect(errorListeners[0]).toHaveBeenNthCalledWith(2, errors[1]);
|
||||||
|
expect(errorListeners[1]).toHaveBeenCalledTimes(2);
|
||||||
|
expect(errorListeners[1]).toHaveBeenNthCalledWith(1, errors[0]);
|
||||||
|
expect(errorListeners[1]).toHaveBeenNthCalledWith(2, errors[1]);
|
||||||
|
expect(errorListeners[2]).toHaveBeenCalledTimes(2);
|
||||||
|
expect(errorListeners[1]).toHaveBeenNthCalledWith(1, errors[0]);
|
||||||
|
expect(errorListeners[1]).toHaveBeenNthCalledWith(2, errors[1]);
|
||||||
|
expect(endListener).toHaveBeenCalledTimes(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not time out when a listener was already attached.', async(): Promise<void> => {
|
||||||
|
const stream = Readable.from([ 'data' ]);
|
||||||
|
stream.addListener('error', jest.fn());
|
||||||
|
guardStream(stream);
|
||||||
|
|
||||||
stream.emit('error', new Error('error'));
|
stream.emit('error', new Error('error'));
|
||||||
const listen = new Promise((resolve, reject): void => {
|
|
||||||
stream.on('end', resolve);
|
jest.advanceTimersByTime(1000);
|
||||||
stream.on('error', reject);
|
expect(logger.error).toHaveBeenCalledTimes(0);
|
||||||
});
|
|
||||||
await expect(listen).rejects.toThrow(new Error('error'));
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('still works if error listeners get removed and added again.', async(): Promise<void> => {
|
it('still works if error listeners get removed and added again.', async(): Promise<void> => {
|
||||||
let stream = Readable.from([ 'data' ]);
|
const stream = guardStream(Readable.from([ 'data' ]));
|
||||||
stream = guardStream(stream);
|
|
||||||
|
|
||||||
// Make sure no unneeded listeners stay attached
|
// Make sure no unneeded listeners stay attached
|
||||||
const errorCb = jest.fn();
|
const errorCb = jest.fn();
|
||||||
@ -81,21 +140,28 @@ describe('GuardedStream', (): void => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('logs a warning if nobody listens to the error.', async(): Promise<void> => {
|
it('logs a warning if nobody listens to the error.', async(): Promise<void> => {
|
||||||
jest.useFakeTimers();
|
const error = new Error('failure');
|
||||||
|
const stream = guardStream(Readable.from([ 'data' ]));
|
||||||
|
stream.emit('error', error);
|
||||||
|
|
||||||
let stream = Readable.from([ 'data' ]);
|
jest.advanceTimersByTime(100);
|
||||||
stream = guardStream(stream);
|
stream.emit('error', new Error('other'));
|
||||||
stream.emit('error', new Error('error'));
|
stream.emit('error', new Error('other'));
|
||||||
|
|
||||||
|
jest.advanceTimersByTime(900);
|
||||||
|
expect(logger.error).toHaveBeenCalledTimes(1);
|
||||||
|
expect(logger.error).toHaveBeenCalledWith(
|
||||||
|
'No error listener was attached but error was thrown: failure', { error },
|
||||||
|
);
|
||||||
|
|
||||||
jest.advanceTimersByTime(1000);
|
jest.advanceTimersByTime(1000);
|
||||||
|
expect(logger.error).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
const listen = new Promise((resolve, reject): void => {
|
const listen = new Promise((resolve, reject): void => {
|
||||||
stream.on('end', resolve);
|
stream.on('end', resolve);
|
||||||
stream.on('error', reject);
|
stream.on('error', reject);
|
||||||
});
|
});
|
||||||
await expect(listen).rejects.toThrow(new Error('error'));
|
await expect(listen).rejects.toThrow(error);
|
||||||
|
|
||||||
// No idea how to access the logger with mocks unfortunately
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
Loading…
x
Reference in New Issue
Block a user