feat: Support both the old and new WebSocket specifications together

This commit is contained in:
Joachim Van Herwegen 2023-04-26 10:17:00 +02:00
parent 69af7c4e16
commit 4b7621f9e0
14 changed files with 165 additions and 80 deletions

View File

@ -27,6 +27,7 @@ Determines how notifications should be sent out from the server when resources c
* *legacy-websocket*: Follows the legacy Solid WebSocket
[specification](https://github.com/solid/solid-spec/blob/master/api-websockets.md).
Will be removed in future versions.
* *new-old-websockets.json*: Support for both the legacy Solid Websockets and the new WebSocketChannel2023.
* *webhooks*: Follows the WebHookChannel2023
[specification](https://solid.github.io/notifications/webhook-channel-2023) draft.
* *websockets*: Follows the WebSocketChannel2023

View File

@ -7,13 +7,14 @@
"@type": "UnsupportedAsyncHandler"
},
{
"@id": "urn:solid-server:default:ServerConfigurator",
"@type": "ParallelHandler",
"@id": "urn:solid-server:default:WebSocketHandler",
"@type": "WaterfallHandler",
"handlers": [
{
"comment": "Catches the server upgrade events and handles the WebSocket connections.",
"@type": "UnsecureWebSocketsProtocol",
"source": { "@id": "urn:solid-server:default:ResourceStore" }
"source": { "@id": "urn:solid-server:default:ResourceStore" },
"baseUrl": { "@id": "urn:solid-server:default:variable:baseUrl" }
}
]
},

View File

@ -0,0 +1,38 @@
{
"@context": "https://linkedsoftwaredependencies.org/bundles/npm/@solid/community-server/^6.0.0/components/context.jsonld",
"import": [
"css:config/http/notifications/base/description.json",
"css:config/http/notifications/base/handler.json",
"css:config/http/notifications/base/http.json",
"css:config/http/notifications/base/listener.json",
"css:config/http/notifications/base/storage.json",
"css:config/http/notifications/websockets/handler.json",
"css:config/http/notifications/websockets/http.json",
"css:config/http/notifications/websockets/subscription.json"
],
"@graph": [
{
"@id": "urn:solid-server:default:WebSocketHandler",
"@type": "WaterfallHandler",
"handlers": [
{
"comment": "Catches the server upgrade events and handles the WebSocket connections.",
"@type": "UnsecureWebSocketsProtocol",
"source": { "@id": "urn:solid-server:default:ResourceStore" },
"baseUrl": { "@id": "urn:solid-server:default:variable:baseUrl" }
}
]
},
{
"@id": "urn:solid-server:default:ParallelMiddleware",
"@type": "ParallelHandler",
"handlers": [
{
"comment": "Advertises the websocket connection.",
"@type": "WebSocketAdvertiser",
"baseUrl": { "@id": "urn:solid-server:default:variable:baseUrl" }
}
]
}
]
}

View File

@ -36,8 +36,8 @@
},
{
"@id": "urn:solid-server:default:ServerConfigurator",
"@type": "ParallelHandler",
"@id": "urn:solid-server:default:WebSocketHandler",
"@type": "WaterfallHandler",
"handlers": [
{ "@id": "urn:solid-server:default:WebSocket2023Listener" }
]

View File

@ -11,6 +11,16 @@
"@type": "HandlerServerConfigurator",
"handler": { "@id": "urn:solid-server:default:HttpHandler" },
"showStackTrace": { "@id": "urn:solid-server:default:variable:showStackTrace" }
},
{
"comment": "Handles all WebSocket connections to the server.",
"@id": "urn:solid-server:default:WebSocketServerConfigurator",
"@type": "WebSocketServerConfigurator",
"handler": {
"@id": "urn:solid-server:default:WebSocketHandler",
"@type": "WaterfallHandler",
"handlers": []
}
}
]
}

View File

@ -4,8 +4,10 @@ import type { WebSocket } from 'ws';
import type { SingleThreaded } from '../init/cluster/SingleThreaded';
import { getLoggerFor } from '../logging/LogUtil';
import type { ActivityEmitter } from '../server/notifications/ActivityEmitter';
import { WebSocketServerConfigurator } from '../server/WebSocketServerConfigurator';
import type { WebSocketHandlerInput } from '../server/WebSocketHandler';
import { WebSocketHandler } from '../server/WebSocketHandler';
import { createErrorMessage } from '../util/errors/ErrorUtil';
import { NotImplementedHttpError } from '../util/errors/NotImplementedHttpError';
import type { GenericEventEmitter } from '../util/GenericEventEmitter';
import { createGenericEventEmitterClass } from '../util/GenericEventEmitter';
import { parseForwarded } from '../util/HeaderUtil';
@ -124,22 +126,34 @@ class WebSocketListener extends WebSocketListenerEmitter {
/**
* Provides live update functionality following
* the Solid WebSockets API Spec solid-0.1
* the Solid WebSockets API Spec solid-0.1.
*
* The `baseUrl` parameter should be the same one that is used to advertise with the Updates-Via header.
*/
export class UnsecureWebSocketsProtocol extends WebSocketServerConfigurator implements SingleThreaded {
export class UnsecureWebSocketsProtocol extends WebSocketHandler implements SingleThreaded {
protected readonly logger = getLoggerFor(this);
private readonly path: string;
private readonly listeners = new Set<WebSocketListener>();
public constructor(source: ActivityEmitter) {
public constructor(source: ActivityEmitter, baseUrl: string) {
super();
this.logger.warn('The chosen configuration includes Solid WebSockets API 0.1, which is unauthenticated.');
this.logger.warn('This component will be removed from default configurations in future versions.');
this.path = new URL(baseUrl).pathname;
source.on('changed', (changed: ResourceIdentifier): void => this.onResourceChanged(changed));
}
protected async handleConnection(webSocket: WebSocket, upgradeRequest: IncomingMessage): Promise<void> {
public async canHandle({ upgradeRequest }: WebSocketHandlerInput): Promise<void> {
if (upgradeRequest.url !== this.path) {
throw new NotImplementedHttpError(`Only WebSocket requests to ${this.path} are supported.`);
}
}
public async handle({ webSocket, upgradeRequest }: WebSocketHandlerInput): Promise<void> {
const listener = new WebSocketListener(webSocket);
this.listeners.add(listener);
this.logger.info(`New WebSocket added, ${this.listeners.size} in total`);

View File

@ -292,6 +292,7 @@ export * from './server/OperationHttpHandler';
export * from './server/ParsingHttpHandler';
export * from './server/ServerConfigurator';
export * from './server/WacAllowHttpHandler';
export * from './server/WebSocketHandler';
export * from './server/WebSocketServerConfigurator';
// Server/Description

View File

@ -0,0 +1,13 @@
import type { WebSocket } from 'ws';
import { AsyncHandler } from '../util/handlers/AsyncHandler';
import type { HttpRequest } from './HttpRequest';
export interface WebSocketHandlerInput {
webSocket: WebSocket;
upgradeRequest: HttpRequest;
}
/**
* A handler to support requests trying to open a WebSocket connection.
*/
export abstract class WebSocketHandler extends AsyncHandler<WebSocketHandlerInput> {}

View File

@ -4,27 +4,38 @@ import type { WebSocket } from 'ws';
import { WebSocketServer } from 'ws';
import { getLoggerFor } from '../logging/LogUtil';
import { createErrorMessage } from '../util/errors/ErrorUtil';
import { guardStream } from '../util/GuardedStream';
import { ServerConfigurator } from './ServerConfigurator';
import type { WebSocketHandler } from './WebSocketHandler';
/**
* {@link ServerConfigurator} that adds WebSocket functionality to an existing {@link Server}.
*
* Implementations need to implement the `handleConnection` function to receive the necessary information.
* Listens for WebSocket requests and sends them to its handler.
*/
export abstract class WebSocketServerConfigurator extends ServerConfigurator {
export class WebSocketServerConfigurator extends ServerConfigurator {
protected readonly logger = getLoggerFor(this);
private readonly handler: WebSocketHandler;
public constructor(handler: WebSocketHandler) {
super();
this.handler = handler;
}
public async handle(server: Server): Promise<void> {
// Create WebSocket server
const webSocketServer = new WebSocketServer({ noServer: true });
server.on('upgrade', (upgradeRequest: IncomingMessage, socket: Socket, head: Buffer): void => {
webSocketServer.handleUpgrade(upgradeRequest, socket, head, (webSocket: WebSocket): void => {
this.handleConnection(webSocket, upgradeRequest).catch((error: Error): void => {
webSocketServer.handleUpgrade(upgradeRequest, socket, head, async(webSocket: WebSocket): Promise<void> => {
try {
await this.handler.handleSafe({ upgradeRequest: guardStream(upgradeRequest), webSocket });
} catch (error: unknown) {
this.logger.error(`Something went wrong handling a WebSocket connection: ${createErrorMessage(error)}`);
});
webSocket.send(`There was an error opening this WebSocket: ${createErrorMessage(error)}`);
webSocket.close();
}
});
});
}
protected abstract handleConnection(webSocket: WebSocket, upgradeRequest: IncomingMessage): Promise<void>;
}

View File

@ -1,16 +1,16 @@
import type { IncomingMessage } from 'http';
import type { WebSocket } from 'ws';
import { getLoggerFor } from '../../../logging/LogUtil';
import { WebSocketServerConfigurator } from '../../WebSocketServerConfigurator';
import { NotImplementedHttpError } from '../../../util/errors/NotImplementedHttpError';
import type { WebSocketHandlerInput } from '../../WebSocketHandler';
import { WebSocketHandler } from '../../WebSocketHandler';
import type { NotificationChannelStorage } from '../NotificationChannelStorage';
import type { WebSocket2023Handler } from './WebSocket2023Handler';
import { parseWebSocketRequest } from './WebSocket2023Util';
/**
* Listens for WebSocket connections and verifies if they are valid WebSocketChannel2023 connections,
* Listens for WebSocket connections and verifies whether they are valid WebSocketChannel2023 connections,
* in which case its {@link WebSocket2023Handler} will be alerted.
*/
export class WebSocket2023Listener extends WebSocketServerConfigurator {
export class WebSocket2023Listener extends WebSocketHandler {
protected readonly logger = getLoggerFor(this);
private readonly storage: NotificationChannelStorage;
@ -24,16 +24,18 @@ export class WebSocket2023Listener extends WebSocketServerConfigurator {
this.baseUrl = baseUrl;
}
protected async handleConnection(webSocket: WebSocket, upgradeRequest: IncomingMessage): Promise<void> {
public async canHandle({ upgradeRequest }: WebSocketHandlerInput): Promise<void> {
const id = parseWebSocketRequest(this.baseUrl, upgradeRequest);
const channel = await this.storage.get(id);
if (!channel) {
// Channel not being there implies it has expired
webSocket.send(`Notification channel has expired`);
return webSocket.close();
throw new NotImplementedHttpError(`Unknown or expired WebSocket channel ${id}`);
}
}
public async handle({ webSocket, upgradeRequest }: WebSocketHandlerInput): Promise<void> {
const id = parseWebSocketRequest(this.baseUrl, upgradeRequest);
const channel = (await this.storage.get(id))!;
this.logger.info(`Accepted WebSocket connection listening to changes on ${channel.topic}`);

View File

@ -220,7 +220,7 @@ describe.each(stores)('A server supporting WebSocketChannel2023 using %s', (name
await new Promise<void>((resolve): any => socket.on('close', resolve));
const message = (await messagePromise).toString();
expect(message).toBe('Notification channel has expired');
expect(message).toContain('There was an error opening this WebSocket');
});
it('emits container notifications if contents get added or removed.', async(): Promise<void> => {

View File

@ -1,9 +1,10 @@
import { EventEmitter } from 'events';
import type { Server } from 'http';
import type { WebSocket } from 'ws';
import { RepresentationMetadata } from '../../../src/http/representation/RepresentationMetadata';
import { UnsecureWebSocketsProtocol } from '../../../src/http/UnsecureWebSocketsProtocol';
import type { HttpRequest } from '../../../src/server/HttpRequest';
import { BaseActivityEmitter } from '../../../src/server/notifications/ActivityEmitter';
import type { Guarded } from '../../../src/util/GuardedStream';
import { AS } from '../../../src/util/Vocabularies';
jest.mock('ws', (): any => ({
@ -25,18 +26,24 @@ class DummySocket extends EventEmitter {
}
describe('An UnsecureWebSocketsProtocol', (): void => {
let server: Server;
let webSocket: DummySocket;
let webSocket: WebSocket & DummySocket;
const metadata = new RepresentationMetadata();
const source = new BaseActivityEmitter();
const baseUrl = 'http://example.com/';
let protocol: UnsecureWebSocketsProtocol;
it('can only handle requests targeting the base URl.', async(): Promise<void> => {
protocol = new UnsecureWebSocketsProtocol(source, baseUrl);
webSocket = new DummySocket() as any;
await expect(protocol.canHandle({ webSocket, upgradeRequest: { url: '/' } as any })).resolves.toBeUndefined();
await expect(protocol.canHandle({ webSocket, upgradeRequest: { url: '/foo' } as any }))
.rejects.toThrow('Only WebSocket requests to / are supported.');
});
describe('after registering a socket', (): void => {
beforeAll(async(): Promise<void> => {
server = new EventEmitter() as any;
webSocket = new DummySocket();
protocol = new UnsecureWebSocketsProtocol(source);
await protocol.handle(server);
webSocket = new DummySocket() as any;
protocol = new UnsecureWebSocketsProtocol(source, baseUrl);
const upgradeRequest = {
headers: {
@ -46,8 +53,8 @@ describe('An UnsecureWebSocketsProtocol', (): void => {
socket: {
encrypted: true,
},
} as any as HttpRequest;
server.emit('upgrade', upgradeRequest, webSocket);
} as any as Guarded<HttpRequest>;
await protocol.handle({ webSocket, upgradeRequest });
});
it('sends a protocol message.', (): void => {
@ -135,14 +142,12 @@ describe('An UnsecureWebSocketsProtocol', (): void => {
describe('handling other situations', (): void => {
beforeEach(async(): Promise<void> => {
server = new EventEmitter() as any;
webSocket = new DummySocket();
protocol = new UnsecureWebSocketsProtocol(source);
await protocol.handle(server);
webSocket = new DummySocket() as any;
protocol = new UnsecureWebSocketsProtocol(source, baseUrl);
});
it('unsubscribes when a socket closes.', async(): Promise<void> => {
server.emit('upgrade', { headers: {}, socket: {}} as any, webSocket);
await protocol.handle({ webSocket, upgradeRequest: { headers: {}, socket: {}} as any });
expect(webSocket.listenerCount('message')).toBe(1);
webSocket.emit('close');
expect(webSocket.listenerCount('message')).toBe(0);
@ -151,7 +156,7 @@ describe('An UnsecureWebSocketsProtocol', (): void => {
});
it('unsubscribes when a socket errors.', async(): Promise<void> => {
server.emit('upgrade', { headers: {}, socket: {}} as any, webSocket);
await protocol.handle({ webSocket, upgradeRequest: { headers: {}, socket: {}} as any });
expect(webSocket.listenerCount('message')).toBe(1);
webSocket.emit('error');
expect(webSocket.listenerCount('message')).toBe(0);
@ -160,7 +165,7 @@ describe('An UnsecureWebSocketsProtocol', (): void => {
});
it('emits a warning when no Sec-WebSocket-Protocol is supplied.', async(): Promise<void> => {
server.emit('upgrade', { headers: {}, socket: {}} as any, webSocket);
await protocol.handle({ webSocket, upgradeRequest: { headers: {}, socket: {}} as any });
expect(webSocket.messages).toHaveLength(2);
expect(webSocket.messages.pop())
.toBe('warning Missing Sec-WebSocket-Protocol header, expected value \'solid-0.1\'');
@ -174,7 +179,7 @@ describe('An UnsecureWebSocketsProtocol', (): void => {
},
socket: {},
} as any as HttpRequest;
server.emit('upgrade', upgradeRequest, webSocket);
await protocol.handle({ webSocket, upgradeRequest });
expect(webSocket.messages).toHaveLength(2);
expect(webSocket.messages.pop()).toBe('error Client does not support protocol solid-0.1');
expect(webSocket.close).toHaveBeenCalledTimes(1);
@ -191,7 +196,7 @@ describe('An UnsecureWebSocketsProtocol', (): void => {
},
socket: {},
} as any as HttpRequest;
server.emit('upgrade', upgradeRequest, webSocket);
await protocol.handle({ webSocket, upgradeRequest });
webSocket.emit('message', 'sub https://other.example/protocol/foo');
expect(webSocket.messages).toHaveLength(2);
expect(webSocket.messages.pop()).toBe('ack https://other.example/protocol/foo');
@ -206,7 +211,7 @@ describe('An UnsecureWebSocketsProtocol', (): void => {
},
socket: {},
} as any as HttpRequest;
server.emit('upgrade', upgradeRequest, webSocket);
await protocol.handle({ webSocket, upgradeRequest });
webSocket.emit('message', 'sub https://other.example/protocol/foo');
expect(webSocket.messages).toHaveLength(2);
expect(webSocket.messages.pop()).toBe('ack https://other.example/protocol/foo');

View File

@ -4,6 +4,7 @@ import type { WebSocket } from 'ws';
import type { Logger } from '../../../src/logging/Logger';
import { getLoggerFor } from '../../../src/logging/LogUtil';
import type { HttpRequest } from '../../../src/server/HttpRequest';
import type { WebSocketHandler } from '../../../src/server/WebSocketHandler';
import { WebSocketServerConfigurator } from '../../../src/server/WebSocketServerConfigurator';
import { flushPromises } from '../../util/Util';
@ -22,18 +23,13 @@ jest.mock('../../../src/logging/LogUtil', (): any => {
return { getLoggerFor: (): Logger => logger };
});
class SimpleWebSocketConfigurator extends WebSocketServerConfigurator {
public async handleConnection(): Promise<void> {
// Will be overwritten
}
}
describe('A WebSocketServerConfigurator', (): void => {
const logger: jest.Mocked<Logger> = getLoggerFor('mock') as any;
let server: Server;
let webSocket: WebSocket;
let upgradeRequest: HttpRequest;
let listener: jest.Mocked<SimpleWebSocketConfigurator>;
let handler: jest.Mocked<WebSocketHandler>;
let configurator: WebSocketServerConfigurator;
beforeEach(async(): Promise<void> => {
// Clearing the logger mock
@ -43,17 +39,20 @@ describe('A WebSocketServerConfigurator', (): void => {
webSocket.send = jest.fn();
webSocket.close = jest.fn();
upgradeRequest = { url: `/foo` } as any;
upgradeRequest = new EventEmitter() as any;
listener = new SimpleWebSocketConfigurator() as any;
listener.handleConnection = jest.fn().mockResolvedValue('');
await listener.handle(server);
handler = {
handleSafe: jest.fn(),
} as any;
configurator = new WebSocketServerConfigurator(handler);
await configurator.handle(server);
});
it('attaches an upgrade listener to any server it gets.', async(): Promise<void> => {
server = new EventEmitter() as any;
expect(server.listenerCount('upgrade')).toBe(0);
await listener.handle(server);
await configurator.handle(server);
expect(server.listenerCount('upgrade')).toBe(1);
});
@ -62,19 +61,22 @@ describe('A WebSocketServerConfigurator', (): void => {
await flushPromises();
expect(listener.handleConnection).toHaveBeenCalledTimes(1);
expect(listener.handleConnection).toHaveBeenLastCalledWith(webSocket, upgradeRequest);
expect(handler.handleSafe).toHaveBeenCalledTimes(1);
expect(handler.handleSafe).toHaveBeenLastCalledWith({ webSocket, upgradeRequest });
expect(logger.error).toHaveBeenCalledTimes(0);
});
it('logs an error if something went wrong handling the connection.', async(): Promise<void> => {
listener.handleConnection.mockRejectedValue(new Error('bad input'));
handler.handleSafe.mockRejectedValue(new Error('bad input'));
server.emit('upgrade', upgradeRequest, webSocket);
await flushPromises();
expect(listener.handleConnection).toHaveBeenCalledTimes(1);
expect(handler.handleSafe).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenLastCalledWith('Something went wrong handling a WebSocket connection: bad input');
expect(webSocket.send).toHaveBeenCalledTimes(1);
expect(webSocket.send).toHaveBeenLastCalledWith('There was an error opening this WebSocket: bad input');
expect(webSocket.close).toHaveBeenCalledTimes(1);
});
});

View File

@ -1,5 +1,4 @@
import { EventEmitter } from 'events';
import type { Server } from 'http';
import type { WebSocket } from 'ws';
import type { HttpRequest } from '../../../../../src/server/HttpRequest';
@ -13,7 +12,7 @@ import type {
import {
WebSocket2023Listener,
} from '../../../../../src/server/notifications/WebSocketChannel2023/WebSocket2023Listener';
import { flushPromises } from '../../../../util/Util';
import { NotImplementedHttpError } from '../../../../../src/util/errors/NotImplementedHttpError';
jest.mock('ws', (): any => ({
// eslint-disable-next-line @typescript-eslint/naming-convention
@ -30,7 +29,6 @@ describe('A WebSocket2023Listener', (): void => {
topic: 'http://example.com/foo',
type: 'type',
};
let server: Server;
let webSocket: WebSocket;
let upgradeRequest: HttpRequest;
let storage: jest.Mocked<NotificationChannelStorage>;
@ -39,7 +37,6 @@ describe('A WebSocket2023Listener', (): void => {
let listener: WebSocket2023Listener;
beforeEach(async(): Promise<void> => {
server = new EventEmitter() as any;
webSocket = new EventEmitter() as any;
webSocket.send = jest.fn();
webSocket.close = jest.fn();
@ -55,26 +52,16 @@ describe('A WebSocket2023Listener', (): void => {
} as any;
listener = new WebSocket2023Listener(storage, handler, baseUrl);
await listener.handle(server);
});
it('rejects requests with an unknown target.', async(): Promise<void> => {
await expect(listener.canHandle({ upgradeRequest, webSocket })).resolves.toBeUndefined();
storage.get.mockResolvedValue(undefined);
server.emit('upgrade', upgradeRequest, webSocket);
await flushPromises();
expect(webSocket.send).toHaveBeenCalledTimes(1);
expect(webSocket.send).toHaveBeenLastCalledWith(`Notification channel has expired`);
expect(webSocket.close).toHaveBeenCalledTimes(1);
expect(handler.handleSafe).toHaveBeenCalledTimes(0);
await expect(listener.canHandle({ upgradeRequest, webSocket })).rejects.toThrow(NotImplementedHttpError);
});
it('calls the handler when receiving a valid request.', async(): Promise<void> => {
server.emit('upgrade', upgradeRequest, webSocket);
await flushPromises();
await expect(listener.handle({ upgradeRequest, webSocket })).resolves.toBeUndefined();
expect(webSocket.send).toHaveBeenCalledTimes(0);
expect(webSocket.close).toHaveBeenCalledTimes(0);
expect(handler.handleSafe).toHaveBeenCalledTimes(1);