enh(pockethost): mothership mirror

This commit is contained in:
Ben Allfree 2025-07-21 08:49:00 -07:00
parent a9067b1a88
commit fd4b8339dc
3 changed files with 156 additions and 63 deletions

View File

@ -15,6 +15,7 @@ import {
} from '@'
import Dockerode from 'dockerode'
import { ErrorRequestHandler } from 'express'
import { MothershipMirrorService } from 'src/services/MothershipMirrorService'
export async function daemon() {
const { info, warn } = logger()
@ -53,6 +54,8 @@ export async function daemon() {
password: MOTHERSHIP_ADMIN_PASSWORD(),
})
await MothershipMirrorService({ client: (await MothershipAdminClientService()).client.client })
await proxyService({
coreInternalUrl: MOTHERSHIP_URL(),
})

View File

@ -1,9 +1,7 @@
import {
APP_URL,
ClientResponseError,
DAEMON_PB_IDLE_TTL,
DOC_URL,
EDGE_APEX_DOMAIN,
INSTANCE_APP_HOOK_DIR,
INSTANCE_APP_MIGRATIONS_DIR,
InstanceFields,
@ -29,7 +27,7 @@ import Bottleneck from 'bottleneck'
import { globSync } from 'glob'
import { basename, join } from 'path'
import { AsyncReturnType } from 'type-fest'
import { mkInstanceCache } from './mkInstanceCache'
import { MothershipMirrorService } from '../MothershipMirrorService'
enum InstanceApiStatus {
Starting = 'starting',
@ -233,64 +231,7 @@ export const instanceService = mkSingleton(async (config: InstanceServiceConfig)
}
}
const getInstanceRecord = (() => {
const cache = mkInstanceCache(client.client)
return async (host: string) => {
if (cache.hasItem(host)) {
dbg(`cache hit ${host}`)
return cache.getItem(host)
}
dbg(`cache miss ${host}`)
{
dbg(`Trying to get instance by host: ${host}`)
const instance = await client
.getInstanceByCname(host)
.catch((e: ClientResponseError) => {
if (e.status !== 404) {
throw new Error(`Unexpected response ${stringify(e)} from mothership`)
}
})
if (instance) {
dbg(`${host} is a cname`)
cache.setItem(instance)
return instance
}
}
const idOrSubdomain = host.replace(`.${EDGE_APEX_DOMAIN()}`, '')
{
dbg(`Trying to get instance by ID: ${idOrSubdomain}`)
const instance = await client.getInstanceById(idOrSubdomain).catch((e: ClientResponseError) => {
if (e.status !== 404) {
throw new Error(`Unexpected response ${stringify(e)} from mothership`)
}
})
if (instance) {
dbg(`${idOrSubdomain} is an instance ID`)
cache.setItem(instance)
return instance
}
}
{
dbg(`Trying to get instance by subdomain: ${idOrSubdomain}`)
const instance = await client.getInstanceBySubdomain(idOrSubdomain).catch((e: ClientResponseError) => {
if (e.status !== 404) {
throw new Error(`Unexpected response ${stringify(e)} from mothership`)
}
})
if (instance) {
dbg(`${idOrSubdomain} is a subdomain`)
cache.setItem(instance)
return instance
}
}
dbg(`${host} is none of: cname, id, subdomain`)
cache.blankItem(host)
}
})()
const mirror = await MothershipMirrorService()
;(await proxyService()).use(async (req, res, next) => {
const logger = LoggerService().create(`InstanceRequest`)
@ -299,12 +240,12 @@ export const instanceService = mkSingleton(async (config: InstanceServiceConfig)
const { host, proxy } = res.locals
const instance = await getInstanceRecord(host)
const instance = await mirror.getInstanceByHost(host)
if (!instance) {
res.status(404).end(`${host} not found`)
return
}
const owner = instance.expand.uid
const owner = await mirror.getUser(instance.uid)
if (!owner) {
throw new Error(`Instance owner is invalid`)
}

View File

@ -0,0 +1,149 @@
import {
EDGE_APEX_DOMAIN,
InstanceFields,
InstanceId,
LoggerService,
mkSingleton,
PocketBase,
SingletonBaseConfig,
UserFields,
UserId,
} from '@'
import { forEach } from '@s-libs/micro-dash'
export type MothershipMirrorServiceConfig = SingletonBaseConfig & {
client: PocketBase
}
export const MothershipMirrorService = mkSingleton(async (config: MothershipMirrorServiceConfig) => {
const { dbg, error } = LoggerService().create(`MothershipMirrorService`)
const client = config.client
const mirror: {
users: { [_: UserId]: UserFields }
instancesById: { [_: InstanceId]: InstanceFields }
instancesByCanonicalId: { [_: string]: InstanceFields }
instancesByCanonicalSubdomain: { [_: string]: InstanceFields }
instancesBySubdomain: { [_: string]: InstanceFields }
instancesByCname: { [_: string]: InstanceFields }
} = {
users: {},
instancesById: {},
instancesByCanonicalId: {},
instancesByCanonicalSubdomain: {},
instancesBySubdomain: {},
instancesByCname: {},
}
const upsertInstance = (record: InstanceFields) => {
dbg(`upsertInstance`, { record })
deleteInstance(record.id)
const canonicalId = `${record.id}.${EDGE_APEX_DOMAIN()}`
const canonicalSubdomain = `${record.subdomain}.${EDGE_APEX_DOMAIN()}`
mirror.instancesById[record.id] = record
mirror.instancesBySubdomain[record.subdomain] = record
mirror.instancesByCanonicalId[canonicalId] = record
mirror.instancesByCanonicalSubdomain[canonicalSubdomain] = record
if (record.cname) {
mirror.instancesByCname[record.cname] = record
}
}
const deleteInstance = (id: InstanceId) => {
const oldInstance = mirror.instancesById[id]
if (oldInstance) {
const canonicalId = `${oldInstance.id}.${EDGE_APEX_DOMAIN()}`
const canonicalSubdomain = `${oldInstance.subdomain}.${EDGE_APEX_DOMAIN()}`
delete mirror.instancesById[canonicalId]
delete mirror.instancesBySubdomain[canonicalSubdomain]
delete mirror.instancesByCname[oldInstance.cname]
delete mirror.instancesByCanonicalId[canonicalId]
delete mirror.instancesByCanonicalSubdomain[canonicalSubdomain]
}
}
const upsertUser = (record: UserFields) => {
dbg(`upsertUser`, { record })
deleteUser(record.id)
mirror.users[record.id] = record
}
const deleteUser = (id: UserId) => {
const oldUser = mirror.users[id]
if (oldUser) {
delete mirror.users[oldUser.id]
}
}
client.collection(`instances`).subscribe<InstanceFields>(`*`, (e) => {
const { action, record } = e
if (action === `create` || action === `update`) {
dbg(`instance`, { action, record })
upsertInstance(record)
}
if (action === `delete`) {
dbg(`instance`, { action, record })
deleteInstance(record.id)
}
})
client.collection(`users`).subscribe<UserFields>(`*`, (e) => {
const deleteUser = (id: UserId) => {
const oldUser = mirror.users[id]
if (oldUser) {
delete mirror.users[oldUser.id]
}
}
const { action, record } = e
if (action === `create` || action === `update`) {
dbg(`user`, { action, record })
upsertUser(record)
}
if (action === `delete`) {
dbg(`user`, { action, record })
deleteUser(record.id)
}
})
const init = async () => {
dbg(`init`)
const instancesPromise = client
.collection(`instances`)
.getFullList<InstanceFields>()
.then((instances) => {
dbg(`instances: ${instances.length}`)
forEach(instances, (instance) => {
upsertInstance(instance)
})
})
const usersPromise = client
.collection(`users`)
.getFullList<UserFields>()
.then((users) => {
dbg(`users: ${users.length}`)
forEach(users, (user) => {
upsertUser(user)
})
})
await Promise.all([instancesPromise, usersPromise])
}
await init().catch(error)
const api = {
async getInstance(id: InstanceId) {
return mirror.instancesById[id]
},
async getInstanceByHost(host: string) {
return (
mirror.instancesByCname[host] ||
mirror.instancesByCanonicalSubdomain[host] ||
mirror.instancesByCanonicalId[host]
)
},
async getUser(id: UserId) {
return mirror.users[id]
},
}
return api
})