From 95ab0b4e760107a06a641b86faac7b385a8b1440 Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Tue, 10 Nov 2020 16:02:49 +0100 Subject: [PATCH] refactor: Make piping consistent --- src/ldp/http/BasicResponseWriter.ts | 6 ++++- src/ldp/http/SparqlUpdateBodyParser.ts | 8 +++---- src/storage/conversion/RdfToQuadConverter.ts | 6 ++--- src/util/MetadataController.ts | 6 ++--- src/util/Util.ts | 16 +++++++++---- .../unit/ldp/http/BasicResponseWriter.test.ts | 23 +++++++++++++++++++ test/unit/util/Util.test.ts | 16 ++++++------- 7 files changed, 56 insertions(+), 25 deletions(-) diff --git a/src/ldp/http/BasicResponseWriter.ts b/src/ldp/http/BasicResponseWriter.ts index 7da7272b0..64bc1a766 100644 --- a/src/ldp/http/BasicResponseWriter.ts +++ b/src/ldp/http/BasicResponseWriter.ts @@ -2,6 +2,7 @@ import { getLoggerFor } from '../../logging/LogUtil'; import type { HttpResponse } from '../../server/HttpResponse'; import { INTERNAL_QUADS } from '../../util/ContentTypes'; import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError'; +import { pipeSafe } from '../../util/Util'; import type { MetadataWriter } from './metadata/MetadataWriter'; import type { ResponseDescription } from './response/ResponseDescription'; import { ResponseWriter } from './ResponseWriter'; @@ -33,7 +34,10 @@ export class BasicResponseWriter extends ResponseWriter { input.response.writeHead(input.result.statusCode); if (input.result.data) { - input.result.data.pipe(input.response); + const pipe = pipeSafe(input.result.data, input.response); + pipe.on('error', (error): void => { + this.logger.error(`Writing to HttpResponse failed with message ${error.message}`); + }); } else { // If there is input data the response will end once the input stream ends input.response.end(); diff --git a/src/ldp/http/SparqlUpdateBodyParser.ts b/src/ldp/http/SparqlUpdateBodyParser.ts index aa7f8eae0..f2dcd58ec 100644 --- a/src/ldp/http/SparqlUpdateBodyParser.ts +++ b/src/ldp/http/SparqlUpdateBodyParser.ts @@ -5,7 +5,7 @@ import { getLoggerFor } from '../../logging/LogUtil'; import { APPLICATION_SPARQL_UPDATE } from '../../util/ContentTypes'; import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError'; import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError'; -import { pipeStreamsAndErrors, readableToString } from '../../util/Util'; +import { pipeSafe, readableToString } from '../../util/Util'; import type { BodyParserArgs } from './BodyParser'; import { BodyParser } from './BodyParser'; import type { SparqlUpdatePatch } from './SparqlUpdatePatch'; @@ -29,10 +29,8 @@ export class SparqlUpdateBodyParser extends BodyParser { // Note that readableObjectMode is only defined starting from Node 12 // It is impossible to check if object mode is enabled in Node 10 (without accessing private variables) const options = { objectMode: request.readableObjectMode }; - const toAlgebraStream = new PassThrough(options); - const dataCopy = new PassThrough(options); - pipeStreamsAndErrors(request, toAlgebraStream); - pipeStreamsAndErrors(request, dataCopy); + const toAlgebraStream = pipeSafe(request, new PassThrough(options)); + const dataCopy = pipeSafe(request, new PassThrough(options)); let algebra: Algebra.Operation; try { const sparql = await readableToString(toAlgebraStream); diff --git a/src/storage/conversion/RdfToQuadConverter.ts b/src/storage/conversion/RdfToQuadConverter.ts index 97795c3cb..7b215dfc3 100644 --- a/src/storage/conversion/RdfToQuadConverter.ts +++ b/src/storage/conversion/RdfToQuadConverter.ts @@ -5,7 +5,7 @@ import { RepresentationMetadata } from '../../ldp/representation/RepresentationM import { INTERNAL_QUADS } from '../../util/ContentTypes'; import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError'; import { CONTENT_TYPE } from '../../util/UriConstants'; -import { pipeStreamsAndErrors } from '../../util/Util'; +import { pipeSafe } from '../../util/Util'; import { checkRequest } from './ConversionUtil'; import type { RepresentationConverterArgs } from './RepresentationConverter'; import { TypedRepresentationConverter } from './TypedRepresentationConverter'; @@ -39,8 +39,8 @@ export class RdfToQuadConverter extends TypedRepresentationConverter { // Wrap the stream such that errors are transformed // (Node 10 requires both writableObjectMode and readableObjectMode) - const data = new PassThrough({ writableObjectMode: true, readableObjectMode: true }); - pipeStreamsAndErrors(rawQuads, data, (error): Error => new UnsupportedHttpError(error.message)); + const pass = new PassThrough({ writableObjectMode: true, readableObjectMode: true }); + const data = pipeSafe(rawQuads, pass, (error): Error => new UnsupportedHttpError(error.message)); return { binary: false, diff --git a/src/util/MetadataController.ts b/src/util/MetadataController.ts index 44e8fad53..3040d0f9e 100644 --- a/src/util/MetadataController.ts +++ b/src/util/MetadataController.ts @@ -7,7 +7,7 @@ import { RepresentationMetadata } from '../ldp/representation/RepresentationMeta import { TEXT_TURTLE } from './ContentTypes'; import { LDP, RDF } from './UriConstants'; import { toNamedNode } from './UriUtil'; -import { pipeStreamsAndErrors, pushQuad } from './Util'; +import { pipeSafe, pushQuad } from './Util'; export class MetadataController { /** @@ -46,7 +46,7 @@ export class MetadataController { * @returns The Readable object. */ public serializeQuads(quads: Quad[]): Readable { - return pipeStreamsAndErrors(streamifyArray(quads), new StreamWriter({ format: TEXT_TURTLE })); + return pipeSafe(streamifyArray(quads), new StreamWriter({ format: TEXT_TURTLE })); } /** @@ -56,6 +56,6 @@ export class MetadataController { * @returns A promise containing the array of quads. */ public async parseQuads(readable: Readable): Promise { - return await arrayifyStream(pipeStreamsAndErrors(readable, new StreamParser({ format: TEXT_TURTLE }))); + return await arrayifyStream(pipeSafe(readable, new StreamParser({ format: TEXT_TURTLE }))); } } diff --git a/src/util/Util.ts b/src/util/Util.ts index 679cc0983..fcdac7755 100644 --- a/src/util/Util.ts +++ b/src/util/Util.ts @@ -59,20 +59,26 @@ export const matchingMediaType = (mediaA: string, mediaB: string): boolean => { }; /** - * Pipes one stream into another. - * Makes sure an error of the first stream gets passed to the second. + * Pipes one stream into another and emits errors of the first stream with the second. + * In case of an error in the first stream the second one will be destroyed with the given error. * @param readable - Initial readable stream. * @param destination - The destination for writing data. * @param mapError - Optional function that takes the error and converts it to a new error. * * @returns The destination stream. */ -export const pipeStreamsAndErrors = (readable: NodeJS.ReadableStream, destination: T, +export const pipeSafe = (readable: NodeJS.ReadableStream, destination: T, mapError?: (error: Error) => Error): T => { + // Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error readable.pipe(destination); - readable.on('error', (error): boolean => { + readable.on('error', (error): void => { logger.warn(`Piped stream errored with ${error.message}`); - return destination.emit('error', mapError ? mapError(error) : error); + + // From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options : + // "One important caveat is that if the Readable stream emits an error during processing, the Writable destination + // is not closed automatically. If an error occurs, it will be necessary to manually close each stream + // in order to prevent memory leaks." + destination.destroy(mapError ? mapError(error) : error); }); return destination; }; diff --git a/test/unit/ldp/http/BasicResponseWriter.test.ts b/test/unit/ldp/http/BasicResponseWriter.test.ts index 90a0abf80..bcacc7e5d 100644 --- a/test/unit/ldp/http/BasicResponseWriter.test.ts +++ b/test/unit/ldp/http/BasicResponseWriter.test.ts @@ -1,4 +1,5 @@ import { EventEmitter } from 'events'; +import { PassThrough } from 'stream'; import type { MockResponse } from 'node-mocks-http'; import { createResponse } from 'node-mocks-http'; import streamifyArray from 'streamify-array'; @@ -66,4 +67,26 @@ describe('A BasicResponseWriter', (): void => { expect(response._isEndCalled()).toBeTruthy(); expect(response._getStatusCode()).toBe(201); }); + + it('can handle the data stream erroring.', async(): Promise => { + const data = new PassThrough(); + data.read = (): any => { + data.emit('error', new Error('bad data!')); + return null; + }; + result = { statusCode: 201, data }; + + response = new PassThrough(); + response.writeHead = jest.fn(); + + const end = new Promise((resolve): void => { + response.on('error', (error: Error): void => { + expect(error).toEqual(new Error('bad data!')); + resolve(); + }); + }); + + await expect(writer.handle({ response, result })).resolves.toBeUndefined(); + await end; + }); }); diff --git a/test/unit/util/Util.test.ts b/test/unit/util/Util.test.ts index 74ec279af..69adb90e4 100644 --- a/test/unit/util/Util.test.ts +++ b/test/unit/util/Util.test.ts @@ -8,7 +8,7 @@ import { decodeUriPathComponents, encodeUriPathComponents, ensureTrailingSlash, - matchingMediaType, pipeStreamsAndErrors, pushQuad, + matchingMediaType, pipeSafe, pushQuad, readableToString, toCanonicalUriPath, } from '../../../src/util/Util'; @@ -48,19 +48,19 @@ describe('Util function', (): void => { it('pipes data from one stream to the other.', async(): Promise => { const input = streamifyArray([ 'data' ]); const output = new PassThrough(); - pipeStreamsAndErrors(input, output); - await expect(readableToString(output)).resolves.toEqual('data'); + const piped = pipeSafe(input, output); + await expect(readableToString(piped)).resolves.toEqual('data'); }); it('pipes errors from one stream to the other.', async(): Promise => { - const input = streamifyArray([ 'data' ]); + const input = new PassThrough(); input.read = (): any => { input.emit('error', new Error('error')); return null; }; const output = new PassThrough(); - pipeStreamsAndErrors(input, output); - await expect(readableToString(output)).rejects.toThrow(new Error('error')); + const piped = pipeSafe(input, output); + await expect(readableToString(piped)).rejects.toThrow(new Error('error')); }); it('supports mapping errors to something else.', async(): Promise => { @@ -70,8 +70,8 @@ describe('Util function', (): void => { return null; }; const output = new PassThrough(); - pipeStreamsAndErrors(input, output, (): any => new Error('other error')); - await expect(readableToString(output)).rejects.toThrow(new Error('other error')); + const piped = pipeSafe(input, output, (): any => new Error('other error')); + await expect(readableToString(piped)).rejects.toThrow(new Error('other error')); }); });