mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
feat: Update WrappedExpiringResourceLocker to new interface
Due to the new interface, it is now possible to throw an error if there is a timeout which should make it easier to find locking issues.
This commit is contained in:
@@ -190,7 +190,6 @@ export * from './util/identifiers/IdentifierStrategy';
|
||||
export * from './util/identifiers/SingleRootIdentifierStrategy';
|
||||
|
||||
// Util/Locking
|
||||
export * from './util/locking/ExpiringLock';
|
||||
export * from './util/locking/ExpiringResourceLocker';
|
||||
export * from './util/locking/ResourceLocker';
|
||||
export * from './util/locking/SingleThreadedResourceLocker';
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
import type { EventEmitter } from 'events';
|
||||
import type { Lock } from './Lock';
|
||||
|
||||
/**
|
||||
* Interface for a lock that expires after a certain period of inactivity.
|
||||
* Activity can be signaled by calling `renew`, which resets the expiration timeout.
|
||||
* When the lock has expired, an `expired` event is emitted and the lock is released.
|
||||
*/
|
||||
export interface ExpiringLock extends Lock, EventEmitter {
|
||||
/**
|
||||
* Reset the lock expiration timeout.
|
||||
*/
|
||||
renew: () => void;
|
||||
}
|
||||
@@ -1,7 +1,31 @@
|
||||
import type { ExpiringLock } from './ExpiringLock';
|
||||
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
|
||||
import type { ResourceLocker } from './ResourceLocker';
|
||||
|
||||
/**
|
||||
* Interface for a factory of expiring locks.
|
||||
* A {@link ResourceLocker} where the locks expire after a given time.
|
||||
*/
|
||||
export interface ExpiringResourceLocker<T extends ExpiringLock = ExpiringLock> extends ResourceLocker<T> {}
|
||||
export interface ExpiringResourceLocker extends ResourceLocker {
|
||||
/**
|
||||
* As {@link ResourceLocker.withReadLock} but the locked function gets called with a `maintainLock` callback function
|
||||
* to reset the lock expiration every time it is called.
|
||||
* The resulting promise will reject once the lock expires.
|
||||
*
|
||||
* @param identifier - Identifier of the resource that needs to be locked.
|
||||
* @param whileLocked - A function to execute while the resource is locked.
|
||||
* Receives a callback as input parameter to maintain the lock.
|
||||
*/
|
||||
withReadLock: <T>(identifier: ResourceIdentifier, whileLocked: (maintainLock: () => void) => T | Promise<T>)
|
||||
=> Promise<T>;
|
||||
|
||||
/**
|
||||
* As {@link ResourceLocker.withWriteLock} but the locked function gets called with a `maintainLock` callback function
|
||||
* to reset the lock expiration every time it is called.
|
||||
* The resulting promise will reject once the lock expires.
|
||||
*
|
||||
* @param identifier - Identifier of the resource that needs to be locked.
|
||||
* @param whileLocked - A function to execute while the resource is locked.
|
||||
* Receives a callback as input parameter to maintain the lock.
|
||||
*/
|
||||
withWriteLock: <T>(identifier: ResourceIdentifier, whileLocked: (maintainLock: () => void) => T | Promise<T>)
|
||||
=> Promise<T>;
|
||||
}
|
||||
|
||||
@@ -1,65 +0,0 @@
|
||||
import { EventEmitter } from 'events';
|
||||
import { getLoggerFor } from '../../logging/LogUtil';
|
||||
import type { ExpiringLock } from './ExpiringLock';
|
||||
import type { Lock } from './Lock';
|
||||
|
||||
/**
|
||||
* An implementation of an expiring lock which defines the expiration logic.
|
||||
*
|
||||
* ExpiringLock used by a {@link ExpiringResourceLocker} for non-atomic operations.
|
||||
* Emits an "expired" event when internal timer runs out and calls release function when this happen.
|
||||
*/
|
||||
export class WrappedExpiringLock extends EventEmitter implements ExpiringLock {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
|
||||
protected readonly innerLock: Lock;
|
||||
protected readonly expiration: number;
|
||||
protected timeoutHandle?: NodeJS.Timeout;
|
||||
|
||||
/**
|
||||
* @param innerLock - Instance of ResourceLocker to use for acquiring a lock.
|
||||
* @param expiration - Time in ms after which the lock expires.
|
||||
*/
|
||||
public constructor(innerLock: Lock, expiration: number) {
|
||||
super();
|
||||
this.innerLock = innerLock;
|
||||
this.expiration = expiration;
|
||||
this.scheduleTimeout();
|
||||
}
|
||||
|
||||
/**
|
||||
* Release this lock.
|
||||
* @returns A promise resolving when the release is finished.
|
||||
*/
|
||||
public async release(): Promise<void> {
|
||||
this.clearTimeout();
|
||||
return this.innerLock.release();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the unlock timer.
|
||||
*/
|
||||
public renew(): void {
|
||||
this.clearTimeout();
|
||||
this.scheduleTimeout();
|
||||
}
|
||||
|
||||
private async expire(): Promise<void> {
|
||||
this.logger.warn(`Lock expired after ${this.expiration}ms`);
|
||||
this.emit('expired');
|
||||
try {
|
||||
await this.innerLock.release();
|
||||
} catch (error: unknown) {
|
||||
this.emit('error', error);
|
||||
}
|
||||
}
|
||||
|
||||
private clearTimeout(): void {
|
||||
clearTimeout(this.timeoutHandle!);
|
||||
}
|
||||
|
||||
private scheduleTimeout(): void {
|
||||
this.logger.verbose(`Renewed expiring lock`);
|
||||
this.timeoutHandle = setTimeout((): any => this.expire(), this.expiration);
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,12 @@
|
||||
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
|
||||
import { getLoggerFor } from '../../logging/LogUtil';
|
||||
import type { ExpiringLock } from './ExpiringLock';
|
||||
import { InternalServerError } from '../errors/InternalServerError';
|
||||
import type { ExpiringResourceLocker } from './ExpiringResourceLocker';
|
||||
import type { ResourceLocker } from './ResourceLocker';
|
||||
import { WrappedExpiringLock } from './WrappedExpiringLock';
|
||||
import Timeout = NodeJS.Timeout;
|
||||
|
||||
/**
|
||||
* Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s.
|
||||
* Differs from {@Link ResourceLocker} by adding expiration logic.
|
||||
* Wraps around an existing {@link ResourceLocker} and adds expiration logic to prevent locks from getting stuck.
|
||||
*/
|
||||
export class WrappedExpiringResourceLocker implements ExpiringResourceLocker {
|
||||
protected readonly logger = getLoggerFor(this);
|
||||
@@ -24,14 +23,53 @@ export class WrappedExpiringResourceLocker implements ExpiringResourceLocker {
|
||||
this.expiration = expiration;
|
||||
}
|
||||
|
||||
public async withReadLock<T>(identifier: ResourceIdentifier,
|
||||
whileLocked: (maintainLock: () => void) => T | Promise<T>): Promise<T> {
|
||||
return this.locker.withReadLock(identifier, async(): Promise<T> => this.expiringPromise(identifier, whileLocked));
|
||||
}
|
||||
|
||||
public async withWriteLock<T>(identifier: ResourceIdentifier,
|
||||
whileLocked: (maintainLock: () => void) => T | Promise<T>): Promise<T> {
|
||||
return this.locker.withWriteLock(identifier, async(): Promise<T> => this.expiringPromise(identifier, whileLocked));
|
||||
}
|
||||
|
||||
/**
|
||||
* Lock the given resource with a lock providing expiration functionality.
|
||||
* @param identifier - Identifier of the resource that needs to be locked.
|
||||
*
|
||||
* @returns A promise containing the expiring lock on the resource.
|
||||
* Creates a Promise that either resolves the given input function or rejects if time runs out,
|
||||
* whichever happens first. The input function can reset the timer by calling the `maintainLock` function
|
||||
* it receives. The ResourceIdentifier is only used for logging.
|
||||
*/
|
||||
public async acquire(identifier: ResourceIdentifier): Promise<ExpiringLock> {
|
||||
const innerLock = await this.locker.acquire(identifier);
|
||||
return new WrappedExpiringLock(innerLock, this.expiration);
|
||||
private async expiringPromise<T>(identifier: ResourceIdentifier,
|
||||
whileLocked: (maintainLock: () => void) => T | Promise<T>): Promise<T> {
|
||||
let timer: Timeout;
|
||||
let createTimeout: () => Timeout;
|
||||
|
||||
// Promise that throws an error when the timer finishes
|
||||
const timerPromise = new Promise<never>((resolve, reject): void => {
|
||||
// Starts the timer that will cause this promise to error after a given time
|
||||
createTimeout = (): Timeout => setTimeout((): void => {
|
||||
this.logger.error(`Lock expired after ${this.expiration}ms on ${identifier.path}`);
|
||||
reject(new InternalServerError(`Lock expired after ${this.expiration}ms on ${identifier.path}`));
|
||||
}, this.expiration);
|
||||
|
||||
timer = createTimeout();
|
||||
});
|
||||
|
||||
// Restarts the timer
|
||||
const renewTimer = (): void => {
|
||||
this.logger.verbose(`Renewed expiring lock on ${identifier.path}`);
|
||||
clearTimeout(timer);
|
||||
timer = createTimeout();
|
||||
};
|
||||
|
||||
// Runs the main function and cleans up the timer afterwards
|
||||
async function runWithTimeout(): Promise<T> {
|
||||
try {
|
||||
return await whileLocked(renewTimer);
|
||||
} finally {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
|
||||
return Promise.race([ timerPromise, runWithTimeout() ]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,87 +1,86 @@
|
||||
import type { EventEmitter } from 'events';
|
||||
import streamifyArray from 'streamify-array';
|
||||
import type { ResourceIdentifier } from '../../../../src/ldp/representation/ResourceIdentifier';
|
||||
import type { ResourceLocker } from '../../../../src/util/locking/ResourceLocker';
|
||||
import { WrappedExpiringResourceLocker } from '../../../../src/util/locking/WrappedExpiringResourceLocker';
|
||||
|
||||
jest.useFakeTimers();
|
||||
|
||||
describe('A WrappedExpiringResourceLocker', (): void => {
|
||||
let order: string[];
|
||||
const identifier = { path: 'path' };
|
||||
let syncCb: () => string;
|
||||
let asyncCb: () => Promise<string>;
|
||||
let wrappedLocker: ResourceLocker;
|
||||
let locker: WrappedExpiringResourceLocker;
|
||||
const expiration = 1000;
|
||||
|
||||
beforeEach(async(): Promise<void> => {
|
||||
order = [];
|
||||
wrappedLocker = {
|
||||
withReadLock: jest.fn(async<T>(id: ResourceIdentifier, whileLocked: () => T | Promise<T>):
|
||||
Promise<T> => whileLocked()),
|
||||
withWriteLock: jest.fn(async<T>(id: ResourceIdentifier, whileLocked: () => T | Promise<T>):
|
||||
Promise<T> => whileLocked()),
|
||||
};
|
||||
|
||||
syncCb = jest.fn((): string => 'sync');
|
||||
asyncCb = jest.fn(async(): Promise<string> => new Promise((resolve): void => {
|
||||
setImmediate((): void => resolve('async'));
|
||||
}));
|
||||
|
||||
locker = new WrappedExpiringResourceLocker(wrappedLocker, expiration);
|
||||
});
|
||||
|
||||
async function registerEventOrder(eventSource: EventEmitter, event: string): Promise<void> {
|
||||
await new Promise((resolve): any => {
|
||||
eventSource.prependListener(event, (): any => {
|
||||
order.push(event);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
it('calls the wrapped locker for locking.', async(): Promise<void> => {
|
||||
let prom = locker.withReadLock(identifier, syncCb);
|
||||
await expect(prom).resolves.toBe('sync');
|
||||
expect(wrappedLocker.withReadLock).toHaveBeenCalledTimes(1);
|
||||
expect((wrappedLocker.withReadLock as jest.Mock).mock.calls[0][0]).toBe(identifier);
|
||||
|
||||
it('emits an error event when releasing the lock errors.', async(): Promise<void> => {
|
||||
jest.useFakeTimers();
|
||||
prom = locker.withWriteLock(identifier, syncCb);
|
||||
await expect(prom).resolves.toBe('sync');
|
||||
expect(wrappedLocker.withWriteLock).toHaveBeenCalledTimes(1);
|
||||
expect((wrappedLocker.withWriteLock as jest.Mock).mock.calls[0][0]).toBe(identifier);
|
||||
});
|
||||
|
||||
// Create a locker that fails upon release
|
||||
const faultyLocker = {
|
||||
acquire(): any {
|
||||
return {
|
||||
async release(): Promise<never> {
|
||||
throw new Error('Release error');
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
const expiringLocker = new WrappedExpiringResourceLocker(faultyLocker, 1000);
|
||||
const expiringLock = await expiringLocker.acquire({} as any);
|
||||
const errorCallback = jest.fn();
|
||||
expiringLock.on('error', errorCallback);
|
||||
it('calls the functions that need to be locked through the wrapped locker.', async(): Promise<void> => {
|
||||
let prom = locker.withReadLock(identifier, syncCb);
|
||||
await expect(prom).resolves.toBe('sync');
|
||||
expect(syncCb).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Let the lock expire
|
||||
prom = locker.withReadLock(identifier, asyncCb);
|
||||
await expect(prom).resolves.toBe('async');
|
||||
expect(asyncCb).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('throws an error if the locked function resolves too slow.', async(): Promise<void> => {
|
||||
async function slowCb(): Promise<void> {
|
||||
return new Promise((resolve): any => setTimeout(resolve, 5000));
|
||||
}
|
||||
const prom = locker.withReadLock(identifier, slowCb);
|
||||
jest.advanceTimersByTime(1000);
|
||||
await Promise.resolve();
|
||||
|
||||
// Verify the error has been emitted
|
||||
expect(errorCallback).toHaveBeenCalledTimes(1);
|
||||
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Release error'));
|
||||
await expect(prom).rejects.toThrow(`Lock expired after ${expiration}ms on ${identifier.path}`);
|
||||
});
|
||||
|
||||
it('releases the lock on the resource when data has been read.', async(): Promise<void> => {
|
||||
// Mock the inner ResourceLocker.
|
||||
const release = jest.fn(async(): Promise<any> => order.push('release'));
|
||||
const lock = { release };
|
||||
const locker = {
|
||||
acquire: jest.fn(async(): Promise<any> => {
|
||||
order.push('acquire');
|
||||
return lock;
|
||||
}),
|
||||
};
|
||||
it('can reset the timer within the locked function.', async(): Promise<void> => {
|
||||
async function refreshCb(maintainLock: () => void): Promise<string> {
|
||||
return new Promise((resolve): any => {
|
||||
setTimeout(maintainLock, 750);
|
||||
setTimeout((): void => resolve('refresh'), 1500);
|
||||
});
|
||||
}
|
||||
const prom = locker.withReadLock(identifier, refreshCb);
|
||||
jest.advanceTimersByTime(1500);
|
||||
await expect(prom).resolves.toBe('refresh');
|
||||
});
|
||||
|
||||
const expiringLocker = new WrappedExpiringResourceLocker(locker, 1000);
|
||||
const expiringLock = await expiringLocker.acquire({} as any);
|
||||
|
||||
// Mimic the behavior of a LockingResourceStore to test the expiringLock methods called.
|
||||
const source = streamifyArray([ 1, 2, 3 ]);
|
||||
// eslint-disable-next-line jest/valid-expect-in-promise
|
||||
new Promise((resolve): void => {
|
||||
source.on('end', resolve);
|
||||
source.on('close', resolve);
|
||||
}).then((): any => expiringLock.release(), null);
|
||||
const readable = Object.create(source, {
|
||||
read: {
|
||||
value(size: number): any {
|
||||
expiringLock.renew();
|
||||
return source.read(size);
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
// Read all data from the "representation"
|
||||
readable.on('data', (): any => true);
|
||||
await registerEventOrder(readable, 'end');
|
||||
|
||||
// Verify the lock was acquired and released at the right time
|
||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
||||
expect(order).toEqual([ 'acquire', 'end', 'release' ]);
|
||||
it('can still error after resetting the timer.', async(): Promise<void> => {
|
||||
async function refreshCb(maintainLock: () => void): Promise<void> {
|
||||
return new Promise((resolve): any => {
|
||||
setTimeout(maintainLock, 750);
|
||||
setTimeout(maintainLock, 1500);
|
||||
setTimeout(resolve, 5000);
|
||||
});
|
||||
}
|
||||
const prom = locker.withReadLock(identifier, refreshCb);
|
||||
jest.advanceTimersByTime(5000);
|
||||
await expect(prom).rejects.toThrow(`Lock expired after ${expiration}ms on ${identifier.path}`);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user