feat: add a process-/thread-safe file-based ResourceLocker

test: unit test succeeds

fix: not quiting loop when releasing unexisting lock

refactor: pull wait() function into TimerUtils

feat: store all locks inside a single lock folder

feat: use md5 hashing for filepath hashes

test: coverage back to 100%

fix: store locks in proper .internal/locks folder
feat: reworked tryfn

test: coverage back to 100%

buidl: package json types next to lib

style: linting

dos: add more documentation to Locker classes

refactor: SingleThreadedResourceLocker -> MemoryResourceLocker

refactor: MultiThreadedResourceLocker -> FileSystemResourceLocker

feat: update all file-based backend configs to use the new FileSystemResourceLocker

feat: add warning on starting the MemoryResourceLocker in a worker process

test: coverage back to 100%

fix: finalizer of file.json was configured wrong

docs: updated release notes for 5.0.0

refactor: incorporated changes so far

refactor: retryFunctions are less complex now

test: jitter fix
This commit is contained in:
Thomas Dupont 2022-04-27 14:46:23 +02:00 committed by Joachim Van Herwegen
parent 7e5483a36d
commit fa78bc6856
24 changed files with 579 additions and 89 deletions

View File

@ -5,6 +5,7 @@
- Support for Node v12 was dropped.
- Components.js was upgraded to v5. If you have created an external component
you should also upgrade to prevent warnings and conflicts.
- A new FileSystemResourceLocker has been added. It allows for true threadsafe locking without external dependencies.
### Data migration
The following actions are required if you are upgrading from a v4 server and want to retain your data.
@ -19,14 +20,18 @@ The `@context` needs to be updated to
The following changes pertain to the imports in the default configs:
- The prefix of all imports was changed from `files-scs` to `css`.
- All default configurations with a file-based backend now use a file-based locker instead of a memory-based one,
making them threadsafe.
The following changes are relevant for v3 custom configs that replaced certain features.
- `config/app/variables/cli.json` was changed to support the new `YargsCliExtractor` format.
- `config/util/resource-locker/memory.json` had the locker @type changed from `SingleThreadedResourceLocker` to `MemoryResourceLocker`.
### Interface changes
These changes are relevant if you wrote custom modules for the server that depend on existing interfaces.
- `YargsCliExtractor` was changed to now take as input an array of parameter objects.
- `RedirectAllHttpHandler` was removed and fully replaced by `RedirectingHttpHandler`.
- `SingleThreadedResourceLocker` has been renamed to `MemoryResourceLocker`.
## v4.0.0
### New features

View File

@ -29,7 +29,7 @@
"css:config/util/index/default.json",
"css:config/util/logging/winston.json",
"css:config/util/representation-conversion/default.json",
"css:config/util/resource-locker/memory.json",
"css:config/util/resource-locker/file.json",
"css:config/util/variables/default.json"
],
"@graph": [

View File

@ -29,7 +29,7 @@
"css:config/util/index/default.json",
"css:config/util/logging/winston.json",
"css:config/util/representation-conversion/default.json",
"css:config/util/resource-locker/memory.json",
"css:config/util/resource-locker/file.json",
"css:config/util/variables/default.json"
],
"@graph": [

View File

@ -29,7 +29,7 @@
"css:config/util/index/default.json",
"css:config/util/logging/winston.json",
"css:config/util/representation-conversion/default.json",
"css:config/util/resource-locker/memory.json",
"css:config/util/resource-locker/file.json",
"css:config/util/variables/default.json"
],
"@graph": [

View File

@ -29,7 +29,7 @@
"css:config/util/index/default.json",
"css:config/util/logging/winston.json",
"css:config/util/representation-conversion/default.json",
"css:config/util/resource-locker/memory.json",
"css:config/util/resource-locker/file.json",
"css:config/util/variables/default.json"
],
"@graph": [

View File

@ -29,7 +29,7 @@
"css:config/util/index/default.json",
"css:config/util/logging/winston.json",
"css:config/util/representation-conversion/default.json",
"css:config/util/resource-locker/memory.json",
"css:config/util/resource-locker/file.json",
"css:config/util/variables/default.json"
],
"@graph": [

View File

@ -29,7 +29,7 @@
"css:config/util/index/default.json",
"css:config/util/logging/winston.json",
"css:config/util/representation-conversion/default.json",
"css:config/util/resource-locker/memory.json",
"css:config/util/resource-locker/file.json",
"css:config/util/variables/default.json"
],
"@graph": [

View File

@ -29,7 +29,7 @@
"css:config/util/index/default.json",
"css:config/util/logging/winston.json",
"css:config/util/representation-conversion/default.json",
"css:config/util/resource-locker/memory.json",
"css:config/util/resource-locker/file.json",
"css:config/util/variables/default.json",
"css:config/storage/backend/data-accessors/file.json",

View File

@ -37,8 +37,9 @@ to the ChainedConverter list.
## Resource-locker
Which locking mechanism to use to for example prevent 2 write simultaneous write requests.
* *debug-void*: No locking mechanism, does not prevent simultaneous read/writes.
* *file*: Uses a file-system based locking mechanism (process-safe/thread-safe).
* *memory*: Uses an in-memory locking mechanism.
* *redis*: Uses a Redis store for locking that supports threadsafe read-write locking.
* *redis*: Uses a Redis store for locking that supports threadsafe read-write locking (process-safe/thread-safe).
## Variables
Various variables used by other options.

View File

@ -0,0 +1,33 @@
{
"@context": "https://linkedsoftwaredependencies.org/bundles/npm/@solid/community-server/^4.0.0/components/context.jsonld",
"@graph": [
{
"comment": "Allows multiple simultaneous read operations. Locks are stored on filesystem. Locks expire after inactivity. This locker is threadsafe.",
"@id": "urn:solid-server:default:ResourceLocker",
"@type": "WrappedExpiringReadWriteLocker",
"locker": {
"@type": "GreedyReadWriteLocker",
"locker": {
"@id": "urn:solid-server:default:FileSystemResourceLocker",
"@type": "FileSystemResourceLocker",
"args_rootFilePath": { "@id": "urn:solid-server:default:variable:rootFilePath" }
},
"storage": {
"@id": "urn:solid-server:default:LockStorage"
},
"suffixes_count": "count",
"suffixes_read": "read",
"suffixes_write": "write"
},
"expiration": 3000
},
{
"comment": "Makes sure the lock folder is cleared and delete when the application needs to stop.",
"@id": "urn:solid-server:default:Finalizer",
"@type": "ParallelFinalizer",
"finalizers": [
{ "@id": "urn:solid-server:default:FileSystemResourceLocker" }
]
}
]
}

View File

@ -8,7 +8,7 @@
"locker": {
"@type": "GreedyReadWriteLocker",
"locker": {
"@type": "SingleThreadedResourceLocker"
"@type": "MemoryResourceLocker"
},
"storage": { "@id": "urn:solid-server:default:LockStorage" },
"suffixes_count": "count",

65
package-lock.json generated
View File

@ -51,6 +51,7 @@
"n3": "^1.16.0",
"nodemailer": "^6.7.2",
"oidc-provider": "^7.10.6",
"proper-lockfile": "^4.1.2",
"pump": "^3.0.0",
"punycode": "^2.1.1",
"rdf-dereference": "^2.0.0",
@ -77,6 +78,7 @@
"@types/cheerio": "^0.22.30",
"@types/ejs": "^3.1.0",
"@types/jest": "^27.4.0",
"@types/proper-lockfile": "^4.1.2",
"@types/set-cookie-parser": "^2.4.2",
"@types/supertest": "^2.0.11",
"@typescript-eslint/eslint-plugin": "^5.3.0",
@ -3766,6 +3768,15 @@
"integrity": "sha512-QzSuZMBuG5u8HqYz01qtMdg/Jfctlnvj1z/lYnIDXs/golxw0fxtRAHd9KrzjR7Yxz1qVeI00o0kiO3PmVdJ9w==",
"dev": true
},
"node_modules/@types/proper-lockfile": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/@types/proper-lockfile/-/proper-lockfile-4.1.2.tgz",
"integrity": "sha512-kd4LMvcnpYkspDcp7rmXKedn8iJSCoa331zRRamUp5oanKt/CefbEGPQP7G89enz7sKD4bvsr8mHSsC8j5WOvA==",
"dev": true,
"dependencies": {
"@types/retry": "*"
}
},
"node_modules/@types/pump": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/@types/pump/-/pump-1.1.1.tgz",
@ -3815,6 +3826,12 @@
"@types/node": "*"
}
},
"node_modules/@types/retry": {
"version": "0.12.1",
"resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.1.tgz",
"integrity": "sha512-xoDlM2S4ortawSWORYqsdU+2rxdh4LRW9ytc3zmT37RIKQh6IHyKwwtKhKis9ah8ol07DCkZxPt8BBvPjC6v4g==",
"dev": true
},
"node_modules/@types/semver": {
"version": "7.3.6",
"resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.3.6.tgz",
@ -11747,6 +11764,16 @@
"dev": true,
"peer": true
},
"node_modules/proper-lockfile": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/proper-lockfile/-/proper-lockfile-4.1.2.tgz",
"integrity": "sha512-TjNPblN4BwAWMXU8s9AEz4JmQxnD1NNL7bNOY/AKUzyamc379FWASUhc/K1pL2noVb+XmZKLL68cjzLsiOAMaA==",
"dependencies": {
"graceful-fs": "^4.2.4",
"retry": "^0.12.0",
"signal-exit": "^3.0.2"
}
},
"node_modules/psl": {
"version": "1.8.0",
"resolved": "https://registry.npmjs.org/psl/-/psl-1.8.0.tgz",
@ -12374,6 +12401,14 @@
"lowercase-keys": "^1.0.0"
}
},
"node_modules/retry": {
"version": "0.12.0",
"resolved": "https://registry.npmjs.org/retry/-/retry-0.12.0.tgz",
"integrity": "sha1-G0KmJmoh8HQh0bC1S33BZ7AcATs=",
"engines": {
"node": ">= 4"
}
},
"node_modules/reusify": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz",
@ -17490,6 +17525,15 @@
"integrity": "sha512-QzSuZMBuG5u8HqYz01qtMdg/Jfctlnvj1z/lYnIDXs/golxw0fxtRAHd9KrzjR7Yxz1qVeI00o0kiO3PmVdJ9w==",
"dev": true
},
"@types/proper-lockfile": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/@types/proper-lockfile/-/proper-lockfile-4.1.2.tgz",
"integrity": "sha512-kd4LMvcnpYkspDcp7rmXKedn8iJSCoa331zRRamUp5oanKt/CefbEGPQP7G89enz7sKD4bvsr8mHSsC8j5WOvA==",
"dev": true,
"requires": {
"@types/retry": "*"
}
},
"@types/pump": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/@types/pump/-/pump-1.1.1.tgz",
@ -17538,6 +17582,12 @@
"@types/node": "*"
}
},
"@types/retry": {
"version": "0.12.1",
"resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.1.tgz",
"integrity": "sha512-xoDlM2S4ortawSWORYqsdU+2rxdh4LRW9ytc3zmT37RIKQh6IHyKwwtKhKis9ah8ol07DCkZxPt8BBvPjC6v4g==",
"dev": true
},
"@types/semver": {
"version": "7.3.6",
"resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.3.6.tgz",
@ -23506,6 +23556,16 @@
}
}
},
"proper-lockfile": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/proper-lockfile/-/proper-lockfile-4.1.2.tgz",
"integrity": "sha512-TjNPblN4BwAWMXU8s9AEz4JmQxnD1NNL7bNOY/AKUzyamc379FWASUhc/K1pL2noVb+XmZKLL68cjzLsiOAMaA==",
"requires": {
"graceful-fs": "^4.2.4",
"retry": "^0.12.0",
"signal-exit": "^3.0.2"
}
},
"psl": {
"version": "1.8.0",
"resolved": "https://registry.npmjs.org/psl/-/psl-1.8.0.tgz",
@ -24021,6 +24081,11 @@
"lowercase-keys": "^1.0.0"
}
},
"retry": {
"version": "0.12.0",
"resolved": "https://registry.npmjs.org/retry/-/retry-0.12.0.tgz",
"integrity": "sha1-G0KmJmoh8HQh0bC1S33BZ7AcATs="
},
"reusify": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz",

View File

@ -90,6 +90,7 @@
"@types/node": "^14.18.0",
"@types/nodemailer": "^6.4.4",
"@types/oidc-provider": "^7.8.1",
"@types/proper-lockfile": "^4.1.2",
"@types/pump": "^1.1.1",
"@types/punycode": "^2.1.0",
"@types/sparqljs": "^3.1.3",
@ -117,6 +118,7 @@
"n3": "^1.16.0",
"nodemailer": "^6.7.2",
"oidc-provider": "^7.10.6",
"proper-lockfile": "^4.1.2",
"pump": "^3.0.0",
"punycode": "^2.1.1",
"rdf-dereference": "^2.0.0",

View File

@ -416,11 +416,12 @@ export * from './util/identifiers/SubdomainIdentifierStrategy';
// Util/Locking
export * from './util/locking/ExpiringReadWriteLocker';
export * from './util/locking/EqualReadWriteLocker';
export * from './util/locking/FileSystemResourceLocker';
export * from './util/locking/GreedyReadWriteLocker';
export * from './util/locking/MemoryResourceLocker';
export * from './util/locking/ReadWriteLocker';
export * from './util/locking/RedisLocker';
export * from './util/locking/ResourceLocker';
export * from './util/locking/SingleThreadedResourceLocker';
export * from './util/locking/WrappedExpiringReadWriteLocker';
export * from './util/locking/VoidLocker';

56
src/util/LockUtils.ts Normal file
View File

@ -0,0 +1,56 @@
import { getLoggerFor } from '../logging/LogUtil';
import { InternalServerError } from './errors/InternalServerError';
const logger = getLoggerFor('LockUtil');
/**
* Waits a set amount of time, without consuming cpu, with a set amount of jitter.
* @param delay - How long to wait.
* @param jitter - A fraction of this jitter will be added to the delay.
* @returns A promise that resolves after the specified amount of time.
*/
export async function setJitterTimeout(delay: number, jitter = 0): Promise<void> {
jitter = Math.max(0, Math.floor(Math.random() * jitter));
delay = Math.max(0, delay + jitter);
return new Promise<void>((resolve): any => setTimeout(resolve, delay));
}
export interface AttemptSettings {
/** How many times should an operation be retried. (-1 is indefinitely). */
retryCount?: number;
/** The how long should the next retry be delayed (+ some retryJitter) (in ms). */
retryDelay?: number;
/** Add a fraction of jitter to the original delay each attempt (in ms). */
retryJitter?: number;
}
/**
* Will execute the given function until one of the following cases occurs:
* * The function resolves to a value: the value is returned.
* * The function errors: the rejected error is thrown.
* * The function did not resolve after the set amount of retries:
* the rejected error is returned.
* @param fn - The function to retry. **This function must return a value!**
* @param settings - The options on how to retry the function
*/
export async function retryFunction<T>(fn: () => Promise<T>, settings: Required<AttemptSettings>): Promise<T> {
const { retryCount, retryDelay, retryJitter } = settings;
const maxTries = retryCount === -1 ? Number.POSITIVE_INFINITY : retryCount + 1;
let tries = 1;
let result = await fn();
while (typeof result === 'undefined' && tries < maxTries) {
await setJitterTimeout(retryDelay, retryJitter);
result = await fn();
tries += 1;
}
// Max tries was reached: throw first!
if (tries >= maxTries) {
const err = `The operation did not succeed after the set maximum of tries (${maxTries}).`;
logger.warn(err);
throw new InternalServerError(err);
}
return result;
}

View File

@ -0,0 +1,161 @@
import { createHash } from 'crypto';
import { ensureDirSync, pathExists, readdir, rmdir } from 'fs-extra';
import type { LockOptions, UnlockOptions } from 'proper-lockfile';
import { lock, unlock } from 'proper-lockfile';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import type { Finalizable } from '../../init/final/Finalizable';
import { getLoggerFor } from '../../logging/LogUtil';
import { createErrorMessage } from '../errors/ErrorUtil';
import { InternalServerError } from '../errors/InternalServerError';
import type { AttemptSettings } from '../LockUtils';
import { retryFunction } from '../LockUtils';
import { joinFilePath } from '../PathUtil';
import type { ResourceLocker } from './ResourceLocker';
const defaultLockOptions: LockOptions = {
// This must be set to false! If not every lock request will try to resolve the path to the file.
// Since however this locker maps all locks to a common internal folder that might be non-existing on start,
// resolving those paths would throw an filesystem error.
realpath: false,
/** The number of retries or a [retry](https://www.npmjs.org/package/retry) options object, defaults to 0 */
retries: 0,
};
const defaultUnlockOptions: UnlockOptions = {
// This must be set to false! If not every lock request will try to resolve the path to the file.
// Since however this locker maps all locks to a common internal folder that might be non-existing on start,
// resolving those paths would throw an filesystem error.
realpath: false,
};
const attemptDefaults: Required<AttemptSettings> = { retryCount: -1, retryDelay: 50, retryJitter: 30 };
/**
* Argument interface of the FileSystemResourceLocker constructor.
*/
interface FileSystemResourceLockerArgs {
/** The rootPath of the filesystem */
rootFilePath?: string;
/** The path to the directory where locks will be stored (appended to rootFilePath) */
lockDirectory?: string;
/** Custom settings concerning retrying locks */
attemptSettings?: AttemptSettings;
}
function isCodedError(err: unknown): err is { code: string } & Error {
return typeof err === 'object' && err !== null && 'code' in err;
}
/**
* A resource locker making use of the [proper-lockfile](https://www.npmjs.com/package/proper-lockfile) library.
* Note that no locks are kept in memory, thus this is considered thread- and process-safe.
*
* This **proper-lockfile** library has its own retry mechanism for the operations, since a lock/unlock call will
* either resolve successfully or reject immediately with the causing error. The retry function of the library
* however will be ignored and replaced by our own LockUtils' {@link retryFunctionUntil} function.
*/
export class FileSystemResourceLocker implements ResourceLocker, Finalizable {
protected readonly logger = getLoggerFor(this);
private readonly attemptSettings: Required<AttemptSettings>;
/** Folder that stores the locks */
private readonly lockFolder: string;
/**
* Create a new FileSystemResourceLocker
* @param rootFilePath - The rootPath of the filesystem _[default is the current dir `./`]_
* @param lockDirectory - The path to the directory where locks will be stored (appended to rootFilePath)
_[default is `/.internal/locks`]_
* @param attemptSettings - Custom settings concerning retrying locks
*/
public constructor(args: FileSystemResourceLockerArgs = {}) {
const { rootFilePath, lockDirectory, attemptSettings } = args;
this.attemptSettings = { ...attemptDefaults, ...attemptSettings };
this.lockFolder = joinFilePath(rootFilePath ?? './', lockDirectory ?? '/.internal/locks');
ensureDirSync(this.lockFolder);
}
/**
* Wrapper function for all (un)lock operations. Any errors coming from the `fn()` will be swallowed.
* Only `ENOTACQUIRED` errors wills be thrown (trying to release lock that didn't exist).
* This wrapper returns undefined because {@link retryFunction} expects that when a retry needs to happne.s
* @param fn - The function reference to swallow errors from.
* @returns Boolean or undefined.
*/
private swallowErrors(fn: () => Promise<unknown>): () => Promise<unknown> {
return async(): Promise<unknown> => {
try {
await fn();
return true;
} catch (err: unknown) {
// Only this error should be thrown
if (isCodedError(err) && err.code === 'ENOTACQUIRED') {
throw err;
}
}
};
}
public async acquire(identifier: ResourceIdentifier): Promise<void> {
const { path } = identifier;
this.logger.debug(`Acquiring lock for ${path}`);
try {
const opt = this.generateOptions(identifier, defaultLockOptions);
await retryFunction(
this.swallowErrors(lock.bind(null, path, opt)),
this.attemptSettings,
);
} catch (err: unknown) {
throw new InternalServerError(`Error trying to acquire lock for ${path}. ${createErrorMessage(err)}`);
}
}
public async release(identifier: ResourceIdentifier): Promise<void> {
const { path } = identifier;
this.logger.debug(`Releasing lock for ${path}`);
try {
const opt = this.generateOptions(identifier, defaultUnlockOptions);
await retryFunction(
this.swallowErrors(unlock.bind(null, path, opt)),
this.attemptSettings,
);
} catch (err: unknown) {
throw new InternalServerError(`Error trying to release lock for ${path}. ${createErrorMessage(err)}`);
}
}
/**
* Map the identifier path to a unique path inside the {@link lockFolder}.
* @param identifier - ResourceIdentifier to generate (Un)LockOptions for.
* @returns Full path.
*/
private toLockfilePath(identifier: ResourceIdentifier): string {
const hash = createHash('md5');
const { path } = identifier;
return joinFilePath(this.lockFolder, hash.update(path).digest('hex'));
}
/**
* Generate LockOptions or UnlockOptions depending on the type of defauls given.
* A custom lockFilePath mapping strategy will be used.
* @param identifier - ResourceIdentifier to generate (Un)LockOptions for
* @param defaults - The default options. (lockFilePath will get overwritten)
* @returns LockOptions or UnlockOptions
*/
private generateOptions<T>(identifier: ResourceIdentifier, defaults: T): T {
const lockfilePath = this.toLockfilePath(identifier);
return {
...defaults,
lockfilePath,
};
}
public async finalize(): Promise<void> {
// Delete lingering locks in the lockFolder.
if (await pathExists(this.lockFolder)) {
for (const dir of await readdir(this.lockFolder)) {
await rmdir(joinFilePath(this.lockFolder, dir));
}
await rmdir(this.lockFolder);
}
}
}

View File

@ -1,3 +1,4 @@
import cluster from 'cluster';
import AsyncLock from 'async-lock';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import { getLoggerFor } from '../../logging/LogUtil';
@ -10,7 +11,7 @@ import type { ResourceLocker } from './ResourceLocker';
* in a memory leak if locks are never unlocked, so make sure this is covered with expiring locks for example,
* and/or proper `finally` handles.
*/
export class SingleThreadedResourceLocker implements ResourceLocker {
export class MemoryResourceLocker implements ResourceLocker {
protected readonly logger = getLoggerFor(this);
private readonly locker: AsyncLock;
@ -19,6 +20,10 @@ export class SingleThreadedResourceLocker implements ResourceLocker {
public constructor() {
this.locker = new AsyncLock();
this.unlockCallbacks = {};
if (cluster.isWorker) {
this.logger.warn(`MemoryResourceLocker is not thread-safe/process-safe!
You should only use this locker in a single-thread/single-process CSS setup.`);
}
}
public async acquire(identifier: ResourceIdentifier): Promise<void> {

View File

@ -2,21 +2,13 @@ import Redis from 'ioredis';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import type { Finalizable } from '../../init/final/Finalizable';
import { getLoggerFor } from '../../logging/LogUtil';
import { InternalServerError } from '../errors/InternalServerError';
import type { AttemptSettings } from '../LockUtils';
import { retryFunction } from '../LockUtils';
import type { ReadWriteLocker } from './ReadWriteLocker';
import type { ResourceLocker } from './ResourceLocker';
import type { RedisResourceLock, RedisReadWriteLock } from './scripts/RedisLuaScripts';
import type { RedisResourceLock, RedisReadWriteLock, RedisAnswer } from './scripts/RedisLuaScripts';
import { fromResp2ToBool, REDIS_LUA_SCRIPTS } from './scripts/RedisLuaScripts';
export interface AttemptSettings {
/** How many times should an operation in Redis be retried. (-1 is indefinitely). */
retryCount?: number;
/** The how long should the next retry be delayed (+ some retryJitter) (in ms). */
retryDelay?: number;
/** Add a fraction of jitter to the original delay each attempt (in ms). */
retryJitter?: number;
}
const attemptDefaults: Required<AttemptSettings> = { retryCount: -1, retryDelay: 50, retryJitter: 30 };
// Internal prefix for Redis keys;
@ -42,6 +34,10 @@ const PREFIX_LOCK = '__L__';
* These scripts are used by Redis as a single new command.
* Redis executes its operations in a single thread, as such, each such operation can be considered atomic.
*
* The operation to (un)lock will always resolve with either 1/OK/true if succeeded or 0/false if not succeeded.
* Rejection with errors will be happen on actual failures. Retrying the (un)lock operations will be done by making
* use of the LockUtils' {@link retryFunctionUntil} function.
*
* * @see [Redis Commands documentation](https://redis.io/commands/)
* * @see [Redis Lua scripting documentation](https://redis.io/docs/manual/programmability/)
* * @see [ioredis Lua scripting API](https://github.com/luin/ioredis#lua-scripting)
@ -90,44 +86,6 @@ export class RedisLocker implements ReadWriteLocker, ResourceLocker, Finalizable
Please provide a port number like '6379' or a host address and a port number like '127.0.0.1:6379'`);
}
/**
* Try a Redis function according to the set {@link AttemptSettings}
* Since the locking strategy is custom-built on Redis and Redis itself does not have a lock concept,
* this function allows us to wait until we acquired a lock.
*
* The AttemptSettings will dictate how many times we should retry the Redis functions
* before giving up and throwing an error.
*
* @param fn - The function to try
*
* @returns Promise that resolves if operation succeeded. Rejects with error otherwise
*
* @see To convert from Redis operation to Promise<boolean> use {@link fromResp2ToBool} to wrap the function
*/
private async tryRedisFn(fn: () => Promise<boolean>): Promise<void> {
const settings = this.attemptSettings;
const maxTries = settings.retryCount === -1 ? Number.POSITIVE_INFINITY : settings.retryCount + 1;
function calcTime(): number {
return Math.max(0, settings.retryDelay + Math.floor(Math.random() * settings.retryJitter));
}
let tries = 1;
let acquired = await fn();
// Keep going until either you get a lock/release or maxTries has been reached.
while (!acquired && (tries <= maxTries)) {
await new Promise<void>((resolve): any => setTimeout(resolve, calcTime()));
acquired = await fn();
tries += 1;
}
// Max tries was reached
if (tries > maxTries) {
const err = `The operation did not succeed after the set maximum of tries (${maxTries}).`;
this.logger.warn(err);
throw new InternalServerError(err);
}
}
/**
* Create a scoped Redis key for Read-Write locking.
* @param identifier - The identifier object to create a Redis key for
@ -148,23 +106,51 @@ export class RedisLocker implements ReadWriteLocker, ResourceLocker, Finalizable
/* ReadWriteLocker methods */
/**
* Wrapper function for all (un)lock operations. If the `fn()` resolves to false (after applying
* {@link fromResp2ToBool}, the result will be swallowed. When `fn()` resolves to true, this wrapper
* will return true. Any error coming from `fn()` will be thrown.
* @param fn - The function reference to swallow false from.
*/
private swallowFalse(fn: () => Promise<RedisAnswer>): () => Promise<unknown> {
return async(): Promise<unknown> => {
const result = await fromResp2ToBool(fn());
// Swallow any result resolving to `false`
if (result) {
return true;
}
};
}
public async withReadLock<T>(identifier: ResourceIdentifier, whileLocked: () => (Promise<T> | T)): Promise<T> {
const key = this.getReadWriteKey(identifier);
await this.tryRedisFn((): Promise<boolean> => fromResp2ToBool(this.redisRw.acquireReadLock(key)));
await retryFunction(
this.swallowFalse(this.redisRw.acquireReadLock.bind(this.redisRw, key)),
this.attemptSettings,
);
try {
return await whileLocked();
} finally {
await this.tryRedisFn((): Promise<boolean> => fromResp2ToBool(this.redisRw.releaseReadLock(key)));
await retryFunction(
this.swallowFalse(this.redisRw.releaseReadLock.bind(this.redisRw, key)),
this.attemptSettings,
);
}
}
public async withWriteLock<T>(identifier: ResourceIdentifier, whileLocked: () => (Promise<T> | T)): Promise<T> {
const key = this.getReadWriteKey(identifier);
await this.tryRedisFn((): Promise<boolean> => fromResp2ToBool(this.redisRw.acquireWriteLock(key)));
await retryFunction(
this.swallowFalse(this.redisRw.acquireWriteLock.bind(this.redisRw, key)),
this.attemptSettings,
);
try {
return await whileLocked();
} finally {
await this.tryRedisFn((): Promise<boolean> => fromResp2ToBool(this.redisRw.releaseWriteLock(key)));
await retryFunction(
this.swallowFalse(this.redisRw.releaseWriteLock.bind(this.redisRw, key)),
this.attemptSettings,
);
}
}
@ -172,12 +158,18 @@ export class RedisLocker implements ReadWriteLocker, ResourceLocker, Finalizable
public async acquire(identifier: ResourceIdentifier): Promise<void> {
const key = this.getResourceKey(identifier);
await this.tryRedisFn((): Promise<boolean> => fromResp2ToBool(this.redisLock.acquireLock(key)));
await retryFunction(
this.swallowFalse(this.redisLock.acquireLock.bind(this.redisLock, key)),
this.attemptSettings,
);
}
public async release(identifier: ResourceIdentifier): Promise<void> {
const key = this.getResourceKey(identifier);
await this.tryRedisFn((): Promise<boolean> => fromResp2ToBool(this.redisLock.releaseLock(key)));
await retryFunction(
this.swallowFalse(this.redisLock.releaseLock.bind(this.redisLock, key)),
this.attemptSettings,
);
}
/* Finalizer methods */
@ -201,4 +193,3 @@ export class RedisLocker implements ReadWriteLocker, ResourceLocker, Finalizable
}
}
}

View File

@ -106,19 +106,19 @@ export async function fromResp2ToBool(result: Promise<RedisAnswer>): Promise<boo
export interface RedisReadWriteLock extends Redis {
/**
* Try to acquire a readLock on `resourceIdentifierPath`.
* Will succeed if there are no write locks.
*
* @returns 1 if succeeded. 0 if not possible.
*/
* Try to acquire a readLock on `resourceIdentifierPath`.
* Will succeed if there are no write locks.
*
* @returns 1 if succeeded. 0 if not possible.
*/
acquireReadLock: (resourceIdentifierPath: string, callback?: Callback<string>) => Promise<RedisAnswer>;
/**
* Try to acquire a writeLock on `resourceIdentifierPath`.
* Only works if no other write lock is present and the read counter is 0.
*
* @returns 'OK' if succeeded, 0 if not possible.
*/
* Try to acquire a writeLock on `resourceIdentifierPath`.
* Only works if no other write lock is present and the read counter is 0.
*
* @returns 'OK' if succeeded, 0 if not possible.
*/
acquireWriteLock: (resourceIdentifierPath: string, callback?: Callback<string>) => Promise<RedisAnswer>;
/**
@ -136,11 +136,11 @@ export interface RedisReadWriteLock extends Redis {
export interface RedisResourceLock extends Redis {
/**
* Try to acquire a lock on `resourceIdentifierPath`.
* Only works if no other lock is present.
*
* @returns 'OK' if succeeded, 0 if not possible.
*/
* Try to acquire a lock on `resourceIdentifierPath`.
* Only works if no other lock is present.
*
* @returns 'OK' if succeeded, 0 if not possible.
*/
acquireLock: (resourceIdentifierPath: string, callback?: Callback<string>) => Promise<RedisAnswer>;
/**

View File

@ -11,8 +11,8 @@ import { InternalServerError } from '../../src/util/errors/InternalServerError';
import { SingleRootIdentifierStrategy } from '../../src/util/identifiers/SingleRootIdentifierStrategy';
import { EqualReadWriteLocker } from '../../src/util/locking/EqualReadWriteLocker';
import type { ExpiringReadWriteLocker } from '../../src/util/locking/ExpiringReadWriteLocker';
import { MemoryResourceLocker } from '../../src/util/locking/MemoryResourceLocker';
import type { ReadWriteLocker } from '../../src/util/locking/ReadWriteLocker';
import { SingleThreadedResourceLocker } from '../../src/util/locking/SingleThreadedResourceLocker';
import { WrappedExpiringReadWriteLocker } from '../../src/util/locking/WrappedExpiringReadWriteLocker';
import { guardedStreamFrom } from '../../src/util/StreamUtil';
import { PIM, RDF } from '../../src/util/Vocabularies';
@ -48,7 +48,7 @@ describe('A LockingResourceStore', (): void => {
metadata.add(RDF.terms.type, PIM.terms.Storage);
await source.setRepresentation({ path: base }, new BasicRepresentation([], metadata));
locker = new EqualReadWriteLocker(new SingleThreadedResourceLocker());
locker = new EqualReadWriteLocker(new MemoryResourceLocker());
expiringLocker = new WrappedExpiringReadWriteLocker(locker, 1000);
store = new LockingResourceStore(source, expiringLocker, strategy);

View File

@ -399,4 +399,3 @@ describeIf('docker', 'A server with a RedisLocker', (): void => {
});
});
});

View File

@ -0,0 +1,31 @@
import { setJitterTimeout } from '../../../src/util/LockUtils';
jest.useFakeTimers();
describe('LockUtil', (): void => {
describe('#setJitterTimout', (): void => {
it('works without jitter.', async(): Promise<void> => {
let result = '';
const promise = setJitterTimeout(1000).then((): void => {
result += 'ok';
});
expect(result).toHaveLength(0);
jest.advanceTimersByTime(1000);
await expect(promise).resolves.toBeUndefined();
expect(result).toBe('ok');
});
it('works with jitter.', async(): Promise<void> => {
jest.spyOn(global.Math, 'random').mockReturnValue(1);
let elapsed = Date.now();
const promise = setJitterTimeout(1000, 100).then((): void => {
elapsed = Date.now() - elapsed;
});
jest.runAllTimers();
await expect(promise).resolves.toBeUndefined();
expect(elapsed).toBe(1100);
// Clean up
jest.spyOn(global.Math, 'random').mockRestore();
});
});
});

View File

@ -0,0 +1,122 @@
import { readdir } from 'fs-extra';
import { InternalServerError } from '../../../../src/util/errors/InternalServerError';
import { FileSystemResourceLocker } from '../../../../src/util/locking/FileSystemResourceLocker';
const lockFolder = './.internal/locks/';
describe('A FileSystemResourceLocker', (): void => {
let locker: FileSystemResourceLocker;
const identifier = { path: 'http://test.com/foo' };
beforeEach(async(): Promise<void> => {
locker = new FileSystemResourceLocker({ attemptSettings: { retryCount: 19, retryDelay: 100 }});
});
afterEach(async(): Promise<void> => {
try {
// Release to be sure
await locker.release(identifier);
} catch {
// Do nothing
}
});
afterAll(async(): Promise<void> => {
await locker.finalize();
});
it('can lock and unlock a resource.', async(): Promise<void> => {
await expect(locker.acquire(identifier)).resolves.toBeUndefined();
await expect(locker.release(identifier)).resolves.toBeUndefined();
});
it('can lock and unlock a resource with a locker with indefinite retry.', async(): Promise<void> => {
const locker2 = new FileSystemResourceLocker({ attemptSettings: { retryCount: -1 }});
await expect(locker2.acquire(identifier)).resolves.toBeUndefined();
await expect(locker2.release(identifier)).resolves.toBeUndefined();
await locker2.finalize();
});
it('can lock a resource again after it was unlocked.', async(): Promise<void> => {
await expect(locker.acquire(identifier)).resolves.toBeUndefined();
await expect(locker.release(identifier)).resolves.toBeUndefined();
await expect(locker.acquire(identifier)).resolves.toBeUndefined();
});
it('errors when unlocking a resource that was not locked.', async(): Promise<void> => {
await expect(locker.acquire(identifier)).resolves.toBeUndefined();
await expect(locker.release(identifier)).resolves.toBeUndefined();
await expect(locker.release(identifier)).rejects.toThrow(InternalServerError);
await expect(locker.release(identifier)).rejects.toThrow('Lock is not acquired/owned by you');
});
it('errors when max retries has been reached.', async(): Promise<void> => {
await locker.acquire(identifier);
await expect(locker.acquire(identifier)).rejects
.toThrow(
/Error trying to acquire lock for .*\. The operation did not succeed after the set maximum of tries \(\d+\)\./u,
);
await locker.release(identifier);
});
it('blocks lock acquisition until they are released.', async(): Promise<void> => {
const results: number[] = [];
const lock1 = locker.acquire(identifier);
const lock2 = locker.acquire(identifier);
const lock3 = locker.acquire(identifier);
// Note the different order of calls
const prom2 = lock2.then(async(): Promise<void> => {
results.push(2);
return locker.release(identifier);
});
const prom3 = lock3.then(async(): Promise<void> => {
results.push(3);
return locker.release(identifier);
});
const prom1 = lock1.then(async(): Promise<void> => {
results.push(1);
return locker.release(identifier);
});
await Promise.all([ prom2, prom3, prom1 ]);
expect(results[0]).toBe(1);
expect(results).toContain(2);
expect(results).toContain(3);
});
it('can acquire different keys simultaneously.', async(): Promise<void> => {
const results: number[] = [];
const lock1 = locker.acquire({ path: 'path1' });
const lock2 = locker.acquire({ path: 'path2' });
const lock3 = locker.acquire({ path: 'path3' });
await lock2.then(async(): Promise<void> => {
results.push(2);
return locker.release({ path: 'path2' });
});
await lock3.then(async(): Promise<void> => {
results.push(3);
return locker.release({ path: 'path3' });
});
await lock1.then(async(): Promise<void> => {
results.push(1);
return locker.release({ path: 'path1' });
});
expect(results).toEqual([ 2, 3, 1 ]);
});
it('throws an error when #tryFn() throws an error.', async(): Promise<void> => {
await locker.acquire(identifier);
await expect(locker.acquire(identifier)).rejects.toThrow(InternalServerError);
});
it('clears the files in de lock directory after calling finalize.', async(): Promise<void> => {
await locker.acquire(identifier);
await expect(readdir(lockFolder)).resolves.toHaveLength(1);
await locker.finalize();
await expect(readdir(lockFolder)).rejects.toThrow();
});
it('can create a locker with default AttemptSettings.', async(): Promise<void> => {
expect((): FileSystemResourceLocker => new FileSystemResourceLocker()).not.toThrow();
});
});

View File

@ -1,11 +1,29 @@
import type { Logger } from '../../../../src';
import { getLoggerFor } from '../../../../src';
import { InternalServerError } from '../../../../src/util/errors/InternalServerError';
import { SingleThreadedResourceLocker } from '../../../../src/util/locking/SingleThreadedResourceLocker';
import { MemoryResourceLocker } from '../../../../src/util/locking/MemoryResourceLocker';
describe('A SingleThreadedResourceLocker', (): void => {
let locker: SingleThreadedResourceLocker;
jest.mock('../../../../src/logging/LogUtil', (): any => {
const logger: Logger =
{ error: jest.fn(), debug: jest.fn(), warn: jest.fn(), info: jest.fn(), log: jest.fn() } as any;
return { getLoggerFor: (): Logger => logger };
});
const logger: jest.Mocked<Logger> = getLoggerFor('MemoryResourceLocker') as any;
jest.mock('cluster', (): any => ({
isWorker: true,
}));
describe('A MemoryResourceLocker', (): void => {
let locker: MemoryResourceLocker;
const identifier = { path: 'http://test.com/foo' };
beforeEach(async(): Promise<void> => {
locker = new SingleThreadedResourceLocker();
locker = new MemoryResourceLocker();
});
it('logs a warning when constructed on a worker process.', (): void => {
expect((): MemoryResourceLocker => new MemoryResourceLocker()).toBeDefined();
expect(logger.warn).toHaveBeenCalled();
});
it('can lock and unlock a resource.', async(): Promise<void> => {