From fd45779159392dffb94f8762933f1634107ea3a9 Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Wed, 10 Feb 2021 13:13:09 +0100 Subject: [PATCH] feat: Pipe streams with the pump library The library handles some edge cases we didn't yet. The GuardedStream was also updated to ignore error listeners already attached to the stream (since pump adds internal listeners). --- package-lock.json | 14 ++++++++----- package.json | 2 ++ src/util/GuardedStream.ts | 22 +++++++++++++++----- src/util/StreamUtil.ts | 30 ++++++++++------------------ test/unit/util/GuardedStream.test.ts | 18 +++++++++++++++-- 5 files changed, 54 insertions(+), 32 deletions(-) diff --git a/package-lock.json b/package-lock.json index e214f4f5e..b43fd1b2c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1338,6 +1338,14 @@ "integrity": "sha512-UEyp8LwZ4Dg30kVU2Q3amHHyTn1jEdhCIE59ANed76GaT1Vp76DD3ZWSAxgCrw6wJ0TqeoBpqmfUHiUDPs//HQ==", "dev": true }, + "@types/pump": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@types/pump/-/pump-1.1.0.tgz", + "integrity": "sha512-YGGbsqf5o7sF8gGANP8ZYxgaRGlFgEAImx5tCvA4YKRCfqbsDQZO48UmWynZzSjbhn0ZWSlsWOcb5NwvOx8KcQ==", + "requires": { + "@types/node": "*" + } + }, "@types/qs": { "version": "6.9.5", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.5.tgz", @@ -3278,7 +3286,6 @@ "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", - "dev": true, "requires": { "once": "^1.4.0" } @@ -7367,7 +7374,6 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", - "dev": true, "requires": { "wrappy": "1" } @@ -7780,7 +7786,6 @@ "version": "3.0.0", "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", - "dev": true, "requires": { "end-of-stream": "^1.1.0", "once": "^1.3.1" @@ -10432,8 +10437,7 @@ "wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", - "dev": true + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" }, "write": { "version": "1.0.3", diff --git a/package.json b/package.json index 0dee4246e..4fbbe8a60 100644 --- a/package.json +++ b/package.json @@ -83,6 +83,7 @@ "@types/mime-types": "^2.1.0", "@types/n3": "^1.4.4", "@types/node": "^14.10.2", + "@types/pump": "^1.1.0", "@types/rdf-js": "^4.0.0", "@types/sparqljs": "^3.1.0", "@types/streamify-array": "^1.0.0", @@ -99,6 +100,7 @@ "handlebars": "^4.7.6", "mime-types": "^2.1.27", "n3": "^1.8.0", + "pump": "^3.0.0", "rdf-parse": "^1.7.0", "rdf-serialize": "^1.1.0", "rdf-terms": "^1.5.1", diff --git a/src/util/GuardedStream.ts b/src/util/GuardedStream.ts index a5fd0a670..d95b65de0 100644 --- a/src/util/GuardedStream.ts +++ b/src/util/GuardedStream.ts @@ -35,8 +35,10 @@ export function isGuarded(stream: T): stream is G * See https://github.com/solid/community-server/pull/462#issuecomment-758013492 . */ function guardingErrorListener(this: Guarded, error: Error): void { - // Only fall back to this if no other listeners are attached - if (this.listenerCount('error') === 1) { + // Only fall back to this if no new listeners are attached since guarding started. + // Not storing the index when guarding starts since listeners could be removed. + const idx = this.listeners('error').indexOf(guardingErrorListener); + if (idx === this.listenerCount('error') - 1) { this[guardedErrors].push(error); if (!this[guardedTimeout]) { this[guardedTimeout] = setTimeout((): void => { @@ -50,8 +52,8 @@ function guardingErrorListener(this: Guarded, error: Error): void { /** * Callback that is used when a new listener is attached and there are errors that were not emitted yet. */ -function emitStoredErrors(this: Guarded, event: string): void { - if (event === 'error') { +function emitStoredErrors(this: Guarded, event: string, func: (error: Error) => void): void { + if (event === 'error' && func !== guardingErrorListener) { // Cancel an error timeout if (this[guardedTimeout]) { clearTimeout(this[guardedTimeout]!); @@ -74,7 +76,13 @@ function emitStoredErrors(this: Guarded, event: string): void { /** * Makes sure that listeners always receive the error event of a stream, * even if it was thrown before the listener was attached. - * If the input is already guarded nothing will happen. + * + * When guarding a stream it is assumed that error listeners already attached should be ignored, + * only error listeners attached after the stream is guarded will prevent an error from being logged. + * + * If the input is already guarded the guard will be reset, + * which means ignoring error listeners already attached. + * * @param stream - Stream that can potentially throw an error. * * @returns The stream. @@ -85,6 +93,10 @@ export function guardStream(stream: T): Guarded { */ export function pipeSafely(readable: NodeJS.ReadableStream, destination: T, mapError?: (error: Error) => Error): Guarded { - // Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error - readable.pipe(destination); - readable.on('error', (error): void => { - logger.warn(`Piped stream errored with ${error.message}`); - - // From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options : - // "One important caveat is that if the Readable stream emits an error during processing, the Writable destination - // is not closed automatically. If an error occurs, it will be necessary to manually close each stream - // in order to prevent memory leaks." - destination.destroy(mapError ? mapError(error) : error); + // In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener + pump(readable, destination, (error): void => { + if (error) { + logger.warn(`Piped stream errored with ${error.message}`); + // Make sure the final error can be handled in a normal streaming fashion + destination.emit('error', mapError ? mapError(error) : error); + } }); - - // Make sure we have no dangling streams in case of unpiping, which can happen if there's an error. - // This can also happen if a stream naturally closes so most calls here will not be indication of a problem. - // From https://nodejs.org/api/stream.html#stream_errors_while_writing : - // "If a Readable stream pipes into a Writable stream when Writable emits an error, - // the Readable stream will be unpiped." - destination.on('unpipe', (source): void => { - source.destroy(); - }); - + // Guarding the stream now means the internal error listeners of pump will be ignored + // when checking if there is a valid error listener. return guardStream(destination); } diff --git a/test/unit/util/GuardedStream.test.ts b/test/unit/util/GuardedStream.test.ts index c3865eddc..aa5818012 100644 --- a/test/unit/util/GuardedStream.test.ts +++ b/test/unit/util/GuardedStream.test.ts @@ -89,7 +89,7 @@ describe('GuardedStream', (): void => { expect(endListener).toHaveBeenCalledTimes(0); }); - it('does not time out when a listener was already attached.', async(): Promise => { + it('ignores error listeners that were already attached.', async(): Promise => { const stream = Readable.from([ 'data' ]); stream.addListener('error', jest.fn()); guardStream(stream); @@ -97,7 +97,21 @@ describe('GuardedStream', (): void => { stream.emit('error', new Error('error')); jest.advanceTimersByTime(1000); - expect(logger.error).toHaveBeenCalledTimes(0); + expect(logger.error).toHaveBeenCalledTimes(1); + }); + + it('ignores error listeners after calling guardStream a second time.', async(): Promise => { + const stream = Readable.from([ 'data' ]); + guardStream(stream); + stream.addListener('error', jest.fn()); + + // This will cause the above error listener to be ignored for logging purposes + guardStream(stream); + + stream.emit('error', new Error('error')); + + jest.advanceTimersByTime(1000); + expect(logger.error).toHaveBeenCalledTimes(1); }); it('still works if error listeners get removed and added again.', async(): Promise => {