diff --git a/package-lock.json b/package-lock.json index e214f4f5e..b43fd1b2c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1338,6 +1338,14 @@ "integrity": "sha512-UEyp8LwZ4Dg30kVU2Q3amHHyTn1jEdhCIE59ANed76GaT1Vp76DD3ZWSAxgCrw6wJ0TqeoBpqmfUHiUDPs//HQ==", "dev": true }, + "@types/pump": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@types/pump/-/pump-1.1.0.tgz", + "integrity": "sha512-YGGbsqf5o7sF8gGANP8ZYxgaRGlFgEAImx5tCvA4YKRCfqbsDQZO48UmWynZzSjbhn0ZWSlsWOcb5NwvOx8KcQ==", + "requires": { + "@types/node": "*" + } + }, "@types/qs": { "version": "6.9.5", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.5.tgz", @@ -3278,7 +3286,6 @@ "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", - "dev": true, "requires": { "once": "^1.4.0" } @@ -7367,7 +7374,6 @@ "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", - "dev": true, "requires": { "wrappy": "1" } @@ -7780,7 +7786,6 @@ "version": "3.0.0", "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", - "dev": true, "requires": { "end-of-stream": "^1.1.0", "once": "^1.3.1" @@ -10432,8 +10437,7 @@ "wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", - "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", - "dev": true + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" }, "write": { "version": "1.0.3", diff --git a/package.json b/package.json index 0dee4246e..4fbbe8a60 100644 --- a/package.json +++ b/package.json @@ -83,6 +83,7 @@ "@types/mime-types": "^2.1.0", "@types/n3": "^1.4.4", "@types/node": "^14.10.2", + "@types/pump": "^1.1.0", "@types/rdf-js": "^4.0.0", "@types/sparqljs": "^3.1.0", "@types/streamify-array": "^1.0.0", @@ -99,6 +100,7 @@ "handlebars": "^4.7.6", "mime-types": "^2.1.27", "n3": "^1.8.0", + "pump": "^3.0.0", "rdf-parse": "^1.7.0", "rdf-serialize": "^1.1.0", "rdf-terms": "^1.5.1", diff --git a/src/util/GuardedStream.ts b/src/util/GuardedStream.ts index a5fd0a670..d95b65de0 100644 --- a/src/util/GuardedStream.ts +++ b/src/util/GuardedStream.ts @@ -35,8 +35,10 @@ export function isGuarded(stream: T): stream is G * See https://github.com/solid/community-server/pull/462#issuecomment-758013492 . */ function guardingErrorListener(this: Guarded, error: Error): void { - // Only fall back to this if no other listeners are attached - if (this.listenerCount('error') === 1) { + // Only fall back to this if no new listeners are attached since guarding started. + // Not storing the index when guarding starts since listeners could be removed. + const idx = this.listeners('error').indexOf(guardingErrorListener); + if (idx === this.listenerCount('error') - 1) { this[guardedErrors].push(error); if (!this[guardedTimeout]) { this[guardedTimeout] = setTimeout((): void => { @@ -50,8 +52,8 @@ function guardingErrorListener(this: Guarded, error: Error): void { /** * Callback that is used when a new listener is attached and there are errors that were not emitted yet. */ -function emitStoredErrors(this: Guarded, event: string): void { - if (event === 'error') { +function emitStoredErrors(this: Guarded, event: string, func: (error: Error) => void): void { + if (event === 'error' && func !== guardingErrorListener) { // Cancel an error timeout if (this[guardedTimeout]) { clearTimeout(this[guardedTimeout]!); @@ -74,7 +76,13 @@ function emitStoredErrors(this: Guarded, event: string): void { /** * Makes sure that listeners always receive the error event of a stream, * even if it was thrown before the listener was attached. - * If the input is already guarded nothing will happen. + * + * When guarding a stream it is assumed that error listeners already attached should be ignored, + * only error listeners attached after the stream is guarded will prevent an error from being logged. + * + * If the input is already guarded the guard will be reset, + * which means ignoring error listeners already attached. + * * @param stream - Stream that can potentially throw an error. * * @returns The stream. @@ -85,6 +93,10 @@ export function guardStream(stream: T): Guarded { */ export function pipeSafely(readable: NodeJS.ReadableStream, destination: T, mapError?: (error: Error) => Error): Guarded { - // Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error - readable.pipe(destination); - readable.on('error', (error): void => { - logger.warn(`Piped stream errored with ${error.message}`); - - // From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options : - // "One important caveat is that if the Readable stream emits an error during processing, the Writable destination - // is not closed automatically. If an error occurs, it will be necessary to manually close each stream - // in order to prevent memory leaks." - destination.destroy(mapError ? mapError(error) : error); + // In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener + pump(readable, destination, (error): void => { + if (error) { + logger.warn(`Piped stream errored with ${error.message}`); + // Make sure the final error can be handled in a normal streaming fashion + destination.emit('error', mapError ? mapError(error) : error); + } }); - - // Make sure we have no dangling streams in case of unpiping, which can happen if there's an error. - // This can also happen if a stream naturally closes so most calls here will not be indication of a problem. - // From https://nodejs.org/api/stream.html#stream_errors_while_writing : - // "If a Readable stream pipes into a Writable stream when Writable emits an error, - // the Readable stream will be unpiped." - destination.on('unpipe', (source): void => { - source.destroy(); - }); - + // Guarding the stream now means the internal error listeners of pump will be ignored + // when checking if there is a valid error listener. return guardStream(destination); } diff --git a/test/unit/util/GuardedStream.test.ts b/test/unit/util/GuardedStream.test.ts index c3865eddc..aa5818012 100644 --- a/test/unit/util/GuardedStream.test.ts +++ b/test/unit/util/GuardedStream.test.ts @@ -89,7 +89,7 @@ describe('GuardedStream', (): void => { expect(endListener).toHaveBeenCalledTimes(0); }); - it('does not time out when a listener was already attached.', async(): Promise => { + it('ignores error listeners that were already attached.', async(): Promise => { const stream = Readable.from([ 'data' ]); stream.addListener('error', jest.fn()); guardStream(stream); @@ -97,7 +97,21 @@ describe('GuardedStream', (): void => { stream.emit('error', new Error('error')); jest.advanceTimersByTime(1000); - expect(logger.error).toHaveBeenCalledTimes(0); + expect(logger.error).toHaveBeenCalledTimes(1); + }); + + it('ignores error listeners after calling guardStream a second time.', async(): Promise => { + const stream = Readable.from([ 'data' ]); + guardStream(stream); + stream.addListener('error', jest.fn()); + + // This will cause the above error listener to be ignored for logging purposes + guardStream(stream); + + stream.emit('error', new Error('error')); + + jest.advanceTimersByTime(1000); + expect(logger.error).toHaveBeenCalledTimes(1); }); it('still works if error listeners get removed and added again.', async(): Promise => {