chore: instanceService refactor

This commit is contained in:
Ben Allfree 2022-12-31 06:39:28 -08:00
parent 93353f8570
commit 337c2818a7
3 changed files with 201 additions and 194 deletions

View File

@ -3,7 +3,7 @@ import { DEBUG, PH_BIN_CACHE, PUBLIC_PB_SUBDOMAIN } from './constants'
import { clientService } from './db/PbClient'
import { createBackupService } from './services/BackupService'
import { ftpService } from './services/FtpService/FtpService'
import { createInstanceService } from './services/InstanceService'
import { instanceService } from './services/InstanceService'
import { pocketbase } from './services/PocketBaseService'
import { createProxyService } from './services/ProxyService'
import { rpcService } from './services/RpcService'
@ -36,9 +36,8 @@ global.EventSource = require('eventsource')
ftpService({})
await rpcService({})
const instanceService = await createInstanceService({})
await instanceService({})
const proxyService = await createProxyService({
instanceManager: instanceService,
coreInternalUrl: url,
})
const backupService = await createBackupService()
@ -50,7 +49,7 @@ global.EventSource = require('eventsource')
info(`Shutting down`)
ftpService().shutdown()
proxyService.shutdown()
instanceService.shutdown()
;(await instanceService()).shutdown()
;(await rpcService()).shutdown()
pbService.shutdown()
}

View File

@ -7,6 +7,7 @@ import {
InstanceId,
InstanceStatus,
logger,
mkSingleton,
RpcCommands,
} from '@pockethost/common'
import { forEachRight, map } from '@s-libs/micro-dash'
@ -36,205 +37,211 @@ type InstanceApi = {
export type InstanceServiceConfig = {}
export type InstanceServiceApi = AsyncReturnType<typeof createInstanceService>
export const createInstanceService = async (config: InstanceServiceConfig) => {
const { dbg, raw, error, warn } = logger().create('InstanceService')
const client = await clientService()
export type InstanceServiceApi = AsyncReturnType<typeof instanceService>
export const instanceService = mkSingleton(
async (config: InstanceServiceConfig) => {
const { dbg, raw, error, warn } = logger().create('InstanceService')
const client = await clientService()
const { registerCommand } = await rpcService()
const { registerCommand } = await rpcService()
const pbService = await pocketbase()
const pbService = await pocketbase()
registerCommand<CreateInstancePayload, CreateInstanceResult>(
RpcCommands.CreateInstance,
CreateInstancePayloadSchema,
async (rpc) => {
const { payload } = rpc
const { subdomain } = payload
const instance = await client.createInstance({
subdomain,
uid: rpc.userId,
version: (await pocketbase()).getLatestVersion(),
status: InstanceStatus.Idle,
platform: 'unused',
secondsThisMonth: 0,
isBackupAllowed: false,
})
return { instance }
}
)
const instances: { [_: string]: InstanceApi } = {}
const instanceLimiter = new Bottleneck({ maxConcurrent: 1 })
const getInstance = (subdomain: string) =>
instanceLimiter.schedule(async () => {
// dbg(`Getting instance ${subdomain}`)
{
const instance = instances[subdomain]
if (instance) {
// dbg(`Found in cache: ${subdomain}`)
return instance
}
registerCommand<CreateInstancePayload, CreateInstanceResult>(
RpcCommands.CreateInstance,
CreateInstancePayloadSchema,
async (rpc) => {
const { payload } = rpc
const { subdomain } = payload
const instance = await client.createInstance({
subdomain,
uid: rpc.userId,
version: (await pocketbase()).getLatestVersion(),
status: InstanceStatus.Idle,
platform: 'unused',
secondsThisMonth: 0,
isBackupAllowed: false,
})
return { instance }
}
)
const clientLimiter = new Bottleneck({ maxConcurrent: 1 })
const instances: { [_: string]: InstanceApi } = {}
dbg(`Checking ${subdomain} for permission`)
const [instance, owner] = await clientLimiter.schedule(() =>
client.getInstanceBySubdomain(subdomain)
)
if (!instance) {
dbg(`${subdomain} not found`)
return
}
if (!owner?.verified) {
throw new Error(
`Log in at ${PUBLIC_APP_PROTOCOL}://${PUBLIC_APP_DOMAIN} to verify your account.`
)
}
await clientLimiter.schedule(() =>
client.updateInstanceStatus(instance.id, InstanceStatus.Port)
)
dbg(`${subdomain} found in DB`)
const exclude = map(instances, (i) => i.port)
const newPort = await getPort({
port: DAEMON_PB_PORT_BASE,
exclude,
}).catch((e) => {
error(`Failed to get port for ${subdomain}`)
throw e
})
dbg(`Found port for ${subdomain}: ${newPort}`)
await clientLimiter.schedule(() =>
client.updateInstanceStatus(instance.id, InstanceStatus.Starting)
)
const childProcess = await pbService.spawn({
command: 'serve',
slug: instance.id,
port: newPort,
version: instance.version,
onUnexpectedStop: (code) => {
warn(`${subdomain} exited unexpectedly with ${code}`)
api.shutdown()
},
})
const { pid } = childProcess
assertTruthy(pid, `Expected PID here but got ${pid}`)
if (!instance.isBackupAllowed) {
await client.updateInstance(instance.id, { isBackupAllowed: true })
}
const invocation = await clientLimiter.schedule(() =>
client.createInvocation(instance, pid)
)
const tm = createTimerManager({})
const api: InstanceApi = (() => {
let openRequestCount = 0
let lastRequest = now()
const internalUrl = mkInternalUrl(newPort)
const RECHECK_TTL = 1000 // 1 second
const _api: InstanceApi = {
process: childProcess,
internalUrl,
port: newPort,
shutdown: safeCatch(
`Instance ${subdomain} invocation ${invocation.id} pid ${pid} shutdown`,
async () => {
tm.shutdown()
await clientLimiter.schedule(() =>
client.finalizeInvocation(invocation)
)
const res = childProcess.kill()
delete instances[subdomain]
await clientLimiter.schedule(() =>
client.updateInstanceStatus(instance.id, InstanceStatus.Idle)
)
assertTruthy(
res,
`Expected child process to exit gracefully but got ${res}`
)
}
),
startRequest: () => {
lastRequest = now()
openRequestCount++
const id = openRequestCount
dbg(`${subdomain} started new request ${id}`)
return () => {
openRequestCount--
dbg(`${subdomain} ended request ${id}`)
}
},
}
const instanceLimiter = new Bottleneck({ maxConcurrent: 1 })
const getInstance = (subdomain: string) =>
instanceLimiter.schedule(async () => {
// dbg(`Getting instance ${subdomain}`)
{
tm.repeat(
safeCatch(`idleCheck`, async () => {
raw(`${subdomain} idle check: ${openRequestCount} open requests`)
if (
openRequestCount === 0 &&
lastRequest + DAEMON_PB_IDLE_TTL < now()
) {
dbg(
`${subdomain} idle for ${DAEMON_PB_IDLE_TTL}, shutting down`
)
await _api.shutdown()
return false
} else {
raw(`${openRequestCount} requests remain open on ${subdomain}`)
}
return true
}),
RECHECK_TTL
const instance = instances[subdomain]
if (instance) {
// dbg(`Found in cache: ${subdomain}`)
return instance
}
}
const clientLimiter = new Bottleneck({ maxConcurrent: 1 })
dbg(`Checking ${subdomain} for permission`)
const [instance, owner] = await clientLimiter.schedule(() =>
client.getInstanceBySubdomain(subdomain)
)
if (!instance) {
dbg(`${subdomain} not found`)
return
}
if (!owner?.verified) {
throw new Error(
`Log in at ${PUBLIC_APP_PROTOCOL}://${PUBLIC_APP_DOMAIN} to verify your account.`
)
}
{
const uptime = safeCatch(`uptime`, async () => {
raw(`${subdomain} uptime`)
await clientLimiter.schedule(() =>
client.pingInvocation(invocation)
)
return true
})
tm.repeat(
() =>
uptime().catch((e) => {
warn(`Ignoring error`)
await clientLimiter.schedule(() =>
client.updateInstanceStatus(instance.id, InstanceStatus.Port)
)
dbg(`${subdomain} found in DB`)
const exclude = map(instances, (i) => i.port)
const newPort = await getPort({
port: DAEMON_PB_PORT_BASE,
exclude,
}).catch((e) => {
error(`Failed to get port for ${subdomain}`)
throw e
})
dbg(`Found port for ${subdomain}: ${newPort}`)
await clientLimiter.schedule(() =>
client.updateInstanceStatus(instance.id, InstanceStatus.Starting)
)
const childProcess = await pbService.spawn({
command: 'serve',
slug: instance.id,
port: newPort,
version: instance.version,
onUnexpectedStop: (code) => {
warn(`${subdomain} exited unexpectedly with ${code}`)
api.shutdown()
},
})
const { pid } = childProcess
assertTruthy(pid, `Expected PID here but got ${pid}`)
if (!instance.isBackupAllowed) {
await client.updateInstance(instance.id, { isBackupAllowed: true })
}
const invocation = await clientLimiter.schedule(() =>
client.createInvocation(instance, pid)
)
const tm = createTimerManager({})
const api: InstanceApi = (() => {
let openRequestCount = 0
let lastRequest = now()
const internalUrl = mkInternalUrl(newPort)
const RECHECK_TTL = 1000 // 1 second
const _api: InstanceApi = {
process: childProcess,
internalUrl,
port: newPort,
shutdown: safeCatch(
`Instance ${subdomain} invocation ${invocation.id} pid ${pid} shutdown`,
async () => {
tm.shutdown()
await clientLimiter.schedule(() =>
client.finalizeInvocation(invocation)
)
const res = childProcess.kill()
delete instances[subdomain]
await clientLimiter.schedule(() =>
client.updateInstanceStatus(instance.id, InstanceStatus.Idle)
)
assertTruthy(
res,
`Expected child process to exit gracefully but got ${res}`
)
}
),
startRequest: () => {
lastRequest = now()
openRequestCount++
const id = openRequestCount
dbg(`${subdomain} started new request ${id}`)
return () => {
openRequestCount--
dbg(`${subdomain} ended request ${id}`)
}
},
}
{
tm.repeat(
safeCatch(`idleCheck`, async () => {
raw(
`${subdomain} idle check: ${openRequestCount} open requests`
)
if (
openRequestCount === 0 &&
lastRequest + DAEMON_PB_IDLE_TTL < now()
) {
dbg(
`${subdomain} idle for ${DAEMON_PB_IDLE_TTL}, shutting down`
)
await _api.shutdown()
return false
} else {
raw(
`${openRequestCount} requests remain open on ${subdomain}`
)
}
return true
}),
1000
)
}
RECHECK_TTL
)
}
return _api
})()
{
const uptime = safeCatch(`uptime`, async () => {
raw(`${subdomain} uptime`)
await clientLimiter.schedule(() =>
client.pingInvocation(invocation)
)
return true
})
tm.repeat(
() =>
uptime().catch((e) => {
warn(`Ignoring error`)
return true
}),
1000
)
}
instances[subdomain] = api
await clientLimiter.schedule(() =>
client.updateInstanceStatus(instance.id, InstanceStatus.Running)
)
dbg(`${api.internalUrl} is running`)
return instances[subdomain]
})
return _api
})()
const shutdown = () => {
dbg(`Shutting down instance manager`)
forEachRight(instances, (instance) => {
instance.shutdown()
})
instances[subdomain] = api
await clientLimiter.schedule(() =>
client.updateInstanceStatus(instance.id, InstanceStatus.Running)
)
dbg(`${api.internalUrl} is running`)
return instances[subdomain]
})
const shutdown = () => {
dbg(`Shutting down instance manager`)
forEachRight(instances, (instance) => {
instance.shutdown()
})
}
const maintenance = async (instanceId: InstanceId) => {}
return { getInstance, shutdown, maintenance }
}
const maintenance = async (instanceId: InstanceId) => {}
return { getInstance, shutdown, maintenance }
}
)

View File

@ -7,20 +7,22 @@ import {
PUBLIC_APP_PROTOCOL,
PUBLIC_PB_SUBDOMAIN,
} from '../constants'
import { InstanceServiceApi } from './InstanceService'
import { instanceService } from './InstanceService'
export type ProxyServiceApi = AsyncReturnType<typeof createProxyService>
export type ProxyServiceConfig = {
coreInternalUrl: string
instanceManager: InstanceServiceApi
}
export const createProxyService = async (config: ProxyServiceConfig) => {
const { dbg, error, info } = logger().create('ProxyService')
const { instanceManager, coreInternalUrl } = config
const { coreInternalUrl } = config
const proxy = httpProxy.createProxyServer({})
const { getInstance } = await instanceService()
const server = createServer(async (req, res) => {
dbg(`Incoming request ${req.headers.host}/${req.url}`)
@ -48,7 +50,7 @@ export const createProxyService = async (config: ProxyServiceConfig) => {
return
}
const instance = await instanceManager.getInstance(subdomain)
const instance = await getInstance(subdomain)
if (!instance) {
throw new Error(
`${host} not found. Please check the instance URL and try again, or create one at ${PUBLIC_APP_PROTOCOL}://${PUBLIC_APP_DOMAIN}.`
@ -83,7 +85,6 @@ export const createProxyService = async (config: ProxyServiceConfig) => {
resolve()
})
server.closeAllConnections()
instanceManager.shutdown()
})
}