feat: introducing EventBus

This commit is contained in:
Wannes Kerckhove 2022-08-18 10:05:31 +02:00
parent cc2c284aa0
commit 09eb2ba2e0
3 changed files with 143 additions and 3 deletions

View File

@ -2,12 +2,25 @@ import type { ResourceIdentifier } from '../http/representation/ResourceIdentifi
import { getLoggerFor } from '../logging/LogUtil';
import type { KeyValueStorage } from '../storage/keyvalue/KeyValueStorage';
import type { ResourceStore } from '../storage/ResourceStore';
import type { EventBus, EventConsumer } from '../util/messaging/EventBus';
import { addGeneratedResources } from './generate/GenerateUtil';
import type { PodGenerator } from './generate/PodGenerator';
import type { ResourcesGenerator } from './generate/ResourcesGenerator';
import type { PodManager } from './PodManager';
import type { PodSettings } from './settings/PodSettings';
const configPodChangeTopic = 'signaling.pods.configManager';
enum ConfigPodChangeType {
create
}
export interface ConfigPodChange {
type: ConfigPodChangeType;
identifier: ResourceIdentifier;
settings: PodSettings;
}
/**
* Pod manager that creates a store for the pod with a {@link PodGenerator}
* and fills it with resources from a {@link ResourcesGenerator}.
@ -19,25 +32,30 @@ import type { PodSettings } from './settings/PodSettings';
*
* @see {@link TemplatedPodGenerator}, {@link ConfigPodInitializer}, {@link BaseUrlRouterRule}
*/
export class ConfigPodManager implements PodManager {
export class ConfigPodManager implements PodManager, EventConsumer<ConfigPodChange> {
protected readonly logger = getLoggerFor(this);
private readonly podGenerator: PodGenerator;
private readonly routingStorage: KeyValueStorage<string, ResourceStore>;
private readonly resourcesGenerator: ResourcesGenerator;
private readonly store: ResourceStore;
private readonly eventBus: EventBus<ConfigPodChange> | undefined;
/**
* @param podGenerator - Generator for the pod stores.
* @param resourcesGenerator - Generator for the pod resources.
* @param routingStorage - Where to store the generated pods so they can be routed to.
* @param routingStorage - Where to store the generated pods, so they can be routed to.
* @param store - The default ResourceStore
* @param eventBus - (optional) The event bus used for signaling config pod changes,
* only required in a multithreaded setup.
*/
public constructor(podGenerator: PodGenerator, resourcesGenerator: ResourcesGenerator,
routingStorage: KeyValueStorage<string, ResourceStore>, store: ResourceStore) {
routingStorage: KeyValueStorage<string, ResourceStore>, store: ResourceStore,
eventBus?: EventBus<ConfigPodChange>) {
this.podGenerator = podGenerator;
this.routingStorage = routingStorage;
this.resourcesGenerator = resourcesGenerator;
this.store = store;
this.eventBus = eventBus;
}
public async createPod(identifier: ResourceIdentifier, settings: PodSettings): Promise<void> {
@ -50,5 +68,24 @@ export class ConfigPodManager implements PodManager {
const count = await addGeneratedResources(identifier, settings, this.resourcesGenerator, this.store);
this.logger.info(`Added ${count} resources to ${identifier.path}`);
// If an eventBus was provided, subscribe to config pod changes
await this.eventBus?.subscribe(configPodChangeTopic, this);
// If an eventBus was provided, notify potential listeners of this config pod change.
await this.eventBus?.publish(configPodChangeTopic, {
type: ConfigPodChangeType.create,
identifier,
settings,
});
}
// Handler for on config pod changes
public async onEvent(event: ConfigPodChange): Promise<void> {
// When receiving a pod create event, check if a pod with the specified identifier exits in the routingStorage.
if (event.type === ConfigPodChangeType.create && !await this.routingStorage.has(event.identifier.path)) {
// If not, create the pod on this instance.
await this.createPod(event.identifier, event.settings);
}
}
}

View File

@ -0,0 +1,80 @@
import type { Serializable } from 'child_process';
import cluster, { worker } from 'cluster';
import type { ClusterManager } from '../../init/cluster/ClusterManager';
import type { Initializable } from '../../init/Initializable';
import { getLoggerFor } from '../../logging/LogUtil';
import type { EventBus, EventConsumer, EventSubscription } from './EventBus';
interface ClusterMessage<T extends Serializable> {
workerSourceId?: number;
address: string;
body: T;
}
export class ClusterEventBus<T extends Serializable> implements EventBus<T>, Initializable {
private readonly logger = getLoggerFor(this);
private readonly clusterManager: ClusterManager;
private readonly subscriptions: Set<EventSubscription<T>> = new Set([]);
public constructor(clusterManager: ClusterManager) {
this.clusterManager = clusterManager;
}
public async publish(address: string, event: T): Promise<void> {
if (this.clusterManager.isSingleThreaded()) {
await this.distribute({ address, body: event });
} else {
worker.send({
workerSourceId: cluster.worker.id,
address,
body: event,
});
}
}
public async subscribe(address: string, consumer: EventConsumer<T>): Promise<EventSubscription<T>> {
const subscriptionRef = this.subscriptions;
const newSubscription = {
address,
consumer,
async unsubscribe(): Promise<void> {
subscriptionRef.delete(this);
},
} as EventSubscription<T>;
this.subscriptions.add(newSubscription);
return newSubscription;
}
public async initialize(): Promise<void> {
if (!this.clusterManager.isSingleThreaded()) {
if (cluster.isMaster) {
cluster.on('message', (msg: string): void => {
const message: ClusterMessage<T> = { ...JSON.parse(msg) };
for (const [ , clusterWorker ] of Object.entries(cluster.workers)) {
if (clusterWorker?.id !== message.workerSourceId) {
clusterWorker?.send(msg);
}
}
});
}
if (cluster.isWorker) {
worker.on('message', (msg: string): void => {
this.distribute({ ...JSON.parse(msg) })
.then()
.catch((error): void => {
this.logger.warn(`Unexpected error while distributing event: ${error.message}.`);
});
});
}
}
}
private async distribute(message: ClusterMessage<T>): Promise<void> {
for (const subscription of this.subscriptions) {
if (subscription.address === message.address) {
await subscription.consumer.onEvent(message.body);
}
}
}
}

View File

@ -0,0 +1,23 @@
import type { Serializable } from 'child_process';
// Could be replaced with an AsyncHandler<T extends Serializable>
export interface EventConsumer<T extends Serializable> {
onEvent: (event: T) => Promise<void>;
}
export interface EventSubscription<T extends Serializable> {
address: string;
consumer: EventConsumer<T>;
unsubscribe: () => Promise<void>;
}
/**
* This interface defines a simple publish/subscribe component.
*/
export interface EventBus<T extends Serializable> {
publish: (address: string, event: T) => Promise<void>;
subscribe: (address: string, consumer: EventConsumer<T>) => Promise<EventSubscription<T>>;
}