mirror of
https://github.com/pockethost/pockethost.git
synced 2025-03-30 15:08:30 +00:00
enh: healthyguard
This commit is contained in:
parent
0a6301c7c2
commit
eb8bed9cea
@ -56,23 +56,35 @@ global.EventSource = require('eventsource')
|
|||||||
{
|
{
|
||||||
info(`Migrating mothership`)
|
info(`Migrating mothership`)
|
||||||
await (
|
await (
|
||||||
await pbService.spawn({
|
await pbService.spawn(
|
||||||
command: 'migrate',
|
{
|
||||||
isMothership: true,
|
command: 'migrate',
|
||||||
version: DAEMON_PB_SEMVER,
|
isMothership: true,
|
||||||
slug: PUBLIC_APP_DB,
|
version: DAEMON_PB_SEMVER,
|
||||||
})
|
slug: PUBLIC_APP_DB,
|
||||||
|
onUnexpectedStop: () => {
|
||||||
|
error(`migrate had an unexpected stop. Check it out`)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{ logger }
|
||||||
|
)
|
||||||
).exited
|
).exited
|
||||||
info(`Migrating done`)
|
info(`Migrating done`)
|
||||||
}
|
}
|
||||||
info(`Serving`)
|
info(`Serving`)
|
||||||
const { url } = await pbService.spawn({
|
const { url } = await pbService.spawn(
|
||||||
command: 'serve',
|
{
|
||||||
isMothership: true,
|
command: 'serve',
|
||||||
version: DAEMON_PB_SEMVER,
|
isMothership: true,
|
||||||
slug: PUBLIC_APP_DB,
|
version: DAEMON_PB_SEMVER,
|
||||||
port: DAEMON_PB_PORT_BASE,
|
slug: PUBLIC_APP_DB,
|
||||||
})
|
port: DAEMON_PB_PORT_BASE,
|
||||||
|
onUnexpectedStop: () => {
|
||||||
|
error(`migrate had an unexpected stop. Check it out`)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{ logger }
|
||||||
|
)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Launch services
|
* Launch services
|
||||||
|
@ -43,7 +43,7 @@ type InstanceApi = {
|
|||||||
status: () => InstanceApiStatus
|
status: () => InstanceApiStatus
|
||||||
internalUrl: () => string
|
internalUrl: () => string
|
||||||
startRequest: () => () => void
|
startRequest: () => () => void
|
||||||
shutdown: () => Promise<void>
|
shutdown: (reason?: Error) => Promise<void>
|
||||||
}
|
}
|
||||||
|
|
||||||
export type InstanceServiceConfig = SingletonBaseConfig & {
|
export type InstanceServiceConfig = SingletonBaseConfig & {
|
||||||
@ -116,16 +116,30 @@ export const instanceService = mkSingleton(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Initialize shutdown manager
|
||||||
|
*/
|
||||||
|
const shutdownManager = createCleanupManager()
|
||||||
|
shutdownManager.add(async () => {
|
||||||
|
dbg(`Deleting from cache`)
|
||||||
|
delete instanceApis[id]
|
||||||
|
dbg(`There are ${values(instanceApis).length} still in cache`)
|
||||||
|
}, CLEANUP_PRIORITY_LAST) // Make this the very last thing that happens
|
||||||
|
shutdownManager.add(async () => {
|
||||||
|
dbg(`Shutting down`)
|
||||||
|
status = InstanceApiStatus.ShuttingDown
|
||||||
|
})
|
||||||
|
|
||||||
info(`Starting`)
|
info(`Starting`)
|
||||||
let status = InstanceApiStatus.Starting
|
let status = InstanceApiStatus.Starting
|
||||||
let internalUrl = ''
|
let internalUrl = ''
|
||||||
let startRequest: InstanceApi['startRequest'] = () => {
|
let startRequest: InstanceApi['startRequest'] = () => {
|
||||||
throw new Error(`Not ready yet`)
|
throw new Error(`Not ready yet`)
|
||||||
}
|
}
|
||||||
let shutdown: InstanceApi['shutdown'] = () => {
|
|
||||||
throw new Error(`Not ready yet`)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Initialize API
|
||||||
|
*/
|
||||||
const api: InstanceApi = {
|
const api: InstanceApi = {
|
||||||
status: () => {
|
status: () => {
|
||||||
return status
|
return status
|
||||||
@ -146,43 +160,38 @@ export const instanceService = mkSingleton(
|
|||||||
}
|
}
|
||||||
return startRequest()
|
return startRequest()
|
||||||
},
|
},
|
||||||
shutdown: async () => {
|
shutdown: async (reason) => {
|
||||||
if (status !== InstanceApiStatus.Healthy) {
|
if (reason) {
|
||||||
throw new Error(
|
error(`Panic shutdown for ${reason}`)
|
||||||
`Attempt to shut down an instance request when instance is not in a healthy state.`
|
} else {
|
||||||
)
|
dbg(`Graceful shutdown`)
|
||||||
}
|
}
|
||||||
return shutdown()
|
if (api.status() === InstanceApiStatus.ShuttingDown) {
|
||||||
|
throw new Error(`Already shutting down`)
|
||||||
|
}
|
||||||
|
return shutdownManager.shutdown()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
const _safeShutdown = async (reason?: Error) =>
|
||||||
|
api.status() === InstanceApiStatus.ShuttingDown || api.shutdown(reason)
|
||||||
instanceApis[id] = api
|
instanceApis[id] = api
|
||||||
|
|
||||||
/*
|
const healthyGuard = () => {
|
||||||
Initialize shutdown manager
|
if (api.status() !== InstanceApiStatus.ShuttingDown) return
|
||||||
*/
|
throw new Error(`Instance is shutting down. Aborting.`)
|
||||||
const shutdownManager = createCleanupManager()
|
}
|
||||||
shutdownManager.add(async () => {
|
|
||||||
dbg(`Deleting from cache`)
|
|
||||||
delete instanceApis[id]
|
|
||||||
dbg(`There are ${values(instanceApis).length} still in cache`)
|
|
||||||
}, CLEANUP_PRIORITY_LAST) // Make this the very last thing that happens
|
|
||||||
shutdownManager.add(async () => {
|
|
||||||
dbg(`Shutting down`)
|
|
||||||
status = InstanceApiStatus.ShuttingDown
|
|
||||||
})
|
|
||||||
shutdown = () => shutdownManager.shutdown()
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Create serialized client communication functions to prevent race conditions
|
Create serialized client communication functions to prevent race conditions
|
||||||
*/
|
*/
|
||||||
const clientLimiter = new Bottleneck({ maxConcurrent: 1 })
|
const clientLimiter = new Bottleneck({ maxConcurrent: 1 })
|
||||||
const _updateInstanceStatus = clientLimiter.wrap(
|
const updateInstanceStatus = clientLimiter.wrap(
|
||||||
client.updateInstanceStatus
|
client.updateInstanceStatus
|
||||||
)
|
)
|
||||||
const _updateInstance = clientLimiter.wrap(client.updateInstance)
|
const updateInstance = clientLimiter.wrap(client.updateInstance)
|
||||||
const _createInvocation = clientLimiter.wrap(client.createInvocation)
|
const createInvocation = clientLimiter.wrap(client.createInvocation)
|
||||||
const _pingInvocation = clientLimiter.wrap(client.pingInvocation)
|
const pingInvocation = clientLimiter.wrap(client.pingInvocation)
|
||||||
const _finalizeInvocation = clientLimiter.wrap(client.finalizeInvocation)
|
const finalizeInvocation = clientLimiter.wrap(client.finalizeInvocation)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Handle async setup
|
Handle async setup
|
||||||
@ -194,29 +203,34 @@ export const instanceService = mkSingleton(
|
|||||||
Obtain empty port
|
Obtain empty port
|
||||||
*/
|
*/
|
||||||
dbg(`Obtaining port`)
|
dbg(`Obtaining port`)
|
||||||
await _updateInstanceStatus(instance.id, InstanceStatus.Port)
|
healthyGuard()
|
||||||
|
await updateInstanceStatus(instance.id, InstanceStatus.Port)
|
||||||
|
healthyGuard()
|
||||||
const [newPort, releasePort] = await getNextPort()
|
const [newPort, releasePort] = await getNextPort()
|
||||||
shutdownManager.add(() => {
|
shutdownManager.add(() => {
|
||||||
dbg(`Releasing port`)
|
dbg(`Releasing port`)
|
||||||
releasePort()
|
releasePort()
|
||||||
}, CLEANUP_PRIORITY_LAST)
|
}, CLEANUP_PRIORITY_LAST)
|
||||||
|
|
||||||
systemInstanceLogger.breadcrumb(`port:${newPort}`)
|
systemInstanceLogger.breadcrumb(`port:${newPort}`)
|
||||||
dbg(`Found port`)
|
dbg(`Found port`)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Create the user instance logger
|
Create the user instance logger
|
||||||
*/
|
*/
|
||||||
|
healthyGuard()
|
||||||
const userInstanceLogger = await instanceLoggerService().get(
|
const userInstanceLogger = await instanceLoggerService().get(
|
||||||
instance.id,
|
instance.id,
|
||||||
{ parentLogger: systemInstanceLogger }
|
{
|
||||||
|
parentLogger: systemInstanceLogger,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
const _writeUserLog = serialAsyncExecutionGuard(
|
|
||||||
|
const writeUserLog = serialAsyncExecutionGuard(
|
||||||
userInstanceLogger.write,
|
userInstanceLogger.write,
|
||||||
() => `${instance.id}:userLog`
|
() => `${instance.id}:userLog`
|
||||||
)
|
)
|
||||||
shutdownManager.add(() =>
|
shutdownManager.add(() =>
|
||||||
_writeUserLog(`Shutting down instance`).catch(error)
|
writeUserLog(`Shutting down instance`).catch(error)
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -224,12 +238,14 @@ export const instanceService = mkSingleton(
|
|||||||
*/
|
*/
|
||||||
dbg(`Starting instance`)
|
dbg(`Starting instance`)
|
||||||
dbg(`Set instance status: starting`)
|
dbg(`Set instance status: starting`)
|
||||||
await _updateInstanceStatus(instance.id, InstanceStatus.Starting)
|
healthyGuard()
|
||||||
|
await updateInstanceStatus(instance.id, InstanceStatus.Starting)
|
||||||
shutdownManager.add(async () => {
|
shutdownManager.add(async () => {
|
||||||
dbg(`Set instance status: idle`)
|
dbg(`Set instance status: idle`)
|
||||||
await _updateInstanceStatus(id, InstanceStatus.Idle).catch(error)
|
await updateInstanceStatus(id, InstanceStatus.Idle).catch(error)
|
||||||
})
|
})
|
||||||
await _writeUserLog(`Starting instance`)
|
healthyGuard()
|
||||||
|
await writeUserLog(`Starting instance`)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Spawn the child process
|
Spawn the child process
|
||||||
@ -241,33 +257,37 @@ export const instanceService = mkSingleton(
|
|||||||
slug: instance.id,
|
slug: instance.id,
|
||||||
port: newPort,
|
port: newPort,
|
||||||
version,
|
version,
|
||||||
onUnexpectedStop: async (code, stdout, stderr) => {
|
onUnexpectedStop: (code, stdout, stderr) => {
|
||||||
warn(
|
warn(
|
||||||
`PocketBase processes exited unexpectedly with ${code}. Putting in maintenance mode.`
|
`PocketBase processes exited unexpectedly with ${code}. Putting in maintenance mode.`
|
||||||
)
|
)
|
||||||
warn(stdout)
|
warn(stdout)
|
||||||
warn(stderr)
|
warn(stderr)
|
||||||
shutdownManager.add(async () => {
|
shutdownManager.add(async () => {
|
||||||
await _updateInstance(instance.id, {
|
await updateInstance(instance.id, {
|
||||||
maintenance: true,
|
maintenance: true,
|
||||||
})
|
})
|
||||||
await _writeUserLog(
|
await writeUserLog(
|
||||||
`Putting instance in maintenance mode because it shut down with return code ${code}. `,
|
`Putting instance in maintenance mode because it shut down with return code ${code}. `,
|
||||||
StreamNames.Error
|
StreamNames.Error
|
||||||
)
|
)
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
stdout.map((data) =>
|
stdout.map((data) =>
|
||||||
_writeUserLog(data, StreamNames.Error).catch(error)
|
writeUserLog(data, StreamNames.Error).catch(error)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
stderr.map((data) =>
|
stderr.map((data) =>
|
||||||
_writeUserLog(data, StreamNames.Error).catch(error)
|
writeUserLog(data, StreamNames.Error).catch(error)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
setImmediate(() => {
|
setImmediate(() => {
|
||||||
api.shutdown().catch(error)
|
_safeShutdown(
|
||||||
|
new Error(
|
||||||
|
`PocketBase processes exited unexpectedly with ${code}. Putting in maintenance mode.`
|
||||||
|
)
|
||||||
|
).catch(error)
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -292,14 +312,16 @@ export const instanceService = mkSingleton(
|
|||||||
/*
|
/*
|
||||||
Create the invocation record
|
Create the invocation record
|
||||||
*/
|
*/
|
||||||
const invocation = await _createInvocation(instance, pid)
|
healthyGuard()
|
||||||
|
const invocation = await createInvocation(instance, pid)
|
||||||
shutdownManager.add(async () => {
|
shutdownManager.add(async () => {
|
||||||
await _finalizeInvocation(invocation).catch(error)
|
await finalizeInvocation(invocation).catch(error)
|
||||||
})
|
})
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deno worker
|
* Deno worker
|
||||||
*/
|
*/
|
||||||
|
healthyGuard()
|
||||||
const denoApi = await (async () => {
|
const denoApi = await (async () => {
|
||||||
const workerPath = join(
|
const workerPath = join(
|
||||||
DAEMON_PB_DATA_DIR,
|
DAEMON_PB_DATA_DIR,
|
||||||
@ -310,7 +332,9 @@ export const instanceService = mkSingleton(
|
|||||||
dbg(`Checking ${workerPath} for a worker entry point`)
|
dbg(`Checking ${workerPath} for a worker entry point`)
|
||||||
if (existsSync(workerPath)) {
|
if (existsSync(workerPath)) {
|
||||||
dbg(`Found worker ${workerPath}`)
|
dbg(`Found worker ${workerPath}`)
|
||||||
await _writeUserLog(`Starting worker`)
|
healthyGuard()
|
||||||
|
await writeUserLog(`Starting worker`)
|
||||||
|
healthyGuard()
|
||||||
const api = await createDenoProcess({
|
const api = await createDenoProcess({
|
||||||
path: workerPath,
|
path: workerPath,
|
||||||
port: newPort,
|
port: newPort,
|
||||||
@ -323,7 +347,7 @@ export const instanceService = mkSingleton(
|
|||||||
}
|
}
|
||||||
})()
|
})()
|
||||||
shutdownManager.add(async () => {
|
shutdownManager.add(async () => {
|
||||||
await _writeUserLog(`Shutting down worker`).catch(error)
|
await writeUserLog(`Shutting down worker`).catch(error)
|
||||||
await denoApi?.shutdown().catch(error)
|
await denoApi?.shutdown().catch(error)
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -355,7 +379,8 @@ export const instanceService = mkSingleton(
|
|||||||
lastRequest + DAEMON_PB_IDLE_TTL < now()
|
lastRequest + DAEMON_PB_IDLE_TTL < now()
|
||||||
) {
|
) {
|
||||||
dbg(`idle for ${DAEMON_PB_IDLE_TTL}, shutting down`)
|
dbg(`idle for ${DAEMON_PB_IDLE_TTL}, shutting down`)
|
||||||
await shutdown()
|
healthyGuard()
|
||||||
|
await _safeShutdown().catch(error)
|
||||||
return false
|
return false
|
||||||
} else {
|
} else {
|
||||||
raw(`${openRequestCount} requests remain open`)
|
raw(`${openRequestCount} requests remain open`)
|
||||||
@ -369,7 +394,7 @@ export const instanceService = mkSingleton(
|
|||||||
{
|
{
|
||||||
tm.repeat(
|
tm.repeat(
|
||||||
() =>
|
() =>
|
||||||
_pingInvocation(invocation)
|
pingInvocation(invocation)
|
||||||
.then(() => true)
|
.then(() => true)
|
||||||
.catch((e) => {
|
.catch((e) => {
|
||||||
warn(`_pingInvocation failed with ${e}`)
|
warn(`_pingInvocation failed with ${e}`)
|
||||||
@ -381,10 +406,11 @@ export const instanceService = mkSingleton(
|
|||||||
|
|
||||||
dbg(`${internalUrl} is running`)
|
dbg(`${internalUrl} is running`)
|
||||||
status = InstanceApiStatus.Healthy
|
status = InstanceApiStatus.Healthy
|
||||||
await _updateInstanceStatus(instance.id, InstanceStatus.Running)
|
healthyGuard()
|
||||||
|
await updateInstanceStatus(instance.id, InstanceStatus.Running)
|
||||||
})().catch((e) => {
|
})().catch((e) => {
|
||||||
warn(`Instance failed to start with ${e}`)
|
warn(`Instance failed to start with ${e}`)
|
||||||
shutdown().catch(e)
|
_safeShutdown(e).catch(error)
|
||||||
})
|
})
|
||||||
|
|
||||||
return api
|
return api
|
||||||
|
@ -6,11 +6,7 @@ import {
|
|||||||
smartFetch,
|
smartFetch,
|
||||||
tryFetch,
|
tryFetch,
|
||||||
} from '$util'
|
} from '$util'
|
||||||
import {
|
import { createCleanupManager, createTimerManager } from '@pockethost/common'
|
||||||
createCleanupManager,
|
|
||||||
createTimerManager,
|
|
||||||
safeCatch,
|
|
||||||
} from '@pockethost/common'
|
|
||||||
import {
|
import {
|
||||||
mkSingleton,
|
mkSingleton,
|
||||||
SingletonBaseConfig,
|
SingletonBaseConfig,
|
||||||
@ -23,6 +19,7 @@ import { type } from 'os'
|
|||||||
import { join } from 'path'
|
import { join } from 'path'
|
||||||
import { gte, maxSatisfying, rsort } from 'semver'
|
import { gte, maxSatisfying, rsort } from 'semver'
|
||||||
import { AsyncReturnType } from 'type-fest'
|
import { AsyncReturnType } from 'type-fest'
|
||||||
|
import { AsyncContext } from '../util/AsyncContext'
|
||||||
|
|
||||||
export type PocketbaseCommand = 'serve' | 'migrate'
|
export type PocketbaseCommand = 'serve' | 'migrate'
|
||||||
export type SpawnConfig = {
|
export type SpawnConfig = {
|
||||||
@ -31,7 +28,7 @@ export type SpawnConfig = {
|
|||||||
version?: string
|
version?: string
|
||||||
port?: number
|
port?: number
|
||||||
isMothership?: boolean
|
isMothership?: boolean
|
||||||
onUnexpectedStop?: (
|
onUnexpectedStop: (
|
||||||
code: number | null,
|
code: number | null,
|
||||||
stdout: string[],
|
stdout: string[],
|
||||||
stderr: string[]
|
stderr: string[]
|
||||||
@ -149,133 +146,131 @@ export const createPocketbaseService = async (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const _spawn = safeCatch(
|
const _spawn = async (cfg: SpawnConfig, context?: AsyncContext) => {
|
||||||
`spawnInstance`,
|
const logger = (context?.logger || _serviceLogger).create('spawn')
|
||||||
_serviceLogger,
|
const { dbg, warn, error } = logger
|
||||||
async (cfg: SpawnConfig) => {
|
const _cfg: Required<SpawnConfig> = {
|
||||||
const _cfg: Required<SpawnConfig> = {
|
version: maxVersion,
|
||||||
version: maxVersion,
|
port: await getPort(),
|
||||||
port: await getPort(),
|
isMothership: false,
|
||||||
isMothership: false,
|
...cfg,
|
||||||
onUnexpectedStop: (code) => {
|
|
||||||
dbg(`Unexpected stop default handler. Exit code: ${code}`)
|
|
||||||
},
|
|
||||||
...cfg,
|
|
||||||
}
|
|
||||||
const { version, command, slug, port, onUnexpectedStop, isMothership } =
|
|
||||||
_cfg
|
|
||||||
const _version = version || maxVersion // If _version is blank, we use the max version available
|
|
||||||
const realVersion = await getVersion(_version)
|
|
||||||
const bin = realVersion.binPath
|
|
||||||
if (!existsSync(bin)) {
|
|
||||||
throw new Error(
|
|
||||||
`PocketBase binary (${bin}) not found. Contact pockethost.io.`
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
const args = [
|
|
||||||
command,
|
|
||||||
`--dir`,
|
|
||||||
`${DAEMON_PB_DATA_DIR}/${slug}/pb_data`,
|
|
||||||
`--publicDir`,
|
|
||||||
`${DAEMON_PB_DATA_DIR}/${slug}/pb_static`,
|
|
||||||
]
|
|
||||||
if (gte(realVersion.version, `0.9.0`)) {
|
|
||||||
args.push(`--migrationsDir`)
|
|
||||||
args.push(
|
|
||||||
isMothership
|
|
||||||
? DAEMON_PB_MIGRATIONS_DIR
|
|
||||||
: `${DAEMON_PB_DATA_DIR}/${slug}/pb_migrations`
|
|
||||||
)
|
|
||||||
}
|
|
||||||
if (command === 'serve') {
|
|
||||||
args.push(`--http`)
|
|
||||||
args.push(mkInternalAddress(port))
|
|
||||||
}
|
|
||||||
|
|
||||||
let isRunning = true
|
|
||||||
|
|
||||||
dbg(`Spawning ${slug}`, { bin, args, cli: [bin, ...args].join(' ') })
|
|
||||||
const ls = spawn(bin, args)
|
|
||||||
cm.add(() => {
|
|
||||||
ls.kill()
|
|
||||||
})
|
|
||||||
|
|
||||||
const stdout: string[] = []
|
|
||||||
ls.stdout.on('data', (data: Buffer) => {
|
|
||||||
dbg(`${slug} stdout: ${data}`)
|
|
||||||
stdout.push(data.toString())
|
|
||||||
if (stdout.length > 100) stdout.pop()
|
|
||||||
})
|
|
||||||
|
|
||||||
const stderr: string[] = []
|
|
||||||
ls.stderr.on('data', (data: Buffer) => {
|
|
||||||
warn(`${slug} stderr: ${data}`)
|
|
||||||
stderr.push(data.toString())
|
|
||||||
if (stderr.length > 100) stderr.pop()
|
|
||||||
})
|
|
||||||
|
|
||||||
ls.on('close', (code) => {
|
|
||||||
dbg(`${slug} closed with code ${code}`)
|
|
||||||
})
|
|
||||||
|
|
||||||
const exited = new Promise<number | null>((resolve) => {
|
|
||||||
ls.on('exit', (code) => {
|
|
||||||
dbg(`${slug} exited with code ${code}`)
|
|
||||||
isRunning = false
|
|
||||||
if (code || stderr.length > 0) {
|
|
||||||
onUnexpectedStop?.(code, stdout, stderr)
|
|
||||||
}
|
|
||||||
resolve(code)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
ls.on('error', (err) => {
|
|
||||||
dbg(`${slug} had error ${err}`)
|
|
||||||
})
|
|
||||||
|
|
||||||
const url = mkInternalUrl(port)
|
|
||||||
if (command === 'serve') {
|
|
||||||
await tryFetch(_serviceLogger)(url, async () => isRunning)
|
|
||||||
}
|
|
||||||
const api: PocketbaseProcess = {
|
|
||||||
url,
|
|
||||||
exited,
|
|
||||||
pid: ls.pid,
|
|
||||||
kill: async () => {
|
|
||||||
const { pid } = ls
|
|
||||||
if (!pid) {
|
|
||||||
throw new Error(
|
|
||||||
`Attempt to kill a PocketBase process that was never running.`
|
|
||||||
)
|
|
||||||
}
|
|
||||||
const p = new Promise<boolean>((resolve, reject) => {
|
|
||||||
let cid: NodeJS.Timeout
|
|
||||||
const tid = setTimeout(() => {
|
|
||||||
clearTimeout(cid)
|
|
||||||
reject(new Error(`Timeout waiting for pid:${pid} to die`))
|
|
||||||
}, 1000)
|
|
||||||
const _check = () => {
|
|
||||||
dbg(`Checking to see if pid:${pid} is running`)
|
|
||||||
if (pidIsRunning(pid)) {
|
|
||||||
dbg(`pid:${pid} is still running`)
|
|
||||||
ls.kill()
|
|
||||||
cid = setTimeout(_check, 50)
|
|
||||||
} else {
|
|
||||||
dbg(`pid:${pid} is not running`)
|
|
||||||
clearTimeout(tid)
|
|
||||||
|
|
||||||
resolve(true)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_check()
|
|
||||||
})
|
|
||||||
return p
|
|
||||||
},
|
|
||||||
}
|
|
||||||
return api
|
|
||||||
}
|
}
|
||||||
)
|
const { version, command, slug, port, onUnexpectedStop, isMothership } =
|
||||||
|
_cfg
|
||||||
|
const _version = version || maxVersion // If _version is blank, we use the max version available
|
||||||
|
const realVersion = await getVersion(_version)
|
||||||
|
const bin = realVersion.binPath
|
||||||
|
if (!existsSync(bin)) {
|
||||||
|
throw new Error(
|
||||||
|
`PocketBase binary (${bin}) not found. Contact pockethost.io.`
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
const args = [
|
||||||
|
command,
|
||||||
|
`--dir`,
|
||||||
|
`${DAEMON_PB_DATA_DIR}/${slug}/pb_data`,
|
||||||
|
`--publicDir`,
|
||||||
|
`${DAEMON_PB_DATA_DIR}/${slug}/pb_static`,
|
||||||
|
]
|
||||||
|
if (gte(realVersion.version, `0.9.0`)) {
|
||||||
|
args.push(`--migrationsDir`)
|
||||||
|
args.push(
|
||||||
|
isMothership
|
||||||
|
? DAEMON_PB_MIGRATIONS_DIR
|
||||||
|
: `${DAEMON_PB_DATA_DIR}/${slug}/pb_migrations`
|
||||||
|
)
|
||||||
|
}
|
||||||
|
if (command === 'serve') {
|
||||||
|
args.push(`--http`)
|
||||||
|
args.push(mkInternalAddress(port))
|
||||||
|
}
|
||||||
|
|
||||||
|
let isRunning = true
|
||||||
|
|
||||||
|
dbg(`Spawning ${slug}`, { bin, args, cli: [bin, ...args].join(' ') })
|
||||||
|
const ls = spawn(bin, args)
|
||||||
|
cm.add(() => {
|
||||||
|
ls.kill()
|
||||||
|
})
|
||||||
|
|
||||||
|
const stdout: string[] = []
|
||||||
|
ls.stdout.on('data', (data: Buffer) => {
|
||||||
|
dbg(`${slug} stdout: ${data}`)
|
||||||
|
stdout.push(data.toString())
|
||||||
|
if (stdout.length > 100) stdout.pop()
|
||||||
|
})
|
||||||
|
|
||||||
|
const stderr: string[] = []
|
||||||
|
ls.stderr.on('data', (data: Buffer) => {
|
||||||
|
warn(`${slug} stderr: ${data}`)
|
||||||
|
stderr.push(data.toString())
|
||||||
|
if (stderr.length > 100) stderr.pop()
|
||||||
|
})
|
||||||
|
|
||||||
|
ls.on('close', (code) => {
|
||||||
|
dbg(`${slug} closed with code ${code}`)
|
||||||
|
})
|
||||||
|
|
||||||
|
const exited = new Promise<number | null>((resolve) => {
|
||||||
|
ls.on('exit', (code) => {
|
||||||
|
dbg(`${slug} exited with code ${code}`)
|
||||||
|
isRunning = false
|
||||||
|
if (code || stderr.length > 0) {
|
||||||
|
onUnexpectedStop?.(code, stdout, stderr)
|
||||||
|
}
|
||||||
|
resolve(code)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
ls.on('error', (err) => {
|
||||||
|
dbg(`${slug} had error ${err}`)
|
||||||
|
})
|
||||||
|
|
||||||
|
const url = mkInternalUrl(port)
|
||||||
|
if (command === 'serve') {
|
||||||
|
await tryFetch(url, {
|
||||||
|
preflight: async () => isRunning,
|
||||||
|
logger: _serviceLogger,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
const api: PocketbaseProcess = {
|
||||||
|
url,
|
||||||
|
exited,
|
||||||
|
pid: ls.pid,
|
||||||
|
kill: async () => {
|
||||||
|
const { pid } = ls
|
||||||
|
if (!pid) {
|
||||||
|
throw new Error(
|
||||||
|
`Attempt to kill a PocketBase process that was never running.`
|
||||||
|
)
|
||||||
|
}
|
||||||
|
const p = new Promise<boolean>((resolve, reject) => {
|
||||||
|
let cid: NodeJS.Timeout
|
||||||
|
const tid = setTimeout(() => {
|
||||||
|
clearTimeout(cid)
|
||||||
|
reject(new Error(`Timeout waiting for pid:${pid} to die`))
|
||||||
|
}, 1000)
|
||||||
|
const _check = () => {
|
||||||
|
dbg(`Checking to see if pid:${pid} is running`)
|
||||||
|
if (pidIsRunning(pid)) {
|
||||||
|
dbg(`pid:${pid} is still running`)
|
||||||
|
ls.kill()
|
||||||
|
cid = setTimeout(_check, 50)
|
||||||
|
} else {
|
||||||
|
dbg(`pid:${pid} is not running`)
|
||||||
|
clearTimeout(tid)
|
||||||
|
|
||||||
|
resolve(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_check()
|
||||||
|
})
|
||||||
|
return p
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return api
|
||||||
|
}
|
||||||
|
|
||||||
const shutdown = () => {
|
const shutdown = () => {
|
||||||
dbg(`Shutting down pocketbaseService`)
|
dbg(`Shutting down pocketbaseService`)
|
||||||
|
@ -10,6 +10,7 @@ import {
|
|||||||
import { reduce } from '@s-libs/micro-dash'
|
import { reduce } from '@s-libs/micro-dash'
|
||||||
import Bottleneck from 'bottleneck'
|
import Bottleneck from 'bottleneck'
|
||||||
import { endOfMonth, startOfMonth } from 'date-fns'
|
import { endOfMonth, startOfMonth } from 'date-fns'
|
||||||
|
import { AsyncContext } from '../../util/AsyncContext'
|
||||||
import { MixinContext } from './PbClient'
|
import { MixinContext } from './PbClient'
|
||||||
|
|
||||||
export type InstanceApi = ReturnType<typeof createInstanceMixin>
|
export type InstanceApi = ReturnType<typeof createInstanceMixin>
|
||||||
@ -59,30 +60,26 @@ export const createInstanceMixin = (context: MixinContext) => {
|
|||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
const getInstanceById = safeCatch(
|
const getInstanceById = async (
|
||||||
`getInstanceById`,
|
instanceId: InstanceId,
|
||||||
logger,
|
context?: AsyncContext
|
||||||
async (
|
): Promise<[InstanceFields, UserFields] | []> =>
|
||||||
instanceId: InstanceId
|
client
|
||||||
): Promise<[InstanceFields, UserFields] | []> => {
|
.collection(INSTANCE_COLLECTION)
|
||||||
return client
|
.getOne<InstanceFields>(instanceId, {
|
||||||
.collection(INSTANCE_COLLECTION)
|
$autoCancel: false,
|
||||||
.getOne<InstanceFields>(instanceId, {
|
})
|
||||||
$autoCancel: false,
|
.then((instance) => {
|
||||||
})
|
if (!instance) return []
|
||||||
.then((instance) => {
|
return client
|
||||||
if (!instance) return []
|
.collection('users')
|
||||||
return client
|
.getOne<UserFields>(instance.uid, {
|
||||||
.collection('users')
|
$autoCancel: false,
|
||||||
.getOne<UserFields>(instance.uid, {
|
})
|
||||||
$autoCancel: false,
|
.then((user) => {
|
||||||
})
|
return [instance, user]
|
||||||
.then((user) => {
|
})
|
||||||
return [instance, user]
|
})
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
const updateInstance = safeCatch(
|
const updateInstance = safeCatch(
|
||||||
`updateInstance`,
|
`updateInstance`,
|
||||||
|
5
packages/daemon/src/util/AsyncContext.ts
Normal file
5
packages/daemon/src/util/AsyncContext.ts
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
import { Logger } from '@pockethost/common'
|
||||||
|
|
||||||
|
export type AsyncContext = {
|
||||||
|
logger: Logger
|
||||||
|
}
|
@ -1,40 +1,50 @@
|
|||||||
import { Logger, safeCatch } from '@pockethost/common'
|
import { logger as defaultLogger } from '@pockethost/common'
|
||||||
|
import fetch from 'node-fetch'
|
||||||
|
import { AsyncContext } from './AsyncContext'
|
||||||
|
|
||||||
const TRYFETCH_RETRY_MS = 50
|
export const TRYFETCH_RETRY_MS = 50
|
||||||
export const tryFetch = (logger: Logger) =>
|
|
||||||
safeCatch(
|
export type Config = Required<AsyncContext> & {
|
||||||
`tryFetch`,
|
preflight: () => Promise<boolean>
|
||||||
logger,
|
retryMs: number
|
||||||
(url: string, preflight?: () => Promise<boolean>) => {
|
}
|
||||||
const { dbg } = logger.create('tryFetch')
|
|
||||||
return new Promise<void>((resolve, reject) => {
|
export const tryFetch = async (url: string, config?: Partial<Config>) => {
|
||||||
const tryFetch = async () => {
|
const { logger, preflight, retryMs }: Config = {
|
||||||
if (preflight) {
|
logger: defaultLogger(),
|
||||||
dbg(`Checking preflight`)
|
preflight: async () => true,
|
||||||
try {
|
retryMs: TRYFETCH_RETRY_MS,
|
||||||
const shouldFetch = await preflight()
|
...config,
|
||||||
if (!shouldFetch) {
|
}
|
||||||
reject(new Error(`failed preflight, aborting`))
|
const _logger = logger.create(`tryFetch`)
|
||||||
return
|
const { dbg } = _logger
|
||||||
}
|
return new Promise<void>((resolve, reject) => {
|
||||||
} catch (e) {
|
const tryFetch = async () => {
|
||||||
reject(new Error(`preflight threw error, aborting`))
|
if (preflight) {
|
||||||
return
|
dbg(`Checking preflight`)
|
||||||
}
|
try {
|
||||||
}
|
const shouldFetch = await preflight()
|
||||||
try {
|
if (!shouldFetch) {
|
||||||
dbg(`Trying to fetch ${url} `)
|
reject(new Error(`failed preflight, aborting`))
|
||||||
const res = await fetch(url)
|
return
|
||||||
dbg(`Fetch ${url} successful`)
|
|
||||||
resolve()
|
|
||||||
} catch (e) {
|
|
||||||
dbg(
|
|
||||||
`Could not fetch ${url}, trying again in ${TRYFETCH_RETRY_MS}ms. Raw error ${e}`
|
|
||||||
)
|
|
||||||
setTimeout(tryFetch, TRYFETCH_RETRY_MS)
|
|
||||||
}
|
}
|
||||||
|
} catch (e) {
|
||||||
|
reject(new Error(`preflight threw error, aborting`))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
tryFetch()
|
}
|
||||||
})
|
try {
|
||||||
|
dbg(`Trying to fetch ${url} `)
|
||||||
|
const res = await fetch(url)
|
||||||
|
dbg(`Fetch ${url} successful`)
|
||||||
|
resolve()
|
||||||
|
} catch (e) {
|
||||||
|
dbg(
|
||||||
|
`Could not fetch ${url}, trying again in ${retryMs}ms. Raw error ${e}`
|
||||||
|
)
|
||||||
|
setTimeout(tryFetch, retryMs)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
)
|
tryFetch()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user