From b59357ec30b01f05cb948a64b425def893e442d8 Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Wed, 20 Jan 2021 11:19:38 +0100 Subject: [PATCH] feat: Update WrappedExpiringResourceLocker to new interface Due to the new interface, it is now possible to throw an error if there is a timeout which should make it easier to find locking issues. --- src/index.ts | 1 - src/util/locking/ExpiringLock.ts | 14 -- src/util/locking/ExpiringResourceLocker.ts | 30 +++- src/util/locking/WrappedExpiringLock.ts | 65 -------- .../locking/WrappedExpiringResourceLocker.ts | 60 ++++++-- .../WrappedExpiringResourceLocker.test.ts | 141 +++++++++--------- 6 files changed, 146 insertions(+), 165 deletions(-) delete mode 100644 src/util/locking/ExpiringLock.ts delete mode 100644 src/util/locking/WrappedExpiringLock.ts diff --git a/src/index.ts b/src/index.ts index f1a411108..6a4c5d7e7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -190,7 +190,6 @@ export * from './util/identifiers/IdentifierStrategy'; export * from './util/identifiers/SingleRootIdentifierStrategy'; // Util/Locking -export * from './util/locking/ExpiringLock'; export * from './util/locking/ExpiringResourceLocker'; export * from './util/locking/ResourceLocker'; export * from './util/locking/SingleThreadedResourceLocker'; diff --git a/src/util/locking/ExpiringLock.ts b/src/util/locking/ExpiringLock.ts deleted file mode 100644 index df084226b..000000000 --- a/src/util/locking/ExpiringLock.ts +++ /dev/null @@ -1,14 +0,0 @@ -import type { EventEmitter } from 'events'; -import type { Lock } from './Lock'; - -/** - * Interface for a lock that expires after a certain period of inactivity. - * Activity can be signaled by calling `renew`, which resets the expiration timeout. - * When the lock has expired, an `expired` event is emitted and the lock is released. - */ -export interface ExpiringLock extends Lock, EventEmitter { - /** - * Reset the lock expiration timeout. - */ - renew: () => void; -} diff --git a/src/util/locking/ExpiringResourceLocker.ts b/src/util/locking/ExpiringResourceLocker.ts index e0fcc4ef8..76b044bfd 100644 --- a/src/util/locking/ExpiringResourceLocker.ts +++ b/src/util/locking/ExpiringResourceLocker.ts @@ -1,7 +1,31 @@ -import type { ExpiringLock } from './ExpiringLock'; +import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier'; import type { ResourceLocker } from './ResourceLocker'; /** - * Interface for a factory of expiring locks. + * A {@link ResourceLocker} where the locks expire after a given time. */ -export interface ExpiringResourceLocker extends ResourceLocker {} +export interface ExpiringResourceLocker extends ResourceLocker { + /** + * As {@link ResourceLocker.withReadLock} but the locked function gets called with a `maintainLock` callback function + * to reset the lock expiration every time it is called. + * The resulting promise will reject once the lock expires. + * + * @param identifier - Identifier of the resource that needs to be locked. + * @param whileLocked - A function to execute while the resource is locked. + * Receives a callback as input parameter to maintain the lock. + */ + withReadLock: (identifier: ResourceIdentifier, whileLocked: (maintainLock: () => void) => T | Promise) + => Promise; + + /** + * As {@link ResourceLocker.withWriteLock} but the locked function gets called with a `maintainLock` callback function + * to reset the lock expiration every time it is called. + * The resulting promise will reject once the lock expires. + * + * @param identifier - Identifier of the resource that needs to be locked. + * @param whileLocked - A function to execute while the resource is locked. + * Receives a callback as input parameter to maintain the lock. + */ + withWriteLock: (identifier: ResourceIdentifier, whileLocked: (maintainLock: () => void) => T | Promise) + => Promise; +} diff --git a/src/util/locking/WrappedExpiringLock.ts b/src/util/locking/WrappedExpiringLock.ts deleted file mode 100644 index d0df5035a..000000000 --- a/src/util/locking/WrappedExpiringLock.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { EventEmitter } from 'events'; -import { getLoggerFor } from '../../logging/LogUtil'; -import type { ExpiringLock } from './ExpiringLock'; -import type { Lock } from './Lock'; - -/** - * An implementation of an expiring lock which defines the expiration logic. - * - * ExpiringLock used by a {@link ExpiringResourceLocker} for non-atomic operations. - * Emits an "expired" event when internal timer runs out and calls release function when this happen. - */ -export class WrappedExpiringLock extends EventEmitter implements ExpiringLock { - protected readonly logger = getLoggerFor(this); - - protected readonly innerLock: Lock; - protected readonly expiration: number; - protected timeoutHandle?: NodeJS.Timeout; - - /** - * @param innerLock - Instance of ResourceLocker to use for acquiring a lock. - * @param expiration - Time in ms after which the lock expires. - */ - public constructor(innerLock: Lock, expiration: number) { - super(); - this.innerLock = innerLock; - this.expiration = expiration; - this.scheduleTimeout(); - } - - /** - * Release this lock. - * @returns A promise resolving when the release is finished. - */ - public async release(): Promise { - this.clearTimeout(); - return this.innerLock.release(); - } - - /** - * Reset the unlock timer. - */ - public renew(): void { - this.clearTimeout(); - this.scheduleTimeout(); - } - - private async expire(): Promise { - this.logger.warn(`Lock expired after ${this.expiration}ms`); - this.emit('expired'); - try { - await this.innerLock.release(); - } catch (error: unknown) { - this.emit('error', error); - } - } - - private clearTimeout(): void { - clearTimeout(this.timeoutHandle!); - } - - private scheduleTimeout(): void { - this.logger.verbose(`Renewed expiring lock`); - this.timeoutHandle = setTimeout((): any => this.expire(), this.expiration); - } -} diff --git a/src/util/locking/WrappedExpiringResourceLocker.ts b/src/util/locking/WrappedExpiringResourceLocker.ts index e89afcb4d..031f4f740 100644 --- a/src/util/locking/WrappedExpiringResourceLocker.ts +++ b/src/util/locking/WrappedExpiringResourceLocker.ts @@ -1,13 +1,12 @@ import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier'; import { getLoggerFor } from '../../logging/LogUtil'; -import type { ExpiringLock } from './ExpiringLock'; +import { InternalServerError } from '../errors/InternalServerError'; import type { ExpiringResourceLocker } from './ExpiringResourceLocker'; import type { ResourceLocker } from './ResourceLocker'; -import { WrappedExpiringLock } from './WrappedExpiringLock'; +import Timeout = NodeJS.Timeout; /** - * Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s. - * Differs from {@Link ResourceLocker} by adding expiration logic. + * Wraps around an existing {@link ResourceLocker} and adds expiration logic to prevent locks from getting stuck. */ export class WrappedExpiringResourceLocker implements ExpiringResourceLocker { protected readonly logger = getLoggerFor(this); @@ -24,14 +23,53 @@ export class WrappedExpiringResourceLocker implements ExpiringResourceLocker { this.expiration = expiration; } + public async withReadLock(identifier: ResourceIdentifier, + whileLocked: (maintainLock: () => void) => T | Promise): Promise { + return this.locker.withReadLock(identifier, async(): Promise => this.expiringPromise(identifier, whileLocked)); + } + + public async withWriteLock(identifier: ResourceIdentifier, + whileLocked: (maintainLock: () => void) => T | Promise): Promise { + return this.locker.withWriteLock(identifier, async(): Promise => this.expiringPromise(identifier, whileLocked)); + } + /** - * Lock the given resource with a lock providing expiration functionality. - * @param identifier - Identifier of the resource that needs to be locked. - * - * @returns A promise containing the expiring lock on the resource. + * Creates a Promise that either resolves the given input function or rejects if time runs out, + * whichever happens first. The input function can reset the timer by calling the `maintainLock` function + * it receives. The ResourceIdentifier is only used for logging. */ - public async acquire(identifier: ResourceIdentifier): Promise { - const innerLock = await this.locker.acquire(identifier); - return new WrappedExpiringLock(innerLock, this.expiration); + private async expiringPromise(identifier: ResourceIdentifier, + whileLocked: (maintainLock: () => void) => T | Promise): Promise { + let timer: Timeout; + let createTimeout: () => Timeout; + + // Promise that throws an error when the timer finishes + const timerPromise = new Promise((resolve, reject): void => { + // Starts the timer that will cause this promise to error after a given time + createTimeout = (): Timeout => setTimeout((): void => { + this.logger.error(`Lock expired after ${this.expiration}ms on ${identifier.path}`); + reject(new InternalServerError(`Lock expired after ${this.expiration}ms on ${identifier.path}`)); + }, this.expiration); + + timer = createTimeout(); + }); + + // Restarts the timer + const renewTimer = (): void => { + this.logger.verbose(`Renewed expiring lock on ${identifier.path}`); + clearTimeout(timer); + timer = createTimeout(); + }; + + // Runs the main function and cleans up the timer afterwards + async function runWithTimeout(): Promise { + try { + return await whileLocked(renewTimer); + } finally { + clearTimeout(timer); + } + } + + return Promise.race([ timerPromise, runWithTimeout() ]); } } diff --git a/test/unit/util/locking/WrappedExpiringResourceLocker.test.ts b/test/unit/util/locking/WrappedExpiringResourceLocker.test.ts index ed54c1fee..a4e930214 100644 --- a/test/unit/util/locking/WrappedExpiringResourceLocker.test.ts +++ b/test/unit/util/locking/WrappedExpiringResourceLocker.test.ts @@ -1,87 +1,86 @@ -import type { EventEmitter } from 'events'; -import streamifyArray from 'streamify-array'; +import type { ResourceIdentifier } from '../../../../src/ldp/representation/ResourceIdentifier'; +import type { ResourceLocker } from '../../../../src/util/locking/ResourceLocker'; import { WrappedExpiringResourceLocker } from '../../../../src/util/locking/WrappedExpiringResourceLocker'; +jest.useFakeTimers(); + describe('A WrappedExpiringResourceLocker', (): void => { - let order: string[]; + const identifier = { path: 'path' }; + let syncCb: () => string; + let asyncCb: () => Promise; + let wrappedLocker: ResourceLocker; + let locker: WrappedExpiringResourceLocker; + const expiration = 1000; beforeEach(async(): Promise => { - order = []; + wrappedLocker = { + withReadLock: jest.fn(async(id: ResourceIdentifier, whileLocked: () => T | Promise): + Promise => whileLocked()), + withWriteLock: jest.fn(async(id: ResourceIdentifier, whileLocked: () => T | Promise): + Promise => whileLocked()), + }; + + syncCb = jest.fn((): string => 'sync'); + asyncCb = jest.fn(async(): Promise => new Promise((resolve): void => { + setImmediate((): void => resolve('async')); + })); + + locker = new WrappedExpiringResourceLocker(wrappedLocker, expiration); }); - async function registerEventOrder(eventSource: EventEmitter, event: string): Promise { - await new Promise((resolve): any => { - eventSource.prependListener(event, (): any => { - order.push(event); - resolve(); - }); - }); - } + it('calls the wrapped locker for locking.', async(): Promise => { + let prom = locker.withReadLock(identifier, syncCb); + await expect(prom).resolves.toBe('sync'); + expect(wrappedLocker.withReadLock).toHaveBeenCalledTimes(1); + expect((wrappedLocker.withReadLock as jest.Mock).mock.calls[0][0]).toBe(identifier); - it('emits an error event when releasing the lock errors.', async(): Promise => { - jest.useFakeTimers(); + prom = locker.withWriteLock(identifier, syncCb); + await expect(prom).resolves.toBe('sync'); + expect(wrappedLocker.withWriteLock).toHaveBeenCalledTimes(1); + expect((wrappedLocker.withWriteLock as jest.Mock).mock.calls[0][0]).toBe(identifier); + }); - // Create a locker that fails upon release - const faultyLocker = { - acquire(): any { - return { - async release(): Promise { - throw new Error('Release error'); - }, - }; - }, - }; - const expiringLocker = new WrappedExpiringResourceLocker(faultyLocker, 1000); - const expiringLock = await expiringLocker.acquire({} as any); - const errorCallback = jest.fn(); - expiringLock.on('error', errorCallback); + it('calls the functions that need to be locked through the wrapped locker.', async(): Promise => { + let prom = locker.withReadLock(identifier, syncCb); + await expect(prom).resolves.toBe('sync'); + expect(syncCb).toHaveBeenCalledTimes(1); - // Let the lock expire + prom = locker.withReadLock(identifier, asyncCb); + await expect(prom).resolves.toBe('async'); + expect(asyncCb).toHaveBeenCalledTimes(1); + }); + + it('throws an error if the locked function resolves too slow.', async(): Promise => { + async function slowCb(): Promise { + return new Promise((resolve): any => setTimeout(resolve, 5000)); + } + const prom = locker.withReadLock(identifier, slowCb); jest.advanceTimersByTime(1000); - await Promise.resolve(); - - // Verify the error has been emitted - expect(errorCallback).toHaveBeenCalledTimes(1); - expect(errorCallback).toHaveBeenLastCalledWith(new Error('Release error')); + await expect(prom).rejects.toThrow(`Lock expired after ${expiration}ms on ${identifier.path}`); }); - it('releases the lock on the resource when data has been read.', async(): Promise => { - // Mock the inner ResourceLocker. - const release = jest.fn(async(): Promise => order.push('release')); - const lock = { release }; - const locker = { - acquire: jest.fn(async(): Promise => { - order.push('acquire'); - return lock; - }), - }; + it('can reset the timer within the locked function.', async(): Promise => { + async function refreshCb(maintainLock: () => void): Promise { + return new Promise((resolve): any => { + setTimeout(maintainLock, 750); + setTimeout((): void => resolve('refresh'), 1500); + }); + } + const prom = locker.withReadLock(identifier, refreshCb); + jest.advanceTimersByTime(1500); + await expect(prom).resolves.toBe('refresh'); + }); - const expiringLocker = new WrappedExpiringResourceLocker(locker, 1000); - const expiringLock = await expiringLocker.acquire({} as any); - - // Mimic the behavior of a LockingResourceStore to test the expiringLock methods called. - const source = streamifyArray([ 1, 2, 3 ]); - // eslint-disable-next-line jest/valid-expect-in-promise - new Promise((resolve): void => { - source.on('end', resolve); - source.on('close', resolve); - }).then((): any => expiringLock.release(), null); - const readable = Object.create(source, { - read: { - value(size: number): any { - expiringLock.renew(); - return source.read(size); - }, - }, - }); - - // Read all data from the "representation" - readable.on('data', (): any => true); - await registerEventOrder(readable, 'end'); - - // Verify the lock was acquired and released at the right time - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'end', 'release' ]); + it('can still error after resetting the timer.', async(): Promise => { + async function refreshCb(maintainLock: () => void): Promise { + return new Promise((resolve): any => { + setTimeout(maintainLock, 750); + setTimeout(maintainLock, 1500); + setTimeout(resolve, 5000); + }); + } + const prom = locker.withReadLock(identifier, refreshCb); + jest.advanceTimersByTime(5000); + await expect(prom).rejects.toThrow(`Lock expired after ${expiration}ms on ${identifier.path}`); }); });