From 8d9feb4cfbfa393de508a75da0cfc496fd8eda82 Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Mon, 19 Jun 2023 07:31:20 -0700 Subject: [PATCH] fix: rpc callback ux --- .../pocketbase-client-helpers/RpcHelper.ts | 62 +++++----- .../pocketbase-client-helpers/WatchHelper.ts | 111 ++++++++---------- 2 files changed, 81 insertions(+), 92 deletions(-) diff --git a/packages/common/src/pocketbase-client-helpers/RpcHelper.ts b/packages/common/src/pocketbase-client-helpers/RpcHelper.ts index 0db8bcc2..03e4a313 100644 --- a/packages/common/src/pocketbase-client-helpers/RpcHelper.ts +++ b/packages/common/src/pocketbase-client-helpers/RpcHelper.ts @@ -1,10 +1,6 @@ import Ajv, { JSONSchemaType } from 'ajv' import type pocketbaseEs from 'pocketbase' -import { - ClientResponseError, - RecordSubscription, - UnsubscribeFunc, -} from 'pocketbase' +import { ClientResponseError, RecordSubscription } from 'pocketbase' import type { JsonObject } from 'type-fest' import { logger } from '../Logger' import { newId } from '../newId' @@ -65,33 +61,37 @@ export const createRpcHelper = (config: RpcHelperConfig) => { } _rpcLogger.breadcrumb(rpcIn.id) dbg({ rpcIn }) - let unsub: UnsubscribeFunc | undefined - return (async () => { - dbg(`Watching ${rpcIn.id}`) - unsub = await watchById( - RPC_COLLECTION, - rpcIn.id, - (data) => { - dbg(`Got an RPC change`, data) - cb?.(data) - if (data.record.status === RpcStatus.FinishedSuccess) { - return data.record.result - } - if (data.record.status === RpcStatus.FinishedError) { - throw new ClientResponseError(data.record.result) - } - }, - { initialFetch: false, pollIntervalMs: 100 } - ) - dbg(`Creating ${rpcIn.id}`) - const newRpc = await client.collection(RPC_COLLECTION).create(rpcIn) - dbg(`Created ${newRpc.id}`) - })() - .catch(error) - .finally(async () => { - dbg(`Unwatching ${rpcIn.id}`) - await unsub?.() + + return new Promise((resolve, reject) => { + ;(async () => { + dbg(`Watching ${rpcIn.id}`) + await watchById( + RPC_COLLECTION, + rpcIn.id, + (data, unsub) => { + dbg(`Got an RPC change`, data) + cb?.(data) + if (data.record.status === RpcStatus.FinishedSuccess) { + dbg(`RPC finished successfully`, data) + unsub() + resolve(data.record.result) + } + if (data.record.status === RpcStatus.FinishedError) { + dbg(`RPC finished unsuccessfully`, data) + unsub() + reject(new ClientResponseError(data.record.result)) + } + }, + { initialFetch: false, pollIntervalMs: 100 } + ) + dbg(`Creating ${rpcIn.id}`) + const newRpc = await client.collection(RPC_COLLECTION).create(rpcIn) + dbg(`Created ${newRpc.id}`) + })().catch((e) => { + error(e) + reject(e) }) + }) } ) } diff --git a/packages/common/src/pocketbase-client-helpers/WatchHelper.ts b/packages/common/src/pocketbase-client-helpers/WatchHelper.ts index b81e9e45..6149822d 100644 --- a/packages/common/src/pocketbase-client-helpers/WatchHelper.ts +++ b/packages/common/src/pocketbase-client-helpers/WatchHelper.ts @@ -19,78 +19,67 @@ export type WatchHelper = ReturnType export const createWatchHelper = (config: WatchHelperConfig) => { const { client } = config - const watchById = safeCatch( - `watchById`, - logger(), - async ( - collectionName: string, - id: RecordId, - cb: (data: RecordSubscription) => void, - options?: Partial - ): Promise => { - const { dbg } = logger().create(`watchById:${collectionName}:${id}`) - const config: WatchConfig = { - initialFetch: true, - pollIntervalMs: 0, - ...options, - } - const { initialFetch, pollIntervalMs } = config - const tm = createTimerManager({}) - dbg(`watching ${collectionName}:${id}`) - let pollId: ReturnType | undefined - const _checkValue = async () => { - if (hasFinished) return + const watchById = async ( + collectionName: string, + id: RecordId, + cb: (data: RecordSubscription, unsub: UnsubscribeFunc) => void, + options?: Partial + ): Promise => { + const { dbg } = logger().create(`watchById:${collectionName}:${id}`) + const config: WatchConfig = { + initialFetch: true, + pollIntervalMs: 0, + ...options, + } + const { initialFetch, pollIntervalMs } = config + const tm = createTimerManager({}) + dbg(`watching ${collectionName}:${id}`) + let hasUpdate = false + let hasFinished = false + if (pollIntervalMs) { + dbg(`Configuring polling for ${pollIntervalMs}ms`) + tm.repeat(async () => { dbg(`Checking ${id} by polling`) try { const rec = await client.collection(collectionName).getOne(id) - if (hasFinished) return + hasUpdate = true dbg(`Got an update polling ${collectionName}:${id}`) - cb({ action: 'poll', record: rec }) + cb({ action: 'poll', record: rec }, _unsub) } catch (e) { dbg(`Failed to poll at interval`, e) - } finally { - pollId = setTimeout(_checkValue, pollIntervalMs) } - } - let hasUpdate = false - let hasFinished = false - if (pollIntervalMs) { - dbg(`Configuring polling for ${pollIntervalMs}ms`) - setTimeout(_checkValue, pollIntervalMs) - } + return true + }, pollIntervalMs) + } - const unsub = await client - .collection(collectionName) - .subscribe(id, (e) => { - hasUpdate = true - dbg(`Got an update watching ${collectionName}:${id}`, e) - clearTimeout(pollId) - if (pollIntervalMs) { - pollId = setTimeout(_checkValue, pollIntervalMs) - } - cb(e) - }) - if (initialFetch) { - try { - const initial = await client - .collection(collectionName) - .getOne(id) - if (!hasUpdate && !hasFinished) { - // No update has been sent yet, send at least one - dbg(`Sending initial update for ${collectionName}:${id}`, initial) - cb({ action: 'initial', record: initial }) - } - } catch (e) { - throw new Error(`Expected ${collectionName}.${id} to exist.`) + const _unsub = async () => { + dbg(`Unsubbing ${collectionName}:${id}`) + tm.shutdown() + hasFinished = true + await unsub() + } + + const unsub = await client + .collection(collectionName) + .subscribe(id, (e) => { + dbg(`Got an update watching ${collectionName}:${id}`, e) + cb(e, _unsub) + }) + + if (initialFetch) { + try { + const initial = await client.collection(collectionName).getOne(id) + if (!hasUpdate && !hasFinished) { + // No update has been sent yet, send at least one + dbg(`Sending initial update for ${collectionName}:${id}`, initial) + cb({ action: 'initial', record: initial }, _unsub) } - } - return async () => { - dbg(`UNsubbing ${collectionName}:${id}`) - hasFinished = true - await unsub() + } catch (e) { + throw new Error(`Expected ${collectionName}.${id} to exist.`) } } - ) + return _unsub + } const watchAllById = safeCatch( `watchAllById`,