feat: Add transformSafely.

This commit is contained in:
Ruben Verborgh
2021-01-08 23:23:07 +01:00
committed by Joachim Van Herwegen
parent 61aa2e12bd
commit 995a2dc74d
2 changed files with 155 additions and 3 deletions

View File

@@ -1,5 +1,5 @@
import type { Writable, ReadableOptions } from 'stream';
import { Readable } from 'stream';
import type { Writable, ReadableOptions, DuplexOptions } from 'stream';
import { Readable, Transform } from 'stream';
import arrayifyStream from 'arrayify-stream';
import { getLoggerFor } from '../logging/LogUtil';
import type { Guarded } from './GuardedStream';
@@ -43,6 +43,59 @@ export function pipeSafely<T extends Writable>(readable: NodeJS.ReadableStream,
return guardStream(destination);
}
export interface AsyncTransformOptions<T = any> extends DuplexOptions {
/**
* Transforms data from the source by calling the `push` method
*/
transform?: (this: Transform, data: T, encoding: string) => any | Promise<any>;
/**
* Performs any final actions after the source has ended
*/
flush?: (this: Transform) => any | Promise<any>;
}
/**
* Transforms a stream, ensuring that all errors are forwarded.
* @param source - The stream to be transformed
* @param options - The transformation options
*
* @returns The transformed stream
*/
export function transformSafely<T = any>(
source: NodeJS.ReadableStream,
{
transform = function(data): void {
this.push(data);
},
flush = (): null => null,
...options
}: AsyncTransformOptions<T> = {},
):
Guarded<Transform> {
return pipeSafely(source, new Transform({
...options,
async transform(data, encoding, callback): Promise<void> {
let error: Error | null = null;
try {
await transform.call(this, data, encoding);
} catch (err: unknown) {
error = err as Error;
}
callback(error);
},
async flush(callback): Promise<void> {
let error: Error | null = null;
try {
await flush.call(this);
} catch (err: unknown) {
error = err as Error;
}
callback(error);
},
}));
}
/**
* Converts an iterable to a stream and applies an error guard so that it is {@link Guarded}.
* @param iterable - Data to stream.