chore: convert rpc to rest

This commit is contained in:
Ben Allfree
2023-11-05 03:22:41 -08:00
parent f369f7b79e
commit 098a00c72d
31 changed files with 357 additions and 771 deletions

View File

@@ -6,7 +6,7 @@
"scripts": {
"check:types": "svelte-check",
"preview": "npx http-server@latest ./build -P \"http://localhost:8080?\"",
"dev": "vite dev --force",
"dev": "vite dev --force --host=0.0.0.0",
"build": "NODE_ENV=production vite build",
"lint": "prettier --check .",
"format": "prettier --write ."

View File

@@ -1,27 +1,19 @@
import {
CreateInstancePayloadSchema,
LoggerService,
RenameInstancePayloadSchema,
RpcCommands,
SaveSecretsPayloadSchema,
SaveVersionPayloadSchema,
SetInstanceMaintenancePayloadSchema,
RestCommands,
RestMethods,
UpdateInstancePayload,
UpdateInstancePayloadSchema,
UpdateInstanceResult,
assertExists,
createRpcHelper,
createRestHelper,
createWatchHelper,
type CreateInstancePayload,
type CreateInstanceResult,
type InstanceFields,
type InstanceId,
type InstanceLogFields,
type RenameInstancePayload,
type RenameInstanceResult,
type SaveSecretsPayload,
type SaveSecretsResult,
type SaveVersionPayload,
type SaveVersionResult,
type SetInstanceMaintenancePayload,
type SetInstanceMaintenanceResult,
} from '$shared'
import { INSTANCE_URL } from '$src/env'
import { createGenericSyncEvent } from '$util/events'
@@ -108,35 +100,21 @@ export const createPocketbaseClient = (config: PocketbaseClientConfig) => {
const watchHelper = createWatchHelper({ client })
const { watchById, watchAllById } = watchHelper
const rpcMixin = createRpcHelper({ client, watchHelper })
const { mkRpc } = rpcMixin
const restMixin = createRestHelper({ client, watchHelper })
const { mkRest } = restMixin
const createInstance = mkRpc<CreateInstancePayload, CreateInstanceResult>(
RpcCommands.CreateInstance,
const createInstance = mkRest<CreateInstancePayload, CreateInstanceResult>(
RestCommands.Instance,
RestMethods.Create,
CreateInstancePayloadSchema,
)
const saveSecrets = mkRpc<SaveSecretsPayload, SaveSecretsResult>(
RpcCommands.SaveSecrets,
SaveSecretsPayloadSchema,
const updateInstance = mkRest<UpdateInstancePayload, UpdateInstanceResult>(
RestCommands.Instance,
RestMethods.Update,
UpdateInstancePayloadSchema,
)
const saveVersion = mkRpc<SaveVersionPayload, SaveVersionResult>(
RpcCommands.SaveVersion,
SaveVersionPayloadSchema,
)
const renameInstance = mkRpc<RenameInstancePayload, RenameInstanceResult>(
RpcCommands.RenameInstance,
RenameInstancePayloadSchema,
)
const setInstanceMaintenance = mkRpc<
SetInstanceMaintenancePayload,
SetInstanceMaintenanceResult
>(RpcCommands.SetInstanceMaintenance, SetInstanceMaintenancePayloadSchema)
// gen:mkRpc
const getInstanceById = (
id: InstanceId,
): Promise<InstanceFields | undefined> =>
@@ -295,7 +273,6 @@ export const createPocketbaseClient = (config: PocketbaseClientConfig) => {
return {
client,
saveSecrets,
watchInstanceLog,
getAuthStoreProps,
parseError,
@@ -313,9 +290,6 @@ export const createPocketbaseClient = (config: PocketbaseClientConfig) => {
watchInstanceById,
getAllInstancesById,
resendVerificationEmail,
renameInstance,
setInstanceMaintenance,
// gen:export
saveVersion,
updateInstance,
}
}

View File

@@ -5,7 +5,7 @@
import { client } from '$src/pocketbase-client'
import { instance } from '../store'
const { setInstanceMaintenance } = client()
const { updateInstance } = client()
$: ({ id, maintenance } = $instance)
@@ -14,7 +14,7 @@
const isChecked = target.checked
// Update the database with the new value
setInstanceMaintenance({ instanceId: id, maintenance: isChecked }).then(
updateInstance({ instanceId: id, fields: { maintenance: isChecked } }).then(
() => 'saved',
)
}

View File

@@ -6,7 +6,7 @@
import { slide } from 'svelte/transition'
import { instance } from '../store'
const { renameInstance } = client()
const { updateInstance } = client()
$: ({ subdomain, id } = $instance)
@@ -39,9 +39,11 @@
// If they select yes, then update the version in pocketbase
if (confirmVersionChange) {
renameInstance({
updateInstance({
instanceId: id,
subdomain: instanceNameValidation,
fields: {
subdomain: instanceNameValidation,
},
})
.then(() => 'saved')
.catch((error) => {

View File

@@ -5,13 +5,14 @@
import { client } from '$src/pocketbase-client'
import { slide } from 'svelte/transition'
import { instance } from '../store'
import VersionPicker from './VersionPicker.svelte'
$: ({ id, maintenance, version } = $instance)
// Create a copy of the version
let instanceVersion = version
let selectedVersion = version
$: {
instanceVersion = version
selectedVersion = version
}
// Controls the disabled state of the button
@@ -29,14 +30,18 @@
// Prompt the user to confirm the version change
const confirmVersionChange = confirm(
`Are you sure you want to change the version to ${instanceVersion}?`,
`Are you sure you want to change the version to ${selectedVersion}?`,
)
// If they select yes, then update the version in pocketbase
if (confirmVersionChange) {
// Save to the database
errorMessage = ''
client()
.saveVersion({ instanceId: id, version: instanceVersion })
.updateInstance({
instanceId: id,
fields: { version: selectedVersion },
})
.then(() => {
return 'saved'
})
@@ -45,7 +50,7 @@
})
} else {
// If they hit cancel, reset the version number back to what it was initially
instanceVersion = version
selectedVersion = version
}
// Set the button back to normal
@@ -79,12 +84,8 @@
class="flex change-version-form-container-query gap-4"
on:submit={handleSave}
>
<input
required
type="text"
bind:value={instanceVersion}
class="input input-bordered w-full"
/>
<VersionPicker bind:selectedVersion disabled={!maintenance} />
<button
type="submit"
class="btn btn-error"

View File

@@ -0,0 +1,48 @@
<script lang="ts">
import { client } from '$src/pocketbase-client'
import { createEventDispatcher, onMount } from 'svelte'
// Props definition with default value if needed
export let selectedVersion: string = ''
export let disabled: boolean = false
let versions: string[] = [] // This will hold our version strings
// Function to fetch versions - replace with your actual fetch logic
async function fetchVersions(): Promise<string[]> {
const { versions } = await client().client.send(`/api/versions`, {})
return versions
}
onMount(() => {
fetchVersions()
.then((fetchedVersions) => {
versions = fetchedVersions
})
.catch((error) => {
console.error('Failed to load versions', error)
})
})
// Emit an update when the selection changes
function handleSelect(event: Event) {
const detail = (event.target as HTMLSelectElement).value
selectedVersion = detail // Update the local selected version
dispatch('change', detail)
}
// Create a dispatcher for custom events
const dispatch = createEventDispatcher()
</script>
<select
class="select w-full max-w-xs"
bind:value={selectedVersion}
on:change={handleSelect}
{disabled}
>
<option value="" disabled>Select a version</option>
{#each versions as version}
<option value={version}>{version}</option>
{/each}
</select>

View File

@@ -1,5 +1,5 @@
<script lang="ts">
import { SECRET_KEY_REGEX, SaveSecretsPayload } from '$shared'
import { SECRET_KEY_REGEX, UpdateInstancePayload } from '$shared'
import { client } from '$src/pocketbase-client/index.js'
import { reduce } from '@s-libs/micro-dash'
import { slide } from 'svelte/transition'
@@ -43,17 +43,19 @@
// Save to the database
items.upsert({ name: secretKey, value: secretValue })
await client().saveSecrets({
await client().updateInstance({
instanceId: $instance.id,
secrets: reduce(
$items,
(c, v) => {
const { name, value } = v
c[name] = value
return c
},
{} as SaveSecretsPayload['secrets'],
),
fields: {
secrets: reduce(
$items,
(c, v) => {
const { name, value } = v
c[name] = value
return c
},
{} as NonNullable<UpdateInstancePayload['fields']['secrets']>,
),
},
})
// Reset the values when the POST is done

View File

@@ -0,0 +1,58 @@
/// <reference path="../types/types.d.ts" />
/*
{
"subdomain": "foo"
}
*/
routerAdd(
'POST',
'/api/instance',
(c) => {
const authRecord = c.get('authRecord') // empty if not authenticated as regular auth record
console.log(`***authRecord`, JSON.stringify(authRecord))
if (!authRecord) {
throw new Error(`Expected authRecord here`)
}
console.log(`***TOP OF POST`)
let data = new DynamicModel({
subdomain: '',
})
console.log(`***before bind`)
c.bind(data)
console.log(`***after bind`)
// This is necessary for destructuring to work correctly
data = JSON.parse(JSON.stringify(data))
const { subdomain } = data
console.log(`***vars`, JSON.stringify({ subdomain }))
if (!subdomain) {
throw new BadRequestError(
`Subdomain is required when creating an instance.`,
)
}
const { versions } = require(`${__hooks}/versions.pb.js`)
const collection = $app.dao().findCollectionByNameOrId('instances')
const record = new Record(collection)
record.set('uid', authRecord.id)
record.set('subdomain', subdomain)
record.set('status', 'idle')
record.set('version', versions[0])
const form = new RecordUpsertForm($app, record)
form.submit()
return c.json(200, { instance: record })
},
$apis.requireRecordAuth(),
)

View File

@@ -0,0 +1,80 @@
/// <reference path="../types/types.d.ts" />
/*
{
"instanceId": "kz4ngg77eaw1ho0",
"fields": {
"maintenance": true
"name": '',
"version": ''
}
}
*/
routerAdd(
'PUT',
'/api/instance',
(c) => {
console.log(`***TOP OF PUt`)
let data = new DynamicModel({
// describe the shape of the fields to read (used also as initial values)
instanceId: '',
fields: {
maintenance: null,
name: null,
version: null,
secrets: null,
},
})
c.bind(data)
// This is necessary for destructuring to work correctly
data = JSON.parse(JSON.stringify(data))
const {
instanceId,
fields: { maintenance, name, version, secrets },
} = data
console.log(
`***vars`,
JSON.stringify({ instanceId, maintenance, name, version, secrets }),
)
const record = $app.dao().findRecordById('instances', instanceId)
const authRecord = c.get('authRecord') // empty if not authenticated as regular auth record
console.log(`***authRecord`, JSON.stringify(authRecord))
if (!authRecord) {
throw new Error(`Expected authRecord here`)
}
if (record.get('uid') !== authRecord.id) {
throw new BadRequestError(`Not authorized`)
}
function cleanObject(obj) {
return Object.entries(obj).reduce((acc, [key, value]) => {
if (value !== null && value !== undefined) {
acc[key] = value
}
return acc
}, {})
}
console.log(`***original`, JSON.stringify(data))
const sanitized = cleanObject({
subdomain: name,
version,
maintenance,
secrets,
})
console.log(`***sanitized`, JSON.stringify(sanitized))
const form = new RecordUpsertForm($app, record)
form.loadData(sanitized)
form.submit()
return c.json(200, { status: 'ok' })
},
$apis.requireRecordAuth(),
)

View File

@@ -20,7 +20,6 @@ import {
ipWhitelistService,
proxyService,
realtimeLog,
rpcService,
sqliteService,
} from '$services'
import { LogLevelName, LoggerService } from '$shared'
@@ -94,7 +93,6 @@ global.EventSource = EventSource
password: MOTHERSHIP_ADMIN_PASSWORD(),
})
await ftpService({})
await rpcService({})
await proxyService({
coreInternalUrl: url,
})
@@ -109,6 +107,4 @@ global.EventSource = EventSource
// gen:service
info(`Hooking into process exit event`)
await (await rpcService()).initRpcs()
})()

View File

@@ -1,13 +0,0 @@
version: '3.8'
services:
pb:
image: pockethost/pocketbase
ports:
- '8090:8090'
volumes:
- ../../../../../.pbincache/v0.18.6:/host_bin
- ../../../../../.data/pockethost-central:/host_data
- ../../../migrations:/host_data/pb_migrations
- ../../../pb_hooks:/host_data/pb_hooks
command: /host_bin/pocketbase serve --dir=/host_data --hooksDir=/host_data/pb_hooks --migrationsDir=/host_data/pb_migrations

View File

@@ -54,6 +54,7 @@ export const PocketbaseReleaseVersionService = mkSingleton(
}
binPaths[sanitizedTagName] = binPath
})
maxVersion = `~${rsort(keys(binPaths))[0]}`
dbg({ maxVersion })
return true

View File

@@ -1,117 +0,0 @@
import { clientService, instanceService, rpcService } from '$services'
import {
CreateInstancePayload,
CreateInstancePayloadSchema,
CreateInstanceResult,
InstanceStatus,
Logger,
RenameInstancePayloadSchema,
RpcCommands,
SaveSecretsPayload,
SaveSecretsPayloadSchema,
SaveSecretsResult,
SaveVersionPayload,
SaveVersionPayloadSchema,
// gen:import
SaveVersionResult,
SetInstanceMaintenancePayloadSchema,
type RenameInstancePayload,
type RenameInstanceResult,
type SetInstanceMaintenancePayload,
type SetInstanceMaintenanceResult,
} from '$shared'
import { valid, validRange } from 'semver'
import { PocketbaseReleaseVersionService } from '../PocketbaseReleaseVersionService'
export const registerRpcCommands = async (logger: Logger) => {
const { client } = await clientService()
const _rpcCommandLogger = logger.create(`RpcCommands`)
const { dbg, warn } = _rpcCommandLogger
const { registerCommand } = await rpcService()
registerCommand<CreateInstancePayload, CreateInstanceResult>(
RpcCommands.CreateInstance,
CreateInstancePayloadSchema,
async (rpc) => {
const { payload } = rpc
const { subdomain } = payload
const instance = await client.createInstance({
subdomain,
uid: rpc.userId,
version: (await PocketbaseReleaseVersionService()).getLatestVersion(),
status: InstanceStatus.Idle,
secondsThisMonth: 0,
secrets: {},
maintenance: false,
})
return { instance }
},
)
registerCommand<SaveVersionPayload, SaveVersionResult>(
RpcCommands.SaveVersion,
SaveVersionPayloadSchema,
async (rpc) => {
const { payload } = rpc
const { instanceId, version } = payload
if (valid(version) === null && validRange(version) === null) {
return {
status: `error`,
message: `Version must be a valid semver or semver range`,
}
}
await client.updateInstance(instanceId, { version })
return { status: 'ok' }
},
)
registerCommand<SaveSecretsPayload, SaveSecretsResult>(
RpcCommands.SaveSecrets,
SaveSecretsPayloadSchema,
async (job) => {
const { payload } = job
const { instanceId, secrets } = payload
await client.updateInstance(instanceId, { secrets })
return { status: 'ok' }
},
)
registerCommand<RenameInstancePayload, RenameInstanceResult>(
RpcCommands.RenameInstance,
RenameInstancePayloadSchema,
async (job) => {
const { dbg, error } = _rpcCommandLogger.create(`renameInstance`)
const { payload } = job
const { instanceId, subdomain } = payload
dbg(`Updating instance`)
await client.updateInstance(instanceId, { subdomain })
dbg(`Instance updated successfully `)
return {}
},
)
registerCommand<SetInstanceMaintenancePayload, SetInstanceMaintenanceResult>(
RpcCommands.SetInstanceMaintenance,
SetInstanceMaintenancePayloadSchema,
async (job) => {
const { payload } = job
const { instanceId, maintenance } = payload
dbg(`Updating to maintenance mode ${instanceId}`)
await client.updateInstance(instanceId, { maintenance })
if (maintenance) {
try {
dbg(`Shutting down instance ${instanceId}`)
const is = await instanceService()
const api = is.getInstanceApiIfExistsById(instanceId)
await api?.shutdown()
} catch (e) {
warn(e)
}
}
return {}
},
)
// gen:command
}

View File

@@ -1,138 +0,0 @@
import { clientService } from '$services'
import {
LoggerService,
RPC_COMMANDS,
RpcCommands,
RpcFields,
RpcStatus,
SingletonBaseConfig,
assertTruthy,
mkSingleton,
} from '$shared'
import { isObject } from '@s-libs/micro-dash'
import Ajv, { JSONSchemaType, ValidateFunction } from 'ajv'
import Bottleneck from 'bottleneck'
import exitHook from 'exit-hook'
import { default as knexFactory } from 'knex'
import pocketbaseEs, { ClientResponseError } from 'pocketbase'
import { AsyncReturnType, JsonObject } from 'type-fest'
import { registerRpcCommands } from './commands'
export type RpcServiceApi = AsyncReturnType<typeof rpcService>
export type KnexApi = ReturnType<typeof knexFactory>
export type CommandModuleInitializer = (
register: RpcServiceApi['registerCommand'],
client: pocketbaseEs,
knex: KnexApi,
) => void
export type RpcRunner<
TPayload extends JsonObject,
TResult extends JsonObject,
> = (job: RpcFields<TPayload, TResult>) => Promise<TResult>
export type RpcServiceConfig = SingletonBaseConfig & {}
export const rpcService = mkSingleton(async (config: RpcServiceConfig) => {
const rpcServiceLogger = LoggerService().create('RpcService')
const { dbg, error } = rpcServiceLogger
const { client } = await clientService()
const limiter = new Bottleneck({ maxConcurrent: 1 })
const jobHandlers: {
[_ in RpcCommands]?: {
validate: ValidateFunction<any>
run: RpcRunner<any, any>
}
} = {}
const run = async (rpc: RpcFields<any, any>) => {
await client.setRpcStatus(rpc, RpcStatus.Queued)
return limiter.schedule(async () => {
try {
dbg(`Starting job ${rpc.id} (${rpc.cmd})`, JSON.stringify(rpc))
await client.setRpcStatus(rpc, RpcStatus.Starting)
const cmd = (() => {
const { cmd } = rpc
if (!RPC_COMMANDS.find((c) => c === cmd)) {
throw new Error(
`RPC command '${cmd}' is invalid. It must be one of: ${RPC_COMMANDS.join(
'|',
)}.`,
)
}
return cmd as RpcCommands
})()
const handler = jobHandlers[cmd]
if (!handler) {
throw new Error(`RPC handler ${cmd} is not registered`)
}
const { payload } = rpc
assertTruthy(isObject(payload), `Payload must be an object`)
const { validate, run } = handler
if (!validate(payload)) {
throw new Error(
`Payload for ${cmd} fails validation: ${JSON.stringify(payload)}`,
)
}
dbg(`Running RPC ${rpc.id}`, rpc)
await client.setRpcStatus(rpc, RpcStatus.Running)
const res = await run(rpc)
await client.setRpcStatus(rpc, RpcStatus.FinishedSuccess, res)
} catch (e) {
if (!(e instanceof Error)) {
throw new Error(`Expected Error here but got ${typeof e}:${e}`)
}
dbg(`RPC failed with`, e)
await client
.rejectRpc(rpc, new ClientResponseError(e))
.catch((e: Error) => {
error(`rpc ${rpc.id} failed to reject with ${e}`)
})
}
})
}
dbg(`Starting RPC service...`)
const initRpcs = async () => {
dbg(`Initializing RPCs...`)
await registerRpcCommands(rpcServiceLogger)
await client.resetRpcs()
const rpcs = await client.incompleteRpcs()
rpcs.forEach(run)
}
const unsub = await client.onNewRpc(run)
exitHook(unsub)
const ajv = new Ajv()
const registerCommand = <
TPayload extends JsonObject,
TResult extends JsonObject,
>(
commandName: RpcCommands,
schema: JSONSchemaType<TPayload>,
runner: RpcRunner<TPayload, TResult>,
) => {
if (jobHandlers[commandName]) {
throw new Error(`${commandName} job handler already registered.`)
}
jobHandlers[commandName] = {
validate: ajv.compile(schema),
run: runner,
}
}
return {
registerCommand,
initRpcs,
}
})

View File

@@ -5,7 +5,6 @@ import { default as PocketBase, default as pocketbaseEs } from 'pocketbase'
import { createInstanceMixin } from './InstanceMIxin'
import { createInvocationMixin } from './InvocationMixin'
import { createRawPbClient } from './RawPbClient'
import { createRpcHelper } from './RpcHelper'
export type PocketbaseClientApi = ReturnType<typeof createPbClient>
@@ -37,7 +36,6 @@ export const createPbClient = (url: string) => {
})
const context: MixinContext = { client, rawDb, logger: _clientLogger }
const rpcApi = createRpcHelper(context)
const instanceApi = createInstanceMixin(context)
const invocationApi = createInvocationMixin(context, instanceApi)
@@ -47,7 +45,6 @@ export const createPbClient = (url: string) => {
knex: rawDb,
createFirstAdmin,
adminAuthViaEmail,
...rpcApi,
...instanceApi,
...invocationApi,
}

View File

@@ -1,75 +0,0 @@
import { RPC_COLLECTION, RpcFields, RpcStatus } from '$shared'
import { ClientResponseError } from 'pocketbase'
import { JsonObject } from 'type-fest'
import { MixinContext } from './PbClient'
export enum RecordSubscriptionActions {
Create = 'create',
Update = 'update',
Delete = 'delete',
}
export type RpcHelperConfig = MixinContext
export type RpcHelper = ReturnType<typeof createRpcHelper>
export const createRpcHelper = (config: RpcHelperConfig) => {
const { client, rawDb, logger } = config
const onNewRpc = async (cb: (e: RpcFields<any, any>) => void) => {
const unsub = await client
.collection(RPC_COLLECTION)
.subscribe<RpcFields<any, any>>('*', (e) => {
if (e.action !== RecordSubscriptionActions.Create) return
cb(e.record)
})
return unsub
}
const resetRpcs = async () =>
rawDb(RPC_COLLECTION)
.whereNotIn('status', [
RpcStatus.FinishedError,
RpcStatus.FinishedSuccess,
])
.update<RpcFields<any, any>>({
status: RpcStatus.FinishedError,
result: `Canceled by reset`,
})
const incompleteRpcs = async () => {
return client
.collection(RPC_COLLECTION)
.getFullList<RpcFields<any, any>>(100, {
filter: `status != '${RpcStatus.FinishedError}' && status != '${RpcStatus.FinishedSuccess}'`,
})
}
const rejectRpc = async (
rpc: RpcFields<any, any>,
err: ClientResponseError,
) => {
const fields: Partial<RpcFields<any, any>> = {
status: RpcStatus.FinishedError,
result: err,
}
return client
.collection(RPC_COLLECTION)
.update<RpcFields<any, any>>(rpc.id, fields)
}
const setRpcStatus = async (
rpc: RpcFields<any, any>,
status: RpcStatus,
result: JsonObject = {},
) => {
return client.collection(RPC_COLLECTION).update(rpc.id, { status, result })
}
return {
incompleteRpcs,
resetRpcs,
onNewRpc,
rejectRpc,
setRpcStatus,
}
}

View File

@@ -9,6 +9,5 @@ export * from './PocketbaseReleaseVersionService'
export * from './PortService'
export * from './ProxyService'
export * from './RealtimeLog'
export * from './RpcService'
export * from './SqliteService'
export * from './clientService'

View File

@@ -0,0 +1,47 @@
import Ajv, { JSONSchemaType } from 'ajv'
import type pocketbaseEs from 'pocketbase'
import type { JsonObject } from 'type-fest'
import { LoggerService } from '../Logger'
import { RestCommands, RestMethods } from '../schema'
import type { WatchHelper } from './WatchHelper'
export type RestHelperConfig = {
client: pocketbaseEs
watchHelper: WatchHelper
}
export type RestHelper = ReturnType<typeof createRestHelper>
export const createRestHelper = (config: RestHelperConfig) => {
const _logger = LoggerService().create(`RestHelper`)
const {
client,
watchHelper: { watchById },
} = config
const mkRest = <TPayload extends JsonObject, TResult extends JsonObject>(
cmd: RestCommands,
method: RestMethods,
schema: JSONSchemaType<TPayload>,
) => {
const validator = new Ajv().compile(schema)
return async (payload: TPayload): Promise<TResult> => {
const _restCallLogger = _logger.create(cmd)
const { dbg, error } = _restCallLogger
dbg(`Executing REST call`)
if (!validator(payload)) {
throw new Error(`Invalid REST payload: ${validator.errors}`)
}
const res = await client.send(`/api/${cmd}`, {
method: method,
body: payload,
})
dbg(res)
return res
}
}
return { mkRest }
}

View File

@@ -1,100 +0,0 @@
import Ajv, { JSONSchemaType } from 'ajv'
import type pocketbaseEs from 'pocketbase'
import { ClientResponseError, RecordSubscription } from 'pocketbase'
import type { JsonObject } from 'type-fest'
import { LoggerService } from '../Logger'
import { newId } from '../newId'
import { safeCatch } from '../safeCatch'
import {
RPC_COLLECTION,
RpcCommands,
RpcFields,
RpcRecord_Create,
RpcStatus,
} from '../schema'
import type { WatchHelper } from './WatchHelper'
export type RpcHelperConfig = {
client: pocketbaseEs
watchHelper: WatchHelper
}
export type RpcHelper = ReturnType<typeof createRpcHelper>
export const createRpcHelper = (config: RpcHelperConfig) => {
const _logger = LoggerService().create(`RpcHelper`)
const {
client,
watchHelper: { watchById },
} = config
const mkRpc = <TPayload extends JsonObject, TResult extends JsonObject>(
cmd: RpcCommands,
schema: JSONSchemaType<TPayload>,
) => {
type ConcreteRpcRecord = RpcFields<TPayload, TResult>
const validator = new Ajv().compile(schema)
return safeCatch(
cmd,
LoggerService(),
async (
payload: TPayload,
cb?: (data: RecordSubscription<ConcreteRpcRecord>) => void,
) => {
const _rpcLogger = _logger.create(cmd)
const { dbg, error } = _rpcLogger
dbg(`Executing RPC`)
const _user = client.authStore.model
if (!_user) {
throw new Error(`Expected authenticated user here.`)
}
if (!validator(payload)) {
throw new Error(`Invalid RPC payload: ${validator.errors}`)
}
const { id: userId } = _user
const rpcIn: RpcRecord_Create<ConcreteRpcRecord> = {
id: newId(),
cmd,
userId,
payload,
}
_rpcLogger.breadcrumb(rpcIn.id)
dbg({ rpcIn })
return new Promise<TResult>((resolve, reject) => {
;(async () => {
dbg(`Watching ${rpcIn.id}`)
await watchById<ConcreteRpcRecord>(
RPC_COLLECTION,
rpcIn.id,
(data, unsub) => {
dbg(`Got an RPC change`, data)
cb?.(data)
if (data.record.status === RpcStatus.FinishedSuccess) {
dbg(`RPC finished successfully`, data)
unsub()
resolve(data.record.result)
}
if (data.record.status === RpcStatus.FinishedError) {
dbg(`RPC finished unsuccessfully`, data)
unsub()
reject(new ClientResponseError(data.record.result))
}
},
{ initialFetch: false, pollIntervalMs: 100 },
)
dbg(`Creating ${rpcIn.id}`)
const newRpc = await client.collection(RPC_COLLECTION).create(rpcIn)
dbg(`Created ${newRpc.id}`)
})().catch((e) => {
error(e)
reject(e)
})
})
},
)
}
return { mkRpc }
}

View File

@@ -1,2 +1,2 @@
export * from './RpcHelper'
export * from './RestHelper'
export * from './WatchHelper'

View File

@@ -1,53 +0,0 @@
import chalk from 'chalk'
import { nanoid } from 'nanoid'
import { ClientResponseError } from 'pocketbase'
import { Logger } from './Logger'
const SAFECATCH_TTL_MS = 5000
export const safeCatch = <TIn extends any[], TOut>(
name: string,
logger: Logger,
cb: (...args: TIn) => Promise<TOut>,
timeoutMs = SAFECATCH_TTL_MS,
): ((...args: TIn) => Promise<TOut>) => {
return async (...args: TIn) => {
const uuid = `${name}:${nanoid()}`
const pfx = chalk.red(`safeCatch:${uuid}`)
const { raw, error, warn, dbg } = logger.create(pfx)
raw(`args`, args)
const tid = setTimeout(() => {
warn(`timeout ${timeoutMs}ms waiting for ${pfx}`)
}, timeoutMs)
try {
const res = await cb(...args)
raw(`finished`)
return res
} catch (e) {
const payload = JSON.stringify(args)
if (e instanceof ClientResponseError) {
if (e.status === 400) {
dbg(
`PocketBase API error: It looks like you don't have permission to make this request. Raw error: ${e}. Payload: ${payload}`,
)
} else if (e.status === 0) {
dbg(
`Client request aborted (possible duplicate request or real error). Raw error: ${e}. Payload: ${payload}`,
)
} else if (e.status === 404) {
dbg(`Record not found. Raw error: ${e}. Payload: ${payload}`)
} else {
dbg(
`Unknown PocketBase API error. Raw error: ${e}. Payload: ${payload}`,
)
}
} else {
dbg(`Caught an unknown error. Raw error: ${e}. Payload: ${payload}`)
}
throw e
} finally {
clearTimeout(tid)
}
}
}

View File

@@ -0,0 +1,52 @@
import { JSONSchemaType } from 'ajv'
import { InstanceId, Semver } from '../types'
export type UpdateInstancePayload = {
instanceId: InstanceId
fields: {
subdomain?: string
maintenance?: boolean
version?: Semver
secrets?: {
[_: string]: string
}
}
}
export const SECRET_KEY_REGEX = /^[A-Z][A-Z0-9_]*$/
export type UpdateInstanceResult = {
status: 'ok' | 'error'
message?: string
}
export const UpdateInstancePayloadSchema: JSONSchemaType<UpdateInstancePayload> =
{
type: 'object',
properties: {
instanceId: { type: 'string' },
fields: {
type: 'object',
properties: {
subdomain: { type: 'string', nullable: true },
maintenance: { type: 'boolean', nullable: true },
version: {
type: 'string',
nullable: true,
},
secrets: {
type: 'object',
nullable: true,
patternProperties: {
[SECRET_KEY_REGEX.source]: {
anyOf: [{ type: 'string' }],
},
},
required: [],
},
},
},
},
required: ['instanceId', 'fields'],
additionalProperties: false,
}

View File

@@ -0,0 +1,19 @@
import Ajv from 'ajv'
import { JsonObject } from 'type-fest'
export enum RestMethods {
Create = 'POST',
Update = 'PUT',
}
export enum RestCommands {
Instance = 'instance',
}
export type RestPayloadBase = JsonObject
export const ajv = new Ajv()
export * from './CreateInstance'
export * from './UpdateInstance'
// gen:export

View File

@@ -1,20 +0,0 @@
import { JSONSchemaType } from 'ajv'
import { InstanceId } from '../types'
export type RenameInstancePayload = {
instanceId: InstanceId
subdomain: string
}
export type RenameInstanceResult = {}
export const RenameInstancePayloadSchema: JSONSchemaType<RenameInstancePayload> =
{
type: 'object',
properties: {
instanceId: { type: 'string' },
subdomain: { type: 'string' },
},
required: ['instanceId', 'subdomain'],
additionalProperties: false,
}

View File

@@ -1,34 +0,0 @@
import { JSONSchemaType } from 'ajv'
import { InstanceId } from '../types'
export type SaveSecretsPayload = {
instanceId: InstanceId
secrets: {
[_: string]: string
}
}
export type SaveSecretsResult = {
status: 'ok' | 'error'
message?: string
}
export const SECRET_KEY_REGEX = /^[A-Z][A-Z0-9_]*$/
export const SaveSecretsPayloadSchema: JSONSchemaType<SaveSecretsPayload> = {
type: 'object',
properties: {
instanceId: { type: 'string' },
secrets: {
type: 'object',
patternProperties: {
[SECRET_KEY_REGEX.source]: {
anyOf: [{ type: 'string' }],
},
},
required: [],
},
},
required: ['instanceId', 'secrets'],
additionalProperties: false,
}

View File

@@ -1,24 +0,0 @@
import { JSONSchemaType } from 'ajv'
import { InstanceId, Semver } from '../types'
export type SaveVersionPayload = {
instanceId: InstanceId
version: Semver
}
export type SaveVersionResult = {
status: 'ok' | 'error'
message?: string
}
export const SaveVersionPayloadSchema: JSONSchemaType<SaveVersionPayload> = {
type: 'object',
properties: {
instanceId: { type: 'string' },
version: {
type: 'string',
},
},
required: ['instanceId', 'version'],
additionalProperties: false,
}

View File

@@ -1,20 +0,0 @@
import { JSONSchemaType } from 'ajv'
import { InstanceId } from '../types'
export type SetInstanceMaintenancePayload = {
instanceId: InstanceId
maintenance: boolean
}
export type SetInstanceMaintenanceResult = {}
export const SetInstanceMaintenancePayloadSchema: JSONSchemaType<SetInstanceMaintenancePayload> =
{
type: 'object',
properties: {
instanceId: { type: 'string' },
maintenance: { type: 'boolean' },
},
required: ['instanceId', 'maintenance'],
additionalProperties: false,
}

View File

@@ -1,59 +0,0 @@
import Ajv from 'ajv'
import { JsonObject } from 'type-fest'
import { BaseFields, UserId } from '../types'
export const RPC_COLLECTION = 'rpc'
export enum RpcCommands {
CreateInstance = 'create-instance',
SaveSecrets = 'save-secrets',
SaveVersion = 'save-version',
SetInstanceMaintenance = 'set-instance-maintenance',
// gen:enum
RenameInstance = 'rename-instance',
}
export const RPC_COMMANDS = [
RpcCommands.CreateInstance,
RpcCommands.SaveSecrets,
RpcCommands.SaveVersion,
RpcCommands.SetInstanceMaintenance,
// gen:commandlist
RpcCommands.RenameInstance,
]
export enum RpcStatus {
New = 'new',
Queued = 'queued',
Running = 'running',
Starting = 'starting',
FinishedSuccess = 'finished-success',
FinishedError = 'finished-error',
}
export type RpcPayloadBase = JsonObject
export type RpcFields<
TPayload extends RpcPayloadBase,
TRes extends JsonObject,
> = BaseFields & {
userId: UserId
cmd: string
payload: TPayload
status: RpcStatus
message: string
result: TRes
}
export type RpcRecord_Create<TRecord extends RpcFields<any, any>> = Pick<
TRecord,
'id' | 'userId' | 'payload' | 'cmd'
>
export const ajv = new Ajv()
export * from './CreateInstance'
export * from './RenameInstance'
export * from './SaveSecrets'
export * from './SaveVersion'
export * from './SetInstanceMaintenance'
// gen:export

View File

@@ -2,7 +2,7 @@ export * from './Backup'
export * from './Instance'
export * from './InstanceLog'
export * from './Invocation'
export * from './Rpc'
export * from './Rest'
export * from './User'
export * from './types'
export * from './util'

View File

@@ -5,8 +5,6 @@ import {
InstanceFields,
InvocationFields,
LoggerService,
RPC_COLLECTION,
RpcFields,
singletonAsyncExecutionGuard,
} from '$shared'
import Bottleneck from 'bottleneck'
@@ -53,45 +51,10 @@ export const deleteInvocationsForInstance = singletonAsyncExecutionGuard(
(instance) => `deleteInvocationsForInstance:${instance.id}`,
)
export const deleteRpc = singletonAsyncExecutionGuard(
async (rpc: RpcFields<any, any>) => {
const { client } = await clientService()
await client.client.collection(RPC_COLLECTION).delete(rpc.id)
},
(rpc) => `deleteRpc:${rpc.id}`,
)
export const getAllRpcs = singletonAsyncExecutionGuard(
async () => {
const { client } = await clientService()
const rpcs = await client.client
.collection(RPC_COLLECTION)
.getFullList<RpcFields<{ instanceId?: string }, {}>>()
console.log(`Loaded rpcs`)
return rpcs
},
() => `getAllRpcs`,
)
export const deleteRpcsForInstance = singletonAsyncExecutionGuard(
async (instance: InstanceFields) => {
const { id } = instance
const allRpcs = await getAllRpcs()
const instanceRpcs = allRpcs.filter((rpc) => rpc.payload?.instanceId === id)
await Promise.all(instanceRpcs.map(deleteRpc))
},
(instance) => `deleteRpcsForInstance:${instance.id}`,
)
export const deleteInstance = singletonAsyncExecutionGuard(
async (instance: InstanceFields) => {
const { client } = await clientService()
const { id } = instance
await deleteRpcsForInstance(instance).catch((e) => {
console.error(`deleteRpcsForInstance error`, JSON.stringify(e, null, 2))
throw e
})
console.log(`RPCs deleted for ${id}`)
await deleteInvocationsForInstance(instance).catch((e) => {
console.error(
`deleteInvocationsForInstance error`,