mirror of
https://github.com/pockethost/pockethost.git
synced 2025-03-30 15:08:30 +00:00
feat(pockethost): edge mirror
This commit is contained in:
parent
c2982d9837
commit
7bfc99af22
@ -4,6 +4,10 @@ module.exports = {
|
||||
name: `firewall`,
|
||||
script: 'pnpm prod:cli firewall serve',
|
||||
},
|
||||
{
|
||||
name: `edge-mirror`,
|
||||
script: 'pnpm prod:cli edge mirror serve',
|
||||
},
|
||||
{
|
||||
name: `edge-daemon`,
|
||||
script: 'pnpm prod:cli edge daemon serve',
|
||||
|
@ -0,0 +1,123 @@
|
||||
import { forEach, reduce } from '@s-libs/micro-dash'
|
||||
import {
|
||||
INSTANCE_COLLECTION,
|
||||
InstanceFields,
|
||||
InstanceId,
|
||||
LoggerService,
|
||||
UserFields,
|
||||
UserId,
|
||||
WithCredentials,
|
||||
WithUser,
|
||||
mkInstanceCanonicalHostname,
|
||||
mkInstanceHostname,
|
||||
mkSingleton,
|
||||
} from '../../../../../../core'
|
||||
import { MothershipAdminClientService } from '../../../../../services'
|
||||
|
||||
export const EdgeMirror = mkSingleton(async () => {
|
||||
const { dbg, info, error } = LoggerService().create(`EdgeMirror`)
|
||||
|
||||
info(`Initializing edge mirror`)
|
||||
const adminSvc = await MothershipAdminClientService()
|
||||
const { client } = adminSvc.client
|
||||
|
||||
type MirrorUserFields = UserFields<WithCredentials>
|
||||
type MirrorInstanceFields = InstanceFields<WithUser<MirrorUserFields>>
|
||||
|
||||
const instanceCleanupsById: { [_: InstanceId]: () => void } = {}
|
||||
const instancesById: { [_: InstanceId]: InstanceFields | undefined } = {}
|
||||
const instancesByHostName: { [_: string]: InstanceFields | undefined } = {}
|
||||
const usersById: {
|
||||
[_: UserId]: MirrorUserFields
|
||||
} = {}
|
||||
|
||||
client
|
||||
.collection(`users`)
|
||||
.subscribe<UserFields>(`*`, (e) => {
|
||||
const { action, record } = e
|
||||
if ([`create`, `update`].includes(action)) {
|
||||
client
|
||||
.collection(`verified_users`)
|
||||
.getOne<MirrorUserFields>(record.id)
|
||||
.then((v) => {
|
||||
updateUser(v)
|
||||
})
|
||||
.catch(error)
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
error(`Failed to subscribe to users`, e)
|
||||
})
|
||||
|
||||
client
|
||||
.collection(INSTANCE_COLLECTION)
|
||||
.subscribe<InstanceFields>(`*`, (e) => {
|
||||
const { action, record } = e
|
||||
if ([`create`, `update`].includes(action)) {
|
||||
setItem(record)
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
error(`Failed to subscribe to instances`, e)
|
||||
})
|
||||
|
||||
info(`Loading mirror data`)
|
||||
await client
|
||||
.send(`/api/mirror`, { method: `GET` })
|
||||
.then(({ instances, users }) => {
|
||||
const usersById: { [_: UserId]: MirrorUserFields } = reduce(
|
||||
users,
|
||||
(acc, user) => ({ ...acc, [user.id]: user }),
|
||||
{},
|
||||
)
|
||||
forEach(instances, (record) => {
|
||||
record.expand = { uid: usersById[record.uid] }
|
||||
setItem(record, true)
|
||||
})
|
||||
info(`Mirror data loaded`)
|
||||
})
|
||||
.catch(error)
|
||||
|
||||
function updateUser(record: MirrorUserFields) {
|
||||
dbg(`Updating user ${record.email} (${record.id})`, { record })
|
||||
usersById[record.id] = record
|
||||
}
|
||||
|
||||
function setItem(record: InstanceFields, safe = false) {
|
||||
if (safe && instancesById[record.id]) {
|
||||
dbg(`Skipping instance update ${record.subdomain} (${record.id})`)
|
||||
return
|
||||
}
|
||||
instanceCleanupsById[record.id]?.()
|
||||
instancesById[record.id] = record
|
||||
if (record.cname) {
|
||||
instancesByHostName[record.cname] = record
|
||||
}
|
||||
instancesByHostName[mkInstanceHostname(record)] = record
|
||||
instancesByHostName[mkInstanceCanonicalHostname(record)] = record
|
||||
instanceCleanupsById[record.id] = () => {
|
||||
dbg(`Cleaning up instance ${record.subdomain} (${record.id})`)
|
||||
delete instancesById[record.id]
|
||||
delete instancesByHostName[mkInstanceHostname(record)]
|
||||
delete instancesByHostName[mkInstanceCanonicalHostname(record)]
|
||||
if (record.cname) {
|
||||
delete instancesByHostName[record.cname]
|
||||
}
|
||||
}
|
||||
dbg(`Updating instance ${record.subdomain} (${record.id})`)
|
||||
}
|
||||
|
||||
function getItem(host: string): MirrorInstanceFields | null {
|
||||
const instance = instancesByHostName[host]
|
||||
if (!instance) return null
|
||||
const user = usersById[instance.uid]
|
||||
if (!user) {
|
||||
throw new Error(
|
||||
`User ${instance.uid} not found for instance ${instance.subdomain} (${instance.uid})`,
|
||||
)
|
||||
}
|
||||
return { ...instance, expand: { uid: user } }
|
||||
}
|
||||
|
||||
return { getItem }
|
||||
})
|
@ -0,0 +1,24 @@
|
||||
import fetch from 'node-fetch'
|
||||
import { InstanceFields_WithUser, mkSingleton } from '../../../../../common'
|
||||
import { mkEdgeMirrorUrl } from './helpers'
|
||||
|
||||
export const EdgeMirrorClient = mkSingleton(() => {
|
||||
const getItem = (host: string) =>
|
||||
fetch(mkEdgeMirrorUrl(`getItem`, host)).then(
|
||||
(res) => res.json() as Promise<InstanceFields_WithUser | null>,
|
||||
)
|
||||
|
||||
const setItem = (record: InstanceFields_WithUser) =>
|
||||
fetch(mkEdgeMirrorUrl(`setItem`), {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify(record),
|
||||
})
|
||||
|
||||
return {
|
||||
getItem,
|
||||
setItem,
|
||||
}
|
||||
})
|
@ -0,0 +1,5 @@
|
||||
import { join } from 'path'
|
||||
import { PH_EDGE_MIRROR_PORT } from '../../../../../constants'
|
||||
|
||||
export const mkEdgeMirrorUrl = (...paths: string[]) =>
|
||||
join(`http://localhost:${PH_EDGE_MIRROR_PORT()}`, ...paths)
|
@ -0,0 +1,15 @@
|
||||
import { Command } from 'commander'
|
||||
import { startInstanceMirrorServer } from './server'
|
||||
|
||||
type Options = {
|
||||
debug: boolean
|
||||
}
|
||||
|
||||
export const ServeCommand = () => {
|
||||
const cmd = new Command(`serve`)
|
||||
.description(`Run an edge cache server`)
|
||||
.action(async (options: Options) => {
|
||||
await startInstanceMirrorServer()
|
||||
})
|
||||
return cmd
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
import express from 'express'
|
||||
import { LoggerService } from '../../../../../common'
|
||||
import { PH_EDGE_MIRROR_PORT } from '../../../../../constants'
|
||||
import { MothershipAdminClientService } from '../../../../../services'
|
||||
import { EdgeMirror } from './EdgeMirror'
|
||||
|
||||
export async function startInstanceMirrorServer() {
|
||||
const logger = LoggerService().create(`EdgeCacheServeCommand`)
|
||||
const { dbg, error, info, warn } = logger
|
||||
info(`Starting`)
|
||||
|
||||
await MothershipAdminClientService({})
|
||||
const cache = await EdgeMirror({})
|
||||
|
||||
const app = express()
|
||||
|
||||
app.get(`/getItem/:host`, (req, res) => {
|
||||
const { host } = req.params
|
||||
const cached = cache.getItem(host)
|
||||
if (!cached) {
|
||||
info(`Cache miss for ${host}`)
|
||||
return res.status(404).json(null)
|
||||
}
|
||||
res.json(cached)
|
||||
})
|
||||
|
||||
app.listen(PH_EDGE_MIRROR_PORT(), () => {
|
||||
info(`Listening on port ${PH_EDGE_MIRROR_PORT()}`)
|
||||
})
|
||||
}
|
@ -0,0 +1,13 @@
|
||||
import { Command } from 'commander'
|
||||
import { ServeCommand } from './ServeCommand'
|
||||
|
||||
type Options = {
|
||||
debug: boolean
|
||||
}
|
||||
|
||||
export const MirrorCommand = () => {
|
||||
const cmd = new Command(`mirror`)
|
||||
.description(`Instance mirror commands`)
|
||||
.addCommand(ServeCommand())
|
||||
return cmd
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
import { Command } from 'commander'
|
||||
import { DaemonCommand } from './DaemonCommand'
|
||||
import { FtpCommand } from './FtpCommand'
|
||||
import { MirrorCommand } from './MirrorCommand'
|
||||
import { SyslogCommand } from './SyslogCommand'
|
||||
|
||||
type Options = {
|
||||
@ -14,5 +15,6 @@ export const EdgeCommand = () => {
|
||||
.addCommand(DaemonCommand())
|
||||
.addCommand(FtpCommand())
|
||||
.addCommand(SyslogCommand())
|
||||
.addCommand(MirrorCommand())
|
||||
return cmd
|
||||
}
|
||||
|
@ -279,6 +279,9 @@ export const DOCKER_CONTAINER_HOST = () => settings().DOCKER_CONTAINER_HOST
|
||||
export const PH_GOBOT_ROOT = (...paths: string[]) =>
|
||||
join(settings().PH_GOBOT_ROOT, ...paths)
|
||||
|
||||
export const PH_EDGE_MIRROR_PORT = () =>
|
||||
env.get('PH_EDGE_MIRROR_PORT').default(3001).asPortNumber()
|
||||
|
||||
export const PH_GOBOT_VERBOSITY = () =>
|
||||
env.get(`PH_GOBOT_VERBOSITY`).default(1).asIntPositive()
|
||||
|
||||
@ -353,6 +356,7 @@ export const logConstants = () => {
|
||||
PH_GOBOT_ROOT,
|
||||
MOTHERSHIP_DATA_ROOT,
|
||||
MOTHERSHIP_DATA_DB,
|
||||
PH_EDGE_MIRROR_PORT,
|
||||
PH_GOBOT_VERBOSITY,
|
||||
}
|
||||
forEach(vars, (v, k) => {
|
||||
|
@ -0,0 +1,22 @@
|
||||
/// <reference path="../types/types.d.ts" />
|
||||
|
||||
routerAdd(
|
||||
'GET',
|
||||
'/api/mirror',
|
||||
(c) => {
|
||||
const users = $app
|
||||
.dao()
|
||||
.findRecordsByExpr('verified_users', $dbx.exp('1=1'))
|
||||
|
||||
const instances = $app
|
||||
.dao()
|
||||
.findRecordsByExpr(
|
||||
'instances',
|
||||
$dbx.exp('instances.uid in (select id from verified_users)'),
|
||||
)
|
||||
|
||||
return c.json(200, { users, instances })
|
||||
},
|
||||
$apis.gzip(),
|
||||
$apis.requireAdminAuth(),
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user