fix: rpc callback ux

This commit is contained in:
Ben Allfree
2023-06-19 07:31:20 -07:00
parent 21b17ae6b2
commit 8d9feb4cfb
2 changed files with 81 additions and 92 deletions

View File

@@ -1,10 +1,6 @@
import Ajv, { JSONSchemaType } from 'ajv'
import type pocketbaseEs from 'pocketbase'
import {
ClientResponseError,
RecordSubscription,
UnsubscribeFunc,
} from 'pocketbase'
import { ClientResponseError, RecordSubscription } from 'pocketbase'
import type { JsonObject } from 'type-fest'
import { logger } from '../Logger'
import { newId } from '../newId'
@@ -65,33 +61,37 @@ export const createRpcHelper = (config: RpcHelperConfig) => {
}
_rpcLogger.breadcrumb(rpcIn.id)
dbg({ rpcIn })
let unsub: UnsubscribeFunc | undefined
return (async () => {
dbg(`Watching ${rpcIn.id}`)
unsub = await watchById<ConcreteRpcRecord>(
RPC_COLLECTION,
rpcIn.id,
(data) => {
dbg(`Got an RPC change`, data)
cb?.(data)
if (data.record.status === RpcStatus.FinishedSuccess) {
return data.record.result
}
if (data.record.status === RpcStatus.FinishedError) {
throw new ClientResponseError(data.record.result)
}
},
{ initialFetch: false, pollIntervalMs: 100 }
)
dbg(`Creating ${rpcIn.id}`)
const newRpc = await client.collection(RPC_COLLECTION).create(rpcIn)
dbg(`Created ${newRpc.id}`)
})()
.catch(error)
.finally(async () => {
dbg(`Unwatching ${rpcIn.id}`)
await unsub?.()
return new Promise<TResult>((resolve, reject) => {
;(async () => {
dbg(`Watching ${rpcIn.id}`)
await watchById<ConcreteRpcRecord>(
RPC_COLLECTION,
rpcIn.id,
(data, unsub) => {
dbg(`Got an RPC change`, data)
cb?.(data)
if (data.record.status === RpcStatus.FinishedSuccess) {
dbg(`RPC finished successfully`, data)
unsub()
resolve(data.record.result)
}
if (data.record.status === RpcStatus.FinishedError) {
dbg(`RPC finished unsuccessfully`, data)
unsub()
reject(new ClientResponseError(data.record.result))
}
},
{ initialFetch: false, pollIntervalMs: 100 }
)
dbg(`Creating ${rpcIn.id}`)
const newRpc = await client.collection(RPC_COLLECTION).create(rpcIn)
dbg(`Created ${newRpc.id}`)
})().catch((e) => {
error(e)
reject(e)
})
})
}
)
}

View File

@@ -19,78 +19,67 @@ export type WatchHelper = ReturnType<typeof createWatchHelper>
export const createWatchHelper = (config: WatchHelperConfig) => {
const { client } = config
const watchById = safeCatch(
`watchById`,
logger(),
async <TRec>(
collectionName: string,
id: RecordId,
cb: (data: RecordSubscription<TRec>) => void,
options?: Partial<WatchConfig>
): Promise<UnsubscribeFunc> => {
const { dbg } = logger().create(`watchById:${collectionName}:${id}`)
const config: WatchConfig = {
initialFetch: true,
pollIntervalMs: 0,
...options,
}
const { initialFetch, pollIntervalMs } = config
const tm = createTimerManager({})
dbg(`watching ${collectionName}:${id}`)
let pollId: ReturnType<typeof setTimeout> | undefined
const _checkValue = async () => {
if (hasFinished) return
const watchById = async <TRec>(
collectionName: string,
id: RecordId,
cb: (data: RecordSubscription<TRec>, unsub: UnsubscribeFunc) => void,
options?: Partial<WatchConfig>
): Promise<UnsubscribeFunc> => {
const { dbg } = logger().create(`watchById:${collectionName}:${id}`)
const config: WatchConfig = {
initialFetch: true,
pollIntervalMs: 0,
...options,
}
const { initialFetch, pollIntervalMs } = config
const tm = createTimerManager({})
dbg(`watching ${collectionName}:${id}`)
let hasUpdate = false
let hasFinished = false
if (pollIntervalMs) {
dbg(`Configuring polling for ${pollIntervalMs}ms`)
tm.repeat(async () => {
dbg(`Checking ${id} by polling`)
try {
const rec = await client.collection(collectionName).getOne<TRec>(id)
if (hasFinished) return
hasUpdate = true
dbg(`Got an update polling ${collectionName}:${id}`)
cb({ action: 'poll', record: rec })
cb({ action: 'poll', record: rec }, _unsub)
} catch (e) {
dbg(`Failed to poll at interval`, e)
} finally {
pollId = setTimeout(_checkValue, pollIntervalMs)
}
}
let hasUpdate = false
let hasFinished = false
if (pollIntervalMs) {
dbg(`Configuring polling for ${pollIntervalMs}ms`)
setTimeout(_checkValue, pollIntervalMs)
}
return true
}, pollIntervalMs)
}
const unsub = await client
.collection(collectionName)
.subscribe<TRec>(id, (e) => {
hasUpdate = true
dbg(`Got an update watching ${collectionName}:${id}`, e)
clearTimeout(pollId)
if (pollIntervalMs) {
pollId = setTimeout(_checkValue, pollIntervalMs)
}
cb(e)
})
if (initialFetch) {
try {
const initial = await client
.collection(collectionName)
.getOne<TRec>(id)
if (!hasUpdate && !hasFinished) {
// No update has been sent yet, send at least one
dbg(`Sending initial update for ${collectionName}:${id}`, initial)
cb({ action: 'initial', record: initial })
}
} catch (e) {
throw new Error(`Expected ${collectionName}.${id} to exist.`)
const _unsub = async () => {
dbg(`Unsubbing ${collectionName}:${id}`)
tm.shutdown()
hasFinished = true
await unsub()
}
const unsub = await client
.collection(collectionName)
.subscribe<TRec>(id, (e) => {
dbg(`Got an update watching ${collectionName}:${id}`, e)
cb(e, _unsub)
})
if (initialFetch) {
try {
const initial = await client.collection(collectionName).getOne<TRec>(id)
if (!hasUpdate && !hasFinished) {
// No update has been sent yet, send at least one
dbg(`Sending initial update for ${collectionName}:${id}`, initial)
cb({ action: 'initial', record: initial }, _unsub)
}
}
return async () => {
dbg(`UNsubbing ${collectionName}:${id}`)
hasFinished = true
await unsub()
} catch (e) {
throw new Error(`Expected ${collectionName}.${id} to exist.`)
}
}
)
return _unsub
}
const watchAllById = safeCatch(
`watchAllById`,