feat: Track binary size of resources when possible

This commit is contained in:
Joachim Van Herwegen
2023-10-02 13:40:19 +02:00
parent 3e9adef4cf
commit 71e55690f3
15 changed files with 194 additions and 35 deletions

View File

@@ -1,6 +1,8 @@
import { getLoggerFor } from '../../../logging/LogUtil';
import type { HttpResponse } from '../../../server/HttpResponse';
import { addHeader } from '../../../util/HeaderUtil';
import { SOLID_HTTP } from '../../../util/Vocabularies';
import { termToInt } from '../../../util/QuadUtil';
import { POSIX, SOLID_HTTP } from '../../../util/Vocabularies';
import type { RepresentationMetadata } from '../../representation/RepresentationMetadata';
import { MetadataWriter } from './MetadataWriter';
@@ -10,16 +12,39 @@ import { MetadataWriter } from './MetadataWriter';
* According to the RFC, this is incorrect,
* but is all we can do as long as we don't know the full length of the representation in advance.
* For the same reason, the total length of the representation will always be `*`.
*
* This class also adds the content-length header.
* This will contain either the full size for standard requests,
* or the size of the slice for range requests.
*/
export class RangeMetadataWriter extends MetadataWriter {
  protected readonly logger = getLoggerFor(this);

  /**
   * Writes the `Content-Length` header and, for range responses, the `Content-Range` header.
   *
   * @param input - The response to add the headers to, and the metadata describing the representation.
   *                The `posix:size` value is read as the full binary size,
   *                the `SOLID_HTTP` unit/start/end values describe the requested range.
   */
  public async handle(input: { response: HttpResponse; metadata: RepresentationMetadata }): Promise<void> {
    const size = termToInt(input.metadata.get(POSIX.terms.size));
    const unit = input.metadata.get(SOLID_HTTP.terms.unit)?.value;

    // No unit means this is not a range response: only the full size, if known, can be reported.
    if (!unit) {
      if (typeof size === 'number') {
        addHeader(input.response, 'Content-Length', `${size}`);
      }
      return;
    }

    // A negative start counts from the end, which can only be resolved when the full size is known.
    let start = termToInt(input.metadata.get(SOLID_HTTP.terms.start));
    if (typeof start === 'number' && start < 0 && typeof size === 'number') {
      start = size + start;
    }

    // An open-ended range runs until the last position, which also requires the full size.
    let end = termToInt(input.metadata.get(SOLID_HTTP.terms.end));
    if (typeof end !== 'number' && typeof size === 'number') {
      end = size - 1;
    }

    // Unknown values are written as `*`.
    const rangeHeader = `${unit} ${start ?? '*'}-${end ?? '*'}/${size ?? '*'}`;
    addHeader(input.response, 'Content-Range', rangeHeader);

    if (typeof start === 'number' && typeof end === 'number') {
      // Both bounds are inclusive, hence the +1.
      addHeader(input.response, 'Content-Length', `${end - start + 1}`);
    } else {
      this.logger.warn(`Generating invalid content-range header due to missing size information: ${rangeHeader}`);
    }
  }
}

View File

@@ -5,9 +5,10 @@ import { getLoggerFor } from '../logging/LogUtil';
import { InternalServerError } from '../util/errors/InternalServerError';
import { RangeNotSatisfiedHttpError } from '../util/errors/RangeNotSatisfiedHttpError';
import { guardStream } from '../util/GuardedStream';
import { termToInt } from '../util/QuadUtil';
import { SliceStream } from '../util/SliceStream';
import { toLiteral } from '../util/TermUtil';
import { SOLID_HTTP, XSD } from '../util/Vocabularies';
import { POSIX, SOLID_HTTP, XSD } from '../util/Vocabularies';
import type { Conditions } from './Conditions';
import { PassthroughStore } from './PassthroughStore';
import type { ResourceStore } from './ResourceStore';
@@ -51,10 +52,11 @@ export class BinarySliceResourceStore<T extends ResourceStore = ResourceStore> e
}
try {
const size = termToInt(result.metadata.get(POSIX.terms.size));
// The reason we don't determine the object mode based on the object mode of the parent stream
// is that `guardedStreamFrom` does not create object streams when inputting streams/buffers.
// Something to potentially update in the future.
result.data = guardStream(new SliceStream(result.data, { start, end, objectMode: false }));
result.data = guardStream(new SliceStream(result.data, { start, end, size, objectMode: false }));
} catch (error: unknown) {
// Creating the slice stream can throw an error if some of the parameters are unacceptable.
// Need to make sure the stream is closed in that case.

View File

@@ -32,6 +32,9 @@ export interface DataAccessor {
/**
* Returns the metadata corresponding to the identifier.
* If possible, it is suggested to add a `posix:size` triple to the metadata indicating the binary size.
* This is necessary for range requests.
*
* @param identifier - Identifier for which the metadata is requested.
*/
getMetadata: (identifier: ResourceIdentifier) => Promise<RepresentationMetadata>;

View File

@@ -8,6 +8,8 @@ import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
import type { Guarded } from '../../util/GuardedStream';
import type { IdentifierStrategy } from '../../util/identifiers/IdentifierStrategy';
import { guardedStreamFrom } from '../../util/StreamUtil';
import { POSIX } from '../../util/Vocabularies';
import { isInternalContentType } from '../conversion/ConversionUtil';
import type { DataAccessor } from './DataAccessor';
interface DataEntry {
@@ -59,9 +61,17 @@ export class InMemoryDataAccessor implements DataAccessor, SingleThreaded {
/**
 * Stores a copy of the given data stream, together with its metadata, under the given identifier.
 * For binary data the total length of all chunks is added to the metadata as `posix:size`.
 *
 * @param identifier - Identifier of the document; its path is used as key in the parent entry.
 * @param data - Stream that will be fully drained; its chunks are what gets stored.
 * @param metadata - Metadata to store. This object is updated in place when the size is added.
 */
public async writeDocument(identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata):
Promise<void> {
  const parent = this.getParentEntry(identifier);

  // Drain original stream and create copy.
  // This can only happen once: the stream is consumed afterwards.
  const dataArray = await arrayifyStream(data);

  // Only add the size for binary streams, which are all streams that do not have an internal type.
  if (metadata.contentType && !isInternalContentType(metadata.contentType)) {
    const size = dataArray.reduce<number>((total, chunk: Buffer): number => total + chunk.length, 0);
    metadata.set(POSIX.terms.size, `${size}`);
  }

  parent.entries[identifier.path] = {
    data: dataArray,
    metadata,
  };
}

View File

@@ -5,6 +5,7 @@ import type { ValuePreferences } from '../../http/representation/RepresentationP
import { getLoggerFor } from '../../logging/LogUtil';
import { BadRequestHttpError } from '../../util/errors/BadRequestHttpError';
import { NotImplementedHttpError } from '../../util/errors/NotImplementedHttpError';
import { POSIX } from '../../util/Vocabularies';
import { cleanPreferences, getBestPreference, getTypeWeight, preferencesToString } from './ConversionUtil';
import type { RepresentationConverterArgs } from './RepresentationConverter';
import { RepresentationConverter } from './RepresentationConverter';
@@ -100,6 +101,13 @@ export class ChainedConverter extends RepresentationConverter {
args.preferences = { type: { [outTypes[i]]: 1 }};
args.representation = await match.converters[i].handle(args);
}
// For now, we assume any kind of conversion invalidates the stored byte length.
// In the future, we could let converters handle this individually, as some might know the size of the result.
if (match.converters.length > 0) {
args.representation.metadata.removeAll(POSIX.terms.size);
}
return args.representation;
}

View File

@@ -3,7 +3,7 @@ import type { NamedNode } from '@rdfjs/types';
import arrayifyStream from 'arrayify-stream';
import type { ParserOptions } from 'n3';
import { StreamParser, StreamWriter } from 'n3';
import type { Quad } from 'rdf-js';
import type { Quad, Term } from 'rdf-js';
import type { Guarded } from './GuardedStream';
import { guardedStreamFrom, pipeSafely } from './StreamUtil';
import { toNamedTerm } from './TermUtil';
@@ -45,6 +45,18 @@ export function uniqueQuads(quads: Quad[]): Quad[] {
}, []);
}
/**
 * Converts the value of a term to an integer.
 * Returns undefined if the term was undefined,
 * or if its value could not be parsed as an integer with the given radix.
 *
 * @param term - Term to parse.
 * @param radix - Radix to use when parsing. Default is 10.
 *
 * @returns The parsed integer, or undefined if there is no valid integer to return.
 */
export function termToInt(term?: Term, radix = 10): number | undefined {
  if (!term) {
    return undefined;
  }
  const parsed = Number.parseInt(term.value, radix);
  // `typeof NaN === 'number'`, so a NaN result would slip through the `typeof` checks callers use
  // to detect a usable value. Report it as "no value" instead.
  return Number.isNaN(parsed) ? undefined : parsed;
}
/**
* Represents a triple pattern to be used as a filter.
*/

View File

@@ -3,12 +3,19 @@ import { Transform } from 'stream';
import { RangeNotSatisfiedHttpError } from './errors/RangeNotSatisfiedHttpError';
import { pipeSafely } from './StreamUtil';
/**
 * Options accepted by the `SliceStream` constructor,
 * in addition to the `TransformOptions` this interface extends.
 */
export interface SliceStreamOptions extends TransformOptions {
/**
 * Position of the first entry to keep (inclusive).
 * A negative value is resolved as `size + start`, which requires `size` to be set.
 */
start: number;
/**
 * Position of the last entry to keep (inclusive).
 * If not defined, the slice runs until the end of the stream.
 */
end?: number;
/**
 * Total size of the source stream, if known.
 * Needed to resolve a negative `start`.
 * When both `end` and `size` are numbers, `end` has to be smaller than `size`.
 */
size?: number;
}
/**
* A stream that slices a part out of another stream.
* `start` and `end` are inclusive.
* If `end` is not defined it is until the end of the stream.
* Does not support negative `start` values which would indicate slicing the end of the stream off,
* since we don't know the length of the input stream.
*
* Negative `start` values can instead be used to select that many entries at the end of the stream:
* the effective start position becomes `size + start`.
* This requires the `size` field to be defined.
*
* Both object and non-object streams are supported.
* This needs to be explicitly specified,
@@ -19,16 +26,29 @@ export class SliceStream extends Transform {
protected remainingSkip: number;
protected remainingRead: number;
public constructor(source: Readable, options: TransformOptions & { start: number; end?: number }) {
public constructor(source: Readable, options: SliceStreamOptions) {
super(options);
let start = options.start;
const end = options.end ?? Number.POSITIVE_INFINITY;
if (options.start < 0) {
throw new RangeNotSatisfiedHttpError('Slicing data at the end of a stream is not supported.');
if (typeof options.size !== 'number') {
throw new RangeNotSatisfiedHttpError('Slicing data at the end of a stream requires a known size.');
} else {
// `start` is a negative number here so need to add
start = options.size + start;
}
}
if (options.start >= end) {
if (start >= end) {
throw new RangeNotSatisfiedHttpError('Range start should be less than end.');
}
this.remainingSkip = options.start;
// Not using `end` variable as that could be infinity
if (typeof options.end === 'number' && typeof options.size === 'number' && options.end >= options.size) {
throw new RangeNotSatisfiedHttpError('Range end should be less than the total size.');
}
this.remainingSkip = start;
// End value is inclusive
this.remainingRead = end - options.start + 1;