From a9b811a5a3c14c9774878b0e5a722ae9095c6c92 Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Wed, 15 Jul 2020 14:46:19 +0200 Subject: [PATCH] feature: Add lock functionality --- index.ts | 2 + package-lock.json | 10 ++ package.json | 2 + src/storage/LockingResourceStore.ts | 51 ++++++++++ src/storage/SingleThreadedResourceLocker.ts | 29 ++++++ .../unit/storage/LockingResourceStore.test.ts | 97 +++++++++++++++++++ .../SingleThreadedResourceLocker.test.ts | 68 +++++++++++++ 7 files changed, 259 insertions(+) create mode 100644 src/storage/LockingResourceStore.ts create mode 100644 src/storage/SingleThreadedResourceLocker.ts create mode 100644 test/unit/storage/LockingResourceStore.test.ts create mode 100644 test/unit/storage/SingleThreadedResourceLocker.test.ts diff --git a/index.ts b/index.ts index cb5457909..871778ceb 100644 --- a/index.ts +++ b/index.ts @@ -56,10 +56,12 @@ export * from './src/server/HttpResponse'; export * from './src/storage/AtomicResourceStore'; export * from './src/storage/Conditions'; export * from './src/storage/Lock'; +export * from './src/storage/LockingResourceStore'; export * from './src/storage/RepresentationConverter'; export * from './src/storage/ResourceLocker'; export * from './src/storage/ResourceMapper'; export * from './src/storage/ResourceStore'; +export * from './src/storage/SingleThreadedResourceLocker'; export * from './src/storage/SimpleResourceStore'; // Util/Errors diff --git a/package-lock.json b/package-lock.json index c6a61fd3d..13a234b3e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -723,6 +723,11 @@ "@types/node": "*" } }, + "@types/async-lock": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@types/async-lock/-/async-lock-1.1.2.tgz", + "integrity": "sha512-j9n4bb6RhgFIydBe0+kpjnBPYumDaDyU8zvbWykyVMkku+c2CSu31MZkLeaBfqIwU+XCxlDpYDfyMQRkM0AkeQ==" + }, "@types/babel__core": { "version": "7.1.7", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.7.tgz", @@ -1340,6 +1345,11 @@ "integrity": "sha512-+Ryf6g3BKoRc7jfp7ad8tM4TtMiaWvbF/1/sQcZPkkS7ag3D5nMBCe2UfOTONtAkaG0tO0ij3C5Lwmf1EiyjHg==", "dev": true }, + "async-lock": { + "version": "1.2.4", + "resolved": "https://registry.npmjs.org/async-lock/-/async-lock-1.2.4.tgz", + "integrity": "sha512-UBQJC2pbeyGutIfYmErGc9RaJYnpZ1FHaxuKwb0ahvGiiCkPUf3p67Io+YLPmmv3RHY+mF6JEtNW8FlHsraAaA==" + }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", diff --git a/package.json b/package.json index 018093f87..8efabca90 100644 --- a/package.json +++ b/package.json @@ -29,12 +29,14 @@ ], "dependencies": { "@rdfjs/data-model": "^1.1.2", + "@types/async-lock": "^1.1.2", "@types/cors": "^2.8.6", "@types/express": "^4.17.6", "@types/n3": "^1.4.0", "@types/node": "^14.0.1", "@types/rdf-js": "^3.0.0", "@types/yargs": "^15.0.5", + "async-lock": "^1.2.4", "cors": "^2.8.5", "express": "^4.17.1", "n3": "^1.4.0", diff --git a/src/storage/LockingResourceStore.ts b/src/storage/LockingResourceStore.ts new file mode 100644 index 000000000..8361d6f66 --- /dev/null +++ b/src/storage/LockingResourceStore.ts @@ -0,0 +1,51 @@ +import { AtomicResourceStore } from './AtomicResourceStore'; +import { Conditions } from './Conditions'; +import { Patch } from '../ldp/http/Patch'; +import { Representation } from '../ldp/representation/Representation'; +import { RepresentationPreferences } from '../ldp/representation/RepresentationPreferences'; +import { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier'; +import { ResourceLocker } from './ResourceLocker'; +import { ResourceStore } from './ResourceStore'; + +/** + * Store that for every call acquires a lock before executing it on the requested resource, + * and releases it afterwards. + */ +export class LockingResourceStore implements AtomicResourceStore { + private readonly source: ResourceStore; + private readonly locks: ResourceLocker; + + public constructor(source: ResourceStore, locks: ResourceLocker) { + this.source = source; + this.locks = locks; + } + + public async addResource(container: ResourceIdentifier, representation: Representation, conditions?: Conditions): Promise { + return this.lockedRun(container, async(): Promise => this.source.addResource(container, representation, conditions)); + } + + public async deleteResource(identifier: ResourceIdentifier, conditions?: Conditions): Promise { + return this.lockedRun(identifier, async(): Promise => this.source.deleteResource(identifier, conditions)); + } + + public async getRepresentation(identifier: ResourceIdentifier, preferences: RepresentationPreferences, conditions?: Conditions): Promise { + return this.lockedRun(identifier, async(): Promise => this.source.getRepresentation(identifier, preferences, conditions)); + } + + public async modifyResource(identifier: ResourceIdentifier, patch: Patch, conditions?: Conditions): Promise { + return this.lockedRun(identifier, async(): Promise => this.source.modifyResource(identifier, patch, conditions)); + } + + public async setRepresentation(identifier: ResourceIdentifier, representation: Representation, conditions?: Conditions): Promise { + return this.lockedRun(identifier, async(): Promise => this.source.setRepresentation(identifier, representation, conditions)); + } + + private async lockedRun(identifier: ResourceIdentifier, func: () => Promise): Promise { + const lock = await this.locks.acquire(identifier); + try { + return await func(); + } finally { + await lock.release(); + } + } +} diff --git a/src/storage/SingleThreadedResourceLocker.ts b/src/storage/SingleThreadedResourceLocker.ts new file mode 100644 index 000000000..bebaa9d9d --- /dev/null +++ b/src/storage/SingleThreadedResourceLocker.ts @@ -0,0 +1,29 @@ +import AsyncLock from 'async-lock'; +import { Lock } from './Lock'; +import { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier'; +import { ResourceLocker } from './ResourceLocker'; + +/** + * A resource locker making use of the `async-lock` library. + */ +export class SingleThreadedResourceLocker implements ResourceLocker { + private readonly locks: AsyncLock; + + public constructor() { + this.locks = new AsyncLock(); + } + + /** + * Acquires a new lock for the requested identifier. + * Will resolve when the lock is available. + * @param identifier - Identifier of resource that needs to be locked. + * + * @returns The {@link Lock} when it's available. Its release function needs to be called when finished. + */ + public async acquire(identifier: ResourceIdentifier): Promise { + return new Promise(async(resolve): Promise => + this.locks.acquire(identifier.path, (done): void => { + resolve({ release: async(): Promise => done() }); + })); + } +} diff --git a/test/unit/storage/LockingResourceStore.test.ts b/test/unit/storage/LockingResourceStore.test.ts new file mode 100644 index 000000000..906f6daaa --- /dev/null +++ b/test/unit/storage/LockingResourceStore.test.ts @@ -0,0 +1,97 @@ +import { Lock } from '../../../src/storage/Lock'; +import { LockingResourceStore } from '../../../src/storage/LockingResourceStore'; +import { ResourceLocker } from '../../../src/storage/ResourceLocker'; +import { ResourceStore } from '../../../src/storage/ResourceStore'; + +describe('A LockingResourceStore', (): void => { + let store: LockingResourceStore; + let locker: ResourceLocker; + let lock: Lock; + let release: () => Promise; + let source: ResourceStore; + let order: string[]; + + beforeEach(async(): Promise => { + order = []; + const delayedResolve = (resolve: () => void, name: string): void => { + // `setImmediate` is introduced to make sure the promise doesn't execute immediately + setImmediate((): void => { + order.push(name); + resolve(); + }); + }; + + source = { + getRepresentation: jest.fn(async(): Promise => new Promise((resolve): any => delayedResolve(resolve, 'getRepresentation'))), + addResource: jest.fn(async(): Promise => new Promise((resolve): any => delayedResolve(resolve, 'addResource'))), + setRepresentation: jest.fn(async(): Promise => new Promise((resolve): any => delayedResolve(resolve, 'setRepresentation'))), + deleteResource: jest.fn(async(): Promise => new Promise((resolve): any => delayedResolve(resolve, 'deleteResource'))), + modifyResource: jest.fn(async(): Promise => new Promise((resolve): any => delayedResolve(resolve, 'modifyResource'))), + }; + release = jest.fn(async(): Promise => order.push('release')); + locker = { + acquire: jest.fn(async(): Promise => { + order.push('acquire'); + lock = { release }; + return lock; + }), + }; + store = new LockingResourceStore(source, locker); + }); + + it('acquires a lock on the resource when getting it.', async(): Promise => { + await store.getRepresentation({ path: 'path' }, null); + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.getRepresentation).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'getRepresentation', 'release' ]); + }); + + it('acquires a lock on the container when adding a representation.', async(): Promise => { + await store.addResource({ path: 'path' }, null); + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.addResource).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'addResource', 'release' ]); + }); + + it('acquires a lock on the resource when setting its representation.', async(): Promise => { + await store.setRepresentation({ path: 'path' }, null); + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.setRepresentation).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'setRepresentation', 'release' ]); + }); + + it('acquires a lock on the resource when deleting it.', async(): Promise => { + await store.deleteResource({ path: 'path' }); + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.deleteResource).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'deleteResource', 'release' ]); + }); + + it('acquires a lock on the resource when modifying its representation.', async(): Promise => { + await store.modifyResource({ path: 'path' }, null); + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(source.modifyResource).toHaveBeenCalledTimes(1); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'modifyResource', 'release' ]); + }); + + it('releases the lock if an error was thrown.', async(): Promise => { + source.getRepresentation = async(): Promise => { + throw new Error('dummy'); + }; + await expect(store.getRepresentation({ path: 'path' }, null)).rejects.toThrow('dummy'); + expect(locker.acquire).toHaveBeenCalledTimes(1); + expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(lock.release).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'acquire', 'release' ]); + }); +}); diff --git a/test/unit/storage/SingleThreadedResourceLocker.test.ts b/test/unit/storage/SingleThreadedResourceLocker.test.ts new file mode 100644 index 000000000..b4b51907e --- /dev/null +++ b/test/unit/storage/SingleThreadedResourceLocker.test.ts @@ -0,0 +1,68 @@ +import { SingleThreadedResourceLocker } from '../../../src/storage/SingleThreadedResourceLocker'; + +describe('A SingleThreadedResourceLocker', (): void => { + let locker: SingleThreadedResourceLocker; + beforeEach(async(): Promise => { + locker = new SingleThreadedResourceLocker(); + }); + + it('can acquire a lock.', async(): Promise => { + const lock = await locker.acquire({ path: 'path' }); + expect(lock).toEqual(expect.objectContaining({ release: expect.any(Function) })); + }); + + it('can release an acquired lock.', async(): Promise => { + const lock = await locker.acquire({ path: 'path' }); + await expect(lock.release()).resolves.toBeUndefined(); + }); + + it('can acquire a lock after it was released.', async(): Promise => { + let lock = await locker.acquire({ path: 'path' }); + await lock.release(); + lock = await locker.acquire({ path: 'path' }); + expect(lock).toEqual(expect.objectContaining({ release: expect.any(Function) })); + }); + + it('blocks lock acquisition until they are released.', async(): Promise => { + const results: number[] = []; + const lock1 = locker.acquire({ path: 'path' }); + const lock2 = locker.acquire({ path: 'path' }); + const lock3 = locker.acquire({ path: 'path' }); + + // Note the different order of calls + const prom2 = lock2.then(async(lock): Promise => { + results.push(2); + return lock.release(); + }); + const prom3 = lock3.then(async(lock): Promise => { + results.push(3); + return lock.release(); + }); + const prom1 = lock1.then(async(lock): Promise => { + results.push(1); + return lock.release(); + }); + await Promise.all([ prom2, prom3, prom1 ]); + expect(results).toEqual([ 1, 2, 3 ]); + }); + + it('can acquire different keys simultaneously.', async(): Promise => { + const results: number[] = []; + const lock1 = locker.acquire({ path: 'path1' }); + const lock2 = locker.acquire({ path: 'path2' }); + const lock3 = locker.acquire({ path: 'path3' }); + await lock2.then(async(lock): Promise => { + results.push(2); + return lock.release(); + }); + await lock3.then(async(lock): Promise => { + results.push(3); + return lock.release(); + }); + await lock1.then(async(lock): Promise => { + results.push(1); + return lock.release(); + }); + expect(results).toEqual([ 2, 3, 1 ]); + }); +});