From aa510bc6b8a604044b94cfc5bc756a3d280466b8 Mon Sep 17 00:00:00 2001 From: smessie Date: Thu, 20 Aug 2020 10:51:19 +0200 Subject: [PATCH] fix: Release lock only when stream ends or is abandoned (#60) * fix: Release lock only when stream has ended reading or an error occurs * refactor: Refactor code and tests * refactor: Move function mock to onBefore and remove unnecessary data drain * fix: Make functions protected, add extra listener and add extra tests * fix: Add extra TSDoc comment * fix: Adjust tests to expect both end and close event * refactor: Move test to other file * refactor: make lockedRun method-independent * fix: ensure lock release happens only once * fix: make locked resources time out * fix: destroy readable on error Co-authored-by: Ruben Verborgh --- src/storage/LockingResourceStore.ts | 114 ++++++++++++-- .../unit/storage/LockingResourceStore.test.ts | 142 ++++++++++++++++-- 2 files changed, 229 insertions(+), 27 deletions(-) diff --git a/src/storage/LockingResourceStore.ts b/src/storage/LockingResourceStore.ts index a84383358..be4921a4a 100644 --- a/src/storage/LockingResourceStore.ts +++ b/src/storage/LockingResourceStore.ts @@ -1,12 +1,16 @@ import { AtomicResourceStore } from './AtomicResourceStore'; import { Conditions } from './Conditions'; import { Patch } from '../ldp/http/Patch'; +import { Readable } from 'stream'; import { Representation } from '../ldp/representation/Representation'; import { RepresentationPreferences } from '../ldp/representation/RepresentationPreferences'; import { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier'; import { ResourceLocker } from './ResourceLocker'; import { ResourceStore } from './ResourceStore'; +/** Time in ms after which reading a representation times out, causing the lock to be released. */ +const READ_TIMEOUT = 1000; + /** * Store that for every call acquires a lock before executing it on the requested resource, * and releases it afterwards. @@ -20,34 +24,39 @@ export class LockingResourceStore implements AtomicResourceStore { this.locks = locks; } + public async getRepresentation(identifier: ResourceIdentifier, preferences: RepresentationPreferences, + conditions?: Conditions): Promise { + return this.lockedRepresentationRun(identifier, + async(): Promise => this.source.getRepresentation(identifier, preferences, conditions)); + } + public async addResource(container: ResourceIdentifier, representation: Representation, conditions?: Conditions): Promise { return this.lockedRun(container, async(): Promise => this.source.addResource(container, representation, conditions)); } - public async deleteResource(identifier: ResourceIdentifier, conditions?: Conditions): Promise { - return this.lockedRun(identifier, async(): Promise => this.source.deleteResource(identifier, conditions)); - } - - public async getRepresentation(identifier: ResourceIdentifier, preferences: RepresentationPreferences, - conditions?: Conditions): Promise { - return this.lockedRun(identifier, - async(): Promise => this.source.getRepresentation(identifier, preferences, conditions)); - } - - public async modifyResource(identifier: ResourceIdentifier, patch: Patch, conditions?: Conditions): Promise { - return this.lockedRun(identifier, - async(): Promise => this.source.modifyResource(identifier, patch, conditions)); - } - public async setRepresentation(identifier: ResourceIdentifier, representation: Representation, conditions?: Conditions): Promise { return this.lockedRun(identifier, async(): Promise => this.source.setRepresentation(identifier, representation, conditions)); } - private async lockedRun(identifier: ResourceIdentifier, func: () => Promise): Promise { + public async deleteResource(identifier: ResourceIdentifier, conditions?: Conditions): Promise { + return this.lockedRun(identifier, async(): Promise => this.source.deleteResource(identifier, conditions)); + } + + public async modifyResource(identifier: ResourceIdentifier, patch: Patch, conditions?: Conditions): Promise { + return this.lockedRun(identifier, + async(): Promise => this.source.modifyResource(identifier, patch, conditions)); + } + + /** + * Acquires a lock for the identifier and releases it when the function is executed. + * @param identifier - Identifier that should be locked. + * @param func - Function to be executed. + */ + protected async lockedRun(identifier: ResourceIdentifier, func: () => Promise): Promise { const lock = await this.locks.acquire(identifier); try { return await func(); @@ -55,4 +64,77 @@ export class LockingResourceStore implements AtomicResourceStore { await lock.release(); } } + + /** + * Acquires a lock for the identifier that should return a representation with Readable data and releases it when the + * Readable is read, closed or results in an error. + * When using this function, it is required to close the Readable stream when you are ready. + * + * @param identifier - Identifier that should be locked. + * @param func - Function to be executed. + */ + protected async lockedRepresentationRun(identifier: ResourceIdentifier, func: () => Promise): + Promise { + const lock = await this.locks.acquire(identifier); + let representation; + try { + // Make the resource time out to ensure that the lock is always released eventually. + representation = await func(); + return this.createExpiringRepresentation(representation); + } finally { + // If the representation contains a valid Readable, wait for it to be consumed. + const data = representation?.data; + if (!data) { + await lock.release(); + } else { + // When an error occurs, destroy the readable so the lock is released safely. + data.on('error', (): void => data.destroy()); + + // An `end` and/or `close` event signals that the readable has been consumed. + new Promise((resolve): void => { + data.on('end', resolve); + data.on('close', resolve); + }).then((): any => lock.release(), null); + } + } + } + + /** + * Wraps a representation to make it time out when nothing is read for a certain amount of time. + * + * @param source - The representation to wrap + */ + protected createExpiringRepresentation(source: Representation): Representation { + return Object.create(source, { + data: { value: this.createExpiringReadable(source.data) }, + }); + } + + /** + * Wraps a readable to make it time out when nothing is read for a certain amount of time. + * + * @param source - The readable to wrap + */ + protected createExpiringReadable(source: Readable): Readable { + // Destroy the source when a timeout occurs. + const destroySource = (): void => + source.destroy(new Error(`Stream reading timout of ${READ_TIMEOUT}ms exceeded`)); + let timeout = setTimeout(destroySource, READ_TIMEOUT); + + // Cancel the timeout when the source terminates by itself. + const cancelTimeout = (): void => clearTimeout(timeout); + source.on('error', cancelTimeout); + source.on('end', cancelTimeout); + + // Spy on the source to reset the timeout on read. + return Object.create(source, { + read: { + value(size: number): any { + cancelTimeout(); + timeout = setTimeout(destroySource, READ_TIMEOUT); + return source.read(size); + }, + }, + }); + } } diff --git a/test/unit/storage/LockingResourceStore.test.ts b/test/unit/storage/LockingResourceStore.test.ts index 910a5191f..d6fe6d73d 100644 --- a/test/unit/storage/LockingResourceStore.test.ts +++ b/test/unit/storage/LockingResourceStore.test.ts @@ -1,9 +1,11 @@ +import { EventEmitter } from 'events'; import { Lock } from '../../../src/storage/Lock'; import { LockingResourceStore } from '../../../src/storage/LockingResourceStore'; import { Patch } from '../../../src/ldp/http/Patch'; import { Representation } from '../../../src/ldp/representation/Representation'; import { ResourceLocker } from '../../../src/storage/ResourceLocker'; import { ResourceStore } from '../../../src/storage/ResourceStore'; +import streamifyArray from 'streamify-array'; describe('A LockingResourceStore', (): void => { let store: LockingResourceStore; @@ -15,17 +17,19 @@ describe('A LockingResourceStore', (): void => { beforeEach(async(): Promise => { order = []; - const delayedResolve = (resolve: () => void, name: string): void => { + const delayedResolve = (resolve: (resolveParams: any) => void, name: string, resolveParams?: any): void => { // `setImmediate` is introduced to make sure the promise doesn't execute immediately setImmediate((): void => { order.push(name); - resolve(); + resolve(resolveParams); }); }; + const readable = streamifyArray([ 1, 2, 3 ]); source = { getRepresentation: jest.fn(async(): Promise => - new Promise((resolve): any => delayedResolve(resolve, 'getRepresentation'))), + new Promise((resolve): any => delayedResolve(resolve, 'getRepresentation', { data: readable } as + Representation))), addResource: jest.fn(async(): Promise => new Promise((resolve): any => delayedResolve(resolve, 'addResource'))), setRepresentation: jest.fn(async(): Promise => @@ -46,14 +50,14 @@ describe('A LockingResourceStore', (): void => { store = new LockingResourceStore(source, locker); }); - it('acquires a lock on the resource when getting it.', async(): Promise => { - await store.getRepresentation({ path: 'path' }, {}); - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); - expect(source.getRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'getRepresentation', 'release' ]); - }); + const registerEventOrder = async(eventSource: EventEmitter, event: string): Promise => { + await new Promise((resolve): any => { + eventSource.prependListener(event, (): any => { + order.push(event); + resolve(); + }); + }); + }; it('acquires a lock on the container when adding a representation.', async(): Promise => { await store.addResource({ path: 'path' }, {} as Representation); @@ -101,4 +105,120 @@ describe('A LockingResourceStore', (): void => { expect(lock.release).toHaveBeenCalledTimes(1); expect(order).toEqual([ 'acquire', 'release' ]); }); + + it('releases the lock on the resource when data has been read.', async(): Promise => { + // Read all data from the representation + const representation = await store.getRepresentation({ path: 'path' }, {}); + representation.data.on('data', (): any => true); + await registerEventOrder(representation.data, 'end'); + + // Verify the lock was acquired and released at the right time + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.getRepresentation).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'getRepresentation', 'end', 'release' ]); + }); + + it('destroys the resource and releases the lock when the readable errors.', async(): Promise => { + // Make the representation error + const representation = await store.getRepresentation({ path: 'path' }, {}); + Promise.resolve().then((): any => + representation.data.emit('error', new Error('Error on the readable')), null); + await registerEventOrder(representation.data, 'error'); + await registerEventOrder(representation.data, 'close'); + + // Verify the lock was acquired and released at the right time + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.getRepresentation).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'getRepresentation', 'error', 'close', 'release' ]); + }); + + it('releases the lock on the resource when readable is destroyed.', async(): Promise => { + // Make the representation close + const representation = await store.getRepresentation({ path: 'path' }, {}); + representation.data.destroy(); + await registerEventOrder(representation.data, 'close'); + + // Verify the lock was acquired and released at the right time + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.getRepresentation).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]); + }); + + it('releases the lock only once when multiple events are triggered.', async(): Promise => { + // Read all data from the representation and trigger an additional close event + const representation = await store.getRepresentation({ path: 'path' }, {}); + representation.data.on('data', (): any => true); + representation.data.prependListener('end', (): any => { + order.push('end'); + representation.data.destroy(); + }); + await registerEventOrder(representation.data, 'close'); + + // Verify the lock was acquired and released at the right time + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.getRepresentation).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'getRepresentation', 'end', 'close', 'release' ]); + }); + + it('destroys the stream when nothing is read after 1000ms.', async(): Promise => { + jest.useFakeTimers(); + const representation = await store.getRepresentation({ path: 'path' }, {}); + const errorCallback = jest.fn(); + representation.data.on('error', errorCallback); + + // Wait 1000ms and read + jest.advanceTimersByTime(1000); + expect(representation.data.read()).toBe(null); + await registerEventOrder(representation.data, 'close'); + + // Verify a timeout error was thrown + expect(errorCallback).toHaveBeenCalledTimes(1); + expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout of 1000ms exceeded')); + + // Verify the lock was acquired and released at the right time + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.getRepresentation).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]); + }); + + it('destroys the stream when pauses between reads exceed 1000ms.', async(): Promise => { + jest.useFakeTimers(); + const representation = await store.getRepresentation({ path: 'path' }, {}); + const errorCallback = jest.fn(); + representation.data.on('error', errorCallback); + + // Wait 750ms and read + jest.advanceTimersByTime(750); + expect(representation.data.read()).toBe(1); + + // Wait 750ms and read + jest.advanceTimersByTime(750); + expect(representation.data.read()).toBe(2); + + // Wait 1000ms and watch the stream be destroyed + jest.advanceTimersByTime(1000); + expect(representation.data.read()).toBe(null); + await registerEventOrder(representation.data, 'close'); + + // Verify a timeout error was thrown + expect(errorCallback).toHaveBeenCalledTimes(1); + expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout of 1000ms exceeded')); + + // Verify the lock was acquired and released at the right time + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.getRepresentation).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]); + }); });