diff --git a/src/util/StreamUtil.ts b/src/util/StreamUtil.ts index 555c27825..c878fc0f7 100644 --- a/src/util/StreamUtil.ts +++ b/src/util/StreamUtil.ts @@ -40,6 +40,16 @@ export function pipeSafely(readable: NodeJS.ReadableStream, // in order to prevent memory leaks." destination.destroy(mapError ? mapError(error) : error); }); + + // Make sure we have no dangling streams in case of unpiping, which can happen if there's an error. + // This can also happen if a stream naturally closes so most calls here will not be indication of a problem. + // From https://nodejs.org/api/stream.html#stream_errors_while_writing : + // "If a Readable stream pipes into a Writable stream when Writable emits an error, + // the Readable stream will be unpiped." + destination.on('unpipe', (source): void => { + source.destroy(); + }); + return guardStream(destination); } diff --git a/test/unit/util/StreamUtil.test.ts b/test/unit/util/StreamUtil.test.ts index a9ce2dc66..70bfeb448 100644 --- a/test/unit/util/StreamUtil.test.ts +++ b/test/unit/util/StreamUtil.test.ts @@ -40,6 +40,22 @@ describe('StreamUtil', (): void => { const piped = pipeSafely(input, output, (): any => new Error('other error')); await expect(readableToString(piped)).rejects.toThrow('other error'); }); + + it('destroys the source stream in case the destinations becomes unpiped.', async(): Promise => { + const input = new PassThrough(); + const output = new PassThrough(); + const piped = pipeSafely(input, output); + + // Catch errors to prevent problems in test output + output.on('error', (): void => { + // Empty + }); + + piped.destroy(new Error('this causes an unpipe!')); + // Allow events to propagate + await new Promise(setImmediate); + expect(input.destroyed).toBe(true); + }); }); describe('#transformSafely', (): void => {