feat: Split up server creation and request handling

This allows us to decouple the WebSocket listening from the HTTP configs,
making these features completely orthogonal.
This commit is contained in:
Joachim Van Herwegen
2022-09-29 16:34:38 +02:00
parent 764ce3cc28
commit 4223dcf8a4
64 changed files with 949 additions and 694 deletions

View File

@@ -1,21 +1,27 @@
import { EventEmitter } from 'events';
import type { IncomingMessage } from 'http';
import type { TLSSocket } from 'tls';
import type { WebSocket } from 'ws';
import type { SingleThreaded } from '../init/cluster/SingleThreaded';
import { getLoggerFor } from '../logging/LogUtil';
import type { HttpRequest } from '../server/HttpRequest';
import { WebSocketHandler } from '../server/WebSocketHandler';
import type { ActivityEmitter } from '../server/notifications/ActivityEmitter';
import { WebSocketServerConfigurator } from '../server/WebSocketServerConfigurator';
import { createErrorMessage } from '../util/errors/ErrorUtil';
import type { GenericEventEmitter } from '../util/GenericEventEmitter';
import { createGenericEventEmitterClass } from '../util/GenericEventEmitter';
import { parseForwarded } from '../util/HeaderUtil';
import { splitCommaSeparated } from '../util/StringUtil';
import type { ResourceIdentifier } from './representation/ResourceIdentifier';
const VERSION = 'solid-0.1';
// eslint-disable-next-line @typescript-eslint/naming-convention
const WebSocketListenerEmitter = createGenericEventEmitterClass<GenericEventEmitter<'closed', () => void>>();
/**
* Implementation of Solid WebSockets API Spec solid-0.1
* at https://github.com/solid/solid-spec/blob/master/api-websockets.md
*/
class WebSocketListener extends EventEmitter {
class WebSocketListener extends WebSocketListenerEmitter {
private host = '';
private protocol = '';
private readonly socket: WebSocket;
@@ -30,7 +36,7 @@ class WebSocketListener extends EventEmitter {
socket.addListener('message', (message: string): void => this.onMessage(message));
}
public start({ headers, socket }: HttpRequest): void {
public start({ headers, socket }: IncomingMessage): void {
// Greet the client
this.sendMessage('protocol', VERSION);
@@ -105,7 +111,7 @@ class WebSocketListener extends EventEmitter {
this.logger.debug(`WebSocket subscribed to changes on ${url}`);
} catch (error: unknown) {
// Report errors to the socket
const errorText: string = (error as any).message;
const errorText: string = createErrorMessage(error);
this.sendMessage('error', errorText);
this.logger.warn(`WebSocket could not subscribe to ${path}: ${errorText}`);
}
@@ -120,11 +126,11 @@ class WebSocketListener extends EventEmitter {
* Provides live update functionality following
* the Solid WebSockets API Spec solid-0.1
*/
export class UnsecureWebSocketsProtocol extends WebSocketHandler implements SingleThreaded {
private readonly logger = getLoggerFor(this);
export class UnsecureWebSocketsProtocol extends WebSocketServerConfigurator implements SingleThreaded {
protected readonly logger = getLoggerFor(this);
private readonly listeners = new Set<WebSocketListener>();
public constructor(source: EventEmitter) {
public constructor(source: ActivityEmitter) {
super();
this.logger.warn('The chosen configuration includes Solid WebSockets API 0.1, which is unauthenticated.');
@@ -133,8 +139,8 @@ export class UnsecureWebSocketsProtocol extends WebSocketHandler implements Sing
source.on('changed', (changed: ResourceIdentifier): void => this.onResourceChanged(changed));
}
public async handle(input: { webSocket: WebSocket; upgradeRequest: HttpRequest }): Promise<void> {
const listener = new WebSocketListener(input.webSocket);
protected async handleConnection(webSocket: WebSocket, upgradeRequest: IncomingMessage): Promise<void> {
const listener = new WebSocketListener(webSocket);
this.listeners.add(listener);
this.logger.info(`New WebSocket added, ${this.listeners.size} in total`);
@@ -142,7 +148,7 @@ export class UnsecureWebSocketsProtocol extends WebSocketHandler implements Sing
this.listeners.delete(listener);
this.logger.info(`WebSocket closed, ${this.listeners.size} remaining`);
});
listener.start(input.upgradeRequest);
listener.start(upgradeRequest);
}
private onResourceChanged(changed: ResourceIdentifier): void {