feat: Update ResourceLocker interface

By making use of withReadLock and withWriteLock instead
of acquiring the locks themselves it's easier to keep control
of what happens.
This commit is contained in:
Joachim Van Herwegen 2021-01-19 15:42:53 +01:00
parent 038693a679
commit 4d440c6c69
5 changed files with 85 additions and 82 deletions

View File

@ -192,7 +192,6 @@ export * from './util/identifiers/SingleRootIdentifierStrategy';
// Util/Locking
export * from './util/locking/ExpiringLock';
export * from './util/locking/ExpiringResourceLocker';
export * from './util/locking/Lock';
export * from './util/locking/ResourceLocker';
export * from './util/locking/SingleThreadedResourceLocker';
export * from './util/locking/WrappedExpiringResourceLocker';

View File

@ -1,10 +0,0 @@
/**
* Lock used by a {@link ResourceLocker} for non-atomic operations.
*/
export interface Lock {
/**
* Release this lock.
* @returns A promise resolving when the release is finished.
*/
release: () => Promise<void>;
}

View File

@ -1,15 +1,30 @@
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
import type { Lock } from './Lock';
/**
* Allows the locking of resources which is needed for non-atomic {@link ResourceStore}s.
*/
export interface ResourceLocker<T extends Lock = Lock> {
export interface ResourceLocker {
/**
* Lock the given resource.
* @param identifier - Identifier of the resource that needs to be locked.
* Run the given function while the resource is locked.
* The lock will be released when the (async) input function resolves.
* This function should be used for operations that only require reading the resource.
*
* @returns A promise containing the lock on the resource.
* @param identifier - Identifier of the resource that needs to be locked.
* @param whileLocked - A function to execute while the resource is locked.
*
* @returns A promise resolving when the lock is released.
*/
acquire: (identifier: ResourceIdentifier) => Promise<T>;
withReadLock: <T>(identifier: ResourceIdentifier, whileLocked: () => T | Promise<T>) => Promise<T>;
/**
* Run the given function while the resource is locked.
* The lock will be released when the (async) input function resolves.
* This function should be used for operations that could modify the resource.
*
* @param identifier - Identifier of the resource that needs to be locked.
* @param whileLocked - A function to execute while the resource is locked.
*
* @returns A promise resolving when the lock is released.
*/
withWriteLock: <T>(identifier: ResourceIdentifier, whileLocked: () => T | Promise<T>) => Promise<T>;
}

View File

@ -1,11 +1,12 @@
import AsyncLock from 'async-lock';
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
import { getLoggerFor } from '../../logging/LogUtil';
import type { Lock } from './Lock';
import type { ResourceLocker } from './ResourceLocker';
/**
* A resource locker making use of the `async-lock` library.
* Read and write locks use the same locks so no preference is given to any operations.
* This should be changed at some point though, see #542.
*/
export class SingleThreadedResourceLocker implements ResourceLocker {
protected readonly logger = getLoggerFor(this);
@ -16,25 +17,30 @@ export class SingleThreadedResourceLocker implements ResourceLocker {
this.locks = new AsyncLock();
}
public async withReadLock<T>(identifier: ResourceIdentifier, whileLocked: () => T | Promise<T>): Promise<T> {
return this.withLock(identifier, whileLocked);
}
public async withWriteLock<T>(identifier: ResourceIdentifier, whileLocked: () => T | Promise<T>): Promise<T> {
return this.withLock(identifier, whileLocked);
}
/**
* Acquires a new lock for the requested identifier.
* Will resolve when the lock is available.
* Will resolve when the input function resolves.
* @param identifier - Identifier of resource that needs to be locked.
*
* @returns The {@link Lock} when it's available. Its release function needs to be called when finished.
* @param whileLocked - Function to resolve while the resource is locked.
*/
public async acquire(identifier: ResourceIdentifier): Promise<Lock> {
this.logger.verbose(`Acquiring lock for ${identifier.path}`);
return new Promise((resolve, reject): void => {
this.locks.acquire(identifier.path, (done): void => {
this.logger.verbose(`Acquired lock for ${identifier.path}`);
resolve({
release: async(): Promise<void> => {
this.logger.verbose(`Released lock for ${identifier.path}`);
done();
},
});
}).catch(reject);
});
private async withLock<T>(identifier: ResourceIdentifier, whileLocked: () => T | Promise<T>): Promise<T> {
this.logger.debug(`Acquiring lock for ${identifier.path}`);
try {
return await this.locks.acquire(identifier.path, async(): Promise<T> => {
this.logger.debug(`Acquired lock for ${identifier.path}`);
return whileLocked();
});
} finally {
this.logger.debug(`Released lock for ${identifier.path}`);
}
}
}

View File

@ -2,68 +2,61 @@ import { SingleThreadedResourceLocker } from '../../../../src/util/locking/Singl
describe('A SingleThreadedResourceLocker', (): void => {
let locker: SingleThreadedResourceLocker;
const identifier = { path: 'path' };
let syncCb: () => string;
let asyncCb: () => Promise<string>;
beforeEach(async(): Promise<void> => {
locker = new SingleThreadedResourceLocker();
syncCb = jest.fn((): string => 'sync');
asyncCb = jest.fn(async(): Promise<string> => new Promise((resolve): void => {
setImmediate((): void => resolve('async'));
}));
});
it('can acquire a lock.', async(): Promise<void> => {
const lock = await locker.acquire({ path: 'path' });
expect(lock).toEqual(expect.objectContaining({ release: expect.any(Function) }));
it('can run simple functions with a read lock.', async(): Promise<void> => {
let prom = locker.withReadLock(identifier, syncCb);
await expect(prom).resolves.toBe('sync');
expect(syncCb).toHaveBeenCalledTimes(1);
prom = locker.withReadLock(identifier, asyncCb);
await expect(prom).resolves.toBe('async');
expect(asyncCb).toHaveBeenCalledTimes(1);
});
it('can release an acquired lock.', async(): Promise<void> => {
const lock = await locker.acquire({ path: 'path' });
await expect(lock.release()).resolves.toBeUndefined();
it('can run simple functions with a write lock.', async(): Promise<void> => {
let prom = locker.withWriteLock(identifier, syncCb);
await expect(prom).resolves.toBe('sync');
expect(syncCb).toHaveBeenCalledTimes(1);
prom = locker.withWriteLock(identifier, asyncCb);
await expect(prom).resolves.toBe('async');
expect(asyncCb).toHaveBeenCalledTimes(1);
});
it('can acquire a lock after it was released.', async(): Promise<void> => {
let lock = await locker.acquire({ path: 'path' });
await lock.release();
lock = await locker.acquire({ path: 'path' });
expect(lock).toEqual(expect.objectContaining({ release: expect.any(Function) }));
});
/* eslint-disable jest/valid-expect-in-promise */
it('blocks lock acquisition until they are released.', async(): Promise<void> => {
const results: number[] = [];
const lock1 = locker.acquire({ path: 'path' });
const lock2 = locker.acquire({ path: 'path' });
const lock3 = locker.acquire({ path: 'path' });
// Note the different order of calls
const prom2 = lock2.then(async(lock): Promise<void> => {
const promSlow = locker.withWriteLock(identifier, async(): Promise<void> =>
new Promise((resolve): void => {
setImmediate((): void => {
results.push(1);
resolve();
});
}));
const promFast = locker.withWriteLock(identifier, (): void => {
results.push(2);
return lock.release();
});
const prom3 = lock3.then(async(lock): Promise<void> => {
results.push(3);
return lock.release();
});
const prom1 = lock1.then(async(lock): Promise<void> => {
results.push(1);
return lock.release();
});
await Promise.all([ prom2, prom3, prom1 ]);
expect(results).toEqual([ 1, 2, 3 ]);
await Promise.all([ promFast, promSlow ]);
expect(results).toEqual([ 1, 2 ]);
});
it('can acquire different keys simultaneously.', async(): Promise<void> => {
const results: number[] = [];
const lock1 = locker.acquire({ path: 'path1' });
const lock2 = locker.acquire({ path: 'path2' });
const lock3 = locker.acquire({ path: 'path3' });
await lock2.then(async(lock): Promise<void> => {
results.push(2);
return lock.release();
});
await lock3.then(async(lock): Promise<void> => {
results.push(3);
return lock.release();
});
await lock1.then(async(lock): Promise<void> => {
results.push(1);
return lock.release();
});
expect(results).toEqual([ 2, 3, 1 ]);
it('propagates errors.', async(): Promise<void> => {
asyncCb = jest.fn(async(): Promise<string> => new Promise((resolve, reject): void => {
setImmediate((): void => reject(new Error('test')));
}));
const prom = locker.withReadLock(identifier, asyncCb);
await expect(prom).rejects.toThrow('test');
});
});