diff --git a/.eslintrc.js b/.eslintrc.js index 48581dfb1..ad392f6e6 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -38,6 +38,7 @@ module.exports = { 'class-methods-use-this': 'off', // conflicts with functions from interfaces that sometimes don't require `this` 'comma-dangle': ['error', 'always-multiline'], 'dot-location': ['error', 'property'], + 'lines-around-comment': 'off', // conflicts with padded-blocks 'lines-between-class-members': ['error', 'always', { exceptAfterSingleLine: true }], 'max-len': ['error', { code: 120, ignoreUrls: true }], 'new-cap': 'off', // used for RDF constants diff --git a/src/ldp/UnsecureWebSocketsProtocol.ts b/src/ldp/UnsecureWebSocketsProtocol.ts new file mode 100644 index 000000000..22ec4696e --- /dev/null +++ b/src/ldp/UnsecureWebSocketsProtocol.ts @@ -0,0 +1,145 @@ +import { EventEmitter } from 'events'; +import type WebSocket from 'ws'; +import { getLoggerFor } from '../logging/LogUtil'; +import type { HttpRequest } from '../server/HttpRequest'; +import { WebSocketHandler } from '../server/WebSocketHandler'; +import type { ResourceIdentifier } from './representation/ResourceIdentifier'; + +const VERSION = 'solid/0.1.0-alpha'; + +/** + * Implementation of Solid WebSockets API Spec solid/0.1.0-alpha + * at https://github.com/solid/solid-spec/blob/master/api-websockets.md + */ +class WebSocketListener extends EventEmitter { + private host = ''; + private protocol = ''; + private readonly socket: WebSocket; + private readonly subscribedPaths = new Set(); + private readonly logger = getLoggerFor(this); + + public constructor(socket: WebSocket) { + super(); + this.socket = socket; + socket.addListener('error', (): void => this.stop()); + socket.addListener('close', (): void => this.stop()); + socket.addListener('message', (message: string): void => this.onMessage(message)); + } + + public start(upgradeRequest: HttpRequest): void { + // Greet the client + this.sendMessage('protocol', VERSION); + this.sendMessage('warning', 'Unstandardized protocol version, proceed with care'); + + // Verify the WebSocket protocol version + const protocolHeader = upgradeRequest.headers['sec-websocket-protocol']; + if (!protocolHeader) { + this.sendMessage('warning', `Missing Sec-WebSocket-Protocol header, expected value '${VERSION}'`); + } else { + const supportedProtocols = protocolHeader.split(/\s*,\s*/u); + if (!supportedProtocols.includes(VERSION)) { + this.sendMessage('error', `Client does not support protocol ${VERSION}`); + this.stop(); + } + } + + // Store the HTTP host and protocol + this.host = upgradeRequest.headers.host ?? ''; + this.protocol = (upgradeRequest.socket as any).secure ? 'https:' : 'http:'; + } + + private stop(): void { + try { + this.socket.close(); + } catch { + // Ignore + } + this.subscribedPaths.clear(); + this.socket.removeAllListeners(); + this.emit('closed'); + } + + public onResourceChanged({ path }: ResourceIdentifier): void { + if (this.subscribedPaths.has(path)) { + this.sendMessage('pub', path); + } + } + + private onMessage(message: string): void { + // Parse the message + const match = /^(\w+)\s+(.+)$/u.exec(message); + if (!match) { + this.sendMessage('warning', `Unrecognized message format: ${message}`); + return; + } + + // Process the message + const [ , type, value ] = match; + switch (type) { + case 'sub': + this.subscribe(value); + break; + default: + this.sendMessage('warning', `Unrecognized message type: ${type}`); + } + } + + private subscribe(path: string): void { + try { + // Resolve and verify the URL + const resolved = new URL(path, `${this.protocol}${this.host}`); + if (resolved.host !== this.host) { + throw new Error(`Mismatched host: ${resolved.host} instead of ${this.host}`); + } + if (resolved.protocol !== this.protocol) { + throw new Error(`Mismatched protocol: ${resolved.protocol} instead of ${this.protocol}`); + } + // Subscribe to the URL + const url = resolved.toString(); + this.subscribedPaths.add(url); + this.sendMessage('ack', url); + this.logger.debug(`WebSocket subscribed to changes on ${url}`); + } catch (error: unknown) { + // Report errors to the socket + const errorText: string = (error as any).message; + this.sendMessage('error', errorText); + this.logger.warn(`WebSocket could not subscribe to ${path}: ${errorText}`); + } + } + + private sendMessage(type: string, value: string): void { + this.socket.send(`${type} ${value}`); + } +} + +/** + * Provides live update functionality following + * the Solid WebSockets API Spec solid/0.1.0-alpha + */ +export class UnsecureWebSocketsProtocol extends WebSocketHandler { + private readonly logger = getLoggerFor(this); + private readonly listeners = new Set(); + + public constructor(source: EventEmitter) { + super(); + source.on('changed', (changed: ResourceIdentifier): void => this.onResourceChanged(changed)); + } + + public async handle(input: { webSocket: WebSocket; upgradeRequest: HttpRequest }): Promise { + const listener = new WebSocketListener(input.webSocket); + this.listeners.add(listener); + this.logger.info(`New WebSocket added, ${this.listeners.size} in total`); + + listener.on('closed', (): void => { + this.listeners.delete(listener); + this.logger.info(`WebSocket closed, ${this.listeners.size} remaining`); + }); + listener.start(input.upgradeRequest); + } + + private onResourceChanged(changed: ResourceIdentifier): void { + for (const listener of this.listeners) { + listener.onResourceChanged(changed); + } + } +} diff --git a/test/unit/ldp/UnsecureWebSocketsProtocol.test.ts b/test/unit/ldp/UnsecureWebSocketsProtocol.test.ts new file mode 100644 index 000000000..cf5befd42 --- /dev/null +++ b/test/unit/ldp/UnsecureWebSocketsProtocol.test.ts @@ -0,0 +1,171 @@ +import { EventEmitter } from 'events'; +import { UnsecureWebSocketsProtocol } from '../../../src/ldp/UnsecureWebSocketsProtocol'; +import type { HttpRequest } from '../../../src/server/HttpRequest'; + +class DummySocket extends EventEmitter { + public readonly messages = new Array(); + public readonly close = jest.fn(); + + public send(message: string): void { + this.messages.push(message); + } +} + +describe('An UnsecureWebSocketsProtocol', (): void => { + const source = new EventEmitter(); + const protocol = new UnsecureWebSocketsProtocol(source); + + describe('after registering a socket', (): void => { + const webSocket = new DummySocket(); + + beforeAll(async(): Promise => { + const upgradeRequest = { + headers: { + host: 'mypod.example', + 'sec-websocket-protocol': 'solid/0.1.0-alpha, other/1.0.0', + }, + socket: { + secure: true, + }, + } as any as HttpRequest; + await protocol.handle({ webSocket, upgradeRequest } as any); + }); + + afterEach((): void => { + webSocket.messages.length = 0; + }); + + it('sends a protocol message.', (): void => { + expect(webSocket.messages).toHaveLength(2); + expect(webSocket.messages.shift()).toBe('protocol solid/0.1.0-alpha'); + expect(webSocket.messages.shift()).toBe('warning Unstandardized protocol version, proceed with care'); + }); + + it('warns when receiving an unexpected message.', (): void => { + webSocket.emit('message', 'unexpected'); + expect(webSocket.messages).toHaveLength(1); + expect(webSocket.messages.shift()).toBe('warning Unrecognized message format: unexpected'); + }); + + it('warns when receiving an unexpected message type.', (): void => { + webSocket.emit('message', 'unknown 1 2 3'); + expect(webSocket.messages).toHaveLength(1); + expect(webSocket.messages.shift()).toBe('warning Unrecognized message type: unknown'); + }); + + describe('before subscribing to resources', (): void => { + it('does not emit pub messages.', (): void => { + source.emit('changed', { path: 'https://mypod.example/foo/bar' }); + expect(webSocket.messages).toHaveLength(0); + }); + }); + + describe('after subscribing to a resource', (): void => { + beforeAll((): void => { + webSocket.emit('message', 'sub https://mypod.example/foo/bar'); + }); + + it('sends an ack message.', (): void => { + expect(webSocket.messages).toHaveLength(1); + expect(webSocket.messages.shift()).toBe('ack https://mypod.example/foo/bar'); + }); + + it('emits pub messages for that resource.', (): void => { + source.emit('changed', { path: 'https://mypod.example/foo/bar' }); + expect(webSocket.messages).toHaveLength(1); + expect(webSocket.messages.shift()).toBe('pub https://mypod.example/foo/bar'); + }); + }); + + describe('after subscribing to a resource via a relative URL', (): void => { + beforeAll((): void => { + webSocket.emit('message', 'sub /relative/foo'); + }); + + it('sends an ack message.', (): void => { + expect(webSocket.messages).toHaveLength(1); + expect(webSocket.messages.shift()).toBe('ack https://mypod.example/relative/foo'); + }); + + it('emits pub messages for that resource.', (): void => { + source.emit('changed', { path: 'https://mypod.example/relative/foo' }); + expect(webSocket.messages).toHaveLength(1); + expect(webSocket.messages.shift()).toBe('pub https://mypod.example/relative/foo'); + }); + }); + + describe('after subscribing to a resource with the wrong host name', (): void => { + beforeAll((): void => { + webSocket.emit('message', 'sub https://wrong.example/host/foo'); + }); + + it('send an error message.', (): void => { + expect(webSocket.messages).toHaveLength(1); + expect(webSocket.messages.shift()) + .toBe('error Mismatched host: wrong.example instead of mypod.example'); + }); + }); + + describe('after subscribing to a resource with the wrong protocol', (): void => { + beforeAll((): void => { + webSocket.emit('message', 'sub http://mypod.example/protocol/foo'); + }); + + it('send an error message.', (): void => { + expect(webSocket.messages).toHaveLength(1); + expect(webSocket.messages.shift()) + .toBe('error Mismatched protocol: http: instead of https:'); + }); + }); + }); + + it('unsubscribes when a socket closes.', async(): Promise => { + const newSocket = new DummySocket(); + await protocol.handle({ webSocket: newSocket, upgradeRequest: { headers: {}, socket: {}}} as any); + expect(newSocket.listenerCount('message')).toBe(1); + newSocket.emit('close'); + expect(newSocket.listenerCount('message')).toBe(0); + expect(newSocket.listenerCount('close')).toBe(0); + expect(newSocket.listenerCount('error')).toBe(0); + }); + + it('unsubscribes when a socket errors.', async(): Promise => { + const newSocket = new DummySocket(); + await protocol.handle({ webSocket: newSocket, upgradeRequest: { headers: {}, socket: {}}} as any); + expect(newSocket.listenerCount('message')).toBe(1); + newSocket.emit('error'); + expect(newSocket.listenerCount('message')).toBe(0); + expect(newSocket.listenerCount('close')).toBe(0); + expect(newSocket.listenerCount('error')).toBe(0); + }); + + it('emits a warning when no Sec-WebSocket-Protocol is supplied.', async(): Promise => { + const newSocket = new DummySocket(); + const upgradeRequest = { + headers: {}, + socket: {}, + } as any as HttpRequest; + await protocol.handle({ webSocket: newSocket, upgradeRequest } as any); + expect(newSocket.messages).toHaveLength(3); + expect(newSocket.messages.pop()) + .toBe('warning Missing Sec-WebSocket-Protocol header, expected value \'solid/0.1.0-alpha\''); + expect(newSocket.close).toHaveBeenCalledTimes(0); + }); + + it('emits an error and closes the connection with the wrong Sec-WebSocket-Protocol.', async(): Promise => { + const newSocket = new DummySocket(); + const upgradeRequest = { + headers: { + 'sec-websocket-protocol': 'solid/1.0.0, other', + }, + socket: {}, + } as any as HttpRequest; + await protocol.handle({ webSocket: newSocket, upgradeRequest } as any); + expect(newSocket.messages).toHaveLength(3); + expect(newSocket.messages.pop()).toBe('error Client does not support protocol solid/0.1.0-alpha'); + expect(newSocket.close).toHaveBeenCalledTimes(1); + expect(newSocket.listenerCount('message')).toBe(0); + expect(newSocket.listenerCount('close')).toBe(0); + expect(newSocket.listenerCount('error')).toBe(0); + }); +});