mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
fix: Integrate wrapStreamError to prevent uncaught errors
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
import streamifyArray from 'streamify-array';
|
||||
import type { AclManager } from '../authorization/AclManager';
|
||||
import { RepresentationMetadata } from '../ldp/representation/RepresentationMetadata';
|
||||
import type { LoggerFactory } from '../logging/LoggerFactory';
|
||||
@@ -6,6 +5,7 @@ import { getLoggerFor, setGlobalLoggerFactory } from '../logging/LogUtil';
|
||||
import type { ExpressHttpServerFactory } from '../server/ExpressHttpServerFactory';
|
||||
import type { ResourceStore } from '../storage/ResourceStore';
|
||||
import { TEXT_TURTLE } from '../util/ContentTypes';
|
||||
import { guardedStreamFrom } from '../util/StreamUtil';
|
||||
import { CONTENT_TYPE } from '../util/UriConstants';
|
||||
|
||||
/**
|
||||
@@ -65,7 +65,7 @@ export class Setup {
|
||||
baseAclId,
|
||||
{
|
||||
binary: true,
|
||||
data: streamifyArray([ acl ]),
|
||||
data: guardedStreamFrom([ acl ]),
|
||||
metadata,
|
||||
},
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { Readable } from 'stream';
|
||||
import type { Guarded } from '../../../util/GuardedStream';
|
||||
import type { RepresentationMetadata } from '../../representation/RepresentationMetadata';
|
||||
import { ResponseDescription } from './ResponseDescription';
|
||||
|
||||
@@ -10,7 +11,7 @@ export class OkResponseDescription extends ResponseDescription {
|
||||
* @param metadata - Metadata concerning the response.
|
||||
* @param data - Potential data. @ignored
|
||||
*/
|
||||
public constructor(metadata: RepresentationMetadata, data?: Readable) {
|
||||
public constructor(metadata: RepresentationMetadata, data?: Guarded<Readable>) {
|
||||
super(200, metadata, data);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { Readable } from 'stream';
|
||||
import type { Guarded } from '../../../util/GuardedStream';
|
||||
import type { RepresentationMetadata } from '../../representation/RepresentationMetadata';
|
||||
|
||||
/**
|
||||
@@ -7,14 +8,14 @@ import type { RepresentationMetadata } from '../../representation/Representation
|
||||
export class ResponseDescription {
|
||||
public readonly statusCode: number;
|
||||
public readonly metadata?: RepresentationMetadata;
|
||||
public readonly data?: Readable;
|
||||
public readonly data?: Guarded<Readable>;
|
||||
|
||||
/**
|
||||
* @param statusCode - Status code to return.
|
||||
* @param metadata - Metadata corresponding to the response (and data potentially).
|
||||
* @param data - Data that needs to be returned. @ignored
|
||||
*/
|
||||
public constructor(statusCode: number, metadata?: RepresentationMetadata, data?: Readable) {
|
||||
public constructor(statusCode: number, metadata?: RepresentationMetadata, data?: Guarded<Readable>) {
|
||||
this.statusCode = statusCode;
|
||||
this.metadata = metadata;
|
||||
this.data = data;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { Readable } from 'stream';
|
||||
import type { Guarded } from '../../util/GuardedStream';
|
||||
import type { RepresentationMetadata } from './RepresentationMetadata';
|
||||
|
||||
/**
|
||||
@@ -12,7 +13,7 @@ export interface Representation {
|
||||
/**
|
||||
* The raw data stream for this representation.
|
||||
*/
|
||||
data: Readable;
|
||||
data: Guarded<Readable>;
|
||||
/**
|
||||
* Whether the data stream consists of binary/string chunks
|
||||
* (as opposed to complex objects).
|
||||
|
||||
@@ -3,6 +3,7 @@ import cors from 'cors';
|
||||
import type { Express } from 'express';
|
||||
import express from 'express';
|
||||
import { getLoggerFor } from '../logging/LogUtil';
|
||||
import { guardStream } from '../util/GuardedStream';
|
||||
import type { HttpHandler } from './HttpHandler';
|
||||
import type { HttpServerFactory } from './HttpServerFactory';
|
||||
|
||||
@@ -40,7 +41,7 @@ export class ExpressHttpServerFactory implements HttpServerFactory {
|
||||
app.use(async(request, response, done): Promise<void> => {
|
||||
try {
|
||||
this.logger.info(`Received request for ${request.url}`);
|
||||
await this.handler.handleSafe({ request, response });
|
||||
await this.handler.handleSafe({ request: guardStream(request), response });
|
||||
} catch (error: unknown) {
|
||||
const errMsg = error instanceof Error ? `${error.name}: ${error.message}\n${error.stack}` : 'Unknown error.';
|
||||
this.logger.error(errMsg);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { IncomingMessage } from 'http';
|
||||
import type { Guarded } from '../util/GuardedStream';
|
||||
|
||||
/**
|
||||
* An incoming HTTP request;
|
||||
*/
|
||||
export type HttpRequest = IncomingMessage;
|
||||
export type HttpRequest = Guarded<IncomingMessage>;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import type { Readable } from 'stream';
|
||||
import { DataFactory } from 'n3';
|
||||
import type { Quad } from 'rdf-js';
|
||||
import streamifyArray from 'streamify-array';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import type { Representation } from '../ldp/representation/Representation';
|
||||
import { RepresentationMetadata } from '../ldp/representation/RepresentationMetadata';
|
||||
@@ -12,6 +11,7 @@ import { MethodNotAllowedHttpError } from '../util/errors/MethodNotAllowedHttpEr
|
||||
import { NotFoundHttpError } from '../util/errors/NotFoundHttpError';
|
||||
import { NotImplementedError } from '../util/errors/NotImplementedError';
|
||||
import { UnsupportedHttpError } from '../util/errors/UnsupportedHttpError';
|
||||
import type { Guarded } from '../util/GuardedStream';
|
||||
import {
|
||||
ensureTrailingSlash,
|
||||
getParentContainer,
|
||||
@@ -21,6 +21,7 @@ import {
|
||||
} from '../util/PathUtil';
|
||||
import { parseQuads } from '../util/QuadUtil';
|
||||
import { generateResourceQuads } from '../util/ResourceUtil';
|
||||
import { guardedStreamFrom } from '../util/StreamUtil';
|
||||
import { CONTENT_TYPE, HTTP, LDP, RDF } from '../util/UriConstants';
|
||||
import type { DataAccessor } from './accessors/DataAccessor';
|
||||
import type { ResourceStore } from './ResourceStore';
|
||||
@@ -70,9 +71,9 @@ export class DataAccessorBasedStore implements ResourceStore {
|
||||
metadata.contentType = INTERNAL_QUADS;
|
||||
result = {
|
||||
binary: false,
|
||||
get data(): Readable {
|
||||
get data(): Guarded<Readable> {
|
||||
// This allows other modules to still add metadata before the output data is written
|
||||
return streamifyArray(result.metadata.quads());
|
||||
return guardedStreamFrom(result.metadata.quads());
|
||||
},
|
||||
metadata,
|
||||
};
|
||||
@@ -365,7 +366,7 @@ export class DataAccessorBasedStore implements ResourceStore {
|
||||
protected getEmptyContainerRepresentation(container: ResourceIdentifier): Representation {
|
||||
return {
|
||||
binary: true,
|
||||
data: streamifyArray([]),
|
||||
data: guardedStreamFrom([]),
|
||||
metadata: new RepresentationMetadata(container.path),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import type { Representation } from '../ldp/representation/Representation';
|
||||
import type { RepresentationPreferences } from '../ldp/representation/RepresentationPreferences';
|
||||
import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier';
|
||||
import { getLoggerFor } from '../logging/LogUtil';
|
||||
import type { Guarded } from '../util/GuardedStream';
|
||||
import type { AtomicResourceStore } from './AtomicResourceStore';
|
||||
import type { Conditions } from './Conditions';
|
||||
import type { ExpiringLock } from './ExpiringLock';
|
||||
@@ -118,7 +119,7 @@ export class LockingResourceStore implements AtomicResourceStore {
|
||||
* @param source - The readable to wrap
|
||||
* @param lock - The lock for the corresponding identifier.
|
||||
*/
|
||||
protected createExpiringReadable(source: Readable, lock: ExpiringLock): Readable {
|
||||
protected createExpiringReadable(source: Guarded<Readable>, lock: ExpiringLock): Readable {
|
||||
// Destroy the source when a timeout occurs.
|
||||
lock.on('expired', (): void => {
|
||||
source.destroy(new Error(`Stream reading timout exceeded`));
|
||||
|
||||
@@ -2,6 +2,7 @@ import type { Readable } from 'stream';
|
||||
import type { Representation } from '../../ldp/representation/Representation';
|
||||
import type { RepresentationMetadata } from '../../ldp/representation/RepresentationMetadata';
|
||||
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
|
||||
import type { Guarded } from '../../util/GuardedStream';
|
||||
|
||||
/**
|
||||
* A DataAccessor is the building block closest to the actual data storage.
|
||||
@@ -27,7 +28,7 @@ export interface DataAccessor {
|
||||
* It can be assumed that the incoming identifier will always correspond to a document.
|
||||
* @param identifier - Identifier for which the data is requested.
|
||||
*/
|
||||
getData: (identifier: ResourceIdentifier) => Promise<Readable>;
|
||||
getData: (identifier: ResourceIdentifier) => Promise<Guarded<Readable>>;
|
||||
|
||||
/**
|
||||
* Returns the metadata corresponding to the identifier.
|
||||
@@ -42,7 +43,8 @@ export interface DataAccessor {
|
||||
* @param data - Data to store.
|
||||
* @param metadata - Metadata to store.
|
||||
*/
|
||||
writeDocument: (identifier: ResourceIdentifier, data: Readable, metadata: RepresentationMetadata) => Promise<void>;
|
||||
writeDocument: (identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata) =>
|
||||
Promise<void>;
|
||||
|
||||
/**
|
||||
* Writes metadata for a container.
|
||||
|
||||
@@ -12,6 +12,8 @@ import { ConflictHttpError } from '../../util/errors/ConflictHttpError';
|
||||
import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
|
||||
import { isSystemError } from '../../util/errors/SystemError';
|
||||
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
|
||||
import { guardStream } from '../../util/GuardedStream';
|
||||
import type { Guarded } from '../../util/GuardedStream';
|
||||
import { isContainerIdentifier } from '../../util/PathUtil';
|
||||
import { parseQuads, pushQuad, serializeQuads } from '../../util/QuadUtil';
|
||||
import { generateContainmentQuads, generateResourceQuads } from '../../util/ResourceUtil';
|
||||
@@ -45,12 +47,12 @@ export class FileDataAccessor implements DataAccessor {
|
||||
* Will return data stream directly to the file corresponding to the resource.
|
||||
* Will throw NotFoundHttpError if the input is a container.
|
||||
*/
|
||||
public async getData(identifier: ResourceIdentifier): Promise<Readable> {
|
||||
public async getData(identifier: ResourceIdentifier): Promise<Guarded<Readable>> {
|
||||
const link = await this.resourceMapper.mapUrlToFilePath(identifier);
|
||||
const stats = await this.getStats(link.filePath);
|
||||
|
||||
if (stats.isFile()) {
|
||||
return createReadStream(link.filePath);
|
||||
return guardStream(createReadStream(link.filePath));
|
||||
}
|
||||
|
||||
throw new NotFoundHttpError();
|
||||
@@ -76,7 +78,7 @@ export class FileDataAccessor implements DataAccessor {
|
||||
* Writes the given data as a file (and potential metadata as additional file).
|
||||
* The metadata file will be written first and will be deleted if something goes wrong writing the actual data.
|
||||
*/
|
||||
public async writeDocument(identifier: ResourceIdentifier, data: Readable, metadata: RepresentationMetadata):
|
||||
public async writeDocument(identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata):
|
||||
Promise<void> {
|
||||
if (this.isMetadataPath(identifier.path)) {
|
||||
throw new ConflictHttpError('Not allowed to create files with the metadata extension.');
|
||||
@@ -264,7 +266,7 @@ export class FileDataAccessor implements DataAccessor {
|
||||
// Check if the metadata file exists first
|
||||
await fsPromises.lstat(metadataPath);
|
||||
|
||||
const readMetadataStream = createReadStream(metadataPath);
|
||||
const readMetadataStream = guardStream(createReadStream(metadataPath));
|
||||
return await parseQuads(readMetadataStream);
|
||||
} catch (error: unknown) {
|
||||
// Metadata file doesn't exist so lets keep `rawMetaData` an empty array.
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
import { Readable } from 'stream';
|
||||
import type { Readable } from 'stream';
|
||||
import arrayifyStream from 'arrayify-stream';
|
||||
import { DataFactory } from 'n3';
|
||||
import type { NamedNode } from 'rdf-js';
|
||||
import { RepresentationMetadata } from '../../ldp/representation/RepresentationMetadata';
|
||||
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
|
||||
import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
|
||||
import type { Guarded } from '../../util/GuardedStream';
|
||||
import { ensureTrailingSlash, isContainerIdentifier } from '../../util/PathUtil';
|
||||
import { generateContainmentQuads, generateResourceQuads } from '../../util/ResourceUtil';
|
||||
import { guardedStreamFrom } from '../../util/StreamUtil';
|
||||
import type { DataAccessor } from './DataAccessor';
|
||||
|
||||
interface DataEntry {
|
||||
@@ -19,27 +21,6 @@ interface ContainerEntry {
|
||||
}
|
||||
type CacheEntry = DataEntry | ContainerEntry;
|
||||
|
||||
class ArrayReadable extends Readable {
|
||||
private readonly data: any[];
|
||||
private idx: number;
|
||||
|
||||
public constructor(data: any[]) {
|
||||
super({ objectMode: true });
|
||||
this.data = data;
|
||||
this.idx = 0;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
public _read(): void {
|
||||
if (this.idx < this.data.length) {
|
||||
this.push(this.data[this.idx]);
|
||||
this.idx += 1;
|
||||
} else {
|
||||
this.push(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class InMemoryDataAccessor implements DataAccessor {
|
||||
private readonly base: string;
|
||||
private readonly store: ContainerEntry;
|
||||
@@ -56,12 +37,12 @@ export class InMemoryDataAccessor implements DataAccessor {
|
||||
// All data is supported since streams never get read, only copied
|
||||
}
|
||||
|
||||
public async getData(identifier: ResourceIdentifier): Promise<Readable> {
|
||||
public async getData(identifier: ResourceIdentifier): Promise<Guarded<Readable>> {
|
||||
const entry = this.getEntry(identifier);
|
||||
if (!this.isDataEntry(entry)) {
|
||||
throw new NotFoundHttpError();
|
||||
}
|
||||
return new ArrayReadable(entry.data);
|
||||
return guardedStreamFrom(entry.data);
|
||||
}
|
||||
|
||||
public async getMetadata(identifier: ResourceIdentifier): Promise<RepresentationMetadata> {
|
||||
@@ -72,7 +53,7 @@ export class InMemoryDataAccessor implements DataAccessor {
|
||||
return this.generateMetadata(identifier, entry);
|
||||
}
|
||||
|
||||
public async writeDocument(identifier: ResourceIdentifier, data: Readable, metadata: RepresentationMetadata):
|
||||
public async writeDocument(identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata):
|
||||
Promise<void> {
|
||||
const { parent, name } = this.getParentEntry(identifier);
|
||||
parent.entries[name] = {
|
||||
|
||||
@@ -23,6 +23,8 @@ import { ConflictHttpError } from '../../util/errors/ConflictHttpError';
|
||||
import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
|
||||
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
|
||||
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
|
||||
import { guardStream } from '../../util/GuardedStream';
|
||||
import type { Guarded } from '../../util/GuardedStream';
|
||||
import { ensureTrailingSlash, getParentContainer, isContainerIdentifier } from '../../util/PathUtil';
|
||||
import { generateResourceQuads } from '../../util/ResourceUtil';
|
||||
import { CONTENT_TYPE, LDP } from '../../util/UriConstants';
|
||||
@@ -70,9 +72,9 @@ export class SparqlDataAccessor implements DataAccessor {
|
||||
* Returns all triples stored for the corresponding identifier.
|
||||
* Note that this will not throw a 404 if no results were found.
|
||||
*/
|
||||
public async getData(identifier: ResourceIdentifier): Promise<Readable> {
|
||||
public async getData(identifier: ResourceIdentifier): Promise<Guarded<Readable>> {
|
||||
const name = namedNode(identifier.path);
|
||||
return this.sendSparqlConstruct(this.sparqlConstruct(name));
|
||||
return await this.sendSparqlConstruct(this.sparqlConstruct(name));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -114,7 +116,7 @@ export class SparqlDataAccessor implements DataAccessor {
|
||||
/**
|
||||
* Reads the given data stream and stores it together with the metadata.
|
||||
*/
|
||||
public async writeDocument(identifier: ResourceIdentifier, data: Readable, metadata: RepresentationMetadata):
|
||||
public async writeDocument(identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata):
|
||||
Promise<void> {
|
||||
if (this.isMetadataIdentifier(identifier)) {
|
||||
throw new ConflictHttpError('Not allowed to create NamedNodes with the metadata extension.');
|
||||
@@ -292,11 +294,11 @@ export class SparqlDataAccessor implements DataAccessor {
|
||||
* Sends a SPARQL CONSTRUCT query to the endpoint and returns a stream of quads.
|
||||
* @param sparqlQuery - Query to execute.
|
||||
*/
|
||||
private async sendSparqlConstruct(sparqlQuery: ConstructQuery): Promise<Readable> {
|
||||
private async sendSparqlConstruct(sparqlQuery: ConstructQuery): Promise<Guarded<Readable>> {
|
||||
const query = this.generator.stringify(sparqlQuery);
|
||||
this.logger.info(`Sending SPARQL CONSTRUCT query to ${this.endpoint}: ${query}`);
|
||||
try {
|
||||
return await this.fetcher.fetchTriples(this.endpoint, query);
|
||||
return guardStream(await this.fetcher.fetchTriples(this.endpoint, query));
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
this.logger.error(`SPARQL endpoint ${this.endpoint} error: ${error.message}`);
|
||||
|
||||
@@ -4,6 +4,7 @@ import type { Representation } from '../../ldp/representation/Representation';
|
||||
import { RepresentationMetadata } from '../../ldp/representation/RepresentationMetadata';
|
||||
import type { RepresentationPreferences } from '../../ldp/representation/RepresentationPreferences';
|
||||
import { INTERNAL_QUADS } from '../../util/ContentTypes';
|
||||
import { guardStream } from '../../util/GuardedStream';
|
||||
import { CONTENT_TYPE } from '../../util/UriConstants';
|
||||
import { validateRequestArgs, matchingTypes } from './ConversionUtil';
|
||||
import type { RepresentationConverterArgs } from './RepresentationConverter';
|
||||
@@ -34,7 +35,7 @@ export class QuadToRdfConverter extends TypedRepresentationConverter {
|
||||
const metadata = new RepresentationMetadata(quads.metadata, { [CONTENT_TYPE]: contentType });
|
||||
return {
|
||||
binary: true,
|
||||
data: rdfSerializer.serialize(quads.data, { contentType }) as Readable,
|
||||
data: guardStream(rdfSerializer.serialize(quads.data, { contentType }) as Readable),
|
||||
metadata,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdenti
|
||||
import { getLoggerFor } from '../../logging/LogUtil';
|
||||
import { INTERNAL_QUADS } from '../../util/ContentTypes';
|
||||
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
|
||||
import { guardStream } from '../../util/GuardedStream';
|
||||
import { CONTENT_TYPE } from '../../util/UriConstants';
|
||||
import type { ResourceLocker } from '../ResourceLocker';
|
||||
import type { ResourceStore } from '../ResourceStore';
|
||||
@@ -77,7 +78,7 @@ export class SparqlUpdatePatchHandler extends PatchHandler {
|
||||
const metadata = new RepresentationMetadata(input.identifier.path, { [CONTENT_TYPE]: INTERNAL_QUADS });
|
||||
const representation: Representation = {
|
||||
binary: false,
|
||||
data: store.match() as Readable,
|
||||
data: guardStream(store.match() as Readable),
|
||||
metadata,
|
||||
};
|
||||
await this.source.setRepresentation(input.identifier, representation);
|
||||
|
||||
0
src/util/MetadataController.ts
Normal file
0
src/util/MetadataController.ts
Normal file
@@ -4,6 +4,7 @@ import { DataFactory, StreamParser, StreamWriter } from 'n3';
|
||||
import type { Literal, NamedNode, Quad } from 'rdf-js';
|
||||
import streamifyArray from 'streamify-array';
|
||||
import { TEXT_TURTLE } from './ContentTypes';
|
||||
import type { Guarded } from './GuardedStream';
|
||||
import { pipeSafely } from './StreamUtil';
|
||||
|
||||
/**
|
||||
@@ -19,7 +20,7 @@ export const pushQuad =
|
||||
*
|
||||
* @returns The Readable object.
|
||||
*/
|
||||
export const serializeQuads = (quads: Quad[]): Readable =>
|
||||
export const serializeQuads = (quads: Quad[]): Guarded<Readable> =>
|
||||
pipeSafely(streamifyArray(quads), new StreamWriter({ format: TEXT_TURTLE }));
|
||||
|
||||
/**
|
||||
@@ -28,5 +29,5 @@ export const serializeQuads = (quads: Quad[]): Readable =>
|
||||
*
|
||||
* @returns A promise containing the array of quads.
|
||||
*/
|
||||
export const parseQuads = async(readable: Readable): Promise<Quad[]> =>
|
||||
export const parseQuads = async(readable: Guarded<Readable>): Promise<Quad[]> =>
|
||||
arrayifyStream(pipeSafely(readable, new StreamParser({ format: TEXT_TURTLE })));
|
||||
|
||||
Reference in New Issue
Block a user