mirror of
https://github.com/pockethost/pockethost.git
synced 2025-05-19 21:36:40 +00:00
chore: remove WatchHelper
This commit is contained in:
parent
be2e1b16ea
commit
881c42ddcb
@ -8,7 +8,6 @@ import {
|
||||
UpdateInstanceResult,
|
||||
assertExists,
|
||||
createRestHelper,
|
||||
createWatchHelper,
|
||||
type CreateInstancePayload,
|
||||
type CreateInstanceResult,
|
||||
type InstanceFields,
|
||||
@ -23,8 +22,6 @@ import PocketBase, {
|
||||
BaseAuthStore,
|
||||
ClientResponseError,
|
||||
type AuthModel,
|
||||
type RecordSubscription,
|
||||
type UnsubscribeFunc,
|
||||
} from 'pocketbase'
|
||||
|
||||
export type AuthChangeHandler = (user: BaseAuthStore) => void
|
||||
@ -98,9 +95,7 @@ export const createPocketbaseClient = (config: PocketbaseClientConfig) => {
|
||||
|
||||
const refreshAuthToken = () => client.collection('users').authRefresh()
|
||||
|
||||
const watchHelper = createWatchHelper({ client })
|
||||
const { watchById, watchAllById } = watchHelper
|
||||
const restMixin = createRestHelper({ client, watchHelper })
|
||||
const restMixin = createRestHelper({ client })
|
||||
const { mkRest } = restMixin
|
||||
|
||||
const createInstance = mkRest<CreateInstancePayload, CreateInstanceResult>(
|
||||
@ -120,11 +115,6 @@ export const createPocketbaseClient = (config: PocketbaseClientConfig) => {
|
||||
): Promise<InstanceFields | undefined> =>
|
||||
client.collection('instances').getOne<InstanceFields>(id)
|
||||
|
||||
const watchInstanceById = async (
|
||||
id: InstanceId,
|
||||
cb: (data: RecordSubscription<InstanceFields>) => void,
|
||||
): Promise<UnsubscribeFunc> => watchById('instances', id, cb)
|
||||
|
||||
const getAllInstancesById = async () =>
|
||||
(await client.collection('instances').getFullList()).reduce(
|
||||
(c, v) => {
|
||||
@ -287,7 +277,6 @@ export const createPocketbaseClient = (config: PocketbaseClientConfig) => {
|
||||
onAuthChange,
|
||||
isLoggedIn,
|
||||
user,
|
||||
watchInstanceById,
|
||||
getAllInstancesById,
|
||||
resendVerificationEmail,
|
||||
updateInstance,
|
||||
|
@ -3,21 +3,16 @@ import type pocketbaseEs from 'pocketbase'
|
||||
import type { JsonObject } from 'type-fest'
|
||||
import { LoggerService } from '../Logger'
|
||||
import { RestCommands, RestMethods } from '../schema'
|
||||
import type { WatchHelper } from './WatchHelper'
|
||||
|
||||
export type RestHelperConfig = {
|
||||
client: pocketbaseEs
|
||||
watchHelper: WatchHelper
|
||||
}
|
||||
|
||||
export type RestHelper = ReturnType<typeof createRestHelper>
|
||||
|
||||
export const createRestHelper = (config: RestHelperConfig) => {
|
||||
const _logger = LoggerService().create(`RestHelper`)
|
||||
const {
|
||||
client,
|
||||
watchHelper: { watchById },
|
||||
} = config
|
||||
const { client } = config
|
||||
|
||||
const mkRest = <TPayload extends JsonObject, TResult extends JsonObject>(
|
||||
cmd: RestCommands,
|
||||
|
@ -1,114 +0,0 @@
|
||||
import type pocketbaseEs from 'pocketbase'
|
||||
import type { RecordSubscription, UnsubscribeFunc } from 'pocketbase'
|
||||
import { LoggerService } from '../Logger'
|
||||
import { UnixTimestampMs, createTimerManager } from '../TimerManager'
|
||||
import { BaseFields, RecordId } from '../schema'
|
||||
|
||||
export type WatchHelperConfig = {
|
||||
client: pocketbaseEs
|
||||
}
|
||||
|
||||
export type WatchConfig = {
|
||||
initialFetch: boolean
|
||||
pollIntervalMs: UnixTimestampMs
|
||||
}
|
||||
|
||||
export type WatchHelper = ReturnType<typeof createWatchHelper>
|
||||
|
||||
export const createWatchHelper = (config: WatchHelperConfig) => {
|
||||
const { client } = config
|
||||
|
||||
const watchById = async <TRec>(
|
||||
collectionName: string,
|
||||
id: RecordId,
|
||||
cb: (data: RecordSubscription<TRec>, unsub: UnsubscribeFunc) => void,
|
||||
options?: Partial<WatchConfig>,
|
||||
): Promise<UnsubscribeFunc> => {
|
||||
const { dbg } = LoggerService().create(`watchById:${collectionName}:${id}`)
|
||||
const config: WatchConfig = {
|
||||
initialFetch: true,
|
||||
pollIntervalMs: 0,
|
||||
...options,
|
||||
}
|
||||
const { initialFetch, pollIntervalMs } = config
|
||||
const tm = createTimerManager({})
|
||||
dbg(`watching ${collectionName}:${id}`)
|
||||
let hasUpdate = false
|
||||
let hasFinished = false
|
||||
if (pollIntervalMs) {
|
||||
dbg(`Configuring polling for ${pollIntervalMs}ms`)
|
||||
tm.repeat(async () => {
|
||||
dbg(`Checking ${id} by polling`)
|
||||
try {
|
||||
const rec = await client.collection(collectionName).getOne<TRec>(id)
|
||||
hasUpdate = true
|
||||
dbg(`Got an update polling ${collectionName}:${id}`)
|
||||
cb({ action: 'poll', record: rec }, _unsub)
|
||||
} catch (e) {
|
||||
dbg(`Failed to poll at interval`, e)
|
||||
}
|
||||
return true
|
||||
}, pollIntervalMs)
|
||||
}
|
||||
|
||||
const _unsub = async () => {
|
||||
dbg(`Unsubbing ${collectionName}:${id}`)
|
||||
tm.shutdown()
|
||||
hasFinished = true
|
||||
await unsub()
|
||||
}
|
||||
|
||||
const unsub = await client
|
||||
.collection(collectionName)
|
||||
.subscribe<TRec>(id, (e) => {
|
||||
dbg(`Got an update watching ${collectionName}:${id}`, e)
|
||||
cb(e, _unsub)
|
||||
})
|
||||
|
||||
if (initialFetch) {
|
||||
try {
|
||||
const initial = await client.collection(collectionName).getOne<TRec>(id)
|
||||
if (!hasUpdate && !hasFinished) {
|
||||
// No update has been sent yet, send at least one
|
||||
dbg(`Sending initial update for ${collectionName}:${id}`, initial)
|
||||
cb({ action: 'initial', record: initial }, _unsub)
|
||||
}
|
||||
} catch (e) {
|
||||
throw new Error(`Expected ${collectionName}.${id} to exist.`)
|
||||
}
|
||||
}
|
||||
return _unsub
|
||||
}
|
||||
|
||||
const watchAllById = async <TRec extends BaseFields>(
|
||||
collectionName: string,
|
||||
idName: keyof TRec,
|
||||
idValue: RecordId,
|
||||
cb: (data: RecordSubscription<TRec>) => void,
|
||||
initialFetch = true,
|
||||
): Promise<UnsubscribeFunc> => {
|
||||
let hasUpdate: { [_: RecordId]: boolean } = {}
|
||||
const unsub = client
|
||||
.collection(collectionName)
|
||||
.subscribe<TRec>('*', (e) => {
|
||||
// console.log(e.record.instanceId, id)
|
||||
if (e.record[idName] !== idValue) return
|
||||
hasUpdate[e.record.id] = true
|
||||
cb(e)
|
||||
})
|
||||
if (initialFetch) {
|
||||
const existing = await client
|
||||
.collection(collectionName)
|
||||
.getFullList<TRec>(100, {
|
||||
filter: `${idName.toString()} = '${idValue}'`,
|
||||
})
|
||||
existing.forEach((record) => {
|
||||
if (hasUpdate[record.id]) return
|
||||
cb({ action: 'initial', record })
|
||||
})
|
||||
}
|
||||
return unsub
|
||||
}
|
||||
|
||||
return { watchById, watchAllById }
|
||||
}
|
@ -1,2 +1 @@
|
||||
export * from './RestHelper'
|
||||
export * from './WatchHelper'
|
||||
|
Loading…
x
Reference in New Issue
Block a user