import { PassThrough, Readable } from 'stream';
import arrayifyStream from 'arrayify-stream';
import type { Logger } from '../../../src/logging/Logger';
import { getLoggerFor } from '../../../src/logging/LogUtil';
import { isHttpRequest } from '../../../src/server/HttpRequest';
import { guardedStreamFrom, pipeSafely, transformSafely, readableToString } from '../../../src/util/StreamUtil';

// Replace LogUtil so that every `getLoggerFor` call hands back the same mocked logger.
// This lets the tests below inspect the calls made to `logger.log`.
jest.mock('../../../src/logging/LogUtil', (): any => {
  const logger: Logger = { warn: jest.fn(), log: jest.fn() } as any;
  return { getLoggerFor: (): Logger => logger };
});
const logger: jest.Mocked<Logger> = getLoggerFor('StreamUtil') as any;

// Replace `isHttpRequest` with a mock so individual tests can decide what it returns.
jest.mock('../../../src/server/HttpRequest', (): any => ({
  isHttpRequest: jest.fn(),
}));
const isHttpRequestMock = isHttpRequest as unknown as jest.Mock;

/**
 * Overrides `read` on the given stream so that any read attempt
 * emits an error with the given message instead of producing data.
 *
 * @param stream - Stream whose `read` function gets replaced.
 * @param message - Message of the error that will be emitted.
 */
function failOnRead(stream: Readable, message: string): void {
  stream.read = (): null => {
    stream.emit('error', new Error(message));
    return null;
  };
}

/**
 * Tests for the stream helpers exported by StreamUtil:
 * `readableToString`, `pipeSafely`, `transformSafely` and `guardedStreamFrom`.
 */
describe('StreamUtil', (): void => {
  describe('#readableToString', (): void => {
    it('concatenates all elements of a Readable.', async(): Promise<void> => {
      const stream = Readable.from([ 'a', 'b', 'c' ]);
      await expect(readableToString(stream)).resolves.toEqual('abc');
    });
  });

  describe('#pipeSafely', (): void => {
    beforeEach(async(): Promise<void> => {
      // Reset call counts so the `toHaveBeenCalledTimes` checks only see calls of the current test
      jest.clearAllMocks();
    });

    it('pipes data from one stream to the other.', async(): Promise<void> => {
      const input = Readable.from([ 'data' ]);
      const output = new PassThrough();
      const piped = pipeSafely(input, output);
      await expect(readableToString(piped)).resolves.toEqual('data');
    });

    it('pipes errors from one stream to the other.', async(): Promise<void> => {
      const input = new PassThrough();
      failOnRead(input, 'error');
      const output = new PassThrough();
      const piped = pipeSafely(input, output);
      await expect(readableToString(piped)).rejects.toThrow('error');
      expect(logger.log).toHaveBeenCalledTimes(1);
      expect(logger.log).toHaveBeenLastCalledWith('warn', 'Piped stream errored with error');
    });

    it('supports mapping errors to something else.', async(): Promise<void> => {
      const input = Readable.from([ 'data' ]);
      failOnRead(input, 'error');
      const output = new PassThrough();
      const piped = pipeSafely(input, output, (): any => new Error('other error'));
      await expect(readableToString(piped)).rejects.toThrow('other error');
    });

    it('logs specific safer errors as debug.', async(): Promise<void> => {
      const input = Readable.from([ 'data' ]);
      failOnRead(input, 'Cannot call write after a stream was destroyed');
      const output = new PassThrough();
      const piped = pipeSafely(input, output);
      await expect(readableToString(piped)).rejects.toThrow('Cannot call write after a stream was destroyed');
      expect(logger.log).toHaveBeenCalledTimes(1);
      expect(logger.log).toHaveBeenLastCalledWith(
        'debug',
        'Piped stream errored with Cannot call write after a stream was destroyed',
      );
    });

    it('destroys the source stream in case the destinations becomes unpiped.', async(): Promise<void> => {
      const input = new PassThrough();
      const output = new PassThrough();
      const piped = pipeSafely(input, output);

      // Catch errors to prevent problems in test output
      output.on('error', (): void => {
        // Empty
      });

      piped.destroy(new Error('this causes an unpipe!'));
      // Allow events to propagate
      await new Promise(setImmediate);
      expect(input.destroyed).toBe(true);
    });

    it('does not destroy the source stream if it is an HttpRequest.', async(): Promise<void> => {
      isHttpRequestMock.mockReturnValueOnce(true);
      const input = new PassThrough();
      const output = new PassThrough();
      const piped = pipeSafely(input, output);

      // Catch errors to prevent problems in test output
      output.on('error', (): void => {
        // Empty
      });

      piped.destroy(new Error('error!'));
      // Allow events to propagate
      await new Promise(setImmediate);
      expect(input.destroyed).toBe(false);
    });

    it('still sends errors downstream if the input is an HttpRequest.', async(): Promise<void> => {
      isHttpRequestMock.mockReturnValueOnce(true);
      const input = new PassThrough();
      failOnRead(input, 'error');
      const output = new PassThrough();
      const piped = pipeSafely(input, output);
      await expect(readableToString(piped)).rejects.toThrow('error');
    });

    it('can map errors if the input is an HttpRequest.', async(): Promise<void> => {
      isHttpRequestMock.mockReturnValueOnce(true);
      const input = Readable.from([ 'data' ]);
      failOnRead(input, 'error');
      const output = new PassThrough();
      const piped = pipeSafely(input, output, (): any => new Error('other error'));
      await expect(readableToString(piped)).rejects.toThrow('other error');
    });
  });

  describe('#transformSafely', (): void => {
    it('can transform a stream without arguments.', async(): Promise<void> => {
      const source = Readable.from([ 'data' ]);
      const transformed = transformSafely(source);
      transformed.setEncoding('utf8');
      const result = await arrayifyStream(transformed);
      expect(result).toEqual([ 'data' ]);
    });

    it('can transform a stream synchronously.', async(): Promise<void> => {
      const source = Readable.from([ 'data' ]);
      const transformed = transformSafely(source, {
        encoding: 'utf8',
        transform(data: string): void {
          this.push(`${data}1`);
          this.push(`${data}2`);
        },
        flush(): void {
          this.push(`data3`);
        },
      });
      const result = await arrayifyStream(transformed);
      expect(result).toEqual([ 'data1', 'data2', 'data3' ]);
    });

    it('can transform a stream asynchronously.', async(): Promise<void> => {
      const source = Readable.from([ 'data' ]);
      const transformed = transformSafely(source, {
        encoding: 'utf8',
        async transform(data: string): Promise<void> {
          await new Promise((resolve): any => setImmediate(resolve));
          this.push(`${data}1`);
          this.push(`${data}2`);
        },
        async flush(): Promise<void> {
          await new Promise((resolve): any => setImmediate(resolve));
          this.push(`data3`);
        },
      });
      const result = await arrayifyStream(transformed);
      expect(result).toEqual([ 'data1', 'data2', 'data3' ]);
    });

    it('catches source errors.', async(): Promise<void> => {
      const error = new Error('stream error');
      const source = new PassThrough();
      const transformed = transformSafely(source);
      source.emit('error', error);
      await expect(arrayifyStream(transformed)).rejects.toThrow(error);
    });

    it('catches synchronous errors on transform.', async(): Promise<void> => {
      const error = new Error('stream error');
      const source = Readable.from([ 'data' ]);
      const transformed = transformSafely(source, {
        transform(): never {
          throw error;
        },
      });
      await expect(arrayifyStream(transformed)).rejects.toThrow(error);
    });

    it('catches synchronous errors on flush.', async(): Promise<void> => {
      const error = new Error('stream error');
      const source = Readable.from([ 'data' ]);
      const transformed = transformSafely(source, {
        // Throws before returning, so this exercises the synchronous failure path
        flush(): never {
          throw error;
        },
      });
      await expect(arrayifyStream(transformed)).rejects.toThrow(error);
    });

    it('catches asynchronous errors on transform.', async(): Promise<void> => {
      const error = new Error('stream error');
      const source = Readable.from([ 'data' ]);
      const transformed = transformSafely(source, {
        // Rejects only after yielding to the event loop, so this exercises the asynchronous failure path
        async transform(): Promise<never> {
          await new Promise((resolve): any => setImmediate(resolve));
          throw error;
        },
      });
      await expect(arrayifyStream(transformed)).rejects.toThrow(error);
    });

    it('catches asynchronous errors on flush.', async(): Promise<void> => {
      const error = new Error('stream error');
      const source = Readable.from([ 'data' ]);
      const transformed = transformSafely(source, {
        async flush(): Promise<never> {
          await new Promise((resolve): any => setImmediate(resolve));
          throw error;
        },
      });
      await expect(arrayifyStream(transformed)).rejects.toThrow(error);
    });
  });

  describe('#guardedStreamFrom', (): void => {
    it('converts data to a guarded stream.', async(): Promise<void> => {
      await expect(readableToString(guardedStreamFrom([ 'a', 'b' ]))).resolves.toBe('ab');
      await expect(readableToString(guardedStreamFrom('ab'))).resolves.toBe('ab');
    });
  });
});