feat: initial proposal for multithreaded execution

This commit is contained in:
Thomas Dupont
2022-05-13 11:27:31 +02:00
committed by Joachim Van Herwegen
parent 32245fc604
commit 236bbc6e5d
40 changed files with 880 additions and 97 deletions

View File

@@ -187,6 +187,11 @@ export * from './identity/storage/WebIdAdapterFactory';
export * from './identity/IdentityProviderHttpHandler';
export * from './identity/OidcHttpHandler';
// Init/Cluster
export * from './init/cluster/ClusterManager';
export * from './init/cluster/SingleThreaded';
export * from './init/cluster/WorkerManager';
// Init/Final
export * from './init/final/Finalizable';
export * from './init/final/ParallelFinalizer';
@@ -218,9 +223,9 @@ export * from './init/ConfigPodInitializer';
export * from './init/ContainerInitializer';
export * from './init/Initializer';
export * from './init/LoggerInitializer';
export * from './init/ModuleVersionVerifier';
export * from './init/SeededPodInitializer';
export * from './init/ServerInitializer';
export * from './init/ModuleVersionVerifier';
// Logging
export * from './logging/LazyLoggerFactory';
@@ -401,6 +406,7 @@ export * from './util/handlers/ConditionalHandler';
export * from './util/handlers/HandlerUtil';
export * from './util/handlers/MethodFilterHandler';
export * from './util/handlers/ParallelHandler';
export * from './util/handlers/ProcessHandler';
export * from './util/handlers/SequenceHandler';
export * from './util/handlers/StaticHandler';
export * from './util/handlers/StaticThrowHandler';

View File

@@ -1,3 +1,4 @@
import type { ClusterManager } from './cluster/ClusterManager';
import type { Finalizable } from './final/Finalizable';
import type { Initializer } from './Initializer';
@@ -7,10 +8,12 @@ import type { Initializer } from './Initializer';
export class App {
private readonly initializer: Initializer;
private readonly finalizer: Finalizable;
public readonly clusterManager: ClusterManager;
public constructor(initializer: Initializer, finalizer: Finalizable) {
public constructor(initializer: Initializer, finalizer: Finalizable, clusterManager: ClusterManager) {
this.initializer = initializer;
this.finalizer = finalizer;
this.clusterManager = clusterManager;
}
/**

View File

@@ -6,9 +6,11 @@ import yargs from 'yargs';
import { LOG_LEVELS } from '../logging/LogLevel';
import { getLoggerFor } from '../logging/LogUtil';
import { createErrorMessage, isError } from '../util/errors/ErrorUtil';
import { InternalServerError } from '../util/errors/InternalServerError';
import { resolveModulePath, resolveAssetPath } from '../util/PathUtil';
import type { App } from './App';
import type { CliResolver } from './CliResolver';
import { listSingleThreadedComponents } from './cluster/SingleThreaded';
import type { CliArgv, VariableBindings } from './variables/Types';
const DEFAULT_CONFIG = resolveModulePath('config/default.json');
@@ -65,7 +67,7 @@ export class AppRunner {
const componentsManager = await this.createComponentsManager<App>(loaderProperties, configFile);
// Create the application using the translated variable values
return componentsManager.instantiate(DEFAULT_APP, { variables: variableBindings });
return await this.createApp(componentsManager, variableBindings);
}
/**
@@ -177,12 +179,26 @@ export class AppRunner {
* where the App is created and started using the variable mappings.
*/
private async createApp(componentsManager: ComponentsManager<App>, variables: Record<string, unknown>): Promise<App> {
let app: App;
// Create the app
try {
// Create the app
return await componentsManager.instantiate(DEFAULT_APP, { variables });
app = await componentsManager.instantiate(DEFAULT_APP, { variables });
} catch (error: unknown) {
this.resolveError(`Could not create the server`, error);
}
// Ensure thread safety
if (!app.clusterManager.isSingleThreaded()) {
const violatingClasses = await listSingleThreadedComponents(componentsManager);
if (violatingClasses.length > 0) {
const verb = violatingClasses.length > 1 ? 'are' : 'is';
const detailedError = new InternalServerError(
`[${violatingClasses.join(', ')}] ${verb} not threadsafe and should not be run in multithreaded setups!`,
);
this.resolveError('Cannot run a singlethreaded-only component in a multithreaded setup!', detailedError);
}
}
return app;
}
/**

View File

@@ -1,10 +1,7 @@
import { readJson } from 'fs-extra';
import type { KeyValueStorage } from '../storage/keyvalue/KeyValueStorage';
import { resolveModulePath } from '../util/PathUtil';
import { readPackageJson } from '../util/PathUtil';
import { Initializer } from './Initializer';
const PACKAGE_JSON_PATH = resolveModulePath('package.json');
/**
* This initializer simply writes the version number of the server to the storage.
* This will be relevant in the future when we look into migration initializers.
@@ -22,7 +19,7 @@ export class ModuleVersionVerifier extends Initializer {
}
public async handle(): Promise<void> {
const pkg = await readJson(PACKAGE_JSON_PATH);
const pkg = await readPackageJson();
await this.storage.set(this.storageKey, pkg.version);
}
}

View File

@@ -0,0 +1,120 @@
import type { Worker } from 'cluster';
import cluster from 'cluster';
import { cpus } from 'os';
import { getLoggerFor } from '../../logging/LogUtil';
import { InternalServerError } from '../../util/errors/InternalServerError';
/**
* Different cluster modes.
*/
enum ClusterMode {
/** Scales in relation to `core_count`. */
autoScale,
/** Single threaded mode, no clustering */
singleThreaded,
/** Fixed amount of workers being forked. (limited to core_count) */
fixed
}
/**
* Convert workers amount to {@link ClusterMode}
* @param workers - Amount of workers
* @returns ClusterMode enum value
*/
function toClusterMode(workers: number): ClusterMode {
if (workers <= 0) {
return ClusterMode.autoScale;
}
if (workers === 1) {
return ClusterMode.singleThreaded;
}
return ClusterMode.fixed;
}
/**
* This class is responsible for deciding how many affective workers are needed.
* It also contains the logic for respawning workers when they are killed by the os.
*
* The workers values are interpreted as follows:
* value | actual workers |
* ------|--------------|
* `-m` | `num_cores - m` workers _(autoscale)_ (`m < num_cores`) |
* `-1` | `num_cores - 1` workers _(autoscale)_ |
* `0` | `num_cores` workers _(autoscale)_ |
* `1` | `single threaded mode` _(default)_ |
* `n` | `n` workers |
*/
export class ClusterManager {
private readonly logger = getLoggerFor(this);
private readonly workers: number;
private readonly clusterMode: ClusterMode;
public constructor(workers: number | string) {
const cores = cpus().length;
// Workaround for https://github.com/CommunitySolidServer/CommunitySolidServer/issues/1182
if (typeof workers === 'string') {
workers = Number.parseInt(workers, 10);
}
if (workers <= -cores) {
throw new InternalServerError('Invalid workers value (should be in the interval ]-num_cores, +∞).');
}
this.workers = toClusterMode(workers) === ClusterMode.autoScale ? cores + workers : workers;
this.clusterMode = toClusterMode(this.workers);
}
/**
* Spawn all required workers.
*/
public spawnWorkers(): void {
let counter = 0;
this.logger.info(`Setting up ${this.workers} workers`);
for (let i = 0; i < this.workers; i++) {
cluster.fork().on('message', (msg: string): void => {
this.logger.info(msg);
});
}
cluster.on('online', (worker: Worker): void => {
this.logger.info(`Worker ${worker.process.pid} is listening`);
counter += 1;
if (counter === this.workers) {
this.logger.info(`All ${this.workers} requested workers have been started.`);
}
});
cluster.on('exit', (worker: Worker, code: number, signal: string): void => {
this.logger.warn(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`);
this.logger.warn('Starting a new worker');
cluster.fork().on('message', (msg: string): void => {
this.logger.info(msg);
});
});
}
/**
* Check whether the CSS server was booted in single threaded mode.
* @returns True is single threaded.
*/
public isSingleThreaded(): boolean {
return this.clusterMode === ClusterMode.singleThreaded;
}
/**
* Whether the calling process is the primary process.
* @returns True if primary
*/
public isPrimary(): boolean {
return cluster.isMaster;
}
/**
* Whether the calling process is a worker process.
* @returns True if worker
*/
public isWorker(): boolean {
return cluster.isWorker;
}
}

View File

@@ -0,0 +1,57 @@
import type { ComponentsManager } from 'componentsjs';
import { PrefetchedDocumentLoader } from 'componentsjs';
import { ContextParser } from 'jsonld-context-parser';
import { InternalServerError } from '../../util/errors/InternalServerError';
import { readPackageJson } from '../../util/PathUtil';
/**
* Indicates a class is only meant to work in singlethreaded setups and is thus not threadsafe.
*/
export interface SingleThreaded {}
/**
* Convert an exported interface name to the properly expected Components.js type URI.
* @param componentsManager - The currently used ComponentsManager
* @param interfaceName - An interface name
* @returns A Components.js type URI
*/
export async function toComponentsJsType<T>(componentsManager: ComponentsManager<T>, interfaceName: string):
Promise<string> {
const pkg = await readPackageJson();
const contextParser = new ContextParser({
documentLoader: new PrefetchedDocumentLoader({ contexts: componentsManager.moduleState.contexts }),
skipValidation: true,
});
// The keys of the package.json `lsd:contexts` array contains all the IRIs of the relevant contexts;
const lsdContexts = Object.keys(pkg['lsd:contexts']);
// Feed the lsd:context IRIs to the ContextParser
const cssContext = await contextParser.parse(lsdContexts);
// We can now expand a simple interface name, to its full Components.js type identifier.
const interfaceIRI = cssContext.expandTerm(interfaceName, true);
if (!interfaceIRI) {
throw new InternalServerError(`Could not expand ${interfaceName} to IRI!`);
}
return interfaceIRI;
}
/**
* Will list class names of components instantiated implementing the {@link SingleThreaded}
* interface while the application is being run in multithreaded mode.
* @param componentsManager - The componentsManager being used to set up the application
*/
export async function listSingleThreadedComponents<T>(componentsManager: ComponentsManager<T>): Promise<string[]> {
const interfaceType = await toComponentsJsType(componentsManager, 'SingleThreaded');
const violatingClasses: string[] = [];
// Loop through all instantiated Resources
for (const resource of componentsManager.getInstantiatedResources()) {
// If implementing interfaceType, while not being the interfaceType itself.
if (resource?.isA(interfaceType) && resource.value !== interfaceType) {
// Part after the # in an IRI is the actual class name
const name = resource.property?.type?.value?.split('#')?.[1];
violatingClasses.push(name);
}
}
return violatingClasses;
}

View File

@@ -0,0 +1,20 @@
import { Initializer } from '../Initializer';
import type { ClusterManager } from './ClusterManager';
/**
* Spawns the necessary workers when starting in multithreaded mode.
*/
export class WorkerManager extends Initializer {
private readonly clusterManager: ClusterManager;
public constructor(clusterManager: ClusterManager) {
super();
this.clusterManager = clusterManager;
}
public async handle(): Promise<void> {
if (!this.clusterManager.isSingleThreaded()) {
this.clusterManager.spawnWorkers();
}
}
}

View File

@@ -1,5 +1,14 @@
import cluster from 'cluster';
import process from 'process';
import type { LogLevel } from './LogLevel';
export interface LogMetadata {
/** Is the current process the Primary process */
isPrimary: boolean;
/** The process id of the current process */
pid: number;
}
/**
* Logs messages on a specific level.
*
@@ -13,7 +22,7 @@ export interface SimpleLogger {
* @param message - The message to log.
* @param meta - Optional metadata to include in the log message.
*/
log: (level: LogLevel, message: string) => SimpleLogger;
log: (level: LogLevel, message: string, meta?: LogMetadata) => SimpleLogger;
}
/**
@@ -79,30 +88,35 @@ export interface Logger extends SimpleLogger {
* leaving only the implementation of {@link SimpleLogger}.
*/
export abstract class BaseLogger implements Logger {
public abstract log(level: LogLevel, message: string): Logger;
public abstract log(level: LogLevel, message: string, meta?: LogMetadata): Logger;
private readonly getMeta = (): LogMetadata => ({
pid: process.pid,
isPrimary: cluster.isMaster,
});
public error(message: string): Logger {
return this.log('error', message);
return this.log('error', message, this.getMeta());
}
public warn(message: string): Logger {
return this.log('warn', message);
return this.log('warn', message, this.getMeta());
}
public info(message: string): Logger {
return this.log('info', message);
return this.log('info', message, this.getMeta());
}
public verbose(message: string): Logger {
return this.log('verbose', message);
return this.log('verbose', message, this.getMeta());
}
public debug(message: string): Logger {
return this.log('debug', message);
return this.log('debug', message, this.getMeta());
}
public silly(message: string): Logger {
return this.log('silly', message);
return this.log('silly', message, this.getMeta());
}
}
@@ -118,8 +132,8 @@ export class WrappingLogger extends BaseLogger {
this.logger = logger;
}
public log(level: LogLevel, message: string): this {
this.logger.log(level, message);
public log(level: LogLevel, message: string, meta?: LogMetadata): this {
this.logger.log(level, message, meta);
return this;
}
}

View File

@@ -1,6 +1,6 @@
import { createLogger, format, transports } from 'winston';
import type * as Transport from 'winston-transport';
import type { Logger } from './Logger';
import type { Logger, LogMetadata } from './Logger';
import type { LoggerFactory } from './LoggerFactory';
import { WinstonLogger } from './WinstonLogger';
@@ -17,6 +17,13 @@ export class WinstonLoggerFactory implements LoggerFactory {
this.level = level;
}
private readonly clusterInfo = (meta: LogMetadata): string => {
if (meta.isPrimary) {
return 'Primary';
}
return `W-${meta.pid ?? '???'}`;
};
public createLogger(label: string): Logger {
return new WinstonLogger(createLogger({
level: this.level,
@@ -24,8 +31,10 @@ export class WinstonLoggerFactory implements LoggerFactory {
format.label({ label }),
format.colorize(),
format.timestamp(),
format.printf(({ level: levelInner, message, label: labelInner, timestamp }: Record<string, any>): string =>
`${timestamp} [${labelInner}] ${levelInner}: ${message}`),
format.metadata({ fillExcept: [ 'level', 'timestamp', 'label', 'message' ]}),
format.printf(({ level: levelInner, message, label: labelInner, timestamp, metadata: meta }:
Record<string, any>): string =>
`${timestamp} [${labelInner}] {${this.clusterInfo(meta)}} ${levelInner}: ${message}`),
),
transports: this.createTransports(),
}));

View File

@@ -55,7 +55,7 @@ export class BaseHttpServerFactory implements HttpServerFactory {
public startServer(port: number): Server {
const protocol = this.options.https ? 'https' : 'http';
const url = new URL(`${protocol}://localhost:${port}/`).href;
this.logger.info(`Starting server at ${url}`);
this.logger.info(`Listening to server at ${url}`);
const createServer = this.options.https ? createHttpsServer : createHttpServer;
const options = this.createServerOptions();

View File

@@ -2,6 +2,7 @@ import type { Readable } from 'stream';
import arrayifyStream from 'arrayify-stream';
import { RepresentationMetadata } from '../../http/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import type { SingleThreaded } from '../../init/cluster/SingleThreaded';
import { InternalServerError } from '../../util/errors/InternalServerError';
import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
import type { Guarded } from '../../util/GuardedStream';
@@ -19,7 +20,7 @@ interface ContainerEntry {
}
type CacheEntry = DataEntry | ContainerEntry;
export class InMemoryDataAccessor implements DataAccessor {
export class InMemoryDataAccessor implements DataAccessor, SingleThreaded {
private readonly identifierStrategy: IdentifierStrategy;
// A dummy container where every entry corresponds to a root container
private readonly store: { entries: Record<string, ContainerEntry> };

View File

@@ -1,4 +1,5 @@
import { posix, win32 } from 'path';
import { readJson } from 'fs-extra';
import urljoin from 'url-join';
import type { TargetExtractor } from '../http/input/identifier/TargetExtractor';
import type { ResourceIdentifier } from '../http/representation/ResourceIdentifier';
@@ -270,6 +271,13 @@ export function resolveAssetPath(path = modulePathPlaceholder): string {
return absoluteFilePath(path);
}
/**
* Reads the project package.json and returns it.
*/
export async function readPackageJson(): Promise<Record<string, any>> {
return readJson(resolveModulePath('package.json'));
}
/**
* Concatenates all the given strings into a normalized URL.
* Will place slashes between input strings if necessary.

View File

@@ -0,0 +1,46 @@
import type { ClusterManager } from '../../init/cluster/ClusterManager';
import { NotImplementedHttpError } from '../errors/NotImplementedHttpError';
import { AsyncHandler } from './AsyncHandler';
/**
* A wrapper handler that will only run the wrapped handler if it is executed from:
* * when running multithreaded: either the **primary** or a **worker process**
* * when running singlethreaded: **the only process** (i.e. always)
*/
export class ProcessHandler<TIn, TOut> extends AsyncHandler<TIn, TOut> {
private readonly clusterManager: ClusterManager;
private readonly source: AsyncHandler<TIn, TOut>;
private readonly executeOnPrimary: boolean;
/**
* Creates a new ProcessHandler
* @param source - The wrapped handler
* @param clusterManager - The ClusterManager in use
* @param executeOnPrimary - Whether to execute the source handler when the process is the _primary_ or a _worker_.
*/
public constructor(source: AsyncHandler<TIn, TOut>, clusterManager: ClusterManager, executeOnPrimary: boolean) {
super();
this.source = source;
this.clusterManager = clusterManager;
this.executeOnPrimary = executeOnPrimary;
}
public async canHandle(input: TIn): Promise<void> {
if (!this.canExecute()) {
throw new NotImplementedHttpError(`Will not execute on ${this.executeOnPrimary ? 'worker' : 'primary'} process.`);
}
await this.source.canHandle(input);
}
public async handle(input: TIn): Promise<TOut> {
return this.source.handle(input);
}
/**
* Checks if the condition has already been fulfilled.
*/
private canExecute(): boolean {
return this.clusterManager.isSingleThreaded() ||
(this.executeOnPrimary ? this.clusterManager.isPrimary() : this.clusterManager.isWorker());
}
}

View File

@@ -1,6 +1,6 @@
import cluster from 'cluster';
import AsyncLock from 'async-lock';
import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier';
import type { SingleThreaded } from '../../init/cluster/SingleThreaded';
import { getLoggerFor } from '../../logging/LogUtil';
import { InternalServerError } from '../errors/InternalServerError';
import type { ResourceLocker } from './ResourceLocker';
@@ -11,7 +11,7 @@ import type { ResourceLocker } from './ResourceLocker';
* in a memory leak if locks are never unlocked, so make sure this is covered with expiring locks for example,
* and/or proper `finally` handles.
*/
export class MemoryResourceLocker implements ResourceLocker {
export class MemoryResourceLocker implements ResourceLocker, SingleThreaded {
protected readonly logger = getLoggerFor(this);
private readonly locker: AsyncLock;
@@ -20,10 +20,6 @@ export class MemoryResourceLocker implements ResourceLocker {
public constructor() {
this.locker = new AsyncLock();
this.unlockCallbacks = {};
if (cluster.isWorker) {
this.logger.warn(`MemoryResourceLocker is not thread-safe/process-safe!
You should only use this locker in a single-thread/single-process CSS setup.`);
}
}
public async acquire(identifier: ResourceIdentifier): Promise<void> {