diff --git a/packages/daemon/src/server.ts b/packages/daemon/src/server.ts index 8357e249..b947f352 100644 --- a/packages/daemon/src/server.ts +++ b/packages/daemon/src/server.ts @@ -56,23 +56,35 @@ global.EventSource = require('eventsource') { info(`Migrating mothership`) await ( - await pbService.spawn({ - command: 'migrate', - isMothership: true, - version: DAEMON_PB_SEMVER, - slug: PUBLIC_APP_DB, - }) + await pbService.spawn( + { + command: 'migrate', + isMothership: true, + version: DAEMON_PB_SEMVER, + slug: PUBLIC_APP_DB, + onUnexpectedStop: () => { + error(`migrate had an unexpected stop. Check it out`) + }, + }, + { logger } + ) ).exited info(`Migrating done`) } info(`Serving`) - const { url } = await pbService.spawn({ - command: 'serve', - isMothership: true, - version: DAEMON_PB_SEMVER, - slug: PUBLIC_APP_DB, - port: DAEMON_PB_PORT_BASE, - }) + const { url } = await pbService.spawn( + { + command: 'serve', + isMothership: true, + version: DAEMON_PB_SEMVER, + slug: PUBLIC_APP_DB, + port: DAEMON_PB_PORT_BASE, + onUnexpectedStop: () => { + error(`migrate had an unexpected stop. Check it out`) + }, + }, + { logger } + ) /** * Launch services diff --git a/packages/daemon/src/services/InstanceService/InstanceService.ts b/packages/daemon/src/services/InstanceService/InstanceService.ts index 80f680e3..ab3b2245 100644 --- a/packages/daemon/src/services/InstanceService/InstanceService.ts +++ b/packages/daemon/src/services/InstanceService/InstanceService.ts @@ -43,7 +43,7 @@ type InstanceApi = { status: () => InstanceApiStatus internalUrl: () => string startRequest: () => () => void - shutdown: () => Promise + shutdown: (reason?: Error) => Promise } export type InstanceServiceConfig = SingletonBaseConfig & { @@ -116,16 +116,30 @@ export const instanceService = mkSingleton( ) } + /* + Initialize shutdown manager + */ + const shutdownManager = createCleanupManager() + shutdownManager.add(async () => { + dbg(`Deleting from cache`) + delete instanceApis[id] + dbg(`There are ${values(instanceApis).length} still in cache`) + }, CLEANUP_PRIORITY_LAST) // Make this the very last thing that happens + shutdownManager.add(async () => { + dbg(`Shutting down`) + status = InstanceApiStatus.ShuttingDown + }) + info(`Starting`) let status = InstanceApiStatus.Starting let internalUrl = '' let startRequest: InstanceApi['startRequest'] = () => { throw new Error(`Not ready yet`) } - let shutdown: InstanceApi['shutdown'] = () => { - throw new Error(`Not ready yet`) - } + /* + Initialize API + */ const api: InstanceApi = { status: () => { return status @@ -146,43 +160,38 @@ export const instanceService = mkSingleton( } return startRequest() }, - shutdown: async () => { - if (status !== InstanceApiStatus.Healthy) { - throw new Error( - `Attempt to shut down an instance request when instance is not in a healthy state.` - ) + shutdown: async (reason) => { + if (reason) { + error(`Panic shutdown for ${reason}`) + } else { + dbg(`Graceful shutdown`) } - return shutdown() + if (api.status() === InstanceApiStatus.ShuttingDown) { + throw new Error(`Already shutting down`) + } + return shutdownManager.shutdown() }, } + const _safeShutdown = async (reason?: Error) => + api.status() === InstanceApiStatus.ShuttingDown || api.shutdown(reason) instanceApis[id] = api - /* - Initialize shutdown manager - */ - const shutdownManager = createCleanupManager() - shutdownManager.add(async () => { - dbg(`Deleting from cache`) - delete instanceApis[id] - dbg(`There are ${values(instanceApis).length} still in cache`) - }, CLEANUP_PRIORITY_LAST) // Make this the very last thing that happens - shutdownManager.add(async () => { - dbg(`Shutting down`) - status = InstanceApiStatus.ShuttingDown - }) - shutdown = () => shutdownManager.shutdown() + const healthyGuard = () => { + if (api.status() !== InstanceApiStatus.ShuttingDown) return + throw new Error(`Instance is shutting down. Aborting.`) + } /* Create serialized client communication functions to prevent race conditions */ const clientLimiter = new Bottleneck({ maxConcurrent: 1 }) - const _updateInstanceStatus = clientLimiter.wrap( + const updateInstanceStatus = clientLimiter.wrap( client.updateInstanceStatus ) - const _updateInstance = clientLimiter.wrap(client.updateInstance) - const _createInvocation = clientLimiter.wrap(client.createInvocation) - const _pingInvocation = clientLimiter.wrap(client.pingInvocation) - const _finalizeInvocation = clientLimiter.wrap(client.finalizeInvocation) + const updateInstance = clientLimiter.wrap(client.updateInstance) + const createInvocation = clientLimiter.wrap(client.createInvocation) + const pingInvocation = clientLimiter.wrap(client.pingInvocation) + const finalizeInvocation = clientLimiter.wrap(client.finalizeInvocation) /* Handle async setup @@ -194,29 +203,34 @@ export const instanceService = mkSingleton( Obtain empty port */ dbg(`Obtaining port`) - await _updateInstanceStatus(instance.id, InstanceStatus.Port) + healthyGuard() + await updateInstanceStatus(instance.id, InstanceStatus.Port) + healthyGuard() const [newPort, releasePort] = await getNextPort() shutdownManager.add(() => { dbg(`Releasing port`) releasePort() }, CLEANUP_PRIORITY_LAST) - systemInstanceLogger.breadcrumb(`port:${newPort}`) dbg(`Found port`) /* Create the user instance logger */ + healthyGuard() const userInstanceLogger = await instanceLoggerService().get( instance.id, - { parentLogger: systemInstanceLogger } + { + parentLogger: systemInstanceLogger, + } ) - const _writeUserLog = serialAsyncExecutionGuard( + + const writeUserLog = serialAsyncExecutionGuard( userInstanceLogger.write, () => `${instance.id}:userLog` ) shutdownManager.add(() => - _writeUserLog(`Shutting down instance`).catch(error) + writeUserLog(`Shutting down instance`).catch(error) ) /* @@ -224,12 +238,14 @@ export const instanceService = mkSingleton( */ dbg(`Starting instance`) dbg(`Set instance status: starting`) - await _updateInstanceStatus(instance.id, InstanceStatus.Starting) + healthyGuard() + await updateInstanceStatus(instance.id, InstanceStatus.Starting) shutdownManager.add(async () => { dbg(`Set instance status: idle`) - await _updateInstanceStatus(id, InstanceStatus.Idle).catch(error) + await updateInstanceStatus(id, InstanceStatus.Idle).catch(error) }) - await _writeUserLog(`Starting instance`) + healthyGuard() + await writeUserLog(`Starting instance`) /* Spawn the child process @@ -241,33 +257,37 @@ export const instanceService = mkSingleton( slug: instance.id, port: newPort, version, - onUnexpectedStop: async (code, stdout, stderr) => { + onUnexpectedStop: (code, stdout, stderr) => { warn( `PocketBase processes exited unexpectedly with ${code}. Putting in maintenance mode.` ) warn(stdout) warn(stderr) shutdownManager.add(async () => { - await _updateInstance(instance.id, { + await updateInstance(instance.id, { maintenance: true, }) - await _writeUserLog( + await writeUserLog( `Putting instance in maintenance mode because it shut down with return code ${code}. `, StreamNames.Error ) await Promise.all( stdout.map((data) => - _writeUserLog(data, StreamNames.Error).catch(error) + writeUserLog(data, StreamNames.Error).catch(error) ) ) await Promise.all( stderr.map((data) => - _writeUserLog(data, StreamNames.Error).catch(error) + writeUserLog(data, StreamNames.Error).catch(error) ) ) }) setImmediate(() => { - api.shutdown().catch(error) + _safeShutdown( + new Error( + `PocketBase processes exited unexpectedly with ${code}. Putting in maintenance mode.` + ) + ).catch(error) }) }, }) @@ -292,14 +312,16 @@ export const instanceService = mkSingleton( /* Create the invocation record */ - const invocation = await _createInvocation(instance, pid) + healthyGuard() + const invocation = await createInvocation(instance, pid) shutdownManager.add(async () => { - await _finalizeInvocation(invocation).catch(error) + await finalizeInvocation(invocation).catch(error) }) /** * Deno worker */ + healthyGuard() const denoApi = await (async () => { const workerPath = join( DAEMON_PB_DATA_DIR, @@ -310,7 +332,9 @@ export const instanceService = mkSingleton( dbg(`Checking ${workerPath} for a worker entry point`) if (existsSync(workerPath)) { dbg(`Found worker ${workerPath}`) - await _writeUserLog(`Starting worker`) + healthyGuard() + await writeUserLog(`Starting worker`) + healthyGuard() const api = await createDenoProcess({ path: workerPath, port: newPort, @@ -323,7 +347,7 @@ export const instanceService = mkSingleton( } })() shutdownManager.add(async () => { - await _writeUserLog(`Shutting down worker`).catch(error) + await writeUserLog(`Shutting down worker`).catch(error) await denoApi?.shutdown().catch(error) }) @@ -355,7 +379,8 @@ export const instanceService = mkSingleton( lastRequest + DAEMON_PB_IDLE_TTL < now() ) { dbg(`idle for ${DAEMON_PB_IDLE_TTL}, shutting down`) - await shutdown() + healthyGuard() + await _safeShutdown().catch(error) return false } else { raw(`${openRequestCount} requests remain open`) @@ -369,7 +394,7 @@ export const instanceService = mkSingleton( { tm.repeat( () => - _pingInvocation(invocation) + pingInvocation(invocation) .then(() => true) .catch((e) => { warn(`_pingInvocation failed with ${e}`) @@ -381,10 +406,11 @@ export const instanceService = mkSingleton( dbg(`${internalUrl} is running`) status = InstanceApiStatus.Healthy - await _updateInstanceStatus(instance.id, InstanceStatus.Running) + healthyGuard() + await updateInstanceStatus(instance.id, InstanceStatus.Running) })().catch((e) => { warn(`Instance failed to start with ${e}`) - shutdown().catch(e) + _safeShutdown(e).catch(error) }) return api diff --git a/packages/daemon/src/services/PocketBaseService.ts b/packages/daemon/src/services/PocketBaseService.ts index 7ed4c15c..82072831 100644 --- a/packages/daemon/src/services/PocketBaseService.ts +++ b/packages/daemon/src/services/PocketBaseService.ts @@ -6,11 +6,7 @@ import { smartFetch, tryFetch, } from '$util' -import { - createCleanupManager, - createTimerManager, - safeCatch, -} from '@pockethost/common' +import { createCleanupManager, createTimerManager } from '@pockethost/common' import { mkSingleton, SingletonBaseConfig, @@ -23,6 +19,7 @@ import { type } from 'os' import { join } from 'path' import { gte, maxSatisfying, rsort } from 'semver' import { AsyncReturnType } from 'type-fest' +import { AsyncContext } from '../util/AsyncContext' export type PocketbaseCommand = 'serve' | 'migrate' export type SpawnConfig = { @@ -31,7 +28,7 @@ export type SpawnConfig = { version?: string port?: number isMothership?: boolean - onUnexpectedStop?: ( + onUnexpectedStop: ( code: number | null, stdout: string[], stderr: string[] @@ -149,133 +146,131 @@ export const createPocketbaseService = async ( } } - const _spawn = safeCatch( - `spawnInstance`, - _serviceLogger, - async (cfg: SpawnConfig) => { - const _cfg: Required = { - version: maxVersion, - port: await getPort(), - isMothership: false, - onUnexpectedStop: (code) => { - dbg(`Unexpected stop default handler. Exit code: ${code}`) - }, - ...cfg, - } - const { version, command, slug, port, onUnexpectedStop, isMothership } = - _cfg - const _version = version || maxVersion // If _version is blank, we use the max version available - const realVersion = await getVersion(_version) - const bin = realVersion.binPath - if (!existsSync(bin)) { - throw new Error( - `PocketBase binary (${bin}) not found. Contact pockethost.io.` - ) - } - - const args = [ - command, - `--dir`, - `${DAEMON_PB_DATA_DIR}/${slug}/pb_data`, - `--publicDir`, - `${DAEMON_PB_DATA_DIR}/${slug}/pb_static`, - ] - if (gte(realVersion.version, `0.9.0`)) { - args.push(`--migrationsDir`) - args.push( - isMothership - ? DAEMON_PB_MIGRATIONS_DIR - : `${DAEMON_PB_DATA_DIR}/${slug}/pb_migrations` - ) - } - if (command === 'serve') { - args.push(`--http`) - args.push(mkInternalAddress(port)) - } - - let isRunning = true - - dbg(`Spawning ${slug}`, { bin, args, cli: [bin, ...args].join(' ') }) - const ls = spawn(bin, args) - cm.add(() => { - ls.kill() - }) - - const stdout: string[] = [] - ls.stdout.on('data', (data: Buffer) => { - dbg(`${slug} stdout: ${data}`) - stdout.push(data.toString()) - if (stdout.length > 100) stdout.pop() - }) - - const stderr: string[] = [] - ls.stderr.on('data', (data: Buffer) => { - warn(`${slug} stderr: ${data}`) - stderr.push(data.toString()) - if (stderr.length > 100) stderr.pop() - }) - - ls.on('close', (code) => { - dbg(`${slug} closed with code ${code}`) - }) - - const exited = new Promise((resolve) => { - ls.on('exit', (code) => { - dbg(`${slug} exited with code ${code}`) - isRunning = false - if (code || stderr.length > 0) { - onUnexpectedStop?.(code, stdout, stderr) - } - resolve(code) - }) - }) - - ls.on('error', (err) => { - dbg(`${slug} had error ${err}`) - }) - - const url = mkInternalUrl(port) - if (command === 'serve') { - await tryFetch(_serviceLogger)(url, async () => isRunning) - } - const api: PocketbaseProcess = { - url, - exited, - pid: ls.pid, - kill: async () => { - const { pid } = ls - if (!pid) { - throw new Error( - `Attempt to kill a PocketBase process that was never running.` - ) - } - const p = new Promise((resolve, reject) => { - let cid: NodeJS.Timeout - const tid = setTimeout(() => { - clearTimeout(cid) - reject(new Error(`Timeout waiting for pid:${pid} to die`)) - }, 1000) - const _check = () => { - dbg(`Checking to see if pid:${pid} is running`) - if (pidIsRunning(pid)) { - dbg(`pid:${pid} is still running`) - ls.kill() - cid = setTimeout(_check, 50) - } else { - dbg(`pid:${pid} is not running`) - clearTimeout(tid) - - resolve(true) - } - } - _check() - }) - return p - }, - } - return api + const _spawn = async (cfg: SpawnConfig, context?: AsyncContext) => { + const logger = (context?.logger || _serviceLogger).create('spawn') + const { dbg, warn, error } = logger + const _cfg: Required = { + version: maxVersion, + port: await getPort(), + isMothership: false, + ...cfg, } - ) + const { version, command, slug, port, onUnexpectedStop, isMothership } = + _cfg + const _version = version || maxVersion // If _version is blank, we use the max version available + const realVersion = await getVersion(_version) + const bin = realVersion.binPath + if (!existsSync(bin)) { + throw new Error( + `PocketBase binary (${bin}) not found. Contact pockethost.io.` + ) + } + + const args = [ + command, + `--dir`, + `${DAEMON_PB_DATA_DIR}/${slug}/pb_data`, + `--publicDir`, + `${DAEMON_PB_DATA_DIR}/${slug}/pb_static`, + ] + if (gte(realVersion.version, `0.9.0`)) { + args.push(`--migrationsDir`) + args.push( + isMothership + ? DAEMON_PB_MIGRATIONS_DIR + : `${DAEMON_PB_DATA_DIR}/${slug}/pb_migrations` + ) + } + if (command === 'serve') { + args.push(`--http`) + args.push(mkInternalAddress(port)) + } + + let isRunning = true + + dbg(`Spawning ${slug}`, { bin, args, cli: [bin, ...args].join(' ') }) + const ls = spawn(bin, args) + cm.add(() => { + ls.kill() + }) + + const stdout: string[] = [] + ls.stdout.on('data', (data: Buffer) => { + dbg(`${slug} stdout: ${data}`) + stdout.push(data.toString()) + if (stdout.length > 100) stdout.pop() + }) + + const stderr: string[] = [] + ls.stderr.on('data', (data: Buffer) => { + warn(`${slug} stderr: ${data}`) + stderr.push(data.toString()) + if (stderr.length > 100) stderr.pop() + }) + + ls.on('close', (code) => { + dbg(`${slug} closed with code ${code}`) + }) + + const exited = new Promise((resolve) => { + ls.on('exit', (code) => { + dbg(`${slug} exited with code ${code}`) + isRunning = false + if (code || stderr.length > 0) { + onUnexpectedStop?.(code, stdout, stderr) + } + resolve(code) + }) + }) + + ls.on('error', (err) => { + dbg(`${slug} had error ${err}`) + }) + + const url = mkInternalUrl(port) + if (command === 'serve') { + await tryFetch(url, { + preflight: async () => isRunning, + logger: _serviceLogger, + }) + } + const api: PocketbaseProcess = { + url, + exited, + pid: ls.pid, + kill: async () => { + const { pid } = ls + if (!pid) { + throw new Error( + `Attempt to kill a PocketBase process that was never running.` + ) + } + const p = new Promise((resolve, reject) => { + let cid: NodeJS.Timeout + const tid = setTimeout(() => { + clearTimeout(cid) + reject(new Error(`Timeout waiting for pid:${pid} to die`)) + }, 1000) + const _check = () => { + dbg(`Checking to see if pid:${pid} is running`) + if (pidIsRunning(pid)) { + dbg(`pid:${pid} is still running`) + ls.kill() + cid = setTimeout(_check, 50) + } else { + dbg(`pid:${pid} is not running`) + clearTimeout(tid) + + resolve(true) + } + } + _check() + }) + return p + }, + } + return api + } const shutdown = () => { dbg(`Shutting down pocketbaseService`) diff --git a/packages/daemon/src/services/clientService/InstanceMIxin.ts b/packages/daemon/src/services/clientService/InstanceMIxin.ts index 29bab542..9ea4391c 100644 --- a/packages/daemon/src/services/clientService/InstanceMIxin.ts +++ b/packages/daemon/src/services/clientService/InstanceMIxin.ts @@ -10,6 +10,7 @@ import { import { reduce } from '@s-libs/micro-dash' import Bottleneck from 'bottleneck' import { endOfMonth, startOfMonth } from 'date-fns' +import { AsyncContext } from '../../util/AsyncContext' import { MixinContext } from './PbClient' export type InstanceApi = ReturnType @@ -59,30 +60,26 @@ export const createInstanceMixin = (context: MixinContext) => { }) ) - const getInstanceById = safeCatch( - `getInstanceById`, - logger, - async ( - instanceId: InstanceId - ): Promise<[InstanceFields, UserFields] | []> => { - return client - .collection(INSTANCE_COLLECTION) - .getOne(instanceId, { - $autoCancel: false, - }) - .then((instance) => { - if (!instance) return [] - return client - .collection('users') - .getOne(instance.uid, { - $autoCancel: false, - }) - .then((user) => { - return [instance, user] - }) - }) - } - ) + const getInstanceById = async ( + instanceId: InstanceId, + context?: AsyncContext + ): Promise<[InstanceFields, UserFields] | []> => + client + .collection(INSTANCE_COLLECTION) + .getOne(instanceId, { + $autoCancel: false, + }) + .then((instance) => { + if (!instance) return [] + return client + .collection('users') + .getOne(instance.uid, { + $autoCancel: false, + }) + .then((user) => { + return [instance, user] + }) + }) const updateInstance = safeCatch( `updateInstance`, diff --git a/packages/daemon/src/util/AsyncContext.ts b/packages/daemon/src/util/AsyncContext.ts new file mode 100644 index 00000000..336e49e3 --- /dev/null +++ b/packages/daemon/src/util/AsyncContext.ts @@ -0,0 +1,5 @@ +import { Logger } from '@pockethost/common' + +export type AsyncContext = { + logger: Logger +} diff --git a/packages/daemon/src/util/tryFetch.ts b/packages/daemon/src/util/tryFetch.ts index afa2565a..81c02e1f 100644 --- a/packages/daemon/src/util/tryFetch.ts +++ b/packages/daemon/src/util/tryFetch.ts @@ -1,40 +1,50 @@ -import { Logger, safeCatch } from '@pockethost/common' +import { logger as defaultLogger } from '@pockethost/common' +import fetch from 'node-fetch' +import { AsyncContext } from './AsyncContext' -const TRYFETCH_RETRY_MS = 50 -export const tryFetch = (logger: Logger) => - safeCatch( - `tryFetch`, - logger, - (url: string, preflight?: () => Promise) => { - const { dbg } = logger.create('tryFetch') - return new Promise((resolve, reject) => { - const tryFetch = async () => { - if (preflight) { - dbg(`Checking preflight`) - try { - const shouldFetch = await preflight() - if (!shouldFetch) { - reject(new Error(`failed preflight, aborting`)) - return - } - } catch (e) { - reject(new Error(`preflight threw error, aborting`)) - return - } - } - try { - dbg(`Trying to fetch ${url} `) - const res = await fetch(url) - dbg(`Fetch ${url} successful`) - resolve() - } catch (e) { - dbg( - `Could not fetch ${url}, trying again in ${TRYFETCH_RETRY_MS}ms. Raw error ${e}` - ) - setTimeout(tryFetch, TRYFETCH_RETRY_MS) +export const TRYFETCH_RETRY_MS = 50 + +export type Config = Required & { + preflight: () => Promise + retryMs: number +} + +export const tryFetch = async (url: string, config?: Partial) => { + const { logger, preflight, retryMs }: Config = { + logger: defaultLogger(), + preflight: async () => true, + retryMs: TRYFETCH_RETRY_MS, + ...config, + } + const _logger = logger.create(`tryFetch`) + const { dbg } = _logger + return new Promise((resolve, reject) => { + const tryFetch = async () => { + if (preflight) { + dbg(`Checking preflight`) + try { + const shouldFetch = await preflight() + if (!shouldFetch) { + reject(new Error(`failed preflight, aborting`)) + return } + } catch (e) { + reject(new Error(`preflight threw error, aborting`)) + return } - tryFetch() - }) + } + try { + dbg(`Trying to fetch ${url} `) + const res = await fetch(url) + dbg(`Fetch ${url} successful`) + resolve() + } catch (e) { + dbg( + `Could not fetch ${url}, trying again in ${retryMs}ms. Raw error ${e}` + ) + setTimeout(tryFetch, retryMs) + } } - ) + tryFetch() + }) +}