enh: overhaul container launcher

Ben Allfree 2024-11-12 00:56:03 -08:00
parent 6997ff33a5
commit 4d96492353
10 changed files with 175 additions and 491 deletions

View File

@ -1,17 +0,0 @@
import { Command } from 'commander'
import { logger } from '../../../../../../common'
import { syslog } from './syslog'
type Options = {
debug: boolean
}
export const ServeCommand = () => {
const cmd = new Command(`serve`)
.description(`Run an edge syslog server`)
.action(async (options: Options) => {
logger().context({ cli: 'edge:syslog:serve' })
await syslog()
})
return cmd
}

View File

@ -1,49 +0,0 @@
import * as dgram from 'dgram'
import parse from 'syslog-parse'
import { LoggerService } from '../../../../../common'
import { SYSLOGD_PORT } from '../../../../../core'
import { InstanceLogger } from '../../../../../services'
export function syslog() {
return new Promise<void>((resolve, reject) => {
const logger = LoggerService().create(`EdgeSyslogCommand`)
const { dbg, error, info, warn } = logger
info(`Starting`)
const PORT = SYSLOGD_PORT()
const HOST = '0.0.0.0'
const server = dgram.createSocket('udp4')
server.on('error', (err) => {
console.log(`Server error:\n${err.stack}`)
server.close()
reject(err)
})
server.on('message', (msg, rinfo) => {
const raw = msg.toString()
const parsed = parse(raw)
if (!parsed) {
return
}
dbg(parsed)
const { process: instanceId, severity, message } = parsed
const logger = InstanceLogger(instanceId, `exec`, { ttl: 5000 })
if (severity === 'info') {
logger.info(message)
} else {
logger.error(message)
}
})
server.on('listening', () => {
const address = server.address()
info(`Server listening ${address.address}:${address.port}`)
})
server.bind(PORT, HOST)
})
}

View File

@ -1,16 +0,0 @@
import { Command } from 'commander'
import { ServeCommand } from './ServeCommand'
type Options = {
debug: boolean
}
export const SyslogCommand = () => {
const cmd = new Command(`syslog`)
.description(`Syslog commands`)
.addCommand(ServeCommand())
.action(() => {
cmd.help()
})
return cmd
}

View File

@ -1,7 +1,6 @@
import { Command } from 'commander'
import { DaemonCommand } from './DaemonCommand'
import { FtpCommand } from './FtpCommand'
import { SyslogCommand } from './SyslogCommand'
type Options = {
debug: boolean
@ -13,7 +12,6 @@ export const EdgeCommand = () => {
cmd
.addCommand(DaemonCommand())
.addCommand(FtpCommand())
.addCommand(SyslogCommand())
.action(() => {
cmd.help()
})

View File

@ -2,7 +2,6 @@ import { Command } from 'commander'
import { logger } from '../../../common'
import { neverendingPromise } from '../../../core'
import { daemon } from '../EdgeCommand/DaemonCommand/ServeCommand/daemon'
import { syslog } from '../EdgeCommand/SyslogCommand/ServeCommand/syslog'
import { firewall } from '../FirewallCommand/ServeCommand/firewall/server'
import { mothership } from '../MothershipCommand/ServeCommand/mothership'
@ -18,7 +17,6 @@ export const ServeCommand = () => {
const { dbg, error, info, warn } = logger()
info(`Starting`)
await syslog()
await mothership(options)
await daemon()
await firewall()

View File

@ -92,7 +92,7 @@ export const SETTINGS = {
IPCIDR_LIST: mkCsvString([]),
DAEMON_PORT: mkNumber(3000),
DAEMON_PB_IDLE_TTL: mkNumber(1000 * 60 * 5), // 5 minutes
DAEMON_PB_IDLE_TTL: mkNumber(1000 * 5), // 5 seconds
MOTHERSHIP_NAME: mkString(_MOTHERSHIP_NAME),
MOTHERSHIP_ADMIN_USERNAME: mkString(),

View File

@ -1,29 +1,25 @@
import * as fs from 'fs'
import { appendFile } from 'fs/promises'
import { Tail } from 'tail'
import * as winston from 'winston'
import {
LoggerService,
asyncExitHook,
createCleanupManager,
discordAlert,
mergeConfig,
mkInstanceDataPath,
stringify,
} from '../../../core'
import { LoggerService, mkInstanceDataPath, stringify } from '../../../core'
type UnsubFunc = () => void
export type InstanceLoggerApi = {
info: (msg: string) => void
error: (msg: string) => void
tail: (linesBack: number, data: (line: winston.LogEntry) => void) => UnsubFunc
tail: (linesBack: number, data: (line: LogEntry) => void) => UnsubFunc
shutdown: () => void
}
export type InstanceLoggerOptions = {
ttl: number
export type LogEntry = {
message: string
stream: 'stdout' | 'stderr'
time: string
}
export type InstanceLoggerOptions = {}
const loggers: {
[key: string]: InstanceLoggerApi
} = {}
@ -33,18 +29,9 @@ export function InstanceLogger(
target: string,
options: Partial<InstanceLoggerOptions> = {},
) {
const { dbg, info } = LoggerService()
const { dbg, info, error, warn } = LoggerService()
.create(instanceId)
.breadcrumb({ target })
const { ttl } = mergeConfig<InstanceLoggerOptions>({ ttl: 0 }, options)
dbg({ ttl })
const loggerKey = `${instanceId}_${target}`
if (loggers[loggerKey]) {
dbg(`Logger exists, using cache`)
return loggers[loggerKey]!
}
const logDirectory = mkInstanceDataPath(instanceId, `logs`)
if (!fs.existsSync(logDirectory)) {
@ -54,77 +41,31 @@ export function InstanceLogger(
const logFile = mkInstanceDataPath(instanceId, `logs`, `${target}.log`)
const cm = createCleanupManager()
const fileTransport = new winston.transports.File({
filename: logFile,
maxsize: 100 * 1024 * 1024, // 100MB
maxFiles: 10,
tailable: true,
zippedArchive: true,
})
const logger = winston.createLogger({
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json(),
winston.format.printf((info) => {
return stringify({
stream: info.level === 'info' ? 'stdout' : 'stderr',
time: info.timestamp,
message: info.message,
})
}),
),
transports: [fileTransport],
})
cm.add(() => {
dbg(`Deleting and closing`)
delete loggers[loggerKey]
fileTransport.close?.()
logger.close()
})
const { error, warn } = LoggerService()
.create('InstanceLogger')
.breadcrumb({ instanceId, target })
const resetTtl = (() => {
let tid: ReturnType<typeof setTimeout>
return () => {
if (!ttl) return
clearTimeout(tid)
tid = setTimeout(() => {
dbg(`Logger timeout`)
api.shutdown()
}, ttl)
}
})()
const appendLogEntry = (msg: string, stream: 'stdout' | 'stderr') => {
appendFile(
logFile,
stringify({
message: msg,
stream,
time: new Date().toISOString(),
}) + '\n',
)
}
const api = {
info: (msg: string) => {
resetTtl()
info(`info: `, msg)
logger.info(msg)
info(msg)
appendLogEntry(msg, 'stdout')
},
error: (msg: string) => {
resetTtl()
error(`error: `, msg)
discordAlert(`error: ${msg}`)
logger.error(msg)
error(msg)
appendLogEntry(msg, 'stderr')
},
tail: (
linesBack: number,
data: (line: winston.LogEntry) => void,
): UnsubFunc => {
if (ttl) {
throw new Error(`Cannot tail with ttl active`)
}
tail: (linesBack: number, data: (line: LogEntry) => void): UnsubFunc => {
const logFile = mkInstanceDataPath(instanceId, `logs`, `${target}.log`)
let tid: any
cm.add(() => clearTimeout(tid))
let unsub: any
const check = () => {
try {
const tail = new Tail(logFile, { nLines: linesBack })
@ -135,7 +76,7 @@ export function InstanceLogger(
data(entry)
} catch (e) {
data({
level: 'info',
stream: 'stdout',
message: line,
time: new Date().toISOString(),
})
@ -145,7 +86,7 @@ export function InstanceLogger(
error(`Caught a tail error ${e}`)
})
cm.add(() => tail.close())
unsub = () => tail.close()
} catch (e) {
warn(e)
tid = setTimeout(check, 1000)
@ -153,17 +94,13 @@ export function InstanceLogger(
}
check()
const unsub = asyncExitHook(() => cm.shutdown())
return () => {
cm.shutdown()
clearTimeout(tid)
unsub()
}
},
shutdown: () => cm.shutdown(),
}
loggers[loggerKey] = api
return api
}
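The rewritten logger above drops winston in favor of appending one JSON object per line ({ message, stream, time }) with appendFile, and replays history with the tail package. As a rough illustration of the resulting on-disk format only — this helper is not part of the commit, and readRecentEntries/logFilePath are made-up names standing in for the real path helpers — recent entries could be read back like this:

import { readFile } from 'fs/promises'

type LogEntry = {
  message: string
  stream: 'stdout' | 'stderr'
  time: string
}

// Hypothetical helper: parse the last `linesBack` entries from an instance's
// exec log (the commit writes these lines via appendFile in InstanceLogger).
export async function readRecentEntries(
  logFilePath: string, // stands in for mkInstanceDataPath(instanceId, 'logs', 'exec.log')
  linesBack: number,
): Promise<LogEntry[]> {
  const raw = await readFile(logFilePath, 'utf-8').catch(() => '')
  return raw
    .split('\n')
    .filter((line) => line.trim().length > 0)
    .slice(-linesBack)
    .map((line) => {
      try {
        return JSON.parse(line) as LogEntry
      } catch {
        // Same fallback the commit uses in tail(): treat unparseable lines as stdout.
        return {
          message: line,
          stream: 'stdout' as const,
          time: new Date().toISOString(),
        }
      }
    })
}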

View File

@ -4,11 +4,8 @@ import { globSync } from 'glob'
import { basename, join } from 'path'
import { AsyncReturnType } from 'type-fest'
import {
APP_URL,
CLEANUP_PRIORITY_LAST,
ClientResponseError,
DAEMON_PB_IDLE_TTL,
DOCS_URL,
EDGE_APEX_DOMAIN,
INSTANCE_APP_HOOK_DIR,
INSTANCE_APP_MIGRATIONS_DIR,
@ -18,8 +15,6 @@ import {
LoggerService,
SingletonBaseConfig,
asyncExitHook,
createCleanupManager,
createTimerManager,
mkAppUrl,
mkContainerHomePath,
mkDocUrl,
@ -28,6 +23,7 @@ import {
mkSingleton,
now,
stringify,
tryFetch,
} from '../../../core'
import {
InstanceLogger,
@ -46,10 +42,9 @@ enum InstanceApiStatus {
}
type InstanceApi = {
status: () => InstanceApiStatus
internalUrl: () => string
internalUrl: string
startRequest: () => () => void
shutdown: (reason?: Error) => Promise<void>
shutdown: () => void
}
export type InstanceServiceConfig = SingletonBaseConfig & {
@ -60,168 +55,43 @@ export type InstanceServiceConfig = SingletonBaseConfig & {
export type InstanceServiceApi = AsyncReturnType<typeof instanceService>
export const instanceService = mkSingleton(
async (config: InstanceServiceConfig) => {
const { instanceApiTimeoutMs, instanceApiCheckIntervalMs } = config
const instanceServiceLogger = LoggerService().create('InstanceService')
const { dbg, raw, error, warn } = instanceServiceLogger
const { client } = await MothershipAdminClientService()
const pbService = await PocketbaseService()
const instanceApis: { [_: InstanceId]: InstanceApi } = {}
const instanceApis: { [_: InstanceId]: Promise<InstanceApi> } = {}
const createInstanceApi = async (
instance: InstanceFields,
): Promise<InstanceApi> => {
const shutdownManager: (() => void)[] = []
const getInstanceApi = (instance: InstanceFields): Promise<InstanceApi> => {
const _logger = instanceServiceLogger.create(`getInstanceApi`)
const { id, subdomain, version } = instance
_logger.breadcrumb({ subdomain, id, version })
const { dbg, trace } = _logger
return new Promise<InstanceApi>((resolve, reject) => {
let maxTries = instanceApiTimeoutMs / instanceApiCheckIntervalMs
const retry = (interval = instanceApiCheckIntervalMs) => {
maxTries--
if (maxTries <= 0) {
reject(
new Error(
`PocketBase instance failed to launch. Please check logs at ${APP_URL()}. [${id}:${subdomain}]. ${DOCS_URL(
`usage`,
`errors`,
)}`,
),
)
return
}
dbg(`${maxTries} tries remaining. Retrying in ${interval}ms`)
setTimeout(_check, interval)
}
const _check = () => {
dbg(`Checking for existing instance API`)
const instanceApi = instanceApis[id]
if (!instanceApi) {
dbg(`No API found, creating`)
createInstanceApi(instance)
retry(0)
return
}
try {
if (instanceApi.status() === InstanceApiStatus.Healthy) {
dbg(`API found and healthy, returning`)
resolve(instanceApi)
return
}
} catch (e) {
dbg(`Instance is in an error state, returning error`)
reject(e)
return
}
dbg(`API found but not healthy (${instanceApi.status()}), waiting`)
retry()
}
_check()
})
}
const createInstanceApi = (instance: InstanceFields): InstanceApi => {
const { id, subdomain, version } = instance
const systemInstanceLogger = instanceServiceLogger.create(
`${subdomain}:${id}:${version}`,
)
const { dbg, warn, error, info, trace } = systemInstanceLogger
const userInstanceLogger = InstanceLogger(instance.id, `exec`)
if (instanceApis[id]) {
throw new Error(
`Attempted to create an instance API when one is already available for ${id}`,
)
}
/*
Initialize shutdown manager
*/
const shutdownManager = createCleanupManager()
shutdownManager.add(() => {
dbg(`Shutting down: delete instanceApis[id]`)
dbg(
`Shutting down: There are ${
values(instanceApis).length
} still in API cache`,
)
shutdownManager.push(() => {
dbg(`Shutting down`)
userInstanceLogger.info(`Instance is shutting down.`)
delete instanceApis[id]
dbg(
`Shutting down: There are now ${
values(instanceApis).length
} still in API cache`,
)
}, CLEANUP_PRIORITY_LAST) // Make this the very last thing that happens
shutdownManager.add(() => {
dbg(`Shut down: InstanceApiStatus.ShuttingDown`)
status = InstanceApiStatus.ShuttingDown
})
}) // Make this the very last thing that happens
info(`Starting`)
let status = InstanceApiStatus.Starting
let internalUrl = ''
let startRequest: InstanceApi['startRequest'] = () => {
throw new Error(`Not ready yet`)
}
userInstanceLogger.info(`Instance is starting.`)
/*
Initialize API
*/
let _shutdownReason: Error | undefined
const api: InstanceApi = {
status: () => {
if (_shutdownReason) throw _shutdownReason
return status
},
internalUrl: () => {
if (status !== InstanceApiStatus.Healthy) {
throw new Error(
`Attempt to access instance URL when instance is not in a healthy state.`,
)
}
return internalUrl
},
startRequest: () => {
if (status !== InstanceApiStatus.Healthy) {
throw new Error(
`Attempt to start an instance request when instance is not in a healthy state.`,
)
}
return startRequest()
},
shutdown: async (reason) => {
dbg(`Shutting down`)
if (reason) {
_shutdownReason = reason
error(`Panic shutdown for ${reason}`)
} else {
dbg(`Graceful shutdown`)
}
if (status === InstanceApiStatus.ShuttingDown) {
warn(`Already shutting down`)
return
}
return shutdownManager.shutdown()
},
}
const _safeShutdown = async (reason?: Error) => {
if (status === InstanceApiStatus.ShuttingDown && reason) {
warn(`Already shutting down, ${reason} will not be reported.`)
return
}
return api.shutdown(reason)
}
instanceApis[id] = api
let internalUrl: string | undefined
const healthyGuard = () => {
if (status !== InstanceApiStatus.ShuttingDown) return
throw new Error(
`HealthyGuard detected instance is shutting down. Aborting further initialization.`,
)
}
/*
Create serialized client communication functions to prevent race conditions
*/
const clientLimiter = new Bottleneck({ maxConcurrent: 1 })
const updateInstance = clientLimiter.wrap(
(id: InstanceId, fields: Partial<InstanceFields>) => {
@ -239,42 +109,30 @@ export const instanceService = mkSingleton(
const updateInstanceStatus = (id: InstanceId, status: InstanceStatus) =>
updateInstance(id, { status })
/*
Handle async setup
*/
;(async () => {
const { version } = instance
let openRequestCount = 0
let lastRequest = now()
/*
Obtain empty port
*/
dbg(`Obtaining port`)
const [newPortPromise, releasePort] = PortService().alloc()
const newPort = await newPortPromise
shutdownManager.add(() => {
dbg(`shut down: releasing port`)
releasePort()
}, CLEANUP_PRIORITY_LAST)
systemInstanceLogger.breadcrumb({ port: newPort })
dbg(`Found port`)
/*
Create the user instance logger
*/
healthyGuard()
const userInstanceLogger = InstanceLogger(instance.id, `exec`)
/*
Start the instance
*/
try {
/** Mark the instance as starting */
dbg(`Starting instance`)
healthyGuard()
updateInstanceStatus(instance.id, InstanceStatus.Starting)
shutdownManager.add(async () => {
shutdownManager.push(async () => {
dbg(`Shutting down: set instance status: idle`)
updateInstanceStatus(id, InstanceStatus.Idle)
})
healthyGuard()
/** Obtain empty port */
dbg(`Obtaining port`)
const [newPortPromise, releasePort] = PortService().alloc()
const newPort = await newPortPromise
shutdownManager.push(() => {
dbg(`shut down: releasing port`)
releasePort()
})
systemInstanceLogger.breadcrumb({ port: newPort })
dbg(`Found port`)
const internalUrl = mkInternalUrl(newPort)
dbg(`internalUrl`, internalUrl)
/** Create spawn config */
const spawnArgs: SpawnConfig = {
@ -291,9 +149,7 @@ export const instanceService = mkSingleton(
),
globSync(join(INSTANCE_APP_HOOK_DIR(), '*.js')).map(
(file) =>
`${file}:${mkContainerHomePath(
`pb_hooks/${basename(file)}`,
)}:ro`,
`${file}:${mkContainerHomePath(`pb_hooks/${basename(file)}`)}:ro`,
),
]),
env: {
@ -304,7 +160,7 @@ export const instanceService = mkSingleton(
version,
}
/** Sync admin account */
/** Add admin sync info if enabled */
if (instance.syncAdmin) {
const id = instance.uid
dbg(`Fetching token info for uid ${id}`)
@ -319,108 +175,77 @@ export const instanceService = mkSingleton(
})
}
/*
Spawn the child process
*/
const childProcess = await (async () => {
try {
const cp = await pbService.spawn(spawnArgs)
return cp
} catch (e) {
warn(`Error spawning: ${e}`)
userInstanceLogger.error(`Error spawning: ${e}`)
throw new Error(
`Could not launch container. Please review your instance logs at https://app.pockethost.io/app/instances/${instance.id} or contact support at https://pockethost.io/support`,
)
}
})()
const { exitCode } = childProcess
exitCode.then((code) => {
info(`Processes exited with ${code}.`)
setImmediate(() => {
_safeShutdown().catch((err) => {
error(`Error shutting down ${id}`, { err })
})
})
})
shutdownManager.add(async () => {
/** Spawn the child process */
const childProcess = await pbService.spawn(spawnArgs)
const { exitCode, stopped, started } = childProcess
shutdownManager.push(() => {
dbg(`killing ${id}`)
await childProcess.kill().catch((err) => {
childProcess.kill().catch((err) => {
error(`Error killing ${id}`, { err })
})
dbg(`killed ${id}`)
})
/*
API state, timers, etc
*/
const tm = createTimerManager({})
shutdownManager.add(() => tm.shutdown())
let openRequestCount = 0
let lastRequest = now()
internalUrl = mkInternalUrl(newPort)
const RECHECK_TTL = 1000 // 1 second
startRequest = () => {
lastRequest = now()
openRequestCount++
const id = openRequestCount
trace(`started new request`)
return () => {
openRequestCount--
trace(`ended request (${openRequestCount} still open)`)
/** Health check */
await tryFetch(`${internalUrl}/api/health`, {
preflight: async () => {
if (stopped()) throw new Error(`Container stopped`)
return started()
},
})
/** Idle check */
const idleTid = setInterval(() => {
const lastRequestAge = now() - lastRequest
dbg(
`idle check: ${openRequestCount} open requests, ${lastRequestAge}ms since last request`,
)
if (openRequestCount === 0 && lastRequestAge > DAEMON_PB_IDLE_TTL()) {
info(`idle for ${DAEMON_PB_IDLE_TTL()}, shutting down`)
userInstanceLogger.info(
`Instance has been idle for ${DAEMON_PB_IDLE_TTL()}ms. Hibernating to conserve resources.`,
)
shutdownManager.forEach((fn) => fn())
return false
} else {
dbg(`${openRequestCount} requests remain open`)
}
}
{
tm.repeat(async () => {
trace(`idle check: ${openRequestCount} open requests`)
if (
openRequestCount === 0 &&
lastRequest + DAEMON_PB_IDLE_TTL() < now()
) {
info(`idle for ${DAEMON_PB_IDLE_TTL()}, shutting down`)
healthyGuard()
userInstanceLogger.info(
`Instance has been idle for ${DAEMON_PB_IDLE_TTL()}ms. Hibernating to conserve resources.`,
)
await _safeShutdown().catch((err) => {
error(`Error shutting down ${id}`, { err })
})
return false
} else {
trace(`${openRequestCount} requests remain open`)
return true
}, 1000)
shutdownManager.push(() => clearInterval(idleTid))
const api: InstanceApi = {
internalUrl,
startRequest: () => {
lastRequest = now()
openRequestCount++
trace(`started new request`)
return () => {
openRequestCount--
trace(`ended request (${openRequestCount} still open)`)
}
return true
}, RECHECK_TTL)
},
shutdown: () => {
dbg(`Shutting down`)
shutdownManager.forEach((fn) => fn())
},
}
dbg(`${internalUrl} is running`)
status = InstanceApiStatus.Healthy
healthyGuard()
updateInstanceStatus(instance.id, InstanceStatus.Running)
})().catch((e) => {
const detail = (() => {
if (e instanceof ClientResponseError) {
const { response } = e
if (response?.data) {
const detail = map(
response.data,
(v, k) => `${k}:${v?.code}:${v?.message}`,
).join(`\n`)
return detail
}
}
return `${e}`
})()
warn(`Instance failed to start: ${detail}`)
_safeShutdown(e).catch((err) => {
error(`Error shutting down ${id}`, { err })
})
})
return api
return api
} catch (e) {
error(`Error spawning: ${e}`)
userInstanceLogger.error(`Error spawning: ${e}`)
shutdownManager.forEach((fn) => fn())
throw e
}
}
const getInstance = (() => {
const getInstanceRecord = (() => {
const cache = mkInstanceCache(client.client)
return async (host: string) => {
@ -499,7 +324,7 @@ export const instanceService = mkSingleton(
const { host, proxy } = res.locals
const instance = await getInstance(host)
const instance = await getInstanceRecord(host)
if (!instance) {
res.status(404).end(`${host} not found`)
return
@ -523,8 +348,8 @@ export const instanceService = mkSingleton(
dbg(`Checking for maintenance mode`)
if (instance.maintenance) {
throw new Error(
`This instance is in Maintenance Mode. See ${mkDocUrl(
`usage/maintenance`,
`This instance is powered off. See ${mkDocUrl(
`power`,
)} for more information.`,
)
}
@ -537,25 +362,33 @@ export const instanceService = mkSingleton(
throw new Error(`Log in at ${mkAppUrl()} to verify your account.`)
}
const api = await getInstanceApi(instance)
const api = await (instanceApis[instance.id] =
instanceApis[instance.id] || createInstanceApi(instance)).catch((e) => {
throw new Error(
`Could not launch container. Please review your instance logs at https://app.pockethost.io/app/instances/${instance.id} or contact support at https://pockethost.io/support. [${res.locals.requestId}]`,
)
})
const endRequest = api.startRequest()
res.on('close', endRequest)
if (req.closed) {
throw new Error(`Request already closed.`)
error(`Request already closed. ${res.locals.requestId}`)
}
dbg(
`Forwarding proxy request for ${
req.url
} to instance ${api.internalUrl()}`,
} to instance ${api.internalUrl}`,
)
proxy.web(req, res, { target: api.internalUrl() })
proxy.web(req, res, { target: api.internalUrl })
})
asyncExitHook(async () => {
dbg(`Shutting down instance manager`)
const p = Promise.all(map(instanceApis, (api) => api.shutdown()))
const p = Promise.all(
map(instanceApis, async (api) => (await api).shutdown()),
)
await p
})
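A notable structural change in this file is that instanceApis now maps each instance id to a Promise<InstanceApi>, so concurrent requests for the same host share one in-flight launch instead of polling a retry loop. The following is only a generic sketch of that memoized-promise pattern; launchOnce, launches, and Launched are illustrative names, not this project's API:

// Sketch of the promise-cache idea behind
// `instanceApis[instance.id] = instanceApis[instance.id] || createInstanceApi(instance)`.
type Launched = { shutdown: () => void }

const launches: Record<string, Promise<Launched>> = {}

async function launchOnce(
  id: string,
  create: () => Promise<Launched>,
): Promise<Launched> {
  // The first caller starts the launch; later callers await the same promise.
  const pending = (launches[id] = launches[id] || create())
  try {
    return await pending
  } catch (e) {
    // Illustrative recovery: evict the failed promise so a later request can
    // retry. In the commit itself, eviction happens in the shutdown handlers.
    delete launches[id]
    throw e
  }
}

In the request handler above, a rejected launch surfaces to the caller as the "Could not launch container" error that carries the request id.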

View File

@ -7,20 +7,17 @@ import { EventEmitter } from 'stream'
import { AsyncReturnType } from 'type-fest'
import {
APEX_DOMAIN,
DOCKER_CONTAINER_HOST,
LoggerService,
SYSLOGD_PORT,
SingletonBaseConfig,
SyslogLogger,
asyncExitHook,
createCleanupManager,
mkContainerHomePath,
mkInstanceDataPath,
mkInternalUrl,
mkSingleton,
tryFetch,
} from '../../../core'
import { GobotService } from '../GobotService'
import { InstanceLogger } from '../InstanceLoggerService'
import { PortService } from '../PortService'
export type Env = { [_: string]: string }
@ -45,6 +42,8 @@ export type PocketbaseProcess = {
url: string
kill: () => Promise<void>
exitCode: Promise<number>
stopped: () => boolean
started: () => boolean
}
export const DOCKER_INSTANCE_IMAGE_NAME = `benallfree/pockethost-instance`
@ -95,11 +94,7 @@ export const createPocketbaseService = async (
} = _cfg
logger.breadcrumb({ subdomain, instanceId })
const iLogger = SyslogLogger(instanceId, 'exec')
cm.add(async () => {
dbg(`Shutting down iLogger`)
await iLogger.shutdown()
})
const iLogger = InstanceLogger(instanceId, 'exec')
const _version = version || maxVersion // If _version is blank, we use the max version available
const realVersion = await bot.maxSatisfyingVersion(_version)
@ -119,18 +114,22 @@ export const createPocketbaseService = async (
let started = false
let stopped = false
let stopping = false
const container = await new Promise<{
on: EventEmitter['on']
kill: () => Promise<void>
}>((resolve) => {
}>((resolve, reject) => {
const docker = new Docker()
iLogger.info(`Starting instance`)
const handleData = (data: Buffer) => {
stdout.on('data', (data) => {
iLogger.info(data.toString())
dbg(data.toString())
}
stdout.on('data', handleData)
stderr.on('data', handleData)
})
stderr.on('data', (data) => {
iLogger.error(data.toString())
dbg(data.toString())
})
const Binds = [
`${mkInstanceDataPath(instanceId)}:${mkContainerHomePath()}`,
`${binPath}:${mkContainerHomePath(`pocketbase`)}:ro`,
@ -164,19 +163,11 @@ export const createPocketbaseService = async (
Hard: 4096,
},
],
LogConfig: {
Type: 'syslog',
Config: {
'syslog-address': `udp://${DOCKER_CONTAINER_HOST()}:${SYSLOGD_PORT()}`,
tag: instanceId,
},
},
},
Tty: false,
ExposedPorts: {
[`8090/tcp`]: {},
},
// User: 'pockethost',
}
createOptions.Cmd = ['node', `index.mjs`]
@ -200,8 +191,8 @@ export const createPocketbaseService = async (
createOptions,
(err, data) => {
stopped = true
stderr.off(`data`, handleData)
stdout.off(`data`, handleData)
stderr.removeAllListeners(`data`)
stdout.removeAllListeners(`data`)
const StatusCode = (() => {
if (!data?.StatusCode) return 0
return parseInt(data.StatusCode, 10)
@ -216,13 +207,19 @@ export const createPocketbaseService = async (
137 - SIGKILL (expected)
*/
if ((StatusCode > 0 && StatusCode !== 137) || err) {
const castStatusCode = StatusCode || 999
iLogger.error(
`Unexpected stop with code ${StatusCode} and error ${err}`,
`Unexpected stop with code ${castStatusCode} and error ${err}`,
)
error(
`${instanceId} stopped unexpectedly with code ${StatusCode} and error ${err}`,
`${instanceId} stopped unexpectedly with code ${castStatusCode} and error ${err}`,
)
emitter.emit(Events.Exit, castStatusCode)
reject(
new Error(
`${instanceId} stopped unexpectedly with code ${castStatusCode} and error ${err}`,
),
)
emitter.emit(Events.Exit, StatusCode || 999)
} else {
emitter.emit(Events.Exit, 0)
}
@ -241,7 +238,7 @@ export const createPocketbaseService = async (
})
})
}).catch((e) => {
error(`Error starting container`, e)
error(`Error starting container: ${e}`)
cm.shutdown()
throw e
})
@ -264,17 +261,18 @@ export const createPocketbaseService = async (
const unsub = asyncExitHook(async () => {
await api.kill()
})
await tryFetch(`${url}/api/health`, {
preflight: async () => {
dbg({ stopped, started, container: !!container })
if (stopped) throw new Error(`Container stopped`)
return started && !!container
},
})
const api: PocketbaseProcess = {
url,
exitCode,
stopped: () => stopped,
started: () => started,
kill: async () => {
if (stopping) {
warn(`${instanceId} already stopping`)
return
}
stopping = true
dbg(`Killing`)
iLogger.info(`Stopping instance`)
await container.kill()
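Both the health check shown in this file and the one in instanceService go through tryFetch(url, { preflight }), where a preflight that throws (e.g. "Container stopped") aborts the wait and one that returns false keeps waiting. tryFetch itself lives in the project's core package and is not shown in this commit, so the following is only a sketch of a retry helper matching that call shape; tryFetchSketch, TryFetchOptions, and the retryMs default are assumptions:

// Sketch only: not the project's tryFetch implementation. It retries until
// the URL responds OK, re-running `preflight` before each attempt; a throwing
// preflight aborts the whole wait.
type TryFetchOptions = {
  preflight?: () => Promise<boolean>
  retryMs?: number
}

async function tryFetchSketch(
  url: string,
  { preflight, retryMs = 50 }: TryFetchOptions = {},
): Promise<Response> {
  for (;;) {
    if (preflight && !(await preflight())) {
      // Not ready yet (e.g. container not started); wait and re-check.
      await new Promise((r) => setTimeout(r, retryMs))
      continue
    }
    try {
      const res = await fetch(url)
      if (res.ok) return res
    } catch {
      // Connection refused while the instance boots; fall through and retry.
    }
    await new Promise((r) => setTimeout(r, retryMs))
  }
}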

View File

@ -59,7 +59,10 @@ export const realtimeLog = mkSingleton(async (config: RealtimeLogConfig) => {
/** Validate instance and ownership */
dbg(`Got a log request for instance ID ${instanceId}`)
const instance = await client
.collection('instances').getFirstListItem<InstanceFields>(`id = '${instanceId}' || subdomain='${instanceId}'` )
.collection('instances')
.getFirstListItem<InstanceFields>(
`id = '${instanceId}' || subdomain='${instanceId}'`,
)
if (!instance) {
throw new Error(`instanceId ${instanceId} not found for user ${user.id}`)
}
@ -82,7 +85,6 @@ export const realtimeLog = mkSingleton(async (config: RealtimeLogConfig) => {
res.on('close', () => {
unsub()
instanceLogger.shutdown()
})
})