enh: SqliteService async serialization enhancements

This commit is contained in:
Ben Allfree 2023-06-18 06:13:56 -07:00
parent c419a1f692
commit 63ad32e054
2 changed files with 65 additions and 25 deletions

View File

@ -4,10 +4,10 @@ import {
mkSingleton,
SingletonBaseConfig,
} from '@pockethost/common'
import Bottleneck from 'bottleneck'
import { Database as SqliteDatabase, open } from 'sqlite'
import { Database } from 'sqlite3'
import { JsonObject } from 'type-fest'
import { serialAsyncExecutionGuard } from '../../util/serialAsyncExecutionGuard'
export type SqliteUnsubscribe = () => void
export type SqliteChangeHandler<TRecord extends JsonObject> = (
@ -35,21 +35,25 @@ export type SqliteService = ReturnType<typeof sqliteService>
export const sqliteService = mkSingleton((config: SqliteServiceConfig) => {
const { logger } = config
const { dbg, trace } = logger.create(`sqliteService`)
const connections: { [_: string]: Promise<SqliteServiceApi> } = {}
const connections: { [_: string]: SqliteServiceApi } = {}
const cm = createCleanupManager()
const limiter = new Bottleneck({ maxConcurrent: 1 })
/*
This function
*/
const _unsafe_getDatabase = async (
filename: string
): Promise<SqliteServiceApi> => {
const _dbLogger = logger.create(`SqliteService`)
_dbLogger.breadcrumb(filename)
const { dbg, error, abort } = _dbLogger
const getDatabase = async (filename: string): Promise<SqliteServiceApi> => {
const _dbLogger = logger.create(`${filename}`)
const { dbg } = _dbLogger
trace(`Fetching database for ${filename}`, connections)
trace(`Fetching database`, connections)
if (!connections[filename]) {
dbg(`${filename} is not yet opened`)
dbg(`Not yet opened`)
connections[filename] = new Promise<SqliteServiceApi>(async (resolve) => {
const api = await (async () => {
const db = await open({ filename, driver: Database })
dbg(`Database opened`)
db.db.addListener(
@ -60,20 +64,23 @@ export const sqliteService = mkSingleton((config: SqliteServiceConfig) => {
table: string,
rowId: number
) => {
dbg(`Got a raw change event`, { eventType, database, table, rowId })
trace(`Got a raw change event`, {
eventType,
database,
table,
rowId,
})
if (eventType === 'delete') return // Not supported
await limiter.schedule(async () => {
const record = await db.get(
`select * from ${table} where rowid = '${rowId}'`
)
const e: SqliteChangeEvent<any> = {
table,
action: eventType,
record,
}
fireChange(e)
})
const record = await db.get(
`select * from ${table} where rowid = '${rowId}'`
)
const e: SqliteChangeEvent<any> = {
table,
action: eventType,
record,
}
fireChange(e)
}
)
@ -92,15 +99,22 @@ export const sqliteService = mkSingleton((config: SqliteServiceConfig) => {
exec: db.exec.bind(db),
subscribe: onChange,
}
resolve(api)
})
return api
})().catch(error)
if (!api) {
throw new Error(`Unable to connect to SQLite`)
}
connections[filename] = api
}
return connections[filename]!
}
const getDatabase = serialAsyncExecutionGuard(
_unsafe_getDatabase,
(fileName) => fileName
)
const shutdown = async () => {
dbg(`Shutting down sqlite service`)
await limiter.stop()
await cm.shutdown()
}
return {

View File

@ -0,0 +1,26 @@
import { logger } from '@pockethost/common'
import { uniqueId } from '@s-libs/micro-dash'
import Bottleneck from 'bottleneck'
import { SetReturnType } from 'type-fest'
const limiters: { [lane: string]: Bottleneck } = {}
export const serialAsyncExecutionGuard = <
T extends (...args: any[]) => Promise<any>
>(
cb: T,
lane?: SetReturnType<T, string>
): T => {
const uuid = uniqueId()
const _lane = lane || (() => uuid)
const wrapper = (...args: Parameters<T>) => {
const { dbg } = logger()
const key = _lane(...args)
if (!limiters[key]) {
dbg(`New singleton limiter with key ${key}`)
limiters[key] = new Bottleneck({ maxConcurrent: 1 })
}
const limiter = limiters[key]!
return limiter.schedule(() => cb(...args)) as unknown as ReturnType<T>
}
return wrapper as unknown as T
}