feat: Pipe streams with the pump library

The library handles some edge cases that we did not handle yet.
The GuardedStream was also updated to ignore error listeners
already attached to the stream (since pump adds internal listeners).
This commit is contained in:
Joachim Van Herwegen 2021-02-10 13:13:09 +01:00
parent 9b6eab27bc
commit fd45779159
5 changed files with 54 additions and 32 deletions

14
package-lock.json generated
View File

@ -1338,6 +1338,14 @@
"integrity": "sha512-UEyp8LwZ4Dg30kVU2Q3amHHyTn1jEdhCIE59ANed76GaT1Vp76DD3ZWSAxgCrw6wJ0TqeoBpqmfUHiUDPs//HQ==", "integrity": "sha512-UEyp8LwZ4Dg30kVU2Q3amHHyTn1jEdhCIE59ANed76GaT1Vp76DD3ZWSAxgCrw6wJ0TqeoBpqmfUHiUDPs//HQ==",
"dev": true "dev": true
}, },
"@types/pump": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/@types/pump/-/pump-1.1.0.tgz",
"integrity": "sha512-YGGbsqf5o7sF8gGANP8ZYxgaRGlFgEAImx5tCvA4YKRCfqbsDQZO48UmWynZzSjbhn0ZWSlsWOcb5NwvOx8KcQ==",
"requires": {
"@types/node": "*"
}
},
"@types/qs": { "@types/qs": {
"version": "6.9.5", "version": "6.9.5",
"resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.5.tgz", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.5.tgz",
@ -3278,7 +3286,6 @@
"version": "1.4.4", "version": "1.4.4",
"resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz",
"integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==", "integrity": "sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==",
"dev": true,
"requires": { "requires": {
"once": "^1.4.0" "once": "^1.4.0"
} }
@ -7367,7 +7374,6 @@
"version": "1.4.0", "version": "1.4.0",
"resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz",
"integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=",
"dev": true,
"requires": { "requires": {
"wrappy": "1" "wrappy": "1"
} }
@ -7780,7 +7786,6 @@
"version": "3.0.0", "version": "3.0.0",
"resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz",
"integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==",
"dev": true,
"requires": { "requires": {
"end-of-stream": "^1.1.0", "end-of-stream": "^1.1.0",
"once": "^1.3.1" "once": "^1.3.1"
@ -10432,8 +10437,7 @@
"wrappy": { "wrappy": {
"version": "1.0.2", "version": "1.0.2",
"resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
"integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8="
"dev": true
}, },
"write": { "write": {
"version": "1.0.3", "version": "1.0.3",

View File

@ -83,6 +83,7 @@
"@types/mime-types": "^2.1.0", "@types/mime-types": "^2.1.0",
"@types/n3": "^1.4.4", "@types/n3": "^1.4.4",
"@types/node": "^14.10.2", "@types/node": "^14.10.2",
"@types/pump": "^1.1.0",
"@types/rdf-js": "^4.0.0", "@types/rdf-js": "^4.0.0",
"@types/sparqljs": "^3.1.0", "@types/sparqljs": "^3.1.0",
"@types/streamify-array": "^1.0.0", "@types/streamify-array": "^1.0.0",
@ -99,6 +100,7 @@
"handlebars": "^4.7.6", "handlebars": "^4.7.6",
"mime-types": "^2.1.27", "mime-types": "^2.1.27",
"n3": "^1.8.0", "n3": "^1.8.0",
"pump": "^3.0.0",
"rdf-parse": "^1.7.0", "rdf-parse": "^1.7.0",
"rdf-serialize": "^1.1.0", "rdf-serialize": "^1.1.0",
"rdf-terms": "^1.5.1", "rdf-terms": "^1.5.1",

View File

@ -35,8 +35,10 @@ export function isGuarded<T extends NodeJS.EventEmitter>(stream: T): stream is G
* See https://github.com/solid/community-server/pull/462#issuecomment-758013492 . * See https://github.com/solid/community-server/pull/462#issuecomment-758013492 .
*/ */
function guardingErrorListener(this: Guarded, error: Error): void { function guardingErrorListener(this: Guarded, error: Error): void {
// Only fall back to this if no other listeners are attached // Only fall back to this if no new listeners are attached since guarding started.
if (this.listenerCount('error') === 1) { // Not storing the index when guarding starts since listeners could be removed.
const idx = this.listeners('error').indexOf(guardingErrorListener);
if (idx === this.listenerCount('error') - 1) {
this[guardedErrors].push(error); this[guardedErrors].push(error);
if (!this[guardedTimeout]) { if (!this[guardedTimeout]) {
this[guardedTimeout] = setTimeout((): void => { this[guardedTimeout] = setTimeout((): void => {
@ -50,8 +52,8 @@ function guardingErrorListener(this: Guarded, error: Error): void {
/** /**
* Callback that is used when a new listener is attached and there are errors that were not emitted yet. * Callback that is used when a new listener is attached and there are errors that were not emitted yet.
*/ */
function emitStoredErrors(this: Guarded, event: string): void { function emitStoredErrors(this: Guarded, event: string, func: (error: Error) => void): void {
if (event === 'error') { if (event === 'error' && func !== guardingErrorListener) {
// Cancel an error timeout // Cancel an error timeout
if (this[guardedTimeout]) { if (this[guardedTimeout]) {
clearTimeout(this[guardedTimeout]!); clearTimeout(this[guardedTimeout]!);
@ -74,7 +76,13 @@ function emitStoredErrors(this: Guarded, event: string): void {
/** /**
* Makes sure that listeners always receive the error event of a stream, * Makes sure that listeners always receive the error event of a stream,
* even if it was thrown before the listener was attached. * even if it was thrown before the listener was attached.
* If the input is already guarded nothing will happen. *
* When guarding a stream it is assumed that error listeners already attached should be ignored,
* only error listeners attached after the stream is guarded will prevent an error from being logged.
*
* If the input is already guarded the guard will be reset,
* which means ignoring error listeners already attached.
*
* @param stream - Stream that can potentially throw an error. * @param stream - Stream that can potentially throw an error.
* *
* @returns The stream. * @returns The stream.
@ -85,6 +93,10 @@ export function guardStream<T extends NodeJS.EventEmitter>(stream: T): Guarded<T
guarded[guardedErrors] = []; guarded[guardedErrors] = [];
guarded.on('error', guardingErrorListener); guarded.on('error', guardingErrorListener);
guarded.on('newListener', emitStoredErrors); guarded.on('newListener', emitStoredErrors);
} else {
// This makes sure the guarding error listener is the last one in the list again
guarded.removeListener('error', guardingErrorListener);
guarded.on('error', guardingErrorListener);
} }
return guarded; return guarded;
} }

View File

@ -1,6 +1,7 @@
import type { Writable, ReadableOptions, DuplexOptions } from 'stream'; import type { Writable, ReadableOptions, DuplexOptions } from 'stream';
import { Readable, Transform } from 'stream'; import { Readable, Transform } from 'stream';
import arrayifyStream from 'arrayify-stream'; import arrayifyStream from 'arrayify-stream';
import pump from 'pump';
import { getLoggerFor } from '../logging/LogUtil'; import { getLoggerFor } from '../logging/LogUtil';
import type { Guarded } from './GuardedStream'; import type { Guarded } from './GuardedStream';
import { guardStream } from './GuardedStream'; import { guardStream } from './GuardedStream';
@ -29,27 +30,16 @@ export async function readableToString(stream: Readable): Promise<string> {
*/ */
export function pipeSafely<T extends Writable>(readable: NodeJS.ReadableStream, destination: T, export function pipeSafely<T extends Writable>(readable: NodeJS.ReadableStream, destination: T,
mapError?: (error: Error) => Error): Guarded<T> { mapError?: (error: Error) => Error): Guarded<T> {
// Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error // In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener
readable.pipe(destination); pump(readable, destination, (error): void => {
readable.on('error', (error): void => { if (error) {
logger.warn(`Piped stream errored with ${error.message}`); logger.warn(`Piped stream errored with ${error.message}`);
// Make sure the final error can be handled in a normal streaming fashion
// From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options : destination.emit('error', mapError ? mapError(error) : error);
// "One important caveat is that if the Readable stream emits an error during processing, the Writable destination }
// is not closed automatically. If an error occurs, it will be necessary to manually close each stream
// in order to prevent memory leaks."
destination.destroy(mapError ? mapError(error) : error);
}); });
// Guarding the stream now means the internal error listeners of pump will be ignored
// Make sure we have no dangling streams in case of unpiping, which can happen if there's an error. // when checking if there is a valid error listener.
// This can also happen if a stream naturally closes so most calls here will not be indication of a problem.
// From https://nodejs.org/api/stream.html#stream_errors_while_writing :
// "If a Readable stream pipes into a Writable stream when Writable emits an error,
// the Readable stream will be unpiped."
destination.on('unpipe', (source): void => {
source.destroy();
});
return guardStream(destination); return guardStream(destination);
} }

View File

@ -89,7 +89,7 @@ describe('GuardedStream', (): void => {
expect(endListener).toHaveBeenCalledTimes(0); expect(endListener).toHaveBeenCalledTimes(0);
}); });
it('does not time out when a listener was already attached.', async(): Promise<void> => { it('ignores error listeners that were already attached.', async(): Promise<void> => {
const stream = Readable.from([ 'data' ]); const stream = Readable.from([ 'data' ]);
stream.addListener('error', jest.fn()); stream.addListener('error', jest.fn());
guardStream(stream); guardStream(stream);
@ -97,7 +97,21 @@ describe('GuardedStream', (): void => {
stream.emit('error', new Error('error')); stream.emit('error', new Error('error'));
jest.advanceTimersByTime(1000); jest.advanceTimersByTime(1000);
expect(logger.error).toHaveBeenCalledTimes(0); expect(logger.error).toHaveBeenCalledTimes(1);
});
it('ignores error listeners after calling guardStream a second time.', async(): Promise<void> => {
const stream = Readable.from([ 'data' ]);
guardStream(stream);
stream.addListener('error', jest.fn());
// This will cause the above error listener to be ignored for logging purposes
guardStream(stream);
stream.emit('error', new Error('error'));
jest.advanceTimersByTime(1000);
expect(logger.error).toHaveBeenCalledTimes(1);
}); });
it('still works if error listeners get removed and added again.', async(): Promise<void> => { it('still works if error listeners get removed and added again.', async(): Promise<void> => {