From c17402517e144444f9b1048ff83d47ee9815d90e Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Thu, 21 Jan 2021 11:50:06 +0100 Subject: [PATCH] feat: Update LockingResourceStore to use new locking interface This has as added bonus that, in the case of getRepresentation, the stream will be destroyed with an error in case of a timeout. --- src/storage/LockingResourceStore.ts | 132 +++++----- test/integration/LockingResourceStore.test.ts | 56 ++--- .../unit/storage/LockingResourceStore.test.ts | 228 ++++++++++-------- 3 files changed, 215 insertions(+), 201 deletions(-) diff --git a/src/storage/LockingResourceStore.ts b/src/storage/LockingResourceStore.ts index d67366a71..1a8bf267f 100644 --- a/src/storage/LockingResourceStore.ts +++ b/src/storage/LockingResourceStore.ts @@ -1,11 +1,10 @@ import type { Readable } from 'stream'; import type { Patch } from '../ldp/http/Patch'; +import { BasicRepresentation } from '../ldp/representation/BasicRepresentation'; import type { Representation } from '../ldp/representation/Representation'; import type { RepresentationPreferences } from '../ldp/representation/RepresentationPreferences'; import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier'; import { getLoggerFor } from '../logging/LogUtil'; -import type { Guarded } from '../util/GuardedStream'; -import type { ExpiringLock } from '../util/locking/ExpiringLock'; import type { ExpiringResourceLocker } from '../util/locking/ExpiringResourceLocker'; import type { AtomicResourceStore } from './AtomicResourceStore'; import type { Conditions } from './Conditions'; @@ -14,6 +13,7 @@ import type { ResourceStore } from './ResourceStore'; /** * Store that for every call acquires a lock before executing it on the requested resource, * and releases it afterwards. + * In case the request returns a Representation the lock will only be released when the data stream is finished. */ export class LockingResourceStore implements AtomicResourceStore { protected readonly logger = getLoggerFor(this); @@ -34,105 +34,103 @@ export class LockingResourceStore implements AtomicResourceStore { public async addResource(container: ResourceIdentifier, representation: Representation, conditions?: Conditions): Promise { - return this.lockedRun(container, + return this.locks.withWriteLock(container, async(): Promise => this.source.addResource(container, representation, conditions)); } public async setRepresentation(identifier: ResourceIdentifier, representation: Representation, conditions?: Conditions): Promise { - return this.lockedRun(identifier, + return this.locks.withWriteLock(identifier, async(): Promise => this.source.setRepresentation(identifier, representation, conditions)); } public async deleteResource(identifier: ResourceIdentifier, conditions?: Conditions): Promise { - return this.lockedRun(identifier, async(): Promise => this.source.deleteResource(identifier, conditions)); + return this.locks.withWriteLock(identifier, + async(): Promise => this.source.deleteResource(identifier, conditions)); } public async modifyResource(identifier: ResourceIdentifier, patch: Patch, conditions?: Conditions): Promise { - return this.lockedRun(identifier, + return this.locks.withWriteLock(identifier, async(): Promise => this.source.modifyResource(identifier, patch, conditions)); } /** - * Acquires a lock for the identifier and releases it when the function is executed. - * @param identifier - Identifier that should be locked. - * @param func - Function to be executed. - */ - protected async lockedRun(identifier: ResourceIdentifier, func: () => Promise): Promise { - const lock = await this.locks.acquire(identifier); - try { - return await func(); - } finally { - await lock.release(); - } - } - - /** - * Acquires a lock for the identifier that should return a representation with Readable data and releases it when the - * Readable is read, closed or results in an error. - * When using this function, it is required to close the Readable stream when you are ready. + * Acquires a lock that is only released when all data of the resulting representation data has been read, + * an error occurs, or the timeout has been triggered. + * The resulting data stream will be adapted to reset the timer every time data is read. + * + * In case the data of the resulting stream is not needed it should be closed to prevent a timeout error. * * @param identifier - Identifier that should be locked. - * @param func - Function to be executed. + * @param whileLocked - Function to be executed while the resource is locked. */ - protected async lockedRepresentationRun(identifier: ResourceIdentifier, func: () => Promise): + protected async lockedRepresentationRun(identifier: ResourceIdentifier, whileLocked: () => Promise): Promise { - const lock = await this.locks.acquire(identifier); - let representation; - try { + // Create a new Promise that resolves to the resulting Representation + // while only unlocking when the data has been read (or there's a timeout). + // Note that we can't just return the result of `withReadLock` since that promise only + // resolves when the stream is finished, while we want `lockedRepresentationRun` to resolve + // once we have the Representation. + // See https://github.com/solid/community-server/pull/536#discussion_r562467957 + return new Promise((resolve, reject): void => { + let representation: Representation; // Make the resource time out to ensure that the lock is always released eventually. - representation = await func(); - return this.createExpiringRepresentation(representation, lock); - } finally { - // If the representation contains a valid Readable, wait for it to be consumed. - const data = representation?.data; - if (!data) { - await lock.release(); - } else { - // When an error occurs, destroy the readable so the lock is released safely. - data.on('error', (): void => data.destroy()); + this.locks.withReadLock(identifier, async(maintainLock): Promise => { + representation = await whileLocked(); + resolve(this.createExpiringRepresentation(representation, maintainLock)); - // An `end` and/or `close` event signals that the readable has been consumed. - new Promise((resolve): void => { - data.on('end', resolve); - data.on('close', resolve); - }).then((): any => lock.release(), null); - } - } - } + // Release the lock when an error occurs or the data finished streaming + await this.waitForStreamToEnd(representation.data); + }).catch((error): void => { + // Destroy the source stream in case the lock times out + representation?.data.destroy(error); - /** - * Wraps a representation to make it time out when nothing is read for a certain amount of time. - * - * @param source - The representation to wrap - * @param lock - The lock for the corresponding identifier. - */ - protected createExpiringRepresentation(source: Representation, lock: ExpiringLock): Representation { - return Object.create(source, { - data: { value: this.createExpiringReadable(source.data, lock) }, + // Let this function return an error in case something went wrong getting the data + // or in case the timeout happens before `func` returned + reject(error); + }); }); } /** - * Wraps a readable to make it time out when nothing is read for a certain amount of time. + * Wraps a representation to make it reset the timeout timer every time data is read. * - * @param source - The readable to wrap - * @param lock - The lock for the corresponding identifier. + * @param representation - The representation to wrap + * @param maintainLock - Function to call to reset the timer. */ - protected createExpiringReadable(source: Guarded, lock: ExpiringLock): Readable { - // Destroy the source when a timeout occurs. - lock.on('expired', (): void => { - source.destroy(new Error(`Stream reading timout exceeded`)); - }); - - // Spy on the source to renew the lock upon reading. - return Object.create(source, { + protected createExpiringRepresentation(representation: Representation, maintainLock: () => void): Representation { + const source = representation.data; + // Spy on the source to maintain the lock upon reading. + const data = Object.create(source, { read: { value(size: number): any { - lock.renew(); + maintainLock(); return source.read(size); }, }, }); + return new BasicRepresentation(data, representation.metadata, representation.binary); + } + + /** + * Returns a promise that resolve when the source stream is finished, + * either by ending or emitting an error. + * In the case of an error the stream will be destroyed if it hasn't been already. + * + * @param source - The input stream. + */ + protected async waitForStreamToEnd(source: Readable): Promise { + try { + await new Promise((resolve, reject): void => { + source.on('error', reject); + source.on('end', resolve); + source.on('close', resolve); + }); + } catch { + // Destroy the stream in case of errors + if (!source.destroyed) { + source.destroy(); + } + } } } diff --git a/test/integration/LockingResourceStore.test.ts b/test/integration/LockingResourceStore.test.ts index ba99cc63e..48220c828 100644 --- a/test/integration/LockingResourceStore.test.ts +++ b/test/integration/LockingResourceStore.test.ts @@ -1,4 +1,4 @@ -import streamifyArray from 'streamify-array'; +import type { Readable } from 'stream'; import { RootContainerInitializer } from '../../src/init/RootContainerInitializer'; import { BasicRepresentation } from '../../src/ldp/representation/BasicRepresentation'; import type { Representation } from '../../src/ldp/representation/Representation'; @@ -7,19 +7,30 @@ import { DataAccessorBasedStore } from '../../src/storage/DataAccessorBasedStore import { LockingResourceStore } from '../../src/storage/LockingResourceStore'; import type { ResourceStore } from '../../src/storage/ResourceStore'; import { APPLICATION_OCTET_STREAM } from '../../src/util/ContentTypes'; +import { InternalServerError } from '../../src/util/errors/InternalServerError'; import { SingleRootIdentifierStrategy } from '../../src/util/identifiers/SingleRootIdentifierStrategy'; import type { ExpiringResourceLocker } from '../../src/util/locking/ExpiringResourceLocker'; import type { ResourceLocker } from '../../src/util/locking/ResourceLocker'; import { SingleThreadedResourceLocker } from '../../src/util/locking/SingleThreadedResourceLocker'; import { WrappedExpiringResourceLocker } from '../../src/util/locking/WrappedExpiringResourceLocker'; +import { guardedStreamFrom } from '../../src/util/StreamUtil'; import { BASE } from './Config'; +jest.useFakeTimers(); + +async function readOnce(stream: Readable): Promise { + return await new Promise((resolve): void => { + stream.once('data', resolve); + }); +} + describe('A LockingResourceStore', (): void => { let path: string; let store: LockingResourceStore; let locker: ResourceLocker; let expiringLocker: ExpiringResourceLocker; let source: ResourceStore; + let getRepresentationSpy: jest.SpyInstance; beforeEach(async(): Promise => { jest.clearAllMocks(); @@ -37,71 +48,56 @@ describe('A LockingResourceStore', (): void => { store = new LockingResourceStore(source, expiringLocker); + // Spy on a real ResourceLocker and ResourceStore instance + getRepresentationSpy = jest.spyOn(source, 'getRepresentation'); + getRepresentationSpy.mockReturnValue(new Promise((resolve): any => resolve({ data: + guardedStreamFrom([ 1, 2, 3 ]) } as Representation))); + // Make sure something is in the store before we read from it in our tests. - await store.setRepresentation({ path }, new BasicRepresentation([ 1, 2, 3 ], APPLICATION_OCTET_STREAM)); + await source.setRepresentation({ path }, new BasicRepresentation([ 1, 2, 3 ], APPLICATION_OCTET_STREAM)); }); it('destroys the stream when nothing is read after 1000ms.', async(): Promise => { - jest.useFakeTimers(); - - // Spy on a real ResourceLocker and ResourceStore instance - const acquireSpy = jest.spyOn(expiringLocker, 'acquire'); - const getRepresentationSpy = jest.spyOn(source, 'getRepresentation'); - getRepresentationSpy.mockReturnValue(new Promise((resolve): any => resolve({ data: - streamifyArray([ 1, 2, 3 ]) } as Representation))); - const representation = await store.getRepresentation({ path }, {}); const errorCallback = jest.fn(); representation.data.on('error', errorCallback); // Wait 1000ms and read jest.advanceTimersByTime(1000); - expect(representation.data.read()).toBeNull(); + await new Promise(setImmediate); + expect(representation.data.destroyed).toBe(true); // Verify a timeout error was thrown - await new Promise((resolve): any => setImmediate(resolve)); expect(errorCallback).toHaveBeenCalledTimes(1); - expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout exceeded')); + expect(errorCallback).toHaveBeenLastCalledWith(new InternalServerError(`Lock expired after 1000ms on ${path}`)); // Verify the lock was acquired and released at the right time - expect(acquireSpy).toHaveBeenCalledTimes(1); - expect(acquireSpy).toHaveBeenLastCalledWith({ path }); expect(getRepresentationSpy).toHaveBeenCalledTimes(1); }); it('destroys the stream when pauses between reads exceed 1000ms.', async(): Promise => { - jest.useFakeTimers(); - - // Spy on a real ResourceLocker and ResourceStore instance - const acquireSpy = jest.spyOn(expiringLocker, 'acquire'); - const getRepresentationSpy = jest.spyOn(source, 'getRepresentation'); - getRepresentationSpy.mockReturnValue(new Promise((resolve): any => resolve({ data: - streamifyArray([ 1, 2, 3 ]) } as Representation))); - const representation = await store.getRepresentation({ path }, {}); const errorCallback = jest.fn(); representation.data.on('error', errorCallback); // Wait 750ms and read jest.advanceTimersByTime(750); - expect(representation.data.read()).toBe(1); + await expect(readOnce(representation.data)).resolves.toBe(1); // Wait 750ms and read jest.advanceTimersByTime(750); - expect(representation.data.read()).toBe(2); + await expect(readOnce(representation.data)).resolves.toBe(2); // Wait 1000ms and watch the stream be destroyed jest.advanceTimersByTime(1000); - expect(representation.data.read()).toBeNull(); + await new Promise(setImmediate); + expect(representation.data.destroyed).toBe(true); // Verify a timeout error was thrown - await new Promise((resolve): any => setImmediate(resolve)); expect(errorCallback).toHaveBeenCalledTimes(1); - expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout exceeded')); + expect(errorCallback).toHaveBeenLastCalledWith(new InternalServerError(`Lock expired after 1000ms on ${path}`)); // Verify the lock was acquired and released at the right time - expect(acquireSpy).toHaveBeenCalledTimes(1); - expect(acquireSpy).toHaveBeenLastCalledWith({ path }); expect(getRepresentationSpy).toHaveBeenCalledTimes(1); }); }); diff --git a/test/unit/storage/LockingResourceStore.test.ts b/test/unit/storage/LockingResourceStore.test.ts index 1c5b741b8..07c9f8f3b 100644 --- a/test/unit/storage/LockingResourceStore.test.ts +++ b/test/unit/storage/LockingResourceStore.test.ts @@ -1,174 +1,169 @@ -import type { EventEmitter } from 'events'; -import streamifyArray from 'streamify-array'; +import { EventEmitter } from 'events'; import type { Patch } from '../../../src/ldp/http/Patch'; import type { Representation } from '../../../src/ldp/representation/Representation'; +import type { ResourceIdentifier } from '../../../src/ldp/representation/ResourceIdentifier'; import { LockingResourceStore } from '../../../src/storage/LockingResourceStore'; import type { ResourceStore } from '../../../src/storage/ResourceStore'; -import type { ExpiringLock } from '../../../src/util/locking/ExpiringLock'; import type { ExpiringResourceLocker } from '../../../src/util/locking/ExpiringResourceLocker'; +import { guardedStreamFrom } from '../../../src/util/StreamUtil'; + +function emptyFn(): void { + // Empty +} describe('A LockingResourceStore', (): void => { let store: LockingResourceStore; let locker: ExpiringResourceLocker; - let lock: ExpiringLock; - let release: () => Promise; - let renew: () => void; let source: ResourceStore; let order: string[]; - let funcOnEmit: () => any; + let timeoutTrigger: EventEmitter; beforeEach(async(): Promise => { - jest.clearAllMocks(); - order = []; - function delayedResolve(resolve: (value: any) => void, name: string, resolveValue?: any): void { - // `setImmediate` is introduced to make sure the promise doesn't execute immediately - setImmediate((): void => { - order.push(name); - resolve(resolveValue); - }); + function addOrder(name: string, input?: T): T | undefined { + order.push(name); + return input; } - const readable = streamifyArray([ 1, 2, 3 ]); + const readable = guardedStreamFrom([ 1, 2, 3 ]); + const { destroy } = readable; + readable.destroy = jest.fn((error): void => destroy.call(readable, error)); source = { - getRepresentation: jest.fn(async(): Promise => - new Promise((resolve): any => delayedResolve(resolve, 'getRepresentation', { data: readable } as - Representation))), - addResource: jest.fn(async(): Promise => - new Promise((resolve): any => delayedResolve(resolve, 'addResource'))), - setRepresentation: jest.fn(async(): Promise => - new Promise((resolve): any => delayedResolve(resolve, 'setRepresentation'))), - deleteResource: jest.fn(async(): Promise => - new Promise((resolve): any => delayedResolve(resolve, 'deleteResource'))), - modifyResource: jest.fn(async(): Promise => - new Promise((resolve): any => delayedResolve(resolve, 'modifyResource'))), + getRepresentation: jest.fn((): any => addOrder('getRepresentation', { data: readable } as Representation)), + addResource: jest.fn((): any => addOrder('addResource')), + setRepresentation: jest.fn((): any => addOrder('setRepresentation')), + deleteResource: jest.fn((): any => addOrder('deleteResource')), + modifyResource: jest.fn((): any => addOrder('modifyResource')), }; - release = jest.fn(async(): Promise => order.push('release')); - renew = jest.fn(); - funcOnEmit = (): any => true; + + timeoutTrigger = new EventEmitter(); locker = { - acquire: jest.fn(async(): Promise => { - order.push('acquire'); - lock = { - release, - renew, - on(event: string, func: () => void): void { - if (event === 'expired') { - funcOnEmit = func; - } - }, - emit(event: string): void { - if (event === 'expired') { - funcOnEmit(); - } - }, - } as unknown as ExpiringLock; - return lock; + withReadLock: jest.fn(async (id: ResourceIdentifier, + whileLocked: (maintainLock: () => void) => T | Promise): Promise => { + order.push('lock read'); + try { + // Allows simulating a timeout event + const timeout = new Promise((resolve, reject): any => timeoutTrigger.on('timeout', (): void => { + order.push('timeout'); + reject(new Error('timeout')); + })); + return await Promise.race([ Promise.resolve(whileLocked(emptyFn)), timeout ]); + } finally { + order.push('unlock read'); + } + }), + withWriteLock: jest.fn(async (identifier: ResourceIdentifier, + whileLocked: (maintainLock: () => void) => T | Promise): Promise => { + order.push('lock write'); + try { + return await whileLocked(emptyFn); + } finally { + order.push('unlock write'); + } }), }; + store = new LockingResourceStore(source, locker); }); - async function registerEventOrder(eventSource: EventEmitter, event: string): Promise { - await new Promise((resolve): any => { - eventSource.prependListener(event, (): any => { - order.push(event); - resolve(); - }); + function registerEventOrder(eventSource: EventEmitter, event: string): void { + eventSource.on(event, (): void => { + order.push(event); }); } it('acquires a lock on the container when adding a representation.', async(): Promise => { await store.addResource({ path: 'path' }, {} as Representation); - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withWriteLock).toHaveBeenCalledTimes(1); + expect((locker.withWriteLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.addResource).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'addResource', 'release' ]); + expect(order).toEqual([ 'lock write', 'addResource', 'unlock write' ]); }); it('acquires a lock on the resource when setting its representation.', async(): Promise => { await store.setRepresentation({ path: 'path' }, {} as Representation); - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withWriteLock).toHaveBeenCalledTimes(1); + expect((locker.withWriteLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.setRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'setRepresentation', 'release' ]); + expect(order).toEqual([ 'lock write', 'setRepresentation', 'unlock write' ]); }); it('acquires a lock on the resource when deleting it.', async(): Promise => { await store.deleteResource({ path: 'path' }); - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withWriteLock).toHaveBeenCalledTimes(1); + expect((locker.withWriteLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.deleteResource).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'deleteResource', 'release' ]); + expect(order).toEqual([ 'lock write', 'deleteResource', 'unlock write' ]); }); it('acquires a lock on the resource when modifying its representation.', async(): Promise => { await store.modifyResource({ path: 'path' }, {} as Patch); - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withWriteLock).toHaveBeenCalledTimes(1); + expect((locker.withWriteLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.modifyResource).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'modifyResource', 'release' ]); + expect(order).toEqual([ 'lock write', 'modifyResource', 'unlock write' ]); }); it('releases the lock if an error was thrown.', async(): Promise => { source.getRepresentation = async(): Promise => { + order.push('bad get'); throw new Error('dummy'); }; await expect(store.getRepresentation({ path: 'path' }, {})).rejects.toThrow('dummy'); - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'release' ]); + expect(locker.withReadLock).toHaveBeenCalledTimes(1); + expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); + expect(order).toEqual([ 'lock read', 'bad get', 'unlock read' ]); }); it('releases the lock on the resource when data has been read.', async(): Promise => { // Read all data from the representation const representation = await store.getRepresentation({ path: 'path' }, {}); representation.data.on('data', (): any => true); - await registerEventOrder(representation.data, 'end'); + registerEventOrder(representation.data, 'end'); + + // Provide opportunity for async events + await new Promise(setImmediate); // Verify the lock was acquired and released at the right time - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withReadLock).toHaveBeenCalledTimes(1); + expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.getRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'getRepresentation', 'end', 'release' ]); + expect(order).toEqual([ 'lock read', 'getRepresentation', 'end', 'unlock read' ]); }); it('destroys the resource and releases the lock when the readable errors.', async(): Promise => { // Make the representation error const representation = await store.getRepresentation({ path: 'path' }, {}); - // eslint-disable-next-line jest/valid-expect-in-promise - Promise.resolve().then((): any => - representation.data.emit('error', new Error('Error on the readable')), null); - await registerEventOrder(representation.data, 'error'); - await registerEventOrder(representation.data, 'close'); + setImmediate((): any => representation.data.emit('error', new Error('Error on the readable'))); + registerEventOrder(representation.data, 'error'); + registerEventOrder(representation.data, 'close'); + + // Provide opportunity for async events + await new Promise(setImmediate); // Verify the lock was acquired and released at the right time - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withReadLock).toHaveBeenCalledTimes(1); + expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.getRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'getRepresentation', 'error', 'close', 'release' ]); + expect(representation.data.destroy).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'lock read', 'getRepresentation', 'error', 'unlock read', 'close' ]); }); it('releases the lock on the resource when readable is destroyed.', async(): Promise => { // Make the representation close const representation = await store.getRepresentation({ path: 'path' }, {}); representation.data.destroy(); - await registerEventOrder(representation.data, 'close'); + registerEventOrder(representation.data, 'close'); + + // Provide opportunity for async events + await new Promise(setImmediate); // Verify the lock was acquired and released at the right time - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withReadLock).toHaveBeenCalledTimes(1); + expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.getRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]); + expect(order).toEqual([ 'lock read', 'getRepresentation', 'close', 'unlock read' ]); }); it('releases the lock only once when multiple events are triggered.', async(): Promise => { @@ -179,27 +174,52 @@ describe('A LockingResourceStore', (): void => { order.push('end'); representation.data.destroy(); }); - await registerEventOrder(representation.data, 'close'); + registerEventOrder(representation.data, 'close'); + + // Provide opportunity for async events + await new Promise(setImmediate); // Verify the lock was acquired and released at the right time - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withReadLock).toHaveBeenCalledTimes(1); + expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.getRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'getRepresentation', 'end', 'close', 'release' ]); + expect(order).toEqual([ 'lock read', 'getRepresentation', 'end', 'close', 'unlock read' ]); }); it('releases the lock on the resource when readable times out.', async(): Promise => { - // Make the representation time out const representation = await store.getRepresentation({ path: 'path' }, {}); - lock.emit('expired'); - await registerEventOrder(representation.data, 'close'); + registerEventOrder(representation.data, 'close'); + registerEventOrder(representation.data, 'error'); + + timeoutTrigger.emit('timeout'); + + // Provide opportunity for async events + await new Promise(setImmediate); // Verify the lock was acquired and released at the right time - expect(locker.acquire).toHaveBeenCalledTimes(1); - expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' }); + expect(locker.withReadLock).toHaveBeenCalledTimes(1); + expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); expect(source.getRepresentation).toHaveBeenCalledTimes(1); - expect(lock.release).toHaveBeenCalledTimes(1); - expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]); + expect(representation.data.destroy).toHaveBeenCalledTimes(1); + expect(representation.data.destroy).toHaveBeenLastCalledWith(new Error('timeout')); + expect(order).toEqual([ 'lock read', 'getRepresentation', 'timeout', 'unlock read', 'error', 'close' ]); + }); + + it('throws an error if a timeout happens before getting a resource.', async(): Promise => { + source.getRepresentation = jest.fn(async(): Promise => { + order.push('useless get'); + // This will never resolve + return new Promise(emptyFn); + }); + + const prom = store.getRepresentation({ path: 'path' }, {}); + + timeoutTrigger.emit('timeout'); + + await expect(prom).rejects.toThrow(new Error('timeout')); + expect(locker.withReadLock).toHaveBeenCalledTimes(1); + expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' }); + expect(source.getRepresentation).toHaveBeenCalledTimes(1); + expect(order).toEqual([ 'lock read', 'useless get', 'timeout', 'unlock read' ]); }); });