chore: Review expiring locks.

This commit is contained in:
Ruben Verborgh 2020-11-10 19:40:34 +01:00 committed by Joachim Van Herwegen
parent 9fd8440525
commit 6f4c4a15b4
7 changed files with 73 additions and 37 deletions

View File

@ -2,12 +2,13 @@ import type { EventEmitter } from 'events';
import type { Lock } from './Lock';
/**
* ExpiringLock used by a {@link ExpiringResourceLocker} for non-atomic operations.
* Emits an "expired" event when internal timer runs out and should call release function when this happen.
* Interface for a lock that expires after a certain period of inactivity.
* Activity can be signaled by calling `renew`, which resets the expiration timeout.
* When the lock has expired, an `expired` event is emitted and the lock is released.
*/
export interface ExpiringLock extends Lock, EventEmitter {
/**
* Reset the unlock timer.
* Reset the lock expiration timeout.
*/
renew: () => void;
}

View File

@ -2,7 +2,6 @@ import type { ExpiringLock } from './ExpiringLock';
import type { ResourceLocker } from './ResourceLocker';
/**
* Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s.
* Specific {@link ResourceLocker} to work with {@link ExpiringLock}s.
* Interface for a factory of expiring locks.
*/
export interface ExpiringResourceLocker<T extends ExpiringLock = ExpiringLock> extends ResourceLocker<T> {}

View File

@ -120,12 +120,9 @@ export class LockingResourceStore implements AtomicResourceStore {
*/
protected createExpiringReadable(source: Readable, lock: ExpiringLock): Readable {
// Destroy the source when a timeout occurs.
const destroySource = (): void => {
lock.on('expired', (): void => {
source.destroy(new Error(`Stream reading timout exceeded`));
};
// Handle the destruction of the source when the lock expires.
lock.on('expired', destroySource);
});
// Spy on the source to renew the lock upon reading.
return Object.create(source, {

View File

@ -14,21 +14,21 @@ export class WrappedExpiringLock extends EventEmitter implements ExpiringLock {
protected readonly logger = getLoggerFor(this);
protected readonly innerLock: Lock;
protected readonly readTimeout: number;
protected readonly expiration: number;
protected readonly identifier: ResourceIdentifier;
protected timeout: NodeJS.Timeout;
protected timeoutHandle?: NodeJS.Timeout;
/**
* @param innerLock - Instance of ResourceLocker to use for acquiring a lock.
* @param readTimeout - Time in ms after which reading a representation times out, causing the lock to be released.
* @param expiration - Time in ms after which the lock expires.
* @param identifier - Identifier of the resource that needs to be locked.
*/
public constructor(innerLock: Lock, readTimeout: number, identifier: ResourceIdentifier) {
public constructor(innerLock: Lock, expiration: number, identifier: ResourceIdentifier) {
super();
this.innerLock = innerLock;
this.readTimeout = readTimeout;
this.expiration = expiration;
this.identifier = identifier;
this.timeout = setTimeout((): any => this.emitExpired(), readTimeout);
this.scheduleTimeout();
}
/**
@ -36,7 +36,7 @@ export class WrappedExpiringLock extends EventEmitter implements ExpiringLock {
* @returns A promise resolving when the release is finished.
*/
public async release(): Promise<void> {
clearTimeout(this.timeout);
this.clearTimeout();
return this.innerLock.release();
}
@ -44,17 +44,26 @@ export class WrappedExpiringLock extends EventEmitter implements ExpiringLock {
* Reset the unlock timer.
*/
public renew(): void {
this.logger.verbose(`Renewed expiring timer of the lock for ${this.identifier.path}`);
clearTimeout(this.timeout);
this.timeout = setTimeout((): any => this.emitExpired(), this.readTimeout);
this.clearTimeout();
this.scheduleTimeout();
}
/**
* This function will be called when the timer expires.
*/
protected async emitExpired(): Promise<void> {
this.logger.verbose(`Lock expired after exceeding timeout of ${this.readTimeout}ms for ${this.identifier.path}`);
private async expire(): Promise<void> {
this.logger.verbose(`Lock for ${this.identifier.path} expired after ${this.expiration}ms`);
this.emit('expired');
return this.innerLock.release();
try {
await this.innerLock.release();
} catch (error: unknown) {
this.emit('error', error);
}
}
private clearTimeout(): void {
clearTimeout(this.timeoutHandle!);
}
private scheduleTimeout(): void {
this.logger.verbose(`Renewed expiring lock for ${this.identifier.path}`);
this.timeoutHandle = setTimeout((): any => this.expire(), this.expiration);
}
}

View File

@ -13,15 +13,15 @@ export class WrappedExpiringResourceLocker implements ExpiringResourceLocker {
protected readonly logger = getLoggerFor(this);
protected readonly locker: ResourceLocker;
protected readonly readTimeout: number;
protected readonly expiration: number;
/**
* @param locker - Instance of ResourceLocker to use for acquiring a lock.
* @param readTimeout - Time in ms after which reading a representation times out, causing the lock to be released.
* @param expiration - Time in ms after which the lock expires.
*/
public constructor(locker: ResourceLocker, readTimeout: number) {
public constructor(locker: ResourceLocker, expiration: number) {
this.locker = locker;
this.readTimeout = readTimeout;
this.expiration = expiration;
}
/**
@ -32,6 +32,6 @@ export class WrappedExpiringResourceLocker implements ExpiringResourceLocker {
*/
public async acquire(identifier: ResourceIdentifier): Promise<ExpiringLock> {
const innerLock = await this.locker.acquire(identifier);
return new WrappedExpiringLock(innerLock, this.readTimeout, identifier);
return new WrappedExpiringLock(innerLock, this.expiration, identifier);
}
}

View File

@ -194,9 +194,9 @@ describe('A LockingResourceStore', (): void => {
jest.useFakeTimers();
// Spy on a real ResourceLocker instance
const strLocker = new WrappedExpiringResourceLocker(locker, 1000);
store = new LockingResourceStore(source, strLocker);
const acquireSpy = jest.spyOn(strLocker, 'acquire');
const expiringLocker = new WrappedExpiringResourceLocker(locker, 1000);
store = new LockingResourceStore(source, expiringLocker);
const acquireSpy = jest.spyOn(expiringLocker, 'acquire');
const representation = await store.getRepresentation({ path: 'path' }, {});
const errorCallback = jest.fn();
@ -221,9 +221,9 @@ describe('A LockingResourceStore', (): void => {
jest.useFakeTimers();
// Spy on a real ResourceLocker instance
const strLocker = new WrappedExpiringResourceLocker(locker, 1000);
store = new LockingResourceStore(source, strLocker);
const acquireSpy = jest.spyOn(strLocker, 'acquire');
const expiringLocker = new WrappedExpiringResourceLocker(locker, 1000);
store = new LockingResourceStore(source, expiringLocker);
const acquireSpy = jest.spyOn(expiringLocker, 'acquire');
const representation = await store.getRepresentation({ path: 'path' }, {});
const errorCallback = jest.fn();

View File

@ -0,0 +1,30 @@
import { WrappedExpiringResourceLocker } from '../../../src/storage/WrappedExpiringResourceLocker';
describe('A WrappedExpiringResourceLocker', (): void => {
it('emits an error event when releasing the lock errors.', async(): Promise<void> => {
jest.useFakeTimers();
// Create a locker that fails upon release
const faultyLocker = {
acquire(): any {
return {
async release(): Promise<never> {
throw new Error('Release error');
},
};
},
};
const expiringLocker = new WrappedExpiringResourceLocker(faultyLocker, 1000);
const expiringLock = await expiringLocker.acquire({} as any);
const errorCallback = jest.fn();
expiringLock.on('error', errorCallback);
// Let the lock expire
jest.advanceTimersByTime(1000);
await Promise.resolve();
// Verify the error has been emitted
expect(errorCallback).toHaveBeenCalledTimes(1);
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Release error'));
});
});