From 63ad32e0542dee298d2065a369affcab86ecc468 Mon Sep 17 00:00:00 2001 From: Ben Allfree Date: Sun, 18 Jun 2023 06:13:56 -0700 Subject: [PATCH] enh: SqliteService async serialization enhancements --- .../services/SqliteService/SqliteService.ts | 64 +++++++++++-------- .../src/util/serialAsyncExecutionGuard.ts | 26 ++++++++ 2 files changed, 65 insertions(+), 25 deletions(-) create mode 100644 packages/daemon/src/util/serialAsyncExecutionGuard.ts diff --git a/packages/daemon/src/services/SqliteService/SqliteService.ts b/packages/daemon/src/services/SqliteService/SqliteService.ts index 18489fa9..b2efa58e 100644 --- a/packages/daemon/src/services/SqliteService/SqliteService.ts +++ b/packages/daemon/src/services/SqliteService/SqliteService.ts @@ -4,10 +4,10 @@ import { mkSingleton, SingletonBaseConfig, } from '@pockethost/common' -import Bottleneck from 'bottleneck' import { Database as SqliteDatabase, open } from 'sqlite' import { Database } from 'sqlite3' import { JsonObject } from 'type-fest' +import { serialAsyncExecutionGuard } from '../../util/serialAsyncExecutionGuard' export type SqliteUnsubscribe = () => void export type SqliteChangeHandler = ( @@ -35,21 +35,25 @@ export type SqliteService = ReturnType export const sqliteService = mkSingleton((config: SqliteServiceConfig) => { const { logger } = config const { dbg, trace } = logger.create(`sqliteService`) - const connections: { [_: string]: Promise } = {} + const connections: { [_: string]: SqliteServiceApi } = {} const cm = createCleanupManager() - const limiter = new Bottleneck({ maxConcurrent: 1 }) + /* + This function + */ + const _unsafe_getDatabase = async ( + filename: string + ): Promise => { + const _dbLogger = logger.create(`SqliteService`) + _dbLogger.breadcrumb(filename) + const { dbg, error, abort } = _dbLogger - const getDatabase = async (filename: string): Promise => { - const _dbLogger = logger.create(`${filename}`) - const { dbg } = _dbLogger - - trace(`Fetching database for ${filename}`, connections) + trace(`Fetching database`, connections) if (!connections[filename]) { - dbg(`${filename} is not yet opened`) + dbg(`Not yet opened`) - connections[filename] = new Promise(async (resolve) => { + const api = await (async () => { const db = await open({ filename, driver: Database }) dbg(`Database opened`) db.db.addListener( @@ -60,20 +64,23 @@ export const sqliteService = mkSingleton((config: SqliteServiceConfig) => { table: string, rowId: number ) => { - dbg(`Got a raw change event`, { eventType, database, table, rowId }) + trace(`Got a raw change event`, { + eventType, + database, + table, + rowId, + }) if (eventType === 'delete') return // Not supported - await limiter.schedule(async () => { - const record = await db.get( - `select * from ${table} where rowid = '${rowId}'` - ) - const e: SqliteChangeEvent = { - table, - action: eventType, - record, - } - fireChange(e) - }) + const record = await db.get( + `select * from ${table} where rowid = '${rowId}'` + ) + const e: SqliteChangeEvent = { + table, + action: eventType, + record, + } + fireChange(e) } ) @@ -92,15 +99,22 @@ export const sqliteService = mkSingleton((config: SqliteServiceConfig) => { exec: db.exec.bind(db), subscribe: onChange, } - resolve(api) - }) + return api + })().catch(error) + if (!api) { + throw new Error(`Unable to connect to SQLite`) + } + connections[filename] = api } return connections[filename]! } + const getDatabase = serialAsyncExecutionGuard( + _unsafe_getDatabase, + (fileName) => fileName + ) const shutdown = async () => { dbg(`Shutting down sqlite service`) - await limiter.stop() await cm.shutdown() } return { diff --git a/packages/daemon/src/util/serialAsyncExecutionGuard.ts b/packages/daemon/src/util/serialAsyncExecutionGuard.ts new file mode 100644 index 00000000..b5a1cb5f --- /dev/null +++ b/packages/daemon/src/util/serialAsyncExecutionGuard.ts @@ -0,0 +1,26 @@ +import { logger } from '@pockethost/common' +import { uniqueId } from '@s-libs/micro-dash' +import Bottleneck from 'bottleneck' +import { SetReturnType } from 'type-fest' + +const limiters: { [lane: string]: Bottleneck } = {} +export const serialAsyncExecutionGuard = < + T extends (...args: any[]) => Promise +>( + cb: T, + lane?: SetReturnType +): T => { + const uuid = uniqueId() + const _lane = lane || (() => uuid) + const wrapper = (...args: Parameters) => { + const { dbg } = logger() + const key = _lane(...args) + if (!limiters[key]) { + dbg(`New singleton limiter with key ${key}`) + limiters[key] = new Bottleneck({ maxConcurrent: 1 }) + } + const limiter = limiters[key]! + return limiter.schedule(() => cb(...args)) as unknown as ReturnType + } + return wrapper as unknown as T +}