mirror of
https://github.com/CommunitySolidServer/CommunitySolidServer.git
synced 2024-10-03 14:55:10 +00:00
feat: Update LockingResourceStore to use new locking interface
This has as added bonus that, in the case of getRepresentation, the stream will be destroyed with an error in case of a timeout.
This commit is contained in:
parent
077f5d7069
commit
c17402517e
@ -1,11 +1,10 @@
|
|||||||
import type { Readable } from 'stream';
|
import type { Readable } from 'stream';
|
||||||
import type { Patch } from '../ldp/http/Patch';
|
import type { Patch } from '../ldp/http/Patch';
|
||||||
|
import { BasicRepresentation } from '../ldp/representation/BasicRepresentation';
|
||||||
import type { Representation } from '../ldp/representation/Representation';
|
import type { Representation } from '../ldp/representation/Representation';
|
||||||
import type { RepresentationPreferences } from '../ldp/representation/RepresentationPreferences';
|
import type { RepresentationPreferences } from '../ldp/representation/RepresentationPreferences';
|
||||||
import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier';
|
import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier';
|
||||||
import { getLoggerFor } from '../logging/LogUtil';
|
import { getLoggerFor } from '../logging/LogUtil';
|
||||||
import type { Guarded } from '../util/GuardedStream';
|
|
||||||
import type { ExpiringLock } from '../util/locking/ExpiringLock';
|
|
||||||
import type { ExpiringResourceLocker } from '../util/locking/ExpiringResourceLocker';
|
import type { ExpiringResourceLocker } from '../util/locking/ExpiringResourceLocker';
|
||||||
import type { AtomicResourceStore } from './AtomicResourceStore';
|
import type { AtomicResourceStore } from './AtomicResourceStore';
|
||||||
import type { Conditions } from './Conditions';
|
import type { Conditions } from './Conditions';
|
||||||
@ -14,6 +13,7 @@ import type { ResourceStore } from './ResourceStore';
|
|||||||
/**
|
/**
|
||||||
* Store that for every call acquires a lock before executing it on the requested resource,
|
* Store that for every call acquires a lock before executing it on the requested resource,
|
||||||
* and releases it afterwards.
|
* and releases it afterwards.
|
||||||
|
* In case the request returns a Representation the lock will only be released when the data stream is finished.
|
||||||
*/
|
*/
|
||||||
export class LockingResourceStore implements AtomicResourceStore {
|
export class LockingResourceStore implements AtomicResourceStore {
|
||||||
protected readonly logger = getLoggerFor(this);
|
protected readonly logger = getLoggerFor(this);
|
||||||
@ -34,105 +34,103 @@ export class LockingResourceStore implements AtomicResourceStore {
|
|||||||
|
|
||||||
public async addResource(container: ResourceIdentifier, representation: Representation,
|
public async addResource(container: ResourceIdentifier, representation: Representation,
|
||||||
conditions?: Conditions): Promise<ResourceIdentifier> {
|
conditions?: Conditions): Promise<ResourceIdentifier> {
|
||||||
return this.lockedRun(container,
|
return this.locks.withWriteLock(container,
|
||||||
async(): Promise<ResourceIdentifier> => this.source.addResource(container, representation, conditions));
|
async(): Promise<ResourceIdentifier> => this.source.addResource(container, representation, conditions));
|
||||||
}
|
}
|
||||||
|
|
||||||
public async setRepresentation(identifier: ResourceIdentifier, representation: Representation,
|
public async setRepresentation(identifier: ResourceIdentifier, representation: Representation,
|
||||||
conditions?: Conditions): Promise<void> {
|
conditions?: Conditions): Promise<void> {
|
||||||
return this.lockedRun(identifier,
|
return this.locks.withWriteLock(identifier,
|
||||||
async(): Promise<void> => this.source.setRepresentation(identifier, representation, conditions));
|
async(): Promise<void> => this.source.setRepresentation(identifier, representation, conditions));
|
||||||
}
|
}
|
||||||
|
|
||||||
public async deleteResource(identifier: ResourceIdentifier, conditions?: Conditions): Promise<void> {
|
public async deleteResource(identifier: ResourceIdentifier, conditions?: Conditions): Promise<void> {
|
||||||
return this.lockedRun(identifier, async(): Promise<void> => this.source.deleteResource(identifier, conditions));
|
return this.locks.withWriteLock(identifier,
|
||||||
|
async(): Promise<void> => this.source.deleteResource(identifier, conditions));
|
||||||
}
|
}
|
||||||
|
|
||||||
public async modifyResource(identifier: ResourceIdentifier, patch: Patch, conditions?: Conditions): Promise<void> {
|
public async modifyResource(identifier: ResourceIdentifier, patch: Patch, conditions?: Conditions): Promise<void> {
|
||||||
return this.lockedRun(identifier,
|
return this.locks.withWriteLock(identifier,
|
||||||
async(): Promise<void> => this.source.modifyResource(identifier, patch, conditions));
|
async(): Promise<void> => this.source.modifyResource(identifier, patch, conditions));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acquires a lock for the identifier and releases it when the function is executed.
|
* Acquires a lock that is only released when all data of the resulting representation data has been read,
|
||||||
* @param identifier - Identifier that should be locked.
|
* an error occurs, or the timeout has been triggered.
|
||||||
* @param func - Function to be executed.
|
* The resulting data stream will be adapted to reset the timer every time data is read.
|
||||||
*/
|
*
|
||||||
protected async lockedRun<T>(identifier: ResourceIdentifier, func: () => Promise<T>): Promise<T> {
|
* In case the data of the resulting stream is not needed it should be closed to prevent a timeout error.
|
||||||
const lock = await this.locks.acquire(identifier);
|
|
||||||
try {
|
|
||||||
return await func();
|
|
||||||
} finally {
|
|
||||||
await lock.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Acquires a lock for the identifier that should return a representation with Readable data and releases it when the
|
|
||||||
* Readable is read, closed or results in an error.
|
|
||||||
* When using this function, it is required to close the Readable stream when you are ready.
|
|
||||||
*
|
*
|
||||||
* @param identifier - Identifier that should be locked.
|
* @param identifier - Identifier that should be locked.
|
||||||
* @param func - Function to be executed.
|
* @param whileLocked - Function to be executed while the resource is locked.
|
||||||
*/
|
*/
|
||||||
protected async lockedRepresentationRun(identifier: ResourceIdentifier, func: () => Promise<Representation>):
|
protected async lockedRepresentationRun(identifier: ResourceIdentifier, whileLocked: () => Promise<Representation>):
|
||||||
Promise<Representation> {
|
Promise<Representation> {
|
||||||
const lock = await this.locks.acquire(identifier);
|
// Create a new Promise that resolves to the resulting Representation
|
||||||
let representation;
|
// while only unlocking when the data has been read (or there's a timeout).
|
||||||
try {
|
// Note that we can't just return the result of `withReadLock` since that promise only
|
||||||
|
// resolves when the stream is finished, while we want `lockedRepresentationRun` to resolve
|
||||||
|
// once we have the Representation.
|
||||||
|
// See https://github.com/solid/community-server/pull/536#discussion_r562467957
|
||||||
|
return new Promise((resolve, reject): void => {
|
||||||
|
let representation: Representation;
|
||||||
// Make the resource time out to ensure that the lock is always released eventually.
|
// Make the resource time out to ensure that the lock is always released eventually.
|
||||||
representation = await func();
|
this.locks.withReadLock(identifier, async(maintainLock): Promise<void> => {
|
||||||
return this.createExpiringRepresentation(representation, lock);
|
representation = await whileLocked();
|
||||||
} finally {
|
resolve(this.createExpiringRepresentation(representation, maintainLock));
|
||||||
// If the representation contains a valid Readable, wait for it to be consumed.
|
|
||||||
const data = representation?.data;
|
|
||||||
if (!data) {
|
|
||||||
await lock.release();
|
|
||||||
} else {
|
|
||||||
// When an error occurs, destroy the readable so the lock is released safely.
|
|
||||||
data.on('error', (): void => data.destroy());
|
|
||||||
|
|
||||||
// An `end` and/or `close` event signals that the readable has been consumed.
|
// Release the lock when an error occurs or the data finished streaming
|
||||||
new Promise((resolve): void => {
|
await this.waitForStreamToEnd(representation.data);
|
||||||
data.on('end', resolve);
|
}).catch((error): void => {
|
||||||
data.on('close', resolve);
|
// Destroy the source stream in case the lock times out
|
||||||
}).then((): any => lock.release(), null);
|
representation?.data.destroy(error);
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
// Let this function return an error in case something went wrong getting the data
|
||||||
* Wraps a representation to make it time out when nothing is read for a certain amount of time.
|
// or in case the timeout happens before `func` returned
|
||||||
*
|
reject(error);
|
||||||
* @param source - The representation to wrap
|
});
|
||||||
* @param lock - The lock for the corresponding identifier.
|
|
||||||
*/
|
|
||||||
protected createExpiringRepresentation(source: Representation, lock: ExpiringLock): Representation {
|
|
||||||
return Object.create(source, {
|
|
||||||
data: { value: this.createExpiringReadable(source.data, lock) },
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wraps a readable to make it time out when nothing is read for a certain amount of time.
|
* Wraps a representation to make it reset the timeout timer every time data is read.
|
||||||
*
|
*
|
||||||
* @param source - The readable to wrap
|
* @param representation - The representation to wrap
|
||||||
* @param lock - The lock for the corresponding identifier.
|
* @param maintainLock - Function to call to reset the timer.
|
||||||
*/
|
*/
|
||||||
protected createExpiringReadable(source: Guarded<Readable>, lock: ExpiringLock): Readable {
|
protected createExpiringRepresentation(representation: Representation, maintainLock: () => void): Representation {
|
||||||
// Destroy the source when a timeout occurs.
|
const source = representation.data;
|
||||||
lock.on('expired', (): void => {
|
// Spy on the source to maintain the lock upon reading.
|
||||||
source.destroy(new Error(`Stream reading timout exceeded`));
|
const data = Object.create(source, {
|
||||||
});
|
|
||||||
|
|
||||||
// Spy on the source to renew the lock upon reading.
|
|
||||||
return Object.create(source, {
|
|
||||||
read: {
|
read: {
|
||||||
value(size: number): any {
|
value(size: number): any {
|
||||||
lock.renew();
|
maintainLock();
|
||||||
return source.read(size);
|
return source.read(size);
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
return new BasicRepresentation(data, representation.metadata, representation.binary);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a promise that resolve when the source stream is finished,
|
||||||
|
* either by ending or emitting an error.
|
||||||
|
* In the case of an error the stream will be destroyed if it hasn't been already.
|
||||||
|
*
|
||||||
|
* @param source - The input stream.
|
||||||
|
*/
|
||||||
|
protected async waitForStreamToEnd(source: Readable): Promise<void> {
|
||||||
|
try {
|
||||||
|
await new Promise((resolve, reject): void => {
|
||||||
|
source.on('error', reject);
|
||||||
|
source.on('end', resolve);
|
||||||
|
source.on('close', resolve);
|
||||||
|
});
|
||||||
|
} catch {
|
||||||
|
// Destroy the stream in case of errors
|
||||||
|
if (!source.destroyed) {
|
||||||
|
source.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import streamifyArray from 'streamify-array';
|
import type { Readable } from 'stream';
|
||||||
import { RootContainerInitializer } from '../../src/init/RootContainerInitializer';
|
import { RootContainerInitializer } from '../../src/init/RootContainerInitializer';
|
||||||
import { BasicRepresentation } from '../../src/ldp/representation/BasicRepresentation';
|
import { BasicRepresentation } from '../../src/ldp/representation/BasicRepresentation';
|
||||||
import type { Representation } from '../../src/ldp/representation/Representation';
|
import type { Representation } from '../../src/ldp/representation/Representation';
|
||||||
@ -7,19 +7,30 @@ import { DataAccessorBasedStore } from '../../src/storage/DataAccessorBasedStore
|
|||||||
import { LockingResourceStore } from '../../src/storage/LockingResourceStore';
|
import { LockingResourceStore } from '../../src/storage/LockingResourceStore';
|
||||||
import type { ResourceStore } from '../../src/storage/ResourceStore';
|
import type { ResourceStore } from '../../src/storage/ResourceStore';
|
||||||
import { APPLICATION_OCTET_STREAM } from '../../src/util/ContentTypes';
|
import { APPLICATION_OCTET_STREAM } from '../../src/util/ContentTypes';
|
||||||
|
import { InternalServerError } from '../../src/util/errors/InternalServerError';
|
||||||
import { SingleRootIdentifierStrategy } from '../../src/util/identifiers/SingleRootIdentifierStrategy';
|
import { SingleRootIdentifierStrategy } from '../../src/util/identifiers/SingleRootIdentifierStrategy';
|
||||||
import type { ExpiringResourceLocker } from '../../src/util/locking/ExpiringResourceLocker';
|
import type { ExpiringResourceLocker } from '../../src/util/locking/ExpiringResourceLocker';
|
||||||
import type { ResourceLocker } from '../../src/util/locking/ResourceLocker';
|
import type { ResourceLocker } from '../../src/util/locking/ResourceLocker';
|
||||||
import { SingleThreadedResourceLocker } from '../../src/util/locking/SingleThreadedResourceLocker';
|
import { SingleThreadedResourceLocker } from '../../src/util/locking/SingleThreadedResourceLocker';
|
||||||
import { WrappedExpiringResourceLocker } from '../../src/util/locking/WrappedExpiringResourceLocker';
|
import { WrappedExpiringResourceLocker } from '../../src/util/locking/WrappedExpiringResourceLocker';
|
||||||
|
import { guardedStreamFrom } from '../../src/util/StreamUtil';
|
||||||
import { BASE } from './Config';
|
import { BASE } from './Config';
|
||||||
|
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
|
async function readOnce(stream: Readable): Promise<any> {
|
||||||
|
return await new Promise((resolve): void => {
|
||||||
|
stream.once('data', resolve);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
describe('A LockingResourceStore', (): void => {
|
describe('A LockingResourceStore', (): void => {
|
||||||
let path: string;
|
let path: string;
|
||||||
let store: LockingResourceStore;
|
let store: LockingResourceStore;
|
||||||
let locker: ResourceLocker;
|
let locker: ResourceLocker;
|
||||||
let expiringLocker: ExpiringResourceLocker;
|
let expiringLocker: ExpiringResourceLocker;
|
||||||
let source: ResourceStore;
|
let source: ResourceStore;
|
||||||
|
let getRepresentationSpy: jest.SpyInstance;
|
||||||
|
|
||||||
beforeEach(async(): Promise<void> => {
|
beforeEach(async(): Promise<void> => {
|
||||||
jest.clearAllMocks();
|
jest.clearAllMocks();
|
||||||
@ -37,71 +48,56 @@ describe('A LockingResourceStore', (): void => {
|
|||||||
|
|
||||||
store = new LockingResourceStore(source, expiringLocker);
|
store = new LockingResourceStore(source, expiringLocker);
|
||||||
|
|
||||||
|
// Spy on a real ResourceLocker and ResourceStore instance
|
||||||
|
getRepresentationSpy = jest.spyOn(source, 'getRepresentation');
|
||||||
|
getRepresentationSpy.mockReturnValue(new Promise((resolve): any => resolve({ data:
|
||||||
|
guardedStreamFrom([ 1, 2, 3 ]) } as Representation)));
|
||||||
|
|
||||||
// Make sure something is in the store before we read from it in our tests.
|
// Make sure something is in the store before we read from it in our tests.
|
||||||
await store.setRepresentation({ path }, new BasicRepresentation([ 1, 2, 3 ], APPLICATION_OCTET_STREAM));
|
await source.setRepresentation({ path }, new BasicRepresentation([ 1, 2, 3 ], APPLICATION_OCTET_STREAM));
|
||||||
});
|
});
|
||||||
|
|
||||||
it('destroys the stream when nothing is read after 1000ms.', async(): Promise<void> => {
|
it('destroys the stream when nothing is read after 1000ms.', async(): Promise<void> => {
|
||||||
jest.useFakeTimers();
|
|
||||||
|
|
||||||
// Spy on a real ResourceLocker and ResourceStore instance
|
|
||||||
const acquireSpy = jest.spyOn(expiringLocker, 'acquire');
|
|
||||||
const getRepresentationSpy = jest.spyOn(source, 'getRepresentation');
|
|
||||||
getRepresentationSpy.mockReturnValue(new Promise((resolve): any => resolve({ data:
|
|
||||||
streamifyArray([ 1, 2, 3 ]) } as Representation)));
|
|
||||||
|
|
||||||
const representation = await store.getRepresentation({ path }, {});
|
const representation = await store.getRepresentation({ path }, {});
|
||||||
const errorCallback = jest.fn();
|
const errorCallback = jest.fn();
|
||||||
representation.data.on('error', errorCallback);
|
representation.data.on('error', errorCallback);
|
||||||
|
|
||||||
// Wait 1000ms and read
|
// Wait 1000ms and read
|
||||||
jest.advanceTimersByTime(1000);
|
jest.advanceTimersByTime(1000);
|
||||||
expect(representation.data.read()).toBeNull();
|
await new Promise(setImmediate);
|
||||||
|
expect(representation.data.destroyed).toBe(true);
|
||||||
|
|
||||||
// Verify a timeout error was thrown
|
// Verify a timeout error was thrown
|
||||||
await new Promise((resolve): any => setImmediate(resolve));
|
|
||||||
expect(errorCallback).toHaveBeenCalledTimes(1);
|
expect(errorCallback).toHaveBeenCalledTimes(1);
|
||||||
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout exceeded'));
|
expect(errorCallback).toHaveBeenLastCalledWith(new InternalServerError(`Lock expired after 1000ms on ${path}`));
|
||||||
|
|
||||||
// Verify the lock was acquired and released at the right time
|
// Verify the lock was acquired and released at the right time
|
||||||
expect(acquireSpy).toHaveBeenCalledTimes(1);
|
|
||||||
expect(acquireSpy).toHaveBeenLastCalledWith({ path });
|
|
||||||
expect(getRepresentationSpy).toHaveBeenCalledTimes(1);
|
expect(getRepresentationSpy).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('destroys the stream when pauses between reads exceed 1000ms.', async(): Promise<void> => {
|
it('destroys the stream when pauses between reads exceed 1000ms.', async(): Promise<void> => {
|
||||||
jest.useFakeTimers();
|
|
||||||
|
|
||||||
// Spy on a real ResourceLocker and ResourceStore instance
|
|
||||||
const acquireSpy = jest.spyOn(expiringLocker, 'acquire');
|
|
||||||
const getRepresentationSpy = jest.spyOn(source, 'getRepresentation');
|
|
||||||
getRepresentationSpy.mockReturnValue(new Promise((resolve): any => resolve({ data:
|
|
||||||
streamifyArray([ 1, 2, 3 ]) } as Representation)));
|
|
||||||
|
|
||||||
const representation = await store.getRepresentation({ path }, {});
|
const representation = await store.getRepresentation({ path }, {});
|
||||||
const errorCallback = jest.fn();
|
const errorCallback = jest.fn();
|
||||||
representation.data.on('error', errorCallback);
|
representation.data.on('error', errorCallback);
|
||||||
|
|
||||||
// Wait 750ms and read
|
// Wait 750ms and read
|
||||||
jest.advanceTimersByTime(750);
|
jest.advanceTimersByTime(750);
|
||||||
expect(representation.data.read()).toBe(1);
|
await expect(readOnce(representation.data)).resolves.toBe(1);
|
||||||
|
|
||||||
// Wait 750ms and read
|
// Wait 750ms and read
|
||||||
jest.advanceTimersByTime(750);
|
jest.advanceTimersByTime(750);
|
||||||
expect(representation.data.read()).toBe(2);
|
await expect(readOnce(representation.data)).resolves.toBe(2);
|
||||||
|
|
||||||
// Wait 1000ms and watch the stream be destroyed
|
// Wait 1000ms and watch the stream be destroyed
|
||||||
jest.advanceTimersByTime(1000);
|
jest.advanceTimersByTime(1000);
|
||||||
expect(representation.data.read()).toBeNull();
|
await new Promise(setImmediate);
|
||||||
|
expect(representation.data.destroyed).toBe(true);
|
||||||
|
|
||||||
// Verify a timeout error was thrown
|
// Verify a timeout error was thrown
|
||||||
await new Promise((resolve): any => setImmediate(resolve));
|
|
||||||
expect(errorCallback).toHaveBeenCalledTimes(1);
|
expect(errorCallback).toHaveBeenCalledTimes(1);
|
||||||
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout exceeded'));
|
expect(errorCallback).toHaveBeenLastCalledWith(new InternalServerError(`Lock expired after 1000ms on ${path}`));
|
||||||
|
|
||||||
// Verify the lock was acquired and released at the right time
|
// Verify the lock was acquired and released at the right time
|
||||||
expect(acquireSpy).toHaveBeenCalledTimes(1);
|
|
||||||
expect(acquireSpy).toHaveBeenLastCalledWith({ path });
|
|
||||||
expect(getRepresentationSpy).toHaveBeenCalledTimes(1);
|
expect(getRepresentationSpy).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -1,174 +1,169 @@
|
|||||||
import type { EventEmitter } from 'events';
|
import { EventEmitter } from 'events';
|
||||||
import streamifyArray from 'streamify-array';
|
|
||||||
import type { Patch } from '../../../src/ldp/http/Patch';
|
import type { Patch } from '../../../src/ldp/http/Patch';
|
||||||
import type { Representation } from '../../../src/ldp/representation/Representation';
|
import type { Representation } from '../../../src/ldp/representation/Representation';
|
||||||
|
import type { ResourceIdentifier } from '../../../src/ldp/representation/ResourceIdentifier';
|
||||||
import { LockingResourceStore } from '../../../src/storage/LockingResourceStore';
|
import { LockingResourceStore } from '../../../src/storage/LockingResourceStore';
|
||||||
import type { ResourceStore } from '../../../src/storage/ResourceStore';
|
import type { ResourceStore } from '../../../src/storage/ResourceStore';
|
||||||
import type { ExpiringLock } from '../../../src/util/locking/ExpiringLock';
|
|
||||||
import type { ExpiringResourceLocker } from '../../../src/util/locking/ExpiringResourceLocker';
|
import type { ExpiringResourceLocker } from '../../../src/util/locking/ExpiringResourceLocker';
|
||||||
|
import { guardedStreamFrom } from '../../../src/util/StreamUtil';
|
||||||
|
|
||||||
|
function emptyFn(): void {
|
||||||
|
// Empty
|
||||||
|
}
|
||||||
|
|
||||||
describe('A LockingResourceStore', (): void => {
|
describe('A LockingResourceStore', (): void => {
|
||||||
let store: LockingResourceStore;
|
let store: LockingResourceStore;
|
||||||
let locker: ExpiringResourceLocker;
|
let locker: ExpiringResourceLocker;
|
||||||
let lock: ExpiringLock;
|
|
||||||
let release: () => Promise<void>;
|
|
||||||
let renew: () => void;
|
|
||||||
let source: ResourceStore;
|
let source: ResourceStore;
|
||||||
let order: string[];
|
let order: string[];
|
||||||
let funcOnEmit: () => any;
|
let timeoutTrigger: EventEmitter;
|
||||||
|
|
||||||
beforeEach(async(): Promise<void> => {
|
beforeEach(async(): Promise<void> => {
|
||||||
jest.clearAllMocks();
|
|
||||||
|
|
||||||
order = [];
|
order = [];
|
||||||
function delayedResolve(resolve: (value: any) => void, name: string, resolveValue?: any): void {
|
function addOrder<T>(name: string, input?: T): T | undefined {
|
||||||
// `setImmediate` is introduced to make sure the promise doesn't execute immediately
|
order.push(name);
|
||||||
setImmediate((): void => {
|
return input;
|
||||||
order.push(name);
|
|
||||||
resolve(resolveValue);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const readable = streamifyArray([ 1, 2, 3 ]);
|
const readable = guardedStreamFrom([ 1, 2, 3 ]);
|
||||||
|
const { destroy } = readable;
|
||||||
|
readable.destroy = jest.fn((error): void => destroy.call(readable, error));
|
||||||
source = {
|
source = {
|
||||||
getRepresentation: jest.fn(async(): Promise<any> =>
|
getRepresentation: jest.fn((): any => addOrder('getRepresentation', { data: readable } as Representation)),
|
||||||
new Promise((resolve): any => delayedResolve(resolve, 'getRepresentation', { data: readable } as
|
addResource: jest.fn((): any => addOrder('addResource')),
|
||||||
Representation))),
|
setRepresentation: jest.fn((): any => addOrder('setRepresentation')),
|
||||||
addResource: jest.fn(async(): Promise<any> =>
|
deleteResource: jest.fn((): any => addOrder('deleteResource')),
|
||||||
new Promise((resolve): any => delayedResolve(resolve, 'addResource'))),
|
modifyResource: jest.fn((): any => addOrder('modifyResource')),
|
||||||
setRepresentation: jest.fn(async(): Promise<any> =>
|
|
||||||
new Promise((resolve): any => delayedResolve(resolve, 'setRepresentation'))),
|
|
||||||
deleteResource: jest.fn(async(): Promise<any> =>
|
|
||||||
new Promise((resolve): any => delayedResolve(resolve, 'deleteResource'))),
|
|
||||||
modifyResource: jest.fn(async(): Promise<any> =>
|
|
||||||
new Promise((resolve): any => delayedResolve(resolve, 'modifyResource'))),
|
|
||||||
};
|
};
|
||||||
release = jest.fn(async(): Promise<any> => order.push('release'));
|
|
||||||
renew = jest.fn();
|
timeoutTrigger = new EventEmitter();
|
||||||
funcOnEmit = (): any => true;
|
|
||||||
|
|
||||||
locker = {
|
locker = {
|
||||||
acquire: jest.fn(async(): Promise<any> => {
|
withReadLock: jest.fn(async <T>(id: ResourceIdentifier,
|
||||||
order.push('acquire');
|
whileLocked: (maintainLock: () => void) => T | Promise<T>): Promise<T> => {
|
||||||
lock = {
|
order.push('lock read');
|
||||||
release,
|
try {
|
||||||
renew,
|
// Allows simulating a timeout event
|
||||||
on(event: string, func: () => void): void {
|
const timeout = new Promise<never>((resolve, reject): any => timeoutTrigger.on('timeout', (): void => {
|
||||||
if (event === 'expired') {
|
order.push('timeout');
|
||||||
funcOnEmit = func;
|
reject(new Error('timeout'));
|
||||||
}
|
}));
|
||||||
},
|
return await Promise.race([ Promise.resolve(whileLocked(emptyFn)), timeout ]);
|
||||||
emit(event: string): void {
|
} finally {
|
||||||
if (event === 'expired') {
|
order.push('unlock read');
|
||||||
funcOnEmit();
|
}
|
||||||
}
|
}),
|
||||||
},
|
withWriteLock: jest.fn(async <T>(identifier: ResourceIdentifier,
|
||||||
} as unknown as ExpiringLock;
|
whileLocked: (maintainLock: () => void) => T | Promise<T>): Promise<T> => {
|
||||||
return lock;
|
order.push('lock write');
|
||||||
|
try {
|
||||||
|
return await whileLocked(emptyFn);
|
||||||
|
} finally {
|
||||||
|
order.push('unlock write');
|
||||||
|
}
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
|
||||||
store = new LockingResourceStore(source, locker);
|
store = new LockingResourceStore(source, locker);
|
||||||
});
|
});
|
||||||
|
|
||||||
async function registerEventOrder(eventSource: EventEmitter, event: string): Promise<void> {
|
function registerEventOrder(eventSource: EventEmitter, event: string): void {
|
||||||
await new Promise((resolve): any => {
|
eventSource.on(event, (): void => {
|
||||||
eventSource.prependListener(event, (): any => {
|
order.push(event);
|
||||||
order.push(event);
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
it('acquires a lock on the container when adding a representation.', async(): Promise<void> => {
|
it('acquires a lock on the container when adding a representation.', async(): Promise<void> => {
|
||||||
await store.addResource({ path: 'path' }, {} as Representation);
|
await store.addResource({ path: 'path' }, {} as Representation);
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withWriteLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withWriteLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.addResource).toHaveBeenCalledTimes(1);
|
expect(source.addResource).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(order).toEqual([ 'lock write', 'addResource', 'unlock write' ]);
|
||||||
expect(order).toEqual([ 'acquire', 'addResource', 'release' ]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('acquires a lock on the resource when setting its representation.', async(): Promise<void> => {
|
it('acquires a lock on the resource when setting its representation.', async(): Promise<void> => {
|
||||||
await store.setRepresentation({ path: 'path' }, {} as Representation);
|
await store.setRepresentation({ path: 'path' }, {} as Representation);
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withWriteLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withWriteLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.setRepresentation).toHaveBeenCalledTimes(1);
|
expect(source.setRepresentation).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(order).toEqual([ 'lock write', 'setRepresentation', 'unlock write' ]);
|
||||||
expect(order).toEqual([ 'acquire', 'setRepresentation', 'release' ]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('acquires a lock on the resource when deleting it.', async(): Promise<void> => {
|
it('acquires a lock on the resource when deleting it.', async(): Promise<void> => {
|
||||||
await store.deleteResource({ path: 'path' });
|
await store.deleteResource({ path: 'path' });
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withWriteLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withWriteLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.deleteResource).toHaveBeenCalledTimes(1);
|
expect(source.deleteResource).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(order).toEqual([ 'lock write', 'deleteResource', 'unlock write' ]);
|
||||||
expect(order).toEqual([ 'acquire', 'deleteResource', 'release' ]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('acquires a lock on the resource when modifying its representation.', async(): Promise<void> => {
|
it('acquires a lock on the resource when modifying its representation.', async(): Promise<void> => {
|
||||||
await store.modifyResource({ path: 'path' }, {} as Patch);
|
await store.modifyResource({ path: 'path' }, {} as Patch);
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withWriteLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withWriteLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.modifyResource).toHaveBeenCalledTimes(1);
|
expect(source.modifyResource).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(order).toEqual([ 'lock write', 'modifyResource', 'unlock write' ]);
|
||||||
expect(order).toEqual([ 'acquire', 'modifyResource', 'release' ]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('releases the lock if an error was thrown.', async(): Promise<void> => {
|
it('releases the lock if an error was thrown.', async(): Promise<void> => {
|
||||||
source.getRepresentation = async(): Promise<any> => {
|
source.getRepresentation = async(): Promise<any> => {
|
||||||
|
order.push('bad get');
|
||||||
throw new Error('dummy');
|
throw new Error('dummy');
|
||||||
};
|
};
|
||||||
await expect(store.getRepresentation({ path: 'path' }, {})).rejects.toThrow('dummy');
|
await expect(store.getRepresentation({ path: 'path' }, {})).rejects.toThrow('dummy');
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withReadLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(order).toEqual([ 'lock read', 'bad get', 'unlock read' ]);
|
||||||
expect(order).toEqual([ 'acquire', 'release' ]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('releases the lock on the resource when data has been read.', async(): Promise<void> => {
|
it('releases the lock on the resource when data has been read.', async(): Promise<void> => {
|
||||||
// Read all data from the representation
|
// Read all data from the representation
|
||||||
const representation = await store.getRepresentation({ path: 'path' }, {});
|
const representation = await store.getRepresentation({ path: 'path' }, {});
|
||||||
representation.data.on('data', (): any => true);
|
representation.data.on('data', (): any => true);
|
||||||
await registerEventOrder(representation.data, 'end');
|
registerEventOrder(representation.data, 'end');
|
||||||
|
|
||||||
|
// Provide opportunity for async events
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
|
||||||
// Verify the lock was acquired and released at the right time
|
// Verify the lock was acquired and released at the right time
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withReadLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(order).toEqual([ 'lock read', 'getRepresentation', 'end', 'unlock read' ]);
|
||||||
expect(order).toEqual([ 'acquire', 'getRepresentation', 'end', 'release' ]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('destroys the resource and releases the lock when the readable errors.', async(): Promise<void> => {
|
it('destroys the resource and releases the lock when the readable errors.', async(): Promise<void> => {
|
||||||
// Make the representation error
|
// Make the representation error
|
||||||
const representation = await store.getRepresentation({ path: 'path' }, {});
|
const representation = await store.getRepresentation({ path: 'path' }, {});
|
||||||
// eslint-disable-next-line jest/valid-expect-in-promise
|
setImmediate((): any => representation.data.emit('error', new Error('Error on the readable')));
|
||||||
Promise.resolve().then((): any =>
|
registerEventOrder(representation.data, 'error');
|
||||||
representation.data.emit('error', new Error('Error on the readable')), null);
|
registerEventOrder(representation.data, 'close');
|
||||||
await registerEventOrder(representation.data, 'error');
|
|
||||||
await registerEventOrder(representation.data, 'close');
|
// Provide opportunity for async events
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
|
||||||
// Verify the lock was acquired and released at the right time
|
// Verify the lock was acquired and released at the right time
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withReadLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(representation.data.destroy).toHaveBeenCalledTimes(1);
|
||||||
expect(order).toEqual([ 'acquire', 'getRepresentation', 'error', 'close', 'release' ]);
|
expect(order).toEqual([ 'lock read', 'getRepresentation', 'error', 'unlock read', 'close' ]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('releases the lock on the resource when readable is destroyed.', async(): Promise<void> => {
|
it('releases the lock on the resource when readable is destroyed.', async(): Promise<void> => {
|
||||||
// Make the representation close
|
// Make the representation close
|
||||||
const representation = await store.getRepresentation({ path: 'path' }, {});
|
const representation = await store.getRepresentation({ path: 'path' }, {});
|
||||||
representation.data.destroy();
|
representation.data.destroy();
|
||||||
await registerEventOrder(representation.data, 'close');
|
registerEventOrder(representation.data, 'close');
|
||||||
|
|
||||||
|
// Provide opportunity for async events
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
|
||||||
// Verify the lock was acquired and released at the right time
|
// Verify the lock was acquired and released at the right time
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withReadLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(order).toEqual([ 'lock read', 'getRepresentation', 'close', 'unlock read' ]);
|
||||||
expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('releases the lock only once when multiple events are triggered.', async(): Promise<void> => {
|
it('releases the lock only once when multiple events are triggered.', async(): Promise<void> => {
|
||||||
@ -179,27 +174,52 @@ describe('A LockingResourceStore', (): void => {
|
|||||||
order.push('end');
|
order.push('end');
|
||||||
representation.data.destroy();
|
representation.data.destroy();
|
||||||
});
|
});
|
||||||
await registerEventOrder(representation.data, 'close');
|
registerEventOrder(representation.data, 'close');
|
||||||
|
|
||||||
|
// Provide opportunity for async events
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
|
||||||
// Verify the lock was acquired and released at the right time
|
// Verify the lock was acquired and released at the right time
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withReadLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(order).toEqual([ 'lock read', 'getRepresentation', 'end', 'close', 'unlock read' ]);
|
||||||
expect(order).toEqual([ 'acquire', 'getRepresentation', 'end', 'close', 'release' ]);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('releases the lock on the resource when readable times out.', async(): Promise<void> => {
|
it('releases the lock on the resource when readable times out.', async(): Promise<void> => {
|
||||||
// Make the representation time out
|
|
||||||
const representation = await store.getRepresentation({ path: 'path' }, {});
|
const representation = await store.getRepresentation({ path: 'path' }, {});
|
||||||
lock.emit('expired');
|
registerEventOrder(representation.data, 'close');
|
||||||
await registerEventOrder(representation.data, 'close');
|
registerEventOrder(representation.data, 'error');
|
||||||
|
|
||||||
|
timeoutTrigger.emit('timeout');
|
||||||
|
|
||||||
|
// Provide opportunity for async events
|
||||||
|
await new Promise(setImmediate);
|
||||||
|
|
||||||
// Verify the lock was acquired and released at the right time
|
// Verify the lock was acquired and released at the right time
|
||||||
expect(locker.acquire).toHaveBeenCalledTimes(1);
|
expect(locker.withReadLock).toHaveBeenCalledTimes(1);
|
||||||
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
|
expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
||||||
expect(lock.release).toHaveBeenCalledTimes(1);
|
expect(representation.data.destroy).toHaveBeenCalledTimes(1);
|
||||||
expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]);
|
expect(representation.data.destroy).toHaveBeenLastCalledWith(new Error('timeout'));
|
||||||
|
expect(order).toEqual([ 'lock read', 'getRepresentation', 'timeout', 'unlock read', 'error', 'close' ]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('throws an error if a timeout happens before getting a resource.', async(): Promise<void> => {
|
||||||
|
source.getRepresentation = jest.fn(async(): Promise<any> => {
|
||||||
|
order.push('useless get');
|
||||||
|
// This will never resolve
|
||||||
|
return new Promise(emptyFn);
|
||||||
|
});
|
||||||
|
|
||||||
|
const prom = store.getRepresentation({ path: 'path' }, {});
|
||||||
|
|
||||||
|
timeoutTrigger.emit('timeout');
|
||||||
|
|
||||||
|
await expect(prom).rejects.toThrow(new Error('timeout'));
|
||||||
|
expect(locker.withReadLock).toHaveBeenCalledTimes(1);
|
||||||
|
expect((locker.withReadLock as jest.Mock).mock.calls[0][0]).toEqual({ path: 'path' });
|
||||||
|
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
|
||||||
|
expect(order).toEqual([ 'lock read', 'useless get', 'timeout', 'unlock read' ]);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
Loading…
x
Reference in New Issue
Block a user