add snapshot imp & better error logging

This commit is contained in:
goroutine 2024-09-18 01:08:42 -07:00
parent ce46b5eb0b
commit 8860e35e87

View File

@ -24,7 +24,7 @@ type mysqlBackend struct {
func newMySQLBackend(bcfg BackendConfig) (*mysqlBackend, error) {
db, err := sql.Open("mysql", bcfg.MySQLDSN)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to open MySQL connection: %w", err)
}
// Set connection pool settings
@ -42,7 +42,8 @@ func newMySQLBackend(bcfg BackendConfig) (*mysqlBackend, error) {
// Initialize tables
if err := be.initTables(); err != nil {
return nil, err
db.Close()
return nil, fmt.Errorf("failed to initialize tables: %w", err)
}
return be, nil
@ -58,7 +59,10 @@ func (m *mysqlBackend) initTables() error {
version BIGINT
)
`)
return err
if err != nil {
return fmt.Errorf("failed to create kv_store table: %w", err)
}
return nil
}
func (m *mysqlBackend) BatchTx() BatchTx {
@ -74,19 +78,22 @@ func (m *mysqlBackend) ConcurrentReadTx() ReadTx {
}
func (m *mysqlBackend) Snapshot() Snapshot {
return &mysqlSnapshot{be: m}
id := fmt.Sprintf("%d", time.Now().UnixNano())
_, err := m.db.Exec(fmt.Sprintf("CREATE TABLE snapshot_%s AS SELECT * FROM kv_store", id))
if err != nil {
m.lg.Error("failed to create snapshot", zap.Error(err))
return nil
}
return &mysqlSnapshot{be: m, id: id}
}
func (m *mysqlBackend) Hash(ignores func([]byte, []byte) bool) (uint32, error) {
// Implement hash calculation for MySQL
// This is a placeholder implementation; you should replace it with your actual logic.
return 0, fmt.Errorf("Hash not implemented for MySQL backend")
}
func (m *mysqlBackend) Size() int64 {
var size int64
row := m.db.QueryRow("SELECT SUM(DATA_LENGTH + INDEX_LENGTH) FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE()")
err := row.Scan(&size)
err := m.db.QueryRow("SELECT SUM(DATA_LENGTH + INDEX_LENGTH) FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE()").Scan(&size)
if err != nil {
m.lg.Error("failed to get database size", zap.Error(err))
return 0
@ -99,46 +106,112 @@ func (m *mysqlBackend) SizeInUse() int64 {
}
func (m *mysqlBackend) OpenReadTxN() int64 {
// MySQL doesn't have a concept of read transactions, so return 0
return 0
return 0 // MySQL doesn't have a concept of read transactions
}
func (m *mysqlBackend) Defrag() error {
// MySQL handles fragmentation internally, so this is a no-op
return nil
m.lg.Info("Defrag called on MySQL backend (no-op)")
return nil // MySQL handles fragmentation internally
}
func (m *mysqlBackend) ForceCommit() {
m.lg.Info("ForceCommit called on MySQL backend (no-op)")
// MySQL commits automatically, so this is a no-op
}
func (m *mysqlBackend) Close() error {
close(m.stopc)
<-m.donec
return m.db.Close()
if err := m.db.Close(); err != nil {
return fmt.Errorf("failed to close MySQL connection: %w", err)
}
return nil
}
func (m *mysqlBackend) RestoreSnapshot(r io.Reader) error {
tx, err := m.db.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
if _, err = tx.Exec("DELETE FROM kv_store"); err != nil {
return fmt.Errorf("failed to clear kv_store: %w", err)
}
stmt, err := tx.Prepare("INSERT INTO kv_store (key, value) VALUES (?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
defer stmt.Close()
buf := make([]byte, 8192)
for {
n, err := r.Read(buf)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read from snapshot: %w", err)
}
key := buf[:n/2]
value := buf[n/2:n]
if _, err = stmt.Exec(key, value); err != nil {
return fmt.Errorf("failed to insert key-value pair: %w", err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
func (m *mysqlBackend) Backup(w io.Writer) error {
rows, err := m.db.Query("SELECT key, value FROM kv_store")
if err != nil {
return fmt.Errorf("failed to query kv_store: %w", err)
}
defer rows.Close()
for rows.Next() {
var key, value []byte
if err := rows.Scan(&key, &value); err != nil {
return fmt.Errorf("failed to scan row: %w", err)
}
if _, err := w.Write(key); err != nil {
return fmt.Errorf("failed to write key: %w", err)
}
if _, err := w.Write(value); err != nil {
return fmt.Errorf("failed to write value: %w", err)
}
}
if err := rows.Err(); err != nil {
return fmt.Errorf("error during row iteration: %w", err)
}
return nil
}
func (m *mysqlBackend) RestoreBackup(r io.Reader) error {
return m.RestoreSnapshot(r) // The process is the same as restoring from a snapshot
}
func (m *mysqlBackend) SetTxPostLockInsideApplyHook(hook func()) {
m.mu.Lock()
defer m.mu.Unlock()
m.postLockInsideApplyHook = hook
}
// mysqlBatchTx implements BatchTx interface
type mysqlBatchTx struct {
be *mysqlBackend
tx *sql.Tx
}
func (t *mysqlBatchTx) Lock() {
t.be.mu.Lock()
}
func (t *mysqlBatchTx) Unlock() {
t.be.mu.Unlock()
}
func (t *mysqlBatchTx) Lock() { t.be.mu.Lock() }
func (t *mysqlBatchTx) Unlock() { t.be.mu.Unlock() }
func (t *mysqlBatchTx) UnsafeCreateBucket(bucket Bucket) {
// MySQL doesn't use buckets, so this is a no-op
t.be.lg.Warn("UnsafeCreateBucket called on MySQL backend", zap.String("bucket", "n/a"))
t.be.lg.Warn("UnsafeCreateBucket called on MySQL backend (no-op)", zap.String("bucket", bucket.String()))
}
func (t *mysqlBatchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
@ -152,7 +225,7 @@ func (t *mysqlBatchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
}
_, err := t.tx.Exec("INSERT INTO kv_store (key, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value = ?", key, value, value)
if err != nil {
t.be.lg.Error("failed to put key-value pair", zap.Error(err))
t.be.lg.Error("failed to put key-value pair", zap.Error(err), zap.Binary("key", key))
}
}
@ -171,29 +244,28 @@ func (t *mysqlBatchTx) UnsafeDelete(bucket Bucket, key []byte) {
}
_, err := t.tx.Exec("DELETE FROM kv_store WHERE key = ?", key)
if err != nil {
t.be.lg.Error("failed to delete key", zap.Error(err))
t.be.lg.Error("failed to delete key", zap.Error(err), zap.Binary("key", key))
}
}
func (t *mysqlBatchTx) UnsafeDeleteBucket(bucket Bucket) {
t.be.lg.Warn("UnsafeDeleteBucket called on MySQL backend", zap.String("bucket", "n/a"))
// No-op for MySQL as it doesn't use buckets
t.be.lg.Warn("UnsafeDeleteBucket called on MySQL backend (no-op)", zap.String("bucket", bucket.String()))
}
func (t *mysqlBatchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
rows, err := t.be.db.Query("SELECT key, value FROM kv_store")
if err != nil {
return err
return fmt.Errorf("failed to query kv_store: %w", err)
}
defer rows.Close()
for rows.Next() {
var k, v []byte
if err := rows.Scan(&k, &v); err != nil {
return err
return fmt.Errorf("failed to scan row: %w", err)
}
if err := visitor(k, v); err != nil {
return err
return fmt.Errorf("visitor function failed: %w", err)
}
}
return rows.Err()
@ -204,7 +276,7 @@ func (t *mysqlBatchTx) UnsafeRange(bucket Bucket, key, endKey []byte, limit int6
query := "SELECT key, value FROM kv_store WHERE key >= ? AND key < ? ORDER BY key LIMIT ?"
rows, err := t.be.db.Query(query, key, endKey, limit)
if err != nil {
t.be.lg.Error("failed to query range", zap.Error(err))
t.be.lg.Error("failed to query range", zap.Error(err), zap.Binary("start", key), zap.Binary("end", endKey))
return nil, nil
}
defer rows.Close()
@ -217,6 +289,9 @@ func (t *mysqlBatchTx) UnsafeRange(bucket Bucket, key, endKey []byte, limit int6
keys = append(keys, k)
values = append(values, v)
}
if err := rows.Err(); err != nil {
t.be.lg.Error("error during row iteration", zap.Error(err))
}
return keys, values
}
@ -232,7 +307,6 @@ func (t *mysqlBatchTx) Commit() {
func (t *mysqlBatchTx) CommitAndStop() {
t.Commit()
// Additional cleanup if needed
}
func (t *mysqlBatchTx) LockInsideApply() {
@ -246,7 +320,6 @@ func (t *mysqlBatchTx) LockOutsideApply() {
t.be.mu.Lock()
}
// mysqlReadTx implements ReadTx interface
type mysqlReadTx struct {
be *mysqlBackend
}
@ -262,7 +335,7 @@ func (t *mysqlReadTx) UnsafeRange(bucket Bucket, key, endKey []byte, limit int64
query := "SELECT key, value FROM kv_store WHERE key >= ? AND key < ? ORDER BY key LIMIT ?"
rows, err := t.be.db.Query(query, key, endKey, limit)
if err != nil {
t.be.lg.Error("failed to query range", zap.Error(err))
t.be.lg.Error("failed to query range", zap.Error(err), zap.Binary("start", key), zap.Binary("end", endKey))
return nil, nil
}
defer rows.Close()
@ -275,51 +348,80 @@ func (t *mysqlReadTx) UnsafeRange(bucket Bucket, key, endKey []byte, limit int64
keys = append(keys, k)
values = append(values, v)
}
if err := rows.Err(); err != nil {
t.be.lg.Error("error during row iteration", zap.Error(err))
}
return keys, values
}
func (t *mysqlReadTx) UnsafeGet(bucket Bucket, key []byte) (value []byte, err error) {
err = t.be.db.QueryRow("SELECT value FROM kv_store WHERE key = ?", key).Scan(&value)
if err == sql.ErrNoRows {
return nil, nil
}
return
}
func (t *mysqlReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
rows, err := t.be.db.Query("SELECT key, value FROM kv_store")
if err != nil {
return err
return fmt.Errorf("failed to query kv_store: %w", err)
}
defer rows.Close()
for rows.Next() {
var k, v []byte
if err := rows.Scan(&k, &v); err != nil {
return err
return fmt.Errorf("failed to scan row: %w", err)
}
if err := visitor(k, v); err != nil {
return err
return fmt.Errorf("visitor function failed: %w", err)
}
}
return rows.Err()
}
// mysqlSnapshot implements Snapshot interface
type mysqlSnapshot struct {
be *mysqlBackend
id string
}
func (s *mysqlSnapshot) Close() error {
// MySQL doesn't require explicit snapshot closing
_, err := s.be.db.Exec(fmt.Sprintf("DROP TABLE IF EXISTS snapshot_%s", s.id))
if err != nil {
return fmt.Errorf("failed to drop snapshot table: %w", err)
}
return nil
}
func (s *mysqlSnapshot) Size() int64 {
return s.be.Size()
var size int64
err := s.be.db.QueryRow(fmt.Sprintf("SELECT SUM(LENGTH(key) + LENGTH(value)) FROM snapshot_%s", s.id)).Scan(&size)
if err != nil {
s.be.lg.Error("failed to get snapshot size", zap.Error(err))
return 0
}
return size
}
func (s *mysqlSnapshot) WriteTo(w io.Writer) (int64, error) {
// Implement snapshot writing logic
return 0, fmt.Errorf("WriteTo not implemented for MySQL snapshot")
rows, err := s.be.db.Query(fmt.Sprintf("SELECT key, value FROM snapshot_%s", s.id))
if err != nil {
return 0, fmt.Errorf("failed to query snapshot: %w", err)
}
defer rows.Close()
var total int64
for rows.Next() {
var key, value []byte
if err := rows.Scan(&key, &value); err != nil {
return total, fmt.Errorf("failed to scan row: %w", err)
}
n, err := w.Write(key)
total += int64(n)
if err != nil {
return total, fmt.Errorf("failed to write key: %w", err)
}
n, err = w.Write(value)
total += int64(n)
if err != nil {
return total, fmt.Errorf("failed to write value: %w", err)
}
}
if err := rows.Err(); err != nil {
return total, fmt.Errorf("error during row iteration: %w", err)
}
return total, nil
}