From 953458231b4b7149056cf6fe6887a1eef7a87737 Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Thu, 8 Apr 2021 13:21:03 +0200 Subject: [PATCH] fix: Prevent HttpRequest from being closed In case a stream the request is being piped into closes, we don't want to close the request since it shares a socket with the response. --- src/server/HttpRequest.ts | 7 ++++ src/util/StreamUtil.ts | 33 ++++++++++++----- test/integration/ServerFetch.test.ts | 12 +++++++ test/unit/server/HttpRequest.test.ts | 10 ++++++ test/unit/util/StreamUtil.test.ts | 54 ++++++++++++++++++++++++++-- 5 files changed, 106 insertions(+), 10 deletions(-) create mode 100644 test/unit/server/HttpRequest.test.ts diff --git a/src/server/HttpRequest.ts b/src/server/HttpRequest.ts index 1ba1dc48b..a071f4d46 100644 --- a/src/server/HttpRequest.ts +++ b/src/server/HttpRequest.ts @@ -5,3 +5,10 @@ import type { Guarded } from '../util/GuardedStream'; * An incoming HTTP request; */ export type HttpRequest = Guarded; + +/** + * Checks if the given stream is an HttpRequest. + */ +export function isHttpRequest(stream: any): stream is HttpRequest { + return typeof stream.socket === 'object' && typeof stream.url === 'string' && typeof stream.method === 'string'; +} diff --git a/src/util/StreamUtil.ts b/src/util/StreamUtil.ts index 2eb536e70..a0a18030e 100644 --- a/src/util/StreamUtil.ts +++ b/src/util/StreamUtil.ts @@ -3,6 +3,7 @@ import { Readable, Transform } from 'stream'; import arrayifyStream from 'arrayify-stream'; import pump from 'pump'; import { getLoggerFor } from '../logging/LogUtil'; +import { isHttpRequest } from '../server/HttpRequest'; import type { Guarded } from './GuardedStream'; import { guardStream } from './GuardedStream'; @@ -30,14 +31,30 @@ export async function readableToString(stream: Readable): Promise { */ export function pipeSafely(readable: NodeJS.ReadableStream, destination: T, mapError?: (error: Error) => Error): Guarded { - // In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener - pump(readable, destination, (error): void => { - if (error) { - logger.warn(`Piped stream errored with ${error.message}`); - // Make sure the final error can be handled in a normal streaming fashion - destination.emit('error', mapError ? mapError(error) : error); - } - }); + // We never want to closes the incoming HttpRequest if there is an error + // since that also closes the outgoing HttpResponse. + // Since `pump` sends stream errors both up and down the pipe chain, + // in this case we need to make sure the error only goes down the chain. + if (isHttpRequest(readable)) { + readable.pipe(destination); + readable.on('error', (error): void => { + logger.warn(`HttpRequest errored with ${error.message}`); + // From https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options : + // One important caveat is that if the Readable stream emits an error during processing, + // the Writable destination is not closed automatically. If an error occurs, + // it will be necessary to manually close each stream in order to prevent memory leaks. + destination.destroy(mapError ? mapError(error) : error); + }); + } else { + // In case the input readable is guarded, it will no longer log errors since `pump` attaches a new error listener + pump(readable, destination, (error): void => { + if (error) { + logger.warn(`Piped stream errored with ${error.message}`); + // Make sure the final error can be handled in a normal streaming fashion + destination.emit('error', mapError ? mapError(error) : error); + } + }); + } // Guarding the stream now means the internal error listeners of pump will be ignored // when checking if there is a valid error listener. return guardStream(destination); diff --git a/test/integration/ServerFetch.test.ts b/test/integration/ServerFetch.test.ts index e492468e6..3daa07b0a 100644 --- a/test/integration/ServerFetch.test.ts +++ b/test/integration/ServerFetch.test.ts @@ -68,6 +68,18 @@ describe('A Solid server', (): void => { expect(res.status).toBe(205); }); + it('can handle PUT errors.', async(): Promise => { + // There was a specific case where the following request caused the connection to close instead of error + const res = await fetch(baseUrl, { + method: 'PUT', + headers: { + 'content-type': 'text/plain', + }, + body: '"test"', + }); + expect(res.status).toBe(400); + }); + it('can POST to create a container.', async(): Promise => { const res = await fetch(baseUrl, { method: 'POST', diff --git a/test/unit/server/HttpRequest.test.ts b/test/unit/server/HttpRequest.test.ts new file mode 100644 index 000000000..461bcce85 --- /dev/null +++ b/test/unit/server/HttpRequest.test.ts @@ -0,0 +1,10 @@ +import { isHttpRequest } from '../../../src/server/HttpRequest'; + +describe('HttpRequest', (): void => { + describe('#isHttpRequest', (): void => { + it('can identify HttpRequests.', async(): Promise => { + expect(isHttpRequest({})).toBe(false); + expect(isHttpRequest({ socket: {}, method: 'GET', url: '/url' })).toBe(true); + }); + }); +}); diff --git a/test/unit/util/StreamUtil.test.ts b/test/unit/util/StreamUtil.test.ts index 70bfeb448..be3e0b1d6 100644 --- a/test/unit/util/StreamUtil.test.ts +++ b/test/unit/util/StreamUtil.test.ts @@ -1,8 +1,13 @@ import { PassThrough } from 'stream'; import arrayifyStream from 'arrayify-stream'; import streamifyArray from 'streamify-array'; +import { isHttpRequest } from '../../../src/server/HttpRequest'; import { guardedStreamFrom, pipeSafely, transformSafely, readableToString } from '../../../src/util/StreamUtil'; +jest.mock('../../../src/server/HttpRequest', (): any => ({ + isHttpRequest: jest.fn(), +})); + describe('StreamUtil', (): void => { describe('#readableToString', (): void => { it('concatenates all elements of a Readable.', async(): Promise => { @@ -12,6 +17,10 @@ describe('StreamUtil', (): void => { }); describe('#pipeSafely', (): void => { + beforeEach(async(): Promise => { + (isHttpRequest as unknown as jest.Mock).mockClear(); + }); + it('pipes data from one stream to the other.', async(): Promise => { const input = streamifyArray([ 'data' ]); const output = new PassThrough(); @@ -56,6 +65,47 @@ describe('StreamUtil', (): void => { await new Promise(setImmediate); expect(input.destroyed).toBe(true); }); + + it('does not destroy the source stream if it is an HttpRequest.', async(): Promise => { + (isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true); + const input = new PassThrough(); + const output = new PassThrough(); + const piped = pipeSafely(input, output); + + // Catch errors to prevent problems in test output + output.on('error', (): void => { + // Empty + }); + + piped.destroy(new Error('error!')); + // Allow events to propagate + await new Promise(setImmediate); + expect(input.destroyed).toBe(false); + }); + + it('still sends errors downstream if the input is an HttpRequest.', async(): Promise => { + (isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true); + const input = new PassThrough(); + input.read = (): any => { + input.emit('error', new Error('error')); + return null; + }; + const output = new PassThrough(); + const piped = pipeSafely(input, output); + await expect(readableToString(piped)).rejects.toThrow('error'); + }); + + it('can map errors if the input is an HttpRequest.', async(): Promise => { + (isHttpRequest as unknown as jest.Mock).mockReturnValueOnce(true); + const input = streamifyArray([ 'data' ]); + input.read = (): any => { + input.emit('error', new Error('error')); + return null; + }; + const output = new PassThrough(); + const piped = pipeSafely(input, output, (): any => new Error('other error')); + await expect(readableToString(piped)).rejects.toThrow('other error'); + }); }); describe('#transformSafely', (): void => { @@ -158,8 +208,8 @@ describe('StreamUtil', (): void => { describe('#guardedStreamFrom', (): void => { it('converts data to a guarded stream.', async(): Promise => { - const data = [ 'a', 'b' ]; - await expect(readableToString(guardedStreamFrom(data))).resolves.toBe('ab'); + await expect(readableToString(guardedStreamFrom([ 'a', 'b' ]))).resolves.toBe('ab'); + await expect(readableToString(guardedStreamFrom('ab'))).resolves.toBe('ab'); }); }); });