refactor: Make piping consistent

This commit is contained in:
Joachim Van Herwegen 2020-11-10 16:02:49 +01:00
parent 715ba126f9
commit 95ab0b4e76
7 changed files with 56 additions and 25 deletions

View File

@ -2,6 +2,7 @@ import { getLoggerFor } from '../../logging/LogUtil';
import type { HttpResponse } from '../../server/HttpResponse'; import type { HttpResponse } from '../../server/HttpResponse';
import { INTERNAL_QUADS } from '../../util/ContentTypes'; import { INTERNAL_QUADS } from '../../util/ContentTypes';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError'; import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { pipeSafe } from '../../util/Util';
import type { MetadataWriter } from './metadata/MetadataWriter'; import type { MetadataWriter } from './metadata/MetadataWriter';
import type { ResponseDescription } from './response/ResponseDescription'; import type { ResponseDescription } from './response/ResponseDescription';
import { ResponseWriter } from './ResponseWriter'; import { ResponseWriter } from './ResponseWriter';
@ -33,7 +34,10 @@ export class BasicResponseWriter extends ResponseWriter {
input.response.writeHead(input.result.statusCode); input.response.writeHead(input.result.statusCode);
if (input.result.data) { if (input.result.data) {
input.result.data.pipe(input.response); const pipe = pipeSafe(input.result.data, input.response);
pipe.on('error', (error): void => {
this.logger.error(`Writing to HttpResponse failed with message ${error.message}`);
});
} else { } else {
// If there is input data the response will end once the input stream ends // If there is input data the response will end once the input stream ends
input.response.end(); input.response.end();

View File

@ -5,7 +5,7 @@ import { getLoggerFor } from '../../logging/LogUtil';
import { APPLICATION_SPARQL_UPDATE } from '../../util/ContentTypes'; import { APPLICATION_SPARQL_UPDATE } from '../../util/ContentTypes';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError'; import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError'; import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
import { pipeStreamsAndErrors, readableToString } from '../../util/Util'; import { pipeSafe, readableToString } from '../../util/Util';
import type { BodyParserArgs } from './BodyParser'; import type { BodyParserArgs } from './BodyParser';
import { BodyParser } from './BodyParser'; import { BodyParser } from './BodyParser';
import type { SparqlUpdatePatch } from './SparqlUpdatePatch'; import type { SparqlUpdatePatch } from './SparqlUpdatePatch';
@ -29,10 +29,8 @@ export class SparqlUpdateBodyParser extends BodyParser {
// Note that readableObjectMode is only defined starting from Node 12 // Note that readableObjectMode is only defined starting from Node 12
// It is impossible to check if object mode is enabled in Node 10 (without accessing private variables) // It is impossible to check if object mode is enabled in Node 10 (without accessing private variables)
const options = { objectMode: request.readableObjectMode }; const options = { objectMode: request.readableObjectMode };
const toAlgebraStream = new PassThrough(options); const toAlgebraStream = pipeSafe(request, new PassThrough(options));
const dataCopy = new PassThrough(options); const dataCopy = pipeSafe(request, new PassThrough(options));
pipeStreamsAndErrors(request, toAlgebraStream);
pipeStreamsAndErrors(request, dataCopy);
let algebra: Algebra.Operation; let algebra: Algebra.Operation;
try { try {
const sparql = await readableToString(toAlgebraStream); const sparql = await readableToString(toAlgebraStream);

View File

@ -5,7 +5,7 @@ import { RepresentationMetadata } from '../../ldp/representation/RepresentationM
import { INTERNAL_QUADS } from '../../util/ContentTypes'; import { INTERNAL_QUADS } from '../../util/ContentTypes';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError'; import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { CONTENT_TYPE } from '../../util/UriConstants'; import { CONTENT_TYPE } from '../../util/UriConstants';
import { pipeStreamsAndErrors } from '../../util/Util'; import { pipeSafe } from '../../util/Util';
import { checkRequest } from './ConversionUtil'; import { checkRequest } from './ConversionUtil';
import type { RepresentationConverterArgs } from './RepresentationConverter'; import type { RepresentationConverterArgs } from './RepresentationConverter';
import { TypedRepresentationConverter } from './TypedRepresentationConverter'; import { TypedRepresentationConverter } from './TypedRepresentationConverter';
@ -39,8 +39,8 @@ export class RdfToQuadConverter extends TypedRepresentationConverter {
// Wrap the stream such that errors are transformed // Wrap the stream such that errors are transformed
// (Node 10 requires both writableObjectMode and readableObjectMode) // (Node 10 requires both writableObjectMode and readableObjectMode)
const data = new PassThrough({ writableObjectMode: true, readableObjectMode: true }); const pass = new PassThrough({ writableObjectMode: true, readableObjectMode: true });
pipeStreamsAndErrors(rawQuads, data, (error): Error => new UnsupportedHttpError(error.message)); const data = pipeSafe(rawQuads, pass, (error): Error => new UnsupportedHttpError(error.message));
return { return {
binary: false, binary: false,

View File

@ -7,7 +7,7 @@ import { RepresentationMetadata } from '../ldp/representation/RepresentationMeta
import { TEXT_TURTLE } from './ContentTypes'; import { TEXT_TURTLE } from './ContentTypes';
import { LDP, RDF } from './UriConstants'; import { LDP, RDF } from './UriConstants';
import { toNamedNode } from './UriUtil'; import { toNamedNode } from './UriUtil';
import { pipeStreamsAndErrors, pushQuad } from './Util'; import { pipeSafe, pushQuad } from './Util';
export class MetadataController { export class MetadataController {
/** /**
@ -46,7 +46,7 @@ export class MetadataController {
* @returns The Readable object. * @returns The Readable object.
*/ */
public serializeQuads(quads: Quad[]): Readable { public serializeQuads(quads: Quad[]): Readable {
return pipeStreamsAndErrors(streamifyArray(quads), new StreamWriter({ format: TEXT_TURTLE })); return pipeSafe(streamifyArray(quads), new StreamWriter({ format: TEXT_TURTLE }));
} }
/** /**
@ -56,6 +56,6 @@ export class MetadataController {
* @returns A promise containing the array of quads. * @returns A promise containing the array of quads.
*/ */
public async parseQuads(readable: Readable): Promise<Quad[]> { public async parseQuads(readable: Readable): Promise<Quad[]> {
return await arrayifyStream(pipeStreamsAndErrors(readable, new StreamParser({ format: TEXT_TURTLE }))); return await arrayifyStream(pipeSafe(readable, new StreamParser({ format: TEXT_TURTLE })));
} }
} }

View File

@ -59,20 +59,26 @@ export const matchingMediaType = (mediaA: string, mediaB: string): boolean => {
}; };
/** /**
* Pipes one stream into another. * Pipes one stream into another and emits errors of the first stream with the second.
* Makes sure an error of the first stream gets passed to the second. * In case of an error in the first stream the second one will be destroyed with the given error.
* @param readable - Initial readable stream. * @param readable - Initial readable stream.
* @param destination - The destination for writing data. * @param destination - The destination for writing data.
* @param mapError - Optional function that takes the error and converts it to a new error. * @param mapError - Optional function that takes the error and converts it to a new error.
* *
* @returns The destination stream. * @returns The destination stream.
*/ */
export const pipeStreamsAndErrors = <T extends Writable>(readable: NodeJS.ReadableStream, destination: T, export const pipeSafe = <T extends Writable>(readable: NodeJS.ReadableStream, destination: T,
mapError?: (error: Error) => Error): T => { mapError?: (error: Error) => Error): T => {
// Not using `stream.pipeline` since the result there only emits an error event if the last stream has the error
readable.pipe(destination); readable.pipe(destination);
readable.on('error', (error): boolean => { readable.on('error', (error): void => {
logger.warn(`Piped stream errored with ${error.message}`); logger.warn(`Piped stream errored with ${error.message}`);
return destination.emit('error', mapError ? mapError(error) : error);
// From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options :
// "One important caveat is that if the Readable stream emits an error during processing, the Writable destination
// is not closed automatically. If an error occurs, it will be necessary to manually close each stream
// in order to prevent memory leaks."
destination.destroy(mapError ? mapError(error) : error);
}); });
return destination; return destination;
}; };

View File

@ -1,4 +1,5 @@
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { PassThrough } from 'stream';
import type { MockResponse } from 'node-mocks-http'; import type { MockResponse } from 'node-mocks-http';
import { createResponse } from 'node-mocks-http'; import { createResponse } from 'node-mocks-http';
import streamifyArray from 'streamify-array'; import streamifyArray from 'streamify-array';
@ -66,4 +67,26 @@ describe('A BasicResponseWriter', (): void => {
expect(response._isEndCalled()).toBeTruthy(); expect(response._isEndCalled()).toBeTruthy();
expect(response._getStatusCode()).toBe(201); expect(response._getStatusCode()).toBe(201);
}); });
it('can handle the data stream erroring.', async(): Promise<void> => {
const data = new PassThrough();
data.read = (): any => {
data.emit('error', new Error('bad data!'));
return null;
};
result = { statusCode: 201, data };
response = new PassThrough();
response.writeHead = jest.fn();
const end = new Promise((resolve): void => {
response.on('error', (error: Error): void => {
expect(error).toEqual(new Error('bad data!'));
resolve();
});
});
await expect(writer.handle({ response, result })).resolves.toBeUndefined();
await end;
});
}); });

View File

@ -8,7 +8,7 @@ import {
decodeUriPathComponents, decodeUriPathComponents,
encodeUriPathComponents, encodeUriPathComponents,
ensureTrailingSlash, ensureTrailingSlash,
matchingMediaType, pipeStreamsAndErrors, pushQuad, matchingMediaType, pipeSafe, pushQuad,
readableToString, readableToString,
toCanonicalUriPath, toCanonicalUriPath,
} from '../../../src/util/Util'; } from '../../../src/util/Util';
@ -48,19 +48,19 @@ describe('Util function', (): void => {
it('pipes data from one stream to the other.', async(): Promise<void> => { it('pipes data from one stream to the other.', async(): Promise<void> => {
const input = streamifyArray([ 'data' ]); const input = streamifyArray([ 'data' ]);
const output = new PassThrough(); const output = new PassThrough();
pipeStreamsAndErrors(input, output); const piped = pipeSafe(input, output);
await expect(readableToString(output)).resolves.toEqual('data'); await expect(readableToString(piped)).resolves.toEqual('data');
}); });
it('pipes errors from one stream to the other.', async(): Promise<void> => { it('pipes errors from one stream to the other.', async(): Promise<void> => {
const input = streamifyArray([ 'data' ]); const input = new PassThrough();
input.read = (): any => { input.read = (): any => {
input.emit('error', new Error('error')); input.emit('error', new Error('error'));
return null; return null;
}; };
const output = new PassThrough(); const output = new PassThrough();
pipeStreamsAndErrors(input, output); const piped = pipeSafe(input, output);
await expect(readableToString(output)).rejects.toThrow(new Error('error')); await expect(readableToString(piped)).rejects.toThrow(new Error('error'));
}); });
it('supports mapping errors to something else.', async(): Promise<void> => { it('supports mapping errors to something else.', async(): Promise<void> => {
@ -70,8 +70,8 @@ describe('Util function', (): void => {
return null; return null;
}; };
const output = new PassThrough(); const output = new PassThrough();
pipeStreamsAndErrors(input, output, (): any => new Error('other error')); const piped = pipeSafe(input, output, (): any => new Error('other error'));
await expect(readableToString(output)).rejects.toThrow(new Error('other error')); await expect(readableToString(piped)).rejects.toThrow(new Error('other error'));
}); });
}); });