diff --git a/src/authorization/WebAclAuthorizer.ts b/src/authorization/WebAclAuthorizer.ts index de7ac53e2..97650b078 100644 --- a/src/authorization/WebAclAuthorizer.ts +++ b/src/authorization/WebAclAuthorizer.ts @@ -15,6 +15,7 @@ import { NotFoundHttpError } from '../util/errors/NotFoundHttpError'; import { NotImplementedHttpError } from '../util/errors/NotImplementedHttpError'; import { UnauthorizedHttpError } from '../util/errors/UnauthorizedHttpError'; import type { IdentifierStrategy } from '../util/identifiers/IdentifierStrategy'; +import { readableToQuads } from '../util/StreamUtil'; import { ACL, FOAF } from '../util/Vocabularies'; import type { AuthorizerArgs } from './Authorizer'; import { Authorizer } from './Authorizer'; @@ -254,12 +255,7 @@ export class WebAclAuthorizer extends Authorizer { */ private async filterData(data: Representation, predicate: string, object: string): Promise { // Import all triples from the representation into a queryable store - const quads = new Store(); - const importer = quads.import(data.data); - await new Promise((resolve, reject): void => { - importer.on('end', resolve); - importer.on('error', reject); - }); + const quads = await readableToQuads(data.data); // Find subjects that occur with a given predicate/object, and collect all their triples const subjectData = new Store(); diff --git a/src/storage/patch/SparqlUpdatePatchHandler.ts b/src/storage/patch/SparqlUpdatePatchHandler.ts index 379100ad6..07ffe9a44 100644 --- a/src/storage/patch/SparqlUpdatePatchHandler.ts +++ b/src/storage/patch/SparqlUpdatePatchHandler.ts @@ -15,7 +15,7 @@ import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdenti import { getLoggerFor } from '../../logging/LogUtil'; import { INTERNAL_QUADS } from '../../util/ContentTypes'; import { NotImplementedHttpError } from '../../util/errors/NotImplementedHttpError'; -import { readableToString } from '../../util/StreamUtil'; +import { readableToQuads, readableToString } from '../../util/StreamUtil'; import type { RepresentationConverter } from '../conversion/RepresentationConverter'; import { ConvertingPatchHandler } from './ConvertingPatchHandler'; import type { PatchHandlerArgs } from './PatchHandler'; @@ -119,19 +119,16 @@ export class SparqlUpdatePatchHandler extends ConvertingPatchHandler { */ protected async patch(input: PatchHandlerArgs, representation?: Representation): Promise { const { identifier, patch } = input; - const result = new Store(); + let result: Store; let metadata: RepresentationMetadata; if (representation) { ({ metadata } = representation); - const importEmitter = result.import(representation.data); - await new Promise((resolve, reject): void => { - importEmitter.on('end', resolve); - importEmitter.on('error', reject); - }); + result = await readableToQuads(representation.data); this.logger.debug(`${result.size} quads in ${identifier.path}.`); } else { metadata = new RepresentationMetadata(identifier, INTERNAL_QUADS); + result = new Store(); } // Run the query through Comunica diff --git a/src/util/StreamUtil.ts b/src/util/StreamUtil.ts index 1affe13b8..ffc3a9617 100644 --- a/src/util/StreamUtil.ts +++ b/src/util/StreamUtil.ts @@ -3,6 +3,7 @@ import { Readable, Transform } from 'stream'; import { promisify } from 'util'; import arrayifyStream from 'arrayify-stream'; import eos from 'end-of-stream'; +import { Store } from 'n3'; import pump from 'pump'; import { getLoggerFor } from '../logging/LogUtil'; import { isHttpRequest } from '../server/HttpRequest'; @@ -23,6 +24,19 @@ export async function readableToString(stream: Readable): Promise { return (await arrayifyStream(stream)).join(''); } +/** + * Imports quads from a stream into a Store. + * @param stream - Stream of quads. + * + * @returns A Store containing all the quads. + */ +export async function readableToQuads(stream: Readable): Promise { + const quads = new Store(); + quads.import(stream); + await endOfStream(stream); + return quads; +} + // These error messages usually indicate expected behaviour so should not give a warning. // We compare against the error message instead of the code // since the second one is from an external library that does not assign an error code. diff --git a/test/unit/util/StreamUtil.test.ts b/test/unit/util/StreamUtil.test.ts index 7e02129f7..941720891 100644 --- a/test/unit/util/StreamUtil.test.ts +++ b/test/unit/util/StreamUtil.test.ts @@ -1,9 +1,11 @@ import { PassThrough, Readable } from 'stream'; import arrayifyStream from 'arrayify-stream'; +import { Quad, NamedNode, Literal, BlankNode, Store } from 'n3'; import type { Logger } from '../../../src/logging/Logger'; import { getLoggerFor } from '../../../src/logging/LogUtil'; import { isHttpRequest } from '../../../src/server/HttpRequest'; -import { guardedStreamFrom, pipeSafely, transformSafely, readableToString } from '../../../src/util/StreamUtil'; +import { guardedStreamFrom, pipeSafely, transformSafely, + readableToString, readableToQuads } from '../../../src/util/StreamUtil'; jest.mock('../../../src/logging/LogUtil', (): any => { const logger: Logger = { warn: jest.fn(), log: jest.fn() } as any; @@ -23,6 +25,28 @@ describe('StreamUtil', (): void => { }); }); + describe('#readableToQuads', (): void => { + it('imports all quads from a Readable.', async(): Promise => { + const subject = new NamedNode('#subject'); + const property = new NamedNode('#property'); + const object = new NamedNode('#object'); + const literal = new Literal('abcde'); + const blankNode = new BlankNode('_1'); + const graph = new NamedNode('#graph'); + + const quad1 = new Quad(subject, property, object, graph); + const quad2 = new Quad(subject, property, literal, graph); + const quad3 = new Quad(subject, property, blankNode, graph); + const quads = new Store(); + quads.add(quad1); + quads.add(quad2); + quads.add(quad3); + + const stream = Readable.from([ quad1, quad2, quad3 ]); + await expect(readableToQuads(stream)).resolves.toEqual(quads); + }); + }); + describe('#pipeSafely', (): void => { beforeEach(async(): Promise => { jest.clearAllMocks();