From a3f41c1d431c3e1582cd87971434282a46408d09 Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Thu, 28 Jan 2021 11:16:47 +0100 Subject: [PATCH] feat: Create GreedyReadWriteLocker using read/write locking algorithm --- src/index.ts | 1 + src/util/locking/GreedyReadWriteLocker.ts | 151 +++++++++ .../locking/GreedyReadWriteLocker.test.ts | 319 ++++++++++++++++++ 3 files changed, 471 insertions(+) create mode 100644 src/util/locking/GreedyReadWriteLocker.ts create mode 100644 test/unit/util/locking/GreedyReadWriteLocker.test.ts diff --git a/src/index.ts b/src/index.ts index c5a08e2e0..3a7b6b8df 100644 --- a/src/index.ts +++ b/src/index.ts @@ -207,6 +207,7 @@ export * from './util/identifiers/SingleRootIdentifierStrategy'; // Util/Locking export * from './util/locking/ExpiringReadWriteLocker'; export * from './util/locking/EqualReadWriteLocker'; +export * from './util/locking/GreedyReadWriteLocker'; export * from './util/locking/ReadWriteLocker'; export * from './util/locking/ResourceLocker'; export * from './util/locking/SingleThreadedResourceLocker'; diff --git a/src/util/locking/GreedyReadWriteLocker.ts b/src/util/locking/GreedyReadWriteLocker.ts new file mode 100644 index 000000000..073fff52b --- /dev/null +++ b/src/util/locking/GreedyReadWriteLocker.ts @@ -0,0 +1,151 @@ +import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier'; +import type { KeyValueStorage } from '../../storage/keyvalue/KeyValueStorage'; +import { ForbiddenHttpError } from '../errors/ForbiddenHttpError'; +import { InternalServerError } from '../errors/InternalServerError'; +import type { ReadWriteLocker } from './ReadWriteLocker'; +import type { ResourceLocker } from './ResourceLocker'; + +export interface GreedyReadWriteSuffixes { + count: string; + read: string; + write: string; +} + +/** + * A {@link ReadWriteLocker} that allows for multiple simultaneous read operations. + * Write operations will be blocked as long as read operations are not finished. + * New read operations are allowed while this is going on, which will cause write operations to wait longer. + * + * Based on https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_two_mutexes . + * As soon as 1 read lock request is made, the write lock is locked. + * Internally a counter keeps track of the amount of active read locks. + * Only when this number reaches 0 will the write lock be released again. + * The internal read lock is only locked to increase/decrease this counter and is released afterwards. + * This allows for multiple read operations, although only 1 at the time can update the counter, + * which means there can still be a small waiting period if there are multiple simultaneous read operations. + */ +export class GreedyReadWriteLocker implements ReadWriteLocker { + private readonly locker: ResourceLocker; + private readonly storage: KeyValueStorage; + private readonly suffixes: GreedyReadWriteSuffixes; + + /** + * @param locker - Used for creating read and write locks. + * @param storage - Used for storing the amount of active read operations on a resource. + * @param suffixes - Used to generate identifiers with the given suffixes. + * `count` is used for the identifier used to store the counter. + * `read` and `write` are used for the 2 types of locks that are needed. + */ + public constructor(locker: ResourceLocker, storage: KeyValueStorage, + suffixes: GreedyReadWriteSuffixes = { count: 'count', read: 'read', write: 'write' }) { + this.locker = locker; + this.storage = storage; + this.suffixes = suffixes; + } + + public async withReadLock(identifier: ResourceIdentifier, whileLocked: () => (Promise | T)): Promise { + await this.preReadSetup(identifier); + try { + return await whileLocked(); + } finally { + await this.postReadCleanup(identifier); + } + } + + public async withWriteLock(identifier: ResourceIdentifier, whileLocked: () => (Promise | T)): Promise { + if (identifier.path.endsWith(`.${this.suffixes.count}`)) { + throw new ForbiddenHttpError('This resource is used for internal purposes.'); + } + const write = this.getWriteLockIdentifier(identifier); + await this.locker.acquire(write); + try { + return await whileLocked(); + } finally { + await this.locker.release(write); + } + } + + /** + * This identifier is used for storing the count of active read operations. + */ + private getCountIdentifier(identifier: ResourceIdentifier): ResourceIdentifier { + return { path: `${identifier.path}.${this.suffixes.count}` }; + } + + /** + * This is the identifier for the read lock: the lock that is used to safely update and read the count. + */ + private getReadLockIdentifier(identifier: ResourceIdentifier): ResourceIdentifier { + return { path: `${identifier.path}.${this.suffixes.read}` }; + } + + /** + * This is the identifier for the write lock, making sure there is at most 1 write operation active. + */ + private getWriteLockIdentifier(identifier: ResourceIdentifier): ResourceIdentifier { + return { path: `${identifier.path}.${this.suffixes.write}` }; + } + + /** + * Safely updates the count before starting a read operation. + */ + private async preReadSetup(identifier: ResourceIdentifier): Promise { + await this.withInternalReadLock(identifier, async(): Promise => { + const count = await this.incrementCount(identifier, +1); + if (count === 1) { + // There is at least 1 read operation so write operations are blocked + const write = this.getWriteLockIdentifier(identifier); + await this.locker.acquire(write); + } + }); + } + + /** + * Safely decreases the count after the read operation is finished. + */ + private async postReadCleanup(identifier: ResourceIdentifier): Promise { + await this.withInternalReadLock(identifier, async(): Promise => { + const count = await this.incrementCount(identifier, -1); + if (count === 0) { + // All read locks have been released so a write operation is possible again + const write = this.getWriteLockIdentifier(identifier); + await this.locker.release(write); + } + }); + } + + /** + * Safely runs an action on the count. + */ + private async withInternalReadLock(identifier: ResourceIdentifier, whileLocked: () => (Promise | T)): + Promise { + const read = this.getReadLockIdentifier(identifier); + await this.locker.acquire(read); + try { + return await whileLocked(); + } finally { + await this.locker.release(read); + } + } + + /** + * Updates the count with the given modifier. + * Creates the data if it didn't exist yet. + * Deletes the data when the count reaches zero. + */ + private async incrementCount(identifier: ResourceIdentifier, mod: number): Promise { + const countIdentifier = this.getCountIdentifier(identifier); + let number = await this.storage.get(countIdentifier) ?? 0; + number += mod; + if (number === 0) { + // Make sure there is no remaining data once all locks are released + await this.storage.delete(countIdentifier); + } else if (number > 0) { + await this.storage.set(countIdentifier, number); + } else { + // Failsafe in case something goes wrong with the count storage + throw new InternalServerError('Read counter would become negative. Something is wrong with the count storage.'); + } + return number; + } +} diff --git a/test/unit/util/locking/GreedyReadWriteLocker.test.ts b/test/unit/util/locking/GreedyReadWriteLocker.test.ts new file mode 100644 index 000000000..81a88e7f8 --- /dev/null +++ b/test/unit/util/locking/GreedyReadWriteLocker.test.ts @@ -0,0 +1,319 @@ +import { EventEmitter } from 'events'; +import type { ResourceIdentifier } from '../../../../src/ldp/representation/ResourceIdentifier'; +import type { KeyValueStorage } from '../../../../src/storage/keyvalue/KeyValueStorage'; +import { ForbiddenHttpError } from '../../../../src/util/errors/ForbiddenHttpError'; +import { InternalServerError } from '../../../../src/util/errors/InternalServerError'; +import { GreedyReadWriteLocker } from '../../../../src/util/locking/GreedyReadWriteLocker'; +import type { ResourceLocker } from '../../../../src/util/locking/ResourceLocker'; + +// A simple ResourceLocker that keeps a queue of lock requests +class MemoryLocker implements ResourceLocker { + private readonly locks: Record void)[]>; + + public constructor() { + this.locks = {}; + } + + public async acquire(identifier: ResourceIdentifier): Promise { + const { path } = identifier; + if (!this.locks[path]) { + this.locks[path] = []; + } else { + return new Promise((resolve): void => { + this.locks[path].push(resolve); + }); + } + } + + public async release(identifier: ResourceIdentifier): Promise { + const { path } = identifier; + if (this.locks[path].length > 0) { + this.locks[path].shift()!(); + } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete this.locks[path]; + } + } +} + +describe('A GreedyReadWriteLocker', (): void => { + let sourceLocker: ResourceLocker; + let storage: KeyValueStorage; + const resourceId = { path: 'http://test.com/resource' }; + const resource2Id = { path: 'http://test.com/resource2' }; + let locker: GreedyReadWriteLocker; + + beforeEach(async(): Promise => { + sourceLocker = new MemoryLocker(); + + const map = new Map(); + storage = { + get: async(identifier: ResourceIdentifier): Promise => map.get(identifier.path), + has: async(identifier: ResourceIdentifier): Promise => map.has(identifier.path), + set: async(identifier: ResourceIdentifier, value: number): Promise => map.set(identifier.path, value), + delete: async(identifier: ResourceIdentifier): Promise => map.delete(identifier.path), + }; + + locker = new GreedyReadWriteLocker(sourceLocker, storage); + }); + + it('does not block single read operations.', async(): Promise => { + await expect(locker.withReadLock(resourceId, (): any => 5)).resolves.toBe(5); + }); + + it('does not block single write operations.', async(): Promise => { + await expect(locker.withWriteLock(resourceId, (): any => 5)).resolves.toBe(5); + }); + + it('errors when trying to writeLock a count identifier.', async(): Promise => { + await expect(locker.withWriteLock({ path: `http://test.com/foo.count` }, (): any => 5)) + .rejects.toThrow(ForbiddenHttpError); + + locker = new GreedyReadWriteLocker(sourceLocker, storage, { count: 'dummy', read: 'read', write: 'write' }); + await expect(locker.withWriteLock({ path: `http://test.com/foo.dummy` }, (): any => 5)) + .rejects.toThrow(ForbiddenHttpError); + }); + + it('errors if the read counter has an unexpected value.', async(): Promise => { + storage.get = jest.fn().mockResolvedValue(0); + await expect(locker.withReadLock(resourceId, (): any => 5)).rejects.toThrow(InternalServerError); + }); + + it('does not block multiple read operations.', async(): Promise => { + const order: string[] = []; + const emitter = new EventEmitter(); + + const unlocks = [ 0, 1, 2 ].map((num): any => new Promise((resolve): any => emitter.on(`release${num}`, resolve))); + const promises = [ 0, 1, 2 ].map((num): any => locker.withReadLock(resourceId, async(): Promise => { + order.push(`start ${num}`); + await unlocks[num]; + order.push(`finish ${num}`); + return num; + })); + + // Allow time to attach listeners + await new Promise(setImmediate); + + emitter.emit('release2'); + await expect(promises[2]).resolves.toBe(2); + emitter.emit('release0'); + await expect(promises[0]).resolves.toBe(0); + emitter.emit('release1'); + await expect(promises[1]).resolves.toBe(1); + + expect(order).toEqual([ 'start 0', 'start 1', 'start 2', 'finish 2', 'finish 0', 'finish 1' ]); + }); + + it('blocks multiple write operations.', async(): Promise => { + // Previous test but with write locks + const order: string[] = []; + const emitter = new EventEmitter(); + + const unlocks = [ 0, 1, 2 ].map((num): any => new Promise((resolve): any => emitter.on(`release${num}`, resolve))); + const promises = [ 0, 1, 2 ].map((num): any => locker.withWriteLock(resourceId, async(): Promise => { + order.push(`start ${num}`); + await unlocks[num]; + order.push(`finish ${num}`); + return num; + })); + + // Allow time to attach listeners + await new Promise(setImmediate); + + emitter.emit('release2'); + + // Allow time to finish write 2 + await new Promise(setImmediate); + + emitter.emit('release0'); + emitter.emit('release1'); + await Promise.all([ promises[2], promises[0], promises[1] ]); + expect(order).toEqual([ 'start 0', 'finish 0', 'start 1', 'finish 1', 'start 2', 'finish 2' ]); + }); + + it('allows multiple write operations on different resources.', async(): Promise => { + // Previous test but with write locks + const order: string[] = []; + const emitter = new EventEmitter(); + + const resources = [ resourceId, resource2Id ]; + const unlocks = [ 0, 1 ].map((num): any => new Promise((resolve): any => emitter.on(`release${num}`, resolve))); + const promises = [ 0, 1 ].map((num): any => locker.withWriteLock(resources[num], async(): Promise => { + order.push(`start ${num}`); + await unlocks[num]; + order.push(`finish ${num}`); + return num; + })); + + // Allow time to attach listeners + await new Promise(setImmediate); + + emitter.emit('release1'); + await expect(promises[1]).resolves.toBe(1); + emitter.emit('release0'); + await expect(promises[0]).resolves.toBe(0); + + expect(order).toEqual([ 'start 0', 'start 1', 'finish 1', 'finish 0' ]); + }); + + it('blocks write operations during read operations.', async(): Promise => { + const order: string[] = []; + const emitter = new EventEmitter(); + + const promRead = new Promise((resolve): any => { + emitter.on('releaseRead', resolve); + }); + + // We want to make sure the write operation only starts while the read operation is busy + // Otherwise the internal write lock might not be acquired yet + const delayedLockWrite = new Promise((resolve): void => { + emitter.on('readStarted', (): void => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + locker.withWriteLock(resourceId, (): any => { + order.push('write'); + resolve(); + }); + }); + }); + + const lockRead = locker.withReadLock(resourceId, async(): Promise => { + emitter.emit('readStarted'); + order.push('read start'); + await promRead; + order.push('read finish'); + }); + + // Allow time to attach listeners + await new Promise(setImmediate); + + const promAll = Promise.all([ delayedLockWrite, lockRead ]); + + emitter.emit('releaseRead'); + await promAll; + expect(order).toEqual([ 'read start', 'read finish', 'write' ]); + }); + + it('allows write operations on different resources during read operations.', async(): Promise => { + const order: string[] = []; + const emitter = new EventEmitter(); + + const promRead = new Promise((resolve): any => { + emitter.on('releaseRead', resolve); + }); + + const delayedLockWrite = new Promise((resolve): void => { + emitter.on('readStarted', (): void => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + locker.withWriteLock(resource2Id, (): any => { + order.push('write'); + resolve(); + }); + }); + }); + + const lockRead = locker.withReadLock(resourceId, async(): Promise => { + emitter.emit('readStarted'); + order.push('read start'); + await promRead; + order.push('read finish'); + }); + + // Allow time to attach listeners + await new Promise(setImmediate); + + const promAll = Promise.all([ delayedLockWrite, lockRead ]); + + emitter.emit('releaseRead'); + await promAll; + expect(order).toEqual([ 'read start', 'write', 'read finish' ]); + }); + + it('prioritizes read operations when a read operation is waiting.', async(): Promise => { + // This test is very similar to the previous ones but adds an extra read lock + const order: string[] = []; + const emitter = new EventEmitter(); + + const promRead1 = new Promise((resolve): any => emitter.on('releaseRead1', resolve)); + const promRead2 = new Promise((resolve): any => emitter.on('releaseRead2', resolve)); + + const delayedLockWrite = new Promise((resolve): void => { + emitter.on('readStarted', (): void => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + locker.withWriteLock(resourceId, (): any => { + order.push('write'); + resolve(); + }); + }); + }); + + const delayedLockRead2 = new Promise((resolve): void => { + emitter.on('readStarted', (): void => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + locker.withReadLock(resourceId, async(): Promise => { + order.push('read 2 start'); + await promRead2; + order.push('read 2 finish'); + resolve(); + }); + }); + }); + + const lockRead = locker.withReadLock(resourceId, async(): Promise => { + emitter.emit('readStarted'); + order.push('read 1 start'); + await promRead1; + order.push('read 1 finish'); + }); + + // Allow time to attach listeners + await new Promise(setImmediate); + + const promAll = Promise.all([ delayedLockWrite, lockRead, delayedLockRead2 ]); + + emitter.emit('releaseRead1'); + + // Allow time to finish read 1 + await new Promise(setImmediate); + + emitter.emit('releaseRead2'); + await promAll; + expect(order).toEqual([ 'read 1 start', 'read 2 start', 'read 1 finish', 'read 2 finish', 'write' ]); + }); + + it('blocks read operations during write operations.', async(): Promise => { + // Again similar but with read and write order switched + const order: string[] = []; + const emitter = new EventEmitter(); + + const promWrite = new Promise((resolve): any => { + emitter.on('releaseWrite', resolve); + }); + + // We want to make sure the read operation only starts while the write operation is busy + const delayedLockRead = new Promise((resolve): void => { + emitter.on('writeStarted', (): void => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + locker.withReadLock(resourceId, (): any => { + order.push('read'); + resolve(); + }); + }); + }); + + const lockWrite = locker.withWriteLock(resourceId, async(): Promise => { + emitter.emit('writeStarted'); + order.push('write start'); + await promWrite; + order.push('write finish'); + }); + + // Allow time to attach listeners + await new Promise(setImmediate); + + const promAll = Promise.all([ delayedLockRead, lockWrite ]); + + emitter.emit('releaseWrite'); + await promAll; + expect(order).toEqual([ 'write start', 'write finish', 'read' ]); + }); +});