mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
fix: Close unpiped streams
This commit is contained in:
parent
904c069451
commit
386d78277d
@ -40,6 +40,16 @@ export function pipeSafely<T extends Writable>(readable: NodeJS.ReadableStream,
|
|||||||
// in order to prevent memory leaks."
|
// in order to prevent memory leaks."
|
||||||
destination.destroy(mapError ? mapError(error) : error);
|
destination.destroy(mapError ? mapError(error) : error);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Make sure we have no dangling streams in case of unpiping, which can happen if there's an error.
|
||||||
|
// This can also happen if a stream naturally closes so most calls here will not be indication of a problem.
|
||||||
|
// From https://nodejs.org/api/stream.html#stream_errors_while_writing :
|
||||||
|
// "If a Readable stream pipes into a Writable stream when Writable emits an error,
|
||||||
|
// the Readable stream will be unpiped."
|
||||||
|
destination.on('unpipe', (source): void => {
|
||||||
|
source.destroy();
|
||||||
|
});
|
||||||
|
|
||||||
return guardStream(destination);
|
return guardStream(destination);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,6 +40,22 @@ describe('StreamUtil', (): void => {
|
|||||||
const piped = pipeSafely(input, output, (): any => new Error('other error'));
|
const piped = pipeSafely(input, output, (): any => new Error('other error'));
|
||||||
await expect(readableToString(piped)).rejects.toThrow('other error');
|
await expect(readableToString(piped)).rejects.toThrow('other error');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('destroys the source stream in case the destinations becomes unpiped.', async(): Promise<void> => {
|
||||||
|
const input = new PassThrough();
|
||||||
|
const output = new PassThrough();
|
||||||
|
const piped = pipeSafely(input, output);
|
||||||
|
|
||||||
|
// Catch errors to prevent problems in test output
|
||||||
|
output.on('error', (): void => {
|
||||||
|
// Empty
|
||||||
|
});
|
||||||
|
|
||||||
|
piped.destroy(new Error('this causes an unpipe!'));
|
||||||
|
// Allow events to propagate
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
expect(input.destroyed).toBe(true);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('#transformSafely', (): void => {
|
describe('#transformSafely', (): void => {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user