fix: Release lock only when stream ends or is abandoned (#60)

* fix: Release lock only when stream has ended reading or an error occurs

* refactor: Refactor code and tests

* refactor: Move function mock to onBefore and remove unnecessary data drain

* fix: Make functions protected, add extra listener and add extra tests

* fix: Add extra TSDoc comment

* fix: Adjust tests to expect both end and close event

* refactor: Move test to other file

* refactor: make lockedRun method-independent

* fix: ensure lock release happens only once

* fix: make locked resources time out

* fix: destroy readable on error

Co-authored-by: Ruben Verborgh <ruben@verborgh.org>
This commit is contained in:
smessie 2020-08-20 10:51:19 +02:00 committed by GitHub
parent b489d69bad
commit aa510bc6b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 229 additions and 27 deletions

View File

@ -1,12 +1,16 @@
import { AtomicResourceStore } from './AtomicResourceStore';
import { Conditions } from './Conditions';
import { Patch } from '../ldp/http/Patch';
import { Readable } from 'stream';
import { Representation } from '../ldp/representation/Representation';
import { RepresentationPreferences } from '../ldp/representation/RepresentationPreferences';
import { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier';
import { ResourceLocker } from './ResourceLocker';
import { ResourceStore } from './ResourceStore';
/** Time in ms after which reading a representation times out, causing the lock to be released. */
const READ_TIMEOUT = 1000;
/**
* Store that for every call acquires a lock before executing it on the requested resource,
* and releases it afterwards.
@ -20,34 +24,39 @@ export class LockingResourceStore implements AtomicResourceStore {
this.locks = locks;
}
public async getRepresentation(identifier: ResourceIdentifier, preferences: RepresentationPreferences,
conditions?: Conditions): Promise<Representation> {
return this.lockedRepresentationRun(identifier,
async(): Promise<Representation> => this.source.getRepresentation(identifier, preferences, conditions));
}
public async addResource(container: ResourceIdentifier, representation: Representation,
conditions?: Conditions): Promise<ResourceIdentifier> {
return this.lockedRun(container,
async(): Promise<ResourceIdentifier> => this.source.addResource(container, representation, conditions));
}
public async deleteResource(identifier: ResourceIdentifier, conditions?: Conditions): Promise<void> {
return this.lockedRun(identifier, async(): Promise<void> => this.source.deleteResource(identifier, conditions));
}
public async getRepresentation(identifier: ResourceIdentifier, preferences: RepresentationPreferences,
conditions?: Conditions): Promise<Representation> {
return this.lockedRun(identifier,
async(): Promise<Representation> => this.source.getRepresentation(identifier, preferences, conditions));
}
public async modifyResource(identifier: ResourceIdentifier, patch: Patch, conditions?: Conditions): Promise<void> {
return this.lockedRun(identifier,
async(): Promise<void> => this.source.modifyResource(identifier, patch, conditions));
}
public async setRepresentation(identifier: ResourceIdentifier, representation: Representation,
conditions?: Conditions): Promise<void> {
return this.lockedRun(identifier,
async(): Promise<void> => this.source.setRepresentation(identifier, representation, conditions));
}
private async lockedRun<T>(identifier: ResourceIdentifier, func: () => Promise<T>): Promise<T> {
public async deleteResource(identifier: ResourceIdentifier, conditions?: Conditions): Promise<void> {
return this.lockedRun(identifier, async(): Promise<void> => this.source.deleteResource(identifier, conditions));
}
public async modifyResource(identifier: ResourceIdentifier, patch: Patch, conditions?: Conditions): Promise<void> {
return this.lockedRun(identifier,
async(): Promise<void> => this.source.modifyResource(identifier, patch, conditions));
}
/**
* Acquires a lock for the identifier and releases it when the function is executed.
* @param identifier - Identifier that should be locked.
* @param func - Function to be executed.
*/
protected async lockedRun<T>(identifier: ResourceIdentifier, func: () => Promise<T>): Promise<T> {
const lock = await this.locks.acquire(identifier);
try {
return await func();
@ -55,4 +64,77 @@ export class LockingResourceStore implements AtomicResourceStore {
await lock.release();
}
}
/**
* Acquires a lock for the identifier that should return a representation with Readable data and releases it when the
* Readable is read, closed or results in an error.
* When using this function, it is required to close the Readable stream when you are ready.
*
* @param identifier - Identifier that should be locked.
* @param func - Function to be executed.
*/
protected async lockedRepresentationRun(identifier: ResourceIdentifier, func: () => Promise<Representation>):
Promise<Representation> {
const lock = await this.locks.acquire(identifier);
let representation;
try {
// Make the resource time out to ensure that the lock is always released eventually.
representation = await func();
return this.createExpiringRepresentation(representation);
} finally {
// If the representation contains a valid Readable, wait for it to be consumed.
const data = representation?.data;
if (!data) {
await lock.release();
} else {
// When an error occurs, destroy the readable so the lock is released safely.
data.on('error', (): void => data.destroy());
// An `end` and/or `close` event signals that the readable has been consumed.
new Promise((resolve): void => {
data.on('end', resolve);
data.on('close', resolve);
}).then((): any => lock.release(), null);
}
}
}
/**
* Wraps a representation to make it time out when nothing is read for a certain amount of time.
*
* @param source - The representation to wrap
*/
protected createExpiringRepresentation(source: Representation): Representation {
return Object.create(source, {
data: { value: this.createExpiringReadable(source.data) },
});
}
/**
* Wraps a readable to make it time out when nothing is read for a certain amount of time.
*
* @param source - The readable to wrap
*/
protected createExpiringReadable(source: Readable): Readable {
// Destroy the source when a timeout occurs.
const destroySource = (): void =>
source.destroy(new Error(`Stream reading timout of ${READ_TIMEOUT}ms exceeded`));
let timeout = setTimeout(destroySource, READ_TIMEOUT);
// Cancel the timeout when the source terminates by itself.
const cancelTimeout = (): void => clearTimeout(timeout);
source.on('error', cancelTimeout);
source.on('end', cancelTimeout);
// Spy on the source to reset the timeout on read.
return Object.create(source, {
read: {
value(size: number): any {
cancelTimeout();
timeout = setTimeout(destroySource, READ_TIMEOUT);
return source.read(size);
},
},
});
}
}

View File

@ -1,9 +1,11 @@
import { EventEmitter } from 'events';
import { Lock } from '../../../src/storage/Lock';
import { LockingResourceStore } from '../../../src/storage/LockingResourceStore';
import { Patch } from '../../../src/ldp/http/Patch';
import { Representation } from '../../../src/ldp/representation/Representation';
import { ResourceLocker } from '../../../src/storage/ResourceLocker';
import { ResourceStore } from '../../../src/storage/ResourceStore';
import streamifyArray from 'streamify-array';
describe('A LockingResourceStore', (): void => {
let store: LockingResourceStore;
@ -15,17 +17,19 @@ describe('A LockingResourceStore', (): void => {
beforeEach(async(): Promise<void> => {
order = [];
const delayedResolve = (resolve: () => void, name: string): void => {
const delayedResolve = (resolve: (resolveParams: any) => void, name: string, resolveParams?: any): void => {
// `setImmediate` is introduced to make sure the promise doesn't execute immediately
setImmediate((): void => {
order.push(name);
resolve();
resolve(resolveParams);
});
};
const readable = streamifyArray([ 1, 2, 3 ]);
source = {
getRepresentation: jest.fn(async(): Promise<any> =>
new Promise((resolve): any => delayedResolve(resolve, 'getRepresentation'))),
new Promise((resolve): any => delayedResolve(resolve, 'getRepresentation', { data: readable } as
Representation))),
addResource: jest.fn(async(): Promise<any> =>
new Promise((resolve): any => delayedResolve(resolve, 'addResource'))),
setRepresentation: jest.fn(async(): Promise<any> =>
@ -46,14 +50,14 @@ describe('A LockingResourceStore', (): void => {
store = new LockingResourceStore(source, locker);
});
it('acquires a lock on the resource when getting it.', async(): Promise<void> => {
await store.getRepresentation({ path: 'path' }, {});
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'release' ]);
});
const registerEventOrder = async(eventSource: EventEmitter, event: string): Promise<void> => {
await new Promise((resolve): any => {
eventSource.prependListener(event, (): any => {
order.push(event);
resolve();
});
});
};
it('acquires a lock on the container when adding a representation.', async(): Promise<void> => {
await store.addResource({ path: 'path' }, {} as Representation);
@ -101,4 +105,120 @@ describe('A LockingResourceStore', (): void => {
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'release' ]);
});
it('releases the lock on the resource when data has been read.', async(): Promise<void> => {
// Read all data from the representation
const representation = await store.getRepresentation({ path: 'path' }, {});
representation.data.on('data', (): any => true);
await registerEventOrder(representation.data, 'end');
// Verify the lock was acquired and released at the right time
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'end', 'release' ]);
});
it('destroys the resource and releases the lock when the readable errors.', async(): Promise<void> => {
// Make the representation error
const representation = await store.getRepresentation({ path: 'path' }, {});
Promise.resolve().then((): any =>
representation.data.emit('error', new Error('Error on the readable')), null);
await registerEventOrder(representation.data, 'error');
await registerEventOrder(representation.data, 'close');
// Verify the lock was acquired and released at the right time
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'error', 'close', 'release' ]);
});
it('releases the lock on the resource when readable is destroyed.', async(): Promise<void> => {
// Make the representation close
const representation = await store.getRepresentation({ path: 'path' }, {});
representation.data.destroy();
await registerEventOrder(representation.data, 'close');
// Verify the lock was acquired and released at the right time
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]);
});
it('releases the lock only once when multiple events are triggered.', async(): Promise<void> => {
// Read all data from the representation and trigger an additional close event
const representation = await store.getRepresentation({ path: 'path' }, {});
representation.data.on('data', (): any => true);
representation.data.prependListener('end', (): any => {
order.push('end');
representation.data.destroy();
});
await registerEventOrder(representation.data, 'close');
// Verify the lock was acquired and released at the right time
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'end', 'close', 'release' ]);
});
it('destroys the stream when nothing is read after 1000ms.', async(): Promise<void> => {
jest.useFakeTimers();
const representation = await store.getRepresentation({ path: 'path' }, {});
const errorCallback = jest.fn();
representation.data.on('error', errorCallback);
// Wait 1000ms and read
jest.advanceTimersByTime(1000);
expect(representation.data.read()).toBe(null);
await registerEventOrder(representation.data, 'close');
// Verify a timeout error was thrown
expect(errorCallback).toHaveBeenCalledTimes(1);
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout of 1000ms exceeded'));
// Verify the lock was acquired and released at the right time
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]);
});
it('destroys the stream when pauses between reads exceed 1000ms.', async(): Promise<void> => {
jest.useFakeTimers();
const representation = await store.getRepresentation({ path: 'path' }, {});
const errorCallback = jest.fn();
representation.data.on('error', errorCallback);
// Wait 750ms and read
jest.advanceTimersByTime(750);
expect(representation.data.read()).toBe(1);
// Wait 750ms and read
jest.advanceTimersByTime(750);
expect(representation.data.read()).toBe(2);
// Wait 1000ms and watch the stream be destroyed
jest.advanceTimersByTime(1000);
expect(representation.data.read()).toBe(null);
await registerEventOrder(representation.data, 'close');
// Verify a timeout error was thrown
expect(errorCallback).toHaveBeenCalledTimes(1);
expect(errorCallback).toHaveBeenLastCalledWith(new Error('Stream reading timout of 1000ms exceeded'));
// Verify the lock was acquired and released at the right time
expect(locker.acquire).toHaveBeenCalledTimes(1);
expect(locker.acquire).toHaveBeenLastCalledWith({ path: 'path' });
expect(source.getRepresentation).toHaveBeenCalledTimes(1);
expect(lock.release).toHaveBeenCalledTimes(1);
expect(order).toEqual([ 'acquire', 'getRepresentation', 'close', 'release' ]);
});
});