feat: Implement ExpiringLock and -ResourceLocker

This commit is contained in:
smessie 2020-11-10 16:27:48 +01:00 committed by Joachim Van Herwegen
parent 95ab0b4e76
commit 9fd8440525
7 changed files with 179 additions and 37 deletions

View File

@ -0,0 +1,13 @@
import type { EventEmitter } from 'events';
import type { Lock } from './Lock';
/**
* ExpiringLock used by a {@link ExpiringResourceLocker} for non-atomic operations.
* Emits an "expired" event when internal timer runs out and should call release function when this happen.
*/
export interface ExpiringLock extends Lock, EventEmitter {
/**
* Reset the unlock timer.
*/
renew: () => void;
}

View File

@ -0,0 +1,8 @@
import type { ExpiringLock } from './ExpiringLock';
import type { ResourceLocker } from './ResourceLocker';
/**
* Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s.
* Specific {@link ResourceLocker} to work with {@link ExpiringLock}s.
*/
export interface ExpiringResourceLocker<T extends ExpiringLock = ExpiringLock> extends ResourceLocker<T> {}

View File

@ -6,12 +6,10 @@ import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifie
import { getLoggerFor } from '../logging/LogUtil';
import type { AtomicResourceStore } from './AtomicResourceStore';
import type { Conditions } from './Conditions';
import type { ResourceLocker } from './ResourceLocker';
import type { ExpiringLock } from './ExpiringLock';
import type { ExpiringResourceLocker } from './ExpiringResourceLocker';
import type { ResourceStore } from './ResourceStore';
/** Time in ms after which reading a representation times out, causing the lock to be released. */
const READ_TIMEOUT = 1000;
/**
* Store that for every call acquires a lock before executing it on the requested resource,
* and releases it afterwards.
@ -20,9 +18,9 @@ export class LockingResourceStore implements AtomicResourceStore {
protected readonly logger = getLoggerFor(this);
private readonly source: ResourceStore;
private readonly locks: ResourceLocker;
private readonly locks: ExpiringResourceLocker;
public constructor(source: ResourceStore, locks: ResourceLocker) {
public constructor(source: ResourceStore, locks: ExpiringResourceLocker) {
this.source = source;
this.locks = locks;
}
@ -83,7 +81,7 @@ export class LockingResourceStore implements AtomicResourceStore {
try {
// Make the resource time out to ensure that the lock is always released eventually.
representation = await func();
return this.createExpiringRepresentation(representation);
return this.createExpiringRepresentation(representation, lock);
} finally {
// If the representation contains a valid Readable, wait for it to be consumed.
const data = representation?.data;
@ -106,10 +104,11 @@ export class LockingResourceStore implements AtomicResourceStore {
* Wraps a representation to make it time out when nothing is read for a certain amount of time.
*
* @param source - The representation to wrap
* @param lock - The lock for the corresponding identifier.
*/
protected createExpiringRepresentation(source: Representation): Representation {
protected createExpiringRepresentation(source: Representation, lock: ExpiringLock): Representation {
return Object.create(source, {
data: { value: this.createExpiringReadable(source.data) },
data: { value: this.createExpiringReadable(source.data, lock) },
});
}
@ -117,26 +116,22 @@ export class LockingResourceStore implements AtomicResourceStore {
* Wraps a readable to make it time out when nothing is read for a certain amount of time.
*
* @param source - The readable to wrap
* @param lock - The lock for the corresponding identifier.
*/
protected createExpiringReadable(source: Readable): Readable {
protected createExpiringReadable(source: Readable, lock: ExpiringLock): Readable {
// Destroy the source when a timeout occurs.
const destroySource = (): void => {
this.logger.info(`Stream reading timout of ${READ_TIMEOUT}ms exceeded; destroying source`);
source.destroy(new Error(`Stream reading timout of ${READ_TIMEOUT}ms exceeded`));
source.destroy(new Error(`Stream reading timout exceeded`));
};
let timeout = setTimeout(destroySource, READ_TIMEOUT);
// Cancel the timeout when the source terminates by itself.
const cancelTimeout = (): void => clearTimeout(timeout);
source.on('error', cancelTimeout);
source.on('end', cancelTimeout);
// Handle the destruction of the source when the lock expires.
lock.on('expired', destroySource);
// Spy on the source to reset the timeout on read.
// Spy on the source to renew the lock upon reading.
return Object.create(source, {
read: {
value(size: number): any {
cancelTimeout();
timeout = setTimeout(destroySource, READ_TIMEOUT);
lock.renew();
return source.read(size);
},
},

View File

@ -4,12 +4,12 @@ import type { Lock } from './Lock';
/**
* Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s.
*/
export interface ResourceLocker {
export interface ResourceLocker<T extends Lock = Lock> {
/**
* Lock the given resource.
* @param identifier - Identifier of the resource that needs to be locked.
*
* @returns A promise containing the lock on the resource.
*/
acquire: (identifier: ResourceIdentifier) => Promise<Lock>;
acquire: (identifier: ResourceIdentifier) => Promise<T>;
}

View File

@ -0,0 +1,60 @@
import { EventEmitter } from 'events';
import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier';
import { getLoggerFor } from '../logging/LogUtil';
import type { ExpiringLock } from './ExpiringLock';
import type { Lock } from './Lock';
/**
* An implementation of an expiring lock which defines the expiration logic.
*
* ExpiringLock used by a {@link ExpiringResourceLocker} for non-atomic operations.
* Emits an "expired" event when internal timer runs out and calls release function when this happen.
*/
export class WrappedExpiringLock extends EventEmitter implements ExpiringLock {
protected readonly logger = getLoggerFor(this);
protected readonly innerLock: Lock;
protected readonly readTimeout: number;
protected readonly identifier: ResourceIdentifier;
protected timeout: NodeJS.Timeout;
/**
* @param innerLock - Instance of ResourceLocker to use for acquiring a lock.
* @param readTimeout - Time in ms after which reading a representation times out, causing the lock to be released.
* @param identifier - Identifier of the resource that needs to be locked.
*/
public constructor(innerLock: Lock, readTimeout: number, identifier: ResourceIdentifier) {
super();
this.innerLock = innerLock;
this.readTimeout = readTimeout;
this.identifier = identifier;
this.timeout = setTimeout((): any => this.emitExpired(), readTimeout);
}
/**
* Release this lock.
* @returns A promise resolving when the release is finished.
*/
public async release(): Promise<void> {
clearTimeout(this.timeout);
return this.innerLock.release();
}
/**
* Reset the unlock timer.
*/
public renew(): void {
this.logger.verbose(`Renewed expiring timer of the lock for ${this.identifier.path}`);
clearTimeout(this.timeout);
this.timeout = setTimeout((): any => this.emitExpired(), this.readTimeout);
}
/**
* This function will be called when the timer expires.
*/
protected async emitExpired(): Promise<void> {
this.logger.verbose(`Lock expired after exceeding timeout of ${this.readTimeout}ms for ${this.identifier.path}`);
this.emit('expired');
return this.innerLock.release();
}
}

View File

@ -0,0 +1,37 @@
import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier';
import { getLoggerFor } from '../logging/LogUtil';
import type { ExpiringLock } from './ExpiringLock';
import type { ExpiringResourceLocker } from './ExpiringResourceLocker';
import type { ResourceLocker } from './ResourceLocker';
import { WrappedExpiringLock } from './WrappedExpiringLock';
/**
* Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s.
* Differs from {@Link ResourceLocker} by adding expiration logic.
*/
export class WrappedExpiringResourceLocker implements ExpiringResourceLocker {
protected readonly logger = getLoggerFor(this);
protected readonly locker: ResourceLocker;
protected readonly readTimeout: number;
/**
* @param locker - Instance of ResourceLocker to use for acquiring a lock.
* @param readTimeout - Time in ms after which reading a representation times out, causing the lock to be released.
*/
public constructor(locker: ResourceLocker, readTimeout: number) {
this.locker = locker;
this.readTimeout = readTimeout;
}
/**
* Lock the given resource with a lock providing expiration functionality.
* @param identifier - Identifier of the resource that needs to be locked.
*
* @returns A promise containing the expiring lock on the resource.
*/
public async acquire(identifier: ResourceIdentifier): Promise<ExpiringLock> {
const innerLock = await this.locker.acquire(identifier);
return new WrappedExpiringLock(innerLock, this.readTimeout, identifier);
}
}

View File

@ -2,20 +2,25 @@ import type { EventEmitter } from 'events';
import streamifyArray from 'streamify-array';
import type { Patch } from '../../../src/ldp/http/Patch';
import type { Representation } from '../../../src/ldp/representation/Representation';
import type { Lock } from '../../../src/storage/Lock';
import type { ExpiringLock } from '../../../src/storage/ExpiringLock';
import type { ExpiringResourceLocker } from '../../../src/storage/ExpiringResourceLocker';
import { LockingResourceStore } from '../../../src/storage/LockingResourceStore';
import type { ResourceLocker } from '../../../src/storage/ResourceLocker';
import type { ResourceStore } from '../../../src/storage/ResourceStore';
import { WrappedExpiringResourceLocker } from '../../../src/storage/WrappedExpiringResourceLocker';
describe('A LockingResourceStore', (): void => {
let store: LockingResourceStore;
let locker: ResourceLocker;
let lock: Lock;
let locker: ExpiringResourceLocker;
let lock: ExpiringLock;
let release: () => Promise<void>;
let renew: () => void;
let source: ResourceStore;
let order: string[];
let funcOnEmit: () => any;
beforeEach(async(): Promise<void> => {
jest.clearAllMocks();
order = [];
const delayedResolve = (resolve: (resolveParams: any) => void, name: string, resolveParams?: any): void => {
// `setImmediate` is introduced to make sure the promise doesn't execute immediately
@ -40,10 +45,26 @@ describe('A LockingResourceStore', (): void => {
new Promise((resolve): any => delayedResolve(resolve, 'modifyResource'))),
};
release = jest.fn(async(): Promise<any> => order.push('release'));
renew = jest.fn();
funcOnEmit = (): any => true;
locker = {
acquire: jest.fn(async(): Promise<any> => {
order.push('acquire');
lock = { release };
lock = {
release,
renew,
on(event: string, func: () => void): void {
if (event === 'expired') {
funcOnEmit = func;
}
},
emit(event: string): void {
if (event === 'expired') {
funcOnEmit();
}
},
} as unknown as ExpiringLock;
return lock;
}),
};
@ -171,6 +192,12 @@ describe('A LockingResourceStore', (): void => {
it('destroys the stream when nothing is read after 1000ms.', async(): Promise<void> => {
jest.useFakeTimers();
// Spy on a real ResourceLocker instance
const strLocker = new WrappedExpiringResourceLocker(locker, 1000);
store = new LockingResourceStore(source, strLocker);
const acquireSpy = jest.spyOn(strLocker, 'acquire');
const representation = await store.getRepresentation({ path: 'path' }, {});
const errorCallback = jest.fn();
representation.data.on('error', errorCallback);
@ -182,18 +209,22 @@ describe('A LockingResourceStore', (): void => {
// Verify a timeout error was thrown
expect(errorCallback).toHaveBeenCalledTimes(1);
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout of 1000ms exceeded'));
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout exceeded'));
// Verify the lock was acquired and released at the right time
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(acquireSpy).toHaveBeenCalledTimes(1);
expect(acquireSpy).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]);
});
it('destroys the stream when pauses between reads exceed 1000ms.', async(): Promise<void> => {
jest.useFakeTimers();
// Spy on a real ResourceLocker instance
const strLocker = new WrappedExpiringResourceLocker(locker, 1000);
store = new LockingResourceStore(source, strLocker);
const acquireSpy = jest.spyOn(strLocker, 'acquire');
const representation = await store.getRepresentation({ path: 'path' }, {});
const errorCallback = jest.fn();
representation.data.on('error', errorCallback);
@ -213,13 +244,11 @@ describe('A LockingResourceStore', (): void => {
// Verify a timeout error was thrown
expect(errorCallback).toHaveBeenCalledTimes(1);
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout of 1000ms exceeded'));
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout exceeded'));
// Verify the lock was acquired and released at the right time
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(acquireSpy).toHaveBeenCalledTimes(1);
expect(acquireSpy).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]);
});
});