diff --git a/Dockerfile b/Dockerfile index 2f7c42a1..a66996d8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ FROM node:18-alpine as pockethost-buildbox COPY --from=golang:1.19.3-alpine /usr/local/go/ /usr/local/go/ ENV PATH="/usr/local/go/bin:${PATH}" -RUN apk add python3 py3-pip make gcc musl-dev g++ bash +RUN apk add python3 py3-pip make gcc musl-dev g++ bash sqlite \ No newline at end of file diff --git a/docker/install.yaml b/docker/install.yaml index a729b1b7..63dcd2ee 100644 --- a/docker/install.yaml +++ b/docker/install.yaml @@ -9,7 +9,7 @@ services: dockerfile: Dockerfile container_name: prepbox working_dir: /src - command: bash -c "yarn" + command: bash -c "yarn && npx patch-package" volumes: - ./mount/cache/go:/go - ./mount/cache/yarn:/usr/local/share/.cache/yarn/v6 diff --git a/package.json b/package.json index 906141f9..00525db7 100644 --- a/package.json +++ b/package.json @@ -40,4 +40,4 @@ "prettier-plugin-svelte": "^2.7.0", "typescript": "^4.8.3" } -} \ No newline at end of file +} diff --git a/packages/cli/src/helpers/RealtimeSubscriptionManager.ts b/packages/cli/src/helpers/RealtimeSubscriptionManager.ts deleted file mode 100644 index 4201e2f3..00000000 --- a/packages/cli/src/helpers/RealtimeSubscriptionManager.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { client } from '../client' -import { - Pb_Any_Record_Db, - Pb_CollectionName, - Pb_PkId, - Pb_Untrusted_Db, -} from '../schema/base' - -export const createRealtimeSubscriptionManager = () => { - const subscriptions: { [_: string]: number } = {} - - const subscribe = ( - collectionName: Pb_CollectionName, - cb: (rec: Pb_Untrusted_Db) => void, - id?: Pb_PkId - ) => { - const slug = id ? `${collectionName}/${id}` : collectionName - - if (subscriptions[slug]) { - subscriptions[slug]++ - } else { - subscriptions[slug] = 1 - client.realtime.subscribe(slug, (e) => { - console.log(`Realtime update`, { e }) - cb(e.record as unknown as Pb_Untrusted_Db) - }) - } - return () => { - subscriptions[slug]-- - if (subscriptions[slug] === 0) { - console.log(`Realtime unsub`) - client.realtime.unsubscribe(slug) - } - } - } - - return subscribe -} diff --git a/packages/cli/src/helpers/index.ts b/packages/cli/src/helpers/index.ts index a1b2d7a3..16e2a2ec 100644 --- a/packages/cli/src/helpers/index.ts +++ b/packages/cli/src/helpers/index.ts @@ -1,6 +1,5 @@ export * from './getOne' export * from './onAuthStateChanged' export * from './pbUid' -export * from './RealtimeSubscriptionManager' export * from './signInAnonymously' export * from './upsert' diff --git a/packages/common/package.json b/packages/common/package.json index c066d417..33d9d994 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -8,4 +8,4 @@ "nanoid": "^4.0.0", "pocketbase": "^0.8.0-rc1" } -} +} \ No newline at end of file diff --git a/packages/common/src/RealtimeSubscriptionManager.ts b/packages/common/src/RealtimeSubscriptionManager.ts deleted file mode 100644 index 9d78f41e..00000000 --- a/packages/common/src/RealtimeSubscriptionManager.ts +++ /dev/null @@ -1,38 +0,0 @@ -import PocketBase, { Record } from 'pocketbase' -import { CollectionName, RecordId } from './schema' - -export interface RecordSubscription { - action: string - record: T -} - -export type RealtimeEventHandler = (e: RecordSubscription) => void - -export const createRealtimeSubscriptionManager = (pocketbase: PocketBase) => { - const subscriptions: { [_: string]: number } = {} - - const subscribeOne = ( - collection: CollectionName, - id: RecordId, - cb: (e: RecordSubscription) => void - ) => { - const slug = `${collection}/${id}` - if (subscriptions[slug]) { - subscriptions[slug]++ - } else { - subscriptions[slug] = 1 - pocketbase.collection(collection).subscribeOne(id, (e) => { - console.log(`Realtime update`, { e }) - cb(e) - }) - } - return () => { - subscriptions[slug]-- - if (subscriptions[slug] === 0) { - pocketbase.collection(collection).unsubscribe(id) - } - } - } - - return { subscribeOne } -} diff --git a/packages/common/src/TimerManager.ts b/packages/common/src/TimerManager.ts new file mode 100644 index 00000000..c50d59f8 --- /dev/null +++ b/packages/common/src/TimerManager.ts @@ -0,0 +1,49 @@ +import { forEach } from '@s-libs/micro-dash' + +export type UnixTimestampMs = number +export type TimerCanceler = () => void + +export type Config = {} + +export const createTimerManager = (config: Config) => { + let i = 0 + const cleanups: any = {} + + const add = (cb: () => void, ms: UnixTimestampMs) => { + const idx = i++ + const tid = setTimeout(() => { + cancel() + cb() + }, ms) + const cancel = () => { + clearTimeout(tid) + delete cleanups[idx] + } + cleanups[idx] = cancel + return cancel + } + + const shutdown = () => { + // console.log(`Canceling all`, cleanups) + forEach(cleanups, (c) => c()) + // console.log(`done`, cleanups) + } + + const repeat = ( + cb: () => Promise | boolean, + ms: UnixTimestampMs + ) => { + let _unsub: TimerCanceler | undefined = undefined + const _again = async () => { + const shouldRepeat = await cb() + if (shouldRepeat) _unsub = add(_again, ms) + } + _again() + return () => { + _unsub?.() + _unsub = undefined + } + } + + return { add, shutdown, repeat } +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 88f4e2e3..74e97ed3 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -1,4 +1,4 @@ export * from './assert' -export * from './RealtimeSubscriptionManager' export * from './releases' export * from './schema' +export * from './TimerManager' diff --git a/packages/common/src/schema.ts b/packages/common/src/schema.ts deleted file mode 100644 index 20ef839b..00000000 --- a/packages/common/src/schema.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { PlatformId, VersionId } from './releases' - -export type RecordId = string -export type UserId = RecordId -export type InstanceId = RecordId -export type InternalInstanceId = RecordId -export type Subdomain = string -export type Port = number -export type IsoDate = string -export type ProcessId = number -export type Username = string -export type Password = string -export type CollectionName = string -export type Seconds = number - -export const pocketNow = () => new Date().toISOString() - -export enum InstanceStatus { - Unknown = '', - Idle = 'idle', - Port = 'porting', - Starting = 'starting', - Running = 'running', - Failed = 'failed', -} - -export type InstancesRecord = { - id: RecordId - subdomain: Subdomain - uid: UserId - status: InstanceStatus - platform: PlatformId - version: VersionId - secondsThisMonth: Seconds -} - -export type InstancesRecord_New = Omit - -export type UserRecord = { - id: RecordId - email: string - verified: boolean -} - -export type InvocationRecord = { - id: RecordId - instanceId: RecordId - startedAt: IsoDate - endedAt: IsoDate - pid: number - totalSeconds: number -} - -export type InstanceRecordById = { [_: InstanceId]: InstancesRecord } diff --git a/packages/common/src/schema/Backup.ts b/packages/common/src/schema/Backup.ts new file mode 100644 index 00000000..b3a17c6e --- /dev/null +++ b/packages/common/src/schema/Backup.ts @@ -0,0 +1,42 @@ +import { InstanceId, IsoDate, RecordId } from './types' + +export enum BackupStatus { + Queued = 'queued', + Running = 'running', + FinishedSuccess = 'finished-success', + FinishedError = 'finished-error', +} + +export type BackupRecordId = RecordId +export type BackupRecord = { + id: BackupRecordId + instanceId: InstanceId + status: BackupStatus + message: string + bytes: number + created: IsoDate + updated: IsoDate + platform: string + version: string + progress: { + [_: string]: number + } +} + +export type BackupRecord_Create = Pick< + BackupRecord, + 'instanceId' | 'status' | 'platform' | 'version' +> + +export type BackupRecord_Update = Partial< + Pick< + BackupRecord, + | 'instanceId' + | 'status' + | 'bytes' + | 'message' + | 'platform' + | 'version' + | 'progress' + > +> diff --git a/packages/common/src/schema/Instance.ts b/packages/common/src/schema/Instance.ts new file mode 100644 index 00000000..fcbdd907 --- /dev/null +++ b/packages/common/src/schema/Instance.ts @@ -0,0 +1,23 @@ +import { PlatformId, VersionId } from '../releases' +import { RecordId, Seconds, Subdomain, UserId } from './types' + +export enum InstanceStatus { + Unknown = '', + Idle = 'idle', + Port = 'porting', + Starting = 'starting', + Running = 'running', + Failed = 'failed', +} + +export type InstancesRecord = { + id: RecordId + subdomain: Subdomain + uid: UserId + status: InstanceStatus + platform: PlatformId + version: VersionId + secondsThisMonth: Seconds +} + +export type InstancesRecord_New = Omit diff --git a/packages/common/src/schema/Invocation.ts b/packages/common/src/schema/Invocation.ts new file mode 100644 index 00000000..02e6217a --- /dev/null +++ b/packages/common/src/schema/Invocation.ts @@ -0,0 +1,10 @@ +import { IsoDate, RecordId } from './types' + +export type InvocationRecord = { + id: RecordId + instanceId: RecordId + startedAt: IsoDate + endedAt: IsoDate + pid: number + totalSeconds: number +} diff --git a/packages/common/src/schema/Job.ts b/packages/common/src/schema/Job.ts new file mode 100644 index 00000000..202df63d --- /dev/null +++ b/packages/common/src/schema/Job.ts @@ -0,0 +1,46 @@ +import { BackupRecordId } from './Backup' +import { InstancesRecord } from './Instance' +import { InstanceId, RecordId, UserId } from './types' + +export enum JobStatus { + New = 'new', + Queued = 'queued', + Running = 'running', + FinishedSuccess = 'finished-success', + FinishedError = 'finished-error', +} + +export type JobPayloadBase = { + cmd: JobCommands +} + +export enum JobCommands { + BackupInstance = 'backup-instance', + RestoreInstance = 'restore-instance', +} + +export const JOB_COMMANDS = [JobCommands.BackupInstance] + +export type InstanceBackupJobPayload = { + cmd: JobCommands.BackupInstance + instanceId: InstanceId +} + +export type InstanceRestoreJobPayload = { + cmd: JobCommands.RestoreInstance + backupId: BackupRecordId +} + +export type JobRecord = { + id: RecordId + userId: UserId + payload: TPayload + status: JobStatus + message: string +} + +export type InstanceBackupJobRecord = JobRecord + +export type JobRecord_In = Omit, 'id' | 'message'> + +export type InstanceRecordById = { [_: InstanceId]: InstancesRecord } diff --git a/packages/common/src/schema/User.ts b/packages/common/src/schema/User.ts new file mode 100644 index 00000000..0132b58c --- /dev/null +++ b/packages/common/src/schema/User.ts @@ -0,0 +1,7 @@ +import { RecordId } from './types' + +export type UserRecord = { + id: RecordId + email: string + verified: boolean +} diff --git a/packages/common/src/schema/index.ts b/packages/common/src/schema/index.ts new file mode 100644 index 00000000..72d0b5b3 --- /dev/null +++ b/packages/common/src/schema/index.ts @@ -0,0 +1,7 @@ +export * from './Backup' +export * from './Instance' +export * from './Invocation' +export * from './Job' +export * from './types' +export * from './User' +export * from './util' diff --git a/packages/common/src/schema/types.ts b/packages/common/src/schema/types.ts new file mode 100644 index 00000000..c236386f --- /dev/null +++ b/packages/common/src/schema/types.ts @@ -0,0 +1,12 @@ +export type RecordId = string +export type UserId = RecordId +export type InstanceId = RecordId +export type InternalInstanceId = RecordId +export type Subdomain = string +export type Port = number +export type IsoDate = string +export type ProcessId = number +export type Username = string +export type Password = string +export type CollectionName = string +export type Seconds = number diff --git a/packages/common/src/schema/util.ts b/packages/common/src/schema/util.ts new file mode 100644 index 00000000..f972ed18 --- /dev/null +++ b/packages/common/src/schema/util.ts @@ -0,0 +1 @@ +export const pocketNow = () => new Date().toISOString() diff --git a/packages/common/tsconfig.json b/packages/common/tsconfig.json index 1bc63d91..0a47e2fd 100644 --- a/packages/common/tsconfig.json +++ b/packages/common/tsconfig.json @@ -9,11 +9,12 @@ "sourceMap": true, "strict": true, "module": "ESNext", + "target": "ESNext", "moduleResolution": "node", "noUncheckedIndexedAccess": true, "strictNullChecks": true, "noEmit": true, - "types": ["vite/client"] + "types": [] }, "include": ["./src"] diff --git a/packages/daemon/package.json b/packages/daemon/package.json index da150422..bd7a9c77 100644 --- a/packages/daemon/package.json +++ b/packages/daemon/package.json @@ -23,7 +23,10 @@ "node-fetch": "^3.2.10", "pocketbase": "^0.8.0-rc1", "sqlite3": "^5.1.2", - "type-fest": "^3.1.0" + "type-fest": "^3.1.0", + "eventsource": "^2.0.2", + "tmp": "^0.2.1", + "@types/tmp": "^0.2.1" }, "devDependencies": { "tsx": "^3.11.0" diff --git a/packages/daemon/src/db/BackupMixin.ts b/packages/daemon/src/db/BackupMixin.ts new file mode 100644 index 00000000..a68206ce --- /dev/null +++ b/packages/daemon/src/db/BackupMixin.ts @@ -0,0 +1,82 @@ +import { + BackupRecord, + BackupRecordId, + BackupRecord_Create, + BackupRecord_Update, + BackupStatus, + InstanceId, + InstancesRecord, +} from '@pockethost/common' +import { safeCatch } from '../util/safeAsync' +import { MixinContext } from './PbClient' + +export type BackupApi = ReturnType + +export const createBackupMixin = (context: MixinContext) => { + const { client, rawDb } = context + + const createBackup = safeCatch( + `createBackup`, + async (instanceId: InstanceId) => { + const instance = await client + .collection('instances') + .getOne(instanceId) + if (!instance) { + throw new Error(`Expected ${instanceId} to be a valid instance`) + } + const { platform, version } = instance + const rec: BackupRecord_Create = { + instanceId, + status: BackupStatus.Queued, + platform, + version, + } + const created = await client + .collection('backups') + .create(rec) + return created + } + ) + + const updateBackup = safeCatch( + `updateBackup`, + async (backupId: BackupRecordId, fields: BackupRecord_Update) => { + await client.collection('backups').update(backupId, fields) + } + ) + + const resetBackups = safeCatch(`resetBackups`, async () => + rawDb('backups') + .whereNotIn('status', [ + BackupStatus.FinishedError, + BackupStatus.FinishedSuccess, + ]) + .delete() + ) + + const getNextBackupJob = safeCatch(`getNextBackupJob`, async () => { + return client + .collection('backups') + .getList(1, 1, { + filter: `status = '${BackupStatus.Queued}'`, + }) + .then((recs) => { + return recs.items[0] || null + }) + }) + + const getBackupJob = safeCatch( + `getBackupJob`, + async (backupId: BackupRecordId) => { + return client.collection('backups').getOne(backupId) + } + ) + + return { + createBackup, + updateBackup, + resetBackups, + getNextBackupJob, + getBackupJob, + } +} diff --git a/packages/daemon/src/db/InstanceMIxin.ts b/packages/daemon/src/db/InstanceMIxin.ts new file mode 100644 index 00000000..d07e1eda --- /dev/null +++ b/packages/daemon/src/db/InstanceMIxin.ts @@ -0,0 +1,110 @@ +import { + assertExists, + InstanceId, + InstancesRecord, + InstanceStatus, + UserRecord, +} from '@pockethost/common' +import { reduce } from '@s-libs/micro-dash' +import Bottleneck from 'bottleneck' +import { endOfMonth, startOfMonth } from 'date-fns' +import { dbg } from '../util/dbg' +import { safeCatch } from '../util/safeAsync' +import { MixinContext } from './PbClient' + +export type InstanceApi = ReturnType + +export const createInstanceMixin = (context: MixinContext) => { + const { client, rawDb } = context + + const getInstanceBySubdomain = safeCatch( + `getInstanceBySubdomain`, + (subdomain: string): Promise<[InstancesRecord, UserRecord] | []> => + client + .collection('instances') + .getFirstListItem(`subdomain = '${subdomain}'`) + .then((instance) => { + if (!instance) return [] + return client + .collection('users') + .getOne(instance.uid) + .then((user) => { + return [instance, user] + }) + }) + ) + + const updateInstance = safeCatch( + `updateInstance`, + async (instanceId: InstanceId, fields: Partial) => { + await client.collection('instances').update(instanceId, fields) + } + ) + + const updateInstanceStatus = safeCatch( + `updateInstanceStatus`, + async (instanceId: InstanceId, status: InstanceStatus) => { + await updateInstance(instanceId, { status }) + } + ) + + const getInstance = safeCatch( + `getInstance`, + async (instanceId: InstanceId) => { + return client.collection('instances').getOne(instanceId) + } + ) + + const updateInstances = safeCatch( + 'updateInstances', + async (cb: (rec: InstancesRecord) => Partial) => { + const res = await client + .collection('instances') + .getFullList(200) + const limiter = new Bottleneck({ maxConcurrent: 1 }) + const promises = reduce( + res, + (c, r) => { + c.push( + limiter.schedule(() => { + const toUpdate = cb(r) + dbg(`Updating instance ${r.id} with ${JSON.stringify(toUpdate)}`) + return client.collection('instances').update(r.id, toUpdate) + }) + ) + return c + }, + [] as Promise[] + ) + await Promise.all(promises) + } + ) + + const updateInstanceSeconds = safeCatch( + `updateInstanceSeconds`, + async (instanceId: InstanceId, forPeriod = new Date()) => { + const startIso = startOfMonth(forPeriod).toISOString() + const endIso = endOfMonth(forPeriod).toISOString() + const query = rawDb('invocations') + .sum('totalSeconds as t') + .where('instanceId', instanceId) + .where('startedAt', '>=', startIso) + .where('startedAt', '<=', endIso) + dbg(query.toString()) + const res = await query + const [row] = res + assertExists(row, `Expected row here`) + const secondsThisMonth = row.t + await updateInstance(instanceId, { secondsThisMonth }) + } + ) + + return { + updateInstance, + updateInstanceStatus, + getInstanceBySubdomain, + getInstance, + updateInstanceSeconds, + updateInstances, + } +} diff --git a/packages/daemon/src/db/InvocationMixin.ts b/packages/daemon/src/db/InvocationMixin.ts new file mode 100644 index 00000000..0f3a40f7 --- /dev/null +++ b/packages/daemon/src/db/InvocationMixin.ts @@ -0,0 +1,69 @@ +import { + InstancesRecord, + InvocationRecord, + pocketNow, +} from '@pockethost/common' +import { dbg } from '../util/dbg' +import { safeCatch } from '../util/safeAsync' +import { InstanceApi } from './InstanceMIxin' +import { MixinContext } from './PbClient' + +export const createInvocationMixin = ( + context: MixinContext, + instanceApi: InstanceApi +) => { + const { client } = context + + const createInvocation = safeCatch( + `createInvocation`, + async (instance: InstancesRecord, pid: number) => { + const init: Partial = { + startedAt: pocketNow(), + pid, + instanceId: instance.id, + totalSeconds: 0, + } + const _inv = await client + .collection('invocations') + .create(init) + return _inv + } + ) + + const pingInvocation = safeCatch( + `pingInvocation`, + async (invocation: InvocationRecord) => { + const totalSeconds = + (+new Date() - Date.parse(invocation.startedAt)) / 1000 + const toUpdate: Partial = { + totalSeconds, + } + const _inv = await client + .collection('invocations') + .update(invocation.id, toUpdate) + await instanceApi.updateInstanceSeconds(invocation.instanceId) + return _inv + } + ) + + const finalizeInvocation = safeCatch( + `finalizeInvocation`, + async (invocation: InvocationRecord) => { + dbg('finalizing') + const totalSeconds = + (+new Date() - Date.parse(invocation.startedAt)) / 1000 + const toUpdate: Partial = { + endedAt: pocketNow(), + totalSeconds, + } + dbg({ toUpdate }) + const _inv = await client + .collection('invocations') + .update(invocation.id, toUpdate) + await instanceApi.updateInstanceSeconds(invocation.instanceId) + return _inv + } + ) + + return { finalizeInvocation, pingInvocation, createInvocation } +} diff --git a/packages/daemon/src/db/JobMixin.ts b/packages/daemon/src/db/JobMixin.ts new file mode 100644 index 00000000..8d4088f1 --- /dev/null +++ b/packages/daemon/src/db/JobMixin.ts @@ -0,0 +1,60 @@ +import { JobRecord, JobStatus } from '@pockethost/common' +import { safeCatch } from '../util/safeAsync' +import { MixinContext } from './PbClient' + +export enum RecordSubscriptionActions { + Create = 'create', + Update = 'update', + Delete = 'delete', +} + +export const createJobMixin = (context: MixinContext) => { + const { client, rawDb } = context + const onNewJob = safeCatch( + `onNewJob`, + async (cb: (e: JobRecord) => void) => { + const unsub = await client + .collection('jobs') + .subscribe>('*', (e) => { + if (e.action !== RecordSubscriptionActions.Create) return + cb(e.record) + }) + return unsub + } + ) + + const resetJobs = safeCatch(`resetJobs`, async () => + rawDb('jobs') + .whereNotIn('status', [ + JobStatus.FinishedError, + JobStatus.FinishedSuccess, + ]) + .update({ + status: JobStatus.New, + }) + ) + + const incompleteJobs = safeCatch(`incompleteJobs`, async () => { + return client.collection('jobs').getFullList>(100, { + filter: `status != '${JobStatus.FinishedError}' && status != '${JobStatus.FinishedSuccess}'`, + }) + }) + + const rejectJob = safeCatch( + `rejectJob`, + async (job: JobRecord, message: string) => { + return client + .collection('jobs') + .update(job.id, { status: JobStatus.FinishedError, message }) + } + ) + + const setJobStatus = safeCatch( + `setJobStatus`, + async (job: JobRecord, status: JobStatus) => { + return client.collection('jobs').update(job.id, { status }) + } + ) + + return { incompleteJobs, resetJobs, onNewJob, rejectJob, setJobStatus } +} diff --git a/packages/daemon/src/db/PbClient.ts b/packages/daemon/src/db/PbClient.ts index 6dfb3d45..491496ea 100644 --- a/packages/daemon/src/db/PbClient.ts +++ b/packages/daemon/src/db/PbClient.ts @@ -1,22 +1,22 @@ +import { Knex } from 'knex' import { - assertExists, - InstanceId, - InstancesRecord, - InstanceStatus, - InvocationRecord, - pocketNow, - UserRecord, -} from '@pockethost/common' -import { reduce } from '@s-libs/micro-dash' -import Bottleneck from 'bottleneck' -import { endOfMonth, startOfMonth } from 'date-fns' -import PocketBase, { Collection } from 'pocketbase' + Collection, + default as PocketBase, + default as pocketbaseEs, +} from 'pocketbase' import { DAEMON_PB_DATA_DIR, PUBLIC_PB_SUBDOMAIN } from '../constants' import { Collection_Serialized } from '../migrate/schema' -import { dbg } from '../util/dbg' import { safeCatch } from '../util/safeAsync' +import { createBackupMixin } from './BackupMixin' +import { createInstanceMixin } from './InstanceMIxin' +import { createInvocationMixin } from './InvocationMixin' +import { createJobMixin } from './JobMixin' import { createRawPbClient } from './RawPbClient' +export type PocketbaseClientApi = ReturnType + +export type MixinContext = { client: pocketbaseEs; rawDb: Knex } + export const createPbClient = (url: string) => { console.log(`Initializing client: ${url}`) const rawDb = createRawPbClient( @@ -25,7 +25,7 @@ export const createPbClient = (url: string) => { const client = new PocketBase(url) client.beforeSend = (url: string, reqConfig: { [_: string]: any }) => { - dbg(reqConfig) + // dbg(reqConfig) delete reqConfig.signal return reqConfig } @@ -36,107 +36,6 @@ export const createPbClient = (url: string) => { client.admins.authWithPassword(email, password) ) - const getInstanceBySubdomain = safeCatch( - `getInstanceBySubdomain`, - (subdomain: string): Promise<[InstancesRecord, UserRecord] | []> => - client - .collection('instances') - .getFirstListItem(`subdomain = '${subdomain}'`) - .then((instance) => { - if (!instance) return [] - return client - .collection('users') - .getOne(instance.uid) - .then((user) => { - return [instance, user] - }) - }) - ) - - const updateInstance = safeCatch( - `updateInstance`, - async (instanceId: InstanceId, fields: Partial) => { - await client.collection('instances').update(instanceId, fields) - } - ) - - const updateInstanceStatus = safeCatch( - `updateInstanceStatus`, - async (instanceId: InstanceId, status: InstanceStatus) => { - await updateInstance(instanceId, { status }) - } - ) - - const createInvocation = safeCatch( - `createInvocation`, - async (instance: InstancesRecord, pid: number) => { - const init: Partial = { - startedAt: pocketNow(), - pid, - instanceId: instance.id, - totalSeconds: 0, - } - const _inv = await client - .collection('invocations') - .create(init) - return _inv - } - ) - - const pingInvocation = safeCatch( - `pingInvocation`, - async (invocation: InvocationRecord) => { - const totalSeconds = - (+new Date() - Date.parse(invocation.startedAt)) / 1000 - const toUpdate: Partial = { - totalSeconds, - } - const _inv = await client - .collection('invocations') - .update(invocation.id, toUpdate) - await updateInstanceSeconds(invocation.instanceId) - return _inv - } - ) - - const finalizeInvocation = safeCatch( - `finalizeInvocation`, - async (invocation: InvocationRecord) => { - dbg('finalizing') - const totalSeconds = - (+new Date() - Date.parse(invocation.startedAt)) / 1000 - const toUpdate: Partial = { - endedAt: pocketNow(), - totalSeconds, - } - dbg({ toUpdate }) - const _inv = await client - .collection('invocations') - .update(invocation.id, toUpdate) - await updateInstanceSeconds(invocation.instanceId) - return _inv - } - ) - - const updateInstanceSeconds = safeCatch( - `updateInstanceSeconds`, - async (instanceId: InstanceId, forPeriod = new Date()) => { - const startIso = startOfMonth(forPeriod).toISOString() - const endIso = endOfMonth(forPeriod).toISOString() - const query = rawDb('invocations') - .sum('totalSeconds as t') - .where('instanceId', instanceId) - .where('startedAt', '>=', startIso) - .where('startedAt', '<=', endIso) - dbg(query.toString()) - const res = await query - const [row] = res - assertExists(row, `Expected row here`) - const secondsThisMonth = row.t - await updateInstance(instanceId, { secondsThisMonth }) - } - ) - const applySchema = safeCatch( `applySchema`, async (collections: Collection_Serialized[]) => { @@ -144,40 +43,22 @@ export const createPbClient = (url: string) => { } ) - const updateInstances = safeCatch( - 'updateInstances', - async (cb: (rec: InstancesRecord) => Partial) => { - const res = await client - .collection('instances') - .getFullList(200) - const limiter = new Bottleneck({ maxConcurrent: 1 }) - const promises = reduce( - res, - (c, r) => { - c.push( - limiter.schedule(() => { - const toUpdate = cb(r) - dbg(`Updating instnace ${r.id} with ${JSON.stringify(toUpdate)}`) - return client.collection('instances').update(r.id, toUpdate) - }) - ) - return c - }, - [] as Promise[] - ) - await Promise.all(promises) - } - ) + const context: MixinContext = { client, rawDb } + const jobsApi = createJobMixin(context) + const instanceApi = createInstanceMixin(context) + const backupApi = createBackupMixin(context) + const invocationApi = createInvocationMixin(context, instanceApi) - return { - pingInvocation, - finalizeInvocation, - createInvocation, + const api = { + client, + knex: rawDb, adminAuthViaEmail, - getInstanceBySubdomain, - updateInstanceStatus, - updateInstance, applySchema, - updateInstances, + ...jobsApi, + ...instanceApi, + ...invocationApi, + ...backupApi, } + + return api } diff --git a/packages/daemon/src/migrate/applyDbMigrations.ts b/packages/daemon/src/migrate/applyDbMigrations.ts new file mode 100644 index 00000000..8c3e1453 --- /dev/null +++ b/packages/daemon/src/migrate/applyDbMigrations.ts @@ -0,0 +1,48 @@ +import { binFor } from '@pockethost/common' +import { + DAEMON_PB_PASSWORD, + DAEMON_PB_PORT_BASE, + DAEMON_PB_USERNAME, + PUBLIC_PB_DOMAIN, + PUBLIC_PB_PROTOCOL, + PUBLIC_PB_SUBDOMAIN, +} from '../constants' +import { createPbClient, PocketbaseClientApi } from '../db/PbClient' +import { mkInternalUrl } from '../util/internal' +import { spawnInstance } from '../util/spawnInstance' +import { tryFetch } from '../util/tryFetch' +import { schema } from './schema' + +export const applyDbMigrations = async ( + cb: (client: PocketbaseClientApi) => Promise +) => { + // Add `platform` and `bin` required columns (migrate db json) + try { + const mainProcess = await spawnInstance({ + subdomain: PUBLIC_PB_SUBDOMAIN, + slug: PUBLIC_PB_SUBDOMAIN, + port: DAEMON_PB_PORT_BASE, + bin: binFor('lollipop'), + }) + try { + const coreInternalUrl = mkInternalUrl(DAEMON_PB_PORT_BASE) + const client = createPbClient(coreInternalUrl) + await tryFetch(coreInternalUrl) + await client.adminAuthViaEmail(DAEMON_PB_USERNAME, DAEMON_PB_PASSWORD) + await client.applySchema(schema) + await cb(client) + } catch (e) { + console.error( + `***WARNING*** CANNOT AUTHENTICATE TO ${PUBLIC_PB_PROTOCOL}://${PUBLIC_PB_SUBDOMAIN}.${PUBLIC_PB_DOMAIN}/_/` + ) + console.error( + `***WARNING*** LOG IN MANUALLY, ADJUST .env, AND RESTART DOCKER` + ) + } finally { + console.log(`Exiting process`) + mainProcess.kill() + } + } catch (e) { + console.error(`${e}`) + } +} diff --git a/packages/daemon/src/migrate/migrate.ts b/packages/daemon/src/migrate/migrate.ts index 3ccfda14..5ad870f0 100644 --- a/packages/daemon/src/migrate/migrate.ts +++ b/packages/daemon/src/migrate/migrate.ts @@ -1,65 +1,44 @@ import { binFor, InstanceStatus } from '@pockethost/common' -import { chdir } from 'process' +import { renameSync } from 'fs' +import { resolve } from 'path' import { DAEMON_PB_BIN_DIR, DAEMON_PB_DATA_DIR, - DAEMON_PB_PASSWORD, - DAEMON_PB_PORT_BASE, - DAEMON_PB_USERNAME, - PUBLIC_PB_DOMAIN, - PUBLIC_PB_PROTOCOL, PUBLIC_PB_SUBDOMAIN, } from '../constants' -import { createPbClient } from '../db/PbClient' -import { mkInternalUrl } from '../util/internal' -import { tryFetch } from '../util/tryFetch' -import { _spawn } from '../util/_spawn' +import { backupInstance } from '../util/backupInstance' +import { error } from '../util/dbg' +import { applyDbMigrations } from './applyDbMigrations' import { pexec } from './pexec' -import { schema } from './schema' -const PB_BIN = `${DAEMON_PB_BIN_DIR}/${binFor('lollipop')}` -const DATA_ROOT = `${DAEMON_PB_DATA_DIR}/${PUBLIC_PB_SUBDOMAIN}` +const PB_BIN = resolve(DAEMON_PB_BIN_DIR, binFor('lollipop')) ;(async () => { - console.log(`Backing up`) - chdir(DATA_ROOT) - await pexec(`tar -czvf ${+new Date()}.tgz pb_data`) + await backupInstance( + PUBLIC_PB_SUBDOMAIN, + `${+new Date()}`, + async (progress) => { + console.log(progress) + } + ) console.log(`Upgrading`) await pexec(`${PB_BIN} upgrade --dir=pb_data`) - // Add `platform` and `bin` required columns (migrate db json) - try { - const mainProcess = await _spawn({ - subdomain: PUBLIC_PB_SUBDOMAIN, - port: DAEMON_PB_PORT_BASE, - bin: binFor('lollipop'), + await applyDbMigrations(async (client) => { + await client.updateInstances((instance) => { + const src = resolve(DAEMON_PB_DATA_DIR, instance.subdomain) + const dst = resolve(DAEMON_PB_DATA_DIR, instance.id) + try { + renameSync(src, dst) + } catch (e) { + error(`${e}`) + } + return { + status: InstanceStatus.Idle, + platform: instance.platform || 'ermine', + version: instance.version || 'latest', + } }) - try { - const coreInternalUrl = mkInternalUrl(DAEMON_PB_PORT_BASE) - const client = createPbClient(coreInternalUrl) - await tryFetch(coreInternalUrl) - await client.adminAuthViaEmail(DAEMON_PB_USERNAME, DAEMON_PB_PASSWORD) - await client.applySchema(schema) - await client.updateInstances((instance) => { - return { - status: instance.status || InstanceStatus.Idle, - platform: instance.platform || 'ermine', - version: instance.version || 'latest', - } - }) - } catch (e) { - console.error( - `***WARNING*** CANNOT AUTHENTICATE TO ${PUBLIC_PB_PROTOCOL}://${PUBLIC_PB_SUBDOMAIN}.${PUBLIC_PB_DOMAIN}/_/` - ) - console.error( - `***WARNING*** LOG IN MANUALLY, ADJUST .env, AND RESTART DOCKER` - ) - } finally { - console.log(`Exiting process`) - mainProcess.kill() - } - } catch (e) { - console.error(`${e}`) - } + }) })() diff --git a/packages/daemon/src/migrate/schema.ts b/packages/daemon/src/migrate/schema.ts index 248fbbbd..e6eec646 100644 --- a/packages/daemon/src/migrate/schema.ts +++ b/packages/daemon/src/migrate/schema.ts @@ -91,7 +91,7 @@ export const schema: Collection_Serialized[] = [ ], listRule: 'uid=@request.auth.id', viewRule: 'uid = @request.auth.id', - createRule: "uid = @request.auth.id && (status = 'idle' || status = '')", + createRule: "uid = @request.auth.id && (status = 'idle')", updateRule: null, deleteRule: null, options: {}, @@ -227,4 +227,166 @@ export const schema: Collection_Serialized[] = [ deleteRule: null, options: {}, }, + { + id: 'v7s41iokt1vizxd', + name: 'jobs', + type: 'base', + system: false, + schema: [ + { + id: 'yv38czcf', + name: 'userId', + type: 'relation', + system: false, + required: true, + unique: false, + options: { + maxSelect: 1, + collectionId: 'systemprofiles0', + cascadeDelete: false, + }, + }, + { + id: 'tgvaxwfv', + name: 'payload', + type: 'json', + system: false, + required: true, + unique: false, + options: {}, + }, + { + id: 'zede8pci', + name: 'status', + type: 'text', + system: false, + required: true, + unique: false, + options: { + min: null, + max: null, + pattern: '', + }, + }, + { + id: 'feovwsbr', + name: 'message', + type: 'text', + system: false, + required: false, + unique: false, + options: { + min: null, + max: null, + pattern: '', + }, + }, + ], + listRule: 'userId = @request.auth.id', + viewRule: 'userId = @request.auth.id', + createRule: "userId = @request.auth.id && status='new'", + updateRule: null, + deleteRule: null, + options: {}, + }, + { + id: '72clb6v41bzsay9', + name: 'backups', + type: 'base', + system: false, + schema: [ + { + id: 'someqtjw', + name: 'message', + type: 'text', + system: false, + required: false, + unique: false, + options: { + min: null, + max: null, + pattern: '', + }, + }, + { + id: 'jk4zwiaj', + name: 'instanceId', + type: 'relation', + system: false, + required: true, + unique: false, + options: { + maxSelect: 1, + collectionId: 'etae8tuiaxl6xfv', + cascadeDelete: false, + }, + }, + { + id: 'wsy3l5gm', + name: 'status', + type: 'text', + system: false, + required: true, + unique: false, + options: { + min: null, + max: null, + pattern: '', + }, + }, + { + id: 'gmkrc5d9', + name: 'bytes', + type: 'number', + system: false, + required: false, + unique: false, + options: { + min: null, + max: null, + }, + }, + { + id: '4lmammjz', + name: 'platform', + type: 'text', + system: false, + required: true, + unique: false, + options: { + min: null, + max: null, + pattern: '', + }, + }, + { + id: 'fheqxmbj', + name: 'version', + type: 'text', + system: false, + required: true, + unique: false, + options: { + min: null, + max: null, + pattern: '', + }, + }, + { + id: 'cinbmdwe', + name: 'progress', + type: 'json', + system: false, + required: false, + unique: false, + options: {}, + }, + ], + listRule: '@request.auth.id = instanceId.uid', + viewRule: null, + createRule: null, + updateRule: null, + deleteRule: null, + options: {}, + }, ] diff --git a/packages/daemon/src/server.ts b/packages/daemon/src/server.ts index 3c508329..9841cbdb 100644 --- a/packages/daemon/src/server.ts +++ b/packages/daemon/src/server.ts @@ -1,7 +1,41 @@ -import { createProxyServer } from './ProxyServer' -createProxyServer().then((api) => { +import { binFor } from '@pockethost/common' +import { DAEMON_PB_PORT_BASE, PUBLIC_PB_SUBDOMAIN } from './constants' +import { createPbClient } from './db/PbClient' +import { createBackupService } from './services/BackupService' +import { createInstanceService } from './services/InstanceService' +import { createJobService } from './services/JobService' +import { createProxyService } from './services/ProxyService' +import { mkInternalUrl } from './util/internal' +import { spawnInstance } from './util/spawnInstance' +// npm install eventsource --save +global.EventSource = require('eventsource') +;(async () => { + const coreInternalUrl = mkInternalUrl(DAEMON_PB_PORT_BASE) + + /** + * Launch central database + */ + const mainProcess = await spawnInstance({ + subdomain: PUBLIC_PB_SUBDOMAIN, + slug: PUBLIC_PB_SUBDOMAIN, + port: DAEMON_PB_PORT_BASE, + bin: binFor('lollipop'), + }) + + /** + * Launch services + */ + const client = createPbClient(coreInternalUrl) + const instanceService = await createInstanceService(client) + const proxyService = await createProxyService(instanceService) + const jobService = await createJobService(client) + const backupService = await createBackupService(client, jobService) + process.once('SIGUSR2', async () => { console.log(`SIGUSR2 detected`) - api.shutdown() + proxyService.shutdown() + instanceService.shutdown() + jobService.shutdown() + backupService.shutdown() }) -}) +})() diff --git a/packages/daemon/src/services/BackupService.ts b/packages/daemon/src/services/BackupService.ts new file mode 100644 index 00000000..a2b345b8 --- /dev/null +++ b/packages/daemon/src/services/BackupService.ts @@ -0,0 +1,105 @@ +import { + assertTruthy, + BackupStatus, + createTimerManager, + InstanceBackupJobPayload, + InstanceRestoreJobPayload, + JobCommands, +} from '@pockethost/common' +import { PocketbaseClientApi } from '../db/PbClient' +import { backupInstance } from '../util/backupInstance' +import { dbg } from '../util/dbg' +import { JobServiceApi } from './JobService' + +export const createBackupService = async ( + client: PocketbaseClientApi, + jobService: JobServiceApi +) => { + jobService.registerCommand( + JobCommands.BackupInstance, + async (unsafeJob) => { + const unsafePayload = unsafeJob.payload + const { instanceId } = unsafePayload + assertTruthy(instanceId, `Expected instanceId here`) + const instance = await client.getInstance(instanceId) + assertTruthy(instance, `Instance ${instanceId} not found`) + assertTruthy( + instance.uid === unsafeJob.userId, + `Instance ${instanceId} is not owned by user ${unsafeJob.userId}` + ) + await client.createBackup(instance.id) + } + ) + + jobService.registerCommand( + JobCommands.RestoreInstance, + async (unsafeJob) => { + const unsafePayload = unsafeJob.payload + const { backupId } = unsafePayload + assertTruthy(backupId, `Expected backupId here`) + const backup = await client.getBackupJob(backupId) + assertTruthy(backup, `Backup ${backupId} not found`) + const instance = await client.getInstance(backup.instanceId) + assertTruthy(instance, `Instance ${backup.instanceId} not found`) + assertTruthy( + instance.uid === unsafeJob.userId, + `Backup ${backupId} is not owned by user ${unsafeJob.userId}` + ) + + /** + * Restore strategy: + * + * 1. Place instance in maintenance mode + * 2. Shut down instance + * 3. Back up + * 4. Restore + * 5. Lift maintenance mode + */ + await client.createBackup(instance.id) + } + ) + + const tm = createTimerManager({}) + tm.repeat(async () => { + const backupRec = await client.getNextBackupJob() + if (!backupRec) { + dbg(`No backups requested`) + return true + } + const instance = await client.getInstance(backupRec.instanceId) + try { + await client.updateBackup(backupRec.id, { + status: BackupStatus.Running, + }) + let progress = backupRec.progress || {} + const bytes = await backupInstance( + instance.id, + backupRec.id, + (_progress) => { + progress = { ...progress, ..._progress } + dbg(_progress) + return client.updateBackup(backupRec.id, { + progress, + }) + } + ) + await client.updateBackup(backupRec.id, { + bytes, + status: BackupStatus.FinishedSuccess, + }) + } catch (e) { + await client.updateBackup(backupRec.id, { + status: BackupStatus.FinishedError, + message: `${e}`, + }) + } + return true + }, 1000) + + const shutdown = () => { + tm.shutdown() + } + return { + shutdown, + } +} diff --git a/packages/daemon/src/InstanceManager.ts b/packages/daemon/src/services/InstanceService.ts similarity index 64% rename from packages/daemon/src/InstanceManager.ts rename to packages/daemon/src/services/InstanceService.ts index 102f4f99..4f809326 100644 --- a/packages/daemon/src/InstanceManager.ts +++ b/packages/daemon/src/services/InstanceService.ts @@ -1,7 +1,14 @@ -import { assertTruthy, binFor, InstanceStatus } from '@pockethost/common' +import { + assertTruthy, + binFor, + createTimerManager, + InstanceId, + InstanceStatus, +} from '@pockethost/common' import { forEachRight, map } from '@s-libs/micro-dash' import Bottleneck from 'bottleneck' import getPort from 'get-port' +import { AsyncReturnType } from 'type-fest' import { DAEMON_PB_IDLE_TTL, DAEMON_PB_PASSWORD, @@ -12,16 +19,15 @@ import { PUBLIC_PB_DOMAIN, PUBLIC_PB_PROTOCOL, PUBLIC_PB_SUBDOMAIN, -} from './constants' -import { createPbClient } from './db/PbClient' -import { dbg, error } from './util/dbg' -import { mkInternalUrl } from './util/internal' -import { now } from './util/now' -import { safeCatch } from './util/safeAsync' -import { tryFetch } from './util/tryFetch' -import { PocketbaseProcess, _spawn } from './util/_spawn' +} from '../constants' +import { PocketbaseClientApi } from '../db/PbClient' +import { dbg, error } from '../util/dbg' +import { mkInternalUrl } from '../util/internal' +import { now } from '../util/now' +import { safeCatch } from '../util/safeAsync' +import { PocketbaseProcess, spawnInstance } from '../util/spawnInstance' -type Instance = { +type InstanceApi = { process: PocketbaseProcess internalUrl: string port: number @@ -29,27 +35,10 @@ type Instance = { startRequest: () => () => void } -export const createInstanceManger = async () => { - const instances: { [_: string]: Instance } = {} +export type InstanceServiceApi = AsyncReturnType +export const createInstanceService = async (client: PocketbaseClientApi) => { + const instances: { [_: string]: InstanceApi } = {} - const coreInternalUrl = mkInternalUrl(DAEMON_PB_PORT_BASE) - const client = createPbClient(coreInternalUrl) - const mainProcess = await _spawn({ - subdomain: PUBLIC_PB_SUBDOMAIN, - port: DAEMON_PB_PORT_BASE, - bin: binFor('lollipop'), - }) - instances[PUBLIC_PB_SUBDOMAIN] = { - process: mainProcess, - internalUrl: coreInternalUrl, - port: DAEMON_PB_PORT_BASE, - shutdown: async () => { - dbg(`Shutting down instance ${PUBLIC_PB_SUBDOMAIN}`) - mainProcess.kill() - }, - startRequest: () => () => {}, - } - await tryFetch(coreInternalUrl) try { await client.adminAuthViaEmail(DAEMON_PB_USERNAME, DAEMON_PB_PASSWORD) } catch (e) { @@ -100,8 +89,9 @@ export const createInstanceManger = async () => { await client.updateInstanceStatus(instance.id, InstanceStatus.Starting) - const childProcess = await _spawn({ + const childProcess = await spawnInstance({ subdomain, + slug: instance.id, port: newPort, bin: binFor(instance.platform, instance.version), onUnexpectedStop: (code) => { @@ -113,30 +103,28 @@ export const createInstanceManger = async () => { assertTruthy(pid, `Expected PID here but got ${pid}`) const invocation = await client.createInvocation(instance, pid) - const api: Instance = (() => { + const tm = createTimerManager({}) + const api: InstanceApi = (() => { let openRequestCount = 0 let lastRequest = now() - let tid: ReturnType const internalUrl = mkInternalUrl(newPort) const RECHECK_TTL = 1000 // 1 second - const _api: Instance = { + const _api: InstanceApi = { process: childProcess, internalUrl, port: newPort, shutdown: safeCatch( `Instance ${subdomain} invocation ${invocation.id} pid ${pid} shutdown`, async () => { - clearTimeout(tid) + tm.shutdown() await client.finalizeInvocation(invocation) const res = childProcess.kill() delete instances[subdomain] - if (subdomain !== PUBLIC_PB_SUBDOMAIN) { - await client.updateInstanceStatus( - instance.id, - InstanceStatus.Idle - ) - } + await client.updateInstanceStatus( + instance.id, + InstanceStatus.Idle + ) assertTruthy( res, `Expected child process to exit gracefully but got ${res}` @@ -156,24 +144,36 @@ export const createInstanceManger = async () => { } { - /** - * Heartbeat and idle shutdown - */ - const _beat = async () => { - dbg(`${subdomain} heartbeat: ${openRequestCount} open requests`) - await client.pingInvocation(invocation) - if ( - openRequestCount === 0 && - lastRequest + DAEMON_PB_IDLE_TTL < now() - ) { - dbg(`${subdomain} idle for ${DAEMON_PB_IDLE_TTL}, shutting down`) - await _api.shutdown() - } else { - dbg(`${openRequestCount} requests remain open on ${subdomain}`) - tid = setTimeout(_beat, RECHECK_TTL) - } - } - _beat() + tm.repeat( + safeCatch(`idleCheck`, async () => { + dbg(`${subdomain} idle check: ${openRequestCount} open requests`) + if ( + openRequestCount === 0 && + lastRequest + DAEMON_PB_IDLE_TTL < now() + ) { + dbg( + `${subdomain} idle for ${DAEMON_PB_IDLE_TTL}, shutting down` + ) + await _api.shutdown() + return false + } else { + dbg(`${openRequestCount} requests remain open on ${subdomain}`) + } + return true + }), + RECHECK_TTL + ) + } + + { + tm.repeat( + safeCatch(`uptime`, async () => { + dbg(`${subdomain} uptime`) + await client.pingInvocation(invocation) + return true + }), + 1000 + ) } return _api @@ -191,5 +191,7 @@ export const createInstanceManger = async () => { instance.shutdown() }) } - return { getInstance, shutdown } + + const maintenance = async (instanceId: InstanceId) => {} + return { getInstance, shutdown, maintenance } } diff --git a/packages/daemon/src/services/JobService.ts b/packages/daemon/src/services/JobService.ts new file mode 100644 index 00000000..15b91278 --- /dev/null +++ b/packages/daemon/src/services/JobService.ts @@ -0,0 +1,84 @@ +import { + assertTruthy, + JobCommands, + JobPayloadBase, + JobRecord, + JobStatus, +} from '@pockethost/common' +import { isObject } from '@s-libs/micro-dash' +import Bottleneck from 'bottleneck' +import { default as knexFactory } from 'knex' +import pocketbaseEs from 'pocketbase' +import { AsyncReturnType } from 'type-fest' +import { PocketbaseClientApi } from '../db/PbClient' +import { error } from '../util/dbg' + +export type JobServiceApi = AsyncReturnType + +export type KnexApi = ReturnType +export type CommandModuleInitializer = ( + register: JobServiceApi['registerCommand'], + client: pocketbaseEs, + knex: KnexApi +) => void + +export type JobHandler = ( + unsafeJob: JobRecord> +) => Promise + +export const createJobService = async (client: PocketbaseClientApi) => { + const limiter = new Bottleneck({ maxConcurrent: 1 }) + + const jobHandlers: { + [_ in JobCommands]?: JobHandler + } = {} + + const run = async (job: JobRecord) => + limiter.schedule(async () => { + try { + await client.setJobStatus(job, JobStatus.Queued) + const { payload } = job + assertTruthy(isObject(payload), `Payload must be an object`) + const unsafePayload = payload as Partial + const { cmd } = unsafePayload + assertTruthy(cmd, `Payload must contain command`) + const handler = jobHandlers[cmd] + if (!handler) { + throw new Error(`Job handler ${cmd} is not registered`) + } + console.log(`Running job ${job.id}`, job) + await client.setJobStatus(job, JobStatus.Running) + await handler(job) + await client.setJobStatus(job, JobStatus.FinishedSuccess) + } catch (e) { + await client.rejectJob(job, `${e}`).catch((e) => { + error(`job ${job.id} failed to reject with ${e}`) + }) + } + }) + + const unsub = await client.onNewJob(run) + await client.resetJobs() + await client.resetBackups() + const jobs = await client.incompleteJobs() + jobs.forEach(run) + + const shutdown = () => { + unsub() + } + + const registerCommand = ( + commandName: JobCommands, + handler: JobHandler + ) => { + if (jobHandlers[commandName]) { + throw new Error(`${commandName} job handler already registered.`) + } + jobHandlers[commandName] = handler + } + + return { + registerCommand, + shutdown, + } +} diff --git a/packages/daemon/src/ProxyServer.ts b/packages/daemon/src/services/ProxyService.ts similarity index 64% rename from packages/daemon/src/ProxyServer.ts rename to packages/daemon/src/services/ProxyService.ts index 73225ecb..1809a1c9 100644 --- a/packages/daemon/src/ProxyServer.ts +++ b/packages/daemon/src/services/ProxyService.ts @@ -1,12 +1,21 @@ import { createServer } from 'http' import httpProxy from 'http-proxy' -import { PUBLIC_APP_DOMAIN, PUBLIC_APP_PROTOCOL } from './constants' -import { createInstanceManger } from './InstanceManager' -import { dbg, info } from './util/dbg' +import { AsyncReturnType } from 'type-fest' +import { + DAEMON_PB_PORT_BASE, + PUBLIC_APP_DOMAIN, + PUBLIC_APP_PROTOCOL, + PUBLIC_PB_SUBDOMAIN, +} from '../constants' +import { dbg, info } from '../util/dbg' +import { mkInternalUrl } from '../util/internal' +import { InstanceServiceApi } from './InstanceService' -export const createProxyServer = async () => { - const instanceManager = await createInstanceManger() +export type ProxyServiceApi = AsyncReturnType +export const createProxyService = async ( + instanceManager: InstanceServiceApi +) => { const proxy = httpProxy.createProxyServer({}) const server = createServer(async (req, res) => { @@ -29,6 +38,13 @@ export const createProxyServer = async () => { throw new Error(`${host} has no subdomain.`) } try { + if (subdomain === PUBLIC_PB_SUBDOMAIN) { + const target = mkInternalUrl(DAEMON_PB_PORT_BASE) + dbg(`Forwarding proxy request for ${req.url} to instance ${target}`) + proxy.web(req, res, { target }) + return + } + const instance = await instanceManager.getInstance(subdomain) if (!instance) { throw new Error( @@ -36,9 +52,14 @@ export const createProxyServer = async () => { ) } + if (req.closed) { + throw new Error(`Request already closed.`) + } + dbg( `Forwarding proxy request for ${req.url} to instance ${instance.internalUrl}` ) + const endRequest = instance.startRequest() req.on('close', endRequest) proxy.web(req, res, { target: instance.internalUrl }) diff --git a/packages/daemon/src/util/backupInstance.ts b/packages/daemon/src/util/backupInstance.ts new file mode 100644 index 00000000..d517e5c3 --- /dev/null +++ b/packages/daemon/src/util/backupInstance.ts @@ -0,0 +1,109 @@ +import { BackupRecordId, InstanceId } from '@pockethost/common' +import { statSync } from 'fs' +import { basename, resolve } from 'path' +import { chdir, cwd } from 'process' +import { Database } from 'sqlite3' +import tmp from 'tmp' +import { DAEMON_PB_DATA_DIR } from '../constants' +import { pexec } from '../migrate/pexec' +import { dbg } from './dbg' +import { ensureDirExists } from './ensureDirExists' + +export type BackupProgress = { + current: number + total: number +} + +export type ProgressInfo = { + [src: string]: number +} +export type ProgressCallback = (info: ProgressInfo) => Promise + +export const PB_DATA_DIR = `pb_data` + +export const execBackup = ( + src: string, + dst: string, + progress?: ProgressCallback +) => { + const db = new Database(src) + const backup = db.backup(dst) + return new Promise((resolve, reject) => { + const _work = async () => { + if (backup.failed) { + reject() + return + } + if (backup.completed) { + backup.finish() + await progress?.({ + [basename(src)]: 1, + }) + resolve() + return + } + const pct = + backup.remaining === -1 ? 0 : 1 - backup.remaining / backup.pageCount + dbg(pct, backup.completed, backup.failed) + await progress?.({ + [basename(src)]: pct, + }) + if (backup.idle) { + await new Promise((resolve) => { + backup.step(5, () => resolve()) + }) + } + setTimeout(_work, 10) + } + _work() + }) +} + +export const backupInstance = async ( + instanceId: InstanceId, + backupId: BackupRecordId, + progress?: ProgressCallback +) => { + const dataRoot = resolve(DAEMON_PB_DATA_DIR, instanceId) + const backupTgzRoot = resolve(dataRoot, 'backup') + const backupTgzFile = resolve(backupTgzRoot, `${backupId}.tgz`) + const tmpObj = tmp.dirSync({ + unsafeCleanup: true, + }) + const backupTmpTargetRoot = resolve(tmpObj.name) + console.log({ + instanceId, + dataRoot, + backupTgzRoot, + backupTgzFile, + backupTmpTargetRoot, + }) + const _cwd = cwd() + try { + dbg(`Backing up ${dataRoot}`) + chdir(dataRoot) + ensureDirExists(backupTgzRoot) + ensureDirExists(resolve(backupTmpTargetRoot, PB_DATA_DIR)) + await Promise.all([ + execBackup( + `pb_data/data.db`, + resolve(backupTmpTargetRoot, PB_DATA_DIR, `data.db`), + progress + ), + execBackup( + `pb_data/logs.db`, + resolve(backupTmpTargetRoot, PB_DATA_DIR, `logs.db`), + progress + ), + ]) + chdir(backupTmpTargetRoot) + await pexec(`tar -czvf ${backupTgzFile} ${PB_DATA_DIR}`) + const stats = statSync(backupTgzFile) + const bytes = stats.size + return bytes + } finally { + console.log(`Removing again ${backupTmpTargetRoot}`) + tmpObj.removeCallback() + chdir(_cwd) + } +} diff --git a/packages/daemon/src/util/ensureDirExists.ts b/packages/daemon/src/util/ensureDirExists.ts new file mode 100644 index 00000000..898ecd59 --- /dev/null +++ b/packages/daemon/src/util/ensureDirExists.ts @@ -0,0 +1,10 @@ +import { mkdirSync } from 'fs' +import { dbg } from './dbg' + +export const ensureDirExists = (path: string) => { + try { + mkdirSync(path) + } catch (e) { + dbg(`${e}`) + } +} diff --git a/packages/daemon/src/util/safeAsync.ts b/packages/daemon/src/util/safeAsync.ts index 44d8a1fc..df858b53 100644 --- a/packages/daemon/src/util/safeAsync.ts +++ b/packages/daemon/src/util/safeAsync.ts @@ -1,6 +1,6 @@ import Bottleneck from 'bottleneck' import { ClientResponseError } from 'pocketbase' -import { dbg } from './dbg' +import { dbg, error } from './dbg' const limiter = new Bottleneck({ maxConcurrent: 1 }) @@ -35,30 +35,31 @@ export const safeCatch = ( const uuid = `${name}:${_c}` dbg(uuid, ...args) const tid = setTimeout(() => { - console.error(`ERROR: timeout waiting for ${uuid}`) + dbg(uuid, `WARNING: timeout waiting for ${uuid}`) }, 100) inside = uuid return cb(...args) .then((res) => { - dbg(`${name}:${_c} finished`) + dbg(uuid, `finished`) inside = '' clearTimeout(tid) return res }) .catch((e: any) => { if (e instanceof ClientResponseError) { - console.error(`PocketBase API error ${e}`) - console.error(JSON.stringify(e.data, null, 2)) + error(uuid, `PocketBase API error ${e}`) + error(uuid, JSON.stringify(e.data, null, 2)) if (e.status === 400) { - console.error( + error( + uuid, `It looks like you don't have permission to make this request.` ) } } else { - console.error(`${name} failed: ${e}`) + error(uuid, `failed: ${e}`) } - console.error(e) + error(uuid, e) throw e }) } diff --git a/packages/daemon/src/util/_spawn.ts b/packages/daemon/src/util/spawnInstance.ts similarity index 83% rename from packages/daemon/src/util/_spawn.ts rename to packages/daemon/src/util/spawnInstance.ts index fffac840..5ed06741 100644 --- a/packages/daemon/src/util/_spawn.ts +++ b/packages/daemon/src/util/spawnInstance.ts @@ -5,15 +5,18 @@ import { DAEMON_PB_BIN_DIR, DAEMON_PB_DATA_DIR } from '../constants' import { dbg } from './dbg' import { mkInternalAddress, mkInternalUrl } from './internal' import { tryFetch } from './tryFetch' -export type PocketbaseProcess = AsyncReturnType +export type PocketbaseProcess = AsyncReturnType -export const _spawn = async (cfg: { +export type Config = { subdomain: string + slug: string port: number bin: string onUnexpectedStop?: (code: number | null) => void -}) => { - const { subdomain, port, bin, onUnexpectedStop } = cfg +} + +export const spawnInstance = async (cfg: Config) => { + const { subdomain, port, bin, onUnexpectedStop, slug } = cfg const cmd = `${DAEMON_PB_BIN_DIR}/${bin}` if (!existsSync(cmd)) { throw new Error( @@ -24,7 +27,7 @@ export const _spawn = async (cfg: { const args = [ `serve`, `--dir`, - `${DAEMON_PB_DATA_DIR}/${subdomain}/pb_data`, + `${DAEMON_PB_DATA_DIR}/${slug}/pb_data`, `--http`, mkInternalAddress(port), ] diff --git a/packages/daemon/src/util/tryFetch.ts b/packages/daemon/src/util/tryFetch.ts index a7f66050..3064efec 100644 --- a/packages/daemon/src/util/tryFetch.ts +++ b/packages/daemon/src/util/tryFetch.ts @@ -1,4 +1,3 @@ -import fetch from 'node-fetch' import { dbg, error } from './dbg' export const tryFetch = (url: string) => diff --git a/packages/pockethost.io/.env-template b/packages/pockethost.io/.env-template index f4cd1765..8e8cb206 100644 --- a/packages/pockethost.io/.env-template +++ b/packages/pockethost.io/.env-template @@ -3,4 +3,4 @@ PUBLIC_APP_DOMAIN = localhost PUBLIC_PB_PROTOCOL=https PUBLIC_PB_SUBDOMAIN = pockethost-central PUBLIC_PB_DOMAIN = pockethost.io -PUBLIC_POCKETHOST_VERSION=0.4.1 \ No newline at end of file +PUBLIC_POCKETHOST_VERSION=0.5.0 \ No newline at end of file diff --git a/packages/pockethost.io/package.json b/packages/pockethost.io/package.json index ac93fb92..198a0b7a 100644 --- a/packages/pockethost.io/package.json +++ b/packages/pockethost.io/package.json @@ -21,17 +21,20 @@ "svelte-check": "^2.7.1", "svelte-preprocess": "^4.10.6", "tslib": "^2.3.1", - "typescript": "^4.7.4", + "typescript": "^4.8.0", "vite": "^3.1.0" }, "type": "module", "dependencies": { "@pockethost/common": "0.0.1", - "@s-libs/micro-dash": "12", + "@s-libs/micro-dash": "^14.1.0", "@types/bootstrap": "^5.2.6", "@types/js-cookie": "^3.0.2", + "async-mutex": "^0.4.0", + "date-fns": "^2.29.3", "js-cookie": "^3.0.1", "pocketbase": "^0.8.0-rc1", + "pretty-bytes": "^6.0.0", "random-word-slugs": "^0.1.6", "sass": "^1.54.9", "svelte-highlight": "^6.2.1" diff --git a/packages/pockethost.io/src/components/Navbar.svelte b/packages/pockethost.io/src/components/Navbar.svelte index 7277f41b..c5ff6aed 100644 --- a/packages/pockethost.io/src/components/Navbar.svelte +++ b/packages/pockethost.io/src/components/Navbar.svelte @@ -1,10 +1,10 @@
@@ -85,7 +85,7 @@ href="https://github.com/benallfree/pockethost/discussions" class="nav-link btn btn-outline-dark rounded-1 d-inline-block px-3" target="_blank" - rel="noreferrer">DiscussionsSupport diff --git a/packages/pockethost.io/src/pocketbase/PocketbaseClient.ts b/packages/pockethost.io/src/pocketbase/PocketbaseClient.ts index 50d38e84..cb10ae0e 100644 --- a/packages/pockethost.io/src/pocketbase/PocketbaseClient.ts +++ b/packages/pockethost.io/src/pocketbase/PocketbaseClient.ts @@ -1,15 +1,28 @@ import { createGenericSyncEvent } from '$util/events' import { assertExists, - createRealtimeSubscriptionManager, + JobCommands, + JobStatus, + type BackupRecord, + type BackupRecordId, + type InstanceBackupJobPayload, + type InstanceBackupJobRecord, type InstanceId, + type InstanceRestoreJobPayload, type InstancesRecord, type InstancesRecord_New, - type RealtimeEventHandler, + type JobRecord, + type JobRecord_In, type UserRecord } from '@pockethost/common' import { keys, map } from '@s-libs/micro-dash' -import PocketBase, { Admin, BaseAuthStore, ClientResponseError, Record } from 'pocketbase' +import PocketBase, { + Admin, + BaseAuthStore, + ClientResponseError, + Record, + type RecordSubscription +} from 'pocketbase' import type { Unsubscriber } from 'svelte/store' import { safeCatch } from '../util/safeCatch' @@ -26,6 +39,10 @@ export type PocketbaseClientApi = ReturnType export const createPocketbaseClient = (url: string) => { const client = new PocketBase(url) + client.beforeSend = (url, reqConfig) => { + delete reqConfig.signal + return reqConfig + } const { authStore } = client @@ -99,18 +116,32 @@ export const createPocketbaseClient = (url: string) => { client.collection('instances').getOne(id) ) - const { subscribeOne } = createRealtimeSubscriptionManager(client) - - const watchInstanceById = ( + const watchInstanceById = async ( id: InstanceId, - cb: RealtimeEventHandler - ): Unsubscriber => { + cb: (data: RecordSubscription) => void + ): Promise => { getInstanceById(id).then((record) => { // console.log(`Got instnace`, record) assertExists(record, `Expected instance ${id} here`) cb({ action: 'init', record }) }) - return subscribeOne('instances', id, cb) + return client.collection('instances').subscribe(id, cb) + } + + const watchBackupsByInstanceId = async ( + id: InstanceId, + cb: (data: RecordSubscription) => void + ): Promise => { + const unsub = client.collection('backups').subscribe('*', (e) => { + // console.log(e.record.instanceId, id) + if (e.record.instanceId !== id) return + cb(e) + }) + const existingBackups = await client + .collection('backups') + .getFullList(100, { filter: `instanceId = '${id}'` }) + existingBackups.forEach((record) => cb({ action: 'init', record })) + return unsub } const getAllInstancesById = safeCatch(`getAllInstancesById`, async () => @@ -142,7 +173,7 @@ export const createPocketbaseClient = (url: string) => { const getAuthStoreProps = (): AuthStoreProps => { const { token, model, isValid } = client.authStore as AuthStoreProps - // console.log(`curent authstore`, { token, model, isValid }) + // console.log(`current authStore`, { token, model, isValid }) if (model instanceof Admin) throw new Error(`Admin models not supported`) if (model && !model.email) throw new Error(`Expected model to be a user here`) return { @@ -211,6 +242,49 @@ export const createPocketbaseClient = (url: string) => { }) } + const createInstanceBackupJob = safeCatch( + `createInstanceBackupJob`, + async (instanceId: InstanceId) => { + const _user = user() + assertExists(_user, `Expected user to exist here`) + const { id: userId } = _user + const job: JobRecord_In = { + userId, + status: JobStatus.New, + payload: { + cmd: JobCommands.BackupInstance, + instanceId + } + } + const rec = await client.collection('jobs').create(job) + return rec + } + ) + + const createInstanceRestoreJob = safeCatch( + `createInstanceRestoreJob`, + async (backupId: BackupRecordId) => { + const _user = user() + assertExists(_user, `Expected user to exist here`) + const { id: userId } = _user + const job: JobRecord_In = { + userId, + status: JobStatus.New, + payload: { + cmd: JobCommands.RestoreInstance, + backupId + } + } + const rec = await client.collection('jobs').create>(job) + return rec + } + ) + + const [onJobUpdated, fireJobUpdated] = + createGenericSyncEvent>>() + + client.collection('jobs').subscribe>('*', fireJobUpdated) + return { getAuthStoreProps, parseError, @@ -227,6 +301,10 @@ export const createPocketbaseClient = (url: string) => { user, watchInstanceById, getAllInstancesById, - resendVerificationEmail + resendVerificationEmail, + watchBackupsByInstanceId, + onJobUpdated, + createInstanceBackupJob, + createInstanceRestoreJob } } diff --git a/packages/pockethost.io/src/routes/app/instances/[instanceId]/+layout.svelte b/packages/pockethost.io/src/routes/app/instances/[instanceId]/+layout.svelte new file mode 100644 index 00000000..c9ec0d1b --- /dev/null +++ b/packages/pockethost.io/src/routes/app/instances/[instanceId]/+layout.svelte @@ -0,0 +1,37 @@ + + + +
+ {#if $instance} +

{$instance.subdomain}

+ + + {/if} +
+
diff --git a/packages/pockethost.io/src/routes/app/instances/[instanceId]/+page.svelte b/packages/pockethost.io/src/routes/app/instances/[instanceId]/+page.svelte deleted file mode 100644 index 4126af45..00000000 --- a/packages/pockethost.io/src/routes/app/instances/[instanceId]/+page.svelte +++ /dev/null @@ -1,69 +0,0 @@ - - - - Your Instance - PocketHost - - -
- {#if instance} -
-
-

Admin URL

- -
- -

{`${url}/_`}

-
- -
- JavaScript: - -
-
- Running {instance.platform} - {humanVersion(instance.platform, instance.version)} -
- {/if} - - -
- - diff --git a/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/+page.svelte b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/+page.svelte new file mode 100644 index 00000000..2f0b2b50 --- /dev/null +++ b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/+page.svelte @@ -0,0 +1,22 @@ + + + + {subdomain} details - PocketHost + + + + + + diff --git a/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Backup.svelte b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Backup.svelte new file mode 100644 index 00000000..21106cbc --- /dev/null +++ b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Backup.svelte @@ -0,0 +1,108 @@ + + +
+

Backup

+ +
+ +
+ +
+ {#each $backups as { id, bytes, updated, platform, version, status, message, progress }} +
+ {#if status === BackupStatus.FinishedSuccess} +
+ {platform}:{version} ({prettyBytes(bytes)}) - Finished {new Date(updated)} +
+ {/if} + {#if status === BackupStatus.FinishedError} +
+ {platform}:{version} - Finished {new Date(updated)} + +
+ {/if} + {#if status !== BackupStatus.FinishedError && status !== BackupStatus.FinishedSuccess} +
+ {platform}:{version} + {status} + {#each Object.entries(progress || {}) as [src, pct]} +
+ {src} + + {Math.ceil(pct * 100)}% + +
+ {/each} + Started {formatDistanceToNow(Date.parse(updated))} ago +
+ {/if} +
+ {/each} +
+
+ + diff --git a/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Code.svelte b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Code.svelte new file mode 100644 index 00000000..70e0d10d --- /dev/null +++ b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Code.svelte @@ -0,0 +1,18 @@ + + +
+

Code Samples

+
+ JavaScript: + +
+
diff --git a/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Overview.svelte b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Overview.svelte new file mode 100644 index 00000000..47c43f89 --- /dev/null +++ b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Overview.svelte @@ -0,0 +1,22 @@ + + +
+

Overview

+ +
+ Running {platform} + {humanVersion(platform, version)} +
+
+ Admin URL: {`${url}/_`} +
+
diff --git a/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Restore.svelte b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Restore.svelte new file mode 100644 index 00000000..9d6163c6 --- /dev/null +++ b/packages/pockethost.io/src/routes/app/instances/[instanceId]/details/Restore.svelte @@ -0,0 +1,90 @@ + + +
+

Restore

+ + {#if PUBLIC_APP_DOMAIN.toString().endsWith('.io')} + Contact support to perform a restore. + {/if} + {#if PUBLIC_APP_DOMAIN.toString().endsWith('.test')} + {#if $backups.length === 0} + You must create a backup first. + {/if} + {#if $backups.length > 0} + + +
+
+ Notice: Your instance will be placed in maintenance mode and then backed up before + restoring the selected snapshot. +
+ +
+ {/if} + {/if} +
+ + diff --git a/packages/pockethost.io/src/routes/app/instances/[instanceId]/store.ts b/packages/pockethost.io/src/routes/app/instances/[instanceId]/store.ts new file mode 100644 index 00000000..bdb650b5 --- /dev/null +++ b/packages/pockethost.io/src/routes/app/instances/[instanceId]/store.ts @@ -0,0 +1,4 @@ +import type { InstancesRecord } from '@pockethost/common' +import { writable } from 'svelte/store' + +export const instance = writable() diff --git a/packages/pockethost.io/src/routes/dashboard/+page.svelte b/packages/pockethost.io/src/routes/dashboard/+page.svelte index 77cb7530..eea5acfc 100644 --- a/packages/pockethost.io/src/routes/dashboard/+page.svelte +++ b/packages/pockethost.io/src/routes/dashboard/+page.svelte @@ -1,10 +1,10 @@