fix: Prevent HttpRequest from being closed

In case a stream the request is being piped into closes,
we don't want to close the request since it shares a socket
with the response.
This commit is contained in:
Joachim Van Herwegen
2021-04-08 13:21:03 +02:00
parent 218c8f4662
commit 953458231b
5 changed files with 106 additions and 10 deletions

View File

@@ -5,3 +5,10 @@ import type { Guarded } from '../util/GuardedStream';
* An incoming HTTP request;
*/
export type HttpRequest = Guarded<IncomingMessage>;
/**
* Checks if the given stream is an HttpRequest.
*/
export function isHttpRequest(stream: any): stream is HttpRequest {
return typeof stream.socket === 'object' && typeof stream.url === 'string' && typeof stream.method === 'string';
}

View File

@@ -3,6 +3,7 @@ import { Readable, Transform } from 'stream';
import arrayifyStream from 'arrayify-stream';
import pump from 'pump';
import { getLoggerFor } from '../logging/LogUtil';
import { isHttpRequest } from '../server/HttpRequest';
import type { Guarded } from './GuardedStream';
import { guardStream } from './GuardedStream';
@@ -30,14 +31,30 @@ export async function readableToString(stream: Readable): Promise<string> {
*/
export function pipeSafely<T extends Writable>(readable: NodeJS.ReadableStream, destination: T,
mapError?: (error: Error) => Error): Guarded<T> {
// In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener
pump(readable, destination, (error): void => {
if (error) {
logger.warn(`Piped stream errored with ${error.message}`);
// Make sure the final error can be handled in a normal streaming fashion
destination.emit('error', mapError ? mapError(error) : error);
}
});
// We never want to closes the incoming HttpRequest if there is an error
// since that also closes the outgoing HttpResponse.
// Since `pump` sends stream errors both up and down the pipe chain,
// in this case we need to make sure the error only goes down the chain.
if (isHttpRequest(readable)) {
readable.pipe(destination);
readable.on('error', (error): void => {
logger.warn(`HttpRequest errored with ${error.message}`);
// From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options :
// One important caveat is that if the Readable stream emits an error during processing,
// the Writable destination is not closed automatically. If an error occurs,
// it will be necessary to manually close each stream in order to prevent memory leaks.
destination.destroy(mapError ? mapError(error) : error);
});
} else {
// In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener
pump(readable, destination, (error): void => {
if (error) {
logger.warn(`Piped stream errored with ${error.message}`);
// Make sure the final error can be handled in a normal streaming fashion
destination.emit('error', mapError ? mapError(error) : error);
}
});
}
// Guarding the stream now means the internal error listeners of pump will be ignored
// when checking if there is a valid error listener.
return guardStream(destination);

View File

@@ -68,6 +68,18 @@ describe('A Solid server', (): void => {
expect(res.status).toBe(205);
});
it('can handle PUT errors.', async(): Promise<void> => {
// There was a specific case where the following request caused the connection to close instead of error
const res = await fetch(baseUrl, {
method: 'PUT',
headers: {
'content-type': 'text/plain',
},
body: '"test"',
});
expect(res.status).toBe(400);
});
it('can POST to create a container.', async(): Promise<void> => {
const res = await fetch(baseUrl, {
method: 'POST',

View File

@@ -0,0 +1,10 @@
import { isHttpRequest } from '../../../src/server/HttpRequest';
describe('HttpRequest', (): void => {
describe('#isHttpRequest', (): void => {
it('can identify HttpRequests.', async(): Promise<void> => {
expect(isHttpRequest({})).toBe(false);
expect(isHttpRequest({ socket: {}, method: 'GET', url: '/url' })).toBe(true);
});
});
});

View File

@@ -1,8 +1,13 @@
import { PassThrough } from 'stream';
import arrayifyStream from 'arrayify-stream';
import streamifyArray from 'streamify-array';
import { isHttpRequest } from '../../../src/server/HttpRequest';
import { guardedStreamFrom, pipeSafely, transformSafely, readableToString } from '../../../src/util/StreamUtil';
jest.mock('../../../src/server/HttpRequest', (): any => ({
isHttpRequest: jest.fn(),
}));
describe('StreamUtil', (): void => {
describe('#readableToString', (): void => {
it('concatenates all elements of a Readable.', async(): Promise<void> => {
@@ -12,6 +17,10 @@ describe('StreamUtil', (): void => {
});
describe('#pipeSafely', (): void => {
beforeEach(async(): Promise<void> => {
(isHttpRequest as unknown as jest.Mock).mockClear();
});
it('pipes data from one stream to the other.', async(): Promise<void> => {
const input = streamifyArray([ 'data' ]);
const output = new PassThrough();
@@ -56,6 +65,47 @@ describe('StreamUtil', (): void => {
await new Promise(setImmediate);
expect(input.destroyed).toBe(true);
});
it('does not destroy the source stream if it is an HttpRequest.', async(): Promise<void> => {
(isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true);
const input = new PassThrough();
const output = new PassThrough();
const piped = pipeSafely(input, output);
// Catch errors to prevent problems in test output
output.on('error', (): void => {
// Empty
});
piped.destroy(new Error('error!'));
// Allow events to propagate
await new Promise(setImmediate);
expect(input.destroyed).toBe(false);
});
it('still sends errors downstream if the input is an HttpRequest.', async(): Promise<void> => {
(isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true);
const input = new PassThrough();
input.read = (): any => {
input.emit('error', new Error('error'));
return null;
};
const output = new PassThrough();
const piped = pipeSafely(input, output);
await expect(readableToString(piped)).rejects.toThrow('error');
});
it('can map errors if the input is an HttpRequest.', async(): Promise<void> => {
(isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true);
const input = streamifyArray([ 'data' ]);
input.read = (): any => {
input.emit('error', new Error('error'));
return null;
};
const output = new PassThrough();
const piped = pipeSafely(input, output, (): any => new Error('other error'));
await expect(readableToString(piped)).rejects.toThrow('other error');
});
});
describe('#transformSafely', (): void => {
@@ -158,8 +208,8 @@ describe('StreamUtil', (): void => {
describe('#guardedStreamFrom', (): void => {
it('converts data to a guarded stream.', async(): Promise<void> => {
const data = [ 'a', 'b' ];
await expect(readableToString(guardedStreamFrom(data))).resolves.toBe('ab');
await expect(readableToString(guardedStreamFrom([ 'a', 'b' ]))).resolves.toBe('ab');
await expect(readableToString(guardedStreamFrom('ab'))).resolves.toBe('ab');
});
});
});