diff --git a/src/storage/ExpiringLock.ts b/src/storage/ExpiringLock.ts new file mode 100644 index 000000000..acf15a647 --- /dev/null +++ b/src/storage/ExpiringLock.ts @@ -0,0 +1,13 @@ +import type { EventEmitter } from 'events'; +import type { Lock } from './Lock'; + +/** + * ExpiringLock used by a {@link ExpiringResourceLocker} for non-atomic operations. + * Emits an "expired" event when internal timer runs out and should call release function when this happen. + */ +export interface ExpiringLock extends Lock, EventEmitter { + /** + * Reset the unlock timer. + */ + renew: () => void; +} diff --git a/src/storage/ExpiringResourceLocker.ts b/src/storage/ExpiringResourceLocker.ts new file mode 100644 index 000000000..4c67465b4 --- /dev/null +++ b/src/storage/ExpiringResourceLocker.ts @@ -0,0 +1,8 @@ +import type { ExpiringLock } from './ExpiringLock'; +import type { ResourceLocker } from './ResourceLocker'; + +/** + * Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s. + * Specific {@link ResourceLocker} to work with {@link ExpiringLock}s. + */ +export interface ExpiringResourceLocker extends ResourceLocker {} diff --git a/src/storage/LockingResourceStore.ts b/src/storage/LockingResourceStore.ts index 77eace78a..f53bc6973 100644 --- a/src/storage/LockingResourceStore.ts +++ b/src/storage/LockingResourceStore.ts @@ -6,12 +6,10 @@ import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifie import { getLoggerFor } from '../logging/LogUtil'; import type { AtomicResourceStore } from './AtomicResourceStore'; import type { Conditions } from './Conditions'; -import type { ResourceLocker } from './ResourceLocker'; +import type { ExpiringLock } from './ExpiringLock'; +import type { ExpiringResourceLocker } from './ExpiringResourceLocker'; import type { ResourceStore } from './ResourceStore'; -/** Time in ms after which reading a representation times out, causing the lock to be released. */ -const READ_TIMEOUT = 1000; - /** * Store that for every call acquires a lock before executing it on the requested resource, * and releases it afterwards. @@ -20,9 +18,9 @@ export class LockingResourceStore implements AtomicResourceStore { protected readonly logger = getLoggerFor(this); private readonly source: ResourceStore; - private readonly locks: ResourceLocker; + private readonly locks: ExpiringResourceLocker; - public constructor(source: ResourceStore, locks: ResourceLocker) { + public constructor(source: ResourceStore, locks: ExpiringResourceLocker) { this.source = source; this.locks = locks; } @@ -83,7 +81,7 @@ export class LockingResourceStore implements AtomicResourceStore { try { // Make the resource time out to ensure that the lock is always released eventually. representation = await func(); - return this.createExpiringRepresentation(representation); + return this.createExpiringRepresentation(representation, lock); } finally { // If the representation contains a valid Readable, wait for it to be consumed. const data = representation?.data; @@ -106,10 +104,11 @@ export class LockingResourceStore implements AtomicResourceStore { * Wraps a representation to make it time out when nothing is read for a certain amount of time. * * @param source - The representation to wrap + * @param lock - The lock for the corresponding identifier. */ - protected createExpiringRepresentation(source: Representation): Representation { + protected createExpiringRepresentation(source: Representation, lock: ExpiringLock): Representation { return Object.create(source, { - data: { value: this.createExpiringReadable(source.data) }, + data: { value: this.createExpiringReadable(source.data, lock) }, }); } @@ -117,26 +116,22 @@ export class LockingResourceStore implements AtomicResourceStore { * Wraps a readable to make it time out when nothing is read for a certain amount of time. * * @param source - The readable to wrap + * @param lock - The lock for the corresponding identifier. */ - protected createExpiringReadable(source: Readable): Readable { + protected createExpiringReadable(source: Readable, lock: ExpiringLock): Readable { // Destroy the source when a timeout occurs. const destroySource = (): void => { - this.logger.info(`Stream reading timout of ${READ_TIMEOUT}ms exceeded; destroying source`); - source.destroy(new Error(`Stream reading timout of ${READ_TIMEOUT}ms exceeded`)); + source.destroy(new Error(`Stream reading timout exceeded`)); }; - let timeout = setTimeout(destroySource, READ_TIMEOUT); - // Cancel the timeout when the source terminates by itself. - const cancelTimeout = (): void => clearTimeout(timeout); - source.on('error', cancelTimeout); - source.on('end', cancelTimeout); + // Handle the destruction of the source when the lock expires. + lock.on('expired', destroySource); - // Spy on the source to reset the timeout on read. + // Spy on the source to renew the lock upon reading. return Object.create(source, { read: { value(size: number): any { - cancelTimeout(); - timeout = setTimeout(destroySource, READ_TIMEOUT); + lock.renew(); return source.read(size); }, }, diff --git a/src/storage/ResourceLocker.ts b/src/storage/ResourceLocker.ts index 93e1a5960..8558b9f98 100644 --- a/src/storage/ResourceLocker.ts +++ b/src/storage/ResourceLocker.ts @@ -4,12 +4,12 @@ import type { Lock } from './Lock'; /** * Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s. */ -export interface ResourceLocker { +export interface ResourceLocker { /** * Lock the given resource. * @param identifier - Identifier of the resource that needs to be locked. * * @returns A promise containing the lock on the resource. */ - acquire: (identifier: ResourceIdentifier) => Promise; + acquire: (identifier: ResourceIdentifier) => Promise; } diff --git a/src/storage/WrappedExpiringLock.ts b/src/storage/WrappedExpiringLock.ts new file mode 100644 index 000000000..c547d8fd6 --- /dev/null +++ b/src/storage/WrappedExpiringLock.ts @@ -0,0 +1,60 @@ +import { EventEmitter } from 'events'; +import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier'; +import { getLoggerFor } from '../logging/LogUtil'; +import type { ExpiringLock } from './ExpiringLock'; +import type { Lock } from './Lock'; + +/** + * An implementation of an expiring lock which defines the expiration logic. + * + * ExpiringLock used by a {@link ExpiringResourceLocker} for non-atomic operations. + * Emits an "expired" event when internal timer runs out and calls release function when this happen. + */ +export class WrappedExpiringLock extends EventEmitter implements ExpiringLock { + protected readonly logger = getLoggerFor(this); + + protected readonly innerLock: Lock; + protected readonly readTimeout: number; + protected readonly identifier: ResourceIdentifier; + protected timeout: NodeJS.Timeout; + + /** + * @param innerLock - Instance of ResourceLocker to use for acquiring a lock. + * @param readTimeout - Time in ms after which reading a representation times out, causing the lock to be released. + * @param identifier - Identifier of the resource that needs to be locked. + */ + public constructor(innerLock: Lock, readTimeout: number, identifier: ResourceIdentifier) { + super(); + this.innerLock = innerLock; + this.readTimeout = readTimeout; + this.identifier = identifier; + this.timeout = setTimeout((): any => this.emitExpired(), readTimeout); + } + + /** + * Release this lock. + * @returns A promise resolving when the release is finished. + */ + public async release(): Promise { + clearTimeout(this.timeout); + return this.innerLock.release(); + } + + /** + * Reset the unlock timer. + */ + public renew(): void { + this.logger.verbose(`Renewed expiring timer of the lock for ${this.identifier.path}`); + clearTimeout(this.timeout); + this.timeout = setTimeout((): any => this.emitExpired(), this.readTimeout); + } + + /** + * This function will be called when the timer expires. + */ + protected async emitExpired(): Promise { + this.logger.verbose(`Lock expired after exceeding timeout of ${this.readTimeout}ms for ${this.identifier.path}`); + this.emit('expired'); + return this.innerLock.release(); + } +} diff --git a/src/storage/WrappedExpiringResourceLocker.ts b/src/storage/WrappedExpiringResourceLocker.ts new file mode 100644 index 000000000..941b4d7f8 --- /dev/null +++ b/src/storage/WrappedExpiringResourceLocker.ts @@ -0,0 +1,37 @@ +import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier'; +import { getLoggerFor } from '../logging/LogUtil'; +import type { ExpiringLock } from './ExpiringLock'; +import type { ExpiringResourceLocker } from './ExpiringResourceLocker'; +import type { ResourceLocker } from './ResourceLocker'; +import { WrappedExpiringLock } from './WrappedExpiringLock'; + +/** + * Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s. + * Differs from {@Link ResourceLocker} by adding expiration logic. + */ +export class WrappedExpiringResourceLocker implements ExpiringResourceLocker { + protected readonly logger = getLoggerFor(this); + + protected readonly locker: ResourceLocker; + protected readonly readTimeout: number; + + /** + * @param locker - Instance of ResourceLocker to use for acquiring a lock. + * @param readTimeout - Time in ms after which reading a representation times out, causing the lock to be released. + */ + public constructor(locker: ResourceLocker, readTimeout: number) { + this.locker = locker; + this.readTimeout = readTimeout; + } + + /** + * Lock the given resource with a lock providing expiration functionality. + * @param identifier - Identifier of the resource that needs to be locked. + * + * @returns A promise containing the expiring lock on the resource. + */ + public async acquire(identifier: ResourceIdentifier): Promise { + const innerLock = await this.locker.acquire(identifier); + return new WrappedExpiringLock(innerLock, this.readTimeout, identifier); + } +} diff --git a/test/unit/storage/LockingResourceStore.test.ts b/test/unit/storage/LockingResourceStore.test.ts index e0a848447..4bab3373d 100644 --- a/test/unit/storage/LockingResourceStore.test.ts +++ b/test/unit/storage/LockingResourceStore.test.ts @@ -2,20 +2,25 @@ import type { EventEmitter } from 'events'; import streamifyArray from 'streamify-array'; import type { Patch } from '../../../src/ldp/http/Patch'; import type { Representation } from '../../../src/ldp/representation/Representation'; -import type { Lock } from '../../../src/storage/Lock'; +import type { ExpiringLock } from '../../../src/storage/ExpiringLock'; +import type { ExpiringResourceLocker } from '../../../src/storage/ExpiringResourceLocker'; import { LockingResourceStore } from '../../../src/storage/LockingResourceStore'; -import type { ResourceLocker } from '../../../src/storage/ResourceLocker'; import type { ResourceStore } from '../../../src/storage/ResourceStore'; +import { WrappedExpiringResourceLocker } from '../../../src/storage/WrappedExpiringResourceLocker'; describe('A LockingResourceStore', (): void => { let store: LockingResourceStore; - let locker: ResourceLocker; - let lock: Lock; + let locker: ExpiringResourceLocker; + let lock: ExpiringLock; let release: () => Promise; + let renew: () => void; let source: ResourceStore; let order: string[]; + let funcOnEmit: () => any; beforeEach(async(): Promise => { + jest.clearAllMocks(); + order = []; const delayedResolve = (resolve: (resolveParams: any) => void, name: string, resolveParams?: any): void => { // `setImmediate` is introduced to make sure the promise doesn't execute immediately @@ -40,10 +45,26 @@ describe('A LockingResourceStore', (): void => { new Promise((resolve): any => delayedResolve(resolve, 'modifyResource'))), }; release = jest.fn(async(): Promise => order.push('release')); + renew = jest.fn(); + funcOnEmit = (): any => true; + locker = { acquire: jest.fn(async(): Promise => { order.push('acquire'); - lock = { release }; + lock = { + release, + renew, + on(event: string, func: () => void): void { + if (event === 'expired') { + funcOnEmit = func; + } + }, + emit(event: string): void { + if (event === 'expired') { + funcOnEmit(); + } + }, + } as unknown as ExpiringLock; return lock; }), }; @@ -171,6 +192,12 @@ describe('A LockingResourceStore', (): void => { it('destroys the stream when nothing is read after 1000ms.', async(): Promise => { jest.useFakeTimers(); + + // Spy on a real ResourceLocker instance + const strLocker = new WrappedExpiringResourceLocker(locker, 1000); + store = new LockingResourceStore(source, strLocker); + const acquireSpy = jest.spyOn(strLocker, 'acquire'); + const representation = await store.getRepresentation({ path: 'path' }, {}); const errorCallback = jest.fn(); representation.data.on('error', errorCallback); @@ -182,18 +209,22 @@ describe('A LockingResourceStore', (): void => { // Verify a timeout error was thrown expect(errorCallback).toHaveBeenCalledTimes(1); - expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout of 1000ms exceeded')); + expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout exceeded')); // Verify the lock was acquired and released at the right time - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(acquireSpy).toHaveBeenCalledTimes(1); + expect(acquireSpy).toHaveBeenLastCalledWith({ path: 'path' }); expect(source.getRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]); }); it('destroys the stream when pauses between reads exceed 1000ms.', async(): Promise => { jest.useFakeTimers(); + + // Spy on a real ResourceLocker instance + const strLocker = new WrappedExpiringResourceLocker(locker, 1000); + store = new LockingResourceStore(source, strLocker); + const acquireSpy = jest.spyOn(strLocker, 'acquire'); + const representation = await store.getRepresentation({ path: 'path' }, {}); const errorCallback = jest.fn(); representation.data.on('error', errorCallback); @@ -213,13 +244,11 @@ describe('A LockingResourceStore', (): void => { // Verify a timeout error was thrown expect(errorCallback).toHaveBeenCalledTimes(1); - expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout of 1000ms exceeded')); + expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout exceeded')); // Verify the lock was acquired and released at the right time - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(acquireSpy).toHaveBeenCalledTimes(1); + expect(acquireSpy).toHaveBeenLastCalledWith({ path: 'path' }); expect(source.getRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]); }); });